class GRPC::BidiCall

The BiDiCall class orchestrates execution of a BiDi stream on a client or server.

Constants

END_OF_READS
END_OF_WRITES

Public Class Methods

new(call, marshal, unmarshal, metadata_received: false, req_view: nil) click to toggle source

Creates a BidiCall.

BidiCall should only be created after a call is accepted. That means different things on a client and a server. On the client, the call is accepted after call.invoke. On the server, this is after call.accept.

initialize cannot determine if the call is accepted or not; so if a call that's not accepted is used here, the error won't be visible until the BidiCall#run is called.

deadline is the absolute deadline for the call.

@param call [Call] the call used by the ActiveCall @param marshal [Function] f(obj)->string that marshal requests @param unmarshal [Function] f(string)->obj that unmarshals responses @param metadata_received [true|false] indicates if metadata has already

been received. Should always be true for server calls
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 44
def initialize(call, marshal, unmarshal, metadata_received: false,
               req_view: nil)
  fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
  @call = call
  @marshal = marshal
  @op_notifier = nil  # signals completion on clients
  @unmarshal = unmarshal
  @metadata_received = metadata_received
  @reads_complete = false
  @writes_complete = false
  @complete = false
  @done_mutex = Mutex.new
  @req_view = req_view
end

Public Instance Methods

read_next_loop(finalize_stream, is_client = false) click to toggle source

Read the next stream iteration

@param [Proc] finalize_stream callback to call when the reads have been

completely read through.

@param [Boolean] is_client If this is a client or server request

# File src/ruby/lib/grpc/generic/bidi_call.rb, line 114
def read_next_loop(finalize_stream, is_client = false)
  read_loop(finalize_stream, is_client: is_client)
end
run_on_client(requests, set_input_stream_done, set_output_stream_done, &blk) click to toggle source

Begins orchestration of the Bidi stream for a client sending requests.

The method either returns an Enumerator of the responses, or accepts a block that can be invoked with each response.

@param requests the Enumerable of requests to send @param set_input_stream_done [Proc] called back when we're done

reading the input stream

@param set_output_stream_done [Proc] called back when we're done

sending data on the output stream

@return an Enumerator of requests to yield

# File src/ruby/lib/grpc/generic/bidi_call.rb, line 70
def run_on_client(requests,
                  set_input_stream_done,
                  set_output_stream_done,
                  &blk)
  @enq_th = Thread.new do
    write_loop(requests, set_output_stream_done: set_output_stream_done)
  end
  read_loop(set_input_stream_done, &blk)
end
run_on_server(gen_each_reply, requests) click to toggle source

Begins orchestration of the Bidi stream for a server generating replies.

N.B. gen_each_reply is a func(Enumerable<Requests>)

It takes an enumerable of requests as an arg, in case there is a relationship between the stream of requests and the stream of replies.

This does not mean that must necessarily be one. E.g, the replies produced by gen_each_reply could ignore the received_msgs

@param [Proc] gen_each_reply generates the BiDi stream replies. @param [Enumerable] requests The enumerable of requests to run

# File src/ruby/lib/grpc/generic/bidi_call.rb, line 92
def run_on_server(gen_each_reply, requests)
  replies = nil

  # Pass in the optional call object parameter if possible
  if gen_each_reply.arity == 1
    replies = gen_each_reply.call(requests)
  elsif gen_each_reply.arity == 2
    replies = gen_each_reply.call(requests, @req_view)
  else
    fail 'Illegal arity of reply generator'
  end

  write_loop(replies, is_client: false)
end

Private Instance Methods

read_loop(set_input_stream_done, is_client: true) { |res| ... } click to toggle source

Provides an enumerator that yields results of remote reads

# File src/ruby/lib/grpc/generic/bidi_call.rb, line 191
def read_loop(set_input_stream_done, is_client: true)
  return enum_for(:read_loop,
                  set_input_stream_done,
                  is_client: is_client) unless block_given?
  GRPC.logger.debug('bidi-read-loop: starting')
  begin
    count = 0
    # queue the initial read before beginning the loop
    loop do
      GRPC.logger.debug("bidi-read-loop: #{count}")
      count += 1
      batch_result = read_using_run_batch

      # handle the next message
      if batch_result.nil? || batch_result.message.nil?
        GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")

        if is_client
          batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
          @call.status = batch_result.status
          @call.trailing_metadata = @call.status.metadata if @call.status
          GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
          batch_result.check_status
        end

        GRPC.logger.debug('bidi-read-loop: done reading!')
        break
      end

      res = @unmarshal.call(batch_result.message)
      yield res
    end
  rescue StandardError => e
    GRPC.logger.warn('bidi: read-loop failed')
    GRPC.logger.warn(e)
    raise e
  ensure
    set_input_stream_done.call
  end
  GRPC.logger.debug('bidi-read-loop: finished')
  # Make sure that the write loop is done before finishing the call.
  # Note that blocking is ok at this point because we've already received
  # a status
  @enq_th.join if is_client
end
read_using_run_batch() click to toggle source

performs a read using @call.run_batch, ensures metadata is set up

# File src/ruby/lib/grpc/generic/bidi_call.rb, line 124
def read_using_run_batch
  ops = { RECV_MESSAGE => nil }
  ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
  begin
    batch_result = @call.run_batch(ops)
    unless @metadata_received
      @call.metadata = batch_result.metadata
      @metadata_received = true
    end
    batch_result
  rescue GRPC::Core::CallError => e
    GRPC.logger.warn('bidi call: read_using_run_batch failed')
    GRPC.logger.warn(e)
    nil
  end
end
write_loop(requests, is_client: true, set_output_stream_done: nil) click to toggle source

set_output_stream_done is relevant on client-side rubocop:disable Metrics/PerceivedComplexity

# File src/ruby/lib/grpc/generic/bidi_call.rb, line 143
def write_loop(requests, is_client: true, set_output_stream_done: nil)
  GRPC::Core.fork_unsafe_begin
  GRPC.logger.debug('bidi-write-loop: starting')
  count = 0
  requests.each do |req|
    GRPC.logger.debug("bidi-write-loop: #{count}")
    count += 1
    payload = @marshal.call(req)
    # Fails if status already received
    begin
      @req_view.send_initial_metadata unless @req_view.nil?
      @call.run_batch(SEND_MESSAGE => payload)
    rescue GRPC::Core::CallError => e
      # This is almost definitely caused by a status arriving while still
      # writing. Don't re-throw the error
      GRPC.logger.warn('bidi-write-loop: ended with error')
      GRPC.logger.warn(e)
      break
    end
  end
  GRPC.logger.debug("bidi-write-loop: #{count} writes done")
  if is_client
    GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
    begin
      @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
    rescue GRPC::Core::CallError => e
      GRPC.logger.warn('bidi-write-loop: send close failed')
      GRPC.logger.warn(e)
    end
    GRPC.logger.debug('bidi-write-loop: done')
  end
  GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
  GRPC.logger.warn('bidi-write-loop: failed')
  GRPC.logger.warn(e)
  if is_client
    @call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN,
                             "GRPC bidi call error: #{e.inspect}")
  else
    raise e
  end
ensure
  GRPC::Core.fork_unsafe_end
  set_output_stream_done.call if is_client
end