class SemanticLogger::Appender::AsyncBatch
Logs asynchronously in batches using a separate thread.
Log messages are grouped up and only logged when:
- The number of queued messages reaches `batch_size`.
- Or, `batch_seconds` have passed since the last batch was sent.
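The two conditions above can be read as a single predicate. The following is a minimal sketch, assuming the size trigger fires as soon as the queue reaches `batch_size` (the `>=` comparison in `log`). `flush_due?` and its keyword arguments are hypothetical names for illustration and are not part of Semantic Logger.

```ruby
# Hypothetical predicate for illustration; not part of Semantic Logger.
# Returns true when the processing thread would wake up to send a batch.
def flush_due?(queued:, seconds_since_last_batch:, batch_size: 300, batch_seconds: 5)
  queued >= batch_size || seconds_since_last_batch >= batch_seconds
end

puts flush_due?(queued: 300, seconds_since_last_batch: 1) # size trigger
puts flush_due?(queued: 10, seconds_since_last_batch: 5)  # time trigger
puts flush_due?(queued: 10, seconds_since_last_batch: 1)  # keep waiting
```

Note that a timed wake-up with an empty queue sends nothing, since `process_messages` only calls `appender.batch` when at least one log event was collected.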
Attributes
batch_seconds[RW]
batch_size[RW]
signal[R]
Public Class Methods
new(appender:, max_queue_size: 10_000, lag_threshold_s: 30, batch_size: 300, batch_seconds: 5)
Batching Appender proxy for appenders that support batches.
Parameters:
batch_size: [Integer]
  Maximum number of messages to batch up before sending.
  Default: 300
batch_seconds: [Integer]
  Maximum number of seconds between sending batches.
  Default: 5
See SemanticLogger::Appender::Async for other parameters.
Note:
- `lag_check_interval` is not applicable to batches, since the first message of every batch is the oldest and is always checked to see if the lag interval has been exceeded.
Calls superclass method SemanticLogger::Appender::Async::new
# File lib/semantic_logger/appender/async_batch.rb, line 30
def initialize(appender:, max_queue_size: 10_000, lag_threshold_s: 30, batch_size: 300, batch_seconds: 5)
  @batch_size    = batch_size
  @batch_seconds = batch_seconds
  @signal        = Concurrent::Event.new
  super(
    appender:        appender,
    max_queue_size:  max_queue_size,
    lag_threshold_s: lag_threshold_s
  )

  return if appender.respond_to?(:batch)

  raise(ArgumentError, "#{appender.class.name} does not support batching. It must implement #batch")
end
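The final guard in the constructor relies on duck typing: any appender that responds to `batch` is accepted. The following is a minimal sketch of an appender that would pass that check, assuming `batch` receives an array of log events in a single call (as the `appender.batch(logs)` call in `process_messages` suggests). `ArrayBatchAppender` and `ensure_batchable!` are hypothetical names for illustration and are not part of Semantic Logger.

```ruby
# Hypothetical appender for illustration; not part of Semantic Logger.
class ArrayBatchAppender
  attr_reader :batches

  def initialize
    @batches = []
  end

  # Receives a whole batch of log events in one call.
  def batch(logs)
    @batches << logs.dup
  end
end

# Hypothetical helper that mirrors the guard at the end of the constructor.
def ensure_batchable!(appender)
  return appender if appender.respond_to?(:batch)

  raise(ArgumentError, "#{appender.class.name} does not support batching. It must implement #batch")
end

ensure_batchable!(ArrayBatchAppender.new) # accepted

begin
  ensure_batchable!(Object.new)           # rejected: no #batch method
rescue ArgumentError => e
  puts e.message
end
```

In the real class the check runs after `super`, so an appender that lacks `batch` is rejected at construction time rather than when the first batch is sent.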
Public Instance Methods
log(log)
Add log message for processing.
Calls superclass method SemanticLogger::Appender::Async#log
# File lib/semantic_logger/appender/async_batch.rb, line 51
def log(log)
  result = super(log)
  # Wake up the processing thread since the number of queued messages has been exceeded.
  signal.set if queue.size >= batch_size
  result
end
Private Instance Methods
process_messages()
Separate thread for batching up log messages before writing.
# File lib/semantic_logger/appender/async_batch.rb, line 61
def process_messages
  loop do
    # Wait for batch interval or number of messages to be exceeded.
    signal.wait(batch_seconds)

    logs          = []
    first         = true
    message_count = queue.length
    message_count.times do
      # Queue#pop(true) raises an exception when there are no more messages, which is considered expensive.
      message = queue.pop
      if message.is_a?(Log)
        logs << message
        if first
          check_lag(message)
          first = false
        end
      else
        process_message(message)
      end
    end
    appender.batch(logs) if logs.size.positive?
    signal.reset unless queue.size >= batch_size
  end
end
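The drain step in the loop above snapshots the queue length and then pops exactly that many items, so a blocking `pop` never waits on an empty queue and the exception raised by `pop(true)` is avoided. The following is a minimal sketch of that pattern using only Ruby's built-in `Queue`, assuming a single consumer thread. `LogEvent` and `drain` are hypothetical stand-ins, and unlike the real method this sketch collects non-log items instead of processing them immediately.

```ruby
# Stand-in for SemanticLogger::Log, used only in this sketch.
LogEvent = Struct.new(:message)

# Drain whatever is queued right now. Log events go into one batch;
# anything else (for example a :flush command) is collected separately.
def drain(queue)
  logs     = []
  commands = []
  # Snapshot the length first: with a single consumer the queue can only
  # grow after this point, so each pop below returns without blocking.
  queue.length.times do
    item = queue.pop
    if item.is_a?(LogEvent)
      logs << item
    else
      commands << item
    end
  end
  [logs, commands]
end

queue = Queue.new
queue << LogEvent.new("first")
queue << :flush
queue << LogEvent.new("second")

logs, commands = drain(queue)
puts logs.map(&:message).inspect # ["first", "second"]
puts commands.inspect            # [:flush]
```

Items pushed while the drain is running stay in the queue for the next pass, which is why the real loop only resets the signal when fewer than `batch_size` messages remain.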
submit_request(command)
Calls superclass method SemanticLogger::Appender::Async#submit_request
# File lib/semantic_logger/appender/async_batch.rb, line 87
def submit_request(command)
  # Wake up the processing thread to process this command immediately.
  signal.set
  super
end