class PromisePool::ThreadPool
Attributes
condv[R]
idle_time[RW]
max_size[RW]
mutex[R]
queue[R]
waiting[R]
workers[R]
Public Class Methods
new(max_size, idle_time=60)
click to toggle source
# File lib/promise_pool/thread_pool.rb, line 14 def initialize max_size, idle_time=60 @max_size = max_size @idle_time = idle_time @queue = Queue.new @mutex = Mutex.new @workers = [] @waiting = 0 end
Public Instance Methods
defer(promise_mutex, &job)
click to toggle source
# File lib/promise_pool/thread_pool.rb, line 31 def defer promise_mutex, &job mutex.synchronize do task = Task.new(job, promise_mutex) queue << task spawn_worker if waiting < queue_size && workers.size < max_size task end end
queue_size()
click to toggle source
# File lib/promise_pool/thread_pool.rb, line 27 def queue_size queue.size end
shutdown()
click to toggle source
Block on shutting down, and should not add more jobs while shutting down
# File lib/promise_pool/thread_pool.rb, line 47 def shutdown workers.size.times{ trim(true) } workers.first.join && trim(true) until workers.empty? mutex.synchronize{ queue.clear } end
size()
click to toggle source
# File lib/promise_pool/thread_pool.rb, line 23 def size workers.size end
trim(force=false)
click to toggle source
# File lib/promise_pool/thread_pool.rb, line 40 def trim force=false mutex.synchronize do queue << lambda{ |_| false } if force || waiting > 0 end end
Private Instance Methods
spawn_worker()
click to toggle source
# File lib/promise_pool/thread_pool.rb, line 57 def spawn_worker workers << Thread.new{ Thread.current.abort_on_exception = true task = nil begin mutex.synchronize do @waiting += 1 task = queue.pop(mutex, idle_time) @waiting -= 1 end end while task.call(Thread.current) mutex.synchronize{ workers.delete(Thread.current) } } end