class GrpcKit::Calls::Client::BidiStreamer

Public Class Methods

new(**) click to toggle source
Calls superclass method GrpcKit::Call::new
# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 13
# Forwards all keyword arguments to the superclass initializer, then sets up
# the synchronization primitives used by the send and receive paths.
def initialize(**)
  super

  # Send-side state: @send records whether a message has been sent yet.
  # #recv waits on @send_cv (under @send_mutex) until it becomes true.
  @send_mutex = Mutex.new
  @send_cv = Thread::ConditionVariable.new
  @send = false

  # Held by #each for the duration of its receive loop.
  @recv_mutex = Mutex.new
end

Public Instance Methods

close_and_send() click to toggle source
# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 50
# Calls +close_and_send+ on the underlying stream while holding the send
# mutex, the same lock #send_msg takes around its own stream call.
#
# @return [Object] whatever the stream's +close_and_send+ returns
def close_and_send
  @send_mutex.synchronize { @stream.close_and_send }
end
each() { |recv| ... } click to toggle source

@yieldparam response [Object] each response object of bidi streaming RPC

# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 57
# Yields every response received from the peer, in order.
#
# The receive mutex is held for the whole iteration. Iteration stops when
# #recv raises StopIteration, which Kernel#loop rescues.
#
# When called without a block, an Enumerator over the responses is returned
# instead of failing with LocalJumpError on the first yield.
#
# @yieldparam response [Object] each response object of bidi streaming RPC
# @return [Enumerator] if no block is given
def each
  return enum_for(:each) unless block_given?

  @recv_mutex.synchronize do
    loop { yield(recv) }
  end
end
recv() click to toggle source

Receive a message from the peer. This method is not thread-safe; never call it from multiple threads at once. @return [Object] response object @raise [StopIteration] when the stream returns no message

# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 39
# Receives one message from the peer.
#
# Blocks until #send_msg has sent at least one message, then performs a
# blocking read on the underlying stream. A BadStatus error is remembered in
# @reason (so that later #send_msg calls fail) and re-raised.
#
# @return [Object] response object
# @raise [StopIteration] when the stream returns no message
# @raise [GrpcKit::Errors::BadStatus] when the stream read raises it
def recv
  # Unlocked fast-path check first; the flag is re-checked under the mutex.
  unless @send
    @send_mutex.synchronize do
      @send_cv.wait(@send_mutex) until @send
    end
  end

  message = @stream.recv_msg(blocking: true)
  raise StopIteration unless message

  message
rescue GrpcKit::Errors::BadStatus => bad_status
  @reason = bad_status
  raise bad_status
end
send_msg(data) click to toggle source

@param data [Object] request message @return [void]

# File lib/grpc_kit/calls/client_bidi_streamer.rb, line 24
# Sends one request message to the peer.
#
# Refuses to send once #recv has recorded an error status in @reason. After
# the stream call, marks the streamer as having sent and wakes any thread
# waiting in #recv for the first send.
#
# @param data [Object] request message
# @return [void]
# @raise [RuntimeError] if an error status was already recorded
def send_msg(data)
  raise "Upstream returns an error status: #{@reason}" if @reason

  @send_mutex.synchronize do
    @stream.send_msg(data, metadata: outgoing_metadata)

    # Flip the flag before broadcasting so woken waiters see it as true.
    @send = true
    @send_cv.broadcast
  end
end