class AdvancedSneakersActiveJob::DelayedPublisher
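This publisher relies on a per-queue message TTL to hold messages in a queue for the requested delay. When the TTL expires, messages are
dead-lettered to another exchange (see the dlx_exchange_name attribute, which is set from the exchange: argument of the constructor).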
Attributes
dlx_exchange_name[R]
Public Class Methods
new(exchange:, **options)
click to toggle source
Calls superclass method
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 19
#
# Builds a publisher bound to a dedicated "<exchange>-delayed" exchange.
#
# The given +exchange+ name is not published to directly; it is stored in
# +dlx_exchange_name+, which +declare_republish_queue+ uses as the
# 'x-dead-letter-exchange' of the delayed queues.
#
# exchange:: name of the original exchange
# options::  remaining keyword options, forwarded to the superclass with
#            +:exchange+ and +:exchange_options+ overridden
def initialize(exchange:, **options)
  delayed_exchange = [exchange, 'delayed'].join('-')
  headers_exchange_options = { type: 'headers', durable: true }

  super(**options.merge(exchange: delayed_exchange, exchange_options: headers_exchange_options))
  @dlx_exchange_name = exchange
end
Private Instance Methods
declare_republish_queue()
click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 33
#
# Declares the durable queue that holds messages for the current +delay+.
# The queue name is derived from the delay via +delayed_queue_name+.
#
# Returns whatever +channel.queue+ returns.
def declare_republish_queue
  name = delayed_queue_name(delay: delay)
  ttl = delay * 1000 # value passed as 'x-message-ttl'

  arguments = {
    # tell RabbitMQ not to use RAM for this queue as it won't be consumed
    'x-queue-mode' => 'lazy',
    # make messages die after requested time
    'x-message-ttl' => ttl,
    # dead messages go to original exchange and then routed to proper queues
    'x-dead-letter-exchange' => dlx_exchange_name
  }

  logger.debug { "Creating delayed queue [#{name}]" }
  channel.queue(name, durable: true, arguments: arguments)
end
declare_republish_queue_binding(queue)
click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 51
#
# Binds +queue+ to this publisher's exchange, passing the current +delay+
# as the binding argument.
#
# queue:: an object responding to +bind(exchange, arguments:)+
#
# Returns whatever +queue.bind+ returns.
def declare_republish_queue_binding(queue)
  source_exchange = exchange
  binding_arguments = { delay: delay }

  queue.bind(source_exchange, arguments: binding_arguments)
end
delay()
click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 47
#
# Reads the 'delay' header from the options of the message being published.
#
# Returns the header value, or nil when +message_options+ has no
# +:headers+ entry or the headers carry no 'delay' key.
def delay
  headers = message_options[:headers]
  headers&.dig('delay')
end
delayed_queue_name(delay:)
click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 55
#
# Composes the name of the delayed queue for a given delay.
#
# The name is "<config_delayed_queue_prefix>:<delay>", preceded by
# ActiveJob's +queue_name_prefix+ and +queue_name_delimiter+ when a
# prefix is configured (a nil prefix is dropped).
#
# delay:: the delay value to embed in the queue name
def delayed_queue_name(delay:)
  active_job_prefix = ::ActiveJob::Base.queue_name_prefix
  base_name = [config_delayed_queue_prefix, delay].join(':')
  name_parts = [active_job_prefix, base_name].compact

  name_parts.join(::ActiveJob::Base.queue_name_delimiter)
end
log_message()
click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 27
#
# Emits a debug-level log entry describing the message being published:
# the message itself, the exchange name, the routing key and the delay.
#
# The text is built inside the block handed to +logger.debug+, so it is
# only assembled if the logger invokes that block.
def log_message
  logger.debug do
    "Publishing <#{message}> to [#{@exchange_name}] with routing_key [#{message_options[:routing_key]}] and delay [#{delay}]"
  end
end