class RSpec::MultiprocessRunner::Worker

This object has several roles:

@private

Constants

COMMAND_QUIT
COMMAND_RUN_FILE
ERROR_RUNNING
STATUS_EXAMPLE_COMPLETE
STATUS_RUN_COMPLETE

Attributes

current_file[R]
deactivation_reason[RW]
environment_number[R]
example_results[R]
options[RW]
pid[R]

Public Class Methods

new(environment_number, options) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 34
def initialize(environment_number, options)
  @environment_number = environment_number
  @worker_socket, @coordinator_socket = Socket.pair(:UNIX, :STREAM)
  @rspec_arguments = (options.rspec_options || []) + ["--format", ReportingFormatter.to_s]
  self.options = options
  @example_results = []
end
trap_interrupt() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 81
def self.trap_interrupt; end

Public Instance Methods

==(other) click to toggle source

Workers can be found in the coordinator process by their coordinator socket.

Calls superclass method
# File lib/rspec/multiprocess_runner/worker.rb, line 44
def ==(other)
  case other
  when Socket
    other == @coordinator_socket
  else
    super
  end
end
kill_now() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 136
def kill_now
  Process.kill(:KILL, pid)
  Process.detach(pid)
end
quit_when_idle_and_wait_for_quit() click to toggle source
COORDINATOR METHODS

These are methods that the coordinator process calls on its copy of the workers.

# File lib/rspec/multiprocess_runner/worker.rb, line 105
def quit_when_idle_and_wait_for_quit
  send_message_to_worker(command: COMMAND_QUIT)
  Process.wait(self.pid)
end
reap() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 141
def reap
  terminate_then_kill(3, "Reaping troubled process #{environment_number} (#{pid}; #{@current_file})")
end
receive_and_act_on_message_from_worker() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 145
def receive_and_act_on_message_from_worker
  act_on_message_from_worker(receive_message_from_worker)
end
report_error(message) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 231
def report_error(message)
  send_message_to_coordinator(
    status: ERROR_RUNNING,
    message: message,
    filename: @current_file,
  )
end
report_example_result(example_status, description, line_number, details) click to toggle source
WORKER METHODS

These are methods that the worker process calls on the copy of this object that lives in the fork.

# File lib/rspec/multiprocess_runner/worker.rb, line 220
def report_example_result(example_status, description, line_number, details)
  send_message_to_coordinator(
    status: STATUS_EXAMPLE_COMPLETE,
    example_status: example_status,
    description: description,
    line_number: line_number,
    details: details,
    filename: @current_file,
  )
end
run_file(filename) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 110
def run_file(filename)
  send_message_to_worker(command: COMMAND_RUN_FILE, filename: filename)
  @current_file = filename
  @current_file_started_at = @current_example_started_at = Time.now
end
shutdown_now() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 132
def shutdown_now
  terminate_then_kill(5)
end
socket() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 91
def socket
  if self.pid == Process.pid
    @worker_socket
  else
    @coordinator_socket
  end
end
stalled?() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 120
def stalled?
  file_stalled =
    if options.file_timeout_seconds
      working? && (Time.now - @current_file_started_at > options.file_timeout_seconds)
    end
  example_stalled =
    if options.example_timeout_seconds
      working? && (Time.now - @current_example_started_at > options.example_timeout_seconds)
    end
  file_stalled || example_stalled
end
start() click to toggle source

Forks the worker process. In the parent, returns the PID.

# File lib/rspec/multiprocess_runner/worker.rb, line 63
def start
  pid = fork
  if pid
    @worker_socket.close
    @pid = pid
  else
    @coordinator_socket.close
    @pid = Process.pid
    ENV["TEST_ENV_NUMBER"] = test_env_number

    # reset TERM handler so that
    # - the coordinator's version (if any) is not executed twice
    # - it actually terminates the process, instead of doing the ruby
    #   default (throw an exception, which gets caught by RSpec)
    Kernel.trap("TERM", "SYSTEM_DEFAULT")
    # rely on the coordinator to handle INT
    Kernel.trap("INT", "IGNORE")
    # prevent RSpec from trapping INT, also
    ::RSpec::Core::Runner.instance_eval { def self.trap_interrupt; end }

    # Disable RSpec's at_exit hook that would try to run whatever is in ARGV
    ::RSpec::Core::Runner.disable_autorun!

    set_process_name
    run_loop
  end
end
test_env_number() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 53
def test_env_number
  if environment_number == 1 && !options.first_is_1
    ""
  else
    environment_number.to_s
  end
end
to_json(options = nil) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 149
def to_json(options = nil)
  { "pid" => @pid, "environment_number" => @environment_number, "current_file" => @current_file, "deactivation_reason" => @deactivation_reason }.to_json
end
working?() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 116
def working?
  @current_file
end

Private Instance Methods

act_on_message_from_coordinator(message_hash) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 276
def act_on_message_from_coordinator(message_hash)
  return handle_closed_coordinator_socket unless message_hash # EOF
  case message_hash["command"]
  when COMMAND_QUIT
    exit
  when COMMAND_RUN_FILE
    execute_spec(message_hash["filename"])
  else
    $stderr.puts "Received unsupported command #{message_hash["command"].inspect} in worker #{pid}"
  end
  set_process_name
end
act_on_message_from_worker(message_hash) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 174
def act_on_message_from_worker(message_hash)
  return :dead unless message_hash # ignore EOF
  case message_hash["status"]
  when STATUS_RUN_COMPLETE
    example_results << Result.new(message_hash)
    @current_file = nil
    @current_file_started_at = nil
    @current_example_started_at = nil
  when STATUS_EXAMPLE_COMPLETE
    example_results << Result.new(message_hash)
    suffix =
      case message_hash["example_status"]
      when "failed"
        " - FAILED"
      when "pending"
        " - pending"
      end
    if message_hash["details"]
      suffix += "\n#{message_hash["details"]}"
    end
    location = @current_file
    if message_hash["line_number"]
      location = [location, message_hash["line_number"]].join(':')
    end
    $stdout.puts "#{environment_number} (#{pid}): #{message_hash["description"]} (#{location})#{suffix}"
    @current_example_started_at = Time.now
  when ERROR_RUNNING
    example_results << Result.new(message_hash)
    $stdout.puts "Error in file: #{message_hash["filename"]}"
    $stdout.print message_hash["message"]
  else
    $stderr.puts "Received unsupported status #{message_hash["status"].inspect} in worker #{pid}"
  end
  return :alive
end
execute_spec(spec_file) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 289
def execute_spec(spec_file)
  @current_file = spec_file
  set_process_name

  # If we don't do this, every previous spec is run every time run is called
  RSpec.world.example_groups.clear

  ReportingFormatter.worker = self
  error_stream = StringIO.new
  output_stream = StringIO.new
  RSpec::Core::Runner.run(@rspec_arguments + [spec_file], error_stream, output_stream)
  send_message_to_coordinator(status: STATUS_RUN_COMPLETE, filename: spec_file)
ensure
  @current_file = nil
end
handle_closed_coordinator_socket() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 259
def handle_closed_coordinator_socket
  # when the coordinator socket is closed, there's nothing more to do
  exit
end
receive_message(socket) click to toggle source
UTILITY FUNCTIONS

Methods that used by both the coordinator and worker processes.

# File lib/rspec/multiprocess_runner/worker.rb, line 310
def receive_message(socket)
  message_json = socket.gets
  if message_json
    JSON.parse(message_json)
  end
end
receive_message_from_coordinator(socket) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 264
def receive_message_from_coordinator(socket)
  receive_message(socket)
end
receive_message_from_worker() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 170
def receive_message_from_worker
  receive_message(@coordinator_socket)
end
run_loop() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 247
def run_loop
  loop do
    select_result = IO.select([@worker_socket], nil, nil, 1)
    if select_result
      readables, _, _ = select_result
      act_on_message_from_coordinator(
        receive_message_from_coordinator(readables.first)
      )
    end
  end
end
send_message(socket, message_hash) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 317
def send_message(socket, message_hash)
  socket.puts(message_hash.to_json)
end
send_message_to_coordinator(message_hash) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 268
def send_message_to_coordinator(message_hash)
  begin
    send_message(@worker_socket, message_hash)
  rescue Errno::EPIPE
    handle_closed_coordinator_socket
  end
end
send_message_to_worker(message_hash) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 210
def send_message_to_worker(message_hash)
  send_message(@coordinator_socket, message_hash)
end
set_process_name() click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 241
def set_process_name
  name = "RSpec::MultiprocessRunner::Worker #{environment_number}"
  status = current_file ? "running #{current_file}" : "idle"
  $0 = "#{name} #{status}"
end
terminate_then_kill(timeout, message=nil) click to toggle source
# File lib/rspec/multiprocess_runner/worker.rb, line 155
def terminate_then_kill(timeout, message=nil)
  begin
    Timeout.timeout(timeout) do
      $stderr.puts "#{message} with TERM" if message
      if pid
        Process.kill(:TERM, pid)
        Process.wait(pid)
      end
    end
  rescue Timeout::Error
    $stderr.puts "#{message} with KILL" if message
    kill_now
  end
end