class InfluxDB::AsyncQueue::Server

Attributes

config[R]
queue[R]

Public Class Methods

new(config = nil) click to toggle source
# File lib/influxdb/async_queue/server.rb, line 11
def initialize(config = nil)
  @config = config || ::InfluxDB::AsyncQueue.config
  @queue = ::InfluxDB::AsyncQueue::Queue.new(@config.adapter)
end
run(*args) click to toggle source
# File lib/influxdb/async_queue/server.rb, line 7
def self.run(*args)
  new(*args).run
end

Public Instance Methods

iterate(points) click to toggle source
# File lib/influxdb/async_queue/server.rb, line 31
def iterate(points)
  points = queue.pop(config.batch_size)
  if points.empty?
    config.logger.debug 'Queue is empty; sleep'.freeze
    Kernel.sleep(config.sleep_timeout)
    return false
  end

  config.logger.info("Going to write #{points.size} points")
  config.influxdb_client.write(
    points.join("\n"),
    config.influxdb_precision,
    config.influxdb_retention_policy.presence,
    config.influxdb_database
  )
  true
rescue InfluxDB::Error => e
  config.logger.error(e.inspect)
  if points
    sleep(config.sleep_timeout)
    queue.push(*points)
    config.logger.debug(points.inspect)
  end
  true
rescue => e
  config.logger.error(e.inspect)
  raise
end
run() click to toggle source
# File lib/influxdb/async_queue/server.rb, line 17
def run
  config.logger.info("Start #{self.inspect}")
  points = nil

  loop do
    next unless iterate(points)
    points = nil
  end
rescue Interrupt
  config.logger.info('Interrupted; exit(0)'.freeze)
  queue.push(*points) if points
  exit(0)
end