class RedisSafeQueue

Constants

TX_COMMIT_TIMEOUT

Number of seconds a job's lock is kept alive once its commit starts; Redis must process all of the commit commands within this window (10s).

Attributes

iterations[RW]
prefix[RW]
queue_id[RW]
redis[RW]
timeout[RW]

Public Class Methods

new(opts) click to toggle source
# File lib/redis_safe_queue.rb, line 11
# Sets up the queue state from an options object.
#
# opts - Hash-like settings:
#        :redis        - required; stored as-is in @redis
#        :queue_id     - required; stored as-is in @queue_id
#        :timeout      - required; converted with #to_i before being stored
#        :redis_prefix - optional; :redis_safe_queue is used when the value
#                        is missing or falsy
#
# Required keys are read with #fetch, so a plain Hash raises KeyError when
# one of them is absent. The iteration counter starts at zero.
def initialize(opts)
  @redis = opts.fetch(:redis)

  @prefix = opts[:redis_prefix]
  @prefix ||= :redis_safe_queue

  @queue_id = opts.fetch(:queue_id)

  raw_timeout = opts.fetch(:timeout)
  @timeout = raw_timeout.to_i

  @iterations = 0
end

Public Instance Methods

finish() { || ... } click to toggle source
# File lib/redis_safe_queue.rb, line 39
# Waits for the queue to drain, then runs the given block for the caller
# that wins the :finished marker and puts a TTL on the queue's keys.
#
# The method polls #size once per second until it reports no remaining
# members. It then tries to create the :finished key with SETNX; the block
# is yielded to only when that call reports success, so callers that find
# the marker already present skip the block.
#
# The block is optional: without one the method still waits and still sets
# the expiries. (An unguarded yield would raise LocalJumpError here and skip
# both EXPIRE calls, leaving the :finished marker without a TTL.)
#
# Returns the result of the final EXPIRE call.
def finish
  sleep 1 while size > 0

  lock = @redis.setnx(key(:finished), 1)
  yield if lock && block_given?

  # Both the queue set and the marker are given @timeout seconds to live.
  @redis.expire(key, @timeout)
  @redis.expire(key(:finished), @timeout)
end
push(job_data) click to toggle source
# File lib/redis_safe_queue.rb, line 19
# Enqueues a job.
#
# job_data - payload stored under the job's own key; passed to Redis as-is.
#
# The payload is written first, and only then is the freshly generated id
# added to the queue set, so an id in the set already has its payload
# written.
#
# Returns the result of the SADD call.
def push(job_data)
  id = unique_id
  payload_key = key(id, :job)

  @redis.set(payload_key, job_data)
  @redis.sadd(key, id)
end
size() click to toggle source
# File lib/redis_safe_queue.rb, line 49
# Returns the cardinality (SCARD) of the queue's id set, i.e. the number of
# job ids currently stored under the queue key.
def size
  queue_key = key
  @redis.scard(queue_key)
end
work(max_jobs = nil) { |job| ... } click to toggle source
# File lib/redis_safe_queue.rb, line 25
# Processes jobs one at a time, yielding each job's payload to the block.
#
# max_jobs - optional cap on the number of jobs handled by this call; with
#            nil (the default) the loop runs until #start_tx returns no id.
#
# For each job: acquire it via #start_tx, load the payload, yield it, then
# commit. The commit happens only after the block returns, so a block that
# raises leaves the job uncommitted.
#
# Returns nil.
def work(max_jobs = nil)
  remaining = max_jobs

  loop do
    break if remaining == 0

    job_id = start_tx
    break unless job_id

    payload = get_job(job_id)
    yield(payload)
    commit_tx(job_id)

    remaining -= 1 if remaining
  end
end

Private Instance Methods

commit_tx(job_id) click to toggle source
# File lib/redis_safe_queue.rb, line 77
# Marks a job as done: shortens its lock, removes its id from the queue set
# and deletes its payload, all sent to Redis as one pipeline.
#
# job_id - id previously returned by #start_tx.
#
# The lock is not deleted; its TTL is reset to TX_COMMIT_TIMEOUT so it
# stays in place while the removal commands run and then expires.
#
# Commands are issued on the object yielded by #pipelined rather than on
# @redis. Current redis-rb only queues commands sent to the yielded
# pipeline; older versions yield the client itself, so this form works with
# both. NOTE(review): a pipeline batches commands but is not a MULTI/EXEC
# transaction — confirm that atomicity is not required here.
#
# Returns whatever #pipelined returns.
def commit_tx(job_id)
  @redis.pipelined do |pipeline|
    pipeline.expire(key(job_id, :lock), TX_COMMIT_TIMEOUT)
    pipeline.srem(key, job_id)
    pipeline.del(key(job_id, :job))
  end
end
get_job(job_id) click to toggle source
# File lib/redis_safe_queue.rb, line 89
# Fetches the stored payload for a job.
#
# job_id - id of the job whose payload key should be read.
#
# Returns whatever GET yields for the job's payload key.
def get_job(job_id)
  payload_key = key(job_id, :job)
  @redis.get(payload_key)
end
key(*append) click to toggle source
# File lib/redis_safe_queue.rb, line 93
# Builds a colon-separated Redis key for this queue.
#
# append - zero or more extra segments placed after the prefix and queue id.
#
# Nested arrays are flattened and nil segments are dropped before joining,
# so key() yields "<prefix>:<queue_id>" and key(id, :lock) yields
# "<prefix>:<queue_id>:<id>:lock".
def key(*append)
  segments = [@prefix, @queue_id, *append].flatten
  segments.compact.join(":")
end
start_tx() click to toggle source
# File lib/redis_safe_queue.rb, line 55
# Picks a random job id from the queue set and tries to lock it.
#
# Each pass increments @iterations, asks Redis for a random member and
# attempts SETNX on that member's lock key with the current epoch seconds
# as the value. On success the lock gets a TTL of @timeout and the id is
# returned. On failure the existing lock's timestamp is inspected and the
# lock is deleted when it is older than @timeout, after which the loop
# tries again.
#
# Returns the locked job id, or nil when SRANDMEMBER yields no member.
def start_tx
  loop do
    @iterations += 1

    candidate = @redis.srandmember(key)
    return unless candidate

    lock_key = key(candidate, :lock)

    if @redis.setnx(lock_key, Time.now.to_i)
      @redis.expire(lock_key, @timeout)
      return candidate
    end

    locked_at = @redis.get(lock_key)

    # The lock disappeared between SETNX and GET (expired or released).
    # Treating the missing value as timestamp 0 would make it look stale,
    # and the DEL could then remove a lock that another worker has just
    # acquired — so simply retry instead.
    next unless locked_at

    @redis.del(lock_key) if (Time.now.to_i - locked_at.to_i) > @timeout
  end
end
unique_id() click to toggle source
# File lib/redis_safe_queue.rb, line 85
# Generates a job id: a random integer below 8**32 drawn with Kernel#rand
# and rendered as a base-36 string.
def unique_id
  upper_bound = 8**32
  rand(upper_bound).to_s(36)
end