class LogStash::Outputs::DatadogLogs

DatadogLogs lets you send logs to Datadog based on LogStash events.

Constants

DD_MAX_BATCH_LENGTH

Respects the limits documented at https://docs.datadoghq.com/api/?lang=bash#logs

DD_MAX_BATCH_SIZE
DD_TRUNCATION_SUFFIX

Public Instance Methods

batch_http_events(encoded_events, max_batch_length, max_request_size) click to toggle source

Group HTTP events in batches

# File lib/logstash/outputs/datadog_logs.rb, line 92
# Group encoded events into batches for the HTTP transport.
#
# encoded_events   - enumerable of items whose +last+ element is the encoded
#                    event String (only +last+ is read here).
# max_batch_length - maximum number of events placed in one batch.
# max_request_size - maximum cumulative bytesize of the events in one batch;
#                    a single event larger than this is passed to +truncate+.
#
# Returns an Array of batches (each an Array of Strings) in input order.
# No empty batch is ever emitted, so an empty input yields an empty Array.
#
# NOTE(review): only the events' own bytes are counted; the brackets and commas
# added later by format_http_event_batch are not included in the size budget.
def batch_http_events(encoded_events, max_batch_length, max_request_size)
  batches = []
  current_batch = []
  current_batch_size = 0

  encoded_events.each do |event|
    encoded_event = event.last
    # If this unique log size is bigger than the request size, truncate it
    if encoded_event.bytesize > max_request_size
      encoded_event = truncate(encoded_event, max_request_size)
    end
    current_event_size = encoded_event.bytesize

    # Count the events actually held in the current batch. Using the global
    # event index would cut a batch short whenever an earlier flush was
    # triggered by the size limit, sending more requests than necessary.
    batch_full = current_batch.length >= max_batch_length
    request_full = current_batch_size + current_event_size > max_request_size
    if !current_batch.empty? && (batch_full || request_full)
      batches << current_batch
      current_batch = []
      current_batch_size = 0
    end

    current_batch << encoded_event
    current_batch_size += current_event_size
  end

  # Flush the trailing batch, but never emit an empty one.
  batches << current_batch unless current_batch.empty?
  batches
end
close() click to toggle source

Logstash shutdown hook

# File lib/logstash/outputs/datadog_logs.rb, line 44
# Logstash shutdown hook: closes the transport client held in @client and
# returns whatever the client's +close+ returns.
def close
  client = @client
  client.close
end
format_http_event_batch(batched_events) click to toggle source

Format HTTP events

# File lib/logstash/outputs/datadog_logs.rb, line 87
# Format a batch of already-encoded events as one payload: the events are
# joined with commas and wrapped in square brackets.
def format_http_event_batch(batched_events)
  joined_events = batched_events.join(',')
  "[#{joined_events}]"
end
format_tcp_event(payload, api_key, max_request_size) click to toggle source

Format TCP event

# File lib/logstash/outputs/datadog_logs.rb, line 78
# Format a TCP event as "<api_key> <payload>". When the formatted line is
# larger (in bytes) than max_request_size it is handed to +truncate+.
def format_tcp_event(payload, api_key, max_request_size)
  line = "#{api_key} #{payload}"
  return line unless line.bytesize > max_request_size

  truncate(line, max_request_size)
end
gzip_compress(payload, compression_level) click to toggle source

Compress logs with GZIP

# File lib/logstash/outputs/datadog_logs.rb, line 133
# Compress a payload with GZIP at the given compression level and return the
# compressed bytes as a String.
def gzip_compress(payload, compression_level)
  buffer = StringIO.new
  buffer.set_encoding("BINARY")
  # The block form closes the writer even if the write raises.
  Zlib::GzipWriter.wrap(buffer, compression_level) do |writer|
    writer.write(payload)
  end
  buffer.string
end
max(a, b) click to toggle source
# File lib/logstash/outputs/datadog_logs.rb, line 128
# Return the larger of a and b; b is returned when a is not strictly greater.
def max(a, b)
  return a if a > b

  b
end
multi_receive(events) click to toggle source

Entry point of the plugin, receiving a set of Logstash events

# File lib/logstash/outputs/datadog_logs.rb, line 50
# Entry point of the plugin: receives a set of Logstash events, encodes them
# with @codec and forwards them.
#
# With @use_http the encoded events are grouped by batch_http_events and each
# batch is sent as one payload; otherwise every event is formatted and sent
# individually. Any StandardError raised while formatting or sending is logged
# through @logger instead of being propagated. Encoding errors are not rescued.
def multi_receive(events)
  return if events.empty?

  encoded = @codec.multi_encode(events)
  begin
    if @use_http
      http_batches = batch_http_events(encoded, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE)
      http_batches.each do |batch|
        http_payload = format_http_event_batch(batch)
        process_encoded_payload(http_payload)
      end
    else
      encoded.each do |pair|
        tcp_payload = format_tcp_event(pair.last, @api_key, DD_MAX_BATCH_SIZE)
        process_encoded_payload(tcp_payload)
      end
    end
  rescue => error
    @logger.error("Uncaught processing exception in datadog forwarder #{error.message}")
  end
end
new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression) click to toggle source

Build a new transport client

# File lib/logstash/outputs/datadog_logs.rb, line 146
# Build a new transport client: a DatadogHTTPClient when use_http is truthy,
# a DatadogTCPClient otherwise. The TCP client receives neither
# use_compression nor api_key.
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression)
  return DatadogTCPClient.new(logger, use_ssl, no_ssl_validation, host, port) unless use_http

  DatadogHTTPClient.new(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key)
end
process_encoded_payload(payload) click to toggle source

Process and send each encoded payload

# File lib/logstash/outputs/datadog_logs.rb, line 70
# Send one encoded payload through @client with the configured retry settings.
# The payload is gzip-compressed first only when both @use_compression and
# @use_http are set.
def process_encoded_payload(payload)
  compress = @use_compression && @use_http
  body = compress ? gzip_compress(payload, @compression_level) : payload
  @client.send_retries(body, @max_retries, @max_backoff)
end
register() click to toggle source

Register the plugin to logstash

# File lib/logstash/outputs/datadog_logs.rb, line 39
# Register the plugin to Logstash: builds the transport client from the
# plugin's settings and stores it in @client.
def register
  client_settings = [
    @logger, @api_key, @use_http, @use_ssl,
    @no_ssl_validation, @host, @port, @use_compression
  ]
  @client = new_client(*client_settings)
end
truncate(event, max_length) click to toggle source

Truncate events over the provided max length, appending a marker when truncated

# File lib/logstash/outputs/datadog_logs.rb, line 119
# Truncate an event so that it fits in max_length BYTES, ending it with
# DD_TRUNCATION_SUFFIX when truncation happens.
#
# Sizes are measured in bytes because callers decide whether to truncate by
# comparing +bytesize+ against the limit; measuring characters would leave
# multibyte events over the limit.
#
# event      - the String to truncate (never mutated).
# max_length - maximum allowed size of the result, in bytes.
#
# Returns the event itself when it already fits, otherwise a new String of at
# most max_length bytes.
def truncate(event, max_length)
  return event if event.bytesize <= max_length

  suffix = DD_TRUNCATION_SUFFIX
  kept_bytes = [max_length - suffix.bytesize, 0].max
  # Cutting on a byte boundary can split a multibyte character; scrub drops
  # the resulting invalid bytes so the output stays validly encoded.
  head = event.byteslice(0, kept_bytes).scrub('')
  # When max_length is shorter than the suffix, the suffix itself is cut so
  # the byte limit is still honoured.
  (head + suffix).byteslice(0, max_length).scrub('')
end