class Fluent::Plugin::LokiOutput

Subclass of Fluent Plugin Output

Constants

DEFAULT_BUFFER_TYPE

Attributes

record_accessors[RW]

Public Instance Methods

client_cert_configured?() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 128
# Tells whether a client certificate AND its private key have both been set.
#
# @return [Boolean] true only when neither @key nor @cert is nil
def client_cert_configured?
  !(@key.nil? || @cert.nil?)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 81
# Validates the plugin configuration and prepares the state used when flushing.
#
# Builds the push endpoint URI from @url, creates record accessors for every
# <label> section and for each entry of @remove_keys, loads and validates the
# optional client certificate/key pair, reads the optional bearer token and
# checks that the optional CA bundle exists.
#
# @param conf the configuration element passed in by the framework
# @raise [Fluent::ConfigError] if @url does not yield an HTTP/HTTPS URI
# @raise [RuntimeError] if bearer_token_file is missing or empty, the client
#   key type is unsupported, or the ca_cert file is missing
def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
  compat_parameters_convert(conf, :buffer)
  super
  @uri = URI.parse(@url + '/loki/api/v1/push')
  unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
    raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
  end

  # One accessor per key of every <label> section; an empty value means
  # "use the label name itself as the record key".
  @record_accessors = {}
  conf.elements.select { |element| element.name == 'label' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning
      v = k if v.empty?
      @record_accessors[k] = record_accessor_create(v)
    end
  end
  @remove_keys_accessors = @remove_keys.map { |key| record_accessor_create(key) }

  # If configured, load and validate client certificate (and corresponding key)
  if client_cert_configured?
    load_client_cert
    validate_client_cert_key
  end

  @auth_token_bearer = nil
  unless @bearer_token_file.nil?
    raise "bearer_token_file #{@bearer_token_file} not found" unless File.exist?(@bearer_token_file)

    # Read the file once, assume long-lived authentication token.
    # Surrounding whitespace (typically a trailing newline) is stripped:
    # Net::HTTP refuses header values that contain CR/LF, so an unstripped
    # token would make every push request fail.
    @auth_token_bearer = File.read(@bearer_token_file).strip
    raise "bearer_token_file #{@bearer_token_file} is empty" if @auth_token_bearer.empty?

    log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
  end

  raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end
generic_to_loki(chunk) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 201
# Turns a buffer chunk into the list of stream entries for the push body.
#
# The chunk is first grouped into streams by chunk_to_loki, and the result is
# then handed to payload_builder.
#
# @param chunk the buffer chunk to convert
# @return whatever payload_builder returns for the grouped streams
def generic_to_loki(chunk)
  payload_builder(chunk_to_loki(chunk))
end
http_request_opts(uri) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 171
# Assembles the option hash passed to Net::HTTP.start for the given URI.
#
# @param uri [URI] target URI; TLS is enabled when its scheme is 'https'
# @return [Hash] always contains :use_ssl; :verify_mode, :cert/:key and
#   :ca_file are added only when the corresponding settings are present
def http_request_opts(uri)
  options = { use_ssl: uri.scheme == 'https' }

  # Optionally disable server certificate verification.
  options[:verify_mode] = OpenSSL::SSL::VERIFY_NONE if @insecure_tls

  # Optionally present client certificate (needs both halves of the pair).
  unless @cert.nil? || @key.nil?
    options[:cert] = @cert
    options[:key] = @key
  end

  # For server certificate verification: set custom CA bundle.
  # Only takes effect when `insecure_tls` is not set.
  options[:ca_file] = @ca_cert unless @ca_cert.nil?

  options
end
load_client_cert() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 132
# Replaces the file paths held in @cert and @key with the parsed OpenSSL
# objects read from those files. A nil setting is left untouched.
#
# @return the parsed key, or nil when @key is not set
def load_client_cert
  if @cert
    cert_data = File.read(@cert)
    @cert = OpenSSL::X509::Certificate.new(cert_data)
  end
  return unless @key

  key_data = File.read(@key)
  @key = OpenSSL::PKey.read(key_data)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 143
# Always answers true.
# NOTE(review): presumably the framework hook that declares this output safe
# to run under multiple workers - confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
validate_client_cert_key() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 137
def validate_client_cert_key
  if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
    raise "Unsupported private key type #{key.class}"
  end
end
write(chunk) click to toggle source

Flushes a chunk to Loki.

# File lib/fluent/plugin/out_loki.rb, line 148
# Sends one buffer chunk to Loki.
#
# The chunk is converted with generic_to_loki, wrapped in a 'streams' body and
# posted through loki_http_request. A successful response is logged at debug
# level. Any other response is logged as a warning (with the body dumped at
# debug level); only 429 and 5xx responses raise, other failures return
# normally.
#
# @param chunk the buffer chunk to flush
# @return [nil]
# @raise [LogPostError] when the response is Net::HTTPTooManyRequests or a
#   Net::HTTPServerError
def write(chunk)
  # streams by label
  request_body = { 'streams' => generic_to_loki(chunk) }
  tenant_id = @tenant ? extract_placeholders(@tenant, chunk) : nil

  # add ingest path to loki url
  response = loki_http_request(request_body, tenant_id)

  case response
  when Net::HTTPSuccess
    log.debug "POST request was responded to with status code #{response.code}"
  else
    summary = "#{response.code} #{response.message} #{response.body}"
    log.warn "failed to write post to #{@uri} (#{summary})"
    log.debug Yajl.dump(request_body)

    # Only retry 429 and 500s
    retriable = response.is_a?(Net::HTTPTooManyRequests) || response.is_a?(Net::HTTPServerError)
    raise(LogPostError, summary) if retriable
  end
  nil
end

Private Instance Methods

chunk_to_loki(chunk) click to toggle source

Iterates over each record in the chunk and groups the resulting Loki entries into streams keyed by their label set.

# File lib/fluent/plugin/out_loki.rb, line 342
# Groups the events of a chunk into Loki streams.
#
# Every record is converted with line_to_loki; entries that end up with the
# same label hash are collected under the same key.
#
# @param chunk an object whose #each yields (time, record) pairs
# @return [Hash] label hash => Array of { 'ts' => nanoseconds, 'line' => line }
#   in the order the events were yielded
def chunk_to_loki(chunk)
  streams = {}
  chunk.each do |time, record|
    result = line_to_loki(record)
    # Records sharing a label set go to the same stream; create it on first use.
    entries = (streams[result[:labels]] ||= [])
    # NOTE: timestamp must include nanoseconds
    entries.push(
      'ts' => to_nano(time),
      'line' => result[:line]
    )
  end
  streams
end
format_labels(data_labels) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 235
# Produces the final label hash for one stream.
#
# @extra_labels is merged over the given labels (extra labels win on
# collisions). Only String values are kept, and every double quote in a kept
# value is prefixed with a backslash.
#
# @param data_labels [Hash, nil] labels extracted from the records; nil is
#   treated as an empty hash
# @return [Hash] the sanitized labels
def format_labels(data_labels)
  base_labels = data_labels.nil? ? {} : data_labels
  # merge extra_labels with data_labels. If there are collisions extra_labels win.
  merged_labels = base_labels.merge(@extra_labels)
  # sanitize label values; anything that is not a String is dropped
  merged_labels.each_with_object({}) do |(label, value), sanitized|
    next unless value.is_a?(String)

    sanitized[label] = value.gsub('"', '\\"')
  end
end
line_to_loki(record) click to toggle source

Converts a record into a Loki log line together with the labels extracted from it.

# File lib/fluent/plugin/out_loki.rb, line 297
# Converts one record into a Loki line plus the labels extracted from it.
#
# For Hash records: each configured record accessor contributes a label and
# its source field is deleted from the record; when @extract_kubernetes_labels
# is set, every entry of record['kubernetes']['labels'] becomes a label; the
# keys listed in @remove_keys_accessors are deleted; the remaining record is
# rendered with record_to_line. In label names '.', '-' and '/' are replaced
# by '_'. Non-Hash records are rendered with #to_s and get no record labels.
#
# A 'kubernetes' entry that is not a Hash, or that has no 'labels', is
# ignored instead of raising NoMethodError.
#
# NOTE: a Hash record is modified in place (fields are deleted from it).
#
# @param record [Hash, Object] the event record
# @return [Hash] { line: <rendered line>, labels: <Hash of labels> }
def line_to_loki(record)
  chunk_labels = {}
  if record.is_a?(Hash)
    @record_accessors&.each do |name, accessor|
      new_key = name.gsub(%r{[.\-\/]}, '_')
      chunk_labels[new_key] = accessor.call(record)
      accessor.delete(record)
    end

    if @extract_kubernetes_labels && record.key?('kubernetes')
      kubernetes = record['kubernetes']
      # Metadata without a 'labels' entry is valid; skip it rather than fail.
      kubernetes_labels = kubernetes.is_a?(Hash) ? kubernetes['labels'] : nil
      kubernetes_labels&.each_key do |l|
        new_key = l.gsub(%r{[.\-\/]}, '_')
        chunk_labels[new_key] = kubernetes_labels[l]
      end
    end

    # remove needless keys.
    @remove_keys_accessors&.each do |deleter|
      deleter.delete(record)
    end

    line = record_to_line(record)
  else
    line = record.to_s
  end

  # add buffer flush thread title as a label if there are multiple flush threads
  # this prevents "entry out of order" errors in loki by making the label constellation
  # unique per flush thread
  # note that flush thread != fluentd worker. if you use multiple workers you still need to
  # add the worker id as a label
  if @buffer_config.flush_thread_count > 1
    chunk_labels['fluentd_thread'] = Thread.current[:_fluentd_plugin_helper_thread_title].to_s
  end

  # return both the line content plus the labels found in the record
  {
    line: line,
    labels: chunk_labels
  }
end
loki_http_request(body, tenant) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 210
# Posts the given body, serialized as JSON, to the Loki push endpoint in @uri.
#
# Adds a Bearer Authorization header when a token was loaded, an
# X-Scope-OrgID header when a tenant is given, and basic auth credentials
# when @username is set.
#
# @param body the object serialized with Yajl.dump as the request body
# @param tenant [String, nil] tenant id for the X-Scope-OrgID header
# @return the response returned by Net::HTTP#request
def loki_http_request(body, tenant)
  request = Net::HTTP::Post.new(@uri.request_uri)
  request.add_field('Content-Type', 'application/json')
  request.add_field('Authorization', "Bearer #{@auth_token_bearer}") unless @auth_token_bearer.nil?
  request.add_field('X-Scope-OrgID', tenant) if tenant
  request.body = Yajl.dump(body)
  request.basic_auth(@username, @password) if @username

  connection_opts = http_request_opts(@uri)

  tenant_note = tenant ? " (tenant: \"#{tenant}\")" : ''
  log.debug "sending #{request.body.length} bytes to loki#{tenant_note}"

  Net::HTTP.start(@uri.host, @uri.port, **connection_opts) do |http|
    http.request(request)
  end
end
numeric?(val) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 229
# Checks whether Kernel#Float can convert the given value.
#
# @param val the value to probe
# @return [Boolean] true when the conversion succeeds, false when it raises
#   any StandardError
def numeric?(val)
  Float(val)
  true
rescue StandardError
  false
end
payload_builder(streams) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 245
# Builds the list of stream objects for the push body.
#
# One element is produced per label set. Its entries are ordered by timestamp
# (ties keep their original relative order) in case they arrived out of
# order, and each entry becomes a [timestamp string, line] pair.
#
# @param streams [Hash] label hash => Array of { 'ts' => ..., 'line' => ... }
# @return [Array<Hash>] elements of the form
#   { 'stream' => formatted labels, 'values' => [[ts, line], ...] }
def payload_builder(streams)
  streams.map do |labels, entries|
    # The index is the tie-breaker, which keeps the sort stable.
    ordered = entries.each_with_index.sort_by { |entry, position| [entry['ts'], position] }.map(&:first)
    {
      'stream' => format_labels(labels),
      'values' => ordered.map { |entry| [entry['ts'].to_s, entry['line']] }
    }
  end
end
record_to_line(record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 270
# Renders a record as the log line sent to Loki.
#
# When @drop_single_key is set and the record has exactly one key, the value
# of that key is returned as is. Otherwise the record is rendered according to
# @line_format: :json dumps it with Yajl, :key_value joins `key=value` pairs
# with spaces; any other format yields an empty string.
#
# @param record [Hash] the record to render
# @return the rendered line
def record_to_line(record)
  return record[record.keys[0]] if @drop_single_key && record.keys.length == 1

  case @line_format
  when :json
    Yajl.dump(record)
  when :key_value
    pairs = record.map do |k, v|
      # Escape double quotes and backslashes by prefixing them with a backslash
      escaped = v.to_s.gsub(%r{(["\\])}, '\\\\\1')
      # Values containing a space or '=' are wrapped in double quotes.
      needs_quotes = escaped.include?(' ') || escaped.include?('=')
      needs_quotes ? %(#{k}="#{escaped}") : %(#{k}=#{escaped})
    end
    pairs.join(' ')
  else
    ''
  end
end
to_nano(time) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 260
# Converts an event time to nanoseconds.
#
# time is a Fluent::EventTime object, or an Integer which represents unix timestamp (seconds from Epoch)
# https://docs.fluentd.org/plugin-development/api-plugin-output#chunk-each-and-block
#
# @param time [Fluent::EventTime, Integer] the event time
# @return [Integer] whole seconds scaled to nanoseconds, plus the #nsec part
#   when time is a Fluent::EventTime
def to_nano(time)
  whole_seconds_ns = time.to_i * (10**9)
  return whole_seconds_ns unless time.is_a?(Fluent::EventTime)

  whole_seconds_ns + time.nsec
end