class PerfectQueue::TaskMonitor

Public Class Methods

new(config, child_heartbeat=nil, force_stop=nil) click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 22
def initialize(config, child_heartbeat=nil, force_stop=nil)
  @config = config
  @log = config[:logger]
  @child_heartbeat = child_heartbeat || Proc.new {}
  @force_stop = force_stop || Proc.new {}

  @child_heartbeat_interval = (@config[:child_heartbeat_interval] || 2).to_i
  @task_heartbeat_interval = (@config[:task_heartbeat_interval] || 2).to_i
  @last_child_heartbeat = Time.now.to_i
  @last_task_heartbeat = Time.now.to_i

  @task = nil

  @mutex = Monitor.new  # support recursive lock
  @cond = @mutex.new_cond
  @finished = false
end

Public Instance Methods

external_task_heartbeat(task, &block) click to toggle source

callback

# File lib/perfectqueue/task_monitor.rb, line 100
def external_task_heartbeat(task, &block)
  @mutex.synchronize {
    if task == @task
      ret = block.call if block
      @last_task_heartbeat = Time.now.to_i
    end
    ret
  }
end
join() click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 51
def join
  @thread.join
end
kill_task(reason) click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 74
def kill_task(reason)
  @mutex.synchronize {
    if task = @task
      begin
        task.runner.kill(reason)  # may recursive lock
      rescue
        @log.error "failed to kill task: #{$!.class}: #{$!}"
        $!.backtrace.each {|bt| @log.warn "\t#{bt}" }
        raise # force exit
      end
    end
  }
end
run() click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 110
def run
  @mutex.synchronize {
    now = Time.now.to_i

    until @finished
      next_child_heartbeat = @last_child_heartbeat + @child_heartbeat_interval

      if @task
        next_task_heartbeat = @last_task_heartbeat + @task_heartbeat_interval
        next_time = [next_child_heartbeat, next_task_heartbeat].min
      else
        next_task_heartbeat = nil
        next_time = next_child_heartbeat
      end

      next_wait = next_time - now
      @cond.wait(next_wait) if next_wait > 0

      now = Time.now.to_i
      if @task && next_task_heartbeat && next_task_heartbeat <= now
        task_heartbeat
        @last_task_heartbeat = now
      end

      if next_child_heartbeat <= now
        @child_heartbeat.call  # will recursive lock
        @last_child_heartbeat = now
      end
    end
  }
rescue
  @log.error "Unknown error #{$!.class}: #{$!}"
  $!.backtrace.each {|bt| @log.warn "\t#{bt}" }
  @force_stop.call
end
set_task(task, runner) click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 55
def set_task(task, runner)
  task.extend(TaskMonitorHook)
  task.log = @log
  task.task_monitor = self
  task.runner = runner
  @mutex.synchronize {
    @task = task
    @last_task_heartbeat = Time.now.to_i
  }
end
start() click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 40
def start
  @thread = Thread.new(&method(:run))
end
stop() click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 44
def stop
  @finished = true
  @mutex.synchronize {
    @cond.broadcast
  }
end
stop_task(immediate) click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 66
def stop_task(immediate)
  if immediate
    kill_task ImmediateProcessStopError.new('immediate stop requested')
  else
    kill_task GracefulProcessStopError.new('graceful stop requested')
  end
end
task_finished(task, &block) click to toggle source

callback

# File lib/perfectqueue/task_monitor.rb, line 89
def task_finished(task, &block)
  @mutex.synchronize {
    ret = block.call if block  # TODO is this ought to be synchronized?
    if task == @task
      @task = nil
    end
    ret
  }
end

Private Instance Methods

task_heartbeat() click to toggle source
# File lib/perfectqueue/task_monitor.rb, line 147
def task_heartbeat
  @task.heartbeat!
rescue
  # finished, preempted, etc.
  kill_task($!)
end