class Sidekiq::Throttled::QueuesPauser

Singleton class used to pause queues from being processed. For the sake of efficiency it uses {Communicator} behind the scene to notify all processes about paused/resumed queues.

@private

Constants

PAUSED_QUEUES

Redis key of Set with paused queues.

@return [String]

PAUSE_MESSAGE

{Communicator} message used to notify that queue needs to be paused.

@return [String]

RESUME_MESSAGE

{Communicator} message used to notify that queue needs to be resumed.

@return [String]

Public Class Methods

new() click to toggle source

Initializes singleton instance.

# File lib/sidekiq/throttled/queues_pauser.rb, line 40
def initialize
  @paused_queues = Set.new
  @communicator  = Communicator.instance
  @mutex         = Mutex.new
end

Public Instance Methods

filter(queues) click to toggle source

Returns queues list with paused queues being stripped out.

@private @return [Array<String>]

# File lib/sidekiq/throttled/queues_pauser.rb, line 67
def filter(queues)
  @mutex.synchronize { queues - @paused_queues.to_a }
rescue => e
  Sidekiq.logger.error { "[#{self.class}] Failed filter queues: #{e}" }
  queues
end
pause!(queue) click to toggle source

Pauses given `queue`.

@param [#to_s] queue @return [void]

# File lib/sidekiq/throttled/queues_pauser.rb, line 85
def pause!(queue)
  queue = QueueName.normalize queue.to_s

  Sidekiq.redis do |conn|
    conn.sadd(PAUSED_QUEUES, queue)
    @communicator.transmit(conn, PAUSE_MESSAGE, queue)
  end
end
paused?(queue) click to toggle source

Checks if given `queue` is paused.

@param queue [#to_s] @return [Boolean]

# File lib/sidekiq/throttled/queues_pauser.rb, line 98
def paused?(queue)
  queue = QueueName.normalize queue.to_s
  Sidekiq.redis { |conn| conn.sismember(PAUSED_QUEUES, queue) }
end
paused_queues() click to toggle source

Returns list of paused queues.

@return [Array<String>]

# File lib/sidekiq/throttled/queues_pauser.rb, line 77
def paused_queues
  Sidekiq.redis { |conn| conn.smembers(PAUSED_QUEUES).to_a }
end
resume!(queue) click to toggle source

Resumes given `queue`.

@param [#to_s] queue @return [void]

# File lib/sidekiq/throttled/queues_pauser.rb, line 107
def resume!(queue)
  queue = QueueName.normalize queue.to_s

  Sidekiq.redis do |conn|
    conn.srem(PAUSED_QUEUES, queue)
    @communicator.transmit(conn, RESUME_MESSAGE, queue)
  end
end
setup!() click to toggle source

Configures Sidekiq server to keep actual list of paused queues.

@private @return [void]

# File lib/sidekiq/throttled/queues_pauser.rb, line 50
def setup!
  Patches::Queue.apply!

  Sidekiq.configure_server do |config|
    config.on(:startup) { start_watcher }
    config.on(:quiet) { stop_watcher }

    @communicator.receive(PAUSE_MESSAGE, &method(:add))
    @communicator.receive(RESUME_MESSAGE, &method(:delete))
    @communicator.ready { sync! }
  end
end

Private Instance Methods

add(queue) click to toggle source
# File lib/sidekiq/throttled/queues_pauser.rb, line 118
def add(queue)
  @mutex.synchronize do
    @paused_queues << QueueName.expand(queue)
  end
end
delete(queue) click to toggle source
# File lib/sidekiq/throttled/queues_pauser.rb, line 124
def delete(queue)
  @mutex.synchronize do
    @paused_queues.delete QueueName.expand(queue)
  end
end
start_watcher() click to toggle source
# File lib/sidekiq/throttled/queues_pauser.rb, line 136
def start_watcher
  @mutex.synchronize do
    @watcher ||= Concurrent::TimerTask.execute({
      :run_now            => true,
      :execution_interval => 60
    }) { sync! }
  end
end
stop_watcher() click to toggle source
# File lib/sidekiq/throttled/queues_pauser.rb, line 145
def stop_watcher
  @mutex.synchronize do
    defined?(@watcher) && @watcher&.shutdown
  end
end
sync!() click to toggle source
# File lib/sidekiq/throttled/queues_pauser.rb, line 130
def sync!
  @mutex.synchronize do
    @paused_queues.replace(paused_queues.map { |q| QueueName.expand q })
  end
end