class Resque::Plugins::ResqueSliders::KEWatcher

The KEWatcher class provides a daemon to run on hosts that run Resque workers.

Attributes

max_children[R]
pidfile[R]
verbosity[RW]

Verbosity level (Integer)

zombie_kill_wait[R]
zombie_term_wait[R]

Public Class Methods

new(options={}) click to toggle source

Initialize the daemon with options from the command line.

# File lib/resque-sliders/kewatcher.rb, line 20
# Build a watcher from parsed command-line options.
#
# Recognised option keys (all optional):
#   :verbosity        - log level, coerced with to_i (default 0)
#   :zombie_term_wait - seconds after QUIT before a child is sent TERM (default 20)
#   :zombie_kill_wait - seconds before a child is sent KILL (default 60); when
#                       ENV['RESQUE_TERM_TIMEOUT'] is set it is instead that
#                       value plus the TERM wait
#   :force            - restart an already-running KEWatcher instead of exiting
#   :rakefile         - Rakefile passed to `rake -f`; ignored if it does not exist
#   :pidfile          - where to write our pid; ".pid" is appended if missing
#   :max_children     - maximum number of concurrent workers (default 10)
#   :async            - when true, reloads don't block waiting for killed children
#   :config           - Redis location: a Hash with 'host'/'port'/'db' keys, or
#                       any other value, which is passed to Resque.redis= as-is
#
# The pidfile is written immediately; save_pid! may exit the process on failure.
def initialize(options={})
  @verbosity = (options[:verbosity] || 0).to_i # verbosity level
  @zombie_term_wait = options[:zombie_term_wait] || 20 # time to wait before TERM
  unless ENV['RESQUE_TERM_TIMEOUT'].nil?
    @zombie_kill_wait = ENV['RESQUE_TERM_TIMEOUT'].to_i + @zombie_term_wait
  end
  @zombie_kill_wait ||= options[:zombie_kill_wait] || 60 # time to wait before -9
  @hostile_takeover = options[:force] # kill running kewatcher?

  # Check for a missing option explicitly instead of rescuing every error from
  # expand_path. File.exists? was removed in Ruby 3.2; File.exist? works everywhere.
  @rakefile = options[:rakefile] ? File.expand_path(options[:rakefile]) : nil
  @rakefile = nil if @rakefile && !File.exist?(@rakefile)
  @pidfile = options[:pidfile] ? File.expand_path(options[:pidfile]) : nil
  @pidfile += '.pid' if @pidfile && !@pidfile.end_with?('.pid')
  save_pid!

  @max_children = options[:max_children] || 10
  @hostname = `hostname -s`.chomp.downcase
  @pids = Hash.new # pid => queue for every running child
  @need_queues = Array.new # queues that still need a child started
  @dead_queues = Array.new # queues whose child died; respawned first
  @zombie_pids = Hash.new # killed children we are still reaping: pid => [time killed, signals sent]
  @async = options[:async] || false # sync and wait by default
  @hupped = 0

  Resque.redis = case options[:config]
    when Hash
      [options[:config]['host'], options[:config]['port'], options[:config]['db'] || 0].join(':')
    else
      options[:config]
  end
end

Public Instance Methods

run!(interval=0.1) click to toggle source

Run the daemon.

# File lib/resque-sliders/kewatcher.rb, line 50
# Run the daemon's supervision loop until a shutdown is requested.
#
# interval - seconds to sleep between iterations (coerced with Float()).
#
# Roughly every 10 seconds the running children are logged. Roughly every
# 20 seconds the web-UI signals are checked and, unless paused, the desired
# queue layout is re-read through queue_diff!. Children that exit are
# removed and their queue is respawned ahead of newly requested ones.
def run!(interval=0.1)
  interval = Float(interval)
  if running?
    unless @hostile_takeover
      puts "Already running. Restart Not Forced exiting..."
      exit
    end
    restart_running!
  end
  $0 = "KEWatcher: Starting"
  startup

  # Turn the wall-clock periods into whole iteration counts. The old test,
  # Integer % Float == 1 (e.g. count % (20 / 0.3)), only matched on the
  # first iteration for most intervals, so the periodic work stopped.
  iterations_per = lambda do |seconds|
    interval > 0 ? [(seconds / interval).round, 1].max : 1
  end
  log_every = iterations_per.call(10)
  tick_every = iterations_per.call(20)

  # Resque >= 1.22.0 understands TERM_CHILD / RESQUE_TERM_TIMEOUT. Compare
  # versions numerically: as Strings, "1.9.0" would rank above "1.22.0".
  new_signal_api = Gem::Version.new(Resque::Version.to_s) >= Gem::Version.new('1.22.0')

  count = 0
  old = 0 # know when to tell redis we have a different number of children
  loop do
    break if shutdown?
    count += 1
    if (count - 1) % log_every == 0
      log! ["watching:", @pids.keys.join(', '), "(#{@pids.keys.length})"].reject { |x| x.nil? || x.empty? }.join(' ')
    end

    tick = (count - 1) % tick_every == 0
    if tick
      log! "checking signals..."
      check_signals
    end

    unless paused? || shutdown?
      queue_diff! if tick # throttle calls to redis

      while @pids.keys.length < @max_children && (!@need_queues.empty? || !@dead_queues.empty?)
        queue = @dead_queues.shift || @need_queues.shift # respawn dead children first

        command = ['rake']
        command << "-f #{@rakefile}" if @rakefile
        command << 'environment' if ENV['RAILS_ENV']
        command << 'resque:work'
        exec_string = command.join(' ')

        env_opts = {"QUEUE" => queue}
        if new_signal_api
          term_timeout = @zombie_kill_wait - @zombie_term_wait
          term_timeout = 1 unless term_timeout > 0
          env_opts['TERM_CHILD'] = '1'
          env_opts['RESQUE_TERM_TIMEOUT'] = term_timeout.to_s # use new signal handling
        end
        exec_args = if RUBY_VERSION < '1.9'
          [exec_string, env_opts.map {|k,v| "#{k}=#{v}"}].flatten.join(' ')
        else
          [env_opts, exec_string] # 1.9.x exec(env, command)
        end

        pid = fork do
          srand # reseed so children don't share the parent's random sequence
          exec(*exec_args)
        end
        @pids.store(pid, queue)
        procline
      end
    end

    register_setting('current_children', @pids.keys.length) if old != @pids.length
    old = @pids.length

    procline if tick

    sleep(interval) # microsleep
    kill_zombies! unless shutdown? # keep reaping the children we've killed
    if @hupped > 0
      log "HUP received; purging children..."
      signal_hup
      do_reload!
      @hupped -= 1
    end

    # Reap at most one exited child per pass. With WNOHANG, Process.wait
    # returns nil while the child is still running, and returns its pid
    # (reaping it) once it has exited. This replaces a Timeout thread per
    # pid and removes the child on the same pass.
    @pids.keys.each do |pid|
      begin
        next if Process.wait(pid, Process::WNOHANG).nil?
      rescue Errno::ECHILD
        # Already reaped elsewhere; treat it as dead too.
      end
      fate = (paused? || shutdown?) ? "no one cares..." : "spawning another..."
      log! "#{pid} (#{@pids[pid]}) child died; #{fate}"
      remove pid
      break
    end
  end
end
running?() click to toggle source

Returns PID if already running, false otherwise

# File lib/resque-sliders/kewatcher.rb, line 137
# Look for another KEWatcher process on this host.
#
# The `[K]EWatcher` pattern keeps grep from matching its own command line.
# Our own pid is skipped: once this process has renamed itself ($0 =
# "KEWatcher ..."), ps would otherwise report it. restart_running! would then
# send TERM to itself.
#
# Returns the pid (Integer) of another running KEWatcher, or false.
def running?
  listing = `ps x -o pid,command|grep [K]EWatcher|awk '{print $1}'`
  other = listing.split.map { |field| field.to_i }.find { |pid| pid > 0 && pid != Process.pid }
  other || false
end

Private Instance Methods

check_signals() click to toggle source

Check for control signals set from the web UI and act on them.

# File lib/resque-sliders/kewatcher.rb, line 199
# Act on the control flags for this host, in priority order: reload, then
# stop, then pause. When none is set, resume spawning children.
def check_signals
  host = @hostname
  case
  when reload?(host)
    log ' -> RELOAD from web-ui'
    signal_hup
    do_reload!
  when stop?(host)
    # Say nothing when we are already fully stopped (paused, no children).
    log ' -> STOPPED from web-ui' if !paused? || !@pids.empty?
    signal_usr1
  when pause?(host)
    log ' -> PAUSED from web-ui' unless paused?
    signal_usr2
  else
    log! ' -> Continuing; no signal found'
    signal_cont
  end
end
clean_signal_settings() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 194
# Unregister the 'pause', 'stop' and 'reload' control settings, in that order.
def clean_signal_settings
  ['pause', 'stop', 'reload'].each do |setting|
    unregister_setting(setting)
  end
end
do_reload!() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 290
# In synchronous mode, keep calling kill_zombies! until every killed child
# has been reaped. In async mode, return immediately.
def do_reload!
  kill_zombies! until @async || @zombie_pids.empty?
end
enable_gc_optimizations() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 171
# Enable copy-on-write friendly GC when the running interpreter provides
# GC.copy_on_write_friendly=. Otherwise this is a no-op.
def enable_gc_optimizations
  return unless GC.respond_to?(:copy_on_write_friendly=)
  GC.copy_on_write_friendly = true
end
kill_child(pid) click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 374
# Ask child +pid+ to shut down gracefully (QUIT) and start tracking it in
# @zombie_pids with the current time and a signal count of 1, so
# kill_zombies! can escalate and reap it. The pid is tracked even if the
# signal could not be delivered.
def kill_child(pid)
  Process.kill(:QUIT, pid) # try graceful shutdown
  log! "Child #{pid} killed. (#{@pids.keys.length-1})"
rescue StandardError => e
  # Errno::ESRCH is expected when the child has already exited. Rescue
  # StandardError rather than Object so SignalException/SystemExit still propagate.
  log! "Child #{pid} already dead, sad day. (#{@pids.keys.length-1}) #{e}"
ensure
  @zombie_pids[pid] = [Time.now, 1] # [time first killed, signals sent]
end
kill_children() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 386
# Kill every tracked child. remove() then queues each child's queue for respawn.
def kill_children
  @pids.keys.each do |child_pid|
    kill_child(child_pid)
    remove(child_pid)
  end
end
kill_zombies!() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 340
# Escalate signals to children we have already asked to quit, and reap any
# that have exited.
#
# @zombie_pids maps pid => [time first killed, number of signals sent].
# - A child still present @zombie_term_wait seconds after its QUIT, which has
#   received only that one signal, is sent TERM.
# - On Resque older than 1.22.0, a child present @zombie_kill_wait seconds
#   after its QUIT is sent KILL.
# Each pid is then waited on briefly. In async mode the wait is 0.01s;
# otherwise the remaining TERM budget is split across the zombies, with
# 0.01s as the minimum. Reaped or vanished pids are dropped from @zombie_pids.
def kill_zombies!
  return if @zombie_pids.empty?
  # Newer Resque enforces RESQUE_TERM_TIMEOUT itself, so KILL is only needed
  # for older versions. Compare versions numerically: as Strings,
  # "1.9.0" >= "1.22.0" is true and KILL was never sent.
  escalate_to_kill = Gem::Version.new(Resque::Version.to_s) < Gem::Version.new('1.22.0')
  reaped = []
  @zombie_pids.dup.each do |pid, kill_data|
    begin
      when_killed, times_killed = kill_data
      elapsed = Time.now - when_killed
      sig, waited_for = if elapsed >= @zombie_term_wait && times_killed == 1
        [:TERM, @zombie_term_wait]
      elsif elapsed >= @zombie_kill_wait && escalate_to_kill
        [:KILL, @zombie_kill_wait]
      end
      if sig
        # Report the threshold that actually triggered this signal.
        log "Waited more than #{waited_for} seconds for #{pid}. Sending #{sig}..."
        Process.kill(sig, pid)
        @zombie_pids[pid] = [when_killed, times_killed + 1]
      end
      wait = @async ? 0.01 : (@zombie_term_wait - elapsed) / @zombie_pids.length
      wait = 0.01 unless wait > 0
      # wait() so the exited child is reaped rather than left as a zombie
      Timeout.timeout(wait) { Process.wait(pid) }
      reaped << pid
    rescue Timeout::Error
      # still alive; try again on the next call
    rescue Errno::ESRCH, Errno::ECHILD # child is gone
      reaped << pid
    end
  end
  reaped.each { |pid| @zombie_pids.delete(pid) }
end
log(message) click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 394
# Print +message+ to stdout according to verbosity.
# 0 (or lower) prints nothing, 1 prints "* message", and anything higher
# prefixes a timestamp and this process's pid.
def log(message)
  level = verbosity
  if level > 1
    stamp = Time.now.strftime('%H:%M:%S %Y-%m-%d')
    puts "*** [#{stamp}] #{Process.pid}: #{message}"
  elsif level == 1
    puts "* #{message}"
  end
end
log!(message) click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 403
# Debug-level logging: pass +message+ to log only when verbosity is above 1.
def log!(message)
  return unless verbosity > 1
  log(message)
end
paused?() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 313
# Whether spawning of new children is paused.
# Returns the @paused flag: true after signal_usr1/signal_usr2, false after
# signal_cont/signal_hup, and nil before any of them has run.
def paused?
  @paused
end
procline() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 216
# Rewrite the process title ($0) to summarise our state and log it at debug level.
# Format: "KEWatcher (<status>): R:<running pids> Z:<zombie pids>", where
# status is "stopped" (paused with no children left), or
# "<running>-<zombies>", with "-paused" appended while paused.
def procline
  running = @pids.keys
  zombies = @zombie_pids.keys
  status =
    if paused? && running.empty? && zombies.empty?
      'stopped'
    else
      [running.length, zombies.length, ('paused' if paused?)].compact.join('-')
    end
  details = []
  details << "R:#{running.join(',')}" unless running.empty?
  details << "Z:#{zombies.join(',')}" unless zombies.empty?
  $0 = "KEWatcher (#{status}): #{details.join(' ')}"
  log! $0
end
queue_diff() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 237
# Compare the queue layout requested for this host with the running children.
#
# queue_values(@hostname) supplies queue => count pairs, and each queue is
# expanded into `count` slots. Each running child fills one matching slot.
# Unfilled slots need starting, and children left without a slot need killing.
# Logs a warning when the result would exceed @max_children.
#
# Returns [to_start, to_kill]: queue names to start and pids to kill.
def queue_diff
  wanted = []
  queue_values(@hostname).each_pair do |queue, count|
    wanted.concat([queue] * count.to_i)
  end

  unclaimed = @pids.values # queues of running children not yet matched to a slot
  to_start = []
  wanted.each do |queue|
    slot = unclaimed.index(queue)
    if slot
      unclaimed.delete_at(slot) # already running; tick it off
    else
      to_start << queue
    end
  end

  to_kill = []
  @pids.each_pair do |pid, queue|
    slot = unclaimed.index(queue)
    next if slot.nil?
    to_kill << pid # surplus child for this queue
    unclaimed.delete_at(slot)
  end

  if to_start.length + @pids.length - to_kill.length > @max_children
    log "WARN: need to start too many children, please raise max children"
  end

  summarize = lambda do |label, queues|
    breakdown = queues.map { |q| "#{q} (#{queues.count(q)})" }.uniq.join(', ')
    [label, breakdown, "= #{queues.length}"].reject { |part| part.empty? }.join(' ')
  end
  log! summarize.call("GOTTA START:", to_start)
  log! summarize.call("GOTTA KILL:", to_kill.map { |pid| @pids[pid] })

  [to_start, to_kill]
end
queue_diff!() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 228
# Apply queue_diff. Kill surplus children outright (no respawn) and make the
# start list from queue_diff the complete set of queues waiting to be spawned,
# discarding any pending respawns.
def queue_diff!
  needed, surplus = queue_diff
  surplus.each { |child_pid| remove!(child_pid) }
  @need_queues = needed
  @dead_queues = []
end
register_signal_handlers() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 177
# Install the daemon's signal handlers.
#   TERM, INT, QUIT - shutdown!
#   HUP             - bump @hupped; run! performs the reload
#   USR1 / USR2     - stop / pause (also recorded via set_signal_flag)
#   CONT            - resume (recorded as 'play')
# If the platform rejects one of the optional signals, the remaining ones
# are skipped and a warning is printed.
# NOTE(review): shutdown! does substantial work inside the trap handler;
# confirm that is safe on the Ruby versions in use.
def register_signal_handlers
  trap('TERM') { shutdown! }
  trap('INT') { shutdown! }

  begin
    trap('QUIT') { shutdown! }
    trap('HUP') { @hupped += 1 }
    [
      ['USR1', "USR1 received; killing little children...", 'stop', :signal_usr1],
      ['USR2', "USR2 received; not making babies", 'pause', :signal_usr2],
      ['CONT', "CONT received; making babies...", 'play', :signal_cont]
    ].each do |sig, announcement, flag, handler|
      trap(sig) do
        log announcement
        set_signal_flag(flag)
        send(handler)
      end
    end
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log! "Registered signals"
end
remove(pid) click to toggle source

Remove pid and put its queue at the front of the respawn list.

# File lib/resque-sliders/kewatcher.rb, line 284
# Stop tracking +pid+ and put its queue at the front of @dead_queues so
# run! respawns it first, then refresh the process title.
def remove(pid)
  orphaned_queue = @pids.delete(pid)
  @dead_queues.unshift(orphaned_queue)
  procline
end
remove!(pid) click to toggle source

Kill pid and forget it completely; its queue is not respawned.

# File lib/resque-sliders/kewatcher.rb, line 277
# Kill +pid+ and stop tracking it without queueing it for respawn.
# kill_child runs first, while +pid+ is still in @pids, so its log line
# reports the pre-removal count.
def remove!(pid)
  kill_child(pid)
  @pids.delete(pid)
  procline
end
remove_pidfile!() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 433
# Delete the pidfile, if one is configured. A file that is already gone is
# not an error.
# File.exists? (used previously) was removed in Ruby 3.2. Deleting directly
# and rescuing ENOENT also avoids the check-then-delete race.
def remove_pidfile!
  return unless @pidfile
  begin
    File.delete(@pidfile)
  rescue Errno::ENOENT
    # already removed
  end
end
restart_running!() click to toggle source

Forces any running KEWatcher process (located by ps and grep) to exit by sending it TERM; gives up and exits if it is still running after repeated waits.

# File lib/resque-sliders/kewatcher.rb, line 145
# Force another running KEWatcher (found via running?) to exit.
# TERM is sent on the 1st and 6th checks. Between checks we sleep 3s, 6s,
# 9s, ... If it is still running after seven waits, print a message and
# exit with status 2.
def restart_running!
  attempts = 0
  loop do
    victim = running?
    break unless victim
    if attempts > 6
      puts "#{victim} wont die; giving up"
      exit 2
    end
    attempts += 1
    if attempts % 5 == 1
      puts "Killing running KEWatcher: #{victim}"
      Process.kill(:TERM, victim)
    end
    pause_for = 3 * attempts
    puts "Waiting #{pause_for}s for it to die..."
    sleep(pause_for)
  end
end
save_pid!() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 407
# Write this process's pid to @pidfile (no-op when no pidfile is configured).
# A missing parent directory is created with FileUtils.mkdir_p, then the
# write is retried. A permission error exits with status 1; any other
# failure on the retry exits with status 2.
def save_pid!
  return unless @pidfile
  begin
    log "Saving pid to => #{@pidfile}"
    File.open(@pidfile, 'w') { |io| io.write(Process.pid) }
  rescue Errno::EACCES => err
    puts "Cannot write pidfile => #{err}"
    exit 1
  rescue Errno::ENOENT
    parent = File.dirname(@pidfile)
    begin
      log! "#{parent} doesnt exist; Creating it..."
      FileUtils.mkdir_p(parent)
    rescue Errno::EACCES => err
      puts "Cannot create directory => #{err}"
      exit 1
    end
    begin
      save_pid! # retry now that the directory exists
    rescue StandardError # anything else: give up rather than loop
      exit 2
    end
  end
end
shutdown!() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 296
# Shut the daemon down.
# Marks shutdown (so run! exits its loop), kills all children, and keeps
# calling kill_zombies! until they are reaped. It then unregisters the
# current/max children settings, waits for any remaining child processes,
# and removes the pidfile.
# Installed as the TERM/INT/QUIT trap handler.
def shutdown!
  log "Exiting..."
  @shutdown = true
  kill_children
  kill_zombies! until @zombie_pids.empty?
  ['current_children', 'max_children'].each { |setting| unregister_setting(setting) }
  log! "Unregistered Max Children"
  Process.waitall
  remove_pidfile!
end
shutdown?() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 309
# Whether shutdown! has started: true once it has run, nil before.
# run! checks this to leave its loop and to stop spawning children.
def shutdown?
  @shutdown
end
signal_cont() click to toggle source

Continue

# File lib/resque-sliders/kewatcher.rb, line 336
# Resume: clear the paused flag so run! spawns children again.
def signal_cont
  @paused = false # unpause
end
signal_hup() click to toggle source

Reload

# File lib/resque-sliders/kewatcher.rb, line 318
# Reload: clear the pause/stop/reload settings, kill every child (each
# child's queue goes onto the respawn list via remove), then unpause so
# run! starts replacements.
def signal_hup
  clean_signal_settings
  kill_children
  @paused = false # unpause after kill (restart child)
end
signal_usr1() click to toggle source

Stop

# File lib/resque-sliders/kewatcher.rb, line 325
# Stop: kill every child, then pause so run! does not spawn replacements.
def signal_usr1
  kill_children
  @paused = true # pause after kill cause we're paused
end
signal_usr2() click to toggle source

Pause

# File lib/resque-sliders/kewatcher.rb, line 331
# Pause: stop spawning new children. Children already running are not touched here.
def signal_usr2
  @paused = true # paused again
end
startup() click to toggle source
# File lib/resque-sliders/kewatcher.rb, line 160
# One-time setup before run!'s loop.
# Prepares this process (GC tuning, signal traps, cleared control settings),
# registers max_children and this host, and finally enables $stdout.sync.
def startup
  rails_env = ENV['RAILS_ENV']
  log!("Found RAILS_ENV=#{rails_env}") if rails_env

  # local process preparation
  enable_gc_optimizations
  register_signal_handlers
  clean_signal_settings

  # advertise this watcher
  register_setting('max_children', @max_children)
  add_to_known_hosts(@hostname)
  log!("Registered Max Children with Redis")

  $stdout.sync = true
end