class GrpcKit::Calls::Client::BidiStreamer
Public Class Methods
new(**)
click to toggle source
Calls superclass method
GrpcKit::Call::new
# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 13
# Builds the per-call synchronization state after delegating to the
# superclass initializer with the same keyword arguments.
def initialize(**)
  super

  # State for the send/recv handshake: @send starts out false and is
  # flipped by send_msg, which then broadcasts on @send_cv.
  @send_mutex = Mutex.new
  @send_cv = Thread::ConditionVariable.new
  @send = false

  # Lock taken by #each around its receive loop.
  @recv_mutex = Mutex.new
end
Public Instance Methods
close_and_send()
click to toggle source
# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 50
# Calls +close_and_send+ on the underlying stream while holding
# +@send_mutex+, and returns whatever that call returns.
#
# NOTE(review): this does not set +@send+, so a #recv issued without any
# prior #send_msg keeps waiting on the condition variable -- confirm that
# this is intended.
def close_and_send
  @send_mutex.synchronize { @stream.close_and_send }
end
each() { |recv| ... }
click to toggle source
@yieldparam response [Object] each response object of bidi streaming RPC
# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 57
# Yields each received message to the block until the stream ends.
#
# +@recv_mutex+ is held for the whole iteration. Kernel#loop rescues the
# StopIteration raised by #recv, so this method returns normally once the
# stream is exhausted; any other exception propagates to the caller.
#
# @yieldparam response [Object] each response object of bidi streaming RPC
# @return [Enumerator] when called without a block
def each
  # Without this guard a message would be read by #recv and then lost when
  # +yield+ raised LocalJumpError.
  return enum_for(:each) unless block_given?

  @recv_mutex.synchronize do
    loop { yield(recv) }
  end
end
recv()
click to toggle source
Receive a message from the peer. This method is not thread-safe; never call it from multiple threads at once. @return [Object] response object @raise [StopIteration] when the stream returns no further message
# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 39
# Receive a message from the peer.
#
# Waits until #send_msg has set +@send+, then performs a blocking read on
# the underlying stream. This method is not thread-safe; never call it from
# multiple threads at once.
#
# @return [Object] response object
# @raise [StopIteration] when the stream returns no message
# @raise [GrpcKit::Errors::BadStatus] re-raised after being stored in
#   +@reason+, which #send_msg checks before sending
def recv
  # Unlocked fast-path check; the lock is only taken when we have to wait.
  unless @send
    @send_mutex.synchronize { @send_cv.wait(@send_mutex) until @send }
  end

  msg = @stream.recv_msg(blocking: true)
  return msg if msg

  raise StopIteration
rescue GrpcKit::Errors::BadStatus => e
  @reason = e
  # Bare raise re-raises the exception currently being handled.
  raise
end
send_msg(data)
click to toggle source
@param data [Object] request message @return [void]
# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 24
# Sends one request message on the stream.
#
# @param data [Object] request message
# @return [void]
# @raise [RuntimeError] if #recv has already recorded an error in +@reason+
def send_msg(data)
  # Refuse to send once a bad status has been recorded by #recv.
  raise "Upstream returns an error status: #{@reason}" if @reason

  @send_mutex.synchronize do
    @stream.send_msg(data, metadata: outgoing_metadata)

    # Record that a message has gone out and wake any #recv waiting on it.
    @send = true
    @send_cv.broadcast
  end
end