class Atatus::Collector::Base
Attributes
config[R]
Public Class Methods
new(config)
click to toggle source
# File lib/atatus/collector/base.rb, line 27
# Builds an idle collector: empty aggregation buffers, one mutex per buffer
# group, and a transport built from the same config. No worker thread is
# started here; that happens lazily on the first add_* / start call.
#
# @param config the agent configuration (kept as @config)
def initialize(config)
  # info 'Initializing Collector'
  @config = config

  # Spans buffered per transaction id until add_txn picks them up.
  @spans = Hash.new { |hash, txn_id| hash[txn_id] = [] }

  # Transaction-level aggregates; add_txn and collect touch these under @txns_lock.
  @txns_lock = Mutex.new
  @txns_agg, @txn_hist_agg = {}, {}
  @traces_agg = []
  @error_metrics_agg, @error_requests_agg = {}, []

  # Error reports and metric samples each have their own lock.
  @errors_lock, @errors_aggs = Mutex.new, []
  @metrics_lock, @metrics_agg = Mutex.new, []

  @transport = Atatus::BaseTransport.new(config)
  @collect_counter = 0
  @running = false
end
Public Instance Methods
add_error(error)
click to toggle source
# File lib/atatus/collector/base.rb, line 80
# Queues an error report for the next collect cycle, (re)starting the worker
# if needed. At most 20 errors are held per cycle; once the buffer is full,
# each new error overwrites a randomly chosen slot.
#
# @param error the error object to buffer
def add_error(error)
  ensure_worker_running
  @errors_lock.synchronize do
    if @errors_aggs.length >= 20
      @errors_aggs[rand(20)] = error
    else
      @errors_aggs << error
    end
  end
end
add_metrics(metricset)
click to toggle source
# File lib/atatus/collector/base.rb, line 93
# Extracts two fixed groups of samples from a metric set and queues them for
# the next collect cycle.
#
# - System group: copied only when every system.* key listed below is present.
# - Ruby group: copied only when every ruby.* key listed below is present;
#   ruby.gc.time is added on top when available.
#
# Nothing is queued when neither group is complete.
#
# @param metricset object whose #samples is a Hash keyed by metric symbols
def add_metrics(metricset)
  ensure_worker_running

  samples = metricset.samples
  system_keys = %i[system.cpu.total.norm.pct system.memory.actual.free system.memory.total
                   system.process.cpu.total.norm.pct system.process.memory.size
                   system.process.memory.rss.bytes]
  ruby_keys = %i[ruby.gc.count ruby.threads ruby.heap.slots.live ruby.heap.slots.free
                 ruby.heap.allocations.total]

  metric = {}
  if system_keys.all? { |key| samples.key?(key) }
    system_keys.each { |key| metric[key] = samples[key] }
  end
  if ruby_keys.all? { |key| samples.key?(key) }
    ruby_keys.each { |key| metric[key] = samples[key] }
    metric[:'ruby.gc.time'] = samples[:'ruby.gc.time'] if samples.key?(:'ruby.gc.time')
  end

  return if metric.empty?

  @metrics_lock.synchronize { @metrics_agg << metric }
end
add_span(span)
click to toggle source
# File lib/atatus/collector/base.rb, line 131
# Buffers a finished span under its transaction id until add_txn consumes it.
# (Re)starts the worker if needed. Spans missing a transaction id, name,
# type, subtype or duration are ignored.
#
# @param span the finished span
def add_span(span)
  ensure_worker_running
  return if span.transaction_id.nil? || span.name.nil? || span.type.nil? ||
            span.subtype.nil? || span.duration.nil?

  # add_txn reads, iterates and deletes @spans entries while holding
  # @txns_lock. Mutate the hash (including the default-proc insertion of a
  # fresh array) under the same lock so the two cannot interleave.
  @txns_lock.synchronize { @spans[span.transaction_id] << span }
end
add_txn(txn)
click to toggle source
# File lib/atatus/collector/base.rb, line 147
# Folds a finished transaction into the aggregates shipped by the next
# collect cycle:
# - the per-name Txn summary;
# - the per-name response-time histogram (foreground transactions only);
# - per-span Layer totals, plus a synthetic "Ruby" layer for time not
#   covered by any span;
# - a sample of at most 5 traces;
# - error counts per status code and a sample of at most 20 error requests,
#   for status >= 400 except 404.
#
# Transactions are ignored when any of the following holds: missing name,
# id, timestamp or duration; empty name; non-positive duration. Any spans
# buffered for an ignored transaction are discarded, since nothing else
# would ever remove them from @spans.
#
# @param txn the finished transaction. Reads name, id, timestamp, duration,
#   type and context; sets spans / ruby_time on it when it is kept as a trace.
def add_txn(txn)
  ensure_worker_running

  if txn.name.nil? || txn.id.nil? || txn.timestamp.nil? || txn.duration.nil? ||
     txn.name.empty? || txn.duration <= 0
    @txns_lock.synchronize { @spans.delete(txn.id) } unless txn.id.nil?
    return
  end

  @txns_lock.synchronize do
    txn_type = @config.framework_name || "Ruby"
    # Sidekiq jobs are flagged as background and kept out of the histogram.
    background = txn.type == "Sidekiq"
    duration_ms = Util.ms(txn.duration)

    if @txns_agg.key?(txn.name)
      @txns_agg[txn.name].aggregate! txn.duration
    else
      @txns_agg[txn.name] = Txn.new(txn_type, "Ruby", txn.duration, background: background)
      @txns_agg[txn.name].id = txn.id
      @txns_agg[txn.name].pid = txn.id
    end
    txn_agg = @txns_agg[txn.name]

    # Histogram bound is expressed in txn.duration units.
    if !background && txn.duration <= 150 * 1000 * 1000.0
      if @txn_hist_agg.key?(txn.name)
        @txn_hist_agg[txn.name].aggregate! duration_ms
      else
        @txn_hist_agg[txn.name] = TxnHist.new(txn_type, "Ruby", duration_ms)
      end
    end

    # Aggregate each valid span into a Layer keyed by span name, and record
    # its [start, end) offset (ms, relative to the transaction start).
    spans_present = @spans.key?(txn.id)
    span_timings = []
    if spans_present
      @spans[txn.id].each do |span|
        next if span.name.nil? || span.type.nil? || span.subtype.nil? ||
                span.timestamp.nil? || span.duration.nil?
        next if span.name.empty?
        # Spans that claim to start before the transaction are not attributed to it.
        next unless span.timestamp >= txn.timestamp

        start = Util.ms(span.timestamp - txn.timestamp)
        span_timings.push(SpanTiming.new(start, start + Util.ms(span.duration)))

        if txn_agg.spans.key?(span.name)
          txn_agg.spans[span.name].aggregate! span.duration
        else
          kind = Layer.span_kind(span.type)
          type = Layer.span_type(span.subtype)
          layer = Layer.new(type, kind, span.duration)
          layer.id = span.id
          layer.pid = span.transaction_id
          txn_agg.spans[span.name] = layer
        end
      end
    end

    # "Ruby" time (ms) = transaction duration not covered by the union of the
    # span intervals: the gap before the first span, the gaps between merged
    # intervals, and the tail after the last one.
    if span_timings.empty?
      ruby_time = duration_ms
    else
      span_timings.sort_by!(&:start)
      ruby_time = span_timings[0].start
      span_end = span_timings[0].end
      span_timings.each do |timing|
        if timing.start > span_end
          ruby_time += timing.start - span_end
          span_end = timing.end
        elsif timing.end > span_end
          span_end = timing.end
        end
      end
      ruby_time += duration_ms - span_end if duration_ms > span_end
    end

    if ruby_time > 0
      ruby_time = Util.us(ruby_time)
      # Accumulate across transactions like every other layer, rather than
      # replacing it with the latest transaction's value.
      if txn_agg.spans.key?("Ruby")
        txn_agg.spans["Ruby"].aggregate! ruby_time
      else
        txn_agg.spans["Ruby"] = Layer.new("Ruby", "Ruby", ruby_time)
      end
    end

    # Keep up to 5 traces per cycle; once full, a random slot is overwritten.
    if (spans_present || ruby_time > 0) && duration_ms >= @config.trace_threshold
      txn.spans = @spans[txn.id] if spans_present
      txn.ruby_time = ruby_time if ruby_time > 0
      if @traces_agg.length < 5
        @traces_agg.push(txn)
      else
        @traces_agg[rand(5)] = txn
      end
    end

    @spans.delete(txn.id) if spans_present

    response = txn.context.nil? ? nil : txn.context.response
    status_code = response.nil? ? nil : response.status_code
    unless status_code.nil?
      code = status_code.to_i
      if code >= 400 && code != 404
        counts = (@error_metrics_agg[txn.name] ||= {})
        counts[code] = counts.fetch(code, 0) + 1

        # Sample at most 20 failing requests; once full, overwrite a random slot.
        sample = { 'name' => txn.name, 'context' => txn.context }
        if @error_requests_agg.length < 20
          @error_requests_agg.push(sample)
        else
          @error_requests_agg[rand(20)] = sample
        end
      end
    end
  end
end
handle_forking!()
click to toggle source
# File lib/atatus/collector/base.rb, line 75
# Restarts the collector: stop followed by start. The result of start is
# returned.
def handle_forking!
  stop
  start
end
pid_str()
click to toggle source
# File lib/atatus/collector/base.rb, line 52
# Log prefix identifying the current process, e.g. "[PID:1234]".
def pid_str
  "[PID:#{Process.pid}]"
end
start()
click to toggle source
# File lib/atatus/collector/base.rb, line 56
# Logs the start and makes sure the background worker thread is running.
# Returns whatever ensure_worker_running returns.
def start
  debug('%s: Starting collector', pid_str)
  ensure_worker_running
end
stop()
click to toggle source
# File lib/atatus/collector/base.rb, line 62
# Stops the collector.
#
# No-op unless it is running. Otherwise:
# 1. Clears @running so the worker loop exits after its current iteration.
# 2. If the worker thread is alive, wakes it with Thread#run; if it was in
#    its sleep, it proceeds straight to collect before re-checking @running.
# 3. Waits up to 10 seconds for the thread to finish.
#
# Any StandardError is logged rather than propagated.
def stop
  return unless @running

  @running = false
  return unless worker_active?

  debug '%s: Waiting for collector worker to exit', pid_str
  @worker.run
  @worker.join(10)
rescue StandardError => e
  error format('Failed during collector stop: [%s] %s', e.class, e.message)
  error "Backtrace:\n" + e.backtrace.join("\n")
end
Private Instance Methods
collect(start_time)
click to toggle source
# File lib/atatus/collector/base.rb, line 317
# One reporting cycle, called by the worker thread.
#
# 1. Logs and returns early when license_key and/or app_name is nil.
# 2. Sends host info on the first cycle and every 30th after that.
# 3. Atomically swaps every aggregation buffer for an empty one.
# 4. Uploads each non-empty snapshot for the window [start_time, now].
#    Times are epoch milliseconds.
#
# NOTE(review): the early return happens before the buffers are swapped, so
# while the config stays incomplete nothing is drained. @metrics_agg in
# particular keeps growing with every add_metrics call — confirm this is
# acceptable.
#
# @param start_time window start, epoch ms
def collect(start_time)
  license_missing = @config.license_key.nil?
  app_name_missing = @config.app_name.nil?
  if license_missing && app_name_missing
    error '%s: Atatus configuration license_key and app_name are missing', pid_str
    return
  end
  if license_missing
    error '%s: Atatus configuration license_key is missing', pid_str
    return
  end
  if app_name_missing
    error '%s: Atatus configuration app_name is missing', pid_str
    return
  end

  if (@collect_counter % 30).zero?
    @transport.hostinfo(start_time)
    @collect_counter = 0
  end
  @collect_counter += 1

  end_time = (Time.now.to_f * 1000).to_i
  debug '%s: data collector', pid_str

  txns_data, txn_hist_data, traces_data, error_metrics_data, error_requests_data =
    @txns_lock.synchronize do
      drained = [@txns_agg, @txn_hist_agg, @traces_agg, @error_metrics_agg, @error_requests_agg]
      @txns_agg = {}
      @txn_hist_agg = {}
      @traces_agg = []
      @error_metrics_agg = {}
      @error_requests_agg = []
      drained
    end
  errors_data = @errors_lock.synchronize { @errors_aggs.tap { @errors_aggs = [] } }
  metrics_data = @metrics_lock.synchronize { @metrics_agg.tap { @metrics_agg = [] } }

  @transport.txns(start_time, end_time, txns_data) unless txns_data.empty?
  @transport.txn_hist(start_time, end_time, txn_hist_data) unless txn_hist_data.empty?
  # Traces are uploaded one request per trace.
  traces_data.each { |trace| @transport.traces(start_time, end_time, [trace]) }
  unless error_metrics_data.empty?
    @transport.error_metrics(start_time, end_time, error_metrics_data, error_requests_data)
  end
  @transport.errors(start_time, end_time, errors_data) unless errors_data.empty?
  @transport.metrics(start_time, end_time, metrics_data) unless metrics_data.empty?
end
ensure_worker_running()
click to toggle source
# File lib/atatus/collector/base.rb, line 299
# Starts the background collector thread unless one is already alive.
#
# The thread repeats the following while @running is true:
# 1. Record the window start (epoch ms).
# 2. Sleep 60 seconds.
# 3. Call collect for that window.
#
# Returns the new Thread, or nil when a worker was already alive.
#
# NOTE(review): the alive-check and the spawn are not synchronized, so two
# threads calling this at the same moment could each start a worker —
# confirm whether callers can race here.
def ensure_worker_running
  unless worker_active?
    @running = true
    @worker = Thread.new do
      debug '%s: Starting collector worker', pid_str
      while @running
        window_start = (Time.now.to_f * 1000).to_i
        sleep(60)
        collect window_start
      end
    end
  end
end
worker_active?()
click to toggle source
# File lib/atatus/collector/base.rb, line 313
# Truthy when a worker thread exists and is still alive. Returns nil when no
# worker has been created yet, otherwise Thread#alive?.
def worker_active?
  return @worker unless @worker

  @worker.alive?
end