class Emque::Consuming::Adapters::RabbitMq::DelayedMessageWorker

Attributes

channel[RW]
delayed_message_exchange[RW]
queue[RW]

Public Class Methods

new(connection) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 18
def initialize(connection)
  self.channel = connection.create_channel

  if config.adapter.options[:prefetch]
    channel.prefetch(config.adapter.options[:prefetch])
  end

  self.delayed_message_exchange = channel.exchange(
    "emque.#{config.app_name}.delayed_message",
    {
      :type => "x-delayed-message",
      :durable => true,
      :auto_delete => false,
      :arguments => {
        "x-delayed-type" => "direct",
      }
    }
  )

  self.queue = channel.queue(
    "emque.#{config.app_name}.delayed_message",
    :durable => config.adapter.options[:durable],
    :auto_delete => config.adapter.options[:auto_delete],
    :arguments => {
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  ).bind(delayed_message_exchange)
end

Public Instance Methods

actor_died(actor, reason) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 12
def actor_died(actor, reason)
  unless shutdown
    logger.error "#{log_prefix} actor_died - died: #{reason}"
  end
end
start() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 47
def start
  logger.info "#{log_prefix} starting..."
  queue.subscribe(:manual_ack => true, &method(:process_message))
  logger.debug "#{log_prefix} started"
end
stop() click to toggle source
Calls superclass method Emque::Consuming::Actor#stop
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 53
def stop
  logger.debug "#{log_prefix} stopping..."
  super do
    logger.debug "#{log_prefix} closing channel"
    channel.close
  end
  logger.debug "#{log_prefix} stopped"
end

Private Instance Methods

log_prefix() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 84
def log_prefix
  "RabbitMQ DelayedMessageWorker:"
end
process_message(delivery_info, metadata, payload) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 66
def process_message(delivery_info, metadata, payload)
  begin
    logger.info "#{log_prefix} processing message #{metadata}"
    logger.debug "#{log_prefix} payload #{payload}"
    message = Emque::Consuming::Message.new(
      :original => payload
    )
    ::Emque::Consuming::Consumer.new.consume(:process, message)
    channel.ack(delivery_info.delivery_tag)
  rescue StandardError => exception
    if retryable_errors.any? { |error| exception.class.to_s =~ /#{error}/ }
      retry_error(delivery_info, metadata, payload, exception)
    else
      channel.nack(delivery_info.delivery_tag)
    end
  end
end