class TestQueue::Runner

Attributes

concurrency[RW]

Public Class Methods

new(queue, concurrency=nil, socket=nil, relay=nil) click to toggle source
# File lib/test_queue/runner.rb, line 27
# Set up a runner for the given suites.
#
# queue       - Array of suites to run; anything else raises ArgumentError.
#               Each suite is keyed by its #id when it responds to it,
#               otherwise by #to_s.
# concurrency - Number of workers. Falls back to TEST_QUEUE_WORKERS, then to
#               the processor count (/proc/cpuinfo or sysctl), then to 2.
# socket      - Master socket. Falls back to TEST_QUEUE_SOCKET, then to a
#               per-process path under /tmp.
# relay       - Relay address. Falls back to TEST_QUEUE_RELAY.
def initialize(queue, concurrency=nil, socket=nil, relay=nil)
  raise ArgumentError, 'array required' unless Array === queue

  # TEST_QUEUE_FORCE limits the run to the listed suites, in the listed order.
  if forced = ENV['TEST_QUEUE_FORCE']
    forced = forced.split(/\s*,\s*/)
    whitelist = Set.new(forced)
    queue = queue.select{ |s| whitelist.include?(s.to_s) }
    queue.sort_by!{ |s| forced.index(s.to_s) }
  end

  @procline = $0
  @suites = queue.inject(Hash.new) do |hash, suite|
    key = suite.respond_to?(:id) ? suite.id : suite.to_s
    hash.update key => suite
  end
  @queue = @suites.keys

  @workers = {}
  @completed = []

  # File.exist? is used because File.exists? was removed in Ruby 3.2.
  @concurrency =
    concurrency ||
    (ENV['TEST_QUEUE_WORKERS'] && ENV['TEST_QUEUE_WORKERS'].to_i) ||
    if File.exist?('/proc/cpuinfo')
      File.read('/proc/cpuinfo').split("\n").grep(/processor/).size
    elsif RUBY_PLATFORM =~ /darwin/
      `/usr/sbin/sysctl -n hw.activecpu`.to_i
    else
      2
    end

  @slave_connection_timeout =
    (ENV['TEST_QUEUE_RELAY_TIMEOUT'] && ENV['TEST_QUEUE_RELAY_TIMEOUT'].to_i) ||
    30

  @run_token = ENV['TEST_QUEUE_RELAY_TOKEN'] || SecureRandom.hex(8)

  @socket =
    socket ||
    ENV['TEST_QUEUE_SOCKET'] ||
    "/tmp/test_queue_#{$$}_#{object_id}.sock"

  @relay =
    relay ||
    ENV['TEST_QUEUE_RELAY']

  @slave_message = ENV["TEST_QUEUE_SLAVE_MESSAGE"] if ENV.has_key?("TEST_QUEUE_SLAVE_MESSAGE")

  if @relay == @socket
    STDERR.puts "*** Detected TEST_QUEUE_RELAY == TEST_QUEUE_SOCKET. Disabling relay mode."
    @relay = nil
  elsif @relay
    # In relay mode the local queue is emptied; workers iterate over the relay instead.
    @queue = []
  end
end

Public Instance Methods

after_fork(num) click to toggle source

Prepare a worker for executing jobs after a fork.

# File lib/test_queue/runner.rb, line 268
# Hook run inside a freshly forked worker; a no-op by default.
# after_fork_internal calls it last, once output has been redirected.
#
# num - the worker's number (1-based, assigned in spawn_workers).
def after_fork(num); end
after_fork_internal(num, iterator) click to toggle source
# File lib/test_queue/runner.rb, line 240
# Prepare a forked worker: redirect its output to a per-pid file under /tmp,
# set the process title, print a start banner, then call the after_fork hook.
#
# num      - the worker's number.
# iterator - the worker's iterator; only #sock is read here, for the banner.
def after_fork_internal(num, iterator)
  # Reseed the RNG (presumably so workers do not share the parent's sequence).
  srand

  # reopen duplicates the descriptor onto $stdout, so the temporary File
  # handle can be closed right away instead of leaking for the worker's life.
  File.open("/tmp/test_queue_worker_#{$$}_output", 'w') do |output|
    $stdout.reopen(output)
  end
  $stderr.reopen($stdout)
  $stdout.sync = $stderr.sync = true

  $0 = "test-queue worker [#{num}]"
  puts
  puts "==> Starting #$0 (#{Process.pid} on #{Socket.gethostname}) - iterating over #{iterator.sock}"
  puts

  after_fork(num)
end
around_filter(suite) { || ... } click to toggle source
# File lib/test_queue/runner.rb, line 263
# Hook that wraps a block of work; the default implementation just yields.
# spawn_workers hands this method to each worker's Iterator.
# NOTE(review): presumably the Iterator calls it around every suite — confirm.
#
# suite - the suite being processed (unused by the default implementation).
def around_filter(suite)
  yield
end
cleanup_worker() click to toggle source
# File lib/test_queue/runner.rb, line 284
# Hook run inside a forked worker after run_worker returns and just before
# the worker exits (see spawn_workers); a no-op by default.
def cleanup_worker; end
connect_to_relay() click to toggle source
# File lib/test_queue/runner.rb, line 371
# Open a TCP connection to the relay address held in @relay ("host:port").
#
# Refused connections are retried every half second; once more than
# @slave_connection_timeout seconds have passed, the refusal is re-raised.
#
# Returns the connected socket.
def connect_to_relay
  started_at = Time.now
  puts "Attempting to connect for #{@slave_connection_timeout}s..."

  loop do
    begin
      return TCPSocket.new(*@relay.split(':'))
    rescue Errno::ECONNREFUSED
      # Give up once the allowed window has elapsed.
      raise if Time.now - started_at > @slave_connection_timeout
      puts "Master not yet available, sleeping..."
      sleep 0.5
    end
  end
end
distribute_queue() click to toggle source
# File lib/test_queue/runner.rb, line 317
# Master loop: serve queue items and collect results over @server until the
# queue is empty and every announced remote worker has reported back.
#
# Commands handled, one per connection, newline-terminated:
#   POP                              - reply with the next Marshal-dumped item,
#                                      or nothing when the queue is empty.
#   SLAVE <n> <host> <token> [msg]   - a relay announcing n remote workers.
#   WORKER <bytes>                   - a Marshal-dumped finished remote worker.
#
# Does nothing in relay mode. On the way out the master socket is always
# stopped and every remaining local worker is reaped.
def distribute_queue
  return if relay?
  remote_workers = 0

  until @queue.empty? && remote_workers == 0
    if IO.select([@server], nil, nil, 0.1).nil?
      reap_worker(false) if @workers.any? # check for worker deaths
    else
      sock = @server.accept
      begin
        # gets returns nil when the client disconnects without sending a
        # command; treat that as an empty command rather than crashing.
        cmd = sock.gets.to_s.strip
        case cmd
        when /^POP/
          # Nothing is written when the queue is empty.
          if obj = @queue.shift
            data = Marshal.dump(obj.to_s)
            sock.write(data)
          end
        when /^SLAVE (\d+) ([\w\.-]+) (\w+)(?: (.+))?/
          num = $1.to_i
          slave = $2
          run_token = $3
          slave_message = $4
          if run_token == @run_token
            sock.write("OK\n")
            remote_workers += num
          else
            # A slave from a different test run is told so and is not counted.
            STDERR.puts "*** Worker from run #{run_token} connected to master for run #{@run_token}; ignoring."
            sock.write("WRONG RUN\n")
          end
          message = "*** #{num} workers connected from #{slave} after #{Time.now-@start_time}s"
          message << " " + slave_message if slave_message
          STDERR.puts message
        when /^WORKER (\d+)/
          data = sock.read($1.to_i)
          # NOTE(review): Marshal.load on bytes read from the network is unsafe
          # if the socket is reachable by untrusted peers.
          worker = Marshal.load(data)
          worker_completed(worker)
          remote_workers -= 1
        end
      ensure
        # Close the connection even when handling the command raised.
        sock.close
      end
    end
  end
ensure
  stop_master

  until @workers.empty?
    reap_worker
  end
end
execute() click to toggle source
# File lib/test_queue/runner.rb, line 92
# Run the whole queue: in parallel when @concurrency is positive, otherwise
# sequentially in this process. Records @start_time and makes stdout/stderr
# unbuffered first.
#
# The summary is printed afterwards unless an exception is propagating.
def execute
  $stdout.sync = $stderr.sync = true
  @start_time = Time.now

  if @concurrency > 0
    execute_parallel
  else
    execute_sequential
  end
ensure
  summarize_internal unless $!
end
execute_parallel() click to toggle source
# File lib/test_queue/runner.rb, line 158
# Run the queue with forked workers: start the master socket, call the
# prepare hook, announce to the relay when in relay mode, fork the workers
# and serve the queue.
#
# Whatever happens, the master is stopped and any worker still registered is
# killed and reaped before returning.
def execute_parallel
  start_master
  prepare(@concurrency)
  @prepared_time = Time.now
  start_relay if relay?
  spawn_workers
  distribute_queue
ensure
  stop_master

  @workers.each_key { |pid| Process.kill('KILL', pid) }

  reap_worker until @workers.empty?
end
execute_sequential() click to toggle source
# File lib/test_queue/runner.rb, line 154
# Run the whole queue in the current process, then exit immediately with the
# status returned by run_worker.
def execute_sequential
  status = run_worker(@queue)
  exit!(status)
end
prepare(concurrency) click to toggle source

Runs in the master before the workers are forked. Use it to create `concurrency` copies of any databases required by the test workers.

# File lib/test_queue/runner.rb, line 260
# Hook called by execute_parallel in the master before any worker is forked;
# a no-op by default.
#
# concurrency - the number of workers about to be spawned.
def prepare(concurrency); end
reap_worker(blocking=true) click to toggle source
# File lib/test_queue/runner.rb, line 292
# Wait for a child process and, if it is one of our workers, record its
# result.
#
# The worker's exit status and end time are stored, its captured output and
# stats are read from the per-pid files under /tmp (which are then removed),
# and the worker is relayed to the master (relay mode) and marked completed.
#
# blocking - when false, return immediately if no child has exited yet.
def reap_worker(blocking=true)
  pid = Process.waitpid(-1, blocking ? 0 : Process::WNOHANG)
  worker = pid && @workers.delete(pid)
  return unless worker

  worker.status = $?
  worker.end_time = Time.now

  # File.exist? is used because File.exists? was removed in Ruby 3.2.
  if File.exist?(file = "/tmp/test_queue_worker_#{pid}_output")
    worker.output = IO.binread(file)
    FileUtils.rm(file)
  end

  if File.exist?(file = "/tmp/test_queue_worker_#{pid}_stats")
    # NOTE(review): Marshal.load on a predictably named file in /tmp trusts
    # every local user on the machine.
    worker.stats = Marshal.load(IO.binread(file))
    FileUtils.rm(file)
  end

  relay_to_master(worker) if relay?
  worker_completed(worker)
end
relay?() click to toggle source
# File lib/test_queue/runner.rb, line 367
# True when this runner relays to a remote master, i.e. @relay is set.
def relay?
  @relay ? true : false
end
relay_to_master(worker) click to toggle source
# File lib/test_queue/runner.rb, line 387
# Send a finished worker to the remote master.
#
# The worker is tagged with this machine's hostname, Marshal-dumped, and sent
# as a "WORKER <bytesize>" line followed by the payload. The connection is
# closed afterwards, even on failure.
def relay_to_master(worker)
  worker.host = Socket.gethostname
  payload = Marshal.dump(worker)

  conn = nil
  begin
    conn = connect_to_relay
    conn.puts("WORKER #{payload.bytesize}")
    conn.write(payload)
  ensure
    conn.close if conn
  end
end
run_worker(iterator) click to toggle source

Entry point for internal runner implementations. The iterator will yield jobs from the shared queue on the master.

Returns the worker's exit status: 0 on success, or N on error, where N is the number of failures.

# File lib/test_queue/runner.rb, line 276
# Default worker body: print every item yielded by the iterator, indented by
# two spaces, and report success.
#
# Returns 0, which spawn_workers uses as the worker's exit status.
def run_worker(iterator)
  iterator.each { |job| puts "  #{job.inspect}" }

  0 # exit status
end
spawn_workers() click to toggle source
# File lib/test_queue/runner.rb, line 222
# Fork @concurrency workers, numbered from 1, and register each one in
# @workers under its pid.
#
# Each child closes its copy of the master socket, builds an Iterator over
# the relay (relay mode) or the master socket, runs the queue and exits with
# run_worker's result (0 when that is nil or false).
def spawn_workers
  @concurrency.times do |index|
    worker_num = index + 1

    child_pid = fork do
      @server.close if @server

      endpoint = relay? ? @relay : @socket
      iterator = Iterator.new(endpoint, @suites, method(:around_filter))
      after_fork_internal(worker_num, iterator)
      exit_code = run_worker(iterator) || 0
      cleanup_worker
      Kernel.exit! exit_code
    end

    @workers[child_pid] = Worker.new(child_pid, worker_num)
  end
end
start_master() click to toggle source
# File lib/test_queue/runner.rb, line 177
# Start listening for workers (unless relaying) and set the process title.
#
# When @socket looks like "[address:]port" a TCP server is started (on
# 0.0.0.0 when no address is given) and @socket is rewritten as
# "address:port", with an empty address part if none was given. Otherwise
# @socket is a filesystem path: a stale file there is removed and a UNIX
# server is bound to it.
def start_master
  if !relay?
    if @socket =~ /^(?:(.+):)?(\d+)$/
      address = $1 || '0.0.0.0'
      port = $2.to_i
      @socket = "#$1:#$2"
      @server = TCPServer.new(address, port)
    else
      # File.exist? is used because File.exists? was removed in Ruby 3.2.
      FileUtils.rm(@socket) if File.exist?(@socket)
      @server = UNIXServer.new(@socket)
    end
  end

  desc = "test-queue master (#{relay? ? "relaying to #{@relay}" : @socket})"
  puts "Starting #{desc}"
  $0 = "#{desc} - #{@procline}"
end
start_relay() click to toggle source
# File lib/test_queue/runner.rb, line 195
# Announce this relay's workers to the remote master.
#
# Sends "SLAVE <concurrency> <hostname> <run token>[ <message>]" and expects
# "OK" back. Any other reply, no reply at all, or a refused connection prints
# an error and exits the process with status 1. Does nothing unless relaying.
def start_relay
  return unless relay?

  sock = connect_to_relay
  message = @slave_message ? " #{@slave_message}" : ''
  # Our "protocol" is newline-separated, so strip line breaks from the message.
  message = message.delete("\r\n")
  sock.puts("SLAVE #{@concurrency} #{Socket.gethostname} #{@run_token}#{message}")

  # gets returns nil when the master closes the connection without replying;
  # treat that as a non-OK response rather than raising NoMethodError.
  response = sock.gets.to_s.strip
  return if response == 'OK'

  STDERR.puts "*** Got non-OK response from master: #{response}"
  sock.close
  exit! 1
rescue Errno::ECONNREFUSED
  STDERR.puts "*** Unable to connect to relay #{@relay}. Aborting.."
  exit! 1
ensure
  # exit! skips ensure blocks, so this covers the OK path and unexpected errors.
  sock.close if sock && !sock.closed?
end
stats() click to toggle source
# File lib/test_queue/runner.rb, line 83
# Stats loaded from the stats file, memoized in @stats.
#
# Returns the Marshal-loaded contents of stats_file, or an empty Hash when
# the file is missing or holds nil/false.
def stats
  @stats ||=
    # File.exist? is used because File.exists? was removed in Ruby 3.2.
    if File.exist?(file = stats_file)
      Marshal.load(IO.binread(file)) || {}
    else
      {}
    end
end
stats_file() click to toggle source
# File lib/test_queue/runner.rb, line 149
# Path of the stats file: TEST_QUEUE_STATS when set, otherwise
# ".test_queue_stats" in the current directory.
def stats_file
  ENV.fetch('TEST_QUEUE_STATS', '.test_queue_stats')
end
stop_master() click to toggle source
# File lib/test_queue/runner.rb, line 214
# Shut down the master socket; safe to call more than once.
#
# Removes the socket file when the server is a UNIXServer, closes the server
# (ignoring errors from closing) and clears @socket and @server. Does nothing
# in relay mode.
def stop_master
  return if relay?

  unix_server = @server.is_a?(UNIXServer)
  FileUtils.rm_f(@socket) if @socket && unix_server

  if @server
    begin
      @server.close
    rescue StandardError
      nil
    end
  end

  @socket = @server = nil
end
summarize() click to toggle source
# File lib/test_queue/runner.rb, line 146
# Hook called by summarize_internal after the summary has been printed and
# stats written, just before the process exits; a no-op by default.
def summarize; end
summarize_internal() click to toggle source
# File lib/test_queue/runner.rb, line 103
# Print the run summary and exit.
#
# Prints one line per completed worker, then the collected failure output,
# writes the stats file if stats were loaded, calls the summarize hook and
# exits with the sum of the workers' exit statuses, capped at 255.
def summarize_internal
  # exitstatus is nil for a worker that was killed by a signal; count it as
  # one failure so the summary neither crashes nor reports success.
  exit_code = lambda { |worker| worker.status.exitstatus || 1 }

  puts
  puts "==> Summary (#{@completed.size} workers in %.4fs)" % (Time.now-@start_time)
  puts

  @failures = ''
  @completed.each do |worker|
    summarize_worker(worker)
    @failures << worker.failure_output if worker.failure_output

    puts "    [%2d] %60s      %4d suites in %.4fs      (pid %d exit %d%s)" % [
      worker.num,
      worker.summary,
      worker.stats.size,
      worker.end_time - worker.start_time,
      worker.pid,
      exit_code.call(worker),
      # Only remote workers carry a host; show its short name.
      worker.host && " on #{worker.host.split('.').first}"
    ]
  end

  unless @failures.empty?
    puts
    puts "==> Failures"
    puts
    puts @failures
  end

  puts

  # Only rewrite the stats file when stats were loaded during this run.
  if @stats
    File.open(stats_file, 'wb') do |f|
      f.write Marshal.dump(stats)
    end
  end

  summarize

  estatus = @completed.inject(0){ |s, worker| s + exit_code.call(worker) }
  estatus = 255 if estatus > 255
  exit!(estatus)
end
summarize_worker(worker) click to toggle source
# File lib/test_queue/runner.rb, line 287
# Reset a worker's summary and failure output to (separate) empty strings.
# summarize_internal calls this for every completed worker before printing
# worker.summary and collecting worker.failure_output.
# NOTE(review): presumably a hook for subclasses to fill in both fields — confirm.
def summarize_worker(worker)
  worker.summary = ''
  worker.failure_output = ''
end
worker_completed(worker) click to toggle source
# File lib/test_queue/runner.rb, line 312
# Record a finished worker in @completed and echo its captured output when
# TEST_QUEUE_VERBOSE is set or the worker did not exit with status 0.
def worker_completed(worker)
  @completed << worker

  return unless ENV['TEST_QUEUE_VERBOSE'] || worker.status.exitstatus != 0

  puts worker.output
end