class Emque::Consuming::Adapters::RabbitMq::Manager
Attributes
delayed_message_workers[W]
workers[W]
Public Instance Methods
actor_died(actor, reason)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 11
# Logs an error describing a dead actor, unless `shutdown` is truthy.
#
# actor  - the actor that died; its `inspect` output is written to the log.
# reason - the cause of death, interpolated into the log message.
#
# Returns nil when `shutdown` is truthy, otherwise whatever `logger.error`
# returns.
def actor_died(actor, reason)
  # Nothing is logged once `shutdown` is set.
  return if shutdown

  logger.error "RabbitMQ Manager: actor_died - #{actor.inspect} died: #{reason}"
end
delayed_message_workers()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 70
# Reader for the delayed-message worker collection.
#
# Returns the current value of @delayed_message_workers (nil until it has
# been assigned).
def delayed_message_workers
  @delayed_message_workers
end
retry_errors()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 74
# Builds an ErrorWorker on the manager's connection and asks it to retry
# errors.
#
# Returns whatever ErrorWorker#retry_errors returns.
def retry_errors
  error_worker = ErrorWorker.new(@connection)
  error_worker.retry_errors
end
start()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 18
# Brings the manager up: opens the connection, declares the error queue,
# builds the workers, then starts each of them asynchronously.
#
# Delayed-message workers are only built and started when
# `enable_delayed_message` is truthy.
def start
  setup_connection
  initialize_error_queue
  initialize_workers
  initialize_delayed_message_workers if enable_delayed_message

  logger.info "RabbitMQ Manager: starting #{worker_count} workers..."

  workers(flatten: true).each { |topic_worker| topic_worker.async.start }

  # The remaining step only applies to delayed-message workers.
  return unless enable_delayed_message

  delayed_message_workers.each { |delayed_worker| delayed_worker.async.start }
end
stop()
click to toggle source
Calls superclass method
Emque::Consuming::Actor#stop
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 34
# Stops every worker and then the connection.
#
# The worker shutdown runs inside the block handed to the superclass `stop`
# (Emque::Consuming::Actor#stop); the connection is stopped after that call
# returns.
def stop
  logger.info "RabbitMQ Manager: stopping #{worker_count} workers..."

  super do
    workers(flatten: true).each do |topic_worker|
      logger.info "RabbitMQ Manager: stopping #{topic_worker.topic} worker..."
      topic_worker.stop
    end

    if enable_delayed_message
      # Positions are reported 1-based in the log output.
      delayed_message_workers.each.with_index(1) do |delayed_worker, position|
        logger.info "RabbitMQ Manager: stopping #{delayed_worker.class} #{position} worker..."
        delayed_worker.stop
      end
    end
  end

  @connection.stop
end
worker(topic:, command:)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 53
# Adds or removes one worker for a topic.
#
# topic:   - key into the workers hash; unknown topics are ignored.
# command: - :down pops the last worker for the topic and stops it (if there
#            was one); :up appends a new worker and starts it asynchronously.
#            Any other value does nothing.
def worker(topic:, command:)
  return unless workers.key?(topic)

  pool = workers[topic]

  case command
  when :down
    retired = pool.pop
    # `pop` yields nil when the pool is already empty.
    retired.stop if retired
  when :up
    pool << new_worker(topic)
    pool.last.async.start
  end
end
workers(flatten: false)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 66
# Reader for the worker collection.
#
# flatten: - when falsy (the default) @workers is returned as stored; when
#            truthy its values are collected and flattened into one array.
def workers(flatten: false)
  return @workers unless flatten

  @workers.values.flatten
end
Private Instance Methods
enable_delayed_message()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 101
# Reads the `enable_delayed_message` setting from `config`.
#
# Returns the configured value unchanged.
def enable_delayed_message
  config.enable_delayed_message
end
initialize_delayed_message_workers()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 93
# Builds `config.delayed_message_workers` DelayedMessageWorker instances via
# `new_link` on the manager's connection and stores them through the
# `delayed_message_workers=` writer.
def initialize_delayed_message_workers
  self.delayed_message_workers = config.delayed_message_workers.times.map do
    DelayedMessageWorker.new_link(@connection)
  end
end
initialize_error_queue()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 105
# Declares the application's error exchange and error queue and binds the
# queue to the exchange, using a short-lived channel.
#
# Exchange: fanout "<app_name>.error", durable, not auto-deleted.
# Queue:    "emque.<app_name>.error", durable, not auto-deleted, with
#           "x-dead-letter-exchange" pointing at the same error exchange.
#
# The channel is closed in an `ensure` so that a failure while declaring or
# binding does not leave it open. Any exception raised by the declarations
# propagates to the caller.
def initialize_error_queue
  channel = @connection.create_channel
  app_name = config.app_name

  begin
    error_exchange = channel.fanout(
      "#{app_name}.error",
      :durable => true,
      :auto_delete => false
    )

    channel.queue(
      "emque.#{app_name}.error",
      :durable => true,
      :auto_delete => false,
      :arguments => {
        "x-dead-letter-exchange" => "#{app_name}.error"
      }
    ).bind(error_exchange)
  ensure
    # Skip the close when the channel is no longer open, so that closing an
    # already-closed channel cannot mask the original exception.
    channel.close if channel.open?
  end
end
initialize_workers()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 82
# Builds the topic => [workers] hash and stores it through the `workers=`
# writer.
#
# For every key of `router.topic_mapping`, `router.workers(topic)` workers
# are created with `new_worker`.
def initialize_workers
  pools = router.topic_mapping.keys.each_with_object({}) do |topic, by_topic|
    by_topic[topic] ||= []
    router.workers(topic).times { by_topic[topic] << new_worker(topic) }
  end

  self.workers = pools
end
new_worker(topic)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 123
# Creates a Worker for the given topic via `Worker.new_link`, passing the
# manager's connection.
#
# topic - forwarded unchanged as the second argument to `Worker.new_link`.
#
# Returns whatever `Worker.new_link` returns.
def new_worker(topic)
  Worker.new_link(@connection, topic)
end
setup_connection()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 127
# Creates the Bunny connection from the adapter's :url option, stores it in
# @connection and starts it.
#
# Returns whatever the connection's `start` returns.
def setup_connection
  broker_url = config.adapter.options[:url]
  @connection = Bunny.new(broker_url)
  @connection.start
end
worker_count()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 132
# Counts the workers across all topics.
#
# Returns the length of the flattened worker list; delayed-message workers
# are not part of that list.
def worker_count
  workers(flatten: true).length
end