class Sidekiq::PriorityQueue::ReliableFetch

Constants

UnitOfWork

Public Class Methods

new(options)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 37
# Configures the fetcher from an options hash.
#
# @param options [Hash] reads +:strict+ (truthy enables strict ordering),
#   +:queues+ (queue names) and +:index+ (process index).
def initialize(options)
  @strictly_ordered_queues = options[:strict] ? true : false

  # Every queue name is namespaced with the "priority-queue:" prefix.
  prefixed = options[:queues].map { |name| "priority-queue:#{name}" }

  # Duplicates are only collapsed in strict mode; otherwise the list is
  # stored exactly as given.
  @queues = @strictly_ordered_queues ? prefixed.uniq : prefixed

  # An explicit :index wins; otherwise fall back to the PROCESS_INDEX
  # environment variable.
  @process_index = options[:index] || ENV['PROCESS_INDEX']
end
resume_wip_jobs(queues, process_index)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 81
# Re-queues work-in-progress jobs for the given queues by delegating to
# +requeue_wip_jobs+.
#
# @param queues [Array] queue names passed through to +requeue_wip_jobs+
# @param process_index [Object, nil] process index; when nil/false the
#   PROCESS_INDEX environment variable is used instead
# @return whatever +requeue_wip_jobs+ returns
def self.resume_wip_jobs(queues, process_index)
  Sidekiq.logger.debug { "Re-queueing WIP jobs" }
  effective_index = process_index || ENV['PROCESS_INDEX']
  requeue_wip_jobs(queues, effective_index)
end

Private Class Methods

reliable_fetch_active?(config)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 97
# Tells whether the configured fetch strategy is, or wraps, a ReliableFetch.
#
# @param config [#options] object whose +options[:fetch]+ holds the fetcher
# @return [Boolean] true when the fetcher is a ReliableFetch, or is a
#   CombinedFetch whose +fetches+ include a ReliableFetch; false otherwise
def self.reliable_fetch_active?(config)
  fetch = config.options[:fetch]

  case fetch
  when Sidekiq::PriorityQueue::ReliableFetch
    true
  when Sidekiq::PriorityQueue::CombinedFetch
    fetch.fetches.any? { |inner| inner.is_a?(Sidekiq::PriorityQueue::ReliableFetch) }
  else
    false
  end
end
requeue_wip_jobs(queues, index)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 103
# Moves every job held in this host/process's work-in-progress (WIP) sets
# back onto the corresponding priority queues.
#
# For each queue the WIP set "priority-queue:<name>_<hostname>_<index>" is
# drained with SPOP, then all drained jobs are re-added to
# "priority-queue:<name>" with score 0 inside a single pipeline.
#
# Any StandardError is rescued and logged as a warning rather than raised.
#
# @param queues [Array<String>] un-prefixed queue names; may contain
#   duplicates (e.g. weighted queue lists)
# @param index [Object] process index used to build the WIP set name
# @return the result of the logger call
def self.requeue_wip_jobs(queues, index)
  jobs_to_requeue = {}
  Sidekiq.redis do |conn|
    # De-duplicate first: a repeated queue name would otherwise reset the
    # list of jobs already drained for that queue, silently dropping them.
    queues.map { |name| "priority-queue:#{name}" }.uniq.each do |queue|
      wip_queue = "#{queue}_#{Socket.gethostname}_#{index}"
      jobs_to_requeue[queue] = []
      while (job = conn.spop(wip_queue))
        jobs_to_requeue[queue] << job
      end
    end

    conn.pipelined do |pipeline|
      # Newer redis clients yield a pipeline object that must receive the
      # commands; fall back to the connection when nothing is yielded.
      target = pipeline || conn
      jobs_to_requeue.each do |queue, jobs|
        # Skip just this queue. A `return` here would abandon the jobs that
        # were already popped from the WIP sets of the remaining queues.
        next if jobs.empty?

        target.zadd(queue, jobs.map { |j| [0, j] })
      end
    end
  end
  Sidekiq.logger.info("Pushed #{jobs_to_requeue.values.sum(&:size)} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{jobs_to_requeue.values.sum(&:size)} jobs: #{ex.message}")
end

Public Instance Methods

bulk_requeue(_inprogress, options)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 75
# Re-queues jobs that were in progress when the process terminated, by
# delegating to the class-level +requeue_wip_jobs+.
#
# @param _inprogress ignored
# @param options [Hash] reads +:queues+ and +:index+; when +:index+ is
#   nil/false the PROCESS_INDEX environment variable is used
# @return whatever +requeue_wip_jobs+ returns
def bulk_requeue(_inprogress, options)
  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  index = options[:index] || ENV['PROCESS_INDEX']
  queue_names = options[:queues]
  self.class.requeue_wip_jobs(queue_names, index)
end
queues_cmd()
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 67
# Returns the queue list to poll.
#
# In strict mode the stored list is returned as-is (same object). Otherwise
# a shuffled copy with duplicates removed is returned, so the order varies
# from call to call.
#
# @return [Array<String>]
def queues_cmd
  return @queues if @strictly_ordered_queues

  shuffled = @queues.shuffle
  shuffled.uniq
end
retrieve_work()
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 44
# Walks the stored queues in order and takes a job from the first queue
# that yields one via +zpopmin_sadd+.
#
# @return [UnitOfWork, nil] a unit built from the queue name, the job and
#   that queue's WIP set name; nil when no queue produced a job
def retrieve_work
  @queues.each do |queue|
    payload = zpopmin_sadd(queue, wip_queue(queue))
    # Stop at the first queue that hands back a job.
    return UnitOfWork.new(queue, payload, wip_queue(queue)) if payload
  end

  nil
end
spop(wip_queue)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 63
# Pops one member from the given WIP set using a Sidekiq Redis connection.
#
# @param wip_queue [String] key of the set to pop from
# @return the value returned by the connection's +spop+
def spop(wip_queue)
  Sidekiq.redis do |redis|
    redis.spop(wip_queue)
  end
end
wip_queue(q)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 52
# Builds the name of the work-in-progress set for a queue: the queue name,
# this machine's hostname and the process index, joined by underscores.
#
# @param q [String] queue name
# @return [String]
def wip_queue(q)
  host = Socket.gethostname
  [q, host, @process_index].join('_')
end
zpopmin_sadd(queue, wip_queue)
# File lib/sidekiq/priority_queue/reliable_fetch.rb, line 56
# Runs the ZPOPMIN_SADD script against the given queue and WIP set keys.
#
# The script is loaded on first use and its SHA is memoized on the
# instance; later calls reuse the cached SHA with EVALSHA.
#
# @param queue [String] first key passed to the script
# @param wip_queue [String] second key passed to the script
# @return the value returned by +evalsha+
def zpopmin_sadd(queue, wip_queue)
  Sidekiq.redis do |redis|
    unless @script_sha
      @script_sha = redis.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN_SADD)
    end

    script_keys = [queue, wip_queue]
    redis.evalsha(@script_sha, script_keys)
  end
end