class Cuniculus::Dispatcher
The dispatcher forwards jobs to a worker pool, which publishes them to RabbitMQ. It holds a RabbitMQ session and, when one of its workers reports that a network exception occurred, tries to reestablish the connection and restart the pool.
The dispatcher's background thread, which monitors for connection errors, is started when the first job is enqueued by a {Cuniculus::Worker}.
Constants
- ENFORCED_CONN_OPTS
- RECOVERABLE_ERRORS
Attributes
Public Class Methods
Instantiates a dispatcher using the passed {Cuniculus::Config}.
@param config [Cuniculus::Config]
```ruby
# File lib/cuniculus/dispatcher.rb, line 27
def initialize(config)
  @config = config
  @conn = nil
  @job_queue = Queue.new
  @dispatcher_chan = Queue.new
  @shutdown = false
  @workers = config.pub_pool_size.times.map do |i|
    Cuniculus::PubWorker.new(config, @job_queue, @dispatcher_chan)
  end
  @reconnect_attempts = config.pub_reconnect_attempts
  @reconnect_delay = config.pub_reconnect_delay
  @reconnect_delay_max = config.pub_reconnect_delay_max
  @shutdown_grace_period = config.pub_shutdown_grace_period
  @thread = nil
  @shutdown = false
end
```
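The constructor builds `config.pub_pool_size` workers that all share the same `@job_queue` (and `@dispatcher_chan`). The sketch below shows that sharing pattern with plain threads and Ruby's thread-safe `Queue`; `start_pool` and the doubling "publish" step are hypothetical stand-ins rather than Cuniculus API, and the `:shutdown` sentinel mirrors the one the `shutdown` method pushes onto the job queue.

```ruby
# Illustrative stand-in for a pool of PubWorkers: `size` threads consume one
# shared Queue. Thread::Queue is thread-safe, so no extra locking is needed.
def start_pool(size, job_queue, results)
  Array.new(size) do
    Thread.new do
      loop do
        job = job_queue.pop        # blocks until a job is available
        break if job == :shutdown  # sentinel: one per worker
        results << job * 2         # stand-in for publishing the job to RabbitMQ
      end
    end
  end
end

job_queue = Queue.new
results = Queue.new
workers = start_pool(3, job_queue, results)

10.times { |i| job_queue << i }
workers.size.times { job_queue << :shutdown }
workers.each(&:join)

processed = []
processed << results.pop until results.empty?
p processed.sort  # => [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Because the queue is FIFO and every job is enqueued before any sentinel, all jobs are taken before a worker sees `:shutdown`.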
Public Instance Methods
Whether its background thread is running.
@return [Boolean]
```ruby
# File lib/cuniculus/dispatcher.rb, line 85
def alive?
  @thread&.alive? || false
end
```
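`alive?` relies on Ruby's safe-navigation operator: `@thread&.alive?` evaluates to `nil` while no thread has been created yet, and `|| false` turns that `nil` into `false`, so the method always returns a Boolean. A standalone sketch of the idiom (`thread_alive?` is a hypothetical helper):

```ruby
# Mirrors `@thread&.alive? || false`: `&.` yields nil for a nil receiver,
# and `|| false` converts that nil into false.
def thread_alive?(thread)
  thread&.alive? || false
end

not_started = nil
running = Thread.new { sleep }                # parked until killed
finished = Thread.new { :done }.tap(&:join)   # already terminated

p thread_alive?(not_started)  # => false
p thread_alive?(running)      # => true
p thread_alive?(finished)     # => false

running.kill
running.join
```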
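Logs the backtrace of the dispatcher's background thread (at INFO level) and of each worker thread (at `log_level`). {#shutdown} calls it when it has to force a shutdown, which helps diagnose threads that fail to stop.
```ruby
# File lib/cuniculus/dispatcher.rb, line 44
def describe(log_level = Logger::DEBUG)
  Cuniculus.logger.info @thread&.backtrace
  @workers.each do |w|
    Cuniculus.logger.log(log_level, w.instance_variable_get(:@thread)&.backtrace)
  end
end
```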
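The same technique in isolation, as a sketch that uses only the standard library: capture each thread's current backtrace with `Thread#backtrace` and hand it to a `Logger`. `describe_threads` is a hypothetical helper and, unlike the source above, logs every thread at `log_level`; the logger writes to a `StringIO` so the output can be inspected.

```ruby
require "logger"
require "stringio"

# Log the current backtrace of each thread. Thread#backtrace returns an Array
# of frame strings (nil once the thread is dead), and Logger formats
# non-String messages with #inspect.
def describe_threads(logger, threads, log_level = Logger::DEBUG)
  threads.each { |t| logger.log(log_level, t&.backtrace) }
end

out = StringIO.new
logger = Logger.new(out)

sleeper = Thread.new { sleep }               # a thread parked indefinitely
Thread.pass until sleeper.status == "sleep"  # wait until it is actually sleeping

describe_threads(logger, [sleeper, nil])     # nil mimics a worker never started
puts out.string

sleeper.kill
sleeper.join
```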
Starts the connection to RabbitMQ, then starts the workers' background threads.
If it fails to connect, it keeps retrying up to the number of attempts defined by {Config.pub_reconnect_attempts}. For unlimited retries, set this value to `:infinite`.
The time between reconnect attempts follows an exponential backoff formula:
```
t = delay * 2^(n-1)
```
where n is the attempt counter, which is 0 after the first failed attempt (so the first wait is delay / 2), and delay is defined by {Config.pub_reconnect_delay}.
If {Config.pub_reconnect_delay_max} is set, it caps this wait time.
@return [void]
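The wait schedule implied by the formula can be tabulated with a small helper. This sketch follows the expression in the source listing, `delay * 2**(attempt-1)` capped by the maximum, with `attempt` starting at 0; `backoff_delay` is a hypothetical name, and a `nil` maximum is treated here as "no cap".

```ruby
# Capped exponential backoff:
#   t = min(delay * 2^(attempt - 1), delay_max)
# `attempt` is 0 on the first failure, as in recover_from_net_error.
def backoff_delay(delay, attempt, delay_max = nil)
  t = delay * 2.0**(attempt - 1)   # float base keeps attempt 0 from yielding a Rational
  delay_max ? [t, delay_max].min : t
end

schedule = (0..6).map { |attempt| backoff_delay(2, attempt, 20.0) }
p schedule  # => [1.0, 2.0, 4.0, 8.0, 16.0, 20.0, 20.0]
```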
```ruby
# File lib/cuniculus/dispatcher.rb, line 104
def recover_from_net_error
  attempt = 0
  begin
    @conn.start
    Cuniculus.logger.info("Connection established")

    @workers.each { |w| w.start!(@conn) }
  rescue *RECOVERABLE_ERRORS => ex
    handle_error(Cuniculus.convert_exception_class(ex, Cuniculus::RMQConnectionError))
    sleep_time = @shutdown ? 1 : [(reconnect_delay * 2**(attempt-1)), reconnect_delay_max].min
    sleep sleep_time
    attempt += 1

    retry if @shutdown && attempt <= reconnect_delay_max
    retry if reconnect_attempts == :infinite || attempt <= reconnect_attempts
  end
end
```
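The control flow of `recover_from_net_error` rests on Ruby's `begin`/`rescue`/`retry`. A hypothetical sketch of that loop, assuming a single `FakeConnectionError` in place of `RECOVERABLE_ERRORS`, a block in place of `@conn.start`, and an injected `sleeper` so waits are recorded instead of slept; the 1-second retries during shutdown are omitted.

```ruby
# Hypothetical stand-ins: FakeConnectionError for RECOVERABLE_ERRORS,
# the block for @conn.start, and `sleeper` for Kernel#sleep.
class FakeConnectionError < StandardError; end

def connect_with_retries(max_attempts, delay, delay_max, sleeper:, &connect)
  attempt = 0
  begin
    connect.call
    true
  rescue FakeConnectionError
    sleeper.call([delay * 2.0**(attempt - 1), delay_max].min)  # wait before retrying
    attempt += 1
    retry if max_attempts == :infinite || attempt <= max_attempts  # re-run `begin`
    false
  end
end

# Fails twice, then succeeds.
waits = []
failures_left = 2
ok = connect_with_retries(3, 1, 10, sleeper: ->(t) { waits << t }) do
  if failures_left > 0
    failures_left -= 1
    raise FakeConnectionError, "connection refused"
  end
end
p ok     # => true
p waits  # => [0.5, 1.0]

# Always fails: gives up after the initial try plus two retries.
waits2 = []
ok2 = connect_with_retries(2, 1, 10, sleeper: ->(t) { waits2 << t }) { raise FakeConnectionError }
p ok2     # => false
p waits2  # => [0.5, 1.0, 2.0]
```

With `max_attempts = 2` the connection is tried three times in total, and, as in the source listing, a wait also happens after the last failure.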
Shuts down the workers, giving them time to finish outstanding tasks.
Shutdown is forced after {Config.pub_shutdown_grace_period} seconds.
@return [void]
```ruby
# File lib/cuniculus/dispatcher.rb, line 127
def shutdown
  Cuniculus.logger.info("Cuniculus: Shutting down dispatcher")
  @shutdown = true
  alive_size = @workers.size
  shutdown_t0 = Cuniculus.mark_time

  sleep 1 until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || @job_queue.empty?

  until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || (alive_size = @workers.select(&:alive?).size) == 0
    sleep 1
    alive_size.times { @job_queue << :shutdown }
  end

  @dispatcher_chan << :shutdown
  alive_size = @workers.select(&:alive?).size
  return unless alive_size > 0

  Cuniculus.logger.warn("Cuniculus: Forcing shutdown with #{alive_size} workers remaining")
  describe
end
```
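`shutdown` bounds both of its waiting phases (draining the job queue, then stopping the workers) by `shutdown_grace_period`. The sketch below reproduces the two phases with threads standing in for PubWorkers. It assumes `Cuniculus.mark_time` returns a monotonic clock reading in seconds (implemented here with `Process.clock_gettime`); `shutdown_pool` and the 50 ms polling interval are illustrative choices, not the library's code.

```ruby
# Assumption: mark_time is a monotonic clock reading in seconds.
def mark_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Returns the number of workers still alive when the grace period ends.
def shutdown_pool(workers, job_queue, grace_period, poll: 0.05)
  t0 = mark_time
  # Phase 1: let the workers drain the job queue.
  sleep poll until mark_time - t0 > grace_period || job_queue.empty?
  # Phase 2: send one :shutdown sentinel per live worker until all have exited.
  until mark_time - t0 > grace_period || (alive = workers.count(&:alive?)).zero?
    alive.times { job_queue << :shutdown }
    sleep poll
  end
  workers.count(&:alive?)
end

job_queue = Queue.new
done = Queue.new
workers = Array.new(2) do
  Thread.new do
    while (job = job_queue.pop) != :shutdown
      sleep 0.01   # simulate publishing
      done << job
    end
  end
end
5.times { |i| job_queue << i }

remaining = shutdown_pool(workers, job_queue, 2.0)
p remaining  # => 0
p done.size  # => 5
```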
Starts a thread responsible for reestablishing lost RabbitMQ connections and restarting {Cuniculus::PubWorker}s.
It keeps track of the last time it reconnected, so it can ignore outdated connection-failure messages from workers.
PubWorkers communicate with it through its `dispatcher_chan` queue. Depending on the message fetched from the dispatcher channel, it takes one of the following actions:
- when a `:shutdown` message is received, it stops its background thread. {#shutdown} sends this message only after waiting for current jobs to finish (up to the configured `shutdown_grace_period`).
- when a timestamp is received that is not later than the last reconnect timestamp, the message is ignored.
- when the timestamp is later than the last reconnect timestamp, it tries to reestablish the connection to RabbitMQ and restarts its workers.
Note that the first time the dispatcher is started, it sends a message to its own background thread with a timestamp to trigger the first connection.
```ruby
# File lib/cuniculus/dispatcher.rb, line 65
def start!
  return if @shutdown || @thread&.alive?
  @thread = Thread.new do
    last_connect_time = 0
    loop do
      disconnect_time = @dispatcher_chan.pop
      break if disconnect_time == :shutdown
      if disconnect_time > last_connect_time
        recover_from_net_error
        last_connect_time = Cuniculus.mark_time
      end
    end
  end
  @conn = ::Bunny.new(@config.rabbitmq_opts.merge(ENFORCED_CONN_OPTS).merge(session_error_handler: @thread))
  @dispatcher_chan << Cuniculus.mark_time
end
```
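The stale-message rule can be exercised in isolation by injecting a fake clock. In this sketch `run_dispatcher` is a hypothetical stand-in for the thread body of `start!`, while the `clock:` and `reconnect:` lambdas stand in for `Cuniculus.mark_time` and `recover_from_net_error` respectively.

```ruby
# Reconnect only for failures reported after the last (re)connection.
def run_dispatcher(chan, clock:, reconnect:)
  Thread.new do
    last_connect_time = 0
    loop do
      disconnect_time = chan.pop
      break if disconnect_time == :shutdown
      if disconnect_time > last_connect_time
        reconnect.call
        last_connect_time = clock.call
      end
    end
  end
end

now = 0
clock = -> { now += 10 }   # fake clock: each reading advances by 10
reconnects = 0
chan = Queue.new
thread = run_dispatcher(chan, clock: clock, reconnect: -> { reconnects += 1 })

chan << 1          # initial trigger, like the message start! sends itself
chan << 5          # stale: predates the reconnect recorded at 10, ignored
chan << 15         # new failure after the last reconnect
chan << 15         # same failure reported by a second worker, ignored
chan << :shutdown
thread.join
p reconnects  # => 2
```

Two workers reporting the same failure therefore trigger a single reconnect.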
Private Instance Methods
```ruby
# File lib/cuniculus/dispatcher.rb, line 150
def handle_error(e)
  Cuniculus.logger.error("#{e.class.name}: #{e.message}")
  Cuniculus.logger.error(e.backtrace.join("\n")) unless e.backtrace.nil?
end
```
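The `unless e.backtrace.nil?` guard matters because an exception object that was created but never raised has no backtrace. A sketch of the same logging against a `Logger` writing to a `StringIO`, assuming `Cuniculus.logger` behaves like a standard Ruby `Logger` (`log_error` is a hypothetical name):

```ruby
require "logger"
require "stringio"

# Same logic as handle_error: log class and message, then the backtrace
# only if the exception has one (unraised exceptions return nil).
def log_error(logger, e)
  logger.error("#{e.class.name}: #{e.message}")
  logger.error(e.backtrace.join("\n")) unless e.backtrace.nil?
end

out = StringIO.new
logger = Logger.new(out)

log_error(logger, RuntimeError.new("never raised"))  # backtrace is nil
begin
  raise IOError, "socket closed"
rescue IOError => ex
  log_error(logger, ex)                              # backtrace is present
end

puts out.string
```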