class Freddy::Consumers::TapIntoConsumer

Public Class Methods

consume(**attrs, &block) click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 6
def self.consume(**attrs, &block)
  new(**attrs).consume(&block)
end
new(thread_pool:, patterns:, channel:, options:) click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 10
def initialize(thread_pool:, patterns:, channel:, options:)
  @consume_thread_pool = thread_pool
  @patterns = patterns
  @channel = channel
  @options = options

  raise 'Do not use durable queues without specifying a group' if durable? && !group
end

Public Instance Methods

consume(&block) click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 19
def consume(&block)
  queue = create_queue

  consumer = queue.subscribe(manual_ack: true) do |delivery|
    process_message(queue, delivery, &block)
  end

  ResponderHandler.new(consumer, @consume_thread_pool)
end

Private Instance Methods

create_queue() click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 31
def create_queue
  topic_exchange = @channel.topic(exchange_name)

  queue =
    if group
      @channel.queue("groups.#{group}", durable: durable?)
    else
      @channel.queue('', exclusive: true)
    end

  @patterns.each do |pattern|
    queue.bind(topic_exchange, routing_key: pattern)
  end

  queue
end
durable?() click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 72
def durable?
  @options.fetch(:durable, false)
end
exchange_name() click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 80
def exchange_name
  @options.fetch(:exchange_name, Freddy::FREDDY_TOPIC_EXCHANGE_NAME)
end
group() click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 68
def group
  @options.fetch(:group, nil)
end
on_exception() click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 76
def on_exception
  @options.fetch(:on_exception, :ack)
end
process_message(_queue, delivery) { |payload, routing_key, timestamp| ... } click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 48
def process_message(_queue, delivery)
  @consume_thread_pool.post do
    delivery.in_span do
      yield delivery.payload, delivery.routing_key, delivery.timestamp
      @channel.acknowledge(delivery.tag)
    end
  rescue StandardError
    case on_exception
    when :reject
      @channel.reject(delivery.tag)
    when :requeue
      @channel.reject(delivery.tag, true)
    else
      @channel.acknowledge(delivery.tag)
    end

    raise
  end
end