module Emque::Producing::Message

Constants

InvalidMessageError
MessagesNotSentError

Public Class Methods

included(base) click to toggle source
# File lib/emque/producing/message/message.rb, line 75
# Hook invoked by Ruby when this module is mixed into +base+.
#
# Extends +base+ with ClassMethods, includes Virtus.value_object into it,
# and declares a non-required String attribute named +partition_key+ whose
# default is nil.
#
# base - the class or module that is including this module.
def self.included(base)
  base.extend(ClassMethods)
  # class_eval runs the block with +base+ as self, so the (possibly
  # private) include/attribute macros are called exactly as a class body
  # would call them.
  base.class_eval do
    include Virtus.value_object
    attribute :partition_key, String, :default => nil, :required => false
  end
end

Public Instance Methods

add_metadata() click to toggle source
# File lib/emque/producing/message/message.rb, line 81
# Builds the hash that is serialized when the message is published.
#
# The result has a :metadata entry (host, app, topic, created_at, uuid,
# type and partition_key) merged with the message's public attributes.
# The attributes are merged last, so an attribute named :metadata would
# replace the generated metadata entry.
#
# Returns a Hash.
def add_metadata
  envelope = {
    :host => hostname,
    :app => app_name,
    :topic => topic,
    :created_at => formatted_time,
    :uuid => uuid,
    :type => message_type,
    :partition_key => partition_key
  }

  { :metadata => envelope }.merge(public_attributes)
end
ignored_exceptions() click to toggle source
# File lib/emque/producing/message/message.rb, line 108
# Returns whatever the message's class reports from read_ignored_exceptions.
#
# publish splats this value into its rescue clause, so it is expected to be
# a list of exception classes.
def ignored_exceptions
  self.class.read_ignored_exceptions
end
invalid_attributes() click to toggle source
# File lib/emque/producing/message/message.rb, line 120
# Lists the names of required attributes whose current value is nil,
# leaving out any name that appears in the class's private_attrs.
#
# Returns an Array of attribute names; empty when nothing is missing.
# Raises KeyError (from Hash#fetch) if a required attribute has no entry
# in +attributes+.
def invalid_attributes
  missing = self.class.attribute_set.select do |attribute|
    attribute.required? && attributes.fetch(attribute.name).nil?
  end

  missing.map(&:name) - self.class.private_attrs
end
message_type() click to toggle source
# File lib/emque/producing/message/message.rb, line 100
# Returns the message type reported by the message's class through
# read_message_type. valid? treats a falsy value as an invalid message.
def message_type
  self.class.read_message_type
end
publish(publisher=nil) click to toggle source
# File lib/emque/producing/message/message.rb, line 133
# Validates the message and hands it to a publisher.
#
# publisher - object responding to
#             publish(topic, message_type, message, partition_key, raise_on_failure);
#             defaults to Emque::Producing.publisher.
#
# When the message is valid and the configuration's publish_messages flag
# is truthy, the JSON form of the message is run through the middleware
# and passed to the publisher. When the flag is falsy nothing is sent.
#
# Raises InvalidMessageError when valid? is false.
# Raises MessagesNotSentError when the publisher returns a falsy value.
# An exception matching ignored_exceptions is logged and swallowed unless
# raise_on_failure? is truthy, in which case it is re-raised. Exceptions
# that are not in ignored_exceptions always propagate.
def publish(publisher=nil)
  publisher ||= Emque::Producing.publisher
  log "publishing...", true

  unless valid?
    log "failed...", true
    raise InvalidMessageError.new(invalid_message)
  end

  log "valid...", true
  # Publishing can be switched off in the configuration; validation above
  # still runs in that case.
  return unless Emque::Producing.configuration.publish_messages

  payload = process_middleware(to_json)
  sent = publisher.publish(topic, message_type, payload, partition_key, raise_on_failure?)
  log "sent #{sent}"
  raise MessagesNotSentError.new unless sent
rescue *ignored_exceptions => error
  raise if raise_on_failure?

  log "failed ignoring exception... #{error}", true
end
raise_on_failure?() click to toggle source
# File lib/emque/producing/message/message.rb, line 104
# Returns the value the message's class reports from read_raise_on_failure.
#
# publish forwards this value to the publisher and also uses it to decide
# whether a rescued (ignored) exception is re-raised.
def raise_on_failure?
  self.class.read_raise_on_failure
end
to_json() click to toggle source
# File lib/emque/producing/message/message.rb, line 128
# Serializes the message, metadata included, by passing the hash from
# add_metadata to Oj.dump in :compat mode.
#
# Returns whatever Oj.dump returns.
def to_json
  Oj.dump(add_metadata, mode: :compat)
end
topic() click to toggle source
# File lib/emque/producing/message/message.rb, line 96
# Returns the topic reported by the message's class through read_topic.
# valid? treats a falsy value as an invalid message.
def topic
  self.class.read_topic
end
valid?() click to toggle source
# File lib/emque/producing/message/message.rb, line 112
# Reports whether the message can be published: no required attribute is
# missing, and both topic and message_type are set.
#
# Always returns exactly true or false. The checks run in the order
# attributes, topic, message type, and stop at the first failure.
def valid?
  return false unless invalid_attributes.empty?
  return false unless topic

  message_type ? true : false
end

Private Instance Methods

app_name() click to toggle source
# File lib/emque/producing/message/message.rb, line 180
# Returns the app name from Emque::Producing.configuration.
#
# Raises RuntimeError when the configured app name is nil or false.
def app_name
  configured = Emque::Producing.configuration.app_name
  return configured if configured

  raise("Messages must have an app name configured.")
end
formatted_time() click to toggle source
# File lib/emque/producing/message/message.rb, line 172
# Returns the current time in UTC as an ISO 8601 string with second
# precision, e.g. "2015-01-31T12:34:56Z".
#
# Uses Time directly: the earlier DateTime -> Time round trip produced the
# same string but went through the legacy DateTime class for no benefit.
def formatted_time
  Time.now.utc.iso8601
end
hostname() click to toggle source
# File lib/emque/producing/message/message.rb, line 168
# Returns Emque::Producing.hostname; add_metadata stores it under :host.
def hostname
  Emque::Producing.hostname
end
invalid_message() click to toggle source
# File lib/emque/producing/message/message.rb, line 158
# Builds the error text used when a message fails validation.
#
# Only the first problem found is reported, checked in this order: missing
# topic, missing message type, missing required attributes.
#
# Returns a String.
def invalid_message
  return "A topic is required" unless topic
  return "A message type is required" unless message_type

  "Required attributes #{invalid_attributes} are missing."
end
log(message, include_message = false) click to toggle source
# File lib/emque/producing/message/message.rb, line 184
# Writes an info line prefixed with "MESSAGE LOG: " to
# Emque::Producing.logger, but only when the configuration's
# log_publish_message flag is truthy.
#
# message         - text to log.
# include_message - when truthy, the message's JSON form is appended to
#                   the text (default: false).
#
# Returns nil when logging is disabled, otherwise the logger's return value.
def log(message, include_message = false)
  return unless Emque::Producing.configuration.log_publish_message

  text = include_message ? "#{message} #{to_json}" : message
  Emque::Producing.logger.info("MESSAGE LOG: #{text}")
end
middleware() click to toggle source
# File lib/emque/producing/message/message.rb, line 191
# Returns the middleware to apply to this message: the class's middleware
# followed by the middleware from Emque::Producing.configuration.
def middleware
  class_level = self.class.middleware
  configured = Emque::Producing.configuration.middleware

  class_level + configured
end
middleware?() click to toggle source
# File lib/emque/producing/message/message.rb, line 195
# Returns true when at least one middleware applies to this message,
# false otherwise.
def middleware?
  !middleware.count.zero?
end
process_middleware(str) click to toggle source
# File lib/emque/producing/message/message.rb, line 199
# Runs +str+ through every middleware in order, feeding each callable the
# previous callable's result.
#
# str - the value to transform (publish passes the JSON string).
#
# Returns the final result, or +str+ itself when there is no middleware.
def process_middleware(str)
  return str unless middleware?

  middleware.reduce(str) { |result, step| step.call(result) }
end
public_attributes() click to toggle source
# File lib/emque/producing/message/message.rb, line 209
# Returns the message's attributes minus those whose names appear in the
# class's private_attrs.
#
# The result comes from slice_attributes, called with the names of the
# remaining attributes.
def public_attributes
  visible = self.class.attribute_set.reject do |attribute|
    !attribute || self.class.private_attrs.include?(attribute.name)
  end

  slice_attributes(*visible.map(&:name))
end
slice_attributes(*keys) click to toggle source
# File lib/emque/producing/message/message.rb, line 216
# Returns the entries of +attributes+ whose key is among +keys+.
#
# keys - attribute names; each is converted with to_sym before comparing.
def slice_attributes(*keys)
  wanted = keys.map(&:to_sym)
  attributes.select { |name, _value| wanted.include?(name) }
end
uuid() click to toggle source
# File lib/emque/producing/message/message.rb, line 176
# Returns a new identifier from SecureRandom.uuid. Each call produces a
# different value; add_metadata stores it under :uuid.
def uuid
  SecureRandom.uuid
end