class LogCourier::ConnectionTcp

Representation of a single connected client

Attributes

peer[RW]

Public Class Methods

new(logger, sfd, peer, options) click to toggle source
# File lib/log-courier/server_tcp.rb, line 228
# Set up the state for one accepted client connection.
#
# logger  - logger for connection events
# sfd     - the connected socket; must respond to peer_cert when the
#           :transport option is 'tls' and peer fields are enabled
# peer    - identifier of the remote end, stored and optionally added to events
# options - options hash; this method reads :add_peer_fields and :transport
def initialize(logger, sfd, peer, options)
  # Connection plumbing
  @logger = logger
  @fd = sfd
  @peer = peer
  @peer_fields = {}
  @in_progress = false
  @options = options

  # Initial client identification values
  @client = 'Unknown'
  @major_version = 0
  @minor_version = 0
  @patch_version = 0
  @version = '0.0.0'
  @client_version = 'Unknown'

  # Peer fields are only collected when explicitly enabled
  if @options[:add_peer_fields]
    @peer_fields['peer'] = peer

    # The certificate CN is only available on TLS connections that presented
    # a peer certificate
    tls_with_cert = @options[:transport] == 'tls' && !@fd.peer_cert.nil?
    @peer_fields['peer_ssl_cn'] = get_cn(@fd.peer_cert) if tls_with_cert
  end
end

Public Instance Methods

add_fields(event) click to toggle source
# File lib/log-courier/server_tcp.rb, line 250
# Merge this connection's peer fields into the given event, in place.
#
# event must respond to merge! (for example a Hash). Returns the result of
# merge!, or nil when there are no peer fields to add.
def add_fields(event)
  return if @peer_fields.empty?

  event.merge!(@peer_fields)
end
receive() click to toggle source
# File lib/log-courier/server_tcp.rb, line 302
# Read one framed message from the connection.
#
# A frame starts with an 8 byte header: a 4 byte signature followed by a
# 4 byte big-endian payload length. The header is parsed here, inside the
# transport, because for TLS it is needed to locate frame boundaries.
#
# Returns [signature, data]. Raises ProtocolError when the announced length
# exceeds @options[:max_packet_size]; errors from recv propagate unchanged.
def receive
  header = recv(8)
  signature, length = header.unpack('A4N')

  # Sanity
  limit = @options[:max_packet_size]
  raise ProtocolError, "packet too large (#{length} > #{limit})" if length > limit

  # From here until the payload has been read an EOF is unexpected
  @in_progress = true

  if length.zero?
    data = ''
  else
    data = recv(length)
  end

  # Frame complete: an EOF from now on is a graceful close
  @in_progress = false

  [signature, data]
end
run() { |signature, data, self| ... } click to toggle source
# File lib/log-courier/server_tcp.rb, line 254
# Serve this connection until it ends.
#
# Performs the handshake (forwarding the given block to it), then repeatedly
# reads a framed message with receive and yields (signature, data, self) to
# the block. The loop has no exit condition of its own; it ends when an
# exception is raised.
#
# Each handled error is logged through @logger (if one is set) and the method
# returns nil. The exception is ShutdownSignal, which is logged and then
# re-raised to the caller. The socket is closed on every path by the ensure
# clause.
def run(&block)
  handshake(&block)

  loop do
    signature, data = receive

    # Send for processing
    yield signature, data, self
  end
rescue TimeoutError
  # Timeout of the connection, we were idle too long without a ping/pong
  @logger&.warn 'Connection timed out', peer: @peer
  nil
rescue EOFError
  # Must stay ahead of the IOError clause below: EOFError is a subclass of
  # IOError. @in_progress tells whether the EOF arrived in the middle of a
  # message (unexpected) or between messages (normal close).
  if @in_progress
    @logger&.warn 'Unexpected EOF', peer: @peer
  else
    @logger&.info 'Connection closed', peer: @peer
  end
  nil
rescue OpenSSL::SSL::SSLError => e
  # Read errors, only action is to shutdown which we'll do in ensure
  @logger&.warn 'SSL error, connection aborted', error: e.message, peer: @peer
  nil
rescue IOError, SystemCallError => e
  # Read errors, only action is to shutdown which we'll do in ensure
  @logger&.warn 'Connection aborted', error: e.message, peer: @peer
  nil
rescue ProtocolError => e
  # Connection abort request due to a protocol error
  @logger&.warn 'Protocol error, connection aborted', error: e.message, peer: @peer
  nil
rescue ShutdownSignal => e
  # Shutting down: log it and let the caller see the signal
  @logger&.info 'Server shutting down, closing connection', peer: @peer
  raise e
rescue StandardError => e
  # Some other unknown problem; catch-all, so it has to remain the last clause
  @logger&.warn e.message, hint: 'Unknown error, connection aborted', peer: @peer
  nil
ensure
  # Always release the socket, whatever ended the loop
  begin
    @fd.close
  rescue OpenSSL::SSL::SSLError, IOError
    # Ignore during close
  end
end
send(signature, message) click to toggle source
# File lib/log-courier/server_tcp.rb, line 329
# Send one framed message to the peer.
#
# The frame is the signature, then the payload size in bytes as a 4 byte
# big-endian integer, then the payload. The frame is written with
# write_nonblock and retried until every byte has been accepted.
#
# signature - frame signature string (e.g. 'VERS')
# message   - payload string; it is sent as raw bytes whatever its encoding
#
# Returns nil.
# Raises TimeoutError when the socket does not become ready before the
# deadline set by reset_timeout, or when that deadline has already passed.
# Raises ProtocolError when write_nonblock reports that nothing was written.
def send(signature, message)
  reset_timeout

  # Build the frame as binary and measure it in bytes. write_nonblock reports
  # bytes written, so character-based length/slicing would corrupt the length
  # header and the resume offset for payloads with multi-byte characters.
  data = signature.b + [message.bytesize].pack('N') + message.b
  total = data.bytesize
  done = 0
  loop do
    begin
      written = @fd.write_nonblock(data.byteslice(done, total - done))
    rescue IO::WaitReadable
      # IO.select rejects a negative timeout with ArgumentError, so a deadline
      # that has already passed is reported as a timeout directly.
      remaining = @timeout - Time.now.to_i
      raise TimeoutError if remaining.negative? || IO.select([@fd], nil, [@fd], remaining).nil?

      retry
    rescue IO::WaitWritable
      remaining = @timeout - Time.now.to_i
      raise TimeoutError if remaining.negative? || IO.select(nil, [@fd], [@fd], remaining).nil?

      retry
    end
    raise ProtocolError, "write failure (#{done}/#{total})" if written.zero?

    done += written
    break if done >= total
  end
  nil
end

Private Instance Methods

get_cn(cert) click to toggle source
# File lib/log-courier/server_tcp.rb, line 383
# Look up the common name in a certificate's subject.
#
# cert.subject.to_a is scanned in order and the value of the first entry
# whose identifier is 'CN' is returned; nil when there is no such entry.
def get_cn(cert)
  entry = cert.subject.to_a.find { |oid, _| oid == 'CN' }
  return nil if entry.nil?

  _oid, value = entry
  value
end
handshake() { |signature, data, self| ... } click to toggle source
# File lib/log-courier/server_tcp.rb, line 355
# Perform the opening protocol exchange with the client.
#
# Does nothing when @options[:disable_handshake] is set. Otherwise the first
# message is read and handled by signature:
#
# * 'JDAT' - the remote sent data without a handshake; an empty HELO is
#            recorded and the message is yielded to the block as
#            (signature, data, self)
# * 'HELO' - the HELO payload is parsed into @helo and a 'VERS' reply is sent
# * other  - ProtocolError is raised
def handshake
  return if @options[:disable_handshake]

  signature, payload = receive

  if signature == 'JDAT'
    @helo = Protocol.parse_helo_vers('')
    @logger&.info 'Remote does not support protocol handshake', peer: @peer
    yield signature, payload, self
    return
  end

  raise ProtocolError, "unexpected #{signature} message" unless signature == 'HELO'

  @helo = Protocol.parse_helo_vers(payload)
  @logger&.info 'Remote identified', peer: @peer, client_version: @helo[:client_version]

  # VERS reply layout, 4 bytes per field:
  #   Flags          - EVNT flag = 0
  #     (Significant rewrite would be required to support streaming messages as currently we
  #      read first and then yield for processing. To support EVNT we have to move protocol
  #      parsing to the connection layer here so we can keep reading until we reach the end
  #      of the stream)
  #   Major Version
  #   Minor Version
  #   Patch Version
  #   Client String
  reply = [0, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION, 'RYLC'].pack('NNNNA4')
  send('VERS', reply)
end
recv(need) click to toggle source
# File lib/log-courier/server_tcp.rb, line 390
# Read exactly `need` bytes from the socket.
#
# Uses read_nonblock and waits with IO.select whenever the socket is not
# ready, bounded by the deadline that reset_timeout sets at the start of
# the call.
#
# need - number of bytes to read
#
# Returns the bytes read as a String.
# Raises TimeoutError when the socket does not become ready before the
# deadline, or when the deadline has already passed.
# Raises EOFError when the peer closes the connection (raised by
# read_nonblock itself, or here if it returns nil).
# Raises ProtocolError when a read returns an empty buffer.
def recv(need)
  reset_timeout
  have = String.new
  loop do
    begin
      buffer = @fd.read_nonblock(need - have.bytesize)
    rescue IO::WaitReadable
      # IO.select rejects a negative timeout with ArgumentError, so a deadline
      # that has already passed is reported as a timeout directly.
      remaining = @timeout - Time.now.to_i
      raise TimeoutError if remaining.negative? || IO.select([@fd], nil, [@fd], remaining).nil?

      retry
    rescue IO::WaitWritable
      remaining = @timeout - Time.now.to_i
      raise TimeoutError if remaining.negative? || IO.select(nil, [@fd], [@fd], remaining).nil?

      retry
    end
    raise EOFError if buffer.nil?
    raise ProtocolError, "read failure (#{have.bytesize}/#{need})" if buffer.empty?

    have << buffer
    break if have.bytesize >= need
  end
  have
end
reset_timeout() click to toggle source
# File lib/log-courier/server_tcp.rb, line 418
# Restart the I/O deadline: @timeout becomes the current epoch second plus
# 1800 seconds. Always returns nil.
def reset_timeout
  # TODO: Make configurable
  now = Time.now.to_i
  @timeout = now + 1_800
  nil
end