class Coolie::Master
Constants
- IO_TIMEOUT
Public Class Methods
new(job, options = {})
click to toggle source
# File lib/coolie/master.rb, line 7
# Sets up a master for +job+ with no workers running yet.
#
# job     - stored as-is; it is later passed to each Worker that is started.
# options - Hash of settings. :workers is the desired number of worker
#           processes and defaults to 0 when the key is absent.
def initialize(job, options = {})
  @workers = []
  @number_of_workers = options.fetch(:workers, 0)
  @job = job
end
Public Instance Methods
start()
click to toggle source
# File lib/coolie/master.rb, line 13
# Boots the master: installs the signal handlers, starts the configured
# number of workers, prints the master's PID and then hands control to
# monitor_workers.
def start
  trap_signals

  @number_of_workers.times do
    start_worker
    # Pause for a random fraction of a second (0.000..0.999) after each
    # worker is started.
    jitter = rand(1000).fdiv(1000)
    sleep(jitter)
  end

  puts "Coolie::Master started with PID: #{Process.pid}"
  monitor_workers
end
start_worker()
click to toggle source
# File lib/coolie/master.rb, line 23
# Forks one worker process and registers it in @workers.
#
# A pipe is opened before forking. The parent keeps the read end and stores
# it together with the child's PID; the child keeps the write end and passes
# it, along with the job, to Worker.new.
#
# The child exits as soon as worker.start returns. Without that, a child
# whose worker finished would return from this method and carry on running
# the master's own code, forking and monitoring workers of its own.
def start_worker
  reader, writer = IO.pipe
  wpid = fork

  if wpid
    # Parent: only the child writes to the pipe.
    writer.close
    @workers << { pid: wpid, reader: reader }
  else
    # Child: only the parent reads from the pipe.
    reader.close
    worker = Worker.new(@job, writer)
    self.process_name = "Worker #{Process.pid}"
    worker.start
    exit
  end
end
stop_all()
click to toggle source
# File lib/coolie/master.rb, line 47
# Stops every registered worker.
#
# The PIDs are collected into a separate array before anything is stopped:
# stop_worker deletes the worker's entry from @workers, and deleting from an
# array while iterating it with each makes the iteration skip elements, which
# would leave some workers running.
def stop_all
  @workers.map { |worker| worker.fetch(:pid) }.each do |wpid|
    stop_worker wpid
  end
end
stop_worker(wpid)
click to toggle source
# File lib/coolie/master.rb, line 36
# Stops the worker with the given PID and forgets about it.
#
# Sends INT to the process, waits for it to terminate, closes the read end
# of its pipe and removes its entry from @workers.
#
# wpid - PID of a worker registered in @workers.
#
# Raises RuntimeError when no registered worker has that PID.
def stop_worker(wpid)
  entry = @workers.find { |candidate| candidate.fetch(:pid) == wpid }
  raise "Unknown worker PID: #{wpid}" unless entry

  Process.kill('INT', wpid)
  Process.waitpid2(wpid)
  entry.fetch(:reader).close
  @workers.delete(entry)
end
worker_count()
click to toggle source
# File lib/coolie/master.rb, line 53
# Returns the number of workers currently registered in @workers.
def worker_count
  @workers.size
end
Private Instance Methods
decrease_workers()
click to toggle source
# File lib/coolie/master.rb, line 94
# Stops workers, oldest entry first, until no more than @number_of_workers
# remain.
#
# The loop condition is "more workers than wanted" rather than "count differs
# from target". @number_of_workers is changed from signal handlers, so it can
# rise above the current count while this loop is running; an equality test
# would then keep stopping workers until @workers is empty and fail on
# @workers.first being nil.
def decrease_workers
  stop_worker(@workers.first.fetch(:pid)) while worker_count > @number_of_workers
end
increase_workers()
click to toggle source
# File lib/coolie/master.rb, line 90
# Starts workers until at least @number_of_workers are registered.
#
# The loop condition is "fewer workers than wanted" rather than "count
# differs from target". @number_of_workers is changed from signal handlers,
# so it can drop below the current count while this loop is running; an
# equality test would then never become true and the loop would fork new
# workers without bound.
def increase_workers
  start_worker while worker_count < @number_of_workers
end
maintain_number_of_workers()
click to toggle source
# File lib/coolie/master.rb, line 82
# Brings the number of running workers back in line with @number_of_workers:
# stops workers when there are too many, starts workers when there are too
# few, and does nothing when the two already match.
def maintain_number_of_workers
  return decrease_workers if worker_count > @number_of_workers

  increase_workers if worker_count < @number_of_workers
end
monitor_workers()
click to toggle source
# File lib/coolie/master.rb, line 59
# Supervision loop. On every pass it restarts the workers reported by
# pids_of_crashed_workers and then adjusts the worker count through
# maintain_number_of_workers. The loop has no exit condition of its own.
def monitor_workers
  loop do
    crashed_pids = pids_of_crashed_workers
    restart_workers(crashed_pids)
    maintain_number_of_workers
  end
end
pids_of_crashed_workers()
click to toggle source
# File lib/coolie/master.rb, line 66
# Waits up to IO_TIMEOUT for any worker pipe to become readable and returns
# the PIDs of the workers owning those pipes. Returns an empty array when
# the wait times out.
#
# NOTE(review): a readable pipe is treated as a crashed worker, presumably
# because the worker's write end was closed. Confirm that Worker never
# writes to its pipe during normal operation.
def pids_of_crashed_workers
  readable, = IO.select(worker_pipes, nil, nil, IO_TIMEOUT)
  return [] unless readable

  readable.map { |pipe| worker_pid(pipe) }
end
process_name=(name)
click to toggle source
# File lib/coolie/master.rb, line 130
# Sets the name of the current process.
#
# name - the new program name; it is assigned to $PROGRAM_NAME, the built-in
#        alias of $0.
def process_name=(name)
  $PROGRAM_NAME = name
end
restart_workers(worker_pids)
click to toggle source
# File lib/coolie/master.rb, line 75
# Replaces each listed worker with a fresh one: for every PID the old worker
# is stopped first and a new worker is started right after.
#
# worker_pids - enumerable of worker PIDs to replace.
def restart_workers(worker_pids)
  worker_pids.each do |crashed_pid|
    stop_worker(crashed_pid)
    start_worker
  end
end
trap_signals()
click to toggle source
# File lib/coolie/master.rb, line 116
# Installs the master's signal handlers:
#
# INT  - prints a notice, stops all workers and exits with status 0.
# TTIN - raises the desired number of workers by one.
# TTOU - lowers the desired number of workers by one, never below zero.
def trap_signals
  Signal.trap('TTIN') { @number_of_workers += 1 }

  Signal.trap('INT') do
    puts "Waiting for workers to stop"
    stop_all
    exit 0
  end

  Signal.trap('TTOU') do
    next unless @number_of_workers > 0

    @number_of_workers -= 1
  end
end
worker_pid(reader)
click to toggle source
# File lib/coolie/master.rb, line 108
# Looks up the PID of the worker that owns +reader+. Pipes are matched by
# file descriptor number, not by object identity.
#
# reader - IO whose fileno identifies the worker's pipe.
#
# Raises RuntimeError when no registered worker has a pipe with that fileno.
def worker_pid(reader)
  owner = @workers.find { |entry| entry.fetch(:reader).fileno == reader.fileno }
  raise 'Unknown worker pipe' unless owner

  owner.fetch(:pid)
end
worker_pipes()
click to toggle source
# File lib/coolie/master.rb, line 100
# Returns the read ends of all registered workers' pipes, in @workers order.
# Returns nil, not an empty array, when no workers are registered.
def worker_pipes
  return nil unless worker_count > 0

  @workers.map { |entry| entry.fetch(:reader) }
end