class GrpcKit::Stream::ClientStream

Constants

Status

Public Class Methods

new(transport, config, authority:, timeout: nil) click to toggle source

@param transport [GrpcKit::Transport::ClientTransport] @param config [GrpcKit::MethodConfig] @param authority [String]

# File lib/grpc_kit/stream/client_stream.rb, line 11
def initialize(transport, config, authority:, timeout: nil)
  @transport = transport
  @config = config

  @authority = authority
  @timeout = timeout
  @deadline = timeout&.to_absolute_time

  @started = false
end

Public Instance Methods

close_and_recv() click to toggle source

@return [Object]

# File lib/grpc_kit/stream/client_stream.rb, line 79
def close_and_recv
  validate_if_request_start!

  @transport.close_and_flush

  ret = do_recv(last: true)

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  ret
end
close_and_send() click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 67
def close_and_send
  validate_if_request_start!

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  # send?
  @transport.close_and_flush
end
recv_msg(last: false, blocking: true) click to toggle source

This method is not thread safe, never call from multiple threads at once. @raise [StopIteration] when recving message finished @param last [Boolean] @param blocking [Boolean] @return [Object]

# File lib/grpc_kit/stream/client_stream.rb, line 55
def recv_msg(last: false, blocking: true)
  validate_if_request_start!

  ret = do_recv(last: last, blocking: blocking)

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  ret
end
send_msg(data, metadata: {}, last: false) click to toggle source

@param data [Object] @param metadata [Hash<String,String>] @param last [Boolean] @return [void]

# File lib/grpc_kit/stream/client_stream.rb, line 26
def send_msg(data, metadata: {}, last: false)
  buf =
    begin
      @config.codec.encode(data)
    rescue ArgumentError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in client: #{e}"
    end

  limit_size = @config.max_send_message_size
  if limit_size && buf.bytesize > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{req.bytesize}, max=#{limit_size}"
  end

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  if @started
    @transport.write_data(buf, last: last)
  else
    start_request(buf, metadata: metadata, last: last)
  end
end

Private Instance Methods

check_status!() click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 145
def check_status!
  if status.code != GrpcKit::StatusCodes::OK
    raise GrpcKit::Errors.from_status_code(status.code, status.msg)
  else
    GrpcKit.logger.debug('request is success')
  end
end
do_recv(last: false, blocking: true) click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 101
def do_recv(last: false, blocking: true)
  data =
    if blocking
      @transport.read_data(last: last)
    else
      v = @transport.read_data_nonblock(last: last)
      if v == :wait_readable
        return v
      end

      v
    end

  if data.nil?
    check_status!
    raise StopIteration
  elsif last
    check_status!
  end

  compressed, size, buf = *data

  unless size == buf.size
    raise "inconsistent data: #{buf}"
  end

  limit_size = @config.max_receive_message_size
  if limit_size && size > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Receving message is too large: recevied=#{size}, max=#{limit_size}"
  end

  if compressed
    raise 'compress option is unsupported'
  end

  raise StopIteration if buf.nil?

  begin
    @config.codec.decode(buf)
  rescue ArgumentError, TypeError => e
    raise GrpcKit::Errors::Internal, "Error while decoding in Client: #{e}"
  end
end
start_request(buf = nil, last: nil, metadata: {}, timeout: @timeout, authority: @authority) click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 163
def start_request(buf = nil, last: nil, metadata: {}, timeout: @timeout, authority: @authority)
  hdrs = {
    ':method' => 'POST',
    ':scheme' => 'http',
    ':path' => @config.path,
    ':authority' => authority,
    'grpc-timeout' => timeout&.to_s,
    'te' => 'trailers',
    'content-type' => 'application/grpc',
    'user-agent' => "grpc-ruby/#{GrpcKit::VERSION} (grpc_kit)",
    'grpc-accept-encoding' => 'identity,deflate,gzip',
  }

  metadata.each do |k, v|
    if k.start_with?('grpc-')
      # https://github.com/grpc/grpc/blob/ffac9d90b18cb076b1c952faa55ce4e049cbc9a6/doc/PROTOCOL-HTTP2.md
      GrpcKit.logger.info("metadata name wich starts with 'grpc-' is reserved for future GRPC metadata")
    else
      hdrs[k] = v
    end
  end

  @transport.start_request(buf, hdrs.compact, last: last)
  @started = true
end
status() click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 155
def status
  @status ||=
    begin
      headers = @transport.recv_headers
      Status.new(headers.grpc_status, headers.status_message, headers.metadata)
    end
end
validate_if_request_start!() click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 95
def validate_if_request_start!
  unless @started
    raise 'You should call `send_msg` method to send data'
  end
end