class Sensu::Transport::RabbitMQ

Public Instance Methods

acknowledge(info, &callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 72
def acknowledge(info, &callback)
  info.ack
  callback.call(info) if callback
end
close() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 30
def close
  callback = Proc.new { @connection.close }
  connected? ? callback.call : EM.next_tick(callback)
end
connect(options={}) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 10
def connect(options={})
  reset
  set_connection_options(options)
  create_connection_timeout
  connect_with_eligible_options
end
connected?() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 26
def connected?
  @connection.connected?
end
publish(exchange_type, exchange_name, message, options={}, &callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 35
def publish(exchange_type, exchange_name, message, options={}, &callback)
  begin
    @channel.method(exchange_type.to_sym).call(exchange_name, options).publish(message) do
      info = {}
      callback.call(info) if callback
    end
  rescue => error
    info = {:error => error}
    callback.call(info) if callback
  end
end
reconnect() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 17
def reconnect
  unless @reconnecting
    @reconnecting = true
    @before_reconnect.call
    reset
    periodically_reconnect
  end
end
stats(queue_name, options={}, &callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 77
def stats(queue_name, options={}, &callback)
  options = options.merge(:auto_delete => true)
  @channel.queue(queue_name, options).status do |messages, consumers|
    info = {
      :messages => messages,
      :consumers => consumers
    }
    callback.call(info)
  end
end
subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 47
def subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback)
  previously_declared = @queues.has_key?(queue_name)
  @queues[queue_name] ||= @channel.queue!(queue_name, :auto_delete => true)
  queue = @queues[queue_name]
  queue.bind(@channel.method(exchange_type.to_sym).call(exchange_name))
  unless previously_declared
    queue.subscribe(options, &callback)
  end
end
unsubscribe(&callback) click to toggle source
Calls superclass method Sensu::Transport::Base#unsubscribe
# File lib/sensu/transport/rabbitmq.rb, line 57
def unsubscribe(&callback)
  @queues.values.each do |queue|
    if connected?
      queue.unsubscribe
    else
      queue.before_recovery do
        queue.unsubscribe
      end
    end
  end
  @queues = {}
  @channel.recover if connected?
  super
end

Private Instance Methods

connect_with_eligible_options(&callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 145
def connect_with_eligible_options(&callback)
  options = next_connection_options
  setup_connection(options, &callback)
  setup_channel(options)
end
create_connection_timeout() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 100
def create_connection_timeout
  @connection_timeout = EM::Timer.new(20) do
    reconnect
  end
end
next_connection_options() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 106
def next_connection_options
  if @eligible_options.nil? || @eligible_options.empty?
    @eligible_options = @connection_options.shuffle
  end
  @eligible_options.shift
end
periodically_reconnect() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 151
def periodically_reconnect
  timer = EM::PeriodicTimer.new(5) do
    unless connected?
      begin
        connect_with_eligible_options do
          @reconnecting = false
          @after_reconnect.call
        end
      rescue EventMachine::ConnectionError
      rescue Java::JavaLang::RuntimeException
      end
    else
      timer.cancel
    end
  end
end
reconnect_callback() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 113
def reconnect_callback
  Proc.new { reconnect }
end
reset() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 90
def reset
  @queues = {}
  @connection_timeout.cancel if @connection_timeout
  @connection.close_connection if @connection
end
set_connection_options(options) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 96
def set_connection_options(options)
  @connection_options = [options].flatten
end
setup_channel(options={}) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 131
def setup_channel(options={})
  @channel = AMQP::Channel.new(@connection)
  @channel.auto_recovery = true
  @channel.on_error do |channel, channel_close|
    error = Error.new("rabbitmq channel closed")
    @on_error.call(error)
  end
  prefetch = 1
  if options.is_a?(Hash)
    prefetch = options.fetch(:prefetch, 1)
  end
  @channel.prefetch(prefetch)
end
setup_connection(options={}, &callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 117
def setup_connection(options={}, &callback)
  @connection = AMQP.connect(options, {
    :on_tcp_connection_failure => reconnect_callback,
    :on_possible_authentication_failure => reconnect_callback
  })
  @connection.logger = @logger
  @connection.on_open do
    @connection_timeout.cancel
    callback.call if callback
  end
  @connection.on_tcp_connection_loss(&reconnect_callback)
  @connection.on_skipped_heartbeats(&reconnect_callback)
end