class Fluent::Plugin::SuppressFilter

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_suppress.rb, line 12
# Lets the superclass process +conf+, then prepares this filter's state:
# an empty slot table and the list of attribute key paths obtained by
# splitting @attr_keys on commas (nil when @attr_keys is not set).
def configure(conf)
  super

  @slots = {}
  @keys = (@attr_keys.split(/ *, */) if @attr_keys)
end
filter_stream(tag, es) click to toggle source
# File lib/fluent/plugin/filter_suppress.rb, line 18
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    if @keys
      keys = @keys.map do |key|
        key.split(/\./).inject(record) {|r, k| r[k] }
      end
      key = tag + "\0" + keys.join("\0")
    else
      key = tag
    end
    slot = @slots[key] ||= []

    # expire old records time
    expired = time.to_f - @interval
    while slot.first && (slot.first <= expired)
      slot.shift
    end

    if slot.length >= @num
      log.debug "suppressed record: #{record.to_json}"
      next
    end

    if @slots.length > @max_slot_num
      (evict_key, evict_slot) = @slots.shift
      if evict_slot.last && (evict_slot.last > expired)
        log.warn "@slots length exceeded @max_slot_num: #{@max_slot_num}. Evicted slot for the key: #{evict_key}"
      end
    end

    slot.push(time.to_f)
    new_es.add(time, record)
  end
  return new_es
end