class Messagebus::Message

Container class for frames (technically misnamed).

Attributes

raw_message[R]

Public Class Methods

create(payload, properties = nil, binary = false) click to toggle source
# File lib/messagebus/message.rb, line 46
# Builds a Message from a raw payload.
#
# payload    - value handed to define_thrift to be wrapped as a thrift payload.
# properties - optional properties passed through to create_message (default: nil).
# binary     - forwarded to define_thrift as its binary flag (default: false).
#
# Returns whatever Messagebus::Message.create_message returns.
def create(payload, properties = nil, binary = false)
  thrift_payload = define_thrift(payload, binary)
  Messagebus::Message.create_message(thrift_payload, properties)
end
create_message_from_message_internal(raw_message) click to toggle source

TODO: Why use this in utils when trying to follow a factory pattern?

# File lib/messagebus/message.rb, line 99
# Wraps an already-built internal message object in a Message.
#
# raw_message - object handed unchanged to the Message constructor.
#
# Returns the new Message instance.
def create_message_from_message_internal(raw_message)
  Messagebus::Message.new(raw_message)
end
define_thrift(payload, binary = false) click to toggle source
# File lib/messagebus/message.rb, line 63
# Wraps payload in a Messagebus::Thrift::MessagePayload.
#
# payload - the content to send. With binary set it is stored as the binary
#           payload; otherwise a Hash, or any non-String object responding to
#           to_json, is stored as its JSON text, and a String is stored as is.
# binary  - when truthy, treat payload as binary data (default: false).
#
# String and binary payloads are tagged as UTF-8 on a copy, so the caller's
# object is never mutated and frozen strings are accepted.
#
# Returns a Messagebus::Thrift::MessagePayload built from the chosen options.
# Raises RuntimeError ("Type not supported") when payload fits no branch.
def define_thrift(payload, binary = false)
  # Strings only carry an encoding on Ruby 1.9+. Asking String directly avoids
  # comparing RUBY_VERSION as a float, which would read "1.10" as 1.1.
  encoding_aware = String.method_defined?(:force_encoding)
  options = {}

  if binary
    # Tag a copy so the caller's string is left untouched and frozen input works.
    payload = payload.dup.force_encoding('UTF-8') if encoding_aware
    options.merge!({
      :messageFormat => Messagebus::Thrift::MessagePayloadType::BINARY,
      :binaryPayload => payload
    })
  elsif payload.is_a?(Hash) || (payload.respond_to?(:to_json) && !payload.is_a?(String))
    options.merge!({
      :messageFormat => Messagebus::Thrift::MessagePayloadType::JSON,
      :stringPayload => payload.to_json
    })
  elsif payload.is_a?(String)
    # Only UTF-8 is supported by thrift. Should warn to use binary if not ascii
    # or utf-8, but there's no logger. All users are believed to use utf8 or ascii.
    # As above, tag a copy rather than the caller's string.
    payload = payload.dup.force_encoding('UTF-8') if encoding_aware
    options.merge!({
      :messageFormat => Messagebus::Thrift::MessagePayloadType::STRING,
      :stringPayload => payload
    })
  else
    # TODO: Create specific error class
    raise "Type not supported"
  end

  Messagebus::Thrift::MessagePayload.new(options)
end
get_message_from_thrift_binary(body) click to toggle source

Creates a message from Base64-encoded Thrift bytes. We use the Stomp protocol for server communication, which internally uses (UTF-8) strings. Base64 encoding is needed to avoid corner cases with arbitrary binary bytes.

# File lib/messagebus/message.rb, line 54
# Rebuilds a Message from the Base64 text form of a thrift-serialized message.
#
# body - Base64-encoded String.
#
# The shared deserializer is only invoked while holding @@deserializer_lock.
#
# Returns the result of Messagebus::Message.create_message_from_message_internal.
def get_message_from_thrift_binary(body)
  thrift_bytes = Base64.decode64(body)
  blank = Messagebus::Thrift::MessageInternal.new
  internal = nil
  @@deserializer_lock.synchronize { internal = @@deserializer.deserialize(blank, thrift_bytes) }
  Messagebus::Message.create_message_from_message_internal(internal)
end
new(raw_message) click to toggle source
# File lib/messagebus/message.rb, line 142
# Stores the given object on the instance as @raw_message.
#
# raw_message - kept as-is; no validation or copying is performed here.
def initialize(raw_message)
  @raw_message = raw_message
end

Private Class Methods

create_message(payload, properties = nil) click to toggle source
# File lib/messagebus/message.rb, line 157
# Assembles a Messagebus::Thrift::MessageInternal around payload and wraps it
# in a Message.
#
# payload    - passed to get_salted_payload and stored as the message payload.
# properties - assigned to the internal message only when truthy (default: nil).
#
# The message id is the MD5 hex digest of the salted payload.
#
# Returns the new Message.
def create_message(payload, properties = nil)
  salted = get_salted_payload(payload)

  internal = Messagebus::Thrift::MessageInternal.new
  internal.messageId = Digest::MD5.hexdigest(salted)
  internal.payload = payload
  if properties
    internal.properties = properties
  end

  Message.new(internal)
end
get_salted_payload(payload) click to toggle source

Returns the payload as a string salted with the current time: seconds since the epoch followed by the microsecond component.

# File lib/messagebus/message.rb, line 149
# Produces the payload content with a time-based suffix appended.
#
# payload - object answering binary?, binaryPayload and stringPayload; the
#           binary content is used when binary? is truthy, the string content
#           otherwise.
#
# The suffix is the current epoch seconds followed by the microsecond part of
# the same timestamp, both in decimal.
#
# Returns a new String; the payload's own content is not modified.
def get_salted_payload(payload)
  now = Time.now
  content =
    if payload.binary?
      payload.binaryPayload
    else
      payload.stringPayload
    end

  content + now.to_i.to_s + now.tv_usec.to_s
end

Public Instance Methods

message_id() click to toggle source
# File lib/messagebus/message.rb, line 104
# Returns the messageId of the wrapped raw message.
def message_id
  raw_message.messageId
end
message_properties() click to toggle source
# File lib/messagebus/message.rb, line 108
# Returns the properties of the wrapped raw message.
def message_properties
  raw_message.properties
end
message_properties=(hash) click to toggle source
# File lib/messagebus/message.rb, line 112
# Replaces the properties of the wrapped raw message.
#
# hash - value assigned to the raw message's properties.
def message_properties=(hash)
  raw_message.properties = hash
end
payload() click to toggle source
# File lib/messagebus/message.rb, line 120
# Returns the content carried by the wrapped raw message: the binary payload
# when the thrift payload reports binary?, the string payload when it reports
# json? or string?.
#
# Raises RuntimeError naming the messageFormat when none of those apply.
def payload
  body = @raw_message.payload

  return body.binaryPayload if body.binary?
  return body.stringPayload if body.json? || body.string?

  raise "Payload is not an understandable type: #{body.messageFormat}"
end
payload_type() click to toggle source
# File lib/messagebus/message.rb, line 116
# Returns the messageFormat of the wrapped raw message's payload.
def payload_type
  raw_message.payload.messageFormat
end
to_thrift_binary() click to toggle source
# File lib/messagebus/message.rb, line 132
# Serializes the wrapped raw message and returns it as Base64 text.
#
# The shared serializer is only invoked while holding @@serializer_lock.
def to_thrift_binary
  serialized = nil
  @@serializer_lock.synchronize { serialized = @@serializer.serialize(@raw_message) }
  Base64.encode64(serialized)
end