class Kcl::Worker
Constants
- PROCESS_INTERVAL
Public Class Methods
new(id, record_processor_factory)
click to toggle source
# File lib/kcl/worker.rb, line 11
# Create a worker. Clients are not built here; #kinesis and #checkpointer
# create them lazily on first use.
#
# @param id [Object] this worker's identity; passed to the checkpointer when
#   leasing a shard and compared with each shard's lease_owner
# @param record_processor_factory [#create_processor] asked for one record
#   processor per shard this worker consumes
def initialize(id, record_processor_factory)
  @id, @record_processor_factory = id, record_processor_factory

  @live_shards = {} # shard_id => whether it appeared in the latest shard listing
  @shards = {}      # shard_id => Kcl::Workers::ShardInfo

  @kinesis = nil      # built lazily by #kinesis
  @checkpointer = nil # built lazily by #checkpointer
  @timer = nil        # periodic timer, assigned by #start
end
run(id, record_processor_factory)
click to toggle source
# File lib/kcl/worker.rb, line 6
# Convenience entry point: build a worker and start it.
#
# @param id [Object] worker identity, forwarded to #initialize
# @param record_processor_factory [#create_processor] forwarded to #initialize
# @return whatever #start returns once the worker stops
def self.run(id, record_processor_factory)
  new(id, record_processor_factory).start
end
Public Instance Methods
available_lease_shard?()
click to toggle source
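Count the leases held by this worker, excluding shards that are already completed, and report whether another shard may be leased (i.e. whether the count is still below `max_lease_count`). @return [Boolean]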
# File lib/kcl/worker.rb, line 91
# Count the leases held by this worker, ignoring shards that are already
# completed, and report whether the configured limit leaves room for another.
#
# @return [Boolean] true while Kcl.config.max_lease_count exceeds the number
#   of non-completed shards whose lease_owner is this worker
def available_lease_shard?
  held = @shards.each_value.count { |shard| shard.lease_owner == @id && !shard.completed? }
  Kcl.config.max_lease_count > held
end
cleanup()
click to toggle source
Clean up resources: reset the shard maps and drop the Kinesis and checkpointer clients.
# File lib/kcl/worker.rb, line 57
# Clean up resources: forget all shard bookkeeping and drop the Kinesis and
# checkpointer clients so the next access rebuilds them.
#
# @return [nil]
def cleanup
  @live_shards = {}
  @shards = {}
  @kinesis = @checkpointer = nil
end
consume_shards!()
click to toggle source
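Process records shard by shard: lease each shard that this worker does not already own, that has a checkpoint and that is not completed, consume it on its own thread, and wait for all threads to finish.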
# File lib/kcl/worker.rb, line 99
# Process records shard by shard.
#
# For every known shard this worker does not already own: load its checkpoint
# (skipping shards that have none), skip it if it is completed, lease it for
# this worker and consume it on a dedicated thread. Blocks until every spawned
# consumer thread has finished; a consumer's exception surfaces through #join.
#
# @return [Array<Thread>] the consumer threads that were started
def consume_shards!
  consumers = @shards.each_with_object([]) do |(shard_id, shard), spawned|
    # Already the lease owner of this shard.
    next if shard.lease_owner == @id

    begin
      shard = checkpointer.fetch_checkpoint(shard)
    rescue Kcl::Errors::CheckpointNotFoundError
      Kcl.logger.info("Not found checkpoint of shard at #{shard.to_h}")
      next
    end

    # Closed shard whose records have all been processed.
    next if shard.completed?

    leased = checkpointer.lease(shard, @id)
    spawned << Thread.new do
      begin
        processor = @record_processor_factory.create_processor
        Kcl::Workers::Consumer.new(leased, processor, kinesis, checkpointer).consume!
      ensure
        # Clear this worker's lease ownership even if consume! raised.
        checkpointer.remove_lease_owner(leased)
        Kcl.logger.info("Finish to consume shard at shard_id: #{shard_id}")
      end
    end
  end

  consumers.each(&:join)
end
shutdown(signal = :NONE)
click to toggle source
Shut down gracefully: cancel the periodic timer and stop the EventMachine reactor.
# File lib/kcl/worker.rb, line 43
# Shut down gracefully: cancel the periodic timer if one is running, then stop
# the EventMachine reactor.
#
# @param signal [Symbol] signal that triggered the shutdown; used only in the log line
# @raise [StandardError] any error raised while stopping is logged and re-raised
def shutdown(signal = :NONE)
  @timer&.cancel
  @timer = nil
  EM.stop
  Kcl.logger.info("Shutdown worker with signal #{signal} at #{object_id}")
rescue StandardError => e
  Kcl.logger.error("#{e.class}: #{e.message}")
  raise
end
start()
click to toggle source
Start consuming data from the stream and pass it to the application's record processors.
# File lib/kcl/worker.rb, line 23
# Start consuming data from the stream and pass it to the application's record
# processors.
#
# Runs an EventMachine reactor that installs the signal handlers and, on every
# tick of an EM::PeriodicTimer with interval PROCESS_INTERVAL, syncs the shard
# list and consumes shards while a lease slot is available. Returns once the
# reactor stops (see #shutdown).
#
# @raise [StandardError] any error escaping the reactor is logged and re-raised
def start
  Kcl.logger.info("Start worker at #{object_id}")
  EM.run do
    trap_signals
    @timer = EM::PeriodicTimer.new(PROCESS_INTERVAL) do
      sync_shards!
      consume_shards! if available_lease_shard?
    end
  end
  Kcl.logger.info("Finish worker at #{object_id}")
rescue StandardError => e
  Kcl.logger.error("#{e.class}: #{e.message}")
  raise e
ensure
  # Run cleanup on every exit path, not only after a clean stop, so an
  # exception raised out of a timer callback does not leave stale shard maps
  # and client sessions behind for a later #start.
  cleanup
end
sync_shards!()
click to toggle source
Add newly listed shards and remove shards that no longer appear in the stream's shard listing.
# File lib/kcl/worker.rb, line 65
# Add newly listed shards and remove shards that no longer appear in the
# stream's shard listing.
#
# Every known shard is first marked not-alive; shards returned by
# kinesis.shards are marked alive, and unseen ones get a new
# Kcl::Workers::ShardInfo. Shards still marked not-alive have their lease
# removed via the checkpointer and are dropped from both @shards and
# @live_shards.
#
# @return [Hash{String => Kcl::Workers::ShardInfo}] the updated @shards map
def sync_shards!
  # Assume every known shard is gone until the listing below proves otherwise.
  @live_shards.transform_values! { false }

  kinesis.shards.each do |shard|
    @live_shards[shard.shard_id] = true
    next if @shards[shard.shard_id]

    @shards[shard.shard_id] = Kcl::Workers::ShardInfo.new(
      shard.shard_id,
      shard.parent_shard_id,
      shard.sequence_number_range
    )
    Kcl.logger.info("Found new shard at shard_id: #{shard.shard_id}")
  end

  # Prune vanished shards from @live_shards as well as @shards. If the id were
  # left in @live_shards (as false), every later sync would revisit it and call
  # checkpointer.remove_lease with @shards[shard_id] == nil.
  @live_shards.delete_if do |shard_id, alive|
    next false if alive

    checkpointer.remove_lease(@shards[shard_id])
    @shards.delete(shard_id)
    Kcl.logger.info("Remove shard at shard_id: #{shard_id}")
    true
  end

  @shards
end
Private Instance Methods
checkpointer()
click to toggle source
# File lib/kcl/worker.rb, line 144
# Return the memoized checkpointer, building it from Kcl.config (and logging
# that it did so) when none exists yet.
#
# @return [Kcl::Checkpointer]
def checkpointer
  return @checkpointer unless @checkpointer.nil?

  @checkpointer = Kcl::Checkpointer.new(Kcl.config)
  Kcl.logger.info('Created Checkpoint in worker')
  @checkpointer
end
kinesis()
click to toggle source
# File lib/kcl/worker.rb, line 136
# Return the memoized Kinesis proxy, building it from Kcl.config (and logging
# that it did so) when none exists yet.
#
# @return [Kcl::Proxies::KinesisProxy]
def kinesis
  return @kinesis unless @kinesis.nil?

  @kinesis = Kcl::Proxies::KinesisProxy.new(Kcl.config)
  Kcl.logger.info('Created Kinesis session in worker')
  @kinesis
end
trap_signals()
click to toggle source
# File lib/kcl/worker.rb, line 152
# Install handlers for HUP, INT and TERM. A handler does not call #shutdown
# directly; it schedules it on a zero-delay EM timer (presumably so shutdown
# runs on the reactor rather than inside the trap context).
#
# @return [Array<Symbol>] the trapped signal names
def trap_signals
  %i[HUP INT TERM].each do |sig|
    trap(sig) { EM.add_timer(0) { shutdown(sig) } }
  end
end