class Fluent::Plugin::CMetricsParserFilter

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/filter_cmetrics_parser.rb, line 42
# Sets up the state used by the filter after delegating to the superclass.
#
# Creates a ::CMetrics::Serde instance, one record accessor for
# @cmetrics_metric_key, one for @cmetrics_labels_key, and one accessor per
# key found in every "fields" element of the configuration.
#
# @param conf configuration object; must respond to +elements(name:)+
def configure(conf)
  super

  @serde = ::CMetrics::Serde.new
  @record_accessor = record_accessor_create(@cmetrics_metric_key)
  @labels_accessor = record_accessor_create(@cmetrics_labels_key)
  @fields_accessors = {}

  conf.elements(name: "fields").each do |section|
    section.each_pair do |field_key, _value|
      # The result is intentionally discarded: touching the key through
      # has_key? is what suppresses the unused-parameter warning.
      section.has_key?(field_key)
      @fields_accessors[field_key] = record_accessor_create(field_key)
    end
  end
end
filter_stream(tag, es)
# File lib/fluent/plugin/filter_cmetrics_parser.rb, line 69
# Expands every incoming event into one output event per decoded metric entry.
#
# For each record, the payload read through @record_accessor is fed to
# @serde.feed_each; every entry of every non-empty metric it yields becomes
# its own event. Each entry is optionally rewritten to the Splunk naming
# style, merged with the extra fields read from the source record, and
# stamped with the time taken from its own "timestamp" key (which is removed
# from the entry and passed to Time.at).
#
# @param tag the event tag (not used by this method)
# @param es the incoming event stream; iterated as (time, record) pairs
# @return [Fluent::MultiEventStream] the newly built stream
def filter_stream(tag, es)
  output = Fluent::MultiEventStream.new

  # The time of the incoming event is not used; each emitted event takes its
  # time from the metric entry itself.
  es.each do |_event_time, record|
    payload = @record_accessor.call(record)
    extra_fields = @fields_accessors.each_with_object({}) do |(key, accessor), fields|
      fields[key] = accessor.call(record)
    end

    @serde.feed_each(payload) do |cmetrics|
      cmetrics.metrics.each do |metric|
        next if metric.empty?

        metric.each do |entry|
          if @format_to_splunk_metric
            splunk_name, dims = format_to_splunk_style_with_dims(entry)
            entry["name"] = splunk_name
            # Dimensions are either nested under the configured key or
            # flattened into the entry itself.
            if @dimensions_key
              entry[@dimensions_key] = dims
            else
              entry.merge!(dims)
            end
          end

          entry.merge!(extra_fields) if @fields_accessors

          timestamp = Time.at(entry.delete("timestamp"))
          output.add(Fluent::EventTime.new(timestamp.to_i, timestamp.nsec), entry)
        end
      end
    end
  end

  output
end
format_to_splunk_style_with_dims(inner)
# File lib/fluent/plugin/filter_cmetrics_parser.rb, line 56
# Builds the Splunk-style metric name and the dimensions for one metric entry.
#
# Removes the "subsystem" and "name" keys from +inner+ (the hash is mutated)
# and joins their non-nil, non-empty values with "." to form the metric name.
# Whatever @labels_accessor returns for +inner+ is copied pair by pair into a
# new dimensions hash; the labels themselves are left in place in +inner+.
#
# @param inner [Hash] a single metric entry; mutated in place
# @return [Array] two elements: the joined name (String) and the dimensions (Hash)
def format_to_splunk_style_with_dims(inner)
  subsystem = inner.delete("subsystem")

  # Labels are treated as dimensions. A plain Hash is used on purpose so that
  # looking up an absent dimension yields nil instead of a fabricated 0.
  dimensions = {}
  labels = @labels_accessor.call(inner)
  labels.each { |key, value| dimensions[key] = value } if labels

  name = inner.delete("name")
  [[subsystem, name].compact.reject(&:empty?).join("."), dimensions]
end