class Fluent::MetricSenseOutput

Constants

AggregationKey
BACKENDS

Public Class Methods

register_backend(name, klass) click to toggle source
# File lib/fluent/plugin/out_metricsense.rb, line 26
def self.register_backend(name, klass)
  BACKENDS[name] = klass
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_metricsense.rb, line 76
def configure(conf)
  super

  if @remove_tag_prefix
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(@remove_tag_prefix) + "\\.?")
  end

  @no_segment_keys = (conf.has_key?('no_segment_keys') && (conf['no_segment_keys'].empty? || conf['no_segment_keys'] == 'true'))

  if @only_segment_keys
    @only_segment_keys = @only_segment_keys.strip.split(/\s*,\s*/)
  end

  if @exclude_segment_keys
    @exclude_segment_keys = @exclude_segment_keys.strip.split(/\s*,\s*/)
  end

  be = BACKENDS[@backend]
  unless be
    raise ConfigError, "unknown backend: #{@backend.inspect}"
  end

  # aggregate_interval must be a multiple of 60 to normalize values
  # into X per minute
  @aggregate_interval = @aggregate_interval.to_i / 60 * 60
  @normalize_factor = @aggregate_interval / 60

  @backend = be.new
  @backend.log = log
  @backend.configure(conf)
end
format_stream(tag, es) click to toggle source
# File lib/fluent/plugin/out_metricsense.rb, line 118
def format_stream(tag, es)
  # modify tag
  tag = tag.sub(@remove_tag_prefix, '') if @remove_tag_prefix
  tag = "#{add_tag_prefix}.#{tag}" if @add_tag_prefix

  out = ''
  es.each do |time,record|
    # dup record to modify
    record = record.dup

    # get value
    value = record.delete(@value_key)

    # ignore record if value is missing
    next if value.nil?

    # ignore record if value is invalid
    begin
      fv = value.to_f
    rescue
      next
    end
    next if fv.nan? || fv.infinite?

    # use integer if value.to_f == value.to_f.to_i
    iv = fv.to_i
    if iv.to_f == fv
      value = iv
    else
      value = fv
    end

    # get update_mode key
    update_mode = record.delete(@update_mode_key)
    case update_mode
    when "max"
      update_mode = UpdateMode::MAX
    when "average"
      update_mode = UpdateMode::AVERAGE
    when "count"
      update_mode = UpdateMode::COUNT
    else
      # default is add
      update_mode = UpdateMode::ADD
    end

    # get segments
    if @no_segment_keys
      segments = {}
    else
      if @only_segment_keys
        segments = {}
        @only_segment_keys.each {|key|
          if v = record[key]
            segments[key] = v
          end
        }
      else
        segments = record
      end
      if @exclude_segment_keys
        @exclude_segment_keys.each {|key|
          segments.delete(key)
        }
      end
    end

    [tag, time, value, segments, update_mode].to_msgpack(out)
  end

  out
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_metricsense.rb, line 113
def shutdown
  super
  @backend.shutdown
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_metricsense.rb, line 108
def start
  @backend.start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_metricsense.rb, line 259
def write(chunk)
  simple_counters = {}
  segmented_counters = {}

  # select sum(value) from chunk group by tag, time/60, seg_val, seg_key
  chunk.msgpack_each {|tag,time,value,segments,update_mode|
    begin
      time = time / @aggregate_interval * @aggregate_interval

      case update_mode
      when UpdateMode::ADD
        updater = AddUpdater
      when UpdateMode::MAX
        updater = MaxUpdater
      when UpdateMode::AVERAGE # AVERAGE uses MaxUpdater and calculate average on server-side aggregation
        updater = AverageUpdater
      when UpdateMode::COUNT
        updater = CountUpdater
      else  # default is AddUpdater
        updater = AddUpdater
      end

      # simple values
      ak = AggregationKey.new(tag, time, nil, nil)
      (simple_counters[ak] ||= updater.new).add(value)

      # segmented values
      segments.each_pair {|seg_key,seg_val|
        ak = AggregationKey.new(tag, time, seg_val, seg_key)
        (segmented_counters[ak] ||= updater.new).add(value)
      }
    rescue StandardError => e
      log.warn("ignoring broken chunk: #{e.inspect} - " + [tag, time, value, segments, update_mode].inspect)
    end
  }

  counters = segmented_counters
  counters.merge!(simple_counters)

  data = []
  counters.each_pair {|ak,up|
    data << [ak.tag, ak.time, up.normalized_value(@normalize_factor), ak.seg_key, ak.seg_val, up.mode]
  }

  @backend.write(data)
end