class SemanticLogger::Appender::AsyncBatch

Log asynchronously in batches using a separate thread.

Log messages are grouped up and only logged when:

Attributes

batch_seconds[RW]
batch_size[RW]
signal[R]

Public Class Methods

new(appender:, max_queue_size: 10_000, lag_threshold_s: 30, batch_size: 300, batch_seconds: 5) click to toggle source

Batching Appender proxy for appenders that support batches.

Parameters:

batch_size: [Integer]
  Maximum number of messages to batch up before sending.
  Default: 300

batch_seconds: [Integer]
  Maximum number of seconds between sending batches.
  Default: 5

See SemanticLogger::Appender::Async for other paramaters

Note:

  • `lag_check_interval` is not applicable to batches, since the first message of every batch is the oldest and is always checked to see if the lag interval has been exceeded.

Calls superclass method SemanticLogger::Appender::Async::new
# File lib/semantic_logger/appender/async_batch.rb, line 30
def initialize(appender:,
               max_queue_size: 10_000,
               lag_threshold_s: 30,
               batch_size: 300,
               batch_seconds: 5)

  @batch_size    = batch_size
  @batch_seconds = batch_seconds
  @signal        = Concurrent::Event.new
  super(
    appender:        appender,
    max_queue_size:  max_queue_size,
    lag_threshold_s: lag_threshold_s
  )

  return if appender.respond_to?(:batch)

  raise(ArgumentError, "#{appender.class.name} does not support batching. It must implement #batch")
end

Public Instance Methods

log(log) click to toggle source

Add log message for processing.

Calls superclass method SemanticLogger::Appender::Async#log
# File lib/semantic_logger/appender/async_batch.rb, line 51
def log(log)
  result = super(log)
  # Wake up the processing thread since the number of queued messages has been exceeded.
  signal.set if queue.size >= batch_size
  result
end

Private Instance Methods

process_messages() click to toggle source

Separate thread for batching up log messages before writing.

# File lib/semantic_logger/appender/async_batch.rb, line 61
def process_messages
  loop do
    # Wait for batch interval or number of messages to be exceeded.
    signal.wait(batch_seconds)

    logs          = []
    first         = true
    message_count = queue.length
    message_count.times do
      # Queue#pop(true) raises an exception when there are no more messages, which is considered expensive.
      message = queue.pop
      if message.is_a?(Log)
        logs << message
        if first
          check_lag(message)
          first = false
        end
      else
        process_message(message)
      end
    end
    appender.batch(logs) if logs.size.positive?
    signal.reset unless queue.size >= batch_size
  end
end
submit_request(command) click to toggle source
# File lib/semantic_logger/appender/async_batch.rb, line 87
def submit_request(command)
  # Wake up the processing thread to process this command immediately.
  signal.set
  super
end