class DisqueJockey::WorkerGroup
Public Class Methods
new(worker_classes = [])
click to toggle source
# File lib/disque_jockey/worker_group.rb, line 7 def initialize(worker_classes = []) @worker_classes = worker_classes # array of classes to instantiate in our group end
Public Instance Methods
work!()
click to toggle source
# File lib/disque_jockey/worker_group.rb, line 11 def work! register_signal_handlers Supervisor.logger.info("Starting worker group with PID #{Process.pid}...") start_workers work_until_signal end
Private Instance Methods
handle_signals()
click to toggle source
Deal with signals we receive from the OS by logging the signal and then killing the worker group
# File lib/disque_jockey/worker_group.rb, line 48 def handle_signals signal = Thread.main[:signal_queue].shift if signal Supervisor.logger.info("Received signal #{signal}. Shutting down worker group with PID #{Process.pid}...") return true end end
register_signal_handlers()
click to toggle source
Register signal handlers to shut down the worker group.
# File lib/disque_jockey/worker_group.rb, line 29 def register_signal_handlers Thread.main[:signal_queue] = [] %w(QUIT TERM INT ABRT).each do |signal| # This needs to be reentrant, so we queue up signals to be handled # in the run loop, rather than acting on signals here trap(signal) { Thread.main[:signal_queue] << signal } end end
start_workers()
click to toggle source
for each worker class, create a worker pool and have them start fetching jobs and working
# File lib/disque_jockey/worker_group.rb, line 40 def start_workers @worker_classes.each do |worker_class| Thread.new { WorkerPool.new(worker_class).work! } end end
work_until_signal()
click to toggle source
This loop is in a method so that we can stub it in tests
# File lib/disque_jockey/worker_group.rb, line 21 def work_until_signal loop do break if handle_signals sleep(0.1) end end