class Smith::Messaging::Requeue
Public Class Methods
new(message, metadata, opts={})
click to toggle source
# File lib/smith/messaging/requeue.rb, line 7 def initialize(message, metadata, opts={}) @message = message @metadata = metadata @queue = opts[:queue] @exchange = opts[:exchange] @count = opts[:count] || 3 @delay = opts[:delay] || 5 @strategy = opts[:strategy] || :linear @on_requeue = opts[:on_requeue] || ->(count, total_count, cumulative_delay) { logger.info { "Requeuing (#{@strategy}) message on queue: #{@queue.name}, count: #{count} of #{total_count}." } } @on_requeue_limit = opts[:on_requeue_limit] || ->(message, count, total_count, cumulative_delay) { logger.info { "Not attempting any more requeues, requeue limit reached: #{total_count} for queue: #{@queue.name}, cummulative delay: #{cumulative_delay}s." } } end
Public Instance Methods
requeue()
click to toggle source
# File lib/smith/messaging/requeue.rb, line 26 def requeue requeue_with_strategy do opts = @queue.opts.clone.tap do |o| o.delete(:queue) o.delete(:exchange) o[:headers] = increment_requeue_count o[:routing_key] = @queue.name o[:type] = @metadata.type end logger.verbose { "Requeuing to: #{@queue.name} [options]: #{opts}" } logger.verbose { "Requeuing to: #{@queue.name} [message]: #{@message.to_hash}" } @exchange.publish(@message, opts) end end
Private Instance Methods
current_requeue_number()
click to toggle source
# File lib/smith/messaging/requeue.rb, line 46 def current_requeue_number @metadata.headers['requeue'] || 0 end
exponential_no_initial_delay_strategy(delay)
click to toggle source
# File lib/smith/messaging/requeue.rb, line 73 def exponential_no_initial_delay_strategy(delay) delay * (2 ** current_requeue_number - 1) end
exponential_strategy(delay)
click to toggle source
# File lib/smith/messaging/requeue.rb, line 77 def exponential_strategy(delay) delay * (2 ** current_requeue_number) end
increment_requeue_count()
click to toggle source
# File lib/smith/messaging/requeue.rb, line 50 def increment_requeue_count @metadata.headers.tap do |m| m['requeue'] = (m['requeue']) ? m['requeue'] + 1 : 1 end end
linear_strategy(delay)
click to toggle source
# File lib/smith/messaging/requeue.rb, line 81 def linear_strategy(delay) delay * (current_requeue_number + 1) end
requeue_with_strategy(&block)
click to toggle source
# File lib/smith/messaging/requeue.rb, line 56 def requeue_with_strategy(&block) if current_requeue_number < @count method = "#{@strategy}_strategy".to_sym if respond_to?(method, true) cumulative_delay = send(method, @delay) @on_requeue.call(current_requeue_number + 1, @count, @delay * current_requeue_number) EM.add_timer(cumulative_delay) do block.call(cumulative_delay, current_requeue_number + 1) end else raise RuntimeError, "Unknown requeue strategy. #{method}" end else @on_requeue_limit.call(@message, current_requeue_number + 1, @count, @delay * current_requeue_number) end end