class PikaQue::Handlers::RetryHandler

Constants

DEFAULT_RETRY_OPTS

Create following exchanges with retry_prefix = pika-que pika-que-retry pika-que-retry-requeue pika-que-error and following queues pika-que-retry-60 (default for const mode) pika-que-retry-120 pika-que-retry-240 pika-que-retry-480 pika-que-retry-960

retry_mode can be either :exp or :const

Public Class Methods

backoff_periods(max_retries, backoff_base) click to toggle source

formula base X = 0, 15(2x), 30(3x), 45(4x), 60(5x), 120, 180, etc defaults to 0 (X + 15) * 2 ** (count + 1)

# File lib/pika_que/handlers/retry_handler.rb, line 77
def self.backoff_periods(max_retries, backoff_base)
  (1..max_retries).map{ |c| next_ttl(c, backoff_base) }
end
new(opts = {}) click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 27
def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_RETRY_OPTS).merge(opts)
  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  @retry_monitor = Monitor.new
  @error_monitor = Monitor.new

  @max_retries = @opts[:retry_max_times]
  @backoff_base = @opts[:retry_backoff_base]
  @backoff_multiplier = @opts[:retry_backoff_multiplier] # This is for example/dev/test

  @retry_name = "#{@opts[:retry_prefix]}-retry"
  @requeue_name = "#{@opts[:retry_prefix]}-retry-requeue"
  @error_name = "#{@opts[:retry_prefix]}-error"

  setup_exchanges
  setup_queues
end
next_ttl(count, backoff_base) click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 81
def self.next_ttl(count, backoff_base)
  (backoff_base + 15) * 2 ** (count + 1)
end

Public Instance Methods

bind_queue(queue, routing_key) click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 46
def bind_queue(queue, routing_key)
  # bind the worker queue to requeue exchange
  queue.bind(@requeue_exchange, :routing_key => routing_key)
end
close() click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 68
def close
  @channel.close unless @channel.closed?
end
handle(response_code, channel, delivery_info, metadata, msg, error = nil) click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 51
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "RetryHandler acknowledge <#{msg}>"
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "RetryHandler reject retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, :reject)
  when :requeue
    PikaQue.logger.debug "RetryHandler requeue <#{msg}>"
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "RetryHandler error retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, error)
  end
end

Private Instance Methods

exchange_durable?() click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 123
def exchange_durable?
  @opts.fetch(:exchange_options, {}).fetch(:durable, false)
end
failure_count(headers) click to toggle source

Uses the header to determine the number of failures this job has seen in the past. This does not count the current failure. So for instance, the first time the job fails, this will return 0, the second time, 1, etc. @param headers [Hash] Hash of headers that Rabbit delivers as part of

the message

@return [Integer] Count of number of failures.

# File lib/pika_que/handlers/retry_handler.rb, line 157
def failure_count(headers)
  if headers.nil? || headers['count'].nil?
    0
  else
    headers['count']
  end
end
handle_retry(channel, delivery_info, metadata, msg, reason) click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 127
def handle_retry(channel, delivery_info, metadata, msg, reason)
  # +1 for the current attempt
  num_attempts = failure_count(metadata[:headers]) + 1
  if num_attempts <= @max_retries
    # Publish message to the x-dead-letter-exchange (ie. retry exchange)
    PikaQue.logger.info "RetryHandler msg=retrying, count=#{num_attempts}, headers=#{metadata[:headers] || {}}"
    
    if @opts[:retry_mode] == :exp
      backoff_ttl = RetryHandler.next_ttl(num_attempts, @backoff_base)
    else
      backoff_ttl = @opts[:retry_const_backoff]
    end

    publish_retry(delivery_info, msg, { backoff: backoff_ttl, count: num_attempts })
    channel.acknowledge(delivery_info.delivery_tag, false)
  else
    PikaQue.logger.info "RetryHandler msg=failing, retried_count=#{num_attempts - 1}, headers=#{metadata[:headers]}, reason=#{reason}"

    publish_error(delivery_info, msg)
    channel.acknowledge(delivery_info.delivery_tag, false)
  end
end
publish_error(delivery_info, msg) click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 171
def publish_error(delivery_info, msg)
  @error_monitor.synchronize do
    @error_exchange.publish(msg, routing_key: delivery_info.routing_key)
  end
end
publish_retry(delivery_info, msg, headers) click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 165
def publish_retry(delivery_info, msg, headers)
  @retry_monitor.synchronize do
    @retry_exchange.publish(msg, routing_key: delivery_info.routing_key, headers: headers)
  end
end
queue_durable?() click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 119
def queue_durable?
  @opts.fetch(:queue_options, {}).fetch(:durable, false)
end
setup_exchanges() click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 87
def setup_exchanges
  PikaQue.logger.debug "RetryHandler creating exchange=#{@retry_name}"
  @retry_exchange = @channel.exchange(@retry_name, :type => 'headers', :durable => exchange_durable?)
  @error_exchange, @requeue_exchange = [@error_name, @requeue_name].map do |name|
    PikaQue.logger.debug "RetryHandler creating exchange=#{name}"
    @channel.exchange(name, :type => 'topic', :durable => exchange_durable?)
  end
end
setup_queues() click to toggle source
# File lib/pika_que/handlers/retry_handler.rb, line 96
def setup_queues
  if @opts[:retry_mode] == :const
    backoffs = [@opts[:retry_const_backoff]]
  else
    backoffs = RetryHandler.backoff_periods(@max_retries, @backoff_base)
  end

  backoffs.each do |bo|
    PikaQue.logger.debug "RetryHandler creating queue=#{@retry_name}-#{bo} x-dead-letter-exchange=#{@requeue_name}"
    backoff_queue = @channel.queue("#{@retry_name}-#{bo}",
                                  :durable => queue_durable?,
                                  :arguments => {
                                    :'x-dead-letter-exchange' => @requeue_name,
                                    :'x-message-ttl' => bo * @backoff_multiplier
                                  })
    backoff_queue.bind(@retry_exchange, :arguments => { :backoff => bo })
  end

  PikaQue.logger.debug "RetryHandler creating queue=#{@error_name}"
  @error_queue = @channel.queue(@error_name, :durable => queue_durable?)
  @error_queue.bind(@error_exchange, :routing_key => '#')
end