class THTP::Client::Dispatcher

RPC-over-HTTP protocol implementation and executor

Constants

SUCCESS_FIELD

Public Class Methods

new(service, connection, protocol) click to toggle source

@param service [Class] The Thrift service whose schema to use for de/serialisation @param connection [ConnectionPool<Patron::Session>] The configured HTTP client pool @param protocol [Thrift::BaseProtocol] The default protocol with which to serialise

# File lib/thtp/client.rb, line 31
# Stores the collaborators and exposes each RPC of the service as a
# singleton method on this instance that forwards to post_rpc.
#
# @param service the Thrift service whose RPCs are proxied
# @param connection the HTTP client pool used by post_rpc
# @param protocol the default serialisation protocol
def initialize(service, connection, protocol)
  @service = service
  @connection = connection
  @protocol = protocol
  # one proxy method per RPC name reported by extract_rpcs
  extract_rpcs(service).each do |rpc_name|
    define_singleton_method(rpc_name) do |*call_args|
      post_rpc(rpc_name, *call_args)
    end
  end
end

Private Instance Methods

post_rpc(rpc, *args) click to toggle source
# File lib/thtp/client.rb, line 43
# Executes one RPC: serialises the call, POSTs it through the connection
# pool and decodes the response according to its HTTP status code.
#
# @param rpc name of the RPC to invoke; also used as the POST target
# @param args [Array] positional RPC arguments, handed to write_call
# @return whatever read_reply returns when the status is Status::REPLY
# @raise [UnknownMessageType] when the status is neither Status::REPLY nor
#   Status::EXCEPTION
# @raise [RpcTimeoutError] on Patron::TimeoutError or Timeout::Error
# @raise [ServerUnreachableError] on any other Patron::Error
def post_rpc(rpc, *args)
  # send request over persistent HTTP connection
  response = @connection.with { |c| c.post(rpc, write_call(rpc, args)) }
  # prefer the protocol advertised by the response, else fall back to our default
  protocol = Encoding.protocol(response.headers['Content-Type']) || @protocol
  status = response.status
  # the HTTP status code determines the message type and how to deserialise it
  return read_reply(rpc, response.body, protocol) if status == Status::REPLY
  return read_exception(response.body, protocol) if status == Status::EXCEPTION
  # Unrecognised status: report it back. Kernel#raise accepts at most
  # (class, message, backtrace), so the error is built explicitly to carry
  # all three details instead of failing with ArgumentError.
  # NOTE(review): assumes UnknownMessageType#initialize takes (rpc, status, body) - confirm.
  raise UnknownMessageType.new(rpc, status, response.body)
rescue Patron::TimeoutError, Timeout::Error
  raise RpcTimeoutError, rpc
rescue Patron::Error
  raise ServerUnreachableError
end
read_exception(exception, protocol) click to toggle source
# File lib/thtp/client.rb, line 88
# Decodes a serialised exception payload and raises it; never returns
# normally.
#
# @param exception the serialised response body to decode
# @param protocol the protocol to deserialise with
# @raise whatever deserialize_buffer returns after filling a fresh
#   Thrift::ApplicationException
def read_exception(exception, protocol)
  blank_error = Thrift::ApplicationException.new
  decoded = deserialize_buffer(exception, blank_error, protocol)
  raise decoded
end
read_reply(rpc, reply, protocol) click to toggle source
# File lib/thtp/client.rb, line 71
# Decodes a successful RPC response and returns or raises its content.
#
# @param rpc name of the RPC, used to look up its result struct class
# @param reply the serialised response body
# @param protocol the protocol to deserialise with
# @return the value of the `success` field, or nil for a void RPC
# @raise the value of any other field that is set (a declared exception)
# @raise [BadResponseError] when no field is set although the result
#   struct has a `success` field
def read_reply(rpc, reply, protocol)
  # deserialise reply into a fresh result struct for this RPC
  result = result_class(@service, rpc).new
  deserialize_buffer(reply, result, protocol)
  # results have at most one field set; find it and return/raise it
  result.struct_fields.each_value do |field|
    field_name = field[:name]
    value = result.public_send(field_name)
    next if value.nil? # this isn't the set field, keep looking
    # any set field other than 'success' must be an exception
    raise value unless field_name == SUCCESS_FIELD
    return value
  end
  # nothing was set: with a `success` field that means we don't recognise
  # the response (our schema is out of date, or it's invalid)
  raise BadResponseError, rpc if result.respond_to?(:success)
  # no `success` field at all means the RPC returned `void`
  nil
end
write_call(rpc, args) click to toggle source
# File lib/thtp/client.rb, line 58
# Builds the args struct for an RPC from positional arguments and
# serialises it with the default protocol.
#
# @param rpc name of the RPC, used to look up its args struct class
# @param args [Array] positional arguments, in struct_fields order
# @return the result of serialize_buffer for the populated struct
# @raise [ClientValidationError] when a Thrift::TypeError is raised while
#   populating or serialising the struct
def write_call(rpc, args)
  call_struct = args_class(@service, rpc).new
  # struct_fields is an ordered hash, so its values line up with the
  # positional arguments of the RPC signature
  ordered_fields = call_struct.struct_fields.values
  ordered_fields.zip(args) do |field, value|
    setter = "#{field[:name]}="
    call_struct.public_send(setter, value)
  end
  # serialise and return bytestring
  serialize_buffer(call_struct, @protocol)
rescue Thrift::TypeError => e
  raise ClientValidationError, e.message
end