class PrometheusAggregator::Exporter

Constants

CONNECTION_RETRY_INTERVAL
LOOP_INTERVAL
QUEUE_CAPACITY
STALENESS_THRESHOLD

Public Class Methods

new(host, port, opts = {}) click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 16
def initialize(host, port, opts = {})
  @host = host
  @port = port
  @ssl_params = opts[:ssl_params]
  @connection_retry_interval =
    opts[:connection_retry_interval] || CONNECTION_RETRY_INTERVAL
  @queue_capacity = opts[:queue_capacity] || QUEUE_CAPACITY
  @staleness_threshold = opts[:staleness_threshold] || STALENESS_THRESHOLD
  @registered = {}

  @stop = false
  @pid = nil
  @mutex = Mutex.new
  @queue = []
  @socket = nil
  @thread = nil
end

Public Instance Methods

backlog() click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 52
def backlog
  @queue.length
end
enqueue(record) click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 34
def enqueue(record)
  @mutex.synchronize do
    if Process.pid != @pid
      PrometheusAggregator.logger.info("Pid mismatch, spawning new thread")
      @thread&.kill
      @thread = nil
    end

    if @thread.nil?
      @thread = Thread.new { write_loop }
      @pid = Process.pid
    end

    @queue << [Time.now, record]
    @queue.shift while @queue.length > QUEUE_CAPACITY
  end
end
stop() click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 56
def stop
  @stop = true
end

Private Instance Methods

connect() click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 100
def connect
  @registered = {}

  @socket&.close

  @socket = Net::TCPClient.new(
    server: "#{@host}:#{@port}",
    ssl: @ssl_params,
    connect_timeout: 3.0,
    write_timeout: 3.0,
    read_timeout: 3.0,
    connect_retry_count: 0
  )
rescue Net::TCPClient::ConnectionFailure => err
  PrometheusAggregator.logger.debug(err)
  @socket&.close
  @socket = nil
end
connection_ok?() click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 96
def connection_ok?
  !@socket.nil? && @socket.alive?
end
emit_value(record) click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 128
def emit_value(record)
  send_line(JSON.fast_generate(record.slice(:name, :value, :labels)))
end
register(record) click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 119
def register(record)
  declaration = record.reject { |k| k == :value }
  json_declaration = JSON.fast_generate(declaration)
  return if @registered[json_declaration]

  send_line(json_declaration)
  @registered[json_declaration] = true
end
send_line(line) click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 132
def send_line(line)
  @socket.write(line + "\n")
rescue Net::TCPClient::ConnectionFailure => err
  PrometheusAggregator.logger.debug(err)
  @socket&.close
  @socket = nil
end
stale?(record) click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 92
def stale?(record)
  record[0] < Time.now - @staleness_threshold
end
write_loop() click to toggle source
# File lib/prometheus_aggregator/exporter.rb, line 62
def write_loop
  loop do
    break if @stop

    connect unless connection_ok?
    unless connection_ok?
      PrometheusAggregator.logger.warn(
        "Not connected to prometheus agggregator (#{@host}:#{@port})"
      )

      sleep(@connection_retry_interval)
      next
    end

    event = @mutex.synchronize do
      @queue.shift while !@queue.empty? && stale?(@queue.first)
      @queue.shift
    end

    if event.nil?
      sleep(LOOP_INTERVAL)
      next
    end

    record = event[1]
    register(record)
    emit_value(record)
  end
end