class MultipleMan::Consumers::Transitional

Attributes

queue[R]
subscription[R]
topic[R]

Public Class Methods

new(subscription:, queue:, topic:) click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 9
# Builds a consumer from its three collaborators. Each argument is stored
# unchanged in the instance variable of the same name.
#
# @param subscription object that message operations are dispatched to
# @param queue queue this consumer subscribes to
# @param topic topic/exchange handed to the queue when (un)binding
def initialize(subscription:, queue:, topic:)
  @subscription, @queue, @topic = subscription, queue, topic
end

Public Instance Methods

handle_error(ex, delivery_info) click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 33
# Logs and reports a failed delivery, then negatively acknowledges it.
#
# @param ex [Exception] error raised while the message was being handled
# @param delivery_info delivery metadata; only +delivery_tag+ is read here
def handle_error(ex, delivery_info)
  # Interpolating the backtrace Array directly would log its single-line
  # inspect form; join the frames so each one lands on its own line.
  # Array() keeps a nil backtrace (an exception that was never raised) safe.
  backtrace = Array(ex.backtrace).join("\n")
  MultipleMan.logger.error "   Error - #{ex.message}\n\n#{backtrace}"
  MultipleMan.error(ex, reraise: false)

  # Requeue the message.
  # NOTE(review): relies on the channel's nack requeueing by default --
  # confirm against the AMQP client in use.
  queue.channel.nack(delivery_info.delivery_tag)
end
listen() click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 15
# Logs what is being listened for, unbinds the queue from the topic for the
# current routing key, and then subscribes with manual acknowledgement,
# passing every delivery to process_message.
#
# @return whatever the queue's +subscribe+ call returns
def listen
  MultipleMan.logger.info(
    "Listening for #{subscription.listen_to} with routing key #{routing_key}."
  )

  # NOTE(review): the queue is unbound (not bound) before subscribing, so
  # presumably only messages already enqueued get consumed -- confirm intent.
  detached_queue = queue.unbind(topic, routing_key: routing_key)
  detached_queue.subscribe(manual_ack: true) do |delivery_info, _properties, payload|
    process_message(delivery_info, payload)
  end
end
operation(delivery_info, payload) click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 41
# Works out which operation a message is asking for.
#
# The payload's 'operation' entry wins when it is truthy; otherwise the last
# dot-separated segment of the delivery's routing key is used.
#
# @param delivery_info delivery metadata responding to +routing_key+
# @param payload parsed message body, looked up with the 'operation' key
# @return the operation name
def operation(delivery_info, payload)
  requested = payload['operation']
  return requested if requested

  delivery_info.routing_key.split(".").last
end
process_message(delivery_info, payload) click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 22
# Parses a raw JSON delivery, dispatches it to the subscription method named
# by #operation, and acknowledges the delivery on success. Any StandardError
# raised along the way is wrapped in MultipleMan::ConsumerError and passed to
# #handle_error instead of propagating.
#
# @param delivery_info delivery metadata (+routing_key+ and +delivery_tag+ are read)
# @param payload [String] raw JSON message body
def process_message(delivery_info, payload)
  MultipleMan.logger.debug "Processing message for #{delivery_info.routing_key}."

  payload = JSON.parse(payload).with_indifferent_access
  # The operation name is taken from the message itself, so dispatch with
  # public_send: a crafted message must not be able to reach private methods.
  # NOTE(review): operations are still not restricted to a known list.
  subscription.public_send(operation(delivery_info, payload), payload)
  MultipleMan.logger.debug "   Successfully processed!"
  queue.channel.acknowledge(delivery_info.delivery_tag, false)
rescue StandardError
  begin
    # Raising from inside the rescue makes Ruby record the original error as
    # the ConsumerError's +cause+ and gives the wrapper a backtrace.
    raise MultipleMan::ConsumerError
  rescue StandardError => wrapped
    handle_error(wrapped, delivery_info)
  end
end
routing_key() click to toggle source
# File lib/multiple_man/consumers/transitional.rb, line 45
def routing_key
  subscription.routing_key
end