class Sensu::Transport::RabbitMQ
Public Instance Methods
Acknowledge the delivery of a message from RabbitMQ
.
@param info [Hash] message info containing its delivery tag. @yield [info] passes acknowledgment info to an optional
callback/block.
Sensu::Transport::Base#acknowledge
# File lib/sensu/transport/rabbitmq.rb, line 133 def acknowledge(info) catch_errors do info.ack end super end
Close the RabbitMQ
connections.
# File lib/sensu/transport/rabbitmq.rb, line 46 def close callback = Proc.new do [@primary_connection, @secondary_connection].each do |connection| connection && connection.close end end connected? ? callback.call : EM.next_tick(callback) end
RabbitMQ
connection setup. The deferred status is set to `:succeeded` (via `succeed()`) once the connection has been established.
@param options [Hash, String]
# File lib/sensu/transport/rabbitmq.rb, line 16 def connect(options={}) reset set_connection_options(options) create_connection_timeout connect_with_eligible_options end
Indicates if connected to RabbitMQ
.
@return [TrueClass, FalseClass]
# File lib/sensu/transport/rabbitmq.rb, line 39 def connected? [@primary_connection, @secondary_connection].all? do |connection| connection && connection.connected? end end
Publish a message to RabbitMQ
. This method will only use the primary channel for publishing keepalive, otherwise it will use the secondary channel.
@param type [Symbol] the RabbitMQ
exchange type, possible
values are: :direct and :fanout.
@param pipe [String] the RabbitMQ
exchange name. @param message [String] the message to be published to
RabbitMQ.
@param options [Hash] the options to publish the message with. @yield [info] passes publish info to an optional
callback/block.
@yieldparam info [Hash] contains publish information.
# File lib/sensu/transport/rabbitmq.rb, line 68 def publish(type, pipe, message, options={}) if connected? catch_errors do channel = (pipe == "keepalives" ? @primary_channel : @secondary_channel) channel.method(type.to_sym).call(pipe, options).publish(message) do info = {} yield(info) if block_given? end end else info = {:error => "Transport is not connected, triggering reconnect"} reconnect yield(info) if block_given? end end
Reconnect to RabbitMQ
.
@param force [Boolean] the reconnect.
# File lib/sensu/transport/rabbitmq.rb, line 26 def reconnect(force=false) unless @reconnecting @reconnecting = true @logger.debug("transport reconnecting...") @before_reconnect.call reset periodically_reconnect end end
RabbitMQ
queue stats, including message and consumer counts.
@param funnel [String] the RabbitMQ
queue to get stats for. @param options [Hash] the options to get queue stats with. @yield [info] passes queue stats to the callback/block. @yieldparam info [Hash] contains queue stats.
# File lib/sensu/transport/rabbitmq.rb, line 149 def stats(funnel, options={}) catch_errors do options = options.merge(:auto_delete => true) @primary_channel.queue(funnel, options).status do |messages, consumers| info = { :messages => messages, :consumers => consumers } yield(info) end end end
Subscribe to a RabbitMQ
queue.
@param type [Symbol] the RabbitMQ
exchange type, possible
values are: :direct and :fanout.
@param pipe [String] the RabbitMQ
exhange name. @param funnel [String] the RabbitMQ
queue. @param options [Hash] the options to consume messages with. @yield [info, message] passes message info and content to the
consumer callback/block.
@yieldparam info [Hash] contains message information. @yieldparam message [String] message.
# File lib/sensu/transport/rabbitmq.rb, line 95 def subscribe(type, pipe, funnel="", options={}, &callback) catch_errors do previously_declared = @queues.has_key?(funnel) @queues[funnel] ||= @primary_channel.queue!(funnel, :auto_delete => true) queue = @queues[funnel] queue.bind(@primary_channel.method(type.to_sym).call(pipe)) unless previously_declared queue.subscribe(options, &callback) end end end
Unsubscribe from all RabbitMQ
queues.
@yield [info] passes info to an optional callback/block. @yieldparam info [Hash] contains unsubscribe information.
Sensu::Transport::Base#unsubscribe
# File lib/sensu/transport/rabbitmq.rb, line 111 def unsubscribe catch_errors do @queues.values.each do |queue| if connected? queue.unsubscribe else queue.before_recovery do queue.unsubscribe end end end @queues = {} @primary_channel.recover if connected? end super end
Private Instance Methods
Catch RabbitMQ
errors and call the on_error callback, providing it with the error object as an argument. This method is intended to be applied where necessary, not to be confused with a catch-all.
@yield [] callback/block to execute within a rescue block to
catch RabbitMQ errors.
# File lib/sensu/transport/rabbitmq.rb, line 171 def catch_errors begin yield rescue AMQP::Error => error @on_error.call(error) end end
# File lib/sensu/transport/rabbitmq.rb, line 275 def connect_with_eligible_options(&callback) next_connection_options do |options| @primary_connection = setup_connection(options, &callback) @primary_channel = setup_channel(options, @primary_connection) @secondary_connection = setup_connection(options, &callback) @secondary_channel = setup_channel(options, @secondary_connection) end end
# File lib/sensu/transport/rabbitmq.rb, line 215 def connection_ready(&callback) if connected? @connection_timeout.cancel succeed callback.call if callback end end
# File lib/sensu/transport/rabbitmq.rb, line 191 def create_connection_timeout @connection_timeout = EM::Timer.new(20) do reconnect end end
# File lib/sensu/transport/rabbitmq.rb, line 197 def next_connection_options(&callback) if @eligible_options.nil? || @eligible_options.empty? @eligible_options = @connection_options.shuffle end options = @eligible_options.shift || {} if options.is_a?(Hash) && options[:host] resolve_host(options[:host]) do |ip_address| if ip_address.nil? reconnect else yield options.merge(:host => ip_address) end end else yield options end end
# File lib/sensu/transport/rabbitmq.rb, line 284 def periodically_reconnect(delay=2) capped_delay = (delay >= 20 ? 20 : delay) EM::Timer.new(capped_delay) do unless connected? reset periodically_reconnect(capped_delay += 2) begin connect_with_eligible_options do @reconnecting = false @after_reconnect.call end rescue EventMachine::ConnectionError rescue Errno::ECONNREFUSED rescue Java::JavaLang::RuntimeException rescue Java::JavaNioChannels::UnresolvedAddressException end end end end
# File lib/sensu/transport/rabbitmq.rb, line 179 def reset @queues = {} @connection_timeout.cancel if @connection_timeout [@primary_connection, @secondary_connection].each do |connection| connection && connection.close_connection end end
# File lib/sensu/transport/rabbitmq.rb, line 187 def set_connection_options(options) @connection_options = [options].flatten end
@param options [Hash] @param connection [Object] RabbitMQ
connection. @return [Object] RabbitMQ
connection channel.
# File lib/sensu/transport/rabbitmq.rb, line 260 def setup_channel(options={}, connection) channel = AMQP::Channel.new(connection) channel.auto_recovery = true channel.on_error do |channel, channel_close| error = Error.new("rabbitmq channel error") @on_error.call(error) end prefetch = 1 if options.is_a?(Hash) prefetch = options.fetch(:prefetch, 1) end channel.prefetch(prefetch) channel end
@param options [Hash] @return [Object] RabbitMQ
connection.
# File lib/sensu/transport/rabbitmq.rb, line 225 def setup_connection(options={}, &callback) reconnect_callback = Proc.new { reconnect } on_possible_auth_failure = Proc.new { @logger.warn("transport connection error", { :reason => "possible authentication failure. wrong credentials?", :user => options[:user] }) reconnect } connection = AMQP.connect(options, { :on_tcp_connection_failure => reconnect_callback, :on_possible_authentication_failure => on_possible_auth_failure }) connection.logger = @logger connection.on_open do @logger.debug("transport connection open") connection_ready(&callback) end connection.on_tcp_connection_loss do @logger.warn("transport connection error", :reason => "tcp connection lost") reconnect end connection.on_skipped_heartbeats do @logger.warn("transport connection error", :reason => "skipped heartbeats") reconnect end connection.on_closed do @logger.debug("transport connection closed") end connection end