class InfluxDB2::Worker
Worker
for handling write batching queue
Public Class Methods
new(api_client, write_options)
click to toggle source
# File lib/influxdb2/client/worker.rb, line 25 def initialize(api_client, write_options) @api_client = api_client @write_options = write_options @queue = Queue.new @queue_event = Queue.new @queue_event.push(true) @thread_flush = Thread.new do until api_client.closed sleep @write_options.flush_interval.to_f / 1_000 _check_background_queue end end @thread_flush.abort_on_exception = @write_options.batch_abort_on_exception @thread_size = Thread.new do until api_client.closed _check_background_queue(size: true) if @queue.length >= @write_options.batch_size sleep 0.01 end end @thread_size.abort_on_exception = @write_options.batch_abort_on_exception end
Public Instance Methods
flush_all()
click to toggle source
# File lib/influxdb2/client/worker.rb, line 61 def flush_all _check_background_queue until @queue.empty? end
push(payload)
click to toggle source
# File lib/influxdb2/client/worker.rb, line 51 def push(payload) if payload.respond_to? :each payload.each do |item| push(item) end else @queue.push(payload) end end
Private Instance Methods
_check_background_queue(size: false)
click to toggle source
# File lib/influxdb2/client/worker.rb, line 67 def _check_background_queue(size: false) @queue_event.pop data = {} points = 0 if size && @queue.length < @write_options.batch_size @queue_event.push(true) return end while (points < @write_options.batch_size) && !@queue.empty? begin item = @queue.pop(true) key = item.key data[key] = [] unless data.key?(key) data[key] << item.data points += 1 rescue ThreadError @queue_event.push(true) return end end begin _write(data) unless data.values.flatten.empty? ensure @queue_event.push(true) end end
_write(data)
click to toggle source
# File lib/influxdb2/client/worker.rb, line 97 def _write(data) data.each do |key, points| _write_raw(key, points) end end
_write_raw(key, points)
click to toggle source
# File lib/influxdb2/client/worker.rb, line 103 def _write_raw(key, points) write_retry = InfluxDB2::WriteRetry.new( api_client: @api_client, max_retries: @write_options.max_retries, exponential_base: @write_options.exponential_base, retry_interval: @write_options.retry_interval, max_retry_delay: @write_options.max_retry_delay, max_retry_time: @write_options.max_retry_time ) if @write_options.jitter_interval > 0 jitter_delay = (@write_options.jitter_interval.to_f / 1_000) * rand sleep jitter_delay end write_retry.retry do @api_client.write_raw(points.join("\n"), precision: key.precision, bucket: key.bucket, org: key.org) end end