class Vx::Common::AMQP::Session
Constants
- CHANNEL_KEY
Attributes
conn[R]
Public Class Methods
resume()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 26
# Clears the shutdown flag by setting @shutdown to false.
#
# NOTE(review): this page lists the method under "Public Class Methods", so
# @shutdown is presumably class-level state -- confirm against the full file.
#
# Returns false (the value just assigned).
def resume
  @shutdown = false
end
shutdown()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 18
# Raises the shutdown flag by setting @shutdown to true.
#
# NOTE(review): this page lists the method under "Public Class Methods", so
# @shutdown is presumably class-level state -- confirm against the full file.
#
# Returns true (the value just assigned).
def shutdown
  @shutdown = true
end
shutdown?()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 22
# Reports whether the shutdown flag is currently raised.
#
# The comparison against true is deliberate: an unset (nil) or false flag
# both read as false, so the result is always a real boolean.
#
# Returns true only when @shutdown is true, false otherwise.
def shutdown?
  @shutdown == true
end
Public Instance Methods
channel()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 103
# Returns the channel the calling thread should publish/consume on.
#
# The channel stored in Thread.current[CHANNEL_KEY] wins when present;
# otherwise the connection's default channel is returned.
#
# Raises whatever assert_connection_is_open raises when the session is not
# open.
def channel
  assert_connection_is_open

  thread_channel = Thread.current[CHANNEL_KEY]
  return thread_channel if thread_channel

  conn.default_channel
end
close()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 31
# Closes the connection if it is open, waits until it reports :closed, and
# then drops the reference held in @conn.
#
# A Bunny::ChannelError raised by conn.close is reported through warn and
# otherwise ignored, so closing stays best-effort.
#
# Returns nil in every case.
def close
  return unless open?

  @@session_lock.synchronize do
    # Re-check under the lock: another thread may have closed the connection
    # and cleared @conn between the unlocked check above and acquiring the
    # lock, in which case conn would be nil here.
    next unless open?

    begin
      conn.close
    rescue Bunny::ChannelError => e
      warn e
    end

    # Poll until the connection has actually reached the :closed state.
    sleep 0.01 while conn.status != :closed

    @conn = nil
  end
end
config()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 136
# Shortcut to the shared AMQP configuration object.
#
# Returns whatever Common::AMQP.config returns.
def config
  Common::AMQP.config
end
conn_info()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 123
# Builds a human-readable description of the current connection.
#
# Returns a String of the form "amqp://user@host:port/vhost" built from the
# connection's user, host, port and vhost, or nil when there is no
# connection object.
def conn_info
  connection = conn
  return unless connection

  "amqp://#{connection.user}@#{connection.host}:#{connection.port}/#{connection.vhost}"
end
declare_exchange(name, options = nil)
click to toggle source
# File lib/vx/common/amqp/session.rb, line 84
# Declares an exchange on a channel.
#
# name    - exchange name; when nil, config.default_exchange_name is used.
# options - optional Hash. :channel selects the channel to declare on
#           (defaults to #channel); the remaining keys are passed through
#           get_exchange_type_and_options, which splits out the exchange type.
#
# The caller's options Hash is never modified: :channel is removed from a
# private copy, so the same Hash (even a frozen one) can be reused for
# several declarations.
#
# Returns the result of ch.exchange.
# Raises whatever assert_connection_is_open raises when the session is not
# open.
def declare_exchange(name, options = nil)
  assert_connection_is_open

  # Copy before deleting :channel so the caller's hash stays intact.
  options = options ? options.dup : {}
  name ||= config.default_exchange_name
  ch = options.delete(:channel) || channel
  type, opts = get_exchange_type_and_options(options)
  ch.exchange(name, opts.merge(type: type))
end
declare_queue(name, options = nil)
click to toggle source
# File lib/vx/common/amqp/session.rb, line 94
# Declares a queue on a channel.
#
# name    - queue name; nil is resolved by get_queue_name_and_options.
# options - optional Hash. :channel selects the channel to declare on
#           (defaults to #channel); the remaining keys are passed through
#           get_queue_name_and_options.
#
# The caller's options Hash is never modified: :channel is removed from a
# private copy, so the same Hash (even a frozen one) can be reused for
# several declarations.
#
# Returns the result of ch.queue.
# Raises whatever assert_connection_is_open raises when the session is not
# open.
def declare_queue(name, options = nil)
  assert_connection_is_open

  # Copy before deleting :channel so the caller's hash stays intact.
  options = options ? options.dup : {}
  ch = options.delete(:channel) || channel
  name, opts = get_queue_name_and_options(name, options)
  ch.queue(name, opts)
end
open(options = {})
click to toggle source
# File lib/vx/common/amqp/session.rb, line 47
# Opens the AMQP connection unless it is already open.
#
# options - Hash; :consumer and :consumer_id are copied into the
#           instrumentation payload together with conn_info.
#
# Under @@session_lock this clears the shutdown flag via self.class.resume,
# creates the Bunny session once (reusing an existing @conn), emits the
# "start_connecting.consumer.amqp" event and then starts the connection
# inside the "connect.consumer.amqp" event, polling until Bunny stops
# reporting connecting?.
#
# NOTE(review): instrument is defined outside this snippet; it is assumed to
# run the given block -- confirm.
#
# Returns self.
def open(options = {})
  unless open?
    @@session_lock.synchronize do
      self.class.resume

      @conn ||= Bunny.new(
        nil, # from ENV['RABBITMQ_URL']
        heartbeat: :server,
        automatically_recover: false
      )

      # Built after @conn is set because conn_info reads the connection.
      payload = {
        info: conn_info,
        consumer: options[:consumer],
        consumer_id: options[:consumer_id]
      }

      instrument("start_connecting.consumer.amqp", payload)

      instrument("connect.consumer.amqp", payload) do
        next if conn.open?

        conn.start
        sleep 0.01 while conn.connecting?
      end
    end
  end

  self
end
open?()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 80
# Reports whether a usable connection exists.
#
# Three conditions must all hold: a connection object is present, it
# answers open? truthily, and its status is exactly :open.
#
# Returns the first falsy value encountered (nil when there is no
# connection, the falsy result of conn.open?), otherwise the result of the
# status comparison.
def open?
  connection = conn
  return connection unless connection

  alive = connection.open?
  return alive unless alive

  connection.status == :open
end
server_name()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 129
# Describes the broker the session is connected to.
#
# Reads "product" and "version" from conn.server_properties; a nil
# properties value is treated as an empty Hash, so missing entries render as
# empty strings.
#
# Returns a "product/version" String, or nil when there is no connection
# object.
def server_name
  return unless conn

  properties = conn.server_properties || {}
  product = properties["product"]
  version = properties["version"]
  "#{product}/#{version}"
end
with_channel() { || ... }
click to toggle source
# File lib/vx/common/amqp/session.rb, line 109
# Runs the given block with a freshly created channel installed as the
# current thread's channel (Thread.current[CHANNEL_KEY]).
#
# When the block finishes or raises, the thread's previous channel is
# restored and the temporary channel is closed if it is still open.
#
# The previous channel is captured before the channel is created, so a
# failure inside conn.create_channel leaves the thread's existing channel
# untouched instead of resetting it to nil.
#
# Returns the block's return value.
# Raises whatever assert_connection_is_open raises when the session is not
# open, and propagates any error from conn.create_channel or the block.
def with_channel
  assert_connection_is_open

  previous = Thread.current[CHANNEL_KEY]
  fresh = nil
  begin
    fresh = conn.create_channel
    Thread.current[CHANNEL_KEY] = fresh
    yield
  ensure
    Thread.current[CHANNEL_KEY] = previous
    fresh.close if fresh && fresh.open?
  end
end
Private Instance Methods
assert_connection_is_open()
click to toggle source
# File lib/vx/common/amqp/session.rb, line 153
# Guard used by the public API: insists that the session is open.
#
# Returns the truthy result of open? when the session is open.
# Raises ConnectionDoesNotExist, with a hint to call #open, otherwise.
def assert_connection_is_open
  opened = open?
  return opened if opened

  raise ConnectionDoesNotExist, "you need to run #{self}#open"
end
get_exchange_type_and_options(options)
click to toggle source
# File lib/vx/common/amqp/session.rb, line 142
# Splits exchange options into the exchange type and everything else.
#
# options - Hash of caller options, or nil. They are merged over
#           config.default_exchange_options into a new Hash, so the argument
#           itself is left untouched.
#
# The :type entry is removed from the merged Hash; when it is absent (or
# falsy) config.default_exchange_type is used instead.
#
# Returns a two-element Array: [type, remaining_options].
def get_exchange_type_and_options(options)
  merged = config.default_exchange_options.merge(options || {})
  explicit_type = merged.delete(:type)
  [explicit_type || config.default_exchange_type, merged]
end
get_queue_name_and_options(name, options)
click to toggle source
# File lib/vx/common/amqp/session.rb, line 148
# Resolves the queue name and the effective queue options.
#
# name    - queue name; nil (or false) is replaced by
#           AMQ::Protocol::EMPTY_STRING.
# options - Hash of caller options, or nil. They are merged over
#           config.default_queue_options into a new Hash.
#
# Returns a two-element Array: [name, options].
def get_queue_name_and_options(name, options)
  queue_name = name || AMQ::Protocol::EMPTY_STRING
  queue_options = config.default_queue_options.merge(options || {})
  [queue_name, queue_options]
end