class MultipleMan::Consumers::General
Attributes
queue[R]
subscribers[R]
topic[R]
Public Class Methods
new(subscribers:, queue:, topic:)
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 7 def initialize(subscribers:, queue:, topic:) self.subscribers = subscribers @topic = topic @queue = queue end
Public Instance Methods
listen()
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 13 def listen MultipleMan.logger.debug "Starting listeners." create_bindings queue.subscribe(block: true, manual_ack: true) do |delivery_info, meta_data, payload| MultipleMan.logger.debug "Processing message for #{delivery_info.routing_key}." message = JSON.parse(payload).with_indifferent_access receive(delivery_info, meta_data, message) end end
Private Instance Methods
create_bindings()
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 28 def create_bindings subscribers.values.each do |subscriber| MultipleMan.logger.info "Listening for #{subscriber.listen_to} with routing key #{subscriber.routing_key}." queue.bind(topic, routing_key: routing_key_for_subscriber(subscriber)) end end
dispatch_subscribers(delivery_info, meta_data, message)
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 50 def dispatch_subscribers(delivery_info, meta_data, message) method = operation(message, delivery_info.routing_key) subscribers.select { |k,s| k.match(delivery_info.routing_key) }.values.each do |s| tracer = MultipleMan.configuration.tracer.new(s) tracer.handle(delivery_info, meta_data, message, method) end end
operation(message, routing_key)
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 59 def operation(message, routing_key) message['operation'] || routing_key.split('.').last end
receive(delivery_info, meta_data, message)
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 35 def receive(delivery_info, meta_data, message) dispatch_subscribers(delivery_info, meta_data, message) queue.channel.acknowledge(delivery_info.delivery_tag, false) MultipleMan.logger.debug "Successfully processed! #{delivery_info.routing_key}" rescue => ex begin raise ConsumerError rescue => wrapped_ex MultipleMan.logger.debug "\t#{wrapped_ex.class} #{wrapped_ex.cause.message} \n#{wrapped_ex.cause.backtrace}" MultipleMan.error(wrapped_ex, reraise: false, payload: message, delivery_info: delivery_info) queue.channel.nack(delivery_info.delivery_tag) end end
routing_key_for_subscriber(subscriber)
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 63 def routing_key_for_subscriber(subscriber) subscriber.routing_key end
subscribers=(subscribers)
click to toggle source
# File lib/multiple_man/consumers/general.rb, line 67 def subscribers=(subscribers) @subscribers = subscribers.map { |s| key = routing_key_for_subscriber(s).gsub('.', '\.').gsub('#', '.*') [/^#{key}$/, s] }.to_h end