class LaGear::Sneakers::Handlers::ExponentialBackoff
Public Class Methods
new(channel, _queue, opts)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 5
#
# Stores the consumer channel and options, resolves the retry/error exchange
# names, and declares both exchanges on the publish channel.
#
# @param channel the channel used to acknowledge deliveries; also used for
#   publishing when setup_publish_channel returns it
# @param _queue unused
# @param opts options; :exchange is fetched without a default, so it is
#   required. :handler_opts (default {}) may carry :retryexchange,
#   :errorexchange, :max_retries (default 5) and :expiration (default 1000).
def initialize(channel, _queue, opts)
  @channel = channel
  @opts = opts

  base_exchange = @opts.fetch(:exchange)
  @handler_opts = @opts.fetch(:handler_opts, {})

  # Exchange names default to "<exchange>.retry" / "<exchange>.error".
  retry_exchange_name = @handler_opts.fetch(:retryexchange, "#{base_exchange}.retry")
  error_exchange_name = @handler_opts.fetch(:errorexchange, "#{base_exchange}.error")

  @publish_channel = setup_publish_channel
  @retry_exchange = setup_retry(@publish_channel, retry_exchange_name, base_exchange)
  @error_exchange = setup_error(@publish_channel, error_exchange_name)

  @max_retries = @handler_opts.fetch(:max_retries, 5)
  @expiration = @handler_opts.fetch(:expiration, 1000)
end
Public Instance Methods
acknowledge(delivery_info, _metadata, _msg)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 23
#
# Acknowledges a single delivery on the consumer channel.
#
# @param delivery_info object responding to #delivery_tag
# @param _metadata unused
# @param _msg unused
# @return whatever @channel.acknowledge returns
def acknowledge(delivery_info, _metadata, _msg)
  tag = delivery_info.delivery_tag
  # Second argument is passed as false, exactly as in the original call.
  @channel.acknowledge(tag, false)
end
error(hdr, props, msg, err)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 31
#
# Handles a failed message by delegating to retry_or_error, using the
# string form of +err+ as the reason.
#
# @param hdr delivery info for the message
# @param props message properties
# @param msg the message payload
# @param err the error (or any object); converted with #to_s
def error(hdr, props, msg, err)
  reason = err.to_s
  retry_or_error(hdr, props, msg, reason)
end
noop(_delivery_info, _metadata, _msg)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 39
#
# Intentionally does nothing; all arguments are ignored.
#
# @return [nil]
def noop(_delivery_info, _metadata, _msg)
end
reject(hdr, props, msg, _requeue = false)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 27
#
# Handles a rejected message by delegating to retry_or_error with the
# fixed reason 'rejected'.
#
# @param hdr delivery info for the message
# @param props message properties
# @param msg the message payload
# @param _requeue unused
def reject(hdr, props, msg, _requeue = false)
  reason = 'rejected'
  retry_or_error(hdr, props, msg, reason)
end
timeout(hdr, props, msg)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 35
#
# Handles a timed-out message by routing it through #error with a fixed
# timeout description as the error.
#
# @param hdr delivery info for the message
# @param props message properties
# @param msg the message payload
def timeout(hdr, props, msg)
  timeout_reason = 'Timeout: Sneakers worker timedout.'
  error(hdr, props, msg, timeout_reason)
end
Private Instance Methods
get_expire_delay(failures = 0)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 109
#
# Computes the expiration for the next retry: @expiration multiplied by
# 2 raised to (failures + 1).
#
# @param failures number of previous failures; coerced with #to_i, so nil
#   counts as 0
# @return @expiration * 2**(failures + 1)
def get_expire_delay(failures = 0)
  exponent = failures.to_i + 1
  multiplier = 2**exponent
  @expiration * multiplier
end
get_retries(headers)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 114
#
# Reads the retry counter from the message headers.
#
# @param headers hash-like headers, or nil/false (treated as empty)
# @return [Integer] the 'sneakers-retries' value coerced with #to_i, or 0
#   when the key is absent
def get_retries(headers)
  (headers || {}).fetch('sneakers-retries', 0).to_i
end
logger()
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 105
#
# Returns the logger exposed by the top-level Sneakers module.
#
# @return whatever ::Sneakers.logger returns
def logger
  sneakers = ::Sneakers
  sneakers.logger
end
retry_or_error(hdr, props, msg, reason, _requeue=false)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 43
#
# Republishes a failed message and then acknowledges the original delivery.
# When the retry count read from the headers has reached @max_retries the
# message goes to the error exchange; otherwise it goes to the retry
# exchange with an incremented retry counter and an expiration taken from
# get_expire_delay.
#
# Any StandardError raised along the way is logged at fatal level and
# swallowed; in that case the acknowledge call below is not reached.
#
# @param hdr delivery info; must respond to #routing_key and #delivery_tag
# @param props message properties; props[:headers] carries the retry count
# @param msg the message payload, republished unchanged
# @param reason text stored in the reason header; a placeholder is used
#   when it is nil/false
# @param _requeue unused
def retry_or_error(hdr, props, msg, reason, _requeue=false)
  retries = get_retries(props[:headers])
  reason_text = reason || 'Doh! No reason given. :('

  if retries >= @max_retries
    # :routing_key in handler_opts overrides the delivery's own routing key.
    key = @handler_opts.fetch(:routing_key, hdr.routing_key)
    error_headers = { 'sneakers-error-reason' => reason_text }
    @error_exchange.publish(msg, routing_key: key, headers: error_headers)
  else
    expire_delay = get_expire_delay(retries)
    key = @handler_opts.fetch(:routing_key, hdr.routing_key)
    retry_headers = {
      'sneakers-retries' => retries + 1,
      'sneakers-retry-reason' => reason_text
    }
    @retry_exchange.publish(msg, routing_key: key, expiration: expire_delay, headers: retry_headers)
  end

  # The original delivery is acknowledged only after the republish succeeded.
  @channel.acknowledge(hdr.delivery_tag, false)
rescue => e
  logger.fatal "#{self} #{e}, hdr.routing_key #{hdr.routing_key}, props #{props}, msg #{msg}, reason #{reason}, handler_opts #{@handler_opts}, retries #{retries}"
end
setup_error(publish_channel, error_name)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 81
#
# Declares the error topic exchange and a same-named queue bound to it
# with the catch-all routing key '#'.
#
# @param publish_channel channel used to declare the exchange and queue
# @param error_name name used for both the exchange and the queue
# @return the declared error exchange
def setup_error(publish_channel, error_name)
  exchange = publish_channel.exchange(error_name, type: 'topic', durable: 'true')

  publish_channel.queue(error_name, durable: 'true').tap do |queue|
    queue.bind(exchange, routing_key: '#')
    trace(queue, "#{self} error queue created.")
  end

  exchange
end
setup_publish_channel()
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 91
#
# Chooses the channel used for publishing retries and errors. Without an
# :amqp_publish option the consumer channel is reused; otherwise a separate
# Bunny connection to that endpoint is started and a new channel on it is
# returned, with prefetch set from @opts[:prefetch].
#
# NOTE(review): nothing in this method closes the extra connection.
#
# @return the channel to publish on
def setup_publish_channel
  return @channel unless @opts.to_hash.include?(:amqp_publish)

  connection = Bunny.new(@opts[:amqp_publish], vhost: @opts[:vhost], heartbeat: @opts[:heartbeat])
  connection.start

  connection.create_channel.tap do |channel|
    channel.prefetch(@opts[:prefetch])
    logger.warn "#{self} publish endpoint used: #{@opts[:amqp_publish]}, vhost #{@opts[:vhost]}"
  end
end
setup_retry(publish_channel, retry_name, exchange)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 67
#
# Declares the retry topic exchange and a same-named queue bound to it
# with the catch-all routing key '#'. The queue is declared with
# x-dead-letter-exchange set to +exchange+ (presumably so that expired
# retry messages are routed back to the original exchange — confirm
# against the broker configuration).
#
# @param publish_channel channel used to declare the exchange and queue
# @param retry_name name used for both the exchange and the queue
# @param exchange name passed as the queue's dead-letter exchange
# @return the declared retry exchange
def setup_retry(publish_channel, retry_name, exchange)
  topic_exchange = publish_channel.exchange(retry_name, type: 'topic', durable: 'true')

  queue_arguments = { :'x-dead-letter-exchange' => exchange }
  queue = publish_channel.queue(retry_name, durable: 'true', arguments: queue_arguments)
  queue.bind(topic_exchange, routing_key: '#')

  trace(queue, "#{self} retry queue created.")
  topic_exchange
end
trace(queue, msg)
click to toggle source
# File lib/la_gear/sneakers/exponential_backoff.rb, line 101
#
# Writes a debug log line prefixed with the current thread, the queue's
# name and the queue's options.
#
# @param queue object responding to #name and #options
# @param msg text appended after the prefix
def trace(queue, msg)
  prefix = "[#{Thread.current}][#{queue.name}][#{queue.options}]"
  logger.debug "#{prefix} #{msg}"
end