class Kinetic::Master

Constants

EXIT_SIGS
QUEUE_SIGS
SELF_PIPE
SIG_QUEUE
WORKERS
WORKER_QUEUE_SIGS

Attributes

app[R]
reexecpid[R]

Public Class Methods

new(app) click to toggle source
# File lib/kinetic/master.rb, line 14
def initialize(app)
  @app = app
end

Public Instance Methods

run() click to toggle source

Starts and initializes the master process.

# File lib/kinetic/master.rb, line 30
def run
  begin
    daemonize if config[:daemonize]
    enable_logging!
    logger.debug 'Configuration:'
    logger.ap    app.config.to_hash
    proc_name 'master'
    initialize_self_pipe!
    initialize_signal_traps!
    spawn_missing_workers
    join
  rescue => e
    logger.fatal e
    logger.fatal 'Unable to start worker!'
  ensure
    call_on_exit_callbacks
    exit!
  end
end
start() click to toggle source

Starts and daemonizes the master process

# File lib/kinetic/master.rb, line 19
def start
  if File.exists?(config.pid)
    puts "Unable to start master process, a process with the pidfile #{config.pid} already exists." +
      'If you wish to run another instance of this application please pass --pid to set a new pid file.'
    exit(1)
  end
  config.daemonize = true
  run
end
stop() click to toggle source
# File lib/kinetic/master.rb, line 50
def stop
  unless File.exists?(File.join(config.root, config.pid))
    puts "Unable to stop process, a process with the pidfile #{config.pid} could not be found" +
           'If you have started this app with another pid file location pass --pid to set the pid file.'
    exit(1)
  end
  pid = File.open(File.join(config.root, config.pid), 'r') { |f| f.read }.to_i
  Process.kill(:QUIT, pid)
end

Private Instance Methods

awaken_master() click to toggle source
# File lib/kinetic/master.rb, line 104
def awaken_master
  SELF_PIPE[1].kgio_trywrite('.')
end
call_on_exit_callbacks() click to toggle source
# File lib/kinetic/master.rb, line 114
def call_on_exit_callbacks
ensure
  FileUtils.rm(config[:pid]) if config[:pid] && config[:daemonize]
end
daemonize() click to toggle source
# File lib/kinetic/master.rb, line 67
def daemonize
  logger.info 'Daemonizing process'
  Process.daemon true, false
  write_pidfile!
end
enable_logging!() click to toggle source
# File lib/kinetic/master.rb, line 62
def enable_logging!
  config.logging = true
  app.class.instance_variable_set(:@logger, nil)
end
init_worker_process(worker) click to toggle source

gets rid of stuff the worker has no business keeping track of to free some resources and drops all sig handlers. traps for USR1, USR2, and HUP may be set in the after_fork Proc by the user.

# File lib/kinetic/master.rb, line 155
def init_worker_process(worker)
  worker.atfork_child
  # we'll re-trap :QUIT later for graceful shutdown iff we accept clients
  EXIT_SIGS.each { |sig| trap(sig) { exit!(0) } }
  exit!(0) if (SIG_QUEUE & EXIT_SIGS)[0]
  WORKER_QUEUE_SIGS.each { |sig| trap(sig, nil) }
  trap(:CHLD, 'DEFAULT')
  SIG_QUEUE.clear
  WORKERS.clear

  after_fork.call(self, worker)
end
initialize_self_pipe!() click to toggle source

Based on Unicorn's self-pipe

# File lib/kinetic/master.rb, line 93
def initialize_self_pipe!
  logger.debug 'Initializing self pipe'
  SELF_PIPE.replace Kgio::Pipe.new.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
end
initialize_signal_traps!() click to toggle source
# File lib/kinetic/master.rb, line 98
def initialize_signal_traps!
  logger.debug 'Initializing signal traps'
  QUEUE_SIGS.each { |sig| trap(sig) {SIG_QUEUE << sig; awaken_master } }
  trap(:CHLD) { awaken_master }
end
join() click to toggle source
# File lib/kinetic/master.rb, line 78
def join
  logger.debug 'Joining thread'
  case SIG_QUEUE.shift
  when :QUIT
    stop_all(true)
    break
  when :TERM, :INT
    stop_all(false)
    break
  else
    master_sleep(0.5)
  end while true
end
kill_each_worker(signal) click to toggle source
# File lib/kinetic/master.rb, line 184
def kill_each_worker(signal)
  WORKERS.keys.each { |wpid| kill_worker(signal, wpid) }
end
kill_worker(signal, wpid) click to toggle source

delivers a signal to a worker and fails gracefully if the worker is no longer running.

# File lib/kinetic/master.rb, line 190
def kill_worker(signal, wpid)
  Process.kill(signal, wpid)
rescue Errno::ESRCH
  worker = WORKERS.delete(wpid) and worker.close rescue nil
end
master_sleep(time) click to toggle source
# File lib/kinetic/master.rb, line 73
def master_sleep(time)
  IO.select([ SELF_PIPE[0] ], nil, nil, time) or return
  SELF_PIPE[0].kgio_tryread(11)
end
proc_name(tag) click to toggle source
# File lib/kinetic/master.rb, line 222
def proc_name(tag)
  $0 = "#{config.name} [#{tag}]"
end
reap_all_workers() click to toggle source

reaps all unreaped workers

# File lib/kinetic/master.rb, line 202
def reap_all_workers

  begin
    wpid, status = Process.waitpid2(-1, Process::WNOHANG)
    wpid or return
    if reexec_pid == wpid
      logger.error "reaped #{status.inspect} exec()-ed"
      self.reexec_pid = 0
      self.pid = pid.chomp('.oldbin') if pid
      proc_name 'master'
    else
      worker = WORKERS.delete(wpid) and worker.close rescue nil
      m = "reaped #{status.inspect} worker=#{worker.nr rescue 'unknown'}"
      status.success? ? logger.info(m) : logger.error(m)
    end
  rescue Errno::ECHILD
    break
  end while true
end
soft_kill_each_worker(signal) click to toggle source
# File lib/kinetic/master.rb, line 196
def soft_kill_each_worker(signal)
  WORKERS.each_value { |worker| worker.soft_kill(signal) }
end
spawn_missing_workers() click to toggle source
# File lib/kinetic/master.rb, line 119
def spawn_missing_workers
  logger.debug 'Spawning missing workers'
  worker_nr = -1
  until (worker_nr += 1) == config.workers
    WORKERS.value?(worker_nr) and next
    #noinspection RubyArgCount
    worker = Worker.new(worker_nr, @app)
    logger.debug 'Forking worker'
    parent = Process.pid
    pid = fork do
      begin
        proc_name "worker-#{worker_nr}"
        logger.debug 'Calling after fork procs'
        app.send(:after_fork_procs).each { |p| p.call }
        #noinspection RubyArgCount
        worker.run
      rescue => e
        logger.fatal e
        logger.fatal 'Unable to start workers'
        Process.kill(:TERM, parent)
      end
    end
    logger.debug "Worker #{worker_nr} started on #{pid}"
    WORKERS[pid] = worker
  end
rescue => e
  logger.error(e)
end
stop_all(graceful = true) click to toggle source

Terminates all workers, but does not exit master process

# File lib/kinetic/master.rb, line 169
def stop_all(graceful = true)
  logger.warn graceful ? 'Process is shutting down gracefully' : 'Process is shutting down immediately!'
  limit = Time.now + config.timeout
  until WORKERS.empty? || Time.now > limit
    if graceful
      soft_kill_each_worker(:QUIT)
    else
      kill_each_worker(:TERM)
    end
    sleep(0.1)
    reap_all_workers
  end
  kill_each_worker(:KILL)
end
write_pidfile!() click to toggle source
# File lib/kinetic/master.rb, line 108
def write_pidfile!
  set(:pid, File.join(config.root, "#{config.name}.pid"))
  logger.info "Writing PID file to #{config.pid}"
  File.open(config.pid, 'w') { |f| f.write(Process.pid) }
end