class MessageDriver::Adapters::BunnyAdapter::BunnyContext
Attributes
channel[RW]
subscription[RW]
Public Class Methods
new(adapter)
click to toggle source
Calls superclass method
MessageDriver::Adapters::ContextBase::new
# File lib/message_driver/adapters/bunny_adapter.rb, line 302
# Builds a context for the given adapter with every transaction and channel
# state flag cleared.
#
# @param adapter the adapter object, passed on to the superclass constructor
def initialize(adapter)
  super(adapter)
  @is_transactional = false
  @rollback_only = false
  @need_channel_reset = false
  @in_transaction = false
  # This flag is set in handle_begin_transaction and read in
  # handle_commit_transaction and with_channel; it is initialised here like
  # the other flags so it is never read before being assigned.
  @in_confirms_transaction = false
  @require_commit = false
end
Public Instance Methods
args_to_message(delivery_info, properties, payload, destination)
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 524
# Wraps the pieces of a received delivery in a Message bound to this context.
#
# @param delivery_info passed through to Message.new
# @param properties passed through to Message.new
# @param payload passed through to Message.new
# @param destination passed through to Message.new
# @return [Message] the newly built message
def args_to_message(delivery_info, properties, payload, destination)
  Message.new(
    self,
    delivery_info,
    properties,
    payload,
    destination
  )
end
ensure_channel()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 487
# Makes sure this context has a channel: creates one on the adapter's
# connection when none exists yet, and runs reset_channel when a reset has
# been flagged.
#
# @return the current channel
def ensure_channel
  if @channel.nil?
    @channel = adapter.connection.create_channel
  end
  reset_channel if @need_channel_reset
  @channel
end
ensure_transactional_channel()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 493
# Ensures a channel exists and then switches it into transactional mode.
#
# @return whatever make_channel_transactional returns
def ensure_transactional_channel
  ensure_channel

  make_channel_transactional
end
handle_ack_message(message, _options = {})
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 408
# Acknowledges a message on this context's channel, using the message's
# delivery tag.
#
# @param message an object responding to #delivery_tag
# @param _options [Hash] accepted but unused
def handle_ack_message(message, _options = {})
  with_channel(true) { |channel| channel.ack(message.delivery_tag) }
end
handle_begin_transaction(options = {})
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 328
# Marks this context as being inside a transaction.
#
# @param options [Hash] when options[:type] is :confirm_and_wait the
#   transaction is additionally flagged as a confirms transaction
# @raise [MessageDriver::TransactionError] if a transaction is already open
def handle_begin_transaction(options = {})
  if in_transaction?
    raise MessageDriver::TransactionError,
          "you can't begin another transaction, you are already in one!"
  end

  @in_transaction = true
  confirm_mode = options[:type] == :confirm_and_wait
  @in_confirms_transaction = true if confirm_mode
end
handle_commit_transaction(_ = nil)
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 337
# Finishes the current transaction.
#
# In a confirms transaction this waits for publisher confirms (skipped when
# the transaction is rollback-only, there is no channel, or the channel is
# not using publisher confirms). Otherwise, when the channel is
# transactional, the context is valid, no channel reset is pending and a
# commit is required, it issues tx_rollback or tx_commit depending on the
# rollback-only flag. The transaction flags are always cleared afterwards.
#
# @raise [MessageDriver::TransactionError] if no transaction is open and no
#   commit is pending
def handle_commit_transaction(_ = nil)
  unless in_transaction? || @require_commit
    raise MessageDriver::TransactionError,
          "you can't finish the transaction unless you already in one!"
  end

  begin
    if @in_confirms_transaction
      skip_wait = @rollback_only || @channel.nil? || !@channel.using_publisher_confirms?
      @channel.wait_for_confirms unless skip_wait
    elsif is_transactional? && valid? && !@need_channel_reset && @require_commit
      handle_errors do
        @rollback_only ? @channel.tx_rollback : @channel.tx_commit
      end
    end
  ensure
    # Reset the per-transaction state whether or not the commit succeeded.
    @rollback_only = false
    @in_transaction = false
    @in_confirms_transaction = false
    @require_commit = false
  end
end
handle_consumer_count(destination)
click to toggle source
Calls superclass method
MessageDriver::Adapters::ContextBase#handle_consumer_count
# File lib/message_driver/adapters/bunny_adapter.rb, line 439
# Returns the consumer count for a destination, delegating to the
# destination itself when it responds to handle_consumer_count and falling
# back to the superclass implementation otherwise.
#
# @param destination the destination to query
def handle_consumer_count(destination)
  return destination.handle_consumer_count if destination.respond_to?(:handle_consumer_count)

  super
end
handle_create_destination(name, dest_options = {}, message_props = {})
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 311
# Creates a destination of the requested type and runs its after_initialize
# hook with this context.
#
# @param name the destination name
# @param dest_options [Hash] destination options; the :type entry selects
#   the destination class (:exchange, or :queue / absent for a queue) and is
#   not passed on to the destination
# @param message_props [Hash] passed through to the destination constructor
# @return the new destination
# @raise [MessageDriver::Error] if :type is neither :exchange, :queue nor nil
def handle_create_destination(name, dest_options = {}, message_props = {})
  # Work on a shallow copy so stripping :type neither alters the caller's
  # hash nor fails when that hash is frozen.
  dest_options = dest_options.dup
  type = dest_options.delete(:type)
  dest = case type
         when :exchange
           ExchangeDestination.new(adapter, name, dest_options, message_props)
         when :queue, nil
           QueueDestination.new(adapter, name, dest_options, message_props)
         else
           raise MessageDriver::Error, "invalid destination type #{type}"
         end
  dest.after_initialize(self)
  dest
end
handle_errors() { || ... }
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 467
# Runs the given block and translates Bunny and network failures into
# MessageDriver errors. On any handled failure the channel is flagged for
# reset and, when a transaction is open, the transaction becomes
# rollback-only.
#
# @yield the channel work to protect
# @raise [MessageDriver::QueueNotFound] for Bunny::NotFound
# @raise [MessageDriver::WrappedError] for other channel-level errors and
#   for an already-closed channel
# @raise [MessageDriver::ConnectionError] for any of NETWORK_ERRORS
def handle_errors
  yield
rescue Bunny::ChannelLevelException => e
  @need_channel_reset = true
  @rollback_only = true if in_transaction?
  wrapper = e.is_a?(Bunny::NotFound) ? MessageDriver::QueueNotFound : MessageDriver::WrappedError
  raise wrapper.new(e.to_s, e)
rescue Bunny::ChannelAlreadyClosed => e
  @need_channel_reset = true
  @rollback_only = true if in_transaction?
  raise MessageDriver::WrappedError.new(e.to_s, e)
rescue *NETWORK_ERRORS => e
  @need_channel_reset = true
  @rollback_only = true if in_transaction?
  raise MessageDriver::ConnectionError.new(e.to_s, e)
end
handle_message_count(destination)
click to toggle source
Calls superclass method
MessageDriver::Adapters::ContextBase#handle_message_count
# File lib/message_driver/adapters/bunny_adapter.rb, line 431
# Returns the message count for a destination, delegating to the
# destination itself when it responds to handle_message_count and falling
# back to the superclass implementation otherwise.
#
# @param destination the destination to query
def handle_message_count(destination)
  return destination.handle_message_count if destination.respond_to?(:handle_message_count)

  super
end
handle_nack_message(message, options = {})
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 414
# Rejects a message on this context's channel, using the message's
# delivery tag.
#
# @param message an object responding to #delivery_tag
# @param options [Hash] :requeue is passed to the channel's reject call and
#   defaults to true
def handle_nack_message(message, options = {})
  requeue = options.fetch(:requeue, true)
  with_channel(true) { |channel| channel.reject(message.delivery_tag, requeue) }
end
handle_pop_message(destination, options = {})
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 389
# Pops a single message off the queue named by the destination.
#
# @param destination the queue destination to pop from
# @param options [Hash] :client_ack (default false) is passed to the queue
#   pop under the adapter's ack key
# @return the popped message converted via args_to_message, or nil when
#   the pop returned nothing
# @raise [MessageDriver::Error] if the destination is an ExchangeDestination
def handle_pop_message(destination, options = {})
  if destination.is_a? ExchangeDestination
    raise MessageDriver::Error, "You can't pop a message off an exchange"
  end

  with_channel(false) do |channel|
    # passive: true is forwarded to the channel's queue lookup unchanged.
    source_queue = channel.queue(destination.name, passive: true)
    popped = source_queue.pop(adapter.ack_key => options.fetch(:client_ack, false))
    next nil if popped.nil? || popped[0].nil?

    args_to_message(*popped, destination)
  end
end
handle_publish(destination, body, headers = {}, properties = {})
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 376
# Publishes a message through this context's channel.
#
# The destination supplies the final body, exchange, routing key and
# properties via publish_params. When the :confirm property is exactly
# true, confirm mode is selected on the channel (if it is not already using
# publisher confirms) and the call waits for confirms after publishing.
#
# @param destination an object responding to #publish_params
# @param body the message body
# @param headers [Hash] passed to publish_params
# @param properties [Hash] passed to publish_params
def handle_publish(destination, body, headers = {}, properties = {})
  body, exchange, routing_key, props = *destination.publish_params(body, headers, properties)
  # :confirm is removed from the properties before they reach basic_publish.
  wait_for_confirm = props.delete(:confirm) == true
  with_channel(true) do |channel|
    channel.confirm_select if wait_for_confirm && !channel.using_publisher_confirms?
    channel.basic_publish(body, exchange, routing_key, props)
    channel.wait_for_confirms if wait_for_confirm
  end
end
handle_rollback_transaction(_ = nil)
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 362
# Rolls the current transaction back by flagging it rollback-only and then
# invoking commit_transaction.
# NOTE(review): commit_transaction is defined outside this listing;
# presumably it ends up in handle_commit_transaction - confirm.
def handle_rollback_transaction(_ = nil)
  @rollback_only = true

  commit_transaction
end
handle_subscribe(destination, options = {}, &consumer)
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 425
# Creates a Subscription for the destination, starts it and returns it.
#
# @param destination the destination to subscribe to
# @param options [Hash] passed to Subscription.new
# @param consumer the block handed to the subscription as its consumer
# @return [Subscription] the started subscription
def handle_subscribe(destination, options = {}, &consumer)
  Subscription.new(adapter, destination, consumer, options).tap do |subscription|
    subscription.start
  end
end
in_transaction?()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 372
# Reports whether a transaction is currently open on this context.
#
# @return the value of the @in_transaction flag
def in_transaction?
  @in_transaction
end
invalidate(in_unsubscribe = false)
click to toggle source
Calls superclass method
MessageDriver::Adapters::ContextBase#invalidate
# File lib/message_driver/adapters/bunny_adapter.rb, line 447
# Invalidates this context: runs the superclass invalidation, ends any
# subscription and closes the channel. Failures while unsubscribing or
# closing are logged at debug level instead of being raised.
#
# @param in_unsubscribe [Boolean] when true the subscription is left alone
def invalidate(in_unsubscribe = false)
  super()

  if !@subscription.nil? && !in_unsubscribe
    begin
      @subscription.unsubscribe if adapter.connection.open?
    rescue StandardError => e
      logger.debug "error trying to end subscription\n#{exception_to_str(e)}"
    end
  end

  return if @channel.nil?

  begin
    @channel.close if @channel.open? && adapter.connection.open?
  rescue StandardError => e
    logger.debug "error trying to close channel\n#{exception_to_str(e)}"
  ensure
    # Best-effort cleanup of the channel's consumer work pool; any error
    # here is deliberately ignored.
    begin
      @channel.maybe_kill_consumer_work_pool!
    rescue StandardError
      nil
    end
  end
end
make_channel_transactional()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 498
# Puts the channel into transactional mode with tx_select, unless this
# context has already recorded it as transactional.
#
# @return [true, nil] true when the channel was just switched, nil when it
#   was already transactional
def make_channel_transactional
  return if is_transactional?

  @channel.tx_select
  @is_transactional = true
end
supports_client_acks?()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 404
# Capability flag for client acknowledgements.
#
# @return [true] always
def supports_client_acks?
  true
end
supports_subscriptions?()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 421
# Capability flag for subscriptions.
#
# @return [true] always
def supports_subscriptions?
  true
end
supports_transactions?()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 324
# Capability flag for transactions.
#
# @return [true] always
def supports_transactions?
  true
end
transactional?()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 367
# Reports whether this context has switched its channel into transactional
# mode.
#
# @return the value of the @is_transactional flag
def transactional?
  @is_transactional
end
Also aliased as: is_transactional?
with_channel(require_commit = true) { |channel| ... }
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 505
# Yields this context's channel to the block inside handle_errors, after
# preparing the channel for the current transaction mode.
#
# Inside a confirms transaction the channel is put into confirm mode;
# inside any other transaction it is made transactional. When the channel
# is transactional, no transaction is open and require_commit is true,
# commit_transaction is called after the block returns.
#
# @param require_commit [Boolean] whether the work needs a commit
# @yieldparam channel the channel to operate on
# @return the block's result
# @raise [MessageDriver::TransactionRollbackOnly] if the context is
#   rollback-only
# @raise [MessageDriver::Error] if the context is no longer valid
def with_channel(require_commit = true)
  raise MessageDriver::TransactionRollbackOnly if @rollback_only
  raise MessageDriver::Error, 'this adapter context is not valid!' unless valid?

  ensure_channel
  # Once any call has required a commit, the flag stays set until the
  # transaction is finished.
  @require_commit ||= require_commit

  if in_transaction? && @in_confirms_transaction
    @channel.confirm_select unless @channel.using_publisher_confirmations?
  elsif in_transaction?
    make_channel_transactional
  end

  handle_errors do
    outcome = yield @channel
    auto_commit = require_commit && is_transactional? && !in_transaction?
    commit_transaction if auto_commit
    outcome
  end
end
Private Instance Methods
reset_channel()
click to toggle source
# File lib/message_driver/adapters/bunny_adapter.rb, line 530
# Clears a pending channel reset. When the current channel is no longer
# open, a replacement is created on the adapter's connection, the
# transactional flag is cleared, and an open transaction is marked
# rollback-only.
#
# @return [false] the new value of the reset flag
def reset_channel
  unless @channel.open?
    @channel = adapter.connection.create_channel
    @is_transactional = false
    @rollback_only = true if in_transaction?
    # NOTE(review): the original listing had a bare `@require_commit`
    # expression here, which had no effect and has been dropped. If the
    # intent was to change that flag on reset, confirm and restore it as an
    # assignment.
  end
  @need_channel_reset = false
end