class Rack::App::Worker::RabbitMQ

Public Instance Methods

broadcast_exchange(name) click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 20
def broadcast_exchange(name)
  exchange_for('broadcast', name)
end
channel() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 10
def channel
  @channel ||= create_channel
  @channel = create_channel unless @channel.open?
  @channel
end
create_broadcast_queue(name) click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 31
def create_broadcast_queue(name)
  queue = channel.queue('', :exclusive => true, :auto_delete => false)
  queue.bind(broadcast_exchange(name)) unless exchange_already_bind?(queue, name)
  return queue
end
send_exchange(name) click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 16
def send_exchange(name)
  exchange_for('send', name)
end
send_queue(name) click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 24
def send_queue(name)
  queue_name = "#{namespace}.#{cluster}.#{name}"
  queue = channel.queue(queue_name, :durable => true, :auto_delete => false)
  queue.bind(send_exchange(name)) unless exchange_already_bind?(queue, name)
  return queue
end
session() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 5
def session
  check_connection
  @session
end

Protected Instance Methods

check_connection() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 52
def check_connection
  case @session

    when ::Bunny::Session
      @session.close if @session.status == :not_connected
      create_session if @session.closed?

    when NilClass
      create_session

  end
rescue ::Bunny::TCPConnectionFailedForAllHosts
  sleep(1)
  retry
end
cluster() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 74
def cluster
  Rack::App::Worker::Environment.worker_cluster
end
create_channel() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 82
def create_channel
  new_channel = session.create_channel
  new_channel.basic_qos(Rack::App::Worker::Environment.queue_qos)
  new_channel
end
create_new_session() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 46
def create_new_session
  session = ::Bunny.new
  session.start
  return session
end
create_session() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 68
def create_session
  new_session = create_new_session
  new_session.logger.level = Logger::ERROR
  @session = new_session
end
exchange_already_bind?(queue, name) click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 39
def exchange_already_bind?(queue, name)
  queue.recover_bindings.any? { |binding| binding[:exchange] == exchange_name('send', name) }
rescue Timeout::Error
  sleep(rand(1..5))
  retry
end
exchange_cache() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 88
def exchange_cache
  @exchange_cache ||= {}
end
exchange_for(type, name) click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 92
def exchange_for(type, name)
  exchange_cache[name] ||= proc {
    channel.fanout(exchange_name(type, name), :durable => true)
  }.call
end
exchange_name(type, name) click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 98
def exchange_name(type, name)
  "#{namespace}.#{type}.#{name}"
end
namespace() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 78
def namespace
  Rack::App::Worker::Environment.namespace
end
session_finalizer!() click to toggle source
# File lib/rack/app/worker/rabbit_mq.rb, line 102
def session_finalizer!
  @session_finalizer ||= lambda do
    this = self
    Kernel.at_exit do
      begin
        session = this.instance_variable_get(:@session)
        session && session.respond_to?(:close) && session.close
      rescue Timeout::Error
        nil
      end
    end
    true
  end.call
end