class LogStash::Filters::Throttle

Constants

MC_INCR_PCT
MC_MAX_PCT
MC_MIN_PCT

The memory control mechanism automatically ajusts the maximum age of a timeslot based on the maximum number of counters.

MC_STEP_PCT

Public Instance Methods

filter(event) click to toggle source

filters the event

# File lib/logstash/filters/throttle.rb, line 223
def filter(event)
  key = event.sprintf(@key)                 # substitute field
  period = event.sprintf(@period).to_i      # substitute period
  period = 60 if period == 0                # fallback if unparsable
  epoch = event.timestamp.to_i              # event epoch time

  while true
    # initialise timeslot cache (if required)
    @key_cache.compute_if_absent(key) { ThreadSafe::TimeslotCache.new(epoch) }
    timeslot_cache = @key_cache[key]        # try to get timeslot cache
    break unless timeslot_cache.nil?        # retry until succesful

    @logger.warn? and @logger.warn(
      "filters/#{self.class.name}: timeslot cache disappeared, increase max_counters to prevent this.")
  end

  timeslot_cache.latest = epoch             # update to latest epoch

  # find target timeslot
  timeslot_key = epoch - (epoch - timeslot_cache.created) % period

  while true
    # initialise timeslot and counter (if required)
    timeslot_cache.compute_if_absent(timeslot_key) { Atomic.new(0) }
    timeslot = timeslot_cache[timeslot_key] # try to get timeslot
    break unless timeslot.nil?              # retry until succesful

    @logger.warn? and @logger.warn(
      "filters/#{self.class.name}: timeslot disappeared, increase max_age to prevent this.")
  end
    
  timeslot.update { |v| v + 1 }             # increment counter
  count = timeslot.value                    # get latest counter value

  @logger.debug? and @logger.debug(
    "filters/#{self.class.name}: counter incremented",
    { key: key, epoch: epoch, timeslot: timeslot_key, count: count }
  )

  # throttle event if counter value not in range
  if ((@before_count != -1 && count < @before_count) ||
      (@after_count != -1 && count > @after_count))
    @logger.debug? and @logger.debug(
      "filters/#{self.class.name}: throttling event",
      { key: key, epoch: epoch }
    )

    filter_matched(event)
  end

  # Delete expired timeslots older than the latest.  Do not use variable
  # timeslot_cache.latest for this.  If used, it might delete the latest timeslot.
  latest_timeslot = timeslot_cache.keys.max || 0
  timeslot_cache.each_key { |key| timeslot_cache.delete(key) if key < (latest_timeslot - @max_age) }
end
flush(options = {}) click to toggle source
# File lib/logstash/filters/throttle.rb, line 280
def flush(options = {})
  max_latest = 0                            # get maximum epoch
  @key_cache.each_value { |tc| max_latest = tc.latest if tc.latest > max_latest }

  total_counters = 0
  @key_cache.each_pair do |key,timeslot_cache|
    if timeslot_cache.latest < max_latest - @max_age
      @key_cache.delete(key)                # delete expired timeslot cache
    else
      total_counters += timeslot_cache.size # get total number of counters
    end
  end

  @logger.debug? and @logger.debug(
    "filters/#{self.class.name}: statistics",
    { total_counters: total_counters, max_age: @max_age }
  )

  # memory control mechanism
  if @max_counters != -1
    over_limit = total_counters - @max_counters

    # decrease max age of timeslot cache by x percent
    if (over_limit > 0) && (@max_age > @max_age_orig * MC_MIN_PCT / 100)
      @max_age -= @max_age_orig * MC_STEP_PCT / 100
      @logger.warn? and @logger.warn(
        "filters/#{self.class.name}: Decreased timeslot max_age to #{@max_age} because " +
        "max_counters exceeded by #{over_limit}. Use a better key to prevent too many unique event counters.")

    # increase max age of timeslot cache by x percent
    elsif (@max_age < @max_age_orig * MC_MAX_PCT / 100) && (total_counters < (@max_counters * MC_INCR_PCT / 100))
      @max_age += @max_age_orig * MC_STEP_PCT / 100
      @logger.warn? and @logger.warn(
        "filters/#{self.class.name}: Increased timeslot max_age to #{@max_age} because max_counters no longer exceeded.")
    end
  end

  return
end
register() click to toggle source

performs initialization of the filter

# File lib/logstash/filters/throttle.rb, line 216
def register
  @key_cache = ThreadSafe::Cache.new
  @max_age_orig = @max_age
end