class Fluent::Plugin::ThrottleFilter

Constants

Group

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_throttle.rb, line 46
# Validates the throttle settings and derives the values the filter relies on.
#
# Reads group_key, @group_bucket_period_s, @group_bucket_limit,
# @group_reset_rate_s and @group_warning_delay_s (presumably populated by the
# superclass from +conf+ -- confirm against the parameter declarations), and
# sets @group_key_paths, @group_gc_timeout_s, @group_rate_limit and, when it
# was left unset, @group_reset_rate_s.
#
# @param conf the plugin configuration, forwarded unchanged to the superclass
# @raise [RuntimeError] when any of the settings is out of range
def configure(conf)
  super

  # "a.b.c" => ["a", "b", "c"], so a key can address a nested record field.
  @group_key_paths = group_key.map { |key| key.split(".") }

  raise "group_bucket_period_s must be > 0" \
    unless @group_bucket_period_s > 0

  @group_gc_timeout_s = 2 * @group_bucket_period_s

  raise "group_bucket_limit must be > 0" \
    unless @group_bucket_limit > 0

  # Keep the plain quotient whenever it is exact. Otherwise fall back to the
  # fractional rate: Integer division would truncate (10 / 60 => 0), and a
  # default reset rate of 0 can never be undercut, so a throttled group would
  # never be released again.
  exact_rate = @group_bucket_limit / @group_bucket_period_s
  @group_rate_limit =
    if exact_rate * @group_bucket_period_s == @group_bucket_limit
      exact_rate
    else
      @group_bucket_limit.fdiv(@group_bucket_period_s)
    end

  # An unset reset rate defaults to the rate limit itself.
  @group_reset_rate_s = @group_rate_limit if @group_reset_rate_s.nil?

  raise "group_reset_rate_s must be >= -1" \
    unless @group_reset_rate_s >= -1
  raise "group_reset_rate_s must be <= group_bucket_limit / group_bucket_period_s" \
    unless @group_reset_rate_s <= @group_rate_limit

  raise "group_warning_delay_s must be >= 1" \
    unless @group_warning_delay_s >= 1
end
filter(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 84
# Applies per-group throttling to a single record.
#
# Returns +record+ while its group still has credit. Once the group is over
# its limit the result is nil when @group_drop_logs is set (which drops the
# record) and the record itself otherwise.
#
# +tag+ and +time+ are accepted but not used in this method; all accounting
# is driven by the wall clock (Time.now).
def filter(tag, time, record)
  now = Time.now

  # Value handed back when the limit is exceeded; nil drops the record.
  over_limit_result = record
  over_limit_result = nil if @group_drop_logs

  group = extract_group(record)

  # Hashes keep insertion order, so removing and re-adding the entry marks
  # this group as the most recently used one.
  counter = @counters.delete(group)
  counter ||= Group.new(0, now, 0, 0, now, nil)
  @counters[group] = counter

  # Refresh the approximate records-per-second figure at most once a second.
  counter.rate_count += 1
  rate_window = now - counter.rate_last_reset
  if rate_window >= 1
    counter.aprox_rate = (counter.rate_count / rate_window).round
    counter.rate_count = 0
    counter.rate_last_reset = now
  end

  # Opportunistic cleanup: the first entry is the least recently used group.
  oldest_group, oldest_counter = @counters.first
  unless oldest_group.nil?
    idle_for = now - oldest_counter.rate_last_reset
    @counters.delete(oldest_group) if idle_for > @group_gc_timeout_s
  end

  # A bucket_count of -1 marks a bucket whose credit is used up.
  exhausted = counter.bucket_count == -1
  current_period = now.to_i / @group_bucket_period_s
  previous_period = counter.bucket_last_reset.to_i / @group_bucket_period_s

  if current_period > previous_period
    # A new time period has started. If enabled (reset rate other than -1),
    # keep throttling until the observed rate has dropped below the reset rate.
    if exhausted && @group_reset_rate_s != -1
      unless counter.aprox_rate < @group_reset_rate_s
        log_rate_limit_exceeded(now, group, counter)
        return over_limit_result
      end
      log_rate_back_down(now, group, counter)
    end

    # Start a fresh bucket for the new period.
    counter.bucket_count = 0
    counter.bucket_last_reset = now
  elsif exhausted
    # Same period and no credit left.
    log_rate_limit_exceeded(now, group, counter)
    return over_limit_result
  end

  counter.bucket_count += 1
  return record unless counter.bucket_count > @group_bucket_limit

  # Credit just ran out: throttle for the remainder of the period.
  log_rate_limit_exceeded(now, group, counter)
  counter.bucket_count = -1
  over_limit_result
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_throttle.rb, line 79
def shutdown
  log.debug("counters summary: #{@counters}")
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_throttle.rb, line 73
# Runs the superclass start hook, then begins with an empty table of
# per-group counters.
def start
  super
  @counters = Hash.new
end

Private Instance Methods

extract_group(record) click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 147
# Builds the group identity for a record: one value per configured key path.
#
# Each path is looked up with its segments as given first; when that yields
# nil or false, the lookup is retried with the segments converted to symbols.
# A path found neither way contributes nil.
def extract_group(record)
  @group_key_paths.map do |segments|
    found = record.dig(*segments)
    found || record.dig(*segments.map(&:to_sym))
  end
end
log_items(now, group, counter) click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 166
# Builds the structured fields attached to the throttle log messages.
#
# The reported rate is the larger of two figures: the rounded per-second rate
# over the current bucket (bucket_count divided by the time since
# bucket_last_reset) and counter.aprox_rate. When no time has elapsed since
# the bucket was reset, the bucket rate is taken as Float::INFINITY.
def log_items(now, group, counter)
  elapsed = now - counter.bucket_last_reset
  bucket_rate =
    if elapsed > 0
      (counter.bucket_count / elapsed).round
    else
      Float::INFINITY
    end
  smoothed_rate = counter.aprox_rate
  reported_rate = smoothed_rate > bucket_rate ? smoothed_rate : bucket_rate

  {
    group_key: group,
    rate_s: reported_rate,
    period_s: @group_bucket_period_s,
    limit: @group_bucket_limit,
    rate_limit_s: @group_rate_limit,
    reset_rate_s: @group_reset_rate_s
  }
end
log_rate_back_down(now, group, counter) click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 162
def log_rate_back_down(now, group, counter)
  log.info("rate back down", log_items(now, group, counter))
end
log_rate_limit_exceeded(now, group, counter) click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 153
# Warns that a group exceeded its rate, at most once every
# @group_warning_delay_s seconds per counter.
#
# A warning is emitted when the counter has never warned before or when at
# least @group_warning_delay_s seconds have passed since counter.last_warning.
# On emission counter.last_warning is set to +now+.
def log_rate_limit_exceeded(now, group, counter)
  previous = counter.last_warning
  due = previous.nil? || (now - previous) >= @group_warning_delay_s
  return unless due

  log.warn("rate exceeded", log_items(now, group, counter))
  counter.last_warning = now
end