class Workers::Pool

Constants

DEFAULT_POOL_SIZE

Attributes

on_exception[RW]

Public Class Methods

new(options = {}) click to toggle source
# File lib/workers/pool.rb, line 9
def initialize(options = {})
  @logger = Workers::LogProxy.new(options[:logger])
  @worker_class = options[:worker_class] || Workers::Worker
  @input_queue = Queue.new
  @lock = Monitor.new
  @workers = Set.new
  @size = 0
  @on_exception = options[:on_exception]

  expand(options[:size] || Workers::Pool::DEFAULT_POOL_SIZE)

  nil
end

Public Instance Methods

contract(count, &block) click to toggle source
# File lib/workers/pool.rb, line 83
def contract(count, &block)
  @lock.synchronize do
    raise Workers::PoolSizeError, 'Count is too large.' if count > @size

    count.times do
      callback = Proc.new do |worker|
        remove_worker(worker)
        block.call if block
      end

      enqueue(:shutdown, callback)
      @size -= 1
    end
  end

  nil
end
dispose(max_wait = nil, &block) click to toggle source
# File lib/workers/pool.rb, line 53
def dispose(max_wait = nil, &block)
  shutdown do
    block.call if block
  end

  join(max_wait)
end
enqueue(command, data = nil) click to toggle source
# File lib/workers/pool.rb, line 23
def enqueue(command, data = nil)
  @input_queue.push(Event.new(command, data))

  nil
end
expand(count) click to toggle source
# File lib/workers/pool.rb, line 71
def expand(count)
  @lock.synchronize do
    count.times do
      worker = @worker_class.new(:input_queue => @input_queue, :on_exception => @on_exception, :logger => @logger)
      @workers << worker
      @size += 1
    end
  end

  nil
end
inspect() click to toggle source
# File lib/workers/pool.rb, line 61
def inspect
  "#<#{self.class.to_s}:0x#{(object_id << 1).to_s(16)} size=#{size}>"
end
join(max_wait = nil) click to toggle source
# File lib/workers/pool.rb, line 45
def join(max_wait = nil)
  results = @workers.map { |w| w.join(max_wait) }
  @workers.clear
  @size = 0

  results
end
perform(&block) click to toggle source
# File lib/workers/pool.rb, line 29
def perform(&block)
  enqueue(:perform, block)

  nil
end
resize(new_size) click to toggle source
# File lib/workers/pool.rb, line 101
def resize(new_size)
  @lock.synchronize do
    if new_size > @size
      expand(new_size - @size)
    elsif new_size < @size
      contract(@size - new_size)
    end
  end

  nil
end
shutdown(&block) click to toggle source
# File lib/workers/pool.rb, line 35
def shutdown(&block)
  @lock.synchronize do
    @size.times do
      enqueue(:shutdown, block)
    end
  end

  nil
end
size() click to toggle source
# File lib/workers/pool.rb, line 65
def size
  @lock.synchronize do
    @size
  end
end

Private Instance Methods

remove_worker(worker) click to toggle source
# File lib/workers/pool.rb, line 115
def remove_worker(worker)
  @lock.synchronize do
    @workers.delete(worker)
  end

  nil
end