class WaterDrop::Instrumentation::Vendors::Datadog::MetricsListener
Listener that can be subscribed to a WaterDrop producer to receive stats via StatsD and/or Datadog
@note You need to set up the `dogstatsd-ruby` client and assign it
Constants
- RdKafkaMetric
Value object for storing a single rdkafka metric publishing details
Public Class Methods
@param block [Proc] configuration block
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 56
def initialize(&block)
  configure
  setup(&block) if block
end
Public Instance Methods
Increases the errors count by 1
@param _event [Karafka::Core::Monitoring::Event]
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 81
def on_error_occurred(_event)
  count('error_occurred', 1, tags: default_tags)
end
Increases acknowledged messages counter
@param _event [Karafka::Core::Monitoring::Event]
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 87
def on_message_acknowledged(_event)
  increment('acknowledged', tags: default_tags)
end
Hooks up to WaterDrop instrumentation for emitted statistics
@param event [Karafka::Core::Monitoring::Event]
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 70
def on_statistics_emitted(event)
  statistics = event[:statistics]

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics)
  end
end
@param block [Proc] configuration block
@note We define this alias to be consistent with `WaterDrop#setup`
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 63
def setup(&block)
  configure(&block)
end
Private Instance Methods
Wraps metric name in listener's namespace
@param metric_name [String] RdKafkaMetric name
@return [String]
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 172
def namespaced_metric(metric_name)
  "#{namespace}.#{metric_name}"
end
Report that a message has been produced to a topic.
@param topic [String] Kafka topic
@param method_name [Symbol] method from which this message operation comes
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 165
def report_message(topic, method_name)
  increment(method_name, tags: default_tags + ["topic:#{topic}"])
end
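The tag expression in report_message can be illustrated in isolation. The following is a minimal sketch, not part of the listener: the helper name `tags_for_topic` and the sample tag values are hypothetical. It only shows that `Array#+` returns a new array, so the configured default tags are left untouched between calls.

```ruby
# Hypothetical sample value; a real listener reads default_tags from its
# configuration.
default_tags = ['env:test'].freeze

# Hypothetical helper mirroring the tag expression used in report_message.
# Array#+ builds a new array, so the (frozen) default tags are never mutated.
def tags_for_topic(default_tags, topic)
  default_tags + ["topic:#{topic}"]
end

tags = tags_for_topic(default_tags, 'events')
puts tags.inspect
puts default_tags.inspect
```

Because a fresh array is built on every call, tags for one topic cannot leak into metrics reported for another.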
Reports a given metric statistics to Datadog
@param metric [RdKafkaMetric] metric value object
@param statistics [Hash] hash with all the statistics emitted
# File lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb, line 179
def report_metric(metric, statistics)
  case metric.scope
  when :root
    public_send(
      metric.type,
      metric.name,
      statistics.fetch(*metric.key_location),
      tags: default_tags
    )
  when :brokers
    statistics.fetch('brokers').each_value do |broker_statistics|
      # Skip bootstrap nodes
      # Bootstrap nodes have nodeid -1, other nodes have positive
      # node ids
      next if broker_statistics['nodeid'] == -1

      public_send(
        metric.type,
        metric.name,
        broker_statistics.dig(*metric.key_location),
        tags: default_tags + ["broker:#{broker_statistics['nodename']}"]
      )
    end
  when :topics
    statistics.fetch('topics').each_value do |topic_statistics|
      public_send(
        metric.type,
        metric.name,
        topic_statistics.dig(*metric.key_location),
        tags: default_tags + ["topic:#{topic_statistics['topic']}"]
      )
    end
  else
    raise ArgumentError, metric.scope
  end
end
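To make the scope dispatch in report_metric easier to follow, here is a self-contained sketch that can be run without WaterDrop or a StatsD client. Everything in it is an assumption made for illustration: `MetricSketch` stands in for RdKafkaMetric (only the four fields that report_metric reads), `RecordingReporter` records calls instead of sending them, and the metric names and statistics hash are made-up sample data rather than the listener's real defaults.

```ruby
# Hypothetical stand-in for RdKafkaMetric with the fields report_metric reads.
MetricSketch = Struct.new(:type, :scope, :name, :key_location)

# Hypothetical reporter that records calls instead of talking to StatsD.
class RecordingReporter
  attr_reader :calls

  def initialize(default_tags)
    @default_tags = default_tags
    @calls = []
  end

  # Records what would be published as a gauge.
  def gauge(name, value, tags:)
    @calls << [:gauge, name, value, tags]
  end

  # Records what would be published as a count.
  def count(name, value, tags:)
    @calls << [:count, name, value, tags]
  end

  # Same branching as report_metric: one call for :root, one call per
  # non-bootstrap broker for :brokers, one call per topic for :topics.
  def report(metric, statistics)
    case metric.scope
    when :root
      value = statistics.fetch(*metric.key_location)
      public_send(metric.type, metric.name, value, tags: @default_tags)
    when :brokers
      statistics.fetch('brokers').each_value do |broker|
        next if broker['nodeid'] == -1 # bootstrap node, skipped

        tags = @default_tags + ["broker:#{broker['nodename']}"]
        public_send(metric.type, metric.name, broker.dig(*metric.key_location), tags: tags)
      end
    when :topics
      statistics.fetch('topics').each_value do |topic|
        tags = @default_tags + ["topic:#{topic['topic']}"]
        public_send(metric.type, metric.name, topic.dig(*metric.key_location), tags: tags)
      end
    else
      raise ArgumentError, metric.scope.to_s
    end
  end
end

# Made-up sample shaped like the parts of the statistics hash used above.
statistics = {
  'txmsgs' => 42,
  'brokers' => {
    'bootstrap' => { 'nodeid' => -1, 'nodename' => 'localhost:9092', 'rtt' => { 'avg' => 0 } },
    'broker1' => { 'nodeid' => 1, 'nodename' => 'kafka1:9092', 'rtt' => { 'avg' => 1500 } }
  },
  'topics' => {
    'events' => { 'topic' => 'events', 'batchsize' => { 'avg' => 10 } }
  }
}

reporter = RecordingReporter.new(['env:test'])
reporter.report(MetricSketch.new(:count, :root, 'messages', 'txmsgs'), statistics)
reporter.report(MetricSketch.new(:gauge, :brokers, 'latency.avg', %w[rtt avg]), statistics)
reporter.report(MetricSketch.new(:gauge, :topics, 'batch.avg', %w[batchsize avg]), statistics)
reporter.calls.each { |call| p call }
```

In this sketch the root key location is a single string, so the splat passes exactly one key to `fetch`. The nested broker and topic values use an array of keys with `dig`, which returns nil rather than raising when a key is missing.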