class BunnyPublisher::Base
Based on the Publisher class of Sneakers: https://github.com/jondot/sneakers/blob/ed620b642b447701be490666ee284cf7d60ccf22/lib/sneakers/publisher.rb
Constants
- RETRIABLE_ERRORS
A list of errors that can be resolved by recovering the connection.
Attributes
channel[R]
connection[R]
exchange[R]
message[R]
message_options[R]
Public Class Methods
new(publish_connection: nil, connection: nil, exchange: nil, exchange_options: {}, **options)
click to toggle source
# File lib/bunny_publisher/base.rb, line 19
# Stores the publisher configuration in instance variables; no connection is
# started here.
#
# publish_connection - connection to publish through; takes precedence over +connection+.
# connection         - fallback connection, used when +publish_connection+ is nil or false.
# exchange           - exchange name, kept in @exchange_name.
# exchange_options   - exchange options, kept in @exchange_options.
# options            - all remaining keyword arguments, kept in @options.
def initialize(publish_connection: nil, connection: nil, exchange: nil, exchange_options: {}, **options)
  @options = options
  @exchange_options = exchange_options
  @exchange_name = exchange
  @mutex = Mutex.new

  # The arguments are compatible with Sneakers::CONFIG, so a given `connection` is used as is.
  # Sharing one connection between publishers and consumers can cause problems, though:
  # https://www.cloudamqp.com/blog/2017-12-29-part1-rabbitmq-best-practice.html#separate-connections-for-publisher-and-consumer
  # `publish_connection` therefore lets callers explicitly give publishers a separate connection.
  @connection = publish_connection || connection
end
Public Instance Methods
close()
click to toggle source
# File lib/bunny_publisher/base.rb, line 49
# Closes the connection if there is one; does nothing (and returns nil) when
# no connection has been set yet.
def close
  connection.close unless connection.nil?
end
Also aliased as: stop
publish(message, message_options = {})
click to toggle source
# File lib/bunny_publisher/base.rb, line 33
# Publishes +message+ with +message_options+ to the exchange.
#
# The whole call runs under @mutex. The message and its options are kept in
# @message / @message_options while the :publish callbacks run and are reset
# to nil afterwards, whether or not publishing raised.
def publish(message, message_options = {})
  @mutex.synchronize do
    begin
      @message = message
      @message_options = message_options

      run_callbacks(:publish) do
        with_errors_handling do
          ensure_connection!
          # Bunny modifies message options, so it is handed a copy.
          exchange.publish(message, message_options.dup)
        end
      end
    ensure
      @message = nil
      @message_options = nil
    end
  end
end
Private Instance Methods
build_connection()
click to toggle source
# File lib/bunny_publisher/base.rb, line 119
# Builds a new Bunny connection object. The first argument is @options[:amqp],
# or the RABBITMQ_URL environment variable when that option is not set; the
# full @options hash is passed along as the second argument.
def build_connection
  amqp_url = @options[:amqp] || ENV['RABBITMQ_URL']
  Bunny.new(amqp_url, @options)
end
build_exchange()
click to toggle source
# File lib/bunny_publisher/base.rb, line 123
# Returns the exchange to publish to: the channel's default exchange when no
# exchange name was configured (nil or ''), otherwise the exchange named
# @exchange_name obtained from the channel with @exchange_options.
def build_exchange
  if @exchange_name.nil? || @exchange_name == ''
    channel.default_exchange
  else
    channel.exchange(@exchange_name, @exchange_options)
  end
end
connection_can_recover?()
click to toggle source
# File lib/bunny_publisher/base.rb, line 95
# Truthy when the connection reports both automatically_recover? and
# should_retry_recovery?.
def connection_can_recover?
  session = connection
  session.automatically_recover? && session.should_retry_recovery?
end
connection_open?()
click to toggle source
# File lib/bunny_publisher/base.rb, line 99
# Tells whether the connection is actually ready for use: its status is :open
# and its transport reports open?. Both are read while holding the
# connection's @status_mutex.
def connection_open?
  # Bunny::Session#open? is not trusted here: it treats the :connected and :connecting
  # statuses as "open" although the connection is not ready to work yet.
  status_mutex = connection.instance_variable_get(:@status_mutex)
  status_mutex.synchronize do
    connection.status == :open && connection.transport.open?
  end
end
ensure_connection!()
click to toggle source
# File lib/bunny_publisher/base.rb, line 61
# Prepares everything needed for publishing, in order: builds the connection
# if none is set, starts it when should_start_connection? says so, waits for
# it to become ready, then memoizes the channel and the exchange.
# Returns the exchange.
def ensure_connection!
  @connection = build_connection unless @connection
  connection.start if should_start_connection?

  wait_until_connection_ready

  @channel = connection.create_channel unless @channel
  @exchange ||= build_exchange
end
recovery_timeout()
click to toggle source
# File lib/bunny_publisher/base.rb, line 107
# Returns twice the connection's heartbeat timeout. The heartbeat timeout
# falls back to 60 when the connection reports none and is never taken as
# less than 5.
def recovery_timeout
  reported =
    if connection.respond_to?(:heartbeat_timeout)
      connection.heartbeat_timeout
    else
      connection.heartbeat
    end

  # 60 seconds is the default heartbeat timeout: https://www.rabbitmq.com/heartbeats.html#heartbeats-timeout
  # The recommended timeout is 5-20: https://www.rabbitmq.com/heartbeats.html#false-positives
  heartbeat_timeout = [reported || 60, 5].max

  # Twice the heartbeat timeout gives Bunny a chance to detect a connection failure and try to recover it.
  heartbeat_timeout * 2
end
reset_exchange!()
click to toggle source
# File lib/bunny_publisher/base.rb, line 72
# Replaces the current channel and exchange with freshly created ones, after
# making sure the connection itself is usable. Returns the new exchange.
def reset_exchange!
  ensure_connection!

  fresh_channel = connection.create_channel
  @channel = fresh_channel
  # build_exchange reads the channel that was just assigned above.
  @exchange = build_exchange
end
should_start_connection?()
click to toggle source
# File lib/bunny_publisher/base.rb, line 90
# Truthy when the connection has to be started: it was never connected or it
# reports itself as closed.
def should_start_connection?
  session = connection
  # :not_connected covers lazy connection initialization.
  session.status == :not_connected || session.closed?
end
wait_until_connection_ready()
click to toggle source
# File lib/bunny_publisher/base.rb, line 78
# Blocks until the connection is open, polling every 10 ms.
#
# Returns immediately when the connection is already open or when it does not
# recover automatically (waiting would be pointless then). Gives up silently
# once twice the recovery_timeout has elapsed, so that the next interaction
# with the connection fails with its own error. Always returns nil.
#
# The time limit is a monotonic-clock deadline checked between polls rather
# than a Timeout.timeout block: Timeout raises asynchronously at an arbitrary
# point of the wrapped code (including inside the connection_open? probe),
# and this method runs on every publish.
def wait_until_connection_ready
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + recovery_timeout * 2

  until connection_open? || !connection.automatically_recover?
    # Connection recovery takes too long, let the next interaction fail with error then.
    return if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep 0.01
  end
end
with_errors_handling() { || ... }
click to toggle source
# File lib/bunny_publisher/base.rb, line 129
# Runs the given block, retrying it after recoverable failures:
#
# * Bunny::ChannelAlreadyClosed - the channel and exchange are rebuilt, then
#   the block is run again.
# * any of RETRIABLE_ERRORS - re-raised unless connection_can_recover?;
#   otherwise the error is logged as a warning and the block is run again.
#
# NOTE(review): neither retry is bounded by a counter - presumably intentional,
# to keep waiting while the connection recovers; confirm.
def with_errors_handling
  begin
    yield
  rescue Bunny::ChannelAlreadyClosed
    reset_exchange!
    retry
  rescue *RETRIABLE_ERRORS => error
    raise unless connection_can_recover?

    logger.warn { error.inspect }
    retry
  end
end