class RSpec::MultiprocessRunner::Worker
This object has several roles:
- It forks the worker process.
- In the coordinator process, it is used to send messages to the worker and track the worker's status, completed specs, and example results.
- In the worker process, it is used to send messages to the coordinator and actually run specs.
@private
Constants
- COMMAND_QUIT
- COMMAND_RUN_FILE
- ERROR_RUNNING
- STATUS_EXAMPLE_COMPLETE
- STATUS_RUN_COMPLETE
Attributes
current_file[R]
deactivation_reason[RW]
environment_number[R]
example_results[R]
options[RW]
pid[R]
Public Class Methods
new(environment_number, options)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 34
# Builds a worker for one numbered environment.
#
# Creates a pair of UNIX stream sockets (the first is kept as the worker-side
# socket, the second as the coordinator-side socket) and assembles the RSpec
# argument list: the configured rspec_options, if any, followed by
# "--format" and the ReportingFormatter class name.
#
# @param environment_number the number identifying this worker
# @param options the runner options object; must respond to #rspec_options
def initialize(environment_number, options)
  @environment_number = environment_number

  socket_pair = Socket.pair(:UNIX, :STREAM)
  @worker_socket = socket_pair[0]
  @coordinator_socket = socket_pair[1]

  formatter_arguments = ["--format", ReportingFormatter.to_s]
  @rspec_arguments = (options.rspec_options || []) + formatter_arguments

  self.options = options
  @example_results = []
end
trap_interrupt()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 81
# Deliberate no-op: takes no arguments, does nothing and returns nil.
# The same definition is evaluated on ::RSpec::Core::Runner inside #start,
# where its stated purpose is to prevent RSpec from trapping INT.
def self.trap_interrupt
end
Public Instance Methods
==(other)
click to toggle source
Workers can be found in the coordinator process by their coordinator socket.
Calls superclass method
# File lib/rspec/multiprocess_runner/worker.rb, line 44
# Equality that also lets a worker be matched against a Socket.
#
# When +other+ is a Socket, the result is whether that socket equals this
# worker's coordinator-side socket; for anything else the superclass
# comparison is used.
#
# @param other the object to compare with
# @return the result of the socket comparison, or of the superclass ==
def ==(other)
  # Socket === other is the same test the `when Socket` clause performed.
  return other == @coordinator_socket if Socket === other

  super
end
kill_now()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 136
# Sends KILL to the worker process and detaches from it instead of waiting
# for it to exit.
#
# @return the value returned by Process.detach
def kill_now
  worker_pid = pid
  Process.kill(:KILL, worker_pid)
  Process.detach(worker_pid)
end
quit_when_idle_and_wait_for_quit()
click to toggle source
COORDINATOR METHODS
These are methods that the coordinator process calls on its copy of the workers.
# File lib/rspec/multiprocess_runner/worker.rb, line 105
# Sends the quit command to the worker, then blocks until the worker
# process has exited.
#
# @return the value returned by Process.wait
def quit_when_idle_and_wait_for_quit
  quit_request = { command: COMMAND_QUIT }
  send_message_to_worker(quit_request)
  Process.wait(pid)
end
reap()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 141
# Terminates this worker with a 3-second grace period before escalating to
# KILL, logging which environment, pid and current file are being reaped.
def reap
  notice = "Reaping troubled process #{environment_number} (#{pid}; #{@current_file})"
  terminate_then_kill(3, notice)
end
receive_and_act_on_message_from_worker()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 145
# Reads the next message from the worker and processes it.
#
# @return whatever act_on_message_from_worker returns for that message
def receive_and_act_on_message_from_worker
  incoming = receive_message_from_worker
  act_on_message_from_worker(incoming)
end
report_error(message)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 231
# Tells the coordinator that an error occurred while running the current
# file.
#
# @param message the error text to report
def report_error(message)
  error_report = {
    status: ERROR_RUNNING,
    message: message,
    filename: @current_file,
  }
  send_message_to_coordinator(error_report)
end
report_example_result(example_status, description, line_number, details)
click to toggle source
WORKER METHODS
These are methods that the worker process calls on the copy of this object that lives in the fork.
# File lib/rspec/multiprocess_runner/worker.rb, line 220
# Tells the coordinator that a single example has finished.
#
# @param example_status the outcome of the example
# @param description the example's description
# @param line_number the example's line number
# @param details additional text about the result, if any
def report_example_result(example_status, description, line_number, details)
  example_report = {
    status: STATUS_EXAMPLE_COMPLETE,
    example_status: example_status,
    description: description,
    line_number: line_number,
    details: details,
    filename: @current_file,
  }
  send_message_to_coordinator(example_report)
end
run_file(filename)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 110
# Asks the worker to run a spec file and records it as the current file.
# The file and example start times are both set to the same instant, taken
# after the command has been sent.
#
# @param filename the spec file to run
# @return [Time] the recorded start time
def run_file(filename)
  send_message_to_worker(command: COMMAND_RUN_FILE, filename: filename)
  @current_file = filename
  started_at = Time.now
  @current_file_started_at = started_at
  @current_example_started_at = started_at
end
shutdown_now()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 132
# Terminates the worker, allowing it 5 seconds to exit before it is killed.
# No log message is passed along.
def shutdown_now
  grace_period_seconds = 5
  terminate_then_kill(grace_period_seconds)
end
socket()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 91
# Picks the socket appropriate to the calling process.
#
# @return the worker-side socket when the current process is the worker
#   (its pid matches the recorded pid), otherwise the coordinator-side socket
def socket
  in_worker_process = (pid == Process.pid)
  in_worker_process ? @worker_socket : @coordinator_socket
end
stalled?()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 120
# Reports whether the worker has exceeded a configured time limit.
#
# A limit is only checked when the corresponding option is set and the
# worker is currently working. The file limit is measured from the file's
# start time, the example limit from the current example's start time.
#
# @return a truthy value when either limit is exceeded, otherwise false or nil
def stalled?
  file_limit = options.file_timeout_seconds
  file_stalled =
    if file_limit
      working? && (Time.now - @current_file_started_at > file_limit)
    end

  example_limit = options.example_timeout_seconds
  example_stalled =
    if example_limit
      working? && (Time.now - @current_example_started_at > example_limit)
    end

  file_stalled || example_stalled
end
start()
click to toggle source
Forks the worker process. In the parent, returns the PID.
# File lib/rspec/multiprocess_runner/worker.rb, line 63
# Forks the worker process.
#
# In the parent: closes the worker-side socket, records the child's pid and
# returns it.
# In the child: closes the coordinator-side socket, records its own pid,
# sets TEST_ENV_NUMBER, adjusts signal handling and RSpec hooks, then enters
# the run loop.
def start
  child_pid = fork

  if child_pid
    # Parent (coordinator) side.
    @worker_socket.close
    return @pid = child_pid
  end

  # Child (worker) side from here on.
  @coordinator_socket.close
  @pid = Process.pid
  ENV["TEST_ENV_NUMBER"] = test_env_number

  # Reset the TERM handler so that
  # - the coordinator's version (if any) is not executed twice
  # - it actually terminates the process, instead of doing the ruby
  #   default (throw an exception, which gets caught by RSpec)
  Kernel.trap("TERM", "SYSTEM_DEFAULT")

  # Rely on the coordinator to handle INT.
  Kernel.trap("INT", "IGNORE")

  # Prevent RSpec from trapping INT, also.
  ::RSpec::Core::Runner.instance_eval do
    def self.trap_interrupt; end
  end

  # Disable RSpec's at_exit hook that would try to run whatever is in ARGV.
  ::RSpec::Core::Runner.disable_autorun!

  set_process_name
  run_loop
end
test_env_number()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 53
# Computes the value to use for the TEST_ENV_NUMBER environment variable.
#
# @return [String] an empty string for environment 1 unless the first_is_1
#   option is set; otherwise the environment number as a string
def test_env_number
  return "" if environment_number == 1 && !options.first_is_1

  environment_number.to_s
end
to_json(options = nil)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 149
# Serializes the worker's identifying state as a JSON object with the keys
# "pid", "environment_number", "current_file" and "deactivation_reason".
#
# The argument is forwarded to Hash#to_json. JSON generators pass their
# state object (indentation, nesting limits, ...) through this parameter, so
# forwarding it lets the worker honor that state instead of always
# producing compact output.
#
# @param options generator state or nil (nil selects the default state)
# @return [String] the JSON text
def to_json(options = nil)
  {
    "pid" => @pid,
    "environment_number" => @environment_number,
    "current_file" => @current_file,
    "deactivation_reason" => @deactivation_reason
  }.to_json(options)
end
working?()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 116
# Tells whether the worker currently has a file assigned.
#
# @return the current file (truthy) while one is assigned, otherwise nil
def working?
  @current_file
end
Private Instance Methods
act_on_message_from_coordinator(message_hash)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 276
# Executes one command received from the coordinator.
#
# A nil message means EOF on the socket and is handed to
# handle_closed_coordinator_socket. The quit command exits the process, the
# run-file command executes the named spec file, and anything else is
# reported on stderr. The process name is refreshed afterwards.
#
# @param message_hash [Hash, nil] the parsed message, or nil on EOF
def act_on_message_from_coordinator(message_hash)
  return handle_closed_coordinator_socket unless message_hash # EOF

  command = message_hash["command"]
  case command
  when COMMAND_QUIT then exit
  when COMMAND_RUN_FILE then execute_spec(message_hash["filename"])
  else
    $stderr.puts "Received unsupported command #{command.inspect} in worker #{pid}"
  end

  set_process_name
end
act_on_message_from_worker(message_hash)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 174
# Processes one status message received from the worker.
#
# - Run complete: records the result and clears the current file and both
#   start times.
# - Example complete: records the result, prints a one-line summary to
#   stdout and restarts the example timer.
# - Error: records the result and prints the file name and error message.
# - Anything else is reported on stderr.
#
# @param message_hash [Hash, nil] the parsed message, or nil on EOF
# @return [Symbol] :dead when there is no message (EOF), otherwise :alive
def act_on_message_from_worker(message_hash)
  return :dead unless message_hash # ignore EOF

  case message_hash["status"]
  when STATUS_RUN_COMPLETE
    example_results << Result.new(message_hash)
    @current_file = nil
    @current_file_started_at = nil
    @current_example_started_at = nil
  when STATUS_EXAMPLE_COMPLETE
    example_results << Result.new(message_hash)
    # Default to an empty suffix so that details can still be appended for
    # statuses other than "failed"/"pending"; a nil suffix here used to
    # raise NoMethodError on the += below.
    suffix =
      case message_hash["example_status"]
      when "failed" then " - FAILED"
      when "pending" then " - pending"
      else ""
      end
    if message_hash["details"]
      suffix += "\n#{message_hash["details"]}"
    end
    location = @current_file
    if message_hash["line_number"]
      location = [location, message_hash["line_number"]].join(':')
    end
    $stdout.puts "#{environment_number} (#{pid}): #{message_hash["description"]} (#{location})#{suffix}"
    @current_example_started_at = Time.now
  when ERROR_RUNNING
    example_results << Result.new(message_hash)
    $stdout.puts "Error in file: #{message_hash["filename"]}"
    $stdout.print message_hash["message"]
  else
    $stderr.puts "Received unsupported status #{message_hash["status"].inspect} in worker #{pid}"
  end

  :alive
end
execute_spec(spec_file)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 289
# Runs a single spec file through RSpec and reports completion to the
# coordinator.
#
# RSpec's error and output streams are directed into in-memory buffers.
# The current file is always cleared afterwards, even if running raises.
#
# @param spec_file the spec file to run
def execute_spec(spec_file)
  begin
    @current_file = spec_file
    set_process_name

    # If we don't do this, every previous spec is run every time run is called
    RSpec.world.example_groups.clear

    ReportingFormatter.worker = self
    error_stream = StringIO.new
    output_stream = StringIO.new
    run_arguments = @rspec_arguments + [spec_file]
    RSpec::Core::Runner.run(run_arguments, error_stream, output_stream)

    send_message_to_coordinator(status: STATUS_RUN_COMPLETE, filename: spec_file)
  ensure
    @current_file = nil
  end
end
handle_closed_coordinator_socket()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 259
# Exits the process: once the coordinator socket is closed there is nothing
# more for the worker to do.
def handle_closed_coordinator_socket
  Kernel.exit
end
receive_message(socket)
click to toggle source
UTILITY FUNCTIONS
Methods that are used by both the coordinator and worker processes.
# File lib/rspec/multiprocess_runner/worker.rb, line 310
# Reads one line from the socket and parses it as JSON.
#
# @param socket an IO-like object responding to #gets
# @return the parsed message, or nil when no line could be read (EOF)
def receive_message(socket)
  line = socket.gets
  return nil unless line

  JSON.parse(line)
end
receive_message_from_coordinator(socket)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 264
# Reads the next message sent by the coordinator.
#
# @param socket the socket to read from
# @return the parsed message, or nil on EOF
def receive_message_from_coordinator(socket)
  coordinator_message = receive_message(socket)
  coordinator_message
end
receive_message_from_worker()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 170
# Reads the next message sent by the worker, using the coordinator-side
# socket.
#
# @return the parsed message, or nil on EOF
def receive_message_from_worker
  worker_message = receive_message(@coordinator_socket)
  worker_message
end
run_loop()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 247
# Worker main loop: waits up to one second at a time for the worker-side
# socket to become readable, then reads a coordinator message and acts on
# it. Loops indefinitely.
def run_loop
  loop do
    ready = IO.select([@worker_socket], nil, nil, 1)
    next unless ready # timed out with nothing to read

    readable_sockets = ready.first
    incoming = receive_message_from_coordinator(readable_sockets.first)
    act_on_message_from_coordinator(incoming)
  end
end
send_message(socket, message_hash)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 317
# Writes a message to the socket as a single line of JSON.
#
# @param socket an IO-like object responding to #puts
# @param message_hash the message to serialize
def send_message(socket, message_hash)
  encoded = message_hash.to_json
  socket.puts(encoded)
end
send_message_to_coordinator(message_hash)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 268
# Sends a message to the coordinator over the worker-side socket.
# A broken pipe is treated as the coordinator socket having been closed.
#
# @param message_hash the message to send
def send_message_to_coordinator(message_hash)
  send_message(@worker_socket, message_hash)
rescue Errno::EPIPE
  handle_closed_coordinator_socket
end
send_message_to_worker(message_hash)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 210
# Sends a message to the worker over the coordinator-side socket.
#
# @param message_hash the message to send
def send_message_to_worker(message_hash)
  outbound_socket = @coordinator_socket
  send_message(outbound_socket, message_hash)
end
set_process_name()
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 241
# Updates the process name ($0) to show the worker's environment number and
# whether it is idle or running a file.
#
# @return [String] the new process name
def set_process_name
  status =
    if current_file
      "running #{current_file}"
    else
      "idle"
    end
  $0 = "RSpec::MultiprocessRunner::Worker #{environment_number} #{status}"
end
terminate_then_kill(timeout, message=nil)
click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 155
# Sends TERM to the worker and waits for it to exit; if that does not
# finish within the timeout, falls back to kill_now.
#
# Nothing is signalled when no pid has been recorded.
#
# @param timeout seconds to allow for the TERM and wait
# @param message optional log prefix; when given, progress is written to
#   stderr
def terminate_then_kill(timeout, message=nil)
  Timeout.timeout(timeout) do
    $stderr.puts "#{message} with TERM" if message
    next unless pid

    Process.kill(:TERM, pid)
    Process.wait(pid)
  end
rescue Timeout::Error
  $stderr.puts "#{message} with KILL" if message
  kill_now
end