class OneApm::Collector::TransactionEventAggregator

Constants

OA_APDEX_PERF_ZONE_KEY
OA_CAT_ALTERNATE_PATH_HASHES_KEY
OA_CAT_PATH_HASH_KEY
OA_CAT_REFERRING_PATH_HASH_KEY
OA_CAT_TRIP_ID_KEY
OA_DURATION_KEY
OA_GUID_KEY
OA_HTTP_RESPONSE_CODE_KEY
OA_NAME_KEY
OA_REFERRING_TRANSACTION_GUID_KEY
OA_SAMPLE_TYPE

The type field of the sample

OA_SYNTHETICS_JOB_ID_KEY
OA_SYNTHETICS_MONITOR_ID_KEY
OA_SYNTHETICS_RESOURCE_ID_KEY
OA_TIMESTAMP_KEY
OA_TYPE_KEY

Strings for static keys of the sample structure

Public Class Methods

new(event_listener) click to toggle source
Calls superclass method Object::new
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 30
# Builds the aggregator: starts with @enabled and @notified_full set to
# false, creates the regular and synthetics sample buffers sized from the
# configuration, subscribes #on_transaction_finished to the
# :transaction_finished event and registers the configuration callbacks.
#
# event_listener - object whose #subscribe is called with the event name
#                  and the handler block.
def initialize(event_listener)
  super()

  config           = OneApm::Manager.config
  sample_limit     = config[:'analytics_events.max_samples_stored']
  synthetics_limit = config[:'synthetics.events_limit']

  @enabled            = false
  @notified_full      = false
  @samples            = ::OneApm::Agent::SampledBuffer.new(sample_limit)
  @synthetics_samples = ::OneApm::Agent::SyntheticsEventBuffer.new(synthetics_limit)

  handler = method(:on_transaction_finished)
  event_listener.subscribe(:transaction_finished, &handler)
  register_config_callbacks
end

Public Instance Methods

append_cat_alternate_path_hashes(sample, payload) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 206
# Stores the payload's :cat_alternate_path_hashes in the sample as one
# comma-separated string of the sorted values.
#
# sample  - hash-like sample being built (written via #[]=).
# payload - transaction payload (queried via #include? and #[]).
#
# Returns the joined string, or nil when the payload has no
# :cat_alternate_path_hashes key (the sample is then left untouched).
def append_cat_alternate_path_hashes(sample, payload)
  return unless payload.include?(:cat_alternate_path_hashes)

  sorted_hashes = payload[:cat_alternate_path_hashes].sort
  sample[OA_CAT_ALTERNATE_PATH_HASHES_KEY] = sorted_hashes.join(',')
end
append_event(event) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 164
# Adds an event to @samples, folding it into an already-buffered sample
# when one with the same spec_name exists.
#
# event - object responding to #spec_name and #event_analytic_data.
#
# When no buffered sample shares the spec_name the event itself is
# appended; otherwise only its event_analytic_data is concatenated onto
# the first matching sample and the event object is not stored.
def append_event(event)
  matches  = @samples.select { |candidate| candidate.spec_name == event.spec_name }
  existing = matches.first

  if existing.nil?
    @samples.append(event)
  else
    existing.event_analytic_data.concat(event.event_analytic_data)
  end
end
append_http_response_code(sample, payload) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 200
# Copies the payload's :http_response_code into the sample under
# OA_HTTP_RESPONSE_CODE_KEY via #optionally_append, unless the
# :disable_rack_middleware configuration value is truthy.
#
# Returns nil when the rack middleware is disabled, otherwise whatever
# #optionally_append returns.
def append_http_response_code(sample, payload)
  return if OneApm::Manager.config[:disable_rack_middleware]

  optionally_append(OA_HTTP_RESPONSE_CODE_KEY, :http_response_code, sample, payload)
end
create_custom_parameters(payload) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 218
# Builds the custom-parameter hash for a transaction payload.
#
# Returns a new empty hash when the
# :'analytics_events.capture_attributes' configuration value is falsy.
# Otherwise returns a new hash holding the result of #event_params applied
# to payload[:custom_params] (or to an empty hash when that entry is
# nil/false).
def create_custom_parameters(payload)
  return {} unless OneApm::Manager.config[:'analytics_events.capture_attributes']

  raw_params = payload[:custom_params] || {}
  {}.merge!(event_params(raw_params))
end
create_main_event(payload) click to toggle source

see wiki.oneapm.me/display/SAAS/Ai-+Entrance

# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 171
# Wraps the transaction payload in a new OneApm::EventAnalyticSample and
# returns it.
def create_main_event(payload)
  sample_class = OneApm::EventAnalyticSample
  sample_class.new(payload)
end
create_sub_events(main_event, payload) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 175
# Builds one OneApm::EventAnalyticSample per scoped metric of a finished
# transaction.
#
# main_event - the main event of the transaction; its #guid becomes the
#              :referring_transaction_guid of every sub event.
# payload    - transaction payload hash. It is never modified; each sub
#              event is built from its own copy.
#
# Every sub event payload carries the transaction name as :scope, an empty
# :request_url, the metric name as :name, a freshly generated :guid, and
# the metric's call_count / total_call_time as :call_count / :duration.
#
# Metrics whose name *starts with* Nested, View or External are skipped.
# The alternation is grouped so that the anchor applies to all three
# prefixes; ungrouped, only "Nested" was anchored and any metric merely
# containing "View" or "External" was dropped as well.
#
# Returns an Array of samples (empty when payload[:metrics] is nil/false).
def create_sub_events(main_event, payload)
  sub_event_samples = []
  return sub_event_samples unless payload[:metrics]

  base_payload = payload.dup
  base_payload[:scope] = payload[:name]
  base_payload[:referring_transaction_guid] = main_event.guid
  base_payload[:request_url] = ''

  payload[:metrics].each_scoped do |metric_name, status|
    next if metric_name =~ /\A(?:Nested|View|External)/

    # A separate hash per sample: if the sample keeps a reference to the
    # hash it is given, a shared one would make every sub event report the
    # values of the last metric.
    sub_payload = base_payload.dup
    sub_payload[:name] = metric_name
    sub_payload[:guid] = OneApm::Helper.generate_guid
    sub_payload[:call_count] = status.call_count
    sub_payload[:duration] = status.total_call_time
    sub_event_samples << OneApm::EventAnalyticSample.new(sub_payload)
  end

  sub_event_samples
end
error_times(txn_metrics) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 195
# Returns the call_count of the txn_metrics entry stored under
# OA_ERROR_ALL_KEY, or 0 when there is no such entry.
#
# Any StandardError raised while looking the entry up is swallowed and
# also yields 0.
# NOTE(review): this includes the NameError raised if OA_ERROR_ALL_KEY is
# not defined -- confirm the constant exists, otherwise this always
# returns 0.
def error_times(txn_metrics)
  return 0 unless txn_metrics.has_key?(OA_ERROR_ALL_KEY)

  txn_metrics[OA_ERROR_ALL_KEY].call_count
rescue StandardError
  0
end
harvest!() click to toggle source

Clear any existing samples, record the sampling-rate and dropped-synthetics metrics for the harvested period, and return the previous set of samples. (Synchronized)

# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 70
# Drains the aggregator via #reset! and returns the samples that were
# buffered.
#
# The sampling rate is recorded only while the aggregator is enabled;
# the dropped-synthetics count is always passed to
# #record_dropped_synthetics.
def harvest!
  drained, captured, seen, dropped_synthetics = reset!

  record_sampling_rate(seen, captured) if @enabled
  record_dropped_synthetics(dropped_synthetics)

  drained
end
merge!(old_samples) click to toggle source

Merge samples back into the buffer, for example after a failed transmission to the collector. (Synchronized)

# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 80
# Puts previously harvested samples back into the buffer while holding
# the aggregator's lock.
#
# old_samples - collection of samples (iterated with #each).
#
# Samples responding to #spec_name go through the aggregator's own
# #append_event; anything else is handed straight to
# @samples.append_event.
def merge!(old_samples)
  synchronize do
    old_samples.each do |sample|
      next append_event(sample) if sample.respond_to?(:spec_name)

      @samples.append_event(sample)
    end
  end
end
notify_full() click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 134
# Logs (at debug level) that the sample buffer has reached its capacity
# and sets @notified_full to true. Returns true.
def notify_full
  capacity = @samples.capacity
  OneApm::Manager.logger.debug("Transaction event capacity of #{capacity} reached, beginning sampling")
  @notified_full = true
end
on_cross_app_transaction_finished(payload) click to toggle source

Event handler for the :transaction_finished event.

# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 155
# Records only the main event for the given payload (no sub events are
# created), then reports a full buffer once.
#
# Does nothing while the aggregator is disabled. #notify_full is invoked
# only when it has not been called since the last reset and @samples
# reports full?.
def on_cross_app_transaction_finished(payload)
  return unless @enabled

  event = create_main_event(payload)
  synchronize { append_event(event) }
  return if @notified_full

  notify_full if @samples.full?
end
on_transaction_finished(payload) click to toggle source

Event handler for the :transaction_finished event.

# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 140
# Event handler for the :transaction_finished event.
#
# Does nothing while the aggregator is disabled. Otherwise builds the main
# event and its sub events from the payload, appends all of them under the
# aggregator's lock (main event first), and calls #notify_full the first
# time @samples reports full? after a reset.
#
# The custom parameters are intentionally not computed here: their result
# was never attached to any event, so building them on every transaction
# was wasted work.
def on_transaction_finished(payload)
  return unless @enabled

  main_event = create_main_event(payload)
  sub_events = create_sub_events(main_event, payload)

  synchronize do
    [main_event, *sub_events].each { |event| append_event(event) }
  end

  notify_full if !@notified_full && @samples.full?
end
optionally_append(sample_key, payload_key, sample, payload) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 212
# Copies payload[payload_key] into sample[sample_key], converted with
# #string, but only when the payload includes payload_key.
#
# Returns the stored value, or nil when the key is absent.
def optionally_append(sample_key, payload_key, sample, payload)
  return unless payload.include?(payload_key)

  sample[sample_key] = string(payload[payload_key])
end
record_dropped_synthetics(synthetics_dropped) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 109
# Reports synthetics events that were dropped during the harvest period.
#
# synthetics_dropped - number of dropped synthetics events.
#
# Does nothing when the count is not positive. Otherwise logs a debug
# message and records the count under the
# "TransactionEventAggregator/synthetics_events_dropped" supportability
# metric.
def record_dropped_synthetics(synthetics_dropped)
  return unless synthetics_dropped > 0

  # The limit that was hit is the synthetics buffer's, not the capacity of
  # the regular sample buffer.
  synthetics_limit = @synthetics_samples.capacity
  OneApm::Manager.logger.debug("Synthetics transaction event limit (#{synthetics_limit}) reached. Further synthetics events this harvest period dropped.")

  engine = OneApm::Manager.agent.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/synthetics_events_dropped", synthetics_dropped)
end
record_sampling_rate(request_count, sample_count) click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 92
# Logs the sampling rate for the harvested cycle and since startup, and
# records the request and sample counts as supportability metrics.
#
# request_count - number of requests seen this cycle.
# sample_count  - number of samples captured this cycle.
#
# The lifetime figures are read from @samples (seen_lifetime and
# captured_lifetime).
def record_sampling_rate(request_count, sample_count)
  request_count_lifetime = @samples.seen_lifetime
  sample_count_lifetime  = @samples.captured_lifetime

  # With no requests (e.g. an idle harvest cycle) the float division would
  # produce NaN/Infinity in the log line; report 0.0 % instead.
  percentage = lambda do |part, whole|
    whole.zero? ? 0.0 : part.to_f / whole * 100.0
  end

  OneApm::Manager.logger.debug("Sampled %d / %d (%.1f %%) requests this cycle, %d / %d (%.1f %%) since startup" % [
    sample_count,
    request_count,
    percentage.call(sample_count, request_count),
    sample_count_lifetime,
    request_count_lifetime,
    percentage.call(sample_count_lifetime, request_count_lifetime)
  ])

  engine = OneApm::Manager.agent.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/requests", request_count)
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/samples", sample_count)
end
register_config_callbacks() click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 118
# Registers callbacks on the agent configuration so that changes are
# applied to this aggregator:
#
# * :'analytics_events.max_samples_stored' - logs the new value and sets
#   the capacity of @samples (under the lock).
# * :'synthetics.events_limit' - logs the new value and sets the capacity
#   of @synthetics_samples (under the lock).
# * :'analytics_events.enabled' - stores the value in @enabled.
def register_config_callbacks
  config = OneApm::Manager.config

  config.register_callback(:'analytics_events.max_samples_stored') do |limit|
    OneApm::Manager.logger.debug("TransactionEventAggregator max_samples set to #{limit}")
    synchronize { @samples.capacity = limit }
  end

  config.register_callback(:'synthetics.events_limit') do |limit|
    OneApm::Manager.logger.debug("TransactionEventAggregator limit for synthetics events set to #{limit}")
    synchronize { @synthetics_samples.capacity = limit }
  end

  config.register_callback(:'analytics_events.enabled') { |flag| @enabled = flag }
end
reset!() click to toggle source
# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 48
# Empties both buffers under the aggregator's lock and reports what they
# held.
#
# Returns a four-element array:
#   [old samples (regular followed by synthetics),
#    number of buffered regular samples,
#    number of requests seen by the regular buffer,
#    number of synthetics events dropped]
#
# Also clears @notified_full so a full buffer is reported again.
def reset!
  drained  = nil
  captured = 0
  seen = dropped = nil # always assigned inside the synchronized block

  synchronize do
    captured = @samples.size
    seen     = @samples.num_seen
    dropped  = @synthetics_samples.num_dropped

    # Snapshot before clearing; regular samples come first.
    drained = @samples.to_a + @synthetics_samples.to_a
    [@samples, @synthetics_samples].each(&:reset!)

    @notified_full = false
  end

  [drained, captured, seen, dropped]
end
samples() click to toggle source

Fetch a copy of the sampler's gathered samples. (Synchronized)

# File lib/one_apm/collector/containers/transaction_event_aggregator.rb, line 44
# Fetch a copy of the gathered samples: the regular samples followed by
# the synthetics samples. (Synchronized)
#
# The two lists are combined with the non-mutating Array#+, as #reset!
# does. Array#concat would modify whatever @samples.to_a returns, which
# corrupts the buffer if to_a hands back its internal storage rather than
# a copy (Array#to_a, for one, returns the receiver itself).
def samples
  synchronize { @samples.to_a + @synthetics_samples.to_a }
end