class Hutch::Worker
Public Class Methods
new(broker, consumers)
# File lib/hutch/worker.rb, line 10
def initialize(broker, consumers)
  @broker = broker
  self.consumers = consumers
end
Public Instance Methods
consumers=(val)
# File lib/hutch/worker.rb, line 103
def consumers=(val)
  if val.empty?
    logger.warn "no consumer loaded, ensure there's no configuration issue"
  end
  @consumers = val
end
handle_error(message_id, payload, consumer, ex)
# File lib/hutch/worker.rb, line 97
def handle_error(message_id, payload, consumer, ex)
  Hutch::Config[:error_handlers].each do |backend|
    backend.handle(message_id, payload, consumer, ex)
  end
end
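The listing above implies that an error handler backend is any object responding to `handle(message_id, payload, consumer, ex)`. A minimal sketch of such a backend follows. `MemoryErrorHandler` and `dispatch_error` are hypothetical names used only for illustration, and a plain array stands in for `Hutch::Config[:error_handlers]`.

```ruby
# Hypothetical backend: records errors in memory instead of reporting them.
class MemoryErrorHandler
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Same four-argument signature that handle_error calls on each backend.
  def handle(message_id, payload, consumer, ex)
    @errors << { message_id: message_id, payload: payload,
                 consumer: consumer, error: ex.message }
  end
end

# Mirrors the loop in handle_error, with the handler list passed in
# explicitly (an assumption made to keep the sketch self-contained).
def dispatch_error(handlers, message_id, payload, consumer, ex)
  handlers.each { |backend| backend.handle(message_id, payload, consumer, ex) }
end

handler = MemoryErrorHandler.new
dispatch_error([handler], 'abc-123', '{"id":1}', 'OrderConsumer',
               RuntimeError.new('boom'))
puts handler.errors.first[:error]
```

Because every backend receives every error, a slow or raising backend would delay or block the ones after it in the list.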
handle_message(consumer, delivery_info, properties, payload)
Called internally when a new message arrives from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.
# File lib/hutch/worker.rb, line 75
def handle_message(consumer, delivery_info, properties, payload)
  logger.info("message(#{properties.message_id || '-'}): " +
              "routing key: #{delivery_info.routing_key}, " +
              "consumer: #{consumer}, " +
              "payload: #{payload}")

  broker = @broker
  begin
    message = Message.new(delivery_info, properties, payload)
    consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
    with_tracing(consumer_instance).handle(message)
    broker.ack(delivery_info.delivery_tag)
  rescue StandardError => ex
    broker.nack(delivery_info.delivery_tag)
    handle_error(properties.message_id, payload, consumer, ex)
  end
end
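The acknowledgement flow in handle_message (ack on success, nack plus error reporting on any `StandardError`) can be sketched in isolation. `FakeBroker` and `process_delivery` are hypothetical stand-ins, not part of Hutch. The block passed to `process_delivery` plays the role of the consumer.

```ruby
# Hypothetical in-memory broker that records which delivery tags were
# acknowledged and which were rejected.
class FakeBroker
  attr_reader :acked, :nacked

  def initialize
    @acked = []
    @nacked = []
  end

  def ack(tag)
    @acked << tag
  end

  def nack(tag)
    @nacked << tag
  end
end

# Simplified form of the begin/rescue in handle_message: run the work,
# ack if it finishes, otherwise nack and collect the exception.
def process_delivery(broker, delivery_tag, errors)
  yield
  broker.ack(delivery_tag)
rescue StandardError => ex
  broker.nack(delivery_tag)
  errors << ex
end

broker = FakeBroker.new
errors = []
process_delivery(broker, 1, errors) { :ok }
process_delivery(broker, 2, errors) { raise ArgumentError, 'bad payload' }
puts "acked=#{broker.acked.inspect} nacked=#{broker.nacked.inspect}"
```

As in the original method, the ack sits inside the rescued region, so a failure while acknowledging is also routed through the nack and error path.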
handle_signals()
Handle any pending signals
# File lib/hutch/worker.rb, line 43
def handle_signals
  signal = Thread.main[:signal_queue].shift
  if signal
    logger.info "caught sig#{signal.downcase}, stopping hutch..."
    stop
  end
end
register_signal_handlers()
Register handlers for SIG{QUIT,TERM,INT} to shut down the worker gracefully. Forceful shutdowns are very bad!
# File lib/hutch/worker.rb, line 31
def register_signal_handlers
  Thread.main[:signal_queue] = []
  %w(QUIT TERM INT).keep_if { |s| Signal.list.keys.include? s }.map(&:to_sym).each do |sig|
    # This needs to be reentrant, so we queue up signals to be handled
    # in the run loop, rather than acting on signals here
    trap(sig) do
      Thread.main[:signal_queue] << sig
    end
  end
end
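The queue-then-drain pattern shared by register_signal_handlers and handle_signals can be sketched without a worker. The helper names `install_signal_queue` and `next_signal` are hypothetical. Unlike the original, this sketch stores signal names as strings rather than symbols.

```ruby
# Install trap handlers that only enqueue the signal name. The real work
# happens later, outside the trap context, when the queue is drained.
def install_signal_queue(signals = %w(QUIT TERM INT))
  Thread.main[:signal_queue] = []
  # Skip signals the current platform does not support.
  signals.select { |s| Signal.list.key?(s) }.each do |sig|
    trap(sig) { Thread.main[:signal_queue] << sig }
  end
end

# Return the oldest pending signal name, or nil when nothing is queued.
def next_signal
  Thread.main[:signal_queue].shift
end

install_signal_queue
Thread.main[:signal_queue] << 'TERM' # simulate a delivered SIGTERM
signal = next_signal
puts "caught sig#{signal.downcase}" if signal
```

Keeping the trap body to a single array append avoids calling logging or locking code from inside a signal handler, which is the concern the inline comment in the original raises.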
run()
Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method blocks until the broker's threads finish, which in practice means until the worker is stopped by a signal.
# File lib/hutch/worker.rb, line 18
def run
  setup_queues

  # Set up signal handlers for graceful shutdown
  register_signal_handlers

  # Take a break from Thread#join every 0.1 seconds to check if we've
  # been sent any signals
  handle_signals until @broker.wait_on_threads(0.1)
end
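The last line of run is a timed-join polling loop. The sketch below shows the same idea with a plain `Thread`, assuming `wait_on_threads(timeout)` behaves like `Thread#join(limit)` (falsy on timeout, truthy once the thread has finished). That behaviour is an assumption, since the broker's implementation is not shown here. `wait_with_polling` is a hypothetical helper.

```ruby
# Wait for a thread to finish, running the given block each time a join
# attempt times out. Thread#join(limit) returns nil on timeout and the
# thread itself once it has terminated.
def wait_with_polling(thread, interval)
  yield until thread.join(interval)
end

worker = Thread.new { sleep 0.25 }
polls = 0
wait_with_polling(worker, 0.05) { polls += 1 }
puts "worker finished after #{polls} polls"
```

The interval sets the trade-off: a shorter timeout reacts to pending signals sooner, at the cost of waking the main thread more often.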
setup_queue(consumer)
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
# File lib/hutch/worker.rb, line 64
def setup_queue(consumer)
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)

  queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
    handle_message(consumer, delivery_info, properties, payload)
  end
end
setup_queues()
Set up the queues for each of the worker’s consumers.
# File lib/hutch/worker.rb, line 57
def setup_queues
  logger.info 'setting up queues'
  @consumers.each { |consumer| setup_queue(consumer) }
end
stop()
Stop a running worker by killing all subscriber threads.
# File lib/hutch/worker.rb, line 52
def stop
  @broker.stop
end
with_tracing(klass)
# File lib/hutch/worker.rb, line 93
def with_tracing(klass)
  Hutch::Config[:tracer].new(klass)
end
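From with_tracing and its call site in handle_message, a tracer appears to be a class whose constructor takes the consumer instance and whose instances respond to `handle(message)`. A minimal sketch of a timing tracer under that reading follows. `TimingTracer` and `EchoConsumer` are hypothetical, and the sketch assumes the wrapped consumer exposes `process(message)`, which this listing does not confirm.

```ruby
# Hypothetical tracer: wraps a consumer instance and measures how long
# each message takes to process.
class TimingTracer
  attr_reader :last_duration

  def initialize(consumer)
    @consumer = consumer
  end

  # Delegates to the consumer (assumed to implement #process) and records
  # the elapsed time even when processing raises.
  def handle(message)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @consumer.process(message)
  ensure
    @last_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end
end

# Hypothetical consumer used only to exercise the tracer.
class EchoConsumer
  def process(message)
    message.upcase
  end
end

tracer = TimingTracer.new(EchoConsumer.new)
puts tracer.handle('hello')
```

Because the tracer only wraps the call, exceptions raised by the consumer still propagate to the caller, so the nack and error handling in handle_message would be unaffected.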