module Cuniculus::SupervisorMethods

Attributes

config [R] — read-only accessor for the configuration object passed to new

Public Class Methods

new(config)
   # File lib/cuniculus/supervisor.rb
# Builds a supervisor: opens the RabbitMQ connection and creates one consumer
# per configured queue. Consumers are created but not started; see #start.
#
# @param config [Object] must respond to +rabbitmq_opts+ and +queues+
# @raise whatever #connect raises when the connection cannot be established
def initialize(config)
  @config = config
  @consumers = create_consumers(connect(config.rabbitmq_opts), config.queues)
  # Guards @consumers while #consumer_exception swaps out a consumer.
  @consumer_lock = Mutex.new
  # Set by #stop; once true, #consumer_exception creates no replacements.
  @done = false
end

Public Instance Methods

connect(conn_opts)
   # File lib/cuniculus/supervisor.rb
# Opens and starts a Bunny connection.
#
# @param conn_opts passed unchanged to +::Bunny.new+
# @return the started connection object
# @raise the result of +Cuniculus.convert_exception_class+ applied to any
#   StandardError raised while creating or starting the connection
#   (requested target class: +Cuniculus::RMQConnectionError+)
def connect(conn_opts)
  begin
    # tap returns the connection itself, not the result of #start.
    ::Bunny.new(conn_opts).tap(&:start)
  rescue StandardError => e
    raise Cuniculus.convert_exception_class(e, Cuniculus::RMQConnectionError)
  end
end
consumer_exception(consumer, _ex)
   # File lib/cuniculus/supervisor.rb
# Removes a failed consumer and, unless the supervisor is shutting down,
# creates and starts a replacement bound to the same channel and queue name.
#
# @param consumer the consumer that failed; must respond to +channel+ and
#   +queue+ (whose result responds to +name+)
# @param _ex the exception that was raised (unused)
# @return [nil] when shutdown is in progress; otherwise the result of the
#   replacement consumer's +start+
def consumer_exception(consumer, _ex)
  @consumer_lock.synchronize do
    @consumers.delete(consumer)
    # After #stop has set @done, only drop the consumer; do not replace it.
    next if @done

    # The failed consumer's channel is reused for its replacement.
    channel = consumer.channel
    queue_name = consumer.queue.name
    # NOTE(review): #create_consumers calls Consumer.new(q_cfg, ch) while this
    # call passes (self, name, ch) — confirm Consumer#initialize accepts both.
    replacement = Cuniculus::Consumer.new(self, queue_name, channel)
    @consumers << replacement
    replacement.start
  end
end
create_consumers(conn, queues)
   # File lib/cuniculus/supervisor.rb
# Creates one channel and one consumer per configured queue.
#
# @param conn connection object responding to +create_channel+
# @param queues enumerable yielding (name, queue config) pairs; each config
#   must respond to +thread_pool_size+ and +prefetch_count+
# @return [Array] the new, not yet started, consumers in iteration order
def create_consumers(conn, queues)
  queues.map do |_name, q_cfg|
    # First argument nil: presumably lets Bunny pick the channel id — confirm
    # against Bunny::Session#create_channel.
    channel = conn.create_channel(nil, q_cfg.thread_pool_size)
    # Prefetch is only configured when a count is set for this queue.
    if q_cfg.prefetch_count
      channel.prefetch(q_cfg.prefetch_count)
    end
    Cuniculus::Consumer.new(q_cfg, channel)
  end
end
start()
   # File lib/cuniculus/supervisor.rb
# Starts every consumer currently held by the supervisor, in list order.
#
# @return [Array] the supervisor's consumer list
def start
  @consumers.each do |consumer|
    consumer.start
  end
end
stop()
   # File lib/cuniculus/supervisor.rb
# Marks the supervisor as shutting down and stops every consumer.
#
# The flag is set and the consumer list is copied while holding the same lock
# that #consumer_exception uses. This way a replacement consumer cannot be
# added after shutdown has begun, and a consumer removing itself from
# @consumers during shutdown cannot make the iteration skip its neighbours.
#
# NOTE: takes a Mutex, so it must not be called from inside a Signal.trap
# handler (Ruby raises ThreadError when locking a Mutex there).
#
# @return [Array] the consumers that were asked to stop
def stop
  active = @consumer_lock.synchronize do
    @done = true
    @consumers.dup
  end
  # Stop outside the lock so a consumer that ends up in #consumer_exception
  # while stopping can still acquire it.
  active.each(&:stop)
end