class QueueingRabbit::Client::Bunny
Attributes
connection[R]
Public Class Methods
connect()
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 40 def self.connect self.new(::Bunny.new(QueueingRabbit.amqp_uri, connection_options)) end
connection_options()
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 35 def self.connection_options {:connection_timeout => QueueingRabbit.tcp_timeout, :heartbeat => QueueingRabbit.heartbeat} end
new(connection)
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 137 def initialize(connection) @connection = connection @connection.start end
Public Instance Methods
begin_worker_loop() { || ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 113 def begin_worker_loop yield if block_given? @actions_queue = [] @continue_worker_loop = true # We may need to add signal handling here while @continue_worker_loop @actions_queue.take_while { |block| block.call || true } sleep 1 end end
bind_queue(queue, exchange, options = {})
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 59 def bind_queue(queue, exchange, options = {}) queue.bind(exchange, options) end
close() { || ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 95 def close @connection.close yield if block_given? @continue_worker_loop = false end
define_exchange(channel = nil, name = '', options = {}) { |exchange| ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 68 def define_exchange(channel = nil, name = '', options = {}) options = options.dup type = options.delete(:type) exchange = type ? channel.send(type.to_sym, name, options) : channel.default_exchange yield exchange if block_given? exchange end
define_queue(channel, name, options = {}) { |queue| ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 53 def define_queue(channel, name, options = {}) queue = channel.queue(name.to_s, options) yield queue if block_given? queue end
enqueue(exchange, payload, options = {})
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 63 def enqueue(exchange, payload, options = {}) exchange.publish(payload, options) end
Also aliased as: publish
listen_queue(queue, options = {}) { |payload, metadata| ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 84 def listen_queue(queue, options = {}) queue.subscribe(options) do |delivery_info, properties, payload| yield payload, Metadata.new(queue.channel, delivery_info, properties) end end
next_tick(&block)
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 105 def next_tick(&block) if @continue_worker_loop @actions_queue << block else block.call end end
open?()
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 101 def open? @connection.open? end
open_channel(options = {}) { |ch, nil| ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 45 def open_channel(options = {}) ch = connection.create_channel(nil, options[:consumer_pool_size]) ch.prefetch(options[:prefetch]) if options[:prefetch] ch.confirm_select if options[:use_publisher_confirms] yield ch, nil ch end
purge_queue(queue) { || ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 90 def purge_queue(queue) queue.purge yield if block_given? end
queue_size(queue)
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 80 def queue_size(queue) queue.status[:message_count] end
wait_while_for(proc, seconds, interval = 0.5) { || ... }
click to toggle source
# File lib/queueing_rabbit/client/bunny.rb, line 124 def wait_while_for(proc, seconds, interval = 0.5) end_time = Time.now.to_i + seconds while Time.now.to_i < end_time do return unless proc.call sleep interval end yield end