class Coolie::Master

Constants

IO_TIMEOUT

Public Class Methods

new(job, options = {}) click to toggle source
# File lib/coolie/master.rb, line 7
def initialize(job, options = {})
  @number_of_workers = options.fetch :workers, 0
  @workers = []
  @job = job
end

Public Instance Methods

start() click to toggle source
# File lib/coolie/master.rb, line 13
def start
  trap_signals
  @number_of_workers.times do
    start_worker
    sleep rand(1000).fdiv(1000)
  end
  puts "Coolie::Master started with PID: #{Process.pid}"
  monitor_workers
end
start_worker() click to toggle source
# File lib/coolie/master.rb, line 23
def start_worker
  reader, writer = IO.pipe
  if wpid = fork
    writer.close
    @workers << { pid: wpid, reader: reader }
  else
    reader.close
    worker = Worker.new(@job, writer)
    self.process_name = "Worker #{Process.pid}"
    worker.start
  end
end
stop_all() click to toggle source
# File lib/coolie/master.rb, line 47
def stop_all
  @workers.each do |worker|
    stop_worker worker.fetch(:pid)
  end
end
stop_worker(wpid) click to toggle source
# File lib/coolie/master.rb, line 36
def stop_worker(wpid)
  if worker = @workers.find { |w| w.fetch(:pid) == wpid }
    Process.kill "INT", wpid
    Process.waitpid2 wpid
    worker.fetch(:reader).close
    @workers.delete worker
  else
    raise "Unknown worker PID: #{wpid}"
  end
end
worker_count() click to toggle source
# File lib/coolie/master.rb, line 53
def worker_count
  @workers.length
end

Private Instance Methods

decrease_workers() click to toggle source
# File lib/coolie/master.rb, line 94
def decrease_workers
  until worker_count == @number_of_workers do
    stop_worker(@workers.first.fetch(:pid))
  end 
end
increase_workers() click to toggle source
# File lib/coolie/master.rb, line 90
def increase_workers
  start_worker until worker_count == @number_of_workers
end
maintain_number_of_workers() click to toggle source
# File lib/coolie/master.rb, line 82
def maintain_number_of_workers
  if worker_count > @number_of_workers
    decrease_workers
  elsif worker_count < @number_of_workers
    increase_workers
  end
end
monitor_workers() click to toggle source
# File lib/coolie/master.rb, line 59
def monitor_workers
  loop do
    restart_workers pids_of_crashed_workers
    maintain_number_of_workers
  end
end
pids_of_crashed_workers() click to toggle source
# File lib/coolie/master.rb, line 66
def pids_of_crashed_workers
  readers = IO.select(worker_pipes, nil, nil, IO_TIMEOUT)
  if readers
    readers.first.map { |reader| worker_pid(reader) }
  else
    []
  end
end
process_name=(name) click to toggle source
# File lib/coolie/master.rb, line 130
def process_name=(name)
  $0 = name
end
restart_workers(worker_pids) click to toggle source
# File lib/coolie/master.rb, line 75
def restart_workers(worker_pids)
  worker_pids.each do |wpid|
    stop_worker wpid
    start_worker
  end
end
trap_signals() click to toggle source
# File lib/coolie/master.rb, line 116
def trap_signals
  Signal.trap 'INT' do
    puts "Waiting for workers to stop"
    stop_all
    exit 0
  end
  Signal.trap 'TTIN' do
    @number_of_workers += 1
  end
  Signal.trap 'TTOU' do
    @number_of_workers -= 1 if @number_of_workers > 0
  end
end
worker_pid(reader) click to toggle source
# File lib/coolie/master.rb, line 108
def worker_pid(reader)
  if worker = @workers.find { |w| w.fetch(:reader).fileno == reader.fileno }
    worker.fetch(:pid)
  else
    raise 'Unknown worker pipe'
  end
end
worker_pipes() click to toggle source
# File lib/coolie/master.rb, line 100
def worker_pipes
  if worker_count > 0
    @workers.map { |w| w.fetch(:reader) }
  else
    nil
  end
end