class DisqueJockey::WorkerPool
Public Class Methods
new(worker_class)
click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 4
# Sets up a pool for the given worker class: an empty Queue to hold idle
# workers, a Broker built from the configured nodes, and then the workers
# themselves via build_worker_pool.
#
# @param worker_class the class whose instances will populate the pool
def initialize(worker_class)
  @worker_class = worker_class
  @pool = Queue.new

  configured_nodes = DisqueJockey.configuration.nodes
  @broker = Broker.new(configured_nodes)

  build_worker_pool
end
Public Instance Methods
work!()
click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 11
# Fetches jobs from the broker in an endless loop and runs each one on its own
# thread, using a worker checked out of the pool.
#
# The worker is pushed back onto the pool only after its thread has finished
# handle_job. Returning it any earlier (for example as soon as Thread.new
# returns) would let the same worker be handed to another job while it is
# still busy and would stop the pool from bounding the number of jobs in
# flight.
def work!
  endless_loop do
    # fetching from broker blocks until a job is returned
    _, job_id, job = @broker.fetch_message_from(@worker_class.queue_name)

    # @pool.pop will block until a worker becomes available
    worker = @pool.pop

    Thread.new do
      begin
        handle_job(worker, job, job_id)
      ensure
        # hand the worker back even if handle_job raises something it does
        # not rescue itself
        @pool.push(worker)
      end
    end
  end
end
Private Instance Methods
build_worker_pool()
click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 49
# Fills the pool with one worker instance per configured thread. Each worker
# is constructed with Logger as its only argument.
def build_worker_pool
  worker_count = @worker_class.thread_count
  worker_count.times do
    @pool.push(@worker_class.new(Logger))
  end
end
endless_loop() { || ... }
click to toggle source
This method exists so that the endless loop can be stubbed in tests.
# File lib/disque_jockey/worker_pool.rb, line 26
# Yields to the given block repeatedly via Kernel#loop. Kept as its own method
# so tests can stub it out and run the body a finite number of times.
def endless_loop
  loop do
    yield
  end
end
handle_job(worker, job, job_id)
click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 37
# Runs a single job on the given worker, bounded by the worker class's
# timeout, then acknowledges it with the broker. The fast acknowledgement is
# used when the worker class asks for it, the regular one otherwise.
#
# Any StandardError raised while handling or acknowledging is passed to the
# worker's log_exception instead of propagating.
#
# @param worker the worker instance that will handle the job
# @param job    the job payload passed to worker.handle
# @param job_id the broker's identifier for the job, used to acknowledge it
def handle_job(worker, job, job_id)
  Timeout.timeout(@worker_class.timeout_seconds) do
    worker.handle(job)
  end

  if @worker_class.use_fast_ack
    @broker.fast_acknowledge(job_id)
  else
    @broker.acknowledge(job_id)
  end
rescue StandardError => exception
  worker.log_exception(exception)
  # TODO: Need to implement retry logic
  # Also should do more helpful logging around worker timeouts
  # (explain the error, log the job and maybe metadata)
end
with_worker() { |worker| ... }
click to toggle source
# File lib/disque_jockey/worker_pool.rb, line 30
# Checks a worker out of the pool, yields it to the block, and puts it back on
# the pool when the block finishes.
#
# The worker is returned from an ensure clause, so an exception raised by the
# block cannot permanently shrink the pool; the exception still propagates to
# the caller.
#
# @yield [worker] the worker taken from the pool
# @return the value of the block
def with_worker
  # @pool.pop will block until a worker becomes available
  worker = @pool.pop
  begin
    yield worker
  ensure
    @pool.push(worker)
  end
end