module Cottontail::Consumer

Cottontail::Consumer is a module that, when included in a class, lets it receive asynchronous AMQP messages.

@example A basic worker

# Minimal consumer example: one queue, one catch-all handler.
class Worker
  include Cottontail::Consumer

  # Session setup for the URL read from ENV['RABBITMQ_URL']. The block is
  # handed the worker and the session.
  session ENV['RABBITMQ_URL'] do |worker, session|
    channel = session.create_channel

    # Declare a queue with an empty name, then subscribe the worker to it.
    queue = channel.queue('', durable: true)
    worker.subscribe(queue, exclusive: true, ack: false)
  end

  # Handler declared without a route argument; it logs the inspected payload.
  consume do |delivery_info, properties, payload|
    logger.info payload.inspect
  end
end

@example A more customized worker

# Consumer example with an explicit exchange/queue binding and two handlers:
# one declared for 'custom-route' and one declared without a route.
class Worker
  include Cottontail::Consumer

  # Session setup for the URL read from ENV['RABBITMQ_URL']. The block is
  # handed the worker and the session.
  session ENV['RABBITMQ_URL'] do |worker, session|
    # You always need a separate channel
    channel = session.create_channel

    # Creates a `topic` exchange ('cottontail-exchange'), binds a
    # queue ('cottontail-queue') to it and listens to any possible
    # routing key ('#').
    exchange = channel.topic('cottontail-exchange')
    queue = channel.queue('cottontail-queue', durable: true)
      .bind(exchange, routing_key: '#')

    # Now you need to subscribe the worker instance to this queue.
    worker.subscribe(queue, exclusive: true, ack: false)
  end

  # Handler declared with the argument 'custom-route'.
  consume 'custom-route' do |delivery_info, properties, payload|
    logger.info "routing_key: 'custom-route' | #{payload.inspect}"
  end

  # Handler declared without a route argument.
  # NOTE(review): presumably the fallback for messages no other handler
  # matches; confirm against the consumables lookup.
  consume do |delivery_info, properties, payload|
    logger.info "any routing key | #{payload.inspect}"
  end
end

Attributes

options[RW]

Public Class Methods

new(options = {}) click to toggle source
# File lib/cottontail/consumer.rb, line 175
# Builds a consumer in the "not running" state.
#
# @param options [Hash] stored unchanged in @options
def initialize(options = {})
  @options = options

  # The running flag, launcher and session are created inside the
  # :initialize callback run.
  run_callbacks(:initialize) do
    @__running__ = false

    @__launcher__ = Cottontail::Consumer::Launcher.new(self)
    @__session__ = Cottontail::Consumer::Session.new(self)
  end

  logger.debug('[Cottontail] initialized')
end

Public Instance Methods

logger() click to toggle source

@private

# File lib/cottontail/consumer.rb, line 218
# Looks up the logger through the configuration.
#
# @return [Object] whatever `config.get(:logger)` returns
def logger
  config.get(:logger)
end
running?() click to toggle source
# File lib/cottontail/consumer.rb, line 206
# Reports the running flag, which `start` sets to true and `stop` resets to
# false.
#
# @return [Boolean] the current value of @__running__
def running?
  @__running__
end
start(blocking = true) click to toggle source
# File lib/cottontail/consumer.rb, line 188
# Starts the session and marks the consumer as running.
#
# @param blocking [Boolean] when truthy, the launcher is started as well
# @return [Object, nil] the launcher's `start` result, or nil when
#   `blocking` is falsy
def start(blocking = true)
  logger.info('[Cottontail] starting up')

  @__session__.start
  @__running__ = true
  return unless blocking

  @__launcher__.start
end
stop() click to toggle source
# File lib/cottontail/consumer.rb, line 196
# Shuts the consumer down: stops the launcher, then the session, and clears
# the running flag. Does nothing when the consumer is not running.
#
# @return [false, nil] false after a shutdown, nil when nothing was running
def stop
  return unless running?

  logger.info('[Cottontail] shutting down')

  # Launcher first, session second.
  @__launcher__.stop
  @__session__.stop

  @__running__ = false
end
subscribe(queue, options) click to toggle source

@private

# File lib/cottontail/consumer.rb, line 211
# Subscribes to +queue+ and forwards every yielded message to `consume`.
#
# @param queue [#subscribe] queue object to subscribe to
# @param options [Hash] passed through unchanged to `queue.subscribe`
# @return [Object] whatever `queue.subscribe` returns
def subscribe(queue, options)
  queue.subscribe(options) do |info, props, body|
    consume(info, props, body)
  end
end

Private Instance Methods

consume(delivery_info, properties, payload) click to toggle source
# File lib/cottontail/consumer.rb, line 224
# Runs the :consume callbacks around the dispatch of a single message.
#
# A StandardError raised while handling the message is logged. When the
# :raise_on_exception setting is truthy the consumer is stopped and the
# original exception is re-raised; otherwise the error is swallowed after
# being logged.
#
# @param delivery_info [Object] passed through to `execute`
# @param properties [Object] passed through to `execute`
# @param payload [Object] passed through to `execute`
# @raise [StandardError] the handler's own exception, when
#   :raise_on_exception is set
def consume(delivery_info, properties, payload)
  run_callbacks :consume do
    execute(delivery_info, properties, payload)
  end
rescue => exception
  logger.error exception

  if config.get(:raise_on_exception)
    stop

    # Re-raise the same exception object. `raise(exception, caller)` would
    # hand `caller` to Kernel#raise as the *message* argument, replacing the
    # original error message with a stringified backtrace array.
    raise exception
  end
end
execute(delivery_info, properties, payload) click to toggle source
# File lib/cottontail/consumer.rb, line 238
# Finds the consumable matching the message and runs it against this
# consumer. Logs a warning when no consumable is found.
#
# @param delivery_info [Object] passed to the lookup and to the consumable
# @param properties [Object] passed to the lookup and to the consumable
# @param payload [Object] passed to the lookup and to the consumable
# @return [Object] the consumable's `exec` result, or the logger's `warn`
#   result when nothing matched
def execute(delivery_info, properties, payload)
  consumable = config.get(:consumables).find(delivery_info, properties, payload)
  return logger.warn('[Cottontail] Could not consume message') if consumable.nil?

  consumable.exec(self, delivery_info, properties, payload)
end