class RabbitmqClient::Publisher

Publisher class is responsible for publishing events to rabbitmq exhanges

Public Class Methods

new(**config) click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 8
def initialize(**config)
  @config = config
  @session_params = session_params
  @exchange_registry = @config.fetch(:exchange_registry, nil)
  @session_params.freeze
  @session_pool = create_connection_pool
  notify('publisher_created', @session_params)
end

Public Instance Methods

publish(data, options) click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 17
def publish(data, options)
  return nil unless @exchange_registry

  if async
    PublisherJob.perform_async(@exchange_registry,
                               @session_pool, data, options)
  else
    PublisherJob.new.perform(@exchange_registry,
                             @session_pool, data, options)
  end
end

Private Instance Methods

async() click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 31
def async
  @config.dig(:session_params, :async_publisher) || false
end
create_connection_pool() click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 58
def create_connection_pool
  pool_size = @session_params.fetch(:session_pool, 1)
  pool_timeout = @session_params.fetch(:session_pool_timeout, 5)
  ConnectionPool.new(size: pool_size, timeout: pool_timeout) do
    Bunny.new(@config[:rabbitmq_url],
              { logger: RabbitmqClient.logger }.merge(@session_params))
  end
end
notify(event, payload = {}) click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 67
def notify(event, payload = {})
  ActiveSupport::Notifications.instrument(
    "#{event}.rabbitmq_client",
    payload
  )
end
overwritten_config?() click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 43
def overwritten_config?
  @config.dig(:session_params, :threaded) ||
    @config.dig(:session_params, :automatically_recover)
end
overwritten_config_notification() click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 35
def overwritten_config_notification
  return unless overwritten_config?

  notify('overriding_configs',
         threaded: false,
         automatically_recover: false)
end
session_params() click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 48
def session_params
  overwritten_config_notification
  @config.fetch(:session_params, {})
         .merge(threaded: false,
                automatically_recover: false,
                heartbeat: @config.dig(
                  :session_params, :heartbeat_publisher
                ) || 0)
end