class WaterDrop::Instrumentation::LoggerListener

Default listener that hooks up to our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the WaterDrop flow.

@note The original note describes it as a module (it is declared as a class on this page); the intent is that it can be used as a part of the Karafka framework listener as well as standalone.

Public Class Methods

new(logger, log_messages: true) click to toggle source

@param logger [Object] logger we want to use.

@param log_messages [Boolean] Should we report the messages content (payload and metadata) with each message operation.

This can be extensive, especially when producing a lot of messages. We provide this
option despite the fact that we only report payloads at the debug level, because Rails by
default operates with the debug level. This means that when working with Rails in
development, every single payload dispatched will go to the logs. In the majority of cases
this is excessive and simply floods the end user.
# File lib/waterdrop/instrumentation/logger_listener.rb, line 20
# Stores the collaborators used by the event handlers.
#
# @param logger [Object] object that receives the `debug`, `info` and `error` calls made by
#   the private logging helpers
# @param log_messages [Boolean] flag returned later by `log_messages?`; handlers check it
#   before sending message details to `debug`
def initialize(logger, log_messages: true)
  @logger = logger
  @log_messages = log_messages
end

Public Instance Methods

on_buffer_flushed_async(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 94
# Reports an async buffer flush: an info entry with the number of flushed messages and,
# when `log_messages?` is truthy, a debug entry carrying the messages themselves.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_flushed_async(event)
  flushed = event[:messages]
  summary = "Async flushing of #{flushed.size} messages from the buffer"

  info(event, summary)
  debug(event, flushed) if log_messages?
end
on_buffer_flushed_sync(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 105
# Reports a sync buffer flush: an info entry with the number of flushed messages and,
# when `log_messages?` is truthy, a debug entry carrying the messages themselves.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_flushed_sync(event)
  flushed = event[:messages]
  summary = "Sync flushing of #{flushed.size} messages from the buffer"

  info(event, summary)
  debug(event, flushed) if log_messages?
end
on_buffer_purged(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 116
# Reports a buffer purge with a single info entry.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_purged(event)
  summary = 'Successfully purging buffer'

  info(event, summary)
end
on_error_occurred(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the error details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 138
# Reports an error event with a single error-level entry built from the event's `:error`
# and `:type` values.
#
# @param event [Dry::Events::Event] event that happened with the error details
def on_error_occurred(event)
  # Values are interpolated directly so no local shares its name with the `error` helper.
  details = "Error occurred: #{event[:error]} - #{event[:type]}"

  error(event, details)
end
on_message_buffered(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 72
# Reports buffering of a single message: an info entry naming the target topic and, when
# `log_messages?` is truthy, a debug entry with the message wrapped in an array.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_message_buffered(event)
  buffered = event[:message]
  summary = "Buffering of a message to '#{buffered[:topic]}' topic"

  info(event, summary)
  debug(event, [buffered]) if log_messages?
end
on_message_produced_async(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 26
# Reports an async dispatch of a single message: an info entry naming the target topic and,
# when `log_messages?` is truthy, a debug entry with the message itself.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_message_produced_async(event)
  produced = event[:message]
  summary = "Async producing of a message to '#{produced[:topic]}' topic"

  info(event, summary)
  debug(event, produced) if log_messages?
end
on_message_produced_sync(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 37
# Reports a sync dispatch of a single message: an info entry naming the target topic and,
# when `log_messages?` is truthy, a debug entry with the message itself.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_message_produced_sync(event)
  produced = event[:message]
  summary = "Sync producing of a message to '#{produced[:topic]}' topic"

  info(event, summary)
  debug(event, produced) if log_messages?
end
on_messages_buffered(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 83
# Reports buffering of many messages: an info entry with their count and, when
# `log_messages?` is truthy, a debug entry holding the messages followed by their count.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_messages_buffered(event)
  batch = event[:messages]
  summary = "Buffering of #{batch.size} messages"

  info(event, summary)
  debug(event, [batch, batch.size]) if log_messages?
end
on_messages_produced_async(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 48
# Reports an async dispatch of many messages: an info entry with the message count and the
# number of distinct target topics and, when `log_messages?` is truthy, a debug entry with
# the messages themselves.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_messages_produced_async(event)
  batch = event[:messages]
  # Topics are compared by their quoted string form, exactly as they would be printed.
  quoted_topics = batch.map { |entry| "'#{entry[:topic]}'" }
  distinct_topics = quoted_topics.uniq.count
  summary = "Async producing of #{batch.size} messages to #{distinct_topics} topics"

  info(event, summary)
  debug(event, batch) if log_messages?
end
on_messages_produced_sync(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 60
# Reports a sync dispatch of many messages: an info entry with the message count and the
# number of distinct target topics and, when `log_messages?` is truthy, a debug entry with
# the messages themselves.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_messages_produced_sync(event)
  batch = event[:messages]
  # Topics are compared by their quoted string form, exactly as they would be printed.
  quoted_topics = batch.map { |entry| "'#{entry[:topic]}'" }
  distinct_topics = quoted_topics.uniq.count
  summary = "Sync producing of #{batch.size} messages to #{distinct_topics} topics"

  info(event, summary)
  debug(event, batch) if log_messages?
end
on_producer_closed(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details.

@note While this says “Closing producer”, it produces a nice message with the time taken: "Closing producer took 12 ms", indicating that it happened in the past.
# File lib/waterdrop/instrumentation/logger_listener.rb, line 128
# Reports that the producer has been closed with a single info entry. The text is the same
# as the one used for the closing event; `info` appends the " took ... ms" suffix when the
# event payload carries a `:time` key.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_producer_closed(event)
  summary = 'Closing producer'

  info(event, summary)
end
on_producer_closing(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 121
# Reports that the producer is being closed with a single info entry.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_producer_closing(event)
  summary = 'Closing producer'

  info(event, summary)
end
on_producer_reloaded(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 133
# Reports a producer reload with a single info entry.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_producer_reloaded(event)
  summary = 'Producer successfully reloaded'

  info(event, summary)
end
on_transaction_aborted(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 151
# Reports a transaction abort with a single info entry.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_aborted(event)
  summary = 'Aborting transaction'

  info(event, summary)
end
on_transaction_committed(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 156
# Reports a transaction commit with a single info entry.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_committed(event)
  summary = 'Committing transaction'

  info(event, summary)
end
on_transaction_finished(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 175
# Reports a finished transaction with a single info entry. `info` appends the
# " took ... ms" suffix when the event payload carries a `:time` key.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_finished(event)
  summary = 'Processing transaction'

  info(event, summary)
end
on_transaction_marked_as_consumed(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 161
# Reports that a message was marked as consumed inside a transaction. The info entry names
# the message offset and its location in the "topic/partition" form.
#
# @param event [Dry::Events::Event] event that happened with the details; its `:message`
#   must respond to `topic`, `partition` and `offset`
def on_transaction_marked_as_consumed(event)
  marked = event[:message]
  location = "#{marked.topic}/#{marked.partition}"
  summary = "Marking message with offset #{marked.offset} " \
            "for topic #{location} as consumed in a transaction"

  info(event, summary)
end
on_transaction_started(event) click to toggle source

@param event [Dry::Events::Event] event that happened with the details

# File lib/waterdrop/instrumentation/logger_listener.rb, line 146
# Reports a transaction start with a single info entry.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_started(event)
  summary = 'Starting transaction'

  info(event, summary)
end

Private Instance Methods

debug(event, log_message) click to toggle source

@param event [Dry::Events::Event] event that happened with the details. @param log_message [Object] message we want to publish; the handlers pass message hashes and arrays here, which are interpolated into the log line.

# File lib/waterdrop/instrumentation/logger_listener.rb, line 188
# Sends a debug-level entry to the logger, prefixed with the producer id taken from the
# event.
#
# @param event [Dry::Events::Event] event that happened with the details
# @param log_message [Object] content to publish; it is interpolated into the entry, so
#   non-string values (the handlers pass messages and arrays) end up as their `to_s` form
def debug(event, log_message)
  prefix = "[#{event[:producer_id]}]"

  @logger.debug("#{prefix} #{log_message}")
end
error(event, log_message) click to toggle source

@param event [Dry::Events::Event] event that happened with the details. @param log_message [String] message we want to publish.

# File lib/waterdrop/instrumentation/logger_listener.rb, line 204
# Sends an error-level entry to the logger, prefixed with the producer id taken from the
# event.
#
# @param event [Dry::Events::Event] event that happened with the details
# @param log_message [String] message we want to publish
def error(event, log_message)
  prefix = "[#{event[:producer_id]}]"

  @logger.error("#{prefix} #{log_message}")
end
info(event, log_message) click to toggle source

@param event [Dry::Events::Event] event that happened with the details. @param log_message [String] message we want to publish.

# File lib/waterdrop/instrumentation/logger_listener.rb, line 194
# Sends an info-level entry to the logger, prefixed with the producer id taken from the
# event. When the event payload has a `:time` key, the entry is suffixed with
# " took <time rounded to 2 decimal places> ms".
#
# @param event [Dry::Events::Event] event that happened with the details
# @param log_message [String] message we want to publish
def info(event, log_message)
  timed = event.payload.key?(:time)
  entry = "[#{event[:producer_id]}] #{log_message}"
  entry += " took #{event[:time].round(2)} ms" if timed

  @logger.info(entry)
end
log_messages?() click to toggle source

@return [Boolean] should we report the messages details in the debug mode.

# File lib/waterdrop/instrumentation/logger_listener.rb, line 182
# Tells the handlers whether message details should also be sent to `debug`.
#
# @return [Boolean] the `log_messages` value stored by `initialize`
def log_messages?
  @log_messages
end