class AdvancedSneakersActiveJob::Handler

Handler puts error details to message header and reenqueues job with delay

Public Instance Methods

error(delivery_info, properties, message, error) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 6
def error(delivery_info, properties, message, error)
  params = properties.to_h
  params[:headers] = patch_headers(params[:headers] || {}, delivery_info, error)
  params[:routing_key] = delivery_info.routing_key

  AdvancedSneakersActiveJob.delayed_publisher.publish(message, params)

  acknowledge(delivery_info, properties, message)
end

Private Instance Methods

build_death_row(queue, exchange, routing_key) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 48
def build_death_row(queue, exchange, routing_key)
  {
    'count' => 1,
    'reason' => 'rejected',
    'queue' => queue,
    'time' => Time.now,
    'exchange' => exchange,
    'routing-keys' => [routing_key]
  }
end
calculate_delay(headers, delivery_info) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 70
def calculate_delay(headers, delivery_info)
  death_count = death_header(headers, queue_name(delivery_info)).fetch('count')

  AdvancedSneakersActiveJob.config.retry_delay_proc.call(death_count)
end
death_header(headers, queue_name) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 80
def death_header(headers, queue_name)
  headers.fetch('x-death').detect { |death| death.fetch('queue') == queue_name }
end
patch_headers(headers, delivery_info, error) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 18
def patch_headers(headers, delivery_info, error)
  queue = queue_name(delivery_info)
  exchange = delivery_info.exchange
  routing_key = delivery_info.routing_key

  track_error_in_headers(headers, error)
  track_death_in_headers(headers, queue, exchange, routing_key)
  set_delay_in_headers(headers, delivery_info)

  headers
end
queue_name(delivery_info) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 76
def queue_name(delivery_info)
  delivery_info.consumer.queue.name
end
set_delay_in_headers(headers, delivery_info) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 44
def set_delay_in_headers(headers, delivery_info)
  headers['delay'] = calculate_delay(headers, delivery_info)
end
track_death_in_headers(headers, queue, exchange, routing_key) click to toggle source

Headers are patched to mimic behavior of “nack” and DLX

# File lib/advanced_sneakers_activejob/handler.rb, line 31
def track_death_in_headers(headers, queue, exchange, routing_key)
  headers['x-first-death-exchange'] ||= exchange
  headers['x-first-death-queue'] ||= queue
  headers['x-first-death-reason'] ||= 'rejected'
  headers['x-death'] ||= []

  if (death = death_header(headers, queue))
    death['count'] += 1
  else
    headers['x-death'] << build_death_row(queue, exchange, routing_key)
  end
end
track_error_in_headers(headers, error) click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 59
def track_error_in_headers(headers, error)
  details = if error.respond_to?(:full_message) # ruby 2.5+
              error.full_message
            else
              ([error.message] + error.backtrace).join("\n")
            end

  headers['x-last-error-name'] = error.class.name
  headers['x-last-error-details'] = Base64.encode64(ActiveSupport::Gzip.compress(details))
end