class StackifyRubyAPM::Worker

@api private

Attributes

config[R]
messages[R]
pending_transactions[R]

Public Class Methods

new(config, messages, pending_transactions, trace_logger) click to toggle source
# File lib/stackify_apm/worker.rb, line 29
# Stores the collaborators handed to the worker and prepares one serializer
# per payload kind.
#
# @param config [Object] kept as @config and passed to both serializers
# @param messages [Object] kept as @messages
# @param pending_transactions [Object] kept as @pending_transactions
# @param trace_logger [Object] kept as @trace_logger
def initialize(config, messages, pending_transactions, trace_logger)
  @config = config
  @messages = messages
  @pending_transactions = pending_transactions
  @trace_logger = trace_logger

  # Anonymous two-field struct: `transactions` and `errors` each hold the
  # serializer built for that payload kind.
  serializer_set = Struct.new(:transactions, :errors)
  transaction_serializer = Serializers::Transactions.new(config)
  error_serializer = Serializers::Errors.new(config)
  @serializers = serializer_set.new(transaction_serializer, error_serializer)
end

Public Instance Methods

run_forever() click to toggle source

Collects and sends the transactions

# File lib/stackify_apm/worker.rb, line 45
# Starts the timer task and then dispatches every message popped from
# `messages` until a falsy value (nil/false) is popped.
#
# Dispatch:
# * ErrorMsg - handed to post_error
# * FlushMsg - triggers collect_and_send_transactions
# * StopMsg  - flushes once more, then calls stop!
# Messages of any other kind are ignored.
def run_forever
  @timer_task = build_timer_task.execute

  while (message = messages.pop)
    case message
    when ErrorMsg then post_error(message)
    when FlushMsg then collect_and_send_transactions
    when StopMsg
      # Send whatever is still pending before shutting down.
      collect_and_send_transactions
      stop!
    end
  end
end

Private Instance Methods

build_timer_task() click to toggle source

flush_interval_seconds - the interval at which transactions are sent to the APM. Builds the recurring task responsible for initiating the sending of transactions.

# File lib/stackify_apm/worker.rb, line 72
# Builds (but does not start) the timer task that requests a flush.
#
# Each time the task fires it pushes a new FlushMsg onto `messages`; the
# firing interval is taken from config.flush_interval_seconds.
#
# @return [Concurrent::TimerTask]
def build_timer_task
  interval = config.flush_interval_seconds
  Concurrent::TimerTask.new(execution_interval: interval) { messages.push(FlushMsg.new) }
end
collect_and_send_transactions() click to toggle source

Collects, builds, and sends transactions via HTTP/TraceLogger. (Source annotation: rubocop:disable Lint/RescueException)

# File lib/stackify_apm/worker.rb, line 81
# Drains one batch of pending transactions and posts it through the trace
# logger.
#
# Does nothing when no transactions are pending. When the environment
# variable STACKIFY_TRANSPORT_LOG_LEVEL is '0', the batch is also written to
# the debug log before posting.
#
# @return [Object, nil] whatever @trace_logger.post returns; nil when nothing
#   was pending or when posting raised
def collect_and_send_transactions
  return if pending_transactions.empty?

  batch = collect_batched_transactions
  if ENV['STACKIFY_TRANSPORT_LOG_LEVEL'] == '0'
    debug "[Worker] collect_and_send_transactions() transactions : #{batch.inspect}"
  end

  begin
    @trace_logger.post(batch)
  rescue ::Exception => failure
    # Every exception class is caught here on purpose: a failed post is
    # logged and swallowed rather than propagated to the caller.
    error('[Worker] Collect_and_send_transactions error:')
    error(failure.inspect)
    error(failure.backtrace.join("\n"))
    nil
  end
end
collect_batched_transactions() click to toggle source

rubocop:disable Lint/HandleExceptions

# File lib/stackify_apm/worker.rb, line 104
# Pops pending transactions, without blocking, into a single batch.
#
# Collection stops when the queue is empty, when a falsy value is popped, or
# when the batch has reached its limit. The limit check is inclusive, so a
# batch holds at most config.max_queue_size + 1 transactions.
#
# The limit is checked BEFORE popping. Popping first would remove one more
# transaction from the queue after the batch was already full and then throw
# it away, losing it.
#
# @return [Array] the collected transactions, oldest first; empty when
#   nothing was pending
def collect_batched_transactions
  batch = []

  begin
    while batch.length <= config.max_queue_size &&
          (transaction = pending_transactions.pop(true))
      batch << transaction
    end
  rescue ThreadError # rubocop:disable Lint/HandleExceptions
    # The non-blocking pop raises ThreadError once the queue is empty; that
    # simply ends the batch.
  end

  batch
end
post_error(_msg) click to toggle source

rubocop:enable Lint/RescueException

# File lib/stackify_apm/worker.rb, line 96
# Handles an error message taken off the worker's message queue.
#
# Currently a no-op: the code that would serialize and post the error is
# commented out, so the method returns nil whether or not transactions are
# pending. The guard below still calls pending_transactions.empty?.
#
# @param _msg [Object] unused while the implementation is disabled
# @return [nil]
def post_error(_msg)
  return if pending_transactions.empty?
  # Disabled implementation, kept for reference:
  # transactions = collect_batched_transactions
  # payload = @serializers.errors.build_all([msg.error])
  # @trace_logger.post(payload, transactions[0])
end
stop!() click to toggle source
# File lib/stackify_apm/worker.rb, line 64
# Shuts down the timer task, if one was started, and then terminates the
# calling thread via Thread.exit. Nothing after the call runs in that thread.
def stop!
  @timer_task.shutdown if @timer_task
  Thread.exit
end