class Fluent::Plugin::FlattenOutput

Public Instance Methods

flatten(record) click to toggle source
# File lib/fluent/plugin/out_flatten.rb, line 42
def flatten(record)
  flattened = {}

  if record.has_key?(key) && !record[key].empty?
    hash = nil

    begin
      if parse_json
        # XXX work-around
        # fluentd seems to escape json value excessively
        json = record[key].gsub(/\\"/, '"')
        hash = JSON.parse(json)
      else
        hash = record[key]
      end
    rescue JSON::ParserError
      return flattened
    end

    processor = lambda do |root, hash|
      flattened = {}
      return flattened unless hash.is_a?(Hash)

      hash.each do |path, value|
        keypath = [root, path].join('.')

        if value.is_a?(Hash)
          flattened = flattened.merge(processor.call(keypath, value))
        else
          flattened[keypath] = { inner_key => value }
        end
      end

      flattened
    end

    flattened  = processor.call(key, hash)
  end

  flattened
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_flatten.rb, line 22
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_flatten.rb, line 26
def process(tag, es)
  es.each do |time, record|
    flattened = flatten(record)

    flattened.each do |keypath, value|
      tag_with_keypath = [tag.clone, keypath].join('.')
      filter_record(tag_with_keypath, time, value)
      if @replace_space_in_tag
        router.emit(tag_with_keypath.gsub(/\s+/, @replace_space_in_tag), time, value)
      else
        router.emit(tag_with_keypath, time, value)
      end
    end
  end
end