class Isimud::BunnyClient
Interface for Bunny RabbitMQ client @see rubybunny.info
Constants
- CHANNEL_KEY
- DEFAULT_URL
Attributes
Public Class Methods
Initialize a new BunnyClient
instance. Note that a connection is not established until any other method is called
@param [String, Hash] _url Server URL or options hash @param [Hash] _bunny_options optional Bunny connection options @see Bunny.new for connection options
# File lib/isimud/bunny_client.rb, line 18 def initialize(_url = nil, _bunny_options = {}) log "Isimud::BunnyClient.initialize: options = #{_bunny_options.inspect}" @url = _url || DEFAULT_URL @url.symbolize_keys! if @url.respond_to?(:symbolize_keys!) @bunny_options = _bunny_options.symbolize_keys @bunny_options[:logger] = Isimud.logger end
Public Instance Methods
Convenience method that finds or creates a named queue, binds to an exchange, and subscribes to messages. If a block is provided, it will be called by the consumer each time a message is received.
@param [String] queue_name name of the queue @param [String] exchange_name name of the AMQP exchange. Note that existing exchanges must be declared as Topic
exchanges; otherwise, an error will occur
@param [Array<String>] routing_keys list of routing keys to be bound to the queue for the specified exchange. @yieldparam [String] payload message text @return [Bunny::Consumer] Bunny consumer interface
# File lib/isimud/bunny_client.rb, line 35 def bind(queue_name, exchange_name, *routing_keys, &block) queue = create_queue(queue_name, exchange_name, queue_options: {durable: true}, routing_keys: routing_keys) subscribe(queue, &block) if block_given? end
Open a new, thread-specific AMQP connection channel, or return the current channel for this thread if it exists
and is currently open. New channels are created with publisher confirms enabled. Messages will be prefetched according to Isimud.prefetch_count when declared.
@return [Bunny::Channel] channel instance.
# File lib/isimud/bunny_client.rb, line 114 def channel if (channel = Thread.current[CHANNEL_KEY]).try(:open?) channel else new_channel = connection.channel new_channel.confirm_select new_channel.prefetch(Isimud.prefetch_count) if Isimud.prefetch_count Thread.current[CHANNEL_KEY] = new_channel end end
Close the AMQP connection and clear it from the instance. @return nil
# File lib/isimud/bunny_client.rb, line 139 def close connection.close ensure @connection = nil end
Determine if a Bunny connection is currently established to the AMQP server. @return [Boolean,nil] true if a connection was established and is active or starting, false if a connection exists but is closed or closing, or nil if no connection has been established.
# File lib/isimud/bunny_client.rb, line 133 def connected? @connection && @connection.open? end
Establish a connection to the AMQP server, or return the current connection if one already exists @return [Bunny::Session]
# File lib/isimud/bunny_client.rb, line 102 def connection @connection ||= ::Bunny.new(url, @bunny_options).tap(&:start) end
Find or create a named queue and bind it to the specified exchange
@param [String] queue_name name of the queue @param [String] exchange_name name of the AMQP exchange. Note that pre-existing exchanges must be declared as Topic
exchanges; otherwise, an error will occur
@param [Hash] options queue declaration options @option options [Boolean] :queue_options ({durable: true}) queue declaration options – @see Bunny::Channel#queue @option options [Array<String>] :routing_keys ([]) routing keys to be bound to the queue. Use “*” to match any 1 word
in a route segment. Use "#" to match 0 or more words in a segment.
@return [Bunny::Queue] Bunny queue
# File lib/isimud/bunny_client.rb, line 52 def create_queue(queue_name, exchange_name, options = {}) queue_options = options[:queue_options] || {durable: true} routing_keys = options[:routing_keys] || [] log "Isimud::BunnyClient: create_queue #{queue_name}: queue_options=#{queue_options.inspect}" queue = find_queue(queue_name, queue_options) bind_routing_keys(queue, exchange_name, routing_keys) if routing_keys.any? queue end
Permanently delete the queue from the AMQP server. Any messages present in the queue will be discarded. @param [String] queue_name queue name @return [AMQ::Protocol::Queue::DeleteOk] RabbitMQ response
# File lib/isimud/bunny_client.rb, line 96 def delete_queue(queue_name) channel.queue_delete(queue_name) end
Look up a queue by name, or create it if it does not already exist.
# File lib/isimud/bunny_client.rb, line 167 def find_queue(queue_name, options = {durable: true}) channel.queue(queue_name, options) end
Publish a message to the specified exchange, which is declared as a durable, topic exchange. Note that message
is always persisted.
@param [String] exchange AMQP exchange name @param [String] routing_key message routing key. This should always be in the form of words separated by dots
e.g. "user.goal.complete"
@param [String] payload message payload @param [Hash] options additional message options @see Bunny::Exchange#publish @see rubybunny.info/articles/exchanges.html
# File lib/isimud/bunny_client.rb, line 154 def publish(exchange, routing_key, payload, options = {}) log "Isimud::BunnyClient#publish: exchange=#{exchange} routing_key=#{routing_key}", :debug channel.topic(exchange, durable: true).publish(payload, options.merge(routing_key: routing_key, persistent: true)) end
Close and reopen the AMQP connection @return [Bunny::Session]
# File lib/isimud/bunny_client.rb, line 161 def reconnect close connect end
Reset this client by closing all channels for the connection.
# File lib/isimud/bunny_client.rb, line 126 def reset connection.close_all_channels end
Subscribe to messages on the Bunny queue. The provided block will be called each time a message is received.
The message will be acknowledged and deleted from the queue unless an exception is raised from the block. In the case that an uncaught exception is raised, the message is rejected, and any declared exception handlers will be called.
@param [Bunny::Queue] queue Bunny queue @param [Hash] options {} subscription options – @see Bunny::Queue#subscribe @yieldparam [String] payload message text
# File lib/isimud/bunny_client.rb, line 69 def subscribe(queue, options = {}, &block) queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| current_channel = delivery_info.channel begin log "Isimud: queue #{queue.name} received #{properties[:message_id]} routing_key: #{delivery_info.routing_key}", :debug Thread.current['isimud_queue_name'] = queue.name Thread.current['isimud_delivery_info'] = delivery_info Thread.current['isimud_properties'] = properties block.call(payload) if current_channel.open? log "Isimud: queue #{queue.name} finished with #{properties[:message_id]}, acknowledging", :debug current_channel.ack(delivery_info.delivery_tag) else log "Isimud: queue #{queue.name} unable to acknowledge #{properties[:message_id]}", :warn end rescue => e log("Isimud: queue #{queue.name} error processing #{properties[:message_id]} payload #{payload.inspect}: #{e.class.name} #{e.message}\n #{e.backtrace.join("\n ")}", :warn) retry_status = run_exception_handlers(e) log "Isimud: rejecting #{properties[:message_id]} requeue=#{retry_status}", :warn current_channel.open? && current_channel.reject(delivery_info.delivery_tag, retry_status) end end end
Private Instance Methods
# File lib/isimud/bunny_client.rb, line 173 def bind_routing_keys(queue, exchange_name, routing_keys) log "Isimud::BunnyClient: bind queue #{queue.name} exchange #{exchange_name} routing_keys: #{routing_keys.join(',')}" channel.exchange(exchange_name, type: :topic, durable: true) routing_keys.each { |key| queue.bind(exchange_name, routing_key: key, nowait: false) } end