class Fluent::Plugin::FlattenOutput
Public Instance Methods
flatten(record)
click to toggle source
# File lib/fluent/plugin/out_flatten.rb, line 42 def flatten(record) flattened = {} if record.has_key?(key) && !record[key].empty? hash = nil begin if parse_json # XXX work-around # fluentd seems to escape json value excessively json = record[key].gsub(/\\"/, '"') hash = JSON.parse(json) else hash = record[key] end rescue JSON::ParserError return flattened end processor = lambda do |root, hash| flattened = {} return flattened unless hash.is_a?(Hash) hash.each do |path, value| keypath = [root, path].join('.') if value.is_a?(Hash) flattened = flattened.merge(processor.call(keypath, value)) else flattened[keypath] = { inner_key => value } end end flattened end flattened = processor.call(key, hash) end flattened end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_flatten.rb, line 22 def multi_workers_ready? true end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_flatten.rb, line 26 def process(tag, es) es.each do |time, record| flattened = flatten(record) flattened.each do |keypath, value| tag_with_keypath = [tag.clone, keypath].join('.') filter_record(tag_with_keypath, time, value) if @replace_space_in_tag router.emit(tag_with_keypath.gsub(/\s+/, @replace_space_in_tag), time, value) else router.emit(tag_with_keypath, time, value) end end end end