class LogCourier::Server

Implementation of the server

Attributes

port[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/log-courier/server.rb, line 35
def initialize(options = {})
  @options = {
    logger: nil,
    transport: 'tls',
    disable_handshake: false,
  }.merge!(options)

  @logger = @options[:logger]

  case @options[:transport]
  when 'tcp', 'tls'
    require 'log-courier/server_tcp'
    @server = ServerTcp.new(@options)
  else
    raise 'input/courier: \'transport\' must be tcp or tls'
  end

  # Grab the port back and update the logger context
  @port = @server.port

  # TODO: Make queue size configurable
  @event_queue = EventQueue.new 1

  @server_thread = Thread.new do
    # Receive messages and process them
    @server.run do |signature, message, comm|
      case signature
      when 'PING'
        process_ping message, comm
      when 'JDAT'
        process_jdat message, comm, @event_queue
      else
        if comm.peer.nil?
          @logger&.warn 'Unknown message received', from: 'unknown'
        else
          @logger&.warn 'Unknown message received', from: comm.peer
        end
        # Don't kill a client that sends a bad message
        # Just reject it and let it send it again, potentially to another server
        comm.send '????', ''
      end
    end
  end
end

Public Instance Methods

run(&block) click to toggle source
# File lib/log-courier/server.rb, line 80
def run(&block)
  loop do
    event = @event_queue.pop
    break if event.nil?

    block.call event
  end
  nil
end
stop() click to toggle source
# File lib/log-courier/server.rb, line 90
def stop
  @server_thread.raise ShutdownSignal
  @event_queue << nil
  @server_thread.join
  nil
end

Private Instance Methods

process_jdat(message, comm, event_queue) click to toggle source
# File lib/log-courier/server.rb, line 108
def process_jdat(message, comm, event_queue)
  # Now we have the data, aim to respond within 5 seconds
  ack_timeout = Time.now.to_i + 5

  # OK - first is a nonce - we send this back with sequence acks
  # This allows the client to know what is being acknowledged
  # Nonce is 16 so check we have enough
  raise ProtocolError, "JDAT message too small (#{message.bytesize})" if message.bytesize < 17

  nonce = message[0...16]

  if @logger&.debug?
    nonce_str = nonce.each_byte.map do |b|
      b.to_s(16).rjust(2, '0')
    end
  end

  # The remainder of the message is the compressed data block
  message = StringIO.new Zlib::Inflate.inflate(message.byteslice(16, message.bytesize))

  # Message now contains JSON encoded events
  # They are aligned as [length][event]... so on
  # We acknowledge them by their 1-index position in the stream
  # A 0 sequence acknowledgement means we haven't processed any yet
  sequence = 0
  length_buf = ''
  data_buf = ''
  loop do
    ret = message.read 4, length_buf
    # Finished?
    break if ret.nil?
    raise ProtocolError, "JDAT length extraction failed (#{ret} #{length_buf.bytesize})" if length_buf.bytesize < 4

    length = length_buf.unpack1('N')

    # Extract message
    ret = message.read length, data_buf
    if ret.nil? || data_buf.bytesize < length
      @logger&.warn()
      raise ProtocolError, "JDAT message extraction failed #{ret} #{data_buf.bytesize}"
    end

    data_buf.force_encoding('utf-8')

    # Ensure valid encoding
    invalid_encodings = 0
    unless data_buf.valid_encoding?
      data_buf.chars.map do |c|
        if c.valid_encoding?
          c
        else
          invalid_encodings += 1
          "\xEF\xBF\xBD"
        end
      end
    end

    # Decode the JSON
    begin
      event = MultiJson.load(data_buf)
    rescue MultiJson::ParseError => e
      @logger&.warn e, invalid_encodings: invalid_encodings, hint: 'JSON parse failure, falling back to plain-text'
      event = { 'message' => data_buf }
    end

    # Add peer fields?
    comm.add_fields event

    # Queue the event
    begin
      event_queue.push event, [0, ack_timeout - Time.now.to_i].max
    rescue TimeoutError
      # Full pipeline, partial ack
      # NOTE: comm.send can raise a Timeout::Error of its own
      @logger&.debug 'Partially acknowledging message', nonce: nonce_str.join, sequence: sequence if @logger&.debug?
      comm.send 'ACKN', [nonce, sequence].pack('a*N')
      ack_timeout = Time.now.to_i + 5
      retry
    end

    sequence += 1
  end

  # Acknowledge the full message
  # NOTE: comm.send can raise a Timeout::Error
  @logger&.debug 'Acknowledging message', nonce: nonce_str.join, sequence: sequence if @logger&.debug?
  comm.send 'ACKN', [nonce, sequence].pack('A*N')
end
process_ping(message, comm) click to toggle source
# File lib/log-courier/server.rb, line 99
def process_ping(message, comm)
  # Size of message should be 0
  raise ProtocolError, "unexpected data attached to ping message (#{message.bytesize})" unless message.bytesize.zero?

  # PONG!
  # NOTE: comm.send can raise a Timeout::Error of its own
  comm.send 'PONG', ''
end