class LogCourier::ServerTcp
TCP and TLS transport implementation for the server (TLS is the default; any other `transport` value yields a plain TCP listener)
Attributes
port[R]
Public Class Methods
new(options = {})
click to toggle source
Create a new transport endpoint (TLS by default, plain TCP when `transport` is not 'tls') and start listening immediately
# File lib/log-courier/server_tcp.rb, line 56
#
# Create a new transport endpoint and bind its listening socket.
#
# options - Hash merged over the defaults below:
#   :logger                - logger used for warn/info messages (may be nil)
#   :transport             - 'tls' wraps the listener in an SSL server; any
#                            other value uses the plain TCP listener
#   :port, :address        - where to listen; port 0 asks the OS for an
#                            ephemeral port, readable afterwards via #port
#   :ssl_certificate       - path to the server certificate (required for tls)
#   :ssl_key               - path to the RSA private key (required for tls)
#   :ssl_key_passphrase    - passphrase for the private key, if any
#   :ssl_verify            - require and verify a client certificate
#   :ssl_verify_default_ca - trust the system default CA paths
#   :ssl_verify_ca         - CA file, or directory of CAs, to trust
#   :min_tls_version       - 1, 1.1, 1.2 or 1.3
#   :max_packet_size, :add_peer_fields, :disable_handshake - not used here;
#                            kept in @options for later consumers
#
# Raises RuntimeError when a required option is missing or when the listener
# or TLS context cannot be set up.
def initialize(options = {})
  @options = {
    logger: nil,
    transport: 'tls',
    port: 0,
    address: '0.0.0.0',
    ssl_certificate: nil,
    ssl_key: nil,
    ssl_key_passphrase: nil,
    ssl_verify: false,
    ssl_verify_default_ca: false,
    ssl_verify_ca: nil,
    max_packet_size: 10_485_760,
    add_peer_fields: false,
    min_tls_version: 1.2,
    disable_handshake: false,
  }.merge!(options)

  @logger = @options[:logger]

  if @options[:transport] == 'tls'
    [:ssl_certificate, :ssl_key].each do |k|
      raise "input/courier: '#{k}' is required" if @options[k].nil?
    end

    if @options[:ssl_verify] && (!@options[:ssl_verify_default_ca] && @options[:ssl_verify_ca].nil?)
      raise 'input/courier: Either \'ssl_verify_default_ca\' or \'ssl_verify_ca\' must be specified when ssl_verify is true'
    end
  end

  begin
    @tcp_server = ExtendedTCPServer.new(@options[:address], @options[:port])

    # Query the port in case the port number is '0'
    # TCPServer#addr == [ address_family, port, address, address ]
    @port = @tcp_server.addr[1]

    if @options[:transport] == 'tls'
      ssl = OpenSSL::SSL::SSLContext.new

      # Disable SSLv2 and SSLv3
      # Call set_params first to ensure options attribute is there (hmmmm?)
      ssl.set_params

      # Modify the default options to ensure SSLv2 and SSLv3 is disabled
      # This retains any beneficial options set by default in the current Ruby implementation
      # TODO: https://github.com/jruby/jruby-openssl/pull/215 is fixed in JRuby 9.3.0.0
      #       As of 7.15 Logstash, JRuby version is still 9.2
      #       Once 9.3 is in use we can switch to using min_version and max_version
      ssl.options |= OpenSSL::SSL::OP_NO_SSLv2
      ssl.options |= OpenSSL::SSL::OP_NO_SSLv3
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1 if @options[:min_tls_version] > 1
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1_1 if @options[:min_tls_version] > 1.1
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1_2 if @options[:min_tls_version] > 1.2
      raise 'Invalid min_tls_version - max is 1.3' if @options[:min_tls_version] > 1.3

      # Set the certificate file
      ssl.cert = OpenSSL::X509::Certificate.new(File.read(@options[:ssl_certificate]))
      ssl.key = OpenSSL::PKey::RSA.new(File.read(@options[:ssl_key]), @options[:ssl_key_passphrase])

      if @options[:ssl_verify]
        cert_store = OpenSSL::X509::Store.new

        # Load the system default certificate path to the store
        cert_store.set_default_paths if @options[:ssl_verify_default_ca]

        # ssl_verify_ca is optional when ssl_verify_default_ca is enabled, so
        # only touch the filesystem when a CA location was actually supplied
        # (File.directory?(nil) raises TypeError).
        verify_ca = @options[:ssl_verify_ca]
        unless verify_ca.nil?
          if File.directory?(verify_ca)
            cert_store.add_path(verify_ca)
          else
            cert_store.add_file(verify_ca)
          end
        end

        ssl.cert_store = cert_store

        ssl.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
      end

      # Create the OpenSSL server - set start_immediately to false so we can multithread handshake
      @server = OpenSSL::SSL::SSLServer.new(@tcp_server, ssl)
      @server.start_immediately = false
    else
      @server = @tcp_server
    end

    @logger&.warn 'Ephemeral port allocated', transport: @options[:transport], port: @port if @options[:port].zero?
  rescue StandardError => e
    # Do not leave the port bound when setup fails after the listener was created
    begin
      @tcp_server&.close
    rescue StandardError
      # Ignore errors while releasing the listener; the original failure is reported below
    end
    raise "input/courier: Failed to initialise: #{e}"
  end
end
Public Instance Methods
run(&block)
click to toggle source
# File lib/log-courier/server_tcp.rb, line 145
#
# Accept connections until shut down, handing each one to its own thread.
#
# Every accepted client is passed, together with the given block, to
# run_thread in a new thread. The loop ends when ShutdownSignal (or any other
# StandardError) reaches this method; all connection threads are then sent
# ShutdownSignal and waited for, and the listening socket is closed.
#
# Always returns nil.
def run(&block)
  client_threads = {}

  loop do
    # Because start_immediately is false, TCP accept is single threaded but
    # the handshake is essentially multithreaded as we defer it to the thread
    @tcp_server.reset_peer

    begin
      client = @server.accept
    rescue OpenSSL::SSL::SSLError, IOError => e
      # Accept failure or other issue - there is no client object to close
      # because accept raised before returning one
      @logger&.warn 'Connection failed to accept', error: e.message, peer: @tcp_server.peer
      next
    end

    @logger&.info 'New connection', peer: @tcp_server.peer

    # Clear up finished threads
    client_threads.delete_if do |_, thr|
      !thr.alive?
    end

    # Start a new connection thread
    client_threads[client] = Thread.new(client, @tcp_server.peer) do |client_copy, peer_copy|
      run_thread client_copy, peer_copy, &block
    end
  end

  nil
rescue ShutdownSignal
  nil
rescue StandardError => e
  # Some other unknown problem
  @logger&.warn e.message, hint: 'Unknown error, shutting down'
  nil
ensure
  # Raise shutdown in all client threads and join them
  client_threads.each_value do |thr|
    thr.raise ShutdownSignal
  end

  # client_threads is a Hash: iterate the values so Thread#join is called
  # (each(&:join) would call Array#join on the [client, thread] pairs).
  client_threads.each_value do |thr|
    begin
      thr.join
    rescue ShutdownSignal, StandardError => e
      # Thread#join re-raises whatever terminated the thread; do not let that
      # prevent the remaining joins or the listener from being closed
      @logger&.warn 'Connection thread terminated abnormally', error: e.message
    end
  end

  @tcp_server.close
end
Private Instance Methods
run_thread(client, peer, &block)
click to toggle source
# File lib/log-courier/server_tcp.rb, line 198
#
# Body of a single connection thread.
#
# For the 'tls' transport the handshake is completed here, inside the
# connection's own thread, so that a slow peer cannot stall the accept loop.
# A failed handshake is logged, the client is closed and the method returns.
# Otherwise the client is handed to ConnectionTcp together with the block.
# A ShutdownSignal raised into this thread is logged and swallowed.
def run_thread(client, peer, &block)
  if @options[:transport] == 'tls'
    # Capture the handshake outcome first, then react to it
    handshake_error =
      begin
        client.accept
        nil
      rescue OpenSSL::SSL::SSLError, IOError => e
        e
      end

    if handshake_error
      # Handshake failure or other issue
      @logger&.warn 'Connection failed to initialise', error: handshake_error.message, peer: peer
      begin
        client.close
      rescue OpenSSL::SSL::SSLError, IOError
        # Nothing useful can be done about a failure while closing
      end
      return
    end

    @logger&.info 'Connection setup successfully', peer: peer, ssl_version: client.ssl_version
  end

  connection = ConnectionTcp.new(@logger, client, peer, @options)
  connection.run(&block)
rescue ShutdownSignal
  # Shutting down
  @logger&.info 'Server shutting down, connection closed', peer: peer
end