class Rabbithole::Worker

Attributes

number_of_threads[R]

Public Class Methods

new(number_of_threads = 1) click to toggle source
# File lib/rabbithole/worker.rb, line 5
def initialize(number_of_threads = 1)
  @number_of_threads = number_of_threads
  @channel           = Connection.create_channel(number_of_threads)
  @channel.prefetch(number_of_threads * 5)
end

Public Instance Methods

join() click to toggle source
# File lib/rabbithole/worker.rb, line 20
def join
  @channel.work_pool.join
end
listen_to_queue(queue_name) click to toggle source
# File lib/rabbithole/worker.rb, line 11
def listen_to_queue(queue_name)
  queue   = Connection.queue(queue_name, @channel)
  start_consumer(queue)
end
stop_listening() click to toggle source
# File lib/rabbithole/worker.rb, line 16
def stop_listening
  @channel.consumers.values.each(&:cancel)
end

Private Instance Methods

start_consumer(queue) click to toggle source
# File lib/rabbithole/worker.rb, line 25
def start_consumer(queue)
  queue.subscribe(:ack => true, :block => false) do |delivery_info, properties, payload|
    data = MessagePack.unpack(payload)
    begin
      Object.const_get(data['klass']).perform(*data['args'])
      @channel.acknowledge(delivery_info.delivery_tag, false)
    rescue => e
      @channel.reject(delivery_info.delivery_tag, !delivery_info.redelivered)
      ErrorHandler.handle(e, queue.name, payload)
    end
  end
end