class Sidekiq::Throttled::QueuesPauser
Singleton class used to pause queues from being processed. For the sake of efficiency it uses {Communicator} behind the scenes to notify all processes about paused and resumed queues.
@private
Constants
- PAUSED_QUEUES
Redis key of Set with paused queues.
@return [String]
- PAUSE_MESSAGE
{Communicator} message used to notify that queue needs to be paused.
@return [String]
- RESUME_MESSAGE
{Communicator} message used to notify that queue needs to be resumed.
@return [String]
Public Class Methods
Initializes singleton instance.
# File lib/sidekiq/throttled/queues_pauser.rb, line 40
# Sets up the instance state: an empty in-memory Set of paused queues,
# the shared Communicator instance, and the Mutex that guards the Set.
def initialize
  @communicator  = Communicator.instance
  @paused_queues = Set.new
  @mutex         = Mutex.new
end
Public Instance Methods
Returns the given list of queues with the paused queues stripped out.
@private @return [Array<String>]
# File lib/sidekiq/throttled/queues_pauser.rb, line 67
# Removes every locally known paused queue from +queues+.
#
# If anything raises while filtering, the error is logged through
# Sidekiq.logger and the input list is returned untouched.
#
# @param queues [Array<String>] candidate queue names
# @return [Array<String>] +queues+ without the paused ones
def filter(queues)
  @mutex.synchronize do
    paused = @paused_queues.to_a
    queues - paused
  end
rescue StandardError => error
  Sidekiq.logger.error { "[#{self.class}] Failed filter queues: #{error}" }
  queues
end
Pauses given `queue`.
@param [#to_s] queue @return [void]
# File lib/sidekiq/throttled/queues_pauser.rb, line 85
# Pauses +queue+: adds its normalized name to the PAUSED_QUEUES Redis set
# and transmits PAUSE_MESSAGE with that name through the communicator,
# using the same Redis connection.
#
# @param queue [#to_s] queue to pause
# @return [void]
def pause!(queue)
  name = QueueName.normalize(queue.to_s)

  Sidekiq.redis do |redis|
    redis.sadd(PAUSED_QUEUES, name)
    @communicator.transmit(redis, PAUSE_MESSAGE, name)
  end
end
Checks if given `queue` is paused.
@param queue [#to_s] @return [Boolean]
# File lib/sidekiq/throttled/queues_pauser.rb, line 98
# Asks Redis whether the normalized name of +queue+ is a member of the
# PAUSED_QUEUES set.
#
# @param queue [#to_s] queue to check
# @return [Boolean] whatever the Redis client returns for SISMEMBER
def paused?(queue)
  name = QueueName.normalize(queue.to_s)

  Sidekiq.redis do |redis|
    redis.sismember(PAUSED_QUEUES, name)
  end
end
Returns list of paused queues.
@return [Array<String>]
# File lib/sidekiq/throttled/queues_pauser.rb, line 77
# Reads all members of the PAUSED_QUEUES Redis set.
#
# @return [Array<String>] paused queue names as stored in Redis
def paused_queues
  Sidekiq.redis do |redis|
    members = redis.smembers(PAUSED_QUEUES)
    members.to_a
  end
end
Resumes given `queue`.
@param [#to_s] queue @return [void]
# File lib/sidekiq/throttled/queues_pauser.rb, line 107
# Resumes +queue+: removes its normalized name from the PAUSED_QUEUES
# Redis set and transmits RESUME_MESSAGE with that name through the
# communicator, using the same Redis connection.
#
# @param queue [#to_s] queue to resume
# @return [void]
def resume!(queue)
  name = QueueName.normalize(queue.to_s)

  Sidekiq.redis do |redis|
    redis.srem(PAUSED_QUEUES, name)
    @communicator.transmit(redis, RESUME_MESSAGE, name)
  end
end
Configures Sidekiq server to keep an up-to-date list of paused queues.
@private @return [void]
# File lib/sidekiq/throttled/queues_pauser.rb, line 50
# Applies the queue patch and, inside Sidekiq's server configuration,
# wires up everything that keeps the local paused-queue set current:
# the watcher is started on :startup and stopped on :quiet, pause/resume
# messages are routed to #add and #delete, and #sync! runs once the
# communicator reports ready.
#
# @private
# @return [void]
def setup!
  Patches::Queue.apply!

  Sidekiq.configure_server do |config|
    config.on(:startup) do
      start_watcher
    end
    config.on(:quiet) do
      stop_watcher
    end

    # Registration order is kept: pause handler first, then resume handler.
    [[PAUSE_MESSAGE, :add], [RESUME_MESSAGE, :delete]].each do |message, handler|
      @communicator.receive(message, &method(handler))
    end

    @communicator.ready { sync! }
  end
end
Private Instance Methods
# File lib/sidekiq/throttled/queues_pauser.rb, line 118
# Records +queue+ (after QueueName.expand) in the local paused set while
# holding the mutex. Registered in #setup! as the PAUSE_MESSAGE handler.
def add(queue)
  @mutex.synchronize { @paused_queues.add(QueueName.expand(queue)) }
end
# File lib/sidekiq/throttled/queues_pauser.rb, line 124
# Drops +queue+ (after QueueName.expand) from the local paused set while
# holding the mutex. Registered in #setup! as the RESUME_MESSAGE handler.
def delete(queue)
  @mutex.synchronize do
    expanded = QueueName.expand(queue)
    @paused_queues.delete(expanded)
  end
end
# File lib/sidekiq/throttled/queues_pauser.rb, line 136
# Starts the timer task that calls #sync! (run immediately, then with an
# execution interval of 60). An already existing watcher is reused, so
# calling this more than once does not create a second task.
def start_watcher
  @mutex.synchronize do
    unless @watcher
      timer_options = { run_now: true, execution_interval: 60 }
      @watcher = Concurrent::TimerTask.execute(timer_options) { sync! }
    end

    @watcher
  end
end
# File lib/sidekiq/throttled/queues_pauser.rb, line 145
# Shuts the watcher down while holding the mutex. Does nothing (and
# returns nil) when the watcher was never assigned or is nil.
def stop_watcher
  @mutex.synchronize do
    @watcher.shutdown if instance_variable_defined?(:@watcher) && !@watcher.nil?
  end
end
# File lib/sidekiq/throttled/queues_pauser.rb, line 130
# Replaces the local paused set with the current contents of the Redis
# set (each name passed through QueueName.expand). The Redis read happens
# while the mutex is held.
def sync!
  @mutex.synchronize do
    expanded = paused_queues.map { |name| QueueName.expand(name) }
    @paused_queues.replace(expanded)
  end
end