class Sidekiq::Throttled::Fetch
Throttled
fetch strategy.
@private
Constants
- TIMEOUT
Timeout to sleep between fetch retries in case of no job received, as well as timeout to wait for redis to give us something to work.
Public Class Methods
new(options)
click to toggle source
Initializes fetcher instance. @param options [Hash] @option options [Integer] :throttled_queue_cooldown (TIMEOUT
)
Min delay in seconds before queue will be polled again after throttled job.
@option options [Boolean] :strict (false) @option options [Array<#to_s>] :queue
# File lib/sidekiq/throttled/fetch.rb, line 49 def initialize(options) @paused = ExpirableList.new(options.fetch(:throttled_queue_cooldown, TIMEOUT)) @strict = options.fetch(:strict, false) @queues = options.fetch(:queues).map { |q| QueueName.expand q } raise ArgumentError, "empty :queues" if @queues.empty? @queues.uniq! if @strict end
Public Instance Methods
retrieve_work()
click to toggle source
Retrieves job from redis.
@return [Sidekiq::Throttled::UnitOfWork, nil]
# File lib/sidekiq/throttled/fetch.rb, line 63 def retrieve_work work = brpop return unless work work = UnitOfWork.new(*work) return work unless work.throttled? work.requeue_throttled @paused << QueueName.expand(work.queue_name) nil end
Private Instance Methods
brpop()
click to toggle source
Tries to pop pair of `queue` and job `message` out of sidekiq queues.
@see redis.io/commands/brpop @return [Array(String, String), nil]
# File lib/sidekiq/throttled/fetch.rb, line 82 def brpop queues = filter_queues(@strict ? @queues : @queues.shuffle.uniq) if queues.empty? sleep TIMEOUT return end Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) } end
filter_queues(queues)
click to toggle source
Returns list of queues to try to fetch jobs from.
@note It may return an empty array. @param [Array<String>] queues @return [Array<String>]
# File lib/sidekiq/throttled/fetch.rb, line 98 def filter_queues(queues) QueuesPauser.instance.filter(queues) - @paused.to_a end