class Philotic::Connection
Attributes
connection[R]
connection_attempts[R]
logger[RW]
publisher[W]
subscriber[W]
Public Class Methods
new()
click to toggle source
# File lib/philotic/connection.rb, line 29
#
# Starts a new instance with its connection-attempt counter at zero.
def initialize
  @connection_attempts = 0
end
Public Instance Methods
attempt_connection()
click to toggle source
# File lib/philotic/connection.rb, line 74
#
# Makes a single attempt to open a connection with Bunny, using
# config.rabbit_url and connection_settings.
#
# The attempt counter is incremented up front and only reset to zero once
# the connection's #start call has returned, so if #start raises the count
# stays in place for the caller to inspect.
def attempt_connection
  @connection_attempts += 1

  # Only repeat attempts are logged; the first attempt is silent.
  if connection_attempts > 1
    logger.warn do
      "Connecting to RabbitMQ: #{config.sanitized_rabbit_url}. Attempt #{connection_attempts} of #{config.connection_attempts}"
    end
  end

  # Assign before starting so @connection is set even when #start raises.
  @connection = Bunny.new(config.rabbit_url, connection_settings)
  @connection.start

  @connection_attempts = 0
end
bind_queue(queue, config)
click to toggle source
# File lib/philotic/connection.rb, line 139
#
# Binds +queue+ to the exchange chosen by exchange_from_config, once for
# every entry in config[:bindings]. Each entry is handed to queue.bind as
# the binding's :arguments, and every binding is logged at info level.
#
# @param queue  object responding to #bind(exchange, options) and #name
# @param config [Hash] queue configuration; :bindings may be absent or nil,
#   in which case no bindings are added
def bind_queue(queue, config)
  queue_exchange = exchange_from_config(config)

  # A queue configured without :bindings means "no bindings", not a
  # NoMethodError on nil (mirrors how a missing :options is tolerated in
  # queue_from_config).
  bindings = config[:bindings] || []

  bindings.each do |arguments|
    queue.bind(queue_exchange, {arguments: arguments})
    logger.info { "Added binding to queue. queue: #{queue.name} binding: #{arguments}" }
  end

  logger.info { "Finished adding bindings to queue. queue: #{queue.name}" }
end
channel()
click to toggle source
# File lib/philotic/connection.rb, line 102
#
# Returns the memoized channel, creating it from the current connection on
# first use (or after #close has cleared it).
def channel
  return @channel if @channel

  @channel = connection.create_channel
end
close()
click to toggle source
# File lib/philotic/connection.rb, line 91
#
# Closes the connection if it is currently connected and forgets the
# memoized channel and exchange so they are rebuilt on next use. The
# warning is logged whether or not a connection was actually open.
def close
  logger.warn { "closing connection to RabbitMQ: #{config.sanitized_rabbit_url}" }

  if connected?
    connection.close
  end

  @channel = @exchange = nil
end
config()
click to toggle source
# File lib/philotic/connection.rb, line 41
#
# Returns this connection's Philotic::Config, building it (with this
# connection as its argument) the first time it is requested.
def config
  return @config if @config

  @config = Philotic::Config.new(self)
end
connect!()
click to toggle source
# File lib/philotic/connection.rb, line 45
#
# Connects unless already connected.
#
# @return [nil] when a connection was already established (nothing is done)
# @return [true] when a new connection was established; the exchange return
#   handler is installed in this case
# @return [false] when start_connection! returned but the connection still
#   does not report as connected
def connect!
  return if connected?

  start_connection!

  unless connected?
    logger.error { "Failed to connect to RabbitMQ: #{config.sanitized_rabbit_url}" }
    return false
  end

  logger.info { "Connected to RabbitMQ: #{config.sanitized_rabbit_url}" }
  set_exchange_return_handler!
  true
end
connected?()
click to toggle source
# File lib/philotic/connection.rb, line 98
#
# Reports whether there is a connection object and it says it is connected.
# When no connection has been created yet the falsy connection value itself
# is returned; otherwise the result of the connection's own #connected?.
def connected?
  current = connection
  return current unless current

  current.connected?
end
connection_settings()
click to toggle source
# File lib/philotic/connection.rb, line 83
#
# Builds the options hash passed to Bunny.new alongside the RabbitMQ URL,
# copying the three recovery/timeout settings from config.
#
# @return [Hash] with keys :automatically_recover,
#   :network_recovery_interval and :continuation_timeout
def connection_settings
  settings = config

  {
    automatically_recover: settings.automatically_recover,
    network_recovery_interval: settings.network_recovery_interval,
    continuation_timeout: settings.continuation_timeout,
  }
end
exchange()
click to toggle source
# File lib/philotic/connection.rb, line 106
#
# Returns the memoized default exchange. On first use it is declared on the
# channel by calling the channel method named by config.exchange_type with
# config.exchange_name and durable: true.
def exchange
  return @exchange if @exchange

  @exchange = channel.send(config.exchange_type, config.exchange_name, durable: true)
end
exchange_from_config(config)
click to toggle source
# File lib/philotic/connection.rb, line 150
#
# Picks the exchange a queue should be bound to.
#
# @param config [Hash] queue configuration; when it has a truthy :exchange
#   entry, that name is declared on the channel (durable, using the
#   connection-level config.exchange_type); otherwise the default #exchange
#   is returned
def exchange_from_config(config)
  exchange_name = config[:exchange]
  return exchange unless exchange_name

  # The parameter shadows the #config method, hence the explicit self.
  channel.send(self.config.exchange_type, exchange_name, durable: true)
end
initialize_named_queue!(queue_name, config)
click to toggle source
# File lib/philotic/connection.rb, line 116
#
# Creates (and binds) the named queue described by +config+.
#
# - If the queue does not exist it is created and bound.
# - If it exists and self.config.delete_existing_queues is truthy, it is
#   deleted, then re-created and bound.
# - If it exists and that setting is falsy, nothing is changed and a
#   warning is logged.
#
# @param queue_name name of the queue
# @param config     per-queue configuration; must respond to
#   #deep_symbolize_keys (NOTE(review): presumably ActiveSupport — confirm)
# @raise [RuntimeError] unless self.config.initialize_named_queues is truthy
def initialize_named_queue!(queue_name, config)
  # The parameter shadows the #config method, hence the explicit self.
  unless self.config.initialize_named_queues
    raise RuntimeError, 'Philotic.config.initialize_named_queues must be true to run Philotic.initialize_named_queue!'
  end

  connect!

  queue_exists = connection.queue_exists?(queue_name)
  replace_existing = queue_exists && self.config.delete_existing_queues

  if replace_existing
    channel.queue(queue_name, passive: true).delete
    logger.info { "deleted old queue. queue: #{queue_name}" }
  end

  if queue_exists && !replace_existing
    logger.warn { "Queue #{queue_name} not created; it already exists. self.config.delete_existing_queues must be true to override." }
  else
    symbolized = config.deep_symbolize_keys
    queue = queue_from_config(queue_name, symbolized)
    bind_queue(queue, symbolized)
  end
end
publisher()
click to toggle source
# File lib/philotic/connection.rb, line 33
#
# Returns the publisher for this connection, building a Philotic::Publisher
# (with this connection as its argument) unless one has already been set or
# built.
def publisher
  return @publisher if @publisher

  @publisher = Philotic::Publisher.new(self)
end
queue_from_config(queue_name, config)
click to toggle source
# File lib/philotic/connection.rb, line 154
#
# Declares +queue_name+ on the channel and returns the queue.
#
# @param queue_name name of the queue to declare
# @param config [Hash] queue configuration; config[:options], when present,
#   is merged over a copy of Philotic::DEFAULT_NAMED_QUEUE_OPTIONS
# @return the queue object returned by channel.queue
def queue_from_config(queue_name, config)
  overrides = config[:options] || {}
  # Non-destructive merge: the shared defaults constant is never mutated.
  queue_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS.merge(overrides)

  queue = channel.queue(queue_name, queue_options)
  logger.info { "Created queue. queue:#{queue_name}" }
  queue
end
set_exchange_return_handler!()
click to toggle source
# File lib/philotic/connection.rb, line 110
#
# Registers an on_return callback on the default exchange that forwards the
# three callback arguments to config.message_return_handler.
def set_exchange_return_handler!
  exchange.on_return do |return_info, properties, body|
    # Looked up on every invocation, not captured when the callback is registered.
    handler = config.message_return_handler
    handler.call(return_info, properties, body)
  end
end
start_connection!()
click to toggle source
# File lib/philotic/connection.rb, line 60
#
# Calls attempt_connection, retrying on ::Bunny::TCPConnectionFailed while
# the attempt counter is below config.connection_attempts.
#
# Once the limit is reached the counter is reset to zero and a
# TCPConnectionFailed is raised (the constant is resolved from this class's
# scope; its definition is not visible here) with the attempt count in the
# message and the sanitized RabbitMQ URL as a second argument.
def start_connection!
  attempt_connection
rescue ::Bunny::TCPConnectionFailed
  # `retry` re-runs the method body; attempt_connection bumps the counter
  # each time, so this loop is bounded by config.connection_attempts.
  retry if connection_attempts < config.connection_attempts

  attempts = connection_attempts
  @connection_attempts = 0
  raise TCPConnectionFailed.new("Failed to connect to RabbitMQ server after #{attempts} attempts", config.sanitized_rabbit_url)
end
subscriber()
click to toggle source
# File lib/philotic/connection.rb, line 37
#
# Returns the subscriber for this connection, building a
# Philotic::Subscriber (with this connection as its argument) unless one has
# already been set or built.
def subscriber
  return @subscriber if @subscriber

  @subscriber = Philotic::Subscriber.new(self)
end