class Poseidon::Message

The Message class is used by both Producer and Consumer classes.

Basic usage

message = Poseidon::Message.new(:value => "hello", 
                                :key => "user:123", 
                                :topic => "salutations")

Sending a message

When sending a message you must set its topic. This can be done during creation or afterwards.
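
As a minimal sketch of the two options, the following uses a small stand-in class (SketchMessage, a hypothetical name that is not part of Poseidon) mirroring only the documented constructor options and the read/write topic attribute, so it runs without the gem installed.

```ruby
# SketchMessage is a hypothetical stand-in that mirrors the documented
# interface: :value, :key and :topic options plus a read/write topic.
class SketchMessage
  attr_reader :value, :key
  attr_accessor :topic

  def initialize(options = {})
    @value = options[:value]
    @key   = options[:key]
    @topic = options[:topic]
  end
end

# Option 1: set the topic during creation.
at_creation = SketchMessage.new(:value => "hello", :topic => "salutations")

# Option 2: create the message first, then assign the topic before sending.
afterwards = SketchMessage.new(:value => "hello")
afterwards.topic = "salutations"

puts at_creation.topic
puts afterwards.topic
```

Since topic is listed as a read/write attribute, the same two forms should carry over to Poseidon::Message itself.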

Compression

In normal usage you should never have to worry about compressed Message objects. When producing, the producer takes care of compressing the messages; when fetching, the fetcher returns them decompressed.

@api private

Constants

COMPRESSION_MASK

The last 3 bits of the attributes field indicate the compression codec.

MAGIC_TYPE
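
The masking step can be sketched as follows. The mask value 0b111 is an assumption derived from the "last 3 bits" description (the constant's actual value is not shown on this page), and codec_id is a hypothetical helper that repeats the expression used by compression_codec_id.

```ruby
# Assumed value: a mask covering the lowest 3 bits. The real constant's
# value is not shown in this documentation.
COMPRESSION_MASK = 0b111

# Hypothetical helper: extract the codec id from an attributes integer,
# using the same expression as compression_codec_id.
def codec_id(attributes)
  attributes & COMPRESSION_MASK
end

puts codec_id(0b0000_0000) # no compression bits set
puts codec_id(0b0000_0001) # codec id 1
puts codec_id(0b0000_1010) # higher bits are ignored; the low bits give 2
```

A codec id greater than zero is what compressed? treats as "compressed".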

Attributes

struct[RW]
topic[RW]

Public Class Methods

new(options = {})

Create a new message object

@param [Hash] options

@option options [String] :value (nil)

The message's value. Optional.

@option options [String] :key (nil)

The message's key. Optional.

@option options [String] :topic (nil)

The topic we should send this message to. Optional.

@option options [Integer] :attributes (nil)

Attributes field for the message. Currently it only indicates
whether or not the message is compressed.
# File lib/poseidon/message.rb, line 67
def initialize(options = {})
  build_struct(options)

  @topic = options.delete(:topic)

  if options.any?
    raise ArgumentError, "Unknown options: #{options.keys.inspect}"
  end
end
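
The "consume known keys, then reject whatever is left" pattern used by initialize can be sketched in isolation. consume_options and KNOWN_KEYS are hypothetical names; the key list is taken from the delete calls in initialize and build_struct.

```ruby
# Keys consumed by initialize and build_struct in the source shown here.
KNOWN_KEYS = [:attributes, :key, :value, :offset, :topic].freeze

# Hypothetical helper illustrating the pattern: pull out every known key,
# then raise if anything unexpected remains.
def consume_options(options)
  options = options.dup # work on a copy so the caller's hash is untouched
  known = {}
  KNOWN_KEYS.each { |k| known[k] = options.delete(k) }
  if options.any?
    raise ArgumentError, "Unknown options: #{options.keys.inspect}"
  end
  known
end

consume_options(:value => "hello", :topic => "salutations") # accepted

begin
  consume_options(:value => "hello", :partition => 3)
rescue ArgumentError => e
  puts e.message # prints "Unknown options: [:partition]"
end
```

Note that initialize as shown deletes keys from the hash it is given. Copying the hash first is a choice made in this sketch, not something the library is shown to do.
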
read(buffer)

Build a new Message object from its binary representation

@param [ResponseBuffer] buffer

a response buffer containing binary data representing a message.

@return [Message]

# File lib/poseidon/message.rb, line 34
def self.read(buffer)
  m = Message.new
  m.struct = Protocol::MessageWithOffsetStruct.read(buffer)

  # Return nil if the message is truncated.
  if m.struct.message.truncated?
    return nil
  end

  if m.struct.message.checksum_failed?
    raise Errors::ChecksumError
  end
  m
end
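
A caller has to handle both outcomes: a nil return for a truncated message and a raised checksum error. The sketch below uses hypothetical stand-ins (ChecksumError for Errors::ChecksumError, read_message for Message.read) and assumes that a truncated message marks the end of the usable data in a buffer.

```ruby
# Hypothetical stand-in for Errors::ChecksumError.
class ChecksumError < StandardError; end

# Hypothetical stand-in for Message.read: the symbol selects the outcome.
def read_message(kind)
  case kind
  when :truncated then nil                 # truncated message: nil
  when :corrupt   then raise ChecksumError # failed checksum: raise
  else kind.to_s                           # stands in for a Message
  end
end

# Collect complete messages, stopping at the first truncated one.
def collect(kinds)
  messages = []
  kinds.each do |kind|
    message = read_message(kind)
    break if message.nil?
    messages << message
  end
  messages
end

p collect([:a, :b, :truncated, :c]) # prints ["a", "b"]
```

A checksum failure is left to propagate here, so the caller can decide whether to retry the fetch or give up.
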

Public Instance Methods

==(other)
# File lib/poseidon/message.rb, line 77
def ==(other)
  eql?(other)
end
compressed?()

Is the value compressed?

@return [Boolean]

# File lib/poseidon/message.rb, line 116
def compressed?
  compression_codec_id > 0
end
decompressed_value()

Decompressed value

@return [String] decompressed value

# File lib/poseidon/message.rb, line 123
def decompressed_value
  compression_codec.decompress(value)
end
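
decompressed_value hands the raw value to whichever codec Compression.find_codec returns. Which codec belongs to which id is not shown on this page, so the sketch below simply assumes gzip and uses a hypothetical SketchGzipCodec built on Ruby's standard zlib library to show the decompress(value) call shape.

```ruby
require "zlib"
require "stringio"

# Hypothetical codec exposing the decompress(value) call that
# decompressed_value makes. Assumes the value is gzip data.
module SketchGzipCodec
  def self.compress(string)
    io = StringIO.new
    gz = Zlib::GzipWriter.new(io)
    gz.write(string)
    gz.close # flushes the gzip trailer into io
    io.string
  end

  def self.decompress(string)
    Zlib::GzipReader.new(StringIO.new(string)).read
  end
end

compressed = SketchGzipCodec.compress("hello")
puts SketchGzipCodec.decompress(compressed) # prints "hello"
```
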
eql?(other)
# File lib/poseidon/message.rb, line 81
def eql?(other)
  struct.eql?(other.struct)
end
key()

@return [String] the Message’s key

# File lib/poseidon/message.rb, line 99
def key
  @struct.message.key
end
objects_with_errors()
# File lib/poseidon/message.rb, line 85
def objects_with_errors
  struct.objects_with_errors
end
offset()

@return [Integer] the Message’s offset

# File lib/poseidon/message.rb, line 109
def offset
  @struct.offset
end
value()

@return [String] the Message’s value

# File lib/poseidon/message.rb, line 104
def value
  @struct.message.value
end
write(buffer)

Write a binary representation of the message to buffer

@param [RequestBuffer] buffer

@return [nil]

# File lib/poseidon/message.rb, line 93
def write(buffer)
  @struct.write(buffer)
  nil
end

Private Instance Methods

attributes()
# File lib/poseidon/message.rb, line 128
def attributes
  @struct.message.attributes
end
build_struct(options)
# File lib/poseidon/message.rb, line 140
def build_struct(options)
  message_struct = Protocol::MessageStruct.new(
    MAGIC_TYPE,
    options.delete(:attributes) || 0,
    options.delete(:key),
    options.delete(:value)
  )
  struct = Protocol::MessageWithOffsetStruct.new(options.delete(:offset) || 0, message_struct)
  self.struct = struct
end
compression_codec()
# File lib/poseidon/message.rb, line 132
def compression_codec
  Compression.find_codec(compression_codec_id)
end
compression_codec_id()
# File lib/poseidon/message.rb, line 136
def compression_codec_id
  attributes & COMPRESSION_MASK
end