class AMQP::Client

AMQP 0-9-1 Client @see Connection

Constants

VERSION

Version of the client library

Public Class Methods

new(uri = "", **options) click to toggle source

Create a new Client object. This won't establish a connection yet; use {#connect} or {#start} for that. @param uri [String] URL in the format amqp://username:password@hostname/vhost,

use amqps:// for encrypted connection

@option options [String] connection_name (PROGRAM_NAME) Set a name for the connection to be able to identify

the client from the broker

@option options [Boolean] verify_peer (true) Verify broker's TLS certificate, set to false for self-signed certs @option options [Integer] heartbeat (0) Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead @option options [Integer] frame_max (131_072) Maximum frame size,

the smallest of the client's and the broker's values will be used

@option options [Integer] channel_max (2048) Maximum number of channels the client will be allowed to have open.

Maximum allowed is 65_535. The smallest of the client's and the broker's value will be used.
# File lib/amqp/client.rb, line 26
# Set up the client's state; no connection is opened here.
# @param uri [String] AMQP URL, stored as given
# @param options [Hash] Connection options, stored as given
def initialize(uri = "", **options)
  @uri, @options = uri, options

  # Name-keyed caches of the high level objects handed out by #queue and #exchange
  @queues = {}
  @exchanges = {}
  # Subscription tuples, kept so they can be replayed on a new connection
  @subscriptions = Set.new
  # Hand-over point for the connection; holds at most one at a time
  @connq = SizedQueue.new(1)
end

Public Instance Methods

bind(queue, exchange, binding_key, arguments: {}) click to toggle source

Bind a queue to an exchange @param queue [String] Name of the queue to bind @param exchange [String] Name of the exchange to bind to @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type) @param arguments [Hash] Message headers to match on (only relevant for header exchanges) @return [nil]

# File lib/amqp/client.rb, line 187
# Bind a queue to an exchange, issued on channel 1 of the shared connection
# @param queue [String] Name of the queue to bind
# @param exchange [String] Name of the exchange to bind to
# @param binding_key [String] Binding key on which matching messages might be routed
# @param arguments [Hash] Message headers to match on (only relevant for header exchanges)
# @return [nil] (the value returned by the channel's queue_bind is passed through as-is)
def bind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_bind(queue, exchange, binding_key, arguments: arguments)
  end
end
connect(read_loop_thread: true) click to toggle source

Establishes and returns a new AMQP connection @see Connection#initialize @return [Connection]

# File lib/amqp/client.rb, line 41
# Establishes and returns a new AMQP connection built from the stored URI and options
# @param read_loop_thread [Boolean] Forwarded to Connection.new under the same name
# @see Connection#initialize
# @return [Connection]
def connect(read_loop_thread: true)
  # Stored options are applied last, so they win over the keyword on a duplicate key
  settings = { read_loop_thread: read_loop_thread }.merge(@options)
  Connection.new(@uri, **settings)
end
delete_exchange(name) click to toggle source

Delete an exchange @param name [String] Name of the exchange @return [nil]

# File lib/amqp/client.rb, line 257
# Delete an exchange on the broker and forget the locally cached Exchange object
# @param name [String] Name of the exchange
# @return [nil]
def delete_exchange(name)
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_delete(name)
    # Only reached when the broker call did not raise
    @exchanges.delete(name)
    nil
  end
end
delete_queue(name, if_unused: false, if_empty: false) click to toggle source

Delete a queue @param name [String] Name of the queue @param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise @param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise @return [Integer] Number of messages in the queue when deleted

# File lib/amqp/client.rb, line 219
# Delete a queue on the broker and forget the locally cached Queue object
# @param name [String] Name of the queue
# @param if_unused [Boolean] Only delete if the queue doesn't have consumers
# @param if_empty [Boolean] Only delete if the queue is empty
# @return [Integer] Number of messages in the queue when deleted
#   (this is the value returned by the channel's queue_delete)
def delete_queue(name, if_unused: false, if_empty: false)
  with_connection do |connection|
    channel = connection.channel(1)
    message_count = channel.queue_delete(name, if_unused: if_unused, if_empty: if_empty)
    # Only reached when the broker call did not raise
    @queues.delete(name)
    message_count
  end
end
exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) click to toggle source

Declare an exchange and return a high level Exchange object @return [Exchange]

# File lib/amqp/client.rb, line 111
# Declare an exchange and return a high level Exchange object.
# The object is cached per name; a cached name is returned without contacting the broker.
# @param name [String] Name of the exchange
# @param type [String] Exchange type, forwarded to the channel's exchange_declare
# @param durable [Boolean] Forwarded to exchange_declare
# @param auto_delete [Boolean] Forwarded to exchange_declare
# @param internal [Boolean] Forwarded to exchange_declare
# @param arguments [Hash] Forwarded to exchange_declare
# @return [Exchange]
def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
  return @exchanges[name] if @exchanges.key?(name)

  with_connection do |connection|
    connection.channel(1).exchange_declare(
      name, type,
      durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments
    )
  end
  @exchanges[name] = Exchange.new(self, name)
end
exchange_bind(destination, source, binding_key, arguments: {}) click to toggle source

Bind an exchange to an exchange @param destination [String] Name of the exchange to bind @param source [String] Name of the exchange to bind to @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type) @param arguments [Hash] Message headers to match on (only relevant for header exchanges) @return [nil]

# File lib/amqp/client.rb, line 236
# Bind an exchange to another exchange, issued on channel 1 of the shared connection
# @param destination [String] Name of the exchange to bind
# @param source [String] Name of the exchange to bind to
# @param binding_key [String] Binding key on which matching messages might be routed
# @param arguments [Hash] Message headers to match on (only relevant for header exchanges)
# @return [nil] (the value returned by the channel's exchange_bind is passed through as-is)
def exchange_bind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_bind(destination, source, binding_key, arguments: arguments)
  end
end
exchange_unbind(destination, source, binding_key, arguments: {}) click to toggle source

Unbind an exchange from an exchange @param destination [String] Name of the exchange to unbind @param source [String] Name of the exchange to unbind from @param binding_key [String] Binding key which the exchange is bound to the exchange with @param arguments [Hash] Arguments matching the binding that's being removed @return [nil]

# File lib/amqp/client.rb, line 248
# Unbind an exchange from another exchange, issued on channel 1 of the shared connection
# @param destination [String] Name of the exchange to unbind
# @param source [String] Name of the exchange to unbind from
# @param binding_key [String] Binding key which the exchange is bound to the exchange with
# @param arguments [Hash] Arguments matching the binding that's being removed
# @return [nil] (the value returned by the channel's exchange_unbind is passed through as-is)
def exchange_unbind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_unbind(destination, source, binding_key, arguments: arguments)
  end
end
publish(body, exchange, routing_key, **properties) click to toggle source

Publish a (persistent) message and wait for confirmation @param (see Connection::Channel#basic_publish_confirm) @option (see Connection::Channel#basic_publish_confirm) @return (see Connection::Channel#basic_publish_confirm) @raise (see Connection::Channel#basic_publish_confirm)

# File lib/amqp/client.rb, line 129
# Publish a (persistent) message and wait for confirmation
# @param (see Connection::Channel#basic_publish_confirm)
# @option (see Connection::Channel#basic_publish_confirm)
# @return (see Connection::Channel#basic_publish_confirm)
# @raise (see Connection::Channel#basic_publish_confirm)
def publish(body, exchange, routing_key, **properties)
  with_connection do |connection|
    # delivery_mode 2 is only a default; a caller-supplied delivery_mode replaces it
    message_properties = { delivery_mode: 2 }.merge(properties)
    connection.channel(1).basic_publish_confirm(body, exchange, routing_key, **message_properties)
  end
end
publish_and_forget(body, exchange, routing_key, **properties) click to toggle source

Publish a (persistent) message but don't wait for a confirmation @param (see Connection::Channel#basic_publish) @option (see Connection::Channel#basic_publish) @return (see Connection::Channel#basic_publish) @raise (see Connection::Channel#basic_publish)

# File lib/amqp/client.rb, line 141
# Publish a (persistent) message but don't wait for a confirmation
# @param (see Connection::Channel#basic_publish)
# @option (see Connection::Channel#basic_publish)
# @return (see Connection::Channel#basic_publish)
# @raise (see Connection::Channel#basic_publish)
def publish_and_forget(body, exchange, routing_key, **properties)
  with_connection do |connection|
    # delivery_mode 2 is only a default; a caller-supplied delivery_mode replaces it
    message_properties = { delivery_mode: 2 }.merge(properties)
    connection.channel(1).basic_publish(body, exchange, routing_key, **message_properties)
  end
end
purge(queue) click to toggle source

Purge a queue @param queue [String] Name of the queue @return [nil]

# File lib/amqp/client.rb, line 208
# Purge a queue, issued on channel 1 of the shared connection
# @param queue [String] Name of the queue
# @return [nil] (the value returned by the channel's queue_purge is passed through as-is)
def purge(queue)
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_purge(queue)
  end
end
queue(name, durable: true, auto_delete: false, arguments: {}) click to toggle source

Declare a queue @param name [String] Name of the queue @param durable [Boolean] If true the queue will survive broker restarts,

messages in the queue will only survive if they are published as persistent

@param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming

(it won't be deleted until at least one consumer has consumed from it)

@param arguments [Hash] Custom arguments, such as queue-ttl etc. @return [Queue]

# File lib/amqp/client.rb, line 98
# Declare a queue and return a high level Queue object.
# The object is cached per name; a cached name is returned without contacting the broker.
# @param name [String] Name of the queue, must not be empty
# @param durable [Boolean] If true the queue will survive broker restarts,
#   messages in the queue will only survive if they are published as persistent
# @param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming
#   (it won't be deleted until at least one consumer has consumed from it)
# @param arguments [Hash] Custom arguments, such as queue-ttl etc.
# @raise [ArgumentError] When name is empty
# @return [Queue]
def queue(name, durable: true, auto_delete: false, arguments: {})
  raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
  return @queues[name] if @queues.key?(name)

  with_connection do |connection|
    connection.channel(1).queue_declare(
      name, durable: durable, auto_delete: auto_delete, arguments: arguments
    )
  end
  @queues[name] = Queue.new(self, name)
end
start() click to toggle source

Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first @return [self]

# File lib/amqp/client.rb, line 47
# Opens an AMQP connection using the high level API and supervises it from a
# background thread. The first connection attempt is made in the calling thread,
# so a failure there is raised to the caller. After that, whenever the read loop
# returns or raises Error, a new connection is made, until the client is stopped.
# Every recorded subscription is re-established on each new connection.
# @return [self]
def start
  @stopped = false
  Thread.new(connect(read_loop_thread: false)) do |conn|
    # Raising an unhandled exception is a bug. Flag only this thread: assigning
    # Thread.abort_on_exception would change the setting for the whole process.
    Thread.current.abort_on_exception = true
    loop do
      break if @stopped

      # conn is nil from the second iteration on (see ensure below)
      conn ||= connect(read_loop_thread: false)
      Thread.new do
        # Same policy as the supervising thread, again scoped to this thread only
        Thread.current.abort_on_exception = true
        # restore connection in another thread, read_loop have to run
        conn.channel(1) # reserve channel 1 for publishes
        @subscriptions.each do |queue_name, no_ack, prefetch, wt, args, blk|
          ch = conn.channel
          ch.basic_qos(prefetch)
          ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk)
        end
        # Hand the ready connection over to callers blocked in with_connection
        @connq << conn
      end
      conn.read_loop # blocks until connection is closed, then reconnect
    rescue Error => e
      warn "AMQP-Client reconnect error: #{e.inspect}"
      sleep(@options[:reconnect_interval] || 1)
    ensure
      # Force a fresh connection on the next iteration
      conn = nil
    end
  end
  self
end
stop() click to toggle source

Close the currently open connection @return [nil]

# File lib/amqp/client.rb, line 78
# Close the currently open connection.
# Does nothing unless the client is currently started: it returns immediately when
# the client was never started or has already been stopped.
# Otherwise blocks until the connection is available in the hand-over queue
# (for example while another thread is using it), then closes it.
# @return [nil]
def stop
  # In the code shown here @stopped is assigned only by #start (false) and #stop (true),
  # so nil means #start was never called. In that case nothing will ever be pushed
  # onto @connq and the pop below would block forever.
  return unless @stopped == false

  @stopped = true
  conn = @connq.pop
  conn.close
  nil
end
subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) click to toggle source

Consume messages from a queue @param queue [String] Name of the queue to subscribe to @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected) @param prefetch [Integer] Specify how many messages to prefetch for consumers when no_ack is false @param worker_threads [Integer] Number of threads processing messages,

0 means that the thread calling this method will be blocked

@param arguments [Hash] Custom arguments to the consumer @yield [Message] Delivered message from the queue @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled

# File lib/amqp/client.rb, line 169
# Consume messages from a queue on a channel of its own.
# The subscription is also recorded in @subscriptions.
# @param queue [String] Name of the queue to subscribe to
# @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected)
# @param prefetch [Integer] Passed to basic_qos on the consumer's channel
# @param worker_threads [Integer] Number of threads processing messages,
#   0 means that the thread calling this method will be blocked
# @param arguments [Hash] Custom arguments to the consumer
# @yield [Message] Delivered message from the queue
# @return The value returned by the channel's basic_consume
def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
  # A Set, so registering an identical subscription twice keeps a single entry
  @subscriptions.add([queue, no_ack, prefetch, worker_threads, arguments, blk])

  with_connection do |connection|
    consumer_channel = connection.channel
    consumer_channel.basic_qos(prefetch)
    consumer_channel.basic_consume(
      queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments
    ) { |delivery| blk.call(delivery) }
  end
end
unbind(queue, exchange, binding_key, arguments: {}) click to toggle source

Unbind a queue from an exchange @param queue [String] Name of the queue to unbind @param exchange [String] Name of the exchange to unbind from @param binding_key [String] Binding key which the queue is bound to the exchange with @param arguments [Hash] Arguments matching the binding that's being removed @return [nil]

# File lib/amqp/client.rb, line 199
# Unbind a queue from an exchange, issued on channel 1 of the shared connection
# @param queue [String] Name of the queue to unbind
# @param exchange [String] Name of the exchange to unbind from
# @param binding_key [String] Binding key which the queue is bound to the exchange with
# @param arguments [Hash] Arguments matching the binding that's being removed
# @return [nil] (the value returned by the channel's queue_unbind is passed through as-is)
def unbind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_unbind(queue, exchange, binding_key, arguments: arguments)
  end
end
wait_for_confirms() click to toggle source

Wait for unconfirmed publishes @return [Boolean] True if successful, false if any message negatively acknowledged

# File lib/amqp/client.rb, line 150
# Wait for unconfirmed publishes on channel 1 of the shared connection
# @return [Boolean] True if successful, false if any message negatively acknowledged
#   (this is the value returned by the channel's wait_for_confirms)
def wait_for_confirms
  with_connection do |connection|
    channel = connection.channel(1)
    channel.wait_for_confirms
  end
end

Private Instance Methods

with_connection() { |conn| ... } click to toggle source

@!endgroup

# File lib/amqp/client.rb, line 269
# Borrow the shared connection for the duration of the block.
# Blocks until a connection is available; connections that are already closed
# are discarded. The connection is put back afterwards (also when the block
# raises) unless it has been closed in the meantime.
# @yield [conn] The borrowed connection
# @return The value returned by the block
def with_connection
  connection = @connq.pop
  # Skip stale connections and wait for the next one to be queued
  connection = @connq.pop while connection.closed?

  begin
    yield connection
  ensure
    @connq << connection unless connection.closed?
  end
end