class MessageDriver::Adapters::BunnyAdapter::BunnyContext

Attributes

channel[RW]
subscription[RW]

Public Class Methods

new(adapter) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 302
def initialize(adapter)
  super(adapter)
  @is_transactional = false
  @rollback_only = false
  @need_channel_reset = false
  @in_transaction = false
  @require_commit = false
end

Public Instance Methods

args_to_message(delivery_info, properties, payload, destination) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 524
def args_to_message(delivery_info, properties, payload, destination)
  Message.new(self, delivery_info, properties, payload, destination)
end
ensure_channel() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 487
def ensure_channel
  @channel = adapter.connection.create_channel if @channel.nil?
  reset_channel if @need_channel_reset
  @channel
end
ensure_transactional_channel() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 493
def ensure_transactional_channel
  ensure_channel
  make_channel_transactional
end
handle_ack_message(message, _options = {}) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 408
def handle_ack_message(message, _options = {})
  with_channel(true) do |ch|
    ch.ack(message.delivery_tag)
  end
end
handle_begin_transaction(options = {}) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 328
def handle_begin_transaction(options = {})
  if in_transaction?
    raise MessageDriver::TransactionError,
          "you can't begin another transaction, you are already in one!"
  end
  @in_transaction = true
  @in_confirms_transaction = true if options[:type] == :confirm_and_wait
end
handle_commit_transaction(_ = nil) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 337
def handle_commit_transaction(_ = nil)
  if !in_transaction? && !@require_commit
    raise MessageDriver::TransactionError,
          "you can't finish the transaction unless you already in one!"
  end
  begin
    if @in_confirms_transaction
      @channel.wait_for_confirms unless @rollback_only || @channel.nil? || !@channel.using_publisher_confirms?
    elsif is_transactional? && valid? && !@need_channel_reset && @require_commit
      handle_errors do
        if @rollback_only
          @channel.tx_rollback
        else
          @channel.tx_commit
        end
      end
    end
  ensure
    @rollback_only = false
    @in_transaction = false
    @in_confirms_transaction = false
    @require_commit = false
  end
end
handle_consumer_count(destination) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 439
def handle_consumer_count(destination)
  if destination.respond_to?(:handle_consumer_count)
    destination.handle_consumer_count
  else
    super
  end
end
handle_create_destination(name, dest_options = {}, message_props = {}) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 311
def handle_create_destination(name, dest_options = {}, message_props = {})
  dest =  case type = dest_options.delete(:type)
          when :exchange
            ExchangeDestination.new(adapter, name, dest_options, message_props)
          when :queue, nil
            QueueDestination.new(adapter, name, dest_options, message_props)
          else
            raise MessageDriver::Error, "invalid destination type #{type}"
          end
  dest.after_initialize(self)
  dest
end
handle_errors() { || ... } click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 467
def handle_errors
  yield
rescue Bunny::ChannelLevelException => e
  @need_channel_reset = true
  @rollback_only = true if in_transaction?
  if e.is_a? Bunny::NotFound
    raise MessageDriver::QueueNotFound.new(e.to_s, e)
  else
    raise MessageDriver::WrappedError.new(e.to_s, e)
  end
rescue Bunny::ChannelAlreadyClosed => e
  @need_channel_reset = true
  @rollback_only = true if in_transaction?
  raise MessageDriver::WrappedError.new(e.to_s, e)
rescue *NETWORK_ERRORS => e
  @need_channel_reset = true
  @rollback_only = true if in_transaction?
  raise MessageDriver::ConnectionError.new(e.to_s, e)
end
handle_message_count(destination) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 431
def handle_message_count(destination)
  if destination.respond_to?(:handle_message_count)
    destination.handle_message_count
  else
    super
  end
end
handle_nack_message(message, options = {}) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 414
def handle_nack_message(message, options = {})
  requeue = options.fetch(:requeue, true)
  with_channel(true) do |ch|
    ch.reject(message.delivery_tag, requeue)
  end
end
handle_pop_message(destination, options = {}) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 389
def handle_pop_message(destination, options = {})
  raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination

  with_channel(false) do |ch|
    queue = ch.queue(destination.name, passive: true)

    message = queue.pop(adapter.ack_key => options.fetch(:client_ack, false))
    if message.nil? || message[0].nil?
      nil
    else
      args_to_message(*message, destination)
    end
  end
end
handle_publish(destination, body, headers = {}, properties = {}) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 376
def handle_publish(destination, body, headers = {}, properties = {})
  body, exchange, routing_key, props = *destination.publish_params(body, headers, properties)
  confirm = props.delete(:confirm)
  confirm = false if confirm.nil?
  with_channel(true) do |ch|
    if confirm == true
      ch.confirm_select unless ch.using_publisher_confirms?
    end
    ch.basic_publish(body, exchange, routing_key, props)
    ch.wait_for_confirms if confirm == true
  end
end
handle_rollback_transaction(_ = nil) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 362
def handle_rollback_transaction(_ = nil)
  @rollback_only = true
  commit_transaction
end
handle_subscribe(destination, options = {}, &consumer) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 425
def handle_subscribe(destination, options = {}, &consumer)
  sub = Subscription.new(adapter, destination, consumer, options)
  sub.start
  sub
end
in_transaction?() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 372
def in_transaction?
  @in_transaction
end
invalidate(in_unsubscribe = false) click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 447
def invalidate(in_unsubscribe = false)
  super()
  unless @subscription.nil? || in_unsubscribe
    begin
      @subscription.unsubscribe if adapter.connection.open?
    rescue => e
      logger.debug "error trying to end subscription\n#{exception_to_str(e)}"
    end
  end
  unless @channel.nil?
    begin
      @channel.close if @channel.open? && adapter.connection.open?
    rescue => e
      logger.debug "error trying to close channel\n#{exception_to_str(e)}"
    ensure
      begin @channel.maybe_kill_consumer_work_pool! rescue nil; end
    end
  end
end
is_transactional?()
Alias for: transactional?
make_channel_transactional() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 498
def make_channel_transactional
  unless is_transactional?
    @channel.tx_select
    @is_transactional = true
  end
end
supports_client_acks?() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 404
def supports_client_acks?
  true
end
supports_subscriptions?() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 421
def supports_subscriptions?
  true
end
supports_transactions?() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 324
def supports_transactions?
  true
end
transactional?() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 367
def transactional?
  @is_transactional
end
Also aliased as: is_transactional?
with_channel(require_commit = true) { |channel| ... } click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 505
def with_channel(require_commit = true)
  raise MessageDriver::TransactionRollbackOnly if @rollback_only
  raise MessageDriver::Error, 'this adapter context is not valid!' unless valid?
  ensure_channel
  @require_commit ||= require_commit
  if in_transaction?
    if @in_confirms_transaction
      @channel.confirm_select unless @channel.using_publisher_confirmations?
    else
      make_channel_transactional
    end
  end
  handle_errors do
    result = yield @channel
    commit_transaction if require_commit && is_transactional? && !in_transaction?
    result
  end
end

Private Instance Methods

reset_channel() click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 530
def reset_channel
  unless @channel.open?
    @channel = adapter.connection.create_channel
    @is_transactional = false
    @rollback_only = true if in_transaction?
    @require_commit
  end
  @need_channel_reset = false
end