class ZSpec::Queue

Attributes

counter_name[R]
done_queue_name[R]
metadata_hash_name[R]
pending_queue_name[R]
processing_queue_name[R]
workers_ready_key_name[R]

Public Class Methods

new(sink:, build_prefix:, retries:, timeout:) click to toggle source
# File lib/zspec/queue.rb, line 10
def initialize(sink:, build_prefix:, retries:, timeout:)
  @sink                   = sink
  @retries                = retries.to_i
  @timeout                = timeout.to_i
  @counter_name           = build_prefix + ":count"
  @pending_queue_name     = build_prefix + ":pending"
  @processing_queue_name  = build_prefix + ":processing"
  @done_queue_name        = build_prefix + ":done"
  @metadata_hash_name     = build_prefix + ":metadata"
  @workers_ready_key_name = build_prefix + ":ready"
end

Public Instance Methods

cleanup(expire_seconds = EXPIRE_SECONDS) click to toggle source
# File lib/zspec/queue.rb, line 22
def cleanup(expire_seconds = EXPIRE_SECONDS)
  @sink.expire(@counter_name, expire_seconds)
  @sink.expire(@pending_queue_name, expire_seconds)
  @sink.expire(@processing_queue_name, expire_seconds)
  @sink.expire(@done_queue_name, expire_seconds)
  @sink.expire(@metadata_hash_name, expire_seconds)
  @sink.expire(@workers_ready_key_name, expire_seconds)
end
done_queue() click to toggle source
# File lib/zspec/queue.rb, line 39
def done_queue
  Enumerator.new do |yielder|
    until workers_ready? && complete?
      expire_processing

      _list, message = @sink.brpop(@done_queue_name, timeout: 1)
      if message.nil?
        yielder << [nil, nil]
        next
      end

      if @sink.hget(@metadata_hash_name, dedupe_key(message))
        yielder << [nil, nil]
        next
      end

      results = @sink.hget(@metadata_hash_name, results_key(message))
      if results.nil?
        yielder << [nil, nil]
        next
      end

      stdout = @sink.hget(@metadata_hash_name, stdout_key(message))

      @sink.hset(@metadata_hash_name, dedupe_key(message), true)
      @sink.decr(@counter_name)

      yielder << [results, stdout]
    end
  end
end
enqueue(messages) click to toggle source
# File lib/zspec/queue.rb, line 31
def enqueue(messages)
  messages.each do |message|
    @sink.lpush(@pending_queue_name, message)
    @sink.incr(@counter_name)
  end
  @sink.set(@workers_ready_key_name, true)
end
pending_queue() click to toggle source
# File lib/zspec/queue.rb, line 71
def pending_queue
  Enumerator.new do |yielder|
    until workers_ready? && complete?
      message = @sink.brpoplpush(@pending_queue_name, @processing_queue_name, timeout: 1)
      if message.nil?
        yielder << nil
        next
      end
      @sink.hset(@metadata_hash_name, timeout_key(message), @sink.time.first)
      yielder << message
    end
  end
end
resolve(failed, message, results, stdout) click to toggle source
# File lib/zspec/queue.rb, line 85
def resolve(failed, message, results, stdout)
  if failed && (count = retry_count(message)) && (count < @retries)
    retry_message(message, count)
  else
    resolve_message(message, results, stdout)
  end
end

Private Instance Methods

complete?() click to toggle source
# File lib/zspec/queue.rb, line 113
def complete?
  @sink.get(@counter_name).to_i == 0
end
expire_processing() click to toggle source
# File lib/zspec/queue.rb, line 95
def expire_processing
  processing.each do |message|
    next unless expired?(message)

    @sink.lrem(@processing_queue_name, 0, message)
    @sink.rpush(@pending_queue_name, message)
    @sink.hdel(@metadata_hash_name, timeout_key(message))
  end
end
expired?(message) click to toggle source
# File lib/zspec/queue.rb, line 121
def expired?(message)
  proccess_time = @sink.hget(@metadata_hash_name, timeout_key(message)).to_i
  (@sink.time.first - proccess_time) > @timeout
end
processing() click to toggle source
# File lib/zspec/queue.rb, line 109
def processing
  @sink.lrange(@processing_queue_name, 0, -1)
end
resolve_message(message, results, stdout) click to toggle source
# File lib/zspec/queue.rb, line 126
def resolve_message(message, results, stdout)
  @sink.hset(@metadata_hash_name, stdout_key(message), stdout)
  @sink.hset(@metadata_hash_name, results_key(message), results)
  @sink.lrem(@processing_queue_name, 0, message)
  @sink.lpush(@done_queue_name, message)
end
retry_count(message) click to toggle source
# File lib/zspec/queue.rb, line 117
def retry_count(message)
  @sink.hget(@metadata_hash_name, retry_key(message)).to_i
end
retry_message(message, count) click to toggle source
# File lib/zspec/queue.rb, line 133
def retry_message(message, count)
  @sink.hdel(@metadata_hash_name, timeout_key(message))
  @sink.hset(@metadata_hash_name, retry_key(message), count + 1)
end
workers_ready?() click to toggle source
# File lib/zspec/queue.rb, line 105
def workers_ready?
  @sink.get(@workers_ready_key_name)
end