class Spool::Pool

Constants

CHECK_TIMEOUT
SIGNALS

Attributes

actions_queue[R]
configuration[R]
working_processes[RW]
zombie_processes[RW]

Public Class Methods

new(configuration=nil, &block) click to toggle source
# File lib/spool/pool.rb, line 18
def initialize(configuration=nil, &block)
  @configuration = configuration || DSL.configure(&block)
  @working_processes = []
  @zombie_processes = Set.new
  @running = false
  @actions_queue = []
end

Public Instance Methods

all_processes() click to toggle source
# File lib/spool/pool.rb, line 34
def all_processes
  working_processes + zombie_processes.to_a
end
running?() click to toggle source
# File lib/spool/pool.rb, line 26
def running?
  @running
end
start() click to toggle source
# File lib/spool/pool.rb, line 45
def start
  @running = true

  handle_signals

  File.write configuration.pid_file, Process.pid if configuration.pid_file

  configuration.processes.times.map do
    working_processes << Spawner.spawn(configuration)
  end

  logger.info(self.class) { "SPOOL START => #{format_processes}" }

  while running?
    begin
      action = actions_queue.pop
      
      if action
        logger.info(self.class) { "Starting action #{action[:name]} with params: [#{action[:args].join(', ')}]" }
        send action[:name], *action[:args] 
      end

      if running?
        check_status
        sleep CHECK_TIMEOUT
      end
    rescue Exception => e
      log_error e
    end
  end

  logger.info(self.class) { "Spool finished successfully!" }
end
stopped?() click to toggle source
# File lib/spool/pool.rb, line 30
def stopped?
  !running?
end

Private Instance Methods

_decr(count=1) click to toggle source
# File lib/spool/pool.rb, line 120
def _decr(count=1)
  configuration.processes -= count
  configuration.processes = 0 if configuration.processes < 0
end
_incr(count=1) click to toggle source
# File lib/spool/pool.rb, line 116
def _incr(count=1)
  configuration.processes += count
end
_reload() click to toggle source
# File lib/spool/pool.rb, line 125
def _reload
  @configuration = DSL.configure configuration.source_file if configuration.source_file
end
_restart() click to toggle source
# File lib/spool/pool.rb, line 129
def _restart
  logger.info(self.class) { "RESTART" }
  stop_processes working_processes
end
_stop() click to toggle source
# File lib/spool/pool.rb, line 134
def _stop
  logger.info(self.class) { "SPOOL STOP" }

  stop_processes working_processes
  wait_for_stopped all_processes

  @running = false
end
_stop!() click to toggle source
# File lib/spool/pool.rb, line 143
def _stop!
  logger.info(self.class) { "SPOOL STOP! Going to kill => #{format_processes}" }

  all_processes.each do |p| 
    begin
      send_signal_to(p, configuration.kill_signal) if p.alive?
    rescue Datacenter::Shell::CommandError => e
      log_error e
    end
  end

  wait_for_stopped all_processes

  File.delete configuration.pid_file if File.exist? configuration.pid_file

  @running = false
end
all_processes_count() click to toggle source
# File lib/spool/pool.rb, line 201
def all_processes_count
  working_processes.count + zombie_processes.count
end
check_processes_to_restart() click to toggle source
# File lib/spool/pool.rb, line 182
def check_processes_to_restart
  to_restart = working_processes.select(&configuration.restart_condition)

  if to_restart.any?
    logger.info(self.class) {"Restart condition successful in child processes: #{to_restart.map(&:pid)}"}
    stop_processes to_restart
  end
end
check_status() click to toggle source
# File lib/spool/pool.rb, line 93
def check_status
  clear_dead_processes

  check_processes_to_restart
  
  if configuration.processes > all_processes_count
    logger.info(self.class) { "Initializing new children. Current State => #{format_processes}" }

    (configuration.processes - all_processes_count).times do
      working_processes << Spawner.spawn(configuration)
    end

    logger.info(self.class) { "Status after new childrens => #{format_processes}" }
  elsif configuration.processes < working_processes.count
    count_to_kill = working_processes.count - configuration.processes
    logger.info(self.class) { "Killing #{count_to_kill} children. Current state => #{format_processes}" }

    stop_processes working_processes.take(count_to_kill)

    logger.info(self.class) { "After killing childers. Current State => #{format_processes}" }
  end
end
clear_dead_processes() click to toggle source
# File lib/spool/pool.rb, line 196
def clear_dead_processes
  working_processes.delete_if { |p| !p.alive? }
  zombie_processes.delete_if { |p| !p.alive? }
end
format_actions_queue() click to toggle source
# File lib/spool/pool.rb, line 213
def format_actions_queue
  return "EMPTY" if actions_queue.empty?
  
  actions_queue.map.with_index do |action, index| 
    "#{index+1} => #{action[:name]}"
  end.join("\n")
end
format_processes() click to toggle source
# File lib/spool/pool.rb, line 221
def format_processes
  "Working Processes: #{working_processes.map(&:pid)}, Zombie Processes: #{zombie_processes.map(&:pid)}"
end
handle_signals() click to toggle source
# File lib/spool/pool.rb, line 84
def handle_signals
  SIGNALS.each do |signal, event|
    Signal.trap(signal) do
      logger.info(self.class) { "Signal #{signal} received. Current state of actions queue is:\n#{format_actions_queue}" }
      send event
    end
  end
end
log_error(error) click to toggle source
# File lib/spool/pool.rb, line 209
def log_error(error)
  logger.error(self.class) { "#{error.message}\n#{error.backtrace.join("\n")}" }
end
logger() click to toggle source
# File lib/spool/pool.rb, line 205
def logger
  configuration.logger
end
send_signal_to(process, signal) click to toggle source
# File lib/spool/pool.rb, line 191
def send_signal_to(process, signal)
  logger.info(self.class) { "Going to send signal #{signal} to process #{process.pid}" }
  process.send_signal signal
end
stop_processes(processes_list) click to toggle source
# File lib/spool/pool.rb, line 161
def stop_processes(processes_list)
  processes_list.each do |p| 
    begin
      send_signal_to p, configuration.stop_signal
      zombie_processes << p
    rescue Exception => e
      log_error e
    end
  end

  working_processes.delete_if{ |p| zombie_processes.include? p }
end
wait_for_stopped(processes_list) click to toggle source
# File lib/spool/pool.rb, line 174
def wait_for_stopped(processes_list)
  while processes_list.any?(&:alive?)
    sleep 0.01
  end

  clear_dead_processes
end