class LogStash::Filters::Throttle
Constants
- MC_INCR_PCT
- MC_MAX_PCT
- MC_MIN_PCT
The memory control mechanism automatically adjusts the maximum age of a timeslot based on the maximum number of counters.
- MC_STEP_PCT
Public Instance Methods
filter(event)
click to toggle source
filters the event
# File lib/logstash/filters/throttle.rb, line 223
#
# Counts the event in the timeslot it falls into and calls filter_matched
# on it when the resulting count lies outside the configured range.
#
# Reads @key and @period (both passed through event.sprintf), @before_count
# and @after_count (-1 disables the respective bound), @max_age and
# @key_cache.
#
# event - the event to count; must respond to sprintf and timestamp.
def filter(event)
  key = event.sprintf(@key)                 # substitute field
  period = event.sprintf(@period).to_i      # substitute period
  # Fall back when the period is unparsable (to_i gives 0) or negative; a
  # negative period would place the event in a timeslot ahead of its epoch.
  period = 60 if period <= 0
  epoch = event.timestamp.to_i              # event epoch time

  # Initialise the timeslot cache for this key (if required). The entry can
  # be removed again between creating and fetching it, so retry until the
  # fetch succeeds.
  timeslot_cache = nil
  loop do
    @key_cache.compute_if_absent(key) { ThreadSafe::TimeslotCache.new(epoch) }
    timeslot_cache = @key_cache[key]
    break unless timeslot_cache.nil?

    if @logger.warn?
      @logger.warn(
        "filters/#{self.class.name}: timeslot cache disappeared, increase max_counters to prevent this.")
    end
  end

  timeslot_cache.latest = epoch             # record the epoch of the most recent event

  # Find the target timeslot: slots are aligned to the cache creation time.
  timeslot_key = epoch - (epoch - timeslot_cache.created) % period

  # Initialise the timeslot counter (if required), retrying for the same
  # reason as above.
  timeslot = nil
  loop do
    timeslot_cache.compute_if_absent(timeslot_key) { Atomic.new(0) }
    timeslot = timeslot_cache[timeslot_key]
    break unless timeslot.nil?

    if @logger.warn?
      @logger.warn(
        "filters/#{self.class.name}: timeslot disappeared, increase max_age to prevent this.")
    end
  end

  # Take the count from the update itself instead of re-reading the counter
  # afterwards, so an increment made by another thread in between cannot be
  # mistaken for this event's count.
  count = timeslot.update { |v| v + 1 }

  if @logger.debug?
    @logger.debug(
      "filters/#{self.class.name}: counter incremented",
      { key: key, epoch: epoch, timeslot: timeslot_key, count: count }
    )
  end

  # Throttle the event if the counter value is not in range.
  below_range = @before_count != -1 && count < @before_count
  above_range = @after_count != -1 && count > @after_count
  if below_range || above_range
    if @logger.debug?
      @logger.debug(
        "filters/#{self.class.name}: throttling event",
        { key: key, epoch: epoch }
      )
    end

    filter_matched(event)
  end

  # Delete expired timeslots older than the latest. Do not use
  # timeslot_cache.latest for this: it might delete the latest timeslot.
  latest_timeslot = timeslot_cache.keys.max || 0
  cutoff = latest_timeslot - @max_age
  timeslot_cache.each_key { |slot| timeslot_cache.delete(slot) if slot < cutoff }
end
flush(options = {})
click to toggle source
# File lib/logstash/filters/throttle.rb, line 280
#
# Drops timeslot caches that have gone stale, logs the number of live
# counters and, unless @max_counters is -1, shrinks or grows @max_age
# depending on how that number compares to @max_counters.
#
# options - accepted but not used by this method.
#
# Always returns nil.
def flush(options = {})
  # Most recent epoch recorded by any timeslot cache.
  newest_epoch = 0
  @key_cache.each_value do |cache|
    newest_epoch = cache.latest if cache.latest > newest_epoch
  end

  # Remove caches that fell more than @max_age behind the newest epoch and
  # count the counters held by the ones that remain.
  counter_total = 0
  @key_cache.each_pair do |cache_key, cache|
    if cache.latest < newest_epoch - @max_age
      @key_cache.delete(cache_key)
    else
      counter_total += cache.size
    end
  end

  if @logger.debug?
    @logger.debug(
      "filters/#{self.class.name}: statistics",
      { total_counters: counter_total, max_age: @max_age }
    )
  end

  # Memory control is switched off when max_counters is -1.
  return if @max_counters == -1

  excess = counter_total - @max_counters

  if (excess > 0) && (@max_age > @max_age_orig * MC_MIN_PCT / 100)
    # Too many counters: shorten the timeslot max age by one step.
    @max_age -= @max_age_orig * MC_STEP_PCT / 100
    if @logger.warn?
      @logger.warn(
        "filters/#{self.class.name}: Decreased timeslot max_age to #{@max_age} because " \
        "max_counters exceeded by #{excess}. Use a better key to prevent too many unique event counters.")
    end
  elsif (@max_age < @max_age_orig * MC_MAX_PCT / 100) && (counter_total < (@max_counters * MC_INCR_PCT / 100))
    # Comfortably below the limit again: lengthen the max age by one step.
    @max_age += @max_age_orig * MC_STEP_PCT / 100
    if @logger.warn?
      @logger.warn(
        "filters/#{self.class.name}: Increased timeslot max_age to #{@max_age} because max_counters no longer exceeded.")
    end
  end

  nil
end
register()
click to toggle source
performs initialization of the filter
# File lib/logstash/filters/throttle.rb, line 216
#
# Prepares the filter's state: creates an empty key cache and remembers
# the current @max_age in @max_age_orig.
def register
  # One entry per event key is stored here by #filter.
  @key_cache = ThreadSafe::Cache.new

  # Keep the starting max age; #flush adjusts @max_age relative to it.
  @max_age_orig = @max_age
end