class RocketJob::WorkerPool

Attributes

server_name[R]
workers[R]

Public Class Methods

new(server_name) click to toggle source
# File lib/rocket_job/worker_pool.rb, line 10
# Set up an empty pool.
#
# Parameters
#   server_name
#     Kept as given and exposed through the `server_name` reader.
#
# The worker list starts out as an empty Concurrent::Array and the
# worker id counter starts at zero.
def initialize(server_name)
  @workers     = Concurrent::Array.new
  @server_name = server_name
  @worker_id   = 0
end

Public Instance Methods

find(id) click to toggle source

Find a worker in the list by its id

# File lib/rocket_job/worker_pool.rb, line 17
# Look up a worker by its id.
#
# Returns the first worker in the list whose `id` equals the supplied id,
# or nil when no worker matches.
def find(id)
  workers.find do |candidate|
    candidate.id == id
  end
end
join(timeout = 5) click to toggle source

Wait for all workers to stop. Returns [true] if all workers stopped. Returns [false] on timeout.

# File lib/rocket_job/worker_pool.rb, line 70
# Wait for the workers to stop, one at a time, starting at the head of the list.
#
# Parameters
#   timeout
#     Passed to each worker's `join`. It applies to every worker
#     individually, not to the call as a whole.
#
# Returns [true] once the list is empty.
# Returns [false] as soon as one worker's `join` gives a falsy result;
# that worker and everything after it stay in the list.
def join(timeout = 5)
  loop do
    head = workers.first
    break unless head

    return false unless head.join(timeout)

    # The head worker has finished, drop it and look at the next one
    workers.shift
  end

  true
end
kill() click to toggle source

Kill Worker threads

# File lib/rocket_job/worker_pool.rb, line 62
# Call `kill` on every worker, then empty the worker list.
#
# Returns the worker list, which is empty afterwards.
def kill
  workers.each(&:kill).clear
end
living_count() click to toggle source

Returns [Integer] number of workers (threads) that are alive

# File lib/rocket_job/worker_pool.rb, line 81
# Returns [Integer] how many workers in the list answer true to `alive?`.
def living_count
  workers.count do |worker|
    worker.alive?
  end
end
log_backtraces() click to toggle source
# File lib/rocket_job/worker_pool.rb, line 85
# Ask the logger for a backtrace of every worker that has a thread
# and is still alive. Workers without a thread, or that are no longer
# alive, are skipped.
def log_backtraces
  workers.each do |worker|
    next unless worker.thread && worker.alive?

    logger.backtrace(thread: worker.thread)
  end
end
prune() click to toggle source

Returns [Integer] number of dead workers removed.

# File lib/rocket_job/worker_pool.rb, line 47
# Remove every worker that is no longer alive from the worker list.
#
# The dead workers are counted in the same pass that removes them, so the
# number that is logged and returned always matches what was actually
# removed, even when a worker dies while this method is running.
#
# Returns [Integer] number of dead workers removed.
# Nothing is logged when no worker was removed.
def prune
  removed = 0
  workers.delete_if do |worker|
    dead = !worker.alive?
    removed += 1 if dead
    dead
  end
  return 0 if removed.zero?

  logger.info "Cleaned up #{removed} dead workers"
  removed
end
rebalance(max_workers, stagger_start = false) click to toggle source

Add new workers to bring the pool back up to `max_workers`, if it is not already at `max_workers`.

Parameters
  stagger_start
    Whether to stagger when the workers poll for work the first time.
    It spreads out the queue polling over the max_poll_seconds so
    that not all workers poll at the same time.
    The worker also responds faster than max_poll_seconds when a new job is created.
# File lib/rocket_job/worker_pool.rb, line 28
# Add new workers until the number of living workers is back at `max_workers`.
#
# Parameters
#   max_workers
#     Target number of living workers. Converted with `to_i`, and the
#     converted value is used for both the worker count and the stagger delay.
#   stagger_start
#     Whether to sleep before starting each worker after the first one.
#     The sleep is Config.max_poll_seconds divided by `max_workers`, so that
#     not all workers poll at the same time.
#
# Returns [Integer]
#    0 when the pool already has `max_workers` or more living workers.
#   -1 when Supervisor.shutdown? became true part way through; the remaining
#      workers are not started.
#   Otherwise the number of workers this call tried to start.
def rebalance(max_workers, stagger_start = false)
  target  = max_workers.to_i
  missing = target - living_count
  return 0 unless missing.positive?

  logger.info("#{'Stagger ' if stagger_start}Starting #{missing} workers")

  # The first worker is always started right away, without any delay
  add_one

  # target is at least 1 here since missing is positive, so no division by zero
  delay = Config.max_poll_seconds.to_f / target

  (missing - 1).times do
    sleep(delay) if stagger_start
    return -1 if Supervisor.shutdown?

    add_one
  end

  missing
end
stop() click to toggle source

Tell all workers to stop working.

# File lib/rocket_job/worker_pool.rb, line 57
# Call `shutdown!` on every worker in the list.
#
# Returns the worker list.
def stop
  workers.each do |worker|
    worker.shutdown!
  end
end

Private Instance Methods

add_one() click to toggle source
# File lib/rocket_job/worker_pool.rb, line 91
# Create one ThreadWorker with the next worker id and this pool's
# server name, and append it to the worker list.
#
# Any StandardError raised along the way is logged as fatal instead of
# being raised to the caller; in that case nothing is appended.
def add_one
  worker = ThreadWorker.new(id: next_worker_id, server_name: server_name)
  workers << worker
rescue StandardError => error
  logger.fatal("Cannot start worker", error)
end
next_worker_id() click to toggle source
# File lib/rocket_job/worker_pool.rb, line 97
# Advance the worker id counter by one.
#
# Returns the new counter value.
def next_worker_id
  @worker_id = @worker_id + 1
end