class Aggkit::Watcher

Constants

EXIT_SIGNALS
TERM_SIGNALS

Attributes

iolock[RW]

Public Class Methods

new() click to toggle source
# File lib/aggkit/watcher.rb, line 31
# Builds a watcher with no registered processes and a clean exit state.
#
# Instance state set up here:
#   @iolock      - Mutex held by log/error while writing (exposed via the iolock attribute)
#   @pipe        - Pipe instance that exec reads events from and signal handlers write to
#   @procs       - handlers appended by add
#   @code        - exit code eventually handed to exit! by quit!
#   @crashed     - becomes true once a crash has been recorded
#   @who         - command of the process recorded by set_crash_report
#   @terminating - guard flag that terminate_all checks and sets; declared here so
#                  every piece of state the class reads is initialised in one place
def initialize
  @iolock = Mutex.new
  @pipe = Pipe.new

  @procs = []
  @code = 0
  @crashed = false
  @who = nil
  @terminating = false
end

Public Instance Methods

add(*cmd) click to toggle source
# File lib/aggkit/watcher.rb, line 41
# Registers a new process handler for the given command.
#
# The command parts may be passed as separate arguments and/or nested arrays;
# they are flattened before being handed to ProcessHandler.new together with
# this watcher. The new handler is appended to @procs.
#
# @param cmd [Array] command and arguments (nesting allowed)
# @return the newly created ProcessHandler
def add(*cmd)
  log "Starting #{cmd}..."

  handler = ProcessHandler.new(self, *cmd.flatten)
  @procs.push(handler)

  log "  * PID: #{handler.pid}"
  handler
end
error(msg) click to toggle source
# File lib/aggkit/watcher.rb, line 52
# Writes an error line to STDERR in the form
# "[watcher][HH:MM:SS.mmm]: Error: <msg>".
#
# The write happens inside @iolock.synchronize, so it is serialized with any
# other code that takes the same lock.
#
# @param msg [#to_s] text to report
def error(msg)
  @iolock.synchronize do
    stamp = Time.now.strftime('%H:%M:%S.%L')
    STDERR.puts("[watcher][#{stamp}]: Error: #{msg}")
  end
end
exec() { |self| ... } click to toggle source
# File lib/aggkit/watcher.rb, line 91
# Runs the watcher: installs signal handlers, yields self to the caller's
# block, then reaps children and dispatches events read from @pipe.
#
# Every exit path visible here goes through quit! or exit!, so the method
# does not return normally.
#
# @yield [self] the watcher (presumably so the caller can register processes
#   with add - confirm against callers)
def exec
  install_signal_handlers(@pipe)

  begin
    yield(self)
  rescue StandardError => e
    # The caller's block failed: record a crash with exit code 1, report the
    # exception and the last 20 backtrace entries, stop the children and
    # leave immediately via exit!.
    @crashed = true
    @code = 1
    error e.inspect
    error e.backtrace.last(20).join("\n")
    log 'Try exits gracefully..'
    terminate_all
    exit!(@code)
  end

  # Reap anything that has already finished; if every registered process is
  # handled there is nothing left to watch.
  loop_childs
  quit! if @procs.all?(&:handled?)

  # Background ticker: every 10 seconds it writes :gc to the pipe, which makes
  # the main loop below run loop_childs again. Errors are logged and the
  # ticker keeps going.
  Thread.new(@pipe) do |pipe|
    loop do
      begin
        sleep 10
        pipe.puts :gc
      rescue StandardError => e
        log "GC thread exception: #{e.inspect}"
      end
    end
  end

  # Main event loop: one event is read from the pipe per iteration.
  loop do
    log 'Main loop...'
    case event = @pipe.gets
    when :term
      # Written by collect_processes after a managed child has been collected:
      # stop everything else, reap, and exit.
      log '  * Child terminated: try exits gracefully...'
      terminate_all
      loop_childs
      quit!
    when *(EXIT_SIGNALS + TERM_SIGNALS).map(&:to_sym)
      # A signal forwarded by install_signal_handlers.
      # NOTE(review): the handlers write the raw EXIT_SIGNALS/TERM_SIGNALS entry;
      # this match assumes Pipe#gets hands it back as a symbol - confirm.
      log "  * Catch #{event}: try exits gracefully..."
      terminate_all
      loop_childs
      quit!
    when :child, :gc
      # Child-status signal or periodic tick: reap, and exit only when every
      # process is handled.
      log "  * Main loop event: #{event}"
      loop_childs
      quit! if @procs.all?(&:handled?)
    else
      # Anything unexpected is logged and otherwise treated like a tick.
      log "  * Unknown event: #{event.inspect}"
      loop_childs
      quit! if @procs.all?(&:handled?)
    end
  end
end
log(msg) click to toggle source
# File lib/aggkit/watcher.rb, line 48
# Writes an informational line to STDOUT in the form
# "[watcher][HH:MM:SS.mmm]: <msg>".
#
# The write happens inside @iolock.synchronize, so it is serialized with any
# other code that takes the same lock.
#
# @param msg [#to_s] text to print
def log(msg)
  @iolock.synchronize do
    stamp = Time.now.strftime('%H:%M:%S.%L')
    STDOUT.puts("[watcher][#{stamp}]: #{msg}")
  end
end
quit!() click to toggle source
# File lib/aggkit/watcher.rb, line 64
# Ends the process immediately via exit!, using the recorded exit code.
#
# When a crash has been recorded the code is forced to be at least 1, so a
# crashed run never reports success. A missing code counts as 0.
def quit!
  if @crashed
    current = @code || 0
    @code = current < 1 ? 1 : current
  end

  exit!(@code || 0)
end
set_crash_report(pr) click to toggle source
# File lib/aggkit/watcher.rb, line 56
# Records the first crash only: once @crashed is set, later calls are no-ops.
#
# Stores the process's exit code in @code and its command in @who.
#
# @param pr an object responding to exit_code and command
# @return the recorded command, or nil when a crash was already recorded
def set_crash_report(pr)
  return if @crashed

  @crashed = true
  @code = pr.exit_code
  @who = pr.command
end
terminate_all() click to toggle source
# File lib/aggkit/watcher.rb, line 73
# Stops every process that is not yet handled.
#
# Works in two passes over the same snapshot of unhandled processes:
#   1. close each process's stdin (best effort) and call terminate on it;
#   2. call stop on each and pass it to collect_managed.
# Every process is asked to terminate before any of them is stopped.
#
# @raise [RuntimeError] if called a second time on the same watcher
# @return [Array] the processes that were unhandled when the call started
def terminate_all
  raise 'Double termination occured!' if @terminating

  @terminating = true
  pending = @procs.reject(&:handled?)

  pending.each do |handler|
    begin
      handler.stdin.close
    rescue StandardError
      # Closing stdin is best effort; any failure here is ignored.
      nil
    end
    handler.terminate
  end

  pending.each do |handler|
    handler.stop
    collect_managed(handler)
  end
end

Private Instance Methods

collect_managed(pr, status = nil) click to toggle source
# File lib/aggkit/watcher.rb, line 147
# Logs that a managed process has finished and records its outcome.
#
# When a status is supplied it is forwarded to pr.stop!; otherwise the
# process's own status is used for the log line only. If the process reports
# crashed?, it is passed to set_crash_report.
#
# @param pr the managed process handler
# @param status optional wait status for the process
def collect_managed(pr, status = nil)
  log 'Child finished'
  log "  Process[#{pr.pid}]: #{pr.command}"

  reported = status || pr.status
  log "    status: #{reported.inspect}"

  pr.stop!(status) if status
  return unless pr.crashed?

  set_crash_report(pr)
end
collect_processes() click to toggle source
# File lib/aggkit/watcher.rb, line 157
# Polls once, without blocking, for any child process that changed state.
#
# A child that belongs to @procs is passed to collect_managed and :term is
# written to @pipe; any other child is only logged.
# NOTE(review): WUNTRACED is passed, so stopped (not only exited) children are
# presumably reported here as well - confirm this is intended.
#
# @return [Boolean] true when a child was collected; false when none was
#   ready or when waiting raised ECHILD/ESRCH
def collect_processes
  flags = ::Process::WNOHANG | ::Process::WUNTRACED
  pid, status = Process.waitpid2(-1, flags)
  return false if pid.nil? || pid.zero?

  managed = @procs.find { |candidate| candidate.pid == pid }
  if managed
    collect_managed(managed, status)
    @pipe.puts :term
  else
    log "Unmanaged process finished: #{status.inspect}"
  end

  true
rescue Errno::ECHILD, Errno::ESRCH
  false
end
install_signal_handlers(pipe) click to toggle source
# File lib/aggkit/watcher.rb, line 181
# Installs the traps that turn signals into events written to the pipe.
#
# * EXIT_SIGNALS: the first one received switches every exit signal to
#   'IGNORE' and writes the received signal's name to the pipe.
# * TERM_SIGNALS: the first one received re-traps every term signal with a
#   handler that prints 'Forcing exit!' and calls exit!(1), then writes the
#   received signal's name to the pipe.
# * 'CLD': writes :child to the pipe.
#
# @param pipe object responding to puts that receives the events
def install_signal_handlers(pipe)
  # Handler installed after the first term signal.
  force_exit = proc do |*_args|
    STDERR.puts 'Forcing exit!'
    exit!(1)
  end

  EXIT_SIGNALS.each do |exit_signal|
    trap(exit_signal) do |*_args|
      EXIT_SIGNALS.each { |name| trap(name, 'IGNORE') }
      pipe.puts exit_signal
    end
  end

  TERM_SIGNALS.each do |term_signal|
    trap(term_signal) do |*_args|
      TERM_SIGNALS.each { |name| trap(name, force_exit) }
      pipe.puts term_signal
    end
  end

  trap('CLD') { |*_args| pipe.puts :child }
end
loop_childs() click to toggle source
# File lib/aggkit/watcher.rb, line 174
# Keeps calling collect_processes until it reports that nothing was collected.
#
# @return [nil]
def loop_childs
  loop { break unless collect_processes }
end