class Atatus::Collector::Base

Attributes

config[R]

Public Class Methods

new(config) click to toggle source
# File lib/atatus/collector/base.rb, line 27
# Sets up an idle collector: stores the configuration, creates the transport,
# and initialises every aggregation buffer to empty. No worker thread is
# started here (@running starts out false).
#
# @param config the agent configuration; kept in @config and handed to
#   Atatus::BaseTransport.new
def initialize(config)
  # info 'Initializing Collector'
  @config = config

  # Spans keyed by transaction id; reading a missing key stores and returns
  # a fresh empty array for that key.
  @spans = Hash.new { |buffer, txn_id| buffer[txn_id] = [] }

  # Transaction-related buffers, guarded by @txns_lock in add_txn / collect.
  @txns_lock = Mutex.new
  @txns_agg = {}
  @txn_hist_agg = {}
  @traces_agg = []
  @error_metrics_agg = {}
  @error_requests_agg = []

  # Error events, guarded by @errors_lock.
  @errors_lock = Mutex.new
  @errors_aggs = []

  # Metric samples, guarded by @metrics_lock.
  @metrics_lock = Mutex.new
  @metrics_agg = []

  @transport = Atatus::BaseTransport.new(config)

  # Number of collect runs since host info was last sent (see collect).
  @collect_counter = 0
  @running = false
end

Public Instance Methods

add_error(error) click to toggle source
# File lib/atatus/collector/base.rb, line 80
# Buffers an error for the next collect run.
#
# The buffer holds at most 20 entries: while there is room the error is
# appended; once full, the new error overwrites a randomly chosen slot.
#
# @param error the error object to buffer (stored as-is)
def add_error(error)
  ensure_worker_running

  @errors_lock.synchronize do
    next @errors_aggs.push(error) if @errors_aggs.length < 20

    @errors_aggs[rand(20)] = error
  end
end
add_metrics(metricset) click to toggle source
# File lib/atatus/collector/base.rb, line 93
# Extracts the known system and Ruby runtime samples from a metric set and
# buffers them as one hash for the next collect run.
#
# A group (system or Ruby runtime) is copied only when ALL of its keys are
# present in metricset.samples. :'ruby.gc.time' is optional and is copied
# only together with the Ruby runtime group. If neither group is complete,
# nothing is buffered.
#
# @param metricset an object whose #samples responds to key? and []
def add_metrics(metricset)
  ensure_worker_running

  samples = metricset.samples

  system_keys = %i[
    system.cpu.total.norm.pct
    system.memory.actual.free
    system.memory.total
    system.process.cpu.total.norm.pct
    system.process.memory.size
    system.process.memory.rss.bytes
  ]
  runtime_keys = %i[
    ruby.gc.count
    ruby.threads
    ruby.heap.slots.live
    ruby.heap.slots.free
    ruby.heap.allocations.total
  ]

  snapshot = {}
  groups_copied = 0

  if system_keys.all? { |key| samples.key?(key) }
    system_keys.each { |key| snapshot[key] = samples[key] }
    groups_copied += 1
  end

  if runtime_keys.all? { |key| samples.key?(key) }
    runtime_keys.each { |key| snapshot[key] = samples[key] }
    snapshot[:'ruby.gc.time'] = samples[:'ruby.gc.time'] if samples.key?(:'ruby.gc.time')
    groups_copied += 1
  end

  return if groups_copied.zero?

  @metrics_lock.synchronize { @metrics_agg << snapshot }
end
add_span(span) click to toggle source
# File lib/atatus/collector/base.rb, line 131
# Buffers a span under its transaction id so that add_txn can pick it up
# later. Spans with a nil transaction_id, name, type, subtype or duration
# are ignored.
#
# @param span an object responding to transaction_id, name, type, subtype
#   and duration
def add_span(span)
  ensure_worker_running

  # Checked in this order; the scan stops at the first nil field.
  incomplete = %i[transaction_id name type subtype duration].any? do |field|
    span.public_send(field).nil?
  end
  return if incomplete

  txn_id = span.transaction_id
  @spans[txn_id] << span if txn_id
end
add_txn(txn) click to toggle source
# File lib/atatus/collector/base.rb, line 147
# Folds one finished transaction into the buffers that collect later ships:
# per-name transaction aggregates (with per-span layers and a residual "Ruby"
# layer), the response-time histogram, sampled slow traces, and error
# counters for HTTP status codes >= 400 (404 excluded).
#
# Transactions with a nil name/id/timestamp/duration, an empty name, or a
# non-positive duration are ignored; any spans already buffered for such a
# transaction are discarded so they do not accumulate in @spans.
#
# NOTE(review): durations and timestamps look like microseconds (they are
# converted with Util.ms and back with Util.us) — confirm against Util.
#
# @param txn an object responding to name, id, timestamp, duration, type,
#   context, spans= and ruby_time=
def add_txn(txn)
  ensure_worker_running

  if txn.name.nil? || txn.id.nil? || txn.timestamp.nil? || txn.duration.nil? ||
     txn.name.empty? || txn.duration <= 0
    # The spans collected by add_span for this transaction will never be
    # consumed, so drop them here instead of leaking them.
    @txns_lock.synchronize { @spans.delete(txn.id) }
    return
  end

  @txns_lock.synchronize do
    txn_type = @config.framework_name || "Ruby"
    background = txn.type == "Sidekiq"

    # Per-name transaction aggregate.
    if @txns_agg.key?(txn.name)
      @txns_agg[txn.name].aggregate! txn.duration
    else
      created = Txn.new(txn_type, "Ruby", txn.duration, background: background)
      created.id = txn.id
      created.pid = txn.id
      @txns_agg[txn.name] = created
    end
    txn_agg = @txns_agg[txn.name]

    # Histogram only covers non-background transactions up to the cut-off.
    # NOTE(review): 150*1000*1000.0 is presumably 150 s in microseconds — confirm.
    if !background && txn.duration <= 150 * 1000 * 1000.0
      if @txn_hist_agg.key?(txn.name)
        @txn_hist_agg[txn.name].aggregate! Util.ms(txn.duration)
      else
        @txn_hist_agg[txn.name] = TxnHist.new(txn_type, "Ruby", Util.ms(txn.duration))
      end
    end

    # key? is used (not []) so the default block of @spans does not create
    # an empty entry for transactions that have no spans.
    spans_present = @spans.key?(txn.id)
    txn_spans = spans_present ? @spans[txn.id] : []
    timings = []

    txn_spans.each do |span|
      next if span.name.nil? || span.type.nil? || span.subtype.nil? ||
              span.timestamp.nil? || span.duration.nil?
      next if span.name.empty?
      # Spans that started before the transaction are skipped.
      next unless span.timestamp >= txn.timestamp

      start = Util.ms(span.timestamp - txn.timestamp)
      timings.push(SpanTiming.new(start, start + Util.ms(span.duration)))

      if txn_agg.spans.key?(span.name)
        txn_agg.spans[span.name].aggregate! span.duration
      else
        kind = Layer.span_kind(span.type)
        type = Layer.span_type(span.subtype)
        layer = Layer.new(type, kind, span.duration)
        layer.id = span.id
        layer.pid = span.transaction_id
        txn_agg.spans[span.name] = layer
      end
    end

    # Residual time: the part of the transaction not covered by any span.
    # Walk the span intervals in start order, summing the gaps between the
    # merged intervals plus the tail after the last one.
    duration_ms = Util.ms(txn.duration)
    if timings.empty?
      ruby_time = duration_ms
    else
      timings.sort_by!(&:start)
      ruby_time = timings.first.start
      covered_until = timings.first.end
      timings.each do |timing|
        if timing.start > covered_until
          ruby_time += timing.start - covered_until
          covered_until = timing.end
        elsif timing.end > covered_until
          covered_until = timing.end
        end
      end
      ruby_time += duration_ms - covered_until if duration_ms > covered_until
    end

    if ruby_time > 0
      ruby_time = Util.us(ruby_time)
      # Aggregate like every other layer; replacing the entry would keep
      # only the most recent transaction's residual time.
      if txn_agg.spans.key?("Ruby")
        txn_agg.spans["Ruby"].aggregate! ruby_time
      else
        txn_agg.spans["Ruby"] = Layer.new("Ruby", "Ruby", ruby_time)
      end
    end

    # Keep at most 5 slow traces; once full, overwrite a random slot.
    if (spans_present || ruby_time > 0) && duration_ms >= @config.trace_threshold
      txn.spans = txn_spans if spans_present
      txn.ruby_time = ruby_time if ruby_time > 0

      if @traces_agg.length < 5
        @traces_agg.push(txn)
      else
        @traces_agg[rand(5)] = txn
      end
    end

    @spans.delete(txn.id) if spans_present

    context = txn.context
    unless context.nil? || context.response.nil? || context.response.status_code.nil?
      status_code = context.response.status_code.to_i

      if status_code >= 400 && status_code != 404
        per_status = (@error_metrics_agg[txn.name] ||= {})
        per_status[status_code] = per_status.fetch(status_code, 0) + 1

        # Keep at most 20 failing requests; once full, overwrite a random slot.
        failed_request = {'name' => txn.name, 'context' => txn.context}
        if @error_requests_agg.length < 20
          @error_requests_agg.push(failed_request)
        else
          @error_requests_agg[rand(20)] = failed_request
        end
      end
    end
  end
end
handle_forking!() click to toggle source
# File lib/atatus/collector/base.rb, line 75
# Restarts the collector by calling #stop and then #start.
# NOTE(review): the name suggests this is meant to run in a freshly forked
# process so that it gets its own worker thread — confirm against callers.
def handle_forking!
  stop
  start
end
pid_str() click to toggle source
# File lib/atatus/collector/base.rb, line 52
# Returns the current process id formatted as a log prefix, e.g. "[PID:1234]".
def pid_str
  "[PID:#{Process.pid}]"
end
start() click to toggle source
# File lib/atatus/collector/base.rb, line 56
# Logs a debug line prefixed with the current PID and makes sure the
# background worker thread is running (see ensure_worker_running).
def start
  debug '%s: Starting collector', pid_str

  ensure_worker_running
end
stop() click to toggle source
# File lib/atatus/collector/base.rb, line 62
# Stops the collector. Does nothing unless @running is set.
#
# Clears @running and, if the worker thread is still alive, wakes it with
# Thread#run so it leaves its sleep early, then waits up to 10 seconds for
# it to finish. Any StandardError raised on the way is logged (message and
# backtrace) rather than propagated.
def stop
  return unless @running

  @running = false
  return unless worker_active?

  debug '%s: Waiting for collector worker to exit', pid_str
  @worker.run
  @worker.join(10)
rescue => failure
  summary = format('Failed during collector stop: [%s] %s', failure.class, failure.message)
  error summary
  error "Backtrace:\n#{failure.backtrace.join("\n")}"
end

Private Instance Methods

collect(start_time) click to toggle source
# File lib/atatus/collector/base.rb, line 317
# Ships everything buffered since the previous run through @transport.
#
# Steps:
# 1. Abort (after logging an error) when license_key and/or app_name is
#    missing from the configuration.
# 2. Send host info on the first run and then on every 30th run.
# 3. Swap each buffer for a fresh empty one while holding its lock, so the
#    transport calls below run without holding any lock.
# 4. Send each non-empty buffer; traces are sent one per transport call.
#
# @param start_time start of the collection window, passed through to the
#   transport (the worker supplies epoch milliseconds)
def collect(start_time)
  key_missing = @config.license_key.nil?
  name_missing = @config.app_name.nil?

  if key_missing && name_missing
    error '%s: Atatus configuration license_key and app_name are missing', pid_str
    return
  elsif key_missing
    error '%s: Atatus configuration license_key is missing', pid_str
    return
  elsif name_missing
    error '%s: Atatus configuration app_name is missing', pid_str
    return
  end

  if (@collect_counter % 30).zero?
    @transport.hostinfo(start_time)
    @collect_counter = 0
  end
  @collect_counter += 1

  end_time = (Time.now.to_f * 1000).to_i
  debug '%s: data collector', pid_str

  txns_data, txn_hist_data, traces_data, error_metrics_data, error_requests_data =
    @txns_lock.synchronize do
      drained = [@txns_agg, @txn_hist_agg, @traces_agg, @error_metrics_agg, @error_requests_agg]
      @txns_agg = {}
      @txn_hist_agg = {}
      @traces_agg = []
      @error_metrics_agg = {}
      @error_requests_agg = []
      drained
    end

  errors_data = @errors_lock.synchronize do
    drained = @errors_aggs
    @errors_aggs = []
    drained
  end

  metrics_data = @metrics_lock.synchronize do
    drained = @metrics_agg
    @metrics_agg = []
    drained
  end

  @transport.txns(start_time, end_time, txns_data) unless txns_data.empty?
  @transport.txn_hist(start_time, end_time, txn_hist_data) unless txn_hist_data.empty?

  # Each trace goes out in its own single-element payload.
  traces_data.each do |trace|
    @transport.traces(start_time, end_time, [trace])
  end

  unless error_metrics_data.empty?
    @transport.error_metrics(start_time, end_time, error_metrics_data, error_requests_data)
  end

  @transport.errors(start_time, end_time, errors_data) unless errors_data.empty?

  @transport.metrics(start_time, end_time, metrics_data) unless metrics_data.empty?
end
ensure_worker_running() click to toggle source
# File lib/atatus/collector/base.rb, line 299
# Starts the background worker thread unless one is already alive.
#
# The worker repeats, for as long as @running is set: note the current time
# in epoch milliseconds, sleep 60 seconds, then call collect with the noted
# time as the start of the window.
def ensure_worker_running
  return if worker_active?

  @running = true
  @worker = Thread.new do
    debug '%s: Starting collector worker', pid_str

    until !@running
      window_start = (Time.now.to_f * 1000).to_i
      sleep(60)
      collect window_start
    end
  end
end
worker_active?() click to toggle source
# File lib/atatus/collector/base.rb, line 313
# Tells whether a worker thread exists and is still alive.
# Returns @worker itself (nil) when no worker has been created yet,
# otherwise the result of Thread#alive?.
def worker_active?
  thread = @worker
  return thread unless thread

  thread.alive?
end