module Cuniculus::SupervisorMethods
Attributes
config[R]
Public Class Methods
new(config)
click to toggle source
# File lib/cuniculus/supervisor.rb 13 def initialize(config) 14 @config = config 15 conn = connect(config.rabbitmq_opts) 16 @consumers = create_consumers(conn, config.queues) 17 @consumer_lock = Mutex.new 18 @done = false 19 end
Public Instance Methods
connect(conn_opts)
click to toggle source
# File lib/cuniculus/supervisor.rb 30 def connect(conn_opts) 31 conn = ::Bunny.new(conn_opts) 32 conn.start 33 conn 34 rescue StandardError => e 35 raise Cuniculus.convert_exception_class(e, Cuniculus::RMQConnectionError) 36 end
consumer_exception(consumer, _ex)
click to toggle source
# File lib/cuniculus/supervisor.rb 48 def consumer_exception(consumer, _ex) 49 @consumer_lock.synchronize do 50 @consumers.delete(consumer) 51 unless @done 52 # Reuse channel 53 ch = consumer.channel 54 name = consumer.queue.name 55 c = Cuniculus::Consumer.new(self, name, ch) 56 @consumers << c 57 c.start 58 end 59 end 60 end
create_consumers(conn, queues)
click to toggle source
# File lib/cuniculus/supervisor.rb 38 def create_consumers(conn, queues) 39 consumers = [] 40 queues.each do |_name, q_cfg| 41 ch = conn.create_channel(nil, q_cfg.thread_pool_size) 42 ch.prefetch(q_cfg.prefetch_count) if q_cfg.prefetch_count 43 consumers << Cuniculus::Consumer.new(q_cfg, ch) 44 end 45 consumers 46 end
start()
click to toggle source
# File lib/cuniculus/supervisor.rb 21 def start 22 @consumers.each(&:start) 23 end
stop()
click to toggle source
# File lib/cuniculus/supervisor.rb 25 def stop 26 @done = true 27 @consumers.each(&:stop) 28 end