class Agni::Queue

Public Class Methods

new(queue_name, messenger, options={}) click to toggle source

Core method responsible for catching queue name problems, like nil values and empty strings.

@param queue_name [String] the name of this queue @param messenger [Agni::Messenger] the messenger object

with which this queue is associated

@param options [Hash] options that will be passed to the AMQP

gem during queue creation
# File lib/agni/queue.rb, line 13
def initialize(queue_name, messenger, options={})
  if queue_name.nil? || queue_name.empty?
    raise ArgumentError, 'Queue name must be present when creating a queue'
  end
  self.configure_logs
  @logical_queue_name = queue_name

  begin
    @queues = PRIORITY_LEVELS.map do |priority|
      create_queue(messenger, priority, options)
    end
  rescue AMQP::IncompatibleOptionsError
    raise AgniError,
    "One of the queues needed to create #{@logical_queue_name} " +
      "has already been created with different options!"
  end

  # The in-memory queue we use to prioritize incoming messages of
  # different priorities
  @queue_mutex = Mutex.new
  @memory_queue = Containers::MinHeap.new
end

Public Instance Methods

create_queue_name(base_name, priority) click to toggle source

Given a base name and a priority, creates a queue name suitable for use in naming an underlying AMQP queue.

@param base_name [String] the base name of the queue. This is

typcially just the queue name used when creating this
+Agni::Queue+ object.

@param priority [String] valid priorities are in the range 0

through 9 inclusive.
# File lib/agni/queue.rb, line 103
def create_queue_name(base_name, priority)
  "#{base_name}.#{priority}"
end
publish(payload, priority=DEFAULT_PRIORITY, options={}) click to toggle source

Publishes a payload to this queue.

@param payload [String] the payload of the message to publish @param priority [FixNum] must be one between 0 and 9, inclusive. @param options [Hash]

# File lib/agni/queue.rb, line 86
def publish(payload, priority=DEFAULT_PRIORITY, options={})
  unless PRIORITY_LEVELS.include? priority
    raise ArgumentError, "Invalid priority #{priority}, must be between 0 and 9"
  end
  queue_name = create_queue_name(@logical_queue_name, priority)
  @queues[priority][:exchange].publish(payload, DEFAULT_MESSAGE_OPTS.merge(options).
                                       merge(:routing_key => queue_name))
end
subscribe(handler, options={}) click to toggle source

Subscribes to this queue, handling each incoming message with the provided handler. @param handler [Proc] accepts two arguments:

metadata [Hash] a hash of attributes as it is provided by the
underlying AMQP implementation.
payload [String] the message itself, as was provided by the
publisher
The return value from the handler will be discarded.
# File lib/agni/queue.rb, line 44
def subscribe(handler, options={})
  if subscribed?
    raise AgniError, 'Queue #{queue_name} is already subscribed'
  end
  ack = options[:ack].nil? ? true : options[:ack]
  handle_func = lambda do
    metadata, payload = pop
    handler[metadata, payload] if handler
    EventMachine.next_tick{ metadata.ack } if ack
  end
  @queues.each do |q|
    queue = q[:queue]
    priority = q[:priority]
    queue.subscribe(:ack => true) do |metadata, payload|
      @queue_mutex.synchronize do
        @memory_queue.push(priority, [metadata, payload])
      end
      EventMachine.next_tick { EventMachine.defer(handle_func) }
    end
  end
  self
end
subscribed?() click to toggle source

@return [True] iff every AMQP queue is subscribed?

# File lib/agni/queue.rb, line 77
def subscribed?
  @queues.map{|q| q[:queue].default_consumer}.all?{|c| c.subscribed? if c}
end
unsubscribe() click to toggle source
# File lib/agni/queue.rb, line 67
def unsubscribe
  unless subscribed?
    raise AgniError, 'Queue #{queue_name} is not subscribed'
  end
  @queues.each do |q|
    q[:queue].unsubscribe
  end
end

Private Instance Methods

create_queue(messenger, priority, options) click to toggle source

Internal use utility method to create queue hashes. No checking is performed to ensure that the queue does not already exist, for example. Its only use right now is during initialization of the Agni::Queue class.

# File lib/agni/queue.rb, line 113
def create_queue(messenger, priority, options)
  name = create_queue_name(@logical_queue_name, priority)
  unless channel = AMQP::Channel.new(messenger.connection,
                                     DEFAULT_CHANNEL_OPTS.
                                     merge({prefetch: DEFAULT_PREFETCH}))
    raise AgniError,
    "Unable to obtain a channel from AMQP instance at #{amqp_url}"
  end
  # Get a handle to the default exchange. The default exchange
  # automatically binds messages with a given routing key to a
  # queue with the same name, eliminating the need to create
  # specific direct bindings for each queue.
  queue = channel.queue(name, DEFAULT_QUEUE_OPTS.
                        merge(options))

  exchange = channel.default_exchange
  # Each 'queue' in the queue array is a hash.  Here's how each
  # hash is laid out:
  {
    priority:  priority,
    name:      name,
    channel:   channel,
    queue:     queue,
    exchange:  exchange
  }
end
pop() click to toggle source

Removes and returns an item from the priority queue in a thread-safe manner. Thread safety reasoning: all calls to shared queue are locked by a single mutex.

# File lib/agni/queue.rb, line 143
def pop
  val = []
  @queue_mutex.synchronize do
    val = @memory_queue.pop
  end
  val
end