class Sidekiq::PrioritizedQueues::Fetch
Constants
- UnitOfWork
Public Class Methods
bulk_requeue(inprogress, options)
click to toggle source
Because this is a class method, it is pluggable and can be called directly by the Manager actor. Making it an instance method would make the call asynchronous, since it would go through the Fetcher actor.
# File lib/sidekiq/prioritized_queues/fetch.rb, line 32
# Pushes the given in-progress units of work back to Redis.
#
# Each unit's message is added with score 0 to the sorted set named
# "queue:<queue_name>". All ZADD commands are issued inside one pipelined
# block. Any StandardError raised along the way is logged as a warning and
# swallowed, so this call is best-effort.
#
# @param inprogress [Array] units of work responding to +queue_name+ and +message+
# @param options [Hash] not used by this method
# @return [void]
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }

  # Bucket the raw messages by the queue they came from, keeping first-seen order.
  messages_by_queue = inprogress
    .group_by(&:queue_name)
    .transform_values { |units| units.map(&:message) }

  Sidekiq.redis do |conn|
    conn.pipelined do
      messages_by_queue.each do |queue, messages|
        key = "queue:#{queue}"
        messages.each { |message| conn.zadd(key, 0, message) }
      end
    end
  end

  Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
new(options)
click to toggle source
# File lib/sidekiq/prioritized_queues/fetch.rb, line 4
# Builds a fetcher from the given options.
#
# Reads +options[:strict]+ (coerced to true/false) and +options[:queues]+,
# whose entries are each prefixed with "queue:" to form the Redis key. The
# full key list is kept, duplicates included, along with a de-duplicated copy.
#
# @param options [Hash] must contain +:queues+ (a collection responding to +map+); +:strict+ is optional
def initialize(options)
  @strictly_ordered_queues = options[:strict] ? true : false
  @queues = options[:queues].map { |name| "queue:#{name}" }
  @unique_queues = @queues.uniq
end
Public Instance Methods
queues()
click to toggle source
# File lib/sidekiq/prioritized_queues/fetch.rb, line 68
# Returns the queue keys in the order they should be polled.
#
# In strict mode this is a fresh copy of the de-duplicated list in its
# configured order. Otherwise the full list (duplicates included) is shuffled
# and then de-duplicated, so a queue listed more than once is more likely to
# land near the front.
#
# @return [Array<String>] a new array on every call
def queues
  return @unique_queues.dup if @strictly_ordered_queues

  @queues.shuffle.uniq
end
retrieve_work()
click to toggle source
# File lib/sidekiq/prioritized_queues/fetch.rb, line 10
# Pops the first available job from the queues returned by +queues+.
#
# For each queue in turn, a MULTI block reads the rank-0 member of the sorted
# set and removes it. The first queue that yields a member wins and the
# remaining queues are not touched.
#
# @return [UnitOfWork, nil] a unit built from the queue key and the popped
#   member, or nil (after sleeping one second) when every queue was empty
def retrieve_work
  found = nil

  Sidekiq.redis do |conn|
    queues.each do |queue|
      reply = conn.multi do
        conn.zrange(queue, 0, 0)
        conn.zremrangebyrank(queue, 0, 0)
      end.flatten(1)

      # A single-element reply holds only the removal result, i.e. ZRANGE
      # returned no member for this queue.
      next if reply.length == 1

      found = [queue, reply.first]
      break
    end
  end

  if found
    UnitOfWork.new(*found)
  else
    sleep 1
    nil
  end
end