class AMQP::Client
AMQP
0-9-1 Client
@see Connection
Constants
- VERSION
Version of the client library
Public Class Methods
Create a new Client
object, this won't establish a connection yet, use {#connect} or {#start} for that @param uri [String] URL on the format amqp://username:password@hostname/vhost,
use amqps:// for encrypted connection
@option options [Boolean] connection_name (PROGRAM_NAME) Set a name for the connection to be able to identify
the client from the broker
@option options [Boolean] verify_peer (true) Verify broker's TLS certificate, set to false for self-signed certs @option options [Integer] heartbeat (0) Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead @option options [Integer] frame_max (131_072) Maximum frame size,
the smallest of the client's and the broker's values will be used
@option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open.
Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used.
# File lib/amqp/client.rb, line 26 def initialize(uri = "", **options) @uri = uri @options = options @queues = {} @exchanges = {} @subscriptions = Set.new @connq = SizedQueue.new(1) end
Public Instance Methods
Bind a queue to an exchange @param queue [String] Name of the queue to bind @param exchange [String] Name of the exchange to bind to @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type) @param arguments [Hash] Message
headers to match on (only relevant for header exchanges) @return [nil]
# File lib/amqp/client.rb, line 187 def bind(queue, exchange, binding_key, arguments: {}) with_connection do |conn| conn.channel(1).queue_bind(queue, exchange, binding_key, arguments: arguments) end end
Establishes and returns a new AMQP
connection @see Connection#initialize @return [Connection]
# File lib/amqp/client.rb, line 41 def connect(read_loop_thread: true) Connection.new(@uri, read_loop_thread: read_loop_thread, **@options) end
Delete an exchange @param name [String] Name of the exchange @return [nil]
# File lib/amqp/client.rb, line 257 def delete_exchange(name) with_connection do |conn| conn.channel(1).exchange_delete(name) @exchanges.delete(name) nil end end
Delete a queue @param name [String] Name of the queue @param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise @param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise @return [Integer] Number of messages in the queue when deleted
# File lib/amqp/client.rb, line 219 def delete_queue(name, if_unused: false, if_empty: false) with_connection do |conn| msgs = conn.channel(1).queue_delete(name, if_unused: if_unused, if_empty: if_empty) @queues.delete(name) msgs end end
Declare an exchange and return a high level Exchange
object @return [Exchange]
# File lib/amqp/client.rb, line 111 def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) @exchanges.fetch(name) do with_connection do |conn| conn.channel(1).exchange_declare(name, type, durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments) end @exchanges[name] = Exchange.new(self, name) end end
Bind an exchange to an exchange @param destination [String] Name of the exchange to bind @param source [String] Name of the exchange to bind to @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type) @param arguments [Hash] Message
headers to match on (only relevant for header exchanges) @return [nil]
# File lib/amqp/client.rb, line 236 def exchange_bind(destination, source, binding_key, arguments: {}) with_connection do |conn| conn.channel(1).exchange_bind(destination, source, binding_key, arguments: arguments) end end
Unbind an exchange from an exchange @param destination [String] Name of the exchange to unbind @param source [String] Name of the exchange to unbind from @param binding_key [String] Binding key which the exchange is bound to the exchange with @param arguments [Hash] Arguments matching the binding that's being removed @return [nil]
# File lib/amqp/client.rb, line 248 def exchange_unbind(destination, source, binding_key, arguments: {}) with_connection do |conn| conn.channel(1).exchange_unbind(destination, source, binding_key, arguments: arguments) end end
Publish a (persistent) message and wait for confirmation @param (see Connection::Channel#basic_publish_confirm
) @option (see Connection::Channel#basic_publish_confirm
) @return (see Connection::Channel#basic_publish_confirm
) @raise (see Connection::Channel#basic_publish_confirm
)
# File lib/amqp/client.rb, line 129 def publish(body, exchange, routing_key, **properties) with_connection do |conn| properties = { delivery_mode: 2 }.merge!(properties) conn.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties) end end
Publish a (persistent) message but don't wait for a confirmation @param (see Connection::Channel#basic_publish
) @option (see Connection::Channel#basic_publish
) @return (see Connection::Channel#basic_publish
) @raise (see Connection::Channel#basic_publish
)
# File lib/amqp/client.rb, line 141 def publish_and_forget(body, exchange, routing_key, **properties) with_connection do |conn| properties = { delivery_mode: 2 }.merge!(properties) conn.channel(1).basic_publish(body, exchange, routing_key, **properties) end end
Purge a queue @param queue [String] Name of the queue @return [nil]
# File lib/amqp/client.rb, line 208 def purge(queue) with_connection do |conn| conn.channel(1).queue_purge(queue) end end
Declare a queue @param name [String] Name of the queue @param durable [Boolean] If true the queue will survive broker restarts,
messages in the queue will only survive if they are published as persistent
@param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming
(it won't be deleted until at least one consumer has consumed from it)
@param arguments [Hash] Custom arguments, such as queue-ttl etc. @return [Queue]
# File lib/amqp/client.rb, line 98 def queue(name, durable: true, auto_delete: false, arguments: {}) raise ArgumentError, "Currently only supports named, durable queues" if name.empty? @queues.fetch(name) do with_connection do |conn| conn.channel(1).queue_declare(name, durable: durable, auto_delete: auto_delete, arguments: arguments) end @queues[name] = Queue.new(self, name) end end
Opens an AMQP
connection using the high level API, will try to reconnect if successfully connected at first @return [self]
# File lib/amqp/client.rb, line 47 def start @stopped = false Thread.new(connect(read_loop_thread: false)) do |conn| Thread.abort_on_exception = true # Raising an unhandled exception is a bug loop do break if @stopped conn ||= connect(read_loop_thread: false) Thread.new do # restore connection in another thread, read_loop have to run conn.channel(1) # reserve channel 1 for publishes @subscriptions.each do |queue_name, no_ack, prefetch, wt, args, blk| ch = conn.channel ch.basic_qos(prefetch) ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk) end @connq << conn end conn.read_loop # blocks until connection is closed, then reconnect rescue Error => e warn "AMQP-Client reconnect error: #{e.inspect}" sleep @options[:reconnect_interval] || 1 ensure conn = nil end end self end
Close the currently open connection @return [nil]
# File lib/amqp/client.rb, line 78 def stop return if @stopped @stopped = true conn = @connq.pop conn.close nil end
Consume messages from a queue @param queue [String] Name of the queue to subscribe to @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected) @param prefetch [Integer] Specify how many messages to prefetch for consumers with no_ack is false @param worker_threads [Integer] Number of threads processing messages,
0 means that the thread calling this method will be blocked
@param arguments [Hash] Custom arguments to the consumer @yield [Message] Delivered message from the queue @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
# File lib/amqp/client.rb, line 169 def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) @subscriptions.add? [queue, no_ack, prefetch, worker_threads, arguments, blk] with_connection do |conn| ch = conn.channel ch.basic_qos(prefetch) ch.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg| blk.call(msg) end end end
Unbind a queue from an exchange @param queue [String] Name of the queue to unbind @param exchange [String] Name of the exchange to unbind from @param binding_key [String] Binding key which the queue is bound to the exchange with @param arguments [Hash] Arguments matching the binding that's being removed @return [nil]
# File lib/amqp/client.rb, line 199 def unbind(queue, exchange, binding_key, arguments: {}) with_connection do |conn| conn.channel(1).queue_unbind(queue, exchange, binding_key, arguments: arguments) end end
Wait for unconfirmed publishes @return [Boolean] True if successful, false if any message negatively acknowledged
# File lib/amqp/client.rb, line 150 def wait_for_confirms with_connection do |conn| conn.channel(1).wait_for_confirms end end
Private Instance Methods
@!endgroup
# File lib/amqp/client.rb, line 269 def with_connection conn = nil loop do conn = @connq.pop next if conn.closed? break end begin yield conn ensure @connq << conn unless conn.closed? end end