class Sidekiq::Scheduled::Poller
The Poller
checks Redis every N seconds for jobs in the retry or scheduled set have passed their timestamp and should be enqueued. If so, it just pops the job back onto its original queue so the workers can pick it up like any other job.
Constants
- INITIAL_WAIT
Public Class Methods
# File lib/sidekiq/scheduled.rb, line 73 def initialize(options) @config = options @enq = (options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new @sleeper = ConnectionPool::TimedStack.new @done = false @thread = nil @count_calls = 0 end
Public Instance Methods
# File lib/sidekiq/scheduled.rb, line 107 def enqueue @enq.enqueue_jobs rescue => ex # Most likely a problem with redis networking. # Punt and try again at the next interval logger.error ex.message handle_exception(ex) end
# File lib/sidekiq/scheduled.rb, line 95 def start @thread ||= safe_thread("scheduler") { initial_wait until @done enqueue wait end logger.info("Scheduler exiting...") } end
Shut down this instance, will pause until the thread is dead.
# File lib/sidekiq/scheduled.rb, line 83 def terminate @done = true @enq.terminate if @enq.respond_to?(:terminate) if @thread t = @thread @thread = nil @sleeper << 0 t.value end end
Private Instance Methods
A copy of Sidekiq::ProcessSet#cleanup because server should never depend on sidekiq/api.
# File lib/sidekiq/scheduled.rb, line 195 def cleanup # dont run cleanup more than once per minute return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) } count = 0 Sidekiq.redis do |conn| procs = conn.sscan_each("processes").to_a heartbeats = conn.pipelined { |pipeline| procs.each do |key| pipeline.hget(key, "info") end } # the hash named key has an expiry of 60 seconds. # if it's not found, that means the process has not reported # in to Redis and probably died. to_prune = procs.select.with_index { |proc, i| heartbeats[i].nil? } count = conn.srem("processes", to_prune) unless to_prune.empty? end count end
# File lib/sidekiq/scheduled.rb, line 219 def initial_wait # Have all processes sleep between 5-15 seconds. 10 seconds to give time for # the heartbeat to register (if the poll interval is going to be calculated by the number # of workers), and 5 random seconds to ensure they don't all hit Redis at the same time. total = 0 total += INITIAL_WAIT unless @config[:poll_interval_average] total += (5 * rand) @sleeper.pop(total) rescue Timeout::Error ensure # periodically clean out the `processes` set in Redis which can collect # references to dead processes over time. The process count affects how # often we scan for scheduled jobs. cleanup end
We do our best to tune the poll interval to the size of the active Sidekiq
cluster. If you have 30 processes and poll every 15 seconds, that means one Sidekiq
is checking Redis every 0.5 seconds - way too often for most people and really bad if the retry or scheduled sets are large.
Instead try to avoid polling more than once every 15 seconds. If you have 30 Sidekiq
processes, we'll poll every 30 * 15 or 450 seconds. To keep things statistically random, we'll sleep a random amount between 225 and 675 seconds for each poll or 450 seconds on average. Otherwise restarting all your Sidekiq
processes at the same time will lead to them all polling at the same time: the thundering herd problem.
We only do this if poll_interval_average
is unset (the default).
# File lib/sidekiq/scheduled.rb, line 176 def poll_interval_average(count) @config[:poll_interval_average] || scaled_poll_interval(count) end
# File lib/sidekiq/scheduled.rb, line 187 def process_count pcount = Sidekiq.redis { |conn| conn.scard("processes") } pcount = 1 if pcount == 0 pcount end
# File lib/sidekiq/scheduled.rb, line 130 def random_poll_interval # We want one Sidekiq process to schedule jobs every N seconds. We have M processes # and **don't** want to coordinate. # # So in N*M second timespan, we want each process to schedule once. The basic loop is: # # * sleep a random amount within that N*M timespan # * wake up and schedule # # We want to avoid one edge case: imagine a set of 2 processes, scheduling every 5 seconds, # so N*M = 10. Each process decides to randomly sleep 8 seconds, now we've failed to meet # that 5 second average. Thankfully each schedule cycle will sleep randomly so the next # iteration could see each process sleep for 1 second, undercutting our average. # # So below 10 processes, we special case and ensure the processes sleep closer to the average. # In the example above, each process should schedule every 10 seconds on average. We special # case smaller clusters to add 50% so they would sleep somewhere between 5 and 15 seconds. # As we run more processes, the scheduling interval average will approach an even spread # between 0 and poll interval so we don't need this artifical boost. # count = process_count interval = poll_interval_average(count) if count < 10 # For small clusters, calculate a random interval that is ±50% the desired average. interval * rand + interval.to_f / 2 else # With 10+ processes, we should have enough randomness to get decent polling # across the entire timespan interval * rand end end
Calculates an average poll interval based on the number of known Sidekiq
processes. This minimizes a single point of failure by dispersing check-ins but without taxing Redis if you run many Sidekiq
processes.
# File lib/sidekiq/scheduled.rb, line 183 def scaled_poll_interval(process_count) process_count * @config[:average_scheduled_poll_interval] end
# File lib/sidekiq/scheduled.rb, line 118 def wait @sleeper.pop(random_poll_interval) rescue Timeout::Error # expected rescue => ex # if poll_interval_average hasn't been calculated yet, we can # raise an error trying to reach Redis. logger.error ex.message handle_exception(ex) sleep 5 end