class Fluent::Plugin::LogsFilter

Constants

FORMATTERS
REGEXPS_LOGS
RENAME_MAP

Public Instance Methods

filter(_tag, _time, record) click to toggle source
# File lib/fluent/plugin/filter_logs.rb, line 124
def filter(_tag, _time, record)
  log.trace { "filter_logs: (#{record.class}) #{record.inspect}" }
  record = record.merge(ow_parse_logs(record['log']))
  ow_post_process(record)
end
ow_parse_logs(text) click to toggle source
# File lib/fluent/plugin/filter_logs.rb, line 43
def ow_parse_logs(text)
  return {} unless text

  if text[0] == '{'
    begin
      return JSON.parse(text)
    rescue JSON::ParserError
      # byebug
    end
  end

  REGEXPS_LOGS.each do |r, additional|
    m = text.match(r)
    next unless m

    record = m.named_captures
    if additional
      record.merge!(additional) if additional.is_a?(Hash)
      record.merge!(additional.call(record)) if additional.is_a?(Proc)
    end
    return record
  end

  if text.match(/^(?:[a-zA-Z0-9]+=(?:\"[^"]*\"|\S*) ?)+/)
    return Logfmt.parse(text)
  end

  {}
end
ow_post_process(record) click to toggle source
# File lib/fluent/plugin/filter_logs.rb, line 95
def ow_post_process(record)
  text = record['log']
  record.delete('log')

  if record['data']
    record['status_code'] = record['data']['status']
    record['level'] = 'DEBUG'
    record['message'] = JSON.dump(record.delete('data'))
  end

  if record['status_code']
    record['status_code'] = record['status_code'].to_i
  end

  RENAME_MAP.each do |src, dst|
    if record[src] && record[dst].nil?
      record[dst] = record[src]
      record.delete(src)
    end
  end

  FORMATTERS.each do |k, formatter|
    record[k] = formatter.call(record[k]) if record[k]
  end

  record['message'] ||= text
  record
end