module Dynamini::BatchOperations

Public Instance Methods

batch_delete(ids) click to toggle source
# File lib/dynamini/batch_operations.rb, line 30
def batch_delete(ids)
  deletes = ids.map{ |id| { delete_request: { key: { hash_key => id } } } }

  deletes.each_slice(25) do |slice|
    client.batch_write_item(request_items: { table_name => slice })
  end
end
batch_find(ids = []) click to toggle source
# File lib/dynamini/batch_operations.rb, line 17
def batch_find(ids = [])
  return OpenStruct.new(found: [], not_found: []) if ids.length < 1
  objects = []
  key_structure = ids.map { |i| {hash_key => i} }
  key_structure.each_slice(100) do |keys|
    response = dynamo_batch_get(keys)
    response.responses[table_name].each do |item|
      objects << new(item.symbolize_keys, false)
    end
  end
  OpenStruct.new(found: objects, not_found: ids - objects.map(&hash_key))
end
import(models, options = {}) click to toggle source
# File lib/dynamini/batch_operations.rb, line 6
def import(models, options = {})
  # Max batch size is 25, per Dynamo BatchWriteItem docs

  models.each_slice(25) do |batch|
    batch.each do |model|
      model.send(:generate_timestamps!) unless options[:skip_timestamps]
    end
    dynamo_batch_save(batch)
  end
end
scan(options = {}) click to toggle source
# File lib/dynamini/batch_operations.rb, line 38
def scan(options = {})
  validate_scan_options(options)
  response = dynamo_scan(options)
  OpenStruct.new(
    items: response.items.map { |i| new(i.symbolize_keys, false) },
    last_evaluated_key: response.last_evaluated_key
  )
end

Private Instance Methods

dynamo_batch_get(key_struct) click to toggle source
# File lib/dynamini/batch_operations.rb, line 49
def dynamo_batch_get(key_struct)
  client.batch_get_item(
    request_items: {
      table_name => {keys: key_struct}
    }
  )
end
dynamo_batch_save(model_array) click to toggle source
# File lib/dynamini/batch_operations.rb, line 57
def dynamo_batch_save(model_array)
  put_requests = model_array.map do |model|
    {
      put_request: {
        item: model.attributes.reject { |_k, v| v.blank? }.stringify_keys
      }
    }
  end
  request_options = {
    request_items: {table_name => put_requests}
  }
  client.batch_write_item(request_options)
end
dynamo_scan(options) click to toggle source
# File lib/dynamini/batch_operations.rb, line 71
def dynamo_scan(options)
  if options[:start_key] && !options[:start_key].is_a?(Hash)
    if options[:index_name]
      attr = secondary_index[options[:index_name]][:hash_key_name].to_s
      start_key = { attr => options[:start_key] }
    else
      start_key = { hash_key.to_s => options[:start_key] }
    end
  else
    start_key = options[:start_key]
  end
  client.scan({
    consistent_read:      options[:consistent_read],
    exclusive_start_key:  start_key,
    index_name:           options[:index_name],
    limit:                options[:limit],
    segment:              options[:segment],
    total_segments:       options[:total_segments],
    table_name:           table_name
  }.select { |_, v| !v.nil? })
end
validate_scan_options(options) click to toggle source
# File lib/dynamini/batch_operations.rb, line 93
def validate_scan_options(options)
  if options[:total_segments] && !options[:segment]
    raise ArgumentError, 'Must specify segment if specifying total_segments'
  elsif options[:segment] && !options[:total_segments]
    raise ArgumentError, 'Must specify total_segments if specifying segment'
  elsif options[:index_name] && (!self.secondary_index || !self.secondary_index[options[:index_name]])
    raise ArgumentError, "Secondary index of #{options[:index_name]} does not exist"
  end
end