class Anschel::Filter

Attributes

filters[R]

Public Class Methods

new(config, stats, log) click to toggle source
# File lib/anschel/filter.rb, line 16
# Compile the filter table from +config+.
#
# config - Hash mapping an event type to a list of filter definitions, each a
#          single-entry Hash of { filter_name => filter_conf }; nil is
#          treated as an empty configuration.
# stats  - passed through unchanged to every filter builder.
# log    - logger; receives the info-level loading events below.
#
# Each definition is turned into a callable by invoking the builder method
# named by its first key, and appended to @filters under its event type.
def initialize config, stats, log
  log.info(event: 'filter-loading')
  log.info(event: 'filter-config', config: config)

  # Unknown event types resolve to a fresh, stored, empty list.
  @filters = Hash.new { |table, type| table[type] = [] }

  (config || {}).each do |event_type, definitions|
    definitions.each do |definition|
      kind, settings = definition.first
      @filters[event_type] << send(kind, settings, stats, log)
    end
  end

  log.info(event: 'filter-fully-loaded')
end

Public Instance Methods

apply(event) click to toggle source
# File lib/anschel/filter.rb, line 32
# Run an event through the configured filters and return it.
#
# Filters registered under :_before run first, then those registered under
# the event's own type, then those under :_after. Each filter is invoked with
# the event via #call.
#
# event - Hash-like event; must have a truthy :type entry.
#
# Raises RuntimeError when the event has no :type.
def apply event
  raise 'Event does not have a "type" field' unless event[:type]
  type = event[:type].to_sym # Captured up front in case a filter modifies it

  [ :_before, type, :_after ].each do |stage|
    # Use fetch rather than [] so that looking up a stage with no filters
    # does not trigger the table's default block and insert a new key for
    # every previously unseen event type.
    filters.fetch(stage) { [] }.each { |f| f.call event }
  end

  event
end
convert(conf, stats, log) click to toggle source
# File lib/anschel/filter/convert.rb, line 9
# Build a filter that coerces one event field to another type.
#
# conf  - Hash of options (the consumed keys are removed from it):
#         :field - name of the event field to convert (required)
#         :type  - 'integer', 'float' or 'string', as a String or Symbol
#                  (required)
#         Whatever remains is handed to #filtered after a conversion.
# stats - receives #create for the counters and #inc per event.
# log   - receives one info event describing the compiled filter.
#
# Returns a lambda taking an event. Events lacking the field are counted as
# skipped and returned untouched.
#
# Raises RuntimeError when :field or :type is missing, or when :type is not
# one of the supported conversions.
def convert conf, stats, log
  field = conf.delete :field
  type  = conf.delete :type

  raise 'Missing required "field" for "convert" filter' if field.nil?
  raise 'Missing required "type" for "convert" filter' if type.nil?

  field = field.to_sym

  type_conversions = {
    'integer' => :to_i,
    'float'   => :to_f,
    'string'  => :to_s
  }

  # Resolve the conversion once, up front, so a bad type is reported when the
  # filter is built instead of failing on every event.
  conversion = type_conversions[type.to_s]
  if conversion.nil?
    raise "Unsupported \"type\" for \"convert\" filter: #{type.inspect}"
  end

  stats.create 'filter-convert'
  stats.create 'filter-convert-skipped'

  log.info event: 'filter-compiled', kind: 'convert', \
    field: field, type: type

  lambda do |event|
    unless event.has_key? field
      stats.inc 'filter-convert-skipped'
      return event
    end

    event[field] = event[field].send conversion
    stats.inc 'filter-convert'
    filtered event, conf
  end
end
debug(conf, stats, log) click to toggle source
# File lib/anschel/filter/debug.rb, line 6
# Build a filter that writes every event it sees to the debug log.
#
# conf and stats are accepted for signature parity with the other filter
# builders but are not used. The returned lambda logs the event (both its
# #inspect string and the object itself) and hands the same event back.
def debug conf, stats, log
  log.info(event: 'filter-compiled', kind: 'debug')

  ->(event) do
    log.debug(event: 'debug', event_repr: event.inspect, raw_event: event)
    event
  end
end
gsub(conf, stats, log) click to toggle source
# File lib/anschel/filter/gsub.rb, line 10
# Build a filter that applies regexp substitutions to one event field.
#
# conf  - Hash of options (the consumed keys are removed from it):
#         :field   - name of the event field to rewrite (required)
#         :match   - pattern, or Array of patterns, each passed to
#                    Regexp.new (required)
#         :replace - replacement handed to String#gsub (required)
#         Whatever remains is handed to #filtered after a substitution.
# stats - receives #create for the counters and #inc per event.
# log   - receives one info event describing the compiled filter.
#
# Returns a lambda taking an event. Events whose field is missing or falsy
# are counted as skipped and returned untouched. The patterns are applied in
# order, each to the result of the previous one.
#
# Raises RuntimeError when :field, :match or :replace is missing.
def gsub conf, stats, log
  field   = conf.delete :field
  match   = conf.delete :match
  replace = conf.delete :replace

  raise 'Missing required "field" for "gsub" filter' if field.nil?
  raise 'Missing required "match" for "gsub" filter' if match.nil?
  raise 'Missing required "replace" for "gsub" filter' if replace.nil?

  field = field.to_sym

  match = match.is_a?(Array) ? match : [ match ]
  match = match.map { |m| Regexp.new m }

  stats.create 'filter-gsub'
  stats.create 'filter-gsub-skipped'

  log.info event: 'filter-compiled', kind: 'gsub', \
    field: field, match: match, replace: replace

  lambda do |event|
    unless event[field]
      stats.inc 'filter-gsub-skipped'
      return event
    end

    # Non-destructive gsub: works on frozen strings and never alters a String
    # object that something other than this event may also reference.
    event[field] = match.inject(event[field]) do |text, m|
      text.gsub m, replace
    end
    stats.inc 'filter-gsub'
    filtered event, conf
  end
end
index(conf, stats, log) click to toggle source
# File lib/anschel/filter/index.rb, line 11
# Build a filter that assigns each event an index name in event[:@index],
# and optionally a routing value in event[:@routing].
#
# The index name is +prefix+ (formatted against the event with String#%)
# followed by the event's timestamp rendered through strftime(+suffix+).
# Timestamps are parsed with Joda-Time formatters reached through the
# `org.joda` Java package.
#
# conf  - Hash of options (the consumed keys are removed from it, except
#         :error_tag which is only read):
#         :stamp     - event field holding the timestamp
#                      (default '@timestamp')
#         :prefix    - format string for the index prefix
#                      (default 'logs-%{type}-')
#         :suffix    - strftime pattern for the index suffix
#                      (default '%Y.%m.%d')
#         :format    - Joda pattern, or Array of patterns, tried in order
#                      (default: four ISO8601 variants)
#         :routing   - optional format string for event[:@routing]
#         :error_tag - tag appended to event[:tags] when the timestamp cannot
#                      be used (default 'index-error'; a falsy value
#                      disables tagging)
#         Whatever remains is handed to #filtered.
# stats - receives #create for the counters and #inc per event.
# log   - receives the compile-time info event and per-event warnings.
#
# Returns a lambda taking an event. When the stamp field is absent or matches
# none of the formats, the current UTC time is used for the index name, a
# warning is logged, and the event is tagged and counted as an error.
def index conf, stats, log
  stamp  = conf.delete(:stamp)  || '@timestamp'
  prefix = conf.delete(:prefix) || 'logs-%{type}-'
  suffix = conf.delete(:suffix) || '%Y.%m.%d'
  format = conf.delete(:format) || %w[
    yyyy-MM-dd'T'HH:mm:ss.SSSZZ
    yyyy-MM-dd'T'HH:mm:ss.SSSZ
    yyyy-MM-dd'T'HH:mm:ssZZ
    yyyy-MM-dd'T'HH:mm:ssZ
  ] # ISO8601
  routing = conf.delete(:routing)

  error_tag = conf.has_key?(:error_tag) ? conf[:error_tag] : 'index-error'

  stamp = stamp.to_sym

  format = [ format ] unless format.is_a? Array

  # NOTE(review): the default year is read once, when the filter is built;
  # a long-running process keeps using that year for year-less patterns.
  joda = format.map do |f|
    j = org.joda.time.format.DateTimeFormat.forPattern f
    j.withDefaultYear(Time.new.year).withOffsetParsed
  end

  stats.create 'filter-index'
  stats.create 'filter-index-skipped'
  stats.create 'filter-index-error'

  log.info event: 'filter-compiled', kind: 'index', \
    stamp: stamp, prefix: prefix, suffix: suffix, format: format, routing: routing

  lambda do |event|
    idx_prefix = prefix % event

    event[:@routing] = routing % event if routing

    stamped = event.has_key? stamp
    matched = false

    # Try each formatter in turn; the first one that parses the stamp wins.
    joda.each do |j|
      begin
        millis = j.parseMillis event[stamp]
        idx_suffix = Time.at(0.001 * millis).utc.strftime(suffix)
        event[:@index] = idx_prefix + idx_suffix
        stats.inc 'filter-index'
        matched = true
      rescue java.lang.IllegalArgumentException
        # This format did not match; fall through to the next one.
      end
      break if matched
    end if stamped

    return filtered(event, conf) if matched

    # Fallback: no usable timestamp, so index by the current time instead.
    timestamp = Time.now.utc
    event[stamp] = timestamp.iso8601(3) unless stamped
    event[:@index] = idx_prefix + timestamp.strftime(suffix)

    # The literal used to list :remediation twice; only the last value ever
    # took effect, so the dead first entry has been removed.
    log_event = {
      event: 'filter-index-warning',
      reason: 'could not parse event',
      remediation: "sending to best-guess index '#{event[:@index]}'",
      stamp: stamp,
      prefix: prefix,
      suffix: suffix,
      format: format
    }
    log_event[:raw_event] = event if log.debug?
    log.warn log_event

    if error_tag
      event[:tags] ||= []
      event[:tags] << error_tag
    end

    # Both counters are bumped: the event still received an index.
    stats.inc 'filter-index-error'
    stats.inc 'filter-index'
    filtered event, conf
  end
end
parse(conf, stats, log) click to toggle source
# File lib/anschel/filter/parse.rb, line 9
# Build a filter that extracts named regexp captures from one event field
# into top-level event fields.
#
# conf  - Hash of options (the consumed keys are removed from it, except
#         :error_tag which is only read):
#         :field        - event field to match against (required)
#         :pattern      - pattern or Array of patterns, each passed to
#                         Regexp.new; :patterns is consulted when :pattern
#                         is absent (required)
#         :unless_field - events where this field is truthy are skipped
#                         (default '@timestamp')
#         :error_tag    - tag appended to event[:tags] when nothing matches
#                         (default 'parse-error'; falsy disables tagging)
#         Whatever remains is handed to #filtered after a match.
# stats - receives #create for the counters and #inc per event.
# log   - receives the compile-time info event and per-event warnings.
#
# Returns a lambda taking an event. The first pattern that matches wins and
# each of its named groups is stored under the symbolised group name.
#
# Raises RuntimeError when :field or the pattern is missing.
def parse conf, stats, log
  field        = conf.delete :field
  pattern      = conf.delete(:pattern) || conf.delete(:patterns)
  unless_field = conf.delete(:unless_field) || '@timestamp'
  error_tag    = conf.fetch(:error_tag, 'parse-error')

  raise 'Missing required "field" for "parse" filter' if field.nil?
  raise 'Missing required "pattern" for "parse" filter' if pattern.nil?

  field        = field.to_sym
  unless_field = unless_field.to_sym
  patterns     = (pattern.is_a?(Array) ? pattern : [ pattern ]).map do |source|
    Regexp.new source
  end

  %w[ filter-parse filter-parse-skipped filter-parse-error ].each do |counter|
    stats.create counter
  end

  log.info(event: 'filter-compiled', kind: 'parse',
           field: field, patterns: patterns)

  ->(event) do
    # Nothing to parse, or the event has already been handled upstream.
    if !event[field] || event[unless_field]
      stats.inc 'filter-parse-skipped'
      next event
    end

    # Stop at the first pattern producing a match; `captures` stays nil
    # when none of them do.
    captures = nil
    patterns.find { |regexp| captures = regexp.match(event[field]) }

    if captures
      captures.names.each do |group|
        event[group.to_sym] = captures[group]
      end

      stats.inc 'filter-parse'
      filtered event, conf
    else
      warning = {
        event: 'parse-filter-error',
        reason: 'regexp did not match',
        field: field,
        pattern: pattern
      }
      warning[:raw_event] = event if log.debug?
      log.warn warning

      stats.inc 'filter-parse-error'
      (event[:tags] ||= []) << error_tag if error_tag
      event
    end
  end
end
scan(conf, stats, log) click to toggle source
# File lib/anschel/filter/scan.rb, line 10
# Build a filter that collects every match of a pattern in one event field
# into a list stored under another field.
#
# conf  - Hash of options (the consumed keys are removed from it):
#         :field   - event field to scan (required)
#         :pattern - pattern passed to Regexp.new (required)
#         :target  - event field receiving the matches (required)
#         Whatever remains is handed to #filtered after a match.
# stats - receives #create for the counters and #inc per event.
# log   - receives one info event describing the compiled filter.
#
# Returns a lambda taking an event. Matches are flattened and de-duplicated,
# then appended to whatever list is already in the target field. Events whose
# field is missing or falsy are counted as skipped; events with no match are
# counted as nomatch. Both are returned untouched.
#
# Raises RuntimeError when :field, :pattern or :target is missing.
def scan conf, stats, log
  field   = conf.delete :field
  pattern = conf.delete :pattern
  target  = conf.delete :target

  raise 'Missing required "field" for "scan" filter' if field.nil?
  raise 'Missing required "pattern" for "scan" filter' if pattern.nil?
  raise 'Missing required "target" for "scan" filter' if target.nil?

  field  = field.to_sym
  target = target.to_sym
  match  = Regexp.new pattern

  stats.create 'filter-scan'
  stats.create 'filter-scan-skipped'
  stats.create 'filter-scan-nomatch'
  # Registered for parity with the other filters; nothing in this filter
  # increments it.
  stats.create 'filter-scan-error'

  log.info event: 'filter-compiled', kind: 'scan', \
    field: field, pattern: pattern, match: match, target: target

  lambda do |event|
    unless event[field]
      stats.inc 'filter-scan-skipped'
      return event
    end

    results = event[field].scan(match).flatten.uniq.map do |s|
      s.reverse.reverse # N.B. There seems to be some issue with the "scan"
                        #      function in JRuby wherein the matches are
                        #      shared across threads or somehow mangled.
                        #      The reverse.reverse here ensures that we
                        #      create a new object with the original
                        #      contents still intact. If you have a
                        #      better solution, please contact me!
    end

    if results.empty?
      stats.inc 'filter-scan-nomatch'
      event

    else
      event[target] ||= []
      event[target]  += results
      stats.inc 'filter-scan'
      filtered event, conf
    end
  end
end
stamp(conf, stats, log) click to toggle source
# File lib/anschel/filter/stamp.rb, line 14
# Build a filter that parses a timestamp out of one event field and stores
# it, as an ISO8601 UTC string, in a target field.
#
# Timestamps are parsed with Joda-Time formatters reached through the
# `org.joda` Java package.
#
# conf  - Hash of options (the consumed keys are removed from it, except
#         :error_tag which is only read):
#         :utc?      - when truthy, the local zone offset (as reported by
#                      Time.zone_offset at build time) is added to parsed
#                      times
#         :field     - event field holding the raw timestamp (required)
#         :pattern   - Joda pattern, or Array of patterns, tried in order
#                      (required)
#         :target    - event field receiving the result
#                      (default '@timestamp')
#         :precision - fractional-second digits for iso8601 (default 3)
#         :error_tag - tag appended to event[:tags] on parse failure
#                      (default 'stamp-error'; falsy disables tagging)
#         Whatever remains is handed to #filtered.
# stats - receives #create for the counters and #inc per event.
# log   - receives the compile-time info event and per-event warnings.
#
# Returns a lambda taking an event.
#
# Raises RuntimeError when :field or :pattern is missing.
def stamp conf, stats, log
  utc       = conf.delete :utc?
  field     = conf.delete :field
  pattern   = conf.delete :pattern
  target    = conf.delete :target
  precision = conf.delete(:precision) || 3
  error_tag = conf.fetch(:error_tag, 'stamp-error')

  raise 'Missing required "field" for "stamp" filter' if field.nil?
  raise 'Missing required "pattern" for "stamp" filter' if pattern.nil?

  field  = field.to_sym
  target = (target || '@timestamp').to_sym

  parsers = (pattern.is_a?(Array) ? pattern : [ pattern ]).map do |layout|
    formatter = org.joda.time.format.DateTimeFormat.forPattern layout
    formatter.withDefaultYear(Time.new.year).withOffsetParsed
  end

  offset_s = 0.0
  offset_s = Time.zone_offset(Time.now.zone).to_f if utc

  %w[ filter-stamp filter-stamp-skipped filter-stamp-error ].each do |counter|
    stats.create counter
  end

  log.info(event: 'filter-compiled', kind: 'stamp',
           utc?: utc, field: field, pattern: pattern, target: target)

  # Settings repeated at the tail of every warning this filter logs.
  settings = { utc?: utc, field: field, pattern: pattern, target: target }

  ->(event) do
    unless event[field]
      stats.inc 'filter-stamp-skipped'
      next event
    end

    # The target is already populated: warn, normalise the existing value
    # and leave the source field unparsed.
    if event[target]
      notice = {
        event: 'stamp-filter-warning',
        reason: 'event already has target field'
      }.merge(settings)
      notice[:raw_event] = event if log.debug?
      log.warn notice
      existing = DateTime.parse(event[target])
      event[target] = existing.to_time.utc.iso8601(precision)
      next event
    end

    # The first parser that succeeds wins; any error raised while trying one
    # simply moves on to the next.
    matched = parsers.any? do |parser|
      begin
        millis = parser.parseMillis event[field]
        event[target] = Time.at(0.001 * millis + offset_s).utc.iso8601(precision)
        stats.inc 'filter-stamp'
        true
      rescue
        false
      end
    end

    next filtered(event, conf) if matched

    notice = {
      event: 'stamp-filter-warning',
      reason: 'could not parse event',
      remediation: 'using current time for stamp'
    }.merge(settings)
    notice[:raw_event] = event if log.debug?
    log.warn notice

    (event[:tags] ||= []) << error_tag if error_tag

    event[target] = Time.now.utc.iso8601(precision)
    stats.inc 'filter-stamp-error'
    filtered event, conf
  end
end
tag(conf, stats, log) click to toggle source
# File lib/anschel/filter/tag.rb, line 8
# Build a filter that adds a fixed set of tags to every event.
#
# conf  - Hash of options; :with (required) is a tag or an Array of tags and
#         is removed from the Hash. Whatever remains is handed to #filtered.
# stats - receives #create for the counter and #inc per event.
# log   - receives one info event describing the compiled filter.
#
# Returns a lambda taking an event. event[:tags] is replaced with a new list
# holding the existing tags followed by the configured ones, without
# duplicates.
#
# Raises RuntimeError when :with is missing.
def tag conf, stats, log
  tags = conf.delete :with

  raise 'Missing required "with" for "tag" filter' if tags.nil?

  tags = [ tags ] unless tags.is_a? Array

  stats.create 'filter-tag'

  log.info(event: 'filter-compiled', kind: 'tag', with: tags)

  ->(event) do
    current = event[:tags] || []
    event[:tags] = (current + tags).uniq
    stats.inc 'filter-tag'
    filtered event, conf
  end
end

Private Instance Methods

filtered(event, options) click to toggle source
# File lib/anschel/filter.rb, line 44
# Post-processing shared by the filters: drop the field(s) named by the
# :remove_field option from the event.
#
# event   - the event to modify in place.
# options - Hash of leftover filter options; :remove_field may be a single
#           field name or an Array of field names. Each name is converted
#           with #to_sym before deletion. A missing or falsy value leaves the
#           event alone.
#
# Returns the event.
def filtered event, options
  fields = options[:remove_field]
  return event unless fields

  fields = [ fields ] unless fields.is_a? Array
  fields.each { |name| event.delete name.to_sym }
  event
end