class ApolloStudioTracing::TraceChannel

rubocop:disable Metrics/ClassLength

Attributes

api_key[R]
compress[R]
debug_reports[R]
debug_reports?[R]
max_queue_bytes[R]
max_uncompressed_report_size[R]
max_upload_attempts[R]
min_upload_retry_delay_secs[R]
reporting_interval[R]

Public Class Methods

new(report_header:, compress: nil, api_key: nil, reporting_interval: nil, max_uncompressed_report_size: nil, max_queue_bytes: nil, debug_reports: nil, max_upload_attempts: nil, min_upload_retry_delay_secs: nil) click to toggle source

rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity

# File lib/apollo-studio-tracing/trace_channel.rb, line 22
# Stores the channel configuration and creates the (initially empty) trace
# queue. No uploader thread is created here.
#
# Every option other than +report_header+ falls back to a default when it is
# omitted or nil:
#
# * compress                     - true
# * api_key                      - ENV['ENGINE_API_KEY'], else ENV['APOLLO_KEY'],
#                                  else the literal 'NO_API_KEY'
# * reporting_interval           - 5 (presumably seconds; it is handed to the
#                                  shutdown barrier by the uploader loop)
# * max_uncompressed_report_size - 4 * 1024 * 1024 bytes
# * max_queue_bytes              - ten times max_uncompressed_report_size
# * max_upload_attempts          - 5
# * min_upload_retry_delay_secs  - 0.1
# * debug_reports                - false
def initialize(report_header:, compress: nil, api_key: nil, reporting_interval: nil,
               max_uncompressed_report_size: nil, max_queue_bytes: nil, debug_reports: nil,
               max_upload_attempts: nil, min_upload_retry_delay_secs: nil)
  # Upload settings.
  @report_header = report_header
  @api_key = api_key || ENV.fetch('ENGINE_API_KEY') { ENV.fetch('APOLLO_KEY', 'NO_API_KEY') }
  # An explicit false must survive, so only nil is replaced by the default.
  @compress = compress.nil? || compress
  @max_upload_attempts = max_upload_attempts || 5
  @min_upload_retry_delay_secs = min_upload_retry_delay_secs || 0.1
  @reporting_interval = reporting_interval || 5
  @debug_reports = debug_reports || false

  # Size limits; the queue limit is derived from the report limit.
  @max_uncompressed_report_size = max_uncompressed_report_size || (4 * 1024 * 1024)
  @max_queue_bytes = max_queue_bytes || (@max_uncompressed_report_size * 10)

  # Queue bookkeeping and thread coordination.
  @queue = Queue.new
  @queue_bytes = Concurrent::AtomicFixnum.new(0)
  @queue_full = false
  @enqueue_mutex = Mutex.new
  @shutdown_barrier = ApolloStudioTracing::ShutdownBarrier.new
end

Public Instance Methods

ensure_thread_started() click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 89
# Launches the uploader thread via #start unless one already exists and is
# still alive. Returns #start's result when a thread is launched, otherwise nil.
def ensure_thread_started
  start unless @uploader_thread&.alive?
end
flush() click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 95
# Blocks the caller, polling every 0.1 seconds, until the trace queue is empty.
# Stops waiting as soon as there is no live uploader thread, because without
# one the queue would never drain. Always returns nil.
def flush
  sleep(0.1) until @queue.empty? || !@uploader_thread&.alive?
end
queue(query_key, trace, context) click to toggle source

rubocop:enable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity

# File lib/apollo-studio-tracing/trace_channel.rb, line 42
# Encodes a finished trace and appends it to the upload queue, all while
# holding the enqueue mutex.
#
# query_key - key the trace is grouped under; its bytesize counts towards the
#             queue size
# trace     - hash read for :start_time, :end_time, :start_time_nanos,
#             :end_time_nanos and :node_map (whose #root becomes the trace root)
# context   - hash read for :apollo_client_name and :apollo_client_version
#
# While the queued byte count is at or above max_queue_bytes the trace is
# dropped; a warning is logged only on the first dropped trace, and an info
# message is logged once when queueing resumes.
def queue(query_key, trace, context)
  @enqueue_mutex.synchronize do
    if @queue_bytes.value < max_queue_bytes
      # First trace accepted after a pause: announce the recovery once.
      if @queue_full
        ApolloStudioTracing.logger.info(
          "Apollo tracing queue is below the threshold of #{max_queue_bytes} bytes and " \
          'trace collection will resume.',
        )
        @queue_full = false
      end

      ApolloStudioTracing.logger.info("Queueing a trace for #{query_key}") if debug_reports?

      started_at = to_proto_timestamp(trace[:start_time])
      finished_at = to_proto_timestamp(trace[:end_time])
      elapsed_nanos = trace[:end_time_nanos] - trace[:start_time_nanos]
      payload = ApolloStudioTracing::Trace.encode(
        ApolloStudioTracing::Trace.new(
          start_time: started_at,
          end_time: finished_at,
          duration_ns: elapsed_nanos,
          root: trace[:node_map].root,
          client_name: context[:apollo_client_name],
          client_version: context[:apollo_client_version],
        ),
      )

      @queue.push([query_key, payload])
      @queue_bytes.increment(payload.bytesize + query_key.bytesize)

      # Make sure something is around to drain what was just queued.
      ensure_thread_started
    elsif !@queue_full
      # Over the limit for the first time: warn once, then drop silently.
      ApolloStudioTracing.logger.warn(
        "Apollo tracing queue is above the threshold of #{max_queue_bytes} bytes and " \
        'trace collection will be paused.',
      )
      @queue_full = true
    end
  end
end
shutdown() click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 104
# Signals the shutdown barrier and then waits for the uploader thread to
# finish. Does nothing (and returns nil) when no uploader thread was ever
# started; otherwise returns the result of joining the thread.
def shutdown
  uploader = @uploader_thread
  return unless uploader

  ApolloStudioTracing.logger.info('Shutting down Apollo trace channel...')
  @shutdown_barrier.shutdown
  uploader.join
end
start() click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 83
# Spawns a new thread running #run_uploader, remembers it as the channel's
# uploader thread and returns it. Any previously stored thread reference is
# overwritten without being stopped.
def start
  @uploader_thread = Thread.new { run_uploader }
end

Private Instance Methods

drain_queue() click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 130
# Empties the trace queue, grouping encoded traces by query key and handing
# them to #send_report in batches.
#
# A batch is sent as soon as its accumulated size (trace bytes plus query key
# bytes per entry) reaches max_uncompressed_report_size; whatever is left once
# the queue is empty is sent as a final, smaller batch. The queued-bytes
# counter is decremented for every entry taken off the queue.
def drain_queue
  batch = {}
  batch_bytes = 0

  until @queue.empty?
    key, payload = @queue.pop(false)
    entry_bytes = payload.bytesize + key.bytesize
    @queue_bytes.decrement(entry_bytes)

    (batch[key] ||= []) << payload
    batch_bytes += entry_bytes

    if batch_bytes >= max_uncompressed_report_size
      send_report(batch)
      # Start a fresh batch rather than clearing the hash that was just sent.
      batch = {}
      batch_bytes = 0
    end
  end

  send_report(batch) unless batch.empty?
end
queue_full?() click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 114
# True when the number of queued bytes has reached or exceeded max_queue_bytes.
def queue_full?
  queued_bytes = @queue_bytes.value
  queued_bytes >= max_queue_bytes
end
run_uploader() click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 118
# Main loop of the uploader thread.
#
# Repeatedly asks the shutdown barrier whether shutdown has been requested,
# passing reporting_interval (presumably the time to wait between checks), and
# drains the queue each time the answer is falsy. Once shutdown is signalled
# the queue is drained one final time.
#
# Any StandardError is logged as a warning and re-raised; the exit message is
# logged on every path.
def run_uploader
  ApolloStudioTracing.logger.info('Trace uploader starting')

  until @shutdown_barrier.await_shutdown(reporting_interval)
    drain_queue
  end

  ApolloStudioTracing.logger.info('Draining queue before shutdown...')
  drain_queue
rescue StandardError => error
  ApolloStudioTracing.logger.warn("Exception thrown in uploader process. #{error}")
  raise
ensure
  ApolloStudioTracing.logger.info('Trace uploader exiting')
end
send_report(traces_per_query) click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 151
# Builds one report from the given batch and uploads it.
#
# traces_per_query - hash mapping each query key to an array of encoded traces
#
# Every encoded trace is decoded again before being attached to the report.
# When debug reports are enabled the whole report is logged as pretty-printed
# JSON first. Returns whatever ApolloStudioTracing::API.upload returns.
def send_report(traces_per_query)
  report = ApolloStudioTracing::Report.new(header: @report_header)

  traces_per_query.each do |query_key, encoded_traces|
    # TODO: Figure out how to use the already encoded traces like Apollo
    # https://github.com/apollographql/apollo-server/blob/master/packages/apollo-engine-reporting-protobuf/src/index.js
    decoded_traces = encoded_traces.map { |bytes| ApolloStudioTracing::Trace.decode(bytes) }
    report.traces_per_query[query_key] = ApolloStudioTracing::TracesAndStats.new(trace: decoded_traces)
  end

  if debug_reports?
    # Round-trip through JSON.parse so the output can be pretty-printed.
    readable = JSON.pretty_generate(JSON.parse(report.to_json))
    ApolloStudioTracing.logger.info("Sending trace report:\n#{readable}")
  end

  payload = ApolloStudioTracing::Report.encode(report)
  ApolloStudioTracing::API.upload(
    payload,
    api_key: api_key,
    compress: compress,
    max_attempts: max_upload_attempts,
    min_retry_delay_secs: min_upload_retry_delay_secs,
  )
end
to_proto_timestamp(time) click to toggle source
# File lib/apollo-studio-tracing/trace_channel.rb, line 178
# Converts a time-like object into a Google::Protobuf::Timestamp, using its
# #to_i value for the seconds field and its #nsec value for the nanos field.
def to_proto_timestamp(time)
  whole_seconds = time.to_i
  nanoseconds = time.nsec
  Google::Protobuf::Timestamp.new(seconds: whole_seconds, nanos: nanoseconds)
end