module Sidekiq
Sidekiq’s Data API provides a Ruby object model on top of Sidekiq’s runtime data in Redis. This API should never be used within application code for business logic.
The Sidekiq server process never uses this API: all data manipulation is done directly for performance reasons to ensure we are using Redis as efficiently as possible at every callsite.
This file is designed to be required within the user’s deployment script; it should need a bare minimum of dependencies. Usage:
    require "sidekiq/deploy"
    Sidekiq::Deploy.mark!("Some change")

If you do not pass a label, Sidekiq will try to use the latest git commit info.
Iterable jobs are ones which provide a sequence to process using `build_enumerator(*args, cursor: cursor)` and then process each element of that sequence in `each_iteration(item, *args)`.
The job is kicked off as normal:
ProcessUserSet.perform_async(123)
but instead of calling `perform`, Sidekiq will call:
enum = ProcessUserSet#build_enumerator(123, cursor: nil)
Your Enumerator must yield `(object, updated_cursor)` and Sidekiq will call your `each_iteration` method:
ProcessUserSet#each_iteration(object, 123)
After every iteration, Sidekiq will check for shutdown. If the process is stopping, the cursor will be saved to Redis and the job re-queued to pick up the rest of the work upon restart. Your job will receive the `updated_cursor` so it can pick up right where it stopped.
enum = ProcessUserSet#build_enumerator(123, cursor: updated_cursor)
The cursor object must be serializable to JSON.
Note there are several APIs to help you build enumerators for ActiveRecord Relations, CSV files, etc. See sidekiq/job/iterable/*.rb.
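The cursor contract described above can be sketched without Sidekiq at all. The following is a minimal simulation under stated assumptions: `ProcessUserSet` here is a plain Ruby class that includes no Sidekiq module, `run_iterable` is a hypothetical driver standing in for Sidekiq's internal loop, and `stop_after` is an invented parameter that stands in for a shutdown signal.

```ruby
# Plain-Ruby simulation of the iterable job contract described above.
# Nothing here calls Sidekiq; `run_iterable` and `stop_after` are hypothetical.
class ProcessUserSet
  ITEMS = %w[alice bob carol dave].freeze

  attr_reader :processed

  def initialize
    @processed = []
  end

  # Returns an Enumerator that yields (object, updated_cursor).
  # The cursor is the index of the next unprocessed item, so it is JSON-safe.
  def build_enumerator(_set_id, cursor:)
    start = cursor || 0
    Enumerator.new do |yielder|
      ITEMS.each_with_index do |item, idx|
        next if idx < start
        yielder.yield(item, idx + 1)
      end
    end
  end

  def each_iteration(item, set_id)
    @processed << "#{set_id}:#{item}"
  end
end

# Hypothetical driver: iterates until the sequence ends or until `stop_after`
# items have been handled, then returns the cursor to resume from.
def run_iterable(job, *args, cursor: nil, stop_after: nil)
  count = 0
  job.build_enumerator(*args, cursor: cursor).each do |object, updated_cursor|
    job.each_iteration(object, *args)
    cursor = updated_cursor
    count += 1
    return cursor if stop_after && count >= stop_after # simulated shutdown
  end
  nil # nil means the sequence was exhausted
end

job = ProcessUserSet.new
saved = run_iterable(job, 123, stop_after: 2) # interrupted after two items
run_iterable(job, 123, cursor: saved)         # resumes at the third item
```

Because the cursor is a plain Integer, it survives a round trip through JSON, which is what allows the second call to resume from the saved position.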
This file contains the components which track execution metrics within Sidekiq.
SdNotify is a pure-Ruby implementation of sd_notify(3). It can be used to notify systemd about state changes. Methods of this package are no-ops on non-systemd systems (e.g. Darwin).
The API maps closely to the original implementation of sd_notify(3), so be sure to check the official man pages before using SdNotify.
@see www.freedesktop.org/software/systemd/man/sd_notify.html
Sidekiq’s systemd integration allows Sidekiq to inform systemd:
1. when it has successfully started
2. when it is starting shutdown
3. periodically for a liveness check with a watchdog thread
Use `Sidekiq.transactional_push!` in your sidekiq.rb initializer.
Constants
- ClientMiddleware
Server-side middleware must import this Module in order to get access to server resources during `call`.
- LICENSE
- MAJOR
- NAME
- VERSION
- Worker
Include this module in your job class and you can easily create asynchronous jobs:
    class HardJob
      include Sidekiq::Job
      sidekiq_options queue: 'critical', retry: 5

      def perform(*args)
        # do some work
      end
    end

Then in your Rails app, you can do this:

    HardJob.perform_async(1, 2, 3)

Note that perform_async is a class method, perform is an instance method.
Sidekiq::Job also includes several APIs to provide compatibility with ActiveJob.

    class SomeJob
      include Sidekiq::Job
      queue_as :critical

      def perform(...)
      end
    end
    SomeJob.set(wait_until: 1.hour).perform_async(123)

Note that arguments passed to the job must still obey Sidekiq’s best practice for simple, JSON-native data types.
Sidekiq will not implement ActiveJob’s more complex argument serialization. For this reason, we don’t implement `perform_later` as our call semantics are very different.
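One way to see what "JSON-native" means in practice is to round-trip the arguments through JSON and compare the result with the input. The helper `json_native?` below is hypothetical and is not part of Sidekiq; it is only a sketch of the idea using Ruby's standard `json` library.

```ruby
require "json"

# Hypothetical helper (not a Sidekiq API): arguments count as JSON-native
# here if serializing and then parsing them gives back an equal value.
def json_native?(args)
  JSON.parse(JSON.generate(args)) == args
end

json_native?([1, "two", 3.0, nil, true, {"k" => [1, 2]}]) # plain JSON types
json_native?([:symbol])      # a Symbol comes back as a String
json_native?([{user_id: 1}]) # Symbol hash keys come back as String keys
```

Only the first call returns true. The other two lose type information in the round trip, so a job receiving them would see different values than the caller passed.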
- Workers
The WorkSet stores the work being done by this Sidekiq cluster. It tracks the process and thread working on each job.

WARNING WARNING WARNING
This is live data that can change every millisecond. If you call size => 5 and then expect each to be called 5 times, you’re going to have a bad time.
    works = Sidekiq::WorkSet.new
    works.size # => 2
    works.each do |process_id, thread_id, work|
      # process_id is a unique identifier per Sidekiq process
      # thread_id is a unique identifier per thread
      # work is a Hash which looks like:
      # { 'queue' => name, 'run_at' => timestamp, 'payload' => job_hash }
      # run_at is an epoch Integer.
    end
Public Class Methods
    # File lib/sidekiq.rb, line 136
    def self.configure_client
      yield default_configuration unless server?
    end
Creates a Sidekiq::Config instance that is more tuned for embedding within an arbitrary Ruby process. Notably it reduces concurrency by default so there is less contention for CPU time with other threads.

    instance = Sidekiq.configure_embed do |config|
      config.queues = %w[critical default low]
    end
    instance.run
    sleep 10
    instance.stop

NB: it is really easy to overload a Ruby process with threads due to the GIL. I do not recommend setting concurrency higher than 2-3.
NB: Sidekiq only supports one instance in memory. You will get undefined behavior if you try to embed Sidekiq twice in the same process.
    # File lib/sidekiq.rb, line 124
    def self.configure_embed(&block)
      raise "Sidekiq global configuration is frozen, you must create all embedded instances BEFORE calling `run`" if @frozen

      require "sidekiq/embedded"
      cfg = default_configuration
      cfg.concurrency = 2
      @config_blocks&.each { |block| block.call(cfg) }
      yield cfg

      Sidekiq::Embedded.new(cfg)
    end
    # File lib/sidekiq.rb, line 97
    def self.configure_server(&block)
      (@config_blocks ||= []) << block
      yield default_configuration if server?
    end

    # File lib/sidekiq.rb, line 89
    def self.default_configuration
      @config ||= Sidekiq::Config.new
    end

    # File lib/sidekiq.rb, line 85
    def self.default_job_options
      @default_job_options ||= {"retry" => true, "queue" => "default"}
    end
    # File lib/sidekiq.rb, line 81
    def self.default_job_options=(hash)
      @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s))
    end
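The setter above merges into the existing defaults rather than replacing them, and it normalizes keys to Strings. The sketch below reproduces that logic in a standalone module; `JobDefaults` is a hypothetical name, used only so the example runs without Sidekiq loaded.

```ruby
# Standalone stand-in for the getter and setter shown above.
# `JobDefaults` is a hypothetical module, not part of Sidekiq.
module JobDefaults
  def self.default_job_options
    @default_job_options ||= {"retry" => true, "queue" => "default"}
  end

  def self.default_job_options=(hash)
    @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s))
  end
end

# Symbol keys are accepted and converted; untouched defaults survive.
JobDefaults.default_job_options = {retry: 3, backtrace: true}
options = JobDefaults.default_job_options
# "queue" keeps its default, "retry" is overridden, and every key is a String.
```

Normalizing to String keys keeps the options hash consistent with what a job payload looks like after it has passed through JSON.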
    # File lib/sidekiq.rb, line 57
    def self.dump_json(object)
      JSON.generate(object)
    end

    # File lib/sidekiq.rb, line 65
    def self.ent?
      defined?(Sidekiq::Enterprise)
    end

    # File lib/sidekiq.rb, line 102
    def self.freeze!
      @frozen = true
      @config_blocks = nil
      default_configuration.freeze!
    end

    # File lib/sidekiq/version.rb, line 7
    def self.gem_version
      Gem::Version.new(VERSION)
    end

    # File lib/sidekiq.rb, line 53
    def self.load_json(string)
      JSON.parse(string)
    end
    # File lib/sidekiq.rb, line 93
    def self.logger
      default_configuration.logger
    end

    # File lib/sidekiq.rb, line 61
    def self.pro?
      defined?(Sidekiq::Pro)
    end

    # File lib/sidekiq.rb, line 73
    def self.redis(&block)
      (Thread.current[:sidekiq_capsule] || default_configuration).redis(&block)
    end

    # File lib/sidekiq.rb, line 69
    def self.redis_pool
      (Thread.current[:sidekiq_capsule] || default_configuration).redis_pool
    end
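Both `redis` and `redis_pool` above look for a thread-local capsule first and fall back to the default configuration. The lookup pattern can be sketched in isolation; `FakeConfig`, `DEFAULT_CONFIG`, and `current_config` are hypothetical names invented for this illustration and do not exist in Sidekiq.

```ruby
# Hypothetical stand-ins: `FakeConfig` plays the role of both the default
# configuration and a capsule. Only the lookup pattern is illustrated.
FakeConfig = Struct.new(:name)

DEFAULT_CONFIG = FakeConfig.new("default")

# Same shape as the expression in Sidekiq.redis: thread-local first,
# then the process-wide default.
def current_config
  Thread.current[:sidekiq_capsule] || DEFAULT_CONFIG
end

outside = current_config.name # no thread-local set on this thread

inside = Thread.new do
  Thread.current[:sidekiq_capsule] = FakeConfig.new("capsule")
  current_config.name # the thread-local value takes precedence here
end.value
```

Code running on a thread that has set `:sidekiq_capsule` sees that object, while every other thread sees the default.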
    # File lib/sidekiq.rb, line 49
    def self.server?
      defined?(Sidekiq::CLI)
    end
    # File lib/sidekiq/systemd.rb, line 10
    def self.start_watchdog
      usec = Integer(ENV["WATCHDOG_USEC"])
      # Interpolate usec: concatenating a String and an Integer raises TypeError.
      return Sidekiq.logger.error("systemd Watchdog too fast: #{usec}") if usec < 1_000_000

      sec_f = usec / 1_000_000.0
      # "It is recommended that a daemon sends a keep-alive notification message
      # to the service manager every half of the time returned here."
      ping_f = sec_f / 2
      Sidekiq.logger.info "Pinging systemd watchdog every #{ping_f.round(1)} sec"
      Thread.new do
        loop do
          sleep ping_f
          Sidekiq::SdNotify.watchdog
        end
      end
    end
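The interval arithmetic in `start_watchdog` can be checked in isolation. `watchdog_ping_interval` below is a hypothetical helper, not a Sidekiq method; it returns the ping interval in seconds, or `nil` when the configured timeout is under one second.

```ruby
# Hypothetical helper mirroring the arithmetic in start_watchdog.
# Takes the WATCHDOG_USEC value (microseconds, as a String or Integer).
def watchdog_ping_interval(watchdog_usec)
  usec = Integer(watchdog_usec)
  return nil if usec < 1_000_000 # rejected as too fast, as in the source above

  (usec / 1_000_000.0) / 2 # ping at half the timeout, per the quoted advice
end

watchdog_ping_interval("30000000") # a 30 second watchdog timeout
watchdog_ping_interval("500000")   # half a second: below the minimum
```

With a 30 second timeout the helper returns 15.0, so the watchdog thread would ping twice per timeout window. The half-second value is rejected and yields `nil`.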
    # File lib/sidekiq.rb, line 77
    def self.strict_args!(mode = :raise)
      Sidekiq::Config::DEFAULTS[:on_complex_arguments] = mode
    end
    # File lib/sidekiq/transaction_aware_client.rb, line 40
    def self.transactional_push!
      begin
        require "after_commit_everywhere"
      rescue LoadError
        raise %q(You need to add `gem "after_commit_everywhere"` to your Gemfile to use Sidekiq's transactional client)
      end

      Sidekiq.default_job_options["client_class"] = Sidekiq::TransactionAwareClient
      Sidekiq::JobUtil::TRANSIENT_ATTRIBUTES << "client_class"
      true
    end
    # File lib/sidekiq.rb, line 45
    def self.❨╯°□°❩╯︵┻━┻
      puts "Take a deep breath and count to ten..."
    end