class LogCourier::ConnectionTcp
Representation of a single connected client
Attributes
peer[RW]
Public Class Methods
new(logger, sfd, peer, options)
click to toggle source
# File lib/log-courier/server_tcp.rb, line 228 def initialize(logger, sfd, peer, options) @logger = logger @fd = sfd @peer = peer @peer_fields = {} @in_progress = false @options = options @client = 'Unknown' @major_version = 0 @minor_version = 0 @patch_version = 0 @version = '0.0.0' @client_version = 'Unknown' return unless @options[:add_peer_fields] @peer_fields['peer'] = peer return unless @options[:transport] == 'tls' && !@fd.peer_cert.nil? @peer_fields['peer_ssl_cn'] = get_cn(@fd.peer_cert) end
Public Instance Methods
add_fields(event)
click to toggle source
# File lib/log-courier/server_tcp.rb, line 250 def add_fields(event) event.merge! @peer_fields unless @peer_fields.empty? end
receive()
click to toggle source
# File lib/log-courier/server_tcp.rb, line 302 def receive # Read message # Each message begins with a header # 4 byte signature # 4 byte length # Normally we would not parse this inside transport, but for TLS we have to in order to locate frame boundaries signature, length = recv(8).unpack('A4N') # Sanity raise ProtocolError, "packet too large (#{length} > #{@options[:max_packet_size]})" if length > @options[:max_packet_size] # While we're processing, EOF is bad as it may occur during send @in_progress = true # Read the message data = if length.zero? '' else recv(length) end # If we EOF next it's a graceful close @in_progress = false [signature, data] end
run() { |signature, data, self| ... }
click to toggle source
# File lib/log-courier/server_tcp.rb, line 254 def run(&block) handshake(&block) loop do signature, data = receive # Send for processing yield signature, data, self end rescue TimeoutError # Timeout of the connection, we were idle too long without a ping/pong @logger&.warn 'Connection timed out', peer: @peer nil rescue EOFError if @in_progress @logger&.warn 'Unexpected EOF', peer: @peer else @logger&.info 'Connection closed', peer: @peer end nil rescue OpenSSL::SSL::SSLError => e # Read errors, only action is to shutdown which we'll do in ensure @logger&.warn 'SSL error, connection aborted', error: e.message, peer: @peer nil rescue IOError, SystemCallError => e # Read errors, only action is to shutdown which we'll do in ensure @logger&.warn 'Connection aborted', error: e.message, peer: @peer nil rescue ProtocolError => e # Connection abort request due to a protocol error @logger&.warn 'Protocol error, connection aborted', error: e.message, peer: @peer nil rescue ShutdownSignal # Shutting down @logger&.info 'Server shutting down, closing connection', peer: @peer nil rescue StandardError => e # Some other unknown problem @logger&.warn e.message, hint: 'Unknown error, connection aborted', peer: @peer nil ensure begin @fd.close rescue OpenSSL::SSL::SSLError, IOError # Ignore during close end end
send(signature, message)
click to toggle source
# File lib/log-courier/server_tcp.rb, line 329 def send(signature, message) reset_timeout data = signature + [message.length].pack('N') + message done = 0 loop do begin written = @fd.write_nonblock(data[done...data.length]) rescue IO::WaitReadable raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? retry rescue IO::WaitWritable raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? retry end raise ProtocolError, "write failure (#{done}/#{data.length})" if written.zero? done += written break if done >= data.length end nil end
Private Instance Methods
get_cn(cert)
click to toggle source
# File lib/log-courier/server_tcp.rb, line 383 def get_cn(cert) cert.subject.to_a.find do |oid, value| return value if oid == 'CN' end nil end
handshake() { |signature, data, self| ... }
click to toggle source
# File lib/log-courier/server_tcp.rb, line 355 def handshake return if @options[:disable_handshake] signature, data = receive if signature == 'JDAT' @helo = Protocol.parse_helo_vers('') @logger&.info 'Remote does not support protocol handshake', peer: @peer yield signature, data, self return elsif signature != 'HELO' raise ProtocolError, "unexpected #{signature} message" end @helo = Protocol.parse_helo_vers(data) @logger&.info 'Remote identified', peer: @peer, client_version: @helo[:client_version] # Flags 4 bytes - EVNT flag = 0 # (Significant rewrite would be required to support streaming messages as currently we read # first and then yield for processing. To support EVNT we have to move protocol parsing to # the connection layer here so we can keep reading until we reach the end of the stream) # Major Version 4 bytes # Minor Version 4 bytes # Patch Version 4 bytes # Client String 4 bytes data = [0, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION, 'RYLC'].pack('NNNNA4') send 'VERS', data end
recv(need)
click to toggle source
# File lib/log-courier/server_tcp.rb, line 390 def recv(need) reset_timeout have = '' loop do begin buffer = @fd.read_nonblock need - have.length rescue IO::WaitReadable raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? retry rescue IO::WaitWritable raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? retry end raise EOFError if buffer.nil? raise ProtocolError, "read failure (#{have.length}/#{need})" if buffer.length.zero? if have.length.zero? have = buffer else have << buffer end break if have.length >= need end have end
reset_timeout()
click to toggle source
# File lib/log-courier/server_tcp.rb, line 418 def reset_timeout # TODO: Make configurable @timeout = Time.now.to_i + 1_800 nil end