module Sidekiq::Throttled

Concurrency and threshold throttling for Sidekiq.

Just add somewhere in your bootstrap:

require "sidekiq/throttled"
Sidekiq::Throttled.setup!

Once you've done that you can include {Sidekiq::Throttled::Worker} to your job classes and configure throttling:

class MyWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  sidekiq_options :queue => :my_queue

  sidekiq_throttle({
    # Allow maximum 10 concurrent jobs of this class at a time.
    :concurrency => { :limit => 10 },
    # Allow maximum 1K jobs being processed within one hour window.
    :threshold => { :limit => 1_000, :period => 1.hour }
  })

  def perform
    # ...
  end
end

Constants

MUTEX
VERSION

Gem version

Public Class Methods

configuration() click to toggle source

@return [Configuration]

# File lib/sidekiq/throttled.rb, line 52
def configuration
  @configuration ||= Configuration.new
end
setup!() click to toggle source

Hooks throttler into sidekiq.

@return [void]

# File lib/sidekiq/throttled.rb, line 59
def setup!
  Communicator.instance.setup!
  QueuesPauser.instance.setup!

  Sidekiq.configure_server do |config|
    setup_strategy!

    require "sidekiq/throttled/middleware"
    config.server_middleware do |chain|
      chain.add Sidekiq::Throttled::Middleware
    end
  end
end
throttled?(message) click to toggle source

Tells whenever job is throttled or not.

@param [String] message Job's JSON payload @return [Boolean]

# File lib/sidekiq/throttled.rb, line 77
def throttled?(message)
  message = JSON.parse message
  job = message.fetch("class") { return false }
  jid = message.fetch("jid") { return false }

  preload_constant! job

  Registry.get job do |strategy|
    return strategy.throttled?(jid, *message["args"])
  end

  false
rescue
  false
end

Private Class Methods

preload_constant!(job) click to toggle source

Tries to preload constant by it's name once.

Somehow, sometimes, some classes are not eager loaded upon Rails init, leading to throttling config not being registered prior job perform. And that leaves us with concurrency limit + 1 situation upon Sidekiq server restart (becomes normal after all Sidekiq processes handled at leas onr job of that class).

@return [void]

# File lib/sidekiq/throttled.rb, line 117
def preload_constant!(job)
  MUTEX.synchronize do
    @preloaded      ||= {}
    @preloaded[job] ||= constantize(job) || true
  end
end
setup_strategy!() click to toggle source

@return [void]

# File lib/sidekiq/throttled.rb, line 96
def setup_strategy!
  require "sidekiq/throttled/fetch"

  # https://github.com/mperham/sidekiq/commit/fce05c9d4b4c0411c982078a4cf3a63f20f739bc
  Sidekiq.options[:fetch] =
    if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("6.1.0")
      Sidekiq::Throttled::Fetch
    else
      Sidekiq::Throttled::Fetch.new(Sidekiq.options)
    end
end