class Pwwka::Receiver
Attributes
channel[R]
channel_connector[R]
queue_name[R]
routing_key[R]
topic_exchange[R]
Public Class Methods
new(queue_name, routing_key, prefetch: Pwwka.configuration.default_prefetch)
click to toggle source
# File lib/pwwka/receiver.rb, line 12
#
# Records the queue name and routing key, then builds a ChannelConnector and
# caches its channel and topic exchange on the receiver.
#
# @param queue_name [String] name of the queue this receiver works with
# @param routing_key [String] routing key(s) stored for later binding
# @param prefetch [Object] forwarded to ChannelConnector; defaults to
#   Pwwka.configuration.default_prefetch
def initialize(queue_name, routing_key, prefetch: Pwwka.configuration.default_prefetch)
  @queue_name  = queue_name
  @routing_key = routing_key

  # "c: <app_id> <process_name>", stripped of surrounding whitespace.
  connection_name = "c: #{Pwwka.configuration.app_id} #{Pwwka.configuration.process_name}".strip

  @channel_connector = ChannelConnector.new(prefetch: prefetch, connection_name: connection_name)
  @channel           = @channel_connector.channel
  @topic_exchange    = @channel_connector.topic_exchange
end
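subscribe(handler_klass, queue_name, routing_key: "#.#", block: true, prefetch: Pwwka.configuration.default_prefetch, payload_parser: Pwwka.configuration.payload_parser)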
click to toggle source
# File lib/pwwka/receiver.rb, line 20
#
# Builds a receiver for +queue_name+ and subscribes +handler_klass+ to its
# topic queue with manual acknowledgement.
#
# For every delivery the payload is run through +payload_parser+, passed to
# +handler_klass.handle!+, acknowledged, and logged. Any StandardError raised
# along the way is handed to the chain configured in
# Pwwka.configuration.error_handling_chain.
#
# Connection lifetime:
# * When the subscribe call returns with +block+ truthy, is interrupted, or
#   raises, the receiver's connection is closed before this method finishes.
# * When +block+ is falsy and the subscribe call succeeds, the connection is
#   left open so the returned receiver stays usable; the caller is then
#   responsible for closing it (e.g. via
#   +receiver.channel_connector.connection_close+).
#   NOTE(review): this assumes the queue's +subscribe+ returns immediately
#   when +block+ is false (as Bunny's Queue#subscribe does) - confirm.
#
# @param handler_klass [#handle!] object receiving (delivery_info, properties, payload)
# @param queue_name [String] queue to consume from
# @param routing_key [String] routing key(s) handed to the new receiver
# @param block [Boolean] forwarded to the queue's +subscribe+
# @param prefetch [Object] forwarded to the new receiver
# @param payload_parser [#call] converts the raw payload before handling
# @return [Pwwka::Receiver] the receiver that was created
# @raise [RuntimeError] if +handler_klass+ does not respond to +handle!+
def self.subscribe(handler_klass,
                   queue_name,
                   routing_key: "#.#",
                   block: true,
                   prefetch: Pwwka.configuration.default_prefetch,
                   payload_parser: Pwwka.configuration.payload_parser)
  raise "#{handler_klass.name} must respond to `handle!`" unless handler_klass.respond_to?(:handle!)

  receiver = new(queue_name, routing_key, prefetch: prefetch)
  # Flipped to true only after a non-blocking subscribe has been registered
  # successfully; in every other outcome the connection is closed below.
  keep_connection_open = false

  begin
    info "Receiving on #{queue_name}"
    receiver.topic_queue.subscribe(manual_ack: true, block: block) do |delivery_info, properties, payload|
      begin
        # Reassigning +payload+ means the error chain sees the parsed payload
        # when parsing succeeded and the raw one when parsing itself failed.
        payload = payload_parser.(payload)
        handler_klass.handle!(delivery_info, properties, payload)
        receiver.ack(delivery_info.delivery_tag)
        logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key
      rescue => exception
        Pwwka::ErrorHandlers::Chain.new(
          Pwwka.configuration.error_handling_chain
        ).handle_error(
          handler_klass,
          receiver,
          queue_name,
          payload,
          delivery_info,
          exception
        )
      end
    end
    keep_connection_open = !block
  rescue Interrupt => _
    # TODO: trap TERM within channel.work_pool
    info "Interrupting queue #{queue_name} subscriber safely"
  ensure
    receiver.channel_connector.connection_close unless keep_connection_open
  end

  receiver
end
Public Instance Methods
ack(delivery_tag)
click to toggle source
# File lib/pwwka/receiver.rb, line 64
#
# Acknowledges the delivery identified by +delivery_tag+ on this receiver's
# channel.
#
# @param delivery_tag [Object] tag taken from the delivery info
def ack(delivery_tag)
  # NOTE(review): the second argument is presumably Bunny's `multiple` flag
  # (false = acknowledge only this delivery) - confirm against the channel class.
  acknowledge_multiple = false
  channel.acknowledge(delivery_tag, acknowledge_multiple)
end
drop_queue()
click to toggle source
# File lib/pwwka/receiver.rb, line 76
#
# Purges the topic queue and then deletes it.
#
# @return [Object] whatever the queue's +delete+ returns
def drop_queue
  queue = topic_queue
  queue.purge
  queue.delete
end
nack(delivery_tag)
click to toggle source
# File lib/pwwka/receiver.rb, line 68
#
# Negatively acknowledges the delivery identified by +delivery_tag+, passing
# +false+ for both trailing flags of the channel's +nack+.
#
# @param delivery_tag [Object] tag taken from the delivery info
def nack(delivery_tag)
  # NOTE(review): the flags are presumably Bunny's `multiple` and `requeue`
  # (so the message is not requeued) - confirm against the channel class.
  multiple = false
  requeue = false
  channel.nack(delivery_tag, multiple, requeue)
end
nack_requeue(delivery_tag)
click to toggle source
# File lib/pwwka/receiver.rb, line 72
#
# Negatively acknowledges the delivery identified by +delivery_tag+, passing
# +false+ and then +true+ as the trailing flags of the channel's +nack+.
#
# @param delivery_tag [Object] tag taken from the delivery info
def nack_requeue(delivery_tag)
  # NOTE(review): the flags are presumably Bunny's `multiple` and `requeue`
  # (so the message is put back on the queue) - confirm against the channel class.
  multiple = false
  requeue = true
  channel.nack(delivery_tag, multiple, requeue)
end
test_teardown()
click to toggle source
# File lib/pwwka/receiver.rb, line 81
#
# Tears down everything this receiver touches, in order: the queue (via
# drop_queue), the topic exchange, and finally the connection.
# NOTE(review): the name suggests this is meant for test suites only - confirm
# before calling it anywhere else, since it deletes the exchange.
def test_teardown
  # 1. Queue: purge + delete.
  drop_queue

  # 2. Exchange.
  exchange = topic_exchange
  exchange.delete

  # 3. Connection, last, since the steps above need it.
  connector = channel_connector
  connector.connection_close
end
topic_queue()
click to toggle source
# File lib/pwwka/receiver.rb, line 56
#
# Returns the queue for this receiver, declaring it on first use and reusing
# the same object afterwards.
#
# The queue is declared as durable with empty arguments and bound to the
# topic exchange once per key in the comma-separated +routing_key+.
# Whitespace around each key is ignored and blank entries are skipped, so
# "a.b, c.#" binds "a.b" and "c.#" rather than "a.b" and " c.#".
#
# @return [Object] the queue object returned by +channel.queue+
def topic_queue
  @topic_queue ||= begin
    queue = channel.queue(queue_name, durable: true, arguments: {})
    binding_keys = routing_key.split(',').map(&:strip).reject(&:empty?)
    binding_keys.each { |key| queue.bind(topic_exchange, routing_key: key) }
    queue
  end
end