class GrpcKit::Stream::ServerStream

Public Class Methods

new(transport) click to toggle source

@param transport [GrpcKit::transports::ServerTransport]

# File lib/grpc_kit/stream/server_stream.rb, line 9
# Creates a stream that has not yet started its response.
#
# @param transport [Object] transport that this stream's reads and writes are delegated to
def initialize(transport)
  @started = false # set to true once response headers have been handed to the transport
  @transport = transport
end

Public Instance Methods

invoke(rpc) click to toggle source

@return [void]

# File lib/grpc_kit/stream/server_stream.rb, line 15
# Runs the given RPC handler against this stream and reports failures to the peer.
#
# The handler receives this stream plus the metadata taken from the received
# request headers. A StandardError raised while doing so is logged as a
# warning and converted into a status sent through #send_status instead of
# being propagated to the caller.
#
# @param rpc [#invoke] handler called as `rpc.invoke(stream, metadata: ...)`
# @return [void]
def invoke(rpc)
  request_headers = @transport.recv_headers
  rpc.invoke(self, metadata: request_headers.metadata)
rescue GrpcKit::Errors::BadStatus => bad_status
  # The failure already carries a gRPC status: forward its code and reason.
  GrpcKit.logger.warn(bad_status)
  send_status(status: bad_status.code, msg: bad_status.reason, metadata: {}) # TODO: metadata should be set
rescue StandardError => error
  # Any other failure is reported as UNKNOWN together with the exception message.
  GrpcKit.logger.warn(error)
  send_status(status: GrpcKit::StatusCodes::UNKNOWN, msg: error.message, metadata: {})
end
recv_msg(codec, last: false, limit_size: nil) click to toggle source

This method is not thread-safe; never call it from multiple threads at once. @raise [StopIteration] when there are no more messages to receive @param codec [GrpcKit::Codec] @param last [Boolean] @param limit_size [Integer] @return [Object]

# File lib/grpc_kit/stream/server_stream.rb, line 59
# Reads one message from the transport, validates it and decodes it.
#
# Not thread safe: never call from multiple threads at once.
#
# @param codec [#decode] decoder applied to the raw message payload
# @param last [Boolean] forwarded to the transport's `read_data`
# @param limit_size [Integer, nil] largest accepted message size; nil skips the check
# @return [Object] whatever `codec.decode` returns
# @raise [StopIteration] when the transport returns nil instead of data
# @raise [GrpcKit::Errors::ResourceExhausted] when the declared size exceeds limit_size
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError
# @raise [RuntimeError] when the declared size and payload size differ, or the
#   message is flagged as compressed
def recv_msg(codec, last: false, limit_size: nil)
  frame = @transport.read_data(last: last)
  raise StopIteration if frame.nil?

  compressed, size, buf = *frame

  # The declared length has to agree with the payload that actually arrived.
  raise "inconsistent data: #{buf}" if size != buf.size

  if limit_size && size > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Receving message is too large: recevied=#{size}, max=#{limit_size}"
  end

  raise 'compress option is unsupported' if compressed

  begin
    codec.decode(buf)
  rescue ArgumentError => decode_error
    # Only decoding failures are translated; earlier errors propagate untouched.
    raise GrpcKit::Errors::Internal, "Error while decoding in server: #{decode_error}"
  end
end
send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {}) click to toggle source

@param data [Object] @param codec [GrpcKit::Codec] @param last [Boolean] @param limit_size [Integer] @param initial_metadata [Hash<String,String>] @param trailing_metadata [Hash<String,String>] @return [void]

# File lib/grpc_kit/stream/server_stream.rb, line 32
# Encodes a message with the codec and sends it to the peer.
#
# How the encoded bytes are sent depends on the stream state:
# * `last: true`  - passed to #send_status together with the trailing metadata;
# * already started - written directly to the transport;
# * otherwise     - passed to #start_response together with the initial metadata.
#
# @param data [Object] message to encode
# @param codec [#encode] encoder producing the bytes to send
# @param last [Boolean] whether this is the final message of the stream
# @param limit_size [Integer, nil] largest allowed encoded size in bytes; nil skips the check
# @param initial_metadata [Hash<String,String>] metadata used when the response is started here
# @param trailing_metadata [Hash<String,String>] metadata used when `last` is true
# @return [void]
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError or TypeError
# @raise [GrpcKit::Errors::ResourceExhausted] when the encoded message is larger than limit_size
def send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {})
  buf =
    begin
      codec.encode(data)
    rescue ArgumentError, TypeError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in server: #{e}"
    end

  if limit_size && buf.bytesize > limit_size
    # Report the size of the encoded buffer; the previous `req` local did not
    # exist here, so this path raised NameError instead of ResourceExhausted.
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{buf.bytesize}, max=#{limit_size}"
  end

  if last
    send_status(data: buf, metadata: trailing_metadata)
  elsif @started
    @transport.write_data(buf)
  else
    start_response(buf, metadata: initial_metadata)
  end
end
send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) click to toggle source

@param data [String,nil] encoded message to write before the trailers, if any @param status [GrpcKit::StatusCodes::BadStatus, GrpcKit::StatusCodes::OK] @param msg [String,nil] @param metadata [Hash<String,String>] @return [void]

# File lib/grpc_kit/stream/server_stream.rb, line 89
# Finishes the stream by sending the final status, optionally preceded by a
# last chunk of data.
#
# @param data [Object, nil] payload written with `last: true` before finishing; skipped when falsy
# @param status [Object] status value placed in the trailers (defaults to OK)
# @param msg [String, nil] optional status message
# @param metadata [Hash<String,String>] extra entries merged into the trailers
# @return [void]
def send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {})
  trailers = build_trailers(status, msg, metadata)
  @transport.write_data(data, last: true) if data

  # Response already started: only trailers are left to send.
  if @started
    @transport.write_trailers(trailers)
    return @transport.end_write
  end

  # Nothing was sent and there is no data: finish the write side and send
  # the trailers together with the response headers.
  unless data
    @transport.end_write
    return send_headers(trailers: trailers)
  end

  # Not started yet but there is data: queue trailers, finish writing, then
  # start the response (will send queued data and trailer).
  @transport.write_trailers(trailers)
  @transport.end_write
  start_response
end

Private Instance Methods

build_trailers(status, msg, metadata) click to toggle source
# File lib/grpc_kit/stream/server_stream.rb, line 128
# Builds the trailer hash for a final status.
#
# @param status [#to_s] status value stored as a string under 'grpc-status'
# @param msg [String, nil] stored under 'grpc-message' when truthy
# @param metadata [Hash] merged last, so its entries win on key collisions
# @return [Hash] a new hash; `metadata` itself is not modified
def build_trailers(status, msg, metadata)
  status_fields = { 'grpc-status' => status.to_s }
  status_fields['grpc-message'] = msg if msg

  status_fields.merge(metadata)
end
send_headers(trailers: {}) click to toggle source
# File lib/grpc_kit/stream/server_stream.rb, line 111
# Submits the response headers, with the given trailers merged in, and marks
# the stream as started.
#
# @param trailers [Hash] entries merged over the base headers
# @return [true]
def send_headers(trailers: {})
  base_headers = {
    ':status' => '200',
    'content-type' => 'application/grpc',
    'accept-encoding' => 'identity'
  }

  @transport.submit_headers(base_headers.merge(trailers))
  @started = true
end
start_response(data = nil, metadata: {}) click to toggle source
# File lib/grpc_kit/stream/server_stream.rb, line 119
# Starts the response on the transport, optionally writing a first chunk of
# data beforehand, and marks the stream as started.
#
# @param data [Object, nil] written to the transport first when truthy
# @param metadata [Hash] entries merged over the base response headers
# @return [true]
def start_response(data = nil, metadata: {})
  @transport.write_data(data) if data

  response_headers = {
    ':status' => '200',
    'content-type' => 'application/grpc',
    'accept-encoding' => 'identity'
  }.merge(metadata)

  @transport.start_response(response_headers)
  @started = true
end