class Sidekiq::PriorityQueue::ReliableFetch
Constants
- UnitOfWork
Public Class Methods
new(options)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 37
# Builds a fetcher from the given options hash.
#
# Reads options[:strict] (coerced to a boolean), options[:queues] (each name
# is prefixed with "priority-queue:") and options[:index], falling back to
# the PROCESS_INDEX environment variable when no index is given.
# Duplicate queue names are removed only in strict mode.
def initialize(options)
  strict = options[:strict] ? true : false
  prefixed = options[:queues].map { |name| "priority-queue:#{name}" }

  @strictly_ordered_queues = strict
  @queues = strict ? prefixed.uniq : prefixed
  @process_index = options[:index] || ENV['PROCESS_INDEX']
end
resume_wip_jobs(queues, process_index)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 81
# Logs a debug message and hands the queues to requeue_wip_jobs.
# When process_index is nil/false the PROCESS_INDEX environment variable is
# used instead. Returns whatever requeue_wip_jobs returns.
def self.resume_wip_jobs(queues, process_index)
  Sidekiq.logger.debug { "Re-queueing WIP jobs" }

  effective_index = process_index || ENV['PROCESS_INDEX']
  requeue_wip_jobs(queues, effective_index)
end
Private Class Methods
reliable_fetch_active?(config)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 97
# Tells whether the fetcher stored in config.options[:fetch] is a
# ReliableFetch, or a CombinedFetch whose #fetches contains at least one
# ReliableFetch. Any other value (including nil) yields false.
def self.reliable_fetch_active?(config)
  fetcher = config.options[:fetch]

  case fetcher
  when Sidekiq::PriorityQueue::ReliableFetch
    true
  when Sidekiq::PriorityQueue::CombinedFetch
    fetcher.fetches.any? { |inner| inner.is_a?(Sidekiq::PriorityQueue::ReliableFetch) }
  else
    false
  end
end
requeue_wip_jobs(queues, index)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 103
# Moves every job found in this process's work-in-progress (WIP) sets back
# onto the corresponding priority queue.
#
# For each name in +queues+ the WIP set "priority-queue:<name>_<hostname>_<index>"
# is drained with SPOP, and the drained jobs are then ZADDed to
# "priority-queue:<name>" with score 0 inside a single pipeline.
#
# queues - Enumerable of unprefixed queue names.
# index  - process index used to build the WIP set name.
#
# Any StandardError is caught and logged as a warning instead of propagating.
def self.requeue_wip_jobs(queues, index)
  jobs_to_requeue = {}
  hostname = Socket.gethostname

  Sidekiq.redis do |conn|
    queues.map { |q| "priority-queue:#{q}" }.each do |q|
      wip_queue = "#{q}_#{hostname}_#{index}"
      jobs_to_requeue[q] = []
      while (job = conn.spop(wip_queue))
        jobs_to_requeue[q] << job
      end
    end

    conn.pipelined do
      jobs_to_requeue.each do |queue, jobs|
        # Skip only this queue. A `return` here would abandon the jobs that
        # were already popped from the WIP sets of the remaining queues.
        next if jobs.empty?

        conn.zadd(queue, jobs.map { |j| [0, j] })
      end
    end
  end

  # Count the jobs themselves; mapping over the hash yields [key, value]
  # pairs whose size is always 2.
  Sidekiq.logger.info("Pushed #{jobs_to_requeue.values.sum(&:size)} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{jobs_to_requeue.values.sum(&:size)} jobs: #{ex.message}")
end
Public Instance Methods
bulk_requeue(_inprogress, options)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 75
# Re-queues this process's work-in-progress jobs for options[:queues].
# The first argument is not used. The process index comes from
# options[:index], or from the PROCESS_INDEX environment variable when that
# option is absent. Delegates to the class-level requeue_wip_jobs.
def bulk_requeue(_inprogress, options)
  Sidekiq.logger.debug { "Re-queueing terminated jobs" }

  index = options[:index] || ENV['PROCESS_INDEX']
  queue_names = options[:queues]
  self.class.requeue_wip_jobs(queue_names, index)
end
queues_cmd()
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 67
# Returns the queue list to poll: @queues itself in strict mode, otherwise a
# shuffled copy with duplicates removed (@queues is left untouched).
def queues_cmd
  return @queues if @strictly_ordered_queues

  @queues.shuffle.uniq
end
retrieve_work()
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 44
# Polls each queue in @queues, in order, via zpopmin_sadd and returns a
# UnitOfWork for the first job obtained, or nil when no queue yields a job.
#
# NOTE(review): this walks @queues directly rather than queues_cmd, so the
# shuffle done by queues_cmd is not applied here — confirm that is intended.
def retrieve_work
  @queues.each do |queue|
    job = zpopmin_sadd(queue, wip_queue(queue))
    next unless job

    return UnitOfWork.new(queue, job, wip_queue(queue))
  end

  nil
end
spop(wip_queue)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 63
# Issues SPOP against the given WIP set on a Sidekiq Redis connection and
# returns the connection's reply.
def spop(wip_queue)
  Sidekiq.redis do |redis|
    redis.spop(wip_queue)
  end
end
wip_queue(q)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 52
# Name of the work-in-progress set for queue +q+:
# "<q>_<hostname>_<process index>".
def wip_queue(q)
  format('%s_%s_%s', q, Socket.gethostname, @process_index)
end
zpopmin_sadd(queue, wip_queue)
click to toggle source
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 56
# Runs the ZPOPMIN_SADD script with +queue+ and +wip_queue+ as its keys and
# returns the script's reply. The script is loaded on first use and its SHA
# is memoized in @script_sha for later calls.
#
# NOTE(review): the memoized SHA is never refreshed; presumably a script
# flush on the Redis side would make EVALSHA fail — confirm.
def zpopmin_sadd(queue, wip_queue)
  Sidekiq.redis do |redis|
    unless @script_sha
      @script_sha = redis.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN_SADD)
    end

    redis.evalsha(@script_sha, [queue, wip_queue])
  end
end