class RabbitmqClient::Publisher
Publisher
class is responsible for publishing events to RabbitMQ exchanges
Public Class Methods
new(**config)
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 8
# Builds a publisher from keyword configuration.
#
# Stores the raw config, remembers the optional :exchange_registry entry
# (nil when absent), derives and freezes the session params, creates the
# session pool, and finally emits a 'publisher_created' notification whose
# payload is the frozen session params.
#
# @param config [Hash] keyword options; keys read here and by the helpers
#   include :exchange_registry, :session_params and :rabbitmq_url
def initialize(**config)
  @config = config
  @exchange_registry = config[:exchange_registry]
  # session_params reads @config, so it must run after the assignment above.
  @session_params = session_params.freeze
  @session_pool = create_connection_pool
  notify('publisher_created', @session_params)
end
Public Instance Methods
publish(data, options)
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 17
# Hands the data over to PublisherJob together with the exchange registry
# and the session pool.
#
# Does nothing and returns nil when no exchange registry was configured.
# When the async flag is set the job is enqueued through
# PublisherJob.perform_async; otherwise a job instance is performed inline.
#
# @param data the payload forwarded to the job unchanged
# @param options the publish options forwarded to the job unchanged
# @return the result of the PublisherJob call, or nil without a registry
def publish(data, options)
  return nil unless @exchange_registry
  return PublisherJob.perform_async(@exchange_registry, @session_pool, data, options) if async

  PublisherJob.new.perform(@exchange_registry, @session_pool, data, options)
end
Private Instance Methods
async()
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 31
# Reads the :async_publisher entry nested under :session_params in the
# config.
#
# @return the configured value when it is truthy, otherwise false
def async
  configured_flag = @config.dig(:session_params, :async_publisher)
  configured_flag || false
end
create_connection_pool()
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 58
# Creates the ConnectionPool whose block builds Bunny sessions.
#
# Pool size comes from :session_pool (default 1) and the pool timeout from
# :session_pool_timeout (default 5), both read from the session params.
# Each Bunny session is built from the configured :rabbitmq_url and the
# session params, with RabbitmqClient.logger as the default :logger
# (a :logger entry in the session params takes precedence via merge).
#
# @return [ConnectionPool]
def create_connection_pool
  pool_options = {
    size: @session_params.fetch(:session_pool, 1),
    timeout: @session_params.fetch(:session_pool_timeout, 5)
  }

  ConnectionPool.new(**pool_options) do
    bunny_options = { logger: RabbitmqClient.logger }.merge(@session_params)
    Bunny.new(@config[:rabbitmq_url], bunny_options)
  end
end
notify(event, payload = {})
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 67
# Instruments an event through ActiveSupport::Notifications.
#
# The event name is suffixed with ".rabbitmq_client" before being passed on.
#
# @param event [#to_s] base event name
# @param payload [Hash] payload handed to the instrumenter (default: {})
# @return whatever ActiveSupport::Notifications.instrument returns
def notify(event, payload = {})
  namespaced_event = "#{event}.rabbitmq_client"
  ActiveSupport::Notifications.instrument(namespaced_event, payload)
end
overwritten_config?()
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 43
# Tells whether the caller supplied :threaded or :automatically_recover
# under :session_params.
#
# @return the :threaded value when it is truthy, otherwise the
#   :automatically_recover value (which may be nil or false)
def overwritten_config?
  threaded_setting = @config.dig(:session_params, :threaded)
  return threaded_setting if threaded_setting

  @config.dig(:session_params, :automatically_recover)
end
overwritten_config_notification()
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 35
# Emits an 'overriding_configs' notification when overwritten_config?
# reports a truthy value. The payload lists the values that are enforced:
# threaded: false and automatically_recover: false.
#
# @return the result of notify, or nil when nothing was emitted
def overwritten_config_notification
  enforced_settings = { threaded: false, automatically_recover: false }
  notify('overriding_configs', enforced_settings) if overwritten_config?
end
session_params()
click to toggle source
# File lib/rabbitmq_client/publisher.rb, line 48
# Builds the session params used for the publisher's sessions.
#
# First triggers overwritten_config_notification, then returns a new hash
# made of the configured :session_params with three entries forced on top:
# threaded: false, automatically_recover: false, and heartbeat taken from
# :heartbeat_publisher (0 when that entry is absent or falsy).
#
# @return [Hash] a new hash; the configured :session_params is not mutated
def session_params
  overwritten_config_notification

  # An explicit `session_params: nil` is treated like a missing key, so this
  # method accepts the same input the dig-based readers already tolerate
  # instead of raising NoMethodError on nil.merge.
  configured = @config[:session_params] || {}

  configured.merge(
    threaded: false,
    automatically_recover: false,
    heartbeat: configured[:heartbeat_publisher] || 0
  )
end