class Quiq::Queue
Constants
- DEAD_LETTER_QUEUE
- PREFIX
- PROCESSING_SUFFIX
Attributes
name[R]
processing[R]
Public Class Methods
delete(queue, job)
click to toggle source
# File lib/quiq/queue.rb, line 48 def self.delete(queue, job) Quiq.redis.lrem(queue, 0, job) end
formatted_name(name)
click to toggle source
# File lib/quiq/queue.rb, line 52 def self.formatted_name(name) "#{PREFIX}:#{name}" end
new(name)
click to toggle source
# File lib/quiq/queue.rb, line 11 def initialize(name) @name = self.class.formatted_name(name) @processing = self.class.processing_name(name) end
processing_name(name)
click to toggle source
# File lib/quiq/queue.rb, line 56 def self.processing_name(name) "#{PREFIX}:#{name}:#{PROCESSING_SUFFIX}" end
push(queue, job)
click to toggle source
# File lib/quiq/queue.rb, line 43 def self.push(queue, job) @queue = new(queue) @queue.push(job) end
send_to_dlq(job)
click to toggle source
# File lib/quiq/queue.rb, line 60 def self.send_to_dlq(job) @dlq ||= Queue.new(DEAD_LETTER_QUEUE) @dlq.push(job) end
Public Instance Methods
pop()
click to toggle source
# File lib/quiq/queue.rb, line 24 def pop Quiq.redis.brpoplpush(@name, @processing, 0) end
purge_processing!()
click to toggle source
Insert elements that weren't fully processed at the tail of the queue to avoid loss @note that they should be enqueued at the head of the queue, but Redis
lacks a LPOPRPUSH command
# File lib/quiq/queue.rb, line 30 def purge_processing! Async do Quiq.redis.pipeline do |pipe| loop do job = pipe.sync.call('RPOPLPUSH', @processing, @name) Quiq.logger.warn("Requeuing job #{job} in #{@name}") unless job.nil? break if job.nil? end pipe.close end end.wait end
push(job)
click to toggle source
# File lib/quiq/queue.rb, line 16 def push(job) pushed = Quiq.redis.lpush(@name, job) return unless pushed <= 0 Quiq.logger.error("Could not push to the queue: #{@name}") false end