class Jober::Manager

Attributes

logger_path[RW]
stopped[RW]

Public Class Methods

new(name, allowed_classes = nil) click to toggle source
# File lib/jober/manager.rb, line 8
# Builds a manager and logs that it is starting.
#
# @param name [Object] label interpolated into the process title and the
#   start-up log line
# @param allowed_classes [Array, nil] when given, only the entries that are
#   also present in Jober.classes are managed; when nil, every entry of
#   Jober.classes is managed
def initialize(name, allowed_classes = nil)
  @name = name
  @stopped = false
  @mutex = Mutex.new
  @pids = []
  @allowed_classes =
    if allowed_classes
      Jober.classes & allowed_classes
    else
      Jober.classes
    end

  # Rename the running process so the manager is recognisable in `ps`.
  $0 = "#{@name} manager"

  # NOTE(review): @logger_path is not assigned anywhere in this method, so
  # this branch only runs if the variable was set before initialize is called.
  self.logger = ::Logger.new(File.join(@logger_path, "manager.log")) if @logger_path

  info "starting manager #{@name}"
end

Public Instance Methods

catch() { || ... } click to toggle source
# File lib/jober/manager.rb, line 67
# Runs the given block and converts any raised exception into a nil result.
#
# The rescue clause matches every exception class (not only StandardError);
# whatever is raised is handed to Jober.exception.
#
# @yield the work to protect
# @return [true, nil] true when the block finished without raising,
#   nil when an exception was raised and reported
def catch
  begin
    yield
  rescue Exception => error
    Jober.exception(error)
    return nil
  end
  true
end
pids() click to toggle source
# File lib/jober/manager.rb, line 117
# Returns the pids currently tracked by the manager.
#
# A copy is returned: add_pid appends to @pids in place, so handing out the
# array itself would let callers observe (or cause) changes outside the lock.
#
# @return [Array] snapshot of the tracked pids taken while holding @mutex
def pids
  @mutex.synchronize { @pids.dup }
end
run() click to toggle source
# File lib/jober/manager.rb, line 30
# Starts the workers and blocks the calling thread until the manager is
# flagged as stopped.
#
# A TERM signal handler is installed (after the workers have been launched)
# that calls #stop. The stop flag is polled once per second.
#
# @return [nil]
def run
  run!
  Signal.trap("TERM") { stop }

  loop do
    sleep 1
    return if @stopped
  end
end
run!() click to toggle source
# File lib/jober/manager.rb, line 22
# Launches the worker threads without blocking.
#
# For every managed class, one thread per configured worker (get_workers) is
# started; each thread runs #start_worker with the class, its interval
# (get_interval), the worker index and the worker count. The interval and
# count are read inside the new thread.
#
# @return [Array] the list of managed classes that was iterated
def run!
  @allowed_classes.each do |task_class|
    task_class.get_workers.times do |worker_idx|
      Thread.new do
        start_worker(task_class, task_class.get_interval, worker_idx, task_class.get_workers)
      end
    end
  end
end
run_task_fork(klass, idx, count) click to toggle source
# File lib/jober/manager.rb, line 93
# Forks a child process that builds one task instance and executes it.
#
# In the child: the process title is set, Jober.call_after_fork and
# Jober.reset_redis are invoked, and the task is instantiated. If
# @logger_path is set, STDOUT and STDERR are redirected (append mode) to
# "<short_name>.log" inside that directory and the task gets a Logger on the
# same file. The task's #execute is then run inside a `catch` block.
#
# @param klass [Class] task class; must respond to .new(hash), and to
#   .short_name when @logger_path is set
# @param idx [Integer] passed to the task as :worker_id
# @param count [Integer] passed to the task as :workers_count
# @return [Integer] pid of the forked child (value returned by fork)
def run_task_fork(klass, idx, count)
  info "invoke #{klass}"
  fork do
    $0 = "#{@name} manager #{klass}"
    Jober.call_after_fork
    Jober.reset_redis

    inst = klass.new(:worker_id => idx, :workers_count => count)

    if @logger_path
      logger_path = File.join(@logger_path, "#{klass.short_name}.log")

      # Open the log once and let the block close the temporary handle:
      # reopen re-points the standard streams at the file, so the handle
      # used to do it does not need to stay open afterwards.
      File.open(logger_path, 'a') do |log_file|
        STDOUT.reopen(log_file)
        STDERR.reopen(log_file)
      end
      inst.logger = ::Logger.new(logger_path)
    end

    catch do
      inst.execute
    end
  end
end
start_worker(klass, interval, idx, count) click to toggle source
# File lib/jober/manager.rb, line 75
# Worker loop: repeatedly forks a task process, waits for it to exit, then
# pauses before the next run, until the manager is stopped.
#
# Each round runs inside #catch, so a failure while forking or waiting does
# not end the loop; after a failed round the loop pauses 0.5s instead of the
# regular interval. The pid is removed again after the protected section in
# case the round was interrupted before its own removal ran.
#
# @param klass [Class] task class to run
# @param interval [#to_f] seconds to sleep between successful runs
# @param idx [Integer] worker index forwarded to run_task_fork
# @param count [Integer] worker count forwarded to run_task_fork
def start_worker(klass, interval, idx, count)
  debug { "start worker for #{klass}" }
  loop do
    child_pid = nil
    completed = catch do
      child_pid = run_task_fork(klass, idx, count)
      add_pid(child_pid)
      Process.wait(child_pid)
      del_pid(child_pid)
      sleep(interval.to_f) unless stopped
    end
    del_pid(child_pid)
    break if stopped
    sleep(0.5) unless completed
    break if stopped
  end
end
stop(timeout = 2.5) click to toggle source
# File lib/jober/manager.rb, line 47
# Stops the manager: signals the children via #stop!, gives them up to
# +timeout+ seconds to disappear from @pids, then sends KILL to the rest.
#
# @pids is read and reset here without taking @mutex.
# NOTE(review): presumably because this method is invoked from the TERM trap
# installed in #run, where locking a Mutex is not permitted — confirm.
#
# @param timeout [Numeric] grace period in seconds, polled in 0.1s steps
# @return [Array, nil] the emptied pid list after a forced kill, nil when all
#   children were already gone
def stop(timeout = 2.5)
  stop!
  return if @pids.empty?

  waited = 0
  loop do
    sleep(0.1)
    waited += 0.1
    break if waited >= timeout || @pids.empty?
  end

  return if @pids.empty?

  info { "still alive pids: #{@pids}, killing" }
  @pids.each do |pid|
    begin
      ::Process.kill("KILL", pid)
    rescue Errno::ESRCH
      # The child exited between the check above and the kill; nothing left
      # to do for it, and the remaining pids must still be processed.
    end
  end

  @pids = []
end
stop!() click to toggle source
# File lib/jober/manager.rb, line 41
# Flags the manager as stopped and asks every tracked child to quit.
#
# Sends QUIT to each pid in @pids and then logs "stopping manager...".
# A pid whose process has already exited is skipped instead of aborting the
# whole loop, so the remaining children are still signalled and the log line
# is still written.
#
# @return [Object] whatever the final info call returns
def stop!
  @stopped = true
  @pids.each do |pid|
    begin
      ::Process.kill("QUIT", pid)
    rescue Errno::ESRCH
      # Process already gone (exited but not yet removed from @pids).
    end
  end
  info "stopping manager..."
end

Private Instance Methods

add_pid(pid) click to toggle source
# File lib/jober/manager.rb, line 123
# Appends +pid+ to the tracked pid list while holding @mutex.
#
# The existing array is modified in place.
#
# @param pid [Integer] pid to start tracking
# @return [Array] the tracked pid list
def add_pid(pid)
  @mutex.synchronize do
    @pids.push(pid)
  end
end
clear_pids() click to toggle source
# File lib/jober/manager.rb, line 131
# Forgets every tracked pid while holding @mutex.
#
# @pids is pointed at a fresh empty array; the previous array object is left
# untouched.
#
# @return [Array] the new, empty pid list
def clear_pids
  @mutex.synchronize do
    @pids = []
  end
end
del_pid(pid) click to toggle source
# File lib/jober/manager.rb, line 127
# Stops tracking +pid+ while holding @mutex.
#
# Every occurrence of +pid+ is dropped. The result is a new array assigned
# to @pids; the previous array object is not mutated. Removing a pid that is
# not tracked (including nil) leaves the contents unchanged.
#
# @param pid [Integer, nil] pid to forget
# @return [Array] the new pid list
def del_pid(pid)
  @mutex.synchronize do
    @pids = @pids - [pid]
  end
end