class BunnyPublisher::Base

Based on the Publisher class of Sneakers: https://github.com/jondot/sneakers/blob/ed620b642b447701be490666ee284cf7d60ccf22/lib/sneakers/publisher.rb

Constants

RETRIABLE_ERRORS

A list of errors that can be resolved by connection recovery.

Attributes

channel[R]
connection[R]
exchange[R]
message[R]
message_options[R]

Public Class Methods

new(publish_connection: nil, connection: nil, exchange: nil, exchange_options: {}, **options) click to toggle source
# File lib/bunny_publisher/base.rb, line 19
# Stores the publisher configuration. Nothing is connected here; the body only assigns state.
#
# publish_connection - connection preferred for publishing; wins over +connection+ when given
# connection         - fallback connection, used only when +publish_connection+ is absent
# exchange           - exchange name, kept for later use
# exchange_options   - options kept alongside the exchange name
# options            - every remaining keyword argument, kept untouched
def initialize(publish_connection: nil, connection: nil, exchange: nil, exchange_options: {}, **options)
  @options = options
  @exchange_options = exchange_options
  @exchange_name = exchange

  @mutex = Mutex.new

  # The keyword arguments mirror Sneakers::CONFIG, so a +connection+ supplied that way is used as is.
  # Sharing a single connection between publishers and consumers can cause problems, though:
  # https://www.cloudamqp.com/blog/2017-12-29-part1-rabbitmq-best-practice.html#separate-connections-for-publisher-and-consumer
  # +publish_connection+ lets callers explicitly give publishers a connection of their own.
  @connection = publish_connection || connection
end

Public Instance Methods

close() click to toggle source
# File lib/bunny_publisher/base.rb, line 49
# Closes the current connection when there is one; returns nil without doing anything otherwise.
def close
  session = connection
  session.close unless session.nil?
end
Also aliased as: stop
publish(message, message_options = {}) click to toggle source
# File lib/bunny_publisher/base.rb, line 33
# Publishes +message+ to the exchange while holding the publisher's mutex.
#
# The message and its options are stored in @message / @message_options for the
# duration of the call and cleared again afterwards, whether or not an error was raised.
#
# message         - payload handed to the exchange
# message_options - options handed to the exchange (a copy is passed, see below)
#
# Returns whatever the +run_callbacks+ block returns.
def publish(message, message_options = {})
  @mutex.synchronize do
    begin
      @message = message
      @message_options = message_options

      run_callbacks(:publish) do
        with_errors_handling do
          ensure_connection!
          # Bunny modifies the options it is given, so the caller's hash is never passed directly.
          exchange.publish(message, message_options.dup)
        end
      end
    ensure
      @message = nil
      @message_options = nil
    end
  end
end
stop()
Alias for: close

Private Instance Methods

build_connection() click to toggle source
# File lib/bunny_publisher/base.rb, line 119
# Builds a Bunny object from the stored options.
#
# The URL comes from the :amqp option and falls back to the RABBITMQ_URL environment
# variable. This method does not call +start+ on the result.
def build_connection
  amqp_url = @options[:amqp] || ENV['RABBITMQ_URL']

  Bunny.new(amqp_url, @options)
end
build_exchange() click to toggle source
# File lib/bunny_publisher/base.rb, line 123
# Returns the exchange to publish to: the channel's default exchange when no
# exchange name was configured (nil or an empty string), otherwise the exchange
# with the configured name and options.
def build_exchange
  if @exchange_name.nil? || @exchange_name == ''
    channel.default_exchange
  else
    channel.exchange(@exchange_name, @exchange_options)
  end
end
connection_can_recover?() click to toggle source
# File lib/bunny_publisher/base.rb, line 95
# Tells whether the connection both recovers automatically and should still retry recovery.
# The second question is only asked when the first one is answered positively.
def connection_can_recover?
  session = connection
  recovers_automatically = session.automatically_recover?

  recovers_automatically && session.should_retry_recovery?
end
connection_open?() click to toggle source
# File lib/bunny_publisher/base.rb, line 99
# Tells whether the connection is really usable: its status is :open and its transport is open.
#
# Bunny::Session#open? is deliberately avoided because it also counts the :connected and
# :connecting statuses as "open" while the connection is not actually ready to work.
# The check runs while holding the connection's @status_mutex.
def connection_open?
  session = connection
  status_lock = session.instance_variable_get(:@status_mutex)

  status_lock.synchronize { session.status == :open && session.transport.open? }
end
ensure_connection!() click to toggle source
# File lib/bunny_publisher/base.rb, line 61
# Makes sure a connection, a channel and an exchange are available, creating each one
# only when it is missing. The connection is started when +should_start_connection?+ says so,
# and the method waits for it to become ready before the channel is created.
#
# Returns the exchange.
def ensure_connection!
  @connection = build_connection unless @connection
  connection.start if should_start_connection?

  wait_until_connection_ready

  @channel = connection.create_channel unless @channel
  @exchange ||= build_exchange
end
recovery_timeout() click to toggle source
# File lib/bunny_publisher/base.rb, line 107
# Returns twice the connection's heartbeat timeout, with the heartbeat defaulting to 60
# and never taken to be lower than 5.
#
# The heartbeat is read from +heartbeat_timeout+ when the connection responds to it and
# from +heartbeat+ otherwise.
def recovery_timeout
  # 60 seconds is the default heartbeat timeout https://www.rabbitmq.com/heartbeats.html#heartbeats-timeout
  default_heartbeat = 60
  # The recommended timeout is 5-20 https://www.rabbitmq.com/heartbeats.html#false-positives
  lowest_heartbeat = 5

  session = connection
  configured = if session.respond_to?(:heartbeat_timeout)
                 session.heartbeat_timeout
               else
                 session.heartbeat
               end

  # Doubling the heartbeat gives Bunny a chance to detect a connection failure and try to recover it.
  [configured || default_heartbeat, lowest_heartbeat].max * 2
end
reset_exchange!() click to toggle source
# File lib/bunny_publisher/base.rb, line 72
# Replaces the current channel with a newly created one and rebuilds the exchange on it,
# after making sure the connection is available.
#
# Returns the new exchange.
def reset_exchange!
  ensure_connection!

  fresh_channel = connection.create_channel
  # The channel is stored before the exchange is rebuilt, so the exchange is built on the new channel.
  @channel = fresh_channel
  @exchange = build_exchange
end
should_start_connection?() click to toggle source
# File lib/bunny_publisher/base.rb, line 90
# Tells whether +start+ has to be called on the connection: either it was never
# connected (lazy connection initialization) or it has been closed.
def should_start_connection?
  session = connection
  never_connected = session.status == :not_connected

  never_connected || session.closed?
end
wait_until_connection_ready() click to toggle source
# File lib/bunny_publisher/base.rb, line 78
# Polls every 0.01 seconds until the connection is open or turns out not to recover
# automatically, for at most twice +recovery_timeout+.
#
# Always returns nil; running out of time is not reported to the caller.
def wait_until_connection_ready
  longest_wait = recovery_timeout * 2

  Timeout.timeout(longest_wait) do
    sleep 0.01 until connection_open? || !connection.automatically_recover?
  end
rescue Timeout::Error
  # Recovery is taking too long; the next interaction with the connection is left to fail with its own error.
end
with_errors_handling() { || ... } click to toggle source
# File lib/bunny_publisher/base.rb, line 129
# Runs the given block and re-runs it after errors that are considered recoverable:
#
# * Bunny::ChannelAlreadyClosed - the channel and exchange are rebuilt first.
# * any of RETRIABLE_ERRORS     - only while +connection_can_recover?+ holds; otherwise
#                                 the error is re-raised to the caller.
#
# Returns the block's value once it completes without one of these errors.
#
# NOTE(review): neither +retry+ is bounded by a counter inside this method, so the number
# of attempts depends entirely on +reset_exchange!+ and +connection_can_recover?+ - confirm
# that this is intended.
def with_errors_handling
  yield
rescue Bunny::ChannelAlreadyClosed
  # Swap in a new channel and exchange, then run the block again.
  reset_exchange!
  retry
rescue *RETRIABLE_ERRORS => e
  # Bare +raise+ re-raises the error currently being handled.
  raise unless connection_can_recover?

  logger.warn { e.inspect }
  retry
end