class AdvancedSneakersActiveJob::Handler
Handler
Puts error details into the message headers and re-enqueues the job with a delay.
Public Instance Methods
error(delivery_info, properties, message, error)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 6
#
# Republishes a failed message through the delayed publisher, then
# acknowledges the original delivery.
#
# The publish parameters are the original properties converted to a hash,
# with :headers replaced by the result of patch_headers and :routing_key
# copied from delivery_info.
#
# delivery_info - delivery metadata; must respond to #routing_key
# properties    - message properties; must respond to #to_h (:headers may be absent)
# message       - payload, handed to the publisher unchanged
# error         - the failure, forwarded to patch_headers
def error(delivery_info, properties, message, error)
  params = properties.to_h
  # A message without headers still gets the error/death/delay headers.
  params[:headers] = patch_headers(params[:headers] || {}, delivery_info, error)
  params[:routing_key] = delivery_info.routing_key

  AdvancedSneakersActiveJob.delayed_publisher.publish(message, params)
  # Reached only if publish did not raise.
  acknowledge(delivery_info, properties, message)
end
Private Instance Methods
build_death_row(queue, exchange, routing_key)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 48
#
# Builds a new entry for the 'x-death' header list describing a first
# rejection on the given queue.
#
# queue       - value stored under 'queue'
# exchange    - value stored under 'exchange'
# routing_key - wrapped in a one-element array under 'routing-keys'
#
# Returns a Hash with string keys; 'count' starts at 1, 'reason' is always
# 'rejected' and 'time' is the current time when the method is called.
def build_death_row(queue, exchange, routing_key)
  {
    'count' => 1,
    'reason' => 'rejected',
    'queue' => queue,
    'time' => Time.now,
    'exchange' => exchange,
    'routing-keys' => [routing_key]
  }
end
calculate_delay(headers, delivery_info)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 70
#
# Computes the retry delay for the consumer's queue from how many times the
# message has already died there.
#
# headers       - message headers; must contain an 'x-death' list with an
#                 entry for this queue (patch_headers calls
#                 track_death_in_headers before this runs)
# delivery_info - delivery metadata used to resolve the queue name
#
# Returns whatever the configured retry_delay_proc returns for the count.
def calculate_delay(headers, delivery_info)
  death = death_header(headers, queue_name(delivery_info))
  death_count = death.fetch('count')

  AdvancedSneakersActiveJob.config.retry_delay_proc.call(death_count)
end
death_header(headers, queue_name)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 80
#
# Looks up the 'x-death' entry recorded for a queue.
#
# headers    - message headers; must contain the 'x-death' key
# queue_name - queue name to match against each entry's 'queue' value
#
# Returns the first matching entry (the same object stored in the list, so
# callers may mutate it in place), or nil when no entry matches.
# Raises KeyError when 'x-death' is missing or an entry has no 'queue' key.
def death_header(headers, queue_name)
  headers.fetch('x-death').find { |death| death.fetch('queue') == queue_name }
end
patch_headers(headers, delivery_info, error)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 18
#
# Adds error, death-tracking and delay information to the message headers.
# The hash is mutated in place and also returned.
#
# headers       - Hash of message headers to update
# delivery_info - delivery metadata; supplies the queue name, exchange and
#                 routing key
# error         - the failure to record
#
# Returns the same headers Hash.
def patch_headers(headers, delivery_info, error)
  # Resolve all delivery details before touching the headers.
  queue = queue_name(delivery_info)
  exchange = delivery_info.exchange
  routing_key = delivery_info.routing_key

  track_error_in_headers(headers, error)
  track_death_in_headers(headers, queue, exchange, routing_key)
  # Must come after death tracking: the delay is derived from the death count.
  set_delay_in_headers(headers, delivery_info)

  headers
end
queue_name(delivery_info)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 76
#
# Returns the name of the queue the delivering consumer is attached to.
#
# delivery_info - must respond to #consumer, whose #queue responds to #name
def queue_name(delivery_info)
  delivery_info.consumer.queue.name
end
set_delay_in_headers(headers, delivery_info)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 44
#
# Stores the retry delay computed by calculate_delay under the 'delay'
# header, overwriting any previous value.
#
# headers       - Hash of message headers, mutated in place
# delivery_info - delivery metadata forwarded to calculate_delay
#
# Returns the delay that was stored.
def set_delay_in_headers(headers, delivery_info)
  headers['delay'] = calculate_delay(headers, delivery_info)
end
track_death_in_headers(headers, queue, exchange, routing_key)
click to toggle source
Headers are patched to mimic the behavior of “nack” combined with a dead-letter exchange (DLX).
# File lib/advanced_sneakers_activejob/handler.rb, line 31
#
# Records one more "death" of the message on the given queue. The headers
# are patched to mimic what a nack plus a dead-letter exchange would write.
#
# The 'x-first-death-*' headers are set only when they have no value yet.
# If 'x-death' already has an entry for this queue its 'count' is
# incremented; otherwise a new entry from build_death_row is appended.
#
# headers     - Hash of message headers, mutated in place
# queue       - name of the queue the message was consumed from
# exchange    - exchange recorded for the death
# routing_key - routing key recorded for the death
def track_death_in_headers(headers, queue, exchange, routing_key)
  headers['x-first-death-exchange'] ||= exchange
  headers['x-first-death-queue'] ||= queue
  headers['x-first-death-reason'] ||= 'rejected'
  # death_header requires the key to exist, so create the list first.
  headers['x-death'] ||= []

  if (death = death_header(headers, queue))
    death['count'] += 1
  else
    headers['x-death'] << build_death_row(queue, exchange, routing_key)
  end
end
track_error_in_headers(headers, error)
click to toggle source
# File lib/advanced_sneakers_activejob/handler.rb, line 59
#
# Records the most recent failure in the message headers.
#
# 'x-last-error-name' receives the error's class name.
# 'x-last-error-details' receives the error text, gzip-compressed and then
# Base64-encoded. The text is error.full_message when available, otherwise
# the message followed by the backtrace lines.
#
# headers - Hash of message headers, mutated in place
# error   - object responding to #message and #backtrace (and optionally
#           #full_message)
def track_error_in_headers(headers, error)
  details = if error.respond_to?(:full_message) # ruby 2.5+
              error.full_message
            else
              # backtrace may be nil (e.g. an exception that was never
              # raised); splatting nil contributes no lines instead of raising.
              [error.message, *error.backtrace].join("\n")
            end

  headers['x-last-error-name'] = error.class.name
  headers['x-last-error-details'] = Base64.encode64(ActiveSupport::Gzip.compress(details))
end