acknowledge(info, &callback)
click to toggle source
# Acknowledge a message and then notify the caller.
#
# @param info [#ack] object whose `ack` method is invoked; nothing else
#   about it is used here.
# @yield [info] called after `info.ack`, only when a block was given.
# @return [Object, nil] the block's result, or nil without a block.
def acknowledge(info, &callback)
  info.ack
  return unless callback
  callback.call(info)
end
close()
click to toggle source
# Close the underlying connection.
#
# When `connected?` is true the connection is closed right away;
# otherwise the same close action is handed to `EM.next_tick`.
def close
  closer = Proc.new { @connection.close }
  if connected?
    closer.call
  else
    EM.next_tick(closer)
  end
end
connect(options={})
click to toggle source
# Start a fresh connection attempt.
#
# Runs, in this order: `reset`, `set_connection_options(options)`,
# `create_connection_timeout`, `connect_with_eligible_options`.
#
# @param options [Object] forwarded untouched to `set_connection_options`.
# @return [Object] whatever `connect_with_eligible_options` returns.
def connect(options = {})
  # Drop state left over from any previous attempt first.
  reset
  set_connection_options(options)
  # The timeout is created before the connection attempt is started.
  create_connection_timeout
  connect_with_eligible_options
end
connected?()
click to toggle source
# Report whether the underlying connection is currently connected.
#
# Returns false when no connection object has been assigned yet, rather
# than raising NoMethodError on nil; otherwise delegates to
# `@connection.connected?`.
#
# @return [Boolean, Object] false without a connection, else the
#   connection's own `connected?` value.
def connected?
  @connection ? @connection.connected? : false
end
publish(exchange_type, exchange_name, message, options={}, &callback)
click to toggle source
# Publish a message to an exchange.
#
# The exchange is obtained by calling the channel method named by
# `exchange_type` with `exchange_name` and `options`. The callback, when
# given, receives an empty hash from the publish block, or
# `{:error => error}` if a StandardError is raised along the way.
#
# @param exchange_type [#to_sym] name of the channel method to call.
# @param exchange_name [Object] first argument to that channel method.
# @param message [Object] payload handed to the exchange's `publish`.
# @param options [Hash] second argument to the channel method.
# @yield [info] `{}` on success, `{:error => error}` on failure.
def publish(exchange_type, exchange_name, message, options = {}, &callback)
  exchange = @channel.method(exchange_type.to_sym).call(exchange_name, options)
  exchange.publish(message) do
    callback.call({}) if callback
  end
rescue => error
  # Failures are reported through the callback instead of being re-raised.
  callback.call({:error => error}) if callback
end
reconnect()
click to toggle source
# Begin reconnecting unless a reconnect is already in progress.
#
# Sets `@reconnecting`, calls `@before_reconnect`, runs `reset`, and then
# starts `periodically_reconnect`. Does nothing (returns nil) while
# `@reconnecting` is already truthy.
def reconnect
  return if @reconnecting

  @reconnecting = true
  @before_reconnect.call
  reset
  periodically_reconnect
end
stats(queue_name, options={}, &callback)
click to toggle source
# Request message and consumer counts for a queue.
#
# The queue is obtained from the channel with `:auto_delete => true`
# merged over a copy of `options` (the caller's hash is not modified).
# The callback is optional: when no block is given the status result is
# simply discarded instead of raising on a nil callback.
#
# @param queue_name [Object] name passed to `@channel.queue`.
# @param options [Hash] extra queue options; `:auto_delete` is forced to true.
# @yield [info] hash with `:messages` and `:consumers` keys.
def stats(queue_name, options = {}, &callback)
  queue_options = options.merge(:auto_delete => true)
  @channel.queue(queue_name, queue_options).status do |messages, consumers|
    next unless callback
    callback.call({:messages => messages, :consumers => consumers})
  end
end
subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback)
click to toggle source
# Bind a queue to an exchange and, the first time the queue name is seen,
# subscribe to it.
#
# The queue is created through `@channel.queue!` with
# `:auto_delete => true` and cached in `@queues` under `queue_name`.
# Binding happens on every call; `queue.subscribe` is only issued when the
# name was not already a key in `@queues`.
#
# @param exchange_type [#to_sym] name of the channel method that yields the exchange.
# @param exchange_name [Object] argument to that channel method.
# @param queue_name [String] cache key and queue name.
# @param options [Hash] passed to `queue.subscribe`.
# @yield forwarded to `queue.subscribe` as its block.
def subscribe(exchange_type, exchange_name, queue_name = "", options = {}, &callback)
  already_declared = @queues.key?(queue_name)
  queue = (@queues[queue_name] ||= @channel.queue!(queue_name, :auto_delete => true))
  exchange = @channel.method(exchange_type.to_sym).call(exchange_name)
  queue.bind(exchange)
  queue.subscribe(options, &callback) unless already_declared
end
unsubscribe(&callback)
click to toggle source
# Unsubscribe from every cached queue and forget them.
#
# For each queue in `@queues`: unsubscribe right away when `connected?`,
# otherwise register the unsubscribe as a `before_recovery` hook on the
# queue. Afterwards `@queues` is emptied, `@channel.recover` is called if
# connected, and control passes to the superclass implementation through
# a bare `super` (same block).
def unsubscribe(&callback)
  @queues.each_value do |queue|
    next queue.unsubscribe if connected?
    queue.before_recovery { queue.unsubscribe }
  end
  @queues = {}
  @channel.recover if connected?
  super
end
connect_with_eligible_options(&callback)
click to toggle source
# Take the next set of connection options and open a connection and a
# channel with it.
#
# @yield forwarded to `setup_connection` as its block.
# @return [Object] whatever `setup_channel` returns.
def connect_with_eligible_options(&callback)
  candidate = next_connection_options
  # The same options object is used for the connection and the channel.
  setup_connection(candidate, &callback)
  setup_channel(candidate)
end
create_connection_timeout()
click to toggle source
# Create the connection timeout timer and keep it in `@connection_timeout`.
#
# The timer is an `EM::Timer` constructed with 20; its block calls
# `reconnect`.
#
# @return [EM::Timer] the timer that was stored.
def create_connection_timeout
  @connection_timeout = EM::Timer.new(20) { reconnect }
end
next_connection_options()
click to toggle source
# Return the next connection options to try.
#
# Options are taken from the front of `@eligible_options`. When that list
# is nil or empty it is first refilled with a shuffled copy of
# `@connection_options`.
#
# @return [Object, nil] the next options entry, or nil when
#   `@connection_options` is itself empty.
def next_connection_options
  remaining = @eligible_options
  @eligible_options = @connection_options.shuffle if remaining.nil? || remaining.empty?
  @eligible_options.shift
end
periodically_reconnect()
click to toggle source
# Start a periodic timer (constructed with 5) that keeps trying to connect.
#
# On each firing: if `connected?` the timer cancels itself; otherwise
# `connect_with_eligible_options` is called with a block that clears
# `@reconnecting` and calls `@after_reconnect`.
#
# @return [EM::PeriodicTimer] the timer that was created.
def periodically_reconnect
  timer = EM::PeriodicTimer.new(5) do
    next timer.cancel if connected?

    begin
      connect_with_eligible_options do
        @reconnecting = false
        @after_reconnect.call
      end
    rescue EventMachine::ConnectionError
      # Ignored here; the timer is left running.
    rescue Java::JavaLang::RuntimeException
      # Ignored here as well; the timer is left running.
    end
  end
end
reconnect_callback()
click to toggle source
# Build a new proc that calls `reconnect`.
#
# A plain (non-lambda) proc is returned, so it can be invoked with any
# number of arguments; they are ignored.
#
# @return [Proc]
def reconnect_callback
  proc { reconnect }
end
reset()
click to toggle source
# Clear cached queues and tear down the current timeout and connection.
#
# `@queues` is replaced with an empty hash; the timeout timer is
# cancelled and `close_connection` is called on the connection, each only
# if it is set.
def reset
  @queues = {}

  timeout = @connection_timeout
  timeout.cancel if timeout

  connection = @connection
  connection.close_connection if connection
end
set_connection_options(options)
click to toggle source
# Store the connection options as a flat array in `@connection_options`.
#
# A single value becomes a one-element array; an array (including nested
# arrays) is flattened.
#
# @param options [Object, Array] one set of options or a list of them.
# @return [Array] the stored array.
def set_connection_options(options)
  wrapped = [options]
  @connection_options = wrapped.flatten
end
setup_channel(options={})
click to toggle source
# Open a channel on the current connection and configure it.
#
# The channel gets `auto_recovery = true`, an error handler that reports
# an `Error` ("rabbitmq channel closed") through `@on_error`, and a
# prefetch value. The prefetch is `options[:prefetch]` when `options` is a
# Hash containing that key, and 1 otherwise.
#
# @param options [Hash, Object] connection options; only `:prefetch` is read.
# @return [Object] whatever `@channel.prefetch` returns.
def setup_channel(options = {})
  @channel = AMQP::Channel.new(@connection)
  @channel.auto_recovery = true
  @channel.on_error do |_channel, _channel_close|
    @on_error.call(Error.new("rabbitmq channel closed"))
  end
  # Non-Hash options carry no prefetch setting here.
  prefetch = options.is_a?(Hash) ? options.fetch(:prefetch, 1) : 1
  @channel.prefetch(prefetch)
end
setup_connection(options={}, &callback)
click to toggle source
# Open the AMQP connection and install its event handlers.
#
# `AMQP.connect` receives `options` plus a hash that maps TCP connection
# failure and possible authentication failure to a reconnect proc. On
# open, the connection timeout is cancelled and the optional block is
# called. TCP connection loss and skipped heartbeats also trigger a
# reconnect proc.
#
# @param options [Object] first argument to `AMQP.connect`.
# @yield called with no arguments when the connection opens.
def setup_connection(options = {}, &callback)
  failure_handlers = {
    :on_tcp_connection_failure => reconnect_callback,
    :on_possible_authentication_failure => reconnect_callback
  }
  @connection = AMQP.connect(options, failure_handlers)
  @connection.logger = @logger
  @connection.on_open do
    @connection_timeout.cancel
    callback.call if callback
  end
  @connection.on_tcp_connection_loss(&reconnect_callback)
  @connection.on_skipped_heartbeats(&reconnect_callback)
end