class GrpcKit::Stream::ClientStream
Constants
- Status
Public Class Methods
new(transport, config, authority:, timeout: nil)
click to toggle source
@param transport [GrpcKit::Transport::ClientTransport] @param config [GrpcKit::MethodConfig] @param authority [String]
# File lib/grpc_kit/stream/client_stream.rb, line 11
# Builds a client stream that has not yet started its request.
#
# @param transport [GrpcKit::Transport::ClientTransport]
# @param config [GrpcKit::MethodConfig]
# @param authority [String]
# @param timeout [#to_absolute_time, nil] optional timeout; when given, its
#   absolute time is stored as the deadline checked by the send/recv methods
def initialize(transport, config, authority:, timeout: nil)
  @transport, @config, @authority, @timeout = transport, config, authority, timeout
  # No timeout means no deadline.
  @deadline = timeout.nil? ? nil : timeout.to_absolute_time
  @started = false
end
Public Instance Methods
close_and_recv()
click to toggle source
@return [Object]
# File lib/grpc_kit/stream/client_stream.rb, line 79
# Closes the sending side, flushes the transport, and receives the final
# message.
#
# @raise [RuntimeError] when the request has not been started
# @raise [GrpcKit::Errors::DeadlineExceeded] when the deadline has passed
#   by the time the message has been received
# @return [Object]
def close_and_recv
  validate_if_request_start!
  @transport.close_and_flush

  received = do_recv(last: true)
  # The deadline is checked after the read, so a late response is rejected.
  raise GrpcKit::Errors::DeadlineExceeded, @deadline if @deadline && Time.now > @deadline

  received
end
close_and_send()
click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 67
# Closes the sending side of the stream and flushes the transport.
#
# @raise [RuntimeError] when the request has not been started
# @raise [GrpcKit::Errors::DeadlineExceeded] when the deadline has already passed
def close_and_send
  validate_if_request_start!
  raise GrpcKit::Errors::DeadlineExceeded, @deadline if @deadline && Time.now > @deadline

  # send?
  @transport.close_and_flush
end
recv_msg(last: false, blocking: true)
click to toggle source
This method is not thread-safe; never call it from multiple threads at once. @raise [StopIteration] when there are no more messages to receive @param last [Boolean] @param blocking [Boolean] @return [Object]
# File lib/grpc_kit/stream/client_stream.rb, line 55
# Receives one message from the stream.
#
# @param last [Boolean] forwarded to the underlying read
# @param blocking [Boolean] when false, a non-blocking read is used
# @raise [RuntimeError] when the request has not been started
# @raise [GrpcKit::Errors::DeadlineExceeded] when the deadline has passed
#   by the time the read has returned
# @return [Object] whatever the underlying read produced
def recv_msg(last: false, blocking: true)
  validate_if_request_start!

  do_recv(last: last, blocking: blocking).tap do
    # Checked after the read so a result that arrives too late is discarded.
    raise GrpcKit::Errors::DeadlineExceeded, @deadline if @deadline && Time.now > @deadline
  end
end
send_msg(data, metadata: {}, last: false)
click to toggle source
@param data [Object] @param metadata [Hash<String,String>] @param last [Boolean] @return [void]
# File lib/grpc_kit/stream/client_stream.rb, line 26
# Encodes +data+ with the configured codec and sends it. The first call
# starts the request (sending headers and +metadata+); later calls only
# write data, so +metadata+ is used on the first call only.
#
# @param data [Object]
# @param metadata [Hash<String,String>]
# @param last [Boolean]
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError
# @raise [GrpcKit::Errors::ResourceExhausted] when the encoded message is
#   larger than the configured max_send_message_size
# @raise [GrpcKit::Errors::DeadlineExceeded] when the deadline has already passed
# @return [void]
def send_msg(data, metadata: {}, last: false)
  buf =
    begin
      @config.codec.encode(data)
    rescue ArgumentError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in client: #{e}"
    end

  limit_size = @config.max_send_message_size
  if limit_size && buf.bytesize > limit_size
    # Report the size of the encoded buffer (the original referenced an
    # undefined `req`, which raised NameError instead of this error).
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{buf.bytesize}, max=#{limit_size}"
  end

  raise GrpcKit::Errors::DeadlineExceeded, @deadline if @deadline && Time.now > @deadline

  if @started
    @transport.write_data(buf, last: last)
  else
    start_request(buf, metadata: metadata, last: last)
  end
end
Private Instance Methods
check_status!()
click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 145
# Raises the error matching the response status unless the status is OK.
#
# @raise the error built by GrpcKit::Errors.from_status_code from the
#   status code and message when the code is not OK
def check_status!
  result = status

  if result.code == GrpcKit::StatusCodes::OK
    GrpcKit.logger.debug('request is success')
  else
    raise GrpcKit::Errors.from_status_code(result.code, result.msg)
  end
end
do_recv(last: false, blocking: true)
click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 101
# Reads one message from the transport, validates it, and decodes it with
# the configured codec.
#
# @param last [Boolean] forwarded to the transport read; when true the
#   response status is also checked once data has been read
# @param blocking [Boolean] selects read_data (true) or read_data_nonblock (false)
# @raise [StopIteration] when the transport returns no data (after the
#   status check passes) or when the payload is nil
# @raise [RuntimeError] when the declared size differs from the payload
#   size, or when the message is flagged as compressed
# @raise [GrpcKit::Errors::ResourceExhausted] when the declared size is
#   larger than the configured max_receive_message_size
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError or TypeError
# @return [Object, Symbol] the decoded message, or :wait_readable when a
#   non-blocking read reports that value
def do_recv(last: false, blocking: true)
  data =
    if blocking
      @transport.read_data(last: last)
    else
      @transport.read_data_nonblock(last: last)
    end
  # Only the non-blocking read is checked for the "not ready" marker; pass it back untouched.
  return data if !blocking && data == :wait_readable

  if data.nil?
    check_status!
    raise StopIteration
  elsif last
    check_status!
  end

  compressed, size, buf = *data

  # This guard must precede the size comparison: nil does not respond to
  # #size, so placing it later (as before) made it unreachable and a nil
  # payload surfaced as NoMethodError.
  raise StopIteration if buf.nil?

  raise "inconsistent data: #{buf}" unless size == buf.size

  limit_size = @config.max_receive_message_size
  if limit_size && size > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Receving message is too large: recevied=#{size}, max=#{limit_size}"
  end

  raise 'compress option is unsupported' if compressed

  begin
    @config.codec.decode(buf)
  rescue ArgumentError, TypeError => e
    raise GrpcKit::Errors::Internal, "Error while decoding in Client: #{e}"
  end
end
start_request(buf = nil, last: nil, metadata: {}, timeout: @timeout, authority: @authority)
click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 163
# Builds the request headers, merges in caller metadata, starts the request
# on the transport, and marks the stream as started.
#
# @param buf [String, nil] first payload handed to the transport
# @param last [Boolean, nil] forwarded to the transport
# @param metadata [Hash<String,String>] extra headers; names beginning
#   with 'grpc-' are skipped and logged
# @param timeout [#to_s, nil] sent as 'grpc-timeout' when present
# @param authority [String] sent as ':authority'
def start_request(buf = nil, last: nil, metadata: {}, timeout: @timeout, authority: @authority)
  headers = {
    ':method' => 'POST',
    ':scheme' => 'http',
    ':path' => @config.path,
    ':authority' => authority,
    'grpc-timeout' => timeout&.to_s,
    'te' => 'trailers',
    'content-type' => 'application/grpc',
    'user-agent' => "grpc-ruby/#{GrpcKit::VERSION} (grpc_kit)",
    'grpc-accept-encoding' => 'identity,deflate,gzip',
  }

  metadata.each do |name, value|
    unless name.start_with?('grpc-')
      headers[name] = value
      next
    end

    # https://github.com/grpc/grpc/blob/ffac9d90b18cb076b1c952faa55ce4e049cbc9a6/doc/PROTOCOL-HTTP2.md
    GrpcKit.logger.info("metadata name wich starts with 'grpc-' is reserved for future GRPC metadata")
  end

  # compact drops every nil-valued header (e.g. 'grpc-timeout' without a timeout).
  @transport.start_request(buf, headers.compact, last: last)
  @started = true
end
status()
click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 155
# Returns the response status, built from the received headers on first
# use and memoized afterwards.
#
# @return [Status]
def status
  return @status if @status

  received = @transport.recv_headers
  @status = Status.new(received.grpc_status, received.status_message, received.metadata)
end
validate_if_request_start!()
click to toggle source
# File lib/grpc_kit/stream/client_stream.rb, line 95
# Ensures the request has been started before any other stream operation.
#
# @raise [RuntimeError] when the request has not been started yet
def validate_if_request_start!
  return if @started

  raise 'You should call `send_msg` method to send data'
end