class Orchestra::ThreadPool
Attributes
queue[RW]
Public Class Methods
build(count)
click to toggle source
# File lib/orchestra/thread_pool.rb, line 3
# Creates a new pool and resizes it to +count+ threads through #count=.
#
# @param count the desired number of worker threads
# @return the newly created, already-sized pool
def self.build(count)
  new.tap { |pool| pool.count = count }
end
default()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 9
# Convenience constructor: a pool built with a single worker thread.
#
# @return the pool returned by .build for a count of 1
def self.default
  build(1)
end
new(args = {})
click to toggle source
# File lib/orchestra/thread_pool.rb, line 15
# Sets up an empty pool: no worker threads, an empty job queue, and the
# lock that guards pool mutations.
#
# @param args [Hash] options; only :timeout_ms is extracted here, with a
#   default of 1000 (presumably milliseconds -- confirm against
#   Util.extract_key_args)
def initialize(args = {})
  # Only the first value returned by the helper is kept.
  @timeout, _ = Util.extract_key_args(args, :timeout_ms => 1000)

  @pool_lock = Mutex.new # serialises access to the pool (see while_locked)
  @queue     = Queue.new # pending jobs, plus :terminate markers
  @dying     = Queue.new # threads that have left thread_loop, awaiting reaping
  @threads   = Set.new   # worker threads currently tracked by the pool
  @jobs      = {}        # NOTE(review): not read anywhere in the visible code
end
Public Instance Methods
add_thread()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 52
# Adds one worker thread while holding the pool lock.
#
# @return whatever add_thread! returns
def add_thread
  while_locked { add_thread! }
end
count()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 36
# Number of worker threads currently tracked by the pool.
#
# The collection is fetched through #threads, which takes the pool lock.
def count
  tracked = threads
  tracked.size
end
count=(new_count)
click to toggle source
# File lib/orchestra/thread_pool.rb, line 40
# Grows or shrinks the pool until it tracks +new_count+ threads.
#
# The target is validated before the lock is taken. Without the check, a
# negative target keeps calling remove_thread! on an empty pool (which
# blocks waiting for a dying thread while the lock is held), and a
# non-comparable target such as nil never matches any branch and spins
# forever.
#
# @param new_count [Numeric] desired number of threads; must be >= 0
# @raise [ArgumentError] if +new_count+ is not a real, non-negative number
def count=(new_count)
  unless new_count.is_a?(Numeric) && new_count.real? && new_count >= 0
    raise ArgumentError,
          "thread count must be a non-negative number, got #{new_count.inspect}"
  end

  while_locked do
    # Shrink first, then grow. For an Integer target at most one of these
    # loops runs; for a fractional target the pool settles just above it
    # instead of oscillating between add and remove.
    remove_thread! while @threads.size > new_count
    add_thread! while @threads.size < new_count
  end
end
enqueue(&work)
click to toggle source
# File lib/orchestra/thread_pool.rb, line 24
# Wraps the given block in a Job, registers this pool as an observer of
# the job, and pushes the job onto the queue while holding the pool lock.
#
# @param work the block to run as a job
# @return the Job that was queued
def enqueue(&work)
  Job.new(work).tap do |job|
    # Observe the job so that #update is notified of its events.
    job.add_observer(self)
    while_locked { queue << job }
  end
end
execute(&work)
click to toggle source
# File lib/orchestra/thread_pool.rb, line 31
# Queues the given block as a job and then calls +wait+ on that job.
#
# @param work the block to run as a job
# @return whatever the job's +wait+ returns (presumably the job's result
#   once it has finished -- confirm against Job#wait)
def execute(&work)
  enqueue(&work).wait
end
remove_thread()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 56
# Removes one worker thread while holding the pool lock.
#
# @return whatever remove_thread! returns
def remove_thread
  while_locked { remove_thread! }
end
shutdown()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 60
# Shuts the pool down by resizing it to zero threads via #count=.
def shutdown
  self.count = 0
end
status()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 64
# Collects the +status+ of every tracked thread while holding the pool lock.
#
# @return [Array] one status value per tracked thread
def status
  while_locked do
    @threads.map { |thread| thread.status }
  end
end
threads()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 68
# Returns the pool's thread collection, read while holding the pool lock.
#
# Note that the collection itself (not a copy) is returned, so the lock
# only covers the read of the reference.
def threads
  while_locked { @threads }
end
update(event, *)
click to toggle source
# File lib/orchestra/thread_pool.rb, line 72
# Observer callback (the pool registers itself on each job in #enqueue).
# Only the :failed event is acted upon, by reaping one dying thread; any
# additional arguments are ignored.
#
# @param event the event name reported by the observed object
# @return nil for events other than :failed, otherwise the result of
#   reap_thread
def update(event, *)
  reap_thread if event == :failed
end
Private Instance Methods
add_thread!()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 79
# Starts a new worker thread running thread_loop and registers it in the
# pool. Does not take the pool lock itself.
#
# The previously computed, never-used +old_count+ local has been removed.
#
# @return [true]
def add_thread!
  thr = Thread.new(&method(:thread_loop))
  # Yield to other threads until the new worker is no longer in the 'run'
  # state (i.e. it has gone to sleep, or has already finished).
  Thread.pass while thr.status == 'run'
  @threads << thr
  true
end
reap_thread()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 93
# Takes the next thread off the +@dying+ queue (blocking until one is
# available), stops tracking it, and joins it.
#
# @return the result of joining the reaped thread
def reap_thread
  finished = @dying.pop
  @threads.delete(finished)
  finished.join
end
remove_thread!()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 87
# Asks one worker to stop by queueing a :terminate marker, then reaps one
# thread from the dying queue. Does not take the pool lock itself.
#
# @return [true]
def remove_thread!
  queue.push(:terminate)
  reap_thread
  true
end
thread_loop()
click to toggle source
# File lib/orchestra/thread_pool.rb, line 99
# Body of every worker thread: pops jobs off the queue and executes them
# until a :terminate marker is popped.
#
# If a StandardError escapes, a replacement worker is started and the
# error is recorded on the job that was being processed. In every case
# the current thread finally places itself on the +@dying+ queue so that
# reap_thread can collect it.
def thread_loop
  begin
    Thread.current.abort_on_exception = false
    # A plain `until` is used rather than `loop do`, which would silently
    # swallow a StopIteration raised by a job.
    until (job = queue.pop) == :terminate
      job.execute
      Thread.pass
    end
  rescue => error
    # Start the replacement first, then report the failure on the job.
    add_thread!
    job.set_error(error)
  ensure
    @dying << Thread.current
  end
end
while_locked(&block)
click to toggle source
# File lib/orchestra/thread_pool.rb, line 112
# Runs the given block while holding the pool mutex.
#
# @return the value of the block
def while_locked(&block)
  @pool_lock.synchronize(&block)
end