class Jober::Manager
Attributes
logger_path[RW]
stopped[RW]
Public Class Methods
new(name, allowed_classes = nil)
click to toggle source
# File lib/jober/manager.rb, line 8
#
# Sets up a manager identified by +name+.
#
# name            - label used in the process title and in log messages.
# allowed_classes - optional list; when given, only the classes that are
#                   also present in Jober.classes are managed. When nil,
#                   every class in Jober.classes is managed.
#
# Side effects: renames the current process ($0) and logs a start message.
def initialize(name, allowed_classes = nil)
  @name = name
  @pids = []
  @mutex = Mutex.new
  @stopped = false

  @allowed_classes =
    if allowed_classes
      Jober.classes & allowed_classes
    else
      Jober.classes
    end

  $0 = "#{@name} manager"

  # NOTE(review): nothing in this method assigns @logger_path before this
  # check, so the branch looks unreachable from a plain `new` — confirm
  # whether logger_path is expected to be set some other way.
  self.logger = ::Logger.new(File.join(@logger_path, "manager.log")) if @logger_path

  info "starting manager #{@name}"
end
Public Instance Methods
catch() { || ... }
click to toggle source
# File lib/jober/manager.rb, line 67
#
# Runs the given block and reports any exception to Jober.exception
# instead of letting it propagate.
#
# Returns true when the block finished without raising, nil otherwise.
#
# `rescue Object` matches every exception class, including the ones that
# a bare `rescue` would let through. Note that this method also shadows
# Kernel#catch inside the manager.
def catch
  begin
    yield
  rescue Object => ex
    Jober.exception(ex)
    return nil
  end
  true
end
pids()
click to toggle source
# File lib/jober/manager.rb, line 117
#
# Returns the current list of tracked child pids, read while holding
# the manager mutex. The array itself is returned, not a copy.
def pids
  @mutex.synchronize do
    @pids
  end
end
run()
click to toggle source
# File lib/jober/manager.rb, line 30
#
# Starts all workers (via run!), installs a TERM handler that calls stop,
# then blocks the calling thread, polling once per second until the
# manager has been marked as stopped.
def run
  run!
  Signal.trap("TERM") { stop }

  # Sleep first, then check, so the flag is inspected once per second.
  loop do
    sleep 1
    return if @stopped
  end
end
run!()
click to toggle source
# File lib/jober/manager.rb, line 22
#
# Spawns one thread per configured worker slot of every allowed class.
# Each thread runs start_worker for its (class, index) pair; the interval
# and total worker count are read from the class inside the thread.
#
# Does not block; returns right after the threads are created.
def run!
  @allowed_classes.each do |job_class|
    job_class.get_workers.times do |worker_idx|
      Thread.new do
        start_worker(job_class, job_class.get_interval, worker_idx, job_class.get_workers)
      end
    end
  end
end
run_task_fork(klass, idx, count)
click to toggle source
# File lib/jober/manager.rb, line 93
#
# Forks a child process that builds one instance of +klass+ and executes it.
#
# klass - task class to instantiate.
# idx   - index of this worker, passed to the task as :worker_id.
# count - total number of workers, passed to the task as :workers_count.
#
# Returns the pid of the forked child (the return value of fork).
def run_task_fork(klass, idx, count)
  info "invoke #{klass}"

  Process.fork do
    $0 = "#{@name} manager #{klass}"
    #$0 += " #{index}" if index > 0

    Jober.call_after_fork
    Jober.reset_redis

    task = klass.new(worker_id: idx, workers_count: count) # class_name parent of Jober::Task

    if @logger_path
      log_file = File.join(@logger_path, "#{klass.short_name}.log")
      # Send the child's stdout and stderr to the per-class log file,
      # each through its own append-mode handle.
      [STDOUT, STDERR].each { |stream| stream.reopen(File.open(log_file, 'a')) }
      task.logger = ::Logger.new(log_file)
    end

    # Exceptions raised by the task are reported by #catch, not re-raised.
    catch { task.execute }
  end
end
start_worker(klass, interval, idx, count)
click to toggle source
# File lib/jober/manager.rb, line 75
#
# Worker loop for one (class, index) slot: fork a task, wait for it to
# exit, pause, and repeat until the manager is stopped.
#
# klass    - task class to run.
# interval - pause between successful runs; converted with to_f.
# idx      - worker index forwarded to run_task_fork.
# count    - total worker count forwarded to run_task_fork.
def start_worker(klass, interval, idx, count)
  debug { "start worker for #{klass.to_s}" }

  loop do
    child_pid = nil

    # #catch yields true when the block completed and nil when it raised.
    completed = catch do
      child_pid = run_task_fork(klass, idx, count)
      add_pid(child_pid)
      ::Process.wait(child_pid)
      del_pid(child_pid)
      sleep(interval.to_f) unless stopped
    end

    # Also drop the pid when the block above raised before reaching del_pid.
    del_pid(child_pid)
    break if stopped

    # Short back-off after a failed iteration.
    sleep(0.5) unless completed
    break if stopped
  end
end
stop(timeout = 2.5)
click to toggle source
# File lib/jober/manager.rb, line 47
#
# Stops the manager: asks every child to quit (via stop!), waits up to
# +timeout+ seconds for the tracked pids to disappear, then sends KILL to
# whatever is still tracked and clears the list.
#
# timeout - maximum number of seconds to wait, polled in 0.1s steps.
#
# Returns nil when no child had to be killed, otherwise the emptied list.
#
# NOTE(review): @pids is accessed without @mutex here. This method is
# invoked from the TERM trap handler installed by #run, and a Mutex cannot
# be locked in trap context, so the unsynchronized access looks
# deliberate — confirm before changing it.
def stop(timeout = 2.5)
  stop!
  return if @pids.empty?

  # Poll in whole ticks instead of accumulating a float, so the number of
  # iterations does not depend on rounding error.
  poll_interval = 0.1
  ticks = (timeout / poll_interval).ceil
  ticks.times do
    break if @pids.empty?
    sleep(poll_interval)
  end
  return if @pids.empty?

  info { "still alive pids: #{@pids}, killing" }
  # Iterate over a snapshot; worker threads may still touch @pids.
  @pids.dup.each do |pid|
    begin
      ::Process.kill("KILL", pid)
    rescue Errno::ESRCH
      # The child exited between the check and the signal; a missing
      # process must not prevent the remaining ones from being killed.
    end
  end
  @pids = []
end
stop!()
click to toggle source
# File lib/jober/manager.rb, line 41
#
# Marks the manager as stopped and sends QUIT to every tracked child,
# without waiting for any of them to exit.
#
# A pid whose process is already gone is skipped, so one dead child
# cannot prevent the remaining children from being signalled or the
# log message from being written.
#
# NOTE(review): @pids is read without @mutex. This method is reachable
# from the TERM trap handler installed by #run (through #stop), and a
# Mutex cannot be locked in trap context, so the unsynchronized read
# looks deliberate — confirm before changing it.
def stop!
  @stopped = true
  # Iterate over a snapshot; worker threads may still append to @pids.
  @pids.dup.each do |pid|
    begin
      ::Process.kill("QUIT", pid)
    rescue Errno::ESRCH
      # The child has already exited; nothing left to signal.
    end
  end
  info "stopping manager..."
end
Private Instance Methods
add_pid(pid)
click to toggle source
# File lib/jober/manager.rb, line 123
#
# Appends +pid+ to the tracked pid list while holding the manager mutex.
# Returns the pid list.
def add_pid(pid)
  @mutex.synchronize do
    @pids.push(pid)
  end
end
clear_pids()
click to toggle source
# File lib/jober/manager.rb, line 131
#
# Replaces the tracked pid list with a fresh empty array while holding
# the manager mutex. The previous array object is left untouched.
def clear_pids
  @mutex.synchronize do
    @pids = []
  end
end
del_pid(pid)
click to toggle source
# File lib/jober/manager.rb, line 127
#
# Removes every occurrence of +pid+ from the tracked pid list while
# holding the manager mutex. A new array is assigned rather than mutating
# the old one. Removing a pid that is not tracked (including nil) leaves
# the contents unchanged.
def del_pid(pid)
  @mutex.synchronize do
    @pids = @pids - [pid]
  end
end