class PerfectQueue::TaskMonitor
Public Class Methods
new(config, child_heartbeat=nil, force_stop=nil)
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 22
# Builds a monitor from +config+ and two optional callbacks.
#
# config          - object indexed with [] for :logger,
#                   :child_heartbeat_interval and :task_heartbeat_interval;
#                   each interval falls back to 2 and is coerced with #to_i.
# child_heartbeat - callable invoked by #run; replaced by a no-op proc when
#                   nil/false.
# force_stop      - callable invoked by #run's error handler; replaced by a
#                   no-op proc when nil/false.
def initialize(config, child_heartbeat=nil, force_stop=nil)
  @config = config
  @log = config[:logger]

  # Missing callbacks become no-ops so they can always be called.
  @child_heartbeat = child_heartbeat || proc {}
  @force_stop = force_stop || proc {}

  child_interval = @config[:child_heartbeat_interval] || 2
  task_interval = @config[:task_heartbeat_interval] || 2
  @child_heartbeat_interval = child_interval.to_i
  @task_heartbeat_interval = task_interval.to_i

  # Both heartbeat clocks start at construction time (whole seconds).
  @last_child_heartbeat = Time.now.to_i
  @last_task_heartbeat = Time.now.to_i

  @task = nil

  # Monitor rather than Mutex: supports recursive locking.
  @mutex = Monitor.new
  @cond = @mutex.new_cond

  @finished = false
end
Public Instance Methods
external_task_heartbeat(task, &block)
click to toggle source
callback
# File lib/perfectqueue/task_monitor.rb, line 100
# Callback: runs +block+ and refreshes the task heartbeat timestamp, but only
# when +task+ equals the currently monitored task.
#
# Everything happens while holding the monitor lock.
#
# Returns the block's value, or nil when the task does not match or no block
# was given.
def external_task_heartbeat(task, &block)
  @mutex.synchronize do
    next nil unless task == @task

    result = block ? block.call : nil
    @last_task_heartbeat = Time.now.to_i
    result
  end
end
join()
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 51
# Waits for the monitor thread created by #start to finish.
#
# Returns whatever Thread#join returns, or nil when no thread has been
# started yet. Without the guard, calling #join before #start raised
# NoMethodError on nil.
def join
  @thread.join if @thread
end
kill_task(reason)
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 74
# Asks the current task's runner to kill the task, passing +reason+ along.
#
# Does nothing (returns nil) when no task is set. If the runner's #kill
# raises a StandardError, the error and its backtrace are logged and the
# exception is re-raised to the caller.
#
# Returns the value of runner.kill(reason).
def kill_task(reason)
  @mutex.synchronize do
    current = @task
    next unless current

    begin
      # May take this monitor's lock again (recursive lock).
      current.runner.kill(reason)
    rescue => err
      @log.error "failed to kill task: #{err.class}: #{err}"
      err.backtrace.each { |frame| @log.warn "\t#{frame}" }
      raise # force exit
    end
  end
end
run()
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 110
# Monitor loop: until @finished is set, waits on the condition variable for
# the nearest heartbeat deadline and then fires whichever heartbeats are due.
#
# Deadlines are whole-second timestamps computed as last heartbeat + interval.
# The task deadline is only considered while a task is set. The whole loop
# runs while holding the monitor lock (released while waiting).
#
# Any StandardError escaping the loop is logged with its backtrace, then
# @force_stop is called and its value returned.
def run
  @mutex.synchronize do
    now = Time.now.to_i
    until @finished
      child_due = @last_child_heartbeat + @child_heartbeat_interval
      task_due = @task ? @last_task_heartbeat + @task_heartbeat_interval : nil
      wake_at = task_due ? [child_due, task_due].min : child_due

      delay = wake_at - now
      @cond.wait(delay) if delay > 0
      now = Time.now.to_i

      # task_due was computed before the wait; @task is re-checked because it
      # may have been cleared in the meantime.
      if @task && task_due && task_due <= now
        task_heartbeat
        @last_task_heartbeat = now
      end

      if child_due <= now
        @child_heartbeat.call # will recursive lock
        @last_child_heartbeat = now
      end
    end
  end
rescue => err
  @log.error "Unknown error #{err.class}: #{err}"
  err.backtrace.each { |frame| @log.warn "\t#{frame}" }
  @force_stop.call
end
set_task(task, runner)
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 55
# Registers +task+ as the task being monitored.
#
# The task is extended with TaskMonitorHook and given this monitor's logger,
# the monitor itself and +runner+. Then, under the lock, it becomes the
# current task and the task heartbeat clock is reset to now.
#
# Returns the new heartbeat timestamp (Integer seconds).
def set_task(task, runner)
  # Wire the task up before publishing it to the monitor loop.
  task.extend TaskMonitorHook
  task.log = @log
  task.task_monitor = self
  task.runner = runner

  @mutex.synchronize do
    @task = task
    @last_task_heartbeat = Time.now.to_i
  end
end
start()
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 40
# Spawns a thread that executes #run and remembers it in @thread.
#
# Returns the new Thread.
def start
  @thread = Thread.new { run }
end
stop()
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 44
# Requests the monitor loop to end: sets @finished, then wakes every thread
# waiting on the condition variable.
def stop
  @finished = true
  @mutex.synchronize do
    @cond.broadcast
  end
end
stop_task(immediate)
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 66
# Stops the current task via #kill_task.
#
# immediate - when truthy the kill reason is an ImmediateProcessStopError,
#             otherwise a GracefulProcessStopError.
#
# Returns the value of #kill_task.
def stop_task(immediate)
  reason =
    if immediate
      ImmediateProcessStopError.new('immediate stop requested')
    else
      GracefulProcessStopError.new('graceful stop requested')
    end
  kill_task reason
end
task_finished(task, &block)
click to toggle source
callback
# File lib/perfectqueue/task_monitor.rb, line 89
# Callback: runs +block+ (if given) and then forgets the current task when it
# equals +task+.
#
# The block always runs, whether or not +task+ matches, and it runs while
# the monitor lock is held.
#
# Returns the block's value, or nil when no block was given.
def task_finished(task, &block)
  @mutex.synchronize do
    # TODO is this ought to be synchronized?
    result = block ? block.call : nil
    @task = nil if task == @task
    result
  end
end
Private Instance Methods
task_heartbeat()
click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 147
# Sends a heartbeat for the current task by calling its #heartbeat!.
#
# If that raises a StandardError (finished, preempted, etc.), the exception
# is passed to #kill_task as the kill reason.
#
# Returns the value of heartbeat!, or of #kill_task on failure.
def task_heartbeat
  @task.heartbeat!
rescue => failure
  # finished, preempted, etc.
  kill_task(failure)
end