class PerfectQueue::Multiprocess::ThreadProcessor
Public Class Methods
new(runner, processor_id, config)
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 23 def initialize(runner, processor_id, config) @runner = runner @processor_id = processor_id @running_flag = BlockingFlag.new @finish_flag = BlockingFlag.new @tm = TaskMonitor.new(config, method(:child_heartbeat), method(:force_stop)) restart(false, config) end
Public Instance Methods
force_stop()
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 76 def force_stop @log.error "Force stopping processor processor_id=#{@processor_id}" @tm.stop_task(true) @finish_flag.set! end
join()
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 45 def join while t = @thread t.join end end
keepalive()
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 51 def keepalive unless @thread @thread = Thread.new(&method(:run)) end end
logrotated()
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 82 def logrotated # do nothing end
restart(immediate, config)
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 57 def restart(immediate, config) @poll_interval = config[:poll_interval] || 1.0 @log = config[:logger] @task_prefetch = config[:task_prefetch] || 0 @config = config @tm.stop_task(immediate) @finish_flag.set_region do @running_flag.wait while @running_flag.set? end end
run()
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 35 def run @tm.start @running_flag.set_region do run_loop end @tm.join ensure @thread = nil end
stop(immediate)
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 70 def stop(immediate) @log.info immediate ? "Stopping thread immediately id=#{@processor_id}" : "Stopping thread gracefully id=#{@processor_id}" @tm.stop_task(immediate) @finish_flag.set! end
Private Instance Methods
child_heartbeat()
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 87 def child_heartbeat # do nothing end
process(task)
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 123 def process(task) @log.info "acquired task task=#{task.key} id=#{@processor_id}: #{task.inspect}" begin r = @runner.new(task) @tm.set_task(task, r) begin r.run ensure @tm.task_finished(task) end @log.info "completed processing task=#{task.key} id=#{@processor_id}:" rescue @log.error "unexpectedly failed task=#{task.key} id=#{@processor_id}: #{$!.class}: #{$!}" $!.backtrace.each {|bt| @log.warn "\t#{bt}" } raise # force exit end end
run_loop()
click to toggle source
# File lib/perfectqueue/multiprocess/thread_processor.rb, line 91 def run_loop PerfectQueue.open(@config) {|queue| until @finish_flag.set? tasks = queue.poll_multi(:max_acquire=>1+@task_prefetch) if tasks == nil || tasks.empty? @finish_flag.wait(@poll_interval) else begin while task = tasks.shift process(task) end ensure # TODO do not call release! because rdb_compat backend # doesn't have a mechanism to detect preemption. # release! could cause a problem that multiple # workers run one task concurrently. #tasks.each {|task| # # ignoring errors doesn't cause serious problems # # because it's same as failure of this server. # task.release! rescue nil #} end end end } rescue @log.error "Unknown error #{$!.class}: #{$!}: Exiting thread id=#{@processor_id}" $!.backtrace.each {|bt| @log.warn "\t#{bt}" } ensure @tm.stop end