class Autobuild::RakeTaskParallelism
This is a rewrite of the Rake
task invocation code to use parallelism.
Since autobuild does not use task arguments, we do not support them, for simplicity.
Attributes
available_workers[R]
finished_workers[R]
job_server[R]
workers[R]
Public Class Methods
new(level = Autobuild.parallel_build_level)
click to toggle source
# File lib/autobuild/parallel.rb, line 29
# Sets up an empty scheduler: a job server and no workers yet.
#
# @param level the parallelism level, forwarded to JobServer.new
#   (defaults to Autobuild.parallel_build_level)
def initialize(level = Autobuild.parallel_build_level)
    @job_server = JobServer.new(level)
    # Every worker created so far, busy or not
    @workers = []
    # Subset of @workers that can be handed a new task
    @available_workers = []
    # Handed to each Worker on creation; finished workers are popped from
    # it by wait_for_worker_to_end / finish_pending_work
    @finished_workers = Queue.new
end
Public Instance Methods
discover_dependencies(all_tasks, reverse_dependencies, task)
click to toggle source
# File lib/autobuild/parallel.rb, line 61
# Depth-first walk over +task+ and its prerequisites, collecting every task
# that has not been invoked yet.
#
# @param all_tasks [Set] accumulator receiving the discovered tasks
# @param reverse_dependencies [Hash] accumulator mapping a prerequisite to
#   the tasks that depend on it (expects a default block creating the value)
# @param task the task to start from
def discover_dependencies(all_tasks, reverse_dependencies, task)
    # Skip tasks that already ran, and tasks reached earlier through another
    # path (possibly still being walked, which also stops on cycles)
    return if task.already_invoked? || all_tasks.include?(task)

    all_tasks << task
    task.prerequisite_tasks.each do |prerequisite|
        # The edge is recorded even when the prerequisite itself is skipped
        reverse_dependencies[prerequisite] << task
        discover_dependencies(all_tasks, reverse_dependencies, prerequisite)
    end
end
finish_pending_work()
click to toggle source
# File lib/autobuild/parallel.rb, line 309
# Blocks until every worker is idle again, moving each one popped from
# finished_workers back into available_workers. Worker results are not
# inspected here.
def finish_pending_work
    available_workers << finished_workers.pop until available_workers.size == workers.size
end
invoke_parallel(required_tasks, completion_callback: proc {})
click to toggle source
Invokes the provided tasks. Unlike the Rake code, this is a top-level algorithm: its scheduling loop does not use recursion (dependency discovery, done by discover_dependencies, is still recursive).
# File lib/autobuild/parallel.rb, line 150
# Invokes the given tasks and all their not-yet-invoked prerequisites,
# running independent tasks concurrently on workers.
#
# Unlike Rake's own invocation code, the scheduling loop below is not
# recursive (only dependency discovery is).
#
# @param required_tasks [Enumerable] the tasks to invoke
# @param completion_callback [Proc] forwarded to ProcessingState.new
# @return [nil]
# @raise [RuntimeError] if some tasks could never be processed; the message
#   lists the tasks forming the dependency cycle that blocked them
# @raise any error reported by a worker (re-raised by wait_for_worker_to_end
#   once the other running jobs are finished)
def invoke_parallel(required_tasks, completion_callback: proc {})
    # Collect every task that still needs to run, and build the reverse
    # dependency graph (i.e. a mapping from a task to the tasks that depend
    # on it) along the way
    tasks = Set.new
    reverse_dependencies = Hash.new { |h, k| h[k] = Set.new }
    required_tasks.each do |t|
        discover_dependencies(tasks, reverse_dependencies, t)
    end

    # The state's queue is the set of tasks for which all prerequisites
    # have been successfully executed (or were not needed), i.e. the set of
    # tasks that can be queued for execution.
    state = ProcessingState.new(reverse_dependencies,
                                completion_callback: completion_callback)
    tasks.each do |t|
        state.push(t) if state.ready?(t)
    end

    # This is kind-of a topological sort. However, we don't do the full
    # topological sort since we would then have to scan all tasks each
    # time for tasks that have no currently running prerequisites
    loop do
        pending_task = state.pop

        # Nothing ready: wait for busy workers to finish (which may make new
        # tasks ready) until either a task is available or every worker is
        # idle
        until pending_task || available_workers.size == workers.size
            wait_for_worker_to_end(state)
            pending_task = state.pop
        end
        # Nothing ready and nothing running: we are done, or stuck (which
        # is detected after the loop)
        break unless pending_task

        bypass_task = pending_task.disabled? ||
                      pending_task.already_invoked? ||
                      !pending_task.needed?
        if bypass_task
            pending_task.already_invoked = true
            state.process_finished_task(pending_task)
            next
        elsif state.trivial_task?(pending_task)
            # NOTE(review): if Worker.execute_task raises here, workers that
            # are still running are not waited for, unlike the error path of
            # wait_for_worker_to_end -- confirm this is intended
            Worker.execute_task(pending_task)
            state.process_finished_task(pending_task)
            next
        end

        # Get a job server token
        job_server.get

        # Handle every worker that already finished, so that it is back in
        # available_workers before we pick one
        wait_for_worker_to_end(state) until finished_workers.empty?

        # We do have a job server token, so we are allowed to allocate a
        # new worker if none are available
        if available_workers.empty?
            w = Worker.new(job_server, finished_workers)
            available_workers << w
            workers << w
        end

        worker = available_workers.pop
        state.mark_as_active(pending_task)
        worker.queue(pending_task)
    end

    # Tasks that were never invoked could not be scheduled: report the
    # dependency cycle that blocked them
    not_processed = tasks.reject(&:already_invoked?)
    unless not_processed.empty?
        cycle = resolve_cycle(tasks, not_processed, reverse_dependencies)
        raise "cycle in task graph: #{cycle.map(&:name).sort.join(', ')}"
    end
end
resolve_cycle(all_tasks, tasks, reverse_dependencies)
click to toggle source
# File lib/autobuild/parallel.rb, line 225
# Finds a dependency cycle among tasks that could not be processed.
#
# Starting from the first of +tasks+, repeatedly follows a prerequisite that
# is itself one of +tasks+ until it reaches a task already on the walked
# chain; the part of the chain from that task onwards is the cycle.
#
# @param all_tasks [Enumerable] every task discovered for this run; only
#   used for diagnostics when no cycle can be found
# @param tasks [Enumerable] the tasks that were not processed. It is not
#   modified.
# @param reverse_dependencies [Hash] mapping from a task to the tasks that
#   depend on it; only used for diagnostics
# @return [Array] the tasks forming the cycle, each one depending on the
#   next and the last one depending on the first (empty if +tasks+ is empty)
# @raise [RuntimeError] if the walk reaches a task with no unprocessed
#   prerequisite, i.e. the remaining tasks are not blocked by a cycle
def resolve_cycle(all_tasks, tasks, reverse_dependencies)
    return [] if tasks.empty?

    # Walk over a copy: the caller's collection is left untouched, and
    # +tasks+ stays complete for the diagnostics below
    unvisited = tasks.dup
    chain = []
    current = unvisited.first
    loop do
        chain << current
        unvisited.delete(current)

        # Follow the first prerequisite that is either already on the chain
        # (which closes a cycle) or still unvisited
        current = current.prerequisite_tasks.find do |dep_task|
            if chain.include?(dep_task)
                # The cycle is the part of the chain starting at dep_task
                return chain.drop_while { |t| t != dep_task }
            end

            unvisited.include?(dep_task)
        end
        next if current

        # Dead end: the last task on the chain has no unprocessed
        # prerequisite, so something other than a cycle blocked the
        # remaining tasks. Dump what we know before giving up.
        Autobuild.fatal "parallel processing stopped prematurely, "\
            "but no cycle is present in the remaining tasks"
        Autobuild.fatal "remaining tasks: #{tasks.map(&:name).join(', ')}"
        Autobuild.fatal "known dependencies at initialization time that "\
            "could block the processing of the remaining tasks"
        reverse_dependencies.each do |prerequisite, dependents|
            next unless tasks.include?(prerequisite)

            dependents.each do |dependent|
                Autobuild.fatal " #{dependent}: #{prerequisite}"
            end
        end
        Autobuild.fatal "known dependencies right now that could block "\
            "the processing of the remaining tasks"
        all_tasks.each do |task|
            (tasks & task.prerequisite_tasks).each do |prerequisite|
                Autobuild.fatal " #{task}: #{prerequisite}"
            end
        end
        raise "failed to resolve cycle in #{tasks.map(&:name).join(', ')}"
    end
end
wait_for_worker_to_end(state)
click to toggle source
# File lib/autobuild/parallel.rb, line 36
# Blocks until some worker reports back, makes it available again and
# handles its result.
#
# On success, the finished task is handed to +state+ and the value of
# state.process_finished_task is returned. On failure, the task is NOT
# reported to +state+: the other running jobs are waited for (with a log
# message if any are still busy) and the worker's error is re-raised.
#
# @param state the ProcessingState of the current invoke_parallel run
# @raise the error reported by the worker. Because it is raised from an
#   +ensure+, it takes precedence over anything raised while waiting.
def wait_for_worker_to_end(state)
    worker = finished_workers.pop
    finished_task, error = worker.last_result
    available_workers << worker
    return state.process_finished_task(finished_task) unless error

    unless available_workers.size == workers.size
        # Some jobs are still running; tell the user why we are not
        # stopping right away
        if finished_task.respond_to?(:package) && finished_task.package
            Autobuild.error "got an error processing "\
                "#{finished_task.package.name}, "\
                "waiting for pending jobs to end"
        else
            Autobuild.error "got an error doing parallel processing, "\
                "waiting for pending jobs to end"
        end
    end

    begin
        finish_pending_work
    ensure
        raise error
    end
end