class Poseidon::Message
The Message class is used by both the Producer and Consumer classes.
Basic usage
message = Poseidon::Message.new(:value => "hello", :key => "user:123", :topic => "salutations")
Sending a message
When sending a message, you must set its topic. This can be done during creation or afterwards.
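Both ways of setting the topic can be sketched as follows. `TopicSketch` is a hypothetical stand-in written for this example so that it runs without the gem. It assumes the real class exposes a writable `topic` attribute, which the sentence above implies but this page does not list.

```ruby
# Hypothetical stand-in for Poseidon::Message, reduced to topic handling.
# Assumption: the real class offers an equivalent writable `topic` attribute.
class TopicSketch
  attr_accessor :topic
  attr_reader :value

  def initialize(options = {})
    @value = options[:value]
    @topic = options[:topic]
  end
end

# Topic set during creation.
at_creation = TopicSketch.new(:value => "hello", :topic => "salutations")

# Topic set afterwards, before the message is handed to a producer.
afterwards = TopicSketch.new(:value => "hello")
afterwards.topic = "salutations"
```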
Compression
In normal usage you should never have to worry about compressed Message objects. When producing, the producer takes care of compressing the messages, and when fetching, the fetcher returns them decompressed.
@api private
Constants
- COMPRESSION_MASK
Last 3 bits are used to indicate compression
- MAGIC_TYPE
Attributes
Public Class Methods
Create a new message object
@param [Hash] options
@option options [String] :value (nil)
The message's value. Optional.
@option options [String] :key (nil)
The message's key. Optional.
@option options [String] :topic (nil)
The topic we should send this message to. Optional.
@option options [Integer] :attributes (nil)
Attributes field for the message. It currently only indicates whether or not the message is compressed. Treated as 0 when omitted.
# File lib/poseidon/message.rb, line 67
def initialize(options = {})
  build_struct(options)
  @topic = options.delete(:topic)

  if options.any?
    raise ArgumentError, "Unknown options: #{options.keys.inspect}"
  end
end
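The constructor consumes each recognised option with `Hash#delete` and rejects whatever is left over. A minimal, self-contained sketch of that pattern follows. `extract_options` and `KNOWN_OPTIONS` are hypothetical names, and the sketch works on a copy of the hash, whereas the constructor above deletes from the hash it is given.

```ruby
# Hypothetical list of recognised keys, taken from the options the
# constructor and build_struct delete in the listings on this page.
KNOWN_OPTIONS = [:value, :key, :topic, :attributes, :offset]

# Pull out every known option and raise if anything unknown remains.
def extract_options(options = {})
  remaining = options.dup
  extracted = {}
  KNOWN_OPTIONS.each do |name|
    extracted[name] = remaining.delete(name)
  end

  if remaining.any?
    raise ArgumentError, "Unknown options: #{remaining.keys.inspect}"
  end

  extracted
end

accepted = extract_options(:value => "hello", :topic => "salutations")

begin
  extract_options(:value => "hello", :colour => "blue")
rescue ArgumentError => e
  unknown_option_error = e.message
end
```

A misspelled key such as `:tpoic` would be reported the same way instead of being silently ignored.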
Build a new Message object from its binary representation
@param [ResponseBuffer] buffer
a response buffer containing binary data representing a message.
@return [Message]
# File lib/poseidon/message.rb, line 34
def self.read(buffer)
  m = Message.new
  m.struct = Protocol::MessageWithOffsetStruct.read(buffer)

  # Return nil if the message is truncated.
  if m.struct.message.truncated?
    return nil
  end

  if m.struct.message.checksum_failed?
    raise Errors::ChecksumError
  end

  m
end
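A caller of `read` therefore has to handle three outcomes: a message, `nil` for a truncated message, or a raised checksum error. The sketch below shows one way a calling loop might deal with them. Everything in it is a hypothetical stand-in: `sketch_read` works on pre-parsed hashes instead of a binary buffer, and `SketchChecksumError` takes the place of `Errors::ChecksumError`.

```ruby
class SketchChecksumError < StandardError; end

# Hypothetical stand-in for Message.read. Each entry is a pre-parsed Hash
# rather than binary data; an exhausted list also yields nil.
def sketch_read(entries)
  entry = entries.shift
  return nil if entry.nil? || entry[:truncated]
  raise SketchChecksumError if entry[:checksum_failed]
  entry[:value]
end

# Collect values until the reader returns nil.
def sketch_read_all(entries)
  values = []
  while (value = sketch_read(entries))
    values << value
  end
  values
end

# The truncated third entry ends the loop without an error.
complete = sketch_read_all([
  { :value => "a" },
  { :value => "b" },
  { :value => "c", :truncated => true }
])

# A failed checksum is not swallowed; it propagates to the caller.
begin
  sketch_read_all([{ :value => "a", :checksum_failed => true }])
rescue SketchChecksumError
  checksum_failed = true
end
```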
Public Instance Methods
# File lib/poseidon/message.rb, line 77
def ==(other)
  eql?(other)
end
Is the value compressed?
@return [Boolean]
# File lib/poseidon/message.rb, line 116
def compressed?
  compression_codec_id > 0
end
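The check reduces to masking the attributes value and testing for a non-zero codec id. The sketch below assumes a mask of `0x07`, inferred from the "last 3 bits" description of COMPRESSION_MASK rather than read from the library, and uses local names that are not part of Poseidon.

```ruby
# Assumed value: the three lowest bits, per the COMPRESSION_MASK description.
SKETCH_COMPRESSION_MASK = 0x07

# Keep only the codec bits of the attributes value.
def sketch_codec_id(attributes)
  attributes & SKETCH_COMPRESSION_MASK
end

# Any non-zero codec id means the value is compressed.
def sketch_compressed?(attributes)
  sketch_codec_id(attributes) > 0
end

uncompressed = sketch_compressed?(0b0000_0000)  # no codec bits set
compressed   = sketch_compressed?(0b0000_0001)  # codec id 1
masked_id    = sketch_codec_id(0b1111_1010)     # higher bits are ignored
```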
Decompressed value
@return [String] decompressed value
# File lib/poseidon/message.rb, line 123
def decompressed_value
  compression_codec.decompress(value)
end
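Decompression is a two-step lookup: the codec id selects a codec, and the codec decompresses the value. The following sketch uses a hypothetical codec table with a single gzip entry built on Ruby's `Zlib`. The id shown (1 for gzip) follows Kafka's convention, but what `Compression.find_codec` actually returns is an assumption here.

```ruby
require "zlib"

# Hypothetical codec exposing the same decompress interface used above.
module SketchGzipCodec
  def self.compress(data)
    Zlib.gzip(data)
  end

  def self.decompress(data)
    Zlib.gunzip(data)
  end
end

# Hypothetical lookup table; assumes codec id 1 means gzip.
SKETCH_CODECS = { 1 => SketchGzipCodec }

def sketch_find_codec(codec_id)
  SKETCH_CODECS.fetch(codec_id) do
    raise ArgumentError, "Unknown codec id: #{codec_id}"
  end
end

compressed_value = SketchGzipCodec.compress("hello")
restored_value = sketch_find_codec(1).decompress(compressed_value)
```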
# File lib/poseidon/message.rb, line 81
def eql?(other)
  struct.eql?(other.struct)
end
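Because `eql?` compares only the underlying struct, the topic plays no part in equality as the code above is written. The sketch below illustrates this with a plain Ruby `Struct` standing in for the protocol struct. `SketchStruct` and `EqualitySketch` are hypothetical, and the real protocol structs may define equality differently.

```ruby
# Plain Struct standing in for the protocol struct (an assumption).
SketchStruct = Struct.new(:key, :value)

class EqualitySketch
  attr_reader :struct
  attr_accessor :topic

  def initialize(key, value, topic = nil)
    @struct = SketchStruct.new(key, value)
    @topic = topic # stored outside the struct, as in the constructor above
  end

  def ==(other)
    eql?(other)
  end

  def eql?(other)
    struct.eql?(other.struct)
  end
end

a = EqualitySketch.new("user:123", "hello", "salutations")
b = EqualitySketch.new("user:123", "hello", "greetings")
c = EqualitySketch.new("user:123", "goodbye", "salutations")

same_payload = (a == b)       # topics differ, structs match
different_payload = (a == c)  # topics match, values differ
```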
@return [String] the Message’s key
# File lib/poseidon/message.rb, line 99
def key
  @struct.message.key
end
# File lib/poseidon/message.rb, line 85
def objects_with_errors
  struct.objects_with_errors
end
@return [Integer] the Message’s offset
# File lib/poseidon/message.rb, line 109
def offset
  @struct.offset
end
@return [String] the Message’s value
# File lib/poseidon/message.rb, line 104
def value
  @struct.message.value
end
Write a binary representation of the message to buffer
@param [RequestBuffer] buffer
@return [nil]
# File lib/poseidon/message.rb, line 93
def write(buffer)
  @struct.write(buffer)
  nil
end
Private Instance Methods
# File lib/poseidon/message.rb, line 128
def attributes
  @struct.message.attributes
end

# File lib/poseidon/message.rb, line 140
def build_struct(options)
  message_struct = Protocol::MessageStruct.new(
    MAGIC_TYPE,
    options.delete(:attributes) || 0,
    options.delete(:key),
    options.delete(:value)
  )
  struct = Protocol::MessageWithOffsetStruct.new(options.delete(:offset) || 0, message_struct)
  self.struct = struct
end

# File lib/poseidon/message.rb, line 132
def compression_codec
  Compression.find_codec(compression_codec_id)
end

# File lib/poseidon/message.rb, line 136
def compression_codec_id
  attributes & COMPRESSION_MASK
end