class Fluent::Plugin::LokiOutput

Constants

DEFAULT_BUFFER_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 13
# Constructs the plugin instance. No plugin-specific state is set up here;
# the bare `super` forwards whatever arguments were given to the superclass
# initializer.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 55
# Applies the plugin configuration and derives runtime settings from it.
#
# Runs the buffer compatibility conversion on +conf+, lets the superclass
# process it, then derives:
# * @ssl_verify_mode - VERIFY_NONE when @ssl_no_verify is set, else VERIFY_PEER
# * @ca_file         - copy of @cacert_file
# * @last_request_time - reset to nil
#
# @param conf the configuration object handed over by the framework
# @raise [Fluent::ConfigError] when buffering is enabled (@buffered) but
#   'tag' is not among the chunk keys (@chunk_key_tag is falsy)
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  @ssl_verify_mode = @ssl_no_verify ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  @ca_file = @cacert_file
  @last_request_time = nil

  # Nothing more to validate unless we are buffered without a tag chunk key.
  return if @chunk_key_tag || !@buffered

  raise Fluent::ConfigError, "'tag' in chunk_keys is required."
end
create_request(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 106
# Builds the HTTP POST request for one payload.
#
# The target is the URL returned by #format_url with "/api/prom/push"
# appended. Body and headers are filled in by #set_body and #set_header.
#
# @param tag    event tag, passed through to the helper methods
# @param time   event time, passed through to the helper methods
# @param record payload handed to #set_body
# @return [Array] two elements: the Net::HTTP::Post request and the parsed URI
def create_request(tag, time, record)
  push_url = format_url(tag, time, record) + "/api/prom/push"
  uri = URI.parse(push_url)
  request = Net::HTTP::Post.new(uri.request_uri)
  set_body(request, tag, time, record)
  set_header(request, tag, time, record)
  [request, uri]
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 195
# Serializes one event as a MessagePack-encoded [time, record] pair.
# The tag is accepted but is not part of the serialized output.
# NOTE(review): presumably the buffer-format hook whose output #write reads
# back through chunk.msgpack_each - confirm against the Fluentd output API.
def format(tag, time, record)
  [time, record].to_msgpack
end
format_labels(labels, record = {}) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 172
# Renders a label set as a string of the form {key="value",key2="value2"}.
#
# All entries of +labels+ come first, followed by every entry of +record+
# except the "log" key (the log line itself is sent separately).
#
# Fixes over the previous implementation:
# * an empty label set now yields "{}" (it used to yield "}" because the
#   opening brace was overwritten);
# * a nil +labels+ / +record+ is treated as empty instead of raising;
# * backslashes, double quotes and newlines inside values are escaped so a
#   value cannot terminate its quoted string early.
#
# @param labels [Hash, nil] static labels
# @param record [Hash, nil] event record whose fields become extra labels
# @return [String] the formatted label string
def format_labels(labels, record = {})
  # Block form of gsub inserts the replacement literally (no back-reference
  # processing), which keeps the backslash handling straightforward.
  escape = lambda do |val|
    val.to_s.gsub(/[\\"\n]/) { |ch| ch == "\n" ? '\n' : "\\#{ch}" }
  end

  pairs = (labels || {}).map { |key, val| "#{key}=\"#{escape.call(val)}\"" }
  (record || {}).each do |key, val|
    pairs << "#{key}=\"#{escape.call(val)}\"" unless key == "log"
  end

  "{#{pairs.join(',')}}"
end
format_url(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 78
# Returns the endpoint URL stored in @endpoint_url.
# The tag, time and record arguments are accepted but currently unused.
def format_url(tag, time, record)
  @endpoint_url
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 199
# Always returns true.
# NOTE(review): presumably signals to the framework that #format emits
# MessagePack binary, which #write relies on via msgpack_each - confirm.
def formatted_to_msgpack_binary?
  true
end
handle_record(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 183
# Wraps a single record in a one-stream, one-entry push payload and sends it.
#
# The stream labels come from #format_labels(@labels, record) and the entry
# line is record["log"]. The payload is logged at debug level as JSON before
# the request is built (#create_request) and sent (#send_request).
#
# @param tag    event tag, forwarded to #create_request
# @param time   event time, forwarded to #create_request (not used as the
#   entry timestamp, see below)
# @param record [Hash] the event record
# @return whatever #send_request returns
def handle_record(tag, time, record)
  stream_labels = format_labels(@labels, record)
  # The current wall-clock time is sent instead of the event's own 'time'
  # because using the latter caused 'Entry out of order' on Loki's side.
  entry = { "ts" => Time.now.iso8601(3), "line" => record["log"] }
  payload = { "streams" => [{ "labels" => stream_labels, "entries" => [entry] }] }

  log.debug(payload.to_json)
  request, uri = create_request(tag, time, payload)
  send_request(request, uri)
end
http_opts(uri) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 115
def http_opts(uri)
    opts = {
      :use_ssl => uri.scheme == 'https'
    }
    opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]
    opts[:ca_file] = File.join(@ca_file) if File.file?(@ca_file)
    opts
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 203
# Always returns true.
# NOTE(review): presumably declares to the framework that this plugin may
# run under multiple worker processes - confirm against the Fluentd API.
def multi_workers_ready?
  true
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 191
# Returns the value of @buffered.
# NOTE(review): presumably lets the framework choose between the buffered
# path (#write) and the direct path (#process) - confirm.
def prefer_buffered_processing
  @buffered
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 207
# Handles an event stream directly, one event at a time.
#
# @param tag the tag shared by every event in the stream
# @param es  an enumerable yielding (time, record) pairs
# @return the result of es.each
def process(tag, es)
  es.each { |event_time, event| handle_record(tag, event_time, event) }
end
proxies() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 124
# Looks up the proxy URL from the environment.
#
# The variables are consulted in this order: HTTPS_PROXY, HTTP_PROXY,
# http_proxy, https_proxy. The first one that is set wins.
#
# @return [String, nil] the proxy URL, or nil when none of them is set
def proxies
  ENV.values_at('HTTPS_PROXY', 'HTTP_PROXY', 'http_proxy', 'https_proxy').compact.first
end
send_request(req, uri) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 128
# Sends a prepared request, going through a proxy when one is configured in
# the environment (see #proxies).
#
# Before sending, the authorization header is set according to
# @authentication (:basic, :bearer or :jwt) and @last_request_time is
# updated. Any StandardError raised while sending is logged as a warning and
# re-raised only when @raise_on_error is set. A response that is not a
# Net::HTTPSuccess is logged as a warning and otherwise ignored.
#
# NOTE: the rate limiting that @last_request_time was meant for is not
# active; only the timestamp is still recorded.
#
# @param req [Net::HTTPRequest] the request to send
# @param uri [URI] the target the connection is opened to
def send_request(req, uri)
  res = nil

  begin
    case @authentication
    when :basic
      req.basic_auth(@username, @password)
    when :bearer
      req['authorization'] = "bearer #{@token}"
    when :jwt
      req['authorization'] = "jwt #{@token}"
    end
    @last_request_time = Time.now.to_f

    # Positional proxy arguments for Net::HTTP.start; left empty when no
    # proxy is configured so the call is the plain host/port form.
    proxy_args = []
    proxy = proxies
    if proxy
      proxy_uri = URI.parse(proxy)
      proxy_args = [proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password]
    end

    res = Net::HTTP.start(uri.host, uri.port, *proxy_args, **http_opts(uri)) do |http|
      http.request(req)
    end
  rescue => e # any StandardError: the server could not be reached
    log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
    unless res.is_a?(Net::HTTPSuccess)
      res_summary = res ? "#{res.code} #{res.message} #{res.body}" : "res=nil"
      log.warn "failed to #{req.method} #{uri} (#{res_summary})"
    end
  end
end
set_body(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 82
# Fills in the request body from +record+ using #set_json_body.
# The tag and time arguments are accepted but unused.
#
# @param req    the request to populate
# @param record the payload to serialize into the body
# @return the same +req+ object
def set_body(req, tag, time, record)
  req.tap { |request| set_json_body(request, record) }
end
set_header(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 87
# Adds the tenant and custom headers to the request.
#
# "X-Scope-OrgID" is set to @tenant when a tenant is configured, then every
# key/value pair in @custom_headers (when present) is copied onto the
# request. The tag, time and record arguments are accepted but unused.
#
# @param req the request to decorate
# @return the same +req+ object
def set_header(req, tag, time, record)
  req["X-Scope-OrgID"] = @tenant if @tenant
  (@custom_headers || {}).each { |name, value| req[name] = value }
  req
end
set_json_body(req, data) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 101
# Serializes +data+ with Yajl.dump into the request body and marks the
# request as JSON by setting its Content-Type header to 'application/json'.
# The method's value is that of the final assignment, not +req+.
def set_json_body(req, data)
  req.body = Yajl.dump(data)
  req['Content-Type'] = 'application/json'
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 74
# Shutdown hook; adds nothing of its own and only delegates to the
# superclass implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 70
# Start hook; adds nothing of its own and only delegates to the superclass
# implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 213
# Sends every event of a buffered chunk, one request per record.
#
# The endpoint URL may contain placeholders that are expanded from the
# chunk's metadata. The unexpanded URL is remembered in
# @endpoint_url_template the first time through, and each chunk is expanded
# from that template. Previously @endpoint_url was overwritten with its own
# expansion, so the placeholders were lost after the first chunk and every
# later chunk reused the first chunk's URL.
#
# @param chunk a buffer chunk responding to #metadata and #msgpack_each
# @return the result of chunk.msgpack_each
def write(chunk)
  tag = chunk.metadata.tag
  # Keep the pristine template; @endpoint_url holds the per-chunk expansion
  # that #format_url hands out while this chunk is being sent.
  @endpoint_url_template ||= @endpoint_url
  @endpoint_url = extract_placeholders(@endpoint_url_template, chunk.metadata)
  chunk.msgpack_each do |time, record|
    handle_record(tag, time, record)
  end
end