class Scheduler::Manager::Runner
Public Class Methods
new(manager)
click to toggle source
# File lib/scheduler/manager.rb, line 14 def initialize(manager) @mutex = Mutex.new @queue = Queue.new @manager = manager @reschedule_orphans_thread = Thread.new do while true sleep 1.minute @mutex.synchronize do reschedule_orphans end end end @keep_alive_thread = Thread.new do while true @mutex.synchronize do keep_alive end sleep (@manager.keep_alive_duration / 2) end end @thread = Thread.new do while true process_queue end end end
Public Instance Methods
attempts(n) { || ... }
click to toggle source
# File lib/scheduler/manager.rb, line 104 def attempts(n) n.times { begin yield; break rescue sleep Random.rand end } end
enq(klass)
click to toggle source
# File lib/scheduler/manager.rb, line 89 def enq(klass) @queue << klass end
keep_alive()
click to toggle source
# File lib/scheduler/manager.rb, line 41 def keep_alive @manager.keep_alive rescue => ex Scheduler.handle_job_exception(ex, {message: "Scheduling manager keep-alive"}) end
process_queue()
click to toggle source
# File lib/scheduler/manager.rb, line 53 def process_queue klass = @queue.deq # hack alert, I need to both deq and set @running atomically. @running = true failed = false start = Time.now.to_f info = @mutex.synchronize { @manager.schedule_info(klass) } begin info.prev_result = "RUNNING" @mutex.synchronize { info.write! } klass.new.perform rescue => e Scheduler.handle_job_exception(e, {message: "Running a scheduled job", job: klass}) failed = true end duration = ((Time.now.to_f - start) * 1000).to_i info.prev_duration = duration info.prev_result = failed ? "FAILED" : "OK" info.current_owner = nil attempts(3) do @mutex.synchronize { info.write! } end rescue => ex Scheduler.handle_job_exception(ex, {message: "Processing scheduled job queue"}) ensure @running = false end
reschedule_orphans()
click to toggle source
# File lib/scheduler/manager.rb, line 47 def reschedule_orphans @manager.reschedule_orphans! rescue => ex Scheduler.handle_job_exception(ex, {message: "Scheduling manager orphan rescheduler"}) end
stop!()
click to toggle source
# File lib/scheduler/manager.rb, line 81 def stop! @mutex.synchronize do @thread.kill @keep_alive_thread.kill @reschedule_orphans_thread.kill end end
wait_till_done()
click to toggle source
# File lib/scheduler/manager.rb, line 93 def wait_till_done while !@queue.empty? && !(@queue.num_waiting > 0) sleep 0.001 end # this is a hack, but is only used for test anyway sleep 0.001 while @running sleep 0.001 end end