class AMQP::Client::Connection::Channel
Constants
- QueueOk
Response when declaring a queue
@!attribute queue_name
@return [String] The name of the queue
@!attribute message_count
@return [Integer] Number of messages in the queue at the time of declaration
@!attribute consumer_count
@return [Integer] Number of consumers subscribed to the queue at the time of declaration
Attributes
Channel ID
@return [Integer]
Public Class Methods
Should only be called from Connection
@param connection [Connection] The connection this channel belongs to
@param id [Integer] ID of the channel
@see Connection#channel
@api private
# File lib/amqp/client/channel.rb, line 15
def initialize(connection, id)
  @connection = connection
  @id = id
  @replies = ::Queue.new
  @consumers = {}
  @closed = nil
  @open = false
  @on_return = nil
  @confirm = nil
  @unconfirmed = ::Queue.new
  @unconfirmed_empty = ::Queue.new
  @basic_gets = ::Queue.new
end
Public Instance Methods
Acknowledge a message
@param delivery_tag [Integer] The delivery tag of the message to acknowledge
@param multiple [Boolean] Ack all messages up to this message
@return [nil]
# File lib/amqp/client/channel.rb, line 361
def basic_ack(delivery_tag, multiple: false)
  write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
  nil
end
Cancel/abort/stop a consumer
@param consumer_tag [String] Tag of the consumer to cancel
@param no_wait [Boolean] If true, don't wait for the broker to confirm that the consumer is cancelled
@return [nil]
# File lib/amqp/client/channel.rb, line 337
def basic_cancel(consumer_tag, no_wait: false)
  consumer = @consumers.fetch(consumer_tag)
  return if consumer.closed?

  write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
  expect(:basic_cancel_ok) unless no_wait
  consumer.close
  nil
end
Consume messages from a queue
@param queue [String] Name of the queue to subscribe to
@param tag [String] Custom consumer tag, will be auto assigned by the broker if empty.
  Has to be unique among this channel's consumers only
@param no_ack [Boolean] When false, messages have to be manually acknowledged (or rejected)
@param exclusive [Boolean] When true, only a single consumer can consume from the queue at a time
@param arguments [Hash] Custom arguments for the consumer
@param worker_threads [Integer] Number of threads processing messages,
  0 means that the thread calling this method will process the messages and thus this method will block
@yield [Message] Delivered message from the queue
@return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
@return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
# File lib/amqp/client/channel.rb, line 312
def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1)
  write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
  tag, = expect(:basic_consume_ok)
  q = @consumers[tag] = ::Queue.new
  if worker_threads.zero?
    loop do
      yield (q.pop || break)
    end
    nil
  else
    threads = Array.new(worker_threads) do
      Thread.new do
        loop do
          yield (q.pop || break)
        end
      end
    end
    [tag, threads]
  end
end
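The worker loop in basic_consume relies on how Ruby's thread-safe Queue behaves when it is closed: pending items are still handed out, and after that `pop` returns nil, which ends the loop via `break`. The following is a minimal, broker-free sketch of that pattern. The method name `consume_with_workers` and the string messages are made up for illustration and are not part of the library.

```ruby
# Hypothetical stand-in for the threaded branch of basic_consume.
# Each worker pops from the shared queue until it is closed and drained.
def consume_with_workers(queue, worker_threads)
  Array.new(worker_threads) do
    Thread.new do
      loop do
        # pop returns nil once the queue is closed and empty, which breaks the loop
        yield(queue.pop || break)
      end
    end
  end
end

deliveries = Queue.new # plays the role of the per-consumer queue
seen = Queue.new       # collects what the block received

threads = consume_with_workers(deliveries, 2) { |msg| seen << msg }
3.times { |i| deliveries << "msg #{i}" }
deliveries.close       # comparable to the consumer being cancelled
threads.each(&:join)   # workers exit once the queue is drained
puts seen.size
```

Closing the queue is what lets every worker thread finish, so joining the returned threads after a cancel should not hang in this model.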
Get a message from a queue (by polling)
@param queue_name [String]
@param no_ack [Boolean] When false, the message has to be manually acknowledged
@return [Message] If the queue had a message
@return [nil] If the queue doesn't have any messages
# File lib/amqp/client/channel.rb, line 229
def basic_get(queue_name, no_ack: true)
  write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
  case (msg = @basic_gets.pop)
  when Message then msg
  when :basic_get_empty then nil
  when nil then raise Error::Closed.new(@id, *@closed)
  end
end
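basic_get distinguishes three outcomes of a single blocking `pop`: a message, the `:basic_get_empty` marker, or nil when the queue has been closed. A small sketch of that dispatch, using a stand-in struct and error class (`PolledMessage` and `GetOnClosedChannel` are hypothetical names, not library classes):

```ruby
PolledMessage = Struct.new(:body)              # stand-in for Message
class GetOnClosedChannel < StandardError; end  # stand-in for Error::Closed

# Interpret whatever the reader side pushed onto the basic_gets queue.
def poll_result(basic_gets)
  case (msg = basic_gets.pop)
  when PolledMessage then msg     # the queue had a message
  when :basic_get_empty then nil  # the broker answered that the queue is empty
  when nil then raise GetOnClosedChannel, "channel closed while polling"
  end
end

gets = Queue.new
gets << PolledMessage.new("hello")
gets << :basic_get_empty
puts poll_result(gets).body
puts poll_result(gets).inspect
```

Because a closed Queue returns nil from `pop`, the nil branch is only reached after the queue has been closed, which is why it is treated as an error rather than as an empty result.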
@api private
# File lib/amqp/client/channel.rb, line 485
def basic_get_empty
  @basic_gets.push :basic_get_empty
end
Negatively acknowledge a message
@param delivery_tag [Integer] The delivery tag of the message to negatively acknowledge
@param multiple [Boolean] Nack all messages up to this message
@param requeue [Boolean] Requeue the message
@return [nil]
# File lib/amqp/client/channel.rb, line 371
def basic_nack(delivery_tag, multiple: false, requeue: false)
  write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
  nil
end
Publishes a message to an exchange
@param body [String] The body, can be a string or a byte array
@param exchange [String] Name of the exchange to publish to
@param routing_key [String] The routing key that the exchange might use to route the message to a queue
@param properties [Properties]
@option properties [Boolean] mandatory The message will be returned if the message can't be routed to a queue
@option properties [Boolean] persistent Same as delivery_mode: 2
@option properties [String] content_type Content type of the message body
@option properties [String] content_encoding Content encoding of the body
@option properties [Hash<String, Object>] headers Custom headers
@option properties [Integer] delivery_mode 2 for persisted message, transient messages for all other values
@option properties [Integer] priority A priority of the message (between 0 and 255)
@option properties [String] correlation_id A correlation id, most often used for RPC communication
@option properties [String] reply_to Queue to reply RPC responses to
@option properties [Integer, String] expiration How long the message will stay in the queue (RabbitMQ interprets the value as milliseconds)
@option properties [String] message_id Can be used to uniquely identify the message, e.g. for deduplication
@option properties [Date] timestamp Often used for the time the message was originally generated
@option properties [String] type Can indicate what kind of message this is
@option properties [String] user_id Can be used to verify that this is the user that published the message
@option properties [String] app_id Can be used to indicate which app generated the message
@return [nil]
# File lib/amqp/client/channel.rb, line 259
def basic_publish(body, exchange, routing_key, **properties)
  body_max = @connection.frame_max - 8
  id = @id
  mandatory = properties.delete(:mandatory) || false
  case properties.delete(:persistent)
  when true then properties[:delivery_mode] = 2
  when false then properties[:delivery_mode] = 1
  end

  if body.bytesize.between?(1, body_max)
    write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
                FrameBytes.header(id, body.bytesize, properties),
                FrameBytes.body(id, body)
    @unconfirmed.push @confirm += 1 if @confirm
    return
  end

  write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
              FrameBytes.header(id, body.bytesize, properties)
  pos = 0
  while pos < body.bytesize # split body into multiple frame_max frames
    len = [body_max, body.bytesize - pos].min
    body_part = body.byteslice(pos, len)
    write_bytes FrameBytes.body(id, body_part)
    pos += len
  end
  @unconfirmed.push @confirm += 1 if @confirm
  nil
end
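The splitting loop in basic_publish can be tried in isolation. The sketch below assumes nothing about FrameBytes or the connection; `split_body` is a hypothetical helper that only reproduces the arithmetic. The 8 bytes subtracted from frame_max correspond to the AMQP 0-9-1 frame overhead (a 7-byte frame header plus the 1-byte frame end marker).

```ruby
# Hypothetical helper mirroring the byteslice loop in basic_publish.
# Returns the body parts that would each be sent in their own body frame.
def split_body(body, frame_max)
  body_max = frame_max - 8 # 7-byte frame header + 1-byte frame end
  parts = []
  pos = 0
  while pos < body.bytesize
    len = [body_max, body.bytesize - pos].min
    parts << body.byteslice(pos, len) # slice by bytes, not characters
    pos += len
  end
  parts
end

# With frame_max 18 each body frame carries at most 10 bytes.
p split_body("a" * 25, 18).map(&:bytesize) # => [10, 10, 5]
```

An empty body yields no parts at all, which matches the method above: for a zero-length body only the publish and header frames are written.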
Publish a message and block until the broker has confirmed it has received it
@param (see basic_publish)
@option (see basic_publish)
@return [Boolean] True if the message was successfully published
@raise (see basic_publish)
# File lib/amqp/client/channel.rb, line 294
def basic_publish_confirm(body, exchange, routing_key, **properties)
  confirm_select(no_wait: true)
  basic_publish(body, exchange, routing_key, **properties)
  wait_for_confirms
end
Specify how many messages to prefetch for consumers with `no_ack: false`
@param prefetch_count [Integer] Maximum number of messages to keep in flight
@param prefetch_size [Integer] Maximum number of bytes to keep in flight
@param global [Boolean] If true the limit will apply to the channel rather than the consumer
@return [nil]
# File lib/amqp/client/channel.rb, line 352
def basic_qos(prefetch_count, prefetch_size: 0, global: false)
  write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
  expect :basic_qos_ok
  nil
end
Recover all the unacknowledged messages
@param requeue [Boolean] If false the currently unacked messages will be delivered to this consumer again,
  if true to any consumer
@return [nil]
# File lib/amqp/client/channel.rb, line 389
def basic_recover(requeue: false)
  write_bytes FrameBytes.basic_recover(@id, requeue: requeue)
  expect :basic_recover_ok
  nil
end
Reject a message
@param delivery_tag [Integer] The delivery tag of the message to reject
@param requeue [Boolean] Requeue the message into the queue again
@return [nil]
# File lib/amqp/client/channel.rb, line 380
def basic_reject(delivery_tag, requeue: false)
  write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
  nil
end
@api private
# File lib/amqp/client/channel.rb, line 501
def body_delivered(body_part)
  @next_body.write(body_part)
  return unless @next_body.pos == @next_body_size

  @next_msg.body = @next_body.string
  next_message_finished!
end
Gracefully close the channel
@param reason [String] The reason for closing the channel
@param code [Integer] The close code
@return [nil]
# File lib/amqp/client/channel.rb, line 54
def close(reason: "", code: 200)
  return if @closed

  write_bytes FrameBytes.channel_close(@id, reason, code)
  @closed = [:channel, code, reason]
  expect :channel_close_ok
  @replies.close
  @basic_gets.close
  @unconfirmed_empty.close
  @consumers.each_value(&:close)
  nil
end
@api private
# File lib/amqp/client/channel.rb, line 510
def close_consumer(tag)
  @consumers.fetch(tag).close
  nil
end
Called when the channel is closed by the broker
@param level [Symbol] :connection or :channel
@return [nil]
@api private
# File lib/amqp/client/channel.rb, line 71
def closed!(level, code, reason, classid, methodid)
  @closed = [level, code, reason, classid, methodid]
  @replies.close
  @basic_gets.close
  @unconfirmed_empty.close
  @consumers.each_value(&:close)
  nil
end
Called by Connection when an ack/nack is received from the broker
@api private
# File lib/amqp/client/channel.rb, line 423
def confirm(args)
  ack_or_nack, delivery_tag, multiple = *args
  loop do
    tag = @unconfirmed.pop(true)
    break if tag == delivery_tag
    next if multiple && tag < delivery_tag

    @unconfirmed << tag # requeue
  rescue ThreadError
    break
  end
  return unless @unconfirmed.empty?

  ok = ack_or_nack == :ack
  @unconfirmed_empty.push(ok) until @unconfirmed_empty.num_waiting.zero?
end
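The bookkeeping in confirm is easier to follow with concrete delivery tags. The sketch below is a simplified, single-threaded model: `settle` is a hypothetical name, it only covers the part that removes tags from the unconfirmed queue, and it assumes the confirmed tag is still present in the queue.

```ruby
# Simplified model of the tag handling in confirm.
# `unconfirmed` is a Queue of delivery tags in publish order.
# Returns true when nothing is left to confirm.
def settle(unconfirmed, delivery_tag, multiple)
  loop do
    tag = unconfirmed.pop(true)            # non-blocking pop, raises ThreadError when empty
    break if tag == delivery_tag           # the confirmed tag itself: drop it and stop
    next if multiple && tag < delivery_tag # covered by a multiple ack: drop it

    unconfirmed << tag                     # not covered by this ack: put it back
  rescue ThreadError
    break
  end
  unconfirmed.empty?
end

tags = Queue.new
[1, 2, 3].each { |t| tags << t }
p settle(tags, 2, true)  # => false, tag 3 is still unconfirmed
p settle(tags, 3, false) # => true
```

In the method above, the waiters blocked in wait_for_confirms are only woken once this emptiness check succeeds.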
Put the channel in confirm mode, each published message will then be confirmed by the broker
@param no_wait [Boolean] If false the method will block until the broker has confirmed the request
@return [nil]
# File lib/amqp/client/channel.rb, line 401
def confirm_select(no_wait: false)
  return if @confirm

  write_bytes FrameBytes.confirm_select(@id, no_wait)
  expect :confirm_select_ok unless no_wait
  @confirm = 0
  nil
end
Bind an exchange to another exchange
@param destination [String] Name of the exchange to bind
@param source [String] Name of the exchange to bind to
@param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type)
@param arguments [Hash] Message headers to match on, but only when bound to header exchanges
@return [nil]
# File lib/amqp/client/channel.rb, line 123
def exchange_bind(destination, source, binding_key, arguments: {})
  write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_bind_ok
  nil
end
Declare an exchange
@param name [String] Name of the exchange
@param type [String] Type of exchange (direct, fanout, topic, headers, etc.)
@param passive [Boolean] If true raise an exception if the exchange doesn't already exist
@param durable [Boolean] If true the exchange will persist between broker restarts,
  also a requirement for persistent messages
@param auto_delete [Boolean] If true the exchange will be deleted when the last queue/exchange is unbound
@param internal [Boolean] If true the exchange can't be published to directly
@param arguments [Hash] Custom arguments
@return [nil]
# File lib/amqp/client/channel.rb, line 100
def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
  write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
  expect :exchange_declare_ok
  nil
end
Delete an exchange
@param name [String] Name of the exchange
@param if_unused [Boolean] If true raise an exception if queues/exchanges are bound to this exchange
@param no_wait [Boolean] If true don't wait for a broker confirmation
@return [nil]
# File lib/amqp/client/channel.rb, line 111
def exchange_delete(name, if_unused: false, no_wait: false)
  write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
  expect :exchange_delete_ok unless no_wait
  nil
end
Unbind an exchange from another exchange
@param destination [String] Name of the exchange to unbind
@param source [String] Name of the exchange to unbind from
@param binding_key [String] Binding key with which the destination exchange is bound to the source exchange
@param arguments [Hash] Arguments matching the binding that's being removed
@return [nil]
# File lib/amqp/client/channel.rb, line 135
def exchange_unbind(destination, source, binding_key, arguments: {})
  write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_unbind_ok
  nil
end
@api private
# File lib/amqp/client/channel.rb, line 490
def header_delivered(body_size, properties)
  @next_msg.properties = properties
  if body_size.zero?
    next_message_finished!
  else
    @next_body = StringIO.new(String.new(capacity: body_size))
    @next_body_size = body_size
  end
end
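header_delivered and body_delivered together reassemble a message whose body arrives in several frames: the header announces the total body size, and body parts are appended until that many bytes have been written. A self-contained sketch of the same idea follows. `BodyAssembler` and its method names are invented for this illustration and do not exist in the library.

```ruby
require "stringio"

# Hypothetical, stripped-down model of the header/body reassembly.
class BodyAssembler
  attr_reader :completed

  def initialize
    @completed = []
  end

  # Called once per message with the body size announced in the header frame.
  def header(body_size)
    if body_size.zero?
      @completed << "" # nothing more to wait for
    else
      @buffer = StringIO.new(String.new(capacity: body_size)) # preallocate
      @expected = body_size
    end
  end

  # Called once per body frame; finishes the message when all bytes arrived.
  def body(part)
    @buffer.write(part)
    return unless @buffer.pos == @expected

    @completed << @buffer.string
    @buffer = @expected = nil
  end
end

assembler = BodyAssembler.new
assembler.header(5)
assembler.body("ab")
assembler.body("cde")
p assembler.completed # => ["abcde"]
```

Preallocating the string with `capacity:` avoids repeated reallocation while parts are appended, which is presumably why the method above does the same.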
Override inspect
@api private
# File lib/amqp/client/channel.rb, line 31
def inspect
  "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{!@confirm.nil?}"\
    " consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>"
end
@api private
# File lib/amqp/client/channel.rb, line 480
def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key)
  @next_msg = Message.new(self, consumer_tag, delivery_tag, exchange, routing_key, redelivered)
end
@api private
# File lib/amqp/client/channel.rb, line 475
def message_returned(reply_code, reply_text, exchange, routing_key)
  @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key)
end
Handle returned messages in this block. If not set, the message will just be logged to STDERR
@yield [ReturnMessage] Messages returned by the broker when a publish has failed
@return [nil]
# File lib/amqp/client/channel.rb, line 83
def on_return(&block)
  @on_return = block
  nil
end
Open the channel (called from Connection)
@return [Channel] self
@api private
# File lib/amqp/client/channel.rb, line 43
def open
  return self if @open

  @open = true
  write_bytes FrameBytes.channel_open(@id)
  expect(:channel_open_ok)
  self
end
Bind a queue to an exchange
@param name [String] Name of the queue to bind
@param exchange [String] Name of the exchange to bind to
@param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type)
@param arguments [Hash] Message headers to match on, but only when bound to header exchanges
@return [nil]
# File lib/amqp/client/channel.rb, line 193
def queue_bind(name, exchange, binding_key, arguments: {})
  write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
  expect :queue_bind_ok
  nil
end
Create a queue (operation is idempotent)
@param name [String] Name of the queue, can be empty, but will then be generated by the broker
@param passive [Boolean] If true an exception will be raised if the queue doesn't already exist
@param durable [Boolean] If true the queue will survive broker restarts,
  messages in the queue will only survive if they are published as persistent
@param exclusive [Boolean] If true the queue will be deleted when the connection that declared it is closed
@param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming
  (it won't be deleted until at least one consumer has consumed from it)
@param arguments [Hash] Custom arguments, such as queue-ttl etc.
@return [QueueOk] The QueueOk struct has `queue_name`, `message_count` and `consumer_count` properties
# File lib/amqp/client/channel.rb, line 163
def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
  durable = false if name.empty?
  exclusive = true if name.empty?
  auto_delete = true if name.empty?

  write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
  name, message_count, consumer_count = expect(:queue_declare_ok)
  QueueOk.new(name, message_count, consumer_count)
end
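Note that queue_declare overrides three flags when the name is empty: a broker-named queue is always declared non-durable, exclusive and auto-deleted, regardless of the keyword arguments passed. The sketch below isolates that rule. `QueueFlags` and `effective_queue_flags` are hypothetical names used only for this illustration.

```ruby
# Hypothetical value object for the three flags that queue_declare may override.
QueueFlags = Struct.new(:durable, :exclusive, :auto_delete)

# Returns the flags that would actually be sent to the broker.
def effective_queue_flags(name, durable: true, exclusive: false, auto_delete: false)
  # An empty name means the broker generates one; such queues are treated as temporary.
  return QueueFlags.new(false, true, true) if name.empty?

  QueueFlags.new(durable, exclusive, auto_delete)
end

p effective_queue_flags("orders").to_a         # => [true, false, false]
p effective_queue_flags("", durable: true).to_a # => [false, true, true]
```

In practice this means a durable queue has to be given an explicit name.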
Delete a queue
@param name [String] Name of the queue
@param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise
@param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise
@param no_wait [Boolean] Don't wait for a broker confirmation if true
@return [Integer] Number of messages in the queue when it was deleted
@return [nil] If no_wait was set to true
# File lib/amqp/client/channel.rb, line 181
def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
  write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
  message_count, = expect :queue_delete unless no_wait
  message_count
end
Purge a queue
@param name [String] Name of the queue
@param no_wait [Boolean] Don't wait for a broker confirmation if true
@return [nil]
# File lib/amqp/client/channel.rb, line 203
def queue_purge(name, no_wait: false)
  write_bytes FrameBytes.queue_purge(@id, name, no_wait)
  expect :queue_purge_ok unless no_wait
  nil
end
Unbind a queue from an exchange
@param name [String] Name of the queue to unbind
@param exchange [String] Name of the exchange to unbind from
@param binding_key [String] Binding key which the queue is bound to the exchange with
@param arguments [Hash] Arguments matching the binding that's being removed
@return [nil]
# File lib/amqp/client/channel.rb, line 215
def queue_unbind(name, exchange, binding_key, arguments: {})
  write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
  expect :queue_unbind_ok
  nil
end
@api private
# File lib/amqp/client/channel.rb, line 470
def reply(args)
  @replies.push(args)
end
Commit a transaction, requires that the channel is in transaction mode
@return [nil]
# File lib/amqp/client/channel.rb, line 453
def tx_commit
  write_bytes FrameBytes.tx_commit(@id)
  expect :tx_commit_ok
  nil
end
Rollback a transaction, requires that the channel is in transaction mode
@return [nil]
# File lib/amqp/client/channel.rb, line 461
def tx_rollback
  write_bytes FrameBytes.tx_rollback(@id)
  expect :tx_rollback_ok
  nil
end
Put the channel in transaction mode, make sure that you tx_commit or tx_rollback after publish
@return [nil]
# File lib/amqp/client/channel.rb, line 445
def tx_select
  write_bytes FrameBytes.tx_select(@id)
  expect :tx_select_ok
  nil
end
Block until all published messages are confirmed
@return [Boolean] True if all messages were positively acknowledged, false if not
# File lib/amqp/client/channel.rb, line 412
def wait_for_confirms
  return true if @unconfirmed.empty?

  ok = @unconfirmed_empty.pop
  raise Error::Closed.new(@id, *@closed) if ok.nil?

  ok
end
Private Instance Methods
# File lib/amqp/client/channel.rb, line 542
def expect(expected_frame_type)
  frame_type, *args = @replies.pop
  raise Error::Closed.new(@id, *@closed) if frame_type.nil?
  raise Error::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type

  args
end
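Almost every synchronous method on this page follows the same request/reply pattern: write the request frame, then block in expect until the reader side pushes a reply onto the replies queue. A broker-free sketch of that pattern is shown below. `expect_reply` and the two error classes are stand-ins invented for the example. The real method raises Error::Closed and Error::UnexpectedFrame.

```ruby
class SketchClosed < StandardError; end
class SketchUnexpectedFrame < StandardError; end

# Pop the next reply and check that it is the frame type we are waiting for.
# Replies are arrays of the form [frame_type, *args].
def expect_reply(replies, expected_frame_type)
  frame_type, *args = replies.pop # pop returns nil once the queue is closed
  raise SketchClosed, "channel closed" if frame_type.nil?
  unless frame_type == expected_frame_type
    raise SketchUnexpectedFrame, "expected #{expected_frame_type}, got #{frame_type}"
  end

  args
end

replies = Queue.new
replies << [:queue_declare_ok, "my-queue", 3, 1] # what the reader side would push
name, message_count, consumer_count = expect_reply(replies, :queue_declare_ok)
puts "#{name}: #{message_count} messages, #{consumer_count} consumers"
```

Closing the replies queue, as close and closed! do, is what unblocks a caller waiting in `pop` and turns the wait into an error instead of a hang.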
# File lib/amqp/client/channel.rb, line 517
def next_message_finished!
  next_msg = @next_msg
  if next_msg.is_a? ReturnMessage
    if @on_return
      Thread.new { @on_return.call(next_msg) }
    else
      warn "AMQP-Client message returned: #{next_msg.inspect}"
    end
  elsif next_msg.consumer_tag.nil?
    @basic_gets.push next_msg
  else
    Thread.pass until (consumer = @consumers[next_msg.consumer_tag])
    consumer.push next_msg
  end
  nil
ensure
  @next_msg = @next_body = @next_body_size = nil
end
# File lib/amqp/client/channel.rb, line 536
def write_bytes(*bytes)
  raise Error::Closed.new(@id, *@closed) if @closed

  @connection.write_bytes(*bytes)
end