class Vx::Common::AMQP::Session

Constants

CHANNEL_KEY

Attributes

conn[R]

Public Class Methods

resume() click to toggle source
# File lib/vx/common/amqp/session.rb, line 26
# Clears the shutdown flag by setting @shutdown to false.
# Listed under the class-level methods of the session, so the flag is
# presumably shared by every caller of the class -- confirm against the
# enclosing `class << self` (not visible here).
def resume
  @shutdown = false
end
shutdown() click to toggle source
# File lib/vx/common/amqp/session.rb, line 18
# Raises the shutdown flag by setting @shutdown to true.
# The flag can be read back with `shutdown?` and cleared with `resume`.
def shutdown
  @shutdown = true
end
shutdown?() click to toggle source
# File lib/vx/common/amqp/session.rb, line 22
# Reports whether the shutdown flag is currently raised.
#
# @return [Boolean] true only when @shutdown is exactly `true`; an unset
#   (nil) or cleared (false) flag yields false.
def shutdown?
  @shutdown.equal?(true)
end

Public Instance Methods

channel() click to toggle source
# File lib/vx/common/amqp/session.rb, line 103
# Returns the channel to use for the calling thread.
#
# A channel stored in the thread-local slot CHANNEL_KEY wins; otherwise the
# connection's default channel is returned.
#
# Raises (via assert_connection_is_open) when the session is not open.
def channel
  assert_connection_is_open

  thread_channel = Thread.current[CHANNEL_KEY]
  return thread_channel if thread_channel

  conn.default_channel
end
close() click to toggle source
# File lib/vx/common/amqp/session.rb, line 31
# Closes the underlying connection (if it is open) and forgets it.
#
# The work happens under @@session_lock. A Bunny::ChannelError raised by
# `conn.close` is reported with `warn` and otherwise ignored; the method
# then polls until the connection reports the :closed status and finally
# resets @conn to nil.
#
# @return [nil]
def close
  # Cheap early exit that avoids taking the lock when there is nothing to do.
  return unless open?

  @@session_lock.synchronize do
    # Re-check under the lock: another thread may have completed #close
    # (and set @conn to nil) while this one was waiting for the mutex, in
    # which case `conn.close` below would be called on nil.
    next unless open?

    begin
      conn.close
    rescue Bunny::ChannelError => e
      warn e
    end

    # NOTE(review): this wait has no timeout; it relies on the connection
    # eventually reporting :closed.
    sleep 0.01 while conn.status != :closed

    @conn = nil
  end
end
config() click to toggle source
# File lib/vx/common/amqp/session.rb, line 136
# Returns whatever Common::AMQP.config returns; the other methods of this
# class read their defaults (exchange name/type/options, queue options)
# from it.
def config
  Common::AMQP.config
end
conn_info() click to toggle source
# File lib/vx/common/amqp/session.rb, line 123
# Builds a human-readable "amqp://user@host:port/vhost" string describing
# the current connection.
#
# @return [String, nil] the description, or nil when there is no connection
#   object yet.
def conn_info
  return unless conn

  authority = "#{conn.user}@#{conn.host}:#{conn.port}"
  "amqp://#{authority}/#{conn.vhost}"
end
declare_exchange(name, options = nil) click to toggle source
# File lib/vx/common/amqp/session.rb, line 84
# Declares an exchange on a channel.
#
# @param name [String, nil] exchange name; falls back to
#   config.default_exchange_name when nil.
# @param options [Hash, nil] exchange options. The optional :channel entry
#   selects the channel to declare on (default: #channel); :type and the
#   remaining entries are resolved by get_exchange_type_and_options.
# @return whatever the channel's #exchange call returns.
#
# Raises (via assert_connection_is_open) when the session is not open.
def declare_exchange(name, options = nil)
  assert_connection_is_open

  # Work on a private copy: removing :channel below must not alter (or fail
  # on a frozen) hash owned by the caller.
  options    = options ? options.dup : {}
  name     ||= config.default_exchange_name
  ch         = options.delete(:channel) || channel
  type, opts = get_exchange_type_and_options(options)
  ch.exchange(name, opts.merge(type: type))
end
declare_queue(name, options = nil) click to toggle source
# File lib/vx/common/amqp/session.rb, line 94
# Declares a queue on a channel.
#
# @param name [String, nil] queue name; a nil name is replaced by
#   get_queue_name_and_options.
# @param options [Hash, nil] queue options. The optional :channel entry
#   selects the channel to declare on (default: #channel); the rest is
#   merged over the configured defaults by get_queue_name_and_options.
# @return whatever the channel's #queue call returns.
#
# Raises (via assert_connection_is_open) when the session is not open.
def declare_queue(name, options = nil)
  assert_connection_is_open

  # Work on a private copy: removing :channel below must not alter (or fail
  # on a frozen) hash owned by the caller.
  options = options ? options.dup : {}
  ch = options.delete(:channel) || channel
  name, opts = get_queue_name_and_options(name, options)
  ch.queue(name, opts)
end
open(options = {}) click to toggle source
# File lib/vx/common/amqp/session.rb, line 47
# Opens the connection unless the session is already open.
#
# Under @@session_lock this clears the class-level shutdown flag, lazily
# builds the Bunny connection object, emits the "start_connecting" and
# "connect" instrumentation events, and starts the connection, polling
# until it is no longer in the connecting state.
#
# @param options [Hash] only :consumer and :consumer_id are read; both are
#   forwarded in the instrumentation payload.
# @return [self]
def open(options = {})
  return self if open?

  @@session_lock.synchronize do
    self.class.resume

    # The nil first argument means the URL comes from ENV['RABBITMQ_URL'].
    @conn ||= Bunny.new(nil, heartbeat: :server, automatically_recover: false)

    payload = {
      info:        conn_info,
      consumer:    options[:consumer],
      consumer_id: options[:consumer_id]
    }

    instrument("start_connecting.consumer.amqp", payload)
    instrument("connect.consumer.amqp", payload) do
      # A connection object that is already open needs no further work.
      next if conn.open?

      conn.start
      sleep 0.01 while conn.connecting?
    end
  end

  self
end
open?() click to toggle source
# File lib/vx/common/amqp/session.rb, line 80
# Tells whether a connection exists, reports itself open, and has the
# :open status.
#
# @return [Boolean, nil] a falsy value (nil when there is no connection
#   object) unless all three conditions hold.
def open?
  session = conn
  return session unless session

  session.open? && session.status == :open
end
server_name() click to toggle source
# File lib/vx/common/amqp/session.rb, line 129
# Describes the broker as "product/version", using the "product" and
# "version" entries of the connection's server properties.
#
# @return [String, nil] nil when there is no connection object; missing
#   properties render as empty strings (e.g. "/").
def server_name
  return unless conn

  properties = conn.server_properties || {}
  product, version = properties.values_at("product", "version")
  "#{product}/#{version}"
end
with_channel() { || ... } click to toggle source
# File lib/vx/common/amqp/session.rb, line 109
# Runs the given block with a freshly created channel installed as the
# calling thread's channel (thread-local slot CHANNEL_KEY).
#
# Afterwards -- whether the block returned or raised -- the previously
# installed channel is put back and the new channel is closed if it is
# still open.
#
# @yield with no arguments.
# @return the value of the block.
#
# Raises (via assert_connection_is_open) when the session is not open.
def with_channel
  assert_connection_is_open

  # Remember the outer channel *before* attempting to create the new one:
  # if create_channel raises, the ensure clause must restore the outer
  # channel rather than overwrite it with nil.
  previous = Thread.current[CHANNEL_KEY]
  scoped   = nil
  begin
    scoped = conn.create_channel
    Thread.current[CHANNEL_KEY] = scoped
    yield
  ensure
    Thread.current[CHANNEL_KEY] = previous
    scoped.close if scoped && scoped.open?
  end
end

Private Instance Methods

assert_connection_is_open() click to toggle source
# File lib/vx/common/amqp/session.rb, line 153
# Guard used by the public API: passes through the truthy result of #open?
# or raises ConnectionDoesNotExist telling the caller to open the session.
def assert_connection_is_open
  opened = open?
  return opened if opened

  raise ConnectionDoesNotExist, "you need to run #{to_s}#open"
end
get_exchange_type_and_options(options) click to toggle source
# File lib/vx/common/amqp/session.rb, line 142
# Resolves the exchange type and the remaining exchange options.
#
# The given options are merged over config.default_exchange_options into a
# new hash; its :type entry (if any) is removed and returned separately,
# falling back to config.default_exchange_type.
#
# @param options [Hash, nil]
# @return [Array] a two-element array: [type, options_without_type].
def get_exchange_type_and_options(options)
  merged = config.default_exchange_options.merge(options || {})
  explicit_type = merged.delete(:type)
  [explicit_type || config.default_exchange_type, merged]
end
get_queue_name_and_options(name, options) click to toggle source
# File lib/vx/common/amqp/session.rb, line 148
# Resolves the queue name and the effective queue options.
#
# A nil name becomes AMQ::Protocol::EMPTY_STRING; the given options are
# merged over config.default_queue_options into a new hash.
#
# @param name [String, nil]
# @param options [Hash, nil]
# @return [Array] a two-element array: [name, options].
def get_queue_name_and_options(name, options)
  queue_name = name || AMQ::Protocol::EMPTY_STRING
  overrides  = options || {}
  [queue_name, config.default_queue_options.merge(overrides)]
end