class RSpec::MultiprocessRunner::Coordinator

Attributes

options[RW]

Public Class Methods

new(worker_count, files, options) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 10
def initialize(worker_count, files, options)
  self.options = options
  @file_buffer = []
  @workers = []
  @stopped_workers = []
  @worker_results = []
  @file_coordinator = FileCoordinator.new(files, options)
end

Public Instance Methods

exit_code() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 41
def exit_code
  exit_code = 0
  exit_code |= 1 if any_example_failed?
  exit_code |= 2 if failed_workers.any? || error_messages.any?
  exit_code |= 4 if work_left_to_do? || @file_coordinator.missing_files.any?
  exit_code
end
failed?() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 37
def failed?
  0 < exit_code
end
run() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 19
def run
  @start_time = Time.now
  expected_worker_numbers.each do |n|
    create_and_start_worker_if_necessary(n)
  end
  run_loop
  quit_all_workers
  @file_coordinator.finished
  if @file_coordinator.remaining_files.any?
    run_loop
    quit_all_workers
    @file_coordinator.finished
  end
  print_summary

  exit_code
end
shutdown(options={}) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 49
def shutdown(options={})
  if @shutting_down
    # Immediately kill the workers if shutdown is requested again
    end_workers_in_parallel(@workers.dup, :kill_now)
  else
    @shutting_down = true
    print "Shutting down #{pluralize(@workers.size, "worker")} …" if options[:print_summary]
    # end_workers_in_parallel modifies @workers, so dup before sending in
    end_workers_in_parallel(@workers.dup, :shutdown_now)
    if options[:print_summary]
      puts " done"
      print_summary
    end
  end
end

Private Instance Methods

act_on_available_worker_messages(timeout) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 133
def act_on_available_worker_messages(timeout)
  while (select_result = IO.select(worker_sockets, nil, nil, timeout))
    select_result.first.each do |readable_socket|
      ready_worker = @workers.detect { |worker| worker.socket == readable_socket }
      next unless ready_worker # Worker is already gone
      worker_status = ready_worker.receive_and_act_on_message_from_worker
      if worker_status == :dead
        reap_one_worker(ready_worker, "died")
      elsif work_left_to_do? && !ready_worker.working?
        ready_worker.run_file(get_file)
        send_results(ready_worker)
      end
    end
  end
end
add_file_to_buffer() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 117
def add_file_to_buffer
  file = @file_coordinator.get_file
  @file_buffer << file if file
end
any_example_failed?() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 241
def any_example_failed?
  @file_coordinator.results.detect { |r| r.status == "failed" }
end
combine_example_results() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 233
def combine_example_results
  @file_coordinator.results.select { |r| r.run_status == "example_complete" }.sort_by { |r| r.time_finished }
end
create_and_start_worker_if_necessary(n) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 178
def create_and_start_worker_if_necessary(n)
  if work_left_to_do?
    $stderr.puts "(Re)starting worker #{n}"
    new_worker = Worker.new(n, options)
    @workers << new_worker
    new_worker.start
    file = get_file
    new_worker.run_file(file)
  end
end
end_workers_in_parallel(some_workers, end_method) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 96
def end_workers_in_parallel(some_workers, end_method)
  end_threads = some_workers.map do |worker|
    # This method is not threadsafe because it updates instance variables.
    # But it's fine to run it outside of the thread because it doesn't
    # block.
    mark_worker_as_stopped(worker)
    Thread.new do
      worker.send(end_method)
    end
  end
  end_threads.each(&:join)
end
error_messages() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 237
def error_messages
  @file_coordinator.results.select { |r| r.run_status == "error_running" }.sort_by { |r| r.time_finished }
end
expected_worker_numbers() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 174
def expected_worker_numbers
  (1..options.worker_count).to_a
end
failed_workers() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 129
def failed_workers
  @file_coordinator.failed_workers
end
get_file() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 122
def get_file
  if work_left_to_do?
    add_file_to_buffer
    @file_buffer.shift
  end
end
log_failed_files(output, failed_files) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 275
def log_failed_files(output, failed_files)
  return if failed_files.nil?
  output.puts
  output.puts "Writing failures to file: #{options.log_failing_files}"
  File.open(options.log_failing_files, "w+") do |io|
    failed_files.each do |file|
      io << file
      io << "\n"
    end
  end
end
mark_worker_as_stopped(worker) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 161
def mark_worker_as_stopped(worker)
  @stopped_workers << worker
  @workers.reject! { |w| w == worker }
  send_results(worker)
  @file_coordinator.send_worker_status(worker) if worker.deactivation_reason
end
pluralize(count, string) click to toggle source

Copied from RSpec

# File lib/rspec/multiprocess_runner/coordinator.rb, line 288
def pluralize(count, string)
  "#{count} #{string}#{'s' unless count.to_f == 1}"
end
print_elapsed_time(output, seconds_elapsed) click to toggle source
print_error_messages(output) click to toggle source
print_example_counts(output, by_status_and_time) click to toggle source
print_failed_example_details(output, failed_example_results) click to toggle source
print_failed_process_details(output) click to toggle source
print_missing_files(output) click to toggle source
print_pending_example_details(output, pending_example_results) click to toggle source
print_skipped_files_details(output) click to toggle source
print_summary() click to toggle source
quit_all_workers() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 91
def quit_all_workers
  # quit_workers modifies @workers, so dup before sending in
  quit_workers(@workers.dup)
end
quit_idle_unnecessary_workers() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 199
def quit_idle_unnecessary_workers
  unless work_left_to_do?
    idle_workers = @workers.reject(&:working?)
    quit_workers(idle_workers)
  end
end
quit_workers(some_workers) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 109
def quit_workers(some_workers)
  end_workers_in_parallel(some_workers, :quit_when_idle_and_wait_for_quit)
end
reap_one_worker(worker, reason) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 155
def reap_one_worker(worker, reason)
  worker.reap
  worker.deactivation_reason = reason
  mark_worker_as_stopped(worker)
end
reap_stalled_workers() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 168
def reap_stalled_workers
  @workers.select(&:stalled?).each do |stalled_worker|
    reap_one_worker(stalled_worker, "stalled")
  end
end
run_loop() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 80
def run_loop
  add_file_to_buffer
  loop do
    act_on_available_worker_messages(0.3)
    reap_stalled_workers
    start_missing_workers
    quit_idle_unnecessary_workers
    break unless @workers.detect(&:working?)
  end
end
send_results(worker) click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 149
def send_results(worker)
  results_to_send = worker.example_results - @worker_results
  @worker_results += results_to_send
  @file_coordinator.send_results(results_to_send)
end
sort_files(files) click to toggle source

Sorting by decreasing size attempts to ensure we don't send the slowest file to a worker right before all the other workers finish and then end up waiting for that one process to finish. In the future it would be nice to log execution time and sort by that.

# File lib/rspec/multiprocess_runner/coordinator.rb, line 71
def sort_files(files)
  # #sort_by caches the File.size result so we only call it once per file.
  files.sort_by { |file| -File.size(file) }
end
start_missing_workers() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 189
def start_missing_workers
  if @workers.size < options.worker_count && work_left_to_do?
    running_process_numbers = @workers.map(&:environment_number)
    missing_process_numbers = expected_worker_numbers - running_process_numbers
    missing_process_numbers.each do |n|
      create_and_start_worker_if_necessary(n)
    end
  end
end
summary_file() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 206
def summary_file
  File.new(options.summary_filename, 'w') if (options.summary_filename)
end
work_left_to_do?() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 113
def work_left_to_do?
  @file_buffer.any?
end
worker_sockets() click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 76
def worker_sockets
  @workers.map(&:socket)
end