class Fluent::Plugin::KinesisHelper::Aggregator
Constants
- AggregatedRecord
- MagicNumber
- Record
- Tag
Public Instance Methods
aggregate(records, partition_key)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 48 def aggregate(records, partition_key) message = AggregatedRecord.encode(AggregatedRecord.new( partition_key_table: ['a', partition_key], records: records.map{|data| Record.new(partition_key_index: 1, data: data) }, )) [MagicNumber, message, Digest::MD5.digest(message)].pack("A4A*A16") end
aggregated?(encoded)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 72 def aggregated?(encoded) encoded[0..3] == MagicNumber end
aggregated_size_offset(partition_key)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 76 def aggregated_size_offset(partition_key) data = 'd' encoded = aggregate([record(data)], partition_key) finalize(encoded).size - data.size end
deaggregate(encoded)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/aggregator.rb, line 58 def deaggregate(encoded) unless aggregated?(encoded) raise InvalidEncodingError, "Invalid MagicNumber #{encoded[0..3]}}" end message, digest = encoded[4..encoded.length-17], encoded[encoded.length-16..-1] if Digest::MD5.digest(message) != digest raise InvalidEncodingError, "Digest mismatch #{digest}" end decoded = AggregatedRecord.decode(message) records = decoded.records.map(&:data) partition_key = decoded.partition_key_table[1] [records, partition_key] end