class Observability::Collector::RabbitMQ
A collector that re-injects events over AMQP to a RabbitMQ
cluster.
Constants
- DEFAULT_PUBLISH_OPTIONS
Default options for publication
- LOOP_TIMER
The number of seconds to wait between IO loops
- MAX_EVENT_BYTES
The maximum size of event messages
Public Class Methods
amqp_session_options()
click to toggle source
Fetch a Hash of AMQP options.
# File lib/observability/collector/rabbitmq.rb, line 66
# Build the Hash of AMQP session options: the Observability logger plus this
# class's heartbeat, exchange, vhost and threaded settings.
def self::amqp_session_options
  options = {}
  options[:logger]    = Loggability[ Observability ]
  options[:heartbeat] = self.heartbeat
  options[:exchange]  = self.exchange
  options[:vhost]     = self.vhost
  options[:threaded]  = self.threaded
  return options
end
capabilities_list( server_info )
click to toggle source
Return a formatted, comma-separated list of the server's capabilities that are enabled in server_info.
# File lib/observability/collector/rabbitmq.rb, line 78
# Return a comma-separated String naming every capability in +server_info+
# whose value is truthy.
#
# +server_info+ is enumerated as name/enabled pairs (e.g. a Hash). A nil
# +server_info+ yields an empty String instead of raising, so a server that
# reports no capabilities does not break the caller.
def self::capabilities_list( server_info )
  return '' if server_info.nil?

  enabled = server_info.select {|_name, is_enabled| is_enabled }
  names   = enabled.map {|name, _is_enabled| name }

  return names.compact.join( ', ' )
end
configured_amqp_session()
click to toggle source
Establish the connection to RabbitMQ
based on the loaded configuration.
# File lib/observability/collector/rabbitmq.rb, line 86
# Create a Bunny session for the configured broker_uri using
# ::amqp_session_options, start it, log the server's product, version and
# capabilities, and return the started session.
#
# Raises a RuntimeError if no broker_uri is configured.
def self::configured_amqp_session
  uri = self.broker_uri
  raise "No broker_uri configured." unless uri

  session = Bunny.new( uri, self.amqp_session_options )
  session.start

  props = session.server_properties
  self.log.info "Connected to %s v%s server: %s" % [
    props['product'],
    props['version'],
    self.capabilities_list( props['capabilities'] ),
  ]

  return session
end
new()
click to toggle source
Create a new RabbitMQ collector that receives events on a UDP socket.
Calls superclass method
# File lib/observability/collector/rabbitmq.rb, line 105
# Set up the collector's state: an unbound UDP socket, no AMQP session yet,
# and the processing flag cleared.
def initialize
  super

  @processing   = false
  @socket       = UDPSocket.new
  @amqp_session = nil

  # The channel and exchange are wrapped in thread-local variables whose
  # blocks supply the value. NOTE(review): presumably these blocks run lazily
  # on first access per thread, i.e. after #start has set @amqp_session --
  # confirm against the concurrent-ruby documentation.
  @amqp_channel = Concurrent::ThreadLocalVar.new do
    @amqp_session.create_channel
  end
  @amqp_exchange = Concurrent::ThreadLocalVar.new {
    @amqp_channel.value.headers( self.class.exchange, passive: true )
  }
end
Public Instance Methods
read_next_event()
click to toggle source
Read the next event from the socket
# File lib/observability/collector/rabbitmq.rb, line 160
# Read the next event from the socket without blocking on the read itself.
#
# Returns the event parsed from JSON, or nil when there is nothing to
# process: no datagram was pending (after waiting up to LOOP_TIMER seconds
# for the socket to become readable), the read was empty, or the payload was
# not valid JSON.
def read_next_event
  self.log.debug "Reading next event."
  data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

  if data == :wait_readable
    # Nothing to read yet; pause until the socket is readable or the loop
    # timer expires, then let the caller loop around again.
    IO.select( [@socket], nil, nil, LOOP_TIMER )
    return nil
  end

  # Defensively treat a nil read the same as an empty one.
  return nil if data.nil? || data.empty?

  self.log.info "Read %d bytes" % [ data.bytesize ]
  return JSON.parse( data )
rescue JSON::ParserError => err
  # A single malformed (or truncated) datagram must not take down the
  # processing loop; log it and carry on with the next one.
  self.log.error "Discarding malformed event: %s" % [ err.message ]
  return nil
end
start()
click to toggle source
Start receiving events.
# File lib/observability/collector/rabbitmq.rb, line 123
# Start receiving events: connect the AMQP session, bind the UDP socket to
# the configured host and port, then enter the processing loop.
def start
  self.log.info "Starting up."

  collector_class = self.class
  @amqp_session = collector_class.configured_amqp_session
  @socket.bind( collector_class.host, collector_class.port )

  self.start_processing
end
start_processing()
click to toggle source
Start consuming incoming events and storing them.
# File lib/observability/collector/rabbitmq.rb, line 143
# Run the processing loop: keep reading events and storing each one until
# the processing flag is cleared.
def start_processing
  @processing = true

  while @processing
    event = self.read_next_event
    # No event this time around; go back and re-check the flag.
    next unless event

    self.log.debug "Read event: %p" % [ event ]
    self.store_event( event )
  end
end
stop()
click to toggle source
Stop receiving events.
# File lib/observability/collector/rabbitmq.rb, line 134
# Stop receiving events: clear the processing flag, shut down the socket,
# and close the AMQP session.
#
# The session is closed even if shutting down the socket raises (the
# exception still propagates), and a collector that was never started (no
# session yet) can be stopped without error. Returns the result of closing
# the session, or nil if there was no session.
def stop
  self.stop_processing

  begin
    @socket.shutdown( :SHUT_RDWR )
  ensure
    close_result = @amqp_session&.close
  end

  return close_result
end
stop_processing()
click to toggle source
Stop consuming events.
# File lib/observability/collector/rabbitmq.rb, line 154
# Clear the processing flag so the loop in #start_processing exits the next
# time it checks it.
def stop_processing
  @processing = false
end
store_event( event )
click to toggle source
Store the specified event by publishing it to the AMQP exchange.
# File lib/observability/collector/rabbitmq.rb, line 177
# Publish +event+ to the AMQP exchange as JSON.
#
# The '@timestamp', '@type' and '@version' keys are removed from +event+
# (mutating it) and passed alongside the content type, content encoding and
# the current time as publish options; the remaining keys form the payload.
def store_event( event )
  time, type, version = %w[@timestamp @type @version].map {|key| event.delete( key ) }
  payload = JSON.generate( event )

  properties = {
    time: time,
    type: type,
    version: version,
    content_type: 'application/json',
    content_encoding: payload.encoding.name,
    timestamp: Time.now.to_f,
  }

  @amqp_exchange.value.publish( payload, properties )
end