class Philotic::Subscriber

Attributes

connection[RW]

Public Class Methods

new(connection)
# File lib/philotic/subscriber.rb, line 10
# Builds a subscriber around an existing connection.
#
# @param connection the object stored in @connection; the other methods of
#   this class call #connect!, #channel, #exchange, #config and #logger on it
def initialize(connection)
  @connection = connection
end

Public Instance Methods

acknowledge(message, up_to_and_including=false)
# File lib/philotic/subscriber.rb, line 79
# Acknowledges a delivered message on the connection's channel.
#
# @param message an object responding to #delivery_tag
# @param up_to_and_including forwarded unchanged as the second argument of
#   the channel's #acknowledge (presumably the AMQP "multiple" flag — confirm
#   against the channel implementation)
# @return whatever the channel's #acknowledge returns
def acknowledge(message, up_to_and_including = false)
  channel = connection.channel
  channel.acknowledge(message.delivery_tag, up_to_and_including)
end
config()
# File lib/philotic/subscriber.rb, line 18
# Returns the configuration object exposed by the connection.
#
# @return the value of connection.config
def config
  connection.config
end
endure()
# File lib/philotic/subscriber.rb, line 93
# Puts the calling thread to sleep via Thread.stop; it stays suspended until
# another thread wakes it.
#
# NOTE(review): presumably used to keep the process alive while subscription
# callbacks run on other threads — confirm against callers.
def endure
  Thread.stop
end
get_subscription_settings(subscription, subscribe_options)
# File lib/philotic/subscriber.rb, line 52
# Normalizes the arguments given to #subscribe into one settings hash.
#
# Two calling conventions are supported:
#
# * +subscription+ is a Symbol or String: it is used as the queue name, the
#   named-queue defaults apply, queue options are read from
#   <tt>subscribe_options[:queue_options]</tt>, and no binding arguments are
#   produced (+:arguments+ is nil).
# * anything else (expected to be a Hash): the queue name comes from
#   <tt>subscription[:queue_name]</tt> (default <tt>''</tt>), the
#   anonymous-queue defaults apply, <tt>subscription[:subscribe_options]</tt>
#   is merged over +subscribe_options+, and the binding arguments are
#   <tt>subscription[:arguments]</tt> or, failing that, the subscription hash
#   itself, with <tt>'x-match'</tt> defaulted to <tt>'all'</tt>.
#
# Neither the shared Philotic::DEFAULT_*_QUEUE_OPTIONS constants nor the
# hashes passed in by the caller are modified.
#
# @param subscription [Symbol, String, Hash] queue name or subscription hash
# @param subscribe_options [Hash] options later handed to the queue's #subscribe
# @return [Hash] with keys :queue_name, :queue_options, :arguments and
#   :subscribe_options
def get_subscription_settings(subscription, subscribe_options)
  if [Symbol, String].include? subscription.class
    queue_name     = subscription
    subscription   = subscribe_options
    default_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS
    arguments      = nil
  else
    queue_name        = subscription[:queue_name] || ''
    default_options   = Philotic::DEFAULT_ANONYMOUS_QUEUE_OPTIONS
    subscribe_options = subscribe_options.merge(subscription[:subscribe_options]) if subscription[:subscribe_options]

    # Work on a copy so that defaulting 'x-match' never leaks into the hash
    # the caller handed us.
    arguments = (subscription[:arguments] || subscription).dup
    # NOTE(review): only the String key is checked here, while
    # #subscribe_to_any supplies the Symbol key :'x-match'; in that case both
    # keys end up in the arguments — confirm this is intended.
    arguments['x-match'] ||= 'all'
  end

  # Hash#merge (not merge!) builds a fresh hash, so the shared default
  # constants are never mutated and options cannot bleed between calls.
  queue_options = default_options.merge(subscription[:queue_options] || {})

  # A queue without a name is always auto-deleted (a falsy :auto_delete is
  # overridden).
  queue_options[:auto_delete] ||= true if queue_name == ''

  {
    queue_name:        queue_name,
    queue_options:     queue_options,
    arguments:         arguments,
    subscribe_options: subscribe_options,
  }
end
initialize_queue(subscription_settings)
# File lib/philotic/subscriber.rb, line 45
# Declares the queue described by the settings on the connection's channel
# and, when binding arguments are present, binds it to the connection's
# exchange with those arguments.
#
# @param subscription_settings [Hash] reads :queue_name, :queue_options and
#   :arguments
# @return the queue object returned by the channel's #queue
def initialize_queue(subscription_settings)
  name    = subscription_settings[:queue_name]
  options = subscription_settings[:queue_options]

  connection.channel.queue(name, options).tap do |declared|
    binding_arguments = subscription_settings[:arguments]
    # Without binding arguments the queue is left unbound.
    declared.bind(connection.exchange, arguments: binding_arguments) if binding_arguments
  end
end
logger()
# File lib/philotic/subscriber.rb, line 14
# Returns the logger exposed by the connection.
#
# @return the value of connection.logger
def logger
  connection.logger
end
reject(message, requeue=true)
# File lib/philotic/subscriber.rb, line 83
# Rejects a delivered message on the connection's channel.
#
# @param message an object responding to #delivery_tag
# @param requeue forwarded unchanged as the second argument of the channel's
#   #reject; defaults to true
# @return whatever the channel's #reject returns
def reject(message, requeue = true)
  channel = connection.channel
  channel.reject(message.delivery_tag, requeue)
end
subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block)
# File lib/philotic/subscriber.rb, line 33
# Connects, applies the configured prefetch count to the channel, declares
# (and possibly binds) the queue, and registers the block as its consumer.
#
# @param subscription [Symbol, String, Hash] queue name or subscription hash,
#   interpreted by #get_subscription_settings
# @param subscribe_options [Hash] options forwarded to the queue's #subscribe
# @param block handler wrapped by #subscription_callback before registration
# @return whatever the queue's #subscribe returns
def subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block)
  connection.connect!
  connection.channel.prefetch(connection.config.prefetch_count)

  settings = get_subscription_settings(subscription, subscribe_options)
  queue    = initialize_queue(settings)
  handler  = subscription_callback(&block)

  queue.subscribe(settings[:subscribe_options], &handler)
end
subscribe_to_any(options = {})
# File lib/philotic/subscriber.rb, line 87
# Subscribes with the :'x-match' option forced to :any. Does nothing and
# returns nil when no block is given.
#
# The block is captured explicitly: a bare Proc.new relying on the method's
# implicit block warns on Ruby 2.7 and raises ArgumentError on Ruby 3.0+.
#
# @param options [Hash] subscription hash; it is not modified, a merged copy
#   is passed on
# @param block handler forwarded to #subscribe
# @return the result of #subscribe, or nil without a block
def subscribe_to_any(options = {}, &block)
  return unless block

  subscribe(options.merge(:'x-match' => :any), &block)
end
subscription_callback(&block)
# File lib/philotic/subscriber.rb, line 22
# Wraps a message handler in the three-argument lambda that is handed to the
# queue's #subscribe.
#
# For every delivery the lambda deserializes the payload with
# Philotic::Serialization::Serializer.load, builds a message from the
# metadata headers and the deserialized payload, attaches the delivery info,
# and runs the handler through instance_exec, so +self+ inside the handler is
# this object.
#
# @param block handler that receives the message
# @return [Proc] a lambda taking (delivery_info, metadata, payload) and
#   returning the handler's result
def subscription_callback(&block)
  ->(delivery_info, metadata, payload) do
    deserialized = Philotic::Serialization::Serializer.load(payload, metadata)

    # Each delivery is wrapped in its own anonymous subclass of
    # Philotic::Message.
    message_class = Class.new(Philotic::Message)
    message       = message_class.new(metadata[:headers], deserialized)
    message.delivery_info = delivery_info

    instance_exec(message, &block)
  end
end