class GrpcKit::Session::ClientSession
Constants
- MAX_STREAM_ID
Public Class Methods
new(io, **opts)
click to toggle source
@param io [GrpcKit::Session::IO] @param opts [Hash]
Calls superclass method
# File lib/grpc_kit/session/client_session.rb, line 21
# Sets up a fresh client session on top of the given IO object.
#
# @param io [GrpcKit::Session::IO]
# @param opts [Hash]
def initialize(io, **opts)
  super() # initialize DS9::Session

  # Collaborators handed in by the caller.
  @io = io
  @opts = opts

  # Per-session bookkeeping: open streams keyed by stream id, and the lock
  # taken by #run_once.
  @streams = {}
  @mutex = Mutex.new

  # State flags; all of them start out cleared.
  @draining = @stop = @no_write_data = false
end
Public Instance Methods
run_once()
click to toggle source
@return [void]
# File lib/grpc_kit/session/client_session.rb, line 67
# Performs one read/write pass over the connection while holding @mutex.
# Does nothing once the session has been stopped.
#
# @return [void]
# @raise [RuntimeError] if the session is draining and the drain deadline has passed
def run_once
  @mutex.synchronize do
    return if @stop

    raise 'transport is closing' if @draining && @drain_time < Time.now

    if @no_write_data && !@streams.empty?
      # @no_write_data is set and streams are still open: only wait for the
      # IO to become readable instead of selecting on both directions.
      @io.wait_readable
      do_read if want_read?
    else
      rs, ws = @io.select
      do_read if !rs.empty? && want_read?
      send if !ws.empty? && want_write?
    end
  end
end
send_request(headers)
click to toggle source
@param headers [Hash<String,String>] @return [GrpcKit::Session::Stream]
# File lib/grpc_kit/session/client_session.rb, line 35
# Submits a new request and registers the stream that carries it.
#
# @param headers [Hash<String,String>]
# @return [GrpcKit::Session::Stream] the stream created for this request
# @raise [ConnectionClosing] if the session is draining
def send_request(headers)
  if @draining
    raise ConnectionClosing, "You can't send a new request because this connection is shutting down"
  end

  # The real stream id is only known once the request has been submitted,
  # so the stream starts with a placeholder id of 0.
  stream = GrpcKit::Session::Stream.new(stream_id: 0) # set later
  stream_id = submit_request(headers, stream.pending_send_data).to_i
  stream.stream_id = stream_id
  @streams[stream_id] = stream

  # A new request was just submitted, so clear the "nothing to write" flag.
  @no_write_data = false
  stream
end
start(stream_id)
click to toggle source
@param stream_id [Integer] @return [void]
# File lib/grpc_kit/session/client_session.rb, line 50
# Drives the session with #run_once until there is nothing left to read or
# write, or until the given stream reports that it is closed.
#
# @param stream_id [Integer]
# @return [void]
def start(stream_id)
  target = @streams[stream_id]
  return unless target # stream might have already close

  loop do
    idle = !(want_read? || want_write?)
    break if idle || target.close?

    run_once
  end
rescue Errno::ECONNRESET, IOError => err
  # Connection-level failures are logged at debug level and end the session.
  GrpcKit.logger.debug(err.message)
  shutdown
end
Private Instance Methods
do_read()
click to toggle source
# File lib/grpc_kit/session/client_session.rb, line 101
# Calls #receive and translates failures:
# an IOError shuts the session down and is re-raised; a DS9::Exception is
# logged, then re-raised as EOFError when its code is DS9::ERR_EOF and
# re-raised unchanged otherwise.
def do_read
  receive
rescue IOError => io_error
  shutdown
  raise io_error
rescue DS9::Exception => ds9_error
  GrpcKit.logger.debug(ds9_error.message)

  reached_eof = DS9::ERR_EOF == ds9_error.code
  raise EOFError, ds9_error if reached_eof

  raise ds9_error
end
handle_goaway(frame)
click to toggle source
# for nghttp2_session_callbacks_set_on_invalid_frame_recv_callback def on_invalid_frame_recv(frame, error_code) end
# File lib/grpc_kit/session/client_session.rb, line 200
# Reacts to a GOAWAY frame.
#
# When the frame carries MAX_STREAM_ID with DS9::NO_ERROR the session starts
# draining with a deadline 10 seconds from now and every stream is told to
# drain. Streams whose id is above the frame's last_stream_id are closed, and
# the session is shut down if no streams are registered.
def handle_goaway(frame)
  last_id = frame.last_stream_id
  shutdown_notice = last_id == MAX_STREAM_ID && frame.error_code == DS9::NO_ERROR

  if shutdown_notice
    @draining = true
    @drain_time = Time.now + 10 # XXX
    @streams.each_value(&:drain)
  end

  @streams.each { |id, stream| stream.close if id > last_id }

  shutdown if @streams.empty?
end
on_data_chunk_recv(stream_id, data, _flags)
click to toggle source
nghttp2_session_callbacks_set_on_data_chunk_recv_callback
# File lib/grpc_kit/session/client_session.rb, line 170
# Appends a received data chunk to the matching stream's pending_recv_data.
# Chunks for a stream id that is not registered are ignored.
def on_data_chunk_recv(stream_id, data, _flags)
  target = @streams[stream_id]
  return unless target

  target.pending_recv_data.write(data)
end
on_frame_recv(frame)
click to toggle source
nghttp2_session_callbacks_set_on_frame_recv_callback
# File lib/grpc_kit/session/client_session.rb, line 116
# Callback invoked for each received frame.
#
# Data and Headers frames update the state of the stream they belong to;
# Goaway frames are passed to #handle_goaway. Frames that refer to a stream
# id no longer present in @streams are ignored instead of raising
# NoMethodError on nil.
#
# @param frame [DS9::Frames::Data, DS9::Frames::Headers, DS9::Frames::Goaway, Object]
# @return [true]
def on_frame_recv(frame)
  GrpcKit.logger.debug("on_frame_recv #{frame}")

  case frame
  when DS9::Frames::Data
    stream = @streams[frame.stream_id]
    if stream # #on_stream_close may already have removed the stream
      stream.close_remote if frame.end_stream?
      stream.inflight = true unless stream.inflight
    end
  when DS9::Frames::Headers
    stream = @streams[frame.stream_id]
    stream.close_remote if stream && frame.end_stream?
  when DS9::Frames::Goaway
    handle_goaway(frame)
  end

  true
end
on_frame_send(frame)
click to toggle source
nghttp2_session_callbacks_set_on_frame_send_callback
# File lib/grpc_kit/session/client_session.rb, line 144
# Callback invoked for each sent frame.
#
# When a Data or Headers frame ends its stream, the stream's local side is
# closed and @no_write_data is recomputed as "every registered stream is
# closed locally". A frame for a stream id that is no longer in @streams is
# tolerated instead of raising NoMethodError on nil.
#
# @param frame [DS9::Frames::Data, DS9::Frames::Headers, Object]
# @return [true]
def on_frame_send(frame)
  GrpcKit.logger.debug("on_frame_send #{frame}")

  case frame
  when DS9::Frames::Data, DS9::Frames::Headers
    if frame.end_stream?
      stream = @streams[frame.stream_id]
      stream.close_local if stream # #on_stream_close may already have removed it
      @no_write_data = @streams.each_value.all?(&:close_local?)
    end
  end

  true
end
on_header(name, value, frame, _flags)
click to toggle source
for nghttp2_session_callbacks_set_on_header_callback
# File lib/grpc_kit/session/client_session.rb, line 182
# Callback invoked for each received header; records it on the stream the
# frame belongs to. Headers for a stream id that is no longer in @streams
# are ignored instead of raising NoMethodError on nil.
#
# @param name [String]
# @param value [String]
# @param frame [#stream_id]
# @param _flags [Object] unused
# @return [Object, nil] whatever the stream's add_header returns, or nil when
#   the stream is unknown
def on_header(name, value, frame, _flags)
  GrpcKit.logger.debug("#{name} => #{value}")

  stream = @streams[frame.stream_id]
  return unless stream # #on_stream_close may already have removed it

  stream.add_header(name, value)
end
on_stream_close(stream_id, error_code)
click to toggle source
nghttp2_session_callbacks_set_on_stream_close_callback
# File lib/grpc_kit/session/client_session.rb, line 159
# Callback invoked when a stream is closed: removes the stream from @streams
# and closes it. A warning is logged when the stream id is not registered.
def on_stream_close(stream_id, error_code)
  GrpcKit.logger.debug("on_stream_close stream_id=#{stream_id}, error_code=#{error_code}")

  removed = @streams.delete(stream_id)
  if removed
    removed.close
  else
    GrpcKit.logger.warn("on_stream_close stream_id=#{stream_id} not remain on ClientSession")
    nil
  end
end
shutdown()
click to toggle source
# File lib/grpc_kit/session/client_session.rb, line 96
# Marks the session as stopped and closes the underlying IO.
#
# The method is idempotent: #do_read calls it and re-raises the IOError,
# which #start rescues and answers with a second #shutdown call, so the IO
# is only closed on the first invocation.
#
# @return [void]
def shutdown
  return if @stop

  @stop = true
  @io.close
end