class AdvancedSneakersActiveJob::DelayedPublisher

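This publisher relies on a per-queue message TTL to hold delayed messages in a queue that is never consumed. When a message's TTL expires, it is dead-lettered to another exchange (see the dlx_exchange_name attribute, which is set from the exchange: argument of the constructor).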

Attributes

dlx_exchange_name[R]

Public Class Methods

new(exchange:, **options) click to toggle source
Calls superclass method
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 19
# Builds a publisher bound to a dedicated "<exchange>-delayed" exchange.
#
# The superclass is given the suffixed exchange name together with fixed
# exchange options (type 'headers', durable); any :exchange_options passed
# in +options+ are overridden. The exchange name supplied by the caller is
# kept as +dlx_exchange_name+.
#
# @param exchange [#to_s] name of the original exchange
# @param options [Hash] remaining keyword options forwarded to the superclass
def initialize(exchange:, **options)
  delayed_exchange_name = [exchange, 'delayed'].join('-')
  delayed_exchange_options = { type: 'headers', durable: true }

  super(**options, exchange: delayed_exchange_name, exchange_options: delayed_exchange_options)

  @dlx_exchange_name = exchange
end

Private Instance Methods

declare_republish_queue() click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 33
# Declares the durable queue that holds messages for the current delay.
#
# The queue name comes from +delayed_queue_name+ for the current +delay+.
# Returns whatever +channel.queue+ returns.
def declare_republish_queue
  name = delayed_queue_name(delay: delay)

  arguments = {}
  # tell RabbitMQ not to use RAM for this queue as it won't be consumed
  arguments['x-queue-mode'] = 'lazy'
  # make messages die after requested time (delay is multiplied by 1000)
  arguments['x-message-ttl'] = delay * 1000
  # dead messages go to original exchange and then routed to proper queues
  arguments['x-dead-letter-exchange'] = dlx_exchange_name

  logger.debug { "Creating delayed queue [#{name}]" }

  channel.queue(name, durable: true, arguments: arguments)
end
declare_republish_queue_binding(queue) click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 51
# Binds +queue+ to this publisher's exchange, passing the current delay as
# the binding arguments.
#
# @param queue [#bind] queue object to bind
# @return whatever +queue.bind+ returns
def declare_republish_queue_binding(queue)
  binding_arguments = { delay: delay }

  queue.bind(exchange, arguments: binding_arguments)
end
delay() click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 47
# Reads the requested delay from the 'delay' entry of the message headers.
#
# @return the value stored under message_options[:headers]['delay'], or nil
#   when the headers or the entry are absent
def delay
  header_path = [:headers, 'delay']

  message_options.dig(*header_path)
end
delayed_queue_name(delay:) click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 55
# Builds the name of the queue holding messages for the given delay.
#
# The name is "<config_delayed_queue_prefix>:<delay>", preceded by
# ActiveJob's queue_name_prefix and queue_name_delimiter when that prefix
# is not nil.
#
# @param delay [#to_s] delay value embedded in the queue name
# @return [String]
def delayed_queue_name(delay:)
  global_prefix = ::ActiveJob::Base.queue_name_prefix
  scoped_name = [config_delayed_queue_prefix, delay].join(':')

  [global_prefix, scoped_name].compact.join(::ActiveJob::Base.queue_name_delimiter)
end
log_message() click to toggle source
# File lib/advanced_sneakers_activejob/delayed_publisher.rb, line 27
# Emits a debug log line describing the message being published: the
# message itself, the exchange name, the routing key and the delay.
# The line is built inside the block handed to +logger.debug+.
def log_message
  logger.debug do
    routing_key = message_options[:routing_key]

    "Publishing <#{message}> to [#{@exchange_name}] with routing_key [#{routing_key}] and delay [#{delay}]"
  end
end