class Quiq::Queue

Constants

DEAD_LETTER_QUEUE
PREFIX
PROCESSING_SUFFIX

Attributes

name[R]
processing[R]

Public Class Methods

delete(queue, job) click to toggle source
# File lib/quiq/queue.rb, line 48
# Removes a job from a Redis list.
#
# The +queue+ key is handed to Redis untouched; this method does not run it
# through +formatted_name+.
#
# @param queue the Redis key of the list to remove from
# @param job the list element to remove
# @return whatever the Redis client's +lrem+ call returns
def self.delete(queue, job)
  # A count of 0 asks LREM to drop every matching element, not just the first.
  every_occurrence = 0
  Quiq.redis.lrem(queue, every_occurrence, job)
end
formatted_name(name) click to toggle source
# File lib/quiq/queue.rb, line 52
# Builds the namespaced Redis key for a queue: PREFIX, a colon, then +name+.
#
# @param name the bare queue name (stringified with +to_s+ semantics)
# @return [String] the prefixed key
def self.formatted_name(name)
  format('%s:%s', PREFIX, name)
end
new(name) click to toggle source
# File lib/quiq/queue.rb, line 11
# Sets up a queue for the given bare name.
#
# Stores two derived Redis keys: +@name+ (from +formatted_name+) and
# +@processing+ (from +processing_name+).
#
# @param name the bare, un-prefixed queue name
def initialize(name)
  key_builder = self.class
  @name = key_builder.formatted_name(name)
  @processing = key_builder.processing_name(name)
end
processing_name(name) click to toggle source
# File lib/quiq/queue.rb, line 56
# Builds the Redis key of the companion "processing" list for a queue:
# PREFIX, the queue name and PROCESSING_SUFFIX joined with colons.
#
# @param name the bare queue name (stringified with +to_s+ semantics)
# @return [String] the processing-list key
def self.processing_name(name)
  format('%s:%s:%s', PREFIX, name, PROCESSING_SUFFIX)
end
push(queue, job) click to toggle source
# File lib/quiq/queue.rb, line 43
# Pushes a job onto the queue with the given bare name.
#
# Builds a short-lived queue object and delegates to its instance-level
# +push+. The object is deliberately kept out of class-level state: storing
# it in an instance variable on the class would let concurrent callers
# overwrite each other's queue between construction and the push.
#
# @param queue the bare queue name, as accepted by +new+
# @param job the payload to enqueue
# @return the result of the instance-level +push+
def self.push(queue, job)
  new(queue).push(job)
end
send_to_dlq(job) click to toggle source
# File lib/quiq/queue.rb, line 60
# Pushes a job onto the dead letter queue.
#
# The queue object for DEAD_LETTER_QUEUE is created on first use and then
# reused from the class-level +@dlq+ variable.
#
# @param job the payload to park in the dead letter queue
# @return the result of the queue's +push+
def self.send_to_dlq(job)
  (@dlq ||= Queue.new(DEAD_LETTER_QUEUE)).push(job)
end

Public Instance Methods

pop() click to toggle source
# File lib/quiq/queue.rb, line 24
# Takes the next job off this queue and records it in the processing list.
#
# Issues BRPOPLPUSH from +@name+ to +@processing+, so the job is moved
# rather than dropped.
#
# @return whatever the Redis client's +brpoplpush+ call returns
def pop
  # Per the Redis command reference, a timeout of 0 means "block indefinitely".
  no_timeout = 0
  Quiq.redis.brpoplpush(@name, @processing, no_timeout)
end
purge_processing!() click to toggle source

Re-inserts elements that weren't fully processed at the tail of the queue so that they are not lost. Note: ideally they would be enqueued at the head of the queue, but Redis lacks an LPOPRPUSH command.

# File lib/quiq/queue.rb, line 30
# Moves every job left in the processing list back onto the main queue.
#
# Repeatedly issues RPOPLPUSH from +@processing+ to +@name+ until Redis
# replies with nil, logging a warning for each job that is moved. The work
# runs inside an Async task that is waited on before returning.
#
# @return the result of waiting on the Async task
def purge_processing!
  requeue_task = Async do
    Quiq.redis.pipeline do |pipeline|
      loop do
        requeued = pipeline.sync.call('RPOPLPUSH', @processing, @name)
        # A nil reply means the processing list has been drained.
        break if requeued.nil?

        Quiq.logger.warn("Requeuing job #{requeued} in #{@name}")
      end
      pipeline.close
    end
  end
  requeue_task.wait
end
push(job) click to toggle source
# File lib/quiq/queue.rb, line 16
# Pushes a job onto this queue with LPUSH.
#
# Returns an explicit boolean so callers can branch on the result: a bare
# +return+ on success would yield nil, which is just as falsy as the
# +false+ returned on failure.
#
# @param job the payload to enqueue
# @return [Boolean] true when Redis reports a positive count for the push,
#   false (after logging an error) otherwise
def push(job)
  # LPUSH normally replies with the list length after the push; a
  # non-positive reply is treated as a failed push.
  pushed = Quiq.redis.lpush(@name, job)
  return true if pushed.positive?

  Quiq.logger.error("Could not push to the queue: #{@name}")
  false
end