class MultipleMan::Consumers::Transitional
Attributes
queue[R]
subscription[R]
topic[R]
Public Class Methods
new(subscription:, queue:, topic:)
click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 9
# Builds a consumer from its three collaborators, storing each one in the
# instance variable of the same name.
#
# @param subscription the subscription whose messages this consumer handles
# @param queue the queue this consumer reads from
# @param topic the topic handed to the queue in #listen
def initialize(subscription:, queue:, topic:)
  @subscription = subscription
  @queue = queue
  @topic = topic
end
Public Instance Methods
handle_error(ex, delivery_info)
click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 33
# Reports a failed delivery and negatively acknowledges it.
#
# The error is written to the MultipleMan logger, passed to
# MultipleMan.error with reraise: false, and then the delivery is nack'ed on
# the queue's channel.
#
# @param ex [Exception] the error to report
# @param delivery_info delivery metadata; only delivery_tag is read
def handle_error(ex, delivery_info)
  MultipleMan.logger.error " Error - #{ex.message}\n\n#{ex.backtrace}"
  MultipleMan.error(ex, reraise: false)

  # NOTE(review): the original comment here read "Requeue the message", but
  # nack is called with the delivery tag only. Whether the broker requeues
  # depends on the channel's nack defaults -- confirm this is intended.
  channel = queue.channel
  channel.nack(delivery_info.delivery_tag)
end
listen()
click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 15
# Starts consuming from the queue with manual acknowledgement, handing every
# delivery to #process_message.
#
# NOTE(review): the queue is *unbound* from the topic for this routing key
# before subscribing, so presumably only messages already sitting in the
# queue get consumed -- confirm that this draining behaviour is intended.
#
# @return whatever the queue's subscribe call returns
def listen
  MultipleMan.logger.info "Listening for #{subscription.listen_to} with routing key #{routing_key}."

  detached_queue = queue.unbind(topic, routing_key: routing_key)
  detached_queue.subscribe(manual_ack: true) do |delivery_info, _properties, payload|
    process_message(delivery_info, payload)
  end
end
operation(delivery_info, payload)
click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 41
# Works out which operation a message asks for.
#
# An 'operation' entry in the payload wins; without one, the last
# dot-separated segment of the delivery's routing key is used.
#
# @param delivery_info delivery metadata; only routing_key is read
# @param payload the parsed message body, looked up with the 'operation' key
# @return the operation name
def operation(delivery_info, payload)
  requested = payload['operation']
  return requested if requested

  delivery_info.routing_key.split(".").last
end
process_message(delivery_info, payload)
click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 22
# Handles one delivery: parses the JSON payload, dispatches it to the
# subscription method named by #operation, and acknowledges the delivery.
#
# A StandardError raised along the way does not propagate; it is reported to
# #handle_error as a MultipleMan::ConsumerError instead.
#
# @param delivery_info delivery metadata; routing_key and delivery_tag are read
# @param payload [String] the raw JSON message body
def process_message(delivery_info, payload)
  MultipleMan.logger.debug "Processing message for #{delivery_info.routing_key}."
  payload = JSON.parse(payload).with_indifferent_access

  # The operation name comes from the message itself, so dispatch with
  # public_send: a message must not be able to reach private methods.
  subscription.public_send(operation(delivery_info, payload), payload)

  MultipleMan.logger.debug " Successfully processed!"
  queue.channel.acknowledge(delivery_info.delivery_tag, false)
rescue StandardError
  begin
    # Raised inside the rescue clause so that the failure which brought us
    # here is recorded as the new exception's cause.
    raise MultipleMan::ConsumerError
  rescue StandardError => wrapped
    handle_error(wrapped, delivery_info)
  end
end
routing_key()
click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 45
# Routing key for this consumer, delegated to the subscription.
#
# @return whatever the subscription reports as its routing key
def routing_key
  subscription.routing_key
end