class Fluent::Plugin::NumericMonitorOutput
Constants
- EMIT_STREAM_RECORDS
Attributes
count[RW]
last_checked[RW]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_numeric_monitor.rb, line 36
#
# Validates the configuration and derives the instance state the other
# methods read: the flush interval, tag/key prefix strings, the initial
# counters and the mutex that guards them.
#
# conf - configuration element; only +has_key?('@label')+ is used directly
#        here, the rest is handled by the superclass implementation.
#
# Raises RuntimeError when @unit is set to something other than :minute,
# :hour or :day, and Fluent::ConfigError when per-tag output is requested
# without either an @label or a tag prefix to route the results.
def configure(conf)
  # Captured before `super` runs, as in the original ordering.
  # NOTE(review): presumably the superclass consumes '@label' -- confirm.
  label_routing_specified = conf.has_key?('@label')

  super

  if @unit
    seconds_per_unit = { minute: 60, hour: 3600, day: 86400 }
    @count_interval = seconds_per_unit.fetch(@unit) do
      raise "unknown unit: #{@unit}"
    end
  end

  if @input_tag_remove_prefix
    # The prefix is matched together with its trailing dot separator.
    @removed_prefix_string = @input_tag_remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @key_prefix_string = @output_key_prefix ? @output_key_prefix + '_' : ''

  # Per-tag results need a destination: either a label or a tag prefix.
  if @output_per_tag && !(label_routing_specified || @tag_prefix)
    raise Fluent::ConfigError, "specify @label to route output events into other <label> sections."
  end

  @tag_prefix_string = (@output_per_tag && @tag_prefix) ? @tag_prefix + '.' : nil

  if system_config.workers > 1
    log.warn "Fluentd is now working with multi process workers, and numeric_monitor plugin will produce monitor results in each separated processes."
  end

  @count = count_initialized
  @mutex = Mutex.new
end
count_initialized(keys=nil)
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 93
#
# Builds a fresh counter table.
#
#   counts['tag'] = {:min => num, :max => num, :sum => num, :num => num [, :sample => [....]]}
#
# keys - optional list of tags to pre-create empty entries for. Ignored
#        when @aggregate is :all, where the table always holds exactly
#        one 'all' entry.
#
# Returns a Hash of tag => empty statistics entry. Entries carry a
# :sample array only when @percentiles is set. Every entry (and every
# sample array) is a distinct object.
def count_initialized(keys=nil)
  blank_entry = lambda do
    entry = {min: nil, max: nil, sum: nil, num: 0}
    entry[:sample] = [] if @percentiles
    entry
  end

  return {'all' => blank_entry.call} if @aggregate == :all
  return {} unless keys

  keys.each_with_object({}) do |key, table|
    table[key] = blank_entry.call
  end
end
countups(tag, min, max, sum, num, sample)
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 187
#
# Merges the statistics of one batch into the running counters for +tag+
# (or into the single 'all' entry when @aggregate is :all). The whole
# update runs while holding @mutex.
#
# tag    - tag the batch belongs to.
# min    - smallest value seen in the batch, or nil if the batch had none.
# max    - largest value seen in the batch, or nil if the batch had none.
# sum    - sum of the batch values.
# num    - number of values in the batch.
# sample - array of batch values kept for percentiles; only read when
#          @percentiles is set (nil is treated as an empty array).
#
# Returns the updated sample array when @percentiles is set, nil otherwise.
def countups(tag, min, max, sum, num, sample)
  tag = 'all' if @aggregate == :all

  @mutex.synchronize do
    c = (@count[tag] ||= {min: nil, max: nil, sum: nil, num: 0})

    # A batch without any monitored value reports nil bounds. Comparing an
    # already recorded bound against nil would raise ArgumentError, so nil
    # bounds are skipped and the stored ones are kept.
    c[:min] = min if !min.nil? && (c[:min].nil? || c[:min] > min)
    c[:max] = max if !max.nil? && (c[:max].nil? || c[:max] < max)

    c[:sum] = (c[:sum] || 0) + sum
    c[:num] += num

    if @percentiles
      c[:sample] ||= []
      incoming = sample || []

      # Make room for the incoming values by evicting randomly chosen
      # stored ones; never try to evict more than are actually stored.
      overflow = c[:sample].size + incoming.size - @samples_limit
      if overflow > 0
        [overflow, c[:sample].size].min.times do
          c[:sample].delete_at(rand(c[:sample].size))
        end
      end

      c[:sample] += incoming
    end
  end
end
flush()
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 167
#
# Replaces the live counters with freshly initialized ones (keeping the
# same set of tags) and returns the output generated from the counters
# that were just retired.
#
# The swap is done while holding @mutex, the same lock countups takes
# when it updates @count, so an update cannot land in the table while it
# is being swapped out. Output generation runs after the lock is
# released.
#
# Returns whatever generate_output returns for the retired counters.
def flush
  flushed = @mutex.synchronize do
    retired = @count
    @count = count_initialized(retired.keys.dup)
    retired
  end
  generate_output(flushed)
end
flush_emit()
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 172
#
# Flushes the counters and emits the result through the router.
#
# Without @output_per_tag the whole flush result is emitted as one record
# under @tag. With @output_per_tag every tag => message pair of the flush
# result is emitted separately, all with the same timestamp, under the
# tag itself or under @tag_prefix_string + tag when a prefix is set.
def flush_emit
  return router.emit(@tag, Fluent::Engine.now, flush) unless @output_per_tag

  # Timestamp is taken once, before flushing, and shared by every record.
  emitted_at = Fluent::Engine.now
  flush.each do |tag, message|
    destination = @tag_prefix_string ? @tag_prefix_string + tag : tag
    router.emit(destination, emitted_at, message)
  end
end
generate_fields(count, key_prefix = '', output = {})
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 120
#
# Converts one statistics entry into output fields.
#
# count      - entry with :num, :min, :max, :sum and, when @percentiles
#              is set, :sample.
# key_prefix - string prepended to every generated field name.
# output     - hash the fields are written into; lets several entries be
#              merged into one record.
#
# Writes 'num', 'min', 'max' and 'sum' when the corresponding value is
# set, 'avg' when at least one value was counted, and one
# 'percentile_<p>' field per configured percentile.
#
# Returns +output+.
def generate_fields(count, key_prefix = '', output = {})
  output[key_prefix + 'num'] = count[:num] if count[:num]
  output[key_prefix + 'min'] = count[:min] if count[:min]
  output[key_prefix + 'max'] = count[:max] if count[:max]
  output[key_prefix + 'avg'] = (count[:sum] / (count[:num] * 1.0)) if count[:num] && count[:num] > 0
  output[key_prefix + 'sum'] = count[:sum] if count[:sum]

  if @percentiles
    sorted = (count[:sample] || []).sort
    @percentiles.each do |p|
      # The rank is taken within the stored sample, not within the total
      # number of events: the sample can hold fewer values than :num once
      # values have been dropped to respect the sample limit, and a rank
      # based on :num would then point past the end of the array.
      i = (sorted.size * p / 100).floor
      i -= 1 if i > 0
      output[key_prefix + "percentile_#{p}"] = sorted[i]
    end
  end

  output
end
generate_output(count)
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 139
#
# Turns a counter table into the structure that gets emitted.
#
# count - Hash of tag => statistics entry.
#
# Result shape, depending on @aggregate and @output_per_tag:
#
#   :all, per tag     -> { 'all' => { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... } }
#   :all, single      -> { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... }
#   by tag, per tag   -> { 'tag1' => { 'key_prefix_min' => -10, ... }, 'tag2' => { ... } }
#   by tag, single    -> { 'key_prefix_tag1_min' => -10, ..., 'key_prefix_tag2_min' => -10, ... }
#
# Tags are passed through stripped_tag before being used as a key or as
# part of a field name.
def generate_output(count)
  if @aggregate == :all
    totals = generate_fields(count['all'], @key_prefix_string)
    return @output_per_tag ? {'all' => totals} : totals
  end

  if @output_per_tag
    # One nested record per tag.
    count.keys.each_with_object({}) do |tag, per_tag|
      per_tag[stripped_tag(tag)] = generate_fields(count[tag], @key_prefix_string)
    end
  else
    # One flat record; the tag becomes part of every field name.
    count.keys.each_with_object({}) do |tag, merged|
      generate_fields(count[tag], @key_prefix_string + stripped_tag(tag) + '_', merged)
    end
  end
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 78
#
# Always answers true. Note that configure logs a warning when more than
# one worker is configured, because each worker process then produces
# its own, separate monitor results.
def multi_workers_ready?
  true
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 216
#
# Summarizes one batch of events and hands the summary to countups.
#
# tag - tag of the batch.
# es  - event collection; iterated with +each+, yielding (time, record).
#
# For every record that has a non-nil value under @monitor_key the value
# is converted with +to_f+ and folded into the batch minimum, maximum,
# sum and count. When @percentiles is set the converted values are also
# collected as samples.
#
# Returns the result of countups.
def process(tag, es)
  lowest = nil
  highest = nil
  total = 0
  seen = 0
  collected = @percentiles ? [] : nil

  es.each do |_time, record|
    raw = record[@monitor_key]
    next if raw.nil?

    number = raw.to_f
    lowest = number if lowest.nil? || lowest > number
    highest = number if highest.nil? || highest < number
    total += number
    seen += 1
    collected.push(number) if @percentiles
  end

  if @percentiles && collected.size > @samples_limit
    # Oversized batches are thinned by random eviction. Note the operator
    # precedence: the eviction count is size - (limit / 2), which leaves
    # limit / 2 samples behind.
    surplus = (collected.size - @samples_limit / 2).to_i
    surplus.times { collected.delete_at(rand(collected.size)) }
  end

  countups(tag, lowest, highest, total, seen, collected)
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_numeric_monitor.rb, line 82
#
# Runs the superclass start, records the current engine time in
# @last_checked and registers the :out_numeric_counter_watcher timer with
# @count_interval. Each time the timer block runs it calls flush_emit and
# then stores the time captured just before that flush in @last_checked.
#
# NOTE(review): timer_execute is assumed to invoke the block repeatedly
# every @count_interval seconds -- confirm against the timer helper.
def start
  super

  @last_checked = Fluent::Engine.now

  timer_execute(:out_numeric_counter_watcher, @count_interval) do
    tick_started_at = Fluent::Engine.now
    flush_emit
    @last_checked = tick_started_at
  end
end
stripped_tag(tag)
click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 113
#
# Removes the configured input tag prefix from +tag+.
#
# tag - tag string.
#
# Returns
# * +tag+ unchanged when no prefix is configured or the tag does not
#   start with the prefix followed by a dot (a tag that is exactly
#   "<prefix>." is also returned unchanged);
# * the part after "<prefix>." when the tag has more after the dot;
# * an empty string when the tag is exactly the bare prefix.
def stripped_tag(tag)
  return tag unless @input_tag_remove_prefix

  if tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    return tag[@removed_length..-1]
  end

  # @removed_length counts the trailing dot, so it is one past the end of
  # a tag that equals the bare prefix; slicing from there yields nil, not
  # an empty string. Return the empty remainder explicitly so callers can
  # keep concatenating the result into tags and field names.
  return '' if tag == @input_tag_remove_prefix

  tag
end