class Scheduler::Manager::Runner

Public Class Methods

new(manager) click to toggle source
# File lib/scheduler/manager.rb, line 14
# Builds the runner and immediately starts its three background threads.
#
# manager - object the runner works on behalf of. This method itself only
#           reads manager.keep_alive_duration; the helpers invoked from the
#           threads call further methods on it.
#
# Threads started:
# * orphan rescheduler - sleeps for one minute, then calls
#   reschedule_orphans while holding @mutex;
# * keep-alive - calls keep_alive while holding @mutex, then sleeps for half
#   of manager.keep_alive_duration;
# * queue worker - calls process_queue over and over.
#
# NOTE(review): `1.minute` is not core Ruby; presumably ActiveSupport is
# loaded by the host application - confirm.
def initialize(manager)
  @mutex = Mutex.new
  @queue = Queue.new
  @manager = manager

  @reschedule_orphans_thread = Thread.new do
    loop do
      sleep 1.minute
      @mutex.synchronize { reschedule_orphans }
    end
  end

  @keep_alive_thread = Thread.new do
    loop do
      @mutex.synchronize { keep_alive }
      # Sleep outside the mutex so the other threads can use it meanwhile.
      sleep(@manager.keep_alive_duration / 2)
    end
  end

  @thread = Thread.new do
    loop { process_queue }
  end
end

Public Instance Methods

attempts(n) { || ... } click to toggle source
# File lib/scheduler/manager.rb, line 104
# Runs the given block until it completes without raising, at most n times.
#
# After a failed attempt the method sleeps for a random fraction of a second
# before trying again. No sleep happens after the final attempt, since
# nothing follows it. Errors (StandardError and subclasses) are swallowed:
# if every attempt fails the method simply returns.
#
# n - maximum number of attempts (Integer).
#
# Returns nil when an attempt succeeded, or n when all attempts failed.
def attempts(n)
  n.times do |attempt|
    begin
      yield
      break
    rescue
      # Back off only when another attempt is still to come.
      sleep Random.rand unless attempt == n - 1
    end
  end
end
enq(klass) click to toggle source
# File lib/scheduler/manager.rb, line 89
# Adds a job class to the work queue; the queue worker picks it up later.
#
# klass - the object to enqueue (process_queue later calls klass.new.perform).
#
# Returns the queue.
def enq(klass)
  @queue.push(klass)
end
keep_alive() click to toggle source
# File lib/scheduler/manager.rb, line 41
# Forwards a keep-alive call to the manager.
#
# Any StandardError raised by the manager is handed to
# Scheduler.handle_job_exception instead of propagating to the caller.
def keep_alive
  begin
    @manager.keep_alive
  rescue StandardError => error
    Scheduler.handle_job_exception(error, {message: "Scheduling manager keep-alive"})
  end
end
process_queue() click to toggle source
# File lib/scheduler/manager.rb, line 53
# Takes the next job class off the queue (blocking until one is available),
# runs it, and records the outcome on the job's schedule info.
#
# Steps:
# 1. Fetch the schedule info for the job class from the manager.
# 2. Mark it "RUNNING", persist it, then call klass.new.perform.
# 3. Record the duration in milliseconds and "OK" / "FAILED", clear
#    current_owner, and persist again (up to three attempts).
#
# Errors raised by the job, or by the "RUNNING" write, are reported through
# Scheduler.handle_job_exception and mark the run as failed. Any other
# StandardError in this method is reported the same way with a different
# message. @running is always reset to false on the way out.
def process_queue
  klass = @queue.deq
  # hack alert, I need to both deq and set @running atomically.
  @running = true
  failed = false
  # Monotonic clock: wall-clock adjustments (NTP, manual changes) during the
  # job must not produce a wrong or negative duration.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  info = @mutex.synchronize { @manager.schedule_info(klass) }
  begin
    info.prev_result = "RUNNING"
    @mutex.synchronize { info.write! }
    klass.new.perform
  rescue => e
    Scheduler.handle_job_exception(e, {message: "Running a scheduled job", job: klass})
    failed = true
  end
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  info.prev_duration = (elapsed * 1000).to_i
  info.prev_result = failed ? "FAILED" : "OK"
  info.current_owner = nil
  attempts(3) do
    @mutex.synchronize { info.write! }
  end
rescue => ex
  Scheduler.handle_job_exception(ex, {message: "Processing scheduled job queue"})
ensure
  @running = false
end
reschedule_orphans() click to toggle source
# File lib/scheduler/manager.rb, line 47
# Asks the manager to reschedule orphans via reschedule_orphans!.
#
# Any StandardError raised by the manager is handed to
# Scheduler.handle_job_exception instead of propagating to the caller.
def reschedule_orphans
  begin
    @manager.reschedule_orphans!
  rescue StandardError => error
    Scheduler.handle_job_exception(error, {message: "Scheduling manager orphan rescheduler"})
  end
end
stop!() click to toggle source
# File lib/scheduler/manager.rb, line 81
# Kills the queue worker, keep-alive and orphan-rescheduler threads, in that
# order, while holding @mutex.
#
# Returns the orphan-rescheduler thread (the last one killed).
def stop!
  @mutex.synchronize do
    [@thread, @keep_alive_thread, @reschedule_orphans_thread].each(&:kill).last
  end
end
wait_till_done() click to toggle source
# File lib/scheduler/manager.rb, line 93
# Blocks, polling every millisecond, until the runner looks idle.
#
# First waits until the queue is empty or some thread is waiting on it, then
# pauses once more, and finally waits for @running to become falsy.
#
# Returns nil.
def wait_till_done
  sleep 0.001 until @queue.empty? || @queue.num_waiting > 0
  # this is a hack, but is only used for test anyway
  sleep 0.001
  sleep 0.001 while @running
end