class Sigurd::Executor

Class that takes a list of “runners” and runs them on a loop until told to stop. Each runner is given its own thread. Runners need to define a “start” and “stop” method.

Attributes

runners[RW]

@return [Array<#start, stop, id>]

Public Class Methods

new(runners, sleep_seconds: nil, logger: Logger.new(STDOUT)) click to toggle source

@param runners [Array<#start, stop, id>] A list of objects that can be started or stopped. @param logger [Logger] @param sleep_seconds [Integer] Use a fixed time to sleep between failed runs instead of using an exponential backoff.

# File lib/sigurd/executor.rb, line 20
def initialize(runners, sleep_seconds: nil, logger: Logger.new(STDOUT))
  @threads = Concurrent::Array.new
  @runners = runners
  @logger = logger
  @sleep_seconds = sleep_seconds
end

Public Instance Methods

start() click to toggle source

Start the executor.

# File lib/sigurd/executor.rb, line 28
def start
  @logger.info('Starting executor')
  @signal_to_stop = false
  @threads.clear
  @thread_pool = Concurrent::FixedThreadPool.new(@runners.size)

  @runners.each do |runner|
    @thread_pool.post do
      thread = Thread.current
      thread.abort_on_exception = true
      @threads << thread
      run_object(runner)
    end
  end

  true
end
stop() click to toggle source

Stop the executor.

# File lib/sigurd/executor.rb, line 47
def stop
  return if @signal_to_stop

  @logger.info('Stopping executor')
  @signal_to_stop = true
  @runners.each(&:stop)
  @threads.select(&:alive?).each do |thread|
    begin
      thread.wakeup
    rescue StandardError
      nil
    end
  end
  @thread_pool&.shutdown
  @thread_pool&.wait_for_termination
  @logger.info('Executor stopped')
end

Private Instance Methods

create_exponential_backoff() click to toggle source

@return [ExponentialBackoff]

# File lib/sigurd/executor.rb, line 95
def create_exponential_backoff
  min = 1
  max = 60
  ExponentialBackoff.new(min, max).tap do |backoff|
    backoff.randomize_factor = rand
  end
end
error_metadata(exception) click to toggle source

@param exception [Throwable] @return [Hash]

# File lib/sigurd/executor.rb, line 69
def error_metadata(exception)
  {
    exception_class: exception.class.name,
    exception_message: exception.message,
    backtrace: exception.backtrace
  }
end
handle_crashed_runner(runner, error, retry_count) click to toggle source

When “runner#start” is interrupted / crashes we assume it's safe to be called again

# File lib/sigurd/executor.rb, line 105
def handle_crashed_runner(runner, error, retry_count)
  interval = if @sleep_seconds
               @sleep_seconds
             else
               backoff = create_exponential_backoff
               backoff.interval_at(retry_count).round(2)
             end

  metadata = {
    listener_id: runner.id,
    retry_count: retry_count,
    waiting_time: interval
  }.merge(error_metadata(error))

  @logger.error("Runner crashed, waiting #{interval}s (#{error.message}) #{metadata}")
  sleep(interval)
end
run_object(runner) click to toggle source
# File lib/sigurd/executor.rb, line 77
def run_object(runner)
  retry_count = 0

  begin
    @logger.info("Running #{runner.id}")
    runner.start
    retry_count = 0 # success - reset retry count
  rescue Exception => e
    handle_crashed_runner(runner, e, retry_count)
    retry_count += 1
    retry unless @signal_to_stop
  end
rescue Exception => e
  @logger.error("Failed to run executor (#{e.message}) #{error_metadata(e)}")
  raise e
end