class RJR::ThreadPool

Utility to launches a specified number of threads on instantiation, assigning work to them in order as it arrives.

Supports optional timeout which allows the user to kill and restart threads if a job is taking too long to run.

Attributes

num_threads[RW]

Number of threads to instantiate in local worker pool

timeout[RW]

Timeout after which worker threads are killed

Public Class Methods

new() click to toggle source

Create a new thread pool

# File lib/rjr/util/thread_pool.rb, line 162
def initialize
  RJR::ThreadPool.num_threads ||=  20
  RJR::ThreadPool.timeout     ||=  10
  @num_threads    = RJR::ThreadPool.num_threads
  @timeout        = RJR::ThreadPool.timeout
  @worker_threads = []

  @work_queue     = Queue.new
  @running_queue  = Queue.new

  @thread_lock = Mutex.new
  @terminate = true

  ObjectSpace.define_finalizer(self, self.class.finalize(self))
end

Private Class Methods

finalize(thread_pool) click to toggle source

Ruby ObjectSpace finalizer to ensure that thread pool terminates all threads when object is destroyed.

# File lib/rjr/util/thread_pool.rb, line 180
def self.finalize(thread_pool)
  # TODO this isn't doing much as by the time this is invoked threads will
  # already be shutdown
  proc { thread_pool.stop ; thread_pool.join }
end

Public Instance Methods

<<(work) click to toggle source

Add work to the pool @param [ThreadPoolJob] work job to execute in first available thread @return self

# File lib/rjr/util/thread_pool.rb, line 206
def <<(work)
  # TODO option to increase worker threads if work queue gets saturated
  @work_queue.push work
  self
end
join() click to toggle source

Block until all worker threads have finished executing

@return self

# File lib/rjr/util/thread_pool.rb, line 231
def join
  @manager_thread.join if @manager_thread
  self
end
running?() click to toggle source

Return boolean indicating if thread pool is running

# File lib/rjr/util/thread_pool.rb, line 198
def running?
  !@manager_thread.nil? &&
  ['sleep', 'run'].include?(@manager_thread.status)
end
start() click to toggle source

Start the thread pool

# File lib/rjr/util/thread_pool.rb, line 189
def start
  return self unless @terminate
  @terminate = false
  0.upto(@num_threads-1) { |i| launch_worker }
  launch_manager
  self
end
stop() click to toggle source

Terminate the thread pool, stopping all worker threads

@return self

# File lib/rjr/util/thread_pool.rb, line 215
def stop
  @terminate = true

  # this will wake up on it's own, but we can
  # speed things up if we manually wake it up,
  # surround w/ block incase thread cleans up on its own
  begin
    @manager_thread.wakeup if @manager_thread
  rescue
  end
  self
end

Private Instance Methods

check_workers() click to toggle source

Internal helper, performs checks on workers

# File lib/rjr/util/thread_pool.rb, line 118
def check_workers
  if @terminate
    @worker_threads.each { |t| 
      t.kill
    }
    @worker_threads = []

  elsif @timeout
    readd = []
    while @running_queue.size > 0 && work = @running_queue.pop
      # check expiration / killing expired threads must be atomic
      # and mutually exclusive with the process of marking a job completed above
      @thread_lock.synchronize{
        if work.expired?(@timeout)
          work.thread.kill
          @worker_threads.delete(work.thread)
          launch_worker

        elsif !work.completed?
          readd << work
        end
      }
    end
    readd.each { |work| @running_queue << work }
  end
end
launch_manager() click to toggle source

Internal helper, launch management thread

# File lib/rjr/util/thread_pool.rb, line 146
def launch_manager
  @manager_thread = Thread.new {
    until @terminate
      if @timeout
        sleep @timeout
        check_workers
      else
        Thread.yield
      end
    end

    check_workers
  }
end
launch_worker() click to toggle source

Internal helper, launch worker thread

# File lib/rjr/util/thread_pool.rb, line 99
def launch_worker
  @worker_threads << Thread.new {
    while work = @work_queue.pop
      begin
        #RJR::Logger.debug "launch thread pool job #{work}"
        @running_queue << work
        work.exec(@thread_lock)
        # TODO cleaner / more immediate way to pop item off running_queue
        #RJR::Logger.debug "finished thread pool job #{work}"
      rescue Exception => e
        # FIXME also send to rjr logger at a critical level
        puts "Thread raised Fatal Exception #{e}"
        puts "\n#{e.backtrace.join("\n")}"
      end
    end
  }
end