class Fluent::Plugin::ThrottleFilter
Constants
- Group
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_throttle.rb, line 46
#
# Validates the throttle settings and derives the values the filter relies on.
#
# Reads @group_bucket_period_s, @group_bucket_limit, @group_reset_rate_s and
# @group_warning_delay_s plus the `group_key` accessor (presumably populated
# by the superclass `configure` from +conf+ -- not visible here; confirm).
# Sets @group_key_paths, @group_gc_timeout_s, @group_rate_limit and, when it
# is nil, @group_reset_rate_s.
#
# @param conf the plugin configuration, forwarded unchanged to the superclass
# @raise [RuntimeError] when one of the settings is out of range
def configure(conf)
  super

  # Each dotted key ("a.b.c") becomes a path of nested keys (["a", "b", "c"]).
  @group_key_paths = group_key.map { |key| key.split(".") }

  raise "group_bucket_period_s must be > 0" unless @group_bucket_period_s > 0

  @group_gc_timeout_s = 2 * @group_bucket_period_s

  raise "group_bucket_limit must be > 0" unless @group_bucket_limit > 0

  # Plain integer division truncates towards zero, so a limit smaller than the
  # period used to yield a rate limit (and default reset rate) of 0, which no
  # observed rate can ever drop below. Keep the exact quotient when the
  # division is even, otherwise fall back to a fractional rate.
  evenly_divisible = (@group_bucket_limit % @group_bucket_period_s).zero?
  @group_rate_limit =
    if evenly_divisible
      @group_bucket_limit / @group_bucket_period_s
    else
      @group_bucket_limit.fdiv(@group_bucket_period_s)
    end

  # The reset rate defaults to the sustained rate limit.
  @group_reset_rate_s = @group_rate_limit if @group_reset_rate_s.nil?

  raise "group_reset_rate_s must be >= -1" unless @group_reset_rate_s >= -1

  raise "group_reset_rate_s must be <= group_bucket_limit / group_bucket_period_s" \
    unless @group_reset_rate_s <= @group_rate_limit

  raise "group_warning_delay_s must be >= 1" unless @group_warning_delay_s >= 1
end
filter(tag, time, record)
click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 84
#
# Counts +record+ against its group's bucket for the current time period and
# decides whether it passes.
#
# @param tag not used by this method
# @param time not used by this method
# @param record the event record; its group is derived via `extract_group`
# @return +record+ while the group is within its limit; when the limit is
#   exceeded, nil if @group_drop_logs is truthy, otherwise +record+
def filter(tag, time, record)
  now = Time.now
  # Value returned once the rate limit is exceeded: nil drops the record.
  rate_limit_exceeded = @group_drop_logs ? nil : record
  group = extract_group(record)

  # Ruby hashes are ordered by insertion.
  # Deleting and inserting moves the item to the end of the hash (most recently used)
  counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil)

  counter.rate_count += 1

  since_last_rate_reset = now - counter.rate_last_reset
  if since_last_rate_reset >= 1
    # compute and store rate/s at most every second
    counter.aprox_rate = (counter.rate_count / since_last_rate_reset).round
    counter.rate_count = 0
    counter.rate_last_reset = now
  end

  # try to evict the least recently used group
  lru_group, lru_counter = @counters.first
  if !lru_group.nil? && now - lru_counter.rate_last_reset > @group_gc_timeout_s
    @counters.delete(lru_group)
  end

  current_period = now.to_i / @group_bucket_period_s
  last_reset_period = counter.bucket_last_reset.to_i / @group_bucket_period_s

  if current_period > last_reset_period
    # next time period reached.

    # wait until rate drops back down (if enabled).
    # bucket_count == -1 marks a group whose credit was exhausted.
    if counter.bucket_count == -1 && @group_reset_rate_s != -1
      if counter.aprox_rate < @group_reset_rate_s
        log_rate_back_down(now, group, counter)
      else
        log_rate_limit_exceeded(now, group, counter)
        return rate_limit_exceeded
      end
    end

    # reset counter for the rest of time period.
    counter.bucket_count = 0
    counter.bucket_last_reset = now
  elsif counter.bucket_count == -1
    # if current time period credit is exhausted, drop the record.
    log_rate_limit_exceeded(now, group, counter)
    return rate_limit_exceeded
  end

  counter.bucket_count += 1

  # if we are out of credit, we drop logs for the rest of the time period.
  if counter.bucket_count > @group_bucket_limit
    log_rate_limit_exceeded(now, group, counter)
    counter.bucket_count = -1
    return rate_limit_exceeded
  end

  record
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_throttle.rb, line 79
#
# Writes the current per-group counters to the debug log, then hands over to
# the superclass `shutdown`.
def shutdown
  log.debug("counters summary: #{@counters}")
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_throttle.rb, line 73
#
# Runs the superclass `start`, then begins with an empty set of counters.
# @counters maps a group (as built by `extract_group`) to its counter; the
# hash's insertion order is what `filter` uses as recency order.
def start
  super
  @counters = {}
end
Private Instance Methods
extract_group(record)
click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 147
#
# Builds the group identifier for +record+: one value per configured key
# path, looked up with string keys first and symbol keys as a fallback.
#
# @param record the event record (anything responding to `dig`)
# @return [Array] the value found for each path in @group_key_paths, nil
#   where the path is absent
def extract_group(record)
  @group_key_paths.map do |key_path|
    dig_group_value(record, key_path) || dig_group_value(record, key_path.map(&:to_sym))
  end
end

# Looks +key_path+ up in +record+. A path that runs into a value which cannot
# be dug into (for example a String where a nested Hash was expected) counts
# as missing instead of raising.
def dig_group_value(record, key_path)
  record.dig(*key_path)
rescue TypeError
  nil
end
log_items(now, group, counter)
click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 166
#
# Assembles the fields attached to the rate-limit log messages.
#
# The reported rate is the larger of the bucket's average rate since its last
# reset and the counter's stored approximate rate.
#
# @param now [Time] the current time
# @param group the group identifier
# @param counter the group's counter (reads bucket_last_reset, bucket_count
#   and aprox_rate)
# @return [Hash] symbol-keyed fields: group_key, rate_s, period_s, limit,
#   rate_limit_s and reset_rate_s
def log_items(now, group, counter)
  since_last_reset = now - counter.bucket_last_reset
  # No time has elapsed since the reset: report an infinite rate rather than
  # dividing by zero.
  rate =
    if since_last_reset > 0
      (counter.bucket_count / since_last_reset).round
    else
      Float::INFINITY
    end

  aprox_rate = counter.aprox_rate
  rate = aprox_rate if aprox_rate > rate

  {
    group_key: group,
    rate_s: rate,
    period_s: @group_bucket_period_s,
    limit: @group_bucket_limit,
    rate_limit_s: @group_rate_limit,
    reset_rate_s: @group_reset_rate_s
  }
end
log_rate_back_down(now, group, counter)
click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 162
#
# Logs, at info level, that a group's rate is back down, together with the
# fields built by `log_items`.
#
# @param now [Time] the current time
# @param group the group identifier
# @param counter the group's counter
def log_rate_back_down(now, group, counter)
  log.info("rate back down", log_items(now, group, counter))
end
log_rate_limit_exceeded(now, group, counter)
click to toggle source
# File lib/fluent/plugin/filter_throttle.rb, line 153
#
# Logs a "rate exceeded" warning for a group, at most once every
# @group_warning_delay_s seconds per counter.
#
# @param now [Time] the current time
# @param group the group identifier
# @param counter the group's counter; its last_warning is set to +now+
#   whenever a warning is emitted
def log_rate_limit_exceeded(now, group, counter)
  # Warn on the first occurrence, afterwards only once the delay has elapsed.
  emit = counter.last_warning.nil? ||
         (now - counter.last_warning) >= @group_warning_delay_s
  return unless emit

  log.warn("rate exceeded", log_items(now, group, counter))
  counter.last_warning = now
end