class Sidekiq::Launcher
The Launcher
starts the Capsule
Managers, the Poller thread and provides the process heartbeat.
Constants
- BEAT_PAUSE
- MEMORY_GRABBER
- PROCTITLES
- RTT_READINGS
We run the heartbeat every five seconds. Capture five samples of RTT, log a warning if each sample is above our warning threshold.
- RTT_WARNING_LEVEL
- STATS_TTL
Attributes
managers[RW]
poller[RW]
Public Class Methods
new(config, embedded: false)
click to toggle source
# File lib/sidekiq/launcher.rb, line 25 def initialize(config, embedded: false) @config = config @embedded = embedded @managers = config.capsules.values.map do |cap| Sidekiq::Manager.new(cap) end @poller = Sidekiq::Scheduled::Poller.new(@config) @done = false end
Public Instance Methods
heartbeat()
click to toggle source
If embedding Sidekiq
, you can have the process heartbeat call this method to regularly heartbeat rather than creating a separate thread.
# File lib/sidekiq/launcher.rb, line 80 def heartbeat ❤ end
quiet()
click to toggle source
Stops this instance from processing any more jobs,
# File lib/sidekiq/launcher.rb, line 47 def quiet return if @done @done = true @managers.each(&:quiet) @poller.terminate fire_event(:quiet, reverse: true) end
run(async_beat: true)
click to toggle source
Start this Sidekiq
instance. If an embedding process already has a heartbeat thread, caller can use ‘async_beat: false` and instead have thread call Launcher#heartbeat
every N seconds.
# File lib/sidekiq/launcher.rb, line 38 def run(async_beat: true) logger.debug { @config.merge!({}) } Sidekiq.freeze! @thread = safe_thread("heartbeat", &method(:start_heartbeat)) if async_beat @poller.start @managers.each(&:start) end
stop()
click to toggle source
Shuts down this Sidekiq
instance. Waits up to the deadline for all jobs to complete.
# File lib/sidekiq/launcher.rb, line 57 def stop deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @config[:timeout] quiet stoppers = @managers.map do |mgr| Thread.new do mgr.stop(deadline) end end fire_event(:shutdown, reverse: true) stoppers.each(&:join) clear_heartbeat end
stopping?()
click to toggle source
# File lib/sidekiq/launcher.rb, line 73 def stopping? @done end
Private Instance Methods
beat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 96 def beat $0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") unless @embedded ❤ end
check_rtt()
click to toggle source
# File lib/sidekiq/launcher.rb, line 202 def check_rtt a = b = 0 redis do |x| a = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) x.ping b = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) end rtt = b - a RTT_READINGS << rtt # Ideal RTT for Redis is < 1000µs # Workable is < 10,000µs # Log a warning if it's a disaster. if RTT_READINGS.all? { |x| x > RTT_WARNING_LEVEL } logger.warn <<~EOM Your Redis network connection is performing extremely poorly. Last RTT readings were #{RTT_READINGS.buffer.inspect}, ideally these should be < 1000. Ensure Redis is running in the same AZ or datacenter as Sidekiq. If these values are close to 100,000, that means your Sidekiq process may be CPU-saturated; reduce your concurrency and/or see https://github.com/sidekiq/sidekiq/discussions/5039 EOM RTT_READINGS.reset end rtt end
clear_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 101 def clear_heartbeat flush_stats # Remove record from Redis since we are shutting down. # Note we don't stop the heartbeat thread; if the process # doesn't actually exit, it'll reappear in the Web UI. redis do |conn| conn.pipelined do |pipeline| pipeline.srem("processes", [identity]) pipeline.unlink("#{identity}:work") end end rescue # best effort, ignore network errors end
flush_stats()
click to toggle source
# File lib/sidekiq/launcher.rb, line 117 def flush_stats fails = Processor::FAILURE.reset procd = Processor::PROCESSED.reset return if fails + procd == 0 nowdate = Time.now.utc.strftime("%Y-%m-%d") begin redis do |conn| conn.pipelined do |pipeline| pipeline.incrby("stat:processed", procd) pipeline.incrby("stat:processed:#{nowdate}", procd) pipeline.expire("stat:processed:#{nowdate}", STATS_TTL) pipeline.incrby("stat:failed", fails) pipeline.incrby("stat:failed:#{nowdate}", fails) pipeline.expire("stat:failed:#{nowdate}", STATS_TTL) end end rescue => ex logger.warn("Unable to flush stats: #{ex}") end end
memory_usage(pid)
click to toggle source
# File lib/sidekiq/launcher.rb, line 243 def memory_usage(pid) MEMORY_GRABBER.call(pid) end
start_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 88 def start_heartbeat loop do beat sleep BEAT_PAUSE end logger.info("Heartbeat stopping...") end
to_data()
click to toggle source
# File lib/sidekiq/launcher.rb, line 247 def to_data @data ||= { "hostname" => hostname, "started_at" => Time.now.to_f, "pid" => ::Process.pid, "tag" => @config[:tag] || "", "concurrency" => @config.total_concurrency, "queues" => @config.capsules.values.flat_map { |cap| cap.queues }.uniq, "weights" => to_weights, "labels" => @config[:labels].to_a, "identity" => identity, "version" => Sidekiq::VERSION, "embedded" => @embedded } end
to_json()
click to toggle source
# File lib/sidekiq/launcher.rb, line 267 def to_json # this data changes infrequently so dump it to a string # now so we don't need to dump it every heartbeat. @json ||= Sidekiq.dump_json(to_data) end
to_weights()
click to toggle source
# File lib/sidekiq/launcher.rb, line 263 def to_weights @config.capsules.values.map(&:weights) end
❤()
click to toggle source
# File lib/sidekiq/launcher.rb, line 140 def ❤ key = identity fails = procd = 0 begin flush_stats curstate = Processor::WORK_STATE.dup curstate.transform_values! { |val| Sidekiq.dump_json(val) } redis do |conn| # work is the current set of executing jobs work_key = "#{key}:work" conn.multi do |transaction| transaction.unlink(work_key) if curstate.size > 0 transaction.hset(work_key, curstate) transaction.expire(work_key, 60) end end end rtt = check_rtt fails = procd = 0 kb = memory_usage(::Process.pid) _, exists, _, _, signal = redis { |conn| conn.multi { |transaction| transaction.sadd("processes", [key]) transaction.exists(key) transaction.hset(key, "info", to_json, "busy", curstate.size, "beat", Time.now.to_f, "rtt_us", rtt, "quiet", @done.to_s, "rss", kb) transaction.expire(key, 60) transaction.rpop("#{key}-signals") } } # first heartbeat or recovering from an outage and need to reestablish our heartbeat fire_event(:heartbeat) unless exists > 0 fire_event(:beat, oneshot: false) ::Process.kill(signal, ::Process.pid) if signal && !@embedded rescue => e # ignore all redis/network issues logger.error("heartbeat: #{e}") # don't lose the counts if there was a network issue Processor::PROCESSED.incr(procd) Processor::FAILURE.incr(fails) end end