class BunnyMock::Channel
Attributes
@return [Hash] with details of pending, acked and nacked messaged
@return [BunnyMock::Session] Session
this channel belongs to
@return [Integer] Channel
identifier
@return [Symbol] Current channel state
Public Class Methods
Create a new {BunnyMock::Channel} instance
@param [BunnyMock::Session] connection Mocked session instance @param [Integer] id Channel
identifier
@api public
# File lib/bunny_mock/channel.rb, line 28 def initialize(connection = nil, id = nil) # store channel id @id = id # store connection information @connection = connection # initialize exchange and queue storage @exchanges = {} @queues = {} @acknowledged_state = { pending: {}, acked: {}, nacked: {}, rejected: {} } # set status to opening @status = :opening end
Public Instance Methods
Acknowledge message.
@param [Integer] delivery_tag Delivery tag to acknowledge @param [Boolean] multiple (false) Should all unacknowledged messages up to this be acknowleded as well?
@return nil @api public
# File lib/bunny_mock/channel.rb, line 279 def ack(delivery_tag, multiple = false) if multiple @acknowledged_state[:pending].keys.each do |key| ack(key, false) if key <= delivery_tag end elsif @acknowledged_state[:pending].key?(delivery_tag) update_acknowledgement_state(delivery_tag, :acked) end nil end
@return [BunnyMock::Channel] Self
# File lib/bunny_mock/channel.rb, line 201 def basic_publish(payload, xchg, routing_key, opts = {}) xchg = xchg_find_or_create(xchg) unless xchg.respond_to? :name xchg.publish payload, opts.merge(routing_key: routing_key) self end
Sets status to closed
@return [BunnyMock::Channel] self @api public
# File lib/bunny_mock/channel.rb, line 74 def close @status = :closed self end
@return [Boolean] true if status is closed, false otherwise @api public
# File lib/bunny_mock/channel.rb, line 52 def closed? @status == :closed end
Does nothing atm.
@return nil @api public
# File lib/bunny_mock/channel.rb, line 246 def confirm_select(callback = nil) # noop end
Mocks RabbitMQ default exchange
@return [BunnyMock::Exchange] Mocked default exchange instance @api public
# File lib/bunny_mock/channel.rb, line 188 def default_exchange direct '', no_declare: true end
@private
# File lib/bunny_mock/channel.rb, line 343 def deregister_exchange(xchg) @connection.deregister_exchange xchg.name end
@private
# File lib/bunny_mock/channel.rb, line 338 def deregister_queue(queue) @connection.deregister_queue queue.name end
Mocks a direct exchange
@param [String] name Exchange
name @param [Hash] opts Exchange
parameters
@option opts [Boolean] :durable @option opts [Boolean] :auto_delete @option opts [Hash] :arguments
@return [BunnyMock::Exchange] Mocked exchange instance @api public
# File lib/bunny_mock/channel.rb, line 144 def direct(name, opts = {}) exchange name, opts.merge(type: :direct) end
Mocks an exchange
@param [String] name Exchange
name @param [Hash] opts Exchange
parameters
@option opts [Symbol,String] :type Type of exchange @option opts [Boolean] :durable @option opts [Boolean] :auto_delete @option opts [Hash] :arguments
@return [BunnyMock::Exchange] Mocked exchange instance @api public
# File lib/bunny_mock/channel.rb, line 102 def exchange(name, opts = {}) @connection.register_exchange xchg_find_or_create(name, opts) end
Mocks a fanout exchange
@param [String] name Exchange
name @param [Hash] opts Exchange
parameters
@option opts [Boolean] :durable @option opts [Boolean] :auto_delete @option opts [Hash] :arguments
@return [BunnyMock::Exchange] Mocked exchange instance @api public
# File lib/bunny_mock/channel.rb, line 127 def fanout(name, opts = {}) exchange name, opts.merge(type: :fanout) end
Unique string supposed to be used as a consumer tag.
@return [String] Unique string. @api plugin
# File lib/bunny_mock/channel.rb, line 110 def generate_consumer_tag(name = 'bunny') "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" end
Mocks a headers exchange
@param [String] name Exchange
name @param [Hash] opts Exchange
parameters
@option opts [Boolean] :durable @option opts [Boolean] :auto_delete @option opts [Hash] :arguments
@return [BunnyMock::Exchange] Mocked exchange instance @api public
# File lib/bunny_mock/channel.rb, line 178 def header(name, opts = {}) exchange name, opts.merge(type: :header) end
Unacknowledge message.
@param [Integer] delivery_tag Delivery tag to acknowledge @param [Boolean] multiple (false) Should all unacknowledged messages up to this be rejected as well? @param [Boolean] requeue (false) Should this message be requeued instead of dropping it?
@return nil @api public
# File lib/bunny_mock/channel.rb, line 301 def nack(delivery_tag, multiple = false, requeue = false) if multiple @acknowledged_state[:pending].keys.each do |key| nack(key, false, requeue) if key <= delivery_tag end elsif @acknowledged_state[:pending].key?(delivery_tag) delivery, header, body = update_acknowledgement_state(delivery_tag, :nacked) delivery.queue.publish(body, header.to_hash) if requeue end nil end
Sets status to open
@return [BunnyMock::Channel] self @api public
# File lib/bunny_mock/channel.rb, line 62 def open @status = :open self end
@return [Boolean] true if status is open, false otherwise @api public
# File lib/bunny_mock/channel.rb, line 46 def open? @status == :open end
Does nothing atm.
@return nil @api public
# File lib/bunny_mock/channel.rb, line 256 def prefetch(*) # noop end
Create a new {BunnyMock::Queue} instance, or find in channel cache
@param [String] name Name of queue @param [Hash] opts Queue
creation options
@return [BunnyMock::Queue] Queue
that was mocked or looked up @api public
# File lib/bunny_mock/channel.rb, line 222 def queue(name = '', opts = {}) queue = @connection.find_queue(name) || Queue.new(self, name, opts) @connection.register_queue queue end
@private
# File lib/bunny_mock/channel.rb, line 348 def queue_bind(queue, key, xchg) exchange = @connection.find_exchange xchg raise Bunny::NotFound.new("Exchange '#{xchg}' was not found", self, false) unless exchange exchange.add_route key, queue end
@private
# File lib/bunny_mock/channel.rb, line 356 def queue_unbind(queue, key, xchg) exchange = @connection.find_exchange xchg raise Bunny::NotFound.new("Exchange '#{xchg}' was not found", self, false) unless exchange exchange.remove_route key, queue end
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.
@param [Integer] delivery_tag Delivery tag to reject @param [Boolean] requeue Should this message be requeued instead of dropping it?
@return nil @api public
# File lib/bunny_mock/channel.rb, line 323 def reject(delivery_tag, requeue = false) if @acknowledged_state[:pending].key?(delivery_tag) delivery, header, body = update_acknowledgement_state(delivery_tag, :rejected) delivery.queue.publish(body, header.to_hash) if requeue end nil end
@return [String] Object representation
# File lib/bunny_mock/channel.rb, line 81 def to_s "#<#{self.class.name}:#{object_id} @id=#{@id} @open=#{open?}>" end
Mocks a topic exchange
@param [String] name Exchange
name @param [Hash] opts Exchange
parameters
@option opts [Boolean] :durable @option opts [Boolean] :auto_delete @option opts [Hash] :arguments
@return [BunnyMock::Exchange] Mocked exchange instance @api public
# File lib/bunny_mock/channel.rb, line 161 def topic(name, opts = {}) exchange name, opts.merge(type: :topic) end
Does not actually wait, but always return true.
@return true @api public
# File lib/bunny_mock/channel.rb, line 266 def wait_for_confirms(*) true end
@private
# File lib/bunny_mock/channel.rb, line 380 def xchg_bind(receiver, routing_key, name) source = @connection.find_exchange name raise Bunny::NotFound.new("Exchange '#{name}' was not found", self, false) unless source source.add_route routing_key, receiver end
@private
# File lib/bunny_mock/channel.rb, line 364 def xchg_bound_to?(receiver, key, name) source = @connection.find_exchange name raise Bunny::NotFound.new("Exchange '#{name}' was not found", self, false) unless source source.routes_to? receiver, routing_key: key end
@private
# File lib/bunny_mock/channel.rb, line 372 def xchg_routes_to?(queue, key, xchg) exchange = @connection.find_exchange xchg raise Bunny::NotFound.new("Exchange '#{xchg}' was not found", self, false) unless exchange exchange.routes_to? queue, routing_key: key end
@private
# File lib/bunny_mock/channel.rb, line 388 def xchg_unbind(routing_key, name, exchange) source = @connection.find_exchange name raise Bunny::NotFound.new("Exchange '#{name}' was not found", self, false) unless source source.remove_route routing_key, exchange end
Private Instance Methods
@private
# File lib/bunny_mock/channel.rb, line 403 def update_acknowledgement_state(delivery_tag, new_state) @acknowledged_state[new_state][delivery_tag] = @acknowledged_state[:pending].delete(delivery_tag) end
@private
# File lib/bunny_mock/channel.rb, line 398 def xchg_find_or_create(name, opts = {}) @connection.find_exchange(name) || Exchange.declare(self, name, opts) end