# File lib/fluent/plugin/filter_record_transformer.rb, line 23
#
# Loads the standard `socket` library at construction time, then defers to
# the parent class initializer.
def initialize
  require 'socket'
  super
end
# File lib/fluent/plugin/filter_record_transformer.rb, line 33
#
# Builds the plugin's runtime state from the parsed configuration.
#
# Sets @map (key => parsed value for every pair inside <record> directives),
# splits the comma-separated @remove_keys / @keep_keys strings into arrays,
# selects the placeholder expander and caches the local hostname.
#
# Raises Fluent::ConfigError when `keep_keys` is given without `renew_record`.
def configure(conf)
  super

  # <record></record> directive
  @map = {}
  conf.elements.each do |element|
    next unless element.name == 'record'

    element.each_pair do |key, raw_value|
      element.has_key?(key) # to suppress unread configuration warning
      @map[key] = parse_value(raw_value)
    end
  end

  @remove_keys = @remove_keys.split(',') if @remove_keys

  if @keep_keys
    unless @renew_record
      raise Fluent::ConfigError, "`renew_record` must be true to use `keep_keys`"
    end
    @keep_keys = @keep_keys.split(',')
  end

  if @enable_ruby
    # require utilities which would be used in ruby placeholders
    require 'pathname'
    require 'uri'
    require 'cgi'
    @placeholder_expander = RubyPlaceholderExpander.new(log)
  else
    @placeholder_expander = PlaceholderExpander.new(log)
  end

  @hostname = Socket.gethostname
end
# File lib/fluent/plugin/filter_record_transformer.rb, line 68
#
# Reforms every record of the event stream `es` and returns a new
# MultiEventStream holding the reformed records.
#
# tag - the event tag; it is split on '.' to build the tag-related
#       placeholders ('tag', 'tag_parts', 'tag_prefix', 'tag_suffix').
# es  - an object whose #each yields (time, record) pairs.
#
# Always returns the new event stream. A record that fails to reform is
# logged and skipped so that the remaining records still pass through.
# Previously any failure made the method return the logger's result instead
# of a stream and discarded the whole batch.
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  tag_parts = tag.split('.')
  placeholders = {
    'tag' => tag,
    'tag_parts' => tag_parts,
    'tag_prefix' => tag_prefix(tag_parts),
    'tag_suffix' => tag_suffix(tag_parts),
    'hostname' => @hostname,
  }
  last_record = nil
  es.each do |time, record|
    last_record = record # for debug log
    begin
      new_es.add(time, reform(time, record, placeholders))
    rescue => e
      # Isolate the failure to this record; keep processing the others.
      log_reform_failure(e, record, placeholders)
    end
  end
  new_es
rescue => e
  # Failure outside a single record (e.g. while iterating the stream):
  # log it and still hand back whatever was reformed so far.
  log_reform_failure(e, last_record, placeholders)
  new_es
end

# Logs a reform failure: a warning with the error, its backtrace, and a debug
# line with the map, the offending record and the placeholders.
def log_reform_failure(error, record, placeholders)
  log.warn "failed to reform records", :error_class => error.class, :error => error.message
  log.warn_backtrace
  log.debug "map:#{@map} record:#{record} placeholders:#{placeholders}"
end
# File lib/fluent/plugin/filter_record_transformer.rb, line 117
#
# Recursively expands placeholders in `value` and returns the expanded copy.
#
# - String: returns the result of @placeholder_expander.expand.
# - Hash:   returns a new hash whose keys are expanded and whose values are
#           interpolated recursively.
# - Array:  returns a new array of recursively interpolated elements.
# - other:  returned unchanged.
#
# The argument is never modified. The previous Array branch wrote the
# expanded elements back into the array it was given, overwriting the
# caller's template with the first expansion result.
def interpolate(value)
  case value
  when String
    @placeholder_expander.expand(value)
  when Hash
    value.each_with_object({}) do |(key, val), expanded|
      expanded[@placeholder_expander.expand(key)] = interpolate(val)
    end
  when Array
    # Build a fresh array so the caller's template is left intact.
    value.map { |item| interpolate(item) }
  else
    value
  end
end
# File lib/fluent/plugin/filter_record_transformer.rb, line 95
#
# Converts a configuration value string into the value stored in the map.
#
# Strings starting with '{' or '[' are parsed as JSON; anything else is
# returned as-is. If an error is raised along the way, a warning is logged
# and the original string is returned.
def parse_value(value_str)
  return value_str unless value_str.start_with?('{', '[')

  JSON.parse(value_str)
rescue => e
  log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", :error_class => e.class, :error => e.message
  value_str # emit as string
end
# File lib/fluent/plugin/filter_record_transformer.rb, line 106
#
# Produces the transformed version of a single record.
#
# time, record, opts - handed to the placeholder expander's
#                      prepare_placeholders before anything is expanded.
#
# Starts from an empty hash when @renew_record is set (copying only the
# @keep_keys entries, if any), otherwise from a shallow copy of `record`.
# Every @map entry is then interpolated into the result and the @remove_keys
# entries are deleted. Returns the resulting hash.
def reform(time, record, opts)
  @placeholder_expander.prepare_placeholders(time, record, opts)

  if @renew_record
    result = {}
    if @keep_keys
      @keep_keys.each { |key| result[key] = record[key] }
    end
  else
    result = record.dup
  end

  @map.each do |key, template|
    result[key] = interpolate(template)
  end

  if @remove_keys
    @remove_keys.each { |key| result.delete(key) }
  end

  result
end
# File lib/fluent/plugin/filter_record_transformer.rb, line 134
#
# Returns the cumulative dot-joined prefixes of `tag_parts`, shortest first.
# For ['a', 'b', 'c'] the result is ['a', 'a.b', 'a.b.c'].
# An empty input yields an empty array.
def tag_prefix(tag_parts)
  return [] if tag_parts.empty?

  # Seed with the first part, then extend the previous prefix one part at a time.
  tag_parts.drop(1).each_with_object([tag_parts.first]) do |part, prefixes|
    prefixes << "#{prefixes.last}.#{part}"
  end
end
# File lib/fluent/plugin/filter_record_transformer.rb, line 143
#
# Returns the cumulative dot-joined suffixes of `tag_parts`, longest first.
# For ['a', 'b', 'c'] the result is ['a.b.c', 'b.c', 'c'].
# An empty input yields an empty array.
def tag_suffix(tag_parts)
  return [] if tag_parts.empty?

  # Start from the last part and grow towards the front, prepending each
  # longer suffix so the longest ends up first.
  suffixes = [tag_parts.last]
  tag_parts[0...-1].reverse_each do |part|
    suffixes.unshift("#{part}.#{suffixes.first}")
  end
  suffixes
end