class Vx::Lib::Consumer::Session

Attributes

conn[R]

Public Instance Methods

allocate_pub_channel() { || ... } click to toggle source
# File lib/vx/lib/consumer/session.rb, line 109
# Runs the given block with a publishing channel bound to the current thread.
#
# The channel is stored in the thread-local slot
# :vx_consumer_session_pub_channel, which is the slot with_pub_channel reads,
# so publishing done inside the block reuses this one channel.
#
# Calls nest: when the current thread already has a channel in that slot the
# block is simply yielded to, and the outermost call stays responsible for
# closing the channel.
#
# Yields nothing; returns the value of the block.
# Raises ConnectionDoesNotExistError (via assert_connection_is_open) when the
# session is not open.
def allocate_pub_channel
  assert_connection_is_open

  key = :vx_consumer_session_pub_channel

  # Nested call: an outer invocation on this thread owns the channel.
  return yield if Thread.current[key]

  ch = conn.create_channel
  begin
    # Inside the protected region so that a failure while wiring the handlers
    # still closes the channel that was just created.
    assign_error_handlers_to_channel(ch)
    Thread.current[key] = ch
    yield
  ensure
    begin
      ch.close if ch.open?
    ensure
      # Always clear the slot, even when closing raised; otherwise later calls
      # on this thread would keep reusing a dead channel.
      Thread.current[key] = nil
    end
  end
end
assign_error_handlers_to_channel(ch) click to toggle source
# File lib/vx/lib/consumer/session.rb, line 146
# Routes both kinds of channel failure callbacks to the library-wide
# exception handler.
#
# ch - the channel to configure; it must respond to on_uncaught_exception
#      and on_error, each taking a block.
#
# Each callback forwards its first argument as the error and its second
# argument under the :consumer key.
# Returns whatever ch.on_error returns.
def assign_error_handlers_to_channel(ch)
  report = proc do |error, source|
    ::Vx::Lib::Consumer.exception_handler(error, consumer: source)
  end

  ch.on_uncaught_exception(&report)
  ch.on_error(&report)
end
close() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 39
# Closes the connection, if one is open, and forgets it.
#
# The work happens under @@session_lock. After asking the connection to
# close, the method polls every 10 ms until the connection reports the
# :closed status. Bunny::ChannelError and Bunny::ClientTimeout raised during
# that sequence are passed to Consumer.exception_handler instead of
# propagating; in every case @conn is reset to nil afterwards.
#
# Returns nil.
def close
  # Cheap unlocked check so that closing an already closed session never
  # contends for the lock.
  return unless open?

  @@session_lock.synchronize do
    # Re-check under the lock (as #open does): another thread may have closed
    # the session and cleared @conn while this one was waiting, in which case
    # conn is nil and must not be touched.
    next unless open?

    begin
      conn.close
      sleep 0.01 until conn.status == :closed
    rescue Bunny::ChannelError, Bunny::ClientTimeout => e
      Consumer.exception_handler(e, {})
    end
    @conn = nil
  end
end
conn_info() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 90
# Describes the current connection for log and status messages.
#
# Returns "not connected" when there is no connection object, otherwise an
# amqp:// style string built from the connection's user, host, port and vhost.
def conn_info
  connection = conn
  return "not connected" unless connection

  format(
    "amqp://%s@%s:%s/%s",
    connection.user,
    connection.host,
    connection.port,
    connection.vhost
  )
end
declare_exchange(ch, name, options = nil) click to toggle source
# File lib/vx/lib/consumer/session.rb, line 130
# Declares (or looks up) an exchange on the given channel.
#
# ch      - channel object responding to exchange(name, options).
# name    - exchange name; the empty string gets special treatment, see below.
# options - optional Hash of exchange options; nil means no options.
#
# For the empty name any supplied options are discarded and an empty Hash is
# passed instead.
#
# Returns whatever ch.exchange returns.
# Raises ConnectionDoesNotExistError (via assert_connection_is_open) when the
# session is not open.
def declare_exchange(ch, name, options = nil)
  assert_connection_is_open

  effective = name == '' ? {} : (options || {})
  ch.exchange(name, effective)
end
declare_queue(ch, name, options = nil) click to toggle source
# File lib/vx/lib/consumer/session.rb, line 139
# Declares (or looks up) a queue on the given channel.
#
# ch      - channel object responding to queue(name, options).
# name    - queue name.
# options - optional Hash of queue options; nil means no options.
#
# Returns whatever ch.queue returns.
# Raises ConnectionDoesNotExistError (via assert_connection_is_open) when the
# session is not open.
def declare_queue(ch, name, options = nil)
  assert_connection_is_open

  ch.queue(name, options || {})
end
live?() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 24
# Reports the class-level liveness flag.
#
# Returns the current value of @@live, which #resume sets to true and
# #shutdown sets to false.
def live?
  @@live
end
open(options = {}) click to toggle source
# File lib/vx/lib/consumer/session.rb, line 55
# Opens the connection unless it is already open.
#
# options - accepted for interface compatibility; this method does not read it.
#
# Under @@session_lock the session is marked live (#resume), a Bunny
# connection is built on first use with the configured heartbeat, automatic
# recovery turned off and the configured logger (when one is set), and the
# connection is started. The method then polls every 10 ms until the
# connection stops reporting connecting?.
#
# Returns self.
def open(options = {})
  return self if open?

  @@session_lock.synchronize do
    # Another thread may have finished connecting while this one waited.
    next if open?

    resume

    settings = Consumer.configuration
    bunny_options = {
      heartbeat: settings.heartbeat,
      automatically_recover: false,
    }

    log = settings.logger
    bunny_options[:logger] = log if log

    # nil connection string: taken from ENV['RABBITMQ_URL']
    @conn ||= Bunny.new(nil, bunny_options)

    conn.start
    sleep 0.01 while conn.connecting?
  end

  self
end
open?() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 86
# Tells whether a usable connection exists.
#
# Returns a truthy value only when there is a connection object, it answers
# open? truthily and its status is :open; otherwise returns the first falsy
# value encountered (nil when there is no connection).
def open?
  # Read the connection once: this predicate is called outside the session
  # lock while #close may reset @conn to nil, so re-reading it between the
  # checks could end up calling a method on nil.
  current = conn
  current && current.open? && current.status == :open
end
resume() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 28
# Marks the session as live again by setting the class-level @@live flag to
# true (the value #live? reports). Called by #open before connecting.
#
# Returns true.
def resume
  @@live = true
end
shutdown() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 17
# Requests shutdown: clears the class-level liveness flag and notifies
# everything blocked in #wait_shutdown.
#
# Both steps run while holding @@shutdown_lock, the same lock #wait_shutdown
# waits with.
# NOTE(review): @@shutdown looks like a ConditionVariable (it is used with
# wait(lock, timeout) and broadcast) - confirm where it is initialised.
def shutdown
  @@shutdown_lock.synchronize do
    # Flip the flag before broadcasting so woken waiters observe it.
    @@live = false
    @@shutdown.broadcast
  end
end
wait_shutdown(timeout = nil) click to toggle source
# File lib/vx/lib/consumer/session.rb, line 32
# Blocks until shutdown is requested or the timeout elapses.
#
# timeout - maximum number of seconds to wait; nil waits without a limit.
#
# Returns true when the session is no longer live, false when the wait ended
# (timeout or wake-up) while the session is still live.
def wait_shutdown(timeout = nil)
  @@shutdown_lock.synchronize do
    # Wait only while the session is still live. #shutdown flips the flag and
    # broadcasts under this same lock, so a shutdown that happened before this
    # call would otherwise be missed and the caller would block for the whole
    # timeout - or forever when timeout is nil.
    @@shutdown.wait(@@shutdown_lock, timeout) if live?
    !live?
  end
end
with_pub_channel() { |ch| ... } click to toggle source
# File lib/vx/lib/consumer/session.rb, line 98
# Yields a channel to publish on.
#
# When the current thread has a channel stored under
# :vx_consumer_session_pub_channel (as set up by allocate_pub_channel), that
# channel is yielded. Otherwise the block runs inside conn.with_channel and
# receives the channel that call provides.
# NOTE(review): conn.with_channel presumably opens a temporary channel and
# closes it after the block - confirm against the Bunny version in use.
#
# Returns the value of the block in the first case and whatever
# conn.with_channel returns in the second.
def with_pub_channel
  bound = Thread.current[:vx_consumer_session_pub_channel]
  return yield(bound) if bound

  conn.with_channel { |temporary| yield temporary }
end

Private Instance Methods

assert_connection_is_open() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 153
# Guard used by the public methods that need a live connection.
#
# Returns the (truthy) result of open? when the session is open.
# Raises ConnectionDoesNotExistError otherwise.
def assert_connection_is_open
  state = open?
  raise ConnectionDoesNotExistError unless state

  state
end
config() click to toggle source
# File lib/vx/lib/consumer/session.rb, line 157
# Shortcut to the library configuration object.
#
# Returns Consumer.configuration.
def config
  Consumer.configuration
end