class Fluent::LogDNAOutput

Constants

MAX_RETRIES

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logdna.rb, line 22
def configure(conf)
  super
  @host = conf["hostname"]

  # make these two variables globals
  timeout_unit_map = { s: 1.0, ms: 0.001 }
  timeout_regex = Regexp.new("^([0-9]+)\s*(#{timeout_unit_map.keys.join('|')})$")

  # this section goes into this part of the code
  num_component = 30.0
  unit_component = "s"

  timeout_regex.match(@request_timeout) do |match|
    num_component = match[1].to_f
    unit_component = match[2]
  end

  @request_timeout = num_component * timeout_unit_map[unit_component.to_sym]
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_logdna.rb, line 57
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logdna.rb, line 52
def shutdown
  super
  @ingester.close if @ingester
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logdna.rb, line 42
def start
  super
  require "json"
  require "base64"
  require "http"
  HTTP.default_options = { keep_alive_timeout: 60 }
  @ingester = HTTP.persistent @ingester_domain
  @requests = Queue.new
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_logdna.rb, line 61
def write(chunk)
  body = chunk_to_body(chunk)
  response = send_request(body)
  raise "Encountered server error" if response.code >= 400

  response.flush
end

Private Instance Methods

chunk_to_body(chunk) click to toggle source
# File lib/fluent/plugin/out_logdna.rb, line 71
def chunk_to_body(chunk)
  data = []

  chunk.msgpack_each do |(tag, time, record)|
    line = gather_line_data(tag, time, record)
    data << line unless line[:line].empty?
  end

  { lines: data }
end
gather_line_data(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_logdna.rb, line 82
def gather_line_data(tag, time, record)
  line = {
    level: record["level"] || record["severity"] || tag.split(".").last,
    timestamp: time,
    line: record.to_json
  }
  # At least one of "file" or "app" is required.
  line[:file] = record["file"]
  line[:file] ||= @file if @file
  line.delete(:file) if line[:file].nil?
  line[:app] = record["_app"] || record["app"]
  line[:app] ||= @app if @app
  line.delete(:app) if line[:app].nil?
  line[:env] = record["env"]
  line.delete(:env) if line[:env].nil?
  line[:meta] = record["meta"]
  line.delete(:meta) if line[:meta].nil?
  line
end
send_request(body) click to toggle source
# File lib/fluent/plugin/out_logdna.rb, line 102
def send_request(body)
  now = Time.now.to_i
  url = "#{@ingester_endpoint}?hostname=#{@host}&mac=#{@mac}&ip=#{@ip}&now=#{now}&tags=#{@tags}"
  @ingester.headers("apikey" => @api_key,
                    "content-type" => "application/json")
           .timeout(connect: @request_timeout, write: @request_timeout, read: @request_timeout)
           .post(url, json: body)
end