class Kinesis::Aggregation::Aggregator
Constants
- MAGIC
Public Class Methods
new()
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 6
# Creates an aggregator whose buffer of pending user records starts out empty.
def initialize
  @user_records = []
end
Public Instance Methods
add_user_record(user_record)
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 10
# Appends +user_record+ to the end of the pending buffer.
#
# Other methods of this class read the record with the keys
# +:partition_key+, +:explicit_hash_key+ and +:data+.
#
# Returns the buffer array itself (the result of the append).
def add_user_record(user_record)
  @user_records.push(user_record)
end
aggregate!()
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 14
# Packs every buffered user record into a single result hash and empties
# the buffer.
#
# The returned hash has three keys:
# * +:partition_key+     - the partition key of the first buffered record
# * +:explicit_hash_key+ - the first record's explicit hash key, or +''+
#   when that record has none
# * +:data+              - the output of +data+ passed through
#   +Base64.encode64+
#
# Raises RuntimeError when no user records have been added; previously this
# surfaced as a NoMethodError on nil from +@user_records.first[...]+.
def aggregate!
  raise 'cannot aggregate: no user records have been added' if @user_records.empty?

  first_record = @user_records.first
  result = {
    partition_key: first_record[:partition_key],
    explicit_hash_key: first_record[:explicit_hash_key] || '',
    data: Base64.encode64(data)
  }
  # Reset only after the payload was built, so a failure in +data+ leaves
  # the buffered records in place.
  @user_records = []
  result
end
num_user_records()
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 24
# Returns how many user records are currently buffered.
def num_user_records
  @user_records.size
end
Private Instance Methods
aggregated_record()
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 30
# Builds an AggregatedRecord from the buffered user records.
#
# The partition key table holds one entry per buffered record, in buffer
# order; the explicit hash key table and the record list come from
# +explicit_hash_key_table+ and +records+.
def aggregated_record
  partition_keys = @user_records.map { |user_record| user_record[:partition_key] }

  AggregatedRecord.new(
    partition_key_table: partition_keys,
    explicit_hash_key_table: explicit_hash_key_table,
    records: records
  )
end
data()
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 38
# Produces the framed payload for the buffered records: the MAGIC prefix,
# then the encoded AggregatedRecord, then the raw MD5 digest of those
# encoded bytes.
def data
  encoded = AggregatedRecord.encode(aggregated_record)
  checksum = Digest::MD5.digest(encoded)
  MAGIC + encoded + checksum
end
explicit_hash_key_table()
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 43
# Collects the +:explicit_hash_key+ of every buffered record, in buffer
# order, leaving out records whose key is nil. The result can therefore be
# shorter than the buffer.
def explicit_hash_key_table
  hash_keys = @user_records.map { |user_record| user_record[:explicit_hash_key] }
  hash_keys.reject(&:nil?)
end
records()
click to toggle source
# File lib/kinesis/aggregation/aggregator.rb, line 47
# Builds one Record per buffered user record, in buffer order.
#
# Each Record gets:
# * +partition_key_index+     - the record's position in the buffer
# * +data+                    - the user record's +:data+ value
# * +explicit_hash_key_index+ - set only when the user record has an
#   explicit hash key
#
# +explicit_hash_key_table+ drops nil entries, so a record's slot in that
# table is the number of non-nil keys that precede it, not its position in
# the buffer. Using the buffer position produced wrong or out-of-range
# indices whenever an earlier record had no explicit hash key.
def records
  hash_key_position = -1

  @user_records.map.with_index do |user_record, index|
    explicit_hash_key = user_record[:explicit_hash_key]
    # Advance for every entry that survives the table's +compact+.
    hash_key_position += 1 unless explicit_hash_key.nil?

    record = Record.new(
      partition_key_index: index,
      data: user_record[:data]
    )
    record.explicit_hash_key_index = hash_key_position if explicit_hash_key
    record
  end
end