class Emque::Consuming::Adapters::RabbitMq::Worker

Attributes

channel[RW]
name[RW]
queue[RW]
topic[RW]

Public Class Methods

new(connection, topic) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 20
def initialize(connection, topic)
  self.topic = topic
  self.name = "#{self.topic} worker"
  self.shutdown = false

  # @note: channels are not thread safe, so is better to use
  #        a new channel in each worker.
  # https://github.com/jhbabon/amqp-celluloid/blob/master/lib/consumer.rb
  self.channel = connection.create_channel

  if config.adapter.options[:prefetch]
    channel.prefetch(config.adapter.options[:prefetch])
  end

  self.queue = channel
    .queue(
      "emque.#{config.app_name}.#{topic}",
      :durable => config.adapter.options[:durable],
      :auto_delete => config.adapter.options[:auto_delete],
      :arguments => {
        "x-dead-letter-exchange" => "#{config.app_name}.error"
      }
    )
    .bind(
      channel.fanout(
        topic.to_s, 
        :durable => true, 
        :auto_delete => false,
       )
    )
end

Public Instance Methods

actor_died(actor, reason) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 14
def actor_died(actor, reason)
  unless shutdown
    logger.error "#{log_prefix} actor_died - died: #{reason}"
  end
end
start() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 52
def start
  logger.info "#{log_prefix} starting..."
  queue.subscribe(:manual_ack => true, &method(:process_message))
  logger.debug "#{log_prefix} started"
end
stop() click to toggle source
Calls superclass method Emque::Consuming::Actor#stop
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 58
def stop
  logger.debug "#{log_prefix} stopping..."
  super do
    logger.debug "#{log_prefix} closing channel"
    channel.close
  end
  logger.debug "#{log_prefix} stopped"
end

Private Instance Methods

delayed_message_exchange() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 104
def delayed_message_exchange
  delayed_message_exchange = channel.exchange(
    "emque.#{config.app_name}.delayed_message",
    {
      :type => "x-delayed-message",
      :durable => true,
      :auto_delete => false,
      :arguments => {
        "x-delayed-type" => "direct",
      }
    }
  )
  channel.queue(
    "emque.#{config.app_name}.delayed_message",
    :durable => config.adapter.options[:durable],
    :auto_delete => config.adapter.options[:auto_delete],
    :arguments => {
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  ).bind(delayed_message_exchange)

  delayed_message_exchange
end
enable_delayed_message() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 71
def enable_delayed_message
  config.enable_delayed_message
end
log_prefix() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 128
def log_prefix
  "RabbitMQ Worker: #{object_id} #{name}"
end
process_message(delivery_info, metadata, payload) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 75
def process_message(delivery_info, metadata, payload)
  begin
    logger.info "#{log_prefix} processing message #{metadata}"
    logger.debug "#{log_prefix} payload #{payload}"
    message = Emque::Consuming::Message.new(
      :original => payload
    )
    ::Emque::Consuming::Consumer.new.consume(:process, message)
    channel.ack(delivery_info.delivery_tag)
  rescue StandardError => exception
    if enable_delayed_message
      begin
        publish_to_delayed_message(delivery_info, metadata, payload)
      rescue
        channel.nack(delivery_info.delivery_tag)
      end
    else
      channel.nack(delivery_info.delivery_tag)
    end
  end
end
publish_to_delayed_message(delivery_info, metadata, payload) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 97
def publish_to_delayed_message(delivery_info, metadata, payload)
  headers = metadata[:headers] || {}
  headers["x-delay"] = 1000
  delayed_message_exchange.publish(payload, { :headers => headers })
  channel.ack(delivery_info.delivery_tag)
end