class PerfectQueue::Multiprocess::ThreadProcessor

Public Class Methods

new(runner, processor_id, config) click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 23
def initialize(runner, processor_id, config)
  @runner = runner
  @processor_id = processor_id

  @running_flag = BlockingFlag.new
  @finish_flag = BlockingFlag.new

  @tm = TaskMonitor.new(config, method(:child_heartbeat), method(:force_stop))

  restart(false, config)
end

Public Instance Methods

force_stop() click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 76
def force_stop
  @log.error "Force stopping processor processor_id=#{@processor_id}"
  @tm.stop_task(true)
  @finish_flag.set!
end
join() click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 45
def join
  while t = @thread
    t.join
  end
end
keepalive() click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 51
def keepalive
  unless @thread
    @thread = Thread.new(&method(:run))
  end
end
logrotated() click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 82
def logrotated
  # do nothing
end
restart(immediate, config) click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 57
def restart(immediate, config)
  @poll_interval = config[:poll_interval] || 1.0
  @log = config[:logger]
  @task_prefetch = config[:task_prefetch] || 0
  @config = config

  @tm.stop_task(immediate)

  @finish_flag.set_region do
    @running_flag.wait while @running_flag.set?
  end
end
run() click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 35
def run
  @tm.start
  @running_flag.set_region do
    run_loop
  end
  @tm.join
ensure
  @thread = nil
end
stop(immediate) click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 70
def stop(immediate)
  @log.info immediate ? "Stopping thread immediately id=#{@processor_id}" : "Stopping thread gracefully id=#{@processor_id}"
  @tm.stop_task(immediate)
  @finish_flag.set!
end

Private Instance Methods

child_heartbeat() click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 87
def child_heartbeat
  # do nothing
end
process(task) click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 123
def process(task)
  @log.info "acquired task task=#{task.key} id=#{@processor_id}: #{task.inspect}"
  begin
    r = @runner.new(task)
    @tm.set_task(task, r)
    begin
      r.run
    ensure
      @tm.task_finished(task)
    end
    @log.info "completed processing task=#{task.key} id=#{@processor_id}:"
  rescue
    @log.error "unexpectedly failed task=#{task.key} id=#{@processor_id}: #{$!.class}: #{$!}"
    $!.backtrace.each {|bt| @log.warn "\t#{bt}" }
    raise  # force exit
  end
end
run_loop() click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 91
def run_loop
  PerfectQueue.open(@config) {|queue|
    until @finish_flag.set?
      tasks = queue.poll_multi(:max_acquire=>1+@task_prefetch)
      if tasks == nil || tasks.empty?
        @finish_flag.wait(@poll_interval)
      else
        begin
          while task = tasks.shift
            process(task)
          end
        ensure
          # TODO do not call release! because rdb_compat backend
          #      doesn't have a mechanism to detect preemption.
          #      release! could cause a problem that multiple
          #      workers run one task concurrently.
          #tasks.each {|task|
          #  # ignoring errors doesn't cause serious problems
          #  # because it's same as failure of this server.
          #  task.release! rescue nil
          #}
        end
      end
    end
  }
rescue
  @log.error "Unknown error #{$!.class}: #{$!}: Exiting thread id=#{@processor_id}"
  $!.backtrace.each {|bt| @log.warn "\t#{bt}" }
ensure
  @tm.stop
end