class Aws::EventStream::Decoder

This class provides method for decoding binary inputs into single or multiple messages (Aws::EventStream::Message).

## Examples

decoder = Aws::EventStream::Decoder.new

# decoding from IO
decoder.decode(io) do |message|
  message.headers
  # => { ... }
  message.payload
  # => StringIO / Tempfile
end

# alternatively
message_pool = decoder.decode(io)
message_pool.next
# => Aws::EventStream::Message

## Examples

# chunk containing exactly one message data
message, chunk_eof = decoder.decode_chunk(chunk_str)
message
# => Aws::EventStream::Message
chunk_eof
# => true

# chunk containing a partial message
message, chunk_eof = decoder.decode_chunk(chunk_str)
message
# => nil
chunk_eof
# => true
# chunk data is saved at decoder's message_buffer

# chunk containing more that one data message
message, chunk_eof = decoder.decode_chunk(chunk_str)
message
# => Aws::EventStream::Message
chunk_eof
# => false
# extra chunk data is saved at message_buffer of the decoder

Constants

ONE_MEGABYTE
OVERHEAD_LENGTH

bytes of total overhead in a message, including prelude and 4 bytes total message crc checksum

PRELUDE_LENGTH

bytes of prelude part, including 4 bytes of total message length, headers length and crc checksum of prelude

Attributes

message_buffer[R]

@returns [BytesBuffer]

Public Class Methods

new(options = {}) click to toggle source

@options options [Boolean] format (true) When `false`

disable user-friendly formatting for message header values
including timestamp and uuid etc.
# File lib/aws-eventstream/decoder.rb, line 79
def initialize(options = {})
  @format = options.fetch(:format, true)
  @message_buffer = BytesBuffer.new('')
end

Public Instance Methods

decode(io) { |decode_message(io).first| ... } click to toggle source

Decodes messages from a binary stream

@param [IO#read] io An IO-like object

that responds to `#read`

@yieldparam [Message] message @return [Enumerable<Message>, nil] Returns a new Enumerable

containing decoded messages if no block is given
# File lib/aws-eventstream/decoder.rb, line 95
def decode(io, &block)
  io = BytesBuffer.new(io.read)
  return decode_io(io) unless block_given?
  until io.eof?
    # fetch message only
    yield(decode_message(io).first)
  end
end
decode_chunk(chunk = nil) click to toggle source

Decodes a single message from a chunk of string

@param [String] chunk A chunk of string to be decoded,

chunk can contain partial event message to multiple event messages
When not provided, decode data from #message_buffer

@return [Array<Message|nil, Boolean>] Returns single decoded message

and boolean pair, the boolean flag indicates whether this chunk
has been fully consumed, unused data is tracked at #message_buffer
# File lib/aws-eventstream/decoder.rb, line 113
def decode_chunk(chunk = nil)
  @message_buffer.write(chunk) if chunk
  @message_buffer.rewind
  decode_message(@message_buffer)
end

Private Instance Methods

context(io, total_len, headers_len, prelude_buffer) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 166
def context(io, total_len, headers_len, prelude_buffer)
  # buffer rest of the message except prelude length
  # including context and total message checksum
  buffer = BytesBuffer.new(io.read(total_len - PRELUDE_LENGTH))
  context_len = total_len - OVERHEAD_LENGTH

  prelude_buffer.rewind
  checksum = Zlib.crc32(prelude_buffer.read << buffer.read(context_len))
  unless checksum == unpack_uint32(buffer)
    raise Errors::MessageChecksumError
  end

  buffer.rewind
  [
    extract_headers(BytesBuffer.new(buffer.read(headers_len))),
    extract_payload(BytesBuffer.new(buffer.read(context_len - headers_len)))
  ]
end
decode_io(io) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 121
def decode_io(io)
  ::Enumerator.new {|e| e << decode_message(io) unless io.eof? }
end
decode_message(io) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 125
def decode_message(io)
  # incomplete message prelude received, leave it in the buffer
  return [nil, true] if io.bytesize < PRELUDE_LENGTH

  # decode prelude
  total_len, headers_len, prelude_buffer = prelude(io)

  # incomplete message received, leave it in the buffer
  return [nil, true] if io.bytesize < total_len

  # decode headers and payload
  headers, payload = context(io, total_len, headers_len, prelude_buffer)

  # track extra message data in the buffer if exists
  # for #decode_chunk, io is @message_buffer
  if eof = io.eof?
    @message_buffer.clear!
  else
    @message_buffer = BytesBuffer.new(@message_buffer.read)
  end

  [Message.new(headers: headers, payload: payload), eof]
end
extract_headers(buffer) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 185
def extract_headers(buffer)
  headers = {}
  until buffer.eof?
    # header key
    key_len = unpack_uint8(buffer)
    key = buffer.read(key_len)

    # header value
    value_type = Types.types[unpack_uint8(buffer)]
    unpack_pattern, value_len, _ = Types.pattern[value_type]
    if !!unpack_pattern == unpack_pattern
      # boolean types won't have value specified
      value = unpack_pattern
    else
      value_len = unpack_uint16(buffer) unless value_len
      value = unpack_pattern ?
        buffer.read(value_len).unpack(unpack_pattern)[0] :
        buffer.read(value_len)
    end

    headers[key] = HeaderValue.new(
      format: @format,
      value: value,
      type: value_type
    )
  end
  headers
end
extract_payload(buffer) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 214
def extract_payload(buffer)
  buffer.bytesize <= ONE_MEGABYTE ?
    payload_stringio(buffer) :
    payload_tempfile(buffer)
end
payload_stringio(buffer) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 220
def payload_stringio(buffer)
  StringIO.new(buffer.read)
end
payload_tempfile(buffer) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 224
def payload_tempfile(buffer)
  payload = Tempfile.new
  payload.binmode
  until buffer.eof?
    payload.write(buffer.read(ONE_MEGABYTE))
  end
  payload.rewind
  payload
end
prelude(io) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 149
def prelude(io)
  # buffer prelude into bytes buffer
  # prelude contains length of message and headers,
  # followed with CRC checksum of itself
  buffer = BytesBuffer.new(io.read(PRELUDE_LENGTH))

  # prelude checksum takes last 4 bytes
  checksum = Zlib.crc32(buffer.read(PRELUDE_LENGTH - 4))
  unless checksum == unpack_uint32(buffer)
    raise Errors::PreludeChecksumError
  end

  buffer.rewind
  total_len, headers_len, _ = buffer.read.unpack("N*")
  [total_len, headers_len, buffer]
end
unpack_uint16(buffer) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 240
def unpack_uint16(buffer)
  buffer.read(2).unpack("S>")[0]
end
unpack_uint32(buffer) click to toggle source

overhead decode helpers

# File lib/aws-eventstream/decoder.rb, line 236
def unpack_uint32(buffer)
  buffer.read(4).unpack("N")[0]
end
unpack_uint8(buffer) click to toggle source
# File lib/aws-eventstream/decoder.rb, line 244
def unpack_uint8(buffer)
  buffer.readbyte.unpack("C")[0]
end