class InfluxDB2::WriteApi

Write time series data into InfluxDB.

Constants

WORKER_MUTEX

Attributes

closed[R]

Public Class Methods

new(options:, write_options: SYNCHRONOUS, point_settings: InfluxDB2::PointSettings.new) click to toggle source

@param [Hash] options The options to be used by the client. @param [WriteOptions] write_options Write api configuration. @param [PointSettings] point_settings Default tags configuration

Calls superclass method InfluxDB2::DefaultApi::new
# File lib/influxdb2/client/write_api.rb, line 129
# Builds a write API on top of the parent API's initializer.
#
# @param [Hash] options The options to be used by the client.
# @param [WriteOptions] write_options Write api configuration.
# @param [PointSettings] point_settings Default tags configuration
def initialize(options:, write_options: SYNCHRONOUS, point_settings: InfluxDB2::PointSettings.new)
  super(options: options)
  @closed = false
  @write_options = write_options
  @point_settings = point_settings
  return unless @options.key?(:tags)

  # Tags supplied through the client options are registered as default tags.
  @options[:tags].each do |tag_key, tag_value|
    @point_settings.add_default_tag(tag_key, tag_value)
  end
end

Public Instance Methods

close!() click to toggle source

@return [ true ] Always true.

# File lib/influxdb2/client/write_api.rb, line 191
# Flushes the worker returned by _worker (when there is one) and marks this API as closed.
#
# @return [ true ] Always true.
def close!
  worker = _worker
  worker.flush_all unless worker.nil?
  @closed = true
  true
end
write(data:, precision: nil, bucket: nil, org: nil) click to toggle source

Write data into specified Bucket.

@example write(data:

[
  {
    name: 'cpu',
    tags: { host: 'server_nl', region: 'us' },
    fields: {internal: 5, external: 6},
    time: 1422568543702900257
  },
  {name: 'gpu', fields: {value: 0.9999}}
],
precision: InfluxDB2::WritePrecision::NANOSECOND,
bucket: 'my-bucket',
org: 'my-org'

)

@example write(data: 'h2o,location=west value=33i 15')

@example point = InfluxDB2::Point.new(name: 'h2o')

.add_tag('location', 'europe')
.add_field('level', 2)

hash = { name: 'h2o', tags: { host: 'aws', region: 'us' }, fields: { level: 5, saturation: '99%' }, time: 123 }

write(data: ['h2o,location=west value=33i 15', point, hash])

@param [Object] data DataPoints to write into InfluxDB. The data can be represented by a [Hash], a [Point], a [String], or by a collection of these types.

@param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol @param [String] bucket specifies the destination bucket for writes @param [String] org specifies the destination organization for writes

# File lib/influxdb2/client/write_api.rb, line 170
# Write data into specified Bucket.
#
# Precision, bucket and org fall back to the corresponding entries of @options
# when the caller does not pass them. Default tags are applied to +data+ before
# the payload is generated.
#
# @param [Object] data DataPoints to write into InfluxDB.
# @param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol
# @param [String] bucket specifies the destination bucket for writes
# @param [String] org specifies the destination organization for writes
# @return [Object, nil] nil when no payload was generated; otherwise the result of
#   pushing to the worker (batching) or of write_raw
def write(data:, precision: nil, bucket: nil, org: nil)
  target = {
    precision: precision || @options[:precision],
    bucket: bucket || @options[:bucket],
    org: org || @options[:org]
  }
  # Insertion order keeps the checks running as precision, bucket, org.
  target.each { |label, resolved| _check(label.to_s, resolved) }

  _add_default_tags(data)

  payload = _generate_payload(data, **target)
  return nil if payload.nil?
  return _worker.push(payload) if WriteType::BATCHING == @write_options.write_type

  write_raw(payload, **target)
end
write_raw(payload, precision: nil, bucket: nil, org: nil) click to toggle source

@param [String] payload data as String @param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol @param [String] bucket specifies the destination bucket for writes @param [String] org specifies the destination organization for writes

# File lib/influxdb2/client/write_api.rb, line 201
# Posts an already-serialized payload to the '/api/v2/write' endpoint.
#
# Precision, bucket and org fall back to the corresponding entries of @options
# when not given; each resolved value is passed to _check before anything is sent.
#
# @param [String] payload data as String
# @param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol
# @param [String] bucket specifies the destination bucket for writes
# @param [String] org specifies the destination organization for writes
# @return [Object, nil] the result of _post_text, or nil when +payload+ is not a non-empty String
def write_raw(payload, precision: nil, bucket: nil, org: nil)
  precision_param = precision || @options[:precision]
  bucket_param = bucket || @options[:bucket]
  org_param = org || @options[:org]
  _check('precision', precision_param)
  _check('bucket', bucket_param)
  _check('org', org_param)

  # Only a non-empty String is sent. The previous `||` guard let empty strings
  # through and raised NoMethodError for nil.
  return nil unless payload.is_a?(String) && !payload.empty?

  uri = _parse_uri('/api/v2/write')
  uri.query = URI.encode_www_form(bucket: bucket_param, org: org_param, precision: precision_param.to_s)

  _post_text(payload, uri)
end

Private Instance Methods

_add_default_tag(data, key, value) click to toggle source
# File lib/influxdb2/client/write_api.rb, line 272
# Applies a single default tag to +data+ in place.
#
# A Point receives the tag through add_tag, a Hash through its :tags entry,
# and anything else that responds to :map is walked item by item. Other
# values (for example line-protocol Strings) are left untouched.
#
# @param [Object] data a Point, a Hash, or a collection of these
# @param [Object] key tag key
# @param [Object] value tag value
def _add_default_tag(data, key, value)
  if data.is_a?(Point)
    data.add_tag(key, value)
  elsif data.is_a?(Hash)
    # A hash point may omit :tags entirely; create the container on demand
    # instead of raising NoMethodError on nil.
    (data[:tags] ||= {})[key] = value
  elsif data.respond_to? :map
    data.map do |item|
      _add_default_tag(item, key, value)
    end.reject(&:nil?)
  end
end
_add_default_tags(data) click to toggle source
# File lib/influxdb2/client/write_api.rb, line 263
# Resolves every default tag expression from the point settings and applies
# the resulting key/value pair to +data+.
#
# @param [Object] data the data the default tags are applied to
def _add_default_tags(data)
  @point_settings.default_tags.each do |tag_key, tag_expression|
    _add_default_tag(data, tag_key, PointSettings.get_value(tag_expression))
  end
end
_generate_payload(data, precision: nil, bucket: nil, org: nil) click to toggle source
# File lib/influxdb2/client/write_api.rb, line 284
# Converts +data+ into what the write path sends.
#
# nil and empty Strings yield nil. A Point is serialized via to_line_protocol
# and re-processed with the point's own precision (DEFAULT_WRITE_PRECISION when
# it has none). A Hash is converted with Point.from_hash first. Anything else
# responding to :map is processed item by item, dropping nil results.
#
# In batching mode a String becomes a BatchItem and a collection stays a list;
# otherwise a String is returned as is and a collection is joined with newlines.
#
# @param [Object] data a String, Point, Hash, or a collection of these
# @param [Object] precision precision stored in the batch key
# @param [String] bucket bucket stored in the batch key
# @param [String] org organization stored in the batch key
# @return [Object, nil] the payload, or nil when there is nothing to write
def _generate_payload(data, precision: nil, bucket: nil, org: nil)
  case data
  when nil
    nil
  when Point
    line = data.to_line_protocol
    point_precision = data.precision || DEFAULT_WRITE_PRECISION
    _generate_payload(line, bucket: bucket, org: org, precision: point_precision)
  when String
    return nil if data.empty?
    return data unless @write_options.write_type == WriteType::BATCHING

    BatchItem.new(BatchItemKey.new(bucket, org, precision), data)
  when Hash
    _generate_payload(Point.from_hash(data), bucket: bucket, org: org, precision: precision)
  else
    return nil unless data.respond_to?(:map)

    entries = data.map { |entry| _generate_payload(entry, bucket: bucket, org: org, precision: precision) }
    entries = entries.reject(&:nil?)
    @write_options.write_type == WriteType::BATCHING ? entries : entries.join("\n".freeze)
  end
end
_worker() click to toggle source
# File lib/influxdb2/client/write_api.rb, line 249
# Returns the worker used for batching writes, creating it on first use.
#
# @return [Worker, nil] nil when the write type is not batching
def _worker
  return nil unless @write_options.write_type == WriteType::BATCHING
  return @worker if @worker

  # Re-check under the mutex: a previous holder may already have assigned
  # @worker, in which case `||=` keeps the existing instance.
  WORKER_MUTEX.synchronize { @worker ||= Worker.new(self, @write_options) }
end