class GrpcKit::Session::ClientSession

Constants

MAX_STREAM_ID

Public Class Methods

new(io, **opts) click to toggle source

@param io [GrpcKit::Session::IO] @param opts [Hash]

Calls superclass method
# File lib/grpc_kit/session/client_session.rb, line 21
def initialize(io, **opts)
  super() # initialize DS9::Session

  @io = io
  @streams = {}
  @opts = opts
  @draining = false
  @stop = false
  @no_write_data = false
  @mutex = Mutex.new
end

Public Instance Methods

run_once() click to toggle source

@return [void]

# File lib/grpc_kit/session/client_session.rb, line 67
def run_once
  @mutex.synchronize do
    return if @stop

    if @draining && @drain_time < Time.now
      raise 'trasport is closing'
    end

    if @no_write_data && !@streams.empty?
      @io.wait_readable

      if want_read?
        do_read
      end
    else
      rs, ws = @io.select
      if !rs.empty? && want_read?
        do_read
      end

      if !ws.empty? && want_write?
        send
      end
    end
  end
end
send_request(headers) click to toggle source

@param headers [Hash<String,String>] @return [void]

# File lib/grpc_kit/session/client_session.rb, line 35
def send_request(headers)
  if @draining
    raise ConnectionClosing, "You can't send new request. becuase this connection will shuting down"
  end

  stream = GrpcKit::Session::Stream.new(stream_id: 0) # set later
  stream_id = submit_request(headers, stream.pending_send_data).to_i
  stream.stream_id = stream_id
  @streams[stream_id] = stream
  @no_write_data = false
  stream
end
start(stream_id) click to toggle source

@param stream_id [Integer] @return [void]

# File lib/grpc_kit/session/client_session.rb, line 50
def start(stream_id)
  stream = @streams[stream_id]
  return unless stream # stream might have already close

  loop do
    if (!want_read? && !want_write?) || stream.close?
      break
    end

    run_once
  end
rescue Errno::ECONNRESET, IOError => e
  GrpcKit.logger.debug(e.message)
  shutdown
end

Private Instance Methods

do_read() click to toggle source
# File lib/grpc_kit/session/client_session.rb, line 101
def do_read
  receive
rescue IOError => e
  shutdown
  raise e
rescue DS9::Exception => e
  GrpcKit.logger.debug(e.message)
  if DS9::ERR_EOF == e.code
    raise EOFError, e
  end

  raise e
end
handle_goaway(frame) click to toggle source

# for nghttp2_session_callbacks_set_on_invalid_frame_recv_callback def on_invalid_frame_recv(frame, error_code) end

# File lib/grpc_kit/session/client_session.rb, line 200
def handle_goaway(frame)
  # shutdown notice
  last_stream_id = frame.last_stream_id
  if last_stream_id == MAX_STREAM_ID && frame.error_code == DS9::NO_ERROR
    @draining = true
    @drain_time = Time.now + 10 # XXX
    @streams.each_value(&:drain)
  end

  @streams.each do |id, stream|
    if id > last_stream_id
      stream.close
    end
  end

  shutdown if @streams.empty?
end
on_data_chunk_recv(stream_id, data, _flags) click to toggle source

nghttp2_session_callbacks_set_on_data_chunk_recv_callback

# File lib/grpc_kit/session/client_session.rb, line 170
def on_data_chunk_recv(stream_id, data, _flags)
  stream = @streams[stream_id]
  if stream
    stream.pending_recv_data.write(data)
  end
end
on_frame_recv(frame) click to toggle source

nghttp2_session_callbacks_set_on_frame_send_callback

# File lib/grpc_kit/session/client_session.rb, line 116
def on_frame_recv(frame)
  GrpcKit.logger.debug("on_frame_recv #{frame}")
  case frame
  when DS9::Frames::Data
    stream = @streams[frame.stream_id]

    if frame.end_stream?
      stream.close_remote
    end

    unless stream.inflight
      stream.inflight = true
    end

  when DS9::Frames::Headers
    stream = @streams[frame.stream_id]

    if frame.end_stream?
      stream.close_remote
    end
  when DS9::Frames::Goaway
    handle_goaway(frame)
  end

  true
end
on_frame_send(frame) click to toggle source

nghttp2_session_callbacks_set_on_frame_send_callback

# File lib/grpc_kit/session/client_session.rb, line 144
def on_frame_send(frame)
  GrpcKit.logger.debug("on_frame_send #{frame}")
  case frame
  when DS9::Frames::Data, DS9::Frames::Headers
    stream = @streams[frame.stream_id]
    if frame.end_stream?
      stream.close_local
      @no_write_data = @streams.all? { |_, v| v.close_local? }
    end
  end

  true
end
on_header(name, value, frame, _flags) click to toggle source

for nghttp2_session_callbacks_set_on_header_callback

# File lib/grpc_kit/session/client_session.rb, line 182
def on_header(name, value, frame, _flags)
  GrpcKit.logger.debug("#{name} => #{value}")
  stream = @streams[frame.stream_id]
  stream.add_header(name, value)
end
on_stream_close(stream_id, error_code) click to toggle source

nghttp2_session_callbacks_set_on_stream_close_callback

# File lib/grpc_kit/session/client_session.rb, line 159
def on_stream_close(stream_id, error_code)
  GrpcKit.logger.debug("on_stream_close stream_id=#{stream_id}, error_code=#{error_code}")
  stream = @streams.delete(stream_id)
  unless stream
    GrpcKit.logger.warn("on_stream_close stream_id=#{stream_id} not remain on ClientSession")
    return
  end
  stream.close
end
shutdown() click to toggle source
# File lib/grpc_kit/session/client_session.rb, line 96
def shutdown
  @stop = true
  @io.close
end