class Fluent::Plugin::CMetricsParserFilter
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_cmetrics_parser.rb, line 42 def configure(conf) super @serde = ::CMetrics::Serde.new @record_accessor = record_accessor_create(@cmetrics_metric_key) @labels_accessor = record_accessor_create(@cmetrics_labels_key) @fields_accessors = {} conf.elements(name: "fields").each do |e| e.each_pair{|k, _v| e.has_key?(k) # Suppress unused warnings. @fields_accessors[k] = record_accessor_create(k) } end end
filter_stream(tag, es)
click to toggle source
# File lib/fluent/plugin/filter_cmetrics_parser.rb, line 69 def filter_stream(tag, es) new_es = Fluent::MultiEventStream.new es.each do |time, record| data = @record_accessor.call(record) extra_fields = {} @fields_accessors.each do |key, accessor| extra_fields[key] = accessor.call(record) end @serde.feed_each(data) do |cmetrics| metrics = cmetrics.metrics metrics.each do |metric| next if metric.empty? metric.each do |inner| if @format_to_splunk_metric inner["name"], dims = format_to_splunk_style_with_dims(inner) if @dimensions_key inner[@dimensions_key] = dims else inner.merge!(dims) end end if @fields_accessors inner.merge!(extra_fields) end time = Time.at(inner.delete("timestamp")) new_es.add(Fluent::EventTime.new(time.to_i, time.nsec), inner) end end end end new_es end
format_to_splunk_style_with_dims(inner)
click to toggle source
# File lib/fluent/plugin/filter_cmetrics_parser.rb, line 56 def format_to_splunk_style_with_dims(inner) subsystem = inner.delete("subsystem") # labels will be treated as dimensions. dimensions = Hash.new(0) if labels = @labels_accessor.call(inner) labels.map {|k,v| dimensions[k] = v } end name = inner.delete("name") return [subsystem, name].compact.reject{|e| e.empty?}.join("."), dimensions end