class Kcl::Workers::Consumer
Shard : Consumer = 1 : 1
Each consumer works on a single shard and:
- gets records from the stream
- sends them to the record processor
- creates a record checkpoint
Public Class Methods
new(shard, record_processor, kinesis_proxy, checkpointer)
click to toggle source
# File lib/kcl/workers/consumer.rb, line 7
# Stores the collaborators used while consuming a shard.
#
# @param shard the shard this consumer works on
# @param record_processor object that receives the lifecycle callbacks
# @param kinesis_proxy object used to request shard iterators and records
# @param checkpointer object used to fetch and record checkpoints
def initialize(shard, record_processor, kinesis_proxy, checkpointer)
  @shard, @record_processor = shard, record_processor
  @kinesis, @checkpointer = kinesis_proxy, checkpointer
end
Public Instance Methods
consume!()
click to toggle source
# File lib/kcl/workers/consumer.rb, line 14
# Runs the record processor over this consumer's shard.
#
# Steps, in order:
# 1. notify the processor via +after_initialize+
# 2. repeatedly fetch records and hand each batch to +process_records+
# 3. notify the processor via +shutdown+
#
# @return the result of +@record_processor.shutdown+
def consume!
  @record_processor.after_initialize(create_initialize_input)

  checkpointer = Kcl::Workers::RecordCheckpointer.new(@shard, @checkpointer)
  iterator = start_shard_iterator

  loop do
    response = @kinesis.get_records(iterator)
    batch = create_records_input(
      response[:records], response[:millis_behind_latest], checkpointer
    )
    # Every batch is passed to the processor, including an empty one.
    @record_processor.process_records(batch)

    iterator = response[:next_shard_iterator]
    # Stop once a batch came back empty or there is no next iterator.
    break if response[:records].empty? || iterator.nil?
  end

  # No next iterator is reported as TERMINATE; any other exit as REQUESTED.
  reason =
    if iterator.nil?
      Kcl::Workers::ShutdownReason::TERMINATE
    else
      Kcl::Workers::ShutdownReason::REQUESTED
    end

  @record_processor.shutdown(create_shutdown_input(reason, checkpointer))
end
create_initialize_input()
click to toggle source
# File lib/kcl/workers/consumer.rb, line 58
# Builds the input passed to the record processor's +after_initialize+.
#
# @return [Kcl::Types::InitializationInput] built from the shard id and the
#   shard's checkpoint wrapped in an ExtendedSequenceNumber
def create_initialize_input
  shard_id = @shard.shard_id
  sequence_number = Kcl::Types::ExtendedSequenceNumber.new(@shard.checkpoint)
  Kcl::Types::InitializationInput.new(shard_id, sequence_number)
end
create_records_input(records, millis_behind_latest, record_checkpointer)
click to toggle source
# File lib/kcl/workers/consumer.rb, line 65
# Wraps one fetched batch for the record processor's +process_records+.
#
# @param records the records from the fetch result
# @param millis_behind_latest the +:millis_behind_latest+ value from the fetch result
# @param record_checkpointer checkpointer handed through to the processor
# @return [Kcl::Types::RecordsInput]
def create_records_input(records, millis_behind_latest, record_checkpointer)
  Kcl::Types::RecordsInput.new(records, millis_behind_latest, record_checkpointer)
end
create_shutdown_input(shutdown_reason, record_checkpointer)
click to toggle source
# File lib/kcl/workers/consumer.rb, line 73
# Wraps the shutdown details for the record processor's +shutdown+.
#
# @param shutdown_reason the reason value to report to the processor
# @param record_checkpointer checkpointer handed through to the processor
# @return [Kcl::Types::ShutdownInput]
def create_shutdown_input(shutdown_reason, record_checkpointer)
  Kcl::Types::ShutdownInput.new(shutdown_reason, record_checkpointer)
end
start_shard_iterator()
click to toggle source
# File lib/kcl/workers/consumer.rb, line 42
# Requests the shard iterator that consumption starts from.
#
# The checkpoint is first re-read through the checkpointer. With no
# checkpoint, the iterator is requested with TRIM_HORIZON; otherwise it is
# requested with AFTER_SEQUENCE_NUMBER and the checkpoint value.
#
# @return the value returned by +@kinesis.get_shard_iterator+
def start_shard_iterator
  shard = @checkpointer.fetch_checkpoint(@shard)
  # Read the checkpoint once, from the object the checkpointer returned, so
  # the nil check and the value sent to Kinesis can never disagree (they
  # could if fetch_checkpoint returned a different object than @shard).
  checkpoint = shard.checkpoint

  if checkpoint.nil?
    return @kinesis.get_shard_iterator(
      @shard.shard_id,
      Kcl::Checkpoints::Sentinel::TRIM_HORIZON
    )
  end

  @kinesis.get_shard_iterator(
    @shard.shard_id,
    Kcl::Checkpoints::Sentinel::AFTER_SEQUENCE_NUMBER,
    checkpoint
  )
end