module Sidekiq

Sidekiq’s Data API provides a Ruby object model on top of Sidekiq’s runtime data in Redis. This API should never be used within application code for business logic.

The Sidekiq server process never uses this API: all data manipulation is done directly for performance reasons to ensure we are using Redis as efficiently as possible at every callsite.

This file is designed to be required within the user’s deployment script; it should need a bare minimum of dependencies. Usage:

require "sidekiq/deploy"
Sidekiq::Deploy.mark!("Some change")

If you do not pass a label, Sidekiq will try to use the latest git commit info.

Iterable jobs are ones which provide a sequence to process using `build_enumerator(*args, cursor: cursor)` and then process each element of that sequence in `each_iteration(item, *args)`.

The job is kicked off as normal:

ProcessUserSet.perform_async(123)

but instead of calling `perform`, Sidekiq will call:

enum = ProcessUserSet#build_enumerator(123, cursor: nil)

Your Enumerator must yield `(object, updated_cursor)` and Sidekiq will call your `each_iteration` method:

ProcessUserSet#each_iteration(object, 123)

After every iteration, Sidekiq will check for shutdown. If Sidekiq is stopping, the cursor will be saved to Redis and the job re-queued to pick up the rest of the work upon restart. Your job will receive the `updated_cursor` so it can pick up right where it stopped:

enum = ProcessUserSet#build_enumerator(123, cursor: updated_cursor)

The cursor object must be serializable to JSON.

Note there are several APIs to help you build enumerators for ActiveRecord Relations, CSV files, etc. See sidekiq/job/iterable/*.rb.
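The cursor hand-off described above can be sketched in plain Ruby. This is only an illustration under stated assumptions: the class below does not include Sidekiq::Job, the user list is made up, and `run_iterable` is a hypothetical driver standing in for Sidekiq's own loop (it is not Sidekiq's implementation). The cursor is stored as JSON between runs to mirror the serialization requirement.

```ruby
require "json"

# Hypothetical job class; it skips Sidekiq::Job so the sketch runs with the
# standard library alone.
class ProcessUserSet
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Yields (object, updated_cursor) pairs, skipping items before the cursor.
  def build_enumerator(set_id, cursor:)
    users = %w[alice bob carol dave] # made-up stand-in for a lookup by set_id
    start = cursor || 0
    Enumerator.new do |yielder|
      users.each_with_index do |user, index|
        next if index < start
        yielder.yield(user, index + 1) # cursor points at the next item
      end
    end
  end

  def each_iteration(user, set_id)
    @seen << user
  end
end

# Hypothetical driver: processes items until `stop_after` is reached, then
# returns the cursor as the JSON string that would be saved.
def run_iterable(job, set_id, saved_cursor: nil, stop_after: nil)
  cursor = saved_cursor && JSON.parse(saved_cursor)
  processed = 0
  job.build_enumerator(set_id, cursor: cursor).each do |object, updated_cursor|
    job.each_iteration(object, set_id)
    cursor = updated_cursor
    processed += 1
    break if stop_after && processed >= stop_after # simulated shutdown check
  end
  JSON.generate(cursor)
end

job = ProcessUserSet.new
saved = run_iterable(job, 123, stop_after: 2) # interrupted after two items
run_iterable(job, 123, saved_cursor: saved)   # resumes from the saved cursor
```

An integer offset is used as the cursor here because it round-trips through JSON unchanged; any JSON-native value would serve the same purpose.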

This file contains the components which track execution metrics within Sidekiq.

SdNotify is a pure-Ruby implementation of sd_notify(3). It can be used to notify systemd about state changes. Methods of this package are no-ops on non-systemd systems (e.g. Darwin).

The API maps closely to the original implementation of sd_notify(3), therefore be sure to check the official man pages prior to using SdNotify.

@see www.freedesktop.org/software/systemd/man/sd_notify.html

Sidekiq’s systemd integration allows Sidekiq to inform systemd:

1. when it has successfully started
2. when it is starting shutdown
3. periodically for a liveness check with a watchdog thread
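As a rough illustration of the protocol behind these three notifications, the sketch below sends a state string to the datagram socket named by `NOTIFY_SOCKET`. The helper name `notify_systemd` and its `env` parameter are assumptions made for this example; it is not the Sidekiq::SdNotify API. `READY=1`, `STOPPING=1` and `WATCHDOG=1` are the state strings documented in sd_notify(3).

```ruby
require "socket"

# Hypothetical helper, not Sidekiq::SdNotify. Returns nil (a no-op) when
# NOTIFY_SOCKET is unset, which is the case outside systemd.
def notify_systemd(state, env = ENV)
  path = env["NOTIFY_SOCKET"]
  return nil if path.nil? || path.empty?

  # A leading "@" denotes a Linux abstract-namespace socket.
  path = "\0#{path[1..]}" if path.start_with?("@")

  sock = Socket.new(:UNIX, :DGRAM)
  begin
    sock.connect(Socket.pack_sockaddr_un(path))
    sock.write(state)
  ensure
    sock.close
  end
end

notify_systemd("READY=1")    # 1. startup finished
notify_systemd("STOPPING=1") # 2. shutdown is beginning
notify_systemd("WATCHDOG=1") # 3. liveness ping
```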

Use `Sidekiq.transactional_push!` in your sidekiq.rb initializer.

Constants

ClientMiddleware

Server-side middleware must include this module in order to get access to server resources during `call`.

LICENSE
MAJOR
NAME
VERSION
Worker

Include this module in your job class and you can easily create asynchronous jobs:

class HardJob
  include Sidekiq::Job
  sidekiq_options queue: 'critical', retry: 5

  def perform(*args)
    # do some work
  end
end

Then in your Rails app, you can do this:

HardJob.perform_async(1, 2, 3)

Note that `perform_async` is a class method, while `perform` is an instance method.

Sidekiq::Job also includes several APIs to provide compatibility with ActiveJob.

class SomeJob
  include Sidekiq::Job
  queue_as :critical

  def perform(...)
  end
end

SomeJob.set(wait_until: 1.hour).perform_async(123)

Note that arguments passed to the job must still obey Sidekiq’s best practice for simple, JSON-native data types. Sidekiq will not implement ActiveJob’s more complex argument serialization. For this reason, we don’t implement `perform_later` as our call semantics are very different.
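One way to see what "JSON-native" means in practice is to check whether the arguments survive a JSON round trip unchanged. The helper `json_native?` below is a hypothetical name used only for this sketch; it is not necessarily how Sidekiq validates arguments.

```ruby
require "json"

# Hypothetical helper: true when args come back identical after being
# serialized to JSON and parsed again.
def json_native?(args)
  JSON.parse(JSON.generate(args)) == args
end

json_native?([123, "abc", {"k" => [1.5, nil, true]}]) # plain JSON types
json_native?([:critical])                              # Symbol becomes a String
json_native?([{id: 1}])                                # Symbol keys become Strings
```

Arguments that fail this check would reach `perform` in a different shape than the one passed to `perform_async`.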

Workers

The WorkSet stores the work being done by this Sidekiq cluster. It tracks the process and thread working on each job.

WARNING WARNING WARNING

This is live data that can change every millisecond. If you call size => 5 and then expect each to be called 5 times, you’re going to have a bad time.

works = Sidekiq::WorkSet.new
works.size # => 2
works.each do |process_id, thread_id, work|
  # process_id is a unique identifier per Sidekiq process
  # thread_id is a unique identifier per thread
  # work is a Hash which looks like:
  # { 'queue' => name, 'run_at' => timestamp, 'payload' => job_hash }
  # run_at is an epoch Integer.
end

Public Class Methods

configure_client() { |default_configuration| ... }
# File lib/sidekiq.rb, line 136
def self.configure_client
  yield default_configuration unless server?
end
configure_embed() { |cfg| ... }

Creates a Sidekiq::Config instance that is more tuned for embedding within an arbitrary Ruby process. Notably it reduces concurrency by default so there is less contention for CPU time with other threads.

instance = Sidekiq.configure_embed do |config|
  config.queues = %w[critical default low]
end
instance.run
sleep 10
instance.stop

NB: it is really easy to overload a Ruby process with threads due to the GIL. I do not recommend setting concurrency higher than 2-3.

NB: Sidekiq only supports one instance in memory. You will get undefined behavior if you try to embed Sidekiq twice in the same process.

# File lib/sidekiq.rb, line 124
def self.configure_embed(&block)
  raise "Sidekiq global configuration is frozen, you must create all embedded instances BEFORE calling `run`" if @frozen

  require "sidekiq/embedded"
  cfg = default_configuration
  cfg.concurrency = 2
  @config_blocks&.each { |block| block.call(cfg) }
  yield cfg

  Sidekiq::Embedded.new(cfg)
end
configure_server() { |default_configuration| ... }
# File lib/sidekiq.rb, line 97
def self.configure_server(&block)
  (@config_blocks ||= []) << block
  yield default_configuration if server?
end
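The source above shows that `configure_server` stores its block even when the process is not a server, and that `configure_embed` replays the stored blocks before yielding. A stripped-down sketch of that interaction follows. `MiniSidekiq` and its `Config` struct are hypothetical stand-ins written for this example, and the starting values are arbitrary.

```ruby
# Hypothetical stand-in for the Sidekiq module; only the block bookkeeping
# is modelled.
module MiniSidekiq
  Config = Struct.new(:concurrency, :queues)

  def self.default_configuration
    @config ||= Config.new(5, ["default"]) # arbitrary starting values
  end

  def self.server?
    false # pretend we are not running under the CLI
  end

  # Remembers the block, and runs it immediately only in a server process.
  def self.configure_server(&block)
    (@config_blocks ||= []) << block
    yield default_configuration if server?
  end

  # Lowers concurrency, replays the stored server blocks, then yields.
  def self.configure_embed
    cfg = default_configuration
    cfg.concurrency = 2
    @config_blocks&.each { |block| block.call(cfg) }
    yield cfg
    cfg
  end
end

MiniSidekiq.configure_server { |c| c.queues = %w[critical default] }
cfg = MiniSidekiq.configure_embed { |c| c.concurrency = 1 }
```

In this sketch the server block takes effect only once `configure_embed` runs, and the embed block has the final say on `concurrency`.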
default_configuration()
# File lib/sidekiq.rb, line 89
def self.default_configuration
  @config ||= Sidekiq::Config.new
end
default_job_options()
# File lib/sidekiq.rb, line 85
def self.default_job_options
  @default_job_options ||= {"retry" => true, "queue" => "default"}
end
default_job_options=(hash)
# File lib/sidekiq.rb, line 81
def self.default_job_options=(hash)
  @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s))
end
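The setter above merges into the existing defaults rather than replacing them, and it converts keys to Strings first. The same two steps can be reproduced with plain hashes. The override values here are made up for illustration.

```ruby
defaults = {"retry" => true, "queue" => "default"}
overrides = {queue: "critical", retry: 5} # made-up values with Symbol keys

# Same expression as the setter: stringify the keys, then merge.
merged = defaults.merge(overrides.transform_keys(&:to_s))
```

Without `transform_keys(&:to_s)`, the merged hash would hold both `"queue"` and `:queue` as separate entries.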
dump_json(object)
# File lib/sidekiq.rb, line 57
def self.dump_json(object)
  JSON.generate(object)
end
ent?()
# File lib/sidekiq.rb, line 65
def self.ent?
  defined?(Sidekiq::Enterprise)
end
freeze!()
# File lib/sidekiq.rb, line 102
def self.freeze!
  @frozen = true
  @config_blocks = nil
  default_configuration.freeze!
end
gem_version()
# File lib/sidekiq/version.rb, line 7
def self.gem_version
  Gem::Version.new(VERSION)
end
load_json(string)
# File lib/sidekiq.rb, line 53
def self.load_json(string)
  JSON.parse(string)
end
logger()
# File lib/sidekiq.rb, line 93
def self.logger
  default_configuration.logger
end
pro?()
# File lib/sidekiq.rb, line 61
def self.pro?
  defined?(Sidekiq::Pro)
end
redis(&block)
# File lib/sidekiq.rb, line 73
def self.redis(&block)
  (Thread.current[:sidekiq_capsule] || default_configuration).redis(&block)
end
redis_pool()
# File lib/sidekiq.rb, line 69
def self.redis_pool
  (Thread.current[:sidekiq_capsule] || default_configuration).redis_pool
end
server?()
# File lib/sidekiq.rb, line 49
def self.server?
  defined?(Sidekiq::CLI)
end
start_watchdog()
# File lib/sidekiq/systemd.rb, line 10
def self.start_watchdog
  usec = Integer(ENV["WATCHDOG_USEC"])
  return Sidekiq.logger.error("systemd Watchdog too fast: #{usec}") if usec < 1_000_000

  sec_f = usec / 1_000_000.0
  # "It is recommended that a daemon sends a keep-alive notification message
  # to the service manager every half of the time returned here."
  ping_f = sec_f / 2
  Sidekiq.logger.info "Pinging systemd watchdog every #{ping_f.round(1)} sec"
  Thread.new do
    loop do
      sleep ping_f
      Sidekiq::SdNotify.watchdog
    end
  end
end
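The interval arithmetic in `start_watchdog` can be checked in isolation. The helper name `watchdog_ping_interval` is hypothetical. It mirrors the conversion from microseconds to a half-period in seconds, and raises instead of logging when the timeout is under one second.

```ruby
# Hypothetical helper mirroring the arithmetic in start_watchdog.
def watchdog_ping_interval(usec)
  raise ArgumentError, "watchdog interval too short: #{usec}" if usec < 1_000_000

  usec / 1_000_000.0 / 2 # seconds between pings: half the watchdog timeout
end

watchdog_ping_interval(10_000_000) # a 10 s timeout gives a 5.0 s ping interval
```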
strict_args!(mode = :raise)
# File lib/sidekiq.rb, line 77
def self.strict_args!(mode = :raise)
  Sidekiq::Config::DEFAULTS[:on_complex_arguments] = mode
end
transactional_push!()
# File lib/sidekiq/transaction_aware_client.rb, line 40
def self.transactional_push!
  begin
    require "after_commit_everywhere"
  rescue LoadError
    raise %q(You need to add `gem "after_commit_everywhere"` to your Gemfile to use Sidekiq's transactional client)
  end

  Sidekiq.default_job_options["client_class"] = Sidekiq::TransactionAwareClient
  Sidekiq::JobUtil::TRANSIENT_ATTRIBUTES << "client_class"
  true
end
❨╯°□°❩╯︵┻━┻()
# File lib/sidekiq.rb, line 45
def self.❨╯°□°❩╯︵┻━┻
  puts "Take a deep breath and count to ten..."
end