class Fluffle::Server

Attributes

default_server[RW]
confirms[R]
connection[R]
handler_pool[R]
handlers[R]
mandatory[R]
publish_timeout[RW]
shutdown_timeout[RW]

Public Class Methods

new(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false) click to toggle source

url: - Optional URL to pass to `Bunny.new` to immediately connect
concurrency: - Number of threads to handle messages on (default: 1)
confirms: - Whether or not to use RabbitMQ confirms

# File lib/fluffle/server.rb, line 14
# Builds a server, connecting straight away when a URL or connection is
# supplied, and registers it as the class-wide default server if none is set.
#
# url:         - Optional value handed to `connect`; wins over `connection:`
#                when both are given
# connection:  - Optional value handed to `connect` when `url:` is absent
# concurrency: - Size of the fixed thread pool that runs handlers (default: 1)
# confirms:    - Stored as-is and exposed through the `confirms` reader
# mandatory:   - Stored as-is and exposed through the `mandatory` reader
def initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false)
  target = url || connection
  connect(target) if target

  @confirms  = confirms
  @mandatory = mandatory

  # Default timeouts (presumably seconds - confirm against the callers)
  @publish_timeout  = 5
  @shutdown_timeout = 15

  @handlers     = {}
  @consumers    = []
  @handler_pool = Concurrent::FixedThreadPool.new(concurrency)

  # The first server created becomes the default; later ones leave it alone
  self.class.default_server ||= self
end

Public Instance Methods

call_handler(handler:, request:) click to toggle source

handler - Instance of a `Handler` that may receive `#call`
request - `Hash` representing a decoded Request

# File lib/fluffle/server.rb, line 178
# Validates a decoded request, runs the handler for it and wraps the outcome
# in a JSON-RPC 2.0 response object.
#
# handler - Object responding to `#call(id:, method:, params:, meta:)`
# request - Decoded request; anything other than a `Hash` with a `method'
#           member is turned into an error response by the validation step
#
# Any `StandardError` raised while validating or handling is logged (when the
# logger has error level enabled) and converted with `build_error_response`.
#
# Returns a `Hash` with 'jsonrpc', 'id', 'meta' and either 'result' or 'error'.
def call_handler(handler:, request:)
  # Monotonic clock so the duration is immune to wall-clock adjustments
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  begin
    # Only a Hash can carry an id. Guarding here lets malformed requests
    # (nil, Array, ...) reach validation instead of blowing up on `[]` and
    # being misreported (e.g. a nil request as "Method not found").
    id = request['id'] if request.is_a?(Hash)

    self.validate_request request

    result = handler.call id: id,
                          method: request['method'],
                          params: request['params'],
                          meta: {}
  rescue => err
    log_error(err) if Fluffle.logger.error?

    error = self.build_error_response err
  end

  response = {
    'jsonrpc' => '2.0',
    'id'      => id,
    'meta'    => {
      'handler_duration' => Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    }
  }

  if error
    response['error'] = error
  else
    response['result'] = result
  end

  response
end
decode(payload) click to toggle source

Deserialize a JSON payload and extract its 3 members: id, method, params

payload - `String` of the payload from the queue

Returns a `Hash` from parsing the JSON payload (keys should be `String`)

# File lib/fluffle/server.rb, line 218
# Parses a raw queue payload with `Oj.load`.
#
# payload - `String` of the payload from the queue
#
# NOTE(review): the payload comes off the queue and is parsed with whatever
# mode Oj's defaults select - confirm that mode is safe for untrusted input.
#
# Returns the parsed value (expected to be a `Hash` with `String` keys).
def decode(payload)
  Oj.load(payload)
end
drain(queue: 'default', handler: nil, &block) click to toggle source
# File lib/fluffle/server.rb, line 34
# Registers the handler that will serve requests for a queue.
#
# queue:   - Queue name; stored under its `to_s` form (default: 'default')
# handler: - Handler object to register (mutually exclusive with a block)
# block    - When given, wrapped in a `Fluffle::Handlers::Dispatcher`
#
# Raises `ArgumentError` when both or neither of handler:/block are supplied.
#
# Returns the registered handler.
def drain(queue: 'default', handler: nil, &block)
  raise ArgumentError, 'Cannot provide both handler: and block' if handler && block

  registered = block ? Fluffle::Handlers::Dispatcher.new(&block) : handler
  raise ArgumentError, 'Handler cannot be nil' if registered.nil?

  @handlers[queue.to_s] = registered
end
handle_request(handler:, properties:, payload:) click to toggle source
# File lib/fluffle/server.rb, line 135
# Decodes one delivery, runs it through the handler and publishes the
# response to the requester's reply queue.
#
# handler    - Handler registered for the queue the message arrived on
# properties - Message properties; `:reply_to` is used as the routing key
# payload    - Raw `String` payload from the queue
#
# Failures while decoding or handling become a JSON-RPC error response
# rather than propagating. Errors raised by the optional
# `handler.after_response` hook are logged and swallowed.
def handle_request(handler:, properties:, payload:)
  reply_to = properties[:reply_to]

  id       = nil
  response = nil

  begin
    request = self.decode payload

    # A payload that decodes to something other than a Hash has no id;
    # leave it to `call_handler` to turn it into an error response.
    id = request['id'] if request.is_a?(Hash)

    response = self.call_handler handler: handler, request: request
  rescue => err
    response = {
      'jsonrpc' => '2.0',
      'id'      => id,
      'error'   => self.build_error_response(err)
    }
  end

  stack = Fluffle::MiddlewareStack.new

  if confirms
    stack.push ->(publish) {
      @confirmer.with_confirmation timeout: publish_timeout, &publish
    }
  end

  publish_opts = {
    routing_key: reply_to,
    correlation_id: response['id']
  }

  # Without the mandatory flag on the publish itself the broker never
  # returns unroutable replies, so a registered return handler would
  # never fire. Only set it when enabled so the default publish is unchanged.
  publish_opts[:mandatory] = true if mandatory

  stack.call do
    @exchange.publish Oj.dump(response), **publish_opts
  end

  if handler.respond_to? :after_response
    begin
      handler.after_response request: request
    rescue => err
      log_error(err) if Fluffle.logger.error?
    end
  end
end
handle_returns() click to toggle source
# File lib/fluffle/server.rb, line 91
# Registers an `on_return` callback on the exchange that logs every returned
# message at error level, including its routing key, reply code and reply text.
def handle_returns
  @exchange.on_return do |info, _properties, _payload|
    details = Kernel.format "Received return from exchange for routing key `%s' (%d %s)",
                            info.routing_key,
                            info.reply_code,
                            info.reply_text

    Fluffle.logger.error "[Fluffle::Server] #{details}"
  end
end
start() click to toggle source
# File lib/fluffle/server.rb, line 46
# Opens a channel, subscribes a consumer for every registered handler and
# then blocks in `wait_for_signal`.
#
# The handler registry is frozen first, so no handlers can be added once the
# server has been started.
#
# Raises `RuntimeError` ('No handlers defined') when nothing was registered.
def start
  @handlers.freeze

  # Fail fast: checking before the channel is created means a misconfigured
  # server does not leave an opened, half-configured channel behind.
  raise 'No handlers defined' if @handlers.empty?

  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  # Ensure we only receive 1 message at a time for each consumer
  @channel.prefetch 1

  if confirms
    @confirmer = Fluffle::Confirmer.new channel: @channel
    @confirmer.confirm_select
  end

  if mandatory
    handle_returns
  end

  @handlers.each do |name, handler|
    qualified_name = Fluffle.request_queue_name name
    queue          = @channel.queue qualified_name

    consumer = queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
      # Hand the message to the pool so handling runs on a pool thread
      @handler_pool.post do
        begin
          handle_request handler: handler,
                         properties: properties,
                         payload: payload
        rescue => err
          # Ensure we don't lose any errors on the handler pool's thread
          Fluffle.logger.error "[Fluffle::Server] #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}"
        ensure
          # Always ack, whether or not handling succeeded
          @channel.ack delivery_info.delivery_tag
        end
      end
    end

    @consumers << consumer
  end

  self.wait_for_signal
end
validate_request(request) click to toggle source

Raises if elements of the request payload do not comply with the spec

request - Decoded `Hash` of the payload (`String` keys)

# File lib/fluffle/server.rb, line 225
# Checks that a decoded request is a `Hash` carrying a `method' member.
#
# request - Decoded request payload (expected: `Hash` with `String` keys)
#
# Raises `Errors::InvalidRequestError` when the request is not a `Hash` or
# its 'method' member is missing/falsy.
#
# Returns nil when the request is acceptable.
def validate_request(request)
  unless request.is_a?(Hash)
    raise Errors::InvalidRequestError, "Improperly formatted Request (expected `Hash', got `#{request.class}')"
  end

  return if request['method']

  raise Errors::InvalidRequestError, "Missing `method' Request object member"
end
wait_for_signal() click to toggle source

NOTE: Keeping this in its own method so its functionality can be more easily overridden by `Fluffle::Testing`.
# File lib/fluffle/server.rb, line 100
# Blocks until an INT or TERM signal arrives, then shuts down in order:
# consumers are cancelled, the handler pool is drained (and killed if it
# does not finish within `@shutdown_timeout`), and the channel is closed.
#
# The signal traps only write the signal name into a pipe; all shutdown work
# happens here, outside the trap handler, once the pipe becomes readable.
def wait_for_signal
  reader, writer = IO.pipe

  %w[INT TERM].each do |name|
    Signal.trap(name) { writer.puts name }
  end

  # Adapted from Sidekiq:
  #   https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97
  while ready = IO.select([reader])
    received = ready.first.first.gets.strip

    Fluffle.logger.info "Received #{received}; shutting down..."

    # First stop the consumers from receiving messages
    @consumers.each(&:cancel)

    # Then give the worker pool a chance to finish its active jobs;
    # `wait_for_termination` returns false if it ran out of time, in which
    # case the pool is killed
    @handler_pool.shutdown
    finished = @handler_pool.wait_for_termination(@shutdown_timeout)
    @handler_pool.kill unless finished

    # Finally close the channel
    @channel.close

    return
  end
end

Protected Instance Methods

build_error_response(err) click to toggle source

Convert a Ruby error into a hash complying with the JSON-RPC spec for `Error` response objects

# File lib/fluffle/server.rb, line 251
# Converts a Ruby error into a `Hash` shaped like a JSON-RPC `Error` object.
#
# err - The rescued error
#
# * `Errors::BaseError` instances supply their own hash via `#to_response`
# * `NoMethodError` maps to code -32601 ('Method not found')
# * anything else gets code 0 and a "Class: message" string, plus a 'data'
#   member when the error responds to `#data`
#
# Returns the error `Hash`.
def build_error_response(err)
  case err
  when Errors::BaseError
    err.to_response
  when NoMethodError
    { 'code' => -32601, 'message' => 'Method not found' }
  else
    payload = { 'code' => 0, 'message' => "#{err.class}: #{err.message}" }
    payload['data'] = err.data if err.respond_to?(:data)
    payload
  end
end
log_error(err) click to toggle source

Logs a nicely-formatted error to `Fluffle.logger` with the class, message, and backtrace (if available)

# File lib/fluffle/server.rb, line 234
# Writes an error to `Fluffle.logger` at error level as "Class: message",
# followed by one indented line per backtrace frame when a backtrace exists.
#
# err - The error to log
def log_error(err)
  frames = Array(err.backtrace).flatten.compact

  # Each frame goes on its own line, indented by two spaces
  trace = frames.empty? ? '' : frames.map { |frame| "\n  #{frame}" }.join

  Fluffle.logger.error "#{err.class}: #{err.message}#{trace}"
end