class Orchestra::ThreadPool

Attributes

queue[RW]

Public Class Methods

build(count) click to toggle source
# File lib/orchestra/thread_pool.rb, line 3
def self.build count
  instance = new
  instance.count = count
  instance
end
default() click to toggle source
# File lib/orchestra/thread_pool.rb, line 9
def self.default
  build 1
end
new(args = {}) click to toggle source
# File lib/orchestra/thread_pool.rb, line 15
def initialize args = {}
  @timeout, _ = Util.extract_key_args args, :timeout_ms => 1000
  @threads = Set.new
  @dying = Queue.new
  @pool_lock = Mutex.new
  @queue = Queue.new
  @jobs = {}
end

Public Instance Methods

add_thread() click to toggle source
# File lib/orchestra/thread_pool.rb, line 52
def add_thread
  while_locked do add_thread! end
end
count() click to toggle source
# File lib/orchestra/thread_pool.rb, line 36
def count
  threads.size
end
count=(new_count) click to toggle source
# File lib/orchestra/thread_pool.rb, line 40
def count= new_count
  while_locked do
    loop do
      case @threads.size <=> new_count
      when 0 then return
      when -1 then add_thread!
      when 1 then remove_thread!
      end
    end
  end
end
enqueue(&work) click to toggle source
# File lib/orchestra/thread_pool.rb, line 24
def enqueue &work
  job = Job.new work
  job.add_observer self
  while_locked do queue << job end
  job
end
execute(&work) click to toggle source
# File lib/orchestra/thread_pool.rb, line 31
def execute &work
  job = enqueue &work
  job.wait
end
remove_thread() click to toggle source
# File lib/orchestra/thread_pool.rb, line 56
def remove_thread
  while_locked do remove_thread! end
end
shutdown() click to toggle source
# File lib/orchestra/thread_pool.rb, line 60
def shutdown
  self.count = 0
end
status() click to toggle source
# File lib/orchestra/thread_pool.rb, line 64
def status
  while_locked do @threads.map &:status end
end
threads() click to toggle source
# File lib/orchestra/thread_pool.rb, line 68
def threads
  while_locked do @threads end
end
update(event, *;) click to toggle source
# File lib/orchestra/thread_pool.rb, line 72
def update event, *;
  return unless event == :failed
  reap_thread
end

Private Instance Methods

add_thread!() click to toggle source
# File lib/orchestra/thread_pool.rb, line 79
def add_thread!
  old_count = queue.num_waiting
  thr = Thread.new &method(:thread_loop)
  Thread.pass while thr.status == 'run'
  @threads << thr
  true
end
reap_thread() click to toggle source
# File lib/orchestra/thread_pool.rb, line 93
def reap_thread
  thread = @dying.pop
  @threads.delete thread
  thread.join
end
remove_thread!() click to toggle source
# File lib/orchestra/thread_pool.rb, line 87
def remove_thread!
  queue << :terminate
  reap_thread
  true
end
thread_loop() click to toggle source
# File lib/orchestra/thread_pool.rb, line 99
def thread_loop
  Thread.current.abort_on_exception = false
  until (job = queue.pop) == :terminate
    job.execute
    Thread.pass
  end
rescue => error
  add_thread!
  job.set_error error
ensure
  @dying << Thread.current
end
while_locked(&block) click to toggle source
# File lib/orchestra/thread_pool.rb, line 112
def while_locked &block
  @pool_lock.synchronize &block
end