class Banter::Subscriber

Attributes

context[RW]
delivery_properties[RW]
delivery_routing_data[RW]
envelope[RW]

Public Class Methods

get_config_value(options, options_key, config_key = nil) click to toggle source
# File lib/banter/subscriber.rb, line 58
def self.get_config_value(options, options_key, config_key = nil)
  if options.key?(options_key)
    options[options_key]
  else
    config_key = options_key if config_key.nil?
    configure_obj = [Banter::Configuration.configuration, Banter::Configuration].find do |obj|
      obj.respond_to?(config_key)
    end
    configure_obj.nil? ? nil : configure_obj.send(config_key)
  end
rescue => e
  puts "Banter Configuration does not exist #{config_key}:#{e.message}"
  raise e
end
handle_errors_with(handler) click to toggle source

Sets an error handler for the class

# File lib/banter/subscriber.rb, line 84
def self.handle_errors_with(handler)
  error_handler = handler
end
inherited(klass) click to toggle source
# File lib/banter/subscriber.rb, line 14
def self.inherited(klass)
  @@registered_subscribers << klass
end
new(delivery_routing_data, delivery_properties, context, envelope) click to toggle source

@param [Object] delivery_routing_data Contains routing information like originator and routing key @param [Object] delivery_properties @param [Hash] context @param [Hash] envelope

# File lib/banter/subscriber.rb, line 92
def initialize(delivery_routing_data, delivery_properties, context, envelope)
  @delivery_routing_data = delivery_routing_data
  @delivery_properties   = delivery_properties
  @context               = context
  @envelope              = envelope
end
subscribe_to(routing_key_name, options = {}) click to toggle source

Specify the routing key that the subscriber class should listen to. @param [String] routing_key_name The routing key to subscribe to. Must be characters only separated by periods (.) @param [Hash] options subscription options

@option [String] :durable sets the subscriber to be a durable subscriber (one that survives reboots).  This currently defaults to true.
@option [Integer] :ttl Time, in seconds, that the message lives on the queue before being either consumer by a subscriber or being discarded.
                             If not specified, then Banter::Configuration.default_queue_ttl is used
@option [String] :exchange_name sets to exchange to the specified value, or if it is not used, will default to Banter.Configuration.exchange_name
@option [String] :dead_letter_exchange_name name of queue to subscribe to.
@option [String] :queue_name name of queue to subscribe to.
# File lib/banter/subscriber.rb, line 27
def self.subscribe_to(routing_key_name, options = {})

  unless validate_routing_key_name(routing_key_name)
    raise ArgumentError.new("#{routing_key_name} is not supported. Only lower case characters separated by periods are allowed.")
  end

  options.assert_valid_keys(
    :topic_prefix,
    :exchange_name,
    :dead_letter_exchange_name,
    :dead_letter_queue_name,
    :queue_name,
    :ttl,
    :durable,
    :pool_size)

  config = Hashie::Mash.new
  config[:topic]          = get_config_value(options, :topic_prefix)
  config[:exchange]       = get_config_value(options, :exchange_name)
  config[:dead_letter_exchange] = get_config_value(options, :dead_letter_exchange_name)
  config[:dead_letter_queue_name] = get_config_value(options, :dead_letter_queue_name)
  config[:routing_key]    = self.subscribed_key = routing_key_name
  config[:queue_name]     = self.subscribed_queue = options[:queue_name] || generated_queue_name(routing_key_name, Banter::Configuration.application_name)
  config[:ttl]            = get_config_value(options, :ttl, :default_queue_ttl)
  config[:durable]        = get_config_value(options, :durable, :durable_queues)
  config[:pool_size]      = get_config_value(options, :pool_size)

  self.config = config

end
validates_payload_with(*validators) click to toggle source

Sets the validator for payload

@param validator The validator to use for validating the payload.

Returns false if the payload is not valid.
Proc must accept a payload as an argument.
# File lib/banter/subscriber.rb, line 78
def self.validates_payload_with(*validators)
  self.payload_validators ||= []
  self.payload_validators += validators
end

Private Class Methods

generated_application_name(application_name) click to toggle source
# File lib/banter/subscriber.rb, line 156
def self.generated_application_name(application_name)
  return application_name if application_name.present?

  Banter::Configuration.application_name.to_s.gsub(/[^\w\_ ]/, '')
end
generated_queue_name(routing_key, application_name) click to toggle source
# File lib/banter/subscriber.rb, line 162
def self.generated_queue_name(routing_key, application_name)
  name = ""
  if routing_key.present?
    name = routing_key
    if application_name
      name += "_#{application_name}"
    end
  else
    name = application_name
  end
  name
end
validate_routing_key_name(key) click to toggle source
# File lib/banter/subscriber.rb, line 151
def self.validate_routing_key_name(key)
  return true if key.blank?
  key.match(/\A([a-z_]+[\.\d]*)*([a-z\_]+)\d*\Z/).present?
end

Public Instance Methods

perform(payload) click to toggle source

Actual subscribers need to implement perform method. This is the method where the message is actually processed. @param [Object] payload Payload of the message

# File lib/banter/subscriber.rb, line 111
def perform(payload)
  raise "Need implementation for your worker."
end
perform!(payload) click to toggle source

Performs validation if validates_payload_with is defined and then calls the perform method @param [Object] payload Payload of the message

# File lib/banter/subscriber.rb, line 101
def perform!(payload)
  if !valid_payload?(payload)
    raise ::Banter::PayloadValidationError.new("Invalid Payload for #{self.class.name}")
  end

  perform(payload)
end
routing_key() click to toggle source

@return [String] The original routing key with which the current message was published

# File lib/banter/subscriber.rb, line 116
def routing_key
  delivery_routing_data[:routing_key]
end
valid_payload?(payload) click to toggle source

Iterates over all the payload validators and returns false if any of them are false @param [Object] payload The payload/arguments of the message @return [Boolen] Should return true or false value - If no validators are specified, then returns true

# File lib/banter/subscriber.rb, line 123
def valid_payload?(payload)
  return true unless payload_validators.present?

  payload_validators.inject(true) { |is_valid, validator|
    is_valid && (validator.respond_to?(:call) ? validator.call(payload) : send(validator, payload))
  }
end

Private Instance Methods

at(number) click to toggle source

@param [Integer] number The current progress of the execution of the message

# File lib/banter/subscriber.rb, line 140
def at(number)
  return unless Banter::Configuration.web_enabled
  BanterMessage.update_progress_at(number)
end
progress(object) click to toggle source

@param [Object] object An object that indicates the current progress of the message

# File lib/banter/subscriber.rb, line 146
def progress(object)
  return unless Banter::Configuration.web_enabled
  BanterMessage.update_progress(object)
end
total(number) click to toggle source

@param [Integer] number The expected number of total objects that may be processed due to the message

# File lib/banter/subscriber.rb, line 134
def total(number)
  return unless Banter::Configuration.web_enabled
  BanterMessage.update_progress_total(number)
end