class Fluent::HistogramOutput
Attributes
flush_interval[RW]
hists[RW]
remove_prefix_string[RW]
zero_hist[RW]
Public Class Methods
new()
click to toggle source
fluentd output plugin’s methods
Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 29 def initialize super end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 33
# Validates the plugin settings and derives the runtime state the other
# methods rely on (tag affixes, empty histogram template, sampling state,
# revalue factor and the mutex guarding histogram updates).
#
# conf - configuration object; it is handed to the superclass, and only
#        conf['sampling_rate'] is read directly here.
#
# Raises Fluent::ConfigError when @bin_num <= 0 or @sampling_rate < 1.
def configure(conf)
  super

  raise Fluent::ConfigError, 'bin_num must be > 0' if @bin_num <= 0
  raise Fluent::ConfigError, 'sampling_rate must be >= 1' if @sampling_rate < 1
  if @bin_num < 100
    $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome]
  end

  # Sampling is enabled only when sampling_rate is present in the config.
  @sampling = true if conf['sampling_rate']
  @sampling_counter = 0
  @tick = @sampling ? @sampling_rate.to_i : 1

  # Pre-computed tag affixes, each including its separating dot.
  if @tag_prefix
    @tag_prefix_string = @tag_prefix + '.'
  end
  if @tag_suffix
    @tag_suffix_string = '.' + @tag_suffix
  end
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.'
    @remove_prefix_length = @remove_prefix_string.length
  end

  # Template histogram (all bins zero) and the per-tag histogram table.
  @zero_hist = [0] * @bin_num
  @hists = initialize_hists

  # With alpha > 0 a single increment touches (alpha + 1)**2 bin slots in
  # total, so sum/avg are later divided by that factor.
  if @alpha > 0
    @revalue = (@alpha + 1)**2
  else
    @disable_revalue = true
  end

  @mutex = Mutex.new
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 114
# Counts the keys found under @count_key in every record of the event
# stream.
#
# tag   - tag the events arrived with; used as the histogram name.
# es    - event stream; iterated with #each yielding (time, record).
# chain - output chain; #next is invoked before any record is processed.
#
# A Hash value is treated as key => weight pairs. Any other value is
# wrapped in an array, flattened, and every element counted with weight 1.
# When sampling is on, weights accumulate in @sampling_counter and a key is
# only counted once the counter reaches @sampling_rate (the counter is then
# reset to 0).
def emit(tag, es, chain)
  chain.next

  es.each do |_time, record|
    keys = record[@count_key]
    weighted = keys.instance_of?(Hash)

    # Normalise both input shapes to [key, weight] pairs.
    pairs = weighted ? keys.to_a : [keys].flatten.map { |k| [k, 1] }

    pairs.each do |key, weight|
      if @sampling
        @sampling_counter += weight
        next unless @sampling_counter >= @sampling_rate
      end

      # Hash input forwards its weight; plain keys use increment's default.
      if weighted
        increment(tag, key, weight)
      else
        increment(tag, key)
      end
      @sampling_counter = 0 if @sampling
    end
  end
end
flush()
click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 198
# Takes the accumulated histograms, resets the accumulator to zeroed
# histograms for the same tags, and returns the summarised, re-tagged
# output.
#
# The swap is done while holding @mutex, the same lock #increment takes
# when it writes bins. Without the lock, counts added between reading
# @hists and replacing it would land in the discarded table and be lost.
# The statistics are computed after the lock is released so writers are
# blocked only for the swap itself.
#
# Returns the Hash produced by #tagging (output tag => summary).
def flush
  snapshot = @mutex.synchronize do
    current = @hists
    @hists = initialize_hists(current.keys)
    current
  end
  tagging(generate_output(snapshot))
end
flush_emit(now)
click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 203
# Flushes the histograms and emits each resulting (tag, data) pair through
# Fluent::Engine.
#
# now - timestamp attached to every emitted event.
def flush_emit(now)
  flush.each do |out_tag, summary|
    Fluent::Engine.emit(out_tag, now, summary)
  end
end
generate_output(flushed)
click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 173
# Builds summary statistics for every histogram.
#
# flushed - Hash of tag => histogram (Array of bin counts).
#
# Returns a Hash of tag => { :hist (only when @out_include_hist), :sum,
# :avg, :sd }. Only bins with a value > 0 take part in the statistics; a
# histogram without such bins yields 0 for all three. Unless
# @disable_revalue is set, sum and avg are divided by @revalue.
#
# `/` is applied to the values as they are, so with Integer bins the
# average and the variance are truncated by integer division.
def generate_output(flushed)
  flushed.each_with_object({}) do |(tag, hist), output|
    # Non-destructive select: select! returns nil when it removes nothing,
    # which made a histogram whose bins are all > 0 raise NoMethodError.
    act_hist = hist.select { |v| v > 0 }

    if act_hist.empty? # equal to zero_hist
      sum = avg = sd = 0
    else
      sum = act_hist.inject(:+)
      avg = sum / act_hist.size
      variance = act_hist.map { |n| (avg - n)**2 }.inject(:+) / act_hist.size
      sd = Math.sqrt(variance)
    end

    stats = {}
    stats[:hist] = hist if @out_include_hist
    stats[:sum] = @disable_revalue ? sum : sum / @revalue
    stats[:avg] = @disable_revalue ? avg : avg / @revalue
    stats[:sd] = sd.to_i
    output[tag] = stats
  end
end
increment(tag, key, v=1)
click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 100
# Adds a weighted hit for +key+ to the histogram of +tag+.
#
# tag - histogram name; its histogram is created from @zero_hist on first
#       use.
# key - value to count; converted with #to_s and mapped to a bin index.
# v   - weight of the hit (default 1); each touched bin grows by
#       @tick * v.
#
# For every alpha in 0..@alpha the bins id-alpha .. id+alpha (wrapping
# around @bin_num) are incremented, so bins closer to id receive more hits.
#
# The histogram is looked up (and lazily created) once, inside the lock.
# Previously the creation ran outside the mutex and @hists[tag] was re-read
# on every iteration, so a concurrent replacement of @hists could leave it
# nil mid-loop and raise NoMethodError.
def increment(tag, key, v=1)
  # id = key.hash % @bin_num
  # Bin index: decimal codepoints of the first 10 characters, concatenated
  # and read as one integer. to_a keeps this working where #codepoints
  # returns an Enumerator instead of an Array.
  id = key.to_s[0..9].codepoints.to_a.join.to_i % @bin_num # attention to long key(length > 10)
  @mutex.synchronize do
    hist = (@hists[tag] ||= @zero_hist.dup)
    (0..@alpha).each do |alpha|
      (-alpha..alpha).each do |al|
        hist[(id + al) % @bin_num] += @tick * v
      end
    end
  end
end
initialize_hists(tags=nil)
click to toggle source
Histogram plugin’s method
# File lib/fluent/plugin/out_histogram.rb, line 90
# Builds a fresh histogram table.
#
# tags - optional list of tags; each one gets its own copy of @zero_hist.
#        When omitted (or nil) the table is empty.
#
# Returns the new Hash of tag => histogram.
def initialize_hists(tags=nil)
  (tags || []).each_with_object({}) do |tag, fresh|
    fresh[tag] = @zero_hist.dup
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 81 def shutdown super @watcher.terminate @watcher.join end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_histogram.rb, line 64 def start super @watcher = Thread.new(&method(:watch)) end
tagging(flushed)
click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 147
# Re-keys the flushed summaries by their output tag.
#
# flushed - Hash of input tag => summary.
#
# When @tag is set it is used as the output tag for every entry. Otherwise
# the output tag is derived from the input tag: @input_tag_remove_prefix is
# cut off (when the tag equals it or starts with it followed by a dot),
# @tag_prefix / @tag_suffix are attached, leading and trailing dots are
# stripped and runs of dots collapsed into one.
#
# Returns a new Hash of output tag => summary; entries that map to the same
# output tag overwrite each other, the last one winning.
def tagging(flushed)
  flushed.each_with_object({}) do |(tag, hist), tagged|
    if @tag
      out_tag = @tag
    else
      out_tag = tag.dup

      strip_prefix = @input_tag_remove_prefix && (
        (tag.start_with?(@remove_prefix_string) && tag.length > @remove_prefix_length) ||
        tag == @input_tag_remove_prefix
      )
      # Only the prefix itself is cut here; the dot left behind is removed
      # by the clean-up below.
      out_tag = out_tag[@input_tag_remove_prefix.length..-1] if strip_prefix

      out_tag = @tag_prefix_string + out_tag if @tag_prefix
      out_tag << @tag_suffix_string if @tag_suffix

      out_tag.gsub!(/(^\.+)|(\.+$)/, '')
      out_tag.gsub!(/(\.\.+)/, '.')
    end

    tagged[out_tag] = hist
  end
end
watch()
click to toggle source
# File lib/fluent/plugin/out_histogram.rb, line 69
# Endless polling loop: wakes up every 0.5 seconds and, once at least
# @flush_interval has passed on the Fluent::Engine clock since the last
# flush, calls #flush_emit and records the new check time in @last_checked.
# Never returns on its own.
def watch
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    next unless Fluent::Engine.now - @last_checked >= @flush_interval

    # The clock is read again so the emitted timestamp and @last_checked
    # share one value.
    now = Fluent::Engine.now
    flush_emit(now)
    @last_checked = now
  end
end