class JobPool

TODO: take mutex once in kill_all. TODO: rewrite wait_next.

Attributes

max_jobs[RW]

Public Class Methods

new(options={}) click to toggle source

## Options

  • max_jobs: the maximum number of jobs that can be running at any one time, or nil if unlimited.

# File lib/job_pool.rb, line 16
# Prepares the pool's internal state.
#
# options - Hash of settings. Only :max_jobs is read here; its value is
#           stored unchanged in @max_jobs (nil when the key is absent).
#
# The mutex and the process list are created only when they are not
# already set, so running this again on the same object keeps them.
def initialize(options = {})
  @mutex = Mutex.new unless @mutex

  # TODO: convert this to a hash by child thread?
  @processes = [] unless @processes
  @max_jobs = options[:max_jobs]
end

Public Instance Methods

_add(process) click to toggle source
# File lib/job_pool.rb, line 71
# Appends +process+ to the process table while holding the pool mutex.
#
# process - the object to record in @processes.
#
# Raises JobPool::TooManyJobsError when @max_jobs is set and the table
# already holds at least that many entries; nothing is added in that case.
# When @max_jobs is nil (or false) no limit is enforced.
def _add(process)
  @mutex.synchronize do
    if @max_jobs && @processes.count >= @max_jobs
      # The limit lives in @max_jobs, so that is what the message must show.
      raise JobPool::TooManyJobsError,
            "launched process #{@processes.count + 1} of #{@max_jobs} maximum"
    end
    @processes.push process
  end
end
_remove(process) { || ... } click to toggle source

Removes process from the process table. Pass a block that cleans up after the process. _remove may be called many times, but the block will only be called once.

# File lib/job_pool.rb, line 82
# Takes +process+ out of the process table, yielding once so the caller
# can clean up after it.
#
# The block runs only when process._deactivate returns a truthy value;
# otherwise this method does nothing further. The block is required in
# that case (it is invoked with a bare yield).
#
# Raises RuntimeError if the process reports it needs cleanup but is not
# in the table, or if it is gone from the table after the block has run.
def _remove(process)
  needs_cleanup = @mutex.synchronize do
    deactivated = process._deactivate
    if deactivated && !@processes.include?(process)
      raise "process not in process table??"
    end
    deactivated
  end

  return unless needs_cleanup

  # The callback runs without the mutex held because it might block.
  yield

  @mutex.synchronize do
    removed = @processes.delete(process)
    raise "someone else deleted process??" unless removed
  end
end
count() click to toggle source
# File lib/job_pool.rb, line 31
# Returns how many entries are in the process table, read while holding
# the pool mutex.
def count
  @mutex.synchronize do
    @processes.size
  end
end
find(&block) click to toggle source
# File lib/job_pool.rb, line 35
# Returns the first entry of the process table for which the given block
# is truthy, or nil when none matches. The search runs while holding the
# pool mutex, so the block is called with the lock held.
def find(&block)
  @mutex.synchronize do
    @processes.detect(&block)
  end
end
first() click to toggle source
# File lib/job_pool.rb, line 27
# Returns the first entry of the process table (nil when the table is
# empty), read while holding the pool mutex.
def first
  @mutex.synchronize do
    head = @processes.first
    head
  end
end
kill_all() click to toggle source
# File lib/job_pool.rb, line 39
# Repeatedly fetches the first entry via #first and calls #kill on it,
# until #first returns nil (or false). Returns nil.
#
# TODO: this is racy...  if someone else is starting processes,
# we'll just endless loop.  can we take the mutex once outside the loop?
def kill_all
  victim = first
  while victim
    victim.kill
    # Re-read the head each time round; the loop only ends once the
    # table reports no first entry.
    victim = first
  end
end
launch(*args) click to toggle source
# File lib/job_pool.rb, line 23
# Builds and returns a new JobPool::Job, handing it this pool as the
# first constructor argument followed by +args+ unchanged.
def launch(*args)
  JobPool::Job.new(self, *args)
end
log(msg) click to toggle source

Called when there's an error in a job's subthreads. Never happens during normal usage.

# File lib/job_pool.rb, line 67
# Writes +msg+ to STDERR using puts.
def log(msg)
  STDERR.puts(msg)
end
wait_next(nonblock=nil) click to toggle source

Blocks until any child process returns (unless nonblock is true, where it returns nil -- TODO). Raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).

# File lib/job_pool.rb, line 50
# Waits for one child process to finish, stops it, and returns it.
#
# nonblock - passed straight through to ThreadsWait#next_wait.
#            TODO: the nonblocking path is untested.
#
# Returns the process whose child thread ThreadsWait reported, after
# calling #stop on it. Errors raised by ThreadsWait#next_wait are not
# rescued here; see that method for when it raises.
def wait_next(nonblock = nil)
  # we wait on child threads since calling waitpid would produce a race condition.

  # Copy the table under the mutex, as every other reader of @processes
  # does, so the iteration below works on a stable list. The lock is
  # released before waiting because next_wait may block.
  running = @mutex.synchronize { @processes.dup }

  by_thread = running.each_with_object({}) do |process, table|
    table[process._child_thread] = process
  end

  # TODO: test nonblock

  finished = ThreadsWait.new(by_thread.keys).next_wait(nonblock)
  process = by_thread[finished]
  process.stop # otherwise process will be in an indeterminate state
  process
end