class InfluxDB2::WriteApi
Write time series data into InfluxDB.
Constants
- WORKER_MUTEX
Attributes
Public Class Methods
@param [Hash] options The options to be used by the client. @param [WriteOptions] write_options Write api configuration. @param [PointSettings] point_settings Default tags configuration
InfluxDB2::DefaultApi::new
# File lib/influxdb2/client/write_api.rb, line 129
#
# Build a write API, remembering the write configuration and the point
# settings, and mark the instance as not closed.
#
# @param [Hash] options The options to be used by the client; forwarded to the superclass.
# @param [WriteOptions] write_options Write api configuration.
# @param [PointSettings] point_settings Default tags configuration. Entries found
#   under options[:tags] are added to this object as default tags.
def initialize(options:, write_options: SYNCHRONOUS, point_settings: InfluxDB2::PointSettings.new)
  super(options: options)
  @write_options = write_options
  @point_settings = point_settings
  @closed = false

  # NOTE(review): @options is presumably assigned by the superclass initializer - confirm.
  return unless @options.key?(:tags)

  @options[:tags].each do |tag_name, tag_value|
    @point_settings.add_default_tag(tag_name, tag_value)
  end
end
Public Instance Methods
@return [ true ] Always true.
# File lib/influxdb2/client/write_api.rb, line 191
#
# Flush everything held by the batching worker (when one exists) and mark
# this API as closed.
#
# @return [ true ] Always true.
def close!
  worker = _worker
  worker.flush_all unless worker.nil?
  @closed = true
  true
end
Write data into specified Bucket.
@example write(data:
[ { name: 'cpu', tags: { host: 'server_nl', region: 'us' }, fields: {internal: 5, external: 6}, time: 1422568543702900257 }, {name: 'gpu', fields: {value: 0.9999}} ], precision: InfluxDB2::WritePrecision::NANOSECOND, bucket: 'my-bucket', org: 'my-org'
)
@example write(data: 'h2o,location=west value=33i 15')
@example point = InfluxDB2::Point.new(name: 'h2o')
.add_tag('location', 'europe')
.add_field('level', 2)
hash = { name: 'h2o', tags: { host: 'aws', region: 'us' }, fields: { level: 5, saturation: '99%' }, time: 123 }
write(data: ['h2o,location=west value=33i 15', point, hash])
@param [Object] data DataPoints to write into InfluxDB. The data can be represented by a [Hash], a [Point], a [String],
or by a collection of these types
@param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol @param [String] bucket specifies the destination bucket for writes @param [String] org specifies the destination organization for writes
# File lib/influxdb2/client/write_api.rb, line 170
#
# Write data into the specified bucket. Explicit arguments win over the
# values stored in the client options.
#
# @param [Object] data DataPoints to write: a Hash, Point, String or a collection of these.
# @param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol
# @param [String] bucket specifies the destination bucket for writes
# @param [String] org specifies the destination organization for writes
# @return [nil] when no payload could be generated from data; otherwise the
#   result of pushing to the batching worker or of write_raw.
def write(data:, precision: nil, bucket: nil, org: nil)
  precision_param = precision || @options[:precision]
  bucket_param = bucket || @options[:bucket]
  org_param = org || @options[:org]

  # Validate in a fixed order: precision, then bucket, then org.
  [
    ['precision', precision_param],
    ['bucket', bucket_param],
    ['org', org_param]
  ].each { |label, setting| _check(label, setting) }

  _add_default_tags(data)
  payload = _generate_payload(data, bucket: bucket_param, org: org_param, precision: precision_param)
  return nil if payload.nil?

  # Batching hands the payload to the worker; any other write type sends it right away.
  return _worker.push(payload) if WriteType::BATCHING == @write_options.write_type

  write_raw(payload, precision: precision_param, bucket: bucket_param, org: org_param)
end
@param [String] payload data as String @param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol @param [String] bucket specifies the destination bucket for writes @param [String] org specifies the destination organization for writes
# File lib/influxdb2/client/write_api.rb, line 201
#
# Post an already serialized payload to the '/api/v2/write' endpoint.
# Explicit arguments win over the values stored in the client options.
#
# @param [String] payload data as String
# @param [WritePrecision] precision The precision for the unix timestamps within the body line-protocol
# @param [String] bucket specifies the destination bucket for writes
# @param [String] org specifies the destination organization for writes
# @return [nil] when payload is not a String or is an empty String; otherwise
#   whatever _post_text returns.
def write_raw(payload, precision: nil, bucket: nil, org: nil)
  precision_param = precision || @options[:precision]
  bucket_param = bucket || @options[:bucket]
  org_param = org || @options[:org]
  _check('precision', precision_param)
  _check('bucket', bucket_param)
  _check('org', org_param)

  # Only a non-empty String is worth sending. The class test comes first so
  # that nil (or any object without #empty?) is skipped instead of raising.
  return nil unless payload.instance_of?(String) && !payload.empty?

  uri = _parse_uri('/api/v2/write')
  uri.query = URI.encode_www_form(bucket: bucket_param, org: org_param, precision: precision_param.to_s)
  _post_text(payload, uri)
end
Private Instance Methods
# File lib/influxdb2/client/write_api.rb, line 272
#
# Apply one default tag to a data point, or to every element of a collection
# of data points. Data of any other shape (e.g. a String) is left untouched.
#
# @param [Object] data a Point, a Hash, or an object responding to #map
# @param [Object] key tag name
# @param [Object] value tag value
# @return [Object, nil] nil when data is neither a Point, a Hash nor a collection
def _add_default_tag(data, key, value)
  if data.is_a?(Point)
    data.add_tag(key, value)
  elsif data.is_a?(Hash)
    # A Hash data point may legitimately omit :tags; create the tag hash on
    # demand instead of failing on nil.
    (data[:tags] ||= {})[key] = value
  elsif data.respond_to?(:map)
    data.map { |item| _add_default_tag(item, key, value) }.compact
  end
end
# File lib/influxdb2/client/write_api.rb, line 284
#
# Turn data into something that can be written:
# * nil and empty Strings yield nil,
# * a Point is serialized with to_line_protocol and processed again as a String,
#   using the point's own precision or DEFAULT_WRITE_PRECISION,
# * a Hash is converted with Point.from_hash and processed again as a Point,
# * a String is returned as is, or wrapped in a BatchItem when batching,
# * a collection is processed element by element; nil results are dropped and
#   the rest is returned as a list (batching) or joined with newlines.
#
# @param [Object] data a Point, String, Hash, or an object responding to #map
# @param [WritePrecision] precision precision stored in the batch key
# @param [String] bucket destination bucket stored in the batch key
# @param [String] org destination organization stored in the batch key
# @return [Object, nil] nil when nothing can be generated from data
def _generate_payload(data, precision: nil, bucket: nil, org: nil)
  return nil if data.nil?

  if data.is_a?(Point)
    point_precision = data.precision || DEFAULT_WRITE_PRECISION
    return _generate_payload(data.to_line_protocol, bucket: bucket, org: org, precision: point_precision)
  end

  if data.is_a?(String)
    return nil if data.empty?
    return data unless @write_options.write_type == WriteType::BATCHING

    return BatchItem.new(BatchItemKey.new(bucket, org, precision), data)
  end

  if data.is_a?(Hash)
    return _generate_payload(Point.from_hash(data), bucket: bucket, org: org, precision: precision)
  end

  return nil unless data.respond_to?(:map)

  entries = data.map do |element|
    _generate_payload(element, bucket: bucket, org: org, precision: precision)
  end
  entries = entries.compact

  @write_options.write_type == WriteType::BATCHING ? entries : entries.join("\n".freeze)
end
# File lib/influxdb2/client/write_api.rb, line 249
#
# Return the batching worker, creating it on first use.
#
# @return [Worker, nil] nil when the write type is not WriteType::BATCHING
def _worker
  return nil unless @write_options.write_type == WriteType::BATCHING
  return @worker if @worker

  WORKER_MUTEX.synchronize do
    # Re-test under the lock: a previous holder of the mutex might have
    # already assigned @worker, in which case that instance is reused.
    @worker ||= Worker.new(self, @write_options)
  end
end