class Cuniculus::Config
Constants
- ENFORCED_CONN_OPTS
Attributes
Public Class Methods
# File lib/cuniculus/config.rb 30 def initialize 31 @opts = {} 32 33 # ---- Default values 34 @queues = { "cun_default" => QueueConfig.new({ "name" => "cun_default" }) } 35 @rabbitmq_opts = { 36 host: "127.0.0.1", 37 port: 5672, 38 user: "guest", 39 pass: "guest", 40 vhost: "/" 41 } 42 @exchange_name = Cuniculus::CUNICULUS_EXCHANGE 43 @dead_queue_ttl = 1000 * 60 * 60 * 24 * 180 # 180 days 44 @pub_reconnect_attempts = :infinite 45 @pub_reconnect_delay = 1.5 46 @pub_reconnect_delay_max = 10 47 @pub_shutdown_grace_period = 50 48 @pub_pool_size = 5 49 ## ---- End of default values 50 end
Public Instance Methods
Configure an additional queue
Note that a single call to `add_queue` might lead to the creation of multiple queues on RabbitMQ: one base queue, and an additional queue for every retry attempt. For example, with a queue named `“test”` with `max_retry` set to `4`, 5 queues are created in RabbitMQ.
For tuning `prefetch_count`, refer to [this guide](www.cloudamqp.com/blog/2017-12-29-part1-rabbitmq-best-practice.html#prefetch).
If a queue already exists in RabbitMQ, and an attempt is done to add it again through `add_queue`, nothing happens, except if the options passed to `add_queue` conflict with the existing queue. For example if a queue exists that is durable, and `add_queue` is called with `“durable” => false`, a `Cuniculus::RMQQueueConfigurationConflict` is raised. To redeclare a queue with conflicting configurations, the original queue has first to be removed from RabbitMQ manually. This can be done, for example, through the management console.
@param qopts [Hash] Queue config options. @option qopts [String] “name” Name of the queue. @option qopts [Boolean] “durable” (true) Whether queue is declared as durable in RabbitMQ. Jobs in non-durable queues may be lost if the RabbitMQ goes down. @option qopts [Integer] “max_retry” (8) Number of retries for failed jobs in this queue. @option qopts [Integer] “prefetch_count” (10) Prefetch count used when consuming jobs from this queue. @option qopts [Integer] “thread_pool_size” (5) Thread pool size for receiving jobs.
@example Add queue named “critical”
Cuniculus.configure do |cfg| cfg.add_queue({ name: "critical", max_retry: 10 }) end
# File lib/cuniculus/config.rb 107 def add_queue(qopts) 108 qopts = qopts.transform_keys(&:to_s) 109 qname = qopts["name"].to_s 110 raise Cuniculus::ConfigError, "Missing 'name' key in queue configuration hash" if qname.strip.empty? 111 @queues[qname] = QueueConfig.new(qopts) 112 end
# File lib/cuniculus/config.rb 52 def dead_queue_ttl=(ttl) 53 raise Cuniculus::ConfigError, "dead_queue_ttl should be a positive integer, given #{ttl.inspect}" if ttl.to_i <= 0 54 @dead_queue_ttl = ttl 55 end
# File lib/cuniculus/config.rb 114 def declare! 115 conn = ::Bunny.new(rabbitmq_opts.merge(ENFORCED_CONN_OPTS)) 116 conn.start 117 ch = conn.create_channel 118 declare_exchanges!(ch) 119 declare_dead_queue!(ch) 120 @queues.each_value { |q| q.declare!(ch) } 121 conn.close unless conn.closed? 122 rescue Bunny::TCPConnectionFailed => ex 123 raise Cuniculus.convert_exception_class(ex, Cuniculus::RMQConnectionError) 124 end
Specify if the default queue `cun_default` should be created. `cun_default` is used by workers that don't explicitly specify a queue with `cuniculus_options queue: “another_queue”`.
@param bool [Boolean] If false, queue `cun_default` is not created. Defaults to `true`.
# File lib/cuniculus/config.rb 130 def default_queue=(bool) 131 @queues.delete("cun_default") unless bool 132 end
# File lib/cuniculus/config.rb 57 def exchange_name=(xname) 58 raise Cuniculus::ConfigError, "exchange_name should not be blank" if xname.to_s.empty? 59 @exchange_name = xname 60 end
# File lib/cuniculus/config.rb 62 def pub_pool_size=(pool_size) 63 raise Cuniculus::ConfigError, "pub_pool_size should be a positive integer, given #{pool_size.inspect}" if pool_size.to_i <= 0 64 @pub_pool_size = pool_size 65 end
# File lib/cuniculus/config.rb 67 def pub_reconnect_attempts=(attempts) 68 raise Cuniculus::ConfigError, "pub_reconnect_attempts should be either :infinite or a non-negative integer, was given #{attempts.inspect}" if attempts != :infinite && attempts.to_i < 0 69 @pub_reconnect_attempts = attempts 70 end
# File lib/cuniculus/config.rb 72 def pub_reconnect_delay=(delay) 73 raise Cuniculus::ConfigError, "pub_reconnect_delay should be a non-negative integer, was given #{delay.inspect}" if delay.to_i < 0 74 @pub_reconnect_delay = delay 75 end
# File lib/cuniculus/config.rb 77 def pub_reconnect_delay_max=(delay) 78 raise Cuniculus::ConfigError, "pub_reconnect_delay_max should be a non-negative integer, was given #{delay.inspect}" if delay.to_i < 0 79 @pub_reconnect_delay_max = delay 80 end
# File lib/cuniculus/config.rb 82 def pub_shutdown_grace_period=(period) 83 raise Cuniculus::ConfigError, "pub_shutdown_grace_period should be a non-negative integer, was given #{period.inspect}" if period.to_i < 0 84 @pub_shutdown_grace_period = period 85 end
Private Instance Methods
# File lib/cuniculus/config.rb 141 def declare_dead_queue!(ch) 142 ch.queue( 143 "cun_dead", 144 durable: true, 145 exclusive: false, 146 arguments: { 147 "x-message-ttl" => dead_queue_ttl 148 } 149 ).bind(Cuniculus::CUNICULUS_DLX_EXCHANGE) 150 end
# File lib/cuniculus/config.rb 136 def declare_exchanges!(ch) 137 ch.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true }) 138 ch.fanout(Cuniculus::CUNICULUS_DLX_EXCHANGE, { durable: true }) 139 end