# File lib/dynflow/executors/parallel/pool.rb, line 49
# Builds the pool state: stores the name, the executor core and the requested
# size, spawns one worker per slot (all of them start out free) and creates an
# empty job storage.
#
# @param core the executor core; other methods notify it through `tell`
# @param name identifier sent back to the core when termination finishes
# @param pool_size [Integer] how many workers to spawn
# @param transaction_adapter handed unchanged to every spawned worker
def initialize(core, name, pool_size, transaction_adapter)
  @name          = name
  @executor_core = core
  @pool_size     = pool_size
  # Workers are named "worker-0" .. "worker-(pool_size - 1)" and receive this
  # pool's `reference` as their second spawn argument.
  @free_workers = Array.new(pool_size) do |index|
    Worker.spawn("worker-#{index}", reference, transaction_adapter)
  end
  @jobs = JobStorage.new
end
# File lib/dynflow/executors/parallel/pool.rb, line 77
# Reports the current load of the pool.
#
# @param execution_plan_id optional id forwarded as-is to the job storage's
#   own `execution_status`; defaults to nil
# @return [Hash] with the keys :pool_size (configured size), :free_workers
#   (number of workers currently in the free list) and :execution_status
#   (whatever the job storage reports for the given id)
def execution_status(execution_plan_id = nil)
  {
    pool_size: @pool_size,
    # `size` is the plain element count; block-less `count` was equivalent.
    free_workers: @free_workers.size,
    execution_status: @jobs.execution_status(execution_plan_id)
  }
end
# File lib/dynflow/executors/parallel/pool.rb, line 68
# Passes a persistence error on to the executor core.
#
# @param error the error object to report; it is sent unchanged
# @return whatever the core's `tell` returns
def handle_persistence_error(error)
  message = [:handle_persistence_error, error]
  @executor_core.tell(message)
end
# File lib/dynflow/executors/parallel/pool.rb, line 57
# Queues a piece of work and immediately tries to hand queued jobs to free
# workers.
#
# @param work the work item to add to the job storage
def schedule_work(work)
  @jobs.add(work)
  distribute_jobs
end
# File lib/dynflow/executors/parallel/pool.rb, line 72
# Starts termination through the inherited implementation and then checks
# right away whether the pool can already finish terminating.
#
# @param args forwarded untouched to the parent implementation
def start_termination(*args)
  # Bare `super` re-sends exactly the arguments this method received.
  super
  try_to_terminate
end
# File lib/dynflow/executors/parallel/pool.rb, line 62
# Handles a worker reporting that it finished a piece of work: the core is
# told about the finished work, the worker goes back to the free list and
# job distribution runs again.
#
# @param worker the worker that became available
# @param work the work item that was completed
def worker_done(worker, work)
  finished_message = [:work_finished, work]
  @executor_core.tell(finished_message)
  @free_workers.push(worker)
  distribute_jobs
end
# File lib/dynflow/executors/parallel/pool.rb, line 93
# Hands queued jobs to free workers until either runs out. A termination
# check runs first, before any job is handed out.
#
# NOTE(review): the loop still runs after `try_to_terminate`, even when that
# call has just finished termination — confirm that no jobs can be pending in
# that situation.
def distribute_jobs
  try_to_terminate
  until @free_workers.empty? || @jobs.empty?
    # Take the worker first, then the job, and send the job to that worker.
    worker = @free_workers.pop
    worker << @jobs.pop
  end
end
# File lib/dynflow/executors/parallel/pool.rb, line 85
# Finishes termination once it is possible: the pool must be terminating and
# every worker must be back in the free list. Otherwise this does nothing and
# returns nil.
#
# When the condition holds, each worker is asked to `terminate!`, every
# resulting reply object is waited on, the core is told
# `[:finish_termination, @name]`, and `finish_termination` is called.
def try_to_terminate
  return unless terminating? && @free_workers.size == @pool_size

  # Send all termination requests first, then wait on every one of them.
  pending = @free_workers.map { |worker| worker.ask(:terminate!) }
  pending.each(&:wait)

  @executor_core.tell([:finish_termination, @name])
  finish_termination
end