class Fluent::Plugin::NumericCounterOutput
Constants
- DEFAULT_STORAGE_TYPE
- PATTERN_MAX_NUM
Attributes
counts[RW]
last_checked[RW]
patterns[RW]
saved_at[RW]
saved_duration[RW]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_numeric_counter.rb, line 83
# Validates the configuration and initializes the counting state.
#
# Reads `pattern1` .. `pattern<PATTERN_MAX_NUM>` from +conf+; each value is
# "name low [high]". Builds @patterns, derives @count_interval from @unit,
# prepares the tag prefix strings, optionally creates the storage, and
# resets @counts / @mutex.
#
# @param conf the plugin configuration element (hash-like: keys, [], has_key?)
# @raise [Fluent::ConfigError] on pattern index gaps, duplicated pattern
#   names, missing thresholds, or per-tag output without a route
# @raise [RuntimeError] when @unit is not :minute, :hour or :day
def configure(conf)
  label_routing_specified = conf.has_key?('@label')

  super

  if @unit
    @count_interval = case @unit
                      when :minute then 60
                      when :hour then 3600
                      when :day then 86400
                      else
                        raise "unknown unit:#{@unit}"
                      end
  end

  @patterns = [[0, 'unmatched', nil, nil]] # counts-index, name, low, high
  pattern_names = ['unmatched']

  invalids = conf.keys.select { |k| k =~ /^pattern(\d+)$/ && !(1..PATTERN_MAX_NUM).include?($1.to_i) }
  unless invalids.empty?
    log.warn "invalid number patterns (valid pattern number:1-#{PATTERN_MAX_NUM}):", invalids: invalids
  end

  (1..PATTERN_MAX_NUM).each do |i|
    next unless conf["pattern#{i}"]
    name, low, high = conf["pattern#{i}"].split(/ +/, 3)
    @patterns.push([i, name, parse_num(low), parse_num(high)])
    pattern_names.push(name)
  end

  # Pattern numbers are used directly as indexes into the per-tag counter
  # arrays, so every registered pattern must sit at the position equal to its
  # number. Checking the registered list (rather than only single-digit
  # config keys) also catches gaps that involve pattern10 and above.
  contiguous = @patterns.each_with_index.all? { |(index, _name, _low, _high), position| index == position }
  unless contiguous
    raise Fluent::ConfigError, "jump of pattern index found"
  end

  unless @patterns.length == pattern_names.uniq.length
    raise Fluent::ConfigError, "duplicated pattern names found"
  end

  @patterns[1..-1].each do |index, _name, low, high|
    raise Fluent::ConfigError, "numbers of low/high missing" if low.nil?
    # An open-ended range (no high) is only meaningful for the final pattern.
    if high.nil? && index != @patterns.length - 1
      raise Fluent::ConfigError, "unspecified high threshold allowed only in last pattern"
    end
  end

  if @output_per_tag && (!label_routing_specified && !@tag_prefix)
    raise Fluent::ConfigError, "specify @label to route output events into other <label> sections."
  end
  if @output_per_tag && @tag_prefix
    @tag_prefix_string = @tag_prefix + '.'
  else
    @tag_prefix_string = nil
  end

  if @input_tag_remove_prefix
    @removed_prefix_string = @input_tag_remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  if @store_storage
    @storage = storage_create(usage: 'resume')
  end

  if system_config.workers > 1
    log.warn "Fluentd is now working with multi process workers, and numeric_counter plugin will produce counter results in each separeted processes."
  end

  @counts = count_initialized
  @mutex = Mutex.new
end
count_initialized(keys=nil)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 172
# Builds a zero-filled counter table.
#
# Layout of each row:
#   counts['tag'][pattern_index_num] = count
#   counts['tag'][-1] = sum
#
# @param keys [Array, nil] tags to pre-create rows for (ignored when
#   @aggregate is :all)
# @return [Hash] {'all' => row} when aggregating everything, one row per key
#   when keys are given, otherwise an empty hash
def count_initialized(keys=nil)
  row_size = @patterns.length + 1
  return { 'all' => Array.new(row_size, 0) } if @aggregate == :all
  return {} unless keys

  keys.each_with_object({}) do |key, table|
    table[key] = Array.new(row_size, 0)
  end
end
countups(tag, counts)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 187
# Adds a batch of per-pattern counts into @counts under @mutex.
#
# @param tag [String] event tag; replaced by 'all' when @aggregate is :all
# @param counts [Array<Integer>] one count per pattern, indexed like @patterns
# @return the updated running total stored in the row's last slot
def countups(tag, counts)
  key = @aggregate == :all ? 'all' : tag

  @mutex.synchronize do
    row = (@counts[key] ||= Array.new(@patterns.length + 1, 0))
    batch_total = 0
    counts.each_with_index do |amount, slot|
      batch_total += amount
      row[slot] += amount
    end
    # The last slot carries the total across all patterns.
    row[-1] += batch_total
  end
end
flush(step)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 257
# Detaches the current counters, replaces them with a fresh table that keeps
# a row only for keys whose total was non-zero, and renders the detached
# counters into one output message.
#
# @param step the elapsed interval passed through to generate_output
# @return [Hash] the message built by generate_output
def flush(step) # returns one message
  active_keys = @counts.keys.select { |key| @counts[key][-1] > 0 }
  flushed = @counts
  @counts = count_initialized(active_keys)
  generate_output(flushed, step)
end
flush_emit(step)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 267
# Flushes the counters and emits the result through the router.
#
# With @output_per_tag, one event is emitted per entry returned by
# flush_per_tags (tag prefixed by @tag_prefix_string when set), all sharing
# one timestamp. Otherwise a single message is emitted under @tag, and only
# when it has at least one key.
#
# @param step the elapsed interval handed to the flush methods
def flush_emit(step)
  if @output_per_tag
    emitted_at = Fluent::Engine.now
    flush_per_tags(step).each do |tag, message|
      out_tag = @tag_prefix_string ? @tag_prefix_string + tag : tag
      router.emit(out_tag, emitted_at, message)
    end
  else
    message = flush(step)
    router.emit(@tag, Fluent::Engine.now, message) unless message.keys.empty?
  end
end
generate_fields(step, target_counts, attr_prefix, output)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 210
# Writes the count / rate / percentage fields for one counter row into
# +output+.
#
# @param step elapsed interval used as the divisor for the `_rate` fields
# @param target_counts [Array<Integer>] per-pattern counts followed by the
#   total in the last slot; left unmodified
# @param attr_prefix [String] prefix prepended to every field name
# @param output [Hash] hash the fields are written into
# @return [Hash] the same +output+ hash
def generate_fields(step, target_counts, attr_prefix, output)
  # Slice instead of delete_at: the caller's row keeps its trailing total
  # slot, so passing a live counter row cannot corrupt later counting.
  messages = target_counts[-1]
  pattern_counts = target_counts[0..-2]

  # With @outcast_unmatched the percentage base excludes the unmatched slot
  # (index 0); inject(0, ...) keeps the base numeric when no pattern exists.
  sum = if @outcast_unmatched
          pattern_counts[1..-1].inject(0, :+)
        else
          messages
        end

  pattern_counts.each_with_index do |count, i|
    name = @patterns[i][1]
    output[attr_prefix + name + '_count'] = count
    # Rate is truncated (not rounded) to two decimal places.
    output[attr_prefix + name + '_rate'] = ((count * 100.0) / (1.00 * step)).floor / 100.0
    unless i == 0 && @outcast_unmatched
      output[attr_prefix + name + '_percentage'] = count * 100.0 / (1.00 * sum) if sum > 0
    end
    # Assigned inside the loop so the key keeps the insertion position it
    # always had (right after the first pattern's fields).
    if @output_messages
      output[attr_prefix + 'messages'] = messages
    end
  end

  output
end
generate_output(counts, step)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 233
# Renders a counter table into one flat output message.
#
# When @aggregate is :all only the 'all' row is rendered, without a field
# prefix. Otherwise every tag's row is rendered into the same hash, with the
# stripped tag plus '_' as the field prefix.
#
# @param counts [Hash] counter table keyed by tag (or 'all')
# @param step elapsed interval handed to generate_fields
# @return [Hash] the assembled message
def generate_output(counts, step)
  return generate_fields(step, counts['all'], '', {}) if @aggregate == :all

  counts.keys.each_with_object({}) do |tag, message|
    generate_fields(step, counts[tag], stripped_tag(tag) + '_', message)
  end
end
load_status(count_interval)
click to toggle source
Load internal status from a storage
@param [Integer] count_interval
# File lib/fluent/plugin/out_numeric_counter.rb, line 328
# Restores counters from @storage when the stored snapshot is still usable.
#
# The snapshot is applied only if its aggregate / count_key / patterns match
# the current configuration and it was saved no longer than +count_interval+
# ago. Otherwise a warning is logged and the state is left untouched. Any
# error while reading the snapshot is logged, not raised.
#
# @param count_interval [Integer] maximum accepted age of the snapshot
def load_status(count_interval)
  stored = @storage.get(:stored_value)
  return unless stored

  begin
    same_config = stored["aggregate"] == @aggregate.to_s &&
                  stored["count_key"] == @count_key &&
                  stored["patterns"] == @patterns

    if !same_config
      log.warn "configuration param was changed. ignore stored data"
    elsif !(Fluent::Engine.now <= stored["saved_at"] + count_interval)
      log.warn "stored data is outdated. ignore stored data"
    else
      @mutex.synchronize do
        @counts = stored["counts"]
        @saved_at = stored["saved_at"]
        @saved_duration = stored["saved_duration"]

        # skip the saved duration to continue counting
        @last_checked = Fluent::Engine.now - @saved_duration
      end
    end
  rescue => e
    log.warn "Can't load store_storage", error: e
  end
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 149
# Reports that this plugin may run under multiple workers.
#
# @return [true] always
def multi_workers_ready?
  true
end
parse_num(str)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 71
# Converts a threshold string from the configuration into a number.
#
# Strings made only of digits and '-' become Integers, strings made only of
# digits, '-' and '.' become Floats, and anything else is delegated to
# Fluent::Config.size_value.
#
# @param str [String, nil] raw threshold text
# @return [Integer, Float, nil] nil when +str+ is nil; otherwise as described
#   above (the size_value result is returned as-is)
def parse_num(str)
  return nil if str.nil?
  return str.to_i if str =~ /^[-0-9]+$/
  return str.to_f if str =~ /^[-.0-9]+$/

  Fluent::Config.size_value(str)
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 285
# Classifies every record of an event stream and adds the tallies to the
# counters.
#
# The value under @count_key is converted with to_f and assigned to the
# first pattern whose range contains it (low inclusive, high exclusive, nil
# high meaning open-ended). Values matching no pattern go to slot 0.
# Records without the key are skipped.
#
# @param tag [String] tag of the event stream
# @param es enumerable yielding (time, record) pairs
def process(tag, es)
  tallies = Array.new(@patterns.length, 0)

  es.each do |_time, record|
    raw = record[@count_key]
    next if raw.nil?

    number = raw.to_f
    hit = @patterns.find do |_index, _name, low, high|
      out_of_range = low.nil? || number < low || (!high.nil? && number >= high)
      !out_of_range
    end
    # hit[0] is the pattern's counts-index; slot 0 is the unmatched bucket.
    tallies[hit ? hit[0] : 0] += 1
  end

  countups(tag, tallies)
end
save_status()
click to toggle source
Store internal status into a storage
# File lib/fluent/plugin/out_numeric_counter.rb, line 307
# Writes the current counters and the configuration they belong to into
# @storage under :stored_value.
#
# Also records @saved_at (now) and @saved_duration (time since
# @last_checked). Any error is logged as a warning instead of being raised.
def save_status
  @saved_at = Fluent::Engine.now
  @saved_duration = @saved_at - @last_checked

  snapshot = {
    "counts"         => @counts,
    "saved_at"       => @saved_at,
    "saved_duration" => @saved_duration,
    "aggregate"      => @aggregate.to_s,
    "count_key"      => @count_key,
    "patterns"       => @patterns,
  }
  @storage.put(:stored_value, snapshot)
rescue => e
  log.warn "Can't write store_storage", error: e
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_numeric_counter.rb, line 167
# Persists the counters (when @store_storage is enabled) before handing
# over to the superclass shutdown.
def shutdown
  save_status if @store_storage

  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_numeric_counter.rb, line 153
# Starts the plugin: sets the interval baseline, restores persisted counters
# when @store_storage is enabled, and registers the periodic flush timer.
#
# Every @count_interval the timer emits the counters with the time elapsed
# since @last_checked as the step, then moves @last_checked forward.
def start
  super

  # Set the baseline BEFORE restoring: load_status moves @last_checked back
  # by the saved duration so a restored interval continues where it stopped.
  # Assigning afterwards would overwrite that and shorten the first step.
  @last_checked = Fluent::Engine.now
  load_status(@count_interval) if @store_storage

  timer_execute(:out_numeric_counter_timer, @count_interval) do
    now = Fluent::Engine.now
    flush_emit(now - @last_checked)
    @last_checked = now
  end
end
stripped_tag(tag)
click to toggle source
# File lib/fluent/plugin/out_numeric_counter.rb, line 203
# Removes the configured input prefix from a tag.
#
# @param tag [String] the incoming tag
# @return [String] +tag+ without the leading "<prefix>." when it has one,
#   an empty string when +tag+ is exactly the prefix, otherwise +tag+
#   unchanged (also when no prefix is configured)
def stripped_tag(tag)
  return tag unless @input_tag_remove_prefix

  # @removed_length counts the trailing '.', so slicing a tag that is exactly
  # the bare prefix would start past the end of the string and yield nil;
  # return an empty string explicitly so callers can keep concatenating.
  return '' if tag == @input_tag_remove_prefix
  return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) && tag.length > @removed_length

  tag
end