class Observability::Collector::Timescale
Constants
- JSON_CONFIG
The config to pass to JSON.parse
- LOOP_TIMER
The number of seconds to wait between IO loops
- MAX_EVENT_BYTES
The maximum size of event messages
Public Class Methods
new()
Create a new UDP collector
Calls superclass method
# File lib/observability/collector/timescale.rb, line 48
def initialize
  super
  @socket     = UDPSocket.new
  @db         = nil
  @cursor     = nil
  @processing = false
end
Public Instance Methods
read_next_event()
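Read the next event from the socket. If no datagram is ready, wait up to LOOP_TIMER seconds for the socket to become readable and return nil; an empty datagram also returns nil. Otherwise, return the event parsed from JSON.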
# File lib/observability/collector/timescale.rb, line 107
def read_next_event
  data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

  if data == :wait_readable
    IO.select( [@socket], nil, nil, LOOP_TIMER )
    return nil
  elsif data.empty?
    return nil
  else
    self.log.info "Read %d bytes" % [ data.bytesize ]
    return JSON.parse( data )
  end
end
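As a rough illustration of the read path, the following sketch sends one JSON-encoded event over UDP to a locally bound socket and reads it back using the same `recv_nonblock` / `IO.select` pattern. The `read_event` method is a hypothetical stand-in for `read_next_event` (without logging), and the `max_bytes` and `timeout` defaults are placeholders, since the actual values of MAX_EVENT_BYTES and LOOP_TIMER are not shown on this page. The loopback address, the ephemeral port, and the sample event are likewise assumptions made for the example.

```ruby
require 'socket'
require 'json'

# Hypothetical stand-in for read_next_event. The defaults are placeholder
# values, not the collector's real MAX_EVENT_BYTES and LOOP_TIMER.
def read_event( socket, max_bytes: 65_536, timeout: 0.25 )
  data = socket.recv_nonblock( max_bytes, exception: false )

  if data == :wait_readable
    # Nothing queued yet: block until readable or the timeout passes,
    # then let the caller try again.
    IO.select( [socket], nil, nil, timeout )
    return nil
  elsif data.empty?
    return nil
  else
    return JSON.parse( data )
  end
end

# Bind a receiver on the loopback interface; port 0 asks the OS for a free port.
receiver = UDPSocket.new
receiver.bind( '127.0.0.1', 0 )
port = receiver.addr[1]

# Send a single made-up event to it.
sender = UDPSocket.new
payload = JSON.generate( '@type' => 'example.event', '@version' => 1, 'detail' => 'hello' )
sender.send( payload, 0, '127.0.0.1', port )

# Poll a bounded number of times; a nil result just means "try again".
event = nil
20.times do
  event = read_event( receiver )
  break if event
end

sender.close
receiver.close
```

Because a nil return covers both "nothing arrived yet" and "empty datagram", the caller only needs a single check before moving on to the next iteration.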
start()
Start receiving events.
# File lib/observability/collector/timescale.rb, line 63
def start
  self.log.info "Starting up."
  @db = Sequel.connect( self.class.db )
  @db.extension( :pg_json )
  # @cursor = @db[ :events ].prepare( :insert, :insert_new_event,
  #   time: :$time,
  #   type: :$type,
  #   version: :$version,
  #   data: :$data
  # )

  @socket.bind( self.class.host, self.class.port )
  self.start_processing
end
start_processing()
Start consuming incoming events and storing them.
# File lib/observability/collector/timescale.rb, line 90
def start_processing
  @processing = true
  while @processing
    event = self.read_next_event or next
    self.log.debug "Read event: %p" % [ event ]
    self.store_event( event )
  end
end
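The loop relies on two things: a nil from `read_next_event` skips straight to the next iteration (`or next`), and the `@processing` flag is only checked at the top of each pass. The sketch below imitates that control flow with an Array in place of the socket and database. `LoopSketch` and its `stored` list are hypothetical and exist only for this example. It also clears the flag itself when the input runs out, which the real collector does not do.

```ruby
# Hypothetical stand-in for the collector's processing loop. Events come
# from an Array rather than a UDP socket, and "storing" appends to a list.
class LoopSketch
  attr_reader :stored

  def initialize( incoming )
    @incoming   = incoming
    @stored     = []
    @processing = false
  end

  # Returns the next queued item, which may be nil (meaning "no event").
  # Unlike the real collector, this clears the flag once the input is
  # exhausted so that the example terminates.
  def read_next_event
    self.stop_processing if @incoming.empty?
    return @incoming.shift
  end

  def start_processing
    @processing = true
    while @processing
      # A nil read skips the rest of the body and re-checks the flag.
      event = self.read_next_event or next
      @stored << event
    end
  end

  def stop_processing
    @processing = false
  end
end

sketch = LoopSketch.new( [ {'n' => 1}, nil, {'n' => 2} ] )
sketch.start_processing
```

Since the flag is only consulted between reads, a call to `stop_processing` takes effect after the current read returns, which in the collector may take up to LOOP_TIMER seconds.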
stop()
Stop receiving events.
# File lib/observability/collector/timescale.rb, line 81
def stop
  self.stop_processing
  @cursor = nil
  @db.disconnect
end
stop_processing()
Stop consuming events.
# File lib/observability/collector/timescale.rb, line 101
def stop_processing
  @processing = false
end
store_event( event )
Store the specified event.
# File lib/observability/collector/timescale.rb, line 123
def store_event( event )
  self.log.debug "Storing event: %p" % [ event ]
  time    = event.delete('@timestamp')
  type    = event.delete('@type')
  version = event.delete('@version')

  # @cursor.call( time: time, type: type, version: version, data: event )
  @db[ :events ].insert(
    time:    time,
    type:    type,
    version: version,
    data:    Sequel.pg_json( event )
  )
end
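To make the resulting row layout concrete, here is a small sketch of how the event is split before the insert. It works on a plain Hash and does not touch a database. `split_event` is a hypothetical helper rather than part of the collector, and the sample event is made up.

```ruby
require 'json'

# Hypothetical helper mirroring the field extraction in store_event.
# Hash#delete removes each key from the event and returns its value, so
# whatever remains after the three metadata keys are pulled out is what
# ends up in the `data` column.
def split_event( event )
  time    = event.delete( '@timestamp' )
  type    = event.delete( '@type' )
  version = event.delete( '@version' )

  return { time: time, type: type, version: version, data: event }
end

raw = '{"@timestamp":"2019-07-22T12:00:00Z","@type":"acme.login","@version":1,"user":"jdoe"}'
row = split_event( JSON.parse(raw) )
```

Note that the event Hash passed in is modified in place, so a caller that still needs the metadata keys afterwards would have to duplicate the Hash first.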