class Anschel::Input::RabbitMQ

Public Class Methods

new(output, config, stats, log) click to toggle source
# File lib/anschel/input/rabbitmq.rb, line 9
# Connects to RabbitMQ via MarchHare and starts one consumer thread per
# configured queue.
#
# Each thread creates its own channel, declares the exchange and the queue,
# binds them and subscribes. Every delivered message is appended to +output+,
# counted with <tt>stats.inc 'input'</tt> and, when the merged subscription
# options have a truthy :manual_ack, acknowledged on the channel.
#
# output - sink that receives each message via <<
# config - Hash of settings:
#          :exchange     - exchange options; :name is used as the exchange
#                          name, the rest is merged over the defaults
#          :queues       - queue name => queue options (options may be nil)
#          :connection   - optional overrides for the connection defaults
#          :prefetch     - optional channel prefetch (default 256)
#          :subscription - optional overrides for the subscription defaults
#          :binding      - optional overrides for the binding defaults
# stats  - object responding to inc(name)
# log    - logger accepting a Hash event via debug; also handed to
#          handle_errors
#
# The consumer threads are kept in @threads so that stop can kill them.
def initialize output, config, stats, log
  connection_defaults = {
    heartbeat_interval: 30,
    connection_timeout: 10,
    automatically_recover: false
  }

  exchange_defaults = {
    type: 'x-consistent-hash',
    durable: true
  }

  queue_defaults = {
    exclusive: false,
    auto_delete: false,
    durable: true
  }

  binding_defaults = {
    routing_key: '100'
  }

  subscription_defaults = {
    block: true,
    ack: true,
    manual_ack: true
  }

  connection = ::MarchHare.connect \
    connection_defaults.merge(config[:connection] || {})

  handle_errors connection, log

  # Read the exchange name and build a name-free copy of the options instead
  # of deleting :name in place, so the caller's config hash is left intact
  # and can be inspected or reused afterwards.
  exchange_name = config[:exchange][:name]
  exchange_options = config[:exchange].reject { |key, _| key == :name }

  @threads = config[:queues].map do |queue_name, queue_config|
    Thread.new do
      channel = connection.create_channel
      channel.prefetch = config[:prefetch] || 256

      exchange = channel.exchange exchange_name, \
        exchange_defaults.merge(exchange_options)

      subscription = subscription_defaults.merge \
        (config[:subscription] || {})

      # A queue listed without any options arrives here as nil; fall back to
      # the defaults, like the other optional config sections do.
      queue = channel.queue queue_name.to_s, \
        queue_defaults.merge(queue_config || {})

      queue.bind exchange, \
        binding_defaults.merge(config[:binding] || {})

      log.debug \
        event: 'input-rabbitmq-connecting-queue',
        queue: queue_name

      queue.subscribe(subscription) do |meta, message|
        output << message
        stats.inc 'input'
        channel.ack meta.delivery_tag if subscription[:manual_ack]
      end
    end
  end
end

Public Instance Methods

stop() click to toggle source
# File lib/anschel/input/rabbitmq.rb, line 74
# Kills every consumer thread held in @threads and marks this input as
# stopped. Calls after the first one do nothing.
#
# Returns true on the call that performs the shutdown, nil afterwards.
def stop
  unless @stopped
    @threads.each(&:kill)
    @stopped = true
  end
end

Private Instance Methods

handle_errors(connection, log) click to toggle source

Ensure broker errors result in logs and exit appropriately. It seems that March Hare catches exceptions under the hood, so we can’t just raise within these callbacks, and we’re stuck with this jankiness.

# File lib/anschel/input/rabbitmq.rb, line 85
# Registers blocked / unblocked / shutdown callbacks on +connection+ and
# starts a monitor thread that polls what they recorded once per second.
#
# The callbacks only store the latest state in local variables; the monitor
# thread does the logging and raising:
#   * shutdown reason set   -> log.fatal, then raise RuntimeError inside the
#                              monitor thread
#   * blocked reason set    -> log.warn 'input-rabbitmq-blocked'
#   * unblocked (nil) seen  -> log.warn 'input-rabbitmq-unblocked'
#
# NOTE(review): the RuntimeError ends the monitor thread; whether it also
# terminates the process depends on Thread.abort_on_exception, which is not
# configured in this method - confirm where that is set.
#
# connection - object responding to on_blocked, on_unblocked and on_shutdown
# log        - logger accepting a Hash event via warn and fatal
#
# Returns the monitor Thread.
def handle_errors connection, log
  shutdown_reason = false

  # Tri-state flag: false = nothing new to report, nil = broker just
  # unblocked us, any other value = reason the broker blocked us.
  block_state = false

  connection.on_blocked { |why| block_state = why }

  connection.on_unblocked { block_state = nil }

  connection.on_shutdown { |_, why| shutdown_reason = why }

  Thread.new do
    loop do
      sleep 1

      if shutdown_reason
        log.fatal(event: 'input-rabbitmq-shutdown', reason: shutdown_reason)
        raise RuntimeError
      end

      if block_state
        log.warn(event: 'input-rabbitmq-blocked', reason: block_state)
      elsif block_state.nil?
        log.warn(event: 'input-rabbitmq-unblocked')
      end

      # Reset so each blocked/unblocked transition is reported only once.
      block_state = false
    end
  end
end