class Messagebus::Swarm::Controller
The controller for a set of drone workers.
Constants
- ConfigurationSource
Public Class Methods
# File lib/messagebus/swarm/controller.rb, line 72
#
# Appends the given block to the list returned by +after_fork_procs+.
# Returns that list.
# NOTE(review): presumably the stored blocks are invoked after a drone
# process is forked; the call site is not visible here -- confirm.
def self.after_fork(&block)
  registered_procs = after_fork_procs
  registered_procs << block
end
# File lib/messagebus/swarm/controller.rb, line 76
#
# Returns the memoized array of registered after-fork blocks, creating an
# empty array on first access.
def self.after_fork_procs
  @after_fork_procs = [] unless @after_fork_procs
  @after_fork_procs
end
# File lib/messagebus/swarm/controller.rb, line 127
#
# Removes the pid file at +pid_file+ if it exists.
#
# Returns the result of File.delete (1) when a file was removed, or nil when
# there was nothing to remove.
def self.delete_pid(pid_file)
  File.delete(pid_file) if File.exist?(pid_file)
rescue Errno::ENOENT
  # The file vanished between the existence check and the delete (e.g. another
  # process cleaned it up first); the desired end state is already reached.
  nil
end
# File lib/messagebus/swarm/controller.rb, line 119
#
# Requires each entry of +files+, in order.
#
# +files+ may be an array of paths, a single path, or nil (treated as an
# empty list). Returns the list of files that was iterated.
def self.require_files(files)
  # Array() leaves an array untouched, wraps a lone path, and maps nil to [].
  Array(files).each { |file_to_require| require file_to_require }
end
Starts up the swarm based on the given config. This method does not return until the swarm is shut down.
If the config has config[:swarm_config][:fork] set to true, it will boot the drones in subprocesses; otherwise it will use threads.
- destination_name: limit booting drones to only the ones acting on the given destination
- drone_count: override the number of drones to run
# File lib/messagebus/swarm/controller.rb, line 90
#
# Builds the drones described by the configuration, starts them, and then
# calls +wait+ on the object returned by +start_drones+.
#
# configuration_source:: a ConfigurationSource (its +configuration_hash+ is
#                        used) or the configuration itself
# drone_logger::         logger handed to +build_drones+
# destination_name::     when given, only worker configs whose :destination
#                        equals this value are used
# drone_count::          when given, overrides :drones on every selected
#                        worker config
#
# Raises BadConfigurationError when the configuration resolves to nil.
def self.start(configuration_source, drone_logger, destination_name=nil, drone_count=nil)
  raw_config = configuration_source
  if configuration_source.is_a?(ConfigurationSource)
    raw_config = configuration_source.configuration_hash
  end

  if raw_config.nil?
    raise BadConfigurationError.new("#{configuration_source.inspect} didn't evaluate to a configuration")
  end

  config = Messagebus::DottableHash.new(raw_config)
  worker_configs = config.workers

  # Narrow down / override the worker configs when the caller asked for it.
  if destination_name
    worker_configs = worker_configs.select { |candidate| candidate[:destination] == destination_name }
  end
  if drone_count
    worker_configs = worker_configs.map { |candidate| candidate.merge(:drones => drone_count) }
  end

  # Older configurations have no cluster_defaults section; fall back to the
  # top-level config for backwards compatibility.
  default_cluster_config = config.cluster_defaults || config

  drones = build_drones(worker_configs, default_cluster_config, config.clusters, swarm_control_logger, drone_logger)
  start_drones(swarm_control_logger, config.swarm_config && config.swarm_config.fork, drones).wait
end
Shut down a previously started swarm
# File lib/messagebus/swarm/controller.rb, line 115
#
# Stops a previously started swarm by delegating to +stop_drones+ with the
# given +pid+, and returns whatever +stop_drones+ returns.
def self.stop(pid)
  stop_drones(pid)
end
# File lib/messagebus/swarm/controller.rb, line 68
#
# Returns the logger used for swarm control messages. When none has been
# assigned, a Logger writing to $stdout is created and memoized.
def self.swarm_control_logger
  return @swarm_control_logger if @swarm_control_logger

  @swarm_control_logger = Logger.new($stdout)
end
It's important that this is a different logger instance than the one used for the drones/consumers/other things, to avoid deadlocking issues. It's ok for it to use the same file, just not be the same instance of a logger.
This logger will be used in a signal handler, and logging involves mutexes, so we need/want to be sure the logger isn't being used by any other code outside the signal handler.
# File lib/messagebus/swarm/controller.rb, line 65
#
# Assigns the logger used for swarm control messages.
#
# Pass a logger instance that is not shared with the drones or consumers:
# this logger is used from a signal handler, and logging takes a mutex, so
# sharing the instance risks a deadlock. Writing to the same file is fine.
def self.swarm_control_logger=(swarm_control_logger)
  instance_variable_set(:@swarm_control_logger, swarm_control_logger)
end
# File lib/messagebus/swarm/controller.rb, line 123
#
# Writes the current process id to +pid_file+, opening it in "w" mode
# (so any existing content is replaced).
def self.write_pid(pid_file)
  File.open(pid_file, "w") do |pid_io|
    pid_io.print(Process.pid)
  end
end