class Fluent::StatsNotifierOutput
Attributes
counts[RW]
last_checked[RW]
queues[RW]
saved_at[RW]
saved_duration[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_stats_notifier.rb, line 15 def initialize super require 'pathname' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_stats_notifier.rb, line 43 def configure(conf) super @interval = @interval.to_i if @less_than and @less_equal raise Fluent::ConfigError, "out_stats_notifier: Only either of `less_than` or `less_equal` can be specified." end if @greater_than and @greater_equal raise Fluent::ConfigError, "out_stats_notifier: Only either of `greater_than` or `greater_equal` can be specified." end @aggregate_stats = @compare_with if @compare_with # Support old version compatibility case @aggregate_stats when "sum" @aggregate_stats = :sum when "max" @aggregate_stats = :max when "min" @aggregate_stats = :min when "avg" @aggregate_stats = :avg else raise Fluent::ConfigError, "out_stats_notifier: `aggregate_stats` must be one of `sum`, `max`, `min`, `avg`" end case @stats when "sum" @stats = :sum when "max" @stats = :max when "min" @stats = :min when "avg" @stats = :avg else raise Fluent::ConfigError, "out_stats_notifier: `stats` must be one of `sum`, `max`, `min`, `avg`" end if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? raise Fluent::ConfigError, "out_stats_notifier: No tag option is specified" end @tag_proc = tag_proc case @aggregate when 'all' raise Fluent::ConfigError, "out_stats_notifier: `tag` must be specified with aggregate all" if @tag.nil? @aggregate = :all when 'tag' # raise Fluent::ConfigError, "out_stats_notifier: `add_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? @aggregate = :tag else raise Fluent::ConfigError, "out_stats_notifier: aggregate allows tag/all" end @counts = {} @queues = {} @mutex = Mutex.new end
emit(tag, es, chain)
click to toggle source
Called when new line comes. This method actually does not emit
# File lib/fluent/plugin/out_stats_notifier.rb, line 117 def emit(tag, es, chain) key = @target_key # enqueus count = 0; queues = {} es.each do |time,record| if record[key] queues[key] ||= [] queues[key] << record[key] end count += 1 end # thread safe merge @counts[tag] ||= 0 @queues[tag] ||= {} @mutex.synchronize do if queues[key] @queues[tag][key] ||= [] @queues[tag][key].concat(queues[key]) end @counts[tag] += count end chain.next rescue => e log.warn "#{e.class} #{e.message} #{e.backtrace.first}" end
flush_emit(step)
click to toggle source
This method is the real one to emit
# File lib/fluent/plugin/out_stats_notifier.rb, line 167 def flush_emit(step) time = Fluent::Engine.now counts, queues, @counts, @queues = @counts, @queues, {}, {} # Get statistical value among events evented_queues = {} queues.each do |tag, queue| evented_queues[tag] ||= {} evented_queues[tag][@target_key] = get_stats(queue[@target_key], @stats) if queue[@target_key] end if @aggregate == :all values = evented_queues.values.map {|queue| queue[@target_key] }.compact value = get_stats(values, @aggregate_stats) output = generate_output(value) if value router.emit(@tag, time, output) if output else # aggregate tag evented_queues.each do |tag, queue| value = queue[@target_key] output = generate_output(value) if value emit_tag = @tag_proc.call(tag) router.emit(emit_tag, time, output) if output end end end
generate_output(value)
click to toggle source
# File lib/fluent/plugin/out_stats_notifier.rb, line 206 def generate_output(value) return nil if value == 0 # ignore 0 because standby nodes receive 0 message usually return nil if @less_than and @less_than <= value return nil if @less_equal and @less_equal < value return nil if @greater_than and value <= @greater_than return nil if @greater_equal and value < @greater_equal output = {} output[@target_key] = value output end
get_stats(values, method = :max)
click to toggle source
# File lib/fluent/plugin/out_stats_notifier.rb, line 193 def get_stats(values, method = :max) case method when :sum stats = values.inject(:+) when :max stats = values.max when :min stats = values.min when :avg stats = values.inject(:+) / values.count unless values.empty? end end
load_status(file_path, interval)
click to toggle source
Load internal status from a file
@param [String] file_path @param [Interger] interval
# File lib/fluent/plugin/out_stats_notifier.rb, line 266 def load_status(file_path, interval) return unless (f = Pathname.new(file_path)).exist? begin f.open('rb') do |f| stored = Marshal.load(f) if stored[:target_key] == @target_key if stored[:queues] if Fluent::Engine.now <= stored[:saved_at] + interval @counts = stored[:counts] @queues = stored[:queues] @saved_at = stored[:saved_at] @saved_duration = stored[:saved_duration] # skip the saved duration to continue counting @last_checked = Fluent::Engine.now - @saved_duration else log.warn "out_stats_notifier: stored data is outdated. ignore stored data" end else log.warn "out_stats_notifier: stored data is incompatible. ignore stored data" end else log.warn "out_stats_notifier: configuration param was changed. ignore stored data" end end rescue => e log.warn "out_stats_notifier: Can't load store_file #{e.class} #{e.message}" end end
save_status(file_path)
click to toggle source
Store internal status into a file
@param [String] file_path
# File lib/fluent/plugin/out_stats_notifier.rb, line 242 def save_status(file_path) return unless file_path begin Pathname.new(file_path).open('wb') do |f| @saved_at = Fluent::Engine.now @saved_duration = @saved_at - @last_checked Marshal.dump({ :counts => @counts, :queues => @queues, :saved_at => @saved_at, :saved_duration => @saved_duration, :target_key => @target_key, }, f) end rescue => e log.warn "out_stats_notifier: Can't write store_file #{e.class} #{e.message}" end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_stats_notifier.rb, line 109 def shutdown super @watcher.terminate @watcher.join save_status(@store_file) if @store_file end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_stats_notifier.rb, line 103 def start super load_status(@store_file, @interval) if @store_file @watcher = Thread.new(&method(:watcher)) end
tag_proc()
click to toggle source
# File lib/fluent/plugin/out_stats_notifier.rb, line 218 def tag_proc rstrip = Proc.new {|str, substr| str.chomp(substr) } lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str } tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix tag_fixed = @tag if @tag if tag_fixed Proc.new {|tag| tag_fixed } elsif tag_prefix_match and tag_suffix_match Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag, tag_prefix_match), tag_suffix_match)}#{tag_suffix}" } elsif tag_prefix_match Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag, tag_prefix_match)}#{tag_suffix}" } elsif tag_suffix_match Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag, tag_suffix_match)}#{tag_suffix}" } else Proc.new {|tag| "#{tag_prefix}#{tag}#{tag_suffix}" } end end
watcher()
click to toggle source
thread callback
# File lib/fluent/plugin/out_stats_notifier.rb, line 147 def watcher # instance variable, and public accessable, for test @last_checked = Fluent::Engine.now # skip the passed time when loading @counts form file @last_checked -= @passed_time if @passed_time while true sleep 0.5 begin if Fluent::Engine.now - @last_checked >= @interval now = Fluent::Engine.now flush_emit(now - @last_checked) @last_checked = now end rescue => e log.warn "#{e.class} #{e.message} #{e.backtrace.first}" end end end