class Fluent::Plugin::SuppressOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_suppress.rb, line 16 def configure(conf) super @labelled = !conf['@label'].nil? if !@labelled && !@remove_tag_prefix && !@remove_tag_suffix && !@add_tag_prefix && !@add_tag_suffix raise Fluent::ConfigError, "out_suppress: Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix." end @keys = @attr_keys ? @attr_keys.split(/ *, */) : nil @slots = {} end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_suppress.rb, line 29 def multi_workers_ready? true end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_suppress.rb, line 33 def process(tag, es) es.each do |time, record| if @keys keys = @keys.map do |key| key.split(/\./).inject(record) {|r, k| r[k] } end key = tag + "\0" + keys.join("\0") else key = tag end slot = @slots[key] ||= [] # expire old records time expired = time.to_f - @interval while slot.first && (slot.first <= expired) slot.shift end if slot.length >= @num log.debug "suppressed record: #{record.to_json}" next end if @slots.length > @max_slot_num (evict_key, evict_slot) = @slots.shift if evict_slot.last && (evict_slot.last > expired) log.warn "@slots length exceeded @max_slot_num: #{@max_slot_num}. Evicted slot for the key: #{evict_key}" end end slot.push(time.to_f) _tag = tag.clone filter_record(_tag, time, record) if @labelled || tag != _tag router.emit(_tag, time, record) else log.warn "Drop record #{record} tag '#{tag}' was not replaced. Can't emit record, cause infinity looping. Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix correctly." end end end