class WaterDrop::Instrumentation::Vendors::Datadog::MetricsListener

Listener that can be subscribed to a WaterDrop producer to publish its stats via StatsD and/or Datadog

@note You need to set up the `dogstatsd-ruby` client and assign it

Constants

RdKafkaMetric

Value object that stores the publishing details of a single rdkafka metric
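
As a rough illustration of what such a value object could look like, here is a minimal sketch. The name `RdKafkaMetricSketch` is hypothetical, and building it with `Struct` is an assumption; the four fields are taken from the readers that `report_metric` calls (`type`, `scope`, `name`, `key_location`). The metric names and statistics keys in the sample entries are illustrative only.

```ruby
# Hypothetical stand-in for the listener's RdKafkaMetric value object.
# The field names mirror the readers used by report_metric; using a
# Struct here is an assumption made for this sketch.
RdKafkaMetricSketch = Struct.new(:type, :scope, :name, :key_location)

# Illustrative entries only: the metric names and statistics keys are
# made up for this sketch and are not the listener's defaults.
SAMPLE_METRICS = [
  # Publish the root-level 'tx' value as a count named 'calls'
  RdKafkaMetricSketch.new(:count, :root, 'calls', ['tx']),
  # Publish each broker's nested 'rtt' -> 'avg' value as a gauge
  RdKafkaMetricSketch.new(:gauge, :brokers, 'network.latency.avg', %w[rtt avg])
].freeze

SAMPLE_METRICS.each do |metric|
  location = metric.key_location.join('.')
  puts "#{metric.scope}: #{metric.type} #{metric.name} <- #{location}"
end
```

Each entry answers four questions: which reporting method to call (`type`), which part of the statistics hash to walk (`scope`), what name to publish under (`name`), and where the value lives inside that part (`key_location`).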

Public Class Methods

new(&block)

@param block [Proc] configuration block

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 56
def initialize(&block)
  configure
  setup(&block) if block
end

Public Instance Methods

on_error_occurred(_event)

Increases the errors count by 1

@param _event [Karafka::Core::Monitoring::Event]

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 81
def on_error_occurred(_event)
  count('error_occurred', 1, tags: default_tags)
end

on_message_acknowledged(_event)

Increases the acknowledged messages counter

@param _event [Karafka::Core::Monitoring::Event]

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 87
def on_message_acknowledged(_event)
  increment('acknowledged', tags: default_tags)
end

on_statistics_emitted(event)

Hooks up to WaterDrop instrumentation for emitted statistics

@param event [Karafka::Core::Monitoring::Event]

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 70
def on_statistics_emitted(event)
  statistics = event[:statistics]

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics)
  end
end

setup(&block)

@param block [Proc] configuration block

@note We define this alias to be consistent with `WaterDrop#setup`

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 63
def setup(&block)
  configure(&block)
end

Private Instance Methods

namespaced_metric(metric_name)

Wraps the metric name in the listener’s namespace

@param metric_name [String] RdKafkaMetric name
@return [String]

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 172
def namespaced_metric(metric_name)
  "#{namespace}.#{metric_name}"
end

report_message(topic, method_name)

Reports that a message has been produced to a topic

@param topic [String] Kafka topic
@param method_name [Symbol] method from which this message operation originates

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 165
def report_message(topic, method_name)
  increment(method_name, tags: default_tags + ["topic:#{topic}"])
end
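
The naming and tagging done by `namespaced_metric` and `report_message` can be sketched in isolation. The helpers `namespaced` and `topic_tags` below are hypothetical and take their inputs as arguments instead of reading listener configuration; the namespace, metric name, tag, and topic values are illustrative.

```ruby
# Hypothetical helper mirroring namespaced_metric: prefixes the metric
# name with the namespace, separated by a dot.
def namespaced(namespace, metric_name)
  "#{namespace}.#{metric_name}"
end

# Hypothetical helper mirroring the tag building in report_message.
# Array#+ returns a new array, so the shared default tags are never
# mutated by a per-message topic tag.
def topic_tags(default_tags, topic)
  default_tags + ["topic:#{topic}"]
end

# Illustrative values only
defaults = ['env:demo'].freeze

puts namespaced('waterdrop', 'produced_sync')
puts topic_tags(defaults, 'events').inspect
puts defaults.inspect
```

Because the default tags are combined with `+` rather than appended in place, the same defaults can safely be reused across every reported message.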

report_metric(metric, statistics)

Reports the statistics of a given metric to Datadog

@param metric [RdKafkaMetric] metric value object
@param statistics [Hash] hash with all the statistics emitted

# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 179
def report_metric(metric, statistics)
  case metric.scope
  when :root
    public_send(
      metric.type,
      metric.name,
      statistics.fetch(*metric.key_location),
      tags: default_tags
    )
  when :brokers
    statistics.fetch('brokers').each_value do |broker_statistics|
      # Skip bootstrap nodes
      # Bootstrap nodes have nodeid -1, other nodes have positive
      # node ids
      next if broker_statistics['nodeid'] == -1

      public_send(
        metric.type,
        metric.name,
        broker_statistics.dig(*metric.key_location),
        tags: default_tags + ["broker:#{broker_statistics['nodename']}"]
      )
    end
  when :topics
    statistics.fetch('topics').each_value do |topic_statistics|
      public_send(
        metric.type,
        metric.name,
        topic_statistics.dig(*metric.key_location),
        tags: default_tags + ["topic:#{topic_statistics['topic']}"]
      )
    end
  else
    raise ArgumentError, metric.scope
  end
end
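
To make the three scopes easier to follow, the dispatch above can be exercised against a small hand-written statistics hash. This is a sketch under assumptions, not the listener itself: `ScopedMetric`, `RecordingReporter`, `report_sketch`, and all sample values are hypothetical, the reporter only records calls instead of talking to StatsD, and the sketch uses `dig` for every scope for simplicity.

```ruby
# Hypothetical metric description with the readers report_metric uses.
ScopedMetric = Struct.new(:type, :scope, :name, :key_location)

# Hypothetical reporter that records calls instead of sending them.
class RecordingReporter
  attr_reader :calls

  def initialize
    @calls = []
  end

  def count(name, value, tags:)
    @calls << [:count, name, value, tags]
  end

  def gauge(name, value, tags:)
    @calls << [:gauge, name, value, tags]
  end
end

# Simplified re-statement of the scope dispatch shown above.
def report_sketch(reporter, metric, statistics, default_tags)
  case metric.scope
  when :root
    value = statistics.dig(*metric.key_location)
    reporter.public_send(metric.type, metric.name, value, tags: default_tags)
  when :brokers
    statistics.fetch('brokers').each_value do |broker|
      # Bootstrap nodes carry nodeid -1 and are skipped
      next if broker['nodeid'] == -1

      tags = default_tags + ["broker:#{broker['nodename']}"]
      value = broker.dig(*metric.key_location)
      reporter.public_send(metric.type, metric.name, value, tags: tags)
    end
  when :topics
    statistics.fetch('topics').each_value do |topic|
      tags = default_tags + ["topic:#{topic['topic']}"]
      value = topic.dig(*metric.key_location)
      reporter.public_send(metric.type, metric.name, value, tags: tags)
    end
  else
    raise ArgumentError, "unknown scope: #{metric.scope.inspect}"
  end
end

# Illustrative statistics payload; the keys and values are made up.
DEMO_STATISTICS = {
  'tx' => 12,
  'brokers' => {
    'bootstrap' => { 'nodeid' => -1, 'nodename' => 'localhost:9092', 'rtt' => { 'avg' => 0 } },
    'broker1' => { 'nodeid' => 1, 'nodename' => 'kafka1:9092', 'rtt' => { 'avg' => 1500 } }
  },
  'topics' => {
    'events' => { 'topic' => 'events', 'batchsize' => { 'avg' => 10 } }
  }
}.freeze

# Illustrative metric definitions, one per scope.
DEMO_METRICS = [
  ScopedMetric.new(:count, :root, 'calls', ['tx']),
  ScopedMetric.new(:gauge, :brokers, 'network.latency.avg', %w[rtt avg]),
  ScopedMetric.new(:gauge, :topics, 'batch.size.avg', %w[batchsize avg])
].freeze

demo_reporter = RecordingReporter.new
DEMO_METRICS.each { |m| report_sketch(demo_reporter, m, DEMO_STATISTICS, ['env:demo']) }
demo_reporter.calls.each { |call| puts call.inspect }
```

A `:root` metric produces one call, while `:brokers` and `:topics` metrics produce one call per broker or topic entry, each carrying an extra tag that identifies the entry. The bootstrap broker is filtered out, so it never reaches the reporter.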