class Sqreen::Runner
Main running job class for the agent
Constants
- HEARTBEAT_MAX_DELAY
Initial delay is 5 minutes
- HEARTBEAT_WARMUP
Warm-up period: one hour
Attributes
deliverer[R]
@return [Sqreen::Deliveries::Simple]
heartbeat_delay[RW]
instrumenter[R]
metrics_engine[RW]
next_command_results[RW]
next_metrics[RW]
running[RW]
session[R]
@return [Sqreen::Session]
Public Class Methods
new(configuration, framework, set_at_exit = true, session_class = Sqreen::Session)
click to toggle source
We may want to do this in a thread to avoid delaying app startup. set_at_exit: when false, no global at_exit handler is registered (used for testing). @param [Sqreen::Frameworks::GenericFramework] framework
# File lib/sqreen/runner.rb, line 107
# Sets up the runner:
# - reads the configuration and selects the control/ingestion endpoints;
# - configures Sqreen::Kit and chooses an instrumenter;
# - logs in, applies the initial features and processes the commands
#   returned by the login response.
#
# NOTE(review): endpoint testing and login happen synchronously here; we may
# want to do that in a thread in order to prevent delaying app startup.
#
# @param configuration [#get] agent configuration, queried with +get(key)+
# @param framework [Sqreen::Frameworks::GenericFramework] host framework
# @param set_at_exit [Boolean] when false, do not place a global at_exit
#   handler (used for testing)
# @param session_class [Class] session implementation used to log in
# @raise [Sqreen::TokenNotFoundException] when no token is configured
def initialize(configuration, framework, set_at_exit = true, session_class = Sqreen::Session)
  @logged_out_tried = false
  @configuration = configuration
  @framework = framework
  @heartbeat_delay = HEARTBEAT_MAX_DELAY
  @last_heartbeat_request = Time.now
  @next_command_results = {}
  @next_metrics = []
  @running = true

  @proxy_url = @configuration.get(:proxy_url)
  chosen_endpoints = determine_endpoints

  @token = @configuration.get(:token)
  @app_name = @configuration.get(:app_name)
  @url = chosen_endpoints.control.url
  @cert_store = chosen_endpoints.control.ca_store

  # Start from empty whitelists and no performance budget.
  Sqreen.update_whitelisted_paths([])
  Sqreen.update_whitelisted_ips({})
  Sqreen.update_performance_budget(nil)
  raise(Sqreen::TokenNotFoundException, 'no token found') unless @token

  Sqreen::Kit::Configuration.logger = Sqreen.log
  Sqreen::Kit::Configuration.ingestion_url = chosen_endpoints.ingestion.url
  Sqreen::Kit::Configuration.certificate_store = chosen_endpoints.ingestion.ca_store
  Sqreen::Kit::Configuration.proxy_url = @proxy_url
  Sqreen::Kit::Configuration.default_source = "sqreen:agent:ruby:#{Sqreen::VERSION}"

  register_exit_cb if set_at_exit

  self.metrics_engine = MetricsStore.new

  # The weave instrumenter is used when configured, or when scout_apm >= 2.5.2
  # is installed. The gem scan only runs if :weave is not already set.
  needs_weave = proc do
    Gem::Specification.select { |s| s.name == 'scout_apm' && Gem::Requirement.new('>= 2.5.2').satisfied_by?(Gem::Version.new(s.version)) }.any?
  end
  @instrumenter = if @configuration.get(:weave) || needs_weave.call
                    Sqreen::Weave::Legacy::Instrumentation.new(metrics_engine)
                  else
                    Sqreen::Legacy::Instrumentation.new(metrics_engine)
                  end

  # The token is a credential: never write it to the logs in full.
  Sqreen.log.debug "Using token ending in #{@token.to_s[-4, 4]}"
  response = create_session(session_class)
  post_endpoint_testing_msgs(chosen_endpoints)

  # Features from the login response, optionally overridden by the
  # :initial_features configuration value (a JSON object).
  wanted_features = response.fetch('features', {})
  conf_initial_features = configuration.get(:initial_features)
  unless conf_initial_features.nil?
    begin
      conf_features = JSON.parse(conf_initial_features)
      raise 'Invalid Type' unless conf_features.is_a?(Hash)
      Sqreen.log.debug do
        "Override initial features with #{conf_features.inspect}"
      end
      wanted_features = wanted_features.merge(conf_features)
    rescue StandardError
      # Unparseable or non-object override: keep the login-provided features.
      Sqreen.log.warn do
        "NOT using invalid initial features #{conf_initial_features}"
      end
    end
  end
  self.features = wanted_features

  @ecosystem_integration = EcosystemIntegration.new(framework, Sqreen.queue)
  framework.req_start_cb = @ecosystem_integration.method(:request_start)
  framework.req_end_cb = @ecosystem_integration.method(:request_end)

  # Ensure a deliverer is there unless features have set it first
  self.deliverer ||= Deliveries::Simple.new(session)

  # Forward the rules pack received at login (load_rules reads these keys).
  context_infos = {}
  %w[rules pack_id].each do |p|
    context_infos[p] = response[p] unless response[p].nil?
  end
  process_commands(response.fetch('commands', []), context_infos)
end
Public Instance Methods
aggregate_observations()
click to toggle source
# File lib/sqreen/runner.rb, line 458
# Drains the observations currently queued in Sqreen.observations_queue
# into the metrics engine. Only the entries present when the call starts are
# consumed. Each observation timestamp is shifted from wall-clock time onto
# the Sqreen.time clock before being recorded.
def aggregate_observations
  queue = Sqreen.observations_queue
  offset = Sqreen.time - Time.now.utc.to_f
  pending = queue.size
  pending.times do
    category, metric_key, observation, observed_at = queue.pop
    metrics_engine.update(category, offset + observed_at.utc.to_f, metric_key, observation)
  end
end
batch_events(batch_size, max_staleness = nil, use_signals = false)
click to toggle source
# File lib/sqreen/runner.rb, line 194
# Installs a new event deliverer according to the batching settings.
#
# A batch size below 1 selects simple (unbatched) delivery. Signals do not
# support unbatched delivery: when +use_signals+ is set and the size is 1 or
# less, a batch of 30 events with a max staleness of 60 is used instead.
#
# @param batch_size [#to_i] number of events per batch
# @param max_staleness [#to_i, nil] maximum staleness passed to the batcher
# @param use_signals [Boolean] whether signals are in use
# @return the newly installed deliverer
def batch_events(batch_size, max_staleness = nil, use_signals = false)
  effective_size = batch_size.to_i
  staleness_setting = max_staleness

  if use_signals && effective_size <= 1
    Sqreen.log.warn do
      "Using signals with no delivery batching is unsupported. " \
      "Using instead batching with batch size = 30, max_staleness = 60"
    end
    effective_size = 30
    staleness_setting = 60
  end

  new_deliverer =
    if effective_size >= 1
      Deliveries::Batch.new(session, effective_size, staleness_setting.to_i)
    else
      Deliveries::Simple.new(session)
    end
  self.deliverer = new_deliverer
end
call_counts_metrics_period=(value)
click to toggle source
# File lib/sqreen/runner.rb, line 248
# Creates the call-count metric (a 'Sum' metric) with the given period.
# A non-positive (or non-numeric) period is ignored.
# NOTE(review): disabling an existing collection is not implemented.
#
# @param value [#to_i] metric period
def call_counts_metrics_period=(value)
  period = value.to_i
  return if period <= 0

  metrics_engine.create_metric('name' => CallCountable::COUNT_CALLS, 'period' => period, 'kind' => 'Sum')
end
change_features(new_features, _context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 449
# Replaces the current feature set.
#
# @param new_features [Hash] features to apply
# @return [Hash] { 'was' => previous features, 'now' => new_features }
def change_features(new_features, _context_infos = {})
  report = { 'was' => features }
  self.features = new_features
  report['now'] = new_features
  report
end
change_performance_budget(budget, _context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 406
# Updates the per-request performance budget.
#
# With :weave configured, the budget is converted by dividing it by 1000
# (presumably ms to s — confirm). Values from features['performance_budget']
# override it: a 'threshold' replaces the converted budget, and a 'ratio' is
# passed along. Without :weave, the raw budget is stored through
# Sqreen.update_performance_budget.
#
# @param budget [Numeric, nil] new budget; nil clears it
# @return [Hash, false] { :was => previous budget }, or false when the budget
#   is neither nil nor strictly positive
def change_performance_budget(budget, _context_infos = {})
  acceptable = budget.nil? || budget.to_f > 0
  return false unless acceptable

  previous =
    if @configuration.get(:weave)
      current = Sqreen::Weave::Budget.current
      snapshot = current ? current.to_h : current
      threshold = budget ? budget.to_f / 1000 : nil
      ratio = nil
      override = features['performance_budget']
      if override
        threshold = override['threshold'] if override.key?('threshold')
        ratio = override['ratio'] if override.key?('ratio')
      end
      Sqreen::Weave::Budget.update(threshold: threshold, ratio: ratio)
      snapshot
    else
      snapshot = Sqreen.performance_budget
      Sqreen.update_performance_budget(budget)
      snapshot
    end

  { :was => previous }
end
change_whitelisted_ips(ips, _context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 443
# Replaces the whitelisted IPs.
#
# @param ips [#each] new IP whitelist
# @return [Boolean] true when applied, false when +ips+ is not enumerable
def change_whitelisted_ips(ips, _context_infos = {})
  if ips.respond_to?(:each)
    Sqreen.update_whitelisted_ips(ips)
    true
  else
    false
  end
end
change_whitelisted_paths(paths, _context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 400
# Replaces the whitelisted request paths.
#
# @param paths [#each] new path whitelist
# @return [Boolean] true when applied, false when +paths+ is not enumerable
def change_whitelisted_paths(paths, _context_infos = {})
  if paths.respond_to?(:each)
    Sqreen.update_whitelisted_paths(paths)
    true
  else
    false
  end
end
config_binned_metrics(level, base, factor, base_pct, factor_pct)
click to toggle source
# File lib/sqreen/runner.rb, line 265
# Enables or disables binned performance metrics.
#
# A level of 0 or less disables them. Any positive level enables them, and a
# level other than 1 logs a warning and is treated as 1. All bin parameters
# are converted with #to_f.
#
# @param level [#to_i] perf level
# @param base [#to_f] bin base
# @param factor [#to_f] bin factor
# @param base_pct [#to_f] percentage bin base
# @param factor_pct [#to_f] percentage bin factor
def config_binned_metrics(level, base, factor, base_pct, factor_pct)
  perf_level = level.to_i
  if perf_level > 0
    Sqreen.log.info('Enabling binned metrics')
    if perf_level != 1
      Sqreen.log.warn("Unknown value for perf_level: #{perf_level}. Treating as 1")
    end
    PerformanceNotifications::BinnedMetrics.enable(
      metrics_engine, PERF_METRICS_PERIOD,
      *[base, factor, base_pct, factor_pct].map(&:to_f)
    )
  else
    Sqreen.log.info('Disabling binned metrics')
    PerformanceNotifications::BinnedMetrics.disable
  end
end
create_session(session_class)
click to toggle source
# File lib/sqreen/runner.rb, line 184
# Instantiates the backend session from the stored connection settings and
# logs in with the current framework.
#
# @param session_class [Class] session implementation to instantiate
# @return whatever the session's #login returns (the login response)
def create_session(session_class)
  connection_args = [@url, @cert_store, @token, @app_name, @proxy_url]
  @session = session_class.new(*connection_args)
  @session.login(@framework)
end
deliver_metrics_as_event()
click to toggle source
# File lib/sqreen/runner.rb, line 360
# Posts every pending metric to the deliverer as an individual event.
#
# With the simple delivery strategy this is disastrous, because each
# aggregated metric would trigger its own HTTP request. Sending metrics this
# way is therefore not supported with simple delivery.
# TODO: Confirm that only batch is used in production
def deliver_metrics_as_event
  next_metrics.each do |metric|
    deliverer.post_event(metric)
  end
end
deliverer=(new_deliverer)
click to toggle source
# File lib/sqreen/runner.rb, line 189
# Replaces the event deliverer. The outgoing deliverer, if any, is drained
# first (presumably flushing what it still holds — confirm in Deliveries).
#
# @param new_deliverer the deliverer to install
def deliverer=(new_deliverer)
  previous = deliverer
  previous.drain if previous
  @deliverer = new_deliverer
end
do_heartbeat()
click to toggle source
# File lib/sqreen/runner.rb, line 346
# Sends a heartbeat and processes the commands it returns.
#
# Freshly published metrics are appended to next_metrics. They are embedded
# in the heartbeat unless signals are in use, in which case they are posted
# through the deliverer instead. Pending command results are sent and then
# cleared, and next_metrics is cleared afterwards.
def do_heartbeat
  @last_heartbeat_request = Time.now
  if metrics_engine
    @next_metrics.concat(metrics_engine.publish(false))
  end

  inline_metrics = use_signals? ? nil : next_metrics
  response = session.heartbeat(next_command_results, inline_metrics)
  next_command_results.clear

  deliver_metrics_as_event if use_signals?
  next_metrics.clear

  process_commands(response['commands'])
end
exit_from_sinatra_startup?()
click to toggle source
Sinatra uses at_exit to run the application; see: github.com/sinatra/sinatra/blob/cd503e6c590cd48c2c9bb7869522494bfc62cb14/lib/sinatra/main.rb#L25
# File lib/sqreen/runner.rb, line 512
# Tells whether the current exit happens while Sinatra is still about to
# run the application. Sinatra runs the app from its own at_exit hook.
#
# @return [nil, Boolean] nil when Sinatra::Application is not defined,
#   false when it has no +run?+, otherwise the negation of +run?+
def exit_from_sinatra_startup?
  return nil unless defined?(Sinatra::Application)

  app = Sinatra::Application
  return false unless app.respond_to?(:run?)

  !app.run?
end
features(_context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 368
# Returns the feature settings currently held in the global Sqreen state.
# The context argument is ignored (presumably accepted so this can be invoked
# like the other command handlers — confirm).
def features(_context_infos = {})
  current_features = Sqreen.features
  current_features
end
features=(features)
click to toggle source
# File lib/sqreen/runner.rb, line 376
# Applies a feature set. It is stored globally, then used to reconfigure, in
# order:
# - the session (request compression, signals);
# - performance metrics, plus binned metrics when :weave is not configured;
# - call-count metrics and the heartbeat delay (only when positive);
# - event batching, only when 'batch_size' is present.
#
# @param features [Hash{String=>Object}] feature settings keyed by name
def features=(features)
  Sqreen.update_features(features)
  # The session may not exist yet: guard every session write, not just the
  # first one (an unguarded write raised NoMethodError on nil).
  if session
    session.request_compression = features['request_compression']
    session.use_signals = use_signals?
  end
  self.performance_metrics_period = features['performance_metrics_period']
  unless @configuration.get(:weave)
    config_binned_metrics(
      features['perf_level'] || DEFAULT_PERF_LEVEL,
      features['perf_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_BASE,
      features['perf_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_UNIT,
      features['perf_pct_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_BASE,
      features['perf_pct_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_UNIT
    )
  end
  self.call_counts_metrics_period = features['call_counts_metrics_period']
  hd = features['heartbeat_delay'].to_i
  self.heartbeat_delay = hd if hd > 0
  return if features['batch_size'].nil?

  batch_events(features['batch_size'], features['max_staleness'], use_signals?)
end
handle_event(event)
click to toggle source
# File lib/sqreen/runner.rb, line 498
# Dispatches one event popped from the queue. METRICS_EVENT triggers
# observation aggregation; any other event is posted to the deliverer.
def handle_event(event)
  return aggregate_observations if event == METRICS_EVENT

  @deliverer.post_event(event)
end
heartbeat_needed?()
click to toggle source
# File lib/sqreen/runner.rb, line 467
# True once more than +heartbeat_delay+ seconds have elapsed since the last
# heartbeat request.
def heartbeat_needed?
  deadline = @last_heartbeat_request + heartbeat_delay
  Time.now > deadline
end
load_rules(context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 214 def load_rules(context_infos = {}) rules_pack = context_infos['rules'] rulespack_id = context_infos['pack_id'] if rules_pack.nil? || rulespack_id.nil? session_rules = session.rules rules_pack = session_rules['rules'] rulespack_id = session_rules['pack_id'] elsif @configuration.get(:rules_dump) rules_dir = (defined?(Rails) ? Rails.root : Pathname.pwd) + 'tmp/sqreen/rules' FileUtils.mkdir_p(rules_dir.to_s) File.open("#{rules_dir}/#{rulespack_id}.json", "wb") { |f| f.write(JSON.pretty_generate(rules_pack)) } FileUtils.mkdir_p("#{rules_dir}/#{rulespack_id}") rules_pack.each do |r| r = r.dup r['rulespack_id'] = rulespack_id File.open("#{rules_dir}/#{rulespack_id}/#{r['name']}.json", "wb") { |f| f.write(JSON.pretty_generate(r)) } end end rules = rules_pack.each { |r| r['rulespack_id'] = rulespack_id } Sqreen.log.info { format('retrieved rulespack id: %s', rulespack_id) } Sqreen.log.debug { format('retrieved %d rules', rules.size) } local_rules = Sqreen::Rules.local(@configuration) || [] rules += local_rules. select { |rule| rule['enabled'] }. each { |r| r['rulespack_id'] = 'local' } Sqreen.log.debug do format('rules: %s', rules. sort_by { |r| r['name'] }. map { |r| format('(%s, %s, %s)', r[Rules::Attrs::NAME], r.to_json.size, r[Rules::Attrs::BLOCK]) }. join(', ')) end [rulespack_id, rules] end
logout(retrying = true)
click to toggle source
# File lib/sqreen/runner.rb, line 532
# Logs out from the backend, at most once.
#
# Does nothing without a session. In development mode it only clears
# +@running+ (stopping run_watcher). Otherwise it drains the deliverer,
# aggregates pending observations, posts the final metrics, logs the session
# out and clears +@running+.
#
# @param retrying [Boolean] forwarded to session.logout
def logout(retrying = true)
  return unless session

  Sqreen.log.debug("Logging out")

  if @framework.development?
    @running = false
    return
  end

  if @logged_out_tried
    Sqreen.log.debug('Not running logout twice')
    return
  end
  @logged_out_tried = true

  @deliverer.drain if @deliverer
  aggregate_observations
  if metrics_engine
    final_metrics = metrics_engine.publish
    session.post_metrics(final_metrics)
  end

  session.logout(retrying)
  @running = false
end
performance_metrics_period=(value)
click to toggle source
# File lib/sqreen/runner.rb, line 256
# Enables performance metrics with the given period. A non-positive
# (or non-numeric) value disables them.
#
# @param value [#to_i] metrics period
def performance_metrics_period=(value)
  period = value.to_i
  return PerformanceNotifications::Metrics.disable unless period > 0

  PerformanceNotifications::Metrics.enable(metrics_engine, period)
end
periodic_cleanup()
click to toggle source
# File lib/sqreen/runner.rb, line 489
# Housekeeping for when nothing else happened:
# - ticks the deliverer;
# - aggregates pending observations;
# - sends a heartbeat (which may return commands) when one is due.
def periodic_cleanup
  @deliverer.tick
  aggregate_observations
  return unless heartbeat_needed?

  do_heartbeat
end
process_commands(commands, context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 340
# Runs remote commands through RemoteCommand.process_list. The results are
# stored as the next command results to report. A nil or empty command list
# is a no-op and leaves the previous results untouched.
#
# @param commands [Array, nil] commands to run
# @param context_infos [Hash] extra context forwarded to the commands
def process_commands(commands, context_infos = {})
  return if commands.nil? || commands.empty?

  @next_command_results = RemoteCommand.process_list(self, commands, context_infos)
end
register_exit_cb(try_again = true)
click to toggle source
# File lib/sqreen/runner.rb, line 551
# Registers an at_exit hook that logs out.
#
# If the exit happens while Sinatra is still starting (Sinatra runs the app
# from its own at_exit), the hook re-registers itself once instead, so that
# logout is attempted later. Logout failures are logged at debug level and
# swallowed so they do not disturb process exit.
#
# @param try_again [Boolean] whether re-registration is still allowed
def register_exit_cb(try_again = true)
  at_exit do
    postpone = exit_from_sinatra_startup? && try_again
    if postpone
      register_exit_cb(false)
    else
      begin
        logout
      rescue StandardError => e
        Sqreen.log.debug(e.inspect)
        Sqreen.log.debug(e.backtrace)
        nil
      end
    end
  end
end
reload_actions(_context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 310
# Fetches the actions pack from the session and loads it.
#
# @return [true, RemoteCommand::FailureOutput] true when every action was
#   loaded. Otherwise a failure output describing one of:
#   - the fetch failed (no truthy 'status');
#   - 'actions' is not enumerable;
#   - the list of unsupported action types.
def reload_actions(_context_infos = {})
  Sqreen.log.debug 'Reloading actions'
  data = session.get_actionspack

  usable = data.respond_to?(:[]) && data['status']
  unless usable
    Sqreen.log.warn('Could not load actions')
    return RemoteCommand::FailureOutput.new(:error => 'Could not load actions from /actionspack')
  end

  action_hashes = data['actions']
  unless action_hashes.respond_to?(:each)
    Sqreen.log.warn('No action definitions in response')
    return RemoteCommand::FailureOutput.new(:error => 'No action definitions in response')
  end

  Sqreen.log.debug("Loading actions from hashes #{action_hashes}")
  unsupported = load_actions(action_hashes)
  return true if unsupported.empty?

  RemoteCommand::FailureOutput.new(:unsupported_actions => unsupported.to_a)
end
reload_rules(_context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 300
# Reloads rules through load_rules (without context, so they come from the
# session), removes all current callbacks and re-instruments the framework.
#
# @return [String] the rulespack id
def reload_rules(_context_infos = {})
  Sqreen.log.debug 'Reloading rules'
  pack_id, fresh_rules = load_rules
  instrumenter.remove_all_callbacks
  @framework.instrument_when_ready!(instrumenter, fresh_rules)
  Sqreen.log.debug 'Rules reloaded'
  pack_id.to_s
end
remove_instrumentation(_context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 292
# Removes every instrumentation callback and clears the actions repository.
#
# @return [true]
def remove_instrumentation(_context_infos = {})
  Sqreen.log.debug 'Removing instrumentation'
  instrumenter.remove_all_callbacks
  Sqreen::Actions::Repository.clear
  Sqreen.log.debug 'Instrumentation removed'
  true
end
restart(_context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 523
# Shuts this runner down. A background thread then starts a fresh worker
# after twice the current heartbeat delay.
#
# @return [Thread] the thread that will restart the worker
def restart(_context_infos = {})
  shutdown
  # Read the delay before spawning so the thread uses the value at restart time.
  delay = @heartbeat_delay
  Thread.new do
    sleep(2 * delay)
    Sqreen::Worker.start(Sqreen.framework)
  end
end
run_watcher()
click to toggle source
# File lib/sqreen/runner.rb, line 506
# Main loop: repeats run_watcher_once for as long as +running+ is truthy.
def run_watcher
  while running
    run_watcher_once
  end
end
run_watcher_once()
click to toggle source
# File lib/sqreen/runner.rb, line 471 def run_watcher_once event = Timeout.timeout(heartbeat_delay) do Sqreen.queue.pop end rescue Timeout::Error periodic_cleanup else handle_event(event) if heartbeat_needed? # Also aggregate/post metrics when cleanup has # not been done for a long time Sqreen.log.debug 'Forced an heartbeat' periodic_cleanup # will trigger do_heartbeat since it's time end ensure PerformanceNotifications::BinnedMetrics.finish_watcher_run end
setup_instrumentation(context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 280
# Loads rules (from context_infos when provided, else from the session),
# instruments the framework with them and initializes ecosystem integration.
#
# @param context_infos [Hash] may carry 'rules' and 'pack_id'
# @return [String] the rulespack id
def setup_instrumentation(context_infos = {})
  Sqreen.log.info 'Setting up instrumentation'
  pack_id, rules_to_apply = load_rules(context_infos)
  @framework.instrument_when_ready!(instrumenter, rules_to_apply)
  Sqreen.log.info 'Instrumentation set up'

  # XXX: ecosystem instrumentation should likely be deferred
  # the same way the rest might be
  @ecosystem_integration.init

  pack_id.to_s
end
shutdown(_context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 518
# Removes all instrumentation, then logs out.
#
# @return whatever #logout returns
def shutdown(_context_infos = {})
  remove_instrumentation
  outcome = logout
  outcome
end
tracing_enable(tracing_id_prefix, sampling_config, _context_infos = {})
click to toggle source
@param [String] tracing_id_prefix @param [Array<Hash{String=>Object}>] sampling_config
# File lib/sqreen/runner.rb, line 432
# Forwards a tracing command to the ecosystem integration.
#
# @param [String] tracing_id_prefix
# @param [Array<Hash{String=>Object}>] sampling_config
# @return [Hash] { status: true }
def tracing_enable(tracing_id_prefix, sampling_config, _context_infos = {})
  integration = @ecosystem_integration
  integration.handle_tracing_command(tracing_id_prefix, sampling_config)
  { status: true }
end
upload_bundle(_context_infos = {})
click to toggle source
# File lib/sqreen/runner.rb, line 437
# Uploads the dependency bundle (signature and dependency list) through the
# session.
#
# @return [Float] seconds spent uploading. Measured on the monotonic clock,
#   so system clock adjustments cannot skew the duration or make it negative.
def upload_bundle(_context_infos = {})
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  session.post_bundle(RuntimeInfos.dependencies_signature, RuntimeInfos.dependencies)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end
use_signals?()
click to toggle source
# File lib/sqreen/runner.rb, line 372
# Whether the 'use_signals' feature is on (DEFAULT_USE_SIGNALS when unset).
def use_signals?
  current = features
  current.fetch('use_signals', DEFAULT_USE_SIGNALS)
end
Private Instance Methods
determine_endpoints()
click to toggle source
# File lib/sqreen/runner.rb, line 578
# Chooses the control and ingestion endpoints.
#
# There's no sniffing going on: :no_sniff_domains is just a misnamed setting
# that skips endpoint testing. Skipping it reproduces the behaviour from
# before endpoint testing was introduced.
def determine_endpoints
  skip_testing = @configuration.get(:no_sniff_domains)
  control_url = @configuration.get(:url)
  ingestion_url = @configuration.get(:ingestion_url)

  if skip_testing
    EndpointTesting.no_test_endpoints(control_url, ingestion_url)
  else
    EndpointTesting.test_endpoints(@proxy_url, control_url, ingestion_url)
  end
end
load_actions(hashes)
click to toggle source
# File lib/sqreen/runner.rb, line 591
# Deserializes action hashes into a fresh repository and installs it as
# Sqreen::Actions::Repository.current.
#
# Unknown action types are logged and skipped.
#
# @param hashes [Enumerable<Hash>] action definitions
# @return [Set] the unsupported action types encountered
# @raise [Sqreen::Exception] for any other deserialization/registration error
def load_actions(hashes)
  unsupported_types = Set.new
  repository = Sqreen::Actions::Repository.new

  valid_actions = hashes.map do |action_hash|
    begin
      action = Sqreen::Actions.deserialize_action(action_hash)
      repository.add action_hash['parameters'], action
      action
    rescue Sqreen::Actions::UnknownActionType => e
      Sqreen.log.warn("Unsupported action type: #{e.action_type}")
      unsupported_types << e.action_type
      nil
    rescue => e
      raise Sqreen::Exception, "Invalid action hash: #{action_hash}: #{e.message}"
    end
  end.compact

  Sqreen.log.debug("Added #{valid_actions.size} valid actions")
  Sqreen::Actions::Repository.current = repository
  unsupported_types
end
post_endpoint_testing_msgs(chosen_endpoints)
click to toggle source
# File lib/sqreen/runner.rb, line 569
# Posts the endpoint-testing messages to the backend as agent messages.
# Best effort: any StandardError is logged as a warning and recorded through
# RemoteException.record instead of propagating.
#
# @param chosen_endpoints result of determine_endpoints (responds to #messages)
def post_endpoint_testing_msgs(chosen_endpoints)
  messages = chosen_endpoints.messages
  messages.each { |message| session.post_agent_message(@framework, message) }
rescue => e
  Sqreen.log.warn "Error submitting agent message: #{e}"
  RemoteException.record(e)
end