class Datadog::Statsd::MessageBuffer

Constants

PAYLOAD_SIZE_TOLERANCE

Attributes

buffer[R]
connection[R]
max_payload_size[R]
max_pool_size[R]
overflowing_stategy[R]

Public Class Methods

new(connection, max_payload_size: nil, max_pool_size: DEFAULT_BUFFER_POOL_SIZE, overflowing_stategy: :drop, serializer: ) click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 8
def initialize(connection,
  max_payload_size: nil,
  max_pool_size: DEFAULT_BUFFER_POOL_SIZE,
  overflowing_stategy: :drop,
  serializer:
)
  raise ArgumentError, 'max_payload_size keyword argument must be provided' unless max_payload_size
  raise ArgumentError, 'max_pool_size keyword argument must be provided' unless max_pool_size

  @connection = connection
  @max_payload_size = max_payload_size
  @max_pool_size = max_pool_size
  @overflowing_stategy = overflowing_stategy
  @serializer = serializer

  @buffer = String.new
  clear_buffer
end

Public Instance Methods

add(message) click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 27
def add(message)
  # Serializes the message if it hasn't been already. Part of the
  # delay_serialization feature.
  if message.is_a?(Array)
    stat, delta, type, tags, sample_rate = message
    message = @serializer.to_stat(stat, delta, type, tags: tags, sample_rate: sample_rate)
  end

  message_size = message.bytesize

  return nil unless message_size > 0 # to avoid adding empty messages to the buffer
  return nil unless ensure_sendable!(message_size)

  flush if should_flush?(message_size)

  buffer << "\n" unless buffer.empty?
  buffer << message

  @message_count += 1

  # flush when we're pretty sure that we won't be able
  # to add another message to the buffer
  flush if preemptive_flush?

  true
end
flush() click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 59
def flush
  return if buffer.empty?

  connection.write(buffer)
  clear_buffer
end
reset() click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 54
def reset
  clear_buffer
  connection.reset_telemetry
end

Private Instance Methods

bytesize_threshold() click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 101
def bytesize_threshold
  @bytesize_threshold ||= (max_payload_size - PAYLOAD_SIZE_TOLERANCE * max_payload_size).to_i
end
clear_buffer() click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 82
def clear_buffer
  buffer.clear
  @message_count = 0
end
ensure_sendable!(message_size) click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 91
def ensure_sendable!(message_size)
  return true if message_size <= max_payload_size

  if overflowing_stategy == :raise
    raise Error, 'Message too big for payload limit'
  end

  false
end
preemptive_flush?() click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 87
def preemptive_flush?
  @message_count == max_pool_size || buffer.bytesize > bytesize_threshold
end
should_flush?(message_size) click to toggle source
# File lib/datadog/statsd/message_buffer.rb, line 76
def should_flush?(message_size)
  return true if buffer.bytesize + 1 + message_size >= max_payload_size

  false
end