class Sidekiq::PriorityQueue::Fetch

Constants

UnitOfWork

Public Class Methods

new(options)
# File lib/sidekiq/priority_queue/fetch.rb, line 38
# Builds the fetcher from the given options.
#
# options - Hash-like object read with:
#           :strict - truthy to keep queues in the given order (coerced to
#                     true/false).
#           :queues - list of queue names; each is prefixed with
#                     "priority-queue:".
#
# Duplicate queue names are removed only in strict mode.
def initialize(options)
  @strictly_ordered_queues = options[:strict] ? true : false
  prefixed = options[:queues].map { |name| "priority-queue:#{name}" }
  @queues = @strictly_ordered_queues ? prefixed.uniq : prefixed
end

Public Instance Methods

bulk_requeue(inprogress, options)
# File lib/sidekiq/priority_queue/fetch.rb, line 64
# Pushes units of work that were still in progress back onto their queues.
#
# inprogress - collection of units of work; each must respond to +queue+
#              (the Redis key to add to) and +job+ (the payload to re-add).
# options    - not used by this method; kept for the caller's signature.
#
# Every job is re-added with score 0. Does nothing when +inprogress+ is
# empty. Any StandardError is logged as a warning and not re-raised.
def bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = inprogress.group_by(&:queue)

  Sidekiq.redis do |conn|
    # Commands must be issued on the object yielded to the block: calling
    # them on +conn+ itself is deprecated in redis-rb 4.6+ and is no longer
    # pipelined in redis-rb 5.
    conn.pipelined do |pipeline|
      jobs_to_requeue.each do |queue, units|
        pipeline.zadd(queue, units.map { |unit| [0, unit.job] })
      end
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
queues_cmd()
# File lib/sidekiq/priority_queue/fetch.rb, line 56
# Returns the list of queue keys to poll.
#
# In strict mode this is @queues itself, in its stored order. Otherwise it
# is a shuffled copy of @queues with duplicates removed.
def queues_cmd
  return @queues if @strictly_ordered_queues

  @queues.shuffle.uniq
end
retrieve_work()
# File lib/sidekiq/priority_queue/fetch.rb, line 44
# Pops one job from the first queue that has one.
#
# Queues are polled in the order given by +queues_cmd+, so strict mode keeps
# the configured order and non-strict mode polls a shuffled, de-duplicated
# list. Polling stops at the first queue for which +zpopmin+ returns a
# truthy job.
#
# Returns a UnitOfWork built from the queue key and the job, or nil when no
# polled queue yielded a job.
def retrieve_work
  queues_cmd.each do |queue|
    job = zpopmin(queue)
    return UnitOfWork.new(queue, job) if job
  end
  nil
end
zpopmin(queue)
# File lib/sidekiq/priority_queue/fetch.rb, line 49
# Runs the ZPOPMIN script against the given queue key and returns whatever
# the script evaluation returns.
#
# The script is loaded via SCRIPT LOAD on first use and its SHA is cached
# in @script_sha for later calls.
#
# queue - the Redis key passed to the script as its only key.
def zpopmin(queue)
  Sidekiq.redis do |redis|
    unless @script_sha
      @script_sha = redis.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN)
    end
    redis.evalsha(@script_sha, [queue])
  end
end