class Semlogr::Sinks::Batching
Constants
- MAX_FLUSH_ATTEMPTS
Public Class Methods
new(opts = {})
click to toggle source
# File lib/semlogr/sinks/batching.rb, line 12
# Builds the sink, starts the background flush thread and registers an
# at_exit hook that stops the thread and performs a final flush.
#
# Every option falls back to its default when the key is missing or its
# value is nil/false.
#
# @param opts [Hash] optional settings:
#   :flush_interval      - value handed to +sleep+ between background flushes (default 3)
#   :final_flush_timeout - value handed to +Timeout.timeout+ for the exit flush (default 60)
#   :batch_size          - maximum number of events popped per batch (default 1_000)
#   :queue_max_size      - capacity passed to Utils::BoundedQueue (default 100_000)
def initialize(opts = {})
  # Synchronisation state first; it does not depend on any option.
  @flush_mutex = Mutex.new
  @running = false

  # Timing options.
  @flush_interval = opts[:flush_interval] || 3
  @final_flush_timeout = opts[:final_flush_timeout] || 60

  # Sizing options and the queue built from them.
  @batch_size = opts[:batch_size] || 1_000
  @queue_max_size = opts[:queue_max_size] || 100_000
  @queue = Utils::BoundedQueue.new(@queue_max_size)

  # The queue and mutex must exist before the flush thread starts using them.
  start_flush_thread

  at_exit do
    stop_flush_thread
  end
end
Public Instance Methods
emit(log_event)
click to toggle source
# File lib/semlogr/sinks/batching.rb, line 26
# Queues a log event for a later batched flush.
#
# Events are ignored (and nil is returned) while the sink is not running;
# otherwise the result of the queue's +push+ is returned.
#
# @param log_event [Object] the event to enqueue
def emit(log_event)
  @queue.push(log_event) if @running
end
Private Instance Methods
emit_batch_with_retries(log_events)
click to toggle source
# File lib/semlogr/sinks/batching.rb, line 46
# Emits one batch through +emit_batch+, retrying with exponential backoff
# (2**attempt seconds) whenever it raises a StandardError.
#
# +emit_batch+ is called at most MAX_FLUSH_ATTEMPTS times in total. Every
# failed attempt is reported through SelfLogger together with the attempt
# number.
#
# @param log_events [#empty?] the batch to emit; an empty batch is a no-op
# @return [Boolean] true when the batch was emitted (or was empty), false
#   once every attempt has failed
def emit_batch_with_retries(log_events)
  return true if log_events.empty?

  flush_attempts = 0

  begin
    emit_batch(log_events)
  rescue StandardError => e
    flush_attempts += 1

    SelfLogger.error("Failed to emit event batch to #{self.class}, attempts: #{flush_attempts}", e)

    # The counter already includes the attempt that just failed, so stop as
    # soon as it reaches the limit; comparing with <= would allow one attempt
    # more than MAX_FLUSH_ATTEMPTS.
    return false if flush_attempts >= MAX_FLUSH_ATTEMPTS

    sleep 2**flush_attempts
    retry
  end

  true
end
flush()
click to toggle source
# File lib/semlogr/sinks/batching.rb, line 34
# Drains the queue batch by batch while holding @flush_mutex, so only one
# flush runs at a time.
#
# Each round pops up to @batch_size events and hands them to
# +emit_batch_with_retries+. Draining stops when a batch fails to emit or
# when the popped batch is empty or smaller than @batch_size (the queue had
# nothing more to give). This method does not push a failed batch back onto
# the queue.
def flush
  @flush_mutex.synchronize do
    loop do
      batch = @queue.pop_count(@batch_size)

      # Short-circuits in the same order as separate checks would: the
      # emission result first, then whether a full batch was obtained.
      more_pending = emit_batch_with_retries(batch) && !batch.empty? && batch.size >= @batch_size
      break unless more_pending
    end
  end
end
start_flush_thread()
click to toggle source
# File lib/semlogr/sinks/batching.rb, line 69
# Marks the sink as running and spawns the background thread that flushes
# the queue every @flush_interval.
#
# The running flag is checked before each sleep, so a thread that is already
# sleeping when the flag is cleared still performs one more flush after it
# wakes up.
#
# @return [Thread] the spawned flush thread
def start_flush_thread
  @running = true

  Thread.new do
    loop do
      if @running
        sleep @flush_interval
        flush
      else
        break
      end
    end
  end
end
stop_flush_thread()
click to toggle source
# File lib/semlogr/sinks/batching.rb, line 82
# Clears the running flag and performs a final flush bounded by
# @final_flush_timeout.
#
# Timeout.timeout raises (Timeout::Error by default) if the flush does not
# finish in time; that exception is not rescued here.
def stop_flush_thread
  @running = false

  Timeout.timeout(@final_flush_timeout) do
    flush
  end
end