class PerfectQueue::Engine
Attributes
processors[R]
Public Class Methods
new(runner, config)
click to toggle source
# File lib/perfectqueue/engine.rb, line 22
#
# Sets up the engine state and creates the initial processors.
#
# runner:: stored in @runner and later passed to each processor that is built
# config:: Hash-like settings; :processor_type picks the processor class
#          (:process, the default, or :thread); the whole config is then
#          passed to #restart
#
# Raises ConfigError when :processor_type is neither :process nor :thread.
def initialize(runner, config)
  @runner = runner
  @finish_flag = BlockingFlag.new

  requested_type = config[:processor_type] || :process
  @processor_class =
    case requested_type.to_sym
    when :process then Multiprocess::ForkProcessor
    when :thread  then Multiprocess::ThreadProcessor
    else
      raise ConfigError, "Unknown processor_type: #{config[:processor_type].inspect}"
    end

  # The list starts empty; restart grows it to the configured size.
  @processors = []
  restart(false, config)
end
Public Instance Methods
join()
click to toggle source
# File lib/perfectqueue/engine.rb, line 96
#
# Calls +join+ on every processor, in list order.
#
# Returns +self+.
def join
  @processors.each(&:join)
  self
end
logrotated()
click to toggle source
# File lib/perfectqueue/engine.rb, line 113
#
# Forwards the +logrotated+ notification to every processor, in list order.
#
# Returns the processor list (the result of +each+).
def logrotated
  @processors.each(&:logrotated)
end
replace(immediate, command=[$0]+ARGV)
click to toggle source
# File lib/perfectqueue/engine.rb, line 106
#
# Stops this engine and spawns +command+ as a new process, at most once.
#
# immediate:: forwarded to #stop
# command::   argument list for Process.spawn; defaults to the current
#             program name followed by its original arguments
#
# Returns +self+ after spawning, or +nil+ when a replacement has already
# been spawned (in which case nothing is stopped or spawned again).
def replace(immediate, command=[$0]+ARGV)
  unless @replaced_pid
    stop(immediate)
    # Remember the child's pid so later calls become no-ops.
    @replaced_pid = Process.spawn(*command)
    self
  end
end
restart(immediate, config)
click to toggle source
# File lib/perfectqueue/engine.rb, line 43
#
# Applies +config+ to the engine: resizes the processor list to
# config[:processors] (default 1), restarts the processors that were
# already present, and refreshes the keepalive interval.
#
# immediate:: forwarded to each processor's +stop+ and +restart+
# config::    Hash-like settings; reads :logger, :processors,
#             :child_keepalive_interval and :child_heartbeat_interval
#
# Returns +self+, or +nil+ without doing anything once the finish flag is set.
def restart(immediate, config)
  return nil if @finish_flag.set?

  # TODO connection check

  @log = config[:logger] || Logger.new(STDERR)

  num_processors = config[:processors] || 1

  # scaling
  extra = num_processors - @processors.length
  added = 0
  if extra > 0
    extra.times do
      @processors << @processor_class.new(@runner, @processors.size + 1, config)
    end
    added = extra
  elsif extra < 0
    # Never remove more processors than exist: a negative target would
    # otherwise shift nil off the empty list and fail on nil.stop.
    [-extra, @processors.length].min.times do
      c = @processors.shift
      c.stop(immediate)
      c.join
    end
  end

  # Processors appended above were just constructed from config, so only
  # the ones that existed before this call are restarted.
  @processors.first(@processors.length - added).each do |c|
    c.restart(immediate, config)
  end

  # :child_heartbeat_interval is accepted as a fallback key; default is 2.
  @child_keepalive_interval = (config[:child_keepalive_interval] || config[:child_heartbeat_interval] || 2).to_i

  self
end
run()
click to toggle source
# File lib/perfectqueue/engine.rb, line 76
#
# Main loop of the engine: starts the processors, keeps calling
# +keepalive+ on them until the finish flag is set, then joins them.
#
# Returns the result of #join.
def run
  # Stagger the first keepalive of each processor with a random pause so
  # that they do not all open connections at the same moment.
  @processors.each do |processor|
    processor.keepalive
    sleep(rand * 2)  # up to 2 seconds, 1 second on average
  end

  until @finish_flag.set?
    @processors.each(&:keepalive)
    # Wait on the flag for one keepalive interval before the next round.
    @finish_flag.wait(@child_keepalive_interval)
  end

  join
end
shutdown(immediate)
click to toggle source
# File lib/perfectqueue/engine.rb, line 101
#
# Stops the engine and then joins the processors.
#
# immediate:: forwarded to #stop
#
# Returns the result of #join.
def shutdown(immediate)
  stop(immediate)
  joined = join
  joined
end
stop(immediate)
click to toggle source
# File lib/perfectqueue/engine.rb, line 90
#
# Asks every processor to stop and then sets the finish flag.
#
# immediate:: forwarded unchanged to each processor's +stop+
#
# Returns +self+.
def stop(immediate)
  @processors.each do |processor|
    processor.stop(immediate)
  end
  # Set only after all processors have been told to stop.
  @finish_flag.set!
  self
end