class Fluent::KubernetesOutput

Fluentd Kubernetes Output Plugin - Enrich Fluentd events with Kubernetes metadata

Copyright 2015 Red Hat, Inc.

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kubernetes.rb, line 26
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kubernetes.rb, line 30
def configure(conf)
  super

  require 'docker'
  require 'json'
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_kubernetes.rb, line 37
def emit(tag, es, chain)
  es.each do |time,record|
    Fluent::Engine.emit('kubernetes',
                        time,
                        enrich_record(tag, record))
  end

  chain.next
end

Private Instance Methods

enrich_container_data(id, record) click to toggle source
# File lib/fluent/plugin/out_kubernetes.rb, line 65
def enrich_container_data(id, record)
  container = Docker::Container.get(id)
  if container
    container_name = container.json['Name']
    if container_name
      record["container_name"] = container_name[1..-1] if container_name[0] == '/'
      regex = Regexp.new(@kubernetes_pod_regex)
      match = container_name.match(regex)
      if match
        pod_container_name, pod_name, pod_namespace =
          match.captures
        record["pod_namespace"] = pod_namespace
        record["pod"] = pod_name
        record["pod_container"] = pod_container_name
      end
    end
  end
  record
end
enrich_record(tag, record) click to toggle source
# File lib/fluent/plugin/out_kubernetes.rb, line 55
def enrich_record(tag, record)
  id = interpolate(tag, @container_id)
  if !id.empty?
    record['container_id'] = id
    record = enrich_container_data(id, record)
    record = merge_json_log(record)
  end
  record
end
interpolate(tag, str) click to toggle source
# File lib/fluent/plugin/out_kubernetes.rb, line 49
def interpolate(tag, str)
  tag_parts = tag.split('.')

  str.gsub(/\$\{tag_parts\[(\d+)\]\}/) { |m| tag_parts[$1.to_i] }
end
merge_json_log(record) click to toggle source
# File lib/fluent/plugin/out_kubernetes.rb, line 85
def merge_json_log(record)
  if record.has_key?('log')
    log = record['log'].strip
    if log[0].eql?('{') && log[-1].eql?('}')
      begin
        parsed_log = JSON.parse(log)
        record = record.merge(parsed_log)
        unless parsed_log.has_key?('log')
          record.delete('log')
        end
      rescue JSON::ParserError
      end
    end
  end
  record
end