class LogCourier::ClientTcp

TLS transport implementation

Public Class Methods

new(options = {}) click to toggle source
# File lib/log-courier/client_tcp.rb, line 24
def initialize(options = {})
  @options = {
    logger: nil,
    transport: 'tls',
    ssl_ca: nil,
    ssl_certificate: nil,
    ssl_key: nil,
    ssl_key_passphrase: nil,
    min_tls_version: 1.2,
    disable_handshake: false,
  }.merge!(options)

  @logger = @options[:logger]

  raise "output/courier: 'port' is required" if @options[:port].nil?

  return unless @options[:transport] == 'tls'

  raise "output/courier: 'ssl_ca' is required if 'transport' is 'tls'" if @options[:ssl_ca].nil?

  c = 0
  [:ssl_certificate, :ssl_key].each do
    c += 1
  end
  raise 'output/courier: \'ssl_certificate\' and \'ssl_key\' must be specified together' if c == 1
end

Public Instance Methods

connect(io_control) click to toggle source
# File lib/log-courier/client_tcp.rb, line 51
def connect(io_control)
  loop do
    begin
      if tls_connect
        return unless handshake(io_control)

        break
      end
    rescue ShutdownSignal
      return
    end

    # TODO: Make this configurable
    sleep 5
  end

  @send_q = SizedQueue.new 1
  @send_paused = false

  @send_thread = Thread.new do
    run_send io_control
  rescue ShutdownSignal
    # Shutdown
  rescue StandardError => e
    @logger&.warn e, hint: 'Unknown write error'
    io_control << ['F']
  end
  @recv_thread = Thread.new do
    run_recv io_control
  rescue ShutdownSignal
    # Shutdown
  rescue StandardError => e
    @logger&.warn e, hint: 'Unknown read error'
    io_control << ['F']
  end
  nil
end
disconnect() click to toggle source
# File lib/log-courier/client_tcp.rb, line 89
def disconnect
  @send_thread&.raise ShutdownSignal
  @send_thread&.join
  @recv_thread&.raise ShutdownSignal
  @recv_thread&.join
  nil
end
pause_send() click to toggle source
# File lib/log-courier/client_tcp.rb, line 103
def pause_send
  return if @send_paused

  @send_paused = true
  @send_q << nil
  nil
end
resume_send() click to toggle source
# File lib/log-courier/client_tcp.rb, line 115
def resume_send
  if @send_paused
    @send_paused = false
    @send_q << nil
  end
  nil
end
send(signature, message) click to toggle source
# File lib/log-courier/client_tcp.rb, line 97
def send(signature, message)
  # Add to send queue
  @send_q << ([signature, message.length].pack('A4N') + message)
  nil
end
send_paused?() click to toggle source
# File lib/log-courier/client_tcp.rb, line 111
def send_paused?
  @send_paused
end

Private Instance Methods

handshake(io_control) click to toggle source
# File lib/log-courier/client_tcp.rb, line 125
def handshake(io_control)
  return true if @options[:disable_handshake]

  @socket.write ['HELO', 20, 0, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION, 'RYLC'].pack('A4NNNNNA4')

  signature, data = receive
  if signature != 'VERS'
    raise "Unexpected message during handshake: #{signature}" if signature != '????'

    @vers = Protocol.parse_helo_vers('')
    @logger&.info 'Remote does not support protocol handshake', server_version: @vers[:client_version]
    return true
  end

  @vers = Protocol.parse_helo_vers(data)
  @logger&.info 'Remote identified', server_version: @vers[:client_version]

  true
rescue StandardError => e
  @logger&.warn e, hint: 'Unknown write error'
  io_control << ['F']
  false
end
receive() click to toggle source
# File lib/log-courier/client_tcp.rb, line 203
def receive
  # Grab a header
  header = @socket.read(8)
  raise EOFError if header.nil?

  # Decode signature and length
  signature, length = header.unpack('A4N')

  if length > 1_048_576
    # Too big raise error
    raise IOError, 'Invalid message: data too big'
  end

  # Read remainder
  message = @socket.read(length)

  [signature, message]
end
run_recv(io_control) click to toggle source
# File lib/log-courier/client_tcp.rb, line 185
def run_recv(io_control)
  loop do
    signature, message = receive

    # Pass through to receive
    io_control << ['R', signature, message]
  end
rescue OpenSSL::SSL::SSLError => e
  @logger&.warn 'SSL read error', error: e.message
  io_control << ['F']
rescue EOFError
  @logger&.warn 'Connection closed by server'
  io_control << ['F']
rescue IOError, SystemCallError => e
  @logger&.warn 'Read error', error: e.message
  io_control << ['F']
end
run_send(io_control) click to toggle source
# File lib/log-courier/client_tcp.rb, line 149
def run_send(io_control)
  # Ask for something to send
  io_control << ['S']

  # If paused, we still accept message to send, but we don't release "S" to ask for more
  # As soon as we resume we then release "S" to ask for more
  paused = false

  loop do
    # Wait for data and send when we get it
    message = @send_q.pop

    # A nil is a pause/resume
    if message.nil?
      if paused
        paused = false
        io_control << ['S']
      else
        paused = true
        next
      end
    else
      # Ask for more to send while we send this one
      io_control << ['S'] unless paused

      @socket.write message
    end
  end
rescue OpenSSL::SSL::SSLError => e
  @logger&.warn 'SSL write error', error: e.message
  io_control << ['F']
rescue IOError, SystemCallError => e
  @logger&.warn 'Write error', error: e.message
  io_control << ['F']
end
tls_connect() click to toggle source
# File lib/log-courier/client_tcp.rb, line 222
def tls_connect
  # TODO: Implement random selection - and don't use separate :port - remember to update post_connection_check too
  address = @options[:addresses][0]
  port = @options[:port]

  @logger&.info 'Connecting', address: address, port: port

  begin
    tcp_socket = TCPSocket.new(address, port)

    if @options[:transport] == 'tls'
      ssl = OpenSSL::SSL::SSLContext.new

      # Disable SSLv2 and SSLv3
      # Call set_params first to ensure options attribute is there (hmmmm?)
      ssl.set_params
      # Modify the default options to ensure SSLv2 and SSLv3 is disabled
      # This retains any beneficial options set by default in the current Ruby implementation
      # TODO: https://github.com/jruby/jruby-openssl/pull/215 is fixed in JRuby 9.3.0.0
      #       As of 7.15 Logstash, JRuby version is still 9.2
      #       Once 9.3 is in use we can switch to using min_version and max_version
      ssl.options |= OpenSSL::SSL::OP_NO_SSLv2
      ssl.options |= OpenSSL::SSL::OP_NO_SSLv3
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1 if @options[:min_tls_version] > 1
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1_1 if @options[:min_tls_version] > 1.1
      ssl.options |= OpenSSL::SSL::OP_NO_TLSv1_2 if @options[:min_tls_version] > 1.2
      raise 'Invalid min_tls_version - max is 1.3' if @options[:min_tls_version] > 1.3

      # Set the certificate file
      unless @options[:ssl_certificate].nil?
        ssl.cert = OpenSSL::X509::Certificate.new(File.read(@options[:ssl_certificate]))
        ssl.key = OpenSSL::PKey::RSA.new(File.read(@options[:ssl_key]), @options[:ssl_key_passphrase])
      end

      cert_store = OpenSSL::X509::Store.new
      cert_store.add_file(@options[:ssl_ca])
      ssl.cert_store = cert_store
      ssl.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT

      @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl)
      @socket.connect

      # Verify certificate
      @socket.post_connection_check(address)

      @logger&.info 'Connected successfully', address: address, port: port, ssl_version: @socket.ssl_version
    else
      @socket = tcp_socket

      @logger&.info 'Connected successfully', address: address, port: port
    end

    return true
  rescue OpenSSL::SSL::SSLError, IOError, SystemCallError => e
    @logger&.warn 'Connection failed', error: e.message, address: address, port: port
  rescue StandardError => e
    @logger&.warn 'Unknown connection failure', hint: e.message, address: address, port: port
  end

  false
end