class Datadog::Statsd::SingleThreadSender

The SingleThreadSender is a sender that synchronously buffers messages in a `MessageBuffer`. It compares the current process ID (Process.pid) with the one recorded when it was created to detect that it is running in a freshly forked process, and it resets the MessageBuffer when that is the case.

Public Class Methods

new(message_buffer, logger: nil, flush_interval: nil, queue_size: 1)
# File lib/datadog/statsd/single_thread_sender.rb, line 10
def initialize(message_buffer, logger: nil, flush_interval: nil, queue_size: 1)
  @message_buffer = message_buffer
  @logger = logger
  @mx = Mutex.new
  @message_queue_size = queue_size
  @message_queue = []
  @flush_timer = if flush_interval
    Datadog::Statsd::Timer.new(flush_interval) { flush }
  else
    nil
  end
  # store the pid for which this sender has been created
  update_fork_pid
end

Public Instance Methods

add(message)
# File lib/datadog/statsd/single_thread_sender.rb, line 25
def add(message)
  @mx.synchronize {
    # we have just forked, meaning we have messages in the buffer that we should
    # not send, they belong to the parent process, let's clear the buffer.
    if forked?
      @message_buffer.reset
      @message_queue.clear
      @flush_timer.start if @flush_timer && @flush_timer.stop?
      update_fork_pid
    end

    @message_queue << message
    if @message_queue.size >= @message_queue_size
      drain_message_queue
    end
  }
end
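
The queueing behaviour of add and flush can be illustrated with a small, self-contained sketch. It is not the library's code: `RecordingBuffer` and `QueueingSenderSketch` are hypothetical stand-ins that reproduce only the queue threshold and drain logic shown in the source listings (no fork check, no flush timer), and the message strings are arbitrary examples.

```ruby
# Hypothetical stand-in for a MessageBuffer: it only records what it receives.
class RecordingBuffer
  attr_reader :messages, :flush_count

  def initialize
    @messages = []
    @flush_count = 0
  end

  def add(message)
    @messages << message
  end

  def flush
    @flush_count += 1
  end
end

# Hypothetical, simplified sender: same queue/drain logic as the listing,
# without fork detection or a flush timer.
class QueueingSenderSketch
  def initialize(message_buffer, queue_size: 1)
    @message_buffer = message_buffer
    @mx = Mutex.new
    @message_queue_size = queue_size
    @message_queue = []
  end

  # Queue the message; hand the whole queue to the buffer once the
  # threshold is reached.
  def add(message)
    @mx.synchronize do
      @message_queue << message
      drain_message_queue if @message_queue.size >= @message_queue_size
    end
  end

  # Hand over whatever is still queued, then flush the buffer.
  def flush
    @mx.synchronize do
      drain_message_queue
      @message_buffer.flush
    end
  end

  private

  def drain_message_queue
    while (msg = @message_queue.shift)
      @message_buffer.add(msg)
    end
  end
end

buffer = RecordingBuffer.new
sender = QueueingSenderSketch.new(buffer, queue_size: 3)

sender.add('a:1|c')
sender.add('b:1|c')   # below the threshold: nothing reaches the buffer yet
sender.add('c:1|c')   # threshold reached: all three messages are handed over
sender.add('d:1|c')   # queued again
sender.flush          # drains 'd:1|c' and flushes the buffer once
```

With the default queue_size of 1, every call to add reaches the threshold immediately, so each message is handed to the buffer as soon as it is added.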

flush(*)
# File lib/datadog/statsd/single_thread_sender.rb, line 43
def flush(*)
  @mx.synchronize {
    drain_message_queue
    @message_buffer.flush()
  }
end

rendez_vous()

No-op, provided for compatibility with `Sender`.

# File lib/datadog/statsd/single_thread_sender.rb, line 59
def rendez_vous()
end

start()
# File lib/datadog/statsd/single_thread_sender.rb, line 50
def start()
  @flush_timer.start if @flush_timer
end

stop()
# File lib/datadog/statsd/single_thread_sender.rb, line 54
def stop()
  @flush_timer.stop if @flush_timer
end

Private Instance Methods

drain_message_queue()
# File lib/datadog/statsd/single_thread_sender.rb, line 64
def drain_message_queue
  while msg = @message_queue.shift
    @message_buffer.add(msg)
  end
end

forked?()

The methods below are “fork management” helpers: they let the sender clean the MessageBuffer when it detects that it is running under an unknown PID, that is, a PID other than the one recorded by update_fork_pid.

# File lib/datadog/statsd/single_thread_sender.rb, line 73
def forked?
  Process.pid != @fork_pid
end

update_fork_pid()
# File lib/datadog/statsd/single_thread_sender.rb, line 77
def update_fork_pid
  @fork_pid = Process.pid
end
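
The PID comparison can be observed in isolation with the following sketch. It assumes a platform where `fork` is available (for example Linux or macOS with MRI). `ForkAwareSketch` and `fork_report` are hypothetical names introduced for illustration; only the bodies of forked? and update_fork_pid mirror the listings above.

```ruby
# Hypothetical minimal class reproducing only the PID bookkeeping.
class ForkAwareSketch
  def initialize
    # remember the PID of the process that created this object
    update_fork_pid
  end

  def forked?
    Process.pid != @fork_pid
  end

  def update_fork_pid
    @fork_pid = Process.pid
  end
end

# Hypothetical helper: forks a child and returns a string describing what
# the child observed before and after calling update_fork_pid.
def fork_report(tracker)
  reader, writer = IO.pipe
  child = fork do
    reader.close
    before = tracker.forked?   # the child has a new PID
    tracker.update_fork_pid    # record the child's PID
    after = tracker.forked?    # the stored PID now matches again
    writer.write("#{before},#{after}")
    writer.close
    exit!(0)                   # skip at_exit handlers inherited from the parent
  end
  writer.close
  report = reader.read
  reader.close
  Process.wait(child)
  report
end

tracker = ForkAwareSketch.new
puts tracker.forked?        # in the creating process the PIDs match
puts fork_report(tracker)   # what the child saw before and after the update
```

In the sender itself this check runs inside add, so the first message added in a child process is what triggers the reset of the buffer and queue inherited from the parent.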