class PikaQue::Handlers::DLXRetryHandler

Constants

DEFAULT_RETRY_OPTS

Create following exchanges with retry_prefix = pika-que and default backoff pika-que-retry-60 pika-que-retry-requeue pika-que-error and following queue pika-que-retry-60 (with default backoff)

retry_mode can be either :exp or :const

Public Class Methods

new(opts = {}) click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 21
def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_RETRY_OPTS).merge(opts)
  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  @error_monitor = Monitor.new

  @max_retries = @opts[:retry_max_times]
  @backoff_multiplier = @opts[:retry_backoff_multiplier] # This is for example/dev/test

  @retry_ex_name = @opts[:retry_dlx] || "#{@opts[:retry_prefix]}-retry-#{@opts[:retry_backoff]}"
  @retry_name = "#{@opts[:retry_prefix]}-retry"
  @requeue_name = "#{@opts[:retry_prefix]}-retry-requeue"
  @error_name = "#{@opts[:retry_prefix]}-error"

  @queue_name_lookup = {}

  setup_exchanges
  setup_queues
end

Public Instance Methods

bind_queue(queue, routing_key) click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 41
def bind_queue(queue, routing_key)
  # bind the worker queue to requeue exchange
  @queue_name_lookup[routing_key] = queue.name
  queue.bind(@requeue_exchange, :routing_key => routing_key)
end
close() click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 64
def close
  @channel.close unless @channel.closed?
end
handle(response_code, channel, delivery_info, metadata, msg, error = nil) click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 47
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "DLXRetryHandler acknowledge <#{msg}>"
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "DLXRetryHandler reject retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, :reject)
  when :requeue
    PikaQue.logger.debug "DLXRetryHandler requeue <#{msg}>"
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "DLXRetryHandler error retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, error)
  end
end

Private Instance Methods

exchange_durable?() click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 98
def exchange_durable?
  @opts.fetch(:exchange_options, {}).fetch(:durable, false)
end
failure_count(headers, delivery_info) click to toggle source

Uses the x-death header to determine the number of failures this job has seen in the past. This does not count the current failure. So for instance, the first time the job fails, this will return 0, the second time, 1, etc. @param headers [Hash] Hash of headers that Rabbit delivers as part of

the message

@return [Integer] Count of number of failures.

# File lib/pika_que/handlers/dlx_retry_handler.rb, line 125
def failure_count(headers, delivery_info)
  if headers.nil? || headers['x-death'].nil?
    0
  else
    queue_name = headers['x-first-death-queue'] || @queue_name_lookup[delivery_info.routing_key]
    x_death_array = headers['x-death'].select do |x_death|
      x_death['queue'] == queue_name
    end
    if x_death_array.count > 0 && x_death_array.first['count']
      # Newer versions of RabbitMQ return headers with a count key
      x_death_array.inject(0) {|sum, x_death| sum + x_death['count']}
    else
      # Older versions return a separate x-death header for each failure
      x_death_array.count
    end
  end
end
handle_retry(channel, delivery_info, metadata, msg, reason) click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 102
def handle_retry(channel, delivery_info, metadata, msg, reason)
  # +1 for the current attempt
  num_attempts = failure_count(metadata[:headers], delivery_info) + 1
  if num_attempts <= @max_retries
    # Publish message to the x-dead-letter-exchange (ie. retry exchange)
    PikaQue.logger.info "DLXRetryHandler msg=retrying, count=#{num_attempts}, headers=#{metadata[:headers] || {}}"
    
    channel.reject(delivery_info.delivery_tag, false)
  else
    PikaQue.logger.info "DLXRetryHandler msg=failing, retried_count=#{num_attempts - 1}, headers=#{metadata[:headers]}, reason=#{reason}"

    publish_error(delivery_info, msg)
    channel.acknowledge(delivery_info.delivery_tag, false)
  end
end
publish_error(delivery_info, msg) click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 143
def publish_error(delivery_info, msg)
  @error_monitor.synchronize do
    @error_exchange.publish(msg, routing_key: delivery_info.routing_key)
  end
end
queue_durable?() click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 94
def queue_durable?
  @opts.fetch(:queue_options, {}).fetch(:durable, false)
end
setup_exchanges() click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 70
def setup_exchanges
  @retry_exchange, @error_exchange, @requeue_exchange = [@retry_ex_name, @error_name, @requeue_name].map do |name|
    PikaQue.logger.debug "DLXRetryHandler creating exchange=#{name}"
    @channel.exchange(name, :type => 'topic', :durable => exchange_durable?)
  end
end
setup_queues() click to toggle source
# File lib/pika_que/handlers/dlx_retry_handler.rb, line 77
def setup_queues
  bo = @opts[:retry_backoff]

  PikaQue.logger.debug "DLXRetryHandler creating queue=#{@retry_name}-#{bo} x-dead-letter-exchange=#{@requeue_name}"
  backoff_queue = @channel.queue("#{@retry_name}-#{bo}",
                                :durable => queue_durable?,
                                :arguments => {
                                  :'x-dead-letter-exchange' => @requeue_name,
                                  :'x-message-ttl' => bo * @backoff_multiplier
                                })
  backoff_queue.bind(@retry_exchange, :routing_key => '#')

  PikaQue.logger.debug "DLXRetryHandler creating queue=#{@error_name}"
  @error_queue = @channel.queue(@error_name, :durable => queue_durable?)
  @error_queue.bind(@error_exchange, :routing_key => '#')
end