class Sisyphus::Master
Constants
- HANDLED_SIGNALS
- IO_TIMEOUT
Attributes
job[R]
logger[R]
number_of_workers[RW]
workers[R]
Public Class Methods
new(job, options = {})
click to toggle source
# File lib/sisyphus/master.rb, line 14
#
# Builds a master for +job+.
#
# Recognised +options+ keys (all optional):
# * +:workers+ - initial value of +number_of_workers+ (default: 0)
# * +:logger+ - logger to use (default: a new NullLogger)
# * +:execution_strategy+ - strategy class, instantiated on demand by
#   #execution_strategy (default: ForkingExecutionStrategy)
#
# Also opens the pipe stored in @selfpipe and resets the signal queue
# kept on the main thread.
def initialize(job, options = {})
  @job = job
  @workers = []
  @logger = options.fetch(:logger) { NullLogger.new }
  @execution_strategy = options.fetch(:execution_strategy) { ForkingExecutionStrategy }
  self.number_of_workers = options.fetch(:workers, 0)

  pipe_reader, pipe_writer = IO.pipe
  @selfpipe = { reader: pipe_reader, writer: pipe_writer }

  Thread.main[:signal_queue] = []
end
Public Instance Methods
spawn_worker()
click to toggle source
# File lib/sisyphus/master.rb, line 37
#
# Forks one worker process connected to the master by a fresh pipe.
#
# In the child: closes the read end, renames the process to
# "Worker <pid>", builds the worker with #create_worker and runs it with
# #start_worker.
#
# In the parent: closes the write end and records the child's pid and
# the read end of the pipe in +workers+.
def spawn_worker
  reader, writer = IO.pipe
  child_pid = fork

  unless child_pid
    # fork returned nil, so this is the child process.
    reader.close
    self.process_name = "Worker #{Process.pid}"
    return start_worker(create_worker(writer))
  end

  writer.close
  workers << { pid: child_pid, reader: reader }
end
start()
click to toggle source
# File lib/sisyphus/master.rb, line 27
#
# Installs the signal traps, spawns +number_of_workers+ workers and then
# enters the output-watching loop (#watch_for_output), announcing the
# master's PID on standard output just before doing so.
def start
  trap_signals

  number_of_workers.times do
    spawn_worker
    # Pause for a random fraction of a second (0.000...0.999) after each fork.
    pause = rand(1000) / 1000.0
    sleep(pause)
  end

  puts "Sisyphus::Master started with PID: #{Process.pid}"
  watch_for_output
end
start_worker(worker)
click to toggle source
# File lib/sisyphus/master.rb, line 50
#
# Runs +worker+: calls its +setup+ and then its +start+.
#
# Any exception (the rescue is on Exception, not just StandardError)
# triggers the worker's +error_handler+, is logged as a warning under the
# current process name, and then ends the process immediately through
# +exit!+ with status 0.
def start_worker(worker)
  begin
    worker.setup
    worker.start
  rescue Exception => error
    worker.error_handler.call
    logger.warn(process_name) { error }
    exit!(0)
  end
end
stop_all()
click to toggle source
# File lib/sisyphus/master.rb, line 65
#
# Asks every tracked worker to stop (via #stop_worker) and then reaps
# children with #watch_for_shutdown until no worker is left, giving up
# after 30 seconds.
#
# A timeout is reported on standard output and otherwise ignored.
def stop_all
  workers.each do |worker|
    stop_worker worker.fetch(:pid)
  end

  begin
    Timeout.timeout(30) do
      watch_for_shutdown while worker_count > 0
    end
  rescue Timeout::Error => e
    # `rescue e` would have looked up an undefined name `e` as the
    # exception class and raised NameError instead of reporting the timeout.
    p "Timeout reached:", e
  end
end
stop_worker(wpid)
click to toggle source
# File lib/sisyphus/master.rb, line 59
#
# Sends INT to the worker process +wpid+, provided it is one of the
# tracked +workers+. Untracked pids are left alone.
#
# Only Errno::ESRCH (the process no longer exists) is ignored; other
# failures from Process.kill propagate to the caller. The previous
# `... rescue Errno::ESRCH` modifier form silently swallowed every
# StandardError.
def stop_worker(wpid)
  return unless workers.any? { |w| w.fetch(:pid) == wpid }

  begin
    Process.kill 'INT', wpid
  rescue Errno::ESRCH
    # Ignore if the process is already gone
    nil
  end
end
worker_count()
click to toggle source
# File lib/sisyphus/master.rb, line 78
#
# Number of worker entries currently tracked in +workers+.
def worker_count
  workers.size
end
Private Instance Methods
create_worker(writer)
click to toggle source
# File lib/sisyphus/master.rb, line 87
#
# Builds a Worker for the master's job, handing it +writer+ and a freshly
# instantiated execution strategy.
def create_worker(writer)
  strategy = execution_strategy
  Worker.new(job, writer, strategy)
end
execution_strategy()
click to toggle source
# File lib/sisyphus/master.rb, line 91
#
# Instantiates the configured execution strategy class, passing it the
# master's logger. A new instance is created on every call.
def execution_strategy
  @execution_strategy.new(logger)
end
handle_int()
click to toggle source
# File lib/sisyphus/master.rb, line 185
#
# INT handler: announces the shutdown on standard output, marks the
# master as stopping, stops all workers and exits with status 0.
def handle_int
  $stdout.puts "Waiting for workers to stop..."

  stop
  stop_all

  exit(0)
end
handle_signal(signal)
click to toggle source
# File lib/sisyphus/master.rb, line 172
#
# Dispatches a queued +signal+ (:INT, :TTIN or :TTOU) to its handler
# method and returns that handler's result.
#
# Raises a RuntimeError ("Unknown signal") for anything else.
def handle_signal(signal)
  handler = { INT: :handle_int, TTIN: :handle_ttin, TTOU: :handle_ttou }[signal]
  raise "Unknown signal" unless handler

  send(handler)
end
handle_ttin()
click to toggle source
# File lib/sisyphus/master.rb, line 192
#
# TTIN handler: raises +number_of_workers+ by one and spawns one more
# worker.
def handle_ttin
  self.number_of_workers = number_of_workers + 1
  spawn_worker
end
handle_ttou()
click to toggle source
# File lib/sisyphus/master.rb, line 197
#
# TTOU handler: lowers +number_of_workers+ by one (never below zero) and
# asks the first tracked worker to stop.
#
# +number_of_workers+ is writable from outside, so it can be positive
# while no worker is tracked yet; in that case only the counter is
# lowered instead of failing on a nil worker entry.
def handle_ttou
  return unless number_of_workers > 0

  self.number_of_workers -= 1

  victim = workers.first
  stop_worker(victim.fetch(:pid)) if victim
end
process_name()
click to toggle source
# File lib/sisyphus/master.rb, line 216
#
# Current process name, as held in the program-name global ($0).
def process_name
  $PROGRAM_NAME
end
process_name=(name)
click to toggle source
# File lib/sisyphus/master.rb, line 212
#
# Renames the current process by assigning +name+ to the program-name
# global ($0).
def process_name=(name)
  $PROGRAM_NAME = name
end
process_output(pipes)
click to toggle source
# File lib/sisyphus/master.rb, line 127
#
# For each readable worker pipe in +pipes+, respawns the worker that owns
# it. Pipes are skipped once the master is stopping; the check is made
# again for every pipe.
def process_output(pipes)
  pipes.each do |pipe|
    next if stopping?

    respawn_worker(worker_pid(pipe))
  end
end
process_pipes(pipes)
click to toggle source
# File lib/sisyphus/master.rb, line 118
#
# Handles the readable +pipes+ returned by IO.select.
#
# If the self-pipe is among them, reads up to 10 bytes from it without
# blocking (EAGAIN/EINTR are ignored). The remaining worker pipes are
# then passed to #process_output, unless the master is stopping.
def process_pipes(pipes)
  begin
    if pipes.include?(@selfpipe[:reader])
      @selfpipe[:reader].read_nonblock(10)
    end
  rescue Errno::EAGAIN, Errno::EINTR
    # Ignore
  end

  return if stopping?

  process_output(pipes & worker_pipes)
end
process_signal_queue()
click to toggle source
# File lib/sisyphus/master.rb, line 114
#
# Drains the signal queue kept on the main thread, handing each queued
# signal to #handle_signal in arrival order.
def process_signal_queue
  until Thread.main[:signal_queue].empty?
    next_signal = Thread.main[:signal_queue].shift
    handle_signal(next_signal)
  end
end
queue_signal(signal)
click to toggle source
# File lib/sisyphus/master.rb, line 163
#
# Appends +signal+ to the main thread's signal queue and writes one byte
# to the self-pipe so that the select loop wakes up.
#
# The retry on EINTR is scoped to the pipe write only. With a
# method-level rescue, `retry` re-ran the whole body and enqueued the
# same signal a second time.
def queue_signal(signal)
  Thread.main[:signal_queue] << signal

  begin
    @selfpipe[:writer].write_nonblock('.')
  rescue Errno::EAGAIN
    # Ignore: the write would block.
    nil
  rescue Errno::EINTR
    retry
  end
end
respawn_worker(wpid)
click to toggle source
# File lib/sisyphus/master.rb, line 133
#
# Replaces the worker +wpid+: a new worker is spawned first, then the old
# one is asked to stop, and finally one terminated child is reaped with
# #watch_for_shutdown.
def respawn_worker(wpid)
  spawn_worker
  stop_worker(wpid)
  watch_for_shutdown
end
stop()
click to toggle source
# File lib/sisyphus/master.rb, line 204
#
# Flags the master as stopping; #stopping? is truthy from now on.
def stop
  @stopping = true
end
stopping?()
click to toggle source
# File lib/sisyphus/master.rb, line 208
#
# Returns the stopping flag: +true+ once #stop has been called, +nil+
# before that.
def stopping?
  @stopping
end
trap_signals()
click to toggle source
# File lib/sisyphus/master.rb, line 155
#
# Installs a trap for every signal in HANDLED_SIGNALS. Each trap only
# queues the signal (see #queue_signal).
def trap_signals
  HANDLED_SIGNALS.each do |signal|
    Signal.trap(signal) { queue_signal(signal) }
  end
end
watch_for_output()
click to toggle source
# File lib/sisyphus/master.rb, line 104
#
# Main loop of the master. Repeatedly waits (up to IO_TIMEOUT) for a
# worker pipe or the self-pipe to become readable; when something is
# readable, processes those pipes and then drains the signal queue.
# Never returns on its own.
def watch_for_output
  loop do
    watched = worker_pipes + [@selfpipe[:reader]]
    selected = IO.select(watched, nil, nil, IO_TIMEOUT)
    # IO.select returns nil when the timeout expires with nothing readable.
    next unless selected

    process_pipes(selected.first)
    process_signal_queue
  end
end
watch_for_shutdown()
click to toggle source
# File lib/sisyphus/master.rb, line 95
#
# Waits for any child process to terminate, closes that worker's pipe
# reader, removes its entry from +workers+ and returns its pid.
#
# Returns +nil+ when there is no child to wait for (Errno::ECHILD).
def watch_for_shutdown
  reaped_pid = Process.wait2.first
  entry = @workers.find { |candidate| candidate.fetch(:pid) == reaped_pid }
  entry.fetch(:reader).close
  workers.delete(entry)
  reaped_pid
rescue Errno::ECHILD
  nil
end
worker_pid(reader)
click to toggle source
# File lib/sisyphus/master.rb, line 147
#
# Returns the pid of the tracked worker whose pipe reader has the same
# file descriptor number as +reader+.
#
# Raises a RuntimeError ("Unknown worker pipe") when no worker matches.
def worker_pid(reader)
  owner = workers.find { |w| w.fetch(:reader).fileno == reader.fileno }
  raise 'Unknown worker pipe' unless owner

  owner.fetch(:pid)
end
worker_pipes()
click to toggle source
# File lib/sisyphus/master.rb, line 139
#
# Returns the pipe readers of all tracked workers, in +workers+ order;
# an empty array when no worker is tracked.
def worker_pipes
  # map over an empty list already yields [], so no emptiness check is needed.
  workers.map { |w| w.fetch(:reader) }
end