class Evrone::Common::AMQP::Session

Constants

CHANNEL_KEY

Attributes

conn[R]

Public Class Methods

resume() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 26
def resume
  @shutdown = false
end
shutdown() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 18
def shutdown
  @shutdown = true
end
shutdown?() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 22
def shutdown?
  @shutdown == true
end

Public Instance Methods

channel() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 95
def channel
  assert_connection_is_open

  Thread.current[CHANNEL_KEY] || conn.default_channel
end
close() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 31
def close
  if open?
    @@session_lock.synchronize do
      info "closing connection"
      begin
        conn.close
      rescue Bunny::ChannelError => e
        warn e
      end
      info "wait..."
      while conn.status != :closed
        sleep 0.01
      end
      @conn = nil
      info "connection closed"
    end
  end
end
config() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 128
def config
  Common::AMQP.config
end
conn_info() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 115
def conn_info
  if conn
    "#{conn.user}:#{conn.host}:#{conn.port}/#{conn.vhost}"
  end
end
declare_exchange(name, options = nil) click to toggle source
# File lib/evrone/common/amqp/session.rb, line 76
def declare_exchange(name, options = nil)
  assert_connection_is_open

  options  ||= {}
  name     ||= config.default_exchange_name
  ch         = options.delete(:channel) || channel
  type, opts = get_exchange_type_and_options options
  ch.exchange name, opts.merge(type: type)
end
declare_queue(name, options = nil) click to toggle source
# File lib/evrone/common/amqp/session.rb, line 86
def declare_queue(name, options = nil)
  assert_connection_is_open

  options ||= {}
  ch = options.delete(:channel) || channel
  name, opts = get_queue_name_and_options(name, options)
  ch.queue name, opts
end
open() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 50
def open
  return self if open?

  @@session_lock.synchronize do
    self.class.resume

    @conn ||= Bunny.new config.url, heartbeat: :server

    unless conn.open?
      info "connecting to #{conn_info}"
      conn.start
      info "wait connection to #{conn_info}"
      while conn.connecting?
        sleep 0.01
      end
      info "connected successfuly (#{server_name})"
    end
  end

  self
end
open?() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 72
def open?
  conn && conn.open? && conn.status == :open
end
server_name() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 121
def server_name
  if conn
    p = conn.server_properties || {}
    "#{p["product"]}/#{p["version"]}"
  end
end
with_channel() { || ... } click to toggle source
# File lib/evrone/common/amqp/session.rb, line 101
def with_channel
  assert_connection_is_open

  old,new = nil
  begin
    old,new = Thread.current[CHANNEL_KEY], conn.create_channel
    Thread.current[CHANNEL_KEY] = new
    yield
  ensure
    Thread.current[CHANNEL_KEY] = old
    new.close if new && new.open?
  end
end

Private Instance Methods

assert_connection_is_open() click to toggle source
# File lib/evrone/common/amqp/session.rb, line 145
def assert_connection_is_open
  open? || raise(ConnectionDoesNotExist.new "you need to run #{to_s}#open")
end
get_exchange_type_and_options(options) click to toggle source
# File lib/evrone/common/amqp/session.rb, line 134
def get_exchange_type_and_options(options)
  options = config.default_exchange_options.merge(options || {})
  type = options.delete(:type) || config.default_exchange_type
  [type, options]
end
get_queue_name_and_options(name, options) click to toggle source
# File lib/evrone/common/amqp/session.rb, line 140
def get_queue_name_and_options(name, options)
  name  ||= AMQ::Protocol::EMPTY_STRING
  [name, config.default_queue_options.merge(options || {})]
end