class PikaQue::Processor

Public Class Methods

new(opts = {}) click to toggle source
# File lib/pika_que/processor.rb, line 9
def initialize(opts = {})
  @opts = PikaQue.config.merge(opts)
  @broker = PikaQue::Broker.new(self, @opts).tap{ |b| b.start }
  @pool = Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)
  proc_config = @opts.merge({ broker: @broker, worker_pool: @pool })
  @workers = @opts.fetch(:workers, []).map{ |w| PikaQue::Util.constantize(w).new(proc_config) }
  @thread = nil
end

Public Instance Methods

process() click to toggle source
# File lib/pika_que/processor.rb, line 23
def process
  @workers.each(&:run)
end
setup() click to toggle source
# File lib/pika_que/processor.rb, line 18
def setup
  logger.info "setting up processor with workers: #{@workers.map(&:class)}"
  @workers.each(&:prepare)
end
start() click to toggle source
# File lib/pika_que/processor.rb, line 27
def start
  @thread = Thread.new do
    Thread.current['label'] = 'processor'
    setup
    process
  end.tap{ |t| t.abort_on_exception = true }
end
stop() click to toggle source
# File lib/pika_que/processor.rb, line 35
def stop
  @workers.each(&:stop)

  @pool.shutdown
  @pool.wait_for_termination 12

  @broker.cleanup(true)
  @broker.stop
end