class Sisyphus::Master

Constants

HANDLED_SIGNALS
IO_TIMEOUT

Attributes

job[R]
logger[R]
number_of_workers[RW]
workers[R]

Public Class Methods

new(job, options = {}) click to toggle source
# File lib/sisyphus/master.rb, line 14
def initialize(job, options = {})
  self.number_of_workers = options.fetch :workers, 0
  @logger = options.fetch(:logger) { NullLogger.new }
  @execution_strategy = options.fetch(:execution_strategy) { ForkingExecutionStrategy }
  @workers = []
  @job = job

  self_reader, self_writer = IO.pipe
  @selfpipe = { reader: self_reader, writer: self_writer }

  Thread.main[:signal_queue] = []
end

Public Instance Methods

spawn_worker() click to toggle source
# File lib/sisyphus/master.rb, line 37
def spawn_worker
  reader, writer = IO.pipe
  if wpid = fork
    writer.close
    workers << { pid: wpid, reader: reader }
  else
    reader.close
    self.process_name = "Worker #{Process.pid}"
    worker = create_worker(writer)
    start_worker worker
  end
end
start() click to toggle source
# File lib/sisyphus/master.rb, line 27
def start
  trap_signals
  number_of_workers.times do
    spawn_worker
    sleep rand(1000).fdiv(1000)
  end
  puts "Sisyphus::Master started with PID: #{Process.pid}"
  watch_for_output
end
start_worker(worker) click to toggle source
# File lib/sisyphus/master.rb, line 50
def start_worker(worker)
  worker.setup
  worker.start
rescue Exception => e
  worker.error_handler.call
  logger.warn(process_name) { e }
  exit! 0
end
stop_all() click to toggle source
# File lib/sisyphus/master.rb, line 65
def stop_all
  workers.each do |worker|
    stop_worker worker.fetch(:pid)
  end
  begin
    Timeout.timeout(30) do
      watch_for_shutdown while worker_count > 0
    end
  rescue e
    p "Timeout reached:", e
  end
end
stop_worker(wpid) click to toggle source
# File lib/sisyphus/master.rb, line 59
def stop_worker(wpid)
  if workers.find { |w| w.fetch(:pid) == wpid }
    Process.kill 'INT', wpid rescue Errno::ESRCH # Ignore if the process is already gone
  end
end
worker_count() click to toggle source
# File lib/sisyphus/master.rb, line 78
def worker_count
  workers.length
end

Private Instance Methods

create_worker(writer) click to toggle source
# File lib/sisyphus/master.rb, line 87
def create_worker(writer)
  Worker.new(job, writer, execution_strategy)
end
execution_strategy() click to toggle source
# File lib/sisyphus/master.rb, line 91
def execution_strategy
  @execution_strategy.new logger
end
handle_int() click to toggle source
# File lib/sisyphus/master.rb, line 185
def handle_int
  puts "Waiting for workers to stop..."
  stop
  stop_all
  exit 0
end
handle_signal(signal) click to toggle source
# File lib/sisyphus/master.rb, line 172
def handle_signal(signal)
  case signal
  when :INT
    handle_int
  when :TTIN
    handle_ttin
  when :TTOU
    handle_ttou
  else
    raise "Unknown signal"
  end
end
handle_ttin() click to toggle source
# File lib/sisyphus/master.rb, line 192
def handle_ttin
  self.number_of_workers += 1
  spawn_worker
end
handle_ttou() click to toggle source
# File lib/sisyphus/master.rb, line 197
def handle_ttou
  if number_of_workers > 0
    self.number_of_workers -= 1
    stop_worker(workers.first.fetch(:pid))
  end
end
process_name() click to toggle source
# File lib/sisyphus/master.rb, line 216
def process_name
  $0
end
process_name=(name) click to toggle source
# File lib/sisyphus/master.rb, line 212
def process_name=(name)
  $0 = name
end
process_output(pipes) click to toggle source
# File lib/sisyphus/master.rb, line 127
def process_output(pipes)
  pipes.each do |pipe|
    respawn_worker worker_pid(pipe) unless stopping?
  end
end
process_pipes(pipes) click to toggle source
# File lib/sisyphus/master.rb, line 118
def process_pipes(pipes)
  begin
    @selfpipe[:reader].read_nonblock(10) if pipes.include?(@selfpipe[:reader])
  rescue Errno::EAGAIN, Errno::EINTR
    # Ignore
  end
  process_output(pipes & worker_pipes) unless stopping?
end
process_signal_queue() click to toggle source
# File lib/sisyphus/master.rb, line 114
def process_signal_queue
  handle_signal(Thread.main[:signal_queue].shift) until Thread.main[:signal_queue].empty?
end
queue_signal(signal) click to toggle source
# File lib/sisyphus/master.rb, line 163
def queue_signal(signal)
  Thread.main[:signal_queue] << signal
  @selfpipe[:writer].write_nonblock('.')
rescue Errno::EAGAIN
  # Ignore
rescue Errno::EINTR
  retry
end
respawn_worker(wpid) click to toggle source
# File lib/sisyphus/master.rb, line 133
def respawn_worker(wpid)
  spawn_worker
  stop_worker wpid
  watch_for_shutdown
end
stop() click to toggle source
# File lib/sisyphus/master.rb, line 204
def stop
  @stopping = true
end
stopping?() click to toggle source
# File lib/sisyphus/master.rb, line 208
def stopping?
  @stopping
end
trap_signals() click to toggle source
# File lib/sisyphus/master.rb, line 155
def trap_signals
  HANDLED_SIGNALS.each do |signal|
    Signal.trap signal do
      queue_signal signal
    end
  end
end
watch_for_output() click to toggle source
# File lib/sisyphus/master.rb, line 104
def watch_for_output
  loop do
    ready = IO.select(worker_pipes + [@selfpipe[:reader]], nil, nil, IO_TIMEOUT)
    if ready
      process_pipes(ready[0])
      process_signal_queue
    end
  end
end
watch_for_shutdown() click to toggle source
# File lib/sisyphus/master.rb, line 95
def watch_for_shutdown
  wpid, _ = Process.wait2
  worker = @workers.find { |w| w.fetch(:pid) == wpid }
  worker.fetch(:reader).close
  workers.delete worker
  wpid
rescue Errno::ECHILD
end
worker_pid(reader) click to toggle source
# File lib/sisyphus/master.rb, line 147
def worker_pid(reader)
  if worker = workers.find { |w| w.fetch(:reader).fileno == reader.fileno }
    worker.fetch(:pid)
  else
    raise 'Unknown worker pipe'
  end
end
worker_pipes() click to toggle source
# File lib/sisyphus/master.rb, line 139
def worker_pipes
  if worker_count > 0
    workers.map { |w| w.fetch(:reader) }
  else
    []
  end
end