class Rabbithole::Worker
Attributes
number_of_threads[R]
Public Class Methods
new(number_of_threads = 1)
click to toggle source
# File lib/rabbithole/worker.rb, line 5
#
# Sets up a worker backed by a channel obtained from Connection.
#
# number_of_threads - value stored on the worker, passed to
#                     Connection.create_channel, and multiplied by five to
#                     form the argument given to the channel's #prefetch
#                     (default: 1).
#
# NOTE(review): presumably number_of_threads sizes the channel's consumer
# pool and the prefetch value caps unacknowledged deliveries - confirm
# against Connection and the channel implementation.
def initialize(number_of_threads = 1)
  @number_of_threads = number_of_threads
  @channel = Connection.create_channel(@number_of_threads)

  # Five in-flight messages are requested per thread.
  prefetch_count = @number_of_threads * 5
  @channel.prefetch(prefetch_count)
end
Public Instance Methods
join()
click to toggle source
# File lib/rabbithole/worker.rb, line 20
#
# Delegates to #join on the channel's work pool and returns whatever that
# call returns.
#
# NOTE(review): presumably this blocks the caller until the pool's threads
# finish - confirm against the channel implementation.
def join
  pool = @channel.work_pool
  pool.join
end
listen_to_queue(queue_name)
click to toggle source
# File lib/rabbithole/worker.rb, line 11
#
# Looks up the queue for queue_name through Connection, using this worker's
# channel, and hands it to #start_consumer.
#
# queue_name - identifier passed unchanged to Connection.queue.
#
# Returns the result of #start_consumer.
def listen_to_queue(queue_name)
  start_consumer(Connection.queue(queue_name, @channel))
end
stop_listening()
click to toggle source
# File lib/rabbithole/worker.rb, line 16
#
# Calls #cancel on every consumer currently registered on the channel.
#
# Returns the array of consumers that were cancelled.
def stop_listening
  # Work on a snapshot of the values so the iteration is independent of any
  # change made to the channel's consumers collection while cancelling.
  registered = @channel.consumers.values
  registered.each { |consumer| consumer.cancel }
end
Private Instance Methods
start_consumer(queue)
click to toggle source
# File lib/rabbithole/worker.rb, line 25
#
# Subscribes to queue (with :ack => true and :block => false) and runs one
# job per delivered message.
#
# For each delivery the payload is decoded with MessagePack, the class named
# by the 'klass' entry is resolved and its .perform is called with the
# 'args' entry splatted, and the delivery is then acknowledged on the
# worker's channel.
#
# If decoding, class lookup, the job, or the acknowledgement raises a
# StandardError, the delivery is rejected and the error is passed to
# ErrorHandler.handle together with the queue name and the raw payload.
# The second argument to #reject is true only when the delivery was not
# already a redelivery (presumably the requeue flag, so a message is retried
# at most once - confirm against the channel implementation).
#
# queue - object responding to #subscribe and #name.
#
# Returns whatever queue.subscribe returns.
def start_consumer(queue)
  queue.subscribe(:ack => true, :block => false) do |delivery_info, _properties, payload|
    begin
      # Decoding lives inside the protected region: a malformed payload must
      # be rejected and reported like any other failure instead of escaping
      # the callback and leaving the delivery neither acked nor rejected.
      data = MessagePack.unpack(payload)

      # NOTE(review): the class name comes straight from the message body.
      # If the queue can carry untrusted data, restrict this lookup to a
      # whitelist of job classes.
      job_class = Object.const_get(data['klass'])
      job_class.perform(*data['args'])

      @channel.acknowledge(delivery_info.delivery_tag, false)
    rescue => e
      @channel.reject(delivery_info.delivery_tag, !delivery_info.redelivered)
      ErrorHandler.handle(e, queue.name, payload)
    end
  end
end