class RJR::ThreadPool
Utility that launches a specified number of threads when the pool is started, assigning work to them in order as it arrives.
Supports an optional timeout: a worker thread whose job runs longer than the timeout is killed and replaced with a new one.
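The general pattern described above can be sketched without RJR at all. The following is a minimal standalone illustration, not the library's implementation: SketchPool and its shutdown method are hypothetical names, the workers are launched in the constructor for brevity, and a nil pushed onto the queue is assumed as the stop signal.

```ruby
# Standalone sketch (not RJR code): a fixed number of threads drain one
# shared queue, so jobs are picked up in the order they arrive.
class SketchPool # hypothetical name
  def initialize(num_threads)
    @queue   = Queue.new
    @workers = Array.new(num_threads) do
      Thread.new do
        # Queue#pop blocks until work is available; a nil ends the loop
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  # Enqueue a callable and return self
  def <<(job)
    @queue << job
    self
  end

  # Push one nil per worker as a stop signal, then wait for all of them
  def shutdown
    @workers.size.times { @queue << nil }
    @workers.each(&:join)
    self
  end
end

results = Queue.new # thread-safe collector for job output
pool = SketchPool.new(3)
5.times { |i| pool << -> { results << i * i } }
pool.shutdown

squares = Array.new(results.size) { results.pop }.sort
puts squares.inspect # prints [0, 1, 4, 9, 16]
```

Jobs are dequeued in arrival order, but with several workers they may finish in any order, which is why the results are sorted before printing.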
Attributes
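Number of threads to instantiate in local worker pool (set to 20 when a pool is created, if not already set)
Timeout, in seconds, after which worker threads are killed (set to 10 when a pool is created, if not already set)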
Public Class Methods
Create a new thread pool
# File lib/rjr/util/thread_pool.rb, line 162
def initialize
  RJR::ThreadPool.num_threads ||= 20
  RJR::ThreadPool.timeout     ||= 10
  @num_threads = RJR::ThreadPool.num_threads
  @timeout = RJR::ThreadPool.timeout
  @worker_threads = []
  @work_queue = Queue.new
  @running_queue = Queue.new
  @thread_lock = Mutex.new
  @terminate = true

  ObjectSpace.define_finalizer(self, self.class.finalize(self))
end
Private Class Methods
Ruby ObjectSpace finalizer to ensure that thread pool terminates all threads when object is destroyed.
# File lib/rjr/util/thread_pool.rb, line 180
def self.finalize(thread_pool)
  # TODO this isn't doing much as by the time this is invoked threads will
  # already be shutdown
  proc { thread_pool.stop ; thread_pool.join }
end
Public Instance Methods
Add work to the pool
@param [ThreadPoolJob] work job to execute in first available thread
@return self
# File lib/rjr/util/thread_pool.rb, line 206
def <<(work)
  # TODO option to increase worker threads if work queue gets saturated
  @work_queue.push work
  self
end
Block until all worker threads have finished executing
@return self
# File lib/rjr/util/thread_pool.rb, line 231
def join
  @manager_thread.join if @manager_thread
  self
end
Return boolean indicating if thread pool is running
# File lib/rjr/util/thread_pool.rb, line 198
def running?
  !@manager_thread.nil? &&
  ['sleep', 'run'].include?(@manager_thread.status)
end
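The check above relies on the values returned by Ruby's Thread#status. As a small standalone sketch (status_samples and alive_status? are hypothetical helper names, not part of RJR), the following samples the status of a running, a sleeping, a killed and a failed thread:

```ruby
# Sample Thread#status at several points in a thread's life.
def status_samples
  sleeper = Thread.new { sleep }              # blocks until woken or killed
  sleep 0.01 until sleeper.status == 'sleep'  # wait until it is really asleep
  sleeping = sleeper.status
  sleeper.kill
  sleeper.join
  killed = sleeper.status

  failing = Thread.new do
    Thread.current.report_on_exception = false # keep stderr quiet
    raise 'boom'
  end
  begin
    failing.join                              # join re-raises the error
  rescue RuntimeError
    # expected: the thread terminated with an exception
  end

  { current: Thread.current.status, sleeping: sleeping,
    killed: killed, failed: failing.status }
end

# Same liveness test as running?, applied to an arbitrary thread
def alive_status?(thread)
  ['sleep', 'run'].include?(thread.status)
end

p status_samples
# prints {current: "run", sleeping: "sleep", killed: false, failed: nil}
# (older Rubies print the same hash in :key=>value form)
```

Both false (terminated normally or killed) and nil (terminated with an exception) fall outside the 'sleep'/'run' list, so a pool whose manager thread has ended either way is reported as not running.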
Start the thread pool
# File lib/rjr/util/thread_pool.rb, line 189
def start
  return self unless @terminate
  @terminate = false
  0.upto(@num_threads-1) { |i| launch_worker }
  launch_manager
  self
end
Terminate the thread pool, stopping all worker threads
@return self
# File lib/rjr/util/thread_pool.rb, line 215
def stop
  @terminate = true

  # this will wake up on its own, but we can
  # speed things up if we manually wake it up,
  # surround w/ block in case thread cleans up on its own
  begin
    @manager_thread.wakeup if @manager_thread
  rescue
  end

  self
end
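The wakeup call and the surrounding rescue can be illustrated in isolation. This is a hedged sketch with hypothetical helper names (wake_early, wakeup_dead_thread), not RJR code: it shows that Thread#wakeup ends a sleep early, and that calling it on a thread that has already finished raises ThreadError.

```ruby
# Wake a sleeping thread before its nap is over and report how long the
# whole round trip took, in seconds.
def wake_early(nap_seconds)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  napper  = Thread.new { sleep nap_seconds }
  sleep 0.01 until napper.status == 'sleep' # make sure it is asleep first
  napper.wakeup                             # sleep returns immediately
  napper.join
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Thread#wakeup raises ThreadError when the target thread is already dead,
# which is the case a rescue around the call guards against.
def wakeup_dead_thread
  finished = Thread.new {}
  finished.join
  finished.wakeup
  :no_error
rescue ThreadError
  :thread_error
end

puts wake_early(5) < 5  # prints true
p wakeup_dead_thread    # prints :thread_error
```

Without the manual wakeup, a manager thread that is sleeping for the full timeout would only notice the terminate flag once that sleep ends.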
Private Instance Methods
Internal helper, performs checks on workers
# File lib/rjr/util/thread_pool.rb, line 118
def check_workers
  if @terminate
    @worker_threads.each { |t|
      t.kill
    }
    @worker_threads = []

  elsif @timeout
    readd = []
    while @running_queue.size > 0 && work = @running_queue.pop
      # check expiration / killing expired threads must be atomic
      # and mutually exclusive with the process of marking a job completed above
      @thread_lock.synchronize{
        if work.expired?(@timeout)
          work.thread.kill
          @worker_threads.delete(work.thread)
          launch_worker

        elsif !work.completed?
          readd << work
        end
      }
    end
    readd.each { |work| @running_queue << work }
  end
end
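The expiry branch above can be reduced to a small standalone sketch. SketchJob and reap_expired are hypothetical stand-ins, and the expired? shown here (elapsed time since a recorded start exceeds the timeout) is only an assumption about how such a check might work, since ThreadPoolJob itself is not shown on this page. Relaunching a replacement worker is left out.

```ruby
# Hypothetical stand-in for a running job: the thread executing it and the
# monotonic clock reading taken when it started.
SketchJob = Struct.new(:thread, :started_at) do
  def expired?(timeout)
    Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at > timeout
  end
end

# Kill the thread of every job that has run longer than timeout seconds and
# return the jobs that are still within budget. The lock makes the
# check-then-kill step atomic with respect to other holders of the lock.
def reap_expired(jobs, timeout, lock)
  jobs.reject do |job|
    lock.synchronize do
      expired = job.expired?(timeout)
      job.thread.kill if expired
      expired
    end
  end
end

now   = Process.clock_gettime(Process::CLOCK_MONOTONIC)
slow  = SketchJob.new(Thread.new { sleep }, now - 60) # pretend it began 60s ago
fresh = SketchJob.new(Thread.new { sleep }, now)

kept = reap_expired([slow, fresh], 10, Mutex.new)
slow.thread.join                 # wait for the kill to take effect
puts kept.size                   # prints 1
puts slow.thread.alive?          # prints false
fresh.thread.kill                # tidy up the remaining sketch thread
```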
Internal helper, launch management thread
# File lib/rjr/util/thread_pool.rb, line 146
def launch_manager
  @manager_thread = Thread.new {
    until @terminate
      if @timeout
        sleep @timeout
        check_workers
      else
        Thread.yield
      end
    end

    check_workers
  }
end
Internal helper, launch worker thread
# File lib/rjr/util/thread_pool.rb, line 99
def launch_worker
  @worker_threads << Thread.new {
    while work = @work_queue.pop
      begin
        #RJR::Logger.debug "launch thread pool job #{work}"
        @running_queue << work
        work.exec(@thread_lock)
        # TODO cleaner / more immediate way to pop item off running_queue
        #RJR::Logger.debug "finished thread pool job #{work}"
      rescue Exception => e
        # FIXME also send to rjr logger at a critical level
        puts "Thread raised Fatal Exception #{e}"
        puts "\n#{e.backtrace.join("\n")}"
      end
    end
  }
end
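Because the rescue sits inside the loop, a job that raises does not end the worker thread; the next job is still picked up. A minimal standalone sketch of that behavior follows. run_worker is a hypothetical helper, the sketch rescues StandardError rather than Exception and collects errors in a queue instead of printing them, and a nil is assumed as the signal to leave the loop.

```ruby
# Start one worker that keeps draining the queue even when a job raises.
def run_worker(queue, errors)
  Thread.new do
    while (job = queue.pop)   # a nil ends the loop
      begin
        job.call
      rescue StandardError => e
        errors << e           # record the failure and carry on
      end
    end
  end
end

queue  = Queue.new
errors = Queue.new
done   = Queue.new
worker = run_worker(queue, errors)

queue << -> { raise ArgumentError, 'bad job' }
queue << -> { done << :ok }   # still runs after the failure above
queue << nil
worker.join

puts errors.size  # prints 1
puts done.size    # prints 1
```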