class Fluent::Plugin::KinesisHelper::Aggregator

Constants

AggregatedRecord
MagicNumber
Record
Tag

Public Instance Methods

aggregate(records, partition_key) click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 48
def aggregate(records, partition_key)
  message = AggregatedRecord.encode(AggregatedRecord.new(
    partition_key_table: ['a', partition_key],
    records: records.map{|data|
      Record.new(partition_key_index: 1, data: data)
    },
  ))
  [MagicNumber, message, Digest::MD5.digest(message)].pack("A4A*A16")
end
aggregated?(encoded) click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 72
def aggregated?(encoded)
  encoded[0..3] == MagicNumber
end
aggregated_size_offset(partition_key) click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 76
def aggregated_size_offset(partition_key)
  data = 'd'
  encoded = aggregate([record(data)], partition_key)
  finalize(encoded).size - data.size
end
deaggregate(encoded) click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 58
def deaggregate(encoded)
  unless aggregated?(encoded)
    raise InvalidEncodingError, "Invalid MagicNumber #{encoded[0..3]}}"
  end
  message, digest = encoded[4..encoded.length-17], encoded[encoded.length-16..-1]
  if Digest::MD5.digest(message) != digest
    raise InvalidEncodingError, "Digest mismatch #{digest}"
  end
  decoded = AggregatedRecord.decode(message)
  records = decoded.records.map(&:data)
  partition_key = decoded.partition_key_table[1]
  [records, partition_key]
end