class Hutch::Worker

Public Class Methods

new(broker, consumers) click to toggle source
# File lib/hutch/worker.rb, line 10
def initialize(broker, consumers)
  @broker        = broker
  self.consumers = consumers
end

Public Instance Methods

consumers=(val) click to toggle source
# File lib/hutch/worker.rb, line 103
def consumers=(val)
  if val.empty?
    logger.warn "no consumer loaded, ensure there's no configuration issue"
  end
  @consumers = val
end
handle_error(message_id, payload, consumer, ex) click to toggle source
# File lib/hutch/worker.rb, line 97
def handle_error(message_id, payload, consumer, ex)
  Hutch::Config[:error_handlers].each do |backend|
    backend.handle(message_id, payload, consumer, ex)
  end
end
handle_message(consumer, delivery_info, properties, payload) click to toggle source

Called internally when a new messages comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.

# File lib/hutch/worker.rb, line 75
def handle_message(consumer, delivery_info, properties, payload)
  logger.info("message(#{properties.message_id || '-'}): " +
              "routing key: #{delivery_info.routing_key}, " +
              "consumer: #{consumer}, " +
              "payload: #{payload}")

  broker = @broker
  begin
    message = Message.new(delivery_info, properties, payload)
    consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
    with_tracing(consumer_instance).handle(message)
    broker.ack(delivery_info.delivery_tag)
  rescue StandardError => ex
    broker.nack(delivery_info.delivery_tag)
    handle_error(properties.message_id, payload, consumer, ex)
  end
end
handle_signals() click to toggle source

Handle any pending signals

# File lib/hutch/worker.rb, line 43
def handle_signals
  signal = Thread.main[:signal_queue].shift
  if signal
    logger.info "caught sig#{signal.downcase}, stopping hutch..."
    stop
  end
end
register_signal_handlers() click to toggle source

Register handlers for SIG{QUIT,TERM,INT} to shut down the worker gracefully. Forceful shutdowns are very bad!

# File lib/hutch/worker.rb, line 31
def register_signal_handlers
  Thread.main[:signal_queue] = []
  %w(QUIT TERM INT).keep_if { |s| Signal.list.keys.include? s }.map(&:to_sym).each do |sig|
    # This needs to be reentrant, so we queue up signals to be handled
    # in the run loop, rather than acting on signals here
    trap(sig) do
      Thread.main[:signal_queue] << sig
    end
  end
end
run() click to toggle source

Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.

# File lib/hutch/worker.rb, line 18
def run
  setup_queues

  # Set up signal handlers for graceful shutdown
  register_signal_handlers

  # Take a break from Thread#join every 0.1 seconds to check if we've
  # been sent any signals
  handle_signals until @broker.wait_on_threads(0.1)
end
setup_queue(consumer) click to toggle source

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.

# File lib/hutch/worker.rb, line 64
def setup_queue(consumer)
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)

  queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
    handle_message(consumer, delivery_info, properties, payload)
  end
end
setup_queues() click to toggle source

Set up the queues for each of the worker’s consumers.

# File lib/hutch/worker.rb, line 57
def setup_queues
  logger.info 'setting up queues'
  @consumers.each { |consumer| setup_queue(consumer) }
end
stop() click to toggle source

Stop a running worker by killing all subscriber threads.

# File lib/hutch/worker.rb, line 52
def stop
  @broker.stop
end
with_tracing(klass) click to toggle source
# File lib/hutch/worker.rb, line 93
def with_tracing(klass)
  Hutch::Config[:tracer].new(klass)
end