class Kinesis::Aggregation::Aggregator

Constants

MAGIC

Public Class Methods

new() click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 6
def initialize
  @user_records = []
end

Public Instance Methods

add_user_record(user_record) click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 10
def add_user_record(user_record)
  @user_records << user_record
end
aggregate!() click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 14
def aggregate!
  result = {
    partition_key: @user_records.first[:partition_key],
    explicit_hash_key: @user_records.first[:explicit_hash_key] || '',
    data: Base64.encode64(data)
  }
  @user_records = []
  result
end
num_user_records() click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 24
def num_user_records
  @user_records.length
end

Private Instance Methods

aggregated_record() click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 30
def aggregated_record
  AggregatedRecord.new(
    partition_key_table: @user_records.map { |r| r[:partition_key] },
    explicit_hash_key_table: explicit_hash_key_table,
    records: records
  )
end
data() click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 38
def data
  bytes = AggregatedRecord.encode(aggregated_record)
  MAGIC + bytes + Digest::MD5.digest(bytes)
end
explicit_hash_key_table() click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 43
def explicit_hash_key_table
  @user_records.map { |r| r[:explicit_hash_key] }.compact
end
records() click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 47
def records
  @user_records.map.with_index do |user_record, index|
    record = Record.new(
      partition_key_index: index,
      data: user_record[:data]
    )
    record.explicit_hash_key_index = index if user_record[:explicit_hash_key]
    record
  end

end