class Fluent::Plugin::InfluxdbOutput
Constants
- DEFAULT_BUFFER_TYPE
- EMPTY_STRING
- FORMATTED_RESULT_FOR_INVALID_RECORD
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 64 def initialize super @seq = 0 @prev_timestamp = nil end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 70 def configure(conf) compat_parameters_convert(conf, :buffer) super @time_precise = time_precise_lambda() raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 106 def format(tag, time, record) # nil and '' check should be in influxdb-ruby client... if record.empty? || record.has_value?(nil) || record.has_value?(EMPTY_STRING) log.warn "Skip record '#{record}' in '#{tag}', because either record has no value or at least a value is 'nil' or empty string inside the record." FORMATTED_RESULT_FOR_INVALID_RECORD else [precision_time(time), record].to_msgpack end end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 121 def formatted_to_msgpack_binary true end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 125 def multi_workers_ready? true end
precision_time(time)
click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 227 def precision_time(time) # nsec is supported from v0.14 nstime = time * (10 ** 9) + (time.is_a?(Integer) ? 0 : time.nsec) @time_precise.call(nstime) end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 116 def shutdown super @influxdb.stop! end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 77 def start super log.info "Connecting to database: #{@dbname}, host: #{@host}, port: #{@port}, username: #{@user}, use_ssl = #{@use_ssl}, verify_ssl = #{@verify_ssl}" # ||= for testing. @influxdb ||= InfluxDB::Client.new @dbname, hosts: @host.split(','), port: @port, username: @user, password: @password, async: false, retry: @retry, time_precision: @time_precision, use_ssl: @use_ssl, verify_ssl: @verify_ssl begin existing_databases = @influxdb.list_databases.map { |x| x['name'] } unless existing_databases.include? @dbname raise Fluent::ConfigError, 'Database ' + @dbname + ' doesn\'t exist. Create it first, please. Existing databases: ' + existing_databases.join(',') end rescue InfluxDB::AuthenticationError, InfluxDB::Error log.info "skip database presence check because '#{@user}' user doesn't have admin privilege. Check '#{@dbname}' exists on influxdb" end end
time_precise_lambda()
click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 207 def time_precise_lambda() case @time_precision.to_sym when :h then lambda{|nstime| nstime / (10 ** 9) / (60 ** 2) } when :m then lambda{|nstime| nstime / (10 ** 9) / 60 } when :s then lambda{|nstime| nstime / (10 ** 9) } when :ms then lambda{|nstime| nstime / (10 ** 6) } when :u then lambda{|nstime| nstime / (10 ** 3) } when :ns then lambda{|nstime| nstime } else raise Fluent::ConfigError, 'time_precision ' + @time_precision + ' is invalid.' + 'should specify either either hour (h), minutes (m), second (s), millisecond (ms), microsecond (u), or nanosecond (ns)' end end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 129 def write(chunk) points = [] tag = chunk.metadata.tag chunk.msgpack_each do |time, record| timestamp = record.delete(@time_key) || time if tag_keys.empty? && !@auto_tags values = record tags = {} else values = {} tags = {} record.each_pair do |k, v| if (@auto_tags && v.is_a?(String)) || @tag_keys.include?(k) # If the tag value is not nil, empty, or a space, add the tag if v.to_s.strip != '' tags[k] = v end else values[k] = v end end end if @sequence_tag if @prev_timestamp == timestamp @seq += 1 else @seq = 0 end tags[@sequence_tag] = @seq @prev_timestamp = timestamp end if values.empty? log.warn "Skip record '#{record}', because InfluxDB requires at least one value in raw" next end if @cast_number_to_float values.each do |key, value| if value.is_a?(Integer) values[key] = Float(value) end end end point = { timestamp: timestamp, series: @measurement || tag, values: values, tags: tags, } retention_policy = @default_retention_policy unless @retention_policy_key.nil? retention_policy = record.delete(@retention_policy_key) || @default_retention_policy unless points.nil? if retention_policy != @default_retention_policy # flush the retention policy first @influxdb.write_points(points, nil, @default_retention_policy) points = nil end end end if points.nil? @influxdb.write_points([point], nil, retention_policy) else points << point end end unless points.nil? if @default_retention_policy.nil? @influxdb.write_points(points) else @influxdb.write_points(points, nil, @default_retention_policy) end end end