class Fluent::Plugin::NumericMonitorOutput

Constants

EMIT_STREAM_RECORDS

Attributes

count[RW]
last_checked[RW]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_numeric_monitor.rb, line 36
def configure(conf)
  label_routing_specified = conf.has_key?('@label')

  super

  if @unit
    @count_interval = case @unit
                      when :minute then 60
                      when :hour then 3600
                      when :day then 86400
                      else
                        raise "unknown unit: #{@unit}"
                      end
  end

  if @input_tag_remove_prefix
    @removed_prefix_string = @input_tag_remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @key_prefix_string = ''
  if @output_key_prefix
    @key_prefix_string = @output_key_prefix + '_'
  end

  if @output_per_tag && (!label_routing_specified && !@tag_prefix)
    raise Fluent::ConfigError, "specify @label to route output events into other <label> sections."
  end
  if @output_per_tag && @tag_prefix
    @tag_prefix_string = @tag_prefix + '.'
  else
    @tag_prefix_string = nil
  end

  if system_config.workers > 1
    log.warn "Fluentd is now working with multi process workers, and numeric_monitor plugin will produce monitor results in each separated processes."
  end

  @count = count_initialized
  @mutex = Mutex.new
end
count_initialized(keys=nil) click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 93
def count_initialized(keys=nil)
  # counts['tag'] = {:min => num, :max => num, :sum => num, :num => num [, :sample => [....]]}
  if @aggregate == :all
    if @percentiles
      {'all' => {min: nil, max: nil, sum: nil, num: 0, sample: []}}
    else
      {'all' => {min: nil, max: nil, sum: nil, num: 0}}
    end
  elsif keys
    values = if @percentiles
               Array.new(keys.length) {|i| {min: nil, max: nil, sum: nil, num: 0, sample: []}}
             else
               Array.new(keys.length) {|i| {min: nil, max: nil, sum: nil, num: 0}}
             end
    Hash[[keys, values].transpose]
  else
    {}
  end
end
countups(tag, min, max, sum, num, sample) click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 187
def countups(tag, min, max, sum, num, sample)
  if @aggregate == :all
    tag = 'all'
  end

  @mutex.synchronize do
    c = (@count[tag] ||= {min: nil, max: nil, sum: nil, num: 0})

    if c[:min].nil? or c[:min] > min
      c[:min] = min
    end
    if c[:max].nil? or c[:max] < max
      c[:max] = max
    end
    c[:sum] = (c[:sum] || 0) + sum
    c[:num] += num

    if @percentiles
      c[:sample] ||= []
      if c[:sample].size + sample.size > @samples_limit
        (c[:sample].size + sample.size - @samples_limit).times do
          c[:sample].delete_at(rand(c[:sample].size))
        end
      end
      c[:sample] += sample
    end
  end
end
flush() click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 167
def flush
  flushed,@count = @count,count_initialized(@count.keys.dup)
  generate_output(flushed)
end
flush_emit() click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 172
def flush_emit
  if @output_per_tag
    time = Fluent::Engine.now
    flush.each do |tag, message|
      if @tag_prefix_string
        router.emit(@tag_prefix_string + tag, time, message)
      else
        router.emit(tag, time, message)
      end
    end
  else
    router.emit(@tag, Fluent::Engine.now, flush)
  end
end
generate_fields(count, key_prefix = '', output = {}) click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 120
def generate_fields(count, key_prefix = '', output = {})
  output[key_prefix + 'num'] = count[:num] if count[:num]
  output[key_prefix + 'min'] = count[:min] if count[:min]
  output[key_prefix + 'max'] = count[:max] if count[:max]
  output[key_prefix + 'avg'] = (count[:sum] / (count[:num] * 1.0)) if count[:num] > 0
  output[key_prefix + 'sum'] = count[:sum] if count[:sum]
  if @percentiles
    sorted = count[:sample].sort
    @percentiles.each do |p|
      i = (count[:num] * p / 100).floor
      if i > 0
        i -= 1
      end
      output[key_prefix + "percentile_#{p}"] = sorted[i]
    end
  end
  output
end
generate_output(count) click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 139
def generate_output(count)
  if @aggregate == :all
    if @output_per_tag
      # tag_prefix_all: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... } }
      output = {'all' => generate_fields(count['all'], @key_prefix_string)}
    else
      # tag: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... }
      output = generate_fields(count['all'], @key_prefix_string)
    end
  else
    output = {}
    if @output_per_tag
      # tag_prefix_tag1: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... }
      # tag_prefix_tag2: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... }
      count.keys.each do |tag|
        output[stripped_tag(tag)] = generate_fields(count[tag], @key_prefix_string)
      end
    else
      # tag: { 'key_prefix_tag1_min' => -10, 'key_prefix_tag1_max' => 10, ..., 'key_prefix_tag2_min' => -10, 'key_prefix_tag2_max' => 10, ... }
      count.keys.each do |tag|
        key_prefix = @key_prefix_string + stripped_tag(tag) + '_'
        generate_fields(count[tag], key_prefix, output)
      end
    end
  end
  output
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 78
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 216
def process(tag, es)
  min = nil
  max = nil
  sum = 0
  num = 0
  sample = if @percentiles then [] else nil end

  es.each do |time,record|
    value = record[@monitor_key]
    next if value.nil?

    value = value.to_f
    if min.nil? or min > value
      min = value
    end
    if max.nil? or max < value
      max = value
    end
    sum += value
    num += 1

    if @percentiles
      sample.push(value)
    end
  end
  if @percentiles && sample.size > @samples_limit
    (sample.size - @samples_limit / 2).to_i.times do
      sample.delete_at(rand(sample.size))
    end
  end
  countups(tag, min, max, sum, num, sample)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_numeric_monitor.rb, line 82
def start
  super

  @last_checked = Fluent::Engine.now
  timer_execute(:out_numeric_counter_watcher, @count_interval) do
    now = Fluent::Engine.now
    flush_emit
    @last_checked = now
  end
end
stripped_tag(tag) click to toggle source
# File lib/fluent/plugin/out_numeric_monitor.rb, line 113
def stripped_tag(tag)
  return tag unless @input_tag_remove_prefix
  return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length
  return tag[@removed_length..-1] if tag == @input_tag_remove_prefix
  tag
end