class Telemetry::AMQP::Base
Public Class Methods
new(auto_start: false, **opts)
click to toggle source
# File lib/telemetry/amqp/base.rb, line 13 def initialize(auto_start: false, **opts) @opts = opts connect! if auto_start && !nodes.nil? end
Public Instance Methods
channel()
click to toggle source
# File lib/telemetry/amqp/base.rb, line 58 def channel if !@channel_thread.nil? && !@channel_thread.value.nil? && @channel_thread.value.open? return @channel_thread.value end @channel_thread = Concurrent::ThreadLocalVar.new(nil) if @channel_thread.nil? @channel_thread.value = create_channel end
connect!()
click to toggle source
# File lib/telemetry/amqp/base.rb, line 36 def connect! @session = Concurrent::AtomicReference.new( ::Bunny.new( hosts: nodes || ['localhost:5672'], username: username, password: password, vhost: vhost, port: port, connection_name: connection_name, log_level: ::Logger::WARN, logger: Telemetry::Logger, automatically_recover: opts[:automatically_recover] || true, verify_peer: opts[:verify_peer] || true, tls: use_ssl? ) ) @session.value.start @channel_thread = Concurrent::ThreadLocalVar.new(nil) set_amqp_block_helpers @session.value end
create_channel(consumer_pool_size: 1, abort_on_exception: false, timeout: 30)
click to toggle source
# File lib/telemetry/amqp/base.rb, line 67 def create_channel(consumer_pool_size: 1, abort_on_exception: false, timeout: 30) session.create_channel(nil, consumer_pool_size, abort_on_exception, timeout) end
session()
click to toggle source
# File lib/telemetry/amqp/base.rb, line 18 def session connect! if @session.nil? || !@session.respond_to?(:value) @session.value end
set_amqp_block_helpers()
click to toggle source
# File lib/telemetry/amqp/base.rb, line 24 def set_amqp_block_helpers session.on_blocked { puts 'Telemetry::AMQP is being blocked by RabbitMQ!' } if session.respond_to? :on_blocked if session.respond_to? :on_unblocked session.on_unblocked { puts 'Telemetry::AMQP is no longer being blocked by RabbitMQ' } end if session.respond_to? :after_recovery_completed # rubocop:disable Style/GuardClause session.after_recovery_completed { puts 'Telemetry::AMQP has completed recovery' } end end