class Awful::DynamoDB

Constants

COLORS

Public Instance Methods

all_matching_tables(name) click to toggle source

return array of tables names matching name

# File lib/awful/dynamodb.rb, line 25
def all_matching_tables(name)
  tables = []
  last_evaluated = nil
  loop do # get 100 at a time from sdk
    response = dynamodb.list_tables(exclusive_start_table_name: last_evaluated)
    matching = response.table_names.select do |table|
      table.match(name)
    end
    tables = tables + matching
    last_evaluated = response.last_evaluated_table_name
    break unless last_evaluated
  end
  tables
end
batch_write(name) click to toggle source
# File lib/awful/dynamodb.rb, line 372
def batch_write(name)
  items = (1..25).map do |n|
    {
      put_request: {
        item: {
          "store_id"     => "store#{n}",
          "object_id"    => "object#{n}",
          "object_value" => "value#{n}"
        }
      }
    }
  end
  p items
  r = dynamodb.batch_write_item(request_items: {name => items})
  p r
end
color(string) click to toggle source
# File lib/awful/dynamodb.rb, line 20
def color(string)
  set_color(string, COLORS.fetch(string.to_sym, :yellow))
end
copy(src, dst) click to toggle source
# File lib/awful/dynamodb.rb, line 232
def copy(src, dst)
  src_table, src_region = src.split('/').reverse # parse region/table into [table, region]
  dst_table, dst_region = dst.split('/').reverse

  ## clients are potentially for different regions
  src_client = Aws::DynamoDB::Client.new({region: src_region}.reject{|_,v| v.nil?})
  dst_client = Aws::DynamoDB::Client.new({region: dst_region}.reject{|_,v| v.nil?})

  ## params for put_item call
  params = {table_name: dst_table}

  ## add condition not to overwrite existing primary keys (hash or composite hash AND range)
  if options[:no_clobber]
    keys = dst_client.describe_table(table_name: dst_table).table.key_schema.map(&:attribute_name)
    params.merge!(condition_expression: keys.map{|key| "attribute_not_exists(#{key})"}.join(' AND '))
  end

  ## lame progress indicator, pass true for put, false for skip
  dots = options[:dots] ? ->(x){print x ? '.' : 'x'} : ->(_){}

  ## loop on each batch of scanned items
  exclusive_start_key = nil
  loop do
    r = src_client.scan(table_name: src_table, exclusive_start_key: exclusive_start_key, return_consumed_capacity: 'INDEXES')
    puts "[#{Time.now}] [#{src_table}] scanned:#{r.count} key:#{r.last_evaluated_key || 'nil'}"

    ## loop items and put to destination
    put = skipped = 0
    r.items.each do |item|
      begin
        dst_client.put_item(params.merge(item: item))
        put += 1
        dots.call(true)
      rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException #item key exists
        skipped += 1
        dots.call(false)
      end
    end

    print "\n" if options[:dots]
    puts "[#{Time.now}] [#{dst_table}] put:#{put} skipped:#{skipped}"

    ## loop if there are more keys to scan
    exclusive_start_key = r.last_evaluated_key
    break unless exclusive_start_key
  end
end
create_table(name, file = nil) click to toggle source
# File lib/awful/dynamodb.rb, line 87
def create_table(name, file = nil)
  opt = load_cfg(options, file)
  params = only_keys_matching(opt, %i[attribute_definitions key_schema])
  params[:table_name] = name
  params[:provisioned_throughput] = only_keys_matching(opt[:provisioned_throughput], %i[read_capacity_units write_capacity_units])

  ## scrub unwanted keys from LSIs
  if opt.has_key?(:local_secondary_indexes)
    params[:local_secondary_indexes] = opt[:local_secondary_indexes].map do |lsi|
      only_keys_matching(lsi, %i[index_name key_schema projection])
    end
  end

  ## scrub unwanted keys from GSIs
  if opt.has_key?(:global_secondary_indexes)
    params[:global_secondary_indexes] = opt[:global_secondary_indexes].map do |gsi|
      only_keys_matching(gsi, %i[index_name key_schema projection]).output do |g|
        if gsi[:provisioned_throughput]
          g[:provisioned_throughput] = only_keys_matching(gsi[:provisioned_throughput], %i[read_capacity_units write_capacity_units])
        end
      end
    end
  end

  dynamodb.create_table(params)
end
delete_table(name) click to toggle source
# File lib/awful/dynamodb.rb, line 219
def delete_table(name)
  confirmation = ask("to delete #{name} and all its data, type the name of table to delete:", :yellow)
  if confirmation == name
    say("deleting table #{name}")
    dynamodb.delete_table(table_name: name)
  else
    say("confirmation failed for #{name}", :red)
  end
end
dump(name) click to toggle source
# File lib/awful/dynamodb.rb, line 66
def dump(name)
  all_matching_tables(name).map do |table_name|
    dynamodb.describe_table(table_name: table_name).table.to_hash.output do |table|
      puts YAML.dump(stringify_keys(table))
    end
  end
end
enable_streams(name) click to toggle source
# File lib/awful/dynamodb.rb, line 212
def enable_streams(name)
  stream_specification = {stream_enabled: !options[:disable]}
  stream_specification.merge!(stream_view_type: options[:stream_view_type].upcase) unless options[:disable]
  dynamodb.update_table(table_name: name, stream_specification: stream_specification)
end
get_arn(name) click to toggle source
# File lib/awful/dynamodb.rb, line 40
def get_arn(name)
  if name.start_with?('arn:aws:dynamodb:') # already an arn
    name
  else
    dynamodb.describe_table(table_name: name).table.table_arn
  end
end
gsi(name) click to toggle source
# File lib/awful/dynamodb.rb, line 117
def gsi(name)
  (dynamodb.describe_table(table_name: name).table.global_secondary_indexes || []).output do |list|
    if options[:long]
      print_table list.map { |i|
        hash  = i.key_schema.find{ |k| k.key_type == 'HASH' }&.attribute_name
        range = i.key_schema.find{ |k| k.key_type == 'RANGE' }&.attribute_name
        [i.index_name, color(i.index_status), hash, range, i.projection.projection_type,
         i.provisioned_throughput.read_capacity_units, i.provisioned_throughput.write_capacity_units,
         i.index_size_bytes, i.item_count]
      }.sort
    elsif options[:arn]
      puts list.map(&:index_arn).sort
    else
      puts list.map(&:index_name).sort
    end
  end
end
keys(name) click to toggle source
# File lib/awful/dynamodb.rb, line 80
def keys(name)
  dynamodb.describe_table(table_name: name).table.key_schema.each_with_object({}) do |schema, h|
    h[schema.key_type.downcase.to_sym] = schema.attribute_name
  end.output(&method(:print_table))
end
ls(name = /./) click to toggle source
# File lib/awful/dynamodb.rb, line 51
def ls(name = /./)
  tables = all_matching_tables(name)

  if options[:long]
    tables.map do |table|
      dynamodb.describe_table(table_name: table).table
    end.output do |list|
      print_table list.map { |t| [ t.table_name, color(t.table_status), t.item_count, t.table_size_bytes, t.creation_date_time ] }
    end
  else
    tables.output(&method(:puts))
  end
end
lsi(name) click to toggle source
# File lib/awful/dynamodb.rb, line 138
def lsi(name)
  (dynamodb.describe_table(table_name: name).table.local_secondary_indexes || []).output do |list|
    if options[:long]
      print_table list.map { |i|
        hash  = i.key_schema.find{ |k| k.key_type == 'HASH' }&.attribute_name
        range = i.key_schema.find{ |k| k.key_type == 'RANGE' }&.attribute_name
        [i.index_name, hash, range, i.projection.projection_type, i.index_size_bytes, i.item_count]
      }.sort
    elsif options[:arn]
      puts list.map(&:index_arn).sort
    else
      puts list.map(&:index_name).sort
    end
  end
end
put_items(name, file = nil) click to toggle source
# File lib/awful/dynamodb.rb, line 342
def put_items(name, file = nil)
  params = {'TableName' => name}

  ## set a condition not to overwrite items with existing primary key(s)
  if options[:no_clobber]
    keys = dynamodb.describe_table(table_name: name).table.key_schema.map(&:attribute_name)
    params.merge!('ConditionExpression' => keys.map{|key| "attribute_not_exists(#{key})"}.join(' AND '))
  end

  ## input data
  io = (file and File.open(file)) || ((not $stdin.tty?) and $stdin)

  put_count = 0
  skip_count = 0
  io.each_line do |line|
    begin
      dynamodb_simple.put_item(params.merge('Item' => JSON.parse(line)))
      put_count += 1
    rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException #item key exists
      skip_count += 1
    end
  end

  ## return counts
  [put_count, skip_count].output do |put, skip|
    puts "put #{put} items, skipped #{skip} items"
  end
end
query(name, exclusive_start_key = nil) click to toggle source
# File lib/awful/dynamodb.rb, line 311
def query(name, exclusive_start_key = nil)
  fd = options[:output] ? File.open(options[:output], 'w') : $stdout.dup # open output file or stdout
  exclusive_start_key = nil
  count = 0
  condition = "#{options[:hash_key]} = :hash_key_value"
  condition += " and #{options[:range_key]} = :range_key_value" if options[:range_key]
  attributes = {
    ':hash_key_value'  => { S: options[:hash_key_value] },
    ':range_key_value' => { S: options[:range_key_value] },
  }.reject { |_,v| v[:S].nil? }
  loop do
    r = dynamodb_simple.query(
      'TableName'                 => name,
      'ExclusiveStartKey'         => exclusive_start_key,
      'Select'                    => options[:count] ? 'COUNT' : 'ALL_ATTRIBUTES',
      'KeyConditionExpression'    => condition,
      'ExpressionAttributeValues' => attributes,
    )
    count += r.fetch('Count', 0)
    r.fetch('Items', []).each do |item|
      fd.puts JSON.generate(item)
    end
    exclusive_start_key = r['LastEvaluatedKey']
    break unless exclusive_start_key
  end
  fd.close
  puts count if options[:count]
end
scan(name, exclusive_start_key = nil) click to toggle source
# File lib/awful/dynamodb.rb, line 283
def scan(name, exclusive_start_key = nil)
  fd = options[:output] ? File.open(options[:output], 'w') : $stdout.dup # open output file or stdout
  exclusive_start_key = nil
  count = 0
  loop do
    r = dynamodb_simple.scan(
      'TableName'         => name,
      'Select'            => options[:count] ? 'COUNT' : 'ALL_ATTRIBUTES',
      'ExclusiveStartKey' => exclusive_start_key
    )
    count += r.fetch('Count', 0)
    r.fetch('Items', []).each do |item|
      fd.puts JSON.generate(item)
    end
    exclusive_start_key = r['LastEvaluatedKey']
    break unless exclusive_start_key
  end
  fd.close
  puts count if options[:count]
end
status(name) click to toggle source
# File lib/awful/dynamodb.rb, line 75
def status(name)
  dynamodb.describe_table(table_name: name).table.table_status.output(&method(:puts))
end
tag(name, *tags) click to toggle source
# File lib/awful/dynamodb.rb, line 399
def tag(name, *tags)
  dynamodb.tag_resource(
    resource_arn: get_arn(name),
    tags: tags.map { |t|
      k, v = t.split(/[=:]/)
      {key: k, value: v}
    }
  )
end
tags(name) click to toggle source
# File lib/awful/dynamodb.rb, line 390
def tags(name)
  paginate(:tags) do |token|
    dynamodb.list_tags_of_resource(resource_arn: get_arn(name), next_token: token)
  end.output do |tags|
    print_table tags.map { |t| [t.key, t.value] }
  end
end
throughput(name) click to toggle source
# File lib/awful/dynamodb.rb, line 160
def throughput(name)
  table = dynamodb.describe_table(table_name: name).table

  ## current is hash of current provisioned throughput
  current = table.provisioned_throughput.to_h

  ## loop-safe version of GSIs (in case nil)
  global_secondary_indexes = table.global_secondary_indexes || []

  ## get throughput for each GSI
  global_secondary_indexes.each do |gsi|
    current[gsi.index_name] = gsi.provisioned_throughput.to_h
  end

  ## if no updates requested, just print throughput and return table details
  unless options[:read_capacity_units] or options[:write_capacity_units]
    puts YAML.dump(stringify_keys(current))
    return table
  end

  ## parameters for update request
  params = { table_name: name }

  ## add table throughput unless told not to
  params[:provisioned_throughput] = {
    read_capacity_units:  options[:read_capacity_units]  || current[:read_capacity_units],
    write_capacity_units: options[:write_capacity_units] || current[:write_capacity_units]
  } if options[:table]

  ## list of requested GSIs, or all for this table
  gsis = options[:gsi]
  gsis = global_secondary_indexes.map(&:index_name) if options[:all]
  params[:global_secondary_index_updates] = gsis.map do |gsi|
    {
      update: {
        index_name: gsi,
        provisioned_throughput: {
          read_capacity_units:  options[:read_capacity_units]  || current[gsi][:read_capacity_units],
          write_capacity_units: options[:write_capacity_units] || current[gsi][:write_capacity_units]
        }
      }
    }
  end

  ## make the update request
  params.reject! { |_,v| v.empty? } # sdk hates empty global_secondary_index_updates
  dynamodb.update_table(params)
end
ttl(name, attribute = nil) click to toggle source
# File lib/awful/dynamodb.rb, line 411
def ttl(name, attribute = nil)
  if attribute
    dynamodb.update_time_to_live(
      table_name: name,
      time_to_live_specification: {
        enabled: !options[:disable],
        attribute_name: attribute,
      }
    )
  else
    dynamodb.describe_time_to_live(table_name: name).time_to_live_description.output do |t|
      puts YAML.dump(stringify_keys(t.to_hash))
    end
  end
end