class Kcl::Checkpointer
Constants
- DYNAMO_DB_CHECKPOINT_SEQUENCE_NUMBER_KEY
- DYNAMO_DB_LEASE_OWNER_KEY
- DYNAMO_DB_LEASE_PRIMARY_KEY
- DYNAMO_DB_LEASE_TIMEOUT_KEY
- DYNAMO_DB_PARENT_SHARD_KEY
Attributes
dynamodb[R]
Public Class Methods
new(config)
click to toggle source
@param [Kcl::Config] config
# File lib/kcl/checkpointer.rb, line 13 def initialize(config) @dynamodb = Kcl::Proxies::DynamoDbProxy.new(config) @table_name = config.dynamodb_table_name return if @dynamodb.exists?(@table_name) @dynamodb.create_table( @table_name, [{ attribute_name: DYNAMO_DB_LEASE_PRIMARY_KEY, attribute_type: 'S' }], [{ attribute_name: DYNAMO_DB_LEASE_PRIMARY_KEY, key_type: 'HASH' }], { read_capacity_units: config.dynamodb_read_capacity, write_capacity_units: config.dynamodb_write_capacity } ) Kcl.logger.info("Created DynamoDB table: #{@table_name}") end
Public Instance Methods
fetch_checkpoint(shard)
click to toggle source
Retrieves the checkpoint for the given shard @params [Kcl::Workers::ShardInfo] shard @return [Kcl::Workers::ShardInfo]
# File lib/kcl/checkpointer.rb, line 39 def fetch_checkpoint(shard) checkpoint = @dynamodb.get_item( @table_name, { "#{DYNAMO_DB_LEASE_PRIMARY_KEY}" => shard.shard_id } ) return shard if checkpoint.nil? if checkpoint[DYNAMO_DB_CHECKPOINT_SEQUENCE_NUMBER_KEY] shard.checkpoint = checkpoint[DYNAMO_DB_CHECKPOINT_SEQUENCE_NUMBER_KEY] end if checkpoint[DYNAMO_DB_LEASE_OWNER_KEY] shard.assigned_to = checkpoint[DYNAMO_DB_LEASE_OWNER_KEY] end Kcl.logger.info("Retrieves checkpoint of shard at #{shard.to_h}") shard end
lease(shard, next_assigned_to)
click to toggle source
Attempt to gain a lock on the given shard @params [Kcl::Workers::ShardInfo] shard @params [String] next_assigned_to @return [Kcl::Workers::ShardInfo]
# File lib/kcl/checkpointer.rb, line 85 def lease(shard, next_assigned_to) now = Time.now.utc next_lease_timeout = now + Kcl.config.dynamodb_failover_seconds checkpoint = @dynamodb.get_item( @table_name, { "#{DYNAMO_DB_LEASE_PRIMARY_KEY}" => shard.shard_id } ) assigned_to = checkpoint && checkpoint[DYNAMO_DB_LEASE_OWNER_KEY] lease_timeout = checkpoint && checkpoint[DYNAMO_DB_LEASE_TIMEOUT_KEY] if assigned_to && lease_timeout if now > Time.parse(lease_timeout) && assigned_to != next_assigned_to raise Kcl::Errors::LeaseNotAquiredError end condition_expression = 'shard_id = :shard_id AND assigned_to = :assigned_to AND lease_timeout = :lease_timeout' expression_attributes = { ':shard_id' => shard.shard_id, ':assigned_to' => assigned_to, ':lease_timeout' => lease_timeout } Kcl.logger.info("Attempting to get a lock for shard: #{shard.to_h}") else condition_expression = 'attribute_not_exists(assigned_to)' expression_attributes = nil end item = { "#{DYNAMO_DB_LEASE_PRIMARY_KEY}" => shard.shard_id, "#{DYNAMO_DB_LEASE_OWNER_KEY}" => next_assigned_to, "#{DYNAMO_DB_LEASE_TIMEOUT_KEY}" => next_lease_timeout.to_s } if shard.checkpoint != '' item[DYNAMO_DB_CHECKPOINT_SEQUENCE_NUMBER_KEY] = shard.checkpoint end if shard.parent_shard_id > 0 item[DYNAMO_DB_PARENT_SHARD_KEY] = shard.parent_shard_id end result = @dynamodb.conditional_update_item( @table_name, item, condition_expression, expression_attributes ) if result shard.assigned_to = next_assigned_to shard.lease_timeout = next_lease_timeout Kcl.logger.info("Get lease for shard at #{shard.to_h}") else Kcl.logger.info("Failed to get lease for shard at #{shard.to_h}") end shard end
remove_lease(shard)
click to toggle source
Remove the shard entry @params [Kcl::Workers::ShardInfo] shard @return [Kcl::Workers::ShardInfo]
# File lib/kcl/checkpointer.rb, line 144 def remove_lease(shard) result = @dynamodb.remove_item( @table_name, { "#{DYNAMO_DB_LEASE_PRIMARY_KEY}" => shard.shard_id } ) if result shard.assigned_to = nil shard.checkpoint = nil shard.lease_timeout = nil Kcl.logger.info("Remove lease for shard at #{shard.to_h}") else Kcl.logger.info("Failed to remove lease for shard at #{shard.to_h}") end shard end
remove_lease_owner(shard)
click to toggle source
Remove lease owner for the shard entry @params [Kcl::Workers::ShardInfo] shard @return [Kcl::Workers::ShardInfo]
# File lib/kcl/checkpointer.rb, line 164 def remove_lease_owner(shard) result = @dynamodb.update_item( @table_name, { "#{DYNAMO_DB_LEASE_PRIMARY_KEY}" => shard.shard_id }, "remove #{DYNAMO_DB_LEASE_OWNER_KEY}" ) if result shard.assigned_to = nil Kcl.logger.info("Remove lease owner for shard at #{shard.to_h}") else Kcl.logger.info("Failed to remove lease owner for shard at #{shard.to_h}") end shard end
update_checkpoint(shard)
click to toggle source
Write the checkpoint for the given shard @params [Kcl::Workers::ShardInfo] shard @return [Kcl::Workers::ShardInfo]
# File lib/kcl/checkpointer.rb, line 60 def update_checkpoint(shard) item = { "#{DYNAMO_DB_LEASE_PRIMARY_KEY}" => shard.shard_id, "#{DYNAMO_DB_CHECKPOINT_SEQUENCE_NUMBER_KEY}" => shard.checkpoint, "#{DYNAMO_DB_LEASE_OWNER_KEY}" => shard.assigned_to, "#{DYNAMO_DB_LEASE_TIMEOUT_KEY}" => shard.lease_timeout.to_s } if shard.parent_shard_id > 0 item[DYNAMO_DB_PARENT_SHARD_KEY] = shard.parent_shard_id end result = @dynamodb.put_item(@table_name, item) if result Kcl.logger.info("Write checkpoint of shard at #{shard.to_h}") else Kcl.logger.info("Failed to write checkpoint for shard at #{shard.to_h}") end shard end