class Fluent::DatadogOutput
Constants
- DD_DEFAULT_HTTP_ENDPOINT
- DD_DEFAULT_TCP_ENDPOINT
- DD_MAX_BATCH_LENGTH
Maximum limits applied to the transport regardless of the Fluentd buffer configuration, per docs.datadoghq.com/api/?lang=bash#logs
- DD_MAX_BATCH_SIZE
- DD_TRUNCATION_SUFFIX
- DEFAULT_BUFFER_TYPE
Public Class Methods
new()
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 64
def initialize
  super
end
Public Instance Methods
batch_http_events(encoded_events, max_batch_length, max_request_size)
Group HTTP events in batches
# File lib/fluent/plugin/out_datadog.rb, line 166
def batch_http_events(encoded_events, max_batch_length, max_request_size)
  batches = []
  current_batch = []
  current_batch_size = 0
  encoded_events.each_with_index do |encoded_event, i|
    current_event_size = encoded_event.bytesize
    # If this unique log size is bigger than the request size, truncate it
    if current_event_size > max_request_size
      encoded_event = truncate(encoded_event, max_request_size)
      current_event_size = encoded_event.bytesize
    end
    if (i > 0 and i % max_batch_length == 0) or (current_batch_size + current_event_size > max_request_size)
      batches << current_batch
      current_batch = []
      current_batch_size = 0
    end
    current_batch_size += encoded_event.bytesize
    current_batch << encoded_event
  end
  batches << current_batch
  batches
end
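As a rough illustration of the batching rules, a hypothetical call from inside the plugin (the event strings, the batch length of 2, and the 25-byte request cap are made-up values, not plugin defaults):

encoded_events = ['{"msg":"a"}', '{"msg":"b"}', '{"msg":"c"}']
batch_http_events(encoded_events, 2, 25)
# => [['{"msg":"a"}', '{"msg":"b"}'], ['{"msg":"c"}']]
# The third event starts a new batch because the batch length cap of 2 is
# reached; a single event larger than 25 bytes would first be truncated.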
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 68
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  return if @dd_hostname

  if not @use_http and @host == DD_DEFAULT_HTTP_ENDPOINT
    @host = DD_DEFAULT_TCP_ENDPOINT
  end

  # Set dd_hostname if not already set (can be set when using fluentd as aggregator)
  @dd_hostname = %x[hostname -f 2> /dev/null].strip
  @dd_hostname = Socket.gethostname if @dd_hostname.empty?
end
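For context, a minimal Fluentd match section that exercises this configuration path (the API key is a placeholder, and the values are examples only; the parameter names are this plugin's documented options):

<match **>
  @type datadog
  api_key <YOUR_DATADOG_API_KEY>
  use_http true
  dd_source fluentd
  dd_tags env:staging
</match>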
enrich_record(tag, time, record)
Enrich records with metadata such as service, tags or source
# File lib/fluent/plugin/out_datadog.rb, line 211
def enrich_record(tag, time, record)
  if @dd_sourcecategory
    record["ddsourcecategory"] ||= @dd_sourcecategory
  end
  if @dd_source
    record["ddsource"] ||= @dd_source
  end
  if @dd_tags
    record["ddtags"] ||= @dd_tags
  end
  if @service
    record["service"] ||= @service
  end
  if @dd_hostname
    # Set the record hostname to the configured dd_hostname only if the
    # record hostname is empty, ensuring a hostname is set even if the
    # record doesn't contain one.
    record["hostname"] ||= @dd_hostname
  end
  if @include_tag_key
    record[@tag_key] = tag
  end

  # If @timestamp_key already exists, we don't overwrite it.
  if @timestamp_key and record[@timestamp_key].nil? and time
    record[@timestamp_key] = Time.at(time).utc.iso8601(3)
  end

  container_tags = get_container_tags(record)
  unless container_tags.empty?
    if record["ddtags"].nil? || record["ddtags"].empty?
      record["ddtags"] = container_tags
    else
      record["ddtags"] = record["ddtags"] + "," + container_tags
    end
  end
  record
end
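A sketch of the resulting enrichment, assuming the plugin was configured with dd_source "nginx" and dd_tags "env:prod", @timestamp_key keeps its default of "@timestamp", and no dd_hostname or container tags apply; fields already present on the record are kept because of the ||= guards:

record = { "message" => "GET /health 200", "service" => "web" }
enrich_record("nginx.access", 1700000000.0, record)
# => { "message"    => "GET /health 200",
#      "service"    => "web",              # preserved, not overwritten
#      "ddsource"   => "nginx",
#      "ddtags"     => "env:prod",
#      "@timestamp" => "2023-11-14T22:13:20.000Z" }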
format(tag, time, record)
This method is called when an event reaches Fluentd.
# File lib/fluent/plugin/out_datadog.rb, line 105
def format(tag, time, record)
  # When Fluent::EventTime is msgpack'ed it gets converted to int with seconds
  # precision only. We explicitly convert it to a floating point number, which
  # is compatible with Time.at below.
  record = enrich_record(tag, time.to_f, record)
  if @use_http
    record = Yajl.dump(record)
  else
    if @use_json
      record = "#{api_key} #{Yajl.dump(record)}"
    else
      record = "#{api_key} #{record}"
    end
  end
  [record].to_msgpack
end
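To make the three branches concrete, the payload shapes look roughly like this for a one-field record (abc123 stands in for a real API key):

# use_http:              msgpack of ['{"message":"hello"}']
# TCP with use_json:     msgpack of ['abc123 {"message":"hello"}']
# TCP without use_json:  msgpack of ['abc123 {"message"=>"hello"}']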
format_http_event_batch(events)
Format batch of http events
# File lib/fluent/plugin/out_datadog.rb, line 206
def format_http_event_batch(events)
  "[#{events.join(',')}]"
end
formatted_to_msgpack_binary?()
# File lib/fluent/plugin/out_datadog.rb, line 86
def formatted_to_msgpack_binary?
  true
end
gzip_compress(payload, compression_level)
Compress logs with GZIP
# File lib/fluent/plugin/out_datadog.rb, line 251
def gzip_compress(payload, compression_level)
  gz = StringIO.new
  gz.set_encoding("BINARY")
  z = Zlib::GzipWriter.new(gz, compression_level)
  begin
    z.write(payload)
  ensure
    z.close
  end
  gz.string
end
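A quick round-trip check of this helper from inside the plugin (Zlib::DEFAULT_COMPRESSION is the stdlib constant; the payload string is an arbitrary example):

payload = '[{"message":"hello"}]'
compressed = gzip_compress(payload, Zlib::DEFAULT_COMPRESSION)
Zlib.gunzip(compressed) == payload
# => true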
max(a, b)
# File lib/fluent/plugin/out_datadog.rb, line 201
def max(a, b)
  a > b ? a : b
end
multi_workers_ready?()
# File lib/fluent/plugin/out_datadog.rb, line 82
def multi_workers_ready?
  true
end
new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression)
Build a new transport client
# File lib/fluent/plugin/out_datadog.rb, line 264
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression)
  if use_http
    DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key
  else
    DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port
  end
end
process_http_events(events, use_compression, compression_level, max_retries, max_backoff, max_batch_length, max_batch_size)
Process and send a set of HTTP events, breaking it down into smaller batches when needed
# File lib/fluent/plugin/out_datadog.rb, line 146
def process_http_events(events, use_compression, compression_level, max_retries, max_backoff, max_batch_length, max_batch_size)
  batches = batch_http_events(events, max_batch_length, max_batch_size)
  batches.each do |batched_event|
    formatted_events = format_http_event_batch(batched_event)
    if use_compression
      formatted_events = gzip_compress(formatted_events, compression_level)
    end
    @client.send_retries(formatted_events, max_retries, max_backoff)
  end
end
process_tcp_event(event, max_retries, max_backoff, max_batch_size)
Process and send a single TCP event
# File lib/fluent/plugin/out_datadog.rb, line 158
def process_tcp_event(event, max_retries, max_backoff, max_batch_size)
  if event.bytesize > max_batch_size
    event = truncate(event, max_batch_size)
  end
  @client.send_retries(event, max_retries, max_backoff)
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 95
def shutdown
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 90
def start
  super
  @client = new_client(log, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @ssl_port, @port, @http_proxy, @use_compression)
end
terminate()
Calls superclass method
# File lib/fluent/plugin/out_datadog.rb, line 99
def terminate
  super
  @client.close if @client
end
truncate(event, max_length)
Truncate events over the provided max length, appending a marker when truncated
# File lib/fluent/plugin/out_datadog.rb, line 192
def truncate(event, max_length)
  if event.length > max_length
    event = event[0..max_length - 1]
    event[max(0, max_length - DD_TRUNCATION_SUFFIX.length)..max_length - 1] = DD_TRUNCATION_SUFFIX
    return event
  end
  event
end
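For example, assuming DD_TRUNCATION_SUFFIX is the 15-character string "...TRUNCATED..." (the exact value is defined in the constants list above):

truncate("x" * 20, 18)
# => "xxx...TRUNCATED..."   (18 characters; the suffix overwrites the tail)
truncate("short", 18)
# => "short"                (within the limit, returned as-is)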
write(chunk)
NOTE: this method is called by an internal thread, not Fluentd's main thread. 'chunk' is a buffer chunk that includes multiple formatted events.
# File lib/fluent/plugin/out_datadog.rb, line 125
def write(chunk)
  begin
    if @use_http
      events = Array.new
      chunk.msgpack_each do |record|
        next if record.empty?
        events.push record[0]
      end
      process_http_events(events, @use_compression, @compression_level, @max_retries, @max_backoff, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE)
    else
      chunk.msgpack_each do |record|
        next if record.empty?
        process_tcp_event(record[0], @max_retries, @max_backoff, DD_MAX_BATCH_SIZE)
      end
    end
  rescue Exception => e
    @logger.error("Uncaught processing exception in datadog forwarder #{e.message}")
  end
end