class RSpec::MultiprocessRunner::Coordinator
Attributes
options[RW]
Public Class Methods
new(worker_count, files, options)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 10
# Sets up an idle coordinator: stores the options, starts with empty
# bookkeeping lists and builds the FileCoordinator that hands out files.
#
# worker_count - accepted but not read in this method.
# files        - passed through to FileCoordinator.new.
# options      - stored via the options accessor and passed to FileCoordinator.new.
def initialize(worker_count, files, options)
  self.options = options
  @file_buffer, @workers, @stopped_workers, @worker_results = [], [], [], []
  @file_coordinator = FileCoordinator.new(files, options)
end
Public Instance Methods
exit_code()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 41
# Builds the process exit status as a bit mask:
#   1 - at least one example failed
#   2 - a worker failed or an error message was recorded
#   4 - files are still buffered or the file coordinator reports missing files
#
# Returns an Integer between 0 and 7.
def exit_code
  bits = []
  bits << 1 if any_example_failed?
  bits << 2 if failed_workers.any? || error_messages.any?
  bits << 4 if work_left_to_do? || @file_coordinator.missing_files.any?
  bits.inject(0) { |code, bit| code | bit }
end
failed?()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 37
# True when exit_code is greater than zero.
def failed?
  exit_code > 0
end
run()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 19
# Runs the whole session: starts the expected workers, drives the run loop,
# quits the workers and tells the file coordinator it is finished. If the
# file coordinator still reports remaining files afterwards, one more round
# is executed. Finally prints the summary.
#
# Returns the value of exit_code.
def run
  @start_time = Time.now
  expected_worker_numbers.each { |number| create_and_start_worker_if_necessary(number) }

  # One complete round of work followed by worker teardown.
  round = lambda do
    run_loop
    quit_all_workers
    @file_coordinator.finished
  end

  round.call
  round.call if @file_coordinator.remaining_files.any?

  print_summary
  exit_code
end
shutdown(options={})
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 49
# Stops the workers. The first call ends them with :shutdown_now; any later
# call ends whatever workers are left with :kill_now.
#
# options - Hash; when options[:print_summary] is truthy a progress message
#           is printed and print_summary is called after the workers ended.
def shutdown(options={})
  # Immediately kill the workers if shutdown is requested again
  return end_workers_in_parallel(@workers.dup, :kill_now) if @shutting_down

  @shutting_down = true
  with_summary = options[:print_summary]
  print "Shutting down #{pluralize(@workers.size, "worker")} …" if with_summary
  # end_workers_in_parallel modifies @workers, so dup before sending in
  end_workers_in_parallel(@workers.dup, :shutdown_now)
  return unless with_summary

  puts " done"
  print_summary
end
Private Instance Methods
act_on_available_worker_messages(timeout)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 133
# Handles worker messages for as long as IO.select reports readable worker
# sockets within +timeout+. A worker whose message handling returns :dead is
# reaped with the reason "died"; a worker that is not working while files
# are buffered is given the next file and its results are forwarded.
#
# timeout - passed to IO.select as its timeout argument.
def act_on_available_worker_messages(timeout)
  until (readable = IO.select(worker_sockets, nil, nil, timeout)).nil?
    readable.first.each do |socket|
      worker = @workers.detect { |candidate| candidate.socket == socket }
      # Worker is already gone
      next unless worker

      status = worker.receive_and_act_on_message_from_worker
      if status == :dead
        reap_one_worker(worker, "died")
      elsif work_left_to_do? && !worker.working?
        worker.run_file(get_file)
        send_results(worker)
      end
    end
  end
end
add_file_to_buffer()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 117
# Asks the file coordinator for a file and appends it to the local buffer.
# Nothing is appended when the coordinator returns nil/false.
def add_file_to_buffer
  next_file = @file_coordinator.get_file
  return unless next_file

  @file_buffer << next_file
end
any_example_failed?()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 241
# Reports whether any result held by the file coordinator has the status
# "failed".
#
# Returns true or false (a strict boolean rather than the matching result
# object, which is what a predicate method is expected to return).
def any_example_failed?
  @file_coordinator.results.any? { |result| result.status == "failed" }
end
combine_example_results()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 233
# Returns the file coordinator's results whose run_status is
# "example_complete", ordered by time_finished.
def combine_example_results
  completed = @file_coordinator.results.select do |result|
    result.run_status == "example_complete"
  end
  completed.sort_by(&:time_finished)
end
create_and_start_worker_if_necessary(n)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 178
# Creates worker number +n+, registers it, starts it and hands it the next
# file. Does nothing when no files are buffered.
#
# n - the worker number passed to Worker.new.
def create_and_start_worker_if_necessary(n)
  return unless work_left_to_do?

  $stderr.puts "(Re)starting worker #{n}"
  worker = Worker.new(n, options)
  @workers << worker
  worker.start
  worker.run_file(get_file)
end
end_workers_in_parallel(some_workers, end_method)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 96
# Marks every given worker as stopped, sends +end_method+ to each of them on
# its own thread and waits for all of those threads to finish.
#
# some_workers - the workers to end.
# end_method   - name of the method sent to each worker.
#
# Returns the Array of joined threads.
def end_workers_in_parallel(some_workers, end_method)
  end_threads = []
  some_workers.each do |worker|
    # mark_worker_as_stopped updates instance variables and is therefore not
    # threadsafe; it does not block, so it is called here, outside the thread.
    mark_worker_as_stopped(worker)
    end_threads << Thread.new { worker.send(end_method) }
  end
  end_threads.each(&:join)
end
error_messages()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 237
# Returns the file coordinator's results whose run_status is
# "error_running", ordered by time_finished.
def error_messages
  errored = @file_coordinator.results.select do |result|
    result.run_status == "error_running"
  end
  errored.sort_by(&:time_finished)
end
expected_worker_numbers()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 174
# Returns the worker numbers 1 through options.worker_count as an Array.
def expected_worker_numbers
  Array(1..options.worker_count)
end
failed_workers()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 129
# Returns the failed workers as reported by the file coordinator.
def failed_workers
  coordinator = @file_coordinator
  coordinator.failed_workers
end
get_file()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 122
# Takes the next file off the front of the buffer, first asking for one more
# file to be buffered.
#
# Returns the file, or nil when no files are buffered.
def get_file
  return unless work_left_to_do?

  add_file_to_buffer
  @file_buffer.shift
end
log_failed_files(output, failed_files)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 275
# Writes each entry of +failed_files+ on its own line to the file named by
# options.log_failing_files (opened with mode "w+") and announces the write
# on +output+. Does nothing when +failed_files+ is nil.
#
# output       - IO-like object receiving the announcement.
# failed_files - collection of file names, or nil.
def log_failed_files(output, failed_files)
  return if failed_files.nil?

  output.puts
  output.puts "Writing failures to file: #{options.log_failing_files}"
  File.open(options.log_failing_files, "w+") do |log|
    failed_files.each { |name| log << name << "\n" }
  end
end
mark_worker_as_stopped(worker)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 161
# Moves +worker+ from the active list to the stopped list, forwards its
# results, and reports its status to the file coordinator when the worker
# has a deactivation reason.
def mark_worker_as_stopped(worker)
  @stopped_workers.push(worker)
  @workers.delete_if { |active| active == worker }
  send_results(worker)
  return unless worker.deactivation_reason

  @file_coordinator.send_worker_status(worker)
end
pluralize(count, string)
click to toggle source
Copied from RSpec
# File lib/rspec/multiprocess_runner/coordinator.rb, line 288
# Formats "<count> <string>", appending "s" to the word unless the count
# equals 1 when converted to a Float.
def pluralize(count, string)
  suffix = count.to_f == 1 ? '' : 's'
  "#{count} #{string}#{suffix}"
end
print_elapsed_time(output, seconds_elapsed)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 339
# Writes "Finished in <minutes>, <seconds>" to +output+.
#
# The minutes part appears only when at least one whole minute elapsed. The
# seconds part appears when the remainder is positive, and also when there is
# no minutes part, so a zero duration prints "Finished in 0.00 seconds"
# instead of a dangling "Finished in ".
#
# output          - IO-like object responding to puts.
# seconds_elapsed - Numeric duration in seconds.
def print_elapsed_time(output, seconds_elapsed)
  minutes = seconds_elapsed.to_i / 60
  seconds = seconds_elapsed % 60

  parts = []
  parts << ("%d minute%s" % [minutes, minutes == 1 ? '' : 's']) if minutes > 0
  if seconds > 0 || parts.empty?
    parts << ("%.2f second%s" % [seconds, seconds == 1 ? '' : 's'])
  end

  output.puts "Finished in #{parts.join(", ")}"
end
print_error_messages(output)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 321
# Writes an "Errors:" section to +output+, listing every error message with
# a 1-based index, its filename and its message text.
#
# Nothing is written when there are no error messages. An empty list is
# checked as well as nil, because a nil-only check lets the "Errors:" header
# through when the list is merely empty.
#
# output - IO-like object responding to puts and print.
def print_error_messages(output)
  errors = error_messages
  return if errors.nil? || errors.empty?

  output.puts
  output.puts "Errors:"
  errors.each_with_index do |error, i|
    output.puts
    output.puts " #{i.next}) #{error.filename}"
    output.print error.message
  end
end
print_example_counts(output, by_status_and_time)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 292
# Writes the one-line totals summary to +output+. Example and failure counts
# are always shown; pending, failed process, error, skipped file and missing
# file counts are shown only when greater than zero.
#
# output             - IO-like object responding to puts.
# by_status_and_time - Hash mapping a status string to an Array of results.
def print_example_counts(output, by_status_and_time)
  example_count = by_status_and_time.inject(0) { |total, (_status, results)| total + results.size }
  failure_count = (by_status_and_time["failed"] || []).size
  pending_count = (by_status_and_time["pending"] || []).size
  missing_count = @file_coordinator.missing_files.size
  process_failure_count = failed_workers.size
  error_count = error_messages.size
  skipped_count = @file_coordinator.remaining_files.size

  # Copied from RSpec
  parts = [pluralize(example_count, "example"), pluralize(failure_count, "failure")]
  parts << "#{pending_count} pending" if pending_count > 0
  parts << pluralize(process_failure_count, "failed proc") if process_failure_count > 0
  parts << pluralize(error_count, "error") if error_count > 0
  parts << pluralize(skipped_count, "skipped file") if skipped_count > 0
  parts << pluralize(missing_count, "missing file") if missing_count > 0
  output.puts parts.join(", ")
end
print_failed_example_details(output, failed_example_results)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 264
# Writes a "Failures:" section to +output+, listing every failed example
# with a 1-based index, its description and its details.
#
# Nothing is written when +failed_example_results+ is nil or empty. The empty
# check is needed because print_summary passes an empty Array (never nil)
# when no example failed, which would otherwise print a bare header.
#
# output                 - IO-like object responding to puts.
# failed_example_results - Array of results responding to description and
#                          details, or nil.
def print_failed_example_details(output, failed_example_results)
  return if failed_example_results.nil? || failed_example_results.empty?

  output.puts
  output.puts "Failures:"
  failed_example_results.each_with_index do |failure, i|
    output.puts
    output.puts " #{i.next}) #{failure.description}"
    output.puts failure.details
  end
end
print_failed_process_details(output)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 312
# Writes a "Failed processes:" section to +output+ with one line per failed
# worker (node, pid, environment number, deactivation reason and current
# file). Nothing is written when there are no failed workers.
def print_failed_process_details(output)
  unless failed_workers.empty?
    output.puts
    output.puts "Failed processes:"
    failed_workers.each do |failed|
      output.puts " - #{failed.node}:#{failed.pid} (env #{failed.environment_number}) #{failed.deactivation_reason} on #{failed.current_file}"
    end
  end
end
print_missing_files(output)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 332
# Writes a "Missing files from disconnects:" section to +output+ with one
# line per missing file reported by the file coordinator. Nothing is written
# when there are no missing files.
def print_missing_files(output)
  unless @file_coordinator.missing_files.empty?
    output.puts
    output.puts "Missing files from disconnects:"
    @file_coordinator.missing_files.each do |missing|
      output.puts " + #{missing} was given to a node, which disconnected"
    end
  end
end
print_pending_example_details(output, pending_example_results)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 254
# Writes a "Pending:" section to +output+ containing the details of every
# pending example, with a leading "Pending:" label stripped from each
# details text.
#
# Nothing is written when +pending_example_results+ is nil or empty. The
# empty check is needed because print_summary passes an empty Array (never
# nil) when nothing is pending, which would otherwise print a bare header.
#
# output                  - IO-like object responding to puts.
# pending_example_results - Array of results responding to details, or nil.
def print_pending_example_details(output, pending_example_results)
  return if pending_example_results.nil? || pending_example_results.empty?

  output.puts
  output.puts "Pending:"
  pending_example_results.each do |pending|
    output.puts
    output.puts pending.details.sub(/^\s*Pending:\s*/, '')
  end
end
print_skipped_files_details(output)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 245
# Writes a "Skipped files:" section to +output+ listing the file
# coordinator's remaining files. Nothing is written when no files are
# buffered.
def print_skipped_files_details(output)
  return unless work_left_to_do?

  output.puts
  output.puts "Skipped files:"
  @file_coordinator.remaining_files.each { |skipped| output.puts " - #{skipped}" }
end
print_summary()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 210
# Prints the full run report to $stdout and, when summary_file returns one,
# to the summary file as well. The report contains skipped files, pending
# and failed examples, missing files, failed processes, errors, elapsed
# time, the FAILURE/SUCCESS verdict and the example counts. When
# options.log_failing_files is set, the failing and missing files are also
# written out via log_failed_files. Every output other than $stdout is
# closed afterwards.
def print_summary
  elapsed = Time.now - @start_time

  # Group the completed example results by status; unknown statuses start empty.
  by_status_and_time = Hash.new { |groups, status| groups[status] = [] }
  combine_example_results.each { |result| by_status_and_time[result.status] << result }

  [$stdout, summary_file].compact.each do |output|
    print_skipped_files_details(output)
    print_pending_example_details(output, by_status_and_time["pending"])
    print_failed_example_details(output, by_status_and_time["failed"])
    print_missing_files(output)
    if options.log_failing_files
      failing = by_status_and_time["failed"].map(&:filename).uniq
      log_failed_files(output, failing + @file_coordinator.missing_files.to_a)
    end
    print_failed_process_details(output)
    print_error_messages(output)
    output.puts
    print_elapsed_time(output, elapsed)
    output.puts(failed? ? "FAILURE" : "SUCCESS")
    print_example_counts(output, by_status_and_time)
    output.close unless output == $stdout
  end
end
quit_all_workers()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 91
# Quits every currently registered worker.
def quit_all_workers
  # quit_workers modifies @workers, so hand it a copy
  snapshot = @workers.dup
  quit_workers(snapshot)
end
quit_idle_unnecessary_workers()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 199
# Once no files are buffered, quits every worker that is not working.
# Does nothing while files are still buffered.
def quit_idle_unnecessary_workers
  return if work_left_to_do?

  idle = @workers.reject { |worker| worker.working? }
  quit_workers(idle)
end
quit_workers(some_workers)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 109
# Ends the given workers in parallel by sending each of them
# :quit_when_idle_and_wait_for_quit.
def quit_workers(some_workers)
  quit_message = :quit_when_idle_and_wait_for_quit
  end_workers_in_parallel(some_workers, quit_message)
end
reap_one_worker(worker, reason)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 155
# Reaps +worker+, records +reason+ as its deactivation reason and then marks
# it as stopped.
#
# worker - the worker to reap.
# reason - value stored in the worker's deactivation_reason.
def reap_one_worker(worker, reason)
  worker.reap
  # The reason is set before marking as stopped, which reads it.
  worker.deactivation_reason = reason
  mark_worker_as_stopped(worker)
end
reap_stalled_workers()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 168
# Reaps every registered worker that reports itself as stalled, giving
# "stalled" as the reason.
def reap_stalled_workers
  stalled = @workers.select { |worker| worker.stalled? }
  stalled.each { |worker| reap_one_worker(worker, "stalled") }
end
run_loop()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 80
# Buffers a file, then repeats one scheduling cycle until no registered
# worker is working. A cycle handles worker messages (0.3 timeout), reaps
# stalled workers, starts missing workers and quits idle unnecessary ones.
def run_loop
  add_file_to_buffer
  loop do
    act_on_available_worker_messages(0.3)
    reap_stalled_workers
    start_missing_workers
    quit_idle_unnecessary_workers
    break if @workers.none?(&:working?)
  end
end
send_results(worker)
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 149
# Forwards to the file coordinator those example results of +worker+ that
# are not yet in @worker_results, and records them as sent.
def send_results(worker)
  fresh = worker.example_results - @worker_results
  @worker_results = @worker_results + fresh
  @file_coordinator.send_results(fresh)
end
sort_files(files)
click to toggle source
Sorting by decreasing size attempts to ensure we don't send the slowest file to a worker right before all the other workers finish and then end up waiting for that one process to finish. In the future it would be nice to log execution time and sort by that.
# File lib/rspec/multiprocess_runner/coordinator.rb, line 71
# Returns +files+ ordered from largest to smallest file size on disk.
def sort_files(files)
  # sort_by computes the key once per element, so File.size is only called
  # once per file.
  files.sort_by do |path|
    -File.size(path)
  end
end
start_missing_workers()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 189
# Starts a worker for every expected worker number that has no registered
# worker, provided fewer than options.worker_count workers are registered
# and files are still buffered.
def start_missing_workers
  return if @workers.size >= options.worker_count || !work_left_to_do?

  running = @workers.map { |worker| worker.environment_number }
  (expected_worker_numbers - running).each do |number|
    create_and_start_worker_if_necessary(number)
  end
end
summary_file()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 206
# Opens the file named by options.summary_filename for writing.
#
# Returns the open File, or nil when no summary filename is configured.
def summary_file
  return unless options.summary_filename

  File.new(options.summary_filename, 'w')
end
work_left_to_do?()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 113
# True when the local file buffer holds at least one file.
def work_left_to_do?
  buffered = @file_buffer
  buffered.any?
end
worker_sockets()
click to toggle source
# File lib/rspec/multiprocess_runner/coordinator.rb, line 76
# Returns the socket of every registered worker, in registration order.
def worker_sockets
  @workers.map { |worker| worker.socket }
end