class Sqreen::Runner

Main running job class for the agent

Constants

HEARTBEAT_MAX_DELAY

Initial delay is 5 minutes

HEARTBEAT_WARMUP

During one hour

Attributes

deliverer[R]

@return [Sqreen::Deliveries::Simple]

heartbeat_delay[RW]
instrumenter[R]
metrics_engine[RW]
next_command_results[RW]
next_metrics[RW]
running[RW]
session[R]

@return [Sqreen::Session]

Public Class Methods

new(configuration, framework, set_at_exit = true, session_class = Sqreen::Session) click to toggle source

We may want to do this in a thread in order to avoid delaying application startup. set_at_exit: when false, no global at_exit hook is registered (used for testing). @param [Sqreen::Frameworks::GenericFramework] framework

# File lib/sqreen/runner.rb, line 107
# Builds the runner: resolves endpoints, logs in to the backend, applies the
# features returned by the login and processes the initial commands.
#
# @param configuration [#get] configuration store queried with symbol keys
#   (:proxy_url, :token, :app_name, :weave, :initial_features, ...)
# @param framework [Sqreen::Frameworks::GenericFramework]
# @param set_at_exit [Boolean] when falsy, no at_exit hook is registered
# @param session_class [Class] class instantiated by create_session
# @raise [Sqreen::TokenNotFoundException] when the configuration has no token
def initialize(configuration, framework, set_at_exit = true, session_class = Sqreen::Session)
  @logged_out_tried = false
  @configuration = configuration
  @framework = framework
  @heartbeat_delay = HEARTBEAT_MAX_DELAY
  @last_heartbeat_request = Time.now
  @next_command_results = {}
  @next_metrics = []
  @running = true

  # The proxy URL must be set before determine_endpoints, which reads it.
  @proxy_url = @configuration.get(:proxy_url)
  chosen_endpoints = determine_endpoints

  @token = @configuration.get(:token)
  @app_name = @configuration.get(:app_name)
  @url = chosen_endpoints.control.url
  @cert_store = chosen_endpoints.control.ca_store

  # Reset the global whitelists and performance budget before the token check.
  Sqreen.update_whitelisted_paths([])
  Sqreen.update_whitelisted_ips({})
  Sqreen.update_performance_budget(nil)
  raise(Sqreen::TokenNotFoundException, 'no token found') unless @token

  # Configure the ingestion side (Sqreen::Kit) from the chosen endpoints.
  Sqreen::Kit::Configuration.logger = Sqreen.log
  Sqreen::Kit::Configuration.ingestion_url = chosen_endpoints.ingestion.url
  Sqreen::Kit::Configuration.certificate_store = chosen_endpoints.ingestion.ca_store
  Sqreen::Kit::Configuration.proxy_url = @proxy_url
  Sqreen::Kit::Configuration.default_source = "sqreen:agent:ruby:#{Sqreen::VERSION}"

  register_exit_cb if set_at_exit

  self.metrics_engine = MetricsStore.new

  # True when a scout_apm gem at version >= 2.5.2 is among the gem specifications.
  needs_weave = proc do
    Gem::Specification.select { |s| s.name == 'scout_apm' && Gem::Requirement.new('>= 2.5.2').satisfied_by?(Gem::Version.new(s.version)) }.any?
  end

  # Weave instrumentation is used when configured or when scout_apm requires it.
  if @configuration.get(:weave) || needs_weave.call
    @instrumenter = Sqreen::Weave::Legacy::Instrumentation.new(metrics_engine)
  else
    @instrumenter = Sqreen::Legacy::Instrumentation.new(metrics_engine)
  end

  # NOTE(review): this writes the raw token to the debug log; confirm that is acceptable.
  Sqreen.log.debug "Using token #{@token}"
  response = create_session(session_class)
  post_endpoint_testing_msgs(chosen_endpoints)
  wanted_features = response.fetch('features', {})
  # Optional JSON object from configuration that is merged over the
  # features returned by the login response.
  conf_initial_features = configuration.get(:initial_features)
  unless conf_initial_features.nil?
    begin
      conf_features = JSON.parse(conf_initial_features)
      raise 'Invalid Type' unless conf_features.is_a?(Hash)
      Sqreen.log.debug do
        "Override initial features with #{conf_features.inspect}"
      end
      wanted_features = wanted_features.merge(conf_features)
    rescue
      # Unparseable or non-Hash overrides are ignored with a warning.
      Sqreen.log.warn do
        "NOT using invalid initial features #{conf_initial_features}"
      end
    end
  end
  self.features = wanted_features

  # Hook the ecosystem integration into the framework request callbacks.
  @ecosystem_integration = EcosystemIntegration.new(framework, Sqreen.queue)
  framework.req_start_cb = @ecosystem_integration.method(:request_start)
  framework.req_end_cb = @ecosystem_integration.method(:request_end)

  # Ensure a deliverer is there unless features have set it first
  self.deliverer ||= Deliveries::Simple.new(session)
  # Forward rules / pack_id from the login response so the initial commands
  # can use them.
  context_infos = {}
  %w[rules pack_id].each do |p|
    context_infos[p] = response[p] unless response[p].nil?
  end
  process_commands(response.fetch('commands', []), context_infos)
end

Public Instance Methods

aggregate_observations() click to toggle source
# File lib/sqreen/runner.rb, line 458
# Moves the observations currently waiting in Sqreen.observations_queue into
# the metrics engine.
#
# Each queue entry is unpacked as (category, key, observation, time). The
# entry time is shifted by the difference between Sqreen.time and the current
# UTC wall clock before being passed to metrics_engine.update. Only as many
# entries as were queued when the method started are popped.
def aggregate_observations
  queue = Sqreen.observations_queue
  clock_offset = Sqreen.time - Time.now.utc.to_f
  pending = queue.size
  pending.times do
    category, key, observation, observed_at = queue.pop
    adjusted_time = clock_offset + observed_at.utc.to_f
    metrics_engine.update(category, adjusted_time, key, observation)
  end
end
batch_events(batch_size, max_staleness = nil, use_signals = false) click to toggle source
# File lib/sqreen/runner.rb, line 194
# Replaces the current deliverer according to the requested batching.
#
# A batch size below 1 selects Deliveries::Simple, anything else selects
# Deliveries::Batch. When signals are in use and the size is <= 1, batching
# is forced to size 30 / staleness 60 and a warning is logged.
#
# @param batch_size [#to_i] requested number of events per batch
# @param max_staleness [#to_i, nil] staleness passed to the batch deliverer
# @param use_signals [Boolean] whether signals are in use
# @return the newly installed deliverer
def batch_events(batch_size, max_staleness = nil, use_signals = false)
  effective_size = batch_size.to_i

  if use_signals && effective_size <= 1
    Sqreen.log.warn do
      "Using signals with no delivery batching is unsupported. " \
      "Using instead batching with batch size = 30, max_staleness = 60"
    end
    effective_size = 30
    max_staleness = 60
  end

  replacement =
    if effective_size >= 1
      Deliveries::Batch.new(session, effective_size, max_staleness.to_i)
    else
      Deliveries::Simple.new(session)
    end
  self.deliverer = replacement
end
call_counts_metrics_period=(value) click to toggle source
# File lib/sqreen/runner.rb, line 248
# Registers the call-count metric with the given aggregation period.
#
# Values that convert to zero or a negative integer are ignored: no metric is
# created and nothing is disabled.
#
# @param value [#to_i] aggregation period
def call_counts_metrics_period=(value)
  period = value.to_i
  return if period <= 0 # else disable collection?

  metrics_engine.create_metric(
    'name' => CallCountable::COUNT_CALLS,
    'period' => period,
    'kind' => 'Sum'
  )
end
change_features(new_features, _context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 449
# Applies a new feature set and reports the transition.
#
# @param new_features [Hash] features to apply through features=
# @param _context_infos [Hash] unused
# @return [Hash] 'was' => features before the change, 'now' => new_features
def change_features(new_features, _context_infos = {})
  previous = features
  self.features = new_features
  { 'was' => previous, 'now' => new_features }
end
change_performance_budget(budget, _context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 406
# Updates the performance budget and reports the previous one.
#
# Without the :weave configuration the budget is handed unchanged to
# Sqreen.update_performance_budget. With :weave, the budget is divided by
# 1000 to form the threshold, and the 'threshold' / 'ratio' keys of the
# 'performance_budget' feature, when present, take precedence.
#
# @param budget [#to_f, nil] new budget; nil is accepted
# @param _context_infos [Hash] unused
# @return [false, Hash] false when budget is non-nil and not > 0, otherwise
#   { :was => previous budget }
def change_performance_budget(budget, _context_infos = {})
  acceptable = budget.nil? || budget.to_f > 0
  return false unless acceptable

  unless @configuration.get(:weave)
    previous = Sqreen.performance_budget
    Sqreen.update_performance_budget(budget)
    return { :was => previous }
  end

  previous = Sqreen::Weave::Budget.current
  previous = previous.to_h if previous

  threshold = budget ? budget.to_f / 1000 : nil
  ratio = nil

  settings = features['performance_budget']
  if settings
    threshold = settings['threshold'] if settings.key?('threshold')
    ratio = settings['ratio'] if settings.key?('ratio')
  end

  Sqreen::Weave::Budget.update(threshold: threshold, ratio: ratio)
  { :was => previous }
end
change_whitelisted_ips(ips, _context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 443
# Replaces the global IP whitelist.
#
# @param ips [#each] new whitelist, passed to Sqreen.update_whitelisted_ips
# @param _context_infos [Hash] unused
# @return [Boolean] false (and no update) when ips does not respond to #each
def change_whitelisted_ips(ips, _context_infos = {})
  if ips.respond_to?(:each)
    Sqreen.update_whitelisted_ips(ips)
    true
  else
    false
  end
end
change_whitelisted_paths(paths, _context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 400
# Replaces the global path whitelist.
#
# @param paths [#each] new whitelist, passed to Sqreen.update_whitelisted_paths
# @param _context_infos [Hash] unused
# @return [Boolean] false (and no update) when paths does not respond to #each
def change_whitelisted_paths(paths, _context_infos = {})
  if paths.respond_to?(:each)
    Sqreen.update_whitelisted_paths(paths)
    true
  else
    false
  end
end
config_binned_metrics(level, base, factor, base_pct, factor_pct) click to toggle source
# File lib/sqreen/runner.rb, line 265
# Enables or disables binned performance metrics.
#
# A level that converts to an integer <= 0 disables them. Any positive level
# enables them; levels other than 1 are logged as unknown and treated as 1.
#
# @param level [#to_i] requested perf level
# @param base [#to_f]
# @param factor [#to_f]
# @param base_pct [#to_f]
# @param factor_pct [#to_f]
# @return the result of BinnedMetrics.disable or BinnedMetrics.enable
def config_binned_metrics(level, base, factor, base_pct, factor_pct)
  perf_level = level.to_i

  unless perf_level > 0
    Sqreen.log.info('Disabling binned metrics')
    return PerformanceNotifications::BinnedMetrics.disable
  end

  Sqreen.log.info('Enabling binned metrics')
  if perf_level != 1
    Sqreen.log.warn("Unknown value for perf_level: #{perf_level}. Treating as 1")
  end
  PerformanceNotifications::BinnedMetrics.enable(
    metrics_engine, PERF_METRICS_PERIOD, base.to_f, factor.to_f, base_pct.to_f, factor_pct.to_f
  )
end
create_session(session_class) click to toggle source
# File lib/sqreen/runner.rb, line 184
# Instantiates the session and logs in with the current framework.
#
# @param session_class [Class] built with (url, cert store, token, app name,
#   proxy url)
# @return the result of session.login
def create_session(session_class)
  connection_args = [@url, @cert_store, @token, @app_name, @proxy_url]
  @session = session_class.new(*connection_args)
  session.login(@framework)
end
deliver_metrics_as_event() click to toggle source
# File lib/sqreen/runner.rb, line 360
# Posts every pending metric to the deliverer as an event.
#
# This is disastrous with the simple delivery strategy, as each aggregated
# metric would trigger an HTTP request. Sending metrics this way is therefore
# not supported with the simple delivery strategy.
# TODO: Confirm that only batch is used in production
def deliver_metrics_as_event
  next_metrics.each do |metric|
    deliverer.post_event(metric)
  end
end
deliverer=(new_deliverer) click to toggle source
# File lib/sqreen/runner.rb, line 189
# Installs a new deliverer, draining the current one first when there is one.
#
# @param new_deliverer the deliverer to use from now on
def deliverer=(new_deliverer)
  outgoing = deliverer
  outgoing.drain if outgoing
  @deliverer = new_deliverer
end
do_heartbeat() click to toggle source
# File lib/sqreen/runner.rb, line 346
# Sends a heartbeat to the backend and processes the commands it returns.
#
# Freshly published metrics are appended to the pending ones. They travel
# inside the heartbeat unless signals are in use, in which case they are
# delivered as events afterwards. Pending command results and metrics are
# cleared once sent.
#
# @return the result of process_commands
def do_heartbeat
  @last_heartbeat_request = Time.now
  if metrics_engine
    @next_metrics.concat(metrics_engine.publish(false))
  end

  heartbeat_metrics = use_signals? ? nil : next_metrics
  response = session.heartbeat(next_command_results, heartbeat_metrics)
  next_command_results.clear

  # With signals, metrics were left out of the heartbeat payload above.
  deliver_metrics_as_event if use_signals?
  next_metrics.clear

  process_commands(response['commands'])
end
exit_from_sinatra_startup?() click to toggle source

Sinatra is using at_exit to run the application, see: github.com/sinatra/sinatra/blob/cd503e6c590cd48c2c9bb7869522494bfc62cb14/lib/sinatra/main.rb#L25

# File lib/sqreen/runner.rb, line 512
# Tells whether the process is exiting while Sinatra has not run yet.
#
# Sinatra uses at_exit to run the application, so an at_exit callback may
# fire before the application has started.
#
# @return [nil, Boolean] nil when Sinatra::Application is not defined, false
#   when it does not respond to run?, otherwise the negation of run?
def exit_from_sinatra_startup?
  return unless defined?(Sinatra::Application)
  return false unless Sinatra::Application.respond_to?(:run?)

  !Sinatra::Application.run?
end
features(_context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 368
# Returns the currently active features, as held by Sqreen.features.
#
# @param _context_infos [Hash] unused
def features(_context_infos = {})
  Sqreen.features
end
features=(features) click to toggle source
# File lib/sqreen/runner.rb, line 376
# Applies a feature set: stores it globally, then propagates the individual
# settings to the session, the metrics configuration, the heartbeat delay and
# the delivery strategy.
#
# The session settings are only applied when a session exists. The binned
# metrics are only configured when the :weave configuration is off. The
# heartbeat delay is only changed for a positive 'heartbeat_delay', and the
# deliverer is only replaced when 'batch_size' is present.
#
# @param features [Hash{String=>Object}] feature name => value
def features=(features)
  Sqreen.update_features(features)
  # Both session settings share one guard: previously only the first was
  # protected, so a missing session raised NoMethodError on the second.
  if session
    session.request_compression = features['request_compression']
    session.use_signals = use_signals?
  end
  self.performance_metrics_period = features['performance_metrics_period']

  unless @configuration.get(:weave)
    config_binned_metrics(
      features['perf_level'] || DEFAULT_PERF_LEVEL,
      features['perf_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_BASE,
      features['perf_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_UNIT,
      features['perf_pct_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_BASE,
      features['perf_pct_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_UNIT
    )
  end

  self.call_counts_metrics_period = features['call_counts_metrics_period']
  hd = features['heartbeat_delay'].to_i
  self.heartbeat_delay = hd if hd > 0
  return if features['batch_size'].nil?
  batch_events(features['batch_size'], features['max_staleness'], use_signals?)
end
handle_event(event) click to toggle source
# File lib/sqreen/runner.rb, line 498
# Dispatches one item taken from the event queue.
#
# The METRICS_EVENT marker triggers an aggregation of the queued
# observations; anything else is posted to the deliverer.
#
# @param event the queue item
def handle_event(event)
  return aggregate_observations if event == METRICS_EVENT

  @deliverer.post_event(event)
end
heartbeat_needed?() click to toggle source
# File lib/sqreen/runner.rb, line 467
# Tells whether more than heartbeat_delay seconds have passed since the last
# heartbeat request.
#
# @return [Boolean]
def heartbeat_needed?
  next_due_at = @last_heartbeat_request + heartbeat_delay
  next_due_at < Time.now
end
load_rules(context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 214
# Builds the list of rules to instrument.
#
# Rules and pack id are taken from context_infos when both are present,
# otherwise they are fetched from the session. Every rule hash is tagged in
# place with its 'rulespack_id'. Enabled local rules (from Sqreen::Rules.local)
# are appended, tagged with the 'local' pack id.
#
# @param context_infos [Hash] may carry 'rules' and 'pack_id'
# @return [Array] two elements: the rules pack id and the array of rules
def load_rules(context_infos = {})
  rules_pack = context_infos['rules']
  rulespack_id = context_infos['pack_id']
  if rules_pack.nil? || rulespack_id.nil?
    # Nothing usable in the context: ask the backend.
    session_rules = session.rules
    rules_pack = session_rules['rules']
    rulespack_id = session_rules['pack_id']
  elsif @configuration.get(:rules_dump)
    # NOTE(review): being an elsif, the dump only happens for rules supplied
    # through context_infos, never for rules fetched from the session.
    # Confirm this is intended.
    # Writes <pack id>.json plus one <rule name>.json per rule under
    # tmp/sqreen/rules (relative to Rails.root when Rails is defined, else
    # the current directory).
    rules_dir = (defined?(Rails) ? Rails.root : Pathname.pwd) + 'tmp/sqreen/rules'
    FileUtils.mkdir_p(rules_dir.to_s)
    File.open("#{rules_dir}/#{rulespack_id}.json", "wb") { |f| f.write(JSON.pretty_generate(rules_pack)) }
    FileUtils.mkdir_p("#{rules_dir}/#{rulespack_id}")
    rules_pack.each do |r|
      # Work on a copy so the dumped rule carries the pack id without
      # touching the original at this point.
      r = r.dup
      r['rulespack_id'] = rulespack_id
      File.open("#{rules_dir}/#{rulespack_id}/#{r['name']}.json", "wb") { |f| f.write(JSON.pretty_generate(r)) }
    end
  end
  # each returns rules_pack itself: the rule hashes are mutated in place.
  rules = rules_pack.each { |r| r['rulespack_id'] = rulespack_id }
  Sqreen.log.info { format('retrieved rulespack id: %s', rulespack_id) }
  Sqreen.log.debug { format('retrieved %d rules', rules.size) }
  local_rules = Sqreen::Rules.local(@configuration) || []
  # += builds a new array, so rules_pack itself does not receive local rules.
  rules += local_rules.
           select { |rule| rule['enabled'] }.
           each { |r| r['rulespack_id'] = 'local' }
  # Debug summary: (name, JSON size, block attribute) for each rule, by name.
  Sqreen.log.debug do
    format('rules: %s', rules.
           sort_by { |r| r['name'] }.
           map { |r| format('(%s, %s, %s)', r[Rules::Attrs::NAME], r.to_json.size, r[Rules::Attrs::BLOCK]) }.
           join(', '))
  end
  [rulespack_id, rules]
end
logout(retrying = true) click to toggle source
# File lib/sqreen/runner.rb, line 532
# Logs out from the backend and stops the runner.
#
# Does nothing without a session. In development the runner is merely
# stopped. Otherwise the deliverer is drained, pending observations are
# aggregated, metrics are posted and the session is logged out. This full
# path is attempted at most once.
#
# @param retrying [Boolean] forwarded to session.logout
# @return [nil, false] false after a full logout, nil otherwise
def logout(retrying = true)
  return unless session

  Sqreen.log.debug("Logging out")
  if @framework.development?
    @running = false
    nil
  elsif @logged_out_tried
    Sqreen.log.debug('Not running logout twice')
    nil
  else
    @logged_out_tried = true
    @deliverer.drain if @deliverer
    aggregate_observations
    session.post_metrics(metrics_engine.publish) if metrics_engine
    session.logout(retrying)
    @running = false
  end
end
performance_metrics_period=(value) click to toggle source
# File lib/sqreen/runner.rb, line 256
# Enables performance metrics with the given period, or disables them when
# the value does not convert to a positive integer.
#
# @param value [#to_i] aggregation period
def performance_metrics_period=(value)
  period = value.to_i
  return PerformanceNotifications::Metrics.disable unless period > 0

  PerformanceNotifications::Metrics.enable(metrics_engine, period)
end
periodic_cleanup() click to toggle source
# File lib/sqreen/runner.rb, line 489
# Housekeeping run when no event arrived: ticks the deliverer, aggregates the
# queued observations and issues a heartbeat if one is due (which may return
# commands).
#
# @return the result of do_heartbeat, or nil when no heartbeat was due
def periodic_cleanup
  @deliverer.tick
  aggregate_observations
  return unless heartbeat_needed?

  do_heartbeat
end
process_commands(commands, context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 340
# Runs the given remote commands and keeps their results for the next
# heartbeat.
#
# @param commands [Array, nil] commands received from the backend
# @param context_infos [Hash] extra data forwarded to the commands
# @return the command results, or nil when there was nothing to process
def process_commands(commands, context_infos = {})
  return if commands.nil?
  return if commands.empty?

  @next_command_results = RemoteCommand.process_list(self, commands, context_infos)
end
register_exit_cb(try_again = true) click to toggle source
# File lib/sqreen/runner.rb, line 551
# Registers an at_exit hook that logs out.
#
# When the hook fires during Sinatra's startup and try_again is set, it
# registers itself once more (without a further retry) instead of logging
# out. Errors raised by logout are logged at debug level and swallowed.
#
# @param try_again [Boolean] whether the hook may re-register itself once
def register_exit_cb(try_again = true)
  at_exit do
    postpone = exit_from_sinatra_startup? && try_again
    if postpone
      register_exit_cb(false)
    else
      begin
        logout
      rescue StandardError => error
        Sqreen.log.debug(error.inspect)
        Sqreen.log.debug(error.backtrace)
        nil
      end
    end
  end
end
reload_actions(_context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 310
# Fetches the actions pack from the backend and loads it.
#
# @param _context_infos [Hash] unused
# @return [true, RemoteCommand::FailureOutput] true when every action was
#   loaded; a failure output when the pack could not be fetched, carried no
#   action list, or contained unsupported action types
def reload_actions(_context_infos = {})
  Sqreen.log.debug 'Reloading actions'

  payload = session.get_actionspack
  fetched = payload.respond_to?(:[]) && payload['status']
  unless fetched
    Sqreen.log.warn('Could not load actions')
    return RemoteCommand::FailureOutput.new(
      :error => 'Could not load actions from /actionspack'
    )
  end

  definitions = payload['actions']
  unless definitions.respond_to?(:each)
    Sqreen.log.warn('No action definitions in response')
    return RemoteCommand::FailureOutput.new(
      :error => 'No action definitions in response'
    )
  end
  Sqreen.log.debug("Loading actions from hashes #{definitions}")

  rejected = load_actions(definitions)
  return true if rejected.empty?

  RemoteCommand::FailureOutput.new(:unsupported_actions => rejected.to_a)
end
reload_rules(_context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 300
# Reloads the rules: loads them afresh, drops every installed callback and
# hands the new rules to the framework for instrumentation.
#
# @param _context_infos [Hash] unused
# @return [String] the id of the loaded rules pack
def reload_rules(_context_infos = {})
  Sqreen.log.debug('Reloading rules')
  pack_id, fresh_rules = load_rules
  instrumenter.remove_all_callbacks

  @framework.instrument_when_ready!(instrumenter, fresh_rules)
  Sqreen.log.debug('Rules reloaded')
  pack_id.to_s
end
remove_instrumentation(_context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 292
# Removes every instrumentation callback and clears the actions repository.
#
# @param _context_infos [Hash] unused
# @return [true]
def remove_instrumentation(_context_infos = {})
  Sqreen.log.debug('Removing instrumentation')
  instrumenter.remove_all_callbacks
  Sqreen::Actions::Repository.clear
  Sqreen.log.debug('Instrumentation removed')
  true
end
restart(_context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 523
# Shuts the runner down and schedules a new worker.
#
# The worker is started from a background thread after sleeping for twice the
# heartbeat delay in effect when restart was called.
#
# @param _context_infos [Hash] unused
# @return [Thread] the thread that will start the new worker
def restart(_context_infos = {})
  shutdown
  # Captured up front: the thread must not depend on later changes.
  delay = @heartbeat_delay
  Thread.new do
    sleep(2 * delay)
    Sqreen::Worker.start(Sqreen.framework)
  end
end
run_watcher() click to toggle source
# File lib/sqreen/runner.rb, line 506
# Main loop: keeps running single watcher iterations for as long as the
# runner is marked as running.
def run_watcher
  while running
    run_watcher_once
  end
end
run_watcher_once() click to toggle source
# File lib/sqreen/runner.rb, line 471
# Runs one iteration of the watcher loop.
#
# Waits up to heartbeat_delay seconds for an item on Sqreen.queue. On timeout
# the periodic cleanup runs; otherwise the item is handled and, when a
# heartbeat is due, the periodic cleanup runs as well. In every case
# BinnedMetrics.finish_watcher_run is called at the end.
def run_watcher_once
  event = Timeout.timeout(heartbeat_delay) do
    Sqreen.queue.pop
  end
rescue Timeout::Error
  # Nothing arrived within heartbeat_delay seconds.
  periodic_cleanup
else
  # Runs only when the pop succeeded. Errors raised here, including a
  # Timeout::Error, are not caught by the rescue clause above.
  handle_event(event)
  if heartbeat_needed?
    # Also aggregate/post metrics when cleanup has
    # not been done for a long time
    Sqreen.log.debug 'Forced an heartbeat'
    periodic_cleanup # will trigger do_heartbeat since it's time
  end
ensure
  PerformanceNotifications::BinnedMetrics.finish_watcher_run
end
setup_instrumentation(context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 280
# Loads the rules and instruments the application with them, then
# initializes the ecosystem integration.
#
# @param context_infos [Hash] forwarded to load_rules
# @return [String] the id of the loaded rules pack
def setup_instrumentation(context_infos = {})
  Sqreen.log.info('Setting up instrumentation')
  pack_id, loaded_rules = load_rules(context_infos)
  @framework.instrument_when_ready!(instrumenter, loaded_rules)
  Sqreen.log.info('Instrumentation set up')

  # XXX: ecosystem instrumentation should likely be deferred
  #      the same way the rest might be
  @ecosystem_integration.init
  pack_id.to_s
end
shutdown(_context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 518
# Stops the agent: removes the instrumentation, then logs out.
#
# @param _context_infos [Hash] unused
# @return the result of logout
def shutdown(_context_infos = {})
  remove_instrumentation
  logout
end
tracing_enable(tracing_id_prefix, sampling_config, _context_infos = {}) click to toggle source

@param [String] tracing_id_prefix @param [Array<Hash{String=>Object}>] sampling_config

# File lib/sqreen/runner.rb, line 432
# Forwards a tracing command to the ecosystem integration.
#
# @param tracing_id_prefix [String]
# @param sampling_config [Array<Hash{String=>Object}>]
# @param _context_infos [Hash] unused
# @return [Hash] always { status: true }
def tracing_enable(tracing_id_prefix, sampling_config, _context_infos = {})
  @ecosystem_integration.handle_tracing_command(tracing_id_prefix, sampling_config)
  { :status => true }
end
upload_bundle(_context_infos = {}) click to toggle source
# File lib/sqreen/runner.rb, line 437
# Posts the application's dependency bundle to the backend.
#
# The elapsed time is measured with the monotonic clock, so a wall-clock
# adjustment during the upload cannot produce a negative or inflated
# duration.
#
# @param _context_infos [Hash] unused
# @return [Float] seconds spent posting the bundle
def upload_bundle(_context_infos = {})
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  session.post_bundle(RuntimeInfos.dependencies_signature, RuntimeInfos.dependencies)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
end
use_signals?() click to toggle source
# File lib/sqreen/runner.rb, line 372
# Tells whether signals are in use: the 'use_signals' feature when it is set,
# DEFAULT_USE_SIGNALS otherwise.
def use_signals?
  features.fetch('use_signals') { DEFAULT_USE_SIGNALS }
end

Private Instance Methods

determine_endpoints() click to toggle source
# File lib/sqreen/runner.rb, line 578
# Chooses the control and ingestion endpoints.
#
# When the :no_sniff_domains setting is on, the configured URLs are used
# without testing (there's no sniffing going on; the setting is just
# misnamed), which reproduces the behaviour from before endpoint testing was
# introduced. Otherwise the endpoints are tested through the proxy, if any.
#
# @return the endpoints object built by EndpointTesting
def determine_endpoints
  skip_testing = @configuration.get(:no_sniff_domains)
  control_url = @configuration.get(:url)
  ingestion_url = @configuration.get(:ingestion_url)

  if skip_testing
    return EndpointTesting.no_test_endpoints(control_url, ingestion_url)
  end

  EndpointTesting.test_endpoints(@proxy_url, control_url, ingestion_url)
end
load_actions(hashes) click to toggle source
# File lib/sqreen/runner.rb, line 591
# Builds a new actions repository from serialized actions and makes it the
# current one.
#
# Actions of an unknown type are skipped and reported; any other
# deserialization error aborts the load.
#
# @param hashes [Array<Hash>] serialized actions; 'parameters' is the key
#   under which each action is registered
# @return [Set] the unsupported action types encountered
# @raise [Sqreen::Exception] when an action hash is invalid
def load_actions(hashes)
  unsupported_types = Set.new
  repository = Sqreen::Actions::Repository.new

  deserialized = hashes.map do |action_hash|
    begin
      action = Sqreen::Actions.deserialize_action(action_hash)
      repository.add action_hash['parameters'], action
      action
    rescue Sqreen::Actions::UnknownActionType => unknown
      Sqreen.log.warn("Unsupported action type: #{unknown.action_type}")
      unsupported_types << unknown.action_type
      nil
    rescue => failure
      raise Sqreen::Exception, "Invalid action hash: #{action_hash}: #{failure.message}"
    end
  end

  valid_count = deserialized.count { |action| !action.nil? }
  Sqreen.log.debug("Added #{valid_count} valid actions")

  # The repository is only swapped in once every hash has been processed.
  Sqreen::Actions::Repository.current = repository

  unsupported_types
end
post_endpoint_testing_msgs(chosen_endpoints) click to toggle source
# File lib/sqreen/runner.rb, line 569
# Sends the messages produced while choosing endpoints to the backend.
#
# Failures are not propagated: they are logged as warnings and recorded
# through RemoteException. Messages after the failing one are not sent.
#
# @param chosen_endpoints [#messages] result of determine_endpoints
def post_endpoint_testing_msgs(chosen_endpoints)
  begin
    chosen_endpoints.messages.each { |message| session.post_agent_message(@framework, message) }
  rescue => error
    Sqreen.log.warn "Error submitting agent message: #{error}"
    RemoteException.record(error)
  end
end