class PrometheusAggregator::Exporter
Constants
- CONNECTION_RETRY_INTERVAL
- LOOP_INTERVAL
- QUEUE_CAPACITY
- STALENESS_THRESHOLD
Public Class Methods
new(host, port, opts = {})
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 16 def initialize(host, port, opts = {}) @host = host @port = port @ssl_params = opts[:ssl_params] @connection_retry_interval = opts[:connection_retry_interval] || CONNECTION_RETRY_INTERVAL @queue_capacity = opts[:queue_capacity] || QUEUE_CAPACITY @staleness_threshold = opts[:staleness_threshold] || STALENESS_THRESHOLD @registered = {} @stop = false @pid = nil @mutex = Mutex.new @queue = [] @socket = nil @thread = nil end
Public Instance Methods
backlog()
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 52 def backlog @queue.length end
enqueue(record)
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 34 def enqueue(record) @mutex.synchronize do if Process.pid != @pid PrometheusAggregator.logger.info("Pid mismatch, spawning new thread") @thread&.kill @thread = nil end if @thread.nil? @thread = Thread.new { write_loop } @pid = Process.pid end @queue << [Time.now, record] @queue.shift while @queue.length > QUEUE_CAPACITY end end
stop()
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 56 def stop @stop = true end
Private Instance Methods
connect()
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 100 def connect @registered = {} @socket&.close @socket = Net::TCPClient.new( server: "#{@host}:#{@port}", ssl: @ssl_params, connect_timeout: 3.0, write_timeout: 3.0, read_timeout: 3.0, connect_retry_count: 0 ) rescue Net::TCPClient::ConnectionFailure => err PrometheusAggregator.logger.debug(err) @socket&.close @socket = nil end
connection_ok?()
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 96 def connection_ok? !@socket.nil? && @socket.alive? end
emit_value(record)
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 128 def emit_value(record) send_line(JSON.fast_generate(record.slice(:name, :value, :labels))) end
register(record)
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 119 def register(record) declaration = record.reject { |k| k == :value } json_declaration = JSON.fast_generate(declaration) return if @registered[json_declaration] send_line(json_declaration) @registered[json_declaration] = true end
send_line(line)
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 132 def send_line(line) @socket.write(line + "\n") rescue Net::TCPClient::ConnectionFailure => err PrometheusAggregator.logger.debug(err) @socket&.close @socket = nil end
stale?(record)
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 92 def stale?(record) record[0] < Time.now - @staleness_threshold end
write_loop()
click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 62 def write_loop loop do break if @stop connect unless connection_ok? unless connection_ok? PrometheusAggregator.logger.warn( "Not connected to prometheus agggregator (#{@host}:#{@port})" ) sleep(@connection_retry_interval) next end event = @mutex.synchronize do @queue.shift while !@queue.empty? && stale?(@queue.first) @queue.shift end if event.nil? sleep(LOOP_INTERVAL) next end record = event[1] register(record) emit_value(record) end end