class Smith::Messaging::Receiver

Public Class Methods

new(queue_def, opts={}, &blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 12
# Builds the receiver: records the queue definition and per-message options,
# then starts opening the channel(s), exchange and queue. The resulting
# objects are published through the EM::Completion instances created here.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or a value that
#   is handed to QueueDefinition.new together with +opts+.
# @param opts [Hash] options. NOTE: this hash is mutated: :error_queue and
#   :fanout are always removed; :fanout_persistence and :fanout_queue_suffix
#   are removed only when :fanout was truthy.
# @yield [self] the new receiver, called as the last step of the constructor
#   when a block is given.
def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  # A non-QueueDefinition argument is wrapped, with opts passed along.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # Options passed to every Foo created in on_message. :error_queue comes
  # from opts (false when absent); the other two are looked up on the queue
  # definition's options via option_or_default with the defaults shown.
  @foo_options = {
    :error_queue => opts.delete(:error_queue) { false },
    :auto_ack => option_or_default(@queue_def.options, :auto_ack, true),
    :threading => option_or_default(@queue_def.options, :threading, false)}

  # nil unless :fanout was given and truthy. :persistence defaults to true
  # when :fanout_persistence is absent; :queue_suffix is nil when absent.
  fanout_options = if opts.delete(:fanout)
    {:persistence => opts.delete(:fanout_persistence) { true }, :queue_suffix => opts.delete(:fanout_queue_suffix)}
  end

  # Array() wraps the :type option so @payload_type is always an Array;
  # an empty array makes on_message skip the type check.
  @payload_type = Array(option_or_default(@queue_def.options, :type, []))

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counter = MessageCounter.new(@queue_def.denormalise)

  # Each completion is succeeded from the open_channel callbacks below; the
  # other methods of this class wait on them through #completion.
  @channel_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @exchange_completion = EM::Completion.new
  @requeue_options_completion = EM::Completion.new

  # Filled in by setup_reply_queue, keyed by reply queue name.
  @reply_queues = {}

  # First open_channel call: publishes the channel, then declares the exchange
  # (named after the normalised queue definition) and publishes it.
  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    # Dynamically calls channel.fanout or channel.direct.
    exchange_type = (fanout_options) ? :fanout : :direct
    channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange|
      @exchange_completion.succeed(exchange)
    end
  end

  # Second open_channel call: declares the queue and, once the exchange is
  # available, binds it and publishes the queue and the requeue options.
  # NOTE(review): whether this yields a different channel from the one above
  # depends on open_channel, which is not visible here — confirm.
  open_channel(:prefetch => prefetch) do |channel|
    channel.queue(*fanout_queue_and_opts(fanout_options)) do |queue|
      @exchange_completion.completion do |exchange|
        queue.bind(exchange, :routing_key => @queue_def.normalise)
        @queue_completion.succeed(queue)
        @requeue_options_completion.succeed(:exchange => exchange, :queue => queue)
      end
    end
  end

  # NOTE(review): this runs at the end of the constructor; the completions
  # above may not have succeeded yet if open_channel is asynchronous — confirm.
  blk.call(self) if blk
end

Public Instance Methods

ack(multiple=false) click to toggle source
# File lib/smith/messaging/receiver.rb, line 65
# Calls +ack+ on the channel once the channel completion has succeeded.
#
# @param multiple [Boolean] forwarded unchanged to the channel's +ack+.
def ack(multiple=false)
  @channel_completion.completion do |open_channel|
    open_channel.ack(multiple)
  end
end
delete(&blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 167
# Tears the receiver down once the exchange, queue and channel are all
# available: unbinds the queue, deletes the queue, deletes the exchange and
# finally closes the channel.
#
# @yield passed to the channel's +close+.
def delete(&blk)
  # Each step runs from the callback of the previous one.
  teardown = lambda do |exchange, queue, channel|
    queue.unbind(exchange) do
      queue.delete do
        exchange.delete { channel.close(&blk) }
      end
    end
  end

  @exchange_completion.completion do |exchange|
    @queue_completion.completion do |queue|
      @channel_completion.completion { |channel| teardown.call(exchange, queue, channel) }
    end
  end
end
on_error(chain=false, &blk) click to toggle source

Define a channel error handler.

# File lib/smith/messaging/receiver.rb, line 137
# Registers +blk+ as the channel's error handler once the channel is
# available.
#
# @param chain [Boolean] accepted but not used by this method.
# @yield passed to the channel's +on_error+.
def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion { |open_channel| open_channel.on_error(&blk) }
end
on_requeue(&blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 197
# Stores +blk+ under :on_requeue in the requeue options once they are
# available.
#
# @yield the block saved as the :on_requeue entry.
def on_requeue(&blk)
  @requeue_options_completion.completion do |settings|
    settings.update(:on_requeue => blk)
  end
end
on_requeue_limit(&blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 203
# Stores +blk+ under :on_requeue_limit in the requeue options once they are
# available.
#
# @yield the block saved as the :on_requeue_limit entry.
def on_requeue_limit(&blk)
  @requeue_options_completion.completion do |settings|
    settings.update(:on_requeue_limit => blk)
  end
end
pop(&blk) click to toggle source

Pops a message off the queue and passes the headers and payload into the block. pop will automatically acknowledge the message unless the options set :ack to false.

# File lib/smith/messaging/receiver.rb, line 121
# Pops one message off the queue once the queue and the requeue options are
# available.
#
# When a payload is delivered it is handed to on_message together with the
# block. When there is no payload, the block (if any) is called with
# (nil, nil).
#
# @yield [metadata, payload] (nil, nil) when no payload was delivered;
#   otherwise the block is forwarded to on_message.
def pop(&blk)
  # Read the pop options up front, before waiting on the completions.
  opts = @options.pop
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      queue.pop(opts) do |metadata, payload|
        if payload
          on_message(metadata, payload, requeue_options, &blk)
        elsif blk
          # Guarded: pop may be called without a block, in which case there
          # is nobody to notify about the empty result.
          blk.call(nil, nil)
        end
      end
    end
  end
end
queue_name() click to toggle source
# File lib/smith/messaging/receiver.rb, line 163
# Returns the denormalised name reported by the queue definition.
def queue_name
  definition = @queue_def
  definition.denormalise
end
requeue_parameters(opts={}) click to toggle source
# File lib/smith/messaging/receiver.rb, line 191
# Merges +opts+ into the requeue options once they are available.
#
# @param opts [Hash] entries to add to, or overwrite in, the requeue options.
def requeue_parameters(opts={})
  @requeue_options_completion.completion do |settings|
    settings.update(opts)
  end
end
setup_reply_queue(reply_queue_name, &blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 69
# Yields a sender for the named reply queue, creating and caching one the
# first time the name is seen.
#
# @param reply_queue_name [String] key into the sender cache and name used
#   for the new QueueDefinition.
# @yield [sender] the cached or newly created Smith::Messaging::Sender.
def setup_reply_queue(reply_queue_name, &blk)
  cached_sender = @reply_queues[reply_queue_name]
  return blk.call(cached_sender) if cached_sender

  # The exchange itself is not used; this only waits for it to be ready.
  @exchange_completion.completion do |_exchange|
    logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

    definition = QueueDefinition.new(reply_queue_name, :auto_delete => true, :immediate => true, :mandatory => true, :durable => false)

    Smith::Messaging::Sender.new(definition) do |new_sender|
      @reply_queues[reply_queue_name] = new_sender
      blk.call(new_sender)
    end
  end
end
status(&blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 183
# Asks the queue for its status once the queue is available and forwards the
# two reported values to the block.
#
# @yield [num_messages, num_consumers] the values reported by the queue's
#   +status+ callback.
def status(&blk)
  @queue_completion.completion do |bound_queue|
    bound_queue.status { |message_count, consumer_count| blk.call(message_count, consumer_count) }
  end
end
subscribe(handler=nil, &blk) click to toggle source

Subscribes to a queue and passes the headers and payload into the block. subscribe will automatically acknowledge the message unless the options set :ack to false.

# File lib/smith/messaging/receiver.rb, line 89
# Subscribes to the queue once the queue and the requeue options are
# available. Each delivery with a payload is handed to on_message; deliveries
# without a payload are only logged. If the queue reports that it is already
# subscribed, an error is logged and no new subscription is made.
#
# @param handler [#to_proc, nil] used instead of the block when given.
# @yield forwarded to on_message for every delivery that has a payload.
def subscribe(handler=nil, &blk)

  callback = handler || blk

  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      if queue.subscribed?
        logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" }
      else
        opts = @options.subscribe
        logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" }
        queue.subscribe(opts) do |metadata, payload|
          if payload
            on_message(metadata, payload, requeue_options, &callback)
          else
            logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" }
          end
        end
      end
    end
  end
end
unsubscribe(&blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 112
# Unsubscribes from the queue once the queue is available.
#
# @yield passed to the queue's +unsubscribe+.
def unsubscribe(&blk)
  @queue_completion.completion { |bound_queue| bound_queue.unsubscribe(&blk) }
end

Private Instance Methods

fanout_queue_and_opts(fanout_options) click to toggle source

Calculates the queue name and the various queue options based on the fanout_options and returns them in a form that can be used by the queue constructor.

@param [Hash] fanout_options the various fanout related options

@raise [ArgumentError] raised if the :persistence flag is set but no :queue_suffix is given

@return [Array<queue_name, options>]

# File lib/smith/messaging/receiver.rb, line 220
# Works out the queue name and queue options to declare.
#
# * No fanout options: the normalised queue name with the regular options.
# * Fanout without :persistence: an empty name, with the regular options
#   overridden to :durable => false and :auto_delete => true.
# * Fanout with :persistence: the normalised name plus "." and
#   :queue_suffix, with the regular options.
#
# @param fanout_options [Hash, nil] may carry :persistence and :queue_suffix.
# @raise [ArgumentError] if :persistence is set but :queue_suffix is missing.
# @return [Array] a two-element array of queue name and options.
def fanout_queue_and_opts(fanout_options)
  return [@queue_def.normalise, @options.queue] unless fanout_options

  unless fanout_options[:persistence]
    return ["", @options.queue.merge(:durable => false, :auto_delete => true)]
  end

  suffix = fanout_options[:queue_suffix]
  unless suffix
    raise ArgumentError, "Incorrect options. :fanout_queue_suffix must be provided if :fanout_persistence is true."
  end

  ["#{@queue_def.normalise}.#{suffix}", @options.queue]
end
on_message(metadata, payload, requeue_options, &blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 144
# Handles one delivered message: checks its type against the accepted payload
# types, bumps the message counter and wraps the message in a Foo.
#
# The type check is skipped when no payload types are configured. When the
# metadata carries a reply_to, a reply queue sender is set up first and
# passed to Foo as :reply_queue.
#
# @param metadata [Object] must respond to +type+ and +reply_to+.
# @param payload [Object] passed through to Foo.
# @param requeue_options [Hash] passed through to Foo.
# @yield forwarded to Foo.new.
# @raise [ACL::IncorrectTypeError] if payload types are configured and the
#   message's type is not among them.
def on_message(metadata, payload, requeue_options, &blk)
  type_accepted = @payload_type.empty? || @payload_type.include?(@acl_type_cache.get_by_hash(metadata.type))

  unless type_accepted
    allowable_acls = @payload_type.join(", ")
    received_acl = @acl_type_cache.get_by_hash(metadata.type)
    raise ACL::IncorrectTypeError, "Received ACL: #{received_acl} on queue: #{@queue_def.denormalise}. This queue can only accept the following ACLs: #{allowable_acls}"
  end

  @message_counter.increment_counter

  unless metadata.reply_to
    return Foo.new(metadata, payload, @foo_options, requeue_options, &blk)
  end

  setup_reply_queue(metadata.reply_to) do |reply_sender|
    Foo.new(metadata, payload, @foo_options.merge(:reply_queue => reply_sender), requeue_options, &blk)
  end
end