class Philotic::Subscriber
Attributes
connection[RW]
Public Class Methods
new(connection)
click to toggle source
# File lib/philotic/subscriber.rb, line 10 def initialize(connection) @connection = connection end
Public Instance Methods
acknowledge(message, up_to_and_including=false)
click to toggle source
# File lib/philotic/subscriber.rb, line 79 def acknowledge(message, up_to_and_including=false) connection.channel.acknowledge(message.delivery_tag, up_to_and_including) end
config()
click to toggle source
# File lib/philotic/subscriber.rb, line 18 def config connection.config end
endure()
click to toggle source
# File lib/philotic/subscriber.rb, line 93 def endure Thread.stop end
get_subscription_settings(subscription, subscribe_options)
click to toggle source
# File lib/philotic/subscriber.rb, line 52 def get_subscription_settings(subscription, subscribe_options) if [Symbol, String].include? subscription.class queue_name = subscription subscription = subscribe_options queue_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS else queue_name = subscription[:queue_name] || '' queue_options = Philotic::DEFAULT_ANONYMOUS_QUEUE_OPTIONS subscribe_options = subscribe_options.merge(subscription[:subscribe_options]) if subscription[:subscribe_options] arguments = subscription[:arguments] || subscription arguments['x-match'] ||= 'all' end queue_options.merge!(subscription[:queue_options] || {}) queue_options[:auto_delete] ||= true if queue_name == '' { queue_name: queue_name, queue_options: queue_options, arguments: arguments, subscribe_options: subscribe_options, } end
initialize_queue(subscription_settings)
click to toggle source
# File lib/philotic/subscriber.rb, line 45 def initialize_queue(subscription_settings) queue = connection.channel.queue(subscription_settings[:queue_name], subscription_settings[:queue_options]) queue.bind(connection.exchange, arguments: subscription_settings[:arguments]) if subscription_settings[:arguments] queue end
logger()
click to toggle source
# File lib/philotic/subscriber.rb, line 14 def logger connection.logger end
reject(message, requeue=true)
click to toggle source
# File lib/philotic/subscriber.rb, line 83 def reject(message, requeue=true) connection.channel.reject(message.delivery_tag, requeue) end
subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block)
click to toggle source
# File lib/philotic/subscriber.rb, line 33 def subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block) connection.connect! connection.channel.prefetch(connection.config.prefetch_count) subscription_settings = get_subscription_settings subscription, subscribe_options queue = initialize_queue(subscription_settings) queue.subscribe(subscription_settings[:subscribe_options], &subscription_callback(&block)) end
subscribe_to_any(options = {})
click to toggle source
# File lib/philotic/subscriber.rb, line 87 def subscribe_to_any(options = {}) if block_given? subscribe(options.merge(:'x-match' => :any), &Proc.new) end end
subscription_callback(&block)
click to toggle source
# File lib/philotic/subscriber.rb, line 22 def subscription_callback(&block) lambda do |delivery_info, metadata, payload| hash_payload = Philotic::Serialization::Serializer.load(payload, metadata) message = Class.new(Philotic::Message).new(metadata[:headers], hash_payload) message.delivery_info = delivery_info instance_exec(message, &block) end end