class RSpecQ::Queue
Queue is the data store interface (Redis) and is used to manage the work queue for a particular build. All Redis operations happen via Queue.
A queue typically contains all the data needed for a particular build to happen. These include (but are not limited to) the following:
- the list of jobs (spec files and/or examples) to be executed
- the failed examples along with their backtrace
- the set of running jobs
- previous job timing statistics used to optimally schedule the jobs
- the set of executed jobs
Constants
- REQUEUE_JOB
- REQUEUE_LOST_JOB
Scans for dead workers and puts their reserved jobs back to the queue.
- RESERVE_JOB
- STATUS_INITIALIZING
- STATUS_READY
Attributes
Public Class Methods
# File lib/rspecq/queue.rb, line 79
def initialize(build_id, worker_id, redis_opts)
  @build_id = build_id
  @worker_id = worker_id
  @redis = Redis.new(redis_opts.merge(id: worker_id))
end
Public Instance Methods
NOTE: The same job might happen to be acknowledged more than once, in the case of requeues.
# File lib/rspecq/queue.rb, line 122
def acknowledge_job(job)
  @redis.multi do
    @redis.hdel(key_queue_running, @worker_id)
    @redis.sadd(key_queue_processed, job)
    @redis.rpush(key("queue", "jobs_per_worker", @worker_id), job)
  end
end
# File lib/rspecq/queue.rb, line 214
def become_master
  @redis.setnx(key_queue_status, STATUS_INITIALIZING)
end
Returns true if the number of failed tests has reached the fail-fast threshold, meaning the run is unsuccessful and the build should be terminated.
# File lib/rspecq/queue.rb, line 291
def build_failed_fast?
  if fail_fast.nil? || fail_fast.zero?
    return false
  end
  @redis.multi do
    @redis.hlen(key_failures)
    @redis.hlen(key_errors)
  end.inject(:+) >= fail_fast
end
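The threshold check above can be sketched without Redis. This is a minimal illustration, assuming the two counts have already been read. The method name `failed_fast?` and its parameters are hypothetical and not part of RSpecQ.

```ruby
# Hypothetical pure-Ruby version of the fail-fast decision.
# failure_count and error_count stand in for the two HLEN results.
def failed_fast?(failure_count, error_count, fail_fast)
  # nil means the queue is not published yet, 0 means the feature is off.
  return false if fail_fast.nil? || fail_fast.zero?

  # Example failures and non-example errors both count toward the limit.
  failure_count + error_count >= fail_fast
end

p failed_fast?(2, 1, 3)
p failed_fast?(1, 1, 3)
p failed_fast?(5, 5, 0)
```

Note that the comparison is `>=`, so the build fails fast as soon as the combined count reaches the configured value.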
# File lib/rspecq/queue.rb, line 255
def build_successful?
  exhausted? && example_failures.empty? && non_example_errors.empty?
end
# File lib/rspecq/queue.rb, line 198
def example_count
  @redis.get(key_example_count).to_i
end
# File lib/rspecq/queue.rb, line 223
def example_failures
  @redis.hgetall(key_failures)
end
True if the build is complete, false otherwise
# File lib/rspecq/queue.rb, line 232
def exhausted?
  return false if !published?
  @redis.multi do
    @redis.llen(key_queue_unprocessed)
    @redis.hlen(key_queue_running)
  end.inject(:+).zero?
end
Returns the number of failures that will trigger the build to fail-fast. Returns 0 if this feature is disabled and nil if the Queue is not yet published.
# File lib/rspecq/queue.rb, line 283
def fail_fast
  return nil unless published?
  @fail_fast ||= Integer(@redis.hget(key_queue_config, "fail_fast"))
end
# File lib/rspecq/queue.rb, line 156
def failed_job_worker(job)
  redis.hget(key("requeued_job_original_worker"), job)
end
Returns the jobs considered flaky (i.e. initially failed but passed after being retried). Must be called after the build is complete, otherwise an exception will be raised.
# File lib/rspecq/queue.rb, line 268
def flaky_jobs
  if !exhausted? && !build_failed_fast?
    raise "Queue is not yet exhausted"
  end
  requeued = @redis.hkeys(key_requeues)
  return [] if requeued.empty?
  requeued - @redis.hkeys(key_failures)
end
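The flaky-job computation is an array difference: jobs that were requeued at least once, minus jobs that still appear in the failures hash. A minimal sketch, using plain hashes as stand-ins for the two Redis hashes (the sample job ids and messages are made up for illustration):

```ruby
# In-memory stand-ins for the requeues and failures hashes (sample data).
requeues = {
  "./spec/a_spec.rb[1:1]" => "1",
  "./spec/b_spec.rb[1:2]" => "2"
}
failures = {
  "./spec/b_spec.rb[1:2]" => "expected true, got false"
}

# Flaky = requeued at least once, but absent from the final failures.
flaky = requeues.keys - failures.keys
p flaky
```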
# File lib/rspecq/queue.rb, line 194
def increment_example_count(n)
  @redis.incrby(key_example_count, n)
end
# File lib/rspecq/queue.rb, line 152
def job_location(job)
  @redis.hget(key("job_location"), job)
end
# File lib/rspecq/queue.rb, line 160
def job_rerun_command(job)
  worker = failed_job_worker(job)
  jobs = redis.lrange(key("queue", "jobs_per_worker", worker), 0, -1)
  seed = redis.hget(key("worker_seed"), worker)
  "DISABLE_SPRING=1 DISABLE_BOOTSNAP=1 bin/rspecq --build 1 " \
    "--worker foo --seed #{seed} --max-requeues 0 --fail-fast 1 " \
    "--reproduction #{jobs.join(' ')}"
end
redis: LIST<duration>
Last build is at the head of the list.
# File lib/rspecq/queue.rb, line 376
def key_build_times
  "build_times"
end
Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).
redis: HASH<job => error message>
# File lib/rspecq/queue.rb, line 338
def key_errors
  key("errors")
end
The total number of examples, including those that were requeued.
redis: STRING<integer>
# File lib/rspecq/queue.rb, line 353
def key_example_count
  key("example_count")
end
Contains regular RSpec example failures.
redis: HASH<example_id => error message>
# File lib/rspecq/queue.rb, line 330
def key_failures
  key("example_failures")
end
redis: HASH<config_key => config_value>
# File lib/rspecq/queue.rb, line 308
def key_queue_config
  key("queue", "config")
end
redis: SET<job>
# File lib/rspecq/queue.rb, line 323
def key_queue_processed
  key("queue", "processed")
end
redis: HASH<worker_id => job>
# File lib/rspecq/queue.rb, line 318
def key_queue_running
  key("queue", "running")
end
redis: STRING [STATUS_INITIALIZING, STATUS_READY]
# File lib/rspecq/queue.rb, line 303
def key_queue_status
  key("queue", "status")
end
redis: LIST<job>
# File lib/rspecq/queue.rb, line 313
def key_queue_unprocessed
  key("queue", "unprocessed")
end
As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.
redis: HASH<job => times_retried>
# File lib/rspecq/queue.rb, line 346
def key_requeues
  key("requeues")
end
redis: ZSET<job => duration>
NOTE: This key is not scoped to a build (i.e. shared among all builds), so be careful to only publish timings from a single branch (e.g. master). Otherwise, timings won't be accurate.
# File lib/rspecq/queue.rb, line 369
def key_timings
  "timings"
end
redis: ZSET<worker_id => timestamp>
Timestamp of the last example processed by each worker.
# File lib/rspecq/queue.rb, line 360
def key_worker_heartbeats
  key("worker_heartbeats")
end
# File lib/rspecq/queue.rb, line 227
def non_example_errors
  @redis.hgetall(key_errors)
end
# File lib/rspecq/queue.rb, line 206
def processed_jobs
  @redis.smembers(key_queue_processed)
end
# File lib/rspecq/queue.rb, line 202
def processed_jobs_count
  @redis.scard(key_queue_processed)
end
NOTE: jobs will be processed from head to tail (lpop)
# File lib/rspecq/queue.rb, line 86
def publish(jobs, fail_fast = 0)
  @redis.multi do
    @redis.hset(key_queue_config, "fail_fast", fail_fast)
    @redis.rpush(key_queue_unprocessed, jobs)
    @redis.set(key_queue_status, STATUS_READY)
  end.first
end
# File lib/rspecq/queue.rb, line 241
def published?
  @redis.get(key_queue_status) == STATUS_READY
end
# File lib/rspecq/queue.rb, line 183
def record_build_time(duration)
  @redis.multi do
    @redis.lpush(key_build_times, Float(duration))
    @redis.ltrim(key_build_times, 0, 99)
  end
end
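The `lpush` followed by `ltrim(..., 0, 99)` in `record_build_time` keeps a capped history: the newest duration goes to the head and anything beyond the first 100 entries is dropped. A minimal in-memory sketch of that pattern, using an Array in place of the Redis list (the helper name `push_build_time` and the `keep:` parameter are hypothetical):

```ruby
# Hypothetical Array-based model of the capped build-time list.
# Index 0 plays the role of the list head.
def push_build_time(build_times, duration, keep: 100)
  build_times.unshift(Float(duration))            # like LPUSH
  build_times.pop while build_times.size > keep   # like LTRIM 0 (keep - 1)
  build_times
end

times = []
(1..150).each { |d| push_build_time(times, d) }
p times.size
p times.first
```

After 150 pushes only the 100 most recent durations remain, with the latest build at the head.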
# File lib/rspecq/queue.rb, line 170
def record_example_failure(example_id, message)
  @redis.hset(key_failures, example_id, message)
end
For errors that occurred outside of examples (e.g. while loading a spec file)
# File lib/rspecq/queue.rb, line 175
def record_non_example_error(job, message)
  @redis.hset(key_errors, job, message)
end
# File lib/rspecq/queue.rb, line 179
def record_timing(job, duration)
  @redis.zadd(key_timings, duration, job)
end
# File lib/rspecq/queue.rb, line 190
def record_worker_heartbeat
  @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
end
Puts the job at the head of the queue so that it is re-processed next, by another worker. This is a mitigation measure against flaky tests.
Returns nil if the job hit the requeue limit and was therefore not requeued; it should then be considered a failure. Returns false if max_requeues is zero.
# File lib/rspecq/queue.rb, line 135
def requeue_job(example, max_requeues, original_worker_id)
  return false if max_requeues.zero?
  job = example.id
  location = example.location_rerun_argument
  @redis.eval(
    REQUEUE_JOB,
    keys: [key_queue_unprocessed, key_requeues,
           key("requeued_job_original_worker"), key("job_location")],
    argv: [job, max_requeues, original_worker_id, location]
  )
end
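The body of the REQUEUE_JOB script is not shown on this page, so the following is only a rough in-memory model of the behavior described above: put the job back at the head unless it has already been requeued `max_requeues` times. The method name `requeue_sketch`, the Array/Hash stand-ins, and the `true` return value on success are all assumptions.

```ruby
# Hypothetical model of the requeue behavior; not the actual script.
# queue is an Array (index 0 = head), requeues maps job => times retried.
def requeue_sketch(queue, requeues, job, max_requeues)
  return false if max_requeues.zero?                      # requeues disabled
  return nil if requeues.fetch(job, 0) >= max_requeues    # limit reached

  queue.unshift(job)                                      # head of the queue
  requeues[job] = requeues.fetch(job, 0) + 1
  true                                                    # assumed success value
end

queue = ["spec/b_spec.rb"]
requeues = {}
p requeue_sketch(queue, requeues, "spec/a_spec.rb[1:1]", 1)
p requeue_sketch(queue, requeues, "spec/a_spec.rb[1:1]", 1)
p queue
```

The second call returns nil because the job has already used its single allowed requeue, so the caller would record it as a failure.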
# File lib/rspecq/queue.rb, line 105
def requeue_lost_job
  @redis.eval(
    REQUEUE_LOST_JOB,
    keys: [
      key_worker_heartbeats,
      key_queue_running,
      key_queue_unprocessed
    ],
    argv: [
      current_time,
      WORKER_LIVENESS_SEC
    ]
  )
end
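REQUEUE_LOST_JOB is described as scanning for dead workers and putting their reserved jobs back to the queue. Its body is not shown here, so the sketch below is an assumption-laden model: a worker counts as dead when its last heartbeat is at least `liveness_sec` old, at most one job is recovered per call, and the job is pushed to the head. The method name and all parameters are hypothetical.

```ruby
# Hypothetical in-memory model of lost-job recovery; not the actual script.
# heartbeats: worker_id => last heartbeat timestamp
# running:    worker_id => reserved job
# queue:      Array of unprocessed jobs (index 0 = head)
def requeue_lost_job_sketch(heartbeats, running, queue, now, liveness_sec)
  heartbeats.each do |worker, last_seen|
    next if last_seen > now - liveness_sec   # worker is still alive

    job = running.delete(worker)             # release its reservation
    next unless job                          # dead worker held no job

    queue.unshift(job)                       # assumed: back to the head
    return job
  end
  nil
end

heartbeats = { "w1" => 100, "w2" => 160 }
running = { "w1" => "spec/a_spec.rb", "w2" => "spec/b_spec.rb" }
queue = ["spec/c_spec.rb"]
p requeue_lost_job_sketch(heartbeats, running, queue, 170, 60)
p queue
```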
# File lib/rspecq/queue.rb, line 210
def requeued_jobs
  @redis.hgetall(key_requeues)
end
# File lib/rspecq/queue.rb, line 94
def reserve_job
  @redis.eval(
    RESERVE_JOB,
    keys: [
      key_queue_unprocessed,
      key_queue_running,
    ],
    argv: [@worker_id]
  )
end
# File lib/rspecq/queue.rb, line 148
def save_worker_seed(worker, seed)
  @redis.hset(key("worker_seed"), worker, seed)
end
Returns the job timings ordered by execution time, descending (slowest at the head).
# File lib/rspecq/queue.rb, line 219
def timings
  Hash[@redis.zrevrange(key_timings, 0, -1, withscores: true)]
end
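Reading a sorted set in reverse score order and wrapping the pairs in a Hash yields a job => duration mapping whose iteration order runs from slowest to fastest. A small sketch of the same ordering with a plain Hash (the file names and durations are sample data):

```ruby
# Sample timings; in RSpecQ these would come from the timings sorted set.
recorded = {
  "spec/fast_spec.rb" => 0.4,
  "spec/slow_spec.rb" => 12.7,
  "spec/mid_spec.rb"  => 3.1
}

# Sort by duration, highest first. Ruby hashes preserve insertion order,
# so the resulting Hash iterates from slowest to fastest.
ordered = recorded.sort_by { |_job, duration| -duration }.to_h
p ordered.keys
```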
The remaining jobs to be processed. Jobs at the head of the list will be processed first.
# File lib/rspecq/queue.rb, line 261
def unprocessed_jobs
  @redis.lrange(key_queue_unprocessed, 0, -1)
end
# File lib/rspecq/queue.rb, line 245
def wait_until_published(timeout = 30)
  (timeout * 10).times do
    return if published?
    sleep 0.1
  end
  raise "Queue not yet published after #{timeout} seconds"
end
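`wait_until_published` is a bounded polling loop: check a condition, sleep briefly, and raise once the attempts run out. The same pattern can be sketched as a generic helper. `poll_until` and its keyword arguments are hypothetical names used only for illustration.

```ruby
# Hypothetical generic form of the polling loop used above.
# Yields repeatedly until the block returns a truthy value or time runs out.
def poll_until(timeout: 30, interval: 0.1)
  attempts = (timeout / interval).round
  attempts.times do
    return true if yield
    sleep interval
  end
  raise "Condition not met after #{timeout} seconds"
end

calls = 0
poll_until(timeout: 1, interval: 0.01) { (calls += 1) >= 3 }
p calls
```

Because the loop counts attempts rather than measuring elapsed time, the real wait can exceed `timeout` when the checked condition is itself slow to evaluate.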
Private Instance Methods
We don't use any Ruby `Time` methods because specs that use timecop in before(:all) hooks will mess up our times.
# File lib/rspecq/queue.rb, line 388
def current_time
  @redis.time[0]
end
# File lib/rspecq/queue.rb, line 382
def key(*keys)
  [@build_id, keys].join(":")
end
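The `key` helper scopes every key to the build by prefixing the build id. It relies on `Array#join` flattening the nested `keys` array. A standalone sketch of the same expression (`scoped_key` and the `"build-42"` id are hypothetical):

```ruby
# Hypothetical standalone version of the private key helper.
def scoped_key(build_id, *parts)
  # Array#join joins nested arrays recursively, so [id, ["a", "b"]]
  # becomes "id:a:b".
  [build_id, parts].join(":")
end

puts scoped_key("build-42", "queue", "running")
puts scoped_key("build-42", "errors")
```

Keys such as `"timings"` and `"build_times"` do not go through this helper, which is why they are shared among all builds.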