class Cuniculus::Dispatcher

The dispatcher forwards jobs to a worker pool, which publishes them to RabbitMQ. It holds a RabbitMQ session and, when one of its workers reports that a network exception occurred, it tries to reestablish the connection and restart the pool.

The dispatcher background thread, which monitors for connection errors, is started when the first job is enqueued by a {Cuniculus::Worker}.
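The layout described above, with one shared job queue feeding a pool of workers and a separate channel for failure reports, can be illustrated with Ruby's core `Queue` and `Thread` alone. This is a minimal sketch, not Cuniculus code: publishing is simulated by appending to an array, the `:boom` job is a hypothetical stand-in for a network failure, and the worker loop is an assumption about how a pool might consume a shared queue.

```ruby
# Hypothetical sketch: a shared job queue, a pool of worker threads, and a
# dispatcher channel that workers use to report failures (as timestamps).
job_queue = Queue.new
dispatcher_chan = Queue.new
published = []
lock = Mutex.new

workers = 3.times.map do
  Thread.new do
    # Each worker consumes jobs until it receives a :shutdown token.
    while (job = job_queue.pop) != :shutdown
      if job == :boom
        # Stand-in for a network error: report the failure time to the dispatcher.
        dispatcher_chan << Process.clock_gettime(Process::CLOCK_MONOTONIC)
      else
        lock.synchronize { published << job } # simulated "publish"
      end
    end
  end
end

%w[a b c d].each { |j| job_queue << j }
job_queue << :boom
workers.size.times { job_queue << :shutdown } # one token per worker
workers.each(&:join)

p published.sort     # prints ["a", "b", "c", "d"]
p dispatcher_chan.size # prints 1
```

Because `Queue` is FIFO and the `:shutdown` tokens are pushed last, every job is taken before any worker sees its token.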

Constants

ENFORCED_CONN_OPTS
RECOVERABLE_ERRORS

Attributes

dispatcher_chan[R]
job_queue[R]
reconnect_attempts[R]
reconnect_delay[R]
reconnect_delay_max[R]
shutdown_grace_period[R]

Public Class Methods

new(config)

Instantiates a dispatcher using the passed {Cuniculus::Config}.

@param config [Cuniculus::Config]

```ruby
# File lib/cuniculus/dispatcher.rb
def initialize(config)
  @config = config
  @conn = nil
  @job_queue = Queue.new
  @dispatcher_chan = Queue.new
  @shutdown = false
  # One publisher worker per pool slot, all sharing the same job queue and dispatcher channel.
  @workers = config.pub_pool_size.times.map do
    Cuniculus::PubWorker.new(config, @job_queue, @dispatcher_chan)
  end
  @reconnect_attempts = config.pub_reconnect_attempts
  @reconnect_delay = config.pub_reconnect_delay
  @reconnect_delay_max = config.pub_reconnect_delay_max
  @shutdown_grace_period = config.pub_shutdown_grace_period
  @thread = nil
end
```

Public Instance Methods

alive?()

Whether its background thread is running.

@return [Boolean]

```ruby
# File lib/cuniculus/dispatcher.rb
def alive?
  @thread&.alive? || false
end
```
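The `|| false` in `alive?` matters because safe navigation (`&.`) returns `nil`, not `false`, when no thread has been started yet. A small sketch with a hypothetical `ThreadHolder` class shows the difference:

```ruby
# Hypothetical class reproducing the alive? idiom from Dispatcher.
class ThreadHolder
  def initialize
    @thread = nil
    @gate = Queue.new
  end

  # Starts a thread that blocks until stop! is called.
  def start!
    @thread = Thread.new { @gate.pop }
  end

  def stop!
    @gate << :go
    @thread.join
  end

  # Same idiom as Dispatcher#alive?: nil-safe call coerced to a Boolean.
  def alive?
    @thread&.alive? || false
  end

  # Without `|| false`, an unstarted holder reports nil.
  def raw_alive
    @thread&.alive?
  end
end

h = ThreadHolder.new
p h.raw_alive # prints nil
p h.alive?    # prints false
h.start!
p h.alive?    # prints true
h.stop!
p h.alive?    # prints false
```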
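describe(log_level = Logger::DEBUG)

Logs the backtrace of the dispatcher's background thread at info level, and the backtrace of each worker thread at `log_level`. {#shutdown} calls it when forcing a shutdown, to show where the remaining workers are stuck.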
```ruby
# File lib/cuniculus/dispatcher.rb
def describe(log_level = Logger::DEBUG)
  Cuniculus.logger.info @thread&.backtrace
  @workers.each do |w|
    Cuniculus.logger.log(log_level, w.instance_variable_get(:@thread)&.backtrace)
  end
end
```
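`describe` relies on `Thread#backtrace`, which returns the current stack frames of a live thread. The sketch below uses a hypothetical `describe_threads` helper and a thread blocked on a `Queue` as a stand-in for a stuck worker; the safe-navigation call tolerates a `nil` entry, just as `@thread&.backtrace` does for a dispatcher that was never started.

```ruby
require "logger"
require "stringio"

# Hypothetical helper mirroring Dispatcher#describe: log where each thread is.
def describe_threads(logger, threads, log_level = Logger::DEBUG)
  threads.each do |t|
    frames = t&.backtrace
    # Keep only the innermost frames to make the log readable.
    logger.log(log_level, frames ? frames.first(3).join("\n") : "(no backtrace)")
  end
end

out = StringIO.new
logger = Logger.new(out)
gate = Queue.new
stuck = Thread.new { gate.pop } # stands in for a worker that has not finished
Thread.pass until stuck.status == "sleep"

describe_threads(logger, [stuck, nil], Logger::WARN)
gate << :release
stuck.join
puts out.string
```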
recover_from_net_error()

Starts the connection to RabbitMQ, then starts the workers' background threads.

If it fails to connect, it keeps retrying up to the number of attempts defined by {Config.pub_reconnect_attempts}. For unlimited retries, set this value to `:infinite`.

The time between reconnect attempts follows an exponential backoff formula:

$$ t = \mathit{delay} \cdot 2^{n-1} $$

where n is the attempt number, and delay is defined by {Config.pub_reconnect_delay}.

If {Config.pub_reconnect_delay_max} is defined, it caps the wait time t.

@return [void]
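The backoff formula can be checked with a few lines of Ruby. This sketch uses hypothetical keyword names (`delay`, `delay_max`) in place of the configuration values, counts `n` from 1, and treats a `nil` cap as no cap; with `delay = 1` and a cap of 20, the first six waits are 1, 2, 4, 8, 16 and 20 seconds.

```ruby
# Hypothetical helper: capped exponential backoff for attempt n (1-based).
def backoff(n, delay:, delay_max: nil)
  # `compact` removes a nil cap, so min then returns the uncapped value.
  [delay * 2**(n - 1), delay_max].compact.min
end

capped = (1..6).map { |n| backoff(n, delay: 1, delay_max: 20) }
uncapped = (1..4).map { |n| backoff(n, delay: 3) }
p capped   # prints [1, 2, 4, 8, 16, 20]
p uncapped # prints [3, 6, 12, 24]
```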

```ruby
# File lib/cuniculus/dispatcher.rb
def recover_from_net_error
  attempt = 0
  begin
    @conn.start
    Cuniculus.logger.info("Connection established")

    @workers.each { |w| w.start!(@conn) }
  rescue *RECOVERABLE_ERRORS => ex
    handle_error(Cuniculus.convert_exception_class(ex, Cuniculus::RMQConnectionError))
    attempt += 1 # count from 1 so the first wait is reconnect_delay * 2**0
    # Exponential backoff; `compact` drops a nil reconnect_delay_max (no cap).
    sleep_time = @shutdown ? 1 : [reconnect_delay * 2**(attempt - 1), reconnect_delay_max].compact.min
    sleep sleep_time

    retry if @shutdown && reconnect_delay_max && attempt <= reconnect_delay_max
    retry if reconnect_attempts == :infinite || attempt <= reconnect_attempts
  end
end
```
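The retry logic above is built on Ruby's `begin`/`rescue`/`retry`. The sketch below isolates that pattern with a hypothetical `FlakyConn` that raises `IOError` a fixed number of times (standing in for the network errors the real code rescues) and a hypothetical `connect_with_retry` helper; it is an illustration, not the Cuniculus implementation.

```ruby
# Hypothetical connection that fails `failures` times before succeeding.
class FlakyConn
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  def start
    @calls += 1
    raise IOError, "connection refused" if @calls <= @failures
    :connected
  end
end

# Retries with exponential backoff; returns nil after giving up.
def connect_with_retry(conn, max_attempts:, delay: 0.01)
  attempt = 0
  begin
    conn.start
  rescue IOError
    attempt += 1
    sleep(delay * 2**(attempt - 1))
    retry if max_attempts == :infinite || attempt <= max_attempts
    nil
  end
end

conn = FlakyConn.new(2)
p connect_with_retry(conn, max_attempts: 3) # prints :connected
p conn.calls                                # prints 3
```

As in the dispatcher, `retry` re-runs the whole `begin` block, so the connection attempt and any follow-up work are repeated together.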
shutdown()

Shuts down the workers, giving them time to finish outstanding tasks.

Shutdown is forced after {Config.pub_shutdown_grace_period} seconds.

@return [void]

```ruby
# File lib/cuniculus/dispatcher.rb
def shutdown
  Cuniculus.logger.info("Cuniculus: Shutting down dispatcher")
  @shutdown = true
  alive_size = @workers.size
  shutdown_t0 = Cuniculus.mark_time

  sleep 1 until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || @job_queue.empty?

  until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || (alive_size = @workers.select(&:alive?).size) == 0
    sleep 1
    alive_size.times { @job_queue << :shutdown }
  end

  @dispatcher_chan << :shutdown
  alive_size = @workers.select(&:alive?).size
  return unless alive_size > 0

  Cuniculus.logger.warn("Cuniculus: Forcing shutdown with #{alive_size} workers remaining")
  describe
end
```
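The shutdown sequence above is a poison-pill pattern: once the job queue drains, one `:shutdown` token is pushed per live worker until every worker exits or the grace period runs out. A minimal sketch with plain threads, using hypothetical `shutdown_pool` and `monotonic_now` helpers; `monotonic_now` is a stand-in for `Cuniculus.mark_time`, assumed here to return monotonic seconds.

```ruby
# Stand-in for Cuniculus.mark_time (assumed to be a monotonic clock in seconds).
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper; returns how many workers are still alive at the end.
def shutdown_pool(workers, job_queue, grace_period)
  t0 = monotonic_now
  # Phase 1: wait for the queue to drain, bounded by the grace period.
  sleep 0.01 until monotonic_now - t0 > grace_period || job_queue.empty?

  # Phase 2: feed one :shutdown token per live worker until all have exited.
  loop do
    alive = workers.count(&:alive?)
    break if alive.zero? || monotonic_now - t0 > grace_period
    alive.times { job_queue << :shutdown }
    sleep 0.01
  end
  workers.count(&:alive?)
end

job_queue = Queue.new
done = Queue.new
workers = 2.times.map do
  Thread.new do
    while (job = job_queue.pop) != :shutdown
      sleep 0.01 # pretend to publish
      done << job
    end
  end
end
5.times { |i| job_queue << i }

remaining = shutdown_pool(workers, job_queue, 2)
p remaining # prints 0
p done.size # prints 5
```

Extra `:shutdown` tokens left in the queue after the workers exit are harmless, which is why the loop can simply push a fresh batch on each pass.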
start!()

Starts a thread responsible for reestablishing lost RabbitMQ connections and restarting {Cuniculus::PubWorker}s.

It keeps track of the last time it reconnected, so that it can ignore outdated connection-failure messages from workers.

PubWorkers communicate with it through its `dispatcher_chan` queue. Depending on the content fetched from the dispatcher channel, it takes different actions:

  • when a :shutdown message is received, it stops its background thread. {#shutdown} sends this message only after waiting for current jobs to finish (up to the configured `shutdown_grace_period`).

  • when a timestamp is received that is smaller than or equal to the last reconnect timestamp, the message is ignored.

  • when the timestamp is larger than the last reconnect timestamp, it tries to reestablish the connection to RabbitMQ and restarts its workers.

Note that the first time the dispatcher is started, it sends a message to its own background thread with a timestamp to trigger the first connection.
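The timestamp filtering described above can be sketched as a loop over a `Queue`. `run_dispatch_loop`, the injected clock and the `reconnect` callback are hypothetical; the clock is faked so the result is deterministic.

```ruby
# Hypothetical dispatch loop: reconnect only for failures newer than the
# last (re)connect; older reports refer to a connection already replaced.
def run_dispatch_loop(chan, clock:, reconnect:)
  last_connect_time = 0
  loop do
    msg = chan.pop
    break if msg == :shutdown
    next unless msg > last_connect_time # stale or duplicate report
    reconnect.call(msg)
    last_connect_time = clock.call
  end
end

chan = Queue.new
reconnects = []
fake_clock = [10, 20].each # returns 10, then 20
[1, 5, 15, 12, :shutdown].each { |m| chan << m }
run_dispatch_loop(chan, clock: -> { fake_clock.next }, reconnect: ->(t) { reconnects << t })
p reconnects # prints [1, 15]
```

Reports 5 and 12 are dropped because each is older than the reconnect that preceded it, which is how several workers failing on the same broken connection trigger only one reconnect.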

```ruby
# File lib/cuniculus/dispatcher.rb
def start!
  return if @shutdown || @thread&.alive?
  @thread = Thread.new do
    last_connect_time = 0
    loop do
      disconnect_time = @dispatcher_chan.pop
      break if disconnect_time == :shutdown
      if disconnect_time > last_connect_time
        recover_from_net_error
        last_connect_time = Cuniculus.mark_time
      end
    end
  end
  @conn = ::Bunny.new(@config.rabbitmq_opts.merge(ENFORCED_CONN_OPTS).merge(session_error_handler: @thread))
  @dispatcher_chan << Cuniculus.mark_time
end
```

Private Instance Methods

handle_error(e)
```ruby
# File lib/cuniculus/dispatcher.rb
def handle_error(e)
  Cuniculus.logger.error("#{e.class.name}: #{e.message}")
  Cuniculus.logger.error(e.backtrace.join("\n")) unless e.backtrace.nil?
end
```
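The `unless e.backtrace.nil?` guard is needed because an exception object that was built but never raised has no backtrace. This may be the case for an exception produced by `Cuniculus.convert_exception_class` (an assumption; its implementation is not shown here). A sketch with a hypothetical `log_error` helper shows both cases:

```ruby
require "logger"
require "stringio"

# Hypothetical helper mirroring handle_error.
def log_error(logger, e)
  logger.error("#{e.class.name}: #{e.message}")
  # Exceptions that were never raised have a nil backtrace.
  logger.error(e.backtrace.join("\n")) unless e.backtrace.nil?
end

out = StringIO.new
logger = Logger.new(out)

log_error(logger, IOError.new("never raised")) # one entry: no backtrace
begin
  raise IOError, "raised"
rescue IOError => e
  log_error(logger, e) # two entries: message and backtrace
end
puts out.string
```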