class Freddy::Consumers::RespondToConsumer

Public Class Methods

consume(**attrs, &block)
# File lib/freddy/consumers/respond_to_consumer.rb, line 6
# Convenience entry point: builds a consumer from the given keyword
# arguments and immediately starts consuming with the supplied block.
#
# @param attrs [Hash] keyword arguments forwarded unchanged to +new+
# @yield forwarded unchanged to the instance-level +consume+
# @return whatever the instance-level +consume+ returns
def self.consume(**attrs, &block)
  consumer = new(**attrs)
  consumer.consume(&block)
end
new(thread_pool:, destination:, channel:, handler_adapter_factory:)
# File lib/freddy/consumers/respond_to_consumer.rb, line 10
# Stores the collaborators used when consuming messages.
#
# @param thread_pool object that receives +post+ with the per-message work
# @param destination queue identifier passed to +channel.queue+
# @param channel object used to obtain the queue and to acknowledge deliveries
# @param handler_adapter_factory object whose +for(delivery)+ supplies the
#   adapter handed to each MessageHandler
def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
  # Messaging endpoints.
  @channel = channel
  @destination = destination

  # Execution and handler wiring.
  @consume_thread_pool = thread_pool
  @handler_adapter_factory = handler_adapter_factory
end

Public Instance Methods

consume() { |payload, msg_handler| ... }
# File lib/freddy/consumers/respond_to_consumer.rb, line 17
# Subscribes to the destination and hands every delivery to the caller's block.
#
# For each delivery an adapter is obtained from the handler adapter factory
# and wrapped, together with the delivery, in a MessageHandler.
#
# @yieldparam payload the delivery's payload
# @yieldparam msg_handler [MessageHandler] handler built for this delivery
# @return [ResponderHandler] wraps the subscription and the consume thread pool
def consume
  subscription = consume_from_destination do |delivery|
    handler = MessageHandler.new(@handler_adapter_factory.for(delivery), delivery)
    yield(delivery.payload, handler)
  end

  ResponderHandler.new(subscription, @consume_thread_pool)
end

Private Instance Methods

consume_from_destination(&block)
# File lib/freddy/consumers/respond_to_consumer.rb, line 30
# Subscribes to the destination queue with manual acknowledgement enabled
# and routes every delivery through +process_message+.
#
# @yieldparam delivery the delivery, forwarded via +process_message+
# @return whatever the queue's +subscribe+ call returns
def consume_from_destination(&block)
  queue = @channel.queue(@destination)
  queue.subscribe(manual_ack: true) { |delivery| process_message(delivery, &block) }
end
process_message(delivery) { |delivery| ... }
# File lib/freddy/consumers/respond_to_consumer.rb, line 36
# Schedules handling of a single delivery on the consume thread pool.
#
# The caller's block runs inside +delivery.in_span+. The delivery is
# acknowledged on the channel afterwards whether or not the block raised.
#
# @param delivery object responding to +in_span+ and +tag+
# @yieldparam delivery the same delivery, passed to the caller's block
# @return whatever the thread pool's +post+ returns
def process_message(delivery)
  @consume_thread_pool.post do
    begin
      delivery.in_span { yield(delivery) }
    ensure
      # Second argument is presumably the "multiple" flag (ack only this
      # delivery) -- confirm against the channel implementation.
      @channel.acknowledge(delivery.tag, false)
    end
  end
end