class MultipleMan::Consumers::General

Attributes

queue[R]
subscribers[R]
topic[R]

Public Class Methods

new(subscribers:, queue:, topic:) click to toggle source
# File lib/multiple_man/consumers/general.rb, line 7
# Sets up the consumer's state from the three required keyword arguments.
#
# subscribers: is handed to the subscribers= writer rather than being stored
# as given; topic: and queue: are stored unchanged in @topic and @queue.
def initialize(subscribers:, queue:, topic:)
  self.subscribers = subscribers # goes through the writer, which builds @subscribers
  @topic = topic
  @queue = queue
end

Public Instance Methods

listen() click to toggle source
# File lib/multiple_man/consumers/general.rb, line 13
# Starts consuming: logs the start-up, creates the queue bindings, then
# subscribes to the queue with block: true and manual_ack: true.
#
# For every delivery the routing key is logged, the payload is parsed as
# JSON, wrapped via with_indifferent_access, and passed on to receive.
# NOTE(review): block: true presumably keeps the caller blocked inside
# subscribe -- confirm against the queue client in use.
def listen
  MultipleMan.logger.debug "Starting listeners."
  create_bindings

  queue.subscribe(block: true, manual_ack: true) do |info, properties, body|
    MultipleMan.logger.debug "Processing message for #{info.routing_key}."
    # Parsing happens here, before receive is entered.
    receive(info, properties, JSON.parse(body).with_indifferent_access)
  end
end

Private Instance Methods

create_bindings() click to toggle source
# File lib/multiple_man/consumers/general.rb, line 28
# Binds the queue to the topic once for every subscriber, logging each
# binding at info level.
#
# The routing key is resolved a single time through
# routing_key_for_subscriber, and that one value is both logged and bound.
# This keeps the log line truthful when routing_key_for_subscriber returns
# something other than subscriber.routing_key.
def create_bindings
  subscribers.values.each do |subscriber|
    routing_key = routing_key_for_subscriber(subscriber)
    MultipleMan.logger.info "Listening for #{subscriber.listen_to} with routing key #{routing_key}."
    queue.bind(topic, routing_key: routing_key)
  end
end
dispatch_subscribers(delivery_info, meta_data, message) click to toggle source
# File lib/multiple_man/consumers/general.rb, line 50
# Hands one delivery to every subscriber whose pattern matches its routing
# key.
#
# The operation name is resolved once via operation. Each matching
# subscriber is wrapped in a new instance of MultipleMan.configuration.tracer,
# whose handle method receives the delivery info, metadata, message and
# operation name.
def dispatch_subscribers(delivery_info, meta_data, message)
  routing_key = delivery_info.routing_key
  action = operation(message, routing_key)

  # Collect all matches first, then dispatch in the hash's iteration order.
  matched = subscribers.each_with_object([]) do |(pattern, subscriber), found|
    found << subscriber if pattern.match(routing_key)
  end

  matched.each do |subscriber|
    MultipleMan.configuration.tracer.new(subscriber)
               .handle(delivery_info, meta_data, message, action)
  end
end
operation(message, routing_key) click to toggle source
# File lib/multiple_man/consumers/general.rb, line 59
# Works out the operation name for a message.
#
# An 'operation' entry in the message wins when it is truthy; otherwise the
# last dot-separated segment of the routing key is used.
def operation(message, routing_key)
  explicit = message['operation']
  return explicit if explicit

  routing_key.split('.').last
end
receive(delivery_info, meta_data, message) click to toggle source
# File lib/multiple_man/consumers/general.rb, line 35
# Processes a single delivery.
#
# On success the message is dispatched to the subscribers, the delivery tag
# is acknowledged (second argument false) and a debug line is logged.
#
# When a StandardError is raised along the way, a ConsumerError is raised
# and immediately rescued so that it carries the original error as its
# cause. That wrapped error is logged, passed to MultipleMan.error with
# reraise: false together with the payload and delivery info, and the
# delivery is nacked.
def receive(delivery_info, meta_data, message)
  dispatch_subscribers(delivery_info, meta_data, message)
  queue.channel.acknowledge(delivery_info.delivery_tag, false)

  MultipleMan.logger.debug "Successfully processed! #{delivery_info.routing_key}"
rescue => failure
  begin
    # Raising is what attaches the cause; the error is caught right below.
    raise ConsumerError, cause: failure
  rescue => wrapped
    original = wrapped.cause
    MultipleMan.logger.debug "\t#{wrapped.class} #{original.message} \n#{original.backtrace}"
    MultipleMan.error(wrapped, reraise: false, payload: message, delivery_info: delivery_info)
    queue.channel.nack(delivery_info.delivery_tag)
  end
end
routing_key_for_subscriber(subscriber) click to toggle source
# File lib/multiple_man/consumers/general.rb, line 63
# Returns the routing key to use for the given subscriber; here that is
# simply subscriber.routing_key.
#
# create_bindings binds the queue with this value and subscribers= builds
# its match pattern from it, so both go through this one method.
def routing_key_for_subscriber(subscriber)
  subscriber.routing_key
end
subscribers=(subscribers) click to toggle source
# File lib/multiple_man/consumers/general.rb, line 67
# Stores the subscribers as a Hash mapping a Regexp to each subscriber.
#
# The Regexp is derived from routing_key_for_subscriber(subscriber):
#   * the key is first passed through Regexp.escape, so every character in
#     it is matched literally unless it is one of the two wildcards below;
#   * "*" (AMQP: exactly one word) becomes "[^.]+";
#   * "#" becomes ".*".
#
# Keys consisting only of words, dots and "#" yield exactly the same Regexp
# as before (e.g. "app.Widget.#" => /^app\.Widget\..*$/). Previously "*" was
# left in place as a regex quantifier, so a key such as "app.*.create" never
# matched "app.Widget.create".
#
# NOTE(review): "#" still requires the dot in front of it to be present, so
# "app.#" does not match the bare key "app" -- unchanged from before.
#
# @param subscribers [Enumerable] objects accepted by routing_key_for_subscriber
def subscribers=(subscribers)
  @subscribers = subscribers.map { |subscriber|
    pattern = Regexp.escape(routing_key_for_subscriber(subscriber))
                    .gsub('\*', '[^.]+') # escaped "*" -> one dot-free word
                    .gsub('\#', '.*')    # escaped "#" -> anything
    [/^#{pattern}$/, subscriber]
  }.to_h
end