class Bixby::ThreadPool
Constants
- DEFAULT_IDLE_TIMEOUT
- DEFAULT_MAX
- DEFAULT_MIN
Public Class Methods
new(options = {})
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 19 def initialize(options = {}) @input_queue = Queue.new @lock = Monitor.new @workers = [] @min_size = options[:min_size] || DEFAULT_MIN @max_size = options[:max_size] || DEFAULT_MAX @idle_timeout = options[:idle_timeout] || DEFAULT_IDLE_TIMEOUT @size = 0 expand(@min_size) end
Public Instance Methods
<<(proc)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 45 def <<(proc) enqueue(:perform, block) nil end
contract(count, &block)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 117 def contract(count, &block) @lock.synchronize do raise 'Count is too large.' if count > @size count.times do callback = Proc.new do |worker| remove_worker(worker) block.call if block end enqueue(:shutdown, callback) end end nil end
dispose()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 83 def dispose @lock.synchronize do shutdown join end nil end
enqueue(command, block=nil)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 31 def enqueue(command, block=nil) logger.debug { "enqueue new task: #{command}" } @input_queue.push(Task.new(command, block)) if command == :perform then grow end nil end
expand(count)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 106 def expand(count) @lock.synchronize do logger.debug "expanding by #{count} threads (from #{@size})" count.times do create_worker end end nil end
inspect()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 92 def inspect "#<#{self.class.to_s}:0x#{(object_id << 1).to_s(16)} threads=#{size} jobs=#{num_jobs}>" end
join(max_wait = nil)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 75 def join(max_wait = nil) results = @workers.map { |w| w.join(max_wait) } @workers.clear @size = 0 return results end
num_busy()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 58 def num_busy @lock.synchronize do return @workers.find_all{ |w| w.working? }.size end end
Also aliased as: num_working
num_idle()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 54 def num_idle @size - num_busy end
num_jobs()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 50 def num_jobs @input_queue.size end
perform(&block)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 40 def perform(&block) enqueue(:perform, block) nil end
resize(new_size)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 134 def resize(new_size) @lock.synchronize do if new_size > @size expand(new_size - @size) elsif new_size < @size contract(@size - new_size) end end nil end
shutdown(&block)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 65 def shutdown(&block) @lock.synchronize do @size.times do enqueue(:shutdown, block) end end nil end
size()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 100 def size @lock.synchronize do return @size end end
summary()
click to toggle source
For debugging
# File lib/bixby-common/util/thread_pool.rb, line 147 def summary @lock.synchronize do puts "jobs: #{@input_queue.size}" puts "workers: #{@workers.size}" @workers.each do |w| puts " " + w.thread.inspect end end end
to_s()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 96 def to_s inspect end
Private Instance Methods
create_worker()
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 160 def create_worker @lock.synchronize do logger.debug "spawning new worker thread" exit_handler = lambda { |worker, reason| @lock.synchronize do if reason == :exception or (reason == :timeout && @size > @min_size) then remove_worker(worker) grow return true end false end } @workers << Worker.new(@input_queue, @idle_timeout, exit_handler) @size += 1 end end
grow()
click to toggle source
Grow the pool by one if we have more jobs than idle workers
# File lib/bixby-common/util/thread_pool.rb, line 181 def grow @lock.synchronize do prune logger.debug { "jobs: #{num_jobs}; busy: #{num_working}; idle: #{num_idle}" } if @size == 0 || (@size < @max_size && num_jobs > 0 && num_jobs > num_idle) then space = @max_size-@size jobs = num_jobs-num_idle needed = space < jobs ? space : jobs needed = 1 if needed <= 0 expand(needed) else logger.debug "NOT growing the pool!" end end nil end
prune()
click to toggle source
Remove any dead worker threads which may not have been cleaned up properly (via callback handler)
# File lib/bixby-common/util/thread_pool.rb, line 201 def prune @lock.synchronize do @workers.delete_if { |w| !w.alive? } @size = @workers.size end end
remove_worker(worker)
click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 208 def remove_worker(worker) @lock.synchronize do @workers.delete(worker) @size -= 1 end nil end