module Cottontail::Consumer
Cottontail::Consumer is the module for receiving asynchronous AMQP messages.
@example A basic worker
class Worker
  include Cottontail::Consumer

  # Open a channel on the session, declare a durable queue with an empty
  # name and subscribe this worker to it.
  session ENV['RABBITMQ_URL'] do |worker, session|
    channel = session.create_channel
    queue = channel.queue('', durable: true)

    worker.subscribe(queue, exclusive: true, ack: false)
  end

  # Registered without a routing key; logs the inspected payload.
  consume do |_delivery_info, _properties, payload|
    logger.info payload.inspect
  end
end
@example More custom worker
class Worker
  include Cottontail::Consumer

  session ENV['RABBITMQ_URL'] do |worker, session|
    # You always need a separate channel
    channel = session.create_channel

    # Creates a `topic` exchange ('cottontail-exchange'), binds a
    # queue ('cottontail-queue') to it and listens to any possible
    # routing key ('#').
    exchange = channel.topic('cottontail-exchange')
    queue = channel.queue('cottontail-queue', durable: true)
                   .bind(exchange, routing_key: '#')

    # Now you need to subscribe the worker instance to this queue.
    worker.subscribe(queue, exclusive: true, ack: false)
  end

  # Registered for the 'custom-route' routing key.
  consume 'custom-route' do |_delivery_info, _properties, payload|
    logger.info "routing_key: 'custom-route' | #{payload.inspect}"
  end

  # Registered without a routing key.
  consume do |_delivery_info, _properties, payload|
    logger.info "any routing key | #{payload.inspect}"
  end
end
Attributes
options[RW]
Public Class Methods
new(options = {})
click to toggle source
# File lib/cottontail/consumer.rb, line 175
# Builds a consumer instance.
#
# Stores `options`, then, inside the `:initialize` callbacks, marks the
# consumer as not running and creates its launcher and session helpers,
# each of which receives this instance. Finally logs a debug message.
#
# @param options [Hash] kept as-is in `@options`
def initialize(options = {})
  @options = options

  run_callbacks(:initialize) do
    @__running__  = false
    @__launcher__ = Cottontail::Consumer::Launcher.new(self)
    @__session__  = Cottontail::Consumer::Session.new(self)
  end

  logger.debug('[Cottontail] initialized')
end
Public Instance Methods
logger()
click to toggle source
@private
# File lib/cottontail/consumer.rb, line 218
# Returns whatever is stored under the `:logger` key of the configuration.
def logger
  config.get(:logger)
end
running?()
click to toggle source
# File lib/cottontail/consumer.rb, line 206
# Reports the running flag: `start` sets it to true, `stop` and
# `initialize` set it to false.
#
# @return the current value of `@__running__`
def running?
  @__running__
end
start(blocking = true)
click to toggle source
# File lib/cottontail/consumer.rb, line 188
# Starts the consumer: logs the start-up, starts the session and sets the
# running flag. The launcher is started only when `blocking` is truthy.
#
# @param blocking [Boolean] whether to also start the launcher
# @return the result of starting the launcher, or nil when `blocking` is falsy
def start(blocking = true)
  logger.info('[Cottontail] starting up')

  @__session__.start
  @__running__ = true

  return unless blocking

  @__launcher__.start
end
stop()
click to toggle source
# File lib/cottontail/consumer.rb, line 196
# Stops the consumer if it is running; otherwise does nothing.
#
# Logs the shutdown, stops the launcher first and the session second, then
# clears the running flag.
def stop
  if running?
    logger.info('[Cottontail] shutting down')

    @__launcher__.stop
    @__session__.stop

    @__running__ = false
  end
end
subscribe(queue, options)
click to toggle source
@private
# File lib/cottontail/consumer.rb, line 211
# Subscribes to `queue` and hands every delivered message to `consume`.
#
# @param queue the queue object; must respond to `subscribe(options)` and
#   yield `delivery_info, properties, payload` to the given block
# @param options [Hash] passed through unchanged to `queue.subscribe`
# @return whatever `queue.subscribe` returns
def subscribe(queue, options)
  queue.subscribe(options) do |info, props, body|
    consume(info, props, body)
  end
end
Private Instance Methods
consume(delivery_info, properties, payload)
click to toggle source
# File lib/cottontail/consumer.rb, line 224
# Processes one delivered message inside the `:consume` callbacks.
#
# Any StandardError raised while processing is logged with `logger.error`.
# When the `:raise_on_exception` configuration value is truthy, the consumer
# is stopped and the error is re-raised; otherwise the error is swallowed
# after logging.
#
# @param delivery_info passed through to `execute`
# @param properties passed through to `execute`
# @param payload passed through to `execute`
# @raise [StandardError] the rescued error, when `:raise_on_exception` is set
def consume(delivery_info, properties, payload)
  run_callbacks :consume do
    execute(delivery_info, properties, payload)
  end
rescue => exception
  logger.error exception

  if config.get(:raise_on_exception)
    stop

    # Re-raise the rescued object itself. The two-argument form
    # `raise(exception, caller)` treats its second argument as the *message*,
    # which replaced the original error message with the backtrace array.
    # Raising the same object keeps its class, message and backtrace.
    raise exception
  end
end
execute(delivery_info, properties, payload)
click to toggle source
# File lib/cottontail/consumer.rb, line 238
# Looks up a handler for the message in the configured `:consumables` and
# runs it.
#
# When the lookup returns nil, a warning is logged and nothing else happens.
# Otherwise the matching entry is executed with this instance and the three
# message parts.
def execute(delivery_info, properties, payload)
  consumable = config.get(:consumables).find(delivery_info, properties, payload)
  return logger.warn('[Cottontail] Could not consume message') if consumable.nil?

  consumable.exec(self, delivery_info, properties, payload)
end