class ElasticAPM::Transport::Base

@api private

Constants

WATCHER_EXECUTION_INTERVAL
WATCHER_TIMEOUT_INTERVAL
WORKER_JOIN_TIMEOUT

Attributes

config[R]
filters[R]
queue[R]
stopped[R]
watcher[R]
workers[R]

Public Class Methods

new(config) click to toggle source
# File lib/elastic_apm/transport/base.rb, line 41
# Builds a transport in its initial, not-yet-started state.
#
# @param config [Object] agent configuration; it must respond to
#   +api_buffer_size+ and +pool_size+, and is also handed to the
#   serializers and filters
def initialize(config)
  @config = config

  # Event pipeline: a bounded queue sized from the configuration, plus the
  # serializers and filters that are later passed to each worker.
  @queue = SizedQueue.new(config.api_buffer_size)
  @serializers = Serializers.new(config)
  @filters = Filters.new(config)

  # Worker bookkeeping: one (initially nil) slot per pooled worker. The
  # mutex is taken whenever the slots are inspected or replaced.
  @worker_mutex = Mutex.new
  @workers = Array.new(config.pool_size)

  # Flag consulted by #submit and #ensure_worker_count, toggled by
  # #start and #stop.
  @stopped = Concurrent::AtomicBoolean.new
end

Public Instance Methods

add_filter(key, callback) click to toggle source
# File lib/elastic_apm/transport/base.rb, line 94
# Registers a filter callback under the given key.
#
# @param key [Object] identifier the callback is stored under
# @param callback [Object] the filter to register
# @return [Object] whatever the filters collection's +add+ returns
def add_filter(key, callback)
  filters.add(key, callback)
end
handle_forking!() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 98
# Re-establishes the watcher and the worker pool, intended for use after
# the process has forked.
#
# A plain stop-then-start cycle is deliberately avoided: the StopMessage
# queued by stopping would be the first message processed once the
# transport is restarted.
#
# @return [Object] the newly created watcher (result of #create_watcher)
def handle_forking!
  # Retire the current watcher before touching the pool.
  stop_watcher

  # Boot a worker for every slot that has no live thread.
  ensure_worker_count

  # Install a fresh watcher to keep the pool topped up.
  create_watcher
end
start() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 56
# Starts (or restarts) the transport: clears the stopped flag, boots any
# missing workers and installs the watcher.
#
# @return [Object] the watcher created by #create_watcher
def start
  debug '%s: Starting Transport', pid_str

  # The flag has to be cleared before anything else: on a restart it is
  # still set, and ensure_worker_count refuses to boot workers while the
  # transport is marked as stopped.
  @stopped.make_false if @stopped.true?

  ensure_worker_count
  create_watcher
end
stop() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 67
# Stops the transport: marks it as stopped, shuts the watcher down and
# then stops the workers.
#
# @return [Object] the result of #stop_workers
def stop
  debug '%s: Stopping Transport', pid_str

  # Raise the flag first so #submit rejects new events and
  # #ensure_worker_count no longer boots workers.
  @stopped.make_true

  # The watcher goes before the workers.
  stop_watcher
  stop_workers
end
submit(resource) click to toggle source
# File lib/elastic_apm/transport/base.rb, line 76
# Enqueues a resource for the workers without ever blocking the caller.
#
# @param resource [Object] the event to hand over to the workers
# @return [true] when the resource was queued
# @return [false] when the transport is stopped and the resource was dropped
# @return [nil] when the queue is full or the push failed with an error
def submit(resource)
  if @stopped.true?
    warn '%s: Transport stopping, no new events accepted', pid_str
    debug 'Dropping: %s', resource.inspect
    return false
  end

  # Non-blocking push: a full queue raises ThreadError instead of stalling
  # the calling thread.
  queue.push(resource, true)

  true
rescue ThreadError
  throttled_queue_full_warning
  nil
rescue StandardError => e
  # Only StandardError is swallowed. Interrupt, SystemExit, NoMemoryError
  # and similar non-StandardError exceptions must reach the host
  # application instead of being turned into a nil return value.
  error '%s: Failed adding to the transport queue: %p', pid_str, e.inspect
  nil
end

Private Instance Methods

all_workers_alive?() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 133
# Tells whether every worker slot currently holds a live thread.
# An empty (nil) slot counts as not alive.
#
# @return [Boolean]
def all_workers_alive?
  workers.all? { |worker| worker&.alive? }
end
boot_worker() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 137
# Spawns a thread that builds a Worker and runs its work loop.
#
# @return [Thread] the newly started thread
def boot_worker
  debug '%s: Booting worker...', pid_str

  Thread.new do
    # The worker is constructed inside the new thread and shares this
    # transport's queue, serializers and filters.
    worker = Worker.new(config, queue, serializers: @serializers, filters: @filters)
    worker.work_forever
  end
end
create_watcher() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 113
# Creates the watcher: a Concurrent::TimerTask whose task is
# #ensure_worker_count, configured with the WATCHER_* intervals.
# The task is stored in @watcher.
#
# @return [Object] the timer task returned by Concurrent::TimerTask.execute
def create_watcher
  intervals = {
    execution_interval: WATCHER_EXECUTION_INTERVAL,
    timeout_interval: WATCHER_TIMEOUT_INTERVAL
  }

  @watcher = Concurrent::TimerTask.execute(**intervals) { ensure_worker_count }
end
ensure_worker_count() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 120
# Boots a worker for every slot that is empty or whose thread has died.
# Does nothing when all workers are alive or the transport is stopped.
#
# @return [Array, nil] the worker slots after replacement, or nil when
#   nothing had to be done
def ensure_worker_count
  @worker_mutex.synchronize do
    # Liveness is checked first, the stopped flag second.
    return if all_workers_alive? || stopped.true?

    # Live threads keep their slot; every other slot gets a new worker.
    @workers.map! do |slot|
      slot&.alive? ? slot : boot_worker
    end
  end
end
pid_str() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 109
# Log prefix identifying the current process.
#
# @return [String] the current process id formatted as "[PID:<pid>]"
def pid_str
  "[PID:#{Process.pid}]"
end
send_stop_messages() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 171
# Pushes one stop message per configured pool slot onto the queue, without
# blocking. If the queue fills up, the remaining messages are skipped and
# a warning is logged.
#
# @return [Object] the pool size on success, otherwise the result of +warn+
def send_stop_messages
  config.pool_size.times do
    stop_message = Worker::StopMessage.new
    # Non-blocking push: raises ThreadError once the queue is full.
    queue.push(stop_message, true)
  end
rescue ThreadError
  warn 'Cannot push stop messages to worker queue as it is full'
end
stop_watcher() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 177
# Shuts the watcher down, if one has been created.
#
# @return [Object, nil] the result of the watcher's +shutdown+, or nil when
#   there is no watcher
def stop_watcher
  current = watcher
  current.shutdown unless current.nil?
end
stop_workers() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 149
# Asks every worker to stop, waits up to WORKER_JOIN_TIMEOUT for each one
# to finish and kills those that do not, then empties the worker slots.
#
# @return [Array] the worker slots, all reset to nil
def stop_workers
  debug '%s: Stopping workers', pid_str

  send_stop_messages

  @worker_mutex.synchronize do
    # Empty slots are skipped.
    workers.compact.each do |worker|
      # join returns nil when the timeout elapses before the thread ends.
      unless worker.join(WORKER_JOIN_TIMEOUT)
        debug('%s: Worker did not stop in %ds, killing...', pid_str, WORKER_JOIN_TIMEOUT)
        worker.kill
      end
    end

    # Keep the array at its original length so a later restart finds one
    # slot per worker.
    @workers.fill(nil)
  end
end
throttled_queue_full_warning() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 181
# Emits the "queue is full" warning through a Util::Throttle that is
# created on first use (with an argument of 5) and reused afterwards.
#
# @return [Object] the result of calling the throttle
def throttled_queue_full_warning
  @queue_full_log ||= Util::Throttle.new(5) {
    warn('%s: Queue is full (%i items), skipping…', pid_str, config.api_buffer_size)
  }

  @queue_full_log.call
end