class PikaQue::Processor
Public Class Methods
new(opts = {})
click to toggle source
# File lib/pika_que/processor.rb, line 9
# Builds a processor from the global PikaQue configuration merged with +opts+.
#
# Construction starts a broker immediately, creates a fixed-size thread pool
# and instantiates one worker per entry in the :workers option.
#
# @param opts [Hash] overrides merged on top of PikaQue.config; this method
#   itself reads :concurrency (pool size, 1 when unset) and :workers
#   (names resolved through PikaQue::Util.constantize, [] when absent)
def initialize(opts = {})
  @opts = PikaQue.config.merge(opts)

  # The broker is started as part of construction, before any worker exists.
  @broker = PikaQue::Broker.new(self, @opts)
  @broker.start

  pool_size = @opts[:concurrency] || 1
  @pool = Concurrent::FixedThreadPool.new(pool_size)

  # Every worker receives the merged options plus the shared broker and pool.
  shared = { broker: @broker, worker_pool: @pool }
  proc_config = @opts.merge(shared)

  worker_names = @opts.fetch(:workers, [])
  @workers = worker_names.map do |worker_name|
    PikaQue::Util.constantize(worker_name).new(proc_config)
  end

  # Assigned by #start.
  @thread = nil
end
Public Instance Methods
process()
click to toggle source
# File lib/pika_que/processor.rb, line 23
# Calls +run+ on each worker, in the order the workers were created.
#
# @return the @workers collection (the result of +each+)
def process
  @workers.each do |worker|
    worker.run
  end
end
setup()
click to toggle source
# File lib/pika_que/processor.rb, line 18
# Logs the classes of the configured workers at info level, then calls
# +prepare+ on each worker in turn.
#
# @return the @workers collection (the result of +each+)
def setup
  logger.info "setting up processor with workers: #{@workers.map { |worker| worker.class }}"

  @workers.each do |worker|
    worker.prepare
  end
end
start()
click to toggle source
# File lib/pika_que/processor.rb, line 27
# Spawns a thread that runs #setup followed by #process and remembers it
# in @thread.
#
# The thread stores 'processor' under its 'label' key and has
# abort_on_exception enabled.
#
# @return [Thread] the newly created thread
def start
  runner = Thread.new do
    Thread.current['label'] = 'processor'
    setup
    process
  end

  # Set the flag before publishing the thread in @thread, as the original
  # tap-based form did.
  runner.abort_on_exception = true
  @thread = runner
end
stop()
click to toggle source
# File lib/pika_que/processor.rb, line 35
# Shuts the processor down in a fixed order: workers, then the thread pool,
# then the broker.
#
# The result of waiting on the pool is not inspected, so the broker is
# cleaned up and stopped regardless of whether the pool finished in time.
#
# @return whatever @broker.stop returns
def stop
  @workers.each do |worker|
    worker.stop
  end

  @pool.shutdown
  # NOTE(review): 12 is presumably a timeout in seconds - confirm against
  # the concurrent-ruby documentation.
  @pool.wait_for_termination(12)

  @broker.cleanup(true)
  @broker.stop
end