module OceanDynamo::Queries

Public Instance Methods

all(consistent: false, **options)

Returns all records in the table as an array. The records are read with a full table scan and are all instantiated in memory.

# File lib/ocean-dynamo/queries.rb, line 47
def all(consistent: false, **options)
  _late_connect?
  records = []
  in_batches :scan, { consistent_read: !!consistent } do |attrs|
    records << new._setup_from_dynamo(attrs)
  end
  records
end
condition_builder(hash_key, hash_value, range_key=nil, comparator=nil, range_value=nil, limit: nil, consistent: false, scan_index_forward: true, select: nil)

This helper constructs the options hash for a subsequent call to in_batches and friends. It creates the expression attribute names and values for all data, and accepts parameters that control scan direction, limits, read consistency, and attribute selection.

# File lib/ocean-dynamo/queries.rb, line 121
def condition_builder(hash_key, hash_value,
                      range_key=nil, comparator=nil, range_value=nil,
                      limit: nil, consistent: false, scan_index_forward: true,
                      select: nil)
  if range_key
    options = { 
      expression_attribute_names: { "#H" => hash_key, "#R" => range_key },
      key_condition_expression: "#H = :hashval AND #R #{comparator} :rangeval",
      expression_attribute_values: { ":hashval" => hash_value, ":rangeval" => range_value }
    }
  else
    options = { 
      expression_attribute_names: { "#H" => hash_key },
      key_condition_expression: "#H = :hashval",
      expression_attribute_values: { ":hashval" => hash_value }
    }
  end
  options[:limit] = limit if limit
  options[:consistent_read] = consistent if consistent
  options[:scan_index_forward] = scan_index_forward if !scan_index_forward
  options[:select] = select.to_s.upcase if select
  options
end
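To make the shape of the returned hash concrete, here is a minimal standalone sketch that mirrors the logic above outside of the gem. The method name build_conditions and the attribute names "guid" and "created_at" are made up for this illustration; only the option keys come from the source.

```ruby
# Standalone re-statement of the helper's logic, for illustration only.
# The real method is OceanDynamo::Queries#condition_builder.
def build_conditions(hash_key, hash_value,
                     range_key = nil, comparator = nil, range_value = nil,
                     limit: nil, consistent: false, scan_index_forward: true,
                     select: nil)
  names  = { "#H" => hash_key }
  values = { ":hashval" => hash_value }
  expr   = "#H = :hashval"
  if range_key
    # A range condition adds a second placeholder pair to the expression.
    names["#R"] = range_key
    values[":rangeval"] = range_value
    expr += " AND #R #{comparator} :rangeval"
  end
  options = {
    expression_attribute_names: names,
    key_condition_expression: expr,
    expression_attribute_values: values
  }
  # Optional keys are only added when they differ from the defaults.
  options[:limit] = limit if limit
  options[:consistent_read] = consistent if consistent
  options[:scan_index_forward] = scan_index_forward unless scan_index_forward
  options[:select] = select.to_s.upcase if select
  options
end

# Hypothetical attribute names, chosen only for the example.
opts = build_conditions("guid", "abc", "created_at", ">=", 1_700_000_000,
                        limit: 10, scan_index_forward: false,
                        select: :all_attributes)
```

In this sketch, opts[:key_condition_expression] is "#H = :hashval AND #R >= :rangeval", and :consistent_read is absent because consistent was left at its default of false.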
count(**options)

The number of records in the table. DynamoDB updates this value only about every six hours, so it is not a reliable real-time measure of the number of table items.

# File lib/ocean-dynamo/queries.rb, line 38
def count(**options)
  _late_connect?
  dynamo_table.item_count
end
find(hash, range=nil, consistent: false)

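Class methods

Finds a record by its hash key and, if the table has a range key, its range key. If hash is an array, each element is looked up in turn and an array of records is returned. Raises RecordNotFound if no matching item exists.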

# File lib/ocean-dynamo/queries.rb, line 10
def find(hash, range=nil, consistent: false)
  return hash.collect {|elem| find elem, range, consistent: consistent } if hash.is_a?(Array)
  _late_connect?
  hash = hash.id if hash.kind_of?(Table)    # TODO: We have (innocuous) leakage, fix!
  range = range.to_i if range.is_a?(Time)
  keys = { table_hash_key.to_s => hash }
  keys[table_range_key] = range if table_range_key && range
  options = { key: keys, consistent_read: consistent }
  item = dynamo_table.get_item(options).item
  unless item
    raise RecordNotFound, "can't find a #{self} with primary key ['#{hash}', #{range.inspect}]" 
  end
  new._setup_from_dynamo(item)
end
find_by_id(*args)
Alias for: find_by_key
find_by_key(*args)
# File lib/ocean-dynamo/queries.rb, line 26
def find_by_key(*args)
  find(*args)
rescue RecordNotFound
  nil
end
Also aliased as: find_by_id
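
The difference between find and find_by_key is only in how a missing record is reported. The following sketch illustrates that pattern with an in-memory hash instead of DynamoDB; the STORE constant and the single-argument find are stand-ins invented for the example, and RecordNotFound is redefined locally so the snippet runs on its own.

```ruby
# Local stand-in for the gem's exception class, so the sketch is self-contained.
class RecordNotFound < StandardError; end

# Hypothetical in-memory "table" used only for this illustration.
STORE = { "a1" => { "name" => "Alice" } }

# Raises when the key is missing, like find above.
def find(id)
  STORE.fetch(id) do
    raise RecordNotFound, "can't find a record with primary key ['#{id}']"
  end
end

# Same lookup, but a missing record is reported as nil instead of an exception.
def find_by_key(*args)
  find(*args)
rescue RecordNotFound
  nil
end

found   = find_by_key("a1")
missing = find_by_key("zz")
```

Here missing is nil, whereas calling find("zz") directly would raise RecordNotFound.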
find_each(consistent: false, limit: nil, batch_size: nil) { |_setup_from_dynamo| ... }

Looping through a collection of records from the database (using the all method, for example) is very inefficient since it will try to instantiate all the objects at once. Batch processing methods allow you to work with the records in batches, thereby greatly reducing memory consumption.

TODO: Add support for

index_name: "IndexName",
select: "ALL_ATTRIBUTES", # ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, SPECIFIC_ATTRIBUTES, COUNT
# File lib/ocean-dynamo/queries.rb, line 89
def find_each(consistent: false, limit: nil, batch_size: nil)
  options = { consistent_read: consistent }
  batch_size = limit if limit && batch_size && limit < batch_size
  options[:limit] = batch_size if batch_size   
  in_batches :scan, options do |attrs|
    if limit
      return true if limit <= 0
      limit = limit - 1
    end
    yield new._setup_from_dynamo(attrs)
  end
end
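
How limit and batch_size interact can be seen in a small standalone sketch. It assumes a hypothetical pager, each_in_pages, in place of in_batches, and a plain array in place of the table; neither exists in the gem.

```ruby
# Hypothetical data and bookkeeping, used only for this illustration.
ITEMS = (1..10).to_a
PAGE_SIZES = []

# Stand-in for in_batches: yields items page by page and records the
# page size it was asked for.
def each_in_pages(page_size)
  PAGE_SIZES << page_size
  ITEMS.each_slice(page_size || ITEMS.size) do |page|
    page.each { |item| yield item }
  end
  true
end

# Mirrors the limit handling of find_each.
def each_limited(limit: nil, batch_size: nil)
  # Never request pages larger than the total number of items wanted.
  batch_size = limit if limit && batch_size && limit < batch_size
  each_in_pages(batch_size) do |item|
    if limit
      # `return` inside the block leaves each_limited itself,
      # which ends the whole iteration.
      return true if limit <= 0
      limit -= 1
    end
    yield item
  end
end

seen = []
each_limited(limit: 3, batch_size: 5) { |i| seen << i }
```

In this sketch the page size is reduced from 5 to 3, and the loop only notices that the limit is exhausted when the next item arrives, so one more page than strictly necessary may be requested.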
find_global(*args)

This method takes the same args as find_global_each but returns all found items as an array.

# File lib/ocean-dynamo/queries.rb, line 197
def find_global(*args)
  result = []
  find_global_each(*args) do |item|
    result << item
  end
  result
end
find_global_each(hash_key, hash_value, range_key=nil, comparator=nil, range_value=nil, limit: nil, scan_index_forward: true) { |_setup_from_dynamo| ... }

This method finds each item of a global secondary index, sequentially yielding each item to the given block (required). The parameters are as follows:

hash_key The name of the hash key to use (required).
hash_value The value of the hash key to match (required).
range_key The name of the range key to use (optional).
comparator The comparator to use: “=”, “<”, “>”, “<=”, “>=” (optional).
range_value The value of the range key to match (optional).

Note that range_key is optional, but if it is present, then comparator and range_value must also be given. Either all three must be present or all three must be absent.

The following keyword arguments are accepted:

:limit The maximum number of items to read.
:scan_index_forward If false, items will be in reverse order.

If the index contains all attributes, no extra read will be performed. If it doesn't, the entire item will be fetched using an extra read operation.

# File lib/ocean-dynamo/queries.rb, line 167
def find_global_each(hash_key, hash_value,
                     range_key=nil, comparator=nil, range_value=nil,
                     limit: nil, scan_index_forward: true,
                     &block)
  hash_value = hash_value.to_i if hash_value.is_a?(Time)
  range_value = range_value.to_i if range_value.is_a?(Time)
  options = condition_builder(hash_key, hash_value, range_key, comparator, range_value,
                              limit: limit, scan_index_forward: scan_index_forward)
  index_name = (range_key ? "#{hash_key}_#{range_key}" : hash_key.to_s) + "_global"
  options[:index_name] = index_name
  raise "Undefined global index: #{index_name}" unless global_secondary_indexes[index_name]
  all_projected = global_secondary_indexes[index_name]["projection_type"] == "ALL"
  in_batches :query, options do |attrs|
    if limit
      return if limit <= 0
      limit = limit - 1
    end
    if all_projected
      yield new._setup_from_dynamo(attrs)
    else
      yield find(attrs[table_hash_key.to_s], table_range_key && attrs[table_range_key.to_s])
    end
  end
end
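
The source above derives the name of the global secondary index from the key names. A minimal sketch of that naming rule follows; the helper name global_index_name and the attribute names are invented for the example, while the expression itself is taken from the method body.

```ruby
# Builds the index name the same way find_global_each does:
# "<hash_key>_global" or "<hash_key>_<range_key>_global".
def global_index_name(hash_key, range_key = nil)
  (range_key ? "#{hash_key}_#{range_key}" : hash_key.to_s) + "_global"
end

# Hypothetical attribute names, chosen only for the example.
hash_only  = global_index_name(:guid)
hash_range = global_index_name("guid", "created_at")
```

With these inputs, hash_only is "guid_global" and hash_range is "guid_created_at_global". An index with the resulting name must exist in global_secondary_indexes, otherwise find_global_each raises.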
find_local(*args)

This method takes the same args as find_local_each but returns all found items as an array.

# File lib/ocean-dynamo/queries.rb, line 252
def find_local(*args)
  result = []
  find_local_each(*args) do |item|
    result << item
  end
  result
end
find_local_each(hash_key, hash_value, range_key, comparator, range_value, limit: nil, scan_index_forward: true, consistent: false) { |_setup_from_dynamo| ... }

This method finds each item of a local secondary index, sequentially yielding each item to the given block (required). The parameters are as follows:

hash_key The name of the hash key to use (required, must be the table's hash_key).
hash_value The value of the hash key to match (required).
range_key The name of the range key to use (required).
comparator The comparator to use: “=”, “<”, “>”, “<=”, “>=” (required).
range_value The value of the range key to match (required).

Note that range_key, comparator, and range_value must all be present.

The following keyword arguments are accepted:

:limit The maximum number of items to read.
:scan_index_forward If false, items will be in reverse order.
:consistent If true, consistent reads will be used. Default false.

# File lib/ocean-dynamo/queries.rb, line 224
def find_local_each(hash_key, hash_value,
                    range_key, comparator, range_value,
                    limit: nil, scan_index_forward: true, consistent: false,
                    &block)
  raise "The hash_key is #{hash_key.inspect} but must be #{table_hash_key.inspect}" unless hash_key == table_hash_key
  hash_value = hash_value.to_i if hash_value.is_a?(Time)
  range_value = range_value.to_i if range_value.is_a?(Time)
  options = condition_builder(hash_key, hash_value, range_key, comparator, range_value,
                              select: :all_attributes,
                              limit: limit, scan_index_forward: scan_index_forward,
                              consistent: consistent)
  index_name = range_key.to_s
  options[:index_name] = index_name
  raise "Undefined local index: #{index_name}" unless local_secondary_indexes.include?(index_name)
  in_batches :query, options do |attrs|
    if limit
      return if limit <= 0
      limit = limit - 1
    end
    yield new._setup_from_dynamo(attrs)
  end
end
in_batches(message, options) { |hash| ... }

This method takes a block and yields every record in a table to it, fetching further pages of results until none remain. message must be either :scan or :query. options is the hash of options to pass to the scan or query operation.

TODO: Add support for

index_name: "IndexName",
select: "ALL_ATTRIBUTES", # ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, SPECIFIC_ATTRIBUTES, COUNT
# File lib/ocean-dynamo/queries.rb, line 66
def in_batches(message, options, &block)
  _late_connect?
  loop do
    result = dynamo_table.send message, options
    result.items.each do |hash|
      yield hash
    end
    return true unless result.last_evaluated_key
    options[:exclusive_start_key] = result.last_evaluated_key
  end
end
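
The pagination loop above can be tried without a DynamoDB connection. The sketch below assumes a hypothetical FakeTable that serves two items per call and a Page struct standing in for the response object; the in_batches variant shown takes the table as an explicit argument, which the real method does not.

```ruby
# Stand-in for the response object: just the two fields the loop reads.
Page = Struct.new(:items, :last_evaluated_key)

# Hypothetical table double: serves two items per call and honours
# :exclusive_start_key the way a paginated scan would.
class FakeTable
  attr_reader :calls

  def initialize(items)
    @items = items
    @calls = 0
  end

  def scan(options)
    @calls += 1
    start = options[:exclusive_start_key] || 0
    slice = @items[start, 2] || []
    # A nil key signals that there are no more pages.
    next_key = start + 2 < @items.size ? start + 2 : nil
    Page.new(slice, next_key)
  end
end

# Same loop as in_batches, with the table passed in explicitly.
def in_batches(table, message, options)
  loop do
    result = table.public_send(message, options)
    result.items.each { |hash| yield hash }
    return true unless result.last_evaluated_key
    # Resume the next request where this page ended.
    options[:exclusive_start_key] = result.last_evaluated_key
  end
end

table = FakeTable.new(%w[a b c d e])
seen = []
in_batches(table, :scan, {}) { |item| seen << item }
```

Five items at two per page take three calls in this sketch. Note that the loop writes :exclusive_start_key into the options hash it was given, so a caller that reuses the same hash for a second run should pass a fresh copy.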