class LogCourier::Client

Implementation of a single client connection

Public Class Methods

new(options = {}) click to toggle source
# File lib/log-courier/client.rb, line 81
def initialize(options = {})
  @options = {
    logger: nil,
    transport: 'tls',
    spool_size: 1024,
    idle_timeout: 5,
    port: nil,
    addresses: [],
    min_tls_version: 1.2,
    disable_handshake: false,
  }.merge!(options)

  @logger = @options[:logger]

  case @options[:transport]
  when 'tcp', 'tls'
    require 'log-courier/client_tcp'
    @client = ClientTcp.new(@options)
  else
    raise 'output/courier: \'transport\' must be tcp or tls'
  end

  raise 'output/courier: \'addresses\' must contain at least one address' if @options[:addresses].empty?
  raise 'output/courier: \'addresses\' only supports a single address at this time' if @options[:addresses].length > 1

  @event_queue = EventQueue.new @options[:spool_size]
  @pending_payloads = {}
  @first_payload = nil
  @last_payload = nil

  # Start the spooler which will collect events into chunks
  @send_ready = false
  @send_mutex = Mutex.new
  @send_cond = ConditionVariable.new
  @spooler_thread = Thread.new do
    run_spooler
  end

  # TODO: Make these configurable?
  @keepalive_timeout = 1800
  @network_timeout = 30

  # TODO: Make pending payload max configurable?
  @max_pending_payloads = 100

  @retry_payload = nil
  @received_payloads = Queue.new

  @pending_ping = false

  # Start the IO thread
  @io_control = EventQueue.new 1
  @io_thread = Thread.new do
    run_io
  end
end

Public Instance Methods

publish(event) click to toggle source
# File lib/log-courier/client.rb, line 138
def publish(event)
  # Pass the event into the spooler
  @event_queue << event
  nil
end
shutdown(force = false) click to toggle source
# File lib/log-courier/client.rb, line 144
def shutdown(force = false) # rubocop:disable Style/OptionalBooleanParameter
  if force
    # Raise a shutdown signal in the spooler and wait for it
    @spooler_thread.raise ShutdownSignal
    @spooler_thread.join
    @io_thread.raise ShutdownSignal
  else
    @event_queue.push nil
    @spooler_thread.join
    @io_control << ['!', nil]
  end
  @io_thread.join
  @pending_payloads.length.zero?
end

Private Instance Methods

generate_nonce() click to toggle source
# File lib/log-courier/client.rb, line 364
def generate_nonce
  (0...16).map { rand(256).chr }.join
end
process_ackn(message) click to toggle source
# File lib/log-courier/client.rb, line 403
def process_ackn(message)
  # Sanity
  raise ProtocolError, "ACKN message size invalid (#{message.bytesize})" if message.bytesize != 20

  # Grab nonce
  nonce, sequence = message.unpack('a16N')

  if @logger&.debug?
    nonce_str = nonce.each_byte.map do |b|
      b.to_s(16).rjust(2, '0')
    end

    @logger&.debug 'ACKN message received', nonce: nonce_str.join, sequence: sequence
  end

  # Find the payload - skip if we couldn't as it will just a duplicated ACK
  return unless @pending_payloads.key?(nonce)

  payload = @pending_payloads[nonce]
  _, complete = payload.ack(sequence)

  return unless complete

  @pending_payloads.delete nonce
  @first_payload = payload.next
  @client.resume_send if @client.send_paused?
end
process_pong(message) click to toggle source
# File lib/log-courier/client.rb, line 393
def process_pong(message)
  # Sanity
  raise ProtocolError, "Unexpected data attached to pong message (#{message.bytesize})" if message.bytesize != 0

  @logger&.debug 'PONG message received' || !@logger&.debug?

  # No longer pending a PONG
  @ping_pending = false
end
run_io() click to toggle source
# File lib/log-courier/client.rb, line 200
def run_io
  loop do
    # Reconnect loop
    @client.connect @io_control

    @timeout = Time.now.to_i + @keepalive_timeout

    run_io_loop

    # Disconnect and retry payloads
    @client.disconnect
    @retry_payload = @first_payload

    # TODO: Make reconnect time configurable?
    sleep 5
  end

  @client.disconnect
rescue ShutdownSignal
  # Ensure disconnected
  @client.disconnect
end
run_io_loop() click to toggle source
# File lib/log-courier/client.rb, line 223
def run_io_loop
  io_stop = false
  can_send = false

  # IO loop
  loop do
    action = @io_control.pop @timeout - Time.now.to_i

    # Process the action
    case action[0]
    when 'S'
      # If we're flushing through the pending, pick from there
      unless @retry_payload.nil?
        @logger&.debug 'Send is ready, retrying previous payload'

        # Regenerate data if we need to
        @retry_payload.generate if @retry_payload.payload.nil?

        # Send and move onto next
        @client.send 'JDAT', @retry_payload.payload

        @retry_payload = @retry_payload.next

        # If first send, exit idle mode
        @timeout = Time.now.to_i + @network_timeout if @retry_payload == @first_payload

        next
      end

      # Ready to send, allow spooler to pass us something if we don't
      # have something already
      if @received_payloads.length.zero?
        @logger&.debug 'Send is ready, requesting events'

        can_send = true

        @send_mutex.synchronize do
          @send_ready = true
          @send_cond.signal
        end
      else
        @logger&.debug 'Send is ready, using events from backlog'
        send_payload @received_payloads.pop
      end
    when 'E'
      # Were we expecting a payload? Store it if not
      if can_send
        @logger&.debug 'Sending events', events: action[1].length
        send_payload action[1]
        can_send = false
      else
        @logger&.debug 'Events received when not ready; saved to backlog'
        @received_payloads.push action[1]
      end
    when 'R'
      # Received a message
      signature, message = action[1..2]
      case signature
      when 'PONG'
        process_pong message
      when 'ACKN'
        process_ackn message
      end
      # else
      # Unknown message - only listener is allowed to respond with a "????" message
      # TODO: What should we do? Just ignore for now and let timeouts conquer

      # Any pending payloads left?
      if @pending_payloads.length.zero?
        # Handle shutdown
        raise ShutdownSignal if io_stop

        # Enter idle mode
        @timeout = Time.now.to_i + @keepalive_timeout
      else
        # Set network timeout
        @timeout = Time.now.to_i + @network_timeout
      end
    when 'F'
      # Reconnect, an error occurred
      break
    when '!'
      @logger&.debug 'Shutdown request received'

      # Shutdown request received
      raise ShutdownSignal if @pending_payloads.length.zero?

      @logger&.debug 'Delaying shutdown due to pending payloads', payloads: @pending_payloads.length

      io_stop = true

      # Stop spooler sending
      can_send = false
      @send_mutex.synchronize do
        @send_ready = false
      end
    end
  rescue TimeoutError
    # Handle network timeout
    raise TimeoutError if @pending_payloads != 0

    # Keepalive timeout hit, timeout if we were waiting for a pong
    raise TimeoutError if @pending_ping

    # Stop spooler sending
    can_send = false
    @send_mutex.synchronize do
      @send_ready = false
    end

    # Send PING
    send_ping

    @timeout = Time.now.to_i + @network_timeout

    next
  end
rescue ProtocolError => e
  # Reconnect required due to a protocol error
  @logger&.warn 'Protocol error', error: e.message
rescue TimeoutError
  # Reconnect due to timeout
  @logger&.warn 'Timeout occurred'
rescue ShutdownSignal
  raise
rescue StandardError => e
  # Unknown error occurred
  @logger&.warn e, hint: 'Unknown error'
end
run_spooler() click to toggle source
# File lib/log-courier/client.rb, line 161
def run_spooler
  loop do
    spooled = []
    next_flush = Time.now.to_i + @options[:idle_timeout]

    # The spooler loop
    begin
      loop do
        event = @event_queue.pop next_flush - Time.now.to_i

        raise ShutdownSignal if event.nil?

        spooled.push(event)

        break if spooled.length >= @options[:spool_size]
      end
    rescue TimeoutError
      # Hit timeout but no events, keep waiting
      next if spooled.length.zero?
    end

    if spooled.length >= @options[:spool_size]
      @logger&.debug 'Flushing full spool', events: spooled.length
    else
      @logger&.debug 'Flushing spool due to timeout', events: spooled.length
    end

    # Pass through to io_control but only if we're ready to send
    @send_mutex.synchronize do
      @send_cond.wait(@send_mutex) until @send_ready
      @send_ready = false
    end

    @io_control << ['E', spooled]
  end
rescue ShutdownSignal
  # Shutdown
end
send_jdat(events) click to toggle source
# File lib/log-courier/client.rb, line 373
def send_jdat(events)
  # Generate the JSON payload and compress it
  nonce = generate_nonce

  # Save the pending payload
  payload = PendingPayload.new(events, nonce)

  @pending_payloads[nonce] = payload

  if @first_payload.nil?
    @first_payload = payload
  else
    @last_payload.next = payload
  end
  @last_payload = payload

  # Send it
  @client.send 'JDAT', payload.payload
end
send_payload(payload) click to toggle source
# File lib/log-courier/client.rb, line 353
def send_payload(payload)
  # If we have too many pending payloads, pause the IO
  @client.pause_send if @pending_payloads.length + 1 >= @max_pending_payloads

  # Received some events - send them
  send_jdat payload

  # Leave idle mode if this is the first payload after idle
  @timeout = Time.now.to_i + @network_timeout if @pending_payloads.length == 1
end
send_ping() click to toggle source
# File lib/log-courier/client.rb, line 368
def send_ping
  # Send it
  @client.send 'PING', ''
end