class MessageDriver::Adapters::BunnyAdapter::Subscription
Attributes
error_handler[R]
sub_ctx[R]
Public Instance Methods
start()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 146 def start unless destination.is_a? QueueDestination raise MessageDriver::Error, 'subscriptions are only supported with QueueDestinations' end @sub_ctx = adapter.new_subscription_context(self) @error_handler = options[:error_handler] @message_handler = case options.delete(:ack) when :auto, nil AutoAckHandler.new(self) when :manual ManualAckHandler.new(self) when :transactional TransactionalAckHandler.new(self) else raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" end start_subscription end
unsubscribe()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 166 def unsubscribe unless @bunny_consumer.nil? @bunny_consumer.cancel @bunny_consumer = nil end unless @sub_ctx.nil? @sub_ctx.invalidate(true) @sub_ctx = nil end end
Private Instance Methods
start_subscription()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 241 def start_subscription @sub_ctx.with_channel do |ch| queue = destination.bunny_queue(@sub_ctx.channel) ch.prefetch(options[:prefetch_size]) if options.key? :prefetch_size sub_opts = options.merge(adapter.ack_key => true) @bunny_consumer = queue.subscribe(sub_opts) do |delivery_info, properties, payload| adapter.broker.client.with_adapter_context(@sub_ctx) do @message_handler.call(delivery_info, properties, payload) end end end end