class Workers::Scheduler

Public Class Methods

new(options = {}) click to toggle source
# File lib/workers/scheduler.rb, line 5
def initialize(options = {})
  @logger = Workers::LogProxy.new(options[:logger])
  @pool = options[:pool] || Workers::Pool.new
  @schedule = SortedSet.new
  @mutex = Mutex.new
  @thread = Thread.new { start_loop }

  nil
end

Public Instance Methods

alive?() click to toggle source
# File lib/workers/scheduler.rb, line 49
def alive?
  @thread && @thread.alive?
end
dispose() click to toggle source
# File lib/workers/scheduler.rb, line 39
def dispose
  @mutex.synchronize do
    @pool.shutdown
    @pool.join
    @thread.kill
  end

  nil
end
schedule(timer) click to toggle source
# File lib/workers/scheduler.rb, line 15
def schedule(timer)
  @mutex.synchronize do
    @schedule << timer
  end

  wakeup

  nil
end
unschedule(timer) click to toggle source
# File lib/workers/scheduler.rb, line 25
def unschedule(timer)
  @mutex.synchronize do
    @schedule.delete(timer)
  end

  nil
end
wakeup() click to toggle source
# File lib/workers/scheduler.rb, line 33
def wakeup
  @thread.wakeup

  nil
end

Private Instance Methods

next_delay() click to toggle source
# File lib/workers/scheduler.rb, line 90
def next_delay
  @schedule.first ? @schedule.first.sec_remaining : nil
end
process_overdue() click to toggle source
# File lib/workers/scheduler.rb, line 70
def process_overdue
  overdue = []

  while @schedule.first && @schedule.first.overdue?
    overdue << @schedule.first
    @schedule.delete(@schedule.first)
  end

  overdue.each do |timer|
    @pool.perform do
      timer.fire
    end

    timer.reset
    @schedule << timer if timer.repeat
  end

  nil
end
start_loop() click to toggle source
# File lib/workers/scheduler.rb, line 55
def start_loop
  while true
    delay = nil

    @mutex.synchronize do
      process_overdue
      delay = next_delay
    end

    delay ? sleep(delay) : sleep
  end

  nil
end