class Fluent::ParserFilter
Attributes
parser[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_parser.rb, line 17
# Builds the filter: runs the superclass initializer first, then loads
# Ruby's standard 'time' library.
def initialize
  # The method takes no parameters, so the explicit empty argument list
  # forwards exactly what a bare `super` would.
  super()
  require 'time'
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_parser.rb, line 22
# Applies the configuration and builds the text parser stored in @parser.
#
# conf - configuration object; forwarded to the superclass and to the
#        newly created Fluent::TextParser.
#
# Returns self.
def configure(conf)
  super

  @parser = Fluent::TextParser.new
  @parser.estimate_current_event = false
  @parser.configure(conf)

  # Leave the parser untouched when time parsing was requested, or when the
  # underlying parser has no time_key writer to clear.
  return self if @time_parse
  return self unless @parser.parser.respond_to?(:time_key=)

  # Clearing time_key disables time parsing in the underlying parser.
  @parser.parser.time_key = nil
  self
end
filter_stream(tag, es)
click to toggle source
# File lib/fluent/plugin/filter_parser.rb, line 36
# Parses the @key_name field of every event in +es+ and collects the
# results into a new Fluent::MultiEventStream.
#
# For each (time, record) pair:
# * If record[@key_name] is nil, a warning is logged (unless
#   @ignore_key_not_exist) and, when @reserve_data is set, the record is
#   passed through handle_parsed with an empty values hash.
# * Otherwise the value is handed to @parser.parse. Every yielded result
#   with values is added to the output (falling back to the event's own
#   time when the parser yields no time). A result without values is
#   logged as a pattern mismatch and, when @reserve_data is set, the
#   record is passed through with an empty values hash.
# * Fluent::TextParser::ParserError and any other StandardError are logged
#   (unless @suppress_parse_error_log) and the event is skipped.
# * An ArgumentError whose message starts with "invalid byte sequence in"
#   triggers ONE retry with replace_invalid_byte(raw_value), provided
#   @replace_invalid_sequence is set. The retry goes through the same
#   rescue chain, so a parser failure on the sanitized text is logged like
#   any other instead of aborting the whole stream.
#
# tag - event tag, forwarded to handle_parsed.
# es  - event stream; must respond to #each yielding time and record.
#
# Returns the new event stream.
# Raises ArgumentError when the parser raises one that is not an invalid
# byte sequence error, when @replace_invalid_sequence is not set, or when
# the sanitized text still triggers it.
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new

  es.each do |time, record|
    raw_value = record[@key_name]
    if raw_value.nil?
      log.warn "#{@key_name} does not exist" unless @ignore_key_not_exist
      new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data
      next
    end

    text = raw_value
    sanitized = false
    begin
      @parser.parse(text) do |parsed_time, values|
        if values
          parsed_time ||= time
          new_es.add(parsed_time, handle_parsed(tag, record, parsed_time, values))
        else
          # The log always shows the original value, even on the retry.
          log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log
          new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data
        end
      end
    rescue Fluent::TextParser::ParserError => e
      log.warn e.message unless @suppress_parse_error_log
    rescue ArgumentError => e
      raise unless @replace_invalid_sequence
      raise unless e.message.start_with?("invalid byte sequence in")
      # Bounded retry: sanitize at most once per record.
      raise if sanitized

      text = replace_invalid_byte(raw_value)
      sanitized = true
      retry
    rescue => e
      log.warn "parse failed #{e.message}" unless @suppress_parse_error_log
    end
  end

  new_es
end
Private Instance Methods
handle_parsed(tag, record, t, values)
click to toggle source
# File lib/fluent/plugin/filter_parser.rb, line 94
# Builds the output record from the parsed +values+.
#
# * When @inject_key_prefix is set, every key of +values+ is prefixed with
#   it. Keys are interpolated into a String, so non-String keys such as
#   Symbols are accepted rather than raising TypeError.
# * When @hash_value_field is set, the values are nested under that key.
# * When @reserve_data is set, the result is merged over +record+ (parsed
#   keys win); if there is no result, +record+ itself is returned.
#
# tag    - event tag (not used here).
# record - the original record Hash.
# t      - event time (not used here).
# values - Hash of parsed values, or nil.
#
# Returns the resulting Hash, or nil when there are no values and neither
# @hash_value_field nor @reserve_data is set.
def handle_parsed(tag, record, t, values)
  if values && @inject_key_prefix
    values = values.each_with_object({}) do |(key, value), prefixed|
      prefixed["#{@inject_key_prefix}#{key}"] = value
    end
  end

  result = @hash_value_field ? { @hash_value_field => values } : values
  return result unless @reserve_data

  result ? record.merge(result) : record
end
replace_invalid_byte(string)
click to toggle source
# File lib/fluent/plugin/filter_parser.rb, line 105
# Returns a copy of +string+ in its original encoding with every invalid or
# unconvertible byte sequence replaced by '?'.
#
# The string is transcoded to a different, temporary encoding (UTF-16BE
# when the source is UTF-8, UTF-8 otherwise) and then back again.
# NOTE(review): presumably the detour exists because transcoding to the
# same encoding would not replace anything - confirm against String#encode.
#
# The replacement options are passed as keywords, not as a positional
# Hash: since Ruby 3.0 a trailing positional Hash is no longer treated as
# keyword arguments, and String#encode raises ArgumentError for it.
#
# string - the String to sanitize.
#
# Returns a new String with the same encoding as +string+.
def replace_invalid_byte(string)
  original_encoding = string.encoding
  temporary_encoding = original_encoding == Encoding::UTF_8 ? Encoding::UTF_16BE : Encoding::UTF_8

  string
    .encode(temporary_encoding, original_encoding, invalid: :replace, undef: :replace, replace: '?')
    .encode(original_encoding)
end