class Lapine::Configuration

Public Class Methods

new() click to toggle source
# File lib/lapine/configuration.rb, line 3
def initialize
  @active_connections = {}
end

Public Instance Methods

active_connection(name) click to toggle source
# File lib/lapine/configuration.rb, line 43
def active_connection(name)
  conn = @active_connections[name]
  return conn if (conn && conn.connected?)

  @active_connections[name] = begin
    @conn = Bunny.new(connection_props_for(name)).tap do |conn|
      conn.start
    end
  end
end
channels_by_exchange_id() click to toggle source
# File lib/lapine/configuration.rb, line 15
def channels_by_exchange_id
  @channels_by_exchange_id ||= {}
end
cleanup_exchange(id) click to toggle source
# File lib/lapine/configuration.rb, line 23
def cleanup_exchange(id)
  return unless channels_by_exchange_id[id]
  channel = channels_by_exchange_id[id]
  channel.connection.logger.info "Closing channel for exchange #{id}, thread: #{Thread.current.object_id}"
  channel.close
  channels_by_exchange_id[id] = nil
end
close_connections!() click to toggle source
# File lib/lapine/configuration.rb, line 54
def close_connections!
  @active_connections.values.map(&:close)
  @active_connections = {}
  Thread.current.thread_variable_set(:lapine_exchanges, nil)
end
connection_properties() click to toggle source
# File lib/lapine/configuration.rb, line 11
def connection_properties
  @connection_properties ||= {}
end
connection_props_for(name) click to toggle source
# File lib/lapine/configuration.rb, line 60
def connection_props_for(name)
  return unless connection_properties[name]
  connection_properties[name].dup.tap do |props|
    if defined?(Rails)
      props.merge!(logger: Rails.logger)
    end
  end
end
connections() click to toggle source
# File lib/lapine/configuration.rb, line 7
def connections
  @connections ||= {}
end
exchange_properties() click to toggle source
# File lib/lapine/configuration.rb, line 39
def exchange_properties
  @exchange_properties ||= {}
end
exchanges() click to toggle source

Exchanges need to be saved in a thread-local variable, rather than a fiber-local variable, because in the context of some applications (such as Sidekiq, which uses Celluloid) individual bits of work are done in fibers that are immediately reaped.

# File lib/lapine/configuration.rb, line 34
def exchanges
  Thread.current.thread_variable_get(:lapine_exchanges) ||
    Thread.current.thread_variable_set(:lapine_exchanges, {})
end
register_channel(object_id, channel) click to toggle source
# File lib/lapine/configuration.rb, line 19
def register_channel(object_id, channel)
  channels_by_exchange_id[object_id] = channel
end