class PromisePool::ThreadPool

Attributes

condv[R]
idle_time[RW]
max_size[RW]
mutex[R]
queue[R]
waiting[R]
workers[R]

Public Class Methods

new(max_size, idle_time=60) click to toggle source
# File lib/promise_pool/thread_pool.rb, line 14
def initialize max_size, idle_time=60
  @max_size  = max_size
  @idle_time = idle_time
  @queue     = Queue.new
  @mutex     = Mutex.new
  @workers   = []
  @waiting   = 0
end

Public Instance Methods

defer(promise_mutex, &job) click to toggle source
# File lib/promise_pool/thread_pool.rb, line 31
def defer promise_mutex, &job
  mutex.synchronize do
    task = Task.new(job, promise_mutex)
    queue << task
    spawn_worker if waiting < queue_size && workers.size < max_size
    task
  end
end
queue_size() click to toggle source
# File lib/promise_pool/thread_pool.rb, line 27
def queue_size
  queue.size
end
shutdown() click to toggle source

Block on shutting down, and should not add more jobs while shutting down

# File lib/promise_pool/thread_pool.rb, line 47
def shutdown
  workers.size.times{ trim(true) }
  workers.first.join && trim(true) until workers.empty?
  mutex.synchronize{ queue.clear }
end
size() click to toggle source
# File lib/promise_pool/thread_pool.rb, line 23
def size
  workers.size
end
trim(force=false) click to toggle source
# File lib/promise_pool/thread_pool.rb, line 40
def trim force=false
  mutex.synchronize do
    queue << lambda{ |_| false } if force || waiting > 0
  end
end

Private Instance Methods

spawn_worker() click to toggle source
# File lib/promise_pool/thread_pool.rb, line 57
def spawn_worker
  workers << Thread.new{
    Thread.current.abort_on_exception = true

    task = nil
    begin
      mutex.synchronize do
        @waiting += 1
        task = queue.pop(mutex, idle_time)
        @waiting -= 1
      end
    end while task.call(Thread.current)

    mutex.synchronize{ workers.delete(Thread.current) }
  }
end