class ThreadSafe::TimeslotCache

The throttle filter limits the number of events. It is configured with a lower bound, the “before_count”, an upper bound, the “after_count”, and a period of time. Every event passing through the filter is counted based on its key and the event timestamp. While the count is less than the “before_count” or greater than the “after_count”, the event is “throttled”, which means the filter is considered successful and any tags or fields are added (or removed).

The plugin is thread-safe and properly tracks past events.

For example, if you wanted to throttle events so you only receive an event after 2 occurrences and you get no more than 3 in 10 minutes, you would use the configuration:

period => 600
max_age => 1200
before_count => 3
after_count => 5

Which would result in:

event 1 - throttled (successful filter, period start)
event 2 - throttled (successful filter)
event 3 - not throttled
event 4 - not throttled
event 5 - not throttled
event 6 - throttled (successful filter)
event 7 - throttled (successful filter)
event x - throttled (successful filter)
period end
event 1 - throttled (successful filter, period start)
event 2 - throttled (successful filter)
event 3 - not throttled
event 4 - not throttled
event 5 - not throttled
event 6 - throttled (successful filter)
...
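
The counting rule behind this listing can be sketched in a few lines of Ruby. The helper name `throttled?` is hypothetical and not part of the plugin; it only restates the rule described above (throttle while the count is below before_count or above after_count) and makes no attempt to model how the plugin resets counts at the end of a period.

```ruby
# Hypothetical helper, not taken from the plugin source: an event is
# "throttled" when its count within the period falls outside the
# before_count..after_count window.
def throttled?(count, before_count, after_count)
  count < before_count || count > after_count
end

# Replay the first seven events of a period with
# before_count => 3 and after_count => 5.
(1..7).each do |n|
  state = throttled?(n, 3, 5) ? "throttled" : "not throttled"
  puts "event #{n} - #{state}"
end
```

With these bounds, events 1 and 2 and every event from 6 onward come out as throttled, matching the listing above.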

As another example, if you wanted to throttle events so you only receive 1 event per hour, you would use the configuration:

period => 3600
max_age => 7200
before_count => -1
after_count => 1

Which would result in:

event 1 - not throttled (period start)
event 2 - throttled (successful filter)
event 3 - throttled (successful filter)
event 4 - throttled (successful filter)
event x - throttled (successful filter)
period end
event 1 - not throttled (period start)
event 2 - throttled (successful filter)
event 3 - throttled (successful filter)
event 4 - throttled (successful filter)
...

A common use case would be to use the throttle filter to throttle events before 3 and after 5 while using multiple fields for the key and then use the drop filter to remove throttled events. This configuration might appear as:

filter {
  throttle {
    before_count => 3
    after_count => 5
    period => 3600
    max_age => 7200
    key => "%{host}%{message}"
    add_tag => "throttled"
  }
  if "throttled" in [tags] {
    drop { }
  }
}

Another case would be to store all events but email only non-throttled events, so the ops inbox isn't flooded with emails in the event of a system error. This configuration might appear as:

filter {
  throttle {
    before_count => 3
    after_count => 5
    period => 3600
    max_age => 7200
    key => "%{message}"
    add_tag => "throttled"
  }
}
output {
  if "throttled" not in [tags] {
    email {
      from => "logstash@mycompany.com"
      subject => "Production System Alert"
      to => "ops@mycompany.com"
      via => "sendmail"
      body => "Alert on %{host} from path %{path}:\n\n%{message}"
      options => { "location" => "/usr/sbin/sendmail" }
    }
  }
  elasticsearch_http {
    host => "localhost"
    port => "19200"
  }
}

When an event is received, the event key is stored in a key_cache. The key references a timeslot_cache. The event is allocated to a timeslot (created dynamically) based on the timestamp of the event. The timeslot counter is incremented. When the next event is received (same key), within the same “period”, it is allocated to the same timeslot. The timeslot counter is incremented once again.
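
The allocation described in this paragraph can be pictured with plain Ruby hashes. This is only a sketch under assumptions: the method name `count_event`, the slot-matching test, and the sample values are hypothetical, and the code is single-threaded, whereas the plugin uses thread-safe caches.

```ruby
# Illustrative sketch only; names and slot-matching logic are assumptions.
# key_cache maps an event key to its timeslot cache, which in turn maps
# the starting epoch of each timeslot to a counter.
def count_event(key_cache, key, epoch, period)
  timeslots = (key_cache[key] ||= {})
  # Reuse the timeslot whose period covers this timestamp, if any.
  slot = timeslots.keys.find { |start| epoch >= start && epoch < start + period }
  # Otherwise create a new timeslot that starts at this event.
  slot ||= epoch
  timeslots[slot] = timeslots.fetch(slot, 0) + 1
end

key_cache = {}
count_event(key_cache, "a.b.c", 1000, 600) # new timeslot 1000, counter is 1
count_event(key_cache, "a.b.c", 1300, 600) # same timeslot, counter is 2
count_event(key_cache, "a.b.c", 1700, 600) # outside the period, new timeslot 1700
```

The method returns the updated counter, which is the value a throttling rule would compare against before_count and after_count.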

The timeslot expires if the maximum age has been exceeded. The age is calculated based on the latest event timestamp and the max_age configuration option.
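
A minimal sketch of this expiry check, assuming a timeslot's age is the latest event timestamp minus the timeslot's starting epoch. The method name `expire_timeslots` and the exact comparison are assumptions for illustration, not the plugin's code.

```ruby
# Illustrative sketch only; the exact comparison used by the plugin is an
# assumption. Drops every timeslot whose start lies more than max_age
# seconds before the latest event timestamp seen for the key.
def expire_timeslots(timeslots, latest, max_age)
  timeslots.reject { |start, _count| latest - start > max_age }
end

timeslots = { 1000 => 2, 1700 => 1, 2500 => 4 }
remaining = expire_timeslots(timeslots, 2500, 1200)
# 2500 - 1000 = 1500 exceeds max_age, so only the 1700 and 2500 slots remain.
```

Because the age is measured against the latest event timestamp rather than the wall clock, replayed or delayed events would expire slots the same way live events do.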

---[::.. DESIGN ..::]---

+- [key_cache] -+             +-- [timeslot_cache] --+
|               |             | @created: 1439839636 |
|               |             | @latest:  1439839836 |
|    [a.b.c]    =>            +----------------------+
|               |             | [1439839636] => 1    |
|               |             | [1439839736] => 3    |
|               |             | [1439839836] => 2    |
|               |             +----------------------+
|               |
|               |             +-- [timeslot_cache] --+
|               |             | @created: eeeeeeeeee |
|               |             | @latest:  llllllllll |
|    [x.y.z]    =>            +----------------------+
|               |             | [0000000060] => x    |
|               |             | [0000000120] => y    |
|               |             | [..........] => N    |
+---------------+             +----------------------+

Frank de Jong (@frapex) Mike Pilone (@mikepilone)

Attributes

created[R]

Public Class Methods

new(epoch, options = nil, &block)
Calls superclass method
# File lib/logstash/filters/throttle.rb, line 149
def initialize(epoch, options = nil, &block)
  @created = epoch
  @latest = Atomic.new(epoch)

  super(options, &block)
end

Public Instance Methods

latest()
# File lib/logstash/filters/throttle.rb, line 156
def latest
  @latest.value
end

latest=(val)
# File lib/logstash/filters/throttle.rb, line 160
def latest=(val)
  # only update if greater than current
  @latest.update { |v| val > v ? val : v }
end
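
The effect of this setter is that the latest timestamp only ever moves forward, even when events arrive out of order. The sketch below imitates that behaviour with a Mutex from Ruby core in place of the Atomic class; the class name `MonotonicLatest` and its methods are hypothetical stand-ins, not part of the plugin.

```ruby
# Stand-in for the Atomic-backed @latest. MonotonicLatest is a hypothetical
# name; it guards the value with a Mutex instead of using Atomic.
class MonotonicLatest
  def initialize(epoch)
    @value = epoch
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Only move forward: an older timestamp leaves the value untouched.
  def update(val)
    @lock.synchronize { @value = val if val > @value }
    value
  end
end

latest = MonotonicLatest.new(100)
latest.update(250) # newer timestamp, value becomes 250
latest.update(180) # out-of-order event, value stays 250
```

Keeping the value monotonic matters for expiry, since timeslot age is derived from the latest event timestamp.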