class Fluffle::Client

Attributes

confirms[R]
default_timeout[RW]
logger[RW]
mandatory[R]

Public Class Methods

new(url: nil, connection: nil, confirms: false, mandatory: false) click to toggle source
# File lib/fluffle/client.rb, line 17
# Builds a client: connects, opens a channel, declares an exclusive reply
# queue named after a freshly generated UUID, and starts consuming replies.
#
# url:        - handed to `connect` when given
# connection: - handed to `connect` when `url` is nil
# confirms:   - when truthy, a `Fluffle::Confirmer` is created for the
#               channel and `confirm_select` is called on it
# mandatory:  - when truthy, publishes carry the mandatory flag and a
#               handler for exchange returns is registered
def initialize(url: nil, connection: nil, confirms: false, mandatory: false)
  self.connect(url || connection)

  # Plain configuration and bookkeeping state
  @confirms          = confirms
  @mandatory         = mandatory
  @default_timeout   = 5
  @logger            = Fluffle.logger
  @pending_responses = Concurrent::Map.new

  # Used for generating unique message IDs
  @prng = Random.new

  # Broker-side resources; the reply queue name is derived from the UUID
  @uuid        = UUIDTools::UUID.timestamp_create.to_s
  @channel     = @connection.create_channel
  @exchange    = @channel.default_exchange
  @reply_queue = @channel.queue(Fluffle.response_queue_name(@uuid), exclusive: true)

  if confirms
    @confirmer = Fluffle::Confirmer.new(channel: @channel)
    @confirmer.confirm_select
  end

  handle_returns if mandatory

  subscribe
end

Public Instance Methods

call(method, params = [], queue: 'default', raw_response: false, **opts) click to toggle source
# File lib/fluffle/client.rb, line 90
# Issues a JSON-RPC 2.0 request and blocks for the response.
#
# method       - value placed in the request's 'method' field
# params       - value placed in the request's 'params' field (default [])
# queue:       - queue name forwarded to `publish_and_wait`
# raw_response: - when truthy, the whole response Hash is returned as-is
# opts         - only `:timeout` is read; defaults to `default_timeout`
#
# Returns the response's 'result' value (or the full response when
# `raw_response` is truthy).
# Raises `Errors::CustomError` when the response has no 'result' key.
def call(method, params = [], queue: 'default', raw_response: false, **opts)
  # `fetch` instead of `||` so that an explicit `timeout: nil` is honoured
  wait_limit = opts.fetch(:timeout, self.default_timeout)

  request = {
    'jsonrpc' => '2.0',
    'id'      => random_bytes_as_hex(8),
    'method'  => method,
    'params'  => params
  }

  reply = publish_and_wait(request, queue: queue, timeout: wait_limit)

  return reply if raw_response
  return reply['result'] if reply.key?('result')

  # No result: surface whatever error details the response carries
  failure = reply['error'] || {}

  raise Errors::CustomError.new(
    code: failure['code'] || 0,
    message: failure['message'] || "Missing both `result' and `error' on Response object",
    data: failure['data']
  )
end
describe_payload(payload) click to toggle source

Returns a nicely formatted description of a payload, consisting of its method name and arity.
# File lib/fluffle/client.rb, line 166
# Describes a request payload as "<method>/<number of params>".
#
# payload - Hash-like request; 'method' and 'params' are read from it
#
# Returns a String; a missing or nil 'params' counts as arity 0.
def describe_payload(payload)
  params = payload['params']
  arity  = params ? params.length : 0

  "#{payload['method']}/#{arity}"
end
handle_reply(delivery_info:, properties:, payload:) click to toggle source

Fetch and set the `IVar` with a response from the server. This method is called from the reply queue's background thread; the main thread will normally be waiting for the `IVar` to be set.

# File lib/fluffle/client.rb, line 77
# Decodes a reply and hands it to whoever is waiting on its 'id'.
#
# delivery_info: - not used here
# properties:    - not used here
# payload:       - raw reply body; parsed with `Oj.load`
#
# The matching entry is removed from `@pending_responses` and `set` with
# the decoded payload. If no entry exists for the id, an error is logged.
def handle_reply(delivery_info:, properties:, payload:)
  decoded    = Oj.load(payload)
  request_id = decoded['id']

  waiter = @pending_responses.delete(request_id)
  return waiter.set(decoded) if waiter

  self.logger.error "Missing pending response IVar: id=#{request_id || 'null'}"
end
handle_returns() click to toggle source
# File lib/fluffle/client.rb, line 60
# Registers an `on_return` callback on the exchange. When a returned
# message's correlation id matches a pending response, that entry is
# removed and `set` with a `Fluffle::Errors::ReturnError` describing the
# return; returns for unknown ids are ignored.
def handle_returns
  @exchange.on_return do |return_info, properties, _payload|
    waiter = @pending_responses.delete(properties[:correlation_id])
    next unless waiter

    description = Kernel.format(
      "Received return from exchange for routing key `%s' (%d %s)",
      return_info.routing_key,
      return_info.reply_code,
      return_info.reply_text
    )

    waiter.set Fluffle::Errors::ReturnError.new(description)
  end
end
publish(payload, queue:) click to toggle source
# File lib/fluffle/client.rb, line 173
# Serializes `payload` with `Oj.dump` and publishes it on the exchange.
#
# payload - request Hash; its 'id' becomes the correlation id
# queue:  - name passed to `Fluffle.request_queue_name` to get the routing key
#
# The reply queue's name is sent as `reply_to`, and the `mandatory` flag
# mirrors the value this client was constructed with.
def publish(payload, queue:)
  routing_key = Fluffle.request_queue_name(queue)

  message_options = {
    routing_key: routing_key,
    correlation_id: payload['id'],
    reply_to: @reply_queue.name,
    mandatory: @mandatory
  }

  body = Oj.dump(payload)

  @exchange.publish(body, message_options)
end
publish_and_wait(payload, queue:, timeout:) click to toggle source

Publish a payload to the server and wait (block) for the response

It creates an `IVar` future for the response, stores that in `@pending_responses`, and then publishes the payload to the server. After publishing it waits for the `IVar` to be set with the response. It also clears that `IVar` if it times out to avoid leaking.

Returns a `Hash` parsed from the server's JSON response. Raises `Fluffle::Errors::TimeoutError` if the server fails to respond within the time given in `timeout:`.
# File lib/fluffle/client.rb, line 130
# Publishes a payload and blocks until its response arrives.
#
# payload  - request Hash; its 'id' keys the pending-response entry
# queue:   - queue name forwarded to `publish`
# timeout: - seconds to wait; forwarded to the confirmer (when confirms are
#            enabled) and to the response wait
#
# Returns the value the response `IVar` was set with.
# Raises `Errors::TimeoutError` if the `IVar` is still unset after waiting.
# Raises the stored value itself if it is a `StandardError` (exchange
# returns are delivered that way).
def publish_and_wait(payload, queue:, timeout:)
  id = payload['id']

  response_ivar = Concurrent::IVar.new
  @pending_responses[id] = response_ivar

  stack = Fluffle::MiddlewareStack.new

  if confirms
    stack.push(->(do_publish) {
      @confirmer.with_confirmation timeout: timeout, &do_publish
    })
  end

  stack.call do
    publish payload, queue: queue
  end

  # Wait first, then decide from the IVar's state. Reading the value in the
  # same step as the wait is racy: the wait can time out (yielding nil) and
  # the reply can land just before the completeness check, which would make
  # this method return nil instead of the reply or a timeout error.
  response_ivar.wait timeout

  if response_ivar.incomplete?
    raise Errors::TimeoutError.new("Timed out waiting for response to `#{describe_payload(payload)}'")
  end

  # The IVar is complete at this point, so this read does not block
  response = response_ivar.value

  # Exchange returns will preempt the response and set it to an error
  # that we can raise
  raise response if response.is_a? StandardError

  response
ensure
  # Don't leak the `IVar` if it timed out
  @pending_responses.delete id
end
subscribe() click to toggle source
# File lib/fluffle/client.rb, line 46
# Starts consuming the reply queue, passing every delivery to
# `handle_reply`.
#
# Any `StandardError` raised while handling a delivery is logged through
# this client's own `logger` (the same one `handle_reply` uses) rather than
# being allowed to escape the consumer block.
def subscribe
  @reply_queue.subscribe do |delivery_info, properties, payload|
    begin
      self.handle_reply delivery_info: delivery_info,
                        properties: properties,
                        payload: payload
    rescue => err
      # Bunny will let uncaptured errors silently wreck the reply thread,
      # so we must be extra-careful about capturing them
      trace = Array(err.backtrace).join("\n") # backtrace may be nil
      self.logger.error "[Fluffle::Client] #{err.class}: #{err.message}\n#{trace}"
    end
  end
end

Protected Instance Methods

random_bytes_as_hex(bytes) click to toggle source
# File lib/fluffle/client.rb, line 186
# Draws `bytes` random bytes from `@prng` and returns them hex-encoded
# (two lowercase hex digits per byte).
#
# Adapted from `SecureRandom.hex`
def random_bytes_as_hex(bytes)
  raw = @prng.bytes(bytes)

  raw.unpack('H*').first
end