class Sensu::Transport::Bunny
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/sensu/transport/bunny.rb, line 10
# Create a transport with no declared queues and no active consumers.
#
# The superclass initializer runs first; afterwards the queue and
# subscription registries are both set to empty hashes.
def initialize
  super
  @queues, @subscriptions = {}, {}
end
Public Instance Methods
acknowledge(info, &callback)
click to toggle source
# File lib/sensu/transport/bunny.rb, line 92
# Acknowledge a delivered message.
#
# @param info [#ack] delivery handle; its +ack+ method is invoked.
# @yield [info] called after the ack, only when a block was given.
# @return the block's result, or nil when no block was given.
def acknowledge(info, &callback)
  info.ack
  return if callback.nil?
  callback.call(info)
end
close()
click to toggle source
# File lib/sensu/transport/bunny.rb, line 49
# Close the underlying connection.
#
# When +connected?+ is true the connection is closed immediately;
# otherwise the close is handed to EM.next_tick.
def close
  shutdown = proc { @connection.close }
  if connected?
    shutdown.call
  else
    EM.next_tick(shutdown)
  end
end
connect(options={})
click to toggle source
# File lib/sensu/transport/bunny.rb, line 16
# Open a Bunny connection, start it and set up the channel.
#
# @param options [Hash] connection options handed to ::Bunny.new. When an
#   :ssl key is present, :tls is set to true and the :cert_chain_file and
#   :private_key_file values are copied to :tls_cert and :tls_key. The
#   transport's logger is always added under :logger. The hash passed in by
#   the caller is left untouched.
#
# ::Bunny::TCPConnectionFailed and
# ::Bunny::PossibleAuthenticationFailureError are not re-raised; each is
# wrapped in an Error and passed to @on_error.
def connect(options={})
  # Work on a copy so the TLS and logger entries do not leak into the
  # caller's hash.
  options = options.dup
  if options.has_key?(:ssl)
    ssl = options[:ssl]
    options[:tls] = true
    options[:tls_cert] = ssl[:cert_chain_file]
    options[:tls_key] = ssl[:private_key_file]
  end
  options[:logger] = @logger
  @connection = ::Bunny.new(options)
  @connection.start
  setup_channel(options)
rescue ::Bunny::TCPConnectionFailed => e
  @on_error.call(Error.new("failed to connect to rabbitmq (#{e.message})"))
rescue ::Bunny::PossibleAuthenticationFailureError => e
  @on_error.call(Error.new("failed to auth to rabbitmq (#{e.message})"))
end
connected?()
click to toggle source
# File lib/sensu/transport/bunny.rb, line 45
# Report whether the underlying connection is up.
#
# @return whatever the connection's own +connected?+ returns, or false
#   when no connection object has been created yet.
def connected?
  # @connection is only assigned by connect; guard against it being nil.
  @connection ? @connection.connected? : false
end
publish(exchange_type, exchange_name, message, options={}, &callback)
click to toggle source
# File lib/sensu/transport/bunny.rb, line 54
# Publish a message to an exchange.
#
# @param exchange_type [Symbol, String] name of the channel method that
#   declares the exchange (it is converted with +to_sym+).
# @param exchange_name [String] exchange to publish to.
# @param message [String] payload handed to the exchange's +publish+.
# @param options [Hash] passed to the exchange declaration.
# @yield [info] called once after the attempt: with an empty hash on
#   success, or with {:error => exception} when declaring or publishing
#   raised a StandardError.
#
# Bunny's Exchange#publish is synchronous and does not take a block, so the
# success callback is invoked after it returns instead of being passed to it
# as a block (where it would never be called).
def publish(exchange_type, exchange_name, message, options={}, &callback)
  info = begin
    exchange = @channel.method(exchange_type.to_sym).call(exchange_name, options)
    exchange.publish(message)
    {}
  rescue => error
    {:error => error}
  end
  # Invoked outside the begin/rescue so an exception raised by the callback
  # itself is not mistaken for a publish failure.
  callback.call(info) if callback
end
reconnect()
click to toggle source
# File lib/sensu/transport/bunny.rb, line 35
# Tear down and re-establish the connection.
#
# Does nothing while the connection reports +connecting?+. Otherwise the
# before-reconnect hook runs, the connection is closed and started again,
# a fresh channel is set up with default options, and the after-reconnect
# hook runs.
def reconnect
  return if @connection.connecting?
  @before_reconnect.call
  @connection.close
  @connection.start
  setup_channel
  @after_reconnect.call
end
stats(queue_name, options={}, &callback)
click to toggle source
# File lib/sensu/transport/bunny.rb, line 97
# Look up message and consumer counts for a queue.
#
# @param queue_name [String] queue to inspect.
# @param options [Hash] queue declaration options; :auto_delete => true is
#   merged in on a copy, so the caller's hash is not modified.
# @yield [info] hash with :messages and :consumers taken from the queue
#   status's :message_count and :consumer_count.
# @return the block's result, or nil when no block was given.
def stats(queue_name, options={}, &callback)
  queue_options = options.merge(:auto_delete => true)
  status = @channel.queue(queue_name, queue_options).status
  info = {
    :messages => status[:message_count],
    :consumers => status[:consumer_count]
  }
  # Guarded like the other callback-taking methods so a call without a
  # block does not fail on nil.
  callback.call(info) if callback
end
subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback)
click to toggle source
# File lib/sensu/transport/bunny.rb, line 66
# Bind a queue to an exchange and start consuming from it.
#
# The queue is declared (auto-delete) the first time its name is seen and
# reused afterwards. The binding is issued on every call; a consumer is
# only registered the first time the queue name is seen.
#
# @param exchange_type [Symbol, String] name of the channel method that
#   declares the exchange (it is converted with +to_sym+).
# @param exchange_name [String] exchange to bind to.
# @param queue_name [String] queue to declare and consume from.
# @param options [Hash] passed to the queue's +subscribe+.
# @yield [payload] called with each delivered payload.
def subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback)
  already_known = @queues.has_key?(queue_name)
  queue = (@queues[queue_name] ||= @channel.queue(queue_name, :auto_delete => true))
  exchange = @channel.method(exchange_type.to_sym).call(exchange_name)
  queue.bind(exchange)
  return if already_known

  @subscriptions[queue_name] = [] unless @subscriptions.has_key?(queue_name)
  consumer = queue.subscribe(options) do |_delivery_info, _properties, payload|
    callback.call(payload)
  end
  # Keep the consumer handle so unsubscribe can cancel it later.
  @subscriptions[queue_name] << consumer
end
unsubscribe(&callback)
click to toggle source
Calls superclass method
# File lib/sensu/transport/bunny.rb, line 81
# Cancel every registered consumer and forget all queues.
#
# Each consumer stored for each known queue is cancelled, both registries
# are reset to empty hashes, and control then passes to the superclass
# implementation (the block, if any, is forwarded by the bare +super+).
def unsubscribe(&callback)
  @queues.keys.each do |queue_name|
    @subscriptions[queue_name].each(&:cancel)
  end
  @queues, @subscriptions = {}, {}
  super
end
Private Instance Methods
setup_channel(options={})
click to toggle source
# File lib/sensu/transport/bunny.rb, line 109
# Create a channel on the current connection and apply the prefetch limit.
#
# @param options [Hash] when a Hash, its :prefetch entry is used and
#   defaults to 1 if absent; any non-Hash value also yields a prefetch of 1.
def setup_channel(options={})
  @channel = @connection.create_channel
  prefetch = options.is_a?(Hash) ? options.fetch(:prefetch, 1) : 1
  @channel.prefetch(prefetch)
end