class GrpcKit::Session::ServerSession

Public Class Methods

new(io, dispatcher) click to toggle source

@param io [GrpcKit::Session::IO] @param pool [GrpcKit::RcpDispatcher]

Calls superclass method
# File lib/grpc_kit/session/server_session.rb, line 18
def initialize(io, dispatcher)
  opt = DS9::Option.new.tap do |o|
    # https://github.com/nghttp2/nghttp2/issues/810
    # grpc_kit doesn't need to retain closed stream.
    # This would derease the memory usage.
    o.set_no_closed_streams
  end
  super(option: opt) # initialize DS9::Session

  @io = io
  @streams = {}
  @stop = false
  @inflights = []
  @drain_controller = GrpcKit::Session::DrainController.new
  @control_queue = GrpcKit::Session::ControlQueue.new(waker: @io.method(:wake!))
  @dispatcher = dispatcher
end

Public Instance Methods

drain() click to toggle source

@return [void]

# File lib/grpc_kit/session/server_session.rb, line 85
def drain
  @drain_controller.start_draining
end
run_once() click to toggle source

@return [bool] return session can continue

# File lib/grpc_kit/session/server_session.rb, line 56
def run_once
  if @stop || !(want_read? || want_write?)
    # it could be called twice
    @streams.each_value(&:close)
    return false
  end

  if @drain_controller.start_draining?
    @drain_controller.next(self)
  end

  rs, ws = @io.select(timeout: 5, write: want_write?)

  if !rs.empty? && want_read?
    do_read
  end

  if !ws.empty? && want_write?
    send
  end

  true
rescue Errno::ECONNRESET, IOError => e
  GrpcKit.logger.error(e.message)
  shutdown
  false
end
shutdown() click to toggle source

@return [void]

# File lib/grpc_kit/session/server_session.rb, line 90
def shutdown
  stop
  @io.close
rescue StandardError => e
  GrpcKit.logger.error(e)
end
start() click to toggle source

@return [void]

# File lib/grpc_kit/session/server_session.rb, line 37
def start
  loop do
    invoke

    if @streams.empty?
      unless @io.wait_readable
        shutdown
        break
      end
    end

    continue = run_once
    break unless continue
  end
ensure
  GrpcKit.logger.debug('Finish server session')
end

Private Instance Methods

do_read() click to toggle source
# File lib/grpc_kit/session/server_session.rb, line 128
def do_read
  receive
rescue DS9::Exception => e
  shutdown

  case e.code
  when DS9::ERR_EOF
    GrpcKit.logger.debug('The peer performed a shutdown on the connection')
  when DS9::ERR_BAD_CLIENT_MAGIC
    GrpcKit.logger.error('Invalid client magic was received')
  else
    raise "#{e.message}. code=#{e.code}"
  end
end
invoke() click to toggle source
# File lib/grpc_kit/session/server_session.rb, line 103
def invoke
  while (event = @control_queue.pop)
    case event[0]
    when :submit_response
      stream = @streams[event[1]]
      # piggybacked previous invokeing #submit_response?
      unless stream && !stream.pending_send_data.empty?
        next
      end

      submit_response(event[1], event[2])
    when :submit_headers
      submit_headers(event[1], event[2])
    when :resume_data
      stream = @streams[event[1]]
      unless stream && stream.pending_send_data.need_resume?
        next
      end

      resume_data(event[1])
      stream.pending_send_data.no_resume
    end
  end
end
on_begin_headers(header) click to toggle source

nghttp2_session_callbacks_set_on_begin_headers_callback

# File lib/grpc_kit/session/server_session.rb, line 224
def on_begin_headers(header)
  stream_id = header.stream_id
  GrpcKit.logger.debug("on_begin_header stream_id=#{stream_id}")

  if @streams[stream_id]
    raise "#{stream_id} is already existed"
  end

  @streams[stream_id] = GrpcKit::Session::Stream.new(stream_id: stream_id)
end
on_data_chunk_recv(stream_id, data, _flags) click to toggle source

nghttp2_session_callbacks_set_on_data_chunk_recv_callback

# File lib/grpc_kit/session/server_session.rb, line 161
def on_data_chunk_recv(stream_id, data, _flags)
  stream = @streams[stream_id]
  if stream
    stream.pending_recv_data.write(data)
  end
end
on_data_source_read(stream_id, length) click to toggle source

`provider` for nghttp2_submit_response

# File lib/grpc_kit/session/server_session.rb, line 144
def on_data_source_read(stream_id, length)
  GrpcKit.logger.debug("on_data_source_read #{stream_id}, lenght=#{length}")

  stream = @streams[stream_id]
  data = @streams[stream_id].pending_send_data.read(length)
  if data.nil?
    submit_trailer(stream_id, stream.trailer_data)
    @io.wake!(:submit_trailer)

    # trailer header
    false
  else
    data
  end
end
on_frame_not_send(frame, reason) click to toggle source

nghttp2_session_callbacks_set_on_frame_not_send_callback

# File lib/grpc_kit/session/server_session.rb, line 218
def on_frame_not_send(frame, reason)
  GrpcKit.logger.debug("on_frame_not_send frame=#{frame}, reason=#{reason}")
  true
end
on_frame_recv(frame) click to toggle source

nghttp2_session_callbacks_set_on_frame_recv_callback Note: called after ServerSession#on_data_chunk_recv

# File lib/grpc_kit/session/server_session.rb, line 170
def on_frame_recv(frame)
  GrpcKit.logger.debug("on_frame_recv #{frame}") # Too many call

  case frame
  when DS9::Frames::Data
    stream = @streams[frame.stream_id]

    if frame.end_stream?
      stream.close_remote
    end

    unless stream.inflight
      stream.inflight = true
      @dispatcher.schedule([stream, @control_queue])
    end

  when DS9::Frames::Headers
    if frame.end_stream?
      stream = @streams[frame.stream_id]
      stream.close_remote
    end
  when DS9::Frames::Ping
    if frame.ping_ack?
      GrpcKit.logger.debug('ping ack is received')
      @drain_controller.recv_ping_ack
    end
    # when DS9::Frames::Goaway
    # when DS9::Frames::RstStream
  end

  true
end
on_frame_send(frame) click to toggle source

nghttp2_session_callbacks_set_on_frame_send_callback

# File lib/grpc_kit/session/server_session.rb, line 204
def on_frame_send(frame)
  GrpcKit.logger.debug("on_frame_send #{frame}") # Too many call
  case frame
  when DS9::Frames::Data, DS9::Frames::Headers
    if frame.end_stream?
      stream = @streams[frame.stream_id]
      stream.close_local
    end
  end

  true
end
on_header(name, value, frame, _flags) click to toggle source

nghttp2_session_callbacks_set_on_header_callback

# File lib/grpc_kit/session/server_session.rb, line 236
def on_header(name, value, frame, _flags)
  GrpcKit.logger.debug("#{name} => #{value}") # Too many call
  stream = @streams[frame.stream_id]
  stream.add_header(name, value)
end
on_invalid_frame_recv(frame, error_code) click to toggle source
# File lib/grpc_kit/session/server_session.rb, line 242
def on_invalid_frame_recv(frame, error_code)
  GrpcKit.logger.debug("on_invalid_frame_recv #{frame} error_code=#{error_code}")
  true
end
on_stream_close(stream_id, error_code) click to toggle source

nghttp2_session_callbacks_set_on_stream_close_callback

# File lib/grpc_kit/session/server_session.rb, line 248
def on_stream_close(stream_id, error_code)
  if error_code != DS9::NO_ERROR
    GrpcKit.logger.debug("on_stream_close stream_id=#{stream_id}, error_code=#{error_code}")
  end

  stream = @streams.delete(stream_id)
  stream.close if stream
end
stop() click to toggle source
# File lib/grpc_kit/session/server_session.rb, line 99
def stop
  @stop = true
end