class Kinesis::Aggregation::Deaggregator

Constants

DIGEST_SIZE
MAGIC

Public Class Methods

new(raw_record) click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 11
# Wraps a single incoming record for later deaggregation.
#
# @param raw_record [#with_indifferent_access] the record to inspect; the
#   result of calling +with_indifferent_access+ on it is what gets stored
def initialize(raw_record)
  indifferent_record = raw_record.with_indifferent_access
  @raw_record = indifferent_record
end

Public Instance Methods

deaggregate() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 15
# Expands the wrapped record into the individual records packed inside it.
#
# When the payload does not start with the aggregation magic prefix, or its
# trailing checksum does not match the MD5 of the message body, the record
# is returned untouched as a one-element array.
#
# Otherwise one hash is produced per entry of +aggregated_record.records+.
# Each hash carries every non-kinesis attribute of the outer record, plus a
# +kinesis+ section that reuses the outer schema version, sequence number,
# arrival timestamp and record id, and holds the entry's own keys and data.
#
# @return [Array<Hash>] the deaggregated records
def deaggregate
  # Pass through anything that is not a checksum-valid aggregate.
  return [kinesis_record] if !aggregated_record? || computed_md5 != kinesis_record_md5

  outer = kinesis_record[:kinesis]

  aggregated_record.records.map do |record|
    base_record.merge(
      kinesis: {
        kinesisSchemaVersion: outer[:kinesisSchemaVersion],
        sequenceNumber: outer[:sequenceNumber],
        approximateArrivalTimestamp: outer[:approximateArrivalTimestamp],
        explicitHashKey: explicit_hash_for_for(record),
        partitionKey: partition_key_for(record),
        # strict_encode64 emits no line feeds; encode64 would add one after
        # every 60 encoded characters and at the end, which strict Base64
        # decoders reject.
        data: Base64.strict_encode64(record.data),
        recordId: outer[:recordId]
      }
    )
  end
end

Private Instance Methods

aggregated_record() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 38
# Decoded form of the message body, computed once and then reused.
#
# @return [Object] whatever +AggregatedRecord.decode+ yields for the bytes
#   returned by +kinesis_record_message_data+
def aggregated_record
  return @aggregated_record if @aggregated_record

  message_bytes = kinesis_record_message_data
  @aggregated_record = AggregatedRecord.decode(message_bytes)
end
aggregated_record?() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 34
# Tells whether the decoded payload begins with the MAGIC marker.
#
# @return [Boolean] true when the first +MAGIC.length+ characters of +data+
#   equal +MAGIC+
def aggregated_record?
  last_prefix_index = MAGIC.length - 1
  leading_bytes = data[0..last_prefix_index]
  leading_bytes == MAGIC
end
base_record() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 42
# Copy of the record with its top-level kinesis section removed; used as the
# base every deaggregated record is merged onto.
#
# The key is compared by its string form because the record may come from
# +with_indifferent_access+, which stores keys as strings. A plain
# +== :kinesis+ check would then never match and nothing would be removed.
#
# @return [Hash] the record's non-kinesis attributes (the record itself is
#   not modified)
def base_record
  kinesis_record.reject { |key, _value| key.to_s == 'kinesis' }
end
computed_md5() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 76
# Raw (binary) MD5 digest of the message body.
#
# @return [String] digest of the bytes returned by
#   +kinesis_record_message_data+
def computed_md5
  message_bytes = kinesis_record_message_data
  Digest::MD5.digest(message_bytes)
end
data() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 46
# Base64-decoded payload of the record, decoded once and then reused.
#
# @return [String] decoded bytes of +kinesis_record[:kinesis][:data]+
def data
  return @data if @data

  encoded_payload = kinesis_record[:kinesis][:data]
  @data = Base64.decode64(encoded_payload)
end
explicit_hash_for_for(record) click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 50
# Looks up the explicit hash key that +record+ points at.
#
# @param record [#explicit_hash_key_index] one entry of the aggregated record
# @return [Object, nil] the entry of +explicit_hash_key_table+ found at the
#   record's +explicit_hash_key_index+
def explicit_hash_for_for(record)
  hash_key_table = aggregated_record.explicit_hash_key_table
  table_index = record.explicit_hash_key_index
  hash_key_table[table_index]
end
kinesis_record() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 54
# The record in the shape the rest of this class reads, resolved once.
#
# A raw record carrying a +kinesisStreamRecordMetadata+ key is first passed
# through +KinesisAnalyticsConverter+; any other record is used as-is.
#
# @return [Hash] the converted record or the raw record itself
def kinesis_record
  return @kinesis_record if @kinesis_record

  @kinesis_record =
    if @raw_record.key?(:kinesisStreamRecordMetadata)
      KinesisAnalyticsConverter.new(@raw_record).convert
    else
      @raw_record
    end
end
kinesis_record_md5() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 68
# Checksum stored at the tail of the payload.
#
# @return [String, nil] the slice of +data+ that starts 16 characters before
#   its end and runs to the last character
def kinesis_record_md5
  checksum_start = data.length - 16
  data[checksum_start..-1]
end
kinesis_record_message_data() click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 72
# Message body of the payload: everything after the MAGIC prefix up to, but
# not including, the final 16 characters.
#
# @return [String, nil] the slice of +data+ between those two boundaries
def kinesis_record_message_data
  body_start = MAGIC.length
  body_end = -17
  data[body_start..body_end]
end
partition_key_for(record) click to toggle source
# File lib/kinesis/aggregation/deaggregator.rb, line 64
# Looks up the partition key that +record+ points at.
#
# @param record [#partition_key_index] one entry of the aggregated record
# @return [Object, nil] the entry of +partition_key_table+ found at the
#   record's +partition_key_index+
def partition_key_for(record)
  key_table = aggregated_record.partition_key_table
  table_index = record.partition_key_index
  key_table[table_index]
end