class JobPool
Attributes
max_jobs[RW]
Public Class Methods
new(options={})
click to toggle source
## Options
-
max_jobs
: the maximum number of jobs that can be running at any one time, or nil if unlimited.
# File lib/job_pool.rb, line 16 def initialize(options={}) @mutex ||= Mutex.new @processes ||= [] # TODO: convert this to a hash by child thread? @max_jobs = options[:max_jobs] end
Public Instance Methods
_add(process)
click to toggle source
# File lib/job_pool.rb, line 71 def _add process @mutex.synchronize do if @max_jobs && @processes.count >= @max_jobs raise JobPool::TooManyJobsError.new("launched process #{@processes.count+1} of #{@max_processes} maximum") end @processes.push process end end
_remove(process) { || ... }
click to toggle source
removes process from process table. pass a block that cleans up after the process. _remove may be called lots of times but block will only be called once
# File lib/job_pool.rb, line 82 def _remove process cleanup = false @mutex.synchronize do cleanup = process._deactivate raise "process not in process table??" if cleanup && !@processes.include?(process) end # don't want to hold mutex when calling callback because it might block if cleanup yield @mutex.synchronize do value = @processes.delete(process) raise "someone else deleted process??" unless value end end end
count()
click to toggle source
# File lib/job_pool.rb, line 31 def count @mutex.synchronize { @processes.count } end
find(&block)
click to toggle source
# File lib/job_pool.rb, line 35 def find &block @mutex.synchronize { @processes.find(&block) } end
first()
click to toggle source
# File lib/job_pool.rb, line 27 def first @mutex.synchronize { @processes.first } end
kill_all()
click to toggle source
# File lib/job_pool.rb, line 39 def kill_all # TODO: this is racy... if someone else is starting processes, # we'll just endless loop. can we take the mutex once outside the loop? while f = first f.kill end end
launch(*args)
click to toggle source
# File lib/job_pool.rb, line 23 def launch *args JobPool::Job.new self, *args end
log(msg)
click to toggle source
called there's an error in a job's subthreads. never happens during normal # usage.
# File lib/job_pool.rb, line 67 def log msg STDERR.puts msg end
wait_next(nonblock=nil)
click to toggle source
blocks until any child process returns (unless nonblock is true, where it returns nil TODO) raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).
# File lib/job_pool.rb, line 50 def wait_next nonblock=nil # we wait on child threads since calling waitpid would produce a race condition. threads = {} @processes.each { |p| threads[p._child_thread] = p } # TODO: test nonblock thread = ThreadsWait.new(threads.keys).next_wait(nonblock) process = threads[thread] process.stop # otherwise process will be in an indeterminite state process end