class Sidekiq::BasicFetch

Constants

TIMEOUT

We want the fetch operation to timeout every few seconds so the thread can check if the process is shutting down.

UnitOfWork

Public Class Methods

new(cap) click to toggle source
# File lib/sidekiq/fetch.rb, line 30
def initialize(cap)
  raise ArgumentError, "missing queue list" unless cap.queues
  @config = cap
  @strictly_ordered_queues = cap.mode == :strict
  @queues = config.queues.map { |q| "queue:#{q}" }
  @queues.uniq! if @strictly_ordered_queues
end

Public Instance Methods

bulk_requeue(inprogress) click to toggle source
# File lib/sidekiq/fetch.rb, line 51
def bulk_requeue(inprogress)
  return if inprogress.empty?

  logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue] ||= []
    jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
  end

  redis do |conn|
    conn.pipelined do |pipeline|
      jobs_to_requeue.each do |queue, jobs|
        pipeline.rpush(queue, jobs)
      end
    end
  end
  logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
  logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
queues_cmd() click to toggle source

Creating the Redis#brpop command takes into account any configured queue weights. By default Redis#brpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#brpop to honor weights and avoid queue starvation.

# File lib/sidekiq/fetch.rb, line 78
def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    permute = @queues.shuffle
    permute.uniq!
    permute
  end
end
retrieve_work() click to toggle source
# File lib/sidekiq/fetch.rb, line 38
def retrieve_work
  qs = queues_cmd
  # 4825 Sidekiq Pro with all queues paused will return an
  # empty set of queues
  if qs.size <= 0
    sleep(TIMEOUT)
    return nil
  end

  queue, job = redis { |conn| conn.blocking_call(TIMEOUT, "brpop", *qs, TIMEOUT) }
  UnitOfWork.new(queue, job, config) if queue
end