class Fluent::ECSFilter

Fluentd filter that enriches log records with ECS task data read from Docker container labels.

Public Instance Methods

configure(conf)

Reads the plugin configuration, loads the required libraries, and creates the cache. A negative cache_ttl disables expiry.

Calls superclass method
# File lib/fluent/plugin/filter_ecs_filter.rb, line 31
def configure(conf)
  super

  require 'docker-api'
  require 'lru_redux'
  require 'oj'
  require 'time'
  require 'vine'

  @cache_ttl = :none if @cache_ttl < 0

  @cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)
end
filter_stream(tag, es)

Receives the log event stream and modifies it. This is where the plugin hooks into the Fluentd event stream. Events for which no container id can be resolved are left out of the returned stream.

# File lib/fluent/plugin/filter_ecs_filter.rb, line 47
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  container_id_from_tag = nil

  if container_id_attr.nil?
    container_id_from_tag = get_container_id_from_tag(tag)
  end

  es.each do |time, record|
    if container_id_from_tag.nil?
      # No id from the tag, so read it from the record instead.
      container_id = get_container_id_from_record(record)
      log.debug "container id #{container_id.inspect} read from record"
    else
      container_id = container_id_from_tag
    end
    next unless container_id
    new_es.add(time, modify_record(record, get_ecs_data(container_id)))
  end
  new_es
end
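
The control flow above can be tried without Fluentd. The sketch below is a simplified stand-in, not the plugin's code: events are plain `[time, record]` pairs rather than a `MultiEventStream`, `resolve_container_ids` is a hypothetical helper name, and the record lookup is a plain top-level key read.

```ruby
# Simplified stand-in for the id-resolution step of filter_stream.
# Hypothetical helper; events are [time, record] pairs.
def resolve_container_ids(tag, events, container_id_attr = nil)
  # With no attribute configured, every event in the stream shares the id
  # taken from the last dot-separated element of the tag.
  id_from_tag = container_id_attr.nil? ? tag.split('.').last : nil

  events.each_with_object([]) do |(time, record), kept|
    container_id = id_from_tag || record[container_id_attr]
    next unless container_id # events without an id are dropped
    kept << [time, container_id, record]
  end
end

events = [
  [1, { 'log' => 'a', 'container_id' => 'abc123' }],
  [2, { 'log' => 'b' }]
]

by_tag    = resolve_container_ids('docker.container.def456', events)
by_record = resolve_container_ids('docker.container.def456', events, 'container_id')
```

With the tag as the source, both events resolve to the same id. With `'container_id'` as the attribute, the second event has no id and is dropped.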
get_container_id_from_record(record)

If the user has configured container_id_attr, the container id is read from that attribute of the record, provided it has been inserted there. If no container id can be found, filter_stream skips the record and leaves it out of the returned stream.

Attributes:

  • record - The record that is being transformed by the filter

Returns:

  • A docker container id

# File lib/fluent/plugin/filter_ecs_filter.rb, line 137
def get_container_id_from_record(record)
  record.access(@container_id_attr)
end
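
`record.access` comes from the `vine` gem loaded in `configure`. As a rough stdlib-only approximation, assuming `container_id_attr` holds a dot-separated path into nested hashes, the lookup behaves much like `Hash#dig`. `dig_path` is a hypothetical helper name used only for illustration.

```ruby
# Hypothetical stdlib-only approximation of a dot-separated path lookup.
def dig_path(record, path)
  record.dig(*path.split('.'))
end

record = { 'docker' => { 'container_id' => 'abc123' }, 'log' => 'hello' }

found   = dig_path(record, 'docker.container_id')
missing = dig_path(record, 'docker.name')
```

A path that does not exist yields `nil`, which is what lets `filter_stream` skip the record.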
get_container_id_from_tag(tag)

Gets the container id from the last dot-separated element of the tag. filter_stream uses this only when container_id_attr is not configured; otherwise the id is read from the record.

Attributes:

  • tag - The tag of the log being processed

Returns:

  • A docker container id

# File lib/fluent/plugin/filter_ecs_filter.rb, line 125
def get_container_id_from_tag(tag)
  tag.split('.').last
end
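
As a quick illustration, the same expression applied to a hypothetical tag (the tag layout here is an assumption, not something the plugin mandates):

```ruby
# Same expression as get_container_id_from_tag, applied to a made-up tag.
tag = 'docker.container.abc123'
container_id = tag.split('.').last

# An empty tag has no elements, so the result is nil.
empty_tag_id = ''.split('.').last
```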
get_container_metadata(id)

Queries the Docker API for the container and reads the ECS task data from its labels.

Attributes:

  • id - The id of the container to look at for ecs metadata.

Returns:

  • A hash that describes an ECS task, gathered from the Docker API

# File lib/fluent/plugin/filter_ecs_filter.rb, line 104
def get_container_metadata(id)
  task_data = {}
  container = Docker::Container.get(id)
  if container
    labels = container.json['Config']['Labels']
    task_data['task_family']  = labels['com.amazonaws.ecs.task-definition-family']
    task_data['task_family'].prepend(@task_family_prepend) if @task_family_prepend
    task_data['task_version'] = labels['com.amazonaws.ecs.task-definition-version']
    task_data['task_id']      = labels['com.amazonaws.ecs.task-arn'].split('/').last
  end
  task_data
end
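
The label handling can be exercised on a plain hash without a Docker daemon. The following is a sketch under stated assumptions, not the plugin's implementation: `task_data_from_labels` is a hypothetical helper, the label values are made up, and unlike the method above it returns an empty hash when the ECS labels are absent instead of calling a method on `nil`.

```ruby
# Hypothetical helper: builds task data from a labels hash, skipping
# containers that lack the ECS labels.
def task_data_from_labels(labels, family_prefix = nil)
  family = labels['com.amazonaws.ecs.task-definition-family']
  arn    = labels['com.amazonaws.ecs.task-arn']
  return {} if family.nil? || arn.nil?

  {
    # Interpolation builds a new string, so the label value is not mutated.
    'task_family'  => "#{family_prefix}#{family}",
    'task_version' => labels['com.amazonaws.ecs.task-definition-version'],
    # The task id is the last path segment of the task ARN.
    'task_id'      => arn.split('/').last
  }
end

# Made-up label values for illustration.
labels = {
  'com.amazonaws.ecs.task-definition-family'  => 'web',
  'com.amazonaws.ecs.task-definition-version' => '7',
  'com.amazonaws.ecs.task-arn' => 'arn:aws:ecs:us-east-1:123456789012:task/0f1e2d3c'
}

ecs_task = task_data_from_labels(labels, 'prod-')
non_ecs  = task_data_from_labels({})
```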
get_ecs_data(container_id)

Returns the ECS data for a container from the cache. On a cache miss, calls the Docker API to retrieve the data and stores it in the cache.

Attributes:

  • container_id - The container_id where the log record originated from.

Returns:

  • A hash of data that describes an ECS task

# File lib/fluent/plugin/filter_ecs_filter.rb, line 92
def get_ecs_data(container_id)
  @cache.getset(container_id) do
    get_container_metadata(container_id)
  end
end
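
The get-or-compute pattern used here can be sketched with a plain hash. `TinyCache` below is a hypothetical stand-in for illustration only: it has no size limit, no TTL, and no thread safety, all of which the real cache created in `configure` is there to provide.

```ruby
# Minimal stand-in for the cache's getset behaviour.
# Hypothetical class: no eviction, no expiry, not thread safe.
class TinyCache
  def initialize
    @store = {}
  end

  # Returns the stored value for key, or runs the block and stores its result.
  def getset(key)
    return @store[key] if @store.key?(key)
    @store[key] = yield
  end
end

lookups = 0
cache = TinyCache.new
fetch = lambda do |id|
  cache.getset(id) do
    lookups += 1 # stands in for the Docker API call
    { 'task_family' => 'web' }
  end
end

first  = fetch.call('abc123')
second = fetch.call('abc123')
```

The block runs only on the first call for a given id, so repeated log lines from the same container do not trigger repeated lookups.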
merge_json_log(record)

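If the log value looks like a JSON object, parses it and merges it into the log record; keys already on the record take precedence over keys from the parsed log. If the record has a namespace key, the parsed JSON is instead stored under the key named by its value. A log value that fails to parse leaves the record unchanged.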

Attributes:

  • record - The record we are transforming in the fluentd event stream.

Examples

  # Docker captures stdout and passes it in the 'log' record attribute.
  # We try to discover whether the value of 'log' is JSON; if it is, we
  # parse it and add its keys and values to the record.

Returns:

  • A record hash that has json log data merged into the record

# File lib/fluent/plugin/filter_ecs_filter.rb, line 152
def merge_json_log(record)
  if record.key?('log')
    log = record['log'].strip
    namespace = record['namespace']
    if log[0].eql?('{') && log[-1].eql?('}')
      begin
        log_json = Oj.load(log)
        if namespace
          record[namespace] = log_json
        else
          record = log_json.merge(record)
        end
      rescue Oj::ParseError
        # Not valid JSON after all; leave the record unchanged.
      end
    end
  end
  record
end
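
The two merge modes can be tried with the standard library alone. The sketch below is an approximation, not the plugin's code: it swaps Oj for the stdlib `JSON` parser, `merge_json_log_sketch` is a hypothetical name, and it returns a new hash in the namespace case rather than modifying the record in place.

```ruby
require 'json'

# Hypothetical stand-in using the stdlib JSON parser instead of Oj.
def merge_json_log_sketch(record)
  log = record['log'].to_s.strip
  return record unless log.start_with?('{') && log.end_with?('}')

  parsed = JSON.parse(log)
  namespace = record['namespace']
  if namespace
    # Store the parsed log under the key named by the 'namespace' value.
    record.merge(namespace => parsed)
  else
    # Keys already on the record win over keys from the parsed log.
    parsed.merge(record)
  end
rescue JSON::ParserError
  record # not valid JSON, so the record passes through unchanged
end

flat   = merge_json_log_sketch({ 'log' => '{"level":"info","msg":"up"}', 'source' => 'stdout' })
nested = merge_json_log_sketch({ 'log' => '{"level":"info"}', 'namespace' => 'app' })
plain  = merge_json_log_sketch({ 'log' => '{not json}' })
```

In `flat` the parsed keys sit beside the original ones, in `nested` they sit under `'app'`, and `plain` comes back untouched because parsing fails.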
modify_record(record, ecs_data)

Merges the ECS data into the record and, when merge_json_log is enabled, also merges the parsed JSON log into it.

Attributes:

  • record - The log record being processed

  • ecs_data - The ECS data retrieved from the Docker container

Returns:

  • A record hash that has ECS data and optionally log data added

# File lib/fluent/plugin/filter_ecs_filter.rb, line 79
def modify_record(record, ecs_data)
  modified_record = record.merge(ecs_data)
  modified_record = merge_json_log(modified_record) if @merge_json_log
  modified_record
end