class TestQueue::Runner

Constants

TOKEN_REGEX

Attributes

concurrency[RW]
exit_when_done[RW]
stats[R]

Public Class Methods

new(test_framework, concurrency=nil, socket=nil, relay=nil) click to toggle source
# File lib/test_queue/runner.rb, line 36
# Build a runner and seed its queue from previously recorded stats.
#
# test_framework - framework adapter; only all_suite_files is called here.
# concurrency    - optional worker count. Falls back to TEST_QUEUE_WORKERS,
#                  then to a CPU count probe, then to 2.
# socket         - optional master socket. Falls back to TEST_QUEUE_SOCKET,
#                  then to a per-process path under /tmp.
# relay          - optional relay address. Falls back to TEST_QUEUE_RELAY.
#
# Raises ArgumentError if TEST_QUEUE_EARLY_FAILURE_LIMIT is not an integer
# or if the resolved worker count is not greater than 0.
def initialize(test_framework, concurrency=nil, socket=nil, relay=nil)
  @test_framework = test_framework
  @stats = Stats.new(stats_file)

  # Integer() (unlike to_i) rejects garbage, so a typo fails loudly here.
  if ENV['TEST_QUEUE_EARLY_FAILURE_LIMIT']
    begin
      @early_failure_limit = Integer(ENV['TEST_QUEUE_EARLY_FAILURE_LIMIT'])
    rescue ArgumentError
      raise ArgumentError, 'TEST_QUEUE_EARLY_FAILURE_LIMIT could not be parsed as an integer'
    end
  end

  # Remember the original process title; start_master appends it to $0.
  @procline = $0

  # TEST_QUEUE_FORCE is a comma-separated list of suite names. When set,
  # only those suites run, in the order listed.
  @whitelist = if forced = ENV['TEST_QUEUE_FORCE']
                 forced.split(/\s*,\s*/)
               else
                 []
               end
  @whitelist.freeze

  # Seed the queue with suites known from the stats file whose files still
  # exist, slowest first, as [name, path] pairs.
  all_files = @test_framework.all_suite_files.to_set
  @queue = @stats.all_suites
    .select { |suite| all_files.include?(suite.path) }
    .sort_by { |suite| -suite.duration }
    .map { |suite| [suite.name, suite.path] }

  if @whitelist.any?
    @queue.select! { |suite_name, path| @whitelist.include?(suite_name) }
    @queue.sort_by! { |suite_name, path| @whitelist.index(suite_name) }
  end

  # Whitelisted suites start out as "awaited"; enqueue_discovered_suite
  # removes each name once the discovery process reports it.
  @awaited_suites = Set.new(@whitelist)
  # Snapshot of the stats-seeded queue, used to recognise suites that were
  # already queued when discovery reports them.
  @original_queue = Set.new(@queue).freeze

  @workers = {}
  @completed = []

  # NOTE: to_i turns a non-numeric TEST_QUEUE_WORKERS into 0, which is then
  # rejected by the > 0 check below.
  @concurrency =
    concurrency ||
    (ENV['TEST_QUEUE_WORKERS'] && ENV['TEST_QUEUE_WORKERS'].to_i) ||
    if File.exist?('/proc/cpuinfo')
      File.read('/proc/cpuinfo').split("\n").grep(/processor/).size
    elsif RUBY_PLATFORM =~ /darwin/
      `/usr/sbin/sysctl -n hw.activecpu`.to_i
    else
      2
    end
  unless @concurrency > 0
    raise ArgumentError, "Worker count (#{@concurrency}) must be greater than 0"
  end

  # Seconds connect_to_relay keeps retrying before giving up.
  @slave_connection_timeout =
    (ENV['TEST_QUEUE_RELAY_TIMEOUT'] && ENV['TEST_QUEUE_RELAY_TIMEOUT'].to_i) ||
    30

  # Token sent with every request so the master can reject clients that
  # belong to a different run.
  @run_token = ENV['TEST_QUEUE_RELAY_TOKEN'] || SecureRandom.hex(8)

  @socket =
    socket ||
    ENV['TEST_QUEUE_SOCKET'] ||
    "/tmp/test_queue_#{$$}_#{object_id}.sock"

  @relay =
    relay ||
    ENV['TEST_QUEUE_RELAY']

  @slave_message = ENV["TEST_QUEUE_SLAVE_MESSAGE"] if ENV.has_key?("TEST_QUEUE_SLAVE_MESSAGE")

  if @relay == @socket
    STDERR.puts "*** Detected TEST_QUEUE_RELAY == TEST_QUEUE_SOCKET. Disabling relay mode."
    @relay = nil
  elsif @relay
    # In relay mode the local queue starts empty; work comes from the
    # central master.
    @queue = []
  end

  @discovered_suites = Set.new
  # Maps [suite_name, path] => [hostname, pid] of the worker it was handed to.
  @assignments = {}

  @exit_when_done = true

  @aborting = false
end

Public Instance Methods

abort(message) click to toggle source

Stop the test run immediately.

message - String message to print to the console when exiting.

Doesn't return.

# File lib/test_queue/runner.rb, line 607
# Stop the test run immediately.
#
# message - String message to print to the console when exiting.
#
# Doesn't return.
def abort(message)
  # Set the flag before killing anything: worker_completed ignores workers
  # while @aborting is true.
  @aborting = true
  kill_subprocesses
  Kernel.abort("Aborting: #{message}")
end
after_fork(num) click to toggle source

Prepare a worker for executing jobs after a fork.

# File lib/test_queue/runner.rb, line 410
# Hook: prepare a worker for executing jobs after a fork.
#
# num - Integer worker number, as passed through by after_fork_internal.
#
# Called in the forked worker as the last step of after_fork_internal.
# The default implementation does nothing.
def after_fork(num)
end
after_fork_internal(num, iterator) click to toggle source
# File lib/test_queue/runner.rb, line 382
# Internal post-fork setup for a worker process.
#
# num      - Integer worker number, used in the process title.
# iterator - object responding to #sock; only used for the banner line.
#
# Reseeds the RNG, redirects stdout and stderr into a per-pid file under
# /tmp (read back later by collect_worker_data), renames the process,
# prints a banner and finally calls the after_fork hook.
def after_fork_internal(num, iterator)
  srand

  capture_path = "/tmp/test_queue_worker_#{$$}_output"
  capture = File.open(capture_path, 'w')

  $stdout.reopen(capture)
  $stderr.reopen($stdout)
  [$stderr, $stdout].each { |stream| stream.sync = true }

  $0 = "test-queue worker [#{num}]"
  banner = "==> Starting #{$0} (#{Process.pid} on #{Socket.gethostname}) - iterating over #{iterator.sock}"
  puts
  puts banner
  puts

  after_fork(num)
end
around_filter(suite) { || ... } click to toggle source
# File lib/test_queue/runner.rb, line 405
# Hook wrapped around a suite; the default just yields to the given block.
#
# suite - the suite about to run (unused by the default implementation).
#
# spawn_workers hands this method (as a Method object) to the Iterator.
def around_filter(suite)
  yield
end
awaiting_suites?() click to toggle source
# File lib/test_queue/runner.rb, line 340
# Should workers be told to wait instead of being handed a suite?
#
# Returns true while whitelisted suites are still undiscovered (so they can
# be run in whitelist order), or while the queue is empty but the discovery
# process is still running. Returns false otherwise.
def awaiting_suites?
  # Still waiting to find every whitelisted suite.
  return true if @awaited_suites.any?

  # Nothing queued yet, but discovery is still working on it.
  return true if @queue.empty? && @discovering_suites_pid

  # Whatever is queued can run now.
  false
end
cleanup_worker() click to toggle source
# File lib/test_queue/runner.rb, line 425
# Hook: called in the forked worker after run_worker returns and before the
# worker process exits (see spawn_workers). Does nothing by default.
def cleanup_worker
end
collect_worker_data(worker) click to toggle source
# File lib/test_queue/runner.rb, line 450
# Pull a finished worker's artifacts out of /tmp and attach them to it.
#
# worker - object with #pid, #output= and #suites.
#
# Reads the per-pid "_output" file into worker.output and replaces
# worker.suites with the unmarshalled "_suites" file. Each file is deleted
# after it is read; missing files are skipped.
def collect_worker_data(worker)
  prefix = "/tmp/test_queue_worker_#{worker.pid}"

  output_path = "#{prefix}_output"
  if File.exist?(output_path)
    worker.output = IO.binread(output_path)
    FileUtils.rm(output_path)
  end

  suites_path = "#{prefix}_suites"
  if File.exist?(suites_path)
    recorded = Marshal.load(IO.binread(suites_path))
    worker.suites.replace(recorded)
    FileUtils.rm(suites_path)
  end
end
connect_to_relay() click to toggle source
# File lib/test_queue/runner.rb, line 546
# Open a TCP connection to the relay address (@relay, "host:port").
#
# Retries every 0.5s while the connection is refused, and re-raises
# Errno::ECONNREFUSED once more than @slave_connection_timeout seconds have
# passed since the first attempt.
#
# Returns the connected TCPSocket.
def connect_to_relay
  started_at = Time.now
  puts "Attempting to connect for #{@slave_connection_timeout}s..."

  begin
    TCPSocket.new(*@relay.split(':'))
  rescue Errno::ECONNREFUSED => e
    raise e if Time.now - started_at > @slave_connection_timeout
    puts "Master not yet available, sleeping..."
    sleep 0.5
    retry
  end
end
discover_suites() click to toggle source
# File lib/test_queue/runner.rb, line 310
# Fork a child process that loads every suite file and reports each suite
# it finds to the master over the master's own socket.
#
# Stores the child's pid in @discovering_suites_pid. Does nothing in relay
# mode, or when a whitelist is set and all of it is already queued.
def discover_suites
  # Remote masters don't discover suites; the central master does and
  # distributes them to remote masters.
  return if relay?

  # No need to discover suites if all whitelisted suites are already
  # queued.
  return if @whitelist.any? && @awaited_suites.empty?

  @discovering_suites_pid = fork do
    # kill_suite_discovery_process("INT") asks for a graceful stop; the
    # flag is checked before each suite is reported.
    terminate = false
    Signal.trap("INT") { terminate = true }

    $0 = "test-queue suite discovery process"

    @test_framework.all_suite_files.each do |path|
      @test_framework.suites_from_file(path).each do |suite_name, suite|
        Kernel.exit!(0) if terminate

        # One short-lived connection per suite: token line, then command line.
        # NOTE(review): the Marshal payload is embedded in a newline-delimited
        # line that the master reads with gets. Marshal output can contain a
        # "\n" byte (e.g. the length prefix of a 5-byte string), which would
        # truncate the command — confirm and consider a safer encoding.
        @server.connect_address.connect do |sock|
          sock.puts("TOKEN=#{@run_token}")
          sock.puts("NEW SUITE #{Marshal.dump([suite_name, path])}")
        end
      end
    end

    # exit! leaves the child without running at_exit handlers.
    Kernel.exit! 0
  end
end
distribute_queue() click to toggle source
# File lib/test_queue/runner.rb, line 468
# Master loop: serve requests on @server until the queue is drained, no
# suites are awaited and every remote worker has reported back.
#
# Each client connection carries two lines: "TOKEN=<run token>" followed by
# one command (POP, SLAVE, WORKER, NEW SUITE or KABOOM). Connections whose
# token does not match @run_token are answered with "WRONG RUN".
#
# The accepted socket is always closed, including on the wrong-token and
# KABOOM paths, and a client that disconnects before sending both lines is
# ignored instead of crashing the master.
#
# In relay mode the loop is skipped; the ensure clause still reaps local
# workers. Always stops the master and reaps workers on the way out.
def distribute_queue
  return if relay?
  remote_workers = 0

  until !awaiting_suites? && @queue.empty? && remote_workers == 0
    queue_status(@start_time, @queue.size, @workers.size, remote_workers)

    if status = reap_suite_discovery_process(false)
      abort("Discovering suites failed.") unless status.success?
      abort("Failed to discover #{@awaited_suites.sort.join(", ")} specified in TEST_QUEUE_FORCE") if @awaited_suites.any?
    end

    if IO.select([@server], nil, nil, 0.1).nil?
      reap_workers(false) # check for worker deaths
    else
      sock = @server.accept
      begin
        token_line = sock.gets
        cmd_line = sock.gets

        # gets returns nil at EOF: the peer went away mid-request.
        if token_line.nil? || cmd_line.nil?
          STDERR.puts "*** Client disconnected before sending a complete request; ignoring."
          next
        end

        token = token_line.strip[TOKEN_REGEX, 1]
        cmd = cmd_line.strip

        # If we have a slave from a different test run, respond with "WRONG RUN", and it will consider the test run done.
        if token != @run_token
          message = token.nil? ? "Worker sent no token to master" : "Worker from run #{token} connected to master"
          STDERR.puts "*** #{message} for run #{@run_token}; ignoring."
          sock.write("WRONG RUN\n")
          next
        end

        case cmd
        when /^POP (\S+) (\d+)/
          hostname = $1
          pid = Integer($2)
          if awaiting_suites?
            sock.write(Marshal.dump("WAIT"))
          elsif obj = @queue.shift
            data = Marshal.dump(obj)
            sock.write(data)
            # Remember who got the suite so summarize_internal can verify it.
            @assignments[obj] = [hostname, pid]
          end
        when /^SLAVE (\d+) ([\w\.-]+)(?: (.+))?/
          num = $1.to_i
          slave = $2
          slave_message = $3

          sock.write("OK\n")
          remote_workers += num

          message = "*** #{num} workers connected from #{slave} after #{Time.now-@start_time}s"
          message << " " + slave_message if slave_message
          STDERR.puts message
        when /^WORKER (\d+)/
          # NOTE(review): Marshal.load on data read from the network; the
          # only gate is the run token checked above.
          data = sock.read($1.to_i)
          worker = Marshal.load(data)
          worker_completed(worker)
          remote_workers -= 1
        when /^NEW SUITE (.+)/
          # NOTE(review): same Marshal.load caveat as above.
          suite_name, path = Marshal.load($1)
          enqueue_discovered_suite(suite_name, path)
        when /^KABOOM/
          # worker reporting an abnormal number of test failures;
          # stop everything immediately and report the results.
          break
        else
          STDERR.puts("Ignoring unrecognized command: \"#{cmd}\"")
        end
      ensure
        # Runs for normal completion as well as `next` and `break`.
        sock.close
      end
    end
  end
ensure
  stop_master
  reap_workers
end
enqueue_discovered_suite(suite_name, path) click to toggle source
# File lib/test_queue/runner.rb, line 355
# Record a suite reported by the discovery process and queue it if needed.
#
# suite_name - String name of the suite.
# path       - String path of the file defining it.
#
# Suites outside a non-empty whitelist are ignored. Suites that were already
# in the stats-seeded queue are only marked as found. New suites go to the
# front of the queue; once the last awaited suite arrives the queue is
# re-sorted into whitelist order and discovery is interrupted.
def enqueue_discovered_suite(suite_name, path)
  return if @whitelist.any? && !@whitelist.include?(suite_name)

  entry = [suite_name, path]
  @discovered_suites << entry

  if @original_queue.include?(entry)
    # This suite was already added to the queue some other way.
    @awaited_suites.delete(suite_name)
    return
  end

  # We don't know how long new suites will take to run, so we put them at
  # the front of the queue. It's better to run a fast suite early than to
  # run a slow suite late.
  @queue.unshift([suite_name, path])

  whitelist_complete = @awaited_suites.delete?(suite_name) && @awaited_suites.empty?
  return unless whitelist_complete

  # We've found all the whitelisted suites. Sort the queue to match the
  # whitelist.
  @queue.sort_by! { |queued_name, _queued_path| @whitelist.index(queued_name) }

  kill_suite_discovery_process("INT")
end
execute() click to toggle source

Run the tests.

If exit_when_done is true, exit! will be called before this method completes. If exit_when_done is false, this method will return an Integer number of failures.

# File lib/test_queue/runner.rb, line 125
# Run the tests.
#
# If exit_when_done is true, exit! will be called before this method
# completes. If exit_when_done is false, this method will return an Integer
# number of failures.
def execute
  [$stderr, $stdout].each { |stream| stream.sync = true }
  @start_time = Time.now

  execute_internal
  failures = summarize_internal
  return failures unless exit_when_done

  exit! failures
end
execute_internal() click to toggle source
# File lib/test_queue/runner.rb, line 232
# Drive one full run. The order matters: the master socket must exist
# before suites are discovered (the discovery child connects to it) and
# before workers are forked.
#
# Always stops the master and kills any remaining subprocesses on the way
# out, even if a step raises.
def execute_internal
  start_master
  # prepare hook runs in the master, before any fork.
  prepare(@concurrency)
  @prepared_time = Time.now
  start_relay if relay?
  discover_suites
  spawn_workers
  # Blocks until the queue is drained (or only reaps workers in relay mode).
  distribute_queue
ensure
  stop_master

  kill_subprocesses
end
kill_subprocesses() click to toggle source
# File lib/test_queue/runner.rb, line 574
# Kill (and reap) every child this runner started: the workers first, then
# the suite discovery process if one is still recorded.
def kill_subprocesses
  kill_workers
  kill_suite_discovery_process
end
kill_suite_discovery_process(signal="KILL") click to toggle source
# File lib/test_queue/runner.rb, line 587
# Signal the suite discovery process and wait for it to exit.
#
# signal - signal name to send (default "KILL"). enqueue_discovered_suite
#          sends "INT", which the discovery child traps to stop early.
#
# Returns nil when no discovery process is recorded, otherwise whatever
# reap_suite_discovery_process returns.
def kill_suite_discovery_process(signal="KILL")
  if @discovering_suites_pid
    Process.kill(signal, @discovering_suites_pid)
    reap_suite_discovery_process
  end
end
kill_workers() click to toggle source
# File lib/test_queue/runner.rb, line 579
# Send KILL to every worker still tracked in @workers, then reap them
# (blocking) so their results are collected.
def kill_workers
  @workers.each_key { |worker_pid| Process.kill('KILL', worker_pid) }

  reap_workers
end
prepare(concurrency) click to toggle source

Run in the master before the fork. Used to create concurrency copies of any databases required by the test workers.

# File lib/test_queue/runner.rb, line 402
# Hook: run in the master before the fork. Used to create concurrency
# copies of any databases required by the test workers.
#
# concurrency - Integer number of workers that will be spawned
#               (execute_internal passes @concurrency).
#
# The default implementation does nothing.
def prepare(concurrency)
end
queue_status(start_time, queue_size, local_worker_count, remote_worker_count) click to toggle source

Subclasses can override to monitor the status of the queue.

For example, you may want to record metrics about how quickly remote workers connect, or abort the build if not enough connect.

This method is called very frequently during the test run, so don't do anything expensive/blocking.

This method is not called on remote masters when using remote workers, only on the central master.

start_time - Time when the test run began
queue_size - Integer number of suites left in the queue
local_worker_count - Integer number of active local workers
remote_worker_count - Integer number of active remote workers

Returns nothing.

# File lib/test_queue/runner.rb, line 630
# Hook: subclasses can override to monitor the status of the queue.
#
# Called once per iteration of the distribute_queue loop, so it must not do
# anything expensive or blocking. Not called in relay mode.
#
# start_time          - Time when the test run began
# queue_size          - Integer number of suites left in the queue
# local_worker_count  - Integer number of active local workers
# remote_worker_count - Integer number of active remote workers
#
# Returns nothing. The default implementation does nothing.
def queue_status(start_time, queue_size, local_worker_count, remote_worker_count)
end
reap_suite_discovery_process(blocking=true) click to toggle source
# File lib/test_queue/runner.rb, line 593
# Wait for the suite discovery process and forget its pid once it exits.
#
# blocking - when true (default) wait until the child exits; when false,
#            poll with WNOHANG.
#
# Returns the child's Process::Status, or nil if there is no discovery
# process or (non-blocking) it has not exited yet.
def reap_suite_discovery_process(blocking=true)
  discovery_pid = @discovering_suites_pid
  return unless discovery_pid

  wait_flags = blocking ? 0 : Process::WNOHANG
  reaped = Process.waitpid2(discovery_pid, wait_flags)
  # waitpid2 yields nil under WNOHANG while the child is still running.
  return if reaped.nil?

  @discovering_suites_pid = nil
  reaped.last
end
reap_workers(blocking=true) click to toggle source
# File lib/test_queue/runner.rb, line 433
# Collect every worker that has exited and drop it from @workers.
#
# blocking - when true (default) wait for each worker in turn; when false,
#            only pick up workers that have already exited.
#
# For each reaped worker: records exit status and end time, gathers its
# output files, forwards it to the central master in relay mode, and hands
# it to worker_completed.
def reap_workers(blocking=true)
  wait_flags = blocking ? 0 : Process::WNOHANG

  @workers.delete_if do |_pid, worker|
    reaped_pid, exit_status = Process.waitpid2(worker.pid, wait_flags)
    # Nothing reaped: the worker is still running, keep tracking it.
    next false if reaped_pid.nil?

    worker.status = exit_status
    worker.end_time = Time.now

    collect_worker_data(worker)
    relay_to_master(worker) if relay?
    worker_completed(worker)

    true
  end
end
relay?() click to toggle source
# File lib/test_queue/runner.rb, line 542
# Is this runner a remote master relaying to a central one?
#
# Returns true when @relay is set, false otherwise.
def relay?
  @relay ? true : false
end
relay_to_master(worker) click to toggle source
# File lib/test_queue/runner.rb, line 562
# Ship a finished worker's record to the central master.
#
# worker - the worker to send; its host is set to this machine's hostname
#          before it is serialized.
#
# Sends the run token, a "WORKER <bytesize>" header and then the marshalled
# worker. The connection is closed even if sending fails.
def relay_to_master(worker)
  worker.host = Socket.gethostname
  payload = Marshal.dump(worker)

  sock = connect_to_relay
  header_lines = ["TOKEN=#{@run_token}", "WORKER #{payload.bytesize}"]
  header_lines.each { |line| sock.puts(line) }
  sock.write(payload)
ensure
  sock.close if sock
end
run_worker(iterator) click to toggle source

Entry point for internal runner implementations. The iterator will yield jobs from the shared queue on the master.

Returns an Integer number of failures.

# File lib/test_queue/runner.rb, line 417
# Entry point for internal runner implementations. The iterator will yield
# jobs from the shared queue on the master.
#
# This default implementation only prints each job.
#
# Returns an Integer number of failures (always 0 here).
def run_worker(iterator)
  iterator.each { |job| puts "  #{job.inspect}" }

  0 # exit status
end
spawn_workers() click to toggle source
# File lib/test_queue/runner.rb, line 292
# Fork @concurrency worker processes and track them in @workers by pid.
#
# Each child closes its inherited copy of the master socket, builds an
# Iterator pointed at the relay (in relay mode) or the local socket, runs
# the post-fork setup, executes run_worker, calls cleanup_worker and exits
# with run_worker's return value (0 when that is nil).
def spawn_workers
  @concurrency.times do |index|
    worker_num = index + 1

    child_pid = fork do
      @server.close if @server

      endpoint = relay? ? @relay : @socket
      iterator = Iterator.new(
        @test_framework,
        endpoint,
        method(:around_filter),
        early_failure_limit: @early_failure_limit,
        run_token: @run_token
      )
      after_fork_internal(worker_num, iterator)
      exit_code = run_worker(iterator) || 0
      cleanup_worker
      Kernel.exit! exit_code
    end

    @workers[child_pid] = Worker.new(child_pid, worker_num)
  end
end
start_master() click to toggle source
# File lib/test_queue/runner.rb, line 246
# Open the master's listening socket (unless relaying) and set the process
# title.
#
# @socket may be "host:port" or just "port" for TCP; anything else is
# treated as a UNIX socket path, and a stale file at that path is removed
# first. For TCP, @socket is normalised to "address:port" using the address
# actually bound, so a port-only spec becomes "0.0.0.0:port" rather than
# ":port" with an empty host.
def start_master
  if !relay?
    if @socket =~ /^(?:(.+):)?(\d+)$/
      # No host given: listen on all interfaces.
      address = $1 || '0.0.0.0'
      port_text = $2
      # Advertise the address we really bind to, not the (possibly nil) capture.
      @socket = "#{address}:#{port_text}"
      @server = TCPServer.new(address, port_text.to_i)
    else
      FileUtils.rm(@socket) if File.exist?(@socket)
      @server = UNIXServer.new(@socket)
    end
  end

  desc = "test-queue master (#{relay? ? "relaying to #{@relay}" : @socket})"
  puts "Starting #{desc}"
  $0 = "#{desc} - #{@procline}"
end
start_relay() click to toggle source
# File lib/test_queue/runner.rb, line 264
# Register this remote master with the central master.
#
# Sends the run token and a "SLAVE <workers> <hostname>[ <message>]" line,
# then expects "OK" back. The optional message comes from @slave_message
# with line breaks removed, and is appended with exactly one separating
# space (nothing is appended when there is no message).
#
# Exits the process with status 1 if the master refuses the connection,
# answers with anything other than "OK", or closes the connection without
# answering. Does nothing unless in relay mode.
def start_relay
  return unless relay?

  sock = connect_to_relay

  # Our "protocol" is newline-separated, so line breaks must not leak in.
  note = @slave_message.to_s.gsub(/(\r|\n)/, "")
  request = "SLAVE #{@concurrency} #{Socket.gethostname}"
  request << " #{note}" unless note.empty?

  sock.puts("TOKEN=#{@run_token}")
  sock.puts(request)

  # gets returns nil if the master hung up; treat that as a non-OK reply.
  response = sock.gets.to_s.strip
  unless response == "OK"
    STDERR.puts "*** Got non-OK response from master: #{response}"
    exit! 1
  end
rescue Errno::ECONNREFUSED
  STDERR.puts "*** Unable to connect to relay #{@relay}. Aborting.."
  exit! 1
ensure
  sock.close if sock
end
stats_file() click to toggle source
# File lib/test_queue/runner.rb, line 227
# Path of the stats file: TEST_QUEUE_STATS when set, otherwise
# ".test_queue_stats" (relative to the current directory).
def stats_file
  ENV.fetch('TEST_QUEUE_STATS', '.test_queue_stats')
end
stop_master() click to toggle source
# File lib/test_queue/runner.rb, line 284
# Shut down the master's listening socket. Safe to call more than once.
#
# Removes the socket file when listening on a UNIX socket, closes the
# server (ignoring errors from close), and clears @socket and @server.
# Does nothing in relay mode.
def stop_master
  unless relay?
    FileUtils.rm_f(@socket) if @socket && @server.is_a?(UNIXServer)

    if @server
      begin
        @server.close
      rescue StandardError
        nil
      end
    end

    @socket = nil
    @server = nil
  end
end
summarize() click to toggle source
# File lib/test_queue/runner.rb, line 224
# Hook: called at the end of summarize_internal, after the per-worker
# summary has been printed and the stats have been saved. Does nothing by
# default.
def summarize
end
summarize_internal() click to toggle source
# File lib/test_queue/runner.rb, line 139
# Print the end-of-run report, persist stats and compute the exit status.
#
# The status is the sum of the workers' exit statuses (a worker without an
# exit status counts as 1), plus 1 for each kind of scheduling anomaly
# detected on a non-relay master:
#   * suites that were discovered but never run,
#   * suites that were run without having been assigned,
#   * suites that were run by a worker other than the one assigned.
# The anomaly penalties are part of the returned value; they are no longer
# thrown away by a recomputation just before returning.
#
# Returns an Integer exit status, capped at 255.
def summarize_internal
  puts
  puts "==> Summary (#{@completed.size} workers in %.4fs)" % (Time.now-@start_time)
  puts

  estatus = 0
  misrun_suites = []
  unassigned_suites = []
  @failures = ''
  @completed.each do |worker|
    estatus += (worker.status.exitstatus || 1)
    @stats.record_suites(worker.suites)
    worker.suites.each do |suite|
      key = [suite.name, suite.path]
      # Compare who actually ran the suite with who it was handed to.
      assignment = @assignments.delete(key)
      host = worker.host || Socket.gethostname
      if assignment.nil?
        unassigned_suites << key
      elsif assignment != [host, worker.pid]
        misrun_suites << key + assignment + [host, worker.pid]
      end
      # Whatever remains in @discovered_suites afterwards was never run.
      @discovered_suites.delete(key)
    end

    # Let the hook fill in summary/failure_output before they are read.
    summarize_worker(worker)

    @failures << worker.failure_output if worker.failure_output

    puts "    [%2d] %60s      %4d suites in %.4fs      (%s %s)" % [
      worker.num,
      worker.summary,
      worker.suites.size,
      worker.end_time - worker.start_time,
      worker.status.to_s,
      worker.host && " on #{worker.host.split('.').first}"
    ]
  end

  unless @failures.empty?
    puts
    puts "==> Failures"
    puts
    puts @failures
  end

  if !relay?
    unless @discovered_suites.empty?
      estatus += 1
      puts
      puts "The following suites were discovered but were not run:"
      puts

      @discovered_suites.sort.each do |suite_name, path|
        puts "#{suite_name} - #{path}"
      end
    end
    unless unassigned_suites.empty?
      estatus += 1
      puts
      puts "The following suites were not discovered but were run anyway:"
      puts
      unassigned_suites.sort.each do |suite_name, path|
        puts "#{suite_name} - #{path}"
      end
    end
    unless misrun_suites.empty?
      estatus += 1
      puts
      puts "The following suites were run on the wrong workers:"
      puts
      misrun_suites.each do |suite_name, path, target_host, target_pid, actual_host, actual_pid|
        puts "#{suite_name} - #{path}: #{actual_host} (#{actual_pid}) - assigned to #{target_host} (#{target_pid})"
      end
    end
  end

  puts

  @stats.save

  summarize

  # Process exit statuses are limited to one byte.
  [estatus, 255].min
end
summarize_worker(worker) click to toggle source
# File lib/test_queue/runner.rb, line 428
# Hook: fill in the per-worker report fields. summarize_internal calls this
# for every completed worker, then reads worker.failure_output and
# worker.summary.
#
# worker - the completed worker to annotate.
#
# The default sets both fields to (separate) empty strings.
def summarize_worker(worker)
  worker.summary = ''
  worker.failure_output = ''
end
worker_completed(worker) click to toggle source
# File lib/test_queue/runner.rb, line 462
# Record a finished worker and echo its output when appropriate.
#
# worker - the finished worker (needs #output and #status).
#
# Ignored entirely while the run is aborting. Otherwise the worker is added
# to @completed, and its output is printed if TEST_QUEUE_VERBOSE is set or
# its exit status is anything other than 0.
def worker_completed(worker)
  unless @aborting
    @completed << worker

    show_output = ENV['TEST_QUEUE_VERBOSE'] || worker.status.exitstatus != 0
    puts worker.output if show_output
  end
end