class Kcl::Worker

Constants

PROCESS_INTERVAL

Public Class Methods

new(id, record_processor_factory) click to toggle source
# File lib/kcl/worker.rb, line 11
# Set up a worker with empty shard bookkeeping and no open connections.
#
# @param id [Object] stored as this worker's id; compared against each
#   shard's lease owner elsewhere in the class
# @param record_processor_factory [Object] stored as-is for later use
def initialize(id, record_processor_factory)
  # Caller-supplied identity and collaborator.
  @id                       = id
  @record_processor_factory = record_processor_factory

  # Shard bookkeeping, both keyed by shard id.
  @shards      = {} # Map<String, Kcl::Workers::ShardInfo>
  @live_shards = {} # Map<String, Boolean>

  # Handles that start out unset.
  @timer        = nil
  @checkpointer = nil # Kcl::Checkpointer
  @kinesis      = nil # Kcl::Proxies::KinesisProxy
end
run(id, record_processor_factory) click to toggle source
# File lib/kcl/worker.rb, line 6
# Convenience entry point: builds a worker and immediately starts it.
#
# @param id [Object] forwarded to the constructor
# @param record_processor_factory [Object] forwarded to the constructor
# @return [Object] whatever `start` returns
def self.run(id, record_processor_factory)
  new(id, record_processor_factory).start
end

Public Instance Methods

available_lease_shard?() click to toggle source

Count the number of leases held by the worker, excluding already processed shards. @return [Boolean]

# File lib/kcl/worker.rb, line 91
# Tell whether this worker is still below its configured lease limit.
#
# Only shards whose lease owner equals this worker's id and that are not
# completed count towards the limit.
#
# @return [Boolean] true when `Kcl.config.max_lease_count` is greater than
#   the number of such shards
def available_lease_shard?
  active_leases = @shards.each_value.count do |shard|
    shard.lease_owner == @id && !shard.completed?
  end
  Kcl.config.max_lease_count > active_leases
end
cleanup() click to toggle source

Cleanup resources

# File lib/kcl/worker.rb, line 57
# Drop all shard bookkeeping and forget the Kinesis and checkpointer handles.
# The timer is left untouched here.
#
# @return [nil]
def cleanup
  @kinesis = @checkpointer = nil
  @shards      = {}
  @live_shards = {}
  nil
end
consume_shards!() click to toggle source

Process records by shard

# File lib/kcl/worker.rb, line 99
# Lease every eligible shard and consume each one on its own thread, then
# wait for all of those threads.
#
# A shard is skipped when this worker is already its lease owner, when its
# checkpoint cannot be fetched, or when it reports itself as completed.
#
# @return [Array<Thread>] the consumer threads that were started, after
#   `join` has been called on each
def consume_shards!
  consumer_threads = @shards.map do |shard_id, shard|
    # already owner of the shard
    next if shard.lease_owner == @id

    begin
      checkpointed = checkpointer.fetch_checkpoint(shard)
    rescue Kcl::Errors::CheckpointNotFoundError
      Kcl.logger.info("Not found checkpoint of shard at #{shard.to_h}")
      next
    end
    # shard is closed and processed all records
    next if checkpointed.completed?

    leased = checkpointer.lease(checkpointed, @id)

    Thread.new do
      begin
        Kcl::Workers::Consumer.new(
          leased,
          @record_processor_factory.create_processor,
          kinesis,
          checkpointer
        ).consume!
      ensure
        # Runs whether or not the consumer raised, so the lease owner is
        # always cleared before the thread ends.
        checkpointer.remove_lease_owner(leased)
        Kcl.logger.info("Finish to consume shard at shard_id: #{shard_id}")
      end
    end
  end.compact

  consumer_threads.each(&:join)
end
shutdown(signal = :NONE) click to toggle source

Shutdown gracefully

# File lib/kcl/worker.rb, line 43
# Cancel the periodic timer (if one is set) and stop the EventMachine reactor.
#
# Any StandardError raised along the way is logged and then re-raised.
#
# @param signal [Symbol] only used in the log message
# @return [Object] the result of the final logger call
def shutdown(signal = :NONE)
  @timer&.cancel
  @timer = nil
  EM.stop

  Kcl.logger.info("Shutdown worker with signal #{signal} at #{object_id}")
rescue StandardError => error
  Kcl.logger.error("#{error.class}: #{error.message}")
  raise error
end
start() click to toggle source

Start consuming data from the stream, and pass it to the application record processors.

# File lib/kcl/worker.rb, line 23
# Run the worker inside an EventMachine reactor.
#
# Inside the reactor block, signal handlers are installed and a periodic
# timer is created with PROCESS_INTERVAL; each tick syncs the shard list
# and, if the worker can still take leases, consumes shards. Once `EM.run`
# returns, the worker's resources are cleaned up.
#
# Any StandardError is logged and re-raised.
def start
  Kcl.logger.info("Start worker at #{object_id}")

  EM.run do
    trap_signals

    @timer = EM::PeriodicTimer.new(PROCESS_INTERVAL) do
      sync_shards!
      next unless available_lease_shard?

      consume_shards!
    end
  end

  cleanup
  Kcl.logger.info("Finish worker at #{object_id}")
rescue StandardError => error
  Kcl.logger.error("#{error.class}: #{error.message}")
  raise error
end
sync_shards!() click to toggle source

Add new shards and delete unused shards

# File lib/kcl/worker.rb, line 65
# Reconcile the locally tracked shards with the shards reported by Kinesis.
#
# Shards not seen before are registered as Kcl::Workers::ShardInfo entries.
# Shards that are no longer reported have their lease removed through the
# checkpointer and are dropped from both @shards and @live_shards, so a
# vanished shard is handled exactly once rather than on every later sync.
#
# @return [Hash] the updated @shards map, keyed by shard id
def sync_shards!
  # Assume every known shard is gone until the stream reports it again.
  @live_shards.transform_values! { |_| false }

  kinesis.shards.each do |shard|
    @live_shards[shard.shard_id] = true
    next if @shards[shard.shard_id]

    @shards[shard.shard_id] = Kcl::Workers::ShardInfo.new(
      shard.shard_id,
      shard.parent_shard_id,
      shard.sequence_number_range
    )
    Kcl.logger.info("Found new shard at shard_id: #{shard.shard_id}")
  end

  # Collect the ids first so @live_shards is not mutated while iterating it.
  dead_shard_ids = @live_shards.reject { |_, alive| alive }.keys
  dead_shard_ids.each do |shard_id|
    checkpointer.remove_lease(@shards[shard_id])
    @shards.delete(shard_id)
    # Without this the stale `false` entry would trigger another
    # remove_lease call (with nil) on every subsequent sync.
    @live_shards.delete(shard_id)
    Kcl.logger.info("Remove shard at shard_id: #{shard_id}")
  end

  @shards
end

Private Instance Methods

checkpointer() click to toggle source
# File lib/kcl/worker.rb, line 144
# Lazily build the checkpointer from `Kcl.config` and reuse it afterwards.
# Creation is logged once, at the moment the instance is built.
#
# @return [Kcl::Checkpointer] the cached instance
def checkpointer
  return @checkpointer unless @checkpointer.nil?

  @checkpointer = Kcl::Checkpointer.new(Kcl.config)
  Kcl.logger.info('Created Checkpoint in worker')
  @checkpointer
end
kinesis() click to toggle source
# File lib/kcl/worker.rb, line 136
# Lazily build the Kinesis proxy from `Kcl.config` and reuse it afterwards.
# Creation is logged once, at the moment the instance is built.
#
# @return [Kcl::Proxies::KinesisProxy] the cached instance
def kinesis
  return @kinesis unless @kinesis.nil?

  @kinesis = Kcl::Proxies::KinesisProxy.new(Kcl.config)
  Kcl.logger.info('Created Kinesis session in worker')
  @kinesis
end
trap_signals() click to toggle source
# File lib/kcl/worker.rb, line 152
# Install handlers for HUP, INT and TERM.
#
# Each handler only schedules a zero-delay EventMachine timer; `shutdown`
# is called from that timer's block with the signal name.
#
# @return [Array<Symbol>] the list of trapped signal names
def trap_signals
  %i[HUP INT TERM].each do |signal|
    trap(signal) { EM.add_timer(0) { shutdown(signal) } }
  end
end