class StatsD::Instrument::BatchedUDPSink::Dispatcher
Constants
- BUFFER_CLASS
- NEWLINE
Public Class Methods
new(host, port, flush_interval)
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 53
# Remembers the target address and flush cadence, creates an empty buffer and
# starts the background thread that runs #dispatch.
#
# @param host target host, later handed to UDPSocket#connect by #socket
# @param port target port, later handed to UDPSocket#connect by #socket
# @param flush_interval time budget (seconds) for one #dispatch iteration
def initialize(host, port, flush_interval)
  @interrupted = false
  @flush_interval = flush_interval

  @host = host
  @port = port

  @buffer = BUFFER_CLASS.new
  # Started last so every instance variable #dispatch reads is already set.
  @dispatcher_thread = Thread.new { dispatch }
end
Public Instance Methods
<<(datagram)
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 62
# Appends a datagram to the buffer, first restarting the dispatcher thread
# when it is missing or no longer alive.
#
# @param datagram the datagram to enqueue
# @return [self] so that calls can be chained
def <<(datagram)
  dispatcher_gone = !@dispatcher_thread&.alive?
  if dispatcher_gone
    # A dead dispatcher thread is assumed to mean the process was forked.
    # The buffered datagrams are dropped so they are not sent twice.
    @buffer.clear
    @dispatcher_thread = Thread.new { dispatch }
  end

  @buffer << datagram
  self
end
shutdown(wait = @flush_interval * 2)
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 74
# Signals the dispatcher loop to stop and gives it a chance to finish.
#
# When the dispatcher thread is alive it is joined for at most +wait+
# seconds; otherwise the buffer is flushed from the calling thread.
#
# @param wait maximum time to wait for the dispatcher thread, defaults to
#   twice the flush interval
def shutdown(wait = @flush_interval * 2)
  @interrupted = true
  return @dispatcher_thread.join(wait) if @dispatcher_thread&.alive?

  flush
end
Private Instance Methods
dispatch()
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 102
# Body of the dispatcher thread: flushes the buffer roughly once per flush
# interval until interrupted, then flushes one last time and drops the socket.
def dispatch
  until @interrupted
    begin
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      flush

      # Sleep only for whatever is left of the interval after flushing.
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      remaining = @flush_interval - elapsed
      sleep(remaining) if remaining > 0
    rescue => error
      # Errors are reported and the loop carries on with the next iteration.
      report_error(error)
    end
  end

  flush
  invalidate_socket
end
flush()
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 86
# Drains the buffer and sends its datagrams as newline-joined packets, each
# kept within MAX_PACKET_SIZE bytes whenever more than one datagram is packed.
#
# @return [nil]
def flush
  return if @buffer.empty?

  pending = @buffer.shift(@buffer.size)
  until pending.empty?
    # Each packet is seeded with the last pending datagram ...
    packet = String.new(pending.pop, encoding: Encoding::BINARY, capacity: MAX_PACKET_SIZE)

    # ... and topped up from the front while the next one (plus its newline
    # separator) still fits.
    while !pending.empty? && packet.bytesize + pending.first.bytesize + 1 <= MAX_PACKET_SIZE
      packet << NEWLINE << pending.shift
    end

    send_packet(packet)
  end
end
invalidate_socket()
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 151
# Closes the cached socket, if any, and forgets it so that #socket builds a
# new one on its next call. The reference is cleared even if closing raises.
def invalidate_socket
  current = @socket
  current.close unless current.nil?
ensure
  @socket = nil
end
report_error(error)
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 119
# Logs an error raised inside the dispatcher thread at error level.
#
# @param error the rescued exception
def report_error(error)
  # Passed as a block so the message is only built if the logger asks for it.
  message = lambda do
    "[#{self.class.name}] The dispatcher thread encountered an error #{error.class}: #{error.message}"
  end

  StatsD.logger.error(&message)
end
send_packet(packet)
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 125
# Sends one packet over the UDP socket, retrying exactly once on a socket
# level failure.
#
# On SocketError, IOError or SystemCallError the cached socket is invalidated
# (so the next #socket call reconnects) and the send is attempted a second
# time. If the second attempt fails too, the packet is dropped and a warning
# is logged; the error is not re-raised.
#
# @param packet [String] the payload to send
# @return the result of the socket's +send+ on success
def send_packet(packet)
  # The flag lives outside the begin block on purpose: `retry` re-runs the
  # whole begin block, so initialising it inside would reset it on every
  # attempt and turn a persistent failure into an endless retry loop.
  retried = false

  begin
    socket.send(packet, 0)
  rescue SocketError, IOError, SystemCallError => error
    StatsD.logger.debug do
      "[#{self.class.name}] Resetting connection because of #{error.class}: #{error.message}"
    end
    invalidate_socket

    if retried
      # The standard Logger API names this level #warn (there is no #warning).
      StatsD.logger.warn do
        "[#{self.class.name}] Events were dropped because of #{error.class}: #{error.message}"
      end
    else
      retried = true
      retry
    end
  end
end
socket()
click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 143
# Returns the cached UDP socket, creating and connecting one to the
# configured host and port on first use (or after #invalidate_socket).
#
# @return [UDPSocket]
def socket
  return @socket if @socket

  # Only cached once connect has succeeded; a failed connect leaves @socket unset.
  @socket = UDPSocket.new.tap { |udp| udp.connect(@host, @port) }
end