class Aws::EventStream::Decoder
This class provides methods for decoding binary input into one or more messages (Aws::EventStream::Message).
- {#decode} - decodes messages from an IO-like object that responds to `read` and contains binary data, yielding each decoded Aws::EventStream::Message along the way or returning them wrapped in an enumerator
## Examples
    decoder = Aws::EventStream::Decoder.new

    # decoding from IO
    decoder.decode(io) do |message|
      message.headers # => { ... }
      message.payload # => StringIO / Tempfile
    end

    # alternatively
    message_pool = decoder.decode(io)
    message_pool.next # => Aws::EventStream::Message
- {#decode_chunk} - decodes a single message from a chunk of data, returning an array that contains the message object followed by a boolean indicating the EOF status of the data
## Examples
    # chunk containing exactly one data message
    message, chunk_eof = decoder.decode_chunk(chunk_str)
    message # => Aws::EventStream::Message
    chunk_eof # => true

    # chunk containing a partial message
    message, chunk_eof = decoder.decode_chunk(chunk_str)
    message # => nil
    chunk_eof # => true
    # chunk data is saved at decoder's message_buffer

    # chunk containing more than one data message
    message, chunk_eof = decoder.decode_chunk(chunk_str)
    message # => Aws::EventStream::Message
    chunk_eof # => false
    # extra chunk data is saved at message_buffer of the decoder
Constants
- ONE_MEGABYTE
- OVERHEAD_LENGTH
Bytes of total overhead in a message, including the prelude and the 4-byte CRC checksum of the total message
- PRELUDE_LENGTH
Bytes of the prelude, including 4 bytes each for the total message length, the headers length, and the CRC checksum of the prelude
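The layout these constants describe can be sketched with the standard library alone. The sizes below (12 bytes of prelude, 16 bytes of total overhead) are inferred from the descriptions above rather than quoted from the source, and `build_frame` is a hypothetical helper, not part of the gem.

```ruby
require 'zlib'

# Assumed sizes, inferred from the constant descriptions above.
prelude_length = 12                   # total length + headers length + prelude CRC
overhead_length = prelude_length + 4  # plus the trailing message CRC

# Hypothetical helper: frames raw header bytes and a payload.
build_frame = lambda do |headers, payload|
  total_len = overhead_length + headers.bytesize + payload.bytesize
  prelude = [total_len, headers.bytesize].pack('N2')
  prelude << [Zlib.crc32(prelude)].pack('N')
  body = prelude + headers.b + payload.b
  # The message CRC covers the prelude, headers, and payload.
  body << [Zlib.crc32(body)].pack('N')
end

frame = build_frame.call('', 'hello')
total_len, headers_len, prelude_crc = frame.unpack('N3')
```

With no headers and a 5-byte payload, the frame is the 16 bytes of overhead plus the payload.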
Attributes
message_buffer
@returns [BytesBuffer]
Public Class Methods
@option options [Boolean] :format (true) When `false`,
disables user-friendly formatting of message header values such as timestamps and UUIDs.
    # File lib/aws-eventstream/decoder.rb, line 79
    def initialize(options = {})
      @format = options.fetch(:format, true)
      @message_buffer = BytesBuffer.new('')
    end
Public Instance Methods
Decodes messages from a binary stream
@param [IO#read] io An IO-like object that responds to `#read`
@yieldparam [Message] message
@return [Enumerable<Message>, nil] Returns a new Enumerable containing decoded messages if no block is given
    # File lib/aws-eventstream/decoder.rb, line 95
    def decode(io, &block)
      io = BytesBuffer.new(io.read)
      return decode_io(io) unless block_given?
      until io.eof?
        # fetch message only
        yield(decode_message(io).first)
      end
    end
Decodes a single message from a chunk of string
@param [String] chunk A chunk of string to be decoded. The chunk can contain anything from a partial event message to multiple event messages. When not provided, data is decoded from #message_buffer
@return [Array<Message|nil, Boolean>] Returns a pair of a single decoded message and a boolean. The boolean flag indicates whether this chunk has been fully consumed; unused data is tracked at #message_buffer
    # File lib/aws-eventstream/decoder.rb, line 113
    def decode_chunk(chunk = nil)
      @message_buffer.write(chunk) if chunk
      @message_buffer.rewind
      decode_message(@message_buffer)
    end
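Because `decode_chunk` returns at most one message per call, a caller that receives several messages in one chunk has to call it again without an argument until the boolean comes back `true`. A minimal sketch of such a loop follows; `drain_chunk` is a hypothetical helper name, and it assumes only that the decoder responds to `decode_chunk` as documented above.

```ruby
# Hypothetical helper: collects every complete message available after
# feeding one chunk to a decoder.
def drain_chunk(decoder, chunk)
  messages = []
  message, eof = decoder.decode_chunk(chunk)
  messages << message if message
  # Leftover bytes stay in the decoder's buffer, so call again with no chunk.
  until eof
    message, eof = decoder.decode_chunk
    messages << message if message
  end
  messages
end

# Intended usage (requires the aws-eventstream gem):
#   decoder = Aws::EventStream::Decoder.new
#   drain_chunk(decoder, chunk_str).each { |m| puts m.payload.read }
```

A trailing partial message produces `[nil, true]`, which ends the loop and leaves the partial bytes buffered for the next chunk.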
Private Instance Methods
    # File lib/aws-eventstream/decoder.rb, line 166
    def context(io, total_len, headers_len, prelude_buffer)
      # buffer rest of the message except prelude length
      # including context and total message checksum
      buffer = BytesBuffer.new(io.read(total_len - PRELUDE_LENGTH))
      context_len = total_len - OVERHEAD_LENGTH
      prelude_buffer.rewind
      checksum = Zlib.crc32(prelude_buffer.read << buffer.read(context_len))
      unless checksum == unpack_uint32(buffer)
        raise Errors::MessageChecksumError
      end
      buffer.rewind
      [
        extract_headers(BytesBuffer.new(buffer.read(headers_len))),
        extract_payload(BytesBuffer.new(buffer.read(context_len - headers_len)))
      ]
    end
    # File lib/aws-eventstream/decoder.rb, line 121
    def decode_io(io)
      ::Enumerator.new {|e| e << decode_message(io) unless io.eof? }
    end
    # File lib/aws-eventstream/decoder.rb, line 125
    def decode_message(io)
      # incomplete message prelude received, leave it in the buffer
      return [nil, true] if io.bytesize < PRELUDE_LENGTH

      # decode prelude
      total_len, headers_len, prelude_buffer = prelude(io)

      # incomplete message received, leave it in the buffer
      return [nil, true] if io.bytesize < total_len

      # decode headers and payload
      headers, payload = context(io, total_len, headers_len, prelude_buffer)

      # track extra message data in the buffer if exists
      # for #decode_chunk, io is @message_buffer
      if eof = io.eof?
        @message_buffer.clear!
      else
        @message_buffer = BytesBuffer.new(@message_buffer.read)
      end

      [Message.new(headers: headers, payload: payload), eof]
    end
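The two early returns above distinguish a buffer that is too short to hold a prelude from one that holds a prelude but not yet the whole message. The sketch below illustrates that check on a plain string; `frame_status` is a hypothetical name, the 12-byte prelude size is an assumption, and CRC validation is deliberately left out.

```ruby
# Assumed prelude size (see the PRELUDE_LENGTH description).
prelude_length = 12

# Hypothetical check: reports how much of a frame the buffered bytes hold.
frame_status = lambda do |bytes|
  return :incomplete_prelude if bytes.bytesize < prelude_length

  # The first 4 bytes of the prelude carry the total message length.
  total_len = bytes.unpack1('N')
  bytes.bytesize < total_len ? :incomplete_message : :complete
end

# A 16-byte stand-in frame: total length 16, no headers, zeroed checksums.
frame = [16, 0, 0, 0].pack('N4')
statuses = [5, 13, 16].map { |n| frame_status.call(frame.byteslice(0, n)) }
```

In the first two cases the decoder returns `[nil, true]` and keeps the bytes buffered until more data arrives.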
    # File lib/aws-eventstream/decoder.rb, line 185
    def extract_headers(buffer)
      headers = {}
      until buffer.eof?
        # header key
        key_len = unpack_uint8(buffer)
        key = buffer.read(key_len)

        # header value
        value_type = Types.types[unpack_uint8(buffer)]
        unpack_pattern, value_len, _ = Types.pattern[value_type]
        if !!unpack_pattern == unpack_pattern
          # boolean types won't have value specified
          value = unpack_pattern
        else
          value_len = unpack_uint16(buffer) unless value_len
          value = unpack_pattern ?
            buffer.read(value_len).unpack(unpack_pattern)[0] :
            buffer.read(value_len)
        end

        headers[key] = HeaderValue.new(
          format: @format,
          value: value,
          type: value_type
        )
      end
      headers
    end
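The read order in `extract_headers` implies a wire layout of key length, key, type id, and then a value whose length is either fixed by the type or given by a 16-bit prefix. A minimal sketch for the variable-length string case follows. The type id `7` for strings is an assumption, since the `Types` table is not shown here, and both lambdas are hypothetical helpers.

```ruby
require 'stringio'

# Assumption: type id 7 marks a string header.
string_type = 7

# Hypothetical encoder for a single string header.
encode_string_header = lambda do |key, value|
  [key.bytesize, key, string_type, value.bytesize, value].pack('Ca*CS>a*')
end

# Mirrors the read order of extract_headers, for the string case only.
decode_string_header = lambda do |io|
  key = io.read(io.read(1).unpack1('C'))
  type = io.read(1).unpack1('C')
  value = io.read(io.read(2).unpack1('S>'))
  [key, type, value]
end

bytes = encode_string_header.call(':event-type', 'Records')
key, type, value = decode_string_header.call(StringIO.new(bytes))
```

The header name and value used here are only sample data for the round trip.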
    # File lib/aws-eventstream/decoder.rb, line 214
    def extract_payload(buffer)
      buffer.bytesize <= ONE_MEGABYTE ?
        payload_stringio(buffer) :
        payload_tempfile(buffer)
    end
    # File lib/aws-eventstream/decoder.rb, line 220
    def payload_stringio(buffer)
      StringIO.new(buffer.read)
    end
    # File lib/aws-eventstream/decoder.rb, line 224
    def payload_tempfile(buffer)
      payload = Tempfile.new
      payload.binmode
      until buffer.eof?
        payload.write(buffer.read(ONE_MEGABYTE))
      end
      payload.rewind
      payload
    end
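The payload helpers choose between an in-memory `StringIO` and an on-disk `Tempfile` based on size. The sketch below shows the same decision in isolation. `payload_io` is a hypothetical helper, and the size limit is passed as a parameter so that a tiny value can stand in for `ONE_MEGABYTE`.

```ruby
require 'stringio'
require 'tempfile'

# Hypothetical helper mirroring extract_payload with a configurable limit.
def payload_io(bytes, limit)
  return StringIO.new(bytes) if bytes.bytesize <= limit

  file = Tempfile.new('payload')
  file.binmode
  file.write(bytes)
  file.rewind
  file
end

small = payload_io('abc', 4)     # fits within the limit, stays in memory
large = payload_io('abcdef', 4)  # exceeds the limit, spills to disk
```

Either way the caller gets an object that responds to `read`, which is why `message.payload` can be treated uniformly. A caller that receives a `Tempfile` may want to close it once the payload has been consumed.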
    # File lib/aws-eventstream/decoder.rb, line 149
    def prelude(io)
      # buffer prelude into bytes buffer
      # prelude contains length of message and headers,
      # followed with CRC checksum of itself
      buffer = BytesBuffer.new(io.read(PRELUDE_LENGTH))

      # prelude checksum takes last 4 bytes
      checksum = Zlib.crc32(buffer.read(PRELUDE_LENGTH - 4))
      unless checksum == unpack_uint32(buffer)
        raise Errors::PreludeChecksumError
      end

      buffer.rewind
      total_len, headers_len, _ = buffer.read.unpack("N*")
      [total_len, headers_len, buffer]
    end
    # File lib/aws-eventstream/decoder.rb, line 240
    def unpack_uint16(buffer)
      buffer.read(2).unpack("S>")[0]
    end
overhead decode helpers
    # File lib/aws-eventstream/decoder.rb, line 236
    def unpack_uint32(buffer)
      buffer.read(4).unpack("N")[0]
    end
    # File lib/aws-eventstream/decoder.rb, line 244
    def unpack_uint8(buffer)
      buffer.readbyte.unpack("C")[0]
    end
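The three helpers rely on `String#unpack` directives: `C` reads an 8-bit unsigned integer, `S>` a 16-bit unsigned big-endian integer, and `N` a 32-bit unsigned big-endian integer. A short round trip illustrates them; `StringIO` stands in for the decoder's `BytesBuffer` here, which is an assumption made only to keep the example self-contained.

```ruby
require 'stringio'

# Pack one value of each width, then read them back in the same order.
buffer = StringIO.new([7, 258, 65_536].pack('CS>N'))

uint8  = buffer.read(1).unpack1('C')
uint16 = buffer.read(2).unpack1('S>')
uint32 = buffer.read(4).unpack1('N')
```

Each read consumes exactly the width of the value, so the buffer is at EOF after the third one.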