class Sensu::Transport::RabbitMQ

Public Instance Methods

ack(info)

A proper alias for acknowledge().

Alias for: acknowledge
acknowledge(info) click to toggle source

Acknowledge the delivery of a message from RabbitMQ.

@param info [Hash] message info containing its delivery tag. @yield [info] passes acknowledgment info to an optional

callback/block.
Calls superclass method Sensu::Transport::Base#acknowledge
# File lib/sensu/transport/rabbitmq.rb, line 133
def acknowledge(info)
  catch_errors do
    info.ack
  end
  super
end
Also aliased as: ack
close() click to toggle source

Close the RabbitMQ connections.

# File lib/sensu/transport/rabbitmq.rb, line 46
def close
  callback = Proc.new do
    [@primary_connection, @secondary_connection].each do |connection|
      connection && connection.close
    end
  end
  connected? ? callback.call : EM.next_tick(callback)
end
connect(options={}) click to toggle source

RabbitMQ connection setup. The deferred status is set to `:succeeded` (via `succeed()`) once the connection has been established.

@param options [Hash, String]

# File lib/sensu/transport/rabbitmq.rb, line 16
def connect(options={})
  reset
  set_connection_options(options)
  create_connection_timeout
  connect_with_eligible_options
end
connected?() click to toggle source

Indicates if connected to RabbitMQ.

@return [TrueClass, FalseClass]

# File lib/sensu/transport/rabbitmq.rb, line 39
def connected?
  [@primary_connection, @secondary_connection].all? do |connection|
    connection && connection.connected?
  end
end
publish(type, pipe, message, options={}) { |info| ... } click to toggle source

Publish a message to RabbitMQ. This method will only use the primary channel for publishing keepalive, otherwise it will use the secondary channel.

@param type [Symbol] the RabbitMQ exchange type, possible

values are: :direct and :fanout.

@param pipe [String] the RabbitMQ exchange name. @param message [String] the message to be published to

RabbitMQ.

@param options [Hash] the options to publish the message with. @yield [info] passes publish info to an optional

callback/block.

@yieldparam info [Hash] contains publish information.

# File lib/sensu/transport/rabbitmq.rb, line 68
def publish(type, pipe, message, options={})
  if connected?
    catch_errors do
      channel = (pipe == "keepalives" ? @primary_channel : @secondary_channel)
      channel.method(type.to_sym).call(pipe, options).publish(message) do
        info = {}
        yield(info) if block_given?
      end
    end
  else
    info = {:error => "Transport is not connected, triggering reconnect"}
    reconnect
    yield(info) if block_given?
  end
end
reconnect(force=false) click to toggle source

Reconnect to RabbitMQ.

@param force [Boolean] the reconnect.

# File lib/sensu/transport/rabbitmq.rb, line 26
def reconnect(force=false)
  unless @reconnecting
    @reconnecting = true
    @logger.debug("transport reconnecting...")
    @before_reconnect.call
    reset
    periodically_reconnect
  end
end
stats(funnel, options={}) { |info| ... } click to toggle source

RabbitMQ queue stats, including message and consumer counts.

@param funnel [String] the RabbitMQ queue to get stats for. @param options [Hash] the options to get queue stats with. @yield [info] passes queue stats to the callback/block. @yieldparam info [Hash] contains queue stats.

# File lib/sensu/transport/rabbitmq.rb, line 149
def stats(funnel, options={})
  catch_errors do
    options = options.merge(:auto_delete => true)
    @primary_channel.queue(funnel, options).status do |messages, consumers|
      info = {
        :messages => messages,
        :consumers => consumers
      }
      yield(info)
    end
  end
end
subscribe(type, pipe, funnel="", options={}, &callback) click to toggle source

Subscribe to a RabbitMQ queue.

@param type [Symbol] the RabbitMQ exchange type, possible

values are: :direct and :fanout.

@param pipe [String] the RabbitMQ exhange name. @param funnel [String] the RabbitMQ queue. @param options [Hash] the options to consume messages with. @yield [info, message] passes message info and content to the

consumer callback/block.

@yieldparam info [Hash] contains message information. @yieldparam message [String] message.

# File lib/sensu/transport/rabbitmq.rb, line 95
def subscribe(type, pipe, funnel="", options={}, &callback)
  catch_errors do
    previously_declared = @queues.has_key?(funnel)
    @queues[funnel] ||= @primary_channel.queue!(funnel, :auto_delete => true)
    queue = @queues[funnel]
    queue.bind(@primary_channel.method(type.to_sym).call(pipe))
    unless previously_declared
      queue.subscribe(options, &callback)
    end
  end
end
unsubscribe() click to toggle source

Unsubscribe from all RabbitMQ queues.

@yield [info] passes info to an optional callback/block. @yieldparam info [Hash] contains unsubscribe information.

Calls superclass method Sensu::Transport::Base#unsubscribe
# File lib/sensu/transport/rabbitmq.rb, line 111
def unsubscribe
  catch_errors do
    @queues.values.each do |queue|
      if connected?
        queue.unsubscribe
      else
        queue.before_recovery do
          queue.unsubscribe
        end
      end
    end
    @queues = {}
    @primary_channel.recover if connected?
  end
  super
end

Private Instance Methods

catch_errors() { || ... } click to toggle source

Catch RabbitMQ errors and call the on_error callback, providing it with the error object as an argument. This method is intended to be applied where necessary, not to be confused with a catch-all.

@yield [] callback/block to execute within a rescue block to

catch RabbitMQ errors.
# File lib/sensu/transport/rabbitmq.rb, line 171
def catch_errors
  begin
    yield
  rescue AMQP::Error => error
    @on_error.call(error)
  end
end
connect_with_eligible_options(&callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 275
def connect_with_eligible_options(&callback)
  next_connection_options do |options|
    @primary_connection = setup_connection(options, &callback)
    @primary_channel = setup_channel(options, @primary_connection)
    @secondary_connection = setup_connection(options, &callback)
    @secondary_channel = setup_channel(options, @secondary_connection)
  end
end
connection_ready(&callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 215
def connection_ready(&callback)
  if connected?
    @connection_timeout.cancel
    succeed
    callback.call if callback
  end
end
create_connection_timeout() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 191
def create_connection_timeout
  @connection_timeout = EM::Timer.new(20) do
    reconnect
  end
end
next_connection_options() { |merge(:host => ip_address)| ... } click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 197
def next_connection_options(&callback)
  if @eligible_options.nil? || @eligible_options.empty?
    @eligible_options = @connection_options.shuffle
  end
  options = @eligible_options.shift || {}
  if options.is_a?(Hash) && options[:host]
    resolve_host(options[:host]) do |ip_address|
      if ip_address.nil?
        reconnect
      else
        yield options.merge(:host => ip_address)
      end
    end
  else
    yield options
  end
end
periodically_reconnect(delay=2) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 284
def periodically_reconnect(delay=2)
  capped_delay = (delay >= 20 ? 20 : delay)
  EM::Timer.new(capped_delay) do
    unless connected?
      reset
      periodically_reconnect(capped_delay += 2)
      begin
        connect_with_eligible_options do
          @reconnecting = false
          @after_reconnect.call
        end
      rescue EventMachine::ConnectionError
      rescue Errno::ECONNREFUSED
      rescue Java::JavaLang::RuntimeException
      rescue Java::JavaNioChannels::UnresolvedAddressException
      end
    end
  end
end
reset() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 179
def reset
  @queues = {}
  @connection_timeout.cancel if @connection_timeout
  [@primary_connection, @secondary_connection].each do |connection|
    connection && connection.close_connection
  end
end
set_connection_options(options) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 187
def set_connection_options(options)
  @connection_options = [options].flatten
end
setup_channel(options={}, connection) click to toggle source

@param options [Hash] @param connection [Object] RabbitMQ connection. @return [Object] RabbitMQ connection channel.

# File lib/sensu/transport/rabbitmq.rb, line 260
def setup_channel(options={}, connection)
  channel = AMQP::Channel.new(connection)
  channel.auto_recovery = true
  channel.on_error do |channel, channel_close|
    error = Error.new("rabbitmq channel error")
    @on_error.call(error)
  end
  prefetch = 1
  if options.is_a?(Hash)
    prefetch = options.fetch(:prefetch, 1)
  end
  channel.prefetch(prefetch)
  channel
end
setup_connection(options={}, &callback) click to toggle source

@param options [Hash] @return [Object] RabbitMQ connection.

# File lib/sensu/transport/rabbitmq.rb, line 225
def setup_connection(options={}, &callback)
  reconnect_callback = Proc.new { reconnect }
  on_possible_auth_failure = Proc.new {
    @logger.warn("transport connection error", {
      :reason => "possible authentication failure. wrong credentials?",
      :user => options[:user]
    })
    reconnect
  }
  connection = AMQP.connect(options, {
    :on_tcp_connection_failure => reconnect_callback,
    :on_possible_authentication_failure => on_possible_auth_failure
  })
  connection.logger = @logger
  connection.on_open do
    @logger.debug("transport connection open")
    connection_ready(&callback)
  end
  connection.on_tcp_connection_loss do
    @logger.warn("transport connection error", :reason => "tcp connection lost")
    reconnect
  end
  connection.on_skipped_heartbeats do
    @logger.warn("transport connection error", :reason => "skipped heartbeats")
    reconnect
  end
  connection.on_closed do
    @logger.debug("transport connection closed")
  end
  connection
end