class RocketJob::WorkerPool
Attributes
server_name[R]
workers[R]
Public Class Methods
new(server_name)
click to toggle source
# File lib/rocket_job/worker_pool.rb, line 10 def initialize(server_name) @server_name = server_name @workers = Concurrent::Array.new @worker_id = 0 end
Public Instance Methods
find(id)
click to toggle source
Find a worker in the list by its id
# File lib/rocket_job/worker_pool.rb, line 17 def find(id) workers.find { |worker| worker.id == id } end
join(timeout = 5)
click to toggle source
Wait for all workers to stop. Return [true] if all workers stopped Return [false] on timeout
# File lib/rocket_job/worker_pool.rb, line 70 def join(timeout = 5) while (worker = workers.first) return false unless worker.join(timeout) # Worker thread is dead workers.shift end true end
kill()
click to toggle source
Kill Worker
threads
# File lib/rocket_job/worker_pool.rb, line 62 def kill workers.each(&:kill) workers.clear end
living_count()
click to toggle source
Returns [Integer] number of workers (threads) that are alive
# File lib/rocket_job/worker_pool.rb, line 81 def living_count workers.count(&:alive?) end
log_backtraces()
click to toggle source
# File lib/rocket_job/worker_pool.rb, line 85 def log_backtraces workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? } end
prune()
click to toggle source
Returns [Integer] number of dead workers removed.
# File lib/rocket_job/worker_pool.rb, line 47 def prune remove_count = workers.count - living_count return 0 if remove_count.zero? logger.info "Cleaned up #{remove_count} dead workers" workers.delete_if { |t| !t.alive? } remove_count end
rebalance(max_workers, stagger_start = false)
click to toggle source
Add new workers to get back to the `max_workers` if not already at `max_workers`
Parameters stagger_start Whether to stagger when the workers poll for work the first time. It spreads out the queue polling over the max_poll_seconds so that not all workers poll at the same time. The worker also responds faster than max_poll_seconds when a new job is created.
# File lib/rocket_job/worker_pool.rb, line 28 def rebalance(max_workers, stagger_start = false) count = max_workers.to_i - living_count return 0 unless count.positive? logger.info("#{'Stagger ' if stagger_start}Starting #{count} workers") add_one count -= 1 delay = Config.max_poll_seconds.to_f / max_workers count.times.each do sleep(delay) if stagger_start return -1 if Supervisor.shutdown? add_one end end
stop()
click to toggle source
Tell all workers to stop working.
# File lib/rocket_job/worker_pool.rb, line 57 def stop workers.each(&:shutdown!) end
Private Instance Methods
add_one()
click to toggle source
# File lib/rocket_job/worker_pool.rb, line 91 def add_one workers << ThreadWorker.new(id: next_worker_id, server_name: server_name) rescue StandardError => e logger.fatal("Cannot start worker", e) end
next_worker_id()
click to toggle source
# File lib/rocket_job/worker_pool.rb, line 97 def next_worker_id @worker_id += 1 end