class Fluent::HistogramOutput

Attributes

flush_interval[RW]
hists[RW]
remove_prefix_string[RW]
zero_hist[RW]

Public Class Methods

new() click to toggle source

fluentd output plugin’s methods

Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 29
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 33
def configure(conf)
  super

  raise Fluent::ConfigError, 'bin_num must be > 0' if @bin_num <= 0
  raise Fluent::ConfigError, 'sampling_rate must be >= 1' if @sampling_rate < 1
  $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome] if @bin_num < 100
  @sampling = true if !!conf['sampling_rate']

  @tag_prefix_string = @tag_prefix + '.' if @tag_prefix
  @tag_suffix_string = '.' + @tag_suffix if @tag_suffix
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.'
    @remove_prefix_length = @remove_prefix_string.length
  end

  @zero_hist = [0] * @bin_num

  @hists = initialize_hists
  @sampling_counter = 0
  @tick = @sampling ? @sampling_rate.to_i : 1

  if @alpha > 0
    @revalue = (@alpha+1)**2 if @alpha != 0
  else
    @disable_revalue = true
  end

  @mutex = Mutex.new

end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 114
def emit(tag, es, chain)
  chain.next

  es.each do |time, record|
    keys = record[@count_key]
    if keys.instance_of? Hash
      keys.each do |k, v|
        if !@sampling
          increment(tag, k, v)
        else
          @sampling_counter += v
          if @sampling_counter >= @sampling_rate
            increment(tag, k, v)
            @sampling_counter = 0
          end
        end
      end
    else
      [keys].flatten.each do |k|
        if !@sampling
          increment(tag, k)
        else
          @sampling_counter += 1
          if @sampling_counter >= @sampling_rate
            increment(tag, k)
            @sampling_counter = 0
          end
        end
      end
    end
  end # es.each }}}
end
flush() click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 198
def flush
  flushed, @hists = generate_output(@hists), initialize_hists(@hists.keys.dup)
  tagging(flushed)
end
flush_emit(now) click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 203
def flush_emit(now)
  flushed = flush
  flushed.each do |tag, data|
    Fluent::Engine.emit(tag, now, data)
  end
end
generate_output(flushed) click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 173
def generate_output(flushed)
  output = {}
  flushed.each do |tag, hist|
    output[tag] = {}
    act_hist = hist.dup.select!{|v| v > 0}
    if act_hist.size == 0 # equal to zero_hist
      sum = 0
      avg = 0
      sd = 0
    else
      sum = act_hist.inject(:+)
      avg = sum / act_hist.size
      sd = act_hist.instance_eval do
        sigmas = map { |n| (avg - n)**2 }
        Math.sqrt(sigmas.inject(:+) / size)
      end
    end
    output[tag][:hist] = hist if @out_include_hist
    output[tag][:sum] = @disable_revalue ? sum : sum / @revalue
    output[tag][:avg] = @disable_revalue ? avg : avg / @revalue
    output[tag][:sd] = sd.to_i
  end
  output
end
increment(tag, key, v=1) click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 100
def increment(tag, key, v=1)
  @hists[tag] ||= @zero_hist.dup

  # id = key.hash % @bin_num
  id = key.to_s[0..9].codepoints.collect{|cp| cp}.join().to_i % @bin_num # attention to long key(length > 10)
  @mutex.synchronize {
    (0..@alpha).each do |alpha|
      (-alpha..alpha).each do |al|
        @hists[tag][(id + al) % @bin_num] += @tick * v
      end
    end
  }
end
initialize_hists(tags=nil) click to toggle source

Histogram plugin’s method

# File lib/fluent/plugin/out_histogram.rb, line 90
def initialize_hists(tags=nil)
  hists = {}
  if tags
    tags.each do |tag|
      hists[tag] = @zero_hist.dup
    end
  end
  hists
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 81
def shutdown
  super
  @watcher.terminate
  @watcher.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 64
def start
  super
  @watcher = Thread.new(&method(:watch))
end
tagging(flushed) click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 147
def tagging(flushed)
  tagged = {}
  tagged = Hash[ flushed.map do |tag, hist|
    tagged_tag = tag.dup
    if @tag
      tagged_tag = @tag
    else
      if @input_tag_remove_prefix &&
        ( ( tag.start_with?(@remove_prefix_string) &&
           tag.length > @remove_prefix_length ) ||
           tag == @input_tag_remove_prefix)
        tagged_tag = tagged_tag[@input_tag_remove_prefix.length..-1]
      end

      tagged_tag = @tag_prefix_string + tagged_tag if @tag_prefix
      tagged_tag << @tag_suffix_string if @tag_suffix

      tagged_tag.gsub!(/(^\.+)|(\.+$)/, '')
      tagged_tag.gsub!(/(\.\.+)/, '.')
    end

    [tagged_tag, hist]
  end ]
  tagged
end
watch() click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 69
def watch
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    if Fluent::Engine.now - @last_checked >= @flush_interval
      now = Fluent::Engine.now
      flush_emit(now)
      @last_checked = now
    end
  end
end