class RSpecQ::Queue

Queue is the data store interface (Redis) and is used to manage the work queue for a particular build. All Redis operations happen via Queue.

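A queue typically contains all the data needed for a particular build to happen. This includes (but is not limited to) the unprocessed, running and processed jobs, example failures and non-example errors, requeue counts, worker heartbeats and job timings; see the key_* methods below.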

Constants

REQUEUE_JOB
REQUEUE_LOST_JOB

Scans for dead workers and puts their reserved jobs back to the queue.

RESERVE_JOB
STATUS_INITIALIZING
STATUS_READY

Attributes

redis[R]

Public Class Methods

new(build_id, worker_id, redis_opts)
# File lib/rspecq/queue.rb, line 79
def initialize(build_id, worker_id, redis_opts)
  @build_id = build_id
  @worker_id = worker_id
  @redis = Redis.new(redis_opts.merge(id: worker_id))
end

Public Instance Methods

acknowledge_job(job)

NOTE: The same job might happen to be acknowledged more than once, in the case of requeues.

# File lib/rspecq/queue.rb, line 122
def acknowledge_job(job)
  @redis.multi do
    @redis.hdel(key_queue_running, @worker_id)
    @redis.sadd(key_queue_processed, job)
    @redis.rpush(key("queue", "jobs_per_worker", @worker_id), job)
  end
end
become_master()
# File lib/rspecq/queue.rb, line 214
def become_master
  @redis.setnx(key_queue_status, STATUS_INITIALIZING)
end
build_failed_fast?()

Returns true if the number of failed tests (example failures plus non-example errors) has reached the fail-fast threshold, meaning the run is unsuccessful and the build should be terminated.

# File lib/rspecq/queue.rb, line 291
def build_failed_fast?
  if fail_fast.nil? || fail_fast.zero?
    return false
  end

  @redis.multi do
    @redis.hlen(key_failures)
    @redis.hlen(key_errors)
  end.inject(:+) >= fail_fast
end
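
The threshold check can be illustrated without Redis. The following is a minimal sketch, assuming the failure and error counts have already been fetched as integers; `failed_fast?` is a hypothetical helper and not part of RSpecQ.

```ruby
# Hypothetical stand-in for build_failed_fast?, operating on plain integers
# instead of the lengths of the Redis hashes.
def failed_fast?(fail_fast, failure_count, error_count)
  # nil means the queue is not published yet; 0 means the feature is disabled.
  return false if fail_fast.nil? || fail_fast.zero?

  failure_count + error_count >= fail_fast
end

failed_fast?(3, 2, 1)  # threshold reached
failed_fast?(0, 10, 5) # feature disabled, never fails fast
```
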
build_successful?()
# File lib/rspecq/queue.rb, line 255
def build_successful?
  exhausted? && example_failures.empty? && non_example_errors.empty?
end
example_count()
# File lib/rspecq/queue.rb, line 198
def example_count
  @redis.get(key_example_count).to_i
end
example_failures()
# File lib/rspecq/queue.rb, line 223
def example_failures
  @redis.hgetall(key_failures)
end
exhausted?()

Returns true if the build is complete (the queue is published and no jobs are unprocessed or running), false otherwise.

# File lib/rspecq/queue.rb, line 232
def exhausted?
  return false if !published?

  @redis.multi do
    @redis.llen(key_queue_unprocessed)
    @redis.hlen(key_queue_running)
  end.inject(:+).zero?
end
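
As a rough illustration of the completeness check, here is a sketch that models the unprocessed list and the running hash with plain Ruby collections. `queue_exhausted?` and its arguments are hypothetical names chosen for this example.

```ruby
# Hypothetical in-memory model of exhausted?.
# unprocessed: Array of jobs still waiting; running: Hash of worker_id => job.
def queue_exhausted?(published, unprocessed, running)
  # An unpublished queue is never considered complete, even if it is empty.
  return false unless published

  (unprocessed.length + running.length).zero?
end

queue_exhausted?(true, [], {})              # nothing left to do
queue_exhausted?(true, [], { "w1" => "a" }) # a job is still running
```
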
fail_fast()

Returns the number of failures that will trigger the build to fail fast. Returns 0 if this feature is disabled and nil if the Queue is not yet published.

# File lib/rspecq/queue.rb, line 283
def fail_fast
  return nil unless published?

  @fail_fast ||= Integer(@redis.hget(key_queue_config, "fail_fast"))
end
failed_job_worker(job)
# File lib/rspecq/queue.rb, line 156
def failed_job_worker(job)
  redis.hget(key("requeued_job_original_worker"), job)
end
flaky_jobs()

Returns the jobs considered flaky (i.e. initially failed but passed after being retried). Must be called after the build is complete or has failed fast, otherwise an exception will be raised.

# File lib/rspecq/queue.rb, line 268
def flaky_jobs
  if !exhausted? && !build_failed_fast?
    raise "Queue is not yet exhausted"
  end

  requeued = @redis.hkeys(key_requeues)

  return [] if requeued.empty?

  requeued - @redis.hkeys(key_failures)
end
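
The flaky set is simply the requeued jobs minus those that still ended up as failures. A small sketch with made-up example IDs (the IDs and the `flaky_from` helper are assumptions for illustration only):

```ruby
# Hypothetical helper: jobs that were requeued at least once but do not
# appear among the final failures are considered flaky.
def flaky_from(requeued_ids, failed_ids)
  return [] if requeued_ids.empty?

  requeued_ids - failed_ids
end

requeued = ["./spec/a_spec.rb[1:1]", "./spec/b_spec.rb[1:2]"]
failed   = ["./spec/b_spec.rb[1:2]"]

flaky_from(requeued, failed) # => ["./spec/a_spec.rb[1:1]"]
```
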
increment_example_count(n)
# File lib/rspecq/queue.rb, line 194
def increment_example_count(n)
  @redis.incrby(key_example_count, n)
end
job_location(job)
# File lib/rspecq/queue.rb, line 152
def job_location(job)
  @redis.hget(key("job_location"), job)
end
job_rerun_command(job)
# File lib/rspecq/queue.rb, line 160
def job_rerun_command(job)
  worker = failed_job_worker(job)
  jobs = redis.lrange(key("queue", "jobs_per_worker", worker), 0, -1)
  seed = redis.hget(key("worker_seed"), worker)

  "DISABLE_SPRING=1 DISABLE_BOOTSNAP=1 bin/rspecq --build 1 " \
    "--worker foo --seed #{seed} --max-requeues 0 --fail-fast 1 " \
    "--reproduction #{jobs.join(' ')}"
end
key_build_times()

redis: LIST<duration>

Last build is at the head of the list.

# File lib/rspecq/queue.rb, line 376
def key_build_times
  "build_times"
end
key_errors()

Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).

redis: HASH<job => error message>

# File lib/rspecq/queue.rb, line 338
def key_errors
  key("errors")
end
key_example_count()

The total number of examples, including those that were requeued.

redis: STRING<integer>

# File lib/rspecq/queue.rb, line 353
def key_example_count
  key("example_count")
end
key_failures()

Contains regular RSpec example failures.

redis: HASH<example_id => error message>

# File lib/rspecq/queue.rb, line 330
def key_failures
  key("example_failures")
end
key_queue_config()

redis: HASH<config_key => config_value>

# File lib/rspecq/queue.rb, line 308
def key_queue_config
  key("queue", "config")
end
key_queue_processed()

redis: SET<job>

# File lib/rspecq/queue.rb, line 323
def key_queue_processed
  key("queue", "processed")
end
key_queue_running()

redis: HASH<worker_id => job>

# File lib/rspecq/queue.rb, line 318
def key_queue_running
  key("queue", "running")
end
key_queue_status()

redis: STRING [STATUS_INITIALIZING, STATUS_READY]

# File lib/rspecq/queue.rb, line 303
def key_queue_status
  key("queue", "status")
end
key_queue_unprocessed()

redis: LIST<job>

# File lib/rspecq/queue.rb, line 313
def key_queue_unprocessed
  key("queue", "unprocessed")
end
key_requeues()

As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.

redis: HASH<job => times_retried>

# File lib/rspecq/queue.rb, line 346
def key_requeues
  key("requeues")
end
key_timings()

redis: ZSET<job => duration>

NOTE: This key is not scoped to a build (i.e. shared among all builds), so be careful to only publish timings from a single branch (e.g. master). Otherwise, timings won't be accurate.

# File lib/rspecq/queue.rb, line 369
def key_timings
  "timings"
end
key_worker_heartbeats()

redis: ZSET<worker_id => timestamp>

Timestamp of the last example processed by each worker.

# File lib/rspecq/queue.rb, line 360
def key_worker_heartbeats
  key("worker_heartbeats")
end
non_example_errors()
# File lib/rspecq/queue.rb, line 227
def non_example_errors
  @redis.hgetall(key_errors)
end
processed_jobs()
# File lib/rspecq/queue.rb, line 206
def processed_jobs
  @redis.smembers(key_queue_processed)
end
processed_jobs_count()
# File lib/rspecq/queue.rb, line 202
def processed_jobs_count
  @redis.scard(key_queue_processed)
end
publish(jobs, fail_fast = 0)

NOTE: jobs will be processed from head to tail (lpop)

# File lib/rspecq/queue.rb, line 86
def publish(jobs, fail_fast = 0)
  @redis.multi do
    @redis.hset(key_queue_config, "fail_fast", fail_fast)
    @redis.rpush(key_queue_unprocessed, jobs)
    @redis.set(key_queue_status, STATUS_READY)
  end.first
end
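
The head-to-tail ordering mentioned in the note above follows from pairing RPUSH (append at the tail) with LPOP (remove from the head). A minimal in-memory sketch, using a Ruby Array in place of the Redis list; `publish_jobs` and `pop_next_job` are hypothetical names:

```ruby
# In-memory stand-in for the Redis list behind key_queue_unprocessed.
def publish_jobs(unprocessed, jobs)
  unprocessed.push(*jobs) # like RPUSH: append in the given order
  unprocessed
end

def pop_next_job(unprocessed)
  unprocessed.shift # like LPOP: take from the head
end

queue = publish_jobs([], ["a_spec.rb", "b_spec.rb", "c_spec.rb"])
pop_next_job(queue) # => "a_spec.rb"
```

Because jobs are consumed in the order they were published, the caller controls scheduling simply by ordering the `jobs` array.
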
published?()
# File lib/rspecq/queue.rb, line 241
def published?
  @redis.get(key_queue_status) == STATUS_READY
end
record_build_time(duration)
# File lib/rspecq/queue.rb, line 183
def record_build_time(duration)
  @redis.multi do
    @redis.lpush(key_build_times, Float(duration))
    @redis.ltrim(key_build_times, 0, 99)
  end
end
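
The LPUSH followed by LTRIM 0 99 keeps a bounded history of the 100 most recent build times, newest first. A sketch of the same idea on a plain Array (`push_build_time` and `MAX_BUILD_TIMES` are hypothetical names for this example):

```ruby
MAX_BUILD_TIMES = 100 # mirrors ltrim(key_build_times, 0, 99)

# Hypothetical in-memory equivalent of record_build_time.
def push_build_time(times, duration)
  times.unshift(Float(duration))             # like LPUSH: newest at the head
  times.pop while times.length > MAX_BUILD_TIMES # like LTRIM: drop the oldest
  times
end

history = []
105.times { |i| push_build_time(history, i) }
history.length # => 100
history.first  # => 104.0
```
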
record_example_failure(example_id, message)
# File lib/rspecq/queue.rb, line 170
def record_example_failure(example_id, message)
  @redis.hset(key_failures, example_id, message)
end
record_non_example_error(job, message)

Records an error that occurred outside of examples (e.g. while loading a spec file).

# File lib/rspecq/queue.rb, line 175
def record_non_example_error(job, message)
  @redis.hset(key_errors, job, message)
end
record_timing(job, duration)
# File lib/rspecq/queue.rb, line 179
def record_timing(job, duration)
  @redis.zadd(key_timings, duration, job)
end
record_worker_heartbeat()
# File lib/rspecq/queue.rb, line 190
def record_worker_heartbeat
  @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
end
requeue_job(example, max_requeues, original_worker_id)

Puts the job at the head of the queue so that it is re-processed right away by another worker. This is a mitigation measure against flaky tests.

Returns false immediately if max_requeues is zero. Returns nil if the job hit the requeue limit and therefore was not requeued. In both cases the job should be considered a failure.

# File lib/rspecq/queue.rb, line 135
def requeue_job(example, max_requeues, original_worker_id)
  return false if max_requeues.zero?

  job = example.id
  location = example.location_rerun_argument

  @redis.eval(
    REQUEUE_JOB,
    keys: [key_queue_unprocessed, key_requeues, key("requeued_job_original_worker"), key("job_location")],
    argv: [job, max_requeues, original_worker_id, location]
  )
end
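
The REQUEUE_JOB Lua script itself is not shown on this page, so the following is only a guess at its bookkeeping, modelled in plain Ruby. It assumes the script counts requeues per job in the key_requeues hash and pushes the job to the head of the unprocessed list while the count is below the limit. `requeue_in_memory` is a hypothetical name.

```ruby
# Hypothetical in-memory model of requeue_job; NOT the actual Lua script.
# unprocessed: Array of jobs; requeues: Hash of job => times_retried.
def requeue_in_memory(unprocessed, requeues, job, max_requeues)
  return false if max_requeues.zero?                      # requeues disabled
  return nil if requeues.fetch(job, 0) >= max_requeues    # limit reached

  requeues[job] = requeues.fetch(job, 0) + 1
  unprocessed.unshift(job) # head of the queue, so it is picked up next
  true
end

queue = ["other_spec.rb"]
counts = {}
requeue_in_memory(queue, counts, "./spec/a_spec.rb[1:1]", 1) # requeued
requeue_in_memory(queue, counts, "./spec/a_spec.rb[1:1]", 1) # limit hit
```
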
requeue_lost_job()
# File lib/rspecq/queue.rb, line 105
def requeue_lost_job
  @redis.eval(
    REQUEUE_LOST_JOB,
    keys: [
      key_worker_heartbeats,
      key_queue_running,
      key_queue_unprocessed
    ],
    argv: [
      current_time,
      WORKER_LIVENESS_SEC
    ]
  )
end
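
REQUEUE_LOST_JOB is described above as scanning for dead workers and putting their reserved jobs back in the queue. The script body is not shown, so this is a sketch under assumptions: a worker counts as dead when its last heartbeat is older than the liveness window, and its job goes back to the head of the unprocessed list. All names below are hypothetical.

```ruby
# Hypothetical in-memory model; NOT the actual Lua script.
# heartbeats: Hash of worker_id => last heartbeat (epoch seconds)
# running:    Hash of worker_id => reserved job
def requeue_lost_jobs(heartbeats, running, unprocessed, now, liveness_sec)
  dead = heartbeats.select { |_worker, ts| ts < now - liveness_sec }.keys

  dead.each_with_object([]) do |worker, requeued|
    job = running.delete(worker)
    next if job.nil? # the dead worker had nothing reserved

    unprocessed.unshift(job) # assumption: lost jobs go to the head
    requeued << job
  end
end

heartbeats = { "w1" => 100, "w2" => 195 }
running    = { "w1" => "a_spec.rb", "w2" => "b_spec.rb" }
queue      = ["c_spec.rb"]
requeue_lost_jobs(heartbeats, running, queue, 200, 60) # "w1" is considered dead
```
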
requeued_jobs()
# File lib/rspecq/queue.rb, line 210
def requeued_jobs
  @redis.hgetall(key_requeues)
end
reserve_job()
# File lib/rspecq/queue.rb, line 94
def reserve_job
  @redis.eval(
    RESERVE_JOB,
    keys: [
      key_queue_unprocessed,
      key_queue_running,
    ],
    argv: [@worker_id]
  )
end
save_worker_seed(worker, seed)
# File lib/rspecq/queue.rb, line 148
def save_worker_seed(worker, seed)
  @redis.hset(key("worker_seed"), worker, seed)
end
timings()

Returns the job timings ordered by execution time, descending (the slowest jobs are at the head).

# File lib/rspecq/queue.rb, line 219
def timings
  Hash[@redis.zrevrange(key_timings, 0, -1, withscores: true)]
end
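
Ruby hashes preserve insertion order, so building a Hash from pairs sorted by score keeps the slowest-first ordering. A sketch with made-up durations (`timings_desc` is a hypothetical helper that sorts in Ruby rather than relying on the sorted set):

```ruby
# Hypothetical helper: order job => duration pairs from slowest to fastest.
def timings_desc(scores)
  Hash[scores.sort_by { |_job, duration| -duration }]
end

ordered = timings_desc("a_spec.rb" => 1.0, "b_spec.rb" => 3.0, "c_spec.rb" => 2.0)
ordered.keys # => ["b_spec.rb", "c_spec.rb", "a_spec.rb"]
```
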
unprocessed_jobs()

The remaining jobs to be processed. Jobs at the head of the list will be processed first.

# File lib/rspecq/queue.rb, line 261
def unprocessed_jobs
  @redis.lrange(key_queue_unprocessed, 0, -1)
end
wait_until_published(timeout = 30)
# File lib/rspecq/queue.rb, line 245
def wait_until_published(timeout = 30)
  (timeout * 10).times do
    return if published?

    sleep 0.1
  end

  raise "Queue not yet published after #{timeout} seconds"
end
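
The same poll-then-raise pattern can be written generically. This is a sketch, not RSpecQ code: `wait_until` is a hypothetical helper that takes the condition as a block and, unlike the method above, returns true on success.

```ruby
# Hypothetical generic version of the polling loop in wait_until_published.
def wait_until(timeout: 30, interval: 0.1)
  (timeout / interval).round.times do
    return true if yield # condition met, stop polling

    sleep interval
  end

  raise "Condition not met after #{timeout} seconds"
end

attempts = 0
wait_until(timeout: 1, interval: 0.01) { (attempts += 1) >= 3 }
```
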

Private Instance Methods

current_time()

We don't use any Ruby `Time` methods because specs that use timecop in before(:all) hooks will mess up our times.

# File lib/rspecq/queue.rb, line 388
def current_time
  @redis.time[0]
end
key(*keys)
# File lib/rspecq/queue.rb, line 382
def key(*keys)
  [@build_id, keys].join(":")
end
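
Array#join flattens nested arrays, which is why `[@build_id, keys].join(":")` yields a flat, colon-separated key. A standalone sketch of the same namespacing (`build_key` and the build ID are made up for illustration):

```ruby
# Hypothetical standalone version of the private key helper.
def build_key(build_id, *parts)
  [build_id, parts].join(":") # nested array is flattened by join
end

build_key("build-123", "queue", "status") # => "build-123:queue:status"
build_key("build-123", "errors")          # => "build-123:errors"
```

Every key built this way is scoped to one build. key_timings and key_build_times return bare strings instead, which is what makes them shared across builds.
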