class GrpcKit::Transport::ClientTransport

Public Class Methods

new(session) click to toggle source

@param session [GrpcKit::Session::ClientSession]

# File lib/grpc_kit/transport/client_transport.rb, line 11
def initialize(session)
  @session = session
  @stream = nil # set later
end

Public Instance Methods

close_and_flush() click to toggle source

@return [void]

# File lib/grpc_kit/transport/client_transport.rb, line 26
def close_and_flush
  @stream.end_write
  send_data

  @session.start(@stream.stream_id)
  @stream.end_read
  @deferred = false
end
read_data(last: false) click to toggle source

@param last [Boolean] @return [nil,Array<Boolean,Integer,String>] nil when closed, tuple of Length-Prefixed-Message

# File lib/grpc_kit/transport/client_transport.rb, line 45
def read_data(last: false)
  data_in_buffer = unpack(nil)
  return data_in_buffer if data_in_buffer
  loop do
    data = recv_data(last: last)
    return unpack(nil) unless data
    message = unpack(data)
    return message if message
  end
end
read_data_nonblock(last: false) click to toggle source

@param last [Boolean] @return [nil,Array<Boolean,Integer,String>,Symbol] nil when closed, tuple of Length-Prefixed-Message, or :wait_readable

# File lib/grpc_kit/transport/client_transport.rb, line 58
def read_data_nonblock(last: false)
  data_in_buffer = unpack(nil) 
  return data_in_buffer if data_in_buffer

  data = nonblock_recv_data(last: last)
  if data == :wait_readable
    :wait_readable
  elsif data == nil
    return unpack(nil)
  else
    unpack(data) || :wait_readable
  end
end
recv_headers() click to toggle source

@return [Hash<String,String>]

# File lib/grpc_kit/transport/client_transport.rb, line 73
def recv_headers
  wait_close
  @stream.headers
end
start_request(data, headers, last: false) click to toggle source

@param data [String] @param headers [Hash<String, String>] @param last [Boolean] @return [void]

# File lib/grpc_kit/transport/client_transport.rb, line 20
def start_request(data, headers, last: false)
  @stream = @session.send_request(headers)
  write_data(data, last: last)
end
write_data(buf, last: false) click to toggle source

@param buf [String] @param last [Boolean] @return [void]

# File lib/grpc_kit/transport/client_transport.rb, line 38
def write_data(buf, last: false)
  write(@stream.pending_send_data, pack(buf), last: last)
  send_data
end

Private Instance Methods

nonblock_recv_data(last: false) click to toggle source
# File lib/grpc_kit/transport/client_transport.rb, line 91
def nonblock_recv_data(last: false)
  data = @stream.read_recv_data(last: last, blocking: false)
  return data if data.is_a?(String)
  return nil unless data

  :wait_readable
end
recv_data(last: false) click to toggle source
# File lib/grpc_kit/transport/client_transport.rb, line 99
def recv_data(last: false)
  loop do
    # FIXME: GrpcKit::Client isn't threaded, this cannot be blocked to trigger ClientSession#run_once appropriately
    #        but run_once would block while no outbound requests. Could be problematic on BiDi calls.
    data = @stream.read_recv_data(last: last, blocking: false)
    case data
    when :wait_readable
      @session.run_once
    when String
      return data
    when nil
      return nil
    end
  end
end
send_data() click to toggle source
# File lib/grpc_kit/transport/client_transport.rb, line 115
def send_data
  if @stream.pending_send_data.need_resume?
    @session.resume_data(@stream.stream_id)
  end

  @session.run_once
end
wait_close() click to toggle source
# File lib/grpc_kit/transport/client_transport.rb, line 80
def wait_close
  # XXX: wait until half close (remote) to get grpc-status
  until @stream.close_remote?
    @session.run_once
  end
end
write(stream, buf, last: false) click to toggle source
# File lib/grpc_kit/transport/client_transport.rb, line 87
def write(stream, buf, last: false)
  stream.write(buf, last: last)
end