class Spool::Pool
Constants
- CHECK_TIMEOUT
- SIGNALS
Attributes
actions_queue[R]
configuration[R]
working_processes[RW]
zombie_processes[RW]
Public Class Methods
new(configuration=nil, &block)
click to toggle source
# File lib/spool/pool.rb, line 18
#
# Builds a pool in the "not running" state with no tracked processes and an
# empty actions queue.
#
# configuration - object used as the pool configuration. When it is nil (or
#                 false) the configuration is obtained from
#                 DSL.configure, which receives the given block.
# block         - forwarded to DSL.configure only when no configuration
#                 object was supplied.
def initialize(configuration=nil, &block)
  @configuration = configuration || DSL.configure(&block)
  @running = false
  @actions_queue = []
  @working_processes = []
  @zombie_processes = Set.new
end
Public Instance Methods
all_processes()
click to toggle source
# File lib/spool/pool.rb, line 34
#
# Returns a new Array holding every tracked process: the working processes
# first, followed by the zombie processes.
def all_processes
  [*working_processes, *zombie_processes]
end
running?()
click to toggle source
# File lib/spool/pool.rb, line 26
#
# Returns the value of the @running flag. The flag is false after
# initialize, true once start has been called, and false again after
# _stop or _stop! completes.
def running?
  @running
end
start()
click to toggle source
# File lib/spool/pool.rb, line 45
#
# Runs the pool supervisor loop and returns only once the pool is no
# longer running.
#
# Steps, in order:
# 1. marks the pool as running and installs the signal handlers;
# 2. writes the current process id to configuration.pid_file when one is set;
# 3. spawns configuration.processes children through Spawner.spawn;
# 4. loops while running?: pops one pending action (if any) from
#    actions_queue and dispatches it with send, then, if still running,
#    calls check_status and sleeps CHECK_TIMEOUT.
#
# Any exception raised inside one loop iteration is logged through
# log_error and the loop carries on.
def start
  @running = true
  handle_signals

  pid_file = configuration.pid_file
  File.write pid_file, Process.pid if pid_file

  configuration.processes.times do
    working_processes << Spawner.spawn(configuration)
  end

  logger.info(self.class) { "SPOOL START => #{format_processes}" }

  while running?
    begin
      # NOTE(review): Array#pop takes the most recently appended entry;
      # confirm that is the intended order for queued actions.
      if (action = actions_queue.pop)
        logger.info(self.class) { "Starting action #{action[:name]} with params: [#{action[:args].join(', ')}]" }
        send(action[:name], *action[:args])
      end

      # The action just executed may have stopped the pool.
      next unless running?

      check_status
      sleep CHECK_TIMEOUT
    rescue Exception => error
      log_error error
    end
  end

  logger.info(self.class) { "Spool finished successfully!" }
end
stopped?()
click to toggle source
# File lib/spool/pool.rb, line 30
#
# Boolean negation of running?: true when the pool is not running,
# false otherwise.
def stopped?
  !running?
end
Private Instance Methods
_decr(count=1)
click to toggle source
# File lib/spool/pool.rb, line 120
#
# Lowers configuration.processes by +count+ (default 1). The stored value
# is clamped so that it never ends up below zero.
def _decr(count=1)
  remaining = configuration.processes - count
  configuration.processes = remaining
  configuration.processes = 0 if remaining < 0
end
_incr(count=1)
click to toggle source
# File lib/spool/pool.rb, line 116
#
# Raises configuration.processes by +count+ (default 1).
def _incr(count=1)
  configuration.processes = configuration.processes + count
end
_reload()
click to toggle source
# File lib/spool/pool.rb, line 125
#
# Replaces @configuration with the result of DSL.configure called on the
# current configuration's source_file. Does nothing (and returns nil) when
# the current configuration has no source_file.
def _reload
  source = configuration.source_file
  return unless source

  @configuration = DSL.configure(source)
end
_restart()
click to toggle source
# File lib/spool/pool.rb, line 129
#
# Logs "RESTART" and passes every working process to stop_processes.
# This method itself spawns nothing; check_status is the place where
# missing children are spawned.
def _restart
  logger.info(self.class) { "RESTART" }
  stop_processes(working_processes)
end
_stop()
click to toggle source
# File lib/spool/pool.rb, line 134
#
# Graceful shutdown: sends the stop signal to every working process (via
# stop_processes), blocks until no tracked process is alive, and then
# clears the running flag so the loop in start terminates.
#
# NOTE(review): unlike _stop!, this path does not delete
# configuration.pid_file — confirm whether that is intentional.
def _stop
  logger.info(self.class) { "SPOOL STOP" }

  stop_processes(working_processes)
  wait_for_stopped(all_processes)

  @running = false
end
_stop!()
click to toggle source
# File lib/spool/pool.rb, line 143
#
# Forced shutdown: sends configuration.kill_signal to every tracked process
# that is still alive, blocks until none is alive, removes the pid file and
# clears the running flag so the loop in start terminates.
#
# A Datacenter::Shell::CommandError raised while signalling one process is
# logged and does not prevent the remaining processes from being signalled.
def _stop!
  logger.info(self.class) { "SPOOL STOP! Going to kill => #{format_processes}" }

  all_processes.each do |p|
    begin
      send_signal_to(p, configuration.kill_signal) if p.alive?
    rescue Datacenter::Shell::CommandError => e
      log_error e
    end
  end

  wait_for_stopped all_processes

  # pid_file is optional (start only writes it when configured).
  # File.exist?(nil) raises TypeError, which would skip the flag reset
  # below and leave the pool running, so guard against nil first.
  pid_file = configuration.pid_file
  File.delete pid_file if pid_file && File.exist?(pid_file)

  @running = false
end
all_processes_count()
click to toggle source
# File lib/spool/pool.rb, line 201
#
# Total number of tracked processes: working plus zombie.
def all_processes_count
  [working_processes, zombie_processes].map(&:count).reduce(:+)
end
check_processes_to_restart()
click to toggle source
# File lib/spool/pool.rb, line 182
#
# Stops every working process for which configuration.restart_condition
# returns a truthy value. The processes are only handed to stop_processes
# here; nothing is spawned by this method.
#
# Does nothing when no restart condition is configured, or when no working
# process satisfies it.
def check_processes_to_restart
  condition = configuration.restart_condition

  # With a nil condition, `select(&nil)` is a block-less call that returns an
  # Enumerator over *all* workers, which would stop every one of them.
  return unless condition

  to_restart = working_processes.select(&condition)
  return unless to_restart.any?

  logger.info(self.class) {"Restart condition successful in child processes: #{to_restart.map(&:pid)}"}
  stop_processes to_restart
end
check_status()
click to toggle source
# File lib/spool/pool.rb, line 93
#
# One supervision pass, called from the loop in start.
#
# First forgets dead processes and applies the restart condition. Then
# reconciles the process count with configuration.processes:
# * when the configured count exceeds the number of tracked processes
#   (working + zombie), spawns the difference through Spawner.spawn;
# * otherwise, when the configured count is below the number of working
#   processes, stops the surplus, taken from the front of
#   working_processes.
def check_status
  clear_dead_processes
  check_processes_to_restart

  desired = configuration.processes
  missing = desired - all_processes_count
  surplus = working_processes.count - desired

  if missing > 0
    logger.info(self.class) { "Initializing new children. Current State => #{format_processes}" }
    missing.times { working_processes << Spawner.spawn(configuration) }
    logger.info(self.class) { "Status after new childrens => #{format_processes}" }
  elsif surplus > 0
    logger.info(self.class) { "Killing #{surplus} children. Current state => #{format_processes}" }
    stop_processes working_processes.take(surplus)
    logger.info(self.class) { "After killing childers. Current State => #{format_processes}" }
  end
end
clear_dead_processes()
click to toggle source
# File lib/spool/pool.rb, line 196
#
# Drops, in place, every process whose alive? is falsy from both the
# working and the zombie collections. Returns the zombie collection.
def clear_dead_processes
  working_processes.keep_if(&:alive?)
  zombie_processes.keep_if(&:alive?)
end
format_actions_queue()
click to toggle source
# File lib/spool/pool.rb, line 213
#
# Human-readable dump of the pending actions, one per line in the form
# "<1-based position> => <action name>". Returns "EMPTY" when nothing is
# queued.
def format_actions_queue
  return "EMPTY" if actions_queue.empty?

  lines = actions_queue.each_with_index.map do |action, position|
    "#{position + 1} => #{action[:name]}"
  end
  lines.join("\n")
end
format_processes()
click to toggle source
# File lib/spool/pool.rb, line 221
#
# One-line summary, used in log messages, listing the pids of the working
# processes and of the zombie processes.
def format_processes
  working_pids = working_processes.map(&:pid)
  zombie_pids = zombie_processes.map(&:pid)

  "Working Processes: #{working_pids}, Zombie Processes: #{zombie_pids}"
end
handle_signals()
click to toggle source
# File lib/spool/pool.rb, line 84
#
# Installs a Signal.trap handler for each (signal, event) pair in SIGNALS.
# When the signal arrives the handler logs the current actions queue and
# then invokes the method named by +event+ on this pool.
#
# NOTE(review): the handler logs from inside trap context; some logger
# implementations cannot take their lock there — confirm that
# configuration.logger copes with it.
def handle_signals
  SIGNALS.each do |signal, event|
    on_signal = proc do
      logger.info(self.class) { "Signal #{signal} received. Current state of actions queue is:\n#{format_actions_queue}" }
      send event
    end
    Signal.trap(signal, &on_signal)
  end
end
log_error(error)
click to toggle source
# File lib/spool/pool.rb, line 209
#
# Logs +error+ at error level as its message followed by its backtrace,
# one frame per line.
#
# error - an exception object. Its backtrace may be nil (an exception that
#         was instantiated but never raised); in that case only the message
#         and a trailing newline are logged instead of failing with
#         NoMethodError inside the logging block.
def log_error(error)
  logger.error(self.class) { "#{error.message}\n#{Array(error.backtrace).join("\n")}" }
end
logger()
click to toggle source
# File lib/spool/pool.rb, line 205
#
# Shortcut for the logger held by the current configuration.
def logger
  configuration.logger
end
send_signal_to(process, signal)
click to toggle source
# File lib/spool/pool.rb, line 191
#
# Logs the intent and then delivers +signal+ to +process+ by calling
# process.send_signal. Returns whatever send_signal returns.
#
# process - object responding to pid and send_signal.
# signal  - signal identifier handed unchanged to send_signal.
def send_signal_to(process, signal)
  logger.info(self.class) do
    "Going to send signal #{signal} to process #{process.pid}"
  end
  process.send_signal(signal)
end
stop_processes(processes_list)
click to toggle source
# File lib/spool/pool.rb, line 161
#
# Sends configuration.stop_signal to each process in +processes_list+.
# A process that was signalled successfully is added to zombie_processes;
# if signalling raises, the error is logged and that process is left where
# it was. Finally, every process now present in zombie_processes is removed
# from working_processes.
#
# processes_list - enumerable of processes to stop; it may be
#                  working_processes itself, because the removal happens
#                  only after the iteration has finished.
def stop_processes(processes_list)
  processes_list.each do |process|
    begin
      send_signal_to(process, configuration.stop_signal)
      zombie_processes << process
    rescue Exception => error
      log_error(error)
    end
  end

  working_processes.delete_if { |process| zombie_processes.member?(process) }
end
wait_for_stopped(processes_list)
click to toggle source
# File lib/spool/pool.rb, line 174
#
# Blocks, polling every 0.01 seconds, until no process in +processes_list+
# reports alive?, then calls clear_dead_processes. There is no timeout:
# the call does not return while any listed process stays alive.
def wait_for_stopped(processes_list)
  sleep 0.01 until processes_list.none?(&:alive?)

  clear_dead_processes
end