class Semlogr::Sinks::Batching

Constants

MAX_FLUSH_ATTEMPTS

Public Class Methods

new(opts = {})
# File lib/semlogr/sinks/batching.rb, line 12
# Builds the sink, starts the background flush thread and registers an
# at_exit hook that stops it again.
#
# Recognised keys in +opts+ (each falls back to its default when the key is
# missing or its value is nil/false):
#   :flush_interval      - value handed to sleep between flushes (default 3)
#   :final_flush_timeout - timeout applied to the final flush (default 60)
#   :batch_size          - events popped from the queue per batch (default 1_000)
#   :queue_max_size      - size passed to Utils::BoundedQueue.new (default 100_000)
def initialize(opts = {})
  @running = false
  @flush_mutex = Mutex.new

  @queue_max_size = opts[:queue_max_size] || 100_000
  @batch_size = opts[:batch_size] || 1_000
  @final_flush_timeout = opts[:final_flush_timeout] || 60
  @flush_interval = opts[:flush_interval] || 3

  @queue = Utils::BoundedQueue.new(@queue_max_size)

  start_flush_thread

  # Give queued events a last chance to be emitted when the process exits.
  at_exit do
    stop_flush_thread
  end
end

Public Instance Methods

emit(log_event)
# File lib/semlogr/sinks/batching.rb, line 26
# Queues a log event for a later batched flush.
#
# The event is pushed onto the internal queue only while the sink is running;
# otherwise it is ignored and nil is returned.
#
# @param log_event the event to enqueue
# @return the result of the queue's push, or nil when the sink is not running
def emit(log_event)
  @queue.push(log_event) if @running
end

Private Instance Methods

emit_batch_with_retries(log_events)
# File lib/semlogr/sinks/batching.rb, line 46
# Emits a batch of log events through emit_batch, retrying with exponential
# backoff whenever emit_batch raises a StandardError.
#
# At most MAX_FLUSH_ATTEMPTS calls to emit_batch are made in total. After the
# k-th failed attempt the method sleeps 2**k seconds before trying again.
# Every failure is reported through SelfLogger.error together with the number
# of attempts made so far.
#
# @param log_events a collection of events (must respond to #empty?); an
#   empty batch counts as success and emit_batch is not called
# @return [Boolean] true when the batch was emitted (or was empty), false
#   once every allowed attempt has failed
def emit_batch_with_retries(log_events)
  return true if log_events.empty?

  flush_attempts = 0

  begin
    emit_batch(log_events)
  rescue StandardError => e
    flush_attempts += 1

    SelfLogger.error("Failed to emit event batch to #{self.class}, attempts: #{flush_attempts}", e)

    # flush_attempts counts attempts already made, so give up as soon as the
    # limit is reached; retrying at == MAX would perform MAX + 1 attempts.
    return false if flush_attempts >= MAX_FLUSH_ATTEMPTS

    sleep 2**flush_attempts
    retry
  end

  true
end
flush()
# File lib/semlogr/sinks/batching.rb, line 34
# Drains the queue in batches of @batch_size while holding @flush_mutex.
#
# Each batch is passed to emit_batch_with_retries. Draining stops as soon as
# a batch fails to emit, or when a popped batch is empty or smaller than
# @batch_size.
def flush
  @flush_mutex.synchronize do
    finished = false

    until finished
      batch = @queue.pop_count(@batch_size)
      delivered = emit_batch_with_retries(batch)

      finished = !delivered || batch.empty? || batch.size < @batch_size
    end
  end
end
start_flush_thread()
# File lib/semlogr/sinks/batching.rb, line 69
# Marks the sink as running and spawns the background thread that flushes
# periodically.
#
# While @running stays truthy the thread sleeps for @flush_interval and then
# calls flush; the flag is checked again before every sleep.
#
# @return [Thread] the newly started flush thread
def start_flush_thread
  @running = true

  Thread.new do
    while @running
      sleep @flush_interval
      flush
    end
  end
end
stop_flush_thread()
# File lib/semlogr/sinks/batching.rb, line 82
# Clears the running flag and performs one last flush, bounded by
# @final_flush_timeout via Timeout.timeout.
#
# @return whatever the final flush returns
def stop_flush_thread
  @running = false

  Timeout.timeout(@final_flush_timeout) do
    flush
  end
end