class BunnyMock::Queue
Attributes
@return {BunnyMock::Channel} Channel
used by queue
@return [String] Queue
name
@return [Hash] Creation options
Public Class Methods
Create a new [BunnyMock::Queue] instance
@param [BunnyMock::Channel] channel Channel
this queue will use @param [String] name Name of queue @param [Hash] opts Creation options
# File lib/bunny_mock/queue.rb, line 26 def initialize(channel, name = '', opts = {}) # Store creation information @channel = channel @name = name @opts = opts # Store messages @messages = [] end
Public Instance Methods
Get all messages in queue
@return [Array] All messages @api public
# File lib/bunny_mock/queue.rb, line 221 def all @messages end
Bind this queue to an exchange
@param [BunnyMock::Exchange,String] exchange Exchange
to bind to @param [Hash] opts Binding properties
@option opts [String] :routing_key Custom routing key
@api public
# File lib/bunny_mock/queue.rb, line 114 def bind(exchange, opts = {}) check_queue_deleted! if exchange.respond_to?(:add_route) # we can do the binding ourselves exchange.add_route opts.fetch(:routing_key, @name), self else # we need the channel to lookup the exchange @channel.queue_bind self, opts.fetch(:routing_key, @name), exchange end self end
Check if this queue is bound to the exchange
@param [BunnyMock::Exchange,String] exchange Exchange
to test @param [Hash] opts Binding properties
@option opts [String] :routing_key Routing key from binding
@return [Boolean] true if this queue is bound to the given exchange, false otherwise @api public
# File lib/bunny_mock/queue.rb, line 166 def bound_to?(exchange, opts = {}) check_queue_deleted! if exchange.respond_to?(:routes_to?) # we can do the check ourselves exchange.routes_to? self, opts else # we need the channel to lookup the exchange @channel.xchg_routes_to? self, opts.fetch(:routing_key, @name), exchange end end
Deletes this queue
@api public
# File lib/bunny_mock/queue.rb, line 230 def delete @channel.deregister_queue self @deleted = true end
Count of messages in queue
@return [Integer] Number of messages in queue @api public
# File lib/bunny_mock/queue.rb, line 184 def message_count @messages.count end
Get oldest message in queue
@return [Hash] Message data @api public
# File lib/bunny_mock/queue.rb, line 194 def pop(opts = { manual_ack: false }, &block) if BunnyMock.use_bunny_queue_pop_api bunny_pop(opts, &block) else warn '[DEPRECATED] This behavior is deprecated - please set `BunnyMock::use_bunny_queue_pop_api` to true to use Bunny Queue#pop behavior' @messages.shift end end
Publish a message
@param [Object] payload Message payload @param [Hash] opts Message properties
@option opts [String] :routing_key Routing key @option opts [Boolean] :persistent Should the message be persisted to disk? @option opts [Boolean] :mandatory Should the message be returned if it cannot be routed to any queue? @option opts [Integer] :timestamp A timestamp associated with this message @option opts [Integer] :expiration Expiration time after which the message will be deleted @option opts [String] :type Message type, e.g. what type of event or command this message represents. Can be any string @option opts [String] :reply_to Queue
name other apps should send the response to @option opts [String] :content_type Message content type (e.g. application/json) @option opts [String] :content_encoding Message content encoding (e.g. gzip) @option opts [String] :correlation_id Message correlated to this one, e.g. what request this message is a reply for @option opts [Integer] :priority Message priority, 0 to 9. Not used by RabbitMQ, only applications @option opts [String] :message_id Any message identifier @option opts [String] :user_id Optional user ID. Verified by RabbitMQ against the actual connection username @option opts [String] :app_id Optional application ID
@return [BunnyMock::Queue] self @see {BunnyMock::Exchange#publish} @api public
# File lib/bunny_mock/queue.rb, line 63 def publish(payload, opts = {}) check_queue_deleted! # add to messages @messages << { message: payload, options: opts } yield_consumers self end
Clear all messages in queue
@api public
# File lib/bunny_mock/queue.rb, line 209 def purge @messages = [] self end
Adds a consumer to the queue (subscribes for message deliveries).
Params are so they can be used when the message is processed. Takes a block which is called when a message is delivered to the queue
@api public
# File lib/bunny_mock/queue.rb, line 80 def subscribe(*args, &block) @consumers ||= [] @consumers << [block, args] yield_consumers self end
Adds a specific consumer object to the queue (subscribes for message deliveries).
@param [#call] consumer A subclass of Bunny::Consumer or any callable object Secondary params are so they can be used when the message is processed.
@api public
# File lib/bunny_mock/queue.rb, line 96 def subscribe_with(consumer, *args) @consumers ||= [] @consumers << [consumer, args] yield_consumers self end
Unbind this queue from an exchange
@param [BunnyMock::Exchange,String] exchange Exchange
to unbind from @param [Hash] opts Binding properties
@option opts [String] :routing_key Custom routing key
@api public
# File lib/bunny_mock/queue.rb, line 139 def unbind(exchange, opts = {}) check_queue_deleted! if exchange.respond_to?(:remove_route) # we can do the unbinding ourselves exchange.remove_route opts.fetch(:routing_key, @name), self else # we need the channel to lookup the exchange @channel.queue_unbind self, opts.fetch(:routing_key, @name), exchange end end
Private Instance Methods
@private
# File lib/bunny_mock/queue.rb, line 243 def bunny_pop(*) response = pop_response(@messages.shift) block_given? ? yield(*response) : response end
@private
# File lib/bunny_mock/queue.rb, line 238 def check_queue_deleted! raise 'Queue has been deleted' if @deleted end
@private
# File lib/bunny_mock/queue.rb, line 249 def pop_response(message) return [nil, nil, nil] unless message di = GetResponse.new(@channel, self, message[:options]) mp = MessageProperties.new(message[:options]) [di, mp, message[:message]] end
# File lib/bunny_mock/queue.rb, line 271 def store_acknowledgement(response, args) if args[0].is_a?(Hash) && args[0][:manual_ack] delivery_tag = response[0][:delivery_tag] @channel.acknowledged_state[:pending][delivery_tag] = response end end
@private
# File lib/bunny_mock/queue.rb, line 259 def yield_consumers return if @consumers.nil? @consumers.each do |c, args| # rubocop:disable AssignmentInCondition while message = all.pop response = pop_response(message) store_acknowledgement(response, args) c.call(response) end end end