module Emque::Producing::Message
Constants
- InvalidMessageError
- MessagesNotSentError
Public Class Methods
included(base)
click to toggle source
# File lib/emque/producing/message/message.rb, line 75 def self.included(base) base.extend(ClassMethods) base.send(:include, Virtus.value_object) base.send(:attribute, :partition_key, String, :default => nil, :required => false) end
Public Instance Methods
add_metadata()
click to toggle source
# File lib/emque/producing/message/message.rb, line 81 def add_metadata { :metadata => { :host => hostname, :app => app_name, :topic => topic, :created_at => formatted_time, :uuid => uuid, :type => message_type, :partition_key => partition_key } }.merge(public_attributes) end
ignored_exceptions()
click to toggle source
# File lib/emque/producing/message/message.rb, line 108 def ignored_exceptions self.class.read_ignored_exceptions end
invalid_attributes()
click to toggle source
# File lib/emque/producing/message/message.rb, line 120 def invalid_attributes invalid_attrs = self.class.attribute_set.inject([]) do |attrs, attr| attrs << attr.name if attr.required? && self.attributes.fetch(attr.name).nil? attrs end Array(invalid_attrs) - self.class.private_attrs end
message_type()
click to toggle source
# File lib/emque/producing/message/message.rb, line 100 def message_type self.class.read_message_type end
publish(publisher=nil)
click to toggle source
# File lib/emque/producing/message/message.rb, line 133 def publish(publisher=nil) publisher ||= Emque::Producing.publisher log "publishing...", true if valid? log "valid...", true if Emque::Producing.configuration.publish_messages message = process_middleware(to_json) sent = publisher.publish(topic, message_type, message, partition_key, raise_on_failure?) log "sent #{sent}" raise MessagesNotSentError.new unless sent end else log "failed...", true raise InvalidMessageError.new(invalid_message) end rescue *ignored_exceptions => error if raise_on_failure? raise else log "failed ignoring exception... #{error}", true end end
raise_on_failure?()
click to toggle source
# File lib/emque/producing/message/message.rb, line 104 def raise_on_failure? self.class.read_raise_on_failure end
to_json()
click to toggle source
# File lib/emque/producing/message/message.rb, line 128 def to_json data = self.add_metadata Oj.dump(data, :mode => :compat) end
topic()
click to toggle source
# File lib/emque/producing/message/message.rb, line 96 def topic self.class.read_topic end
valid?()
click to toggle source
# File lib/emque/producing/message/message.rb, line 112 def valid? if invalid_attributes.empty? && topic && message_type true else false end end
Private Instance Methods
app_name()
click to toggle source
# File lib/emque/producing/message/message.rb, line 180 def app_name Emque::Producing.configuration.app_name || raise("Messages must have an app name configured.") end
formatted_time()
click to toggle source
# File lib/emque/producing/message/message.rb, line 172 def formatted_time DateTime.now.new_offset(0).to_time.utc.iso8601 end
hostname()
click to toggle source
# File lib/emque/producing/message/message.rb, line 168 def hostname Emque::Producing.hostname end
invalid_message()
click to toggle source
# File lib/emque/producing/message/message.rb, line 158 def invalid_message if !topic "A topic is required" elsif !message_type "A message type is required" else "Required attributes #{invalid_attributes} are missing." end end
log(message, include_message = false)
click to toggle source
# File lib/emque/producing/message/message.rb, line 184 def log(message, include_message = false) if Emque::Producing.configuration.log_publish_message message = "#{message} #{to_json}" if include_message Emque::Producing.logger.info("MESSAGE LOG: #{message}") end end
middleware()
click to toggle source
# File lib/emque/producing/message/message.rb, line 191 def middleware self.class.middleware + Emque::Producing.configuration.middleware end
middleware?()
click to toggle source
# File lib/emque/producing/message/message.rb, line 195 def middleware? middleware.count > 0 end
process_middleware(str)
click to toggle source
# File lib/emque/producing/message/message.rb, line 199 def process_middleware(str) if middleware? middleware.inject(str) { |compiled, callable| callable.call(compiled) } else str end end
public_attributes()
click to toggle source
# File lib/emque/producing/message/message.rb, line 209 def public_attributes public = self.class.attribute_set.select do |attr| attr && !self.class.private_attrs.include?(attr.name) end.map(&:name) slice_attributes(*public) end
slice_attributes(*keys)
click to toggle source
# File lib/emque/producing/message/message.rb, line 216 def slice_attributes(*keys) keys.map!(&:to_sym) attributes.select { |key, value| keys.include?(key) } end
uuid()
click to toggle source
# File lib/emque/producing/message/message.rb, line 176 def uuid SecureRandom.uuid end