class Kinesis::Aggregation::Deaggregator
Constants
- DIGEST_SIZE
- MAGIC
Public Class Methods
new(raw_record)
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 11 def initialize(raw_record) @raw_record = raw_record.with_indifferent_access end
Public Instance Methods
deaggregate()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 15 def deaggregate return [kinesis_record] unless aggregated_record? && computed_md5 == kinesis_record_md5 aggregated_record.records.map do |record| base_record.merge( kinesis: { kinesisSchemaVersion: kinesis_record[:kinesis][:kinesisSchemaVersion], sequenceNumber: kinesis_record[:kinesis][:sequenceNumber], approximateArrivalTimestamp: kinesis_record[:kinesis][:approximateArrivalTimestamp], explicitHashKey: explicit_hash_for_for(record), partitionKey: partition_key_for(record), data: Base64.encode64(record.data), recordId: kinesis_record[:kinesis][:recordId] } ) end end
Private Instance Methods
aggregated_record()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 38 def aggregated_record @aggregated_record ||= AggregatedRecord.decode(kinesis_record_message_data) end
aggregated_record?()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 34 def aggregated_record? data[0..MAGIC.length - 1] == MAGIC end
base_record()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 42 def base_record kinesis_record.reject { |k, _v| k == :kinesis } end
computed_md5()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 76 def computed_md5 Digest::MD5.digest(kinesis_record_message_data) end
data()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 46 def data @data ||= Base64.decode64(kinesis_record[:kinesis][:data]) end
explicit_hash_for_for(record)
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 50 def explicit_hash_for_for(record) aggregated_record.explicit_hash_key_table[record.explicit_hash_key_index] end
kinesis_record()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 54 def kinesis_record @kinesis_record ||= begin if @raw_record.has_key?(:kinesisStreamRecordMetadata) KinesisAnalyticsConverter.new(@raw_record).convert else @raw_record end end end
kinesis_record_md5()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 68 def kinesis_record_md5 data[data.length - 16..-1] end
kinesis_record_message_data()
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 72 def kinesis_record_message_data data[MAGIC.length..-17] end
partition_key_for(record)
click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 64 def partition_key_for(record) aggregated_record.partition_key_table[record.partition_key_index] end