class RocketJob::Supervisor
Starts a server instance along with its workers, and ensures the workers remain running until they need to shut down.
Attributes
server[R]
worker_id[RW]
worker_pool[R]
Public Class Methods
new(server)
click to toggle source
# File lib/rocket_job/supervisor.rb, line 24
#
# Builds a supervisor for +server+.
#
# Keeps a reference to the server, creates a WorkerPool from the server's
# name, and allocates the mutex that #synchronize locks.
def initialize(server)
  @server      = server
  @mutex       = Mutex.new
  @worker_pool = WorkerPool.new(server.name)
end
run()
click to toggle source
Start the Supervisor, using the supplied attributes to create a new Server instance.
# File lib/rocket_job/supervisor.rb, line 13
#
# Class-level entry point: names the current thread, creates the indexes,
# registers the signal handlers, creates a Server record and runs a
# supervisor for it.
#
# The server record is destroyed on the way out if it was created, whether
# the supervisor returned normally or an exception was raised.
def self.run
  Thread.current.name = "rocketjob main"
  RocketJob.create_indexes
  register_signal_handlers

  server     = Server.create!
  supervisor = new(server)
  supervisor.run
ensure
  # `server` is still nil when Server.create! (or anything before it) raised.
  server&.destroy
end
Public Instance Methods
run()
click to toggle source
# File lib/rocket_job/supervisor.rb, line 30
#
# Runs the supervisor until the pool supervision loop returns.
#
# Marks the server as started, starts the event listener thread, sets up
# the subscribers, then supervises the worker pool and finally stops it.
# A missing server document and any other exception are both logged rather
# than re-raised. The event listener is always killed and the worker
# backtraces are always logged before returning.
def run
  logger.info("Using MongoDB Database: #{RocketJob::Job.collection.database.name}")
  if Config.filter
    logger.info("Running with filter", Config.filter)
  end

  server.started!
  logger.info("Rocket Job Server started")

  event_listener = Thread.new { Event.listener }

  # SecretConfig is optional; only subscribe when that constant is defined.
  if defined?(SecretConfig)
    Subscribers::SecretConfig.subscribe
  end

  Subscribers::Server.subscribe(self) do
    Subscribers::Worker.subscribe(self) do
      Subscribers::Logger.subscribe do
        supervise_pool
        stop!
      end
    end
  end
rescue ::Mongoid::Errors::DocumentNotFound
  logger.info("Server has been destroyed. Going down hard!")
rescue Exception => e
  logger.error("RocketJob::Server is stopping due to an exception", e)
ensure
  # `event_listener` is nil when an exception was raised before the thread was created.
  event_listener&.kill
  # Logs the backtrace for each running worker
  worker_pool.log_backtraces
  logger.info("Shutdown Complete")
end
stop!()
click to toggle source
# File lib/rocket_job/supervisor.rb, line 57
#
# Stops the server (when it is allowed to stop) and the worker pool, then
# blocks until the pool reports that it has been joined.
def stop!
  server.stop! if server.may_stop?
  synchronize { worker_pool.stop }

  loop do
    break if worker_pool.join

    logger.info "Waiting for workers to finish processing ..."
    # One or more workers still running so update heartbeat so that server reports "alive".
    server.refresh(worker_pool.living_count)
  end
end
supervise_pool()
click to toggle source
# File lib/rocket_job/supervisor.rb, line 69
#
# Supervision loop: keeps the worker pool in line with the server's state
# until shutdown is requested or the server is neither running nor paused.
#
# While the server is running, the pool is pruned and rebalanced to
# server.max_workers. While it is paused, the pool is stopped and pruned.
# After each pass the server is refreshed with the pool's living count and
# the loop waits for an event for up to Config.heartbeat_seconds.
def supervise_pool
  stagger = true
  until self.class.shutdown?
    # A `break` inside the block given to #synchronize only ends that
    # #synchronize call, not this loop. Record the decision in a local and
    # act on it once the lock has been released.
    leave_loop = false
    synchronize do
      if server.running?
        worker_pool.prune
        worker_pool.rebalance(server.max_workers, stagger)
        stagger = false
      elsif server.paused?
        worker_pool.stop
        sleep(0.1)
        worker_pool.prune
        # Stagger again at the next rebalance after a pause.
        stagger = true
      else
        leave_loop = true
      end
    end
    break if leave_loop

    synchronize { server.refresh(worker_pool.living_count) }

    self.class.wait_for_event(Config.heartbeat_seconds)

    break if self.class.shutdown?
  end
end
synchronize(&block)
click to toggle source
# File lib/rocket_job/supervisor.rb, line 95
#
# Runs the given block while holding this supervisor's mutex and returns
# whatever the mutex's own #synchronize returns for that block.
def synchronize(&block)
  @mutex.synchronize(&block)
end