class Server::Starter

Constants

VERSION

Public Class Methods

new() click to toggle source
# File lib/server/starter.rb, line 10
# Sets up the bookkeeping state used while supervising workers.
def initialize
  # names of trapped signals that the main loop has not processed yet
  @signals_received  = []
  # pid of the most recently started worker (nil until one is running)
  @current_worker    = nil
  # pid => generation for workers that have been superseded
  @old_workers       = {}
  # Time at which the current worker was started; it is assigned a Time when a
  # worker is spawned, so start with nil rather than an Array placeholder
  @last_restart_time = nil
end

Public Instance Methods

_reload_env() click to toggle source
# File lib/server/starter.rb, line 377
# Reads environment overrides from the directory named by ENV['ENVDIR'].
#
# Every regular file whose name does not start with a dot contributes one
# entry: the file name is the key and the first line of the file (without its
# line terminator) is the value. Empty files and non-file entries such as
# sub-directories are skipped.
#
# Returns a Hash of String => String; an empty Hash when ENVDIR is unset or
# does not exist.
def _reload_env
  dn = ENV['ENVDIR']
  return {} if dn.nil? || !File.exist?(dn)
  env = {}
  Dir.foreach(dn) do |n|
    next if n =~ /^\./
    path = "#{dn}/#{n}"
    # reading a directory entry that is not a plain file would raise
    next unless File.file?(path)
    first_line = File.open(path) { |fh| fh.gets }
    # gets returns nil for an empty file, so chomp only after the nil check
    env[n] = first_line.chomp if first_line
  end
  env
end
restart_server(opts) click to toggle source
# File lib/server/starter.rb, line 324
# Asks a running server to restart and waits until the restart has completed.
#
# opts - Hash that must contain :pid_file and :status_file.
#
# The pid is read from the first line of the pid file, SIGHUP is sent to it,
# and the status file is then polled once per second until it lists exactly
# one generation whose number is one greater than the highest generation
# listed before the signal was sent.
#
# Every failure is reported through +die+ (defined elsewhere; presumably it
# terminates the process).
def restart_server(opts)
  unless opts[:pid_file] && opts[:status_file]
    die "--restart option requires --pid-file and --status-file to be set as well"
  end

  # get first pid
  pid_line =
    begin
      File.open(opts[:pid_file]) {|fd| fd.gets.to_s.chomp }
    rescue
      die "failed to open file:#{opts[:pid_file]}"
    end
  pid = pid_line.to_i
  # to_i yields 0 for empty or non-numeric content; kill(2) treats 0 and
  # negative pids as process groups, so refuse anything that is not positive
  die "invalid pid in file:#{opts[:pid_file]}" unless pid > 0

  # function that returns a list of active generations in sorted order
  get_generations = lambda {
    begin
      File.readlines(opts[:status_file]).map do |line|
        line =~ /^(\d+):/ ? $1.to_i : nil
      end.compact.sort.uniq
    rescue
      die "failed to open file:#{opts[:status_file]}"
    end
  }

  # wait for this generation
  gens = get_generations.call
  die "no active process found in the status file" if gens.empty?
  wait_for = gens.last + 1

  # send HUP
  begin
    Process.kill('HUP', pid)
  rescue
    die "failed to send SIGHUP to the server process"
  end

  # wait for the generation
  loop do
    gens = get_generations.call
    break if gens.size == 1 && gens[0] == wait_for
    sleep 1
  end
end
server_ports() click to toggle source
# File lib/server/starter.rb, line 370
# Parses ENV['SERVER_STARTER_PORT'] into a Hash.
#
# The variable holds ';'-separated entries of the form "name=value"; each entry
# is split on its first '=' only. Keys and values are returned as Strings.
# Calls +die+ when the variable is not set.
def server_ports
  spec = ENV['SERVER_STARTER_PORT']
  unless spec
    die "no environment variable SERVER_STARTER_PORT. Did you start the process using server_starter?"
  end
  pairs = spec.split(';').map {|entry| entry.split('=', 2) }
  pairs.to_h
end
start_server(opts) click to toggle source
# File lib/server/starter.rb, line 17
# Opens the listening sockets, spawns the worker command and supervises it.
#
# opts - Hash with String or Symbol keys (keys are symbolized on a copy):
#   :port                  - one or more "port" / "host:port" specs to listen on
#   :path                  - one or more UNIX socket paths to listen on
#                            (at least one of :port / :path is required)
#   :exec                  - Array holding the worker command line (required)
#   :interval              - seconds to wait after spawning before checking
#                            that the worker is still alive (default 1)
#   :signal_on_hup         - signal sent to old workers after a restart
#                            (default 'TERM')
#   :signal_on_term        - signal sent to workers when TERM is received
#                            (default 'TERM')
#   :backlog               - listen backlog (default Socket::SOMAXCONN)
#   :dir                   - directory the child changes into before exec
#   :envdir, :enable_auto_restart, :kill_old_delay, :auto_restart_interval
#                          - exported as ENVDIR, ENABLE_AUTO_RESTART,
#                            KILL_OLD_DELAY and AUTO_RESTART_INTERVAL
#   :pid_file              - file that receives this process' pid
#   :log_file              - file that STDOUT / STDERR are reopened to
#   :status_file           - file that receives "generation:pid" lines
#
# The listening sockets are advertised to workers via SERVER_STARTER_PORT and
# the generation counter via SERVER_STARTER_GENERATION. HUP spawns a new worker
# and signals the old ones; any other trapped signal except ALRM signals all
# workers, waits for them and returns.
def start_server(opts)
  # symbolize keys (this builds a new Hash, the caller's Hash is untouched)
  opts = opts.map {|k, v| [k.to_sym, v] }.to_h
  opts[:interval] ||= 1
  opts[:signal_on_hup]  ||= 'TERM'
  opts[:signal_on_term] ||= 'TERM'
  opts[:backlog] ||= Socket::SOMAXCONN
  [:signal_on_hup, :signal_on_term].each do |key|
    # normalize to the one that can be passed to kill; build a new String so
    # that frozen values work and the caller's String is not modified
    opts[key] = opts[key].to_s.tr("a-z", "A-Z").sub(/^SIG/i, "")
  end

  # prepare args
  ports = Array(opts[:port])
  paths = Array(opts[:path])
  if ports.empty? && paths.empty?
    croak "either of ``port'' or ``path'' option is mandatory"
  end
  unless opts[:exec] && opts[:exec].is_a?(Array)
    croak "mandatory option ``exec'' is missing or not an array"
  end

  # set envs (assigning nil to an ENV key removes the variable)
  ENV['ENVDIR'] = opts[:envdir] if opts[:envdir]
  ENV['ENABLE_AUTO_RESTART'] = opts[:enable_auto_restart] ? '1' : nil
  ENV['KILL_OLD_DELAY'] = opts[:kill_old_delay].to_s if opts[:kill_old_delay]
  ENV['AUTO_RESTART_INTERVAL'] = opts[:auto_restart_interval].to_s if opts[:auto_restart_interval]

  # open pid file
  if opts[:pid_file]
    File.open(opts[:pid_file], "w") do |fh|
      fh.puts $$
    end rescue die "failed to open file:#{opts[:pid_file]}"
    at_exit { File.unlink opts[:pid_file] rescue nil }
  end

  # open log file
  if opts[:log_file]
    File.open(opts[:log_file], "a") do |fh|
      $stdout.flush
      $stderr.flush
      $stdout.reopen(fh) rescue die "failed to reopen STDOUT to file"
      $stderr.reopen(fh) rescue die "failed to reopen STDERR to file"
    end
  end

  # create guard that removes the status file
  if opts[:status_file]
    at_exit { File.unlink opts[:status_file] rescue nil }
  end

  $stderr.puts "start_server (pid:#{$$}) starting now..."

  # start listening, setup envvar
  socks = []
  sockenvs = []
  ports.each do |port|
    # accept Integer ports as well; the patterns below need a String
    port = port.to_s
    sock = nil
    begin
      if port =~ /^\s*(\d+)\s*$/
        # sock = Socket.new(:INET, :STREAM)
        # addr = Socket.pack_sockaddr_in(port, '0.0.0.0')
        # sock.setsockopt(:SOCKET, :REUSEADDR, true)
        # sock.bind(addr)
        # sock.listen(opts[:backlog])
        sock = TCPServer.new("0.0.0.0", port)
        sock.setsockopt(:SOCKET, :REUSEADDR, true)
        sock.listen(opts[:backlog])
      elsif port =~ /^\s*(.*)\s*:\s*(\d+)\s*$/
        _bind, _port = $1, $2
        sock = TCPServer.new(_bind, _port)
        sock.setsockopt(:SOCKET, :REUSEADDR, true)
        sock.listen(opts[:backlog])
      else
        croak "invalid ``port'' value:#{port}"
      end
    rescue
      die "failed to listen to #{port}"
    end
    # clear close-on-exec so that the descriptor survives exec in the worker
    sock.fcntl(Fcntl::F_SETFD, 0) rescue die "fcntl(F_SETFD, 0) failed"
    sockenvs.push "#{port}=#{sock.fileno}"
    socks.push sock
  end

  # remove the UNIX socket files on exit
  at_exit {
    paths.each do |path|
      File.socket?(path) and File.unlink(path) rescue nil
    end
  }
  paths.each do |path|
    if File.socket?(path)
      warn "removing existing socket file:#{path}"
      File.unlink(path) rescue die "failed to remove existing socket file:#{path}"
    end
    File.unlink(path) rescue nil
    # create the socket file with a zero umask, then put the previous umask
    # back so that it does not stay cleared for this process and its children
    saved_umask = File.umask(0)
    begin
      sock = UNIXServer.new(path)
      sock.listen(opts[:backlog])
    rescue
      die "failed to listen to file #{path}"
    ensure
      File.umask(saved_umask)
    end
    sock.fcntl(Fcntl::F_SETFD, 0) rescue die "fcntl(F_SETFD, 0) failed"
    sockenvs.push "#{path}=#{sock.fileno}"
    socks.push sock
  end
  ENV['SERVER_STARTER_PORT'] = sockenvs.join(";")
  ENV['SERVER_STARTER_GENERATION'] = "0"

  # setup signal handlers: record the signal name and wake up the waiter
  %w(INT TERM HUP ALRM).each do |signal|
    Signal.trap(signal) {
      @signals_received.push(signal)
      @signal_wait_thread.kill if @signal_wait_thread
    }
  end
  # NOTE(review): this installs a block handler whose return value is unused,
  # rather than passing 'IGNORE' as the trap command. Confirm whether
  # Signal.trap('PIPE', 'IGNORE') was intended.
  Signal.trap('PIPE') { 'IGNORE' }

  # setup status monitor
  update_status =
    if opts[:status_file]
      Proc.new {
        # write to a temporary file and rename it over the status file
        tmpfn = "#{opts[:status_file]}.#{$$}"
        File.open(tmpfn, "w") do |tmpfh|
          gen_pids = @current_worker ?
            {ENV['SERVER_STARTER_GENERATION'] => @current_worker} :
            {}
          @old_workers.each {|pid, gen| gen_pids[gen] = pid }
          gen_pids.keys.map(&:to_i).sort.each {|gen| tmpfh.puts "#{gen}:#{gen_pids[gen.to_s]}" }
        end rescue die "failed to create temporary file:#{tmpfn}"
        begin
          File.rename(tmpfn, opts[:status_file])
        rescue
          die "failed to rename #{tmpfn} to #{opts[:status_file]}"
        end
      }
    else
      Proc.new {}
    end

  # setup the start_worker function
  start_worker = Proc.new {
    pid = nil
    while true
      ENV['SERVER_STARTER_GENERATION'] = (ENV['SERVER_STARTER_GENERATION'].to_i + 1).to_s
      begin
        pid = fork
      rescue
        die "fork(2) failed"
      end
      if pid.nil? # child process
        args = Array(opts[:exec]).dup
        if opts[:dir]
          Dir.chdir opts[:dir] rescue die "failed to chdir"
        end
        begin
          bundler_with_clean_env do
            # keep the listening descriptors open across exec
            args << {:close_others => false}
            exec(*args)
          end
        rescue
          $stderr.puts "failed to exec #{args[0]}:#{$!.class} #{$!.message}"
          exit(255)
        end
      end
      $stderr.puts "starting new worker #{pid}"
      sleep opts[:interval]
      # stop retrying once a signal other than HUP is pending; signal names
      # are stored as Strings, so compare against the String 'HUP'
      break if (@signals_received - ['HUP']).size > 0
      # waitpid with WNOHANG returns nil while the worker is still running
      break if Process.waitpid(pid, Process::WNOHANG).nil?
      $stderr.puts "new worker #{pid} seems to have failed to start, exit status:#{$?.exitstatus}"
    end
    # ready, update the environment
    @current_worker = pid
    @last_restart_time = Time.now
    update_status.call
  }

  # setup the wait function
  wait = Proc.new {
    # block only when no signal is waiting to be handled
    flags = @signals_received.empty? ? 0 : Process::WNOHANG
    r = nil
    # waitpid can not get EINTR on receiving signal, so create a thread,
    # and kill the thread on receiving signal to exit blocking
    #
    # there is another way to use wait3 which raises EINTR on receiving signal,
    # but proc-wait3 gem requires gcc, etc to compile its C codes.
    #
    #     require 'proc/wait3'
    #     begin
    #       rusage = Process.wait3(flags)
    #       r = [rusage.pid, rusage.status] if rusage
    #     rescue Errno::EINTR
    #       sleep 0.1 # need to wait until Signal.trap finishes its operation, terrible
    #       nil
    #     end
    @signal_wait_thread = Thread.start do
      # the one second limit only matters for a blocking wait: it lets the
      # main loop come back regularly to check the auto-restart timer
      if flags == 0 && ENV['ENABLE_AUTO_RESTART']
        begin
          Timeout.timeout(1) do
            pid = Process.waitpid(-1, flags)
            r = [pid, $?.exitstatus] if pid
          end
        rescue Timeout::Error
          # Process.kill('ALRM', Process.pid)
          Thread.exit
        end
      else
        pid = Process.waitpid(-1, flags)
        r = [pid, $?.exitstatus] if pid
      end
    end
    @signal_wait_thread.join
    @signal_wait_thread = nil
    r
  }

  # setup the cleanup function
  cleanup = Proc.new {|sig|
    term_signal = sig == 'TERM' ? opts[:signal_on_term] : 'TERM'
    @old_workers[@current_worker] = ENV['SERVER_STARTER_GENERATION']
    @current_worker = nil
    $stderr.print "received #{sig}, sending #{term_signal} to all workers:",
      @old_workers.keys.sort.join(','), "\n"
    @old_workers.keys.sort.each {|pid| Process.kill(term_signal, pid) }
    # NOTE(review): this polls with WNOHANG in a tight loop until every
    # worker has been reaped
    while true
      died_worker = Process.waitpid(-1, Process::WNOHANG)
      if died_worker
        $stderr.puts "worker #{died_worker} died, status:#{$?.exitstatus}"
        @old_workers.delete(died_worker)
        update_status.call
        break if @old_workers.empty?
      end
    end
    $stderr.puts "exiting"
  }

  # the main loop
  start_worker.call
  while true
    # wait for next signal (or when auto-restart becomes necessary)
    r = wait.call
    # reload env if necessary
    loaded_env = _reload_env
    ENV['AUTO_RESTART_INTERVAL'] ||= "360" if ENV['ENABLE_AUTO_RESTART']
    with_local_env(loaded_env) do
      # restart if worker died
      if r
        died_worker, status = r
        if died_worker == @current_worker
          $stderr.puts "worker #{died_worker} died unexpectedly with status:#{status}, restarting"
          start_worker.call
        else
          $stderr.puts "old worker #{died_worker} died, status:#{status}"
          @old_workers.delete(died_worker)
          update_status.call
        end
      end
      # handle signals
      restart = nil
      while !@signals_received.empty?
        sig = @signals_received.shift
        if sig == 'HUP'
          $stderr.puts "received HUP, spawning a new worker"
          restart = true
          break
        elsif sig == 'ALRM'
          # skip
        else
          # leaves start_server once all workers have been reaped
          return cleanup.call(sig)
        end
      end
      if !restart && ENV['ENABLE_AUTO_RESTART']
        auto_restart_interval = ENV['AUTO_RESTART_INTERVAL'].to_i
        elapsed_since_restart = Time.now - @last_restart_time
        if elapsed_since_restart >= auto_restart_interval && @old_workers.empty?
          $stderr.puts "autorestart triggered (interval=#{auto_restart_interval})"
          restart = true
        elsif elapsed_since_restart >= auto_restart_interval * 2
          $stderr.puts "autorestart triggered (forced, interval=#{auto_restart_interval})"
          restart = true
        end
      end
      # restart if requested
      if restart
        @old_workers[@current_worker] = ENV['SERVER_STARTER_GENERATION']
        start_worker.call
        $stderr.print "new worker is now running, sending #{opts[:signal_on_hup]} to old workers:"
        if !@old_workers.empty?
          $stderr.puts @old_workers.keys.sort.join(',')
        else
          $stderr.puts "none"
        end
        kill_old_delay = ENV['KILL_OLD_DELAY'] ? ENV['KILL_OLD_DELAY'].to_i : ENV['ENABLE_AUTO_RESTART'] ? 5 : 0
        if kill_old_delay != 0
          $stderr.puts "sleeping #{kill_old_delay} secs before killing old workers"
          sleep kill_old_delay
        end
        $stderr.puts "killing old workers"
        @old_workers.keys.sort.each {|pid| Process.kill(opts[:signal_on_hup], pid) }
      end
    end
  end

  die "unreachable"
end