class Autobuild::RakeTaskParallelism

This is a rewrite of the Rake task invocation code that adds parallelism.

Since autobuild does not use task arguments, they are not supported, for simplicity.

Attributes

available_workers[R]
finished_workers[R]
job_server[R]
workers[R]

Public Class Methods

new(level = Autobuild.parallel_build_level)
# File lib/autobuild/parallel.rb, line 29
# Sets up an empty worker pool whose parallelism is bounded by a job
# server created for +level+ concurrent jobs.
#
# @param level [Integer] the parallel build level handed to JobServer.new
def initialize(level = Autobuild.parallel_build_level)
    # every worker ever spawned, and the subset currently idle
    @workers = []
    @available_workers = []
    # workers push themselves here once they are done with a task
    @finished_workers = Queue.new
    @job_server = JobServer.new(level)
end

Public Instance Methods

discover_dependencies(all_tasks, reverse_dependencies, task)
# File lib/autobuild/parallel.rb, line 61
# Adds +task+ and, depth-first, all of its prerequisites to +all_tasks+,
# recording each "prerequisite => dependent" edge in +reverse_dependencies+.
#
# Tasks that were already invoked are not added, and neither are their
# prerequisites explored. Edges pointing to such tasks are still recorded.
#
# @param all_tasks [Set] the tasks discovered so far (appended to)
# @param reverse_dependencies [Hash] maps a task to the collection of
#   tasks that depend on it (appended to)
# @param task the task to start the discovery from
def discover_dependencies(all_tasks, reverse_dependencies, task)
    # Visiting a task only once also keeps cyclic graphs from recursing forever
    return if task.already_invoked? || all_tasks.include?(task)

    all_tasks << task
    task.prerequisite_tasks.each do |prerequisite|
        reverse_dependencies[prerequisite] << task
        discover_dependencies(all_tasks, reverse_dependencies, prerequisite)
    end
end
finish_pending_work()
# File lib/autobuild/parallel.rb, line 309
# Blocks until every worker that was ever spawned has reported back on the
# finished-workers queue, putting each one back into the available pool.
#
# The tasks of those workers are NOT reported anywhere; this is only
# meant to drain the pool.
def finish_pending_work
    available_workers << finished_workers.pop until available_workers.size == workers.size
end
invoke_parallel(required_tasks, completion_callback: proc {})

Invokes the provided tasks. Unlike the rake code, the scheduling loop is a toplevel algorithm that does not use recursion (only dependency discovery, in discover_dependencies, is recursive).

# File lib/autobuild/parallel.rb, line 150
# Invokes +required_tasks+ together with every prerequisite reachable from
# them that was not already invoked.
#
# Tasks for which +state.ready?+ holds are queued. Each task popped from the
# queue is handled in one of three ways:
# - tasks that are disabled, already invoked or not needed are marked as
#   invoked without running and reported to +state+ as finished;
# - tasks for which +state.trivial_task?+ holds run inline through
#   Worker.execute_task;
# - all other tasks are handed to a Worker, but only after a job server
#   token has been obtained.
#
# @param required_tasks [Enumerable] the tasks to invoke
# @param completion_callback [Proc] forwarded to ProcessingState.new
# @raise [RuntimeError] "cycle in task graph: ..." when some discovered
#   tasks were never invoked (the cycle is computed by #resolve_cycle)
# @raise any error reported by a worker, re-raised by #wait_for_worker_to_end
def invoke_parallel(required_tasks, completion_callback: proc {})
    # Collect all tasks reachable from required_tasks, and build a reverse
    # dependency graph (i.e. a mapping from a task to the tasks that
    # depend on it)
    tasks = Set.new
    reverse_dependencies = Hash.new { |h, k| h[k] = Set.new }
    required_tasks.each do |t|
        discover_dependencies(tasks, reverse_dependencies, t)
    end
    # The queue is the set of tasks for which all prerequisites have
    # been successfully executed (or were not needed). I.e. it is the
    # set of tasks that can be queued for execution. It is seeded here
    # with the tasks that are ready right away.
    state = ProcessingState.new(reverse_dependencies,
                                completion_callback: completion_callback)
    tasks.each do |t|
        state.push(t) if state.ready?(t)
    end

    # This is kind-of a topological sort. However, we don't do the full
    # topological sort since we would then have to scan all tasks each
    # time for tasks that have no currently running prerequisites

    loop do
        pending_task = state.pop
        unless pending_task
            # If we have pending workers, wait for one to be finished
            # until either they are all finished or the queue is not
            # empty anymore
            while !pending_task && available_workers.size != workers.size
                wait_for_worker_to_end(state)
                pending_task = state.pop
            end

            # Nothing left in the queue and no worker busy: we are done
            break if !pending_task && available_workers.size == workers.size
        end

        # Tasks that do not have to run are flagged as invoked and
        # reported as finished straight away
        bypass_task = pending_task.disabled? ||
                      pending_task.already_invoked? ||
                      !pending_task.needed?

        if bypass_task
            pending_task.already_invoked = true
            state.process_finished_task(pending_task)
            next
        elsif state.trivial_task?(pending_task)
            # Trivial tasks are executed inline, without a job server token
            Worker.execute_task(pending_task)
            state.process_finished_task(pending_task)
            next
        end

        # Get a job server token
        job_server.get

        # Handle workers that finished in the meantime, which reports their
        # tasks to state and puts them back into the available pool.
        # NOTE(review): if wait_for_worker_to_end raises here, the token
        # obtained just above is not released by this method - confirm
        # whether it is reclaimed elsewhere.
        wait_for_worker_to_end(state) until finished_workers.empty?

        # We do have a job server token, so we are allowed to allocate a
        # new worker if none are available
        if available_workers.empty?
            w = Worker.new(job_server, finished_workers)
            available_workers << w
            workers << w
        end

        worker = available_workers.pop
        state.mark_as_active(pending_task)
        worker.queue(pending_task)
    end

    # Processing stopped although some discovered tasks never ran: look
    # for the dependency cycle that blocked them
    not_processed = tasks.find_all { |t| !t.already_invoked? }
    unless not_processed.empty?
        cycle = resolve_cycle(tasks, not_processed, reverse_dependencies)
        raise "cycle in task graph: #{cycle.map(&:name).sort.join(', ')}"
    end
end
resolve_cycle(all_tasks, tasks, reverse_dependencies)
# File lib/autobuild/parallel.rb, line 225
# Finds a dependency cycle among tasks that the scheduler could not process.
#
# Starting from the first of +tasks+, follows prerequisite links that stay
# within +tasks+ until it reaches a task that is already on the current
# chain. The part of the chain from that task onward is the cycle.
#
# @param all_tasks [Enumerable] every discovered task, only used for
#   diagnostics
# @param tasks [Enumerable] the tasks that were not processed. It is not
#   modified.
# @param reverse_dependencies [Hash] mapping from a task to the tasks that
#   depend on it, only used for diagnostics
# @return [Array] the tasks forming the cycle, each one having the next as
#   a prerequisite (the last one has the first as a prerequisite)
# @raise [RuntimeError] if the walk dead-ends without finding a cycle
def resolve_cycle(all_tasks, tasks, reverse_dependencies)
    # Walk a private copy so that the caller's collection is left intact
    remaining = tasks.dup
    chain = []
    next_task = remaining.first
    loop do
        task = next_task
        chain << task
        remaining.delete(task)
        next_task = task.prerequisite_tasks.find do |dep_task|
            # Reaching a task that is already on the chain closes the cycle
            if (cycle_start = chain.index(dep_task))
                return chain[cycle_start..-1]
            end

            remaining.include?(dep_task)
        end
        next if next_task

        # Dead end: none of the current task's prerequisites is among the
        # unprocessed tasks, so report everything that could explain it
        Autobuild.fatal "parallel processing stopped prematurely, "\
            "but no cycle is present in the remaining tasks"
        Autobuild.fatal "remaining tasks: #{tasks.map(&:name).join(', ')}"
        Autobuild.fatal "known dependencies at initialization time that "\
            "could block the processing of the remaining tasks"
        reverse_dependencies.each do |parent_task, parents|
            next unless tasks.include?(parent_task)

            parents.each do |p|
                Autobuild.fatal "  #{p}: #{parent_task}"
            end
        end
        Autobuild.fatal "known dependencies right now that could block "\
            "the processing of the remaining tasks"
        all_tasks.each do |p|
            (tasks & p.prerequisite_tasks).each do |t|
                Autobuild.fatal "  #{p}: #{t}"
            end
        end
        raise "failed to resolve cycle in #{tasks.map(&:name).join(', ')}"
    end
end
wait_for_worker_to_end(state)
# File lib/autobuild/parallel.rb, line 36
# Blocks until a worker reports back, puts it back into the pool of
# available workers and reports its task to +state+ as finished.
#
# If the worker reported an error, the error is logged when other workers
# are still busy. All workers are then waited for with
# #finish_pending_work, and the worker's error is re-raised. It is raised
# even if the waiting itself fails, and in that case the task is not
# reported to +state+.
#
# @param state the processing state that receives finished tasks
# @return whatever state.process_finished_task returns
# @raise the error reported by the worker, if any
def wait_for_worker_to_end(state)
    worker = finished_workers.pop
    task, failure = worker.last_result
    available_workers << worker

    return state.process_finished_task(task) unless failure

    still_running = available_workers.size != workers.size
    if still_running
        if task.respond_to?(:package) && task.package
            Autobuild.error "got an error processing #{task.package.name}, " \
                            "waiting for pending jobs to end"
        else
            Autobuild.error "got an error doing parallel processing, " \
                            "waiting for pending jobs to end"
        end
    end

    begin
        finish_pending_work
    ensure
        # Deliberately raised from ensure so that it always wins
        raise failure
    end
end