class PreforkEngine
Constants
- VERSION
Attributes
manager_pid[R]
signal_received[R]
Public Class Methods
new(options={})
click to toggle source
# File lib/prefork_engine.rb, line 9
# Sets up the manager state and installs the signal handlers.
#
# +options+ is merged (shallowly) over these string-keyed defaults:
#   "max_workers"          => 10
#   "spawn_interval"       => 0
#   "err_respawn_interval" => 1
#   "trap_signals"         => { "TERM" => "TERM" }
#   "before_fork", "after_fork", "on_child_reap" => nil
#
# Every key of "trap_signals" gets a handler that records the name of the
# received signal in @signal_received. CHLD gets a handler that does nothing.
def initialize(options={})
  @options = {
    "max_workers" => 10,
    "spawn_interval" => 0,
    "err_respawn_interval" => 1,
    "trap_signals" => {
      "TERM" => "TERM"
    },
    "before_fork" => nil,
    "after_fork" => nil,
    "on_child_reap" => nil,
  }.merge(options)

  @signal_received = ""
  @manager_pid = ""
  @generation = 0
  @_no_adjust_until = 0.0
  @in_child = false
  @worker_pids = {}
  @delayed_task = nil

  # Only the signal name (the hash key) matters when installing the handler.
  @options["trap_signals"].each do |signame, _action|
    Signal.trap(signame) { |signo| @signal_received = Signal.signame(signo) }
  end

  Signal.trap("CHLD") do
    # do nothing
  end
end
Public Instance Methods
_action_for(sig)
click to toggle source
# File lib/prefork_engine.rb, line 144
# Looks up what should be done for the trapped signal named +sig+.
#
# Returns nil when +sig+ is not a key of the "trap_signals" option.
# Otherwise returns the configured value; a value that is not Enumerable
# is wrapped as [value, 0].
def _action_for(sig)
  signals = @options["trap_signals"]
  return nil unless signals.has_key?(sig)

  configured = signals[sig]
  configured.kind_of?(Enumerable) ? configured : [configured, 0]
end
_decide_action()
click to toggle source
# File lib/prefork_engine.rb, line 124
# Returns 1 when the number of workers is below the "max_workers" option,
# otherwise 0.
def _decide_action
  num_workers < @options["max_workers"] ? 1 : 0
end
_handle_delayed_task()
click to toggle source
# File lib/prefork_engine.rb, line 135
# Runs the pending delayed task for as long as it is due.
#
# Returns nil once no delayed task is set. If a task is set but its due
# time (@delayed_task_at) is still in the future, returns the number of
# seconds left without running it.
def _handle_delayed_task
  while @delayed_task
    remaining = @delayed_task_at - Time.now.to_f
    return remaining if remaining > 0

    # The task may reschedule or clear itself, so the loop re-checks it.
    @delayed_task.call
  end
  nil
end
_max_wait()
click to toggle source
# File lib/prefork_engine.rb, line 206
# Returns nil. _wait includes this value among its candidate sleep
# durations and discards nil entries, so nil contributes no limit.
def _max_wait
  nil
end
_on_child_reap(pid,status)
click to toggle source
# File lib/prefork_engine.rb, line 129
# Calls the "on_child_reap" option with +pid+ and +status+ when it is set.
# Returns the callback's result, or nil when no callback is configured.
def _on_child_reap(pid,status)
  callback = @options["on_child_reap"]
  callback.call(pid, status) if callback
end
_update_spawn_delay(secs)
click to toggle source
# File lib/prefork_engine.rb, line 175
# Sets @_no_adjust_until to +secs+ seconds from now. When +secs+ is nil or
# false it is reset to 0.0. Returns the stored value.
def _update_spawn_delay(secs)
  @_no_adjust_until =
    if secs
      Time.now.to_f + secs
    else
      0.0
    end
end
_wait(block)
click to toggle source
# File lib/prefork_engine.rb, line 179
# Runs any due delayed task, then tries to reap one child.
#
# NOTE(review): Process.wait3 is not part of Ruby core; presumably it is
# provided by the proc-wait3 gem loaded elsewhere in the file -- confirm.
#
# With a falsy +block+ the call only polls (WNOHANG). With a truthy +block+
# it sleeps for the shortest pending delay (delayed task, spawn delay,
# _max_wait) and then polls; when no delay is pending it waits with flags 0.
#
# Returns whatever Process.wait3 returns, or nil when the blocking path was
# interrupted (Errno::EINTR) or had no children (Errno::ECHILD). The polling
# path does not rescue those errors.
def _wait(block)
  unless block
    _handle_delayed_task
    return Process.wait3(Process::WNOHANG)
  end

  task_delay = _handle_delayed_task
  fork_delay = nil
  if _decide_action > 0
    # A new worker is wanted: wake up once the spawn delay has elapsed.
    fork_delay = [@_no_adjust_until - Time.now.to_f, 0].max
  end
  shortest = [task_delay, fork_delay, _max_wait].compact.min

  begin
    # nothing scheduled: block
    return Process.wait3(0) if shortest.nil?

    sleep(shortest)
    # nonblock
    return Process.wait3(Process::WNOHANG)
  rescue Errno::EINTR
    # wait for timer thread?
    sleep 0.02
  rescue Errno::ECHILD
    # nothing
  end
  nil
end
num_workers()
click to toggle source
# File lib/prefork_engine.rb, line 120
# Returns the number of entries in @worker_pids, i.e. the workers that are
# currently being tracked.
def num_workers
  @worker_pids.size
end
signal_all_children(sig)
click to toggle source
# File lib/prefork_engine.rb, line 114
# Sends +sig+ to every tracked worker, in ascending pid order.
#
# A worker whose process no longer exists (Errno::ESRCH) is skipped, so one
# vanished pid cannot prevent the remaining workers from being signalled.
# Any other error from Process.kill still propagates.
#
# Returns the sorted array of pids that was iterated.
def signal_all_children(sig)
  @worker_pids.keys.sort.each do |pid|
    begin
      Process.kill(sig, pid)
    rescue Errno::ESRCH
      # worker is already gone; nothing left to signal
    end
  end
end
start(&block)
click to toggle source
# File lib/prefork_engine.rb, line 39
# Runs the manager loop until one of the trapped signals is received.
#
# While no signal has been recorded, the loop forks a new worker whenever
# _decide_action asks for one and the spawn delay has elapsed, then reaps
# at most one child per iteration via _wait. In a worker the trapped
# signals and CHLD are reset to "DEFAULT", +block+ is called, and the
# process exits with exit!(true).
#
# Once a signal is received, the action configured for it in
# "trap_signals" is applied: with an interval of 0 every worker is
# signalled at once; with a positive interval a delayed task is installed
# that signals one worker per interval (driven by _handle_delayed_task).
#
# Raises RuntimeError when called from inside a worker. Returns true.
def start(&block)
  @manager_pid = $$
  @signal_received = ""
  @generation += 1
  raise "cannot start another process while you are in child process" if @in_child

  # main loop
  while @signal_received.length == 0
    # No spawning until the current spawn delay has elapsed.
    action = @_no_adjust_until <= Time.now.to_f ? _decide_action : 0
    if action > 0
      # start a new worker
      @options["before_fork"].call(self) if @options["before_fork"]
      pid = nil
      begin
        pid = fork
      rescue => e
        # fork failed
        warn "fork failed:#{e}"
        _update_spawn_delay(@options["err_respawn_interval"])
        next
      end
      if pid.nil?
        # child
        @in_child = true
        @options["trap_signals"].each do |k, _kv|
          ## Signal.trap(k, 0) #XXX in rspec only?
          Signal.trap(k, "DEFAULT")
        end
        ## Signal.trap("CHLD", 0) #XXX in rspec only?
        Signal.trap("CHLD", "DEFAULT")
        block.call
        exit!(true)
      end
      # parent
      @options["after_fork"].call(self) if @options["after_fork"]
      @worker_pids[pid] = @generation
      _update_spawn_delay(@options["spawn_interval"])
    end

    # Block in _wait only when nothing was spawned in this iteration.
    reaped = _wait(action <= 0)
    if reaped
      _on_child_reap(reaped.pid, reaped.status)
      # A worker of the current generation that exited non-zero delays respawning.
      if @worker_pids.delete(reaped.pid) == @generation && reaped.status != 0
        _update_spawn_delay(@options["err_respawn_interval"])
      end
    end
  end

  # send signals to workers
  signal_action = _action_for(@signal_received)
  if signal_action
    sig = signal_action[0]
    interval = signal_action[1]
    if interval > 0
      pids = @worker_pids.keys.sort
      @delayed_task = proc {
        # The snapshot can be empty or stale by the time the task runs:
        # skip pids that are no longer tracked so we never call
        # Process.kill with nil or with the pid of an already-reaped worker.
        pids.shift until pids.empty? || @worker_pids.key?(pids.first)
        pid = pids.shift
        Process.kill(sig, pid) if pid
        if pids.empty?
          @delayed_task = nil
          @delayed_task_at = nil
        else
          @delayed_task_at = Time.now.to_f + interval
        end
      }
      @delayed_task_at = 0.0
      @delayed_task.call
    else
      signal_all_children(sig)
    end
  end
  true
end
wait_all_children(timeout = 0)
click to toggle source
# File lib/prefork_engine.rb, line 151
# Reaps workers until none are tracked any more.
#
# With a positive +timeout+ (seconds) the reaping is abandoned when the
# timeout expires; the Timeout::Error is swallowed. With the default of 0
# it waits without a time limit. The "on_child_reap" hook is only invoked
# for pids that were still tracked in @worker_pids.
#
# Returns the number of workers still tracked (0 when all were reaped).
def wait_all_children(timeout = 0)
  reap_remaining = lambda do
    until @worker_pids.keys.empty?
      reaped = _wait(true)
      next unless reaped
      next unless @worker_pids.delete(reaped.pid)

      _on_child_reap(reaped.pid, reaped.status)
    end
  end

  if timeout > 0
    begin
      Timeout.timeout(timeout) { reap_remaining.call }
    rescue Timeout::Error
      # ignore
    end
  else
    reap_remaining.call
  end

  num_workers
end