class Fluent::DatadogOutput

Constants

DD_DEFAULT_HTTP_ENDPOINT
DD_DEFAULT_TCP_ENDPOINT
DD_MAX_BATCH_LENGTH

Maximum limits applied at the transport level, regardless of the Fluentd buffer settings, in accordance with docs.datadoghq.com/api/?lang=bash#logs

DD_MAX_BATCH_SIZE
DD_TRUNCATION_SUFFIX
DEFAULT_BUFFER_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 64
# Builds the plugin instance. All construction work is delegated to the
# superclass through the bare `super` call; nothing else is set up here.
def initialize
  super
end

Public Instance Methods

batch_http_events(encoded_events, max_batch_length, max_request_size) click to toggle source

Group HTTP events in batches

# File lib/fluent/plugin/out_datadog.rb, line 166
# Groups already-encoded HTTP events into batches that respect both a maximum
# number of events and a maximum cumulative byte size.
#
# encoded_events   - Enumerable of Strings (one encoded event each).
# max_batch_length - maximum number of events allowed in one batch.
# max_request_size - maximum sum of event byte sizes allowed in one batch.
#                    Only the events' own bytes are counted.
#
# An event larger than max_request_size is passed through `truncate` first.
# If it is still oversized afterwards it is placed alone in its own batch.
#
# Returns an Array of non-empty Arrays of Strings, preserving input order.
# Empty input yields an empty Array (no empty batch is ever emitted).
def batch_http_events(encoded_events, max_batch_length, max_request_size)
  batches = []
  current_batch = []
  current_batch_size = 0

  encoded_events.each do |encoded_event|
    # If this single log is bigger than the request size, truncate it
    if encoded_event.bytesize > max_request_size
      encoded_event = truncate(encoded_event, max_request_size)
    end
    event_size = encoded_event.bytesize

    # Count against the batch being built, not the global index, so a batch
    # flushed early for size does not shorten the batches that follow it.
    batch_full = current_batch.length >= max_batch_length
    request_full = current_batch_size + event_size > max_request_size

    # Never flush an empty batch: that would emit a payload with no events.
    if !current_batch.empty? && (batch_full || request_full)
      batches << current_batch
      current_batch = []
      current_batch_size = 0
    end

    current_batch_size += event_size
    current_batch << encoded_event
  end

  batches << current_batch unless current_batch.empty?
  batches
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 68
# Applies the plugin configuration, then finalizes two derived settings:
# the intake host used for TCP transport and the hostname attached to records.
#
# conf - the configuration object handed over by the caller; it is passed to
#        `compat_parameters_convert` and, implicitly, to the superclass.
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  # When TCP transport is selected but the host is still the HTTP default,
  # switch to the TCP default. This must happen whether or not dd_hostname
  # was configured, so it runs before the hostname guard below.
  if !@use_http && @host == DD_DEFAULT_HTTP_ENDPOINT
    @host = DD_DEFAULT_TCP_ENDPOINT
  end

  # Keep an explicitly configured dd_hostname (can be set when using fluentd as aggregator)
  return if @dd_hostname

  # Otherwise ask the system for the fully qualified name, falling back to
  # the socket-level hostname when the command yields nothing.
  @dd_hostname = %x[hostname -f 2> /dev/null].strip
  @dd_hostname = Socket.gethostname if @dd_hostname.empty?
end
enrich_record(tag, time, record) click to toggle source

Enrich records with metadata such as service, tags or source

# File lib/fluent/plugin/out_datadog.rb, line 211
# Adds Datadog metadata to a record, in place, and returns that same record.
#
# tag    - value stored under @tag_key when @include_tag_key is set.
# time   - numeric timestamp handed to Time.at; may be nil.
# record - Hash-like record; it is mutated.
#
# Configured values never replace values the record already carries
# (they are applied with `||=`), except the tag key, which is always written.
def enrich_record(tag, time, record)
  # Fill in each configured attribute only where the record has none.
  # For "hostname" this guarantees a hostname even when the record lacks one.
  configured_defaults = [
    ["ddsourcecategory", @dd_sourcecategory],
    ["ddsource", @dd_source],
    ["ddtags", @dd_tags],
    ["service", @service],
    ["hostname", @dd_hostname]
  ]
  configured_defaults.each do |key, value|
    record[key] ||= value if value
  end

  record[@tag_key] = tag if @include_tag_key

  # An existing timestamp in the record is left untouched.
  timestamp_missing = @timestamp_key && record[@timestamp_key].nil?
  record[@timestamp_key] = Time.at(time).utc.iso8601(3) if timestamp_missing && time

  # Append container tags to whatever ddtags the record holds by now.
  extra_tags = get_container_tags(record)
  return record if extra_tags.empty?

  current_tags = record["ddtags"]
  if current_tags.nil? || current_tags.empty?
    record["ddtags"] = extra_tags
  else
    record["ddtags"] = current_tags + "," + extra_tags
  end
  record
end
format(tag, time, record) click to toggle source

This method is called when an event reaches Fluentd.

# File lib/fluent/plugin/out_datadog.rb, line 105
# Serializes one incoming event for buffering.
#
# The record is enriched first, then encoded according to the transport:
#   * HTTP           -> JSON only
#   * TCP + use_json -> "<api_key> <JSON>"
#   * TCP otherwise  -> "<api_key> <record.to_s>"
#
# Returns the encoded string wrapped in a one-element array and packed with
# `to_msgpack`.
def format(tag, time, record)
  # When Fluent::EventTime is msgpack'ed it is reduced to whole seconds, so
  # convert it to a float explicitly; the float is what Time.at receives
  # during enrichment.
  enriched = enrich_record(tag, time.to_f, record)

  payload =
    if @use_http
      Yajl.dump(enriched)
    elsif @use_json
      "#{api_key} #{Yajl.dump(enriched)}"
    else
      "#{api_key} #{enriched}"
    end

  [payload].to_msgpack
end
format_http_event_batch(events) click to toggle source

Format batch of http events

# File lib/fluent/plugin/out_datadog.rb, line 206
# Serializes a batch of already-encoded events as a single bracketed,
# comma-separated string (a JSON array when every event is itself JSON).
#
# events - Array of encoded event Strings.
def format_http_event_batch(events)
  "[" + events.join(",") + "]"
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 86
# Always returns true.
# NOTE(review): presumably the Fluentd output-plugin hook declaring that
# #format emits msgpack data (format ends with `to_msgpack`, and write reads
# chunks back with `msgpack_each`) — confirm against the Fluentd plugin API.
def formatted_to_msgpack_binary?
  true
end
get_container_tags(record) click to toggle source

Collect Docker and Kubernetes tags for your logs using the `filter_kubernetes_metadata` plugin. For more information about the attribute names, see: github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter/blob/master/lib/fluent/plugin/filter_kubernetes_metadata.rb#L265

# File lib/fluent/plugin/out_datadog.rb, line 395
# Builds the comma-separated container tag string for a record by combining
# the Kubernetes tags and the Docker tags, in that order.
#
# A helper may return nil (metadata section absent) or an empty string
# (section present but without usable fields). Both are skipped so the result
# never starts or ends with a stray comma.
#
# Returns a String; empty when neither helper produced any tag.
def get_container_tags(record)
  [
    get_kubernetes_tags(record),
    get_docker_tags(record)
  ].reject { |tags| tags.nil? || tags.empty? }.join(",")
end
get_docker_tags(record) click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 415
# Extracts Docker tags from the record's 'docker' section.
#
# Returns nil when the record has no 'docker' key or its value is nil.
# Otherwise returns "container_id:<id>", or an empty string when the section
# carries no 'container_id'.
def get_docker_tags(record)
  return nil unless record.key?('docker')

  docker = record.fetch('docker')
  return nil if docker.nil?

  tags = []
  container_id = docker['container_id']
  tags << "container_id:" + container_id unless container_id.nil?
  tags.join(",")
end
get_kubernetes_tags(record) click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 402
# Extracts Kubernetes tags from the record's 'kubernetes' section.
#
# Returns nil when the record has no 'kubernetes' key or its value is nil.
# Otherwise returns a comma-separated string with one "<tag>:<value>" entry
# per attribute present, in a fixed order; empty when none is present.
def get_kubernetes_tags(record)
  return nil unless record.key?('kubernetes')

  kubernetes = record.fetch('kubernetes')
  return nil if kubernetes.nil?

  # Tag prefix paired with the metadata attribute it is read from.
  tag_sources = [
    ["image_name:", 'container_image'],
    ["container_name:", 'container_name'],
    ["kube_namespace:", 'namespace_name'],
    ["pod_name:", 'pod_name']
  ]

  tags = []
  tag_sources.each do |prefix, attribute|
    value = kubernetes[attribute]
    tags << prefix + value unless value.nil?
  end
  tags.join(",")
end
gzip_compress(payload, compression_level) click to toggle source

Compress logs with GZIP

# File lib/fluent/plugin/out_datadog.rb, line 251
# Compresses a payload with GZIP.
#
# payload           - String to compress.
# compression_level - level handed to Zlib::GzipWriter.
#
# Returns the compressed bytes as a binary-encoded String.
def gzip_compress(payload, compression_level)
  buffer = StringIO.new
  buffer.set_encoding("BINARY")
  # The block form closes the writer on exit, including when write raises,
  # which flushes the remaining gzip data into the buffer.
  Zlib::GzipWriter.wrap(buffer, compression_level) do |writer|
    writer.write(payload)
  end
  buffer.string
end
max(a, b) click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 201
# Returns the larger of two comparable values; when neither is greater than
# the other, the second argument is returned.
def max(a, b)
  return a if a > b

  b
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_datadog.rb, line 82
# Always returns true.
# NOTE(review): presumably the Fluentd hook through which a plugin declares
# that it can run under multiple workers — confirm against the Fluentd
# plugin API.
def multi_workers_ready?
  true
end
new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression) click to toggle source

Build a new transport client

# File lib/fluent/plugin/out_datadog.rb, line 264
# Builds the transport client matching the selected protocol.
#
# With use_http set, a DatadogHTTPClient is created; it additionally receives
# the proxy, compression flag and API key. Otherwise a DatadogTCPClient is
# created from the connection settings only (api_key, http_proxy and
# use_compression are not forwarded to it).
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression)
  unless use_http
    return DatadogTCPClient.new(logger, use_ssl, no_ssl_validation, host, ssl_port, port)
  end

  DatadogHTTPClient.new(
    logger, use_ssl, no_ssl_validation, host, ssl_port, port,
    http_proxy, use_compression, api_key
  )
end
process_http_events(events, use_compression, compression_level, max_retries, max_backoff, max_batch_length, max_batch_size) click to toggle source

Process and send a set of http events. Potentially break down this set of http events in smaller batches

# File lib/fluent/plugin/out_datadog.rb, line 146
# Sends a set of encoded HTTP events through @client, splitting the set into
# batches bounded by max_batch_length events and max_batch_size bytes.
#
# Each batch is serialized, gzip-compressed when use_compression is set, and
# handed to the client's `send_retries` together with the retry settings.
#
# Returns the array of batches that was iterated.
def process_http_events(events, use_compression, compression_level, max_retries, max_backoff, max_batch_length, max_batch_size)
  batch_http_events(events, max_batch_length, max_batch_size).each do |batch|
    payload = format_http_event_batch(batch)
    payload = gzip_compress(payload, compression_level) if use_compression
    @client.send_retries(payload, max_retries, max_backoff)
  end
end
process_tcp_event(event, max_retries, max_backoff, max_batch_size) click to toggle source

Process and send a single tcp event

# File lib/fluent/plugin/out_datadog.rb, line 158
# Sends a single TCP event through @client.
#
# An event whose byte size exceeds max_batch_size is passed through
# `truncate` first; otherwise it is sent unchanged. Returns whatever the
# client's `send_retries` returns.
def process_tcp_event(event, max_retries, max_backoff, max_batch_size)
  oversized = event.bytesize > max_batch_size
  payload = oversized ? truncate(event, max_batch_size) : event
  @client.send_retries(payload, max_retries, max_backoff)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 95
# Shutdown hook. It only delegates to the superclass; the transport client is
# not touched here (it is closed in `terminate`).
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 90
# Start hook: runs the superclass start logic, then builds the transport
# client from the configured settings and keeps it in @client.
def start
  super
  @client = new_client(
    log, @api_key, @use_http, @use_ssl, @no_ssl_validation,
    @host, @ssl_port, @port, @http_proxy, @use_compression
  )
end
terminate() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 99
# Terminate hook: runs the superclass logic, then closes the transport client
# when one was created. Returns nil when there is no client.
def terminate
  super
  return unless @client

  @client.close
end
truncate(event, max_length) click to toggle source

Truncate events over the provided max length, appending a marker when truncated

# File lib/fluent/plugin/out_datadog.rb, line 192
# Truncates an event that exceeds max_length BYTES and marks it with
# DD_TRUNCATION_SUFFIX.
#
# The limit is measured in bytes, not characters, because callers decide to
# truncate by comparing `bytesize` against their limit. Measuring characters
# would leave multi-byte events above that limit.
#
# event      - String to truncate; it is never modified in place.
# max_length - maximum size of the result, in bytes.
#
# Returns the event itself when it already fits. Otherwise returns a new
# String made of the leading bytes that fit, followed by the suffix. When
# max_length is smaller than the suffix, the whole suffix is returned, so the
# result can exceed max_length in that one case.
def truncate(event, max_length)
  return event if event.bytesize <= max_length

  suffix = DD_TRUNCATION_SUFFIX
  room = [max_length - suffix.bytesize, 0].max

  # byteslice may cut through a multi-byte character; scrub drops the
  # resulting invalid sequence so the result stays validly encoded.
  event.byteslice(0, room).scrub("") + suffix
end
write(chunk) click to toggle source

NOTE! This method is called by an internal thread, not by Fluentd's main thread. 'chunk' is a buffer chunk that contains multiple formatted events.

# File lib/fluent/plugin/out_datadog.rb, line 125
# Flushes one buffer chunk to Datadog.
#
# chunk - buffer chunk responding to `msgpack_each`, which yields one array
#         per formatted event; the encoded event is its first element.
#         Empty arrays are skipped.
#
# Over HTTP all events of the chunk are collected and sent together (they are
# batched further downstream). Over TCP each event is sent on its own.
#
# Processing errors are logged and swallowed, so they do not propagate to the
# caller. Only StandardError is rescued: signals, SystemExit and similar
# non-standard exceptions propagate.
def write(chunk)
  if @use_http
    events = []
    chunk.msgpack_each do |record|
      next if record.empty?
      events.push record[0]
    end
    process_http_events(events, @use_compression, @compression_level, @max_retries, @max_backoff, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE)
  else
    chunk.msgpack_each do |record|
      next if record.empty?
      process_tcp_event(record[0], @max_retries, @max_backoff, DD_MAX_BATCH_SIZE)
    end
  end
rescue StandardError => e
  # Use the same `log` accessor that `start` passes to the client.
  log.error("Uncaught processing exception in datadog forwarder #{e.message}")
end