class Bixby::ThreadPool

Constants

DEFAULT_IDLE_TIMEOUT
DEFAULT_MAX
DEFAULT_MIN

Public Class Methods

new(options = {}) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 19
# Build a new pool and spawn the minimum number of workers right away.
#
# @param options [Hash] optional settings; a missing (or nil/false) value
#   selects the matching DEFAULT_* constant
# @option options :min_size      number of workers created up front
# @option options :max_size      upper bound consulted when growing the pool
# @option options :idle_timeout  value handed to every new Worker
def initialize(options = {})
  # Sizing configuration.
  @idle_timeout = options[:idle_timeout] || DEFAULT_IDLE_TIMEOUT
  @max_size     = options[:max_size] || DEFAULT_MAX
  @min_size     = options[:min_size] || DEFAULT_MIN

  # Runtime state: pending tasks, the workers, and the lock guarding them.
  @size        = 0
  @workers     = []
  @lock        = Monitor.new
  @input_queue = Queue.new

  expand(@min_size)
end

Public Instance Methods

<<(proc) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 45
# Queue a callable for execution by the pool.
#
# @param proc [Proc] the work to run; it is wrapped in a :perform task
# @return [nil]
def <<(proc)
  # Forward the +proc+ argument itself; there is no +block+ variable here.
  enqueue(:perform, proc)
  nil
end
contract(count, &block) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 117
# Ask +count+ workers to shut down.
#
# One :shutdown task is enqueued per worker to remove. Each task carries a
# callback which, when invoked with a worker, removes that worker from the
# pool and then calls the optional block.
#
# @param count [Integer] number of workers to remove
# @yield called (without arguments) after each worker has been removed
# @raise [RuntimeError] if +count+ exceeds the current pool size
# @return [nil]
def contract(count, &block)
  @lock.synchronize do
    raise 'Count is too large.' if count > @size

    count.times do
      # A fresh callback per task, as each one is attached to its own Task.
      on_shutdown = proc do |worker|
        remove_worker(worker)
        block.call unless block.nil?
      end
      enqueue(:shutdown, on_shutdown)
    end
  end

  nil
end
dispose() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 83
# Shut the pool down and wait for its workers, all while holding the pool lock.
#
# @return [nil]
def dispose
  @lock.synchronize do
    shutdown # enqueue a :shutdown task for every worker
    join     # then join the workers and reset the bookkeeping
  end
  nil
end
enqueue(command, block=nil) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 31
# Push a new task onto the input queue.
#
# @param command [Symbol] task type; :perform additionally triggers #grow
# @param block [Proc, nil] payload stored in the Task alongside the command
# @return [nil]
def enqueue(command, block=nil)
  logger.debug { "enqueue new task: #{command}" }

  task = Task.new(command, block)
  @input_queue << task

  # Only real work may require additional workers.
  grow if command == :perform
  nil
end
expand(count) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 106
# Add +count+ workers to the pool.
#
# @param count [Integer] number of workers to create
# @return [nil]
def expand(count)
  @lock.synchronize do
    logger.debug "expanding by #{count} threads (from #{@size})"
    count.times { create_worker }
  end
  nil
end
inspect() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 92
# Short description of the pool: class name, a hex id derived from
# +object_id+, the current thread count and the number of queued jobs.
#
# @return [String]
def inspect
  hex_id = (object_id << 1).to_s(16)
  "#<#{self.class}:0x#{hex_id} threads=#{size} jobs=#{num_jobs}>"
end
join(max_wait = nil) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 75
# Join every worker, then forget all of them and reset the size to zero.
#
# @param max_wait [Numeric, nil] passed unchanged to each worker's +join+
#   (presumably a per-worker timeout -- confirm against Worker#join)
# @return [Array] the value returned by each worker's +join+, in worker order
def join(max_wait = nil)
  results = []
  @workers.each { |worker| results << worker.join(max_wait) }

  @workers.clear
  @size = 0

  results
end
num_busy() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 58
# Number of workers that currently report +working?+.
#
# @return [Integer]
def num_busy
  @lock.synchronize { @workers.count { |worker| worker.working? } }
end
Also aliased as: num_working
num_idle() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 54
# Number of workers that are not busy: the pool size minus #num_busy.
#
# @return [Integer]
def num_idle
  total = @size
  total - num_busy
end
num_jobs() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 50
# Number of tasks currently sitting in the input queue.
#
# @return [Integer]
def num_jobs
  @input_queue.length
end
num_working()
Alias for: num_busy
perform(&block) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 40
# Queue the given block for execution by the pool.
#
# The block is handed to #enqueue as a :perform task; it is not run here.
#
# @yield the work to run
# @return [nil]
def perform(&block)
  enqueue(:perform, block)
  nil
end
resize(new_size) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 134
# Bring the pool to +new_size+ workers by expanding or contracting it.
# Nothing happens when the pool already has that size.
#
# @param new_size [Integer] desired number of workers
# @return [nil]
def resize(new_size)
  @lock.synchronize do
    delta = new_size - @size

    if delta > 0
      expand(delta)
    elsif delta < 0
      contract(-delta)
    end
  end

  nil
end
shutdown(&block) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 65
# Enqueue one :shutdown task per worker currently counted in the pool.
#
# @yield optional block attached to every :shutdown task
# @return [nil]
def shutdown(&block)
  @lock.synchronize do
    @size.times { enqueue(:shutdown, block) }
  end
  nil
end
size() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 100
# Current worker count, read while holding the pool lock.
#
# @return [Integer]
def size
  @lock.synchronize { @size }
end
summary() click to toggle source

For debugging

# File lib/bixby-common/util/thread_pool.rb, line 147
# Debugging aid: print the queue length, the worker count and the
# +inspect+ output of each worker's thread to standard output.
def summary
  @lock.synchronize do
    puts "jobs: #{@input_queue.size}"
    puts "workers: #{@workers.size}"
    @workers.each { |worker| puts "  #{worker.thread.inspect}" }
  end
end
to_s() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 96
# String form of the pool; simply delegates to #inspect.
#
# @return [String]
def to_s
  inspect
end

Private Instance Methods

create_worker() click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 160
# Spawn one worker, register it in the pool and bump the size counter.
#
# Every worker receives an exit handler. The handler returns true after
# removing the worker (and giving the pool a chance to grow again) when the
# reason is :exception, or when it is :timeout and the pool is above its
# minimum size. For any other case it returns false and changes nothing.
def create_worker
  @lock.synchronize do
    logger.debug "spawning new worker thread"

    exit_handler = lambda do |worker, reason|
      @lock.synchronize do
        if reason == :exception || (reason == :timeout && @size > @min_size)
          remove_worker(worker)
          grow
          true
        else
          false
        end
      end
    end

    @workers << Worker.new(@input_queue, @idle_timeout, exit_handler)
    @size += 1
  end
end
grow() click to toggle source

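Grow the pool when it is empty or when there are more queued jobs than idle workers; it expands by as many workers as needed (at least one), limited by the room left below the maximum size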

# File lib/bixby-common/util/thread_pool.rb, line 181
# Add workers when the pool is empty, or when it is below its maximum size
# and there are more queued jobs than idle workers.
#
# Dead workers are pruned first. The pool then expands by the smaller of
# "room left below the maximum" and "jobs without an idle worker", but
# always by at least one.
#
# @return [nil]
def grow
  @lock.synchronize do
    prune
    logger.debug { "jobs: #{num_jobs}; busy: #{num_working}; idle: #{num_idle}" }

    should_grow = @size == 0 || (@size < @max_size && num_jobs > 0 && num_jobs > num_idle)

    if should_grow
      headroom = @max_size - @size
      backlog = num_jobs - num_idle
      wanted = [headroom, backlog].min
      wanted = 1 if wanted <= 0 # an empty pool always gets one worker
      expand(wanted)
    else
      logger.debug "NOT growing the pool!"
    end
  end

  nil
end
prune() click to toggle source

Remove any dead worker threads which may not have been cleaned up properly (via callback handler)

# File lib/bixby-common/util/thread_pool.rb, line 201
# Drop every worker that no longer reports +alive?+ and resynchronise the
# size counter with the remaining worker list.
#
# @return [Integer] the number of workers left
def prune
  @lock.synchronize do
    @workers.keep_if { |worker| worker.alive? }
    @size = @workers.size
  end
end
remove_worker(worker) click to toggle source
# File lib/bixby-common/util/thread_pool.rb, line 208
# Remove +worker+ from the pool's bookkeeping.
#
# The size counter is only decremented when the worker was actually still
# registered. A worker may already be gone (for example dropped by #prune or
# cleared by #join), and decrementing again would let +@size+ drift away
# from +@workers.size+ or even go negative.
#
# @param worker the worker to forget
# @return [nil]
def remove_worker(worker)
  @lock.synchronize do
    # Array#delete returns nil when nothing matched.
    @size -= 1 unless @workers.delete(worker).nil?
  end

  nil
end