class ZSpec::Queue
Attributes
counter_name[R]
done_queue_name[R]
metadata_hash_name[R]
pending_queue_name[R]
processing_queue_name[R]
workers_ready_key_name[R]
Public Class Methods
new(sink:, build_prefix:, retries:, timeout:)
click to toggle source
# File lib/zspec/queue.rb, line 10
# Stores the backing client and derives every key name used by the queue.
#
# @param sink [Object] client that receives the store commands issued by this
#   class (presumably a Redis-compatible client — confirm against callers)
# @param build_prefix [String] prefix shared by all key names of one build
# @param retries [#to_i] retry limit, coerced with +to_i+
# @param timeout [#to_i] processing timeout, coerced with +to_i+ and compared
#   against differences of +sink.time.first+ values
def initialize(sink:, build_prefix:, retries:, timeout:)
  # Plain `+` is kept on purpose: a non-String prefix must still raise.
  key_for = ->(suffix) { build_prefix + ":#{suffix}" }

  @sink    = sink
  @retries = retries.to_i
  @timeout = timeout.to_i

  @counter_name           = key_for.call("count")
  @pending_queue_name     = key_for.call("pending")
  @processing_queue_name  = key_for.call("processing")
  @done_queue_name        = key_for.call("done")
  @metadata_hash_name     = key_for.call("metadata")
  @workers_ready_key_name = key_for.call("ready")
end
Public Instance Methods
cleanup(expire_seconds = EXPIRE_SECONDS)
click to toggle source
# File lib/zspec/queue.rb, line 22
# Applies the same expiry to every key owned by this queue.
#
# @param expire_seconds [Object] value passed as the second argument of each
#   +sink.expire+ call; defaults to EXPIRE_SECONDS
# @return [Object] whatever the final +expire+ call (on the ready key) returned
def cleanup(expire_seconds = EXPIRE_SECONDS)
  owned_keys = [
    @counter_name,
    @pending_queue_name,
    @processing_queue_name,
    @done_queue_name,
    @metadata_hash_name,
    @workers_ready_key_name
  ]

  # `.last` keeps the return value equal to the result of the final call.
  owned_keys.map { |key| @sink.expire(key, expire_seconds) }.last
end
done_queue()
click to toggle source
# File lib/zspec/queue.rb, line 39
# Lazily streams finished work as +[results, stdout]+ pairs.
#
# Every pass of the loop calls expire_processing and then waits on the done
# list with a one-second timeout. A +[nil, nil]+ pair is yielded when nothing
# arrived, when the message already carries its dedupe flag, or when no
# results are stored for it. Iteration ends once workers_ready? and complete?
# both hold.
#
# @return [Enumerator] yields two-element arrays
def done_queue
  Enumerator.new do |out|
    until workers_ready? && complete?
      expire_processing
      _source, finished = @sink.brpop(@done_queue_name, timeout: 1)

      # Only look results up for a message that arrived and was not reported yet.
      unreported = !finished.nil? && !@sink.hget(@metadata_hash_name, dedupe_key(finished))
      results = unreported ? @sink.hget(@metadata_hash_name, results_key(finished)) : nil

      if results.nil?
        out << [nil, nil]
      else
        stdout = @sink.hget(@metadata_hash_name, stdout_key(finished))
        # The dedupe flag is written before the counter is decremented, so a
        # second delivery of the same message takes the [nil, nil] path above.
        @sink.hset(@metadata_hash_name, dedupe_key(finished), true)
        @sink.decr(@counter_name)
        out << [results, stdout]
      end
    end
  end
end
enqueue(messages)
click to toggle source
# File lib/zspec/queue.rb, line 31
# Pushes each message onto the pending list, bumping the counter once per
# message, and sets the workers-ready key afterwards.
#
# @param messages [#each] messages to queue
# @return [Object] result of the final +sink.set+ call
def enqueue(messages)
  messages.each do |payload|
    @sink.lpush(@pending_queue_name, payload)
    @sink.incr(@counter_name)
  end

  # The ready key is written only after every message has been pushed.
  ready_flag = true
  @sink.set(@workers_ready_key_name, ready_flag)
end
pending_queue()
click to toggle source
# File lib/zspec/queue.rb, line 71
# Lazily streams messages to work on.
#
# Each pass moves one message from the pending list to the processing list
# (one-second timeout). When a message was obtained, the first element of
# +sink.time+ is stored under its timeout key before it is yielded; when the
# wait timed out, +nil+ is yielded instead. Iteration ends once
# workers_ready? and complete? both hold.
#
# @return [Enumerator] yields a message or +nil+
def pending_queue
  Enumerator.new do |out|
    until workers_ready? && complete?
      claimed = @sink.brpoplpush(@pending_queue_name, @processing_queue_name, timeout: 1)

      unless claimed.nil?
        # Record when processing began; expired? later compares against it.
        field = timeout_key(claimed)
        @sink.hset(@metadata_hash_name, field, @sink.time.first)
      end

      out << claimed
    end
  end
end
resolve(failed, message, results, stdout)
click to toggle source
# File lib/zspec/queue.rb, line 85
# Finishes a message, or schedules a retry when it failed and its retry count
# is still below the configured limit.
#
# @param failed [Object] truthy when the run failed
# @param message [Object] the message being resolved
# @param results [Object] stored via resolve_message when not retrying
# @param stdout [Object] stored via resolve_message when not retrying
# @return [Object] result of retry_message or resolve_message
def resolve(failed, message, results, stdout)
  return resolve_message(message, results, stdout) unless failed

  attempts = retry_count(message)
  return retry_message(message, attempts) if attempts && attempts < @retries

  resolve_message(message, results, stdout)
end
Private Instance Methods
complete?()
click to toggle source
# File lib/zspec/queue.rb, line 113
# Tells whether the counter, read through +to_i+, is zero. A missing counter
# (+nil+) therefore counts as complete.
#
# @return [Boolean]
def complete?
  outstanding = @sink.get(@counter_name).to_i
  outstanding.zero?
end
expire_processing()
click to toggle source
# File lib/zspec/queue.rb, line 95
# Moves every expired message from the processing list back to the pending
# list and removes its timeout entry from the metadata hash.
#
# NOTE(review): the result of +lrem+ is ignored, so a message that was
# resolved between the +processing+ snapshot and this +lrem+ would still be
# pushed back to pending — confirm whether that is acceptable.
#
# @return [Object] the collection returned by +processing+
def expire_processing
  processing.each do |stale|
    if expired?(stale)
      @sink.lrem(@processing_queue_name, 0, stale)
      @sink.rpush(@pending_queue_name, stale)
      @sink.hdel(@metadata_hash_name, timeout_key(stale))
    end
  end
end
expired?(message)
click to toggle source
# File lib/zspec/queue.rb, line 121
# Tells whether more than +@timeout+ has elapsed since the value stored under
# the message's timeout key. A missing entry reads as 0 through +to_i+.
#
# @param message [Object] message whose timeout entry is checked
# @return [Boolean]
def expired?(message)
  started_at = @sink.hget(@metadata_hash_name, timeout_key(message)).to_i
  elapsed = @sink.time.first - started_at
  elapsed > @timeout
end
processing()
click to toggle source
# File lib/zspec/queue.rb, line 109
# Reads the processing list through +lrange+ with bounds 0 and -1.
#
# @return [Object] whatever +sink.lrange+ returns
def processing
  first_index = 0
  last_index = -1
  @sink.lrange(@processing_queue_name, first_index, last_index)
end
resolve_message(message, results, stdout)
click to toggle source
# File lib/zspec/queue.rb, line 126
# Stores a message's stdout and results in the metadata hash, removes the
# message from the processing list, and pushes it onto the done list.
#
# @param message [Object] the finished message
# @param results [Object] value stored under the message's results key
# @param stdout [Object] value stored under the message's stdout key
# @return [Object] result of the final +lpush+
def resolve_message(message, results, stdout)
  # Both values are written before the message is pushed to the done list.
  stdout_field = stdout_key(message)
  @sink.hset(@metadata_hash_name, stdout_field, stdout)

  results_field = results_key(message)
  @sink.hset(@metadata_hash_name, results_field, results)

  @sink.lrem(@processing_queue_name, 0, message)
  @sink.lpush(@done_queue_name, message)
end
retry_count(message)
click to toggle source
# File lib/zspec/queue.rb, line 117
# Reads the retry count stored for a message; a missing entry reads as 0
# through +to_i+.
#
# @param message [Object] message whose retry entry is read
# @return [Integer]
def retry_count(message)
  stored = @sink.hget(@metadata_hash_name, retry_key(message))
  stored.to_i
end
retry_message(message, count)
click to toggle source
# File lib/zspec/queue.rb, line 133
# Deletes the message's timeout entry and stores +count + 1+ under its retry
# key. The message itself is not moved between lists here.
#
# @param message [Object] message to retry
# @param count [Integer] retry count so far
# @return [Object] result of the final +hset+
def retry_message(message, count)
  @sink.hdel(@metadata_hash_name, timeout_key(message))

  retry_field = retry_key(message)
  @sink.hset(@metadata_hash_name, retry_field, count + 1)
end
workers_ready?()
click to toggle source
# File lib/zspec/queue.rb, line 105
# Reads the workers-ready key.
#
# @return [Object, nil] the raw stored value (not coerced to a boolean);
#   callers rely only on its truthiness
def workers_ready?
  ready_flag = @sink.get(@workers_ready_key_name)
  ready_flag
end