class ThreadSafe::TimeslotCache
The throttle filter limits the number of events. The filter is configured with a lower bound, the “before_count”, an upper bound, the “after_count”, and a period of time. All events passing through the filter are counted based on their key and the event timestamp. As long as the count is less than the “before_count” or greater than the “after_count”, the event is “throttled”, which means the filter is considered successful and any tags or fields are added (or removed).
The plugin is thread-safe and properly tracks past events.
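The throttling rule described above can be sketched as a small predicate. This is an illustration only, not the plugin's code: the helper name `throttled?` and its arguments are hypothetical, and it simply restates "less than before_count or greater than after_count".

```ruby
# Hypothetical helper, not part of the plugin's API: returns true when an
# event with the given running count would be throttled.
def throttled?(count, before_count, after_count)
  count < before_count || count > after_count
end

# Walk the first seven events of a period with before_count => 3 and
# after_count => 5, printing whether each one would be throttled.
(1..7).each do |count|
  state = throttled?(count, 3, 5) ? "throttled" : "not throttled"
  puts "event #{count} - #{state}"
end
```

With a before_count of -1 the first half of the condition can never be true for a positive count, so only the upper bound applies.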
For example, if you wanted to throttle events so you only receive an event after 2 occurrences and you get no more than 3 in 10 minutes, you would use the configuration:
    period => 600
    max_age => 1200
    before_count => 3
    after_count => 5
Which would result in:
    event 1 - throttled (successful filter, period start)
    event 2 - throttled (successful filter)
    event 3 - not throttled
    event 4 - not throttled
    event 5 - not throttled
    event 6 - throttled (successful filter)
    event 7 - throttled (successful filter)
    event x - throttled (successful filter)
    period end
    event 1 - throttled (successful filter, period start)
    event 2 - throttled (successful filter)
    event 3 - not throttled
    event 4 - not throttled
    event 5 - not throttled
    event 6 - throttled (successful filter)
    ...
As another example, to throttle events so that you receive only 1 event per hour, you would use the configuration:
    period => 3600
    max_age => 7200
    before_count => -1
    after_count => 1
Which would result in:
    event 1 - not throttled (period start)
    event 2 - throttled (successful filter)
    event 3 - throttled (successful filter)
    event 4 - throttled (successful filter)
    event x - throttled (successful filter)
    period end
    event 1 - not throttled (period start)
    event 2 - throttled (successful filter)
    event 3 - throttled (successful filter)
    event 4 - throttled (successful filter)
    ...
A common use case would be to use the throttle filter to throttle events before 3 and after 5 while using multiple fields for the key and then use the drop filter to remove throttled events. This configuration might appear as:
    filter {
      throttle {
        before_count => 3
        after_count => 5
        period => 3600
        max_age => 7200
        key => "%{host}%{message}"
        add_tag => "throttled"
      }
      if "throttled" in [tags] {
        drop { }
      }
    }
Another case would be to store all events, but only email non-throttled events so the op's inbox isn't flooded with emails in the event of a system error. This configuration might appear as:
    filter {
      throttle {
        before_count => 3
        after_count => 5
        period => 3600
        max_age => 7200
        key => "%{message}"
        add_tag => "throttled"
      }
    }
    output {
      if "throttled" not in [tags] {
        email {
          from => "logstash@mycompany.com"
          subject => "Production System Alert"
          to => "ops@mycompany.com"
          via => "sendmail"
          body => "Alert on %{host} from path %{path}:\n\n%{message}"
          options => { "location" => "/usr/sbin/sendmail" }
        }
      }
      elasticsearch_http {
        host => "localhost"
        port => "19200"
      }
    }
When an event is received, the event key is stored in a key_cache. The key references a timeslot_cache. The event is allocated to a timeslot (created dynamically) based on the timestamp of the event. The timeslot counter is incremented. When the next event is received (same key), within the same “period”, it is allocated to the same timeslot. The timeslot counter is incremented once again.
The timeslot expires if the maximum age has been exceeded. The age is calculated based on the latest event timestamp and the max_age configuration option.
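The bookkeeping described above can be sketched roughly as follows. This is a simplified, single-threaded illustration, not the plugin's implementation: the class name `TimeslotSketch`, the `record` method, the use of plain Hashes, and the exact allocation and expiry formulas are all assumptions chosen to match the description (timeslots aligned to the creation time, expiry measured from the latest event timestamp).

```ruby
# Single-threaded sketch; plain Hashes stand in for the thread-safe caches.
class TimeslotSketch
  attr_reader :created, :latest, :slots

  def initialize(epoch)
    @created = epoch
    @latest = epoch
    @slots = Hash.new(0) # timeslot start (epoch seconds) => event count
  end

  # Count one event and return the running count for its timeslot.
  def record(epoch, period, max_age)
    @latest = epoch if epoch > @latest
    # Assumed allocation rule: slots start at @created and are `period` wide.
    slot = epoch - (epoch - @created) % period
    @slots[slot] += 1
    # Assumed expiry rule: drop slots older than max_age relative to @latest.
    @slots.delete_if { |start, _count| @latest - start > max_age }
    @slots[slot]
  end
end

key_cache = {} # event key => TimeslotSketch
events = [1439839636, 1439839736, 1439839750, 1439839799, 1439839836, 1439839900]
events.each do |epoch|
  cache = (key_cache["a.b.c"] ||= TimeslotSketch.new(epoch))
  cache.record(epoch, 100, 300) # period of 100 s, max_age of 300 s
end
p key_cache["a.b.c"].slots
```

With a period of 100 seconds, the six events land in three timeslots with counts 1, 3 and 2. A later event far enough ahead would push the oldest timeslot past max_age and remove it.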
                 ---[::.. DESIGN ..::]---

    +- [key_cache] -+     +-- [timeslot_cache] --+
    |               |     | @created: 1439839636 |
    |               |     | @latest:  1439839836 |
    |   [a.b.c]  => |     +----------------------+
    |               |     | [1439839636] => 1    |
    |               |     | [1439839736] => 3    |
    |               |     | [1439839836] => 2    |
    |               |     +----------------------+
    |               |
    |               |     +-- [timeslot_cache] --+
    |               |     | @created: eeeeeeeeee |
    |               |     | @latest:  llllllllll |
    |   [x.y.z]  => |     +----------------------+
    |               |     | [0000000060] => x    |
    |               |     | [0000000120] => y    |
    |               |     | [..........] => N    |
    +---------------+     +----------------------+
Frank de Jong (@frapex)
Mike Pilone (@mikepilone)
Attributes
Public Class Methods
    # File lib/logstash/filters/throttle.rb, line 149
    def initialize(epoch, options = nil, &block)
      @created = epoch
      @latest = Atomic.new(epoch)
      super(options, &block)
    end
Public Instance Methods
    # File lib/logstash/filters/throttle.rb, line 156
    def latest
      @latest.value
    end
    # File lib/logstash/filters/throttle.rb, line 160
    def latest=(val)
      # only update if greater than current
      @latest.update { |v| (val > v) ? val : v }
    end
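The "only update if greater than current" behaviour of the setter can be illustrated without the Atomic dependency. The sketch below is a stand-in, not the class documented here: `MonotonicLatest` is a hypothetical name, and a Mutex from Ruby's core library replaces the atomic reference used in the source.

```ruby
# Stand-in for the Atomic used in the source: a Mutex-guarded value whose
# setter only ever moves forward.
class MonotonicLatest
  def initialize(epoch)
    @value = epoch
    @lock = Mutex.new
  end

  def latest
    @lock.synchronize { @value }
  end

  def latest=(val)
    # Ignore timestamps that are not newer than the current one.
    @lock.synchronize { @value = val if val > @value }
  end
end

tracker = MonotonicLatest.new(100)
threads = [250, 120, 300, 90].map do |epoch|
  Thread.new { tracker.latest = epoch }
end
threads.each(&:join)
puts tracker.latest
```

Whatever order the threads run in, the stored value ends up as the largest timestamp seen, which is what lets out-of-order events arrive without moving the latest timestamp backwards.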