class Thread::Pool
A pool is a container of a limited amount of threads to which you can add tasks to run.
This is usually more performant and less memory intensive than creating a new thread for every task.
Attributes
If true, tasks will allow raised exceptions to pass through.
Similar to Thread.abort_on_exception
Public Class Methods
# File lib/balotelli/thread/pool.rb, line 341 def cpu_count Etc.nprocessors * 2 rescue 16 end
Create the pool with minimum and maximum threads.
The pool will start with the minimum amount of threads created and will spawn new threads until the max is reached in case of need.
A default block can be passed, which will be used to {#process} the passed data.
# File lib/balotelli/thread/pool.rb, line 116 def initialize(min, max = nil, &block) @min = min @max = max || min @block = block @cond = ConditionVariable.new @mutex = Mutex.new @done = ConditionVariable.new @done_mutex = Mutex.new @todo = [] @workers = [] @timeouts = {} @spawned = 0 @waiting = 0 @shutdown = false @trim_requests = 0 @auto_trim = false @idle_trim = nil @timeout = nil @mutex.synchronize { min.times { spawn_thread } } end
Public Instance Methods
Enable auto trimming, unneeded threads will be deleted until the minimum is reached.
# File lib/balotelli/thread/pool.rb, line 158 def auto_trim! @auto_trim = true self end
Check if auto trimming is enabled.
# File lib/balotelli/thread/pool.rb, line 152 def auto_trim? @auto_trim end
Get the amount of tasks that still have to be run.
# File lib/balotelli/thread/pool.rb, line 200 def backlog @mutex.synchronize { @todo.length } end
Are all tasks consumed?
# File lib/balotelli/thread/pool.rb, line 207 def done? @mutex.synchronize { _done? } end
Check if there are idle workers.
# File lib/balotelli/thread/pool.rb, line 239 def idle? @mutex.synchronize { _idle? } end
Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respeced.
# File lib/balotelli/thread/pool.rb, line 178 def idle_trim!(timeout) @idle_trim = timeout self end
Check if idle trimming is enabled.
# File lib/balotelli/thread/pool.rb, line 172 def idle_trim? !@idle_trim.nil? end
Disable auto trimming.
# File lib/balotelli/thread/pool.rb, line 165 def no_auto_trim! @auto_trim = false self end
Turn of idle trimming.
# File lib/balotelli/thread/pool.rb, line 185 def no_idle_trim! @idle_trim = nil self end
Add a task to the pool which will execute the block with the given argument.
If no block is passed the default block will be used if present, an ArgumentError will be raised otherwise.
# File lib/balotelli/thread/pool.rb, line 250 def process(*args, &block) unless block || @block raise ArgumentError, 'you must pass a block' end task = Task.new(self, *args, &(block || @block)) @mutex.synchronize { raise 'unable to add work while shutting down' if shutdown? @todo << task if @waiting == 0 && @spawned < @max spawn_thread end @cond.signal } task end
Resize the pool with the passed arguments.
# File lib/balotelli/thread/pool.rb, line 192 def resize(min, max = nil) @min = min @max = max || min trim! end
Shut down the pool, it will block until all tasks have finished running.
# File lib/balotelli/thread/pool.rb, line 305 def shutdown @mutex.synchronize { @shutdown = :nicely @cond.broadcast } until @workers.empty? if worker = @workers.first worker.join end end if @timeout @shutdown = :now wake_up_timeout @timeout.join end end
Shut down the pool instantly without finishing to execute tasks.
# File lib/balotelli/thread/pool.rb, line 293 def shutdown! @mutex.synchronize { @shutdown = :now @cond.broadcast } wake_up_timeout self end
Check if the pool has been shut down.
# File lib/balotelli/thread/pool.rb, line 147 def shutdown? !!@shutdown end
Shutdown the pool after a given amount of time.
# File lib/balotelli/thread/pool.rb, line 327 def shutdown_after(timeout) Thread.new { sleep timeout shutdown } end
Trim the unused threads, if forced threads will be trimmed even if there are tasks waiting.
# File lib/balotelli/thread/pool.rb, line 276 def trim(force = false) @mutex.synchronize { if (force || @waiting > 0) && @spawned - @trim_requests > @min @trim_requests += 1 @cond.signal end } self end
Force #{trim}.
# File lib/balotelli/thread/pool.rb, line 288 def trim! trim true end
Wait until all tasks are consumed. The caller will be blocked until then.
# File lib/balotelli/thread/pool.rb, line 214 def wait(what = :idle) case what when :done until done? @done_mutex.synchronize { break if _done? @done.wait @done_mutex } end when :idle until idle? @done_mutex.synchronize { break if _idle? @done.wait @done_mutex } end end self end
Private Instance Methods
# File lib/balotelli/thread/pool.rb, line 457 def _done? @todo.empty? and @waiting == @spawned end
# File lib/balotelli/thread/pool.rb, line 461 def _idle? @todo.length < @waiting end
# File lib/balotelli/thread/pool.rb, line 465 def done! @done_mutex.synchronize { @done.broadcast if _done? or _idle? } end
# File lib/balotelli/thread/pool.rb, line 367 def spawn_thread @spawned += 1 thread = Thread.new { loop do task = @mutex.synchronize { if @todo.empty? while @todo.empty? if @trim_requests > 0 @trim_requests -= 1 break end break if shutdown? @waiting += 1 done! if @idle_trim and @spawned > @min check_time = Time.now + @idle_trim @cond.wait @mutex, @idle_trim @trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min else @cond.wait @mutex end @waiting -= 1 end break if @todo.empty? && shutdown? end @todo.shift } or break task.execute break if @shutdown == :now trim if auto_trim? && @spawned > @min end @mutex.synchronize { @spawned -= 1 @workers.delete thread } } @workers << thread thread end
# File lib/balotelli/thread/pool.rb, line 422 def spawn_timeout_thread @pipes = IO.pipe @timeout = Thread.new { loop do now = Time.now timeout = @timeouts.map {|task, time| next unless task.started_at now - task.started_at + task.timeout }.compact.min unless @timeouts.empty? readable, = IO.select([@pipes.first], nil, nil, timeout) break if @shutdown == :now if readable && !readable.empty? readable.first.read_nonblock 1024 end now = Time.now @timeouts.each {|task, time| next if !task.started_at || task.terminated? || task.finished? if now > task.started_at + task.timeout task.timeout! end } @timeouts.reject! { |task, _| task.terminated? || task.finished? } break if @shutdown == :now end } end
# File lib/balotelli/thread/pool.rb, line 349 def timeout_for(task, timeout) unless @timeout spawn_timeout_thread end @mutex.synchronize { @timeouts[task] = timeout wake_up_timeout } end
# File lib/balotelli/thread/pool.rb, line 361 def wake_up_timeout if defined? @pipes @pipes.last.write_nonblock 'x' rescue nil end end