class Sidekiq::Launcher
The Launcher
starts the Manager
and Poller threads and provides the process heartbeat.
Constants
- BEAT_PAUSE
- MEMORY_GRABBER
- PROCTITLES
- RTT_READINGS
We run the heartbeat every five seconds. Capture five samples of RTT, log a warning if each sample is above our warning threshold.
- RTT_WARNING_LEVEL
- STATS_TTL
Attributes
fetcher[RW]
manager[RW]
poller[RW]
Public Class Methods
new(options)
click to toggle source
# File lib/sidekiq/launcher.rb, line 25 def initialize(options) @config = options options[:fetch] ||= BasicFetch.new(options) @manager = Sidekiq::Manager.new(options) @poller = Sidekiq::Scheduled::Poller.new(options) @done = false end
Public Instance Methods
quiet()
click to toggle source
Stops this instance from processing any more jobs,
# File lib/sidekiq/launcher.rb, line 41 def quiet @done = true @manager.quiet @poller.terminate end
run()
click to toggle source
# File lib/sidekiq/launcher.rb, line 33 def run @thread = safe_thread("heartbeat", &method(:start_heartbeat)) @poller.start @manager.start end
stop()
click to toggle source
Shuts down this Sidekiq
instance. Waits up to the deadline for all jobs to complete.
# File lib/sidekiq/launcher.rb, line 48 def stop deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @config[:timeout] @done = true @manager.quiet @poller.terminate @manager.stop(deadline) # Requeue everything in case there was a thread which fetched a job while the process was stopped. # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. strategy = @config[:fetch] strategy.bulk_requeue([], @config) clear_heartbeat end
stopping?()
click to toggle source
# File lib/sidekiq/launcher.rb, line 65 def stopping? @done end
Private Instance Methods
check_rtt()
click to toggle source
# File lib/sidekiq/launcher.rb, line 203 def check_rtt a = b = 0 redis do |x| a = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) x.ping b = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) end rtt = b - a RTT_READINGS << rtt # Ideal RTT for Redis is < 1000µs # Workable is < 10,000µs # Log a warning if it's a disaster. if RTT_READINGS.all? { |x| x > RTT_WARNING_LEVEL } logger.warn <<~EOM Your Redis network connection is performing extremely poorly. Last RTT readings were #{RTT_READINGS.buffer.inspect}, ideally these should be < 1000. Ensure Redis is running in the same AZ or datacenter as Sidekiq. If these values are close to 100,000, that means your Sidekiq process may be CPU-saturated; reduce your concurrency and/or see https://github.com/mperham/sidekiq/discussions/5039 EOM RTT_READINGS.reset end rtt end
clear_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 81 def clear_heartbeat flush_stats # Remove record from Redis since we are shutting down. # Note we don't stop the heartbeat thread; if the process # doesn't actually exit, it'll reappear in the Web UI. redis do |conn| conn.pipelined do |pipeline| pipeline.srem("processes", [identity]) pipeline.unlink("#{identity}:work") end end rescue # best effort, ignore network errors end
flush_stats()
click to toggle source
# File lib/sidekiq/launcher.rb, line 103 def flush_stats fails = Processor::FAILURE.reset procd = Processor::PROCESSED.reset return if fails + procd == 0 nowdate = Time.now.utc.strftime("%Y-%m-%d") begin Sidekiq.redis do |conn| conn.pipelined do |pipeline| pipeline.incrby("stat:processed", procd) pipeline.incrby("stat:processed:#{nowdate}", procd) pipeline.expire("stat:processed:#{nowdate}", STATS_TTL) pipeline.incrby("stat:failed", fails) pipeline.incrby("stat:failed:#{nowdate}", fails) pipeline.expire("stat:failed:#{nowdate}", STATS_TTL) end end rescue => ex # we're exiting the process, things might be shut down so don't # try to handle the exception Sidekiq.logger.warn("Unable to flush stats: #{ex}") end end
heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 97 def heartbeat $0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") ❤ end
memory_usage(pid)
click to toggle source
# File lib/sidekiq/launcher.rb, line 244 def memory_usage(pid) MEMORY_GRABBER.call(pid) end
start_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 73 def start_heartbeat loop do heartbeat sleep BEAT_PAUSE end logger.info("Heartbeat stopping...") end
to_data()
click to toggle source
# File lib/sidekiq/launcher.rb, line 248 def to_data @data ||= { "hostname" => hostname, "started_at" => Time.now.to_f, "pid" => ::Process.pid, "tag" => @config[:tag] || "", "concurrency" => @config[:concurrency], "queues" => @config[:queues].uniq, "labels" => @config[:labels], "identity" => identity } end
to_json()
click to toggle source
# File lib/sidekiq/launcher.rb, line 261 def to_json # this data changes infrequently so dump it to a string # now so we don't need to dump it every heartbeat. @json ||= Sidekiq.dump_json(to_data) end
❤()
click to toggle source
# File lib/sidekiq/launcher.rb, line 128 def ❤ key = identity fails = procd = 0 begin fails = Processor::FAILURE.reset procd = Processor::PROCESSED.reset curstate = Processor::WORK_STATE.dup nowdate = Time.now.utc.strftime("%Y-%m-%d") redis do |conn| conn.multi do |transaction| transaction.incrby("stat:processed", procd) transaction.incrby("stat:processed:#{nowdate}", procd) transaction.expire("stat:processed:#{nowdate}", STATS_TTL) transaction.incrby("stat:failed", fails) transaction.incrby("stat:failed:#{nowdate}", fails) transaction.expire("stat:failed:#{nowdate}", STATS_TTL) end # work is the current set of executing jobs work_key = "#{key}:work" conn.pipelined do |transaction| transaction.unlink(work_key) curstate.each_pair do |tid, hash| transaction.hset(work_key, tid, Sidekiq.dump_json(hash)) end transaction.expire(work_key, 60) end end rtt = check_rtt fails = procd = 0 kb = memory_usage(::Process.pid) _, exists, _, _, msg = redis { |conn| conn.multi { |transaction| transaction.sadd("processes", [key]) transaction.exists?(key) transaction.hmset(key, "info", to_json, "busy", curstate.size, "beat", Time.now.to_f, "rtt_us", rtt, "quiet", @done.to_s, "rss", kb) transaction.expire(key, 60) transaction.rpop("#{key}-signals") } } # first heartbeat or recovering from an outage and need to reestablish our heartbeat fire_event(:heartbeat) unless exists fire_event(:beat, oneshot: false) return unless msg ::Process.kill(msg, ::Process.pid) rescue => e # ignore all redis/network issues logger.error("heartbeat: #{e}") # don't lose the counts if there was a network issue Processor::PROCESSED.incr(procd) Processor::FAILURE.incr(fails) end end