class PerfectQueue::Engine

Attributes

processors[R]

Public Class Methods

new(runner, config) click to toggle source
# File lib/perfectqueue/engine.rb, line 22
def initialize(runner, config)
  @runner = runner

  @finish_flag = BlockingFlag.new

  processor_type = config[:processor_type] || :process
  case processor_type.to_sym
  when :process
    @processor_class = Multiprocess::ForkProcessor
  when :thread
    @processor_class = Multiprocess::ThreadProcessor
  else
    raise ConfigError, "Unknown processor_type: #{config[:processor_type].inspect}"
  end

  @processors = []
  restart(false, config)
end

Public Instance Methods

join() click to toggle source
# File lib/perfectqueue/engine.rb, line 96
def join
  @processors.each {|c| c.join }
  self
end
logrotated() click to toggle source
# File lib/perfectqueue/engine.rb, line 113
def logrotated
  @processors.each {|c| c.logrotated }
end
replace(immediate, command=[$0]+ARGV) click to toggle source
# File lib/perfectqueue/engine.rb, line 106
def replace(immediate, command=[$0]+ARGV)
  return if @replaced_pid
  stop(immediate)
  @replaced_pid = Process.spawn(*command)
  self
end
restart(immediate, config) click to toggle source
# File lib/perfectqueue/engine.rb, line 43
def restart(immediate, config)
  return nil if @finish_flag.set?

  # TODO connection check

  @log = config[:logger] || Logger.new(STDERR)

  num_processors = config[:processors] || 1

  # scaling
  extra = num_processors - @processors.length
  if extra > 0
    extra.times do
      @processors << @processor_class.new(@runner, @processors.size+1, config)
    end
  elsif extra < 0
    (-extra).times do
      c = @processors.shift
      c.stop(immediate)
      c.join
    end
    extra = 0
  end

  @processors[0..(-extra-1)].each {|c|
    c.restart(immediate, config)
  }

  @child_keepalive_interval = (config[:child_keepalive_interval] || config[:child_heartbeat_interval] || 2).to_i

  self
end
run() click to toggle source
# File lib/perfectqueue/engine.rb, line 76
def run
  @processors.each {|c|
    c.keepalive
    # add wait time before starting processors to avoid
    # a spike of the number of concurrent connections.
    sleep rand*2  # upto 2 second, average 1 seoncd
  }
  until @finish_flag.set?
    @processors.each {|c| c.keepalive }
    @finish_flag.wait(@child_keepalive_interval)
  end
  join
end
shutdown(immediate) click to toggle source
# File lib/perfectqueue/engine.rb, line 101
def shutdown(immediate)
  stop(immediate)
  join
end
stop(immediate) click to toggle source
# File lib/perfectqueue/engine.rb, line 90
def stop(immediate)
  @processors.each {|c| c.stop(immediate) }
  @finish_flag.set!
  self
end