class Fluent::Plugin::SuppressOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_suppress.rb, line 16
# Validates the plugin configuration and initialises per-instance state.
#
# Sets @labelled from the presence of an '@label' entry in +conf+, derives
# @keys by splitting the comma-separated @attr_keys string (nil when
# @attr_keys is unset), and resets @slots to an empty Hash.
#
# @param conf [#[]] configuration element; only conf['@label'] is read here
# @raise [Fluent::ConfigError] when no '@label' is set and none of the
#   tag-rewriting options is configured
def configure(conf)
  super

  @labelled = !conf['@label'].nil?

  # Without a label, at least one tag-rewriting option must be present.
  tag_rewrite_configured =
    @remove_tag_prefix || @remove_tag_suffix || @add_tag_prefix || @add_tag_suffix
  unless @labelled || tag_rewrite_configured
    raise Fluent::ConfigError, "out_suppress: Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
  end

  # Commas may be surrounded by spaces: "a.b , c" => ["a.b", "c"].
  @keys  = (@attr_keys.split(/ *, */) if @attr_keys)
  @slots = {}
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_suppress.rb, line 29
# Always answers true.
#
# NOTE(review): presumably consulted by the host framework to decide whether
# this plugin may run under multiple workers -- confirm against the caller.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_suppress.rb, line 33
# Rate-limits the events of a stream and re-emits the ones that pass.
#
# Events are grouped by a slot key: the tag alone, or the tag plus the values
# found at each dotted path in @keys. For every key, at most @num events are
# let through within a sliding window of @interval seconds (measured on the
# events' own timestamps); further events are logged at debug level and
# skipped. @slots holds the accepted timestamps per key, and its oldest
# entry is evicted once it holds more than @max_slot_num keys.
#
# @param tag [String] tag of the incoming event stream; never mutated here
# @param es [#each] event stream yielding (time, record) pairs
# @return the value of es.each
def process(tag, es)
  # Split the dotted key paths once per call rather than once per record.
  paths = @keys && @keys.map { |key| key.split(/\./) }

  es.each do |time, record|
    if paths
      values = paths.map do |path|
        # Walk nested hashes. A missing or non-Hash intermediate value
        # resolves to nil (like an absent top-level key) instead of raising
        # and aborting the whole stream.
        path.inject(record) { |node, name| node.is_a?(Hash) ? node[name] : nil }
      end
      key = tag + "\0" + values.join("\0")
    else
      key = tag
    end
    slot = @slots[key] ||= []

    # Drop timestamps that have fallen out of the window.
    expired = time.to_f - @interval
    while slot.first && (slot.first <= expired)
      slot.shift
    end

    if slot.length >= @num
      log.debug "suppressed record: #{record.to_json}"
      next
    end

    if @slots.length > @max_slot_num
      # Hash#shift removes the oldest-inserted key.
      (evict_key, evict_slot) = @slots.shift
      # Only warn when the evicted slot still held a timestamp inside the window.
      if evict_slot.last && (evict_slot.last > expired)
        log.warn "@slots length exceeded @max_slot_num: #{@max_slot_num}. Evicted slot for the key: #{evict_key}"
      end
    end

    slot.push(time.to_f)
    # filter_record is expected to rewrite the tag in place (the comparison
    # below relies on it). dup, unlike clone, yields an unfrozen copy even
    # when the incoming tag is frozen.
    _tag = tag.dup
    filter_record(_tag, time, record)
    if @labelled || tag != _tag
      router.emit(_tag, time, record)
    else
      log.warn "Drop record #{record} tag '#{tag}' was not replaced. Can't emit record, cause infinity looping. Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix correctly."
    end
  end
end