module BunnyPublisher::Mandatory
Enforces mandatory option for message publishing. Catches returned message if they are not routed. Creates queue/binding before re-publishing the same message again. This publisher DUPLICATES the connection for re-publishing messages!
Attributes
queue_name[R]
queue_options[R]
returned_messages[R]
Public Class Methods
new(queue: nil, queue_options: {}, timeout_at_exit: 5, **options)
click to toggle source
Calls superclass method
# File lib/bunny_publisher/mandatory.rb, line 41 def initialize(queue: nil, queue_options: {}, timeout_at_exit: 5, **options) super(**options) @queue_name = queue @queue_options = queue_options @returned_messages = ::Queue.new # ruby queue, not Bunny's one at_exit { wait_for_unrouted_messages_processing(timeout: timeout_at_exit) } end
Public Instance Methods
declare_republish_queue()
click to toggle source
# File lib/bunny_publisher/mandatory.rb, line 55 def declare_republish_queue name = queue_name || message_options[:routing_key] ensure_can_create_queue!(name) channel.queue(name, queue_options) end
declare_republish_queue_binding(queue)
click to toggle source
# File lib/bunny_publisher/mandatory.rb, line 63 def declare_republish_queue_binding(queue) routing_key = message_options[:routing_key] || queue_name queue.bind(exchange, routing_key: routing_key) end
publish(message, options = {})
click to toggle source
Calls superclass method
# File lib/bunny_publisher/mandatory.rb, line 51 def publish(message, options = {}) super(message, options.merge(mandatory: true)) end
Private Instance Methods
configure_exchange_to_process_returns()
click to toggle source
# File lib/bunny_publisher/mandatory.rb, line 84 def configure_exchange_to_process_returns case (callback = self.class.on_message_return_callback) when nil exchange.on_return { |*attrs| on_message_return(*attrs) } when Proc exchange.on_return { |*attrs| callback.call(*attrs) } when Symbol exchange.on_return { |*attrs| send(callback, *attrs) } end @on_return_set = true end
ensure_can_create_queue!(name)
click to toggle source
# File lib/bunny_publisher/mandatory.rb, line 164 def ensure_can_create_queue!(name) return if name.present? raise BunnyPublisher::CannotCreateQueue, message: message, message_options: message_options end
ensure_connection!()
click to toggle source
Calls superclass method
# File lib/bunny_publisher/mandatory.rb, line 73 def ensure_connection! super configure_exchange_to_process_returns unless @on_return_set end
on_message_return(return_info, properties, message)
click to toggle source
`on_return` is called within a frameset of amqp connection. Any interaction within the same connection leads to error. This is why we process the returned message in a separate thread. github.com/ruby-amqp/bunny/blob/7fb05abf36637557f75a69790be78f9cc1cea807/lib/bunny/session.rb#L683
# File lib/bunny_publisher/mandatory.rb, line 101 def on_message_return(return_info, properties, message) message_options = properties.to_h.merge(routing_key: return_info.routing_key).compact if return_info.reply_text == 'NO_ROUTE' returned_messages << [message, message_options] Thread.new { process_returned_message }.tap do |thread| thread.abort_on_exception = false thread.report_on_exception = true end else # Do not raise error here! # The best we can do here is to log to STDERR warn 'BunnyPublisher::UnsupportedReplyText: '\ 'Broker has returned the message with reply_text other than NO_ROUTE '\ "#{[return_info, properties, message]}" end end
process_returned_message()
click to toggle source
# File lib/bunny_publisher/mandatory.rb, line 120 def process_returned_message @mutex.synchronize do @unrouted_message_processing = true @message, @message_options = returned_messages.pop run_callbacks(:republish) do with_errors_handling do ensure_connection! setup_queue_for_republish exchange.publish(message, message_options.merge(mandatory: true)) end end ensure @message = @message_options = nil @unrouted_message_processing = false end end
reset_exchange!()
click to toggle source
Calls superclass method
# File lib/bunny_publisher/mandatory.rb, line 79 def reset_exchange! super configure_exchange_to_process_returns end
setup_queue_for_republish()
click to toggle source
# File lib/bunny_publisher/mandatory.rb, line 138 def setup_queue_for_republish queue = declare_republish_queue # default exchange already has bindings with queues, but routing key is required if exchange.name == '' message_options[:routing_key] = queue.name else declare_republish_queue_binding(queue) end channel.deregister_queue(queue) # we are not going to work with this queue in this channel end
wait_for_unrouted_messages_processing(timeout:)
click to toggle source
TODO: introduce more reliable way to wait for handling of unrouted messages at exit
# File lib/bunny_publisher/mandatory.rb, line 152 def wait_for_unrouted_messages_processing(timeout:) sleep(0.05) # gives exchange some time to receive retuned message return unless @unrouted_message_processing logger.warn { "Waiting up to #{timeout} seconds for unrouted messages handling" } Timeout.timeout(timeout) { sleep 0.01 while @unrouted_message_processing } rescue Timeout::Error logger.warn { 'Some unrouted messages are lost on process exit!' } end