class RRRSpec::Server::WorkerRunner
Constants
- CANCEL_POLLING
- TIMEOUT_EXITCODE
Attributes
current_taskset[R]
internal_status[R]
Public Class Methods
new(worker)
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 9 def initialize(worker) @worker = worker end
Public Instance Methods
work_loop()
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 13 def work_loop loop do DispatcherQueue.notify work end end
Private Instance Methods
cleaning_process(logger, taskset, cancel_watcher_pid, pid_to_slave_number)
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 148 def cleaning_process(logger, taskset, cancel_watcher_pid, pid_to_slave_number) logger.write("Send TERM signal to the children") (pid_to_slave_number.keys + [cancel_watcher_pid]).each do |pid| begin Process.kill("-TERM", pid) rescue Errno::ESRCH, Errno::EPERM end end logger.write("Wait for the children") begin loop do pid, status = Process.wait2 if pid != cancel_watcher_pid slave = Slave.build_from_pid(pid) slave.update_status('normal_exit') end end rescue Errno::ECHILD end logger.write("Finished the task") # Some slaves are failed to exit with SIGTERM. Kill -9 them by name. `ps aux | grep "rrrspec slave" | grep -v grep | awk '{print $2}'`.split("\n").map(&:to_i).each do |pid| begin Process.kill("KILL", pid) rescue Errno::ESRCH, Errno::EPERM end end end
execute_with_logs(chdir, command, env, input=nil)
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 209 def execute_with_logs(chdir, command, env, input=nil) Bundler.with_clean_env do in_rd, in_wt = IO.pipe out_rd, out_wt = IO.pipe err_rd, err_wt = IO.pipe pid = spawn(env, command, { chdir: chdir, pgroup: true, in: in_rd, out: out_wt, err: err_wt }) out_wt.close_write err_wt.close_write in_wt.write(input) if input in_wt.close_write return pid, out_rd, err_rd end end
log_to_logger(logger, out_rd, err_rd)
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 225 def log_to_logger(logger, out_rd, err_rd) rds = [out_rd, err_rd] while !rds.empty? IO.select(rds)[0].each do |r| line = r.gets if line line = line.strip if r == out_rd logger.write("OUT " + line) else logger.write("ERR " + line) end else rds.delete(r) end end end end
rspec(taskset)
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 64 def rspec(taskset) working_path = File.join(RRRSpec.configuration.working_dir, taskset.rsync_name) num_slaves = RRRSpec.configuration.slave_processes env = {} env["NUM_SLAVES"] = num_slaves.to_s env["RRRSPEC_CONFIG_FILES"] = RRRSpec.configuration.loaded.join(':') env["RRRSPEC_WORKING_DIR"] = RRRSpec.configuration.working_dir env["RRRSPEC_TASKSET_KEY"] = taskset.key pid_to_slave_number = {} slave_command = taskset.slave_command spawner = proc do |slave_number| pid, out_rd, err_rd = execute_with_logs( working_path, '/bin/bash -ex', env.merge({"SLAVE_NUMBER" => slave_number.to_s}), slave_command ) slave = Slave.build_from_pid(pid) taskset.add_slave(slave) Thread.fork { log_to_logger(TimedLogger.new(slave), out_rd, err_rd) } pid_to_slave_number[pid] = slave_number end num_slaves.times { |i| spawner.call(i) } cancel_watcher_pid = Process.fork do $0 = 'rrrspec cancel watcher' loop do break unless taskset.status == 'running' sleep CANCEL_POLLING end end trials = 1 max_trials = taskset.max_trials loop do break if pid_to_slave_number.empty? begin pid, status = Process.wait2 break if pid == cancel_watcher_pid break unless taskset.status == 'running' slave = Slave.build_from_pid(pid) if status.success? slave.update_status('normal_exit') pid_to_slave_number.delete(pid) else exit_code = (status.to_i >> 8) if exit_code == TIMEOUT_EXITCODE slave_log = slave.log slave.trials.each do |trial| if trial.status == nil trial.finish('timeout', slave_log, '', nil, nil, nil) ArbiterQueue.trial(trial) end end slave.update_status('timeout_exit') else slave.trials.each do |trial| if trial.status == nil trial.finish('error', '', '', nil, nil, nil) ArbiterQueue.trial(trial) end end slave.update_status('failure_exit') trials += 1 if trials > max_trials ArbiterQueue.fail(taskset) break end end slave_number = pid_to_slave_number[pid] pid_to_slave_number.delete(pid) spawner.call(slave_number) end rescue Errno::ECHILD break end end return cancel_watcher_pid, pid_to_slave_number end
rsync(logger, taskset)
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 22 def rsync(logger, taskset) logger.write("Start RSync") working_path = File.join(RRRSpec.configuration.working_dir, taskset.rsync_name) FileUtils.mkdir_p(working_path) unless Dir.exist?(working_path) remote_path = File.join(RRRSpec.configuration.rsync_remote_path, taskset.rsync_name) command = "rsync #{RRRSpec.configuration.rsync_options} #{remote_path}/ #{working_path}" pid, out_rd, err_rd = execute_with_logs(working_path, command, {}) log_to_logger(logger, out_rd, err_rd) pid, status = Process.waitpid2(pid) if status.success? logger.write("RSync finished") return true else logger.write("RSync failed") ArbiterQueue.fail(taskset) return false end end
setup(logger, taskset)
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 43 def setup(logger, taskset) logger.write("Start setup") env = { 'NUM_SLAVES' => RRRSpec.configuration.slave_processes.to_s } working_path = File.join(RRRSpec.configuration.working_dir, taskset.rsync_name) pid, out_rd, err_rd = execute_with_logs(working_path, '/bin/bash -ex', env, taskset.setup_command) log_to_logger(logger, out_rd, err_rd) pid, status = Process.waitpid2(pid) if status.success? logger.write("Setup finished") return true else logger.write("Setup failed") ArbiterQueue.fail(taskset) return false end end
work()
click to toggle source
# File lib/rrrspec/server/worker_runner.rb, line 179 def work @worker.update_current_taskset(nil) taskset = @worker.dequeue_taskset worker_log = WorkerLog.create(@worker, taskset) logger = TimedLogger.new(worker_log) check = proc do unless taskset.status == 'running' logger.write("The taskset(#{taskset.key}) is not running but #{taskset.status}") return end end check.call @worker.update_current_taskset(taskset) rsync(logger, taskset) worker_log.set_rsync_finished_time check.call setup(logger, taskset) worker_log.set_setup_finished_time check.call cancel_watcher_pid, pid_to_slave_number = rspec(taskset) cleaning_process(logger, taskset, cancel_watcher_pid, pid_to_slave_number) ensure worker_log.set_finished_time if worker_log @worker.update_current_taskset(nil) end