class Fluent::MetricSenseOutput
Constants
- AggregationKey
- BACKENDS
Public Class Methods
register_backend(name, klass)
click to toggle source
# File lib/fluent/plugin/out_metricsense.rb, line 26 def self.register_backend(name, klass) BACKENDS[name] = klass end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_metricsense.rb, line 76 def configure(conf) super if @remove_tag_prefix @remove_tag_prefix = Regexp.new('^' + Regexp.escape(@remove_tag_prefix) + "\\.?") end @no_segment_keys = (conf.has_key?('no_segment_keys') && (conf['no_segment_keys'].empty? || conf['no_segment_keys'] == 'true')) if @only_segment_keys @only_segment_keys = @only_segment_keys.strip.split(/\s*,\s*/) end if @exclude_segment_keys @exclude_segment_keys = @exclude_segment_keys.strip.split(/\s*,\s*/) end be = BACKENDS[@backend] unless be raise ConfigError, "unknown backend: #{@backend.inspect}" end # aggregate_interval must be a multiple of 60 to normalize values # into X per minute @aggregate_interval = @aggregate_interval.to_i / 60 * 60 @normalize_factor = @aggregate_interval / 60 @backend = be.new @backend.log = log @backend.configure(conf) end
format_stream(tag, es)
click to toggle source
# File lib/fluent/plugin/out_metricsense.rb, line 118 def format_stream(tag, es) # modify tag tag = tag.sub(@remove_tag_prefix, '') if @remove_tag_prefix tag = "#{add_tag_prefix}.#{tag}" if @add_tag_prefix out = '' es.each do |time,record| # dup record to modify record = record.dup # get value value = record.delete(@value_key) # ignore record if value is missing next if value.nil? # ignore record if value is invalid begin fv = value.to_f rescue next end next if fv.nan? || fv.infinite? # use integer if value.to_f == value.to_f.to_i iv = fv.to_i if iv.to_f == fv value = iv else value = fv end # get update_mode key update_mode = record.delete(@update_mode_key) case update_mode when "max" update_mode = UpdateMode::MAX when "average" update_mode = UpdateMode::AVERAGE when "count" update_mode = UpdateMode::COUNT else # default is add update_mode = UpdateMode::ADD end # get segments if @no_segment_keys segments = {} else if @only_segment_keys segments = {} @only_segment_keys.each {|key| if v = record[key] segments[key] = v end } else segments = record end if @exclude_segment_keys @exclude_segment_keys.each {|key| segments.delete(key) } end end [tag, time, value, segments, update_mode].to_msgpack(out) end out end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_metricsense.rb, line 113 def shutdown super @backend.shutdown end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_metricsense.rb, line 108 def start @backend.start super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_metricsense.rb, line 259 def write(chunk) simple_counters = {} segmented_counters = {} # select sum(value) from chunk group by tag, time/60, seg_val, seg_key chunk.msgpack_each {|tag,time,value,segments,update_mode| begin time = time / @aggregate_interval * @aggregate_interval case update_mode when UpdateMode::ADD updater = AddUpdater when UpdateMode::MAX updater = MaxUpdater when UpdateMode::AVERAGE # AVERAGE uses MaxUpdater and calculate average on server-side aggregation updater = AverageUpdater when UpdateMode::COUNT updater = CountUpdater else # default is AddUpdater updater = AddUpdater end # simple values ak = AggregationKey.new(tag, time, nil, nil) (simple_counters[ak] ||= updater.new).add(value) # segmented values segments.each_pair {|seg_key,seg_val| ak = AggregationKey.new(tag, time, seg_val, seg_key) (segmented_counters[ak] ||= updater.new).add(value) } rescue StandardError => e log.warn("ignoring broken chunk: #{e.inspect} - " + [tag, time, value, segments, update_mode].inspect) end } counters = segmented_counters counters.merge!(simple_counters) data = [] counters.each_pair {|ak,up| data << [ak.tag, ak.time, up.normalized_value(@normalize_factor), ak.seg_key, ak.seg_val, up.mode] } @backend.write(data) end