class PikaQue::DelayWorker
Attributes
broker[RW]
handler[RW]
pool[RW]
queue[RW]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/pika_que/delay_worker.rb, line 6 def initialize(opts = {}) @opts = PikaQue.config.merge(opts) @broker = @opts[:broker] || PikaQue::Broker.new(nil, @opts).tap{ |b| b.start } @pool = @opts[:worker_pool] || Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1) @delay_name = "#{@opts[:exchange]}-delay" end
Public Instance Methods
logger()
click to toggle source
# File lib/pika_que/delay_worker.rb, line 50 def logger PikaQue.logger end
prepare()
click to toggle source
# File lib/pika_que/delay_worker.rb, line 13 def prepare @queue = broker.queue(@delay_name, @opts[:queue_options]) @handler = broker.handler(@opts[:handler_class], @opts[:handler_options]) # TODO use routing keys? @handler.bind_queue(@queue, @queue.name) end
run()
click to toggle source
# File lib/pika_que/delay_worker.rb, line 21 def run @consumer = queue.subscribe(:block => false, :manual_ack => @opts[:ack]) do | delivery_info, metadata, msg | pool.post do work(delivery_info, metadata, msg) end end end
start()
click to toggle source
# File lib/pika_que/delay_worker.rb, line 29 def start prepare run end
stop()
click to toggle source
# File lib/pika_que/delay_worker.rb, line 34 def stop @consumer.cancel if @consumer @consumer = nil unless @opts[:worker_pool] @pool.shutdown @pool.wait_for_termination 12 end broker.cleanup broker.stop end
work(delivery_info, metadata, msg)
click to toggle source
# File lib/pika_que/delay_worker.rb, line 46 def work(delivery_info, metadata, msg) handler.handle(:ack, broker.channel, delivery_info, metadata, msg) end