class Observability::Collector::RabbitMQ

A collector that re-injects events over AMQP to a RabbitMQ cluster.

Constants

DEFAULT_PUBLISH_OPTIONS

Default options for publication

LOOP_TIMER

The number of seconds to wait between IO loops

MAX_EVENT_BYTES

The maximum size of event messages

Public Class Methods

amqp_session_options()

Fetch a Hash of AMQP options.

# File lib/observability/collector/rabbitmq.rb, line 66
def self::amqp_session_options
        return {
                logger: Loggability[ Observability ],
                heartbeat: self.heartbeat,
                exchange: self.exchange,
                vhost: self.vhost,
                threaded: self.threaded,
        }
end

capabilities_list( server_info )

Return a comma-separated list of the capabilities that are enabled in server_info.

# File lib/observability/collector/rabbitmq.rb, line 78
def self::capabilities_list( server_info )
        server_info.
                map {|name,enabled| enabled ? name : nil }.
                compact.join(', ')
end
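
As a rough illustration of the formatting, the sketch below copies the same filtering logic into a standalone method and applies it to a made-up capabilities table. The capability names and values in `sample` are assumptions for the example; a real broker reports its own set.

```ruby
# Standalone copy of the filtering logic, for illustration only.
def capabilities_list( server_info )
  server_info.
    map {|name, enabled| enabled ? name : nil }.
    compact.join( ', ' )
end

# Hypothetical capabilities table (not taken from a real server).
sample = {
  'publisher_confirms' => true,
  'basic.nack'         => true,
  'per_consumer_qos'   => false,
}

formatted = capabilities_list( sample )
puts formatted   # prints "publisher_confirms, basic.nack"
```

Only entries with a truthy value are kept, in the order the Hash yields them.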

configured_amqp_session()

Establish the connection to RabbitMQ based on the loaded configuration.

# File lib/observability/collector/rabbitmq.rb, line 86
def self::configured_amqp_session
        uri = self.broker_uri or raise "No broker_uri configured."
        options = self.amqp_session_options

        session = Bunny.new( uri, options )
        session.start

        self.log.info "Connected to %s v%s server: %s" % [
                session.server_properties['product'],
                session.server_properties['version'],
                self.capabilities_list( session.server_properties['capabilities'] ),
        ]

        return session
end

new()

Create a new RabbitMQ collector that receives events on a UDP socket.

Calls superclass method
# File lib/observability/collector/rabbitmq.rb, line 105
def initialize
        super

        @socket        = UDPSocket.new
        @amqp_session  = nil
        @amqp_channel  = Concurrent::ThreadLocalVar.new { @amqp_session.create_channel }
        @amqp_exchange = Concurrent::ThreadLocalVar.new do
                @amqp_channel.value.headers( self.class.exchange, passive: true )
        end
        @processing    = false
end

Public Instance Methods

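read_next_event()

Read the next event from the socket. Returns the parsed JSON event, or nil if no datagram was ready (after waiting up to LOOP_TIMER seconds for the socket to become readable) or the datagram was empty.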

# File lib/observability/collector/rabbitmq.rb, line 160
def read_next_event
        self.log.debug "Reading next event."
        data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

        if data == :wait_readable
                IO.select( [@socket], nil, nil, LOOP_TIMER )
                return nil
        elsif data.empty?
                return nil
        else
                self.log.info "Read %d bytes" % [ data.bytesize ]
                return JSON.parse( data )
        end
end
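
The non-blocking read pattern can be tried in isolation with two UDP sockets on the loopback interface. This is a minimal sketch, not the collector itself: `read_event`, the `max_bytes` and `timeout` defaults (stand-ins for MAX_EVENT_BYTES and LOOP_TIMER, whose real values are not shown here), and the sample payload are all assumptions for the example.

```ruby
require 'socket'
require 'json'

# Hypothetical helper mirroring the read logic. The defaults are
# assumed values, not the collector's actual constants.
def read_event( socket, max_bytes: 65_536, timeout: 0.25 )
  data = socket.recv_nonblock( max_bytes, exception: false )

  if data == :wait_readable
    # Nothing queued yet: wait briefly for the socket, then let the caller retry.
    IO.select( [socket], nil, nil, timeout )
    return nil
  end

  return nil if data.empty?
  JSON.parse( data )
end

# Bind to an ephemeral loopback port and send one JSON datagram to it.
receiver = UDPSocket.new
receiver.bind( '127.0.0.1', 0 )
port = receiver.local_address.ip_port

sender = UDPSocket.new
sender.send( JSON.generate( '@type' => 'demo.event' ), 0, '127.0.0.1', port )

# Retry a bounded number of times, as the processing loop would.
event = nil
20.times do
  event = read_event( receiver )
  break if event
end

sender.close
receiver.close
```

Returning nil on `:wait_readable` keeps each call short, so a caller that checks a stop flag between calls can shut down within roughly one timeout interval.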

start()

Start receiving events.

# File lib/observability/collector/rabbitmq.rb, line 123
def start
        self.log.info "Starting up."

        @amqp_session = self.class.configured_amqp_session
        @socket.bind( self.class.host, self.class.port )

        self.start_processing
end

start_processing()

Start consuming incoming events and storing them.

# File lib/observability/collector/rabbitmq.rb, line 143
def start_processing
        @processing = true
        while @processing
                event = self.read_next_event or next
                self.log.debug "Read event: %p" % [ event ]
                self.store_event( event )
        end
end

stop()

Stop receiving events.

# File lib/observability/collector/rabbitmq.rb, line 134
def stop
        self.stop_processing

        @socket.shutdown( :SHUT_RDWR )
        @amqp_session.close
end

stop_processing()

Stop consuming events.

# File lib/observability/collector/rabbitmq.rb, line 154
def stop_processing
        @processing = false
end
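
store_event( event )

Store the specified event by publishing it to the AMQP exchange. The @timestamp, @type, and @version fields are removed from the event and passed as publish options; the remaining fields are serialized to JSON as the message body.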

# File lib/observability/collector/rabbitmq.rb, line 177
def store_event( event )
        time    = event.delete( '@timestamp' )
        type    = event.delete( '@type' )
        version = event.delete( '@version' )

        data = JSON.generate( event )
        headers = {
                time: time,
                type: type,
                version: version,
                content_type: 'application/json',
                content_encoding: data.encoding.name,
                timestamp: Time.now.to_f,
        }

        @amqp_exchange.value.publish( data, headers )
end
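
To see how an event is divided between message body and publish options without a broker, the following sketch reproduces only the splitting step. `split_event` and the sample event are hypothetical names and data for illustration; unlike store_event, the sketch works on a copy so the caller's Hash is left untouched, and it does not publish anything.

```ruby
require 'json'

# Hypothetical helper: separates the metadata fields from the payload.
def split_event( event )
  event   = event.dup   # store_event deletes from the original instead
  time    = event.delete( '@timestamp' )
  type    = event.delete( '@type' )
  version = event.delete( '@version' )

  data = JSON.generate( event )
  options = {
    time: time,
    type: type,
    version: version,
    content_type: 'application/json',
    content_encoding: data.encoding.name,
  }

  [ data, options ]
end

# Made-up event for the example.
event = {
  '@timestamp' => '2024-01-01T00:00:00Z',
  '@type'      => 'demo.event',
  '@version'   => 1,
  'detail'     => 'ok',
}

payload, options = split_event( event )
puts payload          # prints {"detail":"ok"}
puts options[:type]   # prints demo.event
```

Whether a given broker client accepts every key in the options Hash depends on that client's publish API, so check its documentation before relying on the custom keys.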