class MessageDriver::Adapters::BunnyAdapter::Subscription

Attributes

error_handler[R]
sub_ctx[R]

Public Instance Methods

start()
# File lib/message_driver/adapters/bunny_adapter.rb, line 146
# Begins consuming from the destination.
#
# Creates a subscription context through the adapter, records the
# :error_handler option, picks the message handler that matches the :ack
# option and then delegates to start_subscription.
#
# The :ack key is removed from `options` as a side effect. Accepted values
# are :auto (also used when the key is absent or nil), :manual and
# :transactional.
#
# Raises MessageDriver::Error when the destination is not a QueueDestination
# or when :ack holds any other value.
#
# Returns whatever start_subscription returns.
def start
  unless destination.is_a? QueueDestination
    raise MessageDriver::Error,
          'subscriptions are only supported with QueueDestinations'
  end
  @sub_ctx = adapter.new_subscription_context(self)
  @error_handler = options[:error_handler]
  # Hold on to the removed value: after the delete, options[:ack] is nil, so
  # reading it back for the error message would report an empty option.
  ack_mode = options.delete(:ack)
  @message_handler = case ack_mode
                     when :auto, nil
                       AutoAckHandler.new(self)
                     when :manual
                       ManualAckHandler.new(self)
                     when :transactional
                       TransactionalAckHandler.new(self)
                     else
                       raise MessageDriver::Error, "unrecognized :ack option #{ack_mode}"
                     end
  start_subscription
end
unsubscribe()
# File lib/message_driver/adapters/bunny_adapter.rb, line 166
# Tears the subscription down.
#
# Cancels the consumer when one is set, then invalidates the subscription
# context (passing true) when one is set, clearing each reference after its
# teardown call. Calling it when neither is set does nothing.
#
# Always returns nil.
def unsubscribe
  consumer = @bunny_consumer
  unless consumer.nil?
    consumer.cancel
    @bunny_consumer = nil
  end

  context = @sub_ctx
  return if context.nil?

  context.invalidate(true)
  @sub_ctx = nil
end

Private Instance Methods

start_subscription()
# File lib/message_driver/adapters/bunny_adapter.rb, line 241
# Registers the consumer on the subscription context's channel.
#
# Inside @sub_ctx.with_channel it looks up the destination's bunny queue,
# applies options[:prefetch_size] to the yielded channel when that key is
# present, and subscribes with a copy of `options` that additionally maps
# adapter.ack_key to true. The resulting consumer is kept in @bunny_consumer.
#
# Every delivery is forwarded to @message_handler from within
# adapter.broker.client.with_adapter_context(@sub_ctx).
#
# Returns whatever @sub_ctx.with_channel returns.
def start_subscription
  @sub_ctx.with_channel do |channel|
    bunny_queue = destination.bunny_queue(@sub_ctx.channel)

    if options.key?(:prefetch_size)
      prefetch_size = options[:prefetch_size]
      channel.prefetch(prefetch_size)
    end

    # merge builds a new hash, so `options` itself is left untouched.
    subscribe_options = options.merge(adapter.ack_key => true)

    @bunny_consumer = bunny_queue.subscribe(subscribe_options) do |delivery_info, properties, payload|
      client = adapter.broker.client
      client.with_adapter_context(@sub_ctx) do
        @message_handler.call(delivery_info, properties, payload)
      end
    end
  end
end