class QueueingRabbit::Client::Bunny

Attributes

connection[R]

Public Class Methods

connect() click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 40
def self.connect
  self.new(::Bunny.new(QueueingRabbit.amqp_uri,
           connection_options))
end
connection_options() click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 35
def self.connection_options
  {:connection_timeout => QueueingRabbit.tcp_timeout,
   :heartbeat => QueueingRabbit.heartbeat}
end
new(connection) click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 137
def initialize(connection)
  @connection = connection
  @connection.start
end

Public Instance Methods

begin_worker_loop() { || ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 113
def begin_worker_loop
  yield if block_given?
  @actions_queue = []
  @continue_worker_loop = true
  # We may need to add signal handling here
  while @continue_worker_loop
    @actions_queue.take_while { |block| block.call || true }
    sleep 1
  end
end
bind_queue(queue, exchange, options = {}) click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 59
def bind_queue(queue, exchange, options = {})
  queue.bind(exchange, options)
end
close() { || ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 95
def close
  @connection.close
  yield if block_given?
  @continue_worker_loop = false
end
define_exchange(channel = nil, name = '', options = {}) { |exchange| ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 68
def define_exchange(channel = nil, name = '', options = {})
  options = options.dup
  type = options.delete(:type)

  exchange = type ? channel.send(type.to_sym, name, options) :
                    channel.default_exchange

  yield exchange if block_given?

  exchange
end
define_queue(channel, name, options = {}) { |queue| ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 53
def define_queue(channel, name, options = {})
  queue = channel.queue(name.to_s, options)
  yield queue if block_given?
  queue
end
enqueue(exchange, payload, options = {}) click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 63
def enqueue(exchange, payload, options = {})
  exchange.publish(payload, options)
end
Also aliased as: publish
listen_queue(queue, options = {}) { |payload, metadata| ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 84
def listen_queue(queue, options = {})
  queue.subscribe(options) do |delivery_info, properties, payload|
    yield payload, Metadata.new(queue.channel, delivery_info, properties)
  end
end
next_tick(&block) click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 105
def next_tick(&block)
  if @continue_worker_loop
    @actions_queue << block
  else
    block.call
  end
end
open?() click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 101
def open?
  @connection.open?
end
open_channel(options = {}) { |ch, nil| ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 45
def open_channel(options = {})
  ch = connection.create_channel(nil, options[:consumer_pool_size])
  ch.prefetch(options[:prefetch]) if options[:prefetch]
  ch.confirm_select if options[:use_publisher_confirms]
  yield ch, nil
  ch
end
publish(exchange, payload, options = {})
Alias for: enqueue
purge_queue(queue) { || ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 90
def purge_queue(queue)
  queue.purge
  yield if block_given?
end
queue_size(queue) click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 80
def queue_size(queue)
  queue.status[:message_count]
end
wait_while_for(proc, seconds, interval = 0.5) { || ... } click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 124
def wait_while_for(proc, seconds, interval = 0.5)
  end_time = Time.now.to_i + seconds

  while Time.now.to_i < end_time do
    return unless proc.call
    sleep interval
  end

  yield
end