class Sidekiq::ReliableFetcher
Constants
- DEFAULT_CLEANING_INTERVAL
- DEFAULT_DEAD_AFTER
- IDLE_TIMEOUT
- UnitOfWork
- WORKING_QUEUE
Public Class Methods
bulk_requeue(inprogress, options)
click to toggle source
By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor
# File lib/sidekiq/reliable_fetcher.rb, line 65 def self.bulk_requeue(inprogress, options) return if inprogress.empty? Sidekiq.logger.debug { "Re-queueing terminated jobs" } Sidekiq.redis do |conn| conn.pipelined do inprogress.each do |unit_of_work| conn.lpush("#{unit_of_work.queue}", unit_of_work.message) conn.lrem("#{unit_of_work.queue}:#{WORKING_QUEUE}", 1, unit_of_work.message) end end end Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis") rescue => ex Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}") end
new(options)
click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 15 def initialize(options) queues = options[:queues].map { |q| "queue:#{q}" } @unique_queues = queues.uniq @queues_iterator = queues.shuffle.cycle @queues_size = queues.size @nb_fetched_jobs = 0 @cleaning_interval = options[:cleaning_interval] || DEFAULT_CLEANING_INTERVAL @consider_dead_after = options[:consider_dead_after] || DEFAULT_DEAD_AFTER end
requeue_on_startup!(queues)
click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 47 def self.requeue_on_startup!(queues) Sidekiq.logger.debug { "Re-queueing working jobs" } counter = 0 Sidekiq.redis do |conn| queues.uniq.each do |queue| while conn.rpoplpush("queue:#{queue}:#{WORKING_QUEUE}", "queue:#{queue}") counter += 1 end end end Sidekiq.logger.debug { "Re-queued #{counter} jobs" } end
setup_reliable_fetch!(config)
click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 8 def self.setup_reliable_fetch!(config) config.options[:fetch] = Sidekiq::ReliableFetcher config.on(:startup) do requeue_on_startup!(config.options[:queues]) end end
Public Instance Methods
retrieve_work()
click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 27 def retrieve_work clean_working_queues! if @cleaning_interval != -1 && @nb_fetched_jobs >= @cleaning_interval @queues_size.times do queue = @queues_iterator.next work = Sidekiq.redis { |conn| conn.rpoplpush(queue, "#{queue}:#{WORKING_QUEUE}") } if work @nb_fetched_jobs += 1 return UnitOfWork.new(queue, work) end end # We didn't find a job in any of the configured queues. Let's sleep a bit # to avoid uselessly burning too much CPU sleep(IDLE_TIMEOUT) nil end
Private Instance Methods
clean_working_queue!(queue)
click to toggle source
# File lib/sidekiq/reliable_fetcher.rb, line 123 def clean_working_queue!(queue) Sidekiq.redis do |conn| working_jobs = conn.lrange("#{queue}:#{WORKING_QUEUE}", 0, -1) working_jobs.each do |job| enqueued_at = Sidekiq.load_json(job)['enqueued_at'].to_i job_duration = Time.now.to_i - enqueued_at next if job_duration < @consider_dead_after Sidekiq.logger.info "Requeued a dead job from #{queue}:#{WORKING_QUEUE}" conn.lpush("#{queue}", job) conn.lrem("#{queue}:#{WORKING_QUEUE}", 1, job) end end end
clean_working_queues!()
click to toggle source
Detect “old” jobs and requeue them because the worker they were assigned to probably failed miserably. NOTE Potential problem here if a specific job always make a worker really fail.
# File lib/sidekiq/reliable_fetcher.rb, line 113 def clean_working_queues! Sidekiq.logger.debug "Cleaning working queues" @unique_queues.each do |queue| clean_working_queue!(queue) end @nb_fetched_jobs = 0 end