class Freddy::Consumers::RespondToConsumer
Public Class Methods
consume(**attrs, &block)
click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 6
# Convenience entry point: instantiates the consumer with the given keyword
# arguments and immediately starts consuming, handing the caller's block to
# the instance-level #consume.
#
# @param attrs [Hash] keyword arguments forwarded unchanged to .new
# @return whatever the instance-level #consume returns
def self.consume(**attrs, &block)
  consumer = new(**attrs)
  consumer.consume(&block)
end
new(thread_pool:, destination:, channel:, handler_adapter_factory:)
click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 10
# Stores the collaborators needed to consume from a destination.
#
# @param thread_pool pool that message processing is posted to (must respond to #post)
# @param destination name handed to channel.queue when subscribing
# @param channel object used to look up the queue and to acknowledge deliveries
# @param handler_adapter_factory object whose #for(delivery) yields the adapter
#   given to MessageHandler
def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
  @channel = channel
  @destination = destination
  @handler_adapter_factory = handler_adapter_factory
  @consume_thread_pool = thread_pool
end
Public Instance Methods
consume() { |payload, msg_handler| ... }
click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 17
# Subscribes to the destination and, for every delivery, yields the payload
# together with a MessageHandler built for that delivery.
#
# @yield [payload, msg_handler] delivery.payload and the MessageHandler for it
# @return [ResponderHandler] wraps the subscription and the consume thread pool
def consume
  subscription = consume_from_destination do |delivery|
    # Build the adapter first, then the handler that wraps it.
    handler_adapter = @handler_adapter_factory.for(delivery)
    handler = MessageHandler.new(handler_adapter, delivery)
    yield(delivery.payload, handler)
  end

  ResponderHandler.new(subscription, @consume_thread_pool)
end
Private Instance Methods
consume_from_destination(&block)
click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 30
# Subscribes to the destination queue with manual acknowledgements and routes
# every delivery through #process_message, forwarding the given block.
#
# @return the result of the queue's #subscribe call
def consume_from_destination(&block)
  queue = @channel.queue(@destination)
  queue.subscribe(manual_ack: true) { |delivery| process_message(delivery, &block) }
end
process_message(delivery) { |delivery| ... }
click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 36
# Posts the handling of a single delivery to the consume thread pool. The
# caller's block runs inside delivery.in_span, and the delivery is
# acknowledged afterwards whether or not the block raised.
#
# @param delivery the delivery handed over by the queue subscription
# @yield [delivery] the delivery to handle
def process_message(delivery)
  @consume_thread_pool.post do
    delivery.in_span { yield(delivery) }
  ensure
    # Runs on success and on failure alike.
    # NOTE(review): the `false` argument is presumably the "multiple" flag
    # (acknowledge only this tag) — confirm against the channel implementation.
    @channel.acknowledge(delivery.tag, false)
  end
end