class Rack::App::Worker::RabbitMQ
Public Instance Methods
broadcast_exchange(name)
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 20
# Looks up the exchange of type 'broadcast' for +name+ by delegating to
# exchange_for.
#
# @param name the logical name passed through to exchange_for
# @return whatever exchange_for returns for ('broadcast', name)
def broadcast_exchange(name)
  exchange_type = 'broadcast'
  exchange_for(exchange_type, name)
end
channel()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 10
# Returns the memoized channel, replacing it with a fresh one from
# create_channel when the current one reports that it is no longer open.
#
# When the channel is replaced, any exchanges memoized in @exchange_cache
# were declared on the previous channel, so the cache is emptied to force
# them to be re-declared on the new channel the next time they are requested.
#
# @return the current (open) channel object
def channel
  @channel ||= create_channel
  return @channel if @channel.open?

  # Clear in place (rather than reassigning) so a caller that already holds a
  # reference to the cache hash keeps writing into the live cache.
  @exchange_cache.clear if @exchange_cache
  @channel = create_channel
end
create_broadcast_queue(name)
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 31
# Declares an exclusive, non-auto-delete queue with an empty name on the
# current channel and binds it to the broadcast exchange for +name+ unless
# exchange_already_bind? reports it as bound.
#
# NOTE(review): exchange_already_bind?(queue, name) compares bindings against
# the 'send' exchange name, not the 'broadcast' one -- confirm that is intended
# for broadcast queues.
#
# @param name the logical name used to resolve the broadcast exchange
# @return the declared queue
def create_broadcast_queue(name)
  channel.queue('', :exclusive => true, :auto_delete => false).tap do |broadcast_queue|
    next if exchange_already_bind?(broadcast_queue, name)

    broadcast_queue.bind(broadcast_exchange(name))
  end
end
send_exchange(name)
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 16
# Looks up the exchange of type 'send' for +name+ by delegating to
# exchange_for.
#
# @param name the logical name passed through to exchange_for
# @return whatever exchange_for returns for ('send', name)
def send_exchange(name)
  exchange_type = 'send'
  exchange_for(exchange_type, name)
end
send_queue(name)
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 24
# Declares the durable, non-auto-delete queue named
# "<namespace>.<cluster>.<name>" on the current channel and binds it to the
# send exchange for +name+ unless exchange_already_bind? reports it as bound.
#
# @param name the logical name used for both the queue name and the exchange
# @return the declared queue
def send_queue(name)
  full_queue_name = format('%s.%s.%s', namespace, cluster, name)
  work_queue = channel.queue(full_queue_name, :durable => true, :auto_delete => false)

  already_bound = exchange_already_bind?(work_queue, name)
  work_queue.bind(send_exchange(name)) unless already_bound

  work_queue
end
session()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 5
# Runs check_connection and then hands back the session stored in @session.
#
# @return the value of @session after the connection check
def session
  # check_connection is what (re)assigns @session, so it has to run first.
  check_connection

  @session
end
Protected Instance Methods
check_connection()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 52
# Makes sure @session holds a usable session:
#
# * an existing ::Bunny::Session whose status is :not_connected is closed, and
#   a session that reports closed? is replaced via create_session;
# * a nil @session triggers create_session.
#
# When ::Bunny::TCPConnectionFailedForAllHosts is raised, the method sleeps
# one second and retries, with no upper bound on the number of attempts.
def check_connection
  if ::Bunny::Session === @session
    @session.close if @session.status == :not_connected
    create_session if @session.closed?
  elsif @session.nil?
    create_session
  end
rescue ::Bunny::TCPConnectionFailedForAllHosts
  sleep(1)
  retry
end
cluster()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 74
# Reads the worker cluster from Rack::App::Worker::Environment.
#
# @return the value of Rack::App::Worker::Environment.worker_cluster
def cluster
  environment = Rack::App::Worker::Environment
  environment.worker_cluster
end
create_channel()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 82
# Opens a new channel on the current session and applies the QoS value read
# from Rack::App::Worker::Environment.queue_qos to it.
#
# @return the newly created channel
def create_channel
  session.create_channel.tap do |fresh_channel|
    fresh_channel.basic_qos(Rack::App::Worker::Environment.queue_qos)
  end
end
create_new_session()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 46
# Builds a session with ::Bunny.new (no arguments), starts it, and returns it.
#
# @return the started session object
def create_new_session
  ::Bunny.new.tap { |connection| connection.start }
end
create_session()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 68
# Creates a session via create_new_session, sets its logger level to
# Logger::ERROR, and stores it in @session.
#
# @return the session that was stored in @session
def create_session
  @session = create_new_session.tap do |started_session|
    started_session.logger.level = Logger::ERROR
  end
end
exchange_already_bind?(queue, name)
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 39
# Reports whether any binding returned by queue.recover_bindings targets the
# exchange named exchange_name(type, name).
#
# On Timeout::Error the method sleeps a random 1..5 seconds and retries, with
# no upper bound on the number of attempts.
#
# @param queue an object responding to recover_bindings with a collection of
#   hash-like bindings that carry an :exchange key
# @param name the logical exchange name
# @param type the exchange type segment; defaults to 'send', which is the
#   value this check was previously hard-wired to, so two-argument callers
#   keep their behavior. Pass another type (e.g. 'broadcast') to test a
#   binding against that kind of exchange.
# @return [Boolean] true when a matching binding exists
def exchange_already_bind?(queue, name, type = 'send')
  # The expected name does not depend on the binding, so compute it once.
  expected_exchange = exchange_name(type, name)
  queue.recover_bindings.any? { |binding| binding[:exchange] == expected_exchange }
rescue Timeout::Error
  sleep(rand(1..5))
  retry
end
exchange_cache()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 88
# Lazily initialized hash stored in @exchange_cache; the same hash instance is
# returned on every call once created.
#
# @return [Hash] the memoized cache
def exchange_cache
  @exchange_cache = {} unless @exchange_cache
  @exchange_cache
end
exchange_for(type, name)
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 92
# Returns the durable fanout exchange named exchange_name(type, name),
# declaring it on the current channel the first time it is requested and
# memoizing it in exchange_cache afterwards.
#
# The cache is keyed by the full exchange name rather than by +name+ alone,
# so exchanges of different types that share a logical name (for example the
# 'send' and 'broadcast' exchanges of the same worker) do not overwrite or
# shadow one another.
#
# @param type the exchange type segment (e.g. 'send' or 'broadcast')
# @param name the logical exchange name
# @return the exchange object returned by channel.fanout
def exchange_for(type, name)
  full_name = exchange_name(type, name)
  exchange_cache[full_name] ||= channel.fanout(full_name, :durable => true)
end
exchange_name(type, name)
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 98
# Builds the dot-separated exchange name "<namespace>.<type>.<name>".
#
# @param type the exchange type segment
# @param name the logical exchange name
# @return [String] the composed exchange name
def exchange_name(type, name)
  format('%s.%s.%s', namespace, type, name)
end
namespace()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 78
# Reads the namespace from Rack::App::Worker::Environment.
#
# @return the value of Rack::App::Worker::Environment.namespace
def namespace
  environment = Rack::App::Worker::Environment
  environment.namespace
end
session_finalizer!()
click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 102
# Registers, at most once per instance, an at_exit hook that closes the
# object stored in @session if it is present and responds to close. A
# Timeout::Error raised while closing is swallowed.
#
# @return [true] on the first and on every later call
def session_finalizer!
  return @session_finalizer if @session_finalizer

  owner = self
  Kernel.at_exit do
    begin
      # Read the ivar at exit time so the hook sees the latest session.
      open_session = owner.instance_variable_get(:@session)
      open_session.close if open_session && open_session.respond_to?(:close)
    rescue Timeout::Error
      nil
    end
  end

  @session_finalizer = true
end