class RedisSafeQueue
Constants
- TX_COMMIT_TIMEOUT
Redis must process all commands within 10 seconds.
Attributes
iterations[RW]
prefix[RW]
queue_id[RW]
redis[RW]
timeout[RW]
Public Class Methods
new(opts)
click to toggle source
# File lib/redis_safe_queue.rb, line 11
# Builds a queue handle from an options hash.
#
# opts - Hash of settings:
#        :redis        - client object stored in @redis (required).
#        :queue_id     - identifier stored in @queue_id (required).
#        :timeout      - converted with #to_i and stored in @timeout (required).
#        :redis_prefix - key namespace; :redis_safe_queue when absent or falsy.
#
# Raises KeyError when :redis, :queue_id or :timeout is missing.
def initialize(opts)
  @redis = opts.fetch(:redis)

  namespace = opts[:redis_prefix]
  @prefix = namespace ? namespace : :redis_safe_queue

  @queue_id = opts.fetch(:queue_id)

  raw_timeout = opts.fetch(:timeout)
  @timeout = raw_timeout.to_i

  # Counter bumped by start_tx on every lock attempt.
  @iterations = 0
end
Public Instance Methods
finish() { || ... }
click to toggle source
# File lib/redis_safe_queue.rb, line 39
# Waits until the queue is empty, then runs the given block in the caller
# that wins the SETNX on the :finished marker key.
#
# Polls #size once per second while it is greater than zero. Afterwards an
# expiry of @timeout seconds is set on both the queue key and the :finished
# marker, whether or not this caller ran the block.
#
# The block is optional: when none is given the marker is still claimed and
# the expiries are still applied.
#
# Returns the result of the final EXPIRE call.
def finish
  sleep 1 while size > 0

  won_marker = @redis.setnx(key(:finished), 1)
  # Guard with block_given? so a block-less call does not raise LocalJumpError.
  yield if won_marker && block_given?

  @redis.expire(key, @timeout)
  @redis.expire(key(:finished), @timeout)
end
push(job_data)
click to toggle source
# File lib/redis_safe_queue.rb, line 19
# Enqueues a job.
#
# Stores job_data under a per-job key built from a freshly generated id,
# then adds that id to the queue's set.
#
# job_data - payload handed to Redis SET as-is.
#
# Returns the result of the SADD call.
def push(job_data)
  new_id = unique_id
  payload_key = key(new_id, :job)

  @redis.set(payload_key, job_data)
  @redis.sadd(key, new_id)
end
size()
click to toggle source
# File lib/redis_safe_queue.rb, line 49
# Returns the cardinality (SCARD) of the queue's set of job ids.
def size
  queue_set = key
  @redis.scard(queue_set)
end
work(max_jobs = nil) { |job| ... }
click to toggle source
# File lib/redis_safe_queue.rb, line 25
# Claims jobs one at a time and yields each payload to the caller's block.
#
# max_jobs - optional upper bound on the number of jobs to process;
#            nil (the default) means keep going until start_tx finds nothing.
#
# A job is committed (removed from the queue) only after the block returns
# for it.
#
# Returns nil.
def work(max_jobs = nil)
  loop do
    break if max_jobs == 0

    job_id = start_tx
    break unless job_id

    yield(get_job(job_id))
    commit_tx(job_id)

    max_jobs -= 1 if max_jobs
  end
end
Private Instance Methods
commit_tx(job_id)
click to toggle source
# File lib/redis_safe_queue.rb, line 77
# Marks a job as done: shortens the job's lock expiry to TX_COMMIT_TIMEOUT,
# removes the job id from the queue set and deletes the stored payload,
# all inside one pipelined batch.
#
# job_id - id previously returned by start_tx.
#
# Commands are sent through the pipeline object yielded by the client.
# NOTE(review): redis-rb 4.6+ is understood to deprecate calling @redis
# inside the block, and 5.x to stop pipelining such calls — confirm against
# the redis-rb changelog. Clients that yield nothing fall back to @redis.
#
# Returns whatever the client's #pipelined returns.
def commit_tx(job_id)
  @redis.pipelined do |pipeline|
    conn = pipeline || @redis

    conn.expire(key(job_id, :lock), TX_COMMIT_TIMEOUT)
    conn.srem(key, job_id)
    conn.del(key(job_id, :job))
  end
end
get_job(job_id)
click to toggle source
# File lib/redis_safe_queue.rb, line 89
# Fetches the stored payload for a job.
#
# job_id - id of the job whose payload key should be read.
#
# Returns the result of the GET call on that job's payload key.
def get_job(job_id)
  payload_key = key(job_id, :job)
  @redis.get(payload_key)
end
key(*append)
click to toggle source
# File lib/redis_safe_queue.rb, line 93
# Builds a colon-separated Redis key.
#
# append - zero or more extra segments (nested arrays are flattened,
#          nil segments are dropped).
#
# Returns a String of the form "<prefix>:<queue_id>[:<segment>...]".
def key(*append)
  segments = [@prefix, @queue_id, *append]
  segments.flatten.compact.join(":")
end
start_tx()
click to toggle source
# File lib/redis_safe_queue.rb, line 55
# Tries to claim a random job from the queue.
#
# Repeatedly picks a random member of the queue set and attempts to take
# its lock with SETNX (value: current Unix time). On success the lock gets
# an expiry of @timeout seconds and the job id is returned. On failure the
# existing lock's timestamp is inspected and the lock is deleted if it is
# older than @timeout, after which the loop retries.
#
# Increments @iterations once per attempt.
#
# Returns the claimed job id, or nil when the queue set is empty.
def start_tx
  loop do
    @iterations += 1

    candidate = @redis.srandmember(key)
    return unless candidate

    lock_key = key(candidate, :lock)

    if @redis.setnx(lock_key, Time.now.to_i)
      @redis.expire(lock_key, @timeout)
      return candidate
    end

    locked_at = @redis.get(lock_key)
    # The lock can vanish between SETNX and GET. A nil value must not be
    # read as timestamp 0, otherwise the DEL below could remove a lock that
    # another worker has just acquired. Simply retry instead.
    next unless locked_at

    @redis.del(lock_key) if (Time.now.to_i - locked_at.to_i) > @timeout
  end
end
unique_id()
click to toggle source
# File lib/redis_safe_queue.rb, line 85
# Generates a job id.
#
# Returns a base-36 String encoding of a Kernel#rand integer drawn from
# 0...(8**32).
def unique_id
  upper_bound = 8**32
  random_number = rand(upper_bound)
  random_number.to_s(36)
end