module Nsqcd::WorkerGroup
Public Class Methods
new()
click to toggle source
# File lib/nsqcd/workergroup.rb, line 5 def initialize @stop_flag = ServerEngine::BlockingFlag.new end
Public Instance Methods
after_fork()
click to toggle source
# File lib/nsqcd/workergroup.rb, line 14 def after_fork # note! this is not Serverengine#after_start, this is ours! fafter = Nsqcd::CONFIG[:hooks][:after_fork] fafter.call if fafter end
before_fork()
click to toggle source
# File lib/nsqcd/workergroup.rb, line 9 def before_fork fbefore = Nsqcd::CONFIG[:hooks][:before_fork] fbefore.call if fbefore end
run()
click to toggle source
# File lib/nsqcd/workergroup.rb, line 19 def run after_fork # Allocate single thread pool if share_threads is set. This improves load balancing # when used with many workers. pool = config[:share_threads] ? Concurrent::FixedThreadPool.new(config[:threads]) : nil worker_classes = config[:worker_classes] if worker_classes.respond_to? :call worker_classes = worker_classes.call end # If we don't provide a connection to a worker, # the queue used in the worker will create a new one @workers = worker_classes.map do |worker_class| worker_class.new( pool, { connection: config[:connection] }) end # if more than one worker this should be per worker # accumulate clients and consumers as well @workers.each do |worker| worker.run end # end per worker # until @stop_flag.wait_for_set(Nsqcd::CONFIG[:heartbeat]) Nsqcd.logger.debug("Heartbeat: running threads [#{Thread.list.count}]") # report aggregated stats? end end
stop()
click to toggle source
# File lib/nsqcd/workergroup.rb, line 52 def stop Nsqcd.logger.info("Shutting down workers") @workers.each do |worker| worker.stop end @stop_flag.set! end