class Fluent::Plugin::SamplingFilter

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/filter_sampling.rb, line 14
def configure(conf)
  super

  @counts = {}
  @resets = {} if @minimum_rate_per_min
  @accessor = record_accessor_create(@sample_unit) unless %w(all tag).include?(@sample_unit)
end
filter(tag, _time, record)

Access to @counts should be protected by a mutex, but that carries a heavy performance penalty. The code below is not thread safe. However, @counts (the counter used for the sampling rate) is not a critical value and is unlikely to be corrupted in practice, so it is left unprotected for now.

# File lib/fluent/plugin/filter_sampling.rb, line 27
def filter(tag, _time, record)
  t = record_key(tag, record)
  if @minimum_rate_per_min
    filter_with_minimum_rate(t, record)
  else
    filter_simple(t, record)
  end
end
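
For comparison, here is a minimal sketch of what mutex-protected counting could look like. It is not part of the plugin: `SafeCounter` and its methods are hypothetical names used only for illustration, and the sketch makes no claim about the actual cost of locking inside Fluentd.

```ruby
# Hypothetical stand-in for the plugin's @counts hash, guarded by a Mutex.
# This is an illustration only, not code from filter_sampling.rb.
class SafeCounter
  def initialize
    @counts = Hash.new(0)
    @mutex = Mutex.new
  end

  # Increment the counter for +key+ and return the new value.
  # The read-modify-write happens entirely inside the lock.
  def increment(key)
    @mutex.synchronize { @counts[key] += 1 }
  end

  # Read the current value for +key+ under the same lock.
  def value(key)
    @mutex.synchronize { @counts[key] }
  end
end

counter = SafeCounter.new
threads = 4.times.map do
  Thread.new { 1000.times { counter.increment('all') } }
end
threads.each(&:join)
puts counter.value('all') # prints 4000
```

Every call pays for acquiring the lock, which is the penalty the note above refers to.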
filter_simple(t, record)
# File lib/fluent/plugin/filter_sampling.rb, line 36
def filter_simple(t, record)
  c = (@counts[t] = @counts.fetch(t, 0) + 1)
  # reset just before @counts[t] would be promoted from Fixnum to Bignum
  @counts[t] = 0 if c > 0x6fffffff
  if c % @interval == 0
    record
  else
    nil
  end
end
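
The counting logic above can be tried outside Fluentd with a small standalone sketch. `IntervalSampler` is a hypothetical class written for this example. It mirrors the modulo check in `filter_simple` but omits the overflow reset.

```ruby
# Hypothetical standalone version of the counting logic in filter_simple.
# The overflow reset from the original is omitted for brevity.
class IntervalSampler
  def initialize(interval)
    @interval = interval
    @counts = {}
  end

  # Returns the record when the per-key count is a multiple of the
  # interval, otherwise nil (meaning the record is dropped).
  def sample(key, record)
    c = (@counts[key] = @counts.fetch(key, 0) + 1)
    (c % @interval).zero? ? record : nil
  end
end

sampler = IntervalSampler.new(3)
kept = (1..10).map { |i| sampler.sample('all', { 'n' => i }) }.compact
p kept.map { |r| r['n'] } # => [3, 6, 9]
```

With an interval of 3, every third record per key passes and the rest are dropped.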
filter_with_minimum_rate(t, record)
# File lib/fluent/plugin/filter_sampling.rb, line 47
def filter_with_minimum_rate(t, record)
  @resets[t] ||= Fluent::Clock.now + (60 - rand(30))
  if Fluent::Clock.now > @resets[t]
    @resets[t] = Fluent::Clock.now + 60
    @counts[t] = 0
  end
  c = (@counts[t] = @counts.fetch(t, 0) + 1)
  if c < @minimum_rate_per_min || c % @interval == 0
    record.dup
  else
    nil
  end
end
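
To see how the minimum rate interacts with the interval, the following sketch reimplements the same checks with an injectable clock. `MinimumRateSampler` and its `clock:` parameter are assumptions made for this example. The sketch also drops the `rand(30)` jitter that the original applies to the first window, so that the behavior is deterministic.

```ruby
# Hypothetical standalone version of filter_with_minimum_rate.
# Assumption: the clock is any callable returning seconds; the plugin
# itself uses Fluent::Clock.now and jitters the first reset with rand(30).
class MinimumRateSampler
  def initialize(interval, minimum_rate_per_min,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval
    @minimum = minimum_rate_per_min
    @clock = clock
    @counts = {}
    @resets = {}
  end

  def sample(key, record)
    now = @clock.call
    @resets[key] ||= now + 60
    if now > @resets[key]
      # A new one-minute window starts: restart the counter for this key.
      @resets[key] = now + 60
      @counts[key] = 0
    end
    c = (@counts[key] = @counts.fetch(key, 0) + 1)
    c < @minimum || (c % @interval).zero? ? record.dup : nil
  end
end

fake_now = 0.0
sampler = MinimumRateSampler.new(10, 3, clock: -> { fake_now })
kept = (1..10).map { |i| sampler.sample('all', { 'n' => i }) }.compact
p kept.map { |r| r['n'] } # => [1, 2, 10]
```

Because the comparison is a strict `c < @minimum_rate_per_min`, a minimum of 3 lets the first two records of each window through before interval sampling takes over.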
record_key(tag, record)
# File lib/fluent/plugin/filter_sampling.rb, line 61
def record_key(tag, record)
  case @sample_unit
  when 'all'
    'all'
  when 'tag'
    tag
  else
    @accessor.call(record)
  end
end
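
The effect of `@sample_unit` on the counter key can be illustrated with a simplified sketch. Here `sample_key` is a hypothetical free function, and the final branch is an assumption: it looks up a plain top-level field with `Hash#[]`, whereas the plugin uses the accessor created by `record_accessor_create` in `configure`.

```ruby
# Hypothetical, simplified version of record_key.
# Assumption: sample_unit names a top-level field of the record; the real
# plugin delegates this lookup to a Fluentd record accessor instead.
def sample_key(sample_unit, tag, record)
  case sample_unit
  when 'all' then 'all'   # one shared counter for every record
  when 'tag' then tag     # one counter per tag
  else record[sample_unit] # one counter per field value
  end
end

record = { 'host' => 'web01', 'message' => 'ok' }
p sample_key('all', 'app.access', record)  # => "all"
p sample_key('tag', 'app.access', record)  # => "app.access"
p sample_key('host', 'app.access', record) # => "web01"
```

Each distinct key gets its own entry in `@counts`, so sampling is applied independently per key.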