class Anschel::Input::RabbitMQ
Public Class Methods
new(output, config, stats, log)
click to toggle source
# File lib/anschel/input/rabbitmq.rb, line 9
#
# Connect to RabbitMQ through March Hare and start one consumer thread per
# configured queue. Each thread opens its own channel, declares the exchange
# and the queue, binds them, and subscribes; every delivered message is
# appended to +output+ and counted via <tt>stats.inc 'input'</tt>.
#
# output - sink that receives each message body via <tt><<</tt>
# config - Hash with the keys read below:
#          :connection   (optional) overrides for the connection defaults
#          :exchange     (required) exchange options, including :name
#          :queues       (required) Hash of queue name => queue options
#          :binding      (optional) overrides for the binding defaults
#          :subscription (optional) overrides for the subscription defaults
#          :prefetch     (optional) channel prefetch, 256 when absent
# stats  - object responding to +inc+
# log    - logger responding to +debug+ (and whatever handle_errors needs)
#
# The consumer threads are kept in @threads so that #stop can kill them.
def initialize output, config, stats, log
  connection_defaults = {
    heartbeat_interval: 30,
    connection_timeout: 10,
    automatically_recover: false
  }
  exchange_defaults = { type: 'x-consistent-hash', durable: true }
  queue_defaults = { exclusive: false, auto_delete: false, durable: true }
  binding_defaults = { routing_key: '100' }
  subscription_defaults = { block: true, ack: true, manual_ack: true }

  connection = ::MarchHare.connect \
    connection_defaults.merge(config[:connection] || {})

  handle_errors connection, log

  # Read the exchange name without deleting it from the caller's hash, so
  # the same config can be reused to build another input later.
  exchange_name = config[:exchange][:name]
  exchange_config = config[:exchange].reject { |key, _| key == :name }

  @threads = config[:queues].map do |queue_name, queue_config|
    Thread.new do
      channel = connection.create_channel
      channel.prefetch = config[:prefetch] || 256

      exchange = channel.exchange exchange_name, \
        exchange_defaults.merge(exchange_config)

      subscription = subscription_defaults.merge \
        (config[:subscription] || {})

      # A queue declared with no options arrives here as nil; treat it as
      # "use the defaults", like the other optional config sections.
      queue = channel.queue queue_name.to_s, \
        queue_defaults.merge(queue_config || {})

      queue.bind exchange, \
        binding_defaults.merge(config[:binding] || {})

      log.debug \
        event: 'input-rabbitmq-connecting-queue',
        queue: queue_name

      queue.subscribe(subscription) do |meta, message|
        output << message
        stats.inc 'input'
        # Acknowledge only after the message has been handed to the output.
        channel.ack meta.delivery_tag if subscription[:manual_ack]
      end
    end
  end
end
Public Instance Methods
stop()
click to toggle source
# File lib/anschel/input/rabbitmq.rb, line 74
#
# Kill every consumer thread held in @threads. Safe to call repeatedly:
# only the first call does any work.
#
# Returns true on the call that performs the shutdown and nil on any later
# call.
def stop
  return if @stopped

  # each, not map: the threads are killed for the side effect only.
  @threads.each(&:kill)
  @stopped = true
end
Private Instance Methods
handle_errors(connection, log)
click to toggle source
Ensure broker errors result in logs and exit appropriately. It seems that March Hare catches exceptions under the hood, so we can’t just raise within these callbacks, and we’re stuck with this jankiness.
# File lib/anschel/input/rabbitmq.rb, line 85
#
# Register blocked / unblocked / shutdown callbacks on +connection+ and start
# a monitor thread that reports those events through +log+.
#
# The callbacks only enqueue a tagged event; the logging and the raise happen
# on the monitor thread, because raising inside the callbacks themselves is
# not an option here. A Queue is used instead of shared flags so that no
# event is lost or misread when its reason is nil or false, and so the
# monitor can block on the queue rather than poll.
#
# connection - object responding to on_blocked, on_unblocked and on_shutdown
# log        - logger responding to +warn+ and +fatal+
#
# Returns the monitor Thread. On a shutdown event the thread logs at fatal
# level and then dies with a RuntimeError.
# NOTE(review): that error reaches the rest of the process only through
# Thread#join / #value or when abort_on_exception is enabled — confirm that
# the application sets this up.
def handle_errors connection, log
  events = Queue.new

  connection.on_blocked { |reason| events << [:blocked, reason] }
  connection.on_unblocked { events << [:unblocked, nil] }
  connection.on_shutdown { |_, reason| events << [:shutdown, reason] }

  Thread.new do
    loop do
      kind, reason = events.pop # blocks until a callback reports something

      case kind
      when :shutdown
        log.fatal \
          event: 'input-rabbitmq-shutdown',
          reason: reason
        raise RuntimeError, "RabbitMQ connection shut down: #{reason}"
      when :blocked
        log.warn \
          event: 'input-rabbitmq-blocked',
          reason: reason
      when :unblocked
        log.warn \
          event: 'input-rabbitmq-unblocked'
      end
    end
  end
end