class Fluent::LogtailOutput

Constants

CONTENT_TYPE
HOST
MAX_ATTEMPTS
PATH
PORT
RETRYABLE_CODES
USER_AGENT
VERSION

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_logtail.rb, line 20
# Remembers the source token found in the plugin configuration and then
# passes the configuration on to the superclass implementation.
#
# @param conf [#[]] configuration object; only the "source_token" key is read here
# @return [Object] whatever the superclass `configure` returns
def configure(conf)
  token = conf["source_token"]
  @source_token = token
  super(conf)
end
format(tag, time, record)
# File lib/fluent/plugin/out_logtail.rb, line 25
def format(tag, time, record)
  force_utf8_string_values(record.merge("dt" => Time.at(time).utc.iso8601)).to_msgpack
end
write(chunk)
# File lib/fluent/plugin/out_logtail.rb, line 29
# Starts delivery of a buffered chunk, beginning with attempt number 1.
#
# @param chunk [#read] buffered chunk to deliver
# @return [Boolean] the result of `deliver`
def write(chunk)
  first_attempt = 1
  deliver(chunk, first_attempt)
end

Private Instance Methods

build_http_client()
# File lib/fluent/plugin/out_logtail.rb, line 85
# Builds a fresh, not-yet-started HTTPS client for HOST:PORT.
#
# The server certificate is verified on every platform except Windows.
# Requests carry a bearer token, so sending them over an unverified TLS
# connection would expose the token to anyone able to intercept traffic.
#
# @return [Net::HTTP] configured client with a 30 s read timeout and 10 s
#   SSL and open timeouts
def build_http_client
  http = Net::HTTP.new(HOST, PORT)
  http.use_ssl = true
  # Verification on Windows fails despite having a valid certificate, so the
  # check is skipped there only; everywhere else the peer must be verified.
  http.verify_mode = Gem.win_platform? ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  http.read_timeout = 30
  http.ssl_timeout = 10
  http.open_timeout = 10
  http
end
build_request(body)
# File lib/fluent/plugin/out_logtail.rb, line 96
# Builds the POST request for one payload.
#
# The request targets "/" and carries the bearer token taken from
# @source_token, plus the Content-Type and User-Agent constants.
#
# @param body [String] request payload
# @return [Net::HTTP::Post] request ready to be sent
def build_request(body)
  headers = {
    "Authorization" => "Bearer #{@source_token}",
    "Content-Type" => CONTENT_TYPE,
    "User-Agent" => USER_AGENT
  }

  Net::HTTP::Post.new('/').tap do |post|
    headers.each { |name, value| post[name] = value }
    post.body = body
  end
end
deliver(chunk, attempt)
# File lib/fluent/plugin/out_logtail.rb, line 34
# Posts a chunk's content and retries while the response code is retryable.
#
# Each attempt uses a fresh HTTP client. A 2xx response ends delivery with
# success, a code outside RETRYABLE_CODES ends it with failure, and a
# retryable code triggers a backoff sleep followed by another attempt until
# MAX_ATTEMPTS is exhausted. Exceptions raised while sending are not rescued
# here and propagate to the caller.
#
# Retries run in a loop rather than by recursion, the chunk is read once, and
# no backoff sleep happens after the final attempt because the chunk is
# dropped immediately afterwards.
#
# @param chunk [#read] buffered chunk whose content is sent as the request body
# @param attempt [Integer] 1-based number of the attempt to start with
# @return [Boolean] true when a 2xx response was received, false otherwise
def deliver(chunk, attempt)
  body = nil

  while attempt <= MAX_ATTEMPTS
    # Read the payload once and reuse it for every retry.
    body ||= chunk.read

    http = build_http_client
    begin
      resp = http.start { |conn| conn.request(build_request(body)) }
    ensure
      # Close the connection even when the request raised.
      http.finish if http.started?
    end

    code = resp.code.to_i
    return true if code.between?(200, 299)

    unless RETRYABLE_CODES.include?(code)
      log.error("msg=\"Fatal response from the Logtail API\" code=#{code} attempt=#{attempt}")
      return false
    end

    # Backing off only makes sense when another attempt will follow.
    final_attempt = attempt >= MAX_ATTEMPTS
    sleep_time = final_attempt ? 0 : sleep_for_attempt(attempt)
    log.warn("msg=\"Retryable response from the Logtail API\" " \
      "code=#{code} attempt=#{attempt} sleep=#{sleep_time}")
    sleep(sleep_time) unless final_attempt
    attempt += 1
  end

  log.error("msg=\"Max attempts exceeded dropping chunk\" attempt=#{attempt}")
  false
end
force_utf8_string_values(data)
# File lib/fluent/plugin/out_logtail.rb, line 73
# Returns a copy of +data+ in which every string value is tagged as UTF-8.
# Nested hashes and arrays are walked recursively.
#
# The caller's strings are never modified: a value that is not already
# UTF-8 is duplicated before being re-tagged. This also makes frozen strings
# safe to pass in. Bytes are not transcoded; only the encoding tag changes.
# Hash keys are left untouched.
#
# @param data [Hash] hash to convert
# @return [Hash] new hash with the same keys and UTF-8-tagged string values
def force_utf8_string_values(data)
  data.transform_values { |val| force_utf8_value(val) }
end

# Converts a single value for force_utf8_string_values.
def force_utf8_value(val)
  if val.is_a?(Hash)
    force_utf8_string_values(val)
  elsif val.is_a?(Array)
    val.map { |item| force_utf8_value(item) }
  elsif val.respond_to?(:force_encoding)
    # Already UTF-8: nothing to re-tag, so skip the copy.
    return val if val.respond_to?(:encoding) && val.encoding == Encoding::UTF_8

    val.dup.force_encoding('UTF-8')
  else
    val
  end
end
sleep_for_attempt(attempt)
# File lib/fluent/plugin/out_logtail.rb, line 67
# Computes the backoff, in whole seconds, to wait after a failed attempt.
#
# The base delay is the square of the attempt number, capped at 60. The
# result is half of that base plus half of a random integer drawn from
# 0..base, both halves using integer division.
#
# @param attempt [Integer] attempt number that just failed
# @return [Integer] seconds to sleep
def sleep_for_attempt(attempt)
  base = [attempt ** 2, 60].min
  fixed_part = base / 2
  jitter_part = rand(0..base) / 2
  fixed_part + jitter_part
end