class PikaQue::DelayWorker

Attributes

broker[RW]
handler[RW]
pool[RW]
queue[RW]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/pika_que/delay_worker.rb, line 6
def initialize(opts = {})
  @opts = PikaQue.config.merge(opts)
  @broker = @opts[:broker] || PikaQue::Broker.new(nil, @opts).tap{ |b| b.start }
  @pool = @opts[:worker_pool] || Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)
  @delay_name = "#{@opts[:exchange]}-delay"
end

Public Instance Methods

logger() click to toggle source
# File lib/pika_que/delay_worker.rb, line 50
def logger
  PikaQue.logger
end
prepare() click to toggle source
# File lib/pika_que/delay_worker.rb, line 13
def prepare
  @queue = broker.queue(@delay_name, @opts[:queue_options])

  @handler = broker.handler(@opts[:handler_class], @opts[:handler_options])
  # TODO use routing keys?
  @handler.bind_queue(@queue, @queue.name)
end
run() click to toggle source
# File lib/pika_que/delay_worker.rb, line 21
def run
  @consumer = queue.subscribe(:block => false, :manual_ack => @opts[:ack]) do | delivery_info, metadata, msg |
    pool.post do
      work(delivery_info, metadata, msg)
    end
  end
end
start() click to toggle source
# File lib/pika_que/delay_worker.rb, line 29
def start
  prepare
  run
end
stop() click to toggle source
# File lib/pika_que/delay_worker.rb, line 34
def stop
  @consumer.cancel if @consumer
  @consumer = nil

  unless @opts[:worker_pool]
    @pool.shutdown
    @pool.wait_for_termination 12
  end
  broker.cleanup
  broker.stop
end
work(delivery_info, metadata, msg) click to toggle source
# File lib/pika_que/delay_worker.rb, line 46
def work(delivery_info, metadata, msg)
  handler.handle(:ack, broker.channel, delivery_info, metadata, msg)
end