class DisqueJockey::WorkerGroup

Public Class Methods

new(worker_classes = []) click to toggle source
# File lib/disque_jockey/worker_group.rb, line 7
def initialize(worker_classes = [])
  @worker_classes = worker_classes # array of classes to instantiate in our group
end

Public Instance Methods

work!() click to toggle source
# File lib/disque_jockey/worker_group.rb, line 11
def work!
  register_signal_handlers
  Supervisor.logger.info("Starting worker group with PID #{Process.pid}...")
  start_workers
  work_until_signal
end

Private Instance Methods

handle_signals() click to toggle source

Deal with signals we receive from the OS by logging the signal and then killing the worker group

# File lib/disque_jockey/worker_group.rb, line 48
def handle_signals
  signal = Thread.main[:signal_queue].shift
  if signal
    Supervisor.logger.info("Received signal #{signal}. Shutting down worker group with PID #{Process.pid}...")
    return true
  end
end
register_signal_handlers() click to toggle source

Register signal handlers to shut down the worker group.

# File lib/disque_jockey/worker_group.rb, line 29
def register_signal_handlers
  Thread.main[:signal_queue] = []
  %w(QUIT TERM INT ABRT).each do |signal|
    # This needs to be reentrant, so we queue up signals to be handled
    # in the run loop, rather than acting on signals here
    trap(signal) { Thread.main[:signal_queue] << signal }
  end
end
start_workers() click to toggle source

for each worker class, create a worker pool and have them start fetching jobs and working

# File lib/disque_jockey/worker_group.rb, line 40
def start_workers
  @worker_classes.each do |worker_class|
    Thread.new { WorkerPool.new(worker_class).work! }
  end
end
work_until_signal() click to toggle source

This loop is in a method so that we can stub it in tests

# File lib/disque_jockey/worker_group.rb, line 21
def work_until_signal
  loop do 
    break if handle_signals
    sleep(0.1)
  end
end