class QueueingRabbit::Client::AMQP
Attributes
connection[R]
Public Class Methods
connect()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 40 def self.connect self.ensure_event_machine_is_running self.new(::AMQP.connect(QueueingRabbit.amqp_uri, connection_options)) end
connection_options()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 34 def self.connection_options {:timeout => QueueingRabbit.tcp_timeout, :heartbeat => QueueingRabbit.heartbeat, :on_tcp_connection_failure => self.callback(:on_tcp_failure)} end
ensure_event_machine_is_running()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 46 def self.ensure_event_machine_is_running run_event_machine unless EM.reactor_running? end
join_event_machine_thread()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 70 def self.join_event_machine_thread @event_machine_thread.join if @event_machine_thread end
new(connection)
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 166 def initialize(connection) @connection = connection setup_callbacks end
run_event_machine()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 50 def self.run_event_machine @event_machine_thread = Thread.new do EM.run do QueueingRabbit.trigger_event(:event_machine_started) end end wait_for_event_machine_to_start end
wait_for_event_machine_to_start()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 60 def self.wait_for_event_machine_to_start Timeout.timeout(5) do sleep 0.5 until EM.reactor_running? end rescue Timeout::Error => e description = "wait timeout exceeded while starting up EventMachine" fatal description raise QueueingRabbitError.new(description) end
Public Instance Methods
begin_worker_loop() { || ... }
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 153 def begin_worker_loop EM.run do yield if block_given? end end
bind_queue(queue, exchange, options = {})
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 94 def bind_queue(queue, exchange, options = {}) queue.bind(exchange, options) end
close() { || ... }
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 78 def close info "closing AMQP broker connection..." connection.disconnect do yield if block_given? EM.stop if EM.reactor_running? end end
define_exchange(channel, name = '', options = {}) { |exchange| ... }
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 112 def define_exchange(channel, name = '', options = {}) options = options.dup type = options.delete(:type) with_exchange = Proc.new do |exchange, _| yield exchange if block_given? end if type && type != :default channel.send(type.to_sym, name, options, &with_exchange) else channel.default_exchange.tap(&with_exchange) end end
define_queue(channel, queue_name, options={}) { |queue| ... }
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 88 def define_queue(channel, queue_name, options={}) channel.queue(queue_name.to_s, options) do |queue| yield queue if block_given? end end
enqueue(exchange, payload, options = {})
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 126 def enqueue(exchange, payload, options = {}) exchange.publish(payload, options) end
Also aliased as: publish
listen_queue(queue, options = {}) { |payload, metadata| ... }
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 98 def listen_queue(queue, options = {}, &block) queue.subscribe(options) do |metadata, payload| yield payload, metadata end end
next_tick(&block)
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 141 def next_tick(&block) EM.next_tick(&block) end
open?()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 74 def open? EM.reactor_running? && @connection.open? end
open_channel(options = {}) { |c, open_ok| ... }
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 104 def open_channel(options = {}) ::AMQP::Channel.new(connection, nil, options) do |c, open_ok| c.confirm_select if !!options[:use_publisher_confirms] c.on_error(&self.class.callback(:on_channel_error)) yield c, open_ok end end
purge_queue(queue) { || ... }
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 135 def purge_queue(queue) queue.purge do yield if block_given? end end
queue_size(queue)
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 131 def queue_size(queue) raise NotImplementedError end
wait_while_for(proc, period, _ = nil, &block)
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 145 def wait_while_for(proc, period, _ = nil, &block) if proc.call EM.add_timer(period, &block) else block.call end end
Private Instance Methods
setup_callbacks()
click to toggle source
# File lib/queueing_rabbit/client/amqp.rb, line 161 def setup_callbacks connection.on_tcp_connection_loss(&self.class.callback(:on_tcp_loss)) connection.on_recovery(&self.class.callback(:on_tcp_recovery)) end