class Sensu::Transport::Bunny

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/sensu/transport/bunny.rb, line 10
def initialize
  super
  @queues = {}
  @subscriptions = {}
end

Public Instance Methods

acknowledge(info, &callback) click to toggle source
# File lib/sensu/transport/bunny.rb, line 92
def acknowledge(info, &callback)
  info.ack
  callback.call(info) if callback
end
close() click to toggle source
# File lib/sensu/transport/bunny.rb, line 49
def close
  callback = Proc.new { @connection.close }
  connected? ? callback.call : EM.next_tick(callback)
end
connect(options={}) click to toggle source
# File lib/sensu/transport/bunny.rb, line 16
def connect(options={})
  if options.has_key?(:ssl)
    options[:tls]      = true
    options[:tls_cert] = options[:ssl][:cert_chain_file]
    options[:tls_key]  = options[:ssl][:private_key_file]
  end
  options[:logger] = @logger

  @connection = ::Bunny.new(options)
  @connection.start
  setup_channel(options)
rescue ::Bunny::TCPConnectionFailed => e
  error = Error.new("failed to connect to rabbitmq (#{e.message}")
  @on_error.call(error)
rescue ::Bunny::PossibleAuthenticationFailureError => e
  error = Error.new("failed to auth to rabbitmq (#{e.message}")
  @on_error.call(error)
end
connected?() click to toggle source
# File lib/sensu/transport/bunny.rb, line 45
def connected?
  @connection.connected?
end
publish(exchange_type, exchange_name, message, options={}, &callback) click to toggle source
# File lib/sensu/transport/bunny.rb, line 54
def publish(exchange_type, exchange_name, message, options={}, &callback)
  begin
    @channel.method(exchange_type.to_sym).call(exchange_name, options).publish(message) do
      info = {}
      callback.call(info) if callback
    end
  rescue => error
    info = {:error => error}
    callback.call(info) if callback
  end
end
reconnect() click to toggle source
# File lib/sensu/transport/bunny.rb, line 35
def reconnect
  unless @connection.connecting?
    @before_reconnect.call
    @connection.close
    @connection.start
    setup_channel
    @after_reconnect.call
  end
end
stats(queue_name, options={}, &callback) click to toggle source
# File lib/sensu/transport/bunny.rb, line 97
def stats(queue_name, options={}, &callback)
  options = options.merge(:auto_delete => true)
  status = @channel.queue(queue_name, options).status
  info = {
    :messages => status[:message_count],
    :consumers => status[:consumer_count]
  }
  callback.call(info)
end
subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback) click to toggle source
# File lib/sensu/transport/bunny.rb, line 66
def subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback)
  previously_declared = @queues.has_key?(queue_name)
  @queues[queue_name] ||= @channel.queue(queue_name, :auto_delete => true)
  queue = @queues[queue_name]
  queue.bind(@channel.method(exchange_type.to_sym).call(exchange_name))
  unless previously_declared
    unless @subscriptions.has_key?(queue_name)
      @subscriptions[queue_name] = []
    end
    @subscriptions[queue_name] << queue.subscribe(options) do |delivery_info, properties, payload|
      callback.call(payload)
    end
  end
end
unsubscribe(&callback) click to toggle source
Calls superclass method
# File lib/sensu/transport/bunny.rb, line 81
def unsubscribe(&callback)
  @queues.keys.each do |queue_name|
    @subscriptions[queue_name].each do |consumer|
      consumer.cancel
    end
  end
  @queues = {}
  @subscriptions = {}
  super
end

Private Instance Methods

setup_channel(options={}) click to toggle source
# File lib/sensu/transport/bunny.rb, line 109
def setup_channel(options={})
  @channel = @connection.create_channel
  prefetch = 1
  if options.is_a?(Hash)
    prefetch = options.fetch(:prefetch, 1)
  end
  @channel.prefetch(prefetch)
end