class Sidekiq::ReliableFetcher

Constants

DEFAULT_CLEANING_INTERVAL
DEFAULT_DEAD_AFTER
IDLE_TIMEOUT
UnitOfWork
WORKING_QUEUE

Public Class Methods

bulk_requeue(inprogress, options) click to toggle source

By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor

# File lib/sidekiq/reliable_fetcher.rb, line 65
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }

  Sidekiq.redis do |conn|
    conn.pipelined do
      inprogress.each do |unit_of_work|
        conn.lpush("#{unit_of_work.queue}", unit_of_work.message)
        conn.lrem("#{unit_of_work.queue}:#{WORKING_QUEUE}", 1, unit_of_work.message)
      end
    end
  end

  Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
new(options) click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 15
def initialize(options)
  queues = options[:queues].map { |q| "queue:#{q}" }

  @unique_queues = queues.uniq
  @queues_iterator = queues.shuffle.cycle
  @queues_size  = queues.size

  @nb_fetched_jobs = 0
  @cleaning_interval = options[:cleaning_interval] || DEFAULT_CLEANING_INTERVAL
  @consider_dead_after = options[:consider_dead_after] || DEFAULT_DEAD_AFTER
end
requeue_on_startup!(queues) click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 47
def self.requeue_on_startup!(queues)
  Sidekiq.logger.debug { "Re-queueing working jobs" }

  counter = 0

  Sidekiq.redis do |conn|
    queues.uniq.each do |queue|
      while conn.rpoplpush("queue:#{queue}:#{WORKING_QUEUE}", "queue:#{queue}")
        counter += 1
      end
    end
  end

  Sidekiq.logger.debug { "Re-queued #{counter} jobs" }
end
setup_reliable_fetch!(config) click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 8
def self.setup_reliable_fetch!(config)
  config.options[:fetch] = Sidekiq::ReliableFetcher
  config.on(:startup) do
    requeue_on_startup!(config.options[:queues])
  end
end

Public Instance Methods

retrieve_work() click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 27
def retrieve_work
  clean_working_queues! if @cleaning_interval != -1 && @nb_fetched_jobs >= @cleaning_interval

  @queues_size.times do
    queue = @queues_iterator.next
    work = Sidekiq.redis { |conn| conn.rpoplpush(queue, "#{queue}:#{WORKING_QUEUE}") }

    if work
      @nb_fetched_jobs += 1
      return UnitOfWork.new(queue, work)
    end
  end

  # We didn't find a job in any of the configured queues. Let's sleep a bit
  # to avoid uselessly burning too much CPU
  sleep(IDLE_TIMEOUT)

  nil
end

Private Instance Methods

clean_working_queue!(queue) click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 123
def clean_working_queue!(queue)
  Sidekiq.redis do |conn|
    working_jobs = conn.lrange("#{queue}:#{WORKING_QUEUE}", 0, -1)
    working_jobs.each do |job|
      enqueued_at = Sidekiq.load_json(job)['enqueued_at'].to_i
      job_duration = Time.now.to_i - enqueued_at

      next if job_duration < @consider_dead_after

      Sidekiq.logger.info "Requeued a dead job from #{queue}:#{WORKING_QUEUE}"

      conn.lpush("#{queue}", job)
      conn.lrem("#{queue}:#{WORKING_QUEUE}", 1, job)
    end
  end
end
clean_working_queues!() click to toggle source

Detect “old” jobs and requeue them because the worker they were assigned to probably failed miserably. NOTE Potential problem here if a specific job always make a worker really fail.

# File lib/sidekiq/reliable_fetcher.rb, line 113
def clean_working_queues!
  Sidekiq.logger.debug "Cleaning working queues"

  @unique_queues.each do |queue|
    clean_working_queue!(queue)
  end

  @nb_fetched_jobs = 0
end