class Banter::Subscriber
Attributes
Public Class Methods
# File lib/banter/subscriber.rb, line 58
# Resolves a setting, preferring the caller-supplied options over Banter's
# configuration.
#
# If +options+ contains +options_key+, that value is returned as-is (even
# when it is nil). Otherwise the first of Banter::Configuration.configuration
# and Banter::Configuration that responds to the config key is asked for it.
#
# @param [Hash] options options given by the caller
# @param options_key key looked up in +options+
# @param config_key name of the configuration reader; falls back to
#   +options_key+ when nil
# @return the option value, the configuration value, or nil when neither
#   configuration source responds to the key
# @raise re-raises any StandardError after printing a diagnostic line
def self.get_config_value(options, options_key, config_key = nil)
  return options[options_key] if options.key?(options_key)

  # Reassign the parameter itself so the rescue message reports the key used.
  config_key = options_key if config_key.nil?
  sources = [Banter::Configuration.configuration, Banter::Configuration]
  provider = sources.find { |candidate| candidate.respond_to?(config_key) }
  provider.send(config_key) unless provider.nil?
rescue => e
  puts "Banter Configuration does not exist #{config_key}:#{e.message}"
  raise e
end
Sets an error handler for the class
# File lib/banter/subscriber.rb, line 84
# Registers the error handler for this subscriber class.
#
# The assignment goes through the class-level +error_handler=+ writer. A bare
# `error_handler = handler` would only create a method-local variable and the
# handler would be silently discarded.
#
# NOTE(review): the declaration of the +error_handler+ attribute is not
# visible in this section - confirm it is declared alongside +config+,
# +subscribed_key+ and +payload_validators+.
#
# @param handler the error handler to store
# @return the handler that was passed in
def self.handle_errors_with(handler)
  self.error_handler = handler
end
# File lib/banter/subscriber.rb, line 14
# Hook invoked by Ruby whenever a class inherits from this one; appends the
# new subclass to the registered-subscribers list.
#
# @param [Class] klass the class that was just defined as a subclass
def self.inherited(klass)
  # Keep the hook chain intact for any other +inherited+ implementations.
  super
  @@registered_subscribers << klass
end
@param [Object] delivery_routing_data Contains routing information like originator and routing key
@param [Object] delivery_properties
@param [Hash] context
@param [Hash] envelope
# File lib/banter/subscriber.rb, line 92
# Stores the delivery metadata, context and envelope of the message on the
# new subscriber instance.
#
# @param [Object] delivery_routing_data routing information of the message
# @param [Object] delivery_properties
# @param [Hash] context
# @param [Hash] envelope
def initialize(delivery_routing_data, delivery_properties, context, envelope)
  @delivery_routing_data, @delivery_properties = delivery_routing_data, delivery_properties
  @context = context
  @envelope = envelope
end
Specify the routing key that the subscriber class should listen to. @param [String] routing_key_name The routing key to subscribe to. Must consist only of lower case characters separated by periods (.) @param [Hash] options subscription options
@option [Boolean] :durable sets the subscriber to be a durable subscriber (one that survives reboots). This currently defaults to true.
@option [Integer] :ttl Time, in seconds, that the message lives on the queue before being either consumed by a subscriber or discarded. If not specified, then Banter::Configuration.default_queue_ttl is used.
@option [String] :exchange_name sets the exchange to the specified value, or if it is not used, will default to Banter::Configuration.exchange_name
@option [String] :dead_letter_exchange_name name of the dead letter exchange; when not given, the value from the Banter configuration is used.
@option [String] :queue_name name of queue to subscribe to.
# File lib/banter/subscriber.rb, line 27
# Declares the routing key this subscriber class listens to and builds its
# subscription configuration.
#
# Every setting is taken from +options+ when the key is present there and
# otherwise from the Banter configuration (see get_config_value). The queue
# name comes from options[:queue_name] or is generated from the routing key
# and Banter::Configuration.application_name.
#
# @param [String] routing_key_name routing key to subscribe to
# @param [Hash] options subscription options; only the keys listed below are accepted
# @return the configuration object that was stored on the class
# @raise [ArgumentError] when the routing key fails validation
def self.subscribe_to(routing_key_name, options = {})
  routing_key_valid = validate_routing_key_name(routing_key_name)
  raise ArgumentError.new("#{routing_key_name} is not supported. Only lower case characters separated by periods are allowed.") unless routing_key_valid

  allowed_option_keys = [
    :topic_prefix, :exchange_name, :dead_letter_exchange_name, :dead_letter_queue_name,
    :queue_name, :ttl, :durable, :pool_size
  ]
  options.assert_valid_keys(*allowed_option_keys)

  settings = Hashie::Mash.new
  settings[:topic] = get_config_value(options, :topic_prefix)
  settings[:exchange] = get_config_value(options, :exchange_name)
  settings[:dead_letter_exchange] = get_config_value(options, :dead_letter_exchange_name)
  settings[:dead_letter_queue_name] = get_config_value(options, :dead_letter_queue_name)

  # The class-level key and queue are recorded before being copied into the settings.
  self.subscribed_key = routing_key_name
  settings[:routing_key] = routing_key_name

  queue_name = options[:queue_name] ||
               generated_queue_name(routing_key_name, Banter::Configuration.application_name)
  self.subscribed_queue = queue_name
  settings[:queue_name] = queue_name

  settings[:ttl] = get_config_value(options, :ttl, :default_queue_ttl)
  settings[:durable] = get_config_value(options, :durable, :durable_queues)
  settings[:pool_size] = get_config_value(options, :pool_size)

  self.config = settings
end
Sets the validator for payload
@param validators One or more validators to use for validating the payload.
Each validator must accept the payload as its argument and return false if the payload is not valid.
# File lib/banter/subscriber.rb, line 78
# Appends the given validators to the class's list of payload validators.
#
# A new array is assigned on every call instead of mutating the current one.
#
# @param validators validators to add
# @return [Array] the combined list of validators
def self.validates_payload_with(*validators)
  self.payload_validators = [] unless self.payload_validators
  self.payload_validators = self.payload_validators + validators
end
Private Class Methods
# File lib/banter/subscriber.rb, line 156
# Returns the given application name when it is present; otherwise derives
# one from Banter::Configuration.application_name, dropping every character
# that is not a word character, underscore or space.
#
# @param application_name explicit application name, may be blank
# @return the explicit name, or the sanitized configured name
def self.generated_application_name(application_name)
  if application_name.present?
    application_name
  else
    configured_name = Banter::Configuration.application_name.to_s
    configured_name.gsub(/[^\w\_ ]/, '')
  end
end
# File lib/banter/subscriber.rb, line 162
# Builds a queue name from the routing key and the application name.
#
# - blank routing key: the application name is returned unchanged
# - routing key without an application name: the routing key is returned
# - both given: "<routing_key>_<application_name>"
#
# @param routing_key routing key of the subscription
# @param application_name name of the application, may be nil
# @return the generated queue name
def self.generated_queue_name(routing_key, application_name)
  return application_name unless routing_key.present?
  return routing_key unless application_name

  routing_key + "_#{application_name}"
end
# File lib/banter/subscriber.rb, line 151
# Checks whether a routing key name is acceptable.
#
# A blank key is always accepted. Otherwise the key must start with a lower
# case letter or underscore, contain only lower case letters, underscores,
# periods and digits, and end with a letter/underscore optionally followed
# by digits.
#
# The pattern is anchored with \z rather than \Z so that a key with a
# trailing newline is rejected. It accepts the same strings as the earlier
# nested-quantifier pattern, minus the trailing-newline case, without that
# pattern's risk of catastrophic backtracking.
#
# @param key routing key name to check
# @return [Boolean] true when the key is blank or well formed
def self.validate_routing_key_name(key)
  return true if key.blank?

  !key.match(/\A[a-z_](?:[a-z_.\d]*[a-z_])?\d*\z/).nil?
end
Public Instance Methods
Actual subscribers need to implement perform method. This is the method where the message is actually processed. @param [Object] payload Payload of the message
# File lib/banter/subscriber.rb, line 111
# Placeholder for the message-processing logic; concrete subscribers are
# expected to override it. This base version always raises.
#
# @param [Object] payload payload of the message
# @raise [RuntimeError] always
def perform(payload)
  raise RuntimeError, "Need implementation for your worker."
end
Performs validation if validates_payload_with is defined and then calls the perform method.
@param [Object] payload Payload of the message
# File lib/banter/subscriber.rb, line 101
# Validates the payload and, when it is valid, hands it to #perform.
#
# @param [Object] payload payload of the message
# @return whatever #perform returns
# @raise [Banter::PayloadValidationError] when valid_payload? reports the
#   payload as invalid
def perform!(payload)
  unless valid_payload?(payload)
    message = "Invalid Payload for #{self.class.name}"
    raise ::Banter::PayloadValidationError.new(message)
  end

  perform(payload)
end
@return [String] The original routing key with which the current message was published
# File lib/banter/subscriber.rb, line 116
# Reads the :routing_key entry from the delivery routing data.
#
# @return [String] the routing key with which the current message was published
def routing_key
  routing_data = delivery_routing_data
  routing_data[:routing_key]
end
Iterates over all the payload validators and returns false if any of them returns a false value @param [Object] payload The payload/arguments of the message @return [Boolean] Should return true or false value - If no validators are specified, then returns true
# File lib/banter/subscriber.rb, line 123
# Runs the payload through every registered validator.
#
# A validator that responds to +call+ is invoked with the payload; any other
# validator is treated as a method name and sent to this instance with the
# payload. Evaluation stops at the first validator that returns a false
# value.
#
# Always returns a strict true/false, as expected of a predicate, rather than
# the raw return value of the last validator.
#
# @param [Object] payload the payload/arguments of the message
# @return [Boolean] true when there are no validators or all of them pass
def valid_payload?(payload)
  return true unless payload_validators.present?

  payload_validators.all? do |validator|
    validator.respond_to?(:call) ? validator.call(payload) : send(validator, payload)
  end
end
Private Instance Methods
@param [Integer] number The current progress of the execution of the message
# File lib/banter/subscriber.rb, line 140
# Forwards the current progress to BanterMessage.update_progress_at, but only
# when Banter::Configuration.web_enabled is set; otherwise does nothing.
#
# @param [Integer] number the current progress of the execution of the message
# @return the result of the update, or nil when web is not enabled
def at(number)
  BanterMessage.update_progress_at(number) if Banter::Configuration.web_enabled
end
@param [Object] object An object that indicates the current progress of the message
# File lib/banter/subscriber.rb, line 146
# Forwards a progress object to BanterMessage.update_progress, but only when
# Banter::Configuration.web_enabled is set; otherwise does nothing.
#
# @param [Object] object an object that indicates the current progress of the message
# @return the result of the update, or nil when web is not enabled
def progress(object)
  BanterMessage.update_progress(object) if Banter::Configuration.web_enabled
end
@param [Integer] number The expected number of total objects that may be processed due to the message
# File lib/banter/subscriber.rb, line 134
# Forwards the expected total to BanterMessage.update_progress_total, but
# only when Banter::Configuration.web_enabled is set; otherwise does nothing.
#
# @param [Integer] number the expected number of total objects that may be
#   processed due to the message
# @return the result of the update, or nil when web is not enabled
def total(number)
  BanterMessage.update_progress_total(number) if Banter::Configuration.web_enabled
end