class Fluent::Plugin::InfluxdbOutput

Constants

DEFAULT_BUFFER_TYPE
EMPTY_STRING
FORMATTED_RESULT_FOR_INVALID_RECORD

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 64
def initialize
  super
  @seq = 0
  @prev_timestamp = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 70
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  @time_precise = time_precise_lambda()
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 106
def format(tag, time, record)
  # nil and '' check should be in influxdb-ruby client...
  if record.empty? || record.has_value?(nil) || record.has_value?(EMPTY_STRING)
    log.warn "Skip record '#{record}' in '#{tag}', because either record has no value or at least a value is 'nil' or empty string inside the record."
    FORMATTED_RESULT_FOR_INVALID_RECORD
  else
    [precision_time(time), record].to_msgpack
  end
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 121
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 125
def multi_workers_ready?
  true
end
precision_time(time) click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 227
def precision_time(time)
  # nsec is supported from v0.14
  nstime = time * (10 ** 9) + (time.is_a?(Integer) ? 0 : time.nsec)
  @time_precise.call(nstime)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 116
def shutdown
  super
  @influxdb.stop!
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb.rb, line 77
def start
  super

  log.info "Connecting to database: #{@dbname}, host: #{@host}, port: #{@port}, username: #{@user}, use_ssl = #{@use_ssl}, verify_ssl = #{@verify_ssl}"

  # ||= for testing.
  @influxdb ||= InfluxDB::Client.new @dbname, hosts: @host.split(','),
                                              port: @port,
                                              username: @user,
                                              password: @password,
                                              async: false,
                                              retry: @retry,
                                              time_precision: @time_precision,
                                              use_ssl: @use_ssl,
                                              verify_ssl: @verify_ssl

  begin
    existing_databases = @influxdb.list_databases.map { |x| x['name'] }
    unless existing_databases.include? @dbname
      raise Fluent::ConfigError, 'Database ' + @dbname + ' doesn\'t exist. Create it first, please. Existing databases: ' + existing_databases.join(',')
    end
  rescue InfluxDB::AuthenticationError, InfluxDB::Error
    log.info "skip database presence check because '#{@user}' user doesn't have admin privilege. Check '#{@dbname}' exists on influxdb"
  end
end
time_precise_lambda() click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 207
def time_precise_lambda()
  case @time_precision.to_sym
  when :h then
    lambda{|nstime| nstime / (10 ** 9) / (60 ** 2) }
  when :m then
    lambda{|nstime| nstime / (10 ** 9) / 60 }
  when :s then
    lambda{|nstime| nstime / (10 ** 9) }
  when :ms then
    lambda{|nstime| nstime / (10 ** 6) }
  when :u then
    lambda{|nstime| nstime / (10 ** 3) }
  when :ns then
    lambda{|nstime| nstime }
  else
    raise Fluent::ConfigError, 'time_precision ' + @time_precision + ' is invalid.' +
      'should specify either either hour (h), minutes (m), second (s), millisecond (ms), microsecond (u), or nanosecond (ns)'
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_influxdb.rb, line 129
def write(chunk)
  points = []
  tag = chunk.metadata.tag
  chunk.msgpack_each do |time, record|
    timestamp = record.delete(@time_key) || time
    if tag_keys.empty? && !@auto_tags
      values = record
      tags = {}
    else
      values = {}
      tags = {}
      record.each_pair do |k, v|
        if (@auto_tags && v.is_a?(String)) || @tag_keys.include?(k)
          # If the tag value is not nil, empty, or a space, add the tag
          if v.to_s.strip != ''
            tags[k] = v
          end
        else
          values[k] = v
        end
      end
    end
    if @sequence_tag
      if @prev_timestamp == timestamp
        @seq += 1
      else
        @seq = 0
      end
      tags[@sequence_tag] = @seq
      @prev_timestamp = timestamp
    end

    if values.empty?
        log.warn "Skip record '#{record}', because InfluxDB requires at least one value in raw"
        next
    end
    
    if @cast_number_to_float
      values.each do |key, value|
        if value.is_a?(Integer)
          values[key] = Float(value)
        end
      end
    end

    point = {
      timestamp: timestamp,
      series: @measurement || tag,
      values: values,
      tags: tags,
    }
    retention_policy = @default_retention_policy
    unless @retention_policy_key.nil?
      retention_policy = record.delete(@retention_policy_key) || @default_retention_policy
      unless points.nil?
        if retention_policy != @default_retention_policy
          # flush the retention policy first
          @influxdb.write_points(points, nil, @default_retention_policy)
          points = nil
        end
      end
    end
    if points.nil?
      @influxdb.write_points([point], nil, retention_policy)
    else
      points << point
    end
  end

  unless points.nil?
    if @default_retention_policy.nil?
      @influxdb.write_points(points)
    else
      @influxdb.write_points(points, nil, @default_retention_policy)
    end
  end
end