class MiniScheduler::Manager::Runner
Public Class Methods
new(manager)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 8 def initialize(manager) @stopped = false @mutex = Mutex.new @queue = Queue.new @manager = manager @hostname = manager.hostname @recovery_thread = Thread.new do while !@stopped sleep 60 @mutex.synchronize do repair_queue reschedule_orphans end end end @keep_alive_thread = Thread.new do while !@stopped @mutex.synchronize do keep_alive end sleep (@manager.keep_alive_duration / 2) end end @threads = [] manager.workers.times do @threads << Thread.new do while !@stopped process_queue end end end end
Public Instance Methods
attempts(n) { || ... }
click to toggle source
# File lib/mini_scheduler/manager.rb, line 166 def attempts(n) n.times { begin yield; break rescue sleep Random.rand end } end
enq(klass)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 150 def enq(klass) @queue << klass end
hostname()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 61 def hostname @hostname end
keep_alive()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 43 def keep_alive @manager.keep_alive rescue => ex MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive") end
process_queue()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 65 def process_queue klass = @queue.deq # hack alert, I need to both deq and set @running atomically. @running = true return if !klass failed = false start = Time.now.to_f info = @mutex.synchronize { @manager.schedule_info(klass) } stat = nil error = nil begin info.prev_result = "RUNNING" @mutex.synchronize { info.write! } if @manager.enable_stats stat = MiniScheduler::Stat.create!( name: klass.to_s, hostname: hostname, pid: Process.pid, started_at: Time.now, live_slots_start: GC.stat[:heap_live_slots] ) end klass.new.perform rescue => e MiniScheduler.handle_job_exception(e, message: "Running a scheduled job", job: { "class" => klass }) error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}" failed = true end duration = ((Time.now.to_f - start) * 1000).to_i info.prev_duration = duration info.prev_result = failed ? "FAILED" : "OK" info.current_owner = nil if stat stat.update!( duration_ms: duration, live_slots_finish: GC.stat[:heap_live_slots], success: !failed, error: error ) MiniScheduler.job_ran&.call(stat) end attempts(3) do @mutex.synchronize { info.write! } end rescue => ex MiniScheduler.handle_job_exception(ex, message: "Processing scheduled job queue") ensure @running = false if defined?(ActiveRecord::Base) ActiveRecord::Base.connection_handler.clear_active_connections! end end
repair_queue()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 49 def repair_queue @manager.repair_queue rescue => ex MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair") end
reschedule_orphans()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 55 def reschedule_orphans @manager.reschedule_orphans! rescue => ex MiniScheduler.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler") end
stop!()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 125 def stop! return if @stopped @mutex.synchronize do @stopped = true @keep_alive_thread.kill @recovery_thread.kill @keep_alive_thread.join @recovery_thread.join enq(nil) kill_thread = Thread.new do sleep 0.5 @threads.each(&:kill) end @threads.each(&:join) kill_thread.kill kill_thread.join end end
wait_till_done()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 154 def wait_till_done while !@queue.empty? && !(@queue.num_waiting > 0) sleep 0.001 end # this is a hack, but is only used for test anyway # if tests fail that depend on this we are forced to increase it. sleep 0.010 while @running sleep 0.001 end end