class Sidekiq::Paquet::Flusher
Public Class Methods
new()
click to toggle source
# File lib/sidekiq/paquet/flusher.rb, line 7 def initialize @task = Concurrent::TimerTask.new( execution_interval: execution_interval) { Bundle.enqueue_jobs } end
Public Instance Methods
shutdown()
click to toggle source
# File lib/sidekiq/paquet/flusher.rb, line 17 def shutdown Sidekiq.logger.info('Paquet flusher exiting...') @task.shutdown end
start()
click to toggle source
# File lib/sidekiq/paquet/flusher.rb, line 12 def start Sidekiq.logger.info('Starting paquet flusher') @task.execute end
Private Instance Methods
execution_interval()
click to toggle source
To avoid having all processes flushing at the same time, randomize the execution interval between 0.5-1.5 the scaled interval, so that on average, interval is respected.
# File lib/sidekiq/paquet/flusher.rb, line 28 def execution_interval avg = scaled_interval.to_f avg * rand + avg / 2 end
scaled_interval()
click to toggle source
Scale interval with the number of Sidekiq
processes running. Each one is going to run a flusher instance. If you have 10 processes and an average flush interval of 10s, it means one process is flushing every second, which is wasteful and beats the purpose of bundling.
To avoid this, we scale the average flush interval with the number of Sidekiq
processes running, i.e instead of flushing every 10s, let every process flush every 100 seconds.
# File lib/sidekiq/paquet/flusher.rb, line 42 def scaled_interval Sidekiq::Paquet.options[:flush_interval] ||= begin pcount = Sidekiq::ProcessSet.new.size pcount = 1 if pcount == 0 # Maybe raise here pcount * Sidekiq::Paquet.options[:average_flush_interval] end end