class DisqueJockey::WorkerPool

Public Class Methods

new(worker_class) click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 4
def initialize(worker_class)
  @worker_class = worker_class
  @pool = Queue.new
  @broker = Broker.new(DisqueJockey.configuration.nodes)
  build_worker_pool
end

Public Instance Methods

work!() click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 11
def work!
  endless_loop do
    # fetching from broker blocks until a job is returned
    _, job_id, job = @broker.fetch_message_from(@worker_class.queue_name)

    with_worker do |worker|
      Thread.new { handle_job(worker, job, job_id) }
    end

  end
end

Private Instance Methods

build_worker_pool() click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 49
def build_worker_pool
  @worker_class.thread_count.times { @pool << @worker_class.new(Logger) }
end
endless_loop() { || ... } click to toggle source

this method exists so we can stub the endless loop in tests

# File lib/disque_jockey/worker_pool.rb, line 26
def endless_loop
  loop { yield }
end
handle_job(worker, job, job_id) click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 37
def handle_job(worker, job, job_id)
  begin
    Timeout::timeout(@worker_class.timeout_seconds) { worker.handle(job) }
    @worker_class.use_fast_ack ? @broker.fast_acknowledge(job_id) : @broker.acknowledge(job_id)
  rescue StandardError => exception
    worker.log_exception(exception)
    # TODO: Need to implement retry logic
    #       Also should do more helpful logging around worker timeouts
    #       (explain the error, log the job and maybe metadata)
  end
end
with_worker() { |worker| ... } click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 30
def with_worker
  # @pool.pop will block until a worker becomes available
  worker = @pool.pop
  yield worker
  @pool.push(worker)
end