class Fluent::DatadogOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 36
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 45
def configure(conf)
  super
end
format(tag, time, record) click to toggle source

This method is called when an event reaches Fluentd.

# File lib/fluent/plugin/out_datadog.rb, line 91
def format(tag, time, record)
  # When Fluent::EventTime is msgpack'ed it gets converted to int with seconds
  # precision only. We explicitly convert it to floating point number, which
  # is compatible with Time.at below.
  return [tag, time.to_f, record].to_msgpack
end
get_attribute_tags(record, attribute_tags) click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 220
def get_attribute_tags(record, attribute_tags)
  if not attribute_tags.nil? and not record.nil?
    tags = Array.new
    # attributes are separated by ,
    for attribute_tag in attribute_tags.split(",")
      # attribute split into key:attribute
      keyattr = attribute_tag.split(":")
      key = keyattr.at(0)
      if keyattr.length == 2
        attr = keyattr.at(1)
      else
        attr = key
      end

      # split attribute name by .
      subrecord = record
      attrparts = attr.split(".")
      for index in (0...attrparts.length - 1)
        subrecord = subrecord[attrparts[index]] unless subrecord[attrparts[index]].nil?
      end
      tags.push(key + ":" + subrecord[attrparts.at(attrparts.length - 1)]) unless subrecord[attrparts.at(attrparts.length - 1)].nil?

      # set source/host from here if present
      if key == "ddsource"
        record["ddsource"] = subrecord[attrparts.at(attrparts.length - 1)]
      elsif key == "host"
        record["host"] = subrecord[attrparts.at(attrparts.length - 1)]
      end
    end
    return tags.join(",")
  end
  return nil
end
get_container_tags(record) click to toggle source

Collect docker and kubernetes tags for your logs using `filter_kubernetes_metadata` plugin, for more information about the attribute names, check: github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter/blob/master/lib/fluent/plugin/filter_kubernetes_metadata.rb#L265

# File lib/fluent/plugin/out_datadog.rb, line 190
def get_container_tags(record)
  return [
    get_kubernetes_tags(record),
    get_docker_tags(record)
  ].compact.join(",")
end
get_docker_tags(record) click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 210
def get_docker_tags(record)
  if record.key?('docker') and not record.fetch('docker').nil?
    docker = record['docker']
    tags = Array.new
    tags.push("container_id:" + docker['container_id']) unless docker['container_id'].nil?
    return tags.join(",")
  end
  return nil
end
get_kubernetes_tags(record) click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 197
def get_kubernetes_tags(record)
  if record.key?('kubernetes') and not record.fetch('kubernetes').nil?
    kubernetes = record['kubernetes']
    tags = Array.new
    tags.push("image_name:" + kubernetes['container_image']) unless kubernetes['container_image'].nil?
    tags.push("container_name:" + kubernetes['container_name']) unless kubernetes['container_name'].nil?
    tags.push("kube_namespace:" + kubernetes['namespace_name']) unless kubernetes['namespace_name'].nil?
    tags.push("pod_name:" + kubernetes['pod_name']) unless kubernetes['pod_name'].nil?
    return tags.join(",")
  end
  return nil
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 49
def multi_workers_ready?
  true
end
new_client() click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 53
def new_client
  if @use_ssl
    context    = OpenSSL::SSL::SSLContext.new
    socket     = TCPSocket.new @host, @ssl_port
    ssl_client = OpenSSL::SSL::SSLSocket.new socket, context
    ssl_client.connect
    return ssl_client
  else
    return TCPSocket.new @host, @port
  end
end
send_to_datadog(events) click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 153
def send_to_datadog(events)
  @my_mutex.synchronize do
    events.each do |event|
      log.trace "Datadog plugin: about to send event=#{event}"
      retries = 0
      begin
        log.info "New attempt to Datadog attempt=#{retries}" if retries > 1
        @client ||= new_client
        @client.write(event)
      rescue => e
        @client.close rescue nil
        @client = nil

        if retries == 0
          # immediately retry, in case it's just a server-side close
          retries += 1
          retry
        end

        if retries < @max_retries || @max_retries == -1
          a_couple_of_seconds = retries ** 2
          a_couple_of_seconds = 30 unless a_couple_of_seconds < 30
          retries += 1
          log.warn "Could not push event to Datadog, attempt=#{retries} max_attempts=#{max_retries} wait=#{a_couple_of_seconds}s error=#{e}"
          sleep a_couple_of_seconds
          retry
        end
        raise ConnectionFailure, "Could not push event to Datadog after #{retries} retries, #{e}"
      end
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 82
def shutdown
  super
  @running = false
  if @client
    @client.close
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 65
def start
  super
  @my_mutex = Mutex.new
  @running = true

  if @tcp_ping_rate > 0
    @timer = Thread.new do
      while @running do
        messages = Array.new
        messages.push("fp\n")
        send_to_datadog(messages)
        sleep(@tcp_ping_rate)
      end
    end
  end
end
write(chunk) click to toggle source

NOTE! This method is called by internal thread, not Fluentd's main thread. 'chunk' is a buffer chunk that includes multiple formatted events.

# File lib/fluent/plugin/out_datadog.rb, line 100
def write(chunk)
  messages = Array.new

  chunk.msgpack_each do |tag, time, record|
    next unless record.is_a? Hash
    next if record.empty?

    if @dd_sourcecategory
      record["ddsourcecategory"] = @dd_sourcecategory
    end
    if @dd_source
      record["ddsource"] = @dd_source
    end
    if @dd_tags
      record["ddtags"] = @dd_tags
    end

    if @include_tag_key
      record[@tag_key] = tag
    end
    # If @timestamp_key already exists, we don't overwrite it.
    if @timestamp_key and record[@timestamp_key].nil? and time
      record[@timestamp_key] = Time.at(time).utc.iso8601(3)
    end

    attribute_tags = get_attribute_tags(record, @dd_attribute_tags)
    if not attribute_tags.empty?
      if record["ddtags"].nil? || record["ddtags"].empty?
        record["ddtags"] = attribute_tags
      else
        record["ddtags"] = record["ddtags"] + "," + attribute_tags
      end
    end

    container_tags = get_container_tags(record)
    if not container_tags.empty?
      if record["ddtags"].nil? || record["ddtags"].empty?
        record["ddtags"] = container_tags
      else
        record["ddtags"] = record["ddtags"] + "," + container_tags
      end
    end

    if @use_json
      messages.push "#{api_key} " + Yajl.dump(record) + "\n"
    else
      next unless record.has_key? "message"
      messages.push "#{api_key} " + record["message"].strip + "\n"
    end
  end
  send_to_datadog(messages)
end