class Logtail::LogDevices::HTTP

A highly efficient log device that buffers log messages and delivers them over HTTPS to the Logtail API. It uses batching, keep-alive connections, and msgpack to deliver logs with high throughput and little overhead. All log preparation and delivery happen asynchronously in background threads so as not to block application execution, which also keeps delivery efficient in multi-threaded environments.

See {#initialize} for options and more details.

Constants

CONTENT_TYPE
LOGTAIL_HOST
LOGTAIL_PORT
LOGTAIL_PRODUCTION_HOST
LOGTAIL_SCHEME
LOGTAIL_STAGING_HOST
USER_AGENT

Public Class Methods

new(api_key, options = {})

Instantiates a new HTTP log device that can be passed to {Logtail::Logger#initialize}.

The class maintains a buffer that is flushed in batches to the Logtail API. Two options control when the flush happens: `:batch_size` and `:flush_interval`. The buffer is flushed as soon as either limit is reached.

By default, the request queue drops new requests when the rate of log messages exceeds the maximum delivery rate, so application performance is not sacrificed. If you would rather apply back pressure and avoid losing log data, pass a standard {SizedQueue} via the `:request_queue` option.
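The two queueing strategies described above can be illustrated with a small sketch. `DroppingQueueSketch` is a hypothetical class written only for this example; it is not the library's `DroppingSizedQueue` or `FlushableDroppingSizedQueue`, whose source is not shown here. Ruby's built-in `SizedQueue` illustrates the back-pressure case.

```ruby
# Hypothetical sketch: a bounded queue that drops new items when full,
# contrasted with Ruby's built-in SizedQueue, which makes producers wait.
class DroppingQueueSketch
  attr_reader :dropped

  def initialize(max_size)
    @max_size = max_size
    @items = []
    @dropped = 0
    @mutex = Mutex.new
  end

  # Returns true if the item was queued, false if it was dropped.
  def enq(item)
    @mutex.synchronize do
      if @items.size >= @max_size
        @dropped += 1
        false
      else
        @items << item
        true
      end
    end
  end

  def size
    @mutex.synchronize { @items.size }
  end
end

dropping = DroppingQueueSketch.new(25)
30.times { |i| dropping.enq(i) }
puts "queued=#{dropping.size} dropped=#{dropping.dropped}" # queued=25 dropped=5

# SizedQueue applies back pressure: a normal push on a full queue blocks the
# caller. A non-blocking push (second argument true) raises ThreadError instead.
blocking = SizedQueue.new(2)
blocking.push(:a)
blocking.push(:b)
begin
  blocking.push(:c, true)
rescue ThreadError
  puts "SizedQueue is full; a blocking push would wait here"
end
```

Dropping keeps the logging call cheap under load at the cost of losing data; a blocking queue keeps every message but can slow the threads that write logs.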

@param api_key [String] The API key provided to you after you add your application to [Logtail](https://logtail.com).

@param [Hash] options The options to create an HTTP log device with.

@option options [Integer] :batch_size (1000) The maximum number of log lines in
each HTTP payload. When the message buffer reaches this limit, an HTTP request is issued.
Bigger payloads mean higher throughput, but also use more memory. Logtail will not accept
payloads larger than 1 MB.

@option options [Boolean] :flush_continuously (true) This should only be disabled under
special circumstances (like test suites). Setting this to `false` disables the
continuous flushing of log messages, so flushing must be handled externally
via the {#flush} method.

@option options [Numeric] :flush_interval (2) How often, in seconds, the client should
attempt to deliver logs to the Logtail API. Fractional values are allowed. The HTTP client
buffers logs, and this option sets how often the buffer is flushed when `:batch_size`
has not been reached.

@option options [Integer] :requests_per_conn (2500) The number of requests to send over a
single persistent connection. Once this number is reached, the connection is closed
and a new one is opened.

@option options [SizedQueue, FlushableDroppingSizedQueue] :request_queue (FlushableDroppingSizedQueue.new(25))
The queue that holds Net::HTTP requests awaiting delivery. By default this is a
`FlushableDroppingSizedQueue` of size `25`: once the queue holds 25 requests, new
requests are dropped. If you would rather apply back pressure and not lose log data,
pass a standard {SizedQueue}, as in the second example below.

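@option options [String] :logtail_host The Logtail host to deliver the log lines to.
Defaults to the `LOGTAIL_HOST` environment variable, then to {LOGTAIL_HOST}.

@option options [Integer] :logtail_port The port to connect to. Defaults to the
`LOGTAIL_PORT` environment variable, then to {LOGTAIL_PORT}.

@option options [String] :logtail_scheme The URL scheme to use. TLS is enabled only
when this is `"https"`. Defaults to the `LOGTAIL_SCHEME` environment variable, then to
{LOGTAIL_SCHEME}.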

@example Basic usage

Logtail::Logger.new(Logtail::LogDevices::HTTP.new("my_logtail_api_key"))

@example Apply back pressure instead of dropping messages

http_log_device = Logtail::LogDevices::HTTP.new("my_logtail_api_key", request_queue: SizedQueue.new(25))
Logtail::Logger.new(http_log_device)
# File lib/logtail/log_devices/http.rb, line 72
def initialize(api_key, options = {})
  @api_key = api_key || raise(ArgumentError.new("The api_key parameter cannot be blank"))
  @logtail_host = options[:logtail_host] || ENV['LOGTAIL_HOST'] || LOGTAIL_HOST
  @logtail_port = options[:logtail_port] || ENV['LOGTAIL_PORT'] || LOGTAIL_PORT
  @logtail_scheme = options[:logtail_scheme] || ENV['LOGTAIL_SCHEME'] || LOGTAIL_SCHEME
  @batch_size = options[:batch_size] || 1_000
  @flush_continuously = options[:flush_continuously] != false
  @flush_interval = options[:flush_interval] || 2 # 2 seconds
  @requests_per_conn = options[:requests_per_conn] || 2_500
  @msg_queue = FlushableDroppingSizedQueue.new(@batch_size)
  @request_queue = options[:request_queue] || FlushableDroppingSizedQueue.new(25)
  @successive_error_count = 0
  @requests_in_flight = 0
end
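Each connection setting in `#initialize` is resolved as "explicit option, then environment variable, then built-in constant", and `:flush_continuously` is treated as enabled unless it is exactly `false`. The sketch below isolates that precedence; `resolve_setting` and `flush_continuously?` are hypothetical helpers written for illustration, and the `env:` parameter exists only so the example does not depend on the real process environment.

```ruby
# Hypothetical helper mirroring the precedence used in #initialize:
# explicit option, then environment variable, then the built-in default.
def resolve_setting(options, key, env_name, default, env: ENV)
  options[key] || env[env_name] || default
end

# Mirrors `options[:flush_continuously] != false`: nil or a missing key
# both mean "enabled"; only an explicit false disables it.
def flush_continuously?(options)
  options[:flush_continuously] != false
end

env = { "LOGTAIL_HOST" => "env.example.com" }
resolve_setting({ logtail_host: "opt.example.com" }, :logtail_host, "LOGTAIL_HOST", "default.example.com", env: env)
# => "opt.example.com"
resolve_setting({}, :logtail_host, "LOGTAIL_HOST", "default.example.com", env: env)
# => "env.example.com"
resolve_setting({}, :logtail_host, "LOGTAIL_HOST", "default.example.com", env: {})
# => "default.example.com"
flush_continuously?({})                          # => true
flush_continuously?({ flush_continuously: false }) # => false
```

One consequence of using `||` is that passing `nil` for an option is the same as omitting it, so the environment variable or constant still applies.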

Public Instance Methods

close()

Closes the log device, cleans up, and attempts one last delivery.

# File lib/logtail/log_devices/http.rb, line 117
def close
  # Kill the flush thread immediately since we are about to flush again.
  @flush_thread.kill if @flush_thread

  # Flush all remaining messages
  flush

  # Kill the request queue thread. Flushing waits (for up to about 20 seconds) until no requests are pending.
  @request_outlet_thread.kill if @request_outlet_thread
end
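
deliver_one(msg)

Delivers a single message synchronously over a new, non-persistent connection. Returns the `Net::HTTPResponse` on success, or the exception that was raised if delivery fails.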
# File lib/logtail/log_devices/http.rb, line 128
def deliver_one(msg)
  http = build_http
  # Count the request as in flight before connecting, so the decrement in
  # `ensure` stays balanced even if the connection cannot be opened.
  @requests_in_flight += 1

  begin
    resp = http.start do |conn|
      req = build_request([msg])
      conn.request(req)
    end
    return resp
  rescue => e
    Logtail::Config.instance.debug { "error: #{e.message}" }
    return e
  ensure
    http.finish if http.started?
    @requests_in_flight -= 1
  end
end
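`@requests_in_flight` is incremented and decremented from more than one thread (the caller of `deliver_one` and the request outlet thread), and `+=` on an instance variable is not an atomic operation. A minimal sketch of a thread-safe alternative, assuming a `Mutex` is acceptable here; `InFlightCounter` is a hypothetical class, not part of the library:

```ruby
# Hypothetical counter; the code above uses a plain @requests_in_flight ivar.
class InFlightCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def value
    @mutex.synchronize { @count }
  end

  # Increments before running the block and always decrements afterwards,
  # even if the block raises, so the count never drifts below zero.
  def track
    @mutex.synchronize { @count += 1 }
    begin
      yield
    ensure
      @mutex.synchronize { @count -= 1 }
    end
  end
end

counter = InFlightCounter.new
begin
  counter.track { raise IOError, "connection refused" }
rescue IOError
  # The failure is handled by the caller; the counter is already balanced.
end
counter.value # => 0
```

The mutex is released before the block runs, so slow network calls inside `track` do not block other threads from updating or reading the counter.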

flush()

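Flushes all log messages in the buffer synchronously. This method blocks until the request queue is empty and no requests are in flight, or until about 20 seconds have passed, and then returns `true`. It does not confirm that the Logtail API accepted the messages. If you want to flush asynchronously, see {#flush_async}.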

# File lib/logtail/log_devices/http.rb, line 110
def flush
  flush_async
  wait_on_request_queue
  true
end
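
verify_delivery!()

Polls up to five times, two seconds apart, for the response to the most recent delivery request. Prints a success message and returns `true` when the Logtail API responds with status `202`. Raises with the status and body if it responds with any other status, or with a separate message if no response is recorded within that time.

# File lib/logtail/log_devices/http.rb, line 147
def verify_delivery!
  5.times do
    sleep(2)

    if @last_resp.nil?
      print "."
    elsif @last_resp.code == "202"
      puts "Log delivery successful! View your logs at https://logtail.com"
      # Stop polling; without this the loop continues and the final
      # "No request was made" error is raised even after a success.
      return true
    else
      raise <<~MESSAGE

        Log delivery failed!

        Status: #{@last_resp.code}
        Body: #{@last_resp.body}

        You can enable internal Logtail debug logging with the following:

        Logtail::Config.instance.debug_logger = ::Logger.new(STDOUT)
      MESSAGE
    end
  end

  raise <<~MESSAGE

    Log delivery failed! No request was made.

    You can enable internal debug logging with the following:

    Logtail::Config.instance.debug_logger = ::Logger.new(STDOUT)
  MESSAGE
end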

write(msg)

Writes a new log message to the buffer and flushes asynchronously if the message queue is full. The flush is asynchronous because the Logtail API limits the size of each batch: the `@request_queue` can hold several batches, so the application can buffer a multiple of that limit without waiting on delivery.

# File lib/logtail/log_devices/http.rb, line 91
def write(msg)
  @msg_queue.enq(msg)

  # Lazily start flush threads to ensure threads are alive after forking processes.
  # If the threads are started during instantiation they will not be copied when
  # the current process is forked. This is the case with various web servers,
  # such as Phusion Passenger.
  ensure_flush_threads_are_started

  if @msg_queue.full?
    Logtail::Config.instance.debug { "Flushing HTTP buffer via write" }
    flush_async
  end
  true
end

Private Instance Methods

authorization_payload()

Builds the `Authorization` header value for HTTP delivery to the Logtail API.

# File lib/logtail/log_devices/http.rb, line 363
def authorization_payload
  "Bearer #{@api_key}"
end

build_http()

Builds a `Net::HTTP` object to deliver requests over.

# File lib/logtail/log_devices/http.rb, line 272
def build_http
  http = Net::HTTP.new(@logtail_host, @logtail_port)
  http.set_debug_output(Config.instance.debug_logger) if Config.instance.debug_logger
  if @logtail_scheme == 'https'
    http.use_ssl = true
    # Verification on Windows fails despite having a valid certificate.
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE
  end
  http.read_timeout = 30
  http.ssl_timeout = 10
  http.open_timeout = 10
  http
end
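The method above sets `VERIFY_NONE` whenever the scheme is `https`, so certificate verification is disabled on every platform, although its comment only mentions Windows. A sketch of a variant that disables verification only on Windows, assuming `Gem.win_platform?` is an adequate platform check; `build_http_sketch` and its `windows:` parameter are hypothetical and exist only to make the behavior testable:

```ruby
require "net/http"
require "openssl"

# Hypothetical variant of #build_http: keep certificate verification on,
# except on Windows, where the original comment reports verification failures.
def build_http_sketch(host, port, scheme, windows: Gem.win_platform?)
  http = Net::HTTP.new(host, port) # does not open a connection yet
  if scheme == "https"
    http.use_ssl = true
    http.verify_mode = windows ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  end
  http.read_timeout = 30
  http.ssl_timeout = 10
  http.open_timeout = 10
  http
end

http = build_http_sketch("example.com", 443, "https", windows: false)
http.verify_mode == OpenSSL::SSL::VERIFY_PEER # => true
```

Skipping verification exposes the API key in the `Authorization` header to anyone able to intercept the connection, which is why limiting `VERIFY_NONE` to the platform that needs it is worth considering.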

build_request(msgs)

Builds an HTTP POST request whose body is the given batch of messages, serialized with msgpack.

# File lib/logtail/log_devices/http.rb, line 198
def build_request(msgs)
  path = '/'
  req = Net::HTTP::Post.new(path)
  req['Authorization'] = authorization_payload
  req['Content-Type'] = CONTENT_TYPE
  req['User-Agent'] = USER_AGENT
  req.body = msgs.to_msgpack
  req
end
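A standard-library sketch of the same request shape. The method above serializes with `msgs.to_msgpack`, which comes from the msgpack gem; this example substitutes JSON only so it runs without extra gems, and the `Content-Type` and `User-Agent` values are placeholders rather than the real `CONTENT_TYPE` and `USER_AGENT` constants. `build_request_sketch` is a hypothetical name.

```ruby
require "net/http"
require "json"

# Sketch of #build_request using JSON instead of msgpack (an assumption made
# so the example needs only the standard library).
def build_request_sketch(msgs, api_key)
  req = Net::HTTP::Post.new("/")
  req["Authorization"] = "Bearer #{api_key}" # same format as #authorization_payload
  req["Content-Type"] = "application/json"   # placeholder for CONTENT_TYPE
  req["User-Agent"] = "example-agent/0.1"     # placeholder for USER_AGENT
  req.body = JSON.generate(msgs)
  req
end

req = build_request_sketch([{ "message" => "hello" }], "my_logtail_api_key")
req.method           # => "POST"
req["Authorization"] # => "Bearer my_logtail_api_key"
```

Because the whole batch becomes one request body, `:batch_size` effectively bounds the payload size that the 1 MB API limit applies to.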

deliver_requests(conn)

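Runs a loop that delivers requests over an open (kept-alive) HTTP connection and returns `true` after `@requests_per_conn` requests have been sent. If the connection dies, the failed request is put back on the queue for a retry (or dropped once it has been attempted three times) and the method returns `false`. It is the caller's responsibility to establish a new connection and resume delivery.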

# File lib/logtail/log_devices/http.rb, line 310
def deliver_requests(conn)
  num_reqs = 0

  while num_reqs < @requests_per_conn
    if @request_queue.size > 0
      Logtail::Config.instance.debug { "Waiting on next request, threads waiting: #{@request_queue.size}" }
    end

    request_attempt = @request_queue.deq

    if request_attempt.nil?
      sleep(1)
    else
      request_attempt.attempted!
      @requests_in_flight += 1

      begin
        resp = conn.request(request_attempt.request)
      rescue => e
        Logtail::Config.instance.debug { "#deliver_requests error: #{e.message}" }

        # Throw the request back on the queue for a retry if it has been attempted less
        # than 3 times
        if request_attempt.attempts < 3
          Logtail::Config.instance.debug { "Request is being retried, #{request_attempt.attempts} previous attempts" }
          @request_queue.enq(request_attempt)
        else
          Logtail::Config.instance.debug { "Request is being dropped, #{request_attempt.attempts} previous attempts" }
        end

        return false
      ensure
        @requests_in_flight -= 1
      end

      num_reqs += 1

      @last_resp = resp

      Logtail::Config.instance.debug do
        if resp.code == "202"
          "Logs successfully sent! View your logs at https://logtail.com"
        else
          "Log delivery failed! status: #{resp.code}, body: #{resp.body}"
        end
      end
    end
  end

  true
end
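The retry rule in the `rescue` branch above can be traced in isolation. Because `attempted!` runs before each send, `attempts` already includes the failed try, so a request is sent at most three times. `AttemptSketch` is a hypothetical stand-in for the library's `RequestAttempt`, whose source is not shown here, and `requeue_or_drop` is likewise illustrative.

```ruby
# Hypothetical stand-in for RequestAttempt: counts how often a request was tried.
class AttemptSketch
  attr_reader :request, :attempts

  def initialize(request)
    @request = request
    @attempts = 0
  end

  def attempted!
    @attempts += 1
  end
end

MAX_ATTEMPTS = 3

# Mirrors the rescue branch of #deliver_requests.
def requeue_or_drop(attempt, queue)
  if attempt.attempts < MAX_ATTEMPTS
    queue << attempt
    :requeued
  else
    :dropped
  end
end

queue = Queue.new
attempt = AttemptSketch.new(:fake_request)
outcomes = 3.times.map {
  attempt.attempted!               # the send is tried...
  requeue_or_drop(attempt, queue)  # ...and assumed to fail
}
outcomes # => [:requeued, :requeued, :dropped]
```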

ensure_flush_threads_are_started()

A convenience method that ensures the flush threads are started when `:flush_continuously` is enabled. It is called lazily from {#write} so that threads are only started when needed, and it also restarts any thread that is no longer alive, for example after the process forks.

# File lib/logtail/log_devices/http.rb, line 185
def ensure_flush_threads_are_started
  if @flush_continuously
    if @request_outlet_thread.nil? || !@request_outlet_thread.alive?
      @request_outlet_thread = Thread.new { request_outlet }
    end

    if @flush_thread.nil? || !@flush_thread.alive?
      @flush_thread = Thread.new { intervaled_flush }
    end
  end
end
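The "start if missing or dead" check above is what makes the device fork-safe: a forked Ruby child contains only the thread that called `fork`, so a stored reference to any other thread reports `alive?` as `false` there and is replaced on the next write. A minimal sketch of the pattern; `LazyWorker` is a hypothetical class, and killing the thread stands in for the effect of forking:

```ruby
# Hypothetical worker holder mirroring #ensure_flush_threads_are_started:
# a thread is (re)created only when it is missing or no longer alive.
class LazyWorker
  attr_reader :thread, :starts

  def initialize(&work)
    @work = work
    @starts = 0
  end

  def ensure_started
    if @thread.nil? || !@thread.alive?
      @starts += 1
      @thread = Thread.new(&@work)
    end
    @thread
  end
end

worker = LazyWorker.new { sleep } # stands in for a long-running flush loop
worker.ensure_started
worker.ensure_started     # already alive: no new thread
worker.thread.kill.join   # simulate the thread being gone, as after a fork
worker.ensure_started     # restarted
worker.starts             # => 2
```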

flush_async()

Flushes the message buffer asynchronously: all buffered messages are packaged into a single request, which is placed on the `@request_queue` for the delivery thread. This indirection exists because the Logtail API limits the size of each payload, which in turn limits the message buffer. The request queue holds several such payloads, allowing the application to buffer beyond the API-imposed limit.

# File lib/logtail/log_devices/http.rb, line 213
def flush_async
  @last_async_flush = Time.now
  msgs = @msg_queue.flush
  return if msgs.empty?

  req = build_request(msgs)
  if !req.nil?
    Logtail::Config.instance.debug { "New request placed on queue" }
    request_attempt = RequestAttempt.new(req)
    @request_queue.enq(request_attempt)
  end
end
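The key property of `flush_async` is that the buffer is drained in one step, so each batch becomes exactly one queued request and an empty buffer produces none. A minimal sketch, assuming a mutex-protected array is an acceptable stand-in for `FlushableDroppingSizedQueue#flush`; `MessageBufferSketch` and `flush_async_sketch` are hypothetical, and a plain array stands in for the built request:

```ruby
# Hypothetical message buffer; the real @msg_queue is a FlushableDroppingSizedQueue.
class MessageBufferSketch
  def initialize
    @msgs = []
    @mutex = Mutex.new
  end

  def <<(msg)
    @mutex.synchronize { @msgs << msg }
    self
  end

  # Atomically takes every buffered message and leaves the buffer empty,
  # so a concurrent writer can never see a half-flushed batch.
  def flush
    @mutex.synchronize do
      taken = @msgs
      @msgs = []
      taken
    end
  end
end

# Mirrors #flush_async: one batch becomes one queued request.
def flush_async_sketch(buffer, request_queue)
  msgs = buffer.flush
  return if msgs.empty?

  request_queue << msgs
end

buffer = MessageBufferSketch.new
requests = Queue.new
buffer << "a" << "b" << "c"
flush_async_sketch(buffer, requests)
flush_async_sketch(buffer, requests) # buffer is empty: nothing is queued
requests.size # => 1
```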

intervaled_flush()

Flushes the message queue on an interval. Because {#write} also flushes the buffer when it is full, this method checks the `@last_async_flush` timestamp so as not to flush immediately after a write-triggered flush.

# File lib/logtail/log_devices/http.rb, line 246
def intervaled_flush
  # Wait specified time period before starting
  sleep @flush_interval

  loop do
    begin
      if intervaled_flush_ready?
        Logtail::Config.instance.debug { "Flushing HTTP buffer via the interval" }
        flush_async
      end

      sleep(0.5)
    rescue Exception => e
      Logtail::Config.instance.debug { "Intervaled HTTP flush failed: #{e.inspect}\n\n#{e.backtrace}" }
    end
  end
end

intervaled_flush_ready?()

Determines if the loop in {#intervaled_flush} is ready to be flushed again. It uses the `@last_async_flush` variable to ensure that a flush does not happen too rapidly ({#write} also triggers a flush).

# File lib/logtail/log_devices/http.rb, line 267
def intervaled_flush_ready?
  @last_async_flush.nil? || (Time.now.to_f - @last_async_flush.to_f).abs >= @flush_interval
end
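The readiness check is a pure function of the last flush time, the current time, and the interval, which makes it easy to trace by hand. In this sketch the clock is passed in as an argument (an assumption made for testability; the method above reads `Time.now` directly), and `flush_ready?` is a hypothetical name:

```ruby
# Same predicate as #intervaled_flush_ready?, with the clock passed in.
def flush_ready?(last_flush, now, interval)
  last_flush.nil? || (now.to_f - last_flush.to_f).abs >= interval
end

interval = 2
flush_ready?(nil, 100.0, interval)  # => true  (never flushed)
flush_ready?(99.0, 100.0, interval) # => false (a write flushed 1 s ago)
flush_ready?(98.0, 100.0, interval) # => true  (2 s have elapsed)
```

Since {#intervaled_flush} re-checks this predicate every 0.5 seconds, an interval-triggered flush happens roughly between `@flush_interval` and `@flush_interval + 0.5` seconds after the previous flush.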

request_outlet()

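Runs an endless loop that opens a persistent HTTP connection, processes the `@request_queue` over it with {#deliver_requests}, and opens a new connection whenever the previous one is closed or fails.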

# File lib/logtail/log_devices/http.rb, line 287
def request_outlet
  loop do
    http = build_http

    begin
      Logtail::Config.instance.debug { "Starting HTTP connection" }

      http.start do |conn|
        deliver_requests(conn)
      end
    rescue => e
      Logtail::Config.instance.debug { "#request_outlet error: #{e.message}" }
    ensure
      Logtail::Config.instance.debug { "Finishing HTTP connection" }
      http.finish if http.started?
    end
  end
end

wait_on_request_queue()

Waits on the request queue. This is used in {#flush} to wait, for up to about 20 seconds, until the request queue is empty and no requests are in flight before returning.

# File lib/logtail/log_devices/http.rb, line 228
def wait_on_request_queue
  # Poll up to 40 times, 0.5 seconds apart (about 20 seconds in total).
  40.times do |i|
    if @request_queue.size == 0 && @requests_in_flight == 0
      Logtail::Config.instance.debug { "Request queue is empty and no requests are in flight, finish waiting" }
      return true
    end
    Logtail::Config.instance.debug do
      "Request size #{@request_queue.size}, reqs in-flight #{@requests_in_flight}, " \
        "continue waiting (iteration #{i + 1})"
    end
    sleep 0.5
  end
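
  # Timed out with requests still queued or in flight. Without this line the
  # method would return 40, the return value of Integer#times.
  false
end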
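The waiting strategy is bounded polling: check a condition, sleep, and give up after a fixed number of attempts. A sketch with the limits exposed as parameters; `wait_until_drained` is a hypothetical helper, the in-flight count is passed as a lambda, and a consumer thread stands in for the request outlet:

```ruby
# Hypothetical version of #wait_on_request_queue with the limits as parameters:
# poll until the queue is empty and nothing is in flight, or give up.
def wait_until_drained(queue, in_flight, attempts: 40, interval: 0.5)
  attempts.times do
    return true if queue.empty? && in_flight.call.zero?
    sleep interval
  end
  false
end

queue = Queue.new
3.times { |i| queue << i }
consumer = Thread.new { 3.times { queue.pop; sleep 0.01 } }
drained = wait_until_drained(queue, -> { 0 }, attempts: 200, interval: 0.01)
consumer.join
drained # => true once the consumer has taken every item
```

Polling keeps the code simple at the cost of up to one `interval` of extra latency; a condition variable signalled by the delivery thread would avoid that, but would require changes to the delivery loop as well.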