class Omnibus::ThreadPool

Public Class Methods

new(size, abort_on_exception = true) { |self| ... } click to toggle source

Create a new thread pool of the given size. If a block is given, it is assumed the thread pool is wrapping an operation and will block until all operations complete.

@example Using a block

ThreadPool.new(5) do |pool|
  complex_things.each do |thing|
    pool.schedule { thing.run }
  end
end

@example Using the object

pool = ThreadPool.new(5)
# ...
pool.schedule { complex_operation_1 }
pool.schedule { complex_operation_2 }
# ...
pool.schedule { complex_operation_4 }
# ...
pool.shutdown

# or

at_exit { pool.shutdown }

@param [Integer] size

the number of items to put in the thread pool

@param [Boolean] abort_on_exception

if the thread should abort the main thread also on a failure
# File lib/omnibus/thread_pool.rb, line 52
def initialize(size, abort_on_exception = true)
  @size = size
  @jobs = Queue.new

  @pool = Array.new(@size) do |i|
    Thread.new do
      Thread.abort_on_exception = abort_on_exception
      Thread.current[:id] = i

      catch(:exit) do
        loop do
          job, args = @jobs.pop
          job.call(*args)
        end
      end
    end
  end

  if block_given?
    yield self
    shutdown
  end
end

Public Instance Methods

schedule(*args, &block) click to toggle source

Schedule a single item onto the queue. If arguments are given, those arguments are used when calling the block in the queue. This is useful if you have arguments that you need to pass in from a parent binding.

@param [Object, Array<Object>] args

the arguments to pass to the block when calling

@param [Proc] block

the block to execute

@return [void]

# File lib/omnibus/thread_pool.rb, line 88
def schedule(*args, &block)
  @jobs << [block, args]
end
shutdown() click to toggle source

Stop the thread pool. This method quietly injects an exit clause into the queue (sometimes called “poison”) and then waits for all threads to exit.

@return [true]

# File lib/omnibus/thread_pool.rb, line 99
def shutdown
  @size.times do
    schedule { throw :exit }
  end

  @pool.map(&:join)

  true
end