class Fluent::ParserOutput

Attributes

parser[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_parser.rb, line 20
# Builds the plugin instance: delegates to the superclass constructor first,
# then loads Ruby's 'time' library.
def initialize
  super
  # Required at construction time rather than at file load, so the library is
  # only pulled in once an instance of this plugin is actually created.
  require 'time'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_parser.rb, line 30
# Validates the tag-related settings and prepares the text parser.
#
# Exactly one tagging strategy must be chosen: either a fixed +@tag+, or a
# rewrite of the incoming tag via +@remove_prefix+ and/or +@add_prefix+.
#
# conf - configuration object; forwarded unchanged to the superclass and to
#        the Fluent::TextParser instance.
#
# Raises Fluent::ConfigError when none of tag / remove_prefix / add_prefix is
# set, or when tag is combined with either prefix option.
# Returns self.
def configure(conf)
  super

  unless @tag || @remove_prefix || @add_prefix
    raise Fluent::ConfigError, "missing both of remove_prefix and add_prefix"
  end
  if @tag && (@remove_prefix || @add_prefix)
    raise Fluent::ConfigError, "both of tag and remove_prefix/add_prefix must not be specified"
  end

  # Pre-compute the dotted prefixes once so emit does not rebuild them per call.
  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
  @added_prefix_string = @add_prefix + '.' if @add_prefix

  @parser = Fluent::TextParser.new
  @parser.estimate_current_event = false
  @parser.configure(conf)

  unless @time_parse
    # Time parsing is disabled: clear the time key on parsers that expose one.
    inner_parser = @parser.parser
    inner_parser.time_key = nil if inner_parser.respond_to?(:time_key=)
  end

  self
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_parser.rb, line 58
# Parses the +@key_name+ field of every event in the stream and re-emits the
# result (through handle_parsed) under the rewritten tag.
#
# tag   - incoming tag String.
# es    - event stream; must respond to +each+ yielding (time, record).
# chain - object whose +next+ is invoked once after all events were handled.
#
# Output tag: +@tag+ when set; otherwise the incoming tag with
# +@remove_prefix+ stripped and/or +@add_prefix+ prepended.
#
# Error handling per event:
# * Fluent::TextParser::ParserError and other StandardErrors are logged
#   (unless +@suppress_parse_error_log+) and the event is skipped.
# * An ArgumentError whose message starts with "invalid byte sequence in" is
#   retried exactly once on a sanitized copy of the value when
#   +@replace_invalid_sequence+ is set. The retry goes through the same
#   rescue chain, so a parser error on the sanitized value is logged like any
#   other instead of aborting the whole stream.
# * Any other ArgumentError (or a second one after sanitizing) is re-raised.
def emit(tag, es, chain)
  out_tag =
    if @tag
      @tag
    else
      rewritten = tag
      if @remove_prefix &&
         ((tag.start_with?(@removed_prefix_string) && tag.length > @removed_length) || tag == @remove_prefix)
        # When tag equals @remove_prefix exactly, this slice starts past the
        # end of the string and yields nil; the add_prefix branch below copes
        # with that by emitting the bare @add_prefix.
        rewritten = tag[@removed_length..-1]
      end
      if @add_prefix
        rewritten = (rewritten && !rewritten.empty?) ? @added_prefix_string + rewritten : @add_prefix
      end
      rewritten
    end

  es.each do |time, record|
    raw_value = record[@key_name]
    if raw_value.nil?
      log.warn "#{@key_name} does not exist" unless @ignore_key_not_exist
      handle_parsed(out_tag, record, time, {}) if @reserve_data
      next
    end

    input = raw_value
    sanitized = false
    begin
      @parser.parse(input) do |t, values|
        if values
          # Fall back to the event's own time when the parser supplied none.
          handle_parsed(out_tag, record, t || time, values)
        else
          log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log
          handle_parsed(out_tag, record, time, {}) if @reserve_data
        end
      end
    rescue Fluent::TextParser::ParserError => e
      log.warn e.message unless @suppress_parse_error_log
    rescue ArgumentError => e
      recoverable = @replace_invalid_sequence && !sanitized &&
                    e.message.start_with?("invalid byte sequence in")
      raise unless recoverable

      # Bounded retry: re-run the parse once on the sanitized value.
      input = replace_invalid_byte(raw_value)
      sanitized = true
      retry
    rescue => e
      log.warn "parse failed #{e.message}" unless @suppress_parse_error_log
    end
  end

  chain.next
end

Private Instance Methods

handle_parsed(tag, record, t, values) click to toggle source
# File lib/fluent/plugin/out_parser.rb, line 128
# Shapes the parsed values according to the plugin options and emits the
# resulting record through the router.
#
# tag    - tag to emit under.
# record - the original event record; merged in when +@reserve_data+ is set.
# t      - event time to emit with.
# values - parsed key/value pairs (may be nil).
#
# Steps, in order: prefix every key with +@inject_key_prefix+, nest the
# values under +@hash_value_field+, then merge over the original record when
# +@reserve_data+ is set (the original record alone if there is nothing to
# merge).
def handle_parsed(tag, record, t, values)
  if values && @inject_key_prefix
    values = values.each_with_object({}) do |(key, value), prefixed|
      prefixed[@inject_key_prefix + key] = value
    end
  end

  payload = @hash_value_field ? { @hash_value_field => values } : values
  payload = (payload ? record.merge(payload) : record) if @reserve_data

  router.emit(tag, t, payload)
end
replace_invalid_byte(string) click to toggle source
# File lib/fluent/plugin/out_parser.rb, line 139
# Returns a copy of +string+, in its original encoding, with every invalid or
# unconvertible byte sequence replaced by '?'.
#
# Transcoding a string to its own encoding does not scrub it, so the string is
# round-tripped through a different pivot encoding: UTF-16BE when the source
# is UTF-8, UTF-8 otherwise.
#
# The replacement options are passed as keyword arguments. Handing
# String#encode an options Hash as a third positional argument is rejected
# with ArgumentError under Ruby 3's keyword-argument separation.
def replace_invalid_byte(string)
  source_encoding = string.encoding
  pivot_encoding = source_encoding == Encoding::UTF_8 ? Encoding::UTF_16BE : Encoding::UTF_8

  string
    .encode(pivot_encoding, source_encoding, invalid: :replace, undef: :replace, replace: '?')
    .encode(source_encoding)
end