class Resqued::Master
The master process.
-
Spawns a listener.
-
Tracks all work. (IO pipe from listener.)
-
Handles signals.
Constants
- OPTIONAL_SIGNALS
- OTHER_SIGNALS
- SIGNALS
- SIGNAL_QUEUE
- TRAPS
Public Class Methods
new(state, options = {})
click to toggle source
# File lib/resqued/master.rb, line 21 def initialize(state, options = {}) @state = state @status_pipe = options.fetch(:status_pipe, nil) @listeners = ListenerPool.new(state) @listener_backoff = Backoff.new end
Public Instance Methods
dump_object_counts()
click to toggle source
Private.
# File lib/resqued/master.rb, line 84 def dump_object_counts log GC.stat.inspect counts = {} total = 0 ObjectSpace.each_object do |o| count = counts[o.class.name] || 0 counts[o.class.name] = count + 1 total += 1 end top = 10 log "#{total} objects. top #{top}:" counts.sort_by { |_, count| -count }.each_with_index do |(name, count), i| next unless i < top diff = "" if last = @last_counts && @last_counts[name] diff = sprintf(" (%+d)", (count - last)) end log " #{count} #{name}#{diff}" end @last_counts = counts log GC.stat.inspect rescue => e log "Error while counting objects: #{e}" end
go_ham()
click to toggle source
Private: dat main loop.
# File lib/resqued/master.rb, line 44 def go_ham # If we're resuming, we'll want to recycle the existing listener now. prepare_new_listener loop do read_listeners reap_all_listeners(Process::WNOHANG) start_listener unless @state.paused case signal = SIGNAL_QUEUE.shift when nil yawn(@listener_backoff.how_long? || 30.0) when :INFO dump_object_counts when :HUP if @state.exec_on_hup log "Execing a new master" ExecOnHUP.exec!(@state) end reopen_logs log "Restarting listener with new configuration and application." prepare_new_listener when :USR2 log "Pause job processing" @state.paused = true kill_listener(:QUIT, @listeners.current) @listeners.clear_current! when :CONT log "Resume job processing" @state.paused = false kill_all_listeners(:CONT) when :INT, :TERM, :QUIT log "Shutting down..." kill_all_listeners(signal) wait_for_workers unless @state.fast_exit break end end end
install_signal_handlers()
click to toggle source
# File lib/resqued/master.rb, line 225 def install_signal_handlers trap(:CHLD) { awake } SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal; awake } } OPTIONAL_SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal; awake } rescue nil } end
kill_all_listeners(signal)
click to toggle source
# File lib/resqued/master.rb, line 179 def kill_all_listeners(signal) @listeners.each do |l| l.kill(signal) end end
kill_listener(signal, listener)
click to toggle source
# File lib/resqued/master.rb, line 175 def kill_listener(signal, listener) listener&.kill(signal) end
listener_running(listener)
click to toggle source
Listener
message: A listener finished booting, and is ready to start workers.
Promotes a booting listener to be the current listener.
# File lib/resqued/master.rb, line 144 def listener_running(listener) listener_status(listener, "ready") if listener == @listeners.current kill_listener(:QUIT, @listeners.last_good) @listeners.clear_last_good! else # This listener didn't receive the last SIGQUIT we sent. # (It was probably sent before the listener had set up its traps.) # So kill it again. We have moved on. kill_listener(:QUIT, listener) end end
listener_status(listener, status)
click to toggle source
# File lib/resqued/master.rb, line 252 def listener_status(listener, status) if listener&.pid status_message("listener", listener.pid, status) end end
no_more_unexpected_exits()
click to toggle source
# File lib/resqued/master.rb, line 240 def no_more_unexpected_exits trap("EXIT", "DEFAULT") end
prepare_new_listener()
click to toggle source
Private: Spin up a new listener.
The old one will be killed when the new one is ready for workers.
# File lib/resqued/master.rb, line 160 def prepare_new_listener if @listeners.last_good # The last good listener is still running because we got another # HUP before the new listener finished booting. # Keep the last_good_listener (where all the workers are) and # kill the booting current_listener. We'll start a new one. kill_listener(:QUIT, @listeners.current) # Indicate to `start_listener` that it should start a new # listener. @listeners.clear_current! else @listeners.cycle_current end end
read_listeners()
click to toggle source
# File lib/resqued/master.rb, line 119 def read_listeners @listeners.each do |l| l.read_worker_status(on_activity: self) end end
reap_all_listeners(waitpid_flags = 0)
click to toggle source
# File lib/resqued/master.rb, line 189 def reap_all_listeners(waitpid_flags = 0) until @listeners.empty? begin lpid, status = Process.waitpid2(-1, waitpid_flags) return unless lpid log "Listener exited #{status}" if @listeners.current_pid == lpid @listener_backoff.died @listeners.clear_current! end if @listeners.last_good_pid == lpid @listeners.clear_last_good! end if dead_listener = @listeners.delete(lpid) listener_status dead_listener, "stop" dead_listener.dispose end write_procline rescue Errno::ECHILD return end end end
report_unexpected_exits()
click to toggle source
# File lib/resqued/master.rb, line 231 def report_unexpected_exits trap("EXIT") do log("EXIT #{$!.inspect}") $!&.backtrace&.each do |line| log(line) end end end
run(ready_pipe = nil)
click to toggle source
Public: Starts the master process.
# File lib/resqued/master.rb, line 29 def run(ready_pipe = nil) report_unexpected_exits with_pidfile(@state.pidfile) do write_procline install_signal_handlers if ready_pipe ready_pipe.syswrite($$.to_s) ready_pipe.close rescue nil end go_ham end no_more_unexpected_exits end
start_listener()
click to toggle source
# File lib/resqued/master.rb, line 110 def start_listener return if @listeners.current || @listener_backoff.wait? listener = @listeners.start! listener_status listener, "start" @listener_backoff.started write_procline end
status_message(type, pid, status)
click to toggle source
# File lib/resqued/master.rb, line 262 def status_message(type, pid, status) @status_pipe&.write("#{type},#{pid},#{status}\n") end
wait_for_workers()
click to toggle source
# File lib/resqued/master.rb, line 185 def wait_for_workers reap_all_listeners end
worker_finished(pid)
click to toggle source
Listener
message: A worker just stopped working.
Forwards the message to the other listeners.
# File lib/resqued/master.rb, line 133 def worker_finished(pid) worker_status(pid, "stop") @listeners.each do |other| other.worker_finished(pid) end end
worker_started(pid)
click to toggle source
Listener
message: A worker just started working.
# File lib/resqued/master.rb, line 126 def worker_started(pid) worker_status(pid, "start") end
worker_status(pid, status)
click to toggle source
# File lib/resqued/master.rb, line 258 def worker_status(pid, status) status_message("worker", pid, status) end
write_procline()
click to toggle source
# File lib/resqued/master.rb, line 248 def write_procline $0 = "#{procline_version} master [gen #{@state.listeners_created}] [#{@listeners.size} running] #{ARGV.join(' ')}" end
yawn(duration)
click to toggle source
Calls superclass method
Resqued::Sleepy#yawn
# File lib/resqued/master.rb, line 244 def yawn(duration) super(duration, @listeners.map { |l| l.read_pipe }) end