module ActionSubscriber::MessageRetry

Constants

SCHEDULE

Public Class Methods

get_last_attempt_number(env)

Private Implementation

# File lib/action_subscriber/message_retry.rb, line 26
# Reads the attempt counter stored in the message headers.
#
# @param env [#headers] message environment whose headers respond to +fetch+
# @return [Integer] the "as-attempt" header converted with +to_i+; when the
#   header is absent the fallback "1" is used, so the result is 1
def self.get_last_attempt_number(env)
  env.headers.fetch("as-attempt") { "1" }.to_i
end
redeliver_message_with_backoff(env, backoff_schedule = SCHEDULE)
# File lib/action_subscriber/message_retry.rb, line 15
# Republishes the message to a per-TTL retry queue for its next attempt.
#
# The next attempt number is the last recorded attempt plus one; it is used
# as the lookup key into +backoff_schedule+ to obtain the TTL. When the
# schedule has no (truthy) entry for that attempt nothing is published and
# nil is returned.
#
# @param env [Object] message environment; must respond to +queue+ and
#   +encoded_payload+ (plus whatever the helper methods read from it)
# @param backoff_schedule [#[]] maps attempt number => TTL
# @return [Object, nil] the result of +with_exchange+, or nil when no TTL is
#   scheduled for the next attempt
def self.redeliver_message_with_backoff(env, backoff_schedule = SCHEDULE)
  upcoming_attempt = get_last_attempt_number(env) + 1
  delay = backoff_schedule[upcoming_attempt]

  if delay
    # The retry queue name embeds the TTL, e.g. "<queue>.retry_<ttl>".
    target_queue = "#{env.queue}.retry_#{delay}"

    with_exchange(env, delay, target_queue) do |exchange|
      payload = env.encoded_payload
      publish_options = retry_options(env, upcoming_attempt, target_queue)
      exchange.publish(payload, publish_options)
    end
  end
end
retry_headers(env, attempt)
# File lib/action_subscriber/message_retry.rb, line 31
# Builds the headers for a retried message.
#
# Starts from a copy of +env.headers+ with the "x-death" entry removed, then
# sets "as-attempt" to the stringified attempt number and
# "x-dead-letter-routing-key" to the name of the originating queue.
#
# @param env [#headers, #queue] message environment
# @param attempt [#to_s] attempt number to record on the outgoing message
# @return [Hash] new headers hash; +env.headers+ itself is not modified
def self.retry_headers(env, attempt)
  carried_over = env.headers.reject { |name, _value| name == "x-death" }
  retry_metadata = {
    "as-attempt" => attempt.to_s,
    "x-dead-letter-routing-key" => env.queue,
  }
  carried_over.merge(retry_metadata)
end
retry_options(env, attempt, retry_queue_name)
# File lib/action_subscriber/message_retry.rb, line 40
# Assembles the options hash handed to +exchange.publish+ for a retry.
#
# @param env [#content_type] message environment (also passed on to
#   +retry_headers+)
# @param attempt [Object] attempt number forwarded to +retry_headers+
# @param retry_queue_name [String] used as the routing key
# @return [Hash] with the keys :content_type, :routing_key and :headers
def self.retry_options(env, attempt, retry_queue_name)
  options = {}
  options[:content_type] = env.content_type
  options[:routing_key] = retry_queue_name
  options[:headers] = retry_headers(env, attempt)
  options
end
with_exchange(env, ttl, retry_queue_name) { |exchange| ... }
# File lib/action_subscriber/message_retry.rb, line 48
# Prepares the retry queue on the message's channel and yields an exchange
# to publish through, then waits for publisher confirms.
#
# Steps, in order: put the channel into confirm mode, obtain the exchange
# named "", declare +retry_queue_name+ with TTL / dead-letter arguments,
# yield the exchange, and finally call +wait_for_confirms+.
#
# @param env [#channel, #queue] message environment
# @param ttl [Object] value used for the queue's "x-message-ttl" argument
# @param retry_queue_name [String] name of the queue to declare
# @yieldparam exchange [Object] the object returned by +channel.topic("")+
# @return [Object] whatever +channel.wait_for_confirms+ returns
def self.with_exchange(env, ttl, retry_queue_name)
  channel = env.channel
  channel.confirm_select

  # an empty string is the default exchange [see bunny docs](http://rubybunny.info/articles/exchanges.html#default_exchange)
  exchange = channel.topic("")

  # Messages dead-lettered from the retry queue go to exchange "" with the
  # original queue name as routing key.
  queue_arguments = {
    "x-dead-letter-exchange" => "",
    "x-message-ttl" => ttl,
    "x-dead-letter-routing-key" => env.queue,
  }
  channel.queue(retry_queue_name, :arguments => queue_arguments)

  yield(exchange)
  channel.wait_for_confirms
end