class LaGear::Sneakers::Handlers::ExponentialBackoff

Public Class Methods

new(channel, _queue, opts) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 5
def initialize(channel, _queue, opts)
  @channel = channel
  @opts = opts

  exchange = @opts.fetch(:exchange)
  @handler_opts = @opts.fetch(:handler_opts, {})

  retry_name = @handler_opts.fetch(:retryexchange, "#{exchange}.retry")
  error_name = @handler_opts.fetch(:errorexchange, "#{exchange}.error")

  @publish_channel = setup_publish_channel
  @retry_exchange = setup_retry(@publish_channel, retry_name, exchange)
  @error_exchange = setup_error(@publish_channel, error_name)

  @max_retries = @handler_opts.fetch(:max_retries, 5)
  @expiration = @handler_opts.fetch(:expiration, 1000)
end

Public Instance Methods

acknowledge(delivery_info, _metadata, _msg) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 23
def acknowledge(delivery_info, _metadata, _msg)
  @channel.acknowledge(delivery_info.delivery_tag, false)
end
error(hdr, props, msg, err) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 31
def error(hdr, props, msg, err)
  retry_or_error(hdr, props, msg, err.to_s)
end
noop(_delivery_info, _metadata, _msg) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 39
def noop(_delivery_info, _metadata, _msg); end
reject(hdr, props, msg, _requeue = false) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 27
def reject(hdr, props, msg, _requeue = false)
  retry_or_error(hdr, props, msg, 'rejected')
end
timeout(hdr, props, msg) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 35
def timeout(hdr, props, msg)
  error(hdr, props, msg, 'Timeout: Sneakers worker timedout.')
end

Private Instance Methods

get_expire_delay(failures = 0) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 109
def get_expire_delay(failures = 0)
  failures = failures.to_i + 1
  @expiration * (2**failures)
end
get_retries(headers) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 114
def get_retries(headers)
  headers ||= {}
  headers.fetch('sneakers-retries', 0).to_i
end
logger() click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 105
def logger
  ::Sneakers.logger
end
retry_or_error(hdr, props, msg, reason, _requeue=false) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 43
def retry_or_error(hdr, props, msg, reason, _requeue=false)
  retries = get_retries(props[:headers])
  if retries >= @max_retries
    @error_exchange.publish(
      msg,
      routing_key: @handler_opts.fetch(:routing_key, hdr.routing_key),
      headers: { 'sneakers-error-reason' => reason || 'Doh! No reason given. :(' }
    )
  else
    expire_delay = get_expire_delay(retries)

    @retry_exchange.publish(msg,
                            routing_key: @handler_opts.fetch(:routing_key, hdr.routing_key),
                            expiration: expire_delay,
                            headers: {
                              'sneakers-retries' => retries + 1,
                              'sneakers-retry-reason' => reason || 'Doh! No reason given. :('
                            })
  end
  @channel.acknowledge(hdr.delivery_tag, false)
rescue => e
  logger.fatal "#{self} #{e}, hdr.routing_key #{hdr.routing_key}, props #{props}, msg #{msg}, reason #{reason}, handler_opts #{@handler_opts}, retries #{retries}"
end
setup_error(publish_channel, error_name) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 81
def setup_error(publish_channel, error_name)
  error_exchange = publish_channel.exchange(error_name,
                                            type: 'topic',
                                            durable: 'true')
  error_queue = publish_channel.queue(error_name, durable: 'true')
  error_queue.bind(error_exchange, routing_key: '#')
  trace(error_queue, "#{self} error queue created.")
  error_exchange
end
setup_publish_channel() click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 91
def setup_publish_channel
  return @channel unless @opts.to_hash.include?(:amqp_publish)
  publish_bunny = Bunny.new(@opts[:amqp_publish], vhost: @opts[:vhost], heartbeat: @opts[:heartbeat])
  publish_bunny.start
  publish_channel = publish_bunny.create_channel
  publish_channel.prefetch(@opts[:prefetch])
  logger.warn "#{self} publish endpoint used: #{@opts[:amqp_publish]}, vhost #{@opts[:vhost]}"
  publish_channel
end
setup_retry(publish_channel, retry_name, exchange) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 67
def setup_retry(publish_channel, retry_name, exchange)
  retry_exchange = publish_channel.exchange(retry_name,
                                            type: 'topic',
                                            durable: 'true')
  retry_queue = publish_channel.queue(retry_name,
                                      durable: 'true',
                                      arguments: {
                                        :'x-dead-letter-exchange' => exchange,
                                      })
  retry_queue.bind(retry_exchange, routing_key: '#')
  trace(retry_queue, "#{self} retry queue created.")
  retry_exchange
end
trace(queue, msg) click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 101
def trace(queue, msg)
  logger.debug "[#{Thread.current}][#{queue.name}][#{queue.options}] #{msg}"
end