class Kafka::Statsd::ProducerSubscriber

Public Instance Methods

ack_message(event) click to toggle source
# File lib/kafka/statsd.rb, line 231
# Emits per-topic acknowledgement metrics for a producer event.
#
# Reads :client_id, :topic and :delay from event.payload via `fetch`, so a
# missing key is an error rather than a silent nil (KeyError, assuming the
# payload is a Hash).
def ack_message(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)
  prefix = "producer.#{client_id}.#{topic_name}.ack"

  # One tick per acknowledged message on this topic.
  increment("#{prefix}.messages")

  # Produce-to-ACK delay, reported through `timing`. The delay is looked up
  # only after the counter above has been emitted.
  ack_delay = payload.fetch(:delay)
  timing("#{prefix}.delay", ack_delay)
end
buffer_overflow(event) click to toggle source
# File lib/kafka/statsd.rb, line 206
# Handles a buffer_overflow event by bumping the per-topic produce error
# counter. Requires :client_id and :topic in event.payload (read via `fetch`).
def buffer_overflow(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)
  metric = "producer.#{client_id}.#{topic_name}.produce.errors"

  increment(metric)
end
deliver_messages(event) click to toggle source
# File lib/kafka/statsd.rb, line 213
# Emits delivery metrics for a producer deliver_messages event.
#
# Reads :client_id, :delivered_message_count and :attempts from
# event.payload (all via `fetch`) plus event.duration. All three payload
# values are fetched before any metric is emitted.
def deliver_messages(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  delivered = payload.fetch(:delivered_message_count)
  attempt_count = payload.fetch(:attempts)
  prefix = "producer.#{client_id}.deliver"

  # An :exception key in the payload marks the delivery as failed.
  increment("#{prefix}.errors") if payload.key?(:exception)

  # Duration of the delivery event.
  timing("#{prefix}.latency", event.duration)

  # How many messages were delivered to Kafka.
  count("#{prefix}.messages", delivered)

  # How many attempts the delivery took, reported through `timing`.
  timing("#{prefix}.attempts", attempt_count)
end
produce_message(event) click to toggle source
# File lib/kafka/statsd.rb, line 184
# Emits metrics for a producer produce_message event.
#
# Reads :client_id, :topic, :message_size, :buffer_size and
# :max_buffer_size from event.payload (all via `fetch`). The fill ratio is
# buffer_size / max_buffer_size as floats; the percentage is that ratio
# times 100.
def produce_message(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)
  size_of_message = payload.fetch(:message_size)
  buffered = payload.fetch(:buffer_size)
  capacity = payload.fetch(:max_buffer_size)

  fill_ratio = buffered.to_f / capacity.to_f
  fill_percentage = fill_ratio * 100.0

  topic_prefix = "producer.#{client_id}.#{topic_name}.produce"
  buffer_prefix = "producer.#{client_id}.buffer"

  # Per-topic counter; its rate is the write rate.
  increment("#{topic_prefix}.messages")

  # Size of the produced message.
  timing("#{topic_prefix}.message_size", size_of_message)

  # Buffer size per producer (avg/max come from the timing aggregation).
  timing("#{buffer_prefix}.size", buffered)

  # Buffer fill level per producer, both as a ratio and as a percentage.
  timing("#{buffer_prefix}.fill_ratio", fill_ratio)
  timing("#{buffer_prefix}.fill_percentage", fill_percentage)
end
topic_error(event) click to toggle source
# File lib/kafka/statsd.rb, line 242
# Handles a topic_error event by bumping the per-topic ACK error counter.
# Requires :client_id and :topic in event.payload (read via `fetch`).
def topic_error(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)
  metric = "producer.#{client_id}.#{topic_name}.ack.errors"

  increment(metric)
end