class SerialScheduler

Constants

VERSION

Public Class Methods

new(logger: Logger.new(STDOUT), error_handler: ->(e) { raise e } click to toggle source
# File lib/serial_scheduler.rb, line 46
def initialize(logger: Logger.new(STDOUT), error_handler: ->(e) { raise e })
  @logger = logger
  @error_handler = error_handler

  @producers = []
  @stopped = false
end

Public Instance Methods

add(*args, &block) click to toggle source

start a new thread that enqueues an execution at given interval

# File lib/serial_scheduler.rb, line 55
def add(*args, &block)
  @producers << Producer.new(*args, &block)
end
run() click to toggle source
# File lib/serial_scheduler.rb, line 59
def run
  now = Time.now.to_i
  @producers.each { |p| p.start now }

  loop do
    now = Time.now.to_i
    earliest = @producers.min_by(&:next)
    wait = [earliest.next - now, 0].max # do not wait when overdue
    target = Time.at(now + wait)

    if wait > 0
      @logger.info message: "Waiting to start job", job: earliest.name, in: wait, at: target.to_s
      loop do
        break if @stopped || Time.now >= target # need to re-check or long waits will drift by .3%
        sleep 1
      end
    end
    break if @stopped

    earliest.next!
    execute_in_fork earliest
  end
end
stop() click to toggle source
# File lib/serial_scheduler.rb, line 83
def stop
  @stopped = true
end

Private Instance Methods

execute_in_fork(producer) click to toggle source
# File lib/serial_scheduler.rb, line 89
def execute_in_fork(producer)
  @logger.info message: "Executing job", job: producer.name
  pid = fork do
    begin
      Timeout.timeout producer.timeout, &producer.block
    rescue StandardError => e # do not rescue `Exception` so it can be `Interrupt`-ed
      @logger.error message: "Error in job", job: producer.name, error: e.message
      @error_handler.call(e)
    end
  end
  Process.wait pid
end