class Datadog::Statsd::Forwarder

Attributes

connection[R]
sender[R]
telemetry[R]
transport_type[R]

Public Class Methods

new( connection_cfg: nil, buffer_max_payload_size: nil, buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, buffer_flush_interval: nil, sender_queue_size: nil, telemetry_flush_interval: nil, global_tags: [], single_thread: false, logger: nil, serializer: ) click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 9
def initialize(
  connection_cfg: nil,

  buffer_max_payload_size: nil,
  buffer_max_pool_size: nil,
  buffer_overflowing_stategy: :drop,
  buffer_flush_interval: nil,

  sender_queue_size: nil,

  telemetry_flush_interval: nil,
  global_tags: [],

  single_thread: false,

  logger: nil,

  serializer:
)
  @transport_type = connection_cfg.transport_type

  @telemetry = if telemetry_flush_interval
    Telemetry.new(telemetry_flush_interval,
      global_tags: global_tags,
      transport_type: @transport_type
    )
  else
    nil
  end

  @connection = connection_cfg.make_connection(logger: logger, telemetry: telemetry)

  # Initialize buffer
  buffer_max_payload_size ||= (@transport_type == :udp ?
                               UDP_DEFAULT_BUFFER_SIZE : UDS_DEFAULT_BUFFER_SIZE)

  if buffer_max_payload_size <= 0
    raise ArgumentError, 'buffer_max_payload_size cannot be <= 0'
  end

  unless telemetry.nil? || telemetry.would_fit_in?(buffer_max_payload_size)
    raise ArgumentError, "buffer_max_payload_size is not high enough to use telemetry (tags=(#{global_tags.inspect}))"
  end

  buffer = MessageBuffer.new(@connection,
    max_payload_size: buffer_max_payload_size,
    max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
    overflowing_stategy: buffer_overflowing_stategy,
    serializer: serializer
  )

  sender_queue_size ||= 1 if single_thread
  sender_queue_size ||= (@transport_type == :udp ?
                         UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE)

  @sender = single_thread ?
    SingleThreadSender.new(
      buffer,
      logger: logger,
      flush_interval: buffer_flush_interval,
      queue_size: sender_queue_size) :
    Sender.new(
      buffer,
      logger: logger,
      flush_interval: buffer_flush_interval,
      telemetry: @telemetry,
      queue_size: sender_queue_size)
  @sender.start
end

Public Instance Methods

close() click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 113
def close
  sender.stop
  connection.close
end
flush(flush_telemetry: false, sync: false) click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 89
def flush(flush_telemetry: false, sync: false)
  do_flush_telemetry if telemetry && flush_telemetry

  sender.flush(sync: sync)
end
host() click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 95
def host
  return nil unless transport_type == :udp

  connection.host
end
port() click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 101
def port
  return nil unless transport_type == :udp

  connection.port
end
send_message(message) click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 79
def send_message(message)
  sender.add(message)

  tick_telemetry
end
socket_path() click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 107
def socket_path
  return nil unless transport_type == :uds

  connection.socket_path
end
sync_with_outbound_io() click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 85
def sync_with_outbound_io
  sender.rendez_vous
end

Private Instance Methods

do_flush_telemetry() click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 122
def do_flush_telemetry
  telemetry_snapshot = telemetry.flush
  telemetry.reset

  telemetry_snapshot.each do |message|
    sender.add(message)
  end
end
tick_telemetry() click to toggle source
# File lib/datadog/statsd/forwarder.rb, line 131
def tick_telemetry
  return nil unless telemetry

  do_flush_telemetry if telemetry.should_flush?
end