class LogCourier::ServerTcp

TLS transport implementation for server

Attributes

port[R]

Public Class Methods

new(options = {}) click to toggle source

Create a new TLS transport endpoint

# File lib/log-courier/server_tcp.rb, line 56
def initialize(options = {})
  @options = {
    logger: nil,
    transport: 'tls',
    port: 0,
    address: '0.0.0.0',
    ssl_certificate: nil,
    ssl_key: nil,
    ssl_key_passphrase: nil,
    ssl_verify: false,
    ssl_verify_default_ca: false,
    ssl_verify_ca: nil,
    max_packet_size: 10_485_760,
    add_peer_fields: false,
    min_tls_version: 1.2,
    disable_handshake: false,
  }.merge!(options)

  @logger = @options[:logger]

  if @options[:transport] == 'tls'
    [:ssl_certificate, :ssl_key].each do |k|
      raise "input/courier: '#{k}' is required" if @options[k].nil?
    end

    if @options[:ssl_verify] && (!@options[:ssl_verify_default_ca] && @options[:ssl_verify_ca].nil?)
      raise 'input/courier: Either \'ssl_verify_default_ca\' or \'ssl_verify_ca\' must be specified when ssl_verify is true'
    end
  end

  begin
    @tcp_server = ExtendedTCPServer.new(@options[:address], @options[:port])

    # Query the port in case the port number is '0'
    # TCPServer#addr == [ address_family, port, address, address ]
    @port = @tcp_server.addr[1]

    if @options[:transport] == 'tls'
      ssl = OpenSSL::SSL::SSLContext.new

      # Disable SSLv2 and SSLv3
      # Call set_params first to ensure options attribute is there (hmmmm?)
      ssl.set_params
      # Modify the default options to ensure SSLv2 and SSLv3 is disabled
      # This retains any beneficial options set by default in the current Ruby implementation
      # TODO: https://github.com/jruby/jruby-openssl/pull/215 is fixed in JRuby 9.3.0.0
      #       As of 7.15 Logstash, JRuby version is still 9.2
      #       Once 9.3 is in use we can switch to using min_version and max_version
      ssl.options |= OpenSSL::SSL::OP_NO_SSLv2
      ssl.options |= OpenSSL::SSL::OP_NO_SSLv3
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1 if @options[:min_tls_version] > 1
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1_1 if @options[:min_tls_version] > 1.1
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1_2 if @options[:min_tls_version] > 1.2
      raise 'Invalid min_tls_version - max is 1.3' if @options[:min_tls_version] > 1.3

      # Set the certificate file
      ssl.cert = OpenSSL::X509::Certificate.new(File.read(@options[:ssl_certificate]))
      ssl.key = OpenSSL::PKey::RSA.new(File.read(@options[:ssl_key]), @options[:ssl_key_passphrase])

      if @options[:ssl_verify]
        cert_store = OpenSSL::X509::Store.new

        # Load the system default certificate path to the store
        cert_store.set_default_paths if @options[:ssl_verify_default_ca]

        if File.directory?(@options[:ssl_verify_ca])
          cert_store.add_path(@options[:ssl_verify_ca])
        else
          cert_store.add_file(@options[:ssl_verify_ca])
        end

        ssl.cert_store = cert_store

        ssl.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
      end

      # Create the OpenSSL server - set start_immediately to false so we can multithread handshake
      @server = OpenSSL::SSL::SSLServer.new(@tcp_server, ssl)
      @server.start_immediately = false
    else
      @server = @tcp_server
    end

    @logger&.warn 'Ephemeral port allocated', transport: @options[:transport], port: @port if @options[:port].zero?
  rescue StandardError => e
    raise "input/courier: Failed to initialise: #{e}"
  end
end

Public Instance Methods

run(&block) click to toggle source
# File lib/log-courier/server_tcp.rb, line 145
def run(&block)
  client_threads = {}

  loop do
    # Because start_immediately is false, TCP accept is single thread but
    # handshake is essentiall multithreaded as we defer it to the thread
    @tcp_server.reset_peer
    client = nil
    begin
      client = @server.accept
    rescue OpenSSL::SSL::SSLError, IOError => e
      # Accept failure or other issue
      @logger&.warn 'Connection failed to accept', error: e.message, peer: @tcp_server.peer
      begin
        client&.close
      rescue OpenSSL::SSL::SSLError, IOError
        # Ignore IO error during close
      end
      next
    end

    @logger&.info 'New connection', peer: @tcp_server.peer

    # Clear up finished threads
    client_threads.delete_if do |_, thr|
      !thr.alive?
    end

    # Start a new connection thread
    client_threads[client] = Thread.new(client, @tcp_server.peer) do |client_copy, peer_copy|
      run_thread client_copy, peer_copy, &block
    end
  end
  nil
rescue ShutdownSignal
  nil
rescue StandardError => e
  # Some other unknown problem
  @logger&.warn e.message, hint: 'Unknown error, shutting down'
  nil
ensure
  # Raise shutdown in all client threads and join then
  client_threads.each do |_, thr|
    thr.raise ShutdownSignal
  end

  client_threads.each(&:join)

  @tcp_server.close
end

Private Instance Methods

run_thread(client, peer, &block) click to toggle source
# File lib/log-courier/server_tcp.rb, line 198
def run_thread(client, peer, &block)
  # Perform the handshake inside the new thread so we don't block TCP accept
  if @options[:transport] == 'tls'
    begin
      client.accept
    rescue OpenSSL::SSL::SSLError, IOError => e
      # Handshake failure or other issue
      @logger&.warn 'Connection failed to initialise', error: e.message, peer: peer
      begin
        client.close
      rescue OpenSSL::SSL::SSLError, IOError
        # Ignore during close
      end
      return
    end

    @logger&.info 'Connection setup successfully', peer: peer, ssl_version: client.ssl_version
  end

  ConnectionTcp.new(@logger, client, peer, @options).run(&block)
rescue ShutdownSignal
  # Shutting down
  @logger&.info 'Server shutting down, connection closed', peer: peer
end