class Smith::Messaging::Receiver
Public Class Methods
new(queue_def, opts={}, &blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 12 def initialize(queue_def, opts={}, &blk) # This is for backward compatibility. @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts) @acl_type_cache = AclTypeCache.instance @foo_options = { :error_queue => opts.delete(:error_queue) { false }, :auto_ack => option_or_default(@queue_def.options, :auto_ack, true), :threading => option_or_default(@queue_def.options, :threading, false)} fanout_options = if opts.delete(:fanout) {:persistence => opts.delete(:fanout_persistence) { true }, :queue_suffix => opts.delete(:fanout_queue_suffix)} end @payload_type = Array(option_or_default(@queue_def.options, :type, [])) prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch) @options = AmqpOptions.new(@queue_def.options) @options.routing_key = @queue_def.normalise @message_counter = MessageCounter.new(@queue_def.denormalise) @channel_completion = EM::Completion.new @queue_completion = EM::Completion.new @exchange_completion = EM::Completion.new @requeue_options_completion = EM::Completion.new @reply_queues = {} open_channel(:prefetch => prefetch) do |channel| @channel_completion.succeed(channel) exchange_type = (fanout_options) ? :fanout : :direct channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange| @exchange_completion.succeed(exchange) end end open_channel(:prefetch => prefetch) do |channel| channel.queue(*fanout_queue_and_opts(fanout_options)) do |queue| @exchange_completion.completion do |exchange| queue.bind(exchange, :routing_key => @queue_def.normalise) @queue_completion.succeed(queue) @requeue_options_completion.succeed(:exchange => exchange, :queue => queue) end end end blk.call(self) if blk end
Public Instance Methods
ack(multiple=false)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 65
# Calls +ack+ on the receiver's channel once that channel is available.
#
# @param multiple [Boolean] forwarded unchanged to channel.ack
#
# NOTE(review): +multiple+ is the only argument handed to channel.ack; confirm
# the channel API does not expect a delivery tag as its first argument.
def ack(multiple=false)
  @channel_completion.completion do |amqp_channel|
    amqp_channel.ack(multiple)
  end
end
delete(&blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 167
# Tears the receiver down once exchange, queue and channel are all available.
# The steps run strictly in sequence, each started from the previous step's
# callback: unbind queue from exchange, delete queue, delete exchange, close
# channel.
#
# @yield passed to channel.close as its block
def delete(&blk)
  @exchange_completion.completion do |exch|
    @queue_completion.completion do |bound_queue|
      @channel_completion.completion do |chan|
        # Built innermost-first; each proc is the callback of the step before it.
        close_channel = proc { chan.close(&blk) }
        drop_exchange = proc { exch.delete(&close_channel) }
        drop_queue = proc { bound_queue.delete(&drop_exchange) }

        bound_queue.unbind(exch, &drop_queue)
      end
    end
  end
end
on_error(chain=false, &blk)
click to toggle source
Define a channel error handler.
# File lib/smith/messaging/receiver.rb, line 137
# Registers the given block as the channel's error handler once the channel is
# available.
#
# @param chain [Boolean] accepted but not used by this method
# @yield passed to channel.on_error as its block
def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion { |amqp_channel| amqp_channel.on_error(&blk) }
end
on_requeue(&blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 197
# Stores the given block under :on_requeue in the requeue options, once those
# options are available.
#
# @yield the block stored as the :on_requeue entry
def on_requeue(&blk)
  @requeue_options_completion.completion { |settings| settings.update(:on_requeue => blk) }
end
on_requeue_limit(&blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 203
# Stores the given block under :on_requeue_limit in the requeue options, once
# those options are available.
#
# @yield the block stored as the :on_requeue_limit entry
def on_requeue_limit(&blk)
  @requeue_options_completion.completion { |settings| settings.update(:on_requeue_limit => blk) }
end
pop(&blk)
click to toggle source
Pops a message off the queue and passes the headers and payload into the block. pop
will automatically acknowledge the message unless the options set :ack to false.
# File lib/smith/messaging/receiver.rb, line 121
# Pops a single message from the queue once the queue and requeue options are
# available. A message with a payload is routed through on_message; when the
# payload is nil the block is called with (nil, nil) instead.
#
# @yield handler for the popped message
def pop(&blk)
  pop_opts = @options.pop

  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      queue.pop(pop_opts) do |metadata, payload|
        # No payload: tell the caller directly.
        next blk.call(nil, nil) unless payload

        on_message(metadata, payload, requeue_options, &blk)
      end
    end
  end
end
queue_name()
click to toggle source
# File lib/smith/messaging/receiver.rb, line 163 def queue_name @queue_def.denormalise end
requeue_parameters(opts={})
click to toggle source
# File lib/smith/messaging/receiver.rb, line 191
# Merges the given entries into the requeue options in place, once those
# options are available. Entries in +opts+ replace existing ones with the same
# key.
#
# @param opts [Hash] entries to merge into the requeue options
def requeue_parameters(opts={})
  @requeue_options_completion.completion { |settings| settings.update(opts) }
end
setup_reply_queue(reply_queue_name, &blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 69
# Yields a sender for the named reply queue. A sender already stored in
# @reply_queues is yielded straight away; otherwise, once the exchange is
# available, a new Smith::Messaging::Sender is created for a non-durable,
# auto-delete queue definition, stored under the queue name, and yielded.
#
# @param reply_queue_name [String] name of the reply queue
# @yield [sender] the cached or newly created sender
def setup_reply_queue(reply_queue_name, &blk)
  known_sender = @reply_queues[reply_queue_name]
  return blk.call(known_sender) if known_sender

  # The exchange itself is not used here; the completion only delays the work
  # until the exchange has been declared.
  @exchange_completion.completion do |_exchange|
    logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

    definition = QueueDefinition.new(reply_queue_name,
                                     :auto_delete => true,
                                     :immediate => true,
                                     :mandatory => true,
                                     :durable => false)

    Smith::Messaging::Sender.new(definition) do |sender|
      @reply_queues[reply_queue_name] = sender
      blk.call(sender)
    end
  end
end
status(&blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 183
# Asks the queue for its status once the queue is available and hands the two
# reported values to the block.
#
# @yield [num_messages, num_consumers] the values reported by queue.status
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status { |message_count, consumer_count| blk.call(message_count, consumer_count) }
  end
end
subscribe(handler=nil, &blk)
click to toggle source
Subscribes to a queue and passes the headers and payload into the block. subscribe
will automatically acknowledge the message unless the options set :ack to false.
# File lib/smith/messaging/receiver.rb, line 89
# Subscribes to the queue once the queue and requeue options are available.
# Each delivered message with a payload is routed through on_message; a
# delivery with a nil payload is only logged. If the queue is already
# subscribed, an error is logged and no new subscription is made.
#
# @param handler [#to_proc, nil] message handler; takes precedence over the block
# @yield message handler used when +handler+ is nil
def subscribe(handler=nil, &blk)
  callback = handler || blk

  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      if queue.subscribed?
        logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" }
      else
        opts = @options.subscribe
        logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" }

        queue.subscribe(opts) do |metadata, payload|
          if payload
            on_message(metadata, payload, requeue_options, &callback)
          else
            logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" }
          end
        end
      end
    end
  end
end
unsubscribe(&blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 112
# Calls +unsubscribe+ on the queue once the queue is available.
#
# @yield passed to queue.unsubscribe as its block
def unsubscribe(&blk)
  @queue_completion.completion { |subscribed_queue| subscribed_queue.unsubscribe(&blk) }
end
Private Instance Methods
fanout_queue_and_opts(fanout_options)
click to toggle source
Calculates the queue name and the various queue options based on the fanout_options and returns them in a form that can be used by the queue constructor.
@param [Hash] fanout_options the various fanout related options
@raise [ArgumentError] raised if the :persistence flag is set but no :queue_suffix is given
@return [Array<queue_name, options>]
# File lib/smith/messaging/receiver.rb, line 220
# Works out the queue name and queue options to declare:
#
# * no fanout options: the normalised queue name with the configured options;
# * fanout without persistence: an empty name with the configured options
#   overridden to be non-durable and auto-delete;
# * fanout with persistence: "<normalised name>.<suffix>" with the configured
#   options.
#
# @param fanout_options [Hash, nil] :persistence and :queue_suffix, or nil
# @raise [ArgumentError] if :persistence is set but :queue_suffix is missing
# @return [Array] two elements: queue name, then queue options
def fanout_queue_and_opts(fanout_options)
  return [@queue_def.normalise, @options.queue] unless fanout_options

  unless fanout_options[:persistence]
    return ["", @options.queue.merge(:durable => false, :auto_delete => true)]
  end

  suffix = fanout_options[:queue_suffix]
  unless suffix
    raise ArgumentError, "Incorrect options. :fanout_queue_suffix must be provided if :fanout_persistence is true."
  end

  ["#{@queue_def.normalise}.#{suffix}", @options.queue]
end
on_message(metadata, payload, requeue_options, &blk)
click to toggle source
# File lib/smith/messaging/receiver.rb, line 144
# Handles one delivered message. When the receiver is restricted to specific
# ACL types, the message's type is checked first and a mismatch is rejected.
# Accepted messages bump the message counter and are wrapped in a Foo; if the
# message names a reply queue, the Foo is built only once a sender for that
# queue is available and receives it as :reply_queue.
#
# @param metadata [#type, #reply_to] message metadata
# @param payload [Object] message payload
# @param requeue_options [Hash] passed through to Foo
# @yield passed through to Foo as its block
# @raise [ACL::IncorrectTypeError] if the receiver is restricted to ACL types
#   and the message's type is not one of them
def on_message(metadata, payload, requeue_options, &blk)
  # An empty @payload_type means "accept anything", so the cache is only
  # consulted when there is something to check against, and only once.
  unless @payload_type.empty?
    received_acl = @acl_type_cache.get_by_hash(metadata.type)

    unless @payload_type.include?(received_acl)
      allowable_acls = @payload_type.join(", ")
      raise ACL::IncorrectTypeError, "Received ACL: #{received_acl} on queue: #{@queue_def.denormalise}. This queue can only accept the following ACLs: #{allowable_acls}"
    end
  end

  @message_counter.increment_counter

  reply_to = metadata.reply_to
  if reply_to
    setup_reply_queue(reply_to) do |queue|
      Foo.new(metadata, payload, @foo_options.merge(:reply_queue => queue), requeue_options, &blk)
    end
  else
    Foo.new(metadata, payload, @foo_options, requeue_options, &blk)
  end
end