class PerfectQueue::Multiprocess::ForkProcessor

Constants

INTER_FORK_LOCK

Public Class Methods

new(runner, processor_id, config) click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 23
def initialize(runner, processor_id, config)
  @runner = runner
  @processor_id = processor_id

  @stop = false
  @cpm = nil
  @last_fork_time = 0

  restart(false, config)
end

Public Instance Methods

join() click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 97
def join
  while !try_join
    sleep (@child_kill_interval+1) / 2  # TODO
  end
  self
end
keepalive() click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 56
def keepalive
  if @stop
    try_join
    return
  end

  if c = @cpm
    if c.killing_status != true
      # don't check status if killing status is immediate-killing
      begin
        # receive heartbeat
        keptalive = c.check_heartbeat(@child_heartbeat_limit)
        if !keptalive
          @log.error "Heartbeat broke out. Restarting child process id=#{@processor_id} pid=#{c.pid}."
          c.start_killing(true)
        end
      rescue EOFError
        @log.error "Heartbeat pipe is closed. Restarting child process id=#{@processor_id} pid=#{c.pid}."
        c.start_killing(true, @child_heartbeat_kill_delay)
      rescue
        @log.error "Unknown error: #{$!.class}: #{$!}: Restarting child process id=#{@processor_id} pid=#{c.pid}."
        $!.backtrace.each {|bt| @log.warn "\t#{bt}" }
        c.start_killing(true, @child_heartbeat_kill_delay)
      end
    end

    try_join
  end

  unless @cpm
    begin
      @cpm = fork_child
    rescue
      @log.error "Failed to fork child process id=#{@processor_id}: #{$!.class}: #{$!}"
      $!.backtrace.each {|bt| @log.warn "\t#{bt}" }
    end
  end

  nil
end
logrotated() click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 104
def logrotated
  if c = @cpm
    c.send_signal(:CONT)
  end
end
restart(immediate, config) click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 34
def restart(immediate, config)
  @child_heartbeat_limit = config[:child_heartbeat_limit] || 60.0
  @child_kill_interval = config[:child_kill_interval] || 2.0
  @child_graceful_kill_limit = config[:child_graceful_kill_limit] || nil
  @child_fork_frequency_limit = config[:child_fork_frequency_limit] || 5.0
  @child_heartbeat_kill_delay = config[:child_heartbeat_kill_delay] || 10
  @log = config[:logger]
  @config = config  # for child process

  if c = @cpm
    c.start_killing(immediate)
  end
end
stop(immediate) click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 48
def stop(immediate)
  @stop = true
  if c = @cpm
    c.start_killing(immediate)
  end
  self
end

Private Instance Methods

fork_child() click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 126
def fork_child
  now = Time.now.to_f
  if now - @last_fork_time < @child_fork_frequency_limit
    @log.info "Tried to fork child #{now-@last_fork_time} seconds ago < #{@child_fork_frequency_limit}. Waiting... id=#{@processor_id}"
    return nil
  end
  @last_fork_time = now

  # set process name
  @runner.before_fork if @runner.respond_to?(:before_fork)  # TODO exception handling

  INTER_FORK_LOCK.lock
  begin
    rpipe, wpipe = IO.pipe
    rpipe.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    wpipe.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  ensure
    INTER_FORK_LOCK.unlock
  end

  pid = fork do
    #STDIN.close
    # pass-through STDOUT
    # pass-through STDERR
    rpipe.close

    $0 = "perfectqueue:#{@runner} #{@processor_id}"

    @runner.after_fork if @runner.respond_to?(:after_fork)

    begin
      ChildProcess.run(@runner, @processor_id, @config, wpipe)
    ensure
      @runner.after_child_end if @runner.respond_to?(:after_child_end)  # TODO exception handling
    end

    exit! 0
  end

  @log.info "Worker process started. pid=#{pid}"

  wpipe.close

  ChildProcessMonitor.new(@log, pid, rpipe)
end
try_join() click to toggle source
# File lib/perfectqueue/multiprocess/fork_processor.rb, line 111
def try_join
  unless @cpm
    return true
  end
  if @cpm.try_join(@child_kill_interval, @child_graceful_kill_limit)
    @cpm.cleanup
    @cpm = nil
    return true
  else
    return false
  end
end