class DynaModel::Table

Constants

COMPARISON_OPERATOR
COMPARISON_OPERATOR_SCAN_ONLY
CONDITIONAL_OPERATOR
QUERY_SELECT
RETURNED_CONSUMED_CAPACITY
RETURN_VALUES
RETURN_VALUES_UPDATE_ONLY
TYPE_INDICATOR

Attributes

client[R]
hash_key[R]
range_keys[R]
schema_loaded_from_dynamo[R]
table_schema[R]

Public Class Methods

attr_with_type(attr_name, value) click to toggle source
# File lib/dyna_model/table.rb, line 83
# Wraps +value+ in the typed single-attribute structure used in requests.
#
# The type tag is TYPE_INDICATOR[type_from_value(value)]; the value itself is
# always sent as a String (value.to_s).
#
# Returns { attr_name => { type_tag => value.to_s } }.
# Any ArgumentError raised by type_from_value propagates unchanged.
def self.attr_with_type(attr_name, value)
  type_tag = TYPE_INDICATOR[type_from_value(value)]
  typed_value = { type_tag => value.to_s }
  { attr_name => typed_value }
end
new(model) click to toggle source
# File lib/dyna_model/table.rb, line 87
# Stores +model+ and the table schema it declares (model.table_schema), then
# runs load_schema followed by validate_key_schema. Anything those two
# methods raise propagates out of the constructor.
def initialize(model)
  @model = model
  @table_schema = @model.table_schema
  load_schema
  validate_key_schema
end
type_from_value(value) click to toggle source
# File lib/dyna_model/table.rb, line 73
# Maps a Ruby value to a type symbol: :b, :s or :n.
#
# The checks run in this order, and the first match wins:
# * AWS::DynamoDB::Binary instances       -> :b
# * anything responding to #to_str        -> :s
# * Numeric instances                     -> :n
#
# Raises ArgumentError for every other value.
def self.type_from_value(value)
  return :b if value.kind_of?(AWS::DynamoDB::Binary)
  return :s if value.respond_to?(:to_str)
  return :n if value.kind_of?(Numeric)

  raise ArgumentError, "unsupported attribute type #{value.class}"
end

Public Instance Methods

batch_get_item(keys, options={}) click to toggle source
# File lib/dyna_model/table.rb, line 292
# Builds a BatchGetItem request for +keys+ and returns the result of
# @model.dynamo_db_client.batch_get_item.
#
# keys    - collection of 1 to 100 entries. When the table has a primary range
#           key, each entry is split on @model.guid_delimiter into a hash value
#           and a range value; otherwise the entry is the hash value.
# options - modified in place. Reads :return_consumed_capacity (default :none),
#           :select (attribute name or list; blank becomes :all),
#           :consistent_read and :shard_name.
#
# Raises ArgumentError when the key count is outside 1..100, or when a range
# key is required and an entry yields no range value.
def batch_get_item(keys, options={})
  options[:return_consumed_capacity] ||= :none
  options[:select] ||= [] # no :projected option, always an array or :all
  options[:consistent_read] ||= false

  unless keys.size.between?(1, 100)
    raise ArgumentError, "must include between 1 - 100 keys"
  end

  keys_request = keys.map do |guid|
    range_value = nil
    if @primary_range_key
      hash_value, range_value = guid.split(@model.guid_delimiter)
    else
      hash_value = guid
    end

    entry = { @hash_key[:attribute_name] => { @hash_key[:attribute_type] => hash_value.to_s } }
    if @primary_range_key
      raise ArgumentError, "every key must include a range_value" if range_value.blank?
      entry[@primary_range_key[:attribute_name]] = { @primary_range_key[:attribute_type] => range_value.to_s }
    end
    entry
  end

  table_request = { keys: keys_request }
  if options[:select].blank?
    options[:select] = :all # for obj_from_attrs
  else
    table_request[:attributes_to_get] = [options[:select]].flatten
  end
  # consistent_read is only sent when it was requested
  table_request[:consistent_read] = options[:consistent_read] if options[:consistent_read]

  batch_get_item_request = {
    request_items: { @model.dynamo_db_table_name(options[:shard_name]) => table_request },
    return_consumed_capacity: RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
  }
  @model.dynamo_db_client.batch_get_item(batch_get_item_request)
end
delete_item(options={}) click to toggle source
# File lib/dyna_model/table.rb, line 407
# Builds a DeleteItem request and returns the result of
# @model.dynamo_db_client.delete_item.
#
# options (modified in place with defaults):
# * :delete_item              - required; hash with :hash_value and, when the
#                               table has a primary range key, :range_value
# * :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY (default :none)
# * :return_values            - key into RETURN_VALUES that is not listed in
#                               RETURN_VALUES_UPDATE_ONLY (default :none)
# * :expected                 - Hash of conditions, each built by attr_with_condition
# * :conditional_operator     - key into CONDITIONAL_OPERATOR; only looked at
#                               when :expected is given
# * :shard_name               - forwarded to @model.dynamo_db_table_name
#
# Raises RuntimeError when :delete_item is missing, and ArgumentError for an
# invalid :return_values, a missing range value, a non-Hash :expected or an
# unknown :conditional_operator.
def delete_item(options={})
  raise ":delete_item => {...key_values...} required" unless options[:delete_item].present?
  options[:return_consumed_capacity] ||= :none
  options[:return_values] ||= :none

  return_values_ok = RETURN_VALUES[options[:return_values]] && !RETURN_VALUES_UPDATE_ONLY.include?(options[:return_values])
  unless return_values_ok
    raise ArgumentError, ":return_values must be one of (#{(RETURN_VALUES.keys - RETURN_VALUES_UPDATE_ONLY).join(", ")})"
  end

  item_key = options[:delete_item]
  key_request = {
    @hash_key[:attribute_name] => { @hash_key[:attribute_type] => item_key[:hash_value].to_s }
  }
  if @primary_range_key
    raise ArgumentError, "range_key was not provided to the delete_item command" if item_key[:range_value].blank?
    key_request[@primary_range_key[:attribute_name]] = {
      @primary_range_key[:attribute_type] => item_key[:range_value].to_s
    }
  end

  expected = {}
  conditional_operator = nil
  if options[:expected]
    raise ArgumentError, ":expected must be a hash" unless options[:expected].is_a?(Hash)
    options[:expected].each_pair do |name, condition|
      expected.merge!(self.attr_with_condition({ name => condition }))
    end
    if options[:conditional_operator]
      conditional_operator = CONDITIONAL_OPERATOR[options[:conditional_operator]]
      raise ArgumentError, ":condition_operator invalid! Must be one of (#{CONDITIONAL_OPERATOR.keys.join(", ")})" unless conditional_operator
    end
  end

  delete_item_request = {
    table_name: @model.dynamo_db_table_name(options[:shard_name]),
    key: key_request,
    return_consumed_capacity: RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]],
    return_values: RETURN_VALUES[options[:return_values]]
  }
  # The conditional operator is only meaningful alongside at least one expectation
  unless expected.blank?
    delete_item_request[:expected] = expected
    delete_item_request[:conditional_operator] = conditional_operator unless conditional_operator.blank?
  end
  @model.dynamo_db_client.delete_item(delete_item_request)
end
get_item(hash_key, options={}) click to toggle source
# File lib/dyna_model/table.rb, line 182
# Builds a GetItem request for +hash_key+ and returns the result of
# @model.dynamo_db_client.get_item.
#
# options (modified in place):
# * :consistent_read          - anything falsy becomes false
# * :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY (default :none)
# * :select                   - Array of attribute names; a blank Array means
#                               every attribute and is rewritten to :all
# * :shard_name               - forwarded to @model.dynamo_db_table_name
#
# Raises ArgumentError when :select is given and is not an Array.
def get_item(hash_key, options={})
  options[:consistent_read] ||= false
  options[:return_consumed_capacity] ||= :none # "NONE" # || "TOTAL"
  options[:select] ||= [] # no :projected option, always an array or :all
  unless options[:select].is_a?(Array)
    raise ArgumentError, "Invalid :select. GetItem :select must be an Array (blank for :all)"
  end

  request = {
    table_name: @model.dynamo_db_table_name(options[:shard_name]),
    key: hash_key_item_param(hash_key),
    consistent_read: options[:consistent_read],
    return_consumed_capacity: RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
  }

  if options[:select].blank?
    options[:select] = :all # for obj_from_attrs
  else
    request[:attributes_to_get] = options[:select].flatten
  end

  @model.dynamo_db_client.get_item(request)
end
hash_key_condition_param(hash_key, value) click to toggle source
# File lib/dyna_model/table.rb, line 172
# Builds an equality key condition for the attribute named +hash_key+.
#
# The attribute type is looked up by name in @table_schema[:attribute_definitions];
# +value+ is sent as a String.
#
# Returns { hash_key => { attribute_value_list: [{ type => value.to_s }],
#                         comparison_operator: COMPARISON_OPERATOR[:eq] } }.
def hash_key_condition_param(hash_key, value)
  definition = @table_schema[:attribute_definitions].find { |attr| attr[:attribute_name] == hash_key }
  condition = {
    attribute_value_list: [{ definition[:attribute_type] => value.to_s }],
    comparison_operator: COMPARISON_OPERATOR[:eq]
  }
  { hash_key => condition }
end
hash_key_item_param(value) click to toggle source
# File lib/dyna_model/table.rb, line 166
# Builds the key structure for the table's HASH key.
#
# The key name is the :attribute_name of the "HASH" entry in
# @table_schema[:key_schema]; its type comes from the matching entry in
# @table_schema[:attribute_definitions].
#
# Returns { key_name => { key_type => value.to_s } }.
def hash_key_item_param(value)
  key_name = @table_schema[:key_schema].find { |entry| entry[:key_type] == "HASH" }[:attribute_name]
  definition = @table_schema[:attribute_definitions].find { |attr| attr[:attribute_name] == key_name }
  { key_name => { definition[:attribute_type] => value.to_s } }
end
load_schema() click to toggle source
# File lib/dyna_model/table.rb, line 94
# Fetches the table description via @model.describe_table, stores it in
# @schema_loaded_from_dynamo and derives the key metadata from it:
#
# * @hash_key          - { attribute_name:, attribute_type: } of the HASH key
# * @primary_range_key - same shape plus primary_range_key: true, for the
#                        table's non-HASH key (nil when there is none)
# * @range_keys        - the primary range key followed by one entry per
#                        local/global secondary index that has a RANGE key
#                        (those entries carry :index_name); nil when empty
#
# Attribute types are taken from @table_schema[:attribute_definitions]; keys
# whose attribute is not declared there are skipped.
#
# All three values are rebuilt from scratch on every call, so calling this
# public method again refreshes the state instead of appending duplicate
# entries to @range_keys.
#
# Returns the stored table description.
def load_schema
  @schema_loaded_from_dynamo = @model.describe_table
  table = @schema_loaded_from_dynamo[:table]
  definitions = @table_schema[:attribute_definitions]

  hash_key = nil
  primary_range_key = nil
  range_keys = []

  table[:key_schema].each do |key|
    key_attr = definitions.find { |h| h[:attribute_name] == key[:attribute_name] }
    next if key_attr.nil?
    key_schema_attr = {
      attribute_name: key[:attribute_name],
      attribute_type: key_attr[:attribute_type]
    }

    if key[:key_type] == "HASH"
      hash_key = key_schema_attr
    else
      range_keys << key_schema_attr.merge(:primary_range_key => true)
      primary_range_key = key_schema_attr.merge(:primary_range_key => true)
    end
  end

  secondary_indexes = (table[:local_secondary_indexes] || []) + (table[:global_secondary_indexes] || [])
  secondary_indexes.each do |index|
    si_range_key = index[:key_schema].find { |h| h[:key_type] == "RANGE" }
    next if si_range_key.nil?
    si_range_attribute = definitions.find { |h| h[:attribute_name] == si_range_key[:attribute_name] }
    next if si_range_attribute.nil?
    range_keys << {
      attribute_name: si_range_key[:attribute_name],
      attribute_type: si_range_attribute[:attribute_type],
      index_name: index[:index_name]
    }
  end

  @hash_key = hash_key
  @primary_range_key = primary_range_key
  # Stay nil (not []) when there are no range keys, as before
  @range_keys = range_keys.empty? ? nil : range_keys

  @schema_loaded_from_dynamo
end
query(hash_value, options={}) click to toggle source

options

* consistent_read
* return_consumed_capacity
* order
* select
* range
# File lib/dyna_model/table.rb, line 208
# Builds a Query request for +hash_value+ and returns the result of
# @model.dynamo_db_client.query.
#
# options (modified in place: defaults are filled in, and :select and
# :index_name may be overwritten):
# * :consistent_read          - anything falsy becomes false
# * :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY (default :none)
# * :order                    - :asc sets scan_index_forward; anything else
#                               does not (default :desc)
# * :select                   - :all (default), another QUERY_SELECT key, or an
#                               Array of attribute names (the table's key
#                               attributes are always added to the Array)
# * :range                    - single-entry condition on a range attribute,
#                               built by attr_with_condition; selects the
#                               matching secondary index when there is one
# * :query_filter             - Hash of conditions, each built by attr_with_condition
# * :conditional_operator     - key into CONDITIONAL_OPERATOR; only looked at
#                               when :query_filter is given
# * :global_secondary_index   - name of a GSI declared in @table_schema; the
#                               hash condition is then built on that index's
#                               HASH key and :select is forced to :projected
# * :limit, :exclusive_start_key, :shard_name
#
# Raises ArgumentError for an unknown :global_secondary_index (including the
# case where the schema declares no global secondary indexes at all), a :range
# on a table without range keys or on a non-range attribute, a non-Hash
# :query_filter, or an unknown :conditional_operator.
def query(hash_value, options={})
  options[:consistent_read] = false unless options[:consistent_read]
  options[:return_consumed_capacity] ||= :none # "NONE" # || "TOTAL"
  options[:order] ||= :desc
  # No default :index_name / :select => :projected here:
  # AWS::DynamoDB::Errors::ValidationException: ALL_PROJECTED_ATTRIBUTES can be used only when Querying using an IndexName

  key_conditions = {}
  gsi = nil
  if options[:global_secondary_index]
    requested_index = options[:global_secondary_index].to_s
    # A schema without any GSIs has no :global_secondary_indexes entry; treat
    # that as "index not found" instead of calling a method on nil.
    declared_gsis = @table_schema[:global_secondary_indexes] || []
    gsi = declared_gsis.find { |index| index[:index_name].to_s == requested_index }
    raise ArgumentError, "Could not find Global Secondary Index '#{options[:global_secondary_index]}'" unless gsi
    gsi_hash_key = gsi[:key_schema].find{|h| h[:key_type] == "HASH"}[:attribute_name]
    key_conditions.merge!(hash_key_condition_param(gsi_hash_key, hash_value))
  else
    hash_key = @table_schema[:key_schema].find{|h| h[:key_type] == "HASH"}[:attribute_name]
    key_conditions.merge!(hash_key_condition_param(hash_key, hash_value))
  end

  query_request = {
    table_name: @model.dynamo_db_table_name(options[:shard_name]),
    key_conditions: key_conditions,
    consistent_read: options[:consistent_read],
    return_consumed_capacity: RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]],
    scan_index_forward: (options[:order] == :asc)
  }

  if options[:range]
    raise ArgumentError, "Table does not use a range key in its schema!" if @range_keys.blank?
    attr_with_condition_hash = self.attr_with_condition(options[:range])
    range_key = @range_keys.find{|k| k[:attribute_name] == attr_with_condition_hash.keys.first}
    raise ArgumentError, ":range key must be a valid Range attribute" unless range_key

    if range_key.has_key?(:index_name) # Local/Global Secondary Index
      options[:index_name] = range_key[:index_name]
      query_request[:index_name] = range_key[:index_name]
    end

    # key_conditions is the same object stored in query_request, so this
    # also extends the request
    key_conditions.merge!(attr_with_condition_hash)
  end

  query_filter = {}
  conditional_operator = nil
  if options[:query_filter]
    raise ArgumentError, ":query_filter must be a hash" unless options[:query_filter].is_a?(Hash)
    options[:query_filter].each_pair do |k,v|
      query_filter.merge!(self.attr_with_condition({ k => v}))
    end
    if options[:conditional_operator]
      raise ArgumentError, ":condition_operator invalid! Must be one of (#{CONDITIONAL_OPERATOR.keys.join(", ")})" unless CONDITIONAL_OPERATOR[options[:conditional_operator]]
      conditional_operator = CONDITIONAL_OPERATOR[options[:conditional_operator]]
    end
  end
  query_request.merge!(query_filter: query_filter) unless query_filter.blank?
  query_request.merge!(conditional_operator: conditional_operator) unless conditional_operator.blank? || query_filter.blank?

  if options[:global_secondary_index] # Override index_name if using GSI
    # You can only select projected attributes from a GSI
    options[:select] = :projected
    options[:index_name] = gsi[:index_name]
    query_request.merge!(index_name: gsi[:index_name])
  end
  options[:select] ||= :all # :all, :projected, :count, []
  if options[:select].is_a?(Array)
    attrs_to_select = [options[:select].map(&:to_s)].flatten
    attrs_to_select << @hash_key[:attribute_name]
    attrs_to_select << @primary_range_key[:attribute_name] if @primary_range_key
    query_request.merge!({
      select: QUERY_SELECT[:specific],
      attributes_to_get: attrs_to_select.uniq
    })
  else
    query_request.merge!({ select: QUERY_SELECT[options[:select]] })
  end

  query_request.merge!({ limit: options[:limit].to_i }) if options.has_key?(:limit)
  query_request.merge!({ exclusive_start_key: options[:exclusive_start_key] }) if options[:exclusive_start_key]

  @model.dynamo_db_client.query(query_request)
end
scan(options={}) click to toggle source

Perform a table scan. See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html

# File lib/dyna_model/table.rb, line 453
# Builds a Scan request and returns the result of @model.dynamo_db_client.scan.
#
# options (modified in place with defaults):
# * :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY (default :none)
# * :select                   - :all (default), another QUERY_SELECT key, or an
#                               Array of attribute names (the table's key
#                               attributes are always added to the Array)
# * :scan_filter              - Hash of conditions, each built by
#                               attr_with_condition, e.g. { :name.begins_with => "a" }
# * :conditional_operator     - key into CONDITIONAL_OPERATOR; validated whenever
#                               given, sent only together with a scan filter
# * :limit, :exclusive_start_key, :segment, :total_segments, :shard_name
#
# Raises ArgumentError for an unknown :conditional_operator.
def scan(options={})
  options[:return_consumed_capacity] ||= :none # "NONE" # || "TOTAL"
  options[:select] ||= :all # :all, :projected, :count, []

  scan_request = {
    table_name: @model.dynamo_db_table_name(options[:shard_name]),
    return_consumed_capacity: RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
  }
  scan_request[:limit] = options[:limit].to_i if options.key?(:limit)
  scan_request[:exclusive_start_key] = options[:exclusive_start_key] if options[:exclusive_start_key]

  if options[:select].is_a?(Array)
    wanted = options[:select].map(&:to_s)
    wanted << @hash_key[:attribute_name]
    wanted << @primary_range_key[:attribute_name] if @primary_range_key
    scan_request[:select] = QUERY_SELECT[:specific]
    scan_request[:attributes_to_get] = wanted.uniq
  else
    scan_request[:select] = QUERY_SELECT[options[:select]]
  end

  scan_filter = {}
  if options[:scan_filter].present?
    options[:scan_filter].each_pair do |name, condition|
      scan_filter.merge!(self.attr_with_condition({ name => condition }))
    end
  end

  conditional_operator = nil
  if options[:conditional_operator]
    conditional_operator = CONDITIONAL_OPERATOR[options[:conditional_operator]]
    raise ArgumentError, ":condition_operator invalid! Must be one of (#{CONDITIONAL_OPERATOR.keys.join(", ")})" unless conditional_operator
  end

  unless scan_filter.blank?
    scan_request[:scan_filter] = scan_filter
    scan_request[:conditional_operator] = conditional_operator unless conditional_operator.blank?
  end
  scan_request[:segment] = options[:segment].to_i if options[:segment].present?
  scan_request[:total_segments] = options[:total_segments].to_i if options[:total_segments].present?

  @model.dynamo_db_client.scan(scan_request)
end
validate_key_schema() click to toggle source
# File lib/dyna_model/table.rb, line 130
# Compares the table description held in @schema_loaded_from_dynamo[:table]
# with the schema declared in @table_schema.
#
# Raises ArgumentError when the key schema, the attribute definitions, the
# local secondary indexes or the global secondary indexes differ. Index
# entries are compared after dropping :index_status, :index_size_bytes and
# :item_count from the loaded side.
#
# A difference in provisioned read/write capacity units is only logged through
# DynaModel::Config.logger.
#
# The loaded description is not modified: the status keys are dropped from
# copies of the index hashes, so @schema_loaded_from_dynamo keeps them.
def validate_key_schema
  loaded = @schema_loaded_from_dynamo[:table]

  if loaded[:key_schema].sort_by { |k| k[:key_type] } != @table_schema[:key_schema].sort_by { |k| k[:key_type] }
    raise ArgumentError, "It appears your key schema (Hash Key/Range Key) have changed from the table definition. Rebuilding the table is necessary."
  end

  if loaded[:attribute_definitions].sort_by { |k| k[:attribute_name] } != @table_schema[:attribute_definitions].sort_by { |k| k[:attribute_name] }
    raise ArgumentError, "It appears your attribute definition (types?) have changed from the table definition. Rebuilding the table is necessary."
  end

  index_keys_to_reject = [:index_status, :index_size_bytes, :item_count]

  { local_secondary_indexes: "local", global_secondary_indexes: "global" }.each_pair do |schema_key, label|
    message = "It appears your #{label} secondary indexes have changed from the table definition. Rebuilding the table is necessary."
    loaded_indexes = loaded[schema_key]
    declared_indexes = @table_schema[schema_key]

    raise ArgumentError, message if loaded_indexes.blank? != declared_indexes.blank?
    # Both sides are blank, or both are non-blank, at this point
    next unless loaded_indexes.present?

    # reject builds new hashes, leaving the loaded description untouched
    comparable = loaded_indexes.map { |index| index.reject { |k, _v| index_keys_to_reject.include?(k) } }
    if comparable.sort_by { |index| index[:index_name] } != declared_indexes.sort_by { |index| index[:index_name] }
      raise ArgumentError, message
    end
  end

  [:read_capacity_units, :write_capacity_units].each do |unit|
    if loaded[:provisioned_throughput][unit] != @table_schema[:provisioned_throughput][unit]
      DynaModel::Config.logger.error "#{unit} mismatch. Need to update table?"
    end
  end
end
write(attributes, options={}) click to toggle source
# File lib/dyna_model/table.rb, line 329
# Writes +attributes+ with either UpdateItem or PutItem and returns the result
# of the corresponding @model.dynamo_db_client call.
#
# options (modified in place with defaults):
# * :update_item              - when truthy, a hash with :hash_value (and
#                               :range_value if the table has a primary range
#                               key) and UpdateItem is used; otherwise PutItem
# * :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY (default :none)
# * :return_values            - key into RETURN_VALUES (default :none); keys in
#                               RETURN_VALUES_UPDATE_ONLY are rejected for PutItem
# * :expected                 - Hash of conditions, each built by attr_with_condition
# * :conditional_operator     - key into CONDITIONAL_OPERATOR; only looked at
#                               when :expected is given
# * :shard_name               - forwarded to @model.dynamo_db_table_name
#
# UpdateItem: key attributes are skipped, nil values become a "DELETE" action
# and everything else a "PUT" action.
# PutItem: blank values (nil or empty) are left out of the item.
#
# Raises ArgumentError for a non-Hash :expected, an unknown
# :conditional_operator, a missing range value or an invalid :return_values.
def write(attributes, options={})
  options[:return_consumed_capacity] ||= :none
  options[:return_values] ||= :none
  options[:update_item] ||= false

  expected = {}
  conditional_operator = nil
  if options[:expected]
    raise ArgumentError, ":expected must be a hash" unless options[:expected].is_a?(Hash)
    options[:expected].each_pair do |name, condition|
      expected.merge!(self.attr_with_condition({ name => condition }))
    end
    if options[:conditional_operator]
      conditional_operator = CONDITIONAL_OPERATOR[options[:conditional_operator]]
      raise ArgumentError, ":condition_operator invalid! Must be one of (#{CONDITIONAL_OPERATOR.keys.join(", ")})" unless conditional_operator
    end
  end

  table_name_for = lambda { @model.dynamo_db_table_name(options[:shard_name]) }
  # Adds the optional condition parts shared by both request kinds
  with_conditions = lambda do |request|
    unless expected.blank?
      request[:expected] = expected
      request[:conditional_operator] = conditional_operator unless conditional_operator.blank?
    end
    request
  end

  if options[:update_item]
    # UpdateItem
    item_key = options[:update_item]
    key_request = {
      @hash_key[:attribute_name] => { @hash_key[:attribute_type] => item_key[:hash_value].to_s }
    }
    if @primary_range_key
      raise ArgumentError, "range_key was not provided to the write command" if item_key[:range_value].blank?
      key_request[@primary_range_key[:attribute_name]] = {
        @primary_range_key[:attribute_type] => item_key[:range_value].to_s
      }
    end

    attrs_to_update = {}
    attributes.each_pair do |name, value|
      is_key_attribute = name == @hash_key[:attribute_name] ||
                         (@primary_range_key && name == @primary_range_key[:attribute_name])
      next if is_key_attribute

      attrs_to_update[name] =
        if value.nil?
          { :action => "DELETE" }
        else
          { value: self.class.attr_with_type(name, value).values.last, action: "PUT" }
        end
    end

    unless RETURN_VALUES[options[:return_values]]
      raise ArgumentError, ":return_values must be one of (#{RETURN_VALUES.keys.join(", ")})"
    end
    update_item_request = with_conditions.call(
      {
        table_name: table_name_for.call,
        key: key_request,
        attribute_updates: attrs_to_update,
        return_consumed_capacity: RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]],
        return_values: RETURN_VALUES[options[:return_values]]
      }
    )
    @model.dynamo_db_client.update_item(update_item_request)
  else
    # PutItem
    items = {}
    attributes.each_pair do |name, value|
      # If empty string or nil, skip...
      items.merge!(self.class.attr_with_type(name, value)) unless value.blank?
    end

    return_values_ok = RETURN_VALUES[options[:return_values]] && !RETURN_VALUES_UPDATE_ONLY.include?(options[:return_values])
    unless return_values_ok
      raise ArgumentError, ":return_values must be one of (#{(RETURN_VALUES.keys - RETURN_VALUES_UPDATE_ONLY).join(", ")})"
    end
    put_item_request = with_conditions.call(
      {
        table_name: table_name_for.call,
        item: items,
        return_consumed_capacity: RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]],
        return_values: RETURN_VALUES[options[:return_values]]
      }
    )
    @model.dynamo_db_client.put_item(put_item_request)
  end
end

Protected Instance Methods

attr_with_condition(attr_conditional) click to toggle source

{:name.eq => "cary"}

return:

{
  "name" => {
    attribute_value_list: [
      "S" => "cary"
    ],
    comparison_operator: "EQ"
  }
}
# File lib/dyna_model/table.rb, line 511
# Turns a single-entry condition hash into a request condition structure.
#
# attr_conditional - Hash with exactly one entry whose key is a String of the
#                    form "attribute.operator" (e.g. "name.eq") and whose
#                    value is the comparison value: a Range for "between", an
#                    Array for "in".
#
# Returns { attribute_name => { comparison_operator: ..., attribute_value_list: [...] } }.
# The value list is left out for the "null" and "not_null" operators.
#
# Raises ArgumentError when the argument has the wrong shape, when the key
# carries no operator or an operator that is not a COMPARISON_OPERATOR key,
# when the attribute is not in @model.attributes, or when the value type does
# not fit "between" / "in".
def attr_with_condition(attr_conditional)
  raise ArgumentError, "Expected a 2 element Hash for each :query_filter (ex {:age.gt => 13})" unless attr_conditional.is_a?(Hash) && attr_conditional.keys.size == 1 && attr_conditional.keys.first.is_a?(String)
  attr_name, comparison_operator = attr_conditional.keys.first.split(".")
  # A key without ".operator" leaves comparison_operator nil; report that as
  # an invalid operator instead of failing on nil.to_sym.
  unless comparison_operator && COMPARISON_OPERATOR.keys.include?(comparison_operator.to_sym)
    raise ArgumentError, "Comparison operator must be one of (#{(COMPARISON_OPERATOR.keys - COMPARISON_OPERATOR_SCAN_ONLY).join(", ")})"
  end

  attr_key = @model.attributes[attr_name]
  raise ArgumentError, "#{attr_name} not a valid attribute" unless attr_key
  attr_class = attr_key.class
  attr_type = @model.attribute_type_indicator(attr_key)

  attr_value = attr_conditional.values.first
  raise ArgumentError, "#{attr_name} key must be a Range if using the operator BETWEEN" if comparison_operator == "between" && !attr_value.is_a?(Range)
  raise ArgumentError, ":query_filter value must be an Array if using the operator IN" if comparison_operator == "in" && !attr_value.is_a?(Array)

  attribute_value_list =
    case comparison_operator
    when "in"
      attr_value.map { |in_v| { attr_type => casted_attr_value(attr_class, in_v).to_s } }
    when "between"
      # NOTE(review): the range bounds are sent via to_s without going through
      # casted_attr_value, unlike the other operators - confirm this is intended.
      [{ attr_type => attr_value.min.to_s }, { attr_type => attr_value.max.to_s }]
    else
      [{ attr_type => casted_attr_value(attr_class, attr_value).to_s }]
    end

  attribute_comparison_hash = {
    comparison_operator: COMPARISON_OPERATOR[comparison_operator.to_sym]
  }
  attribute_comparison_hash.merge!(attribute_value_list: attribute_value_list) unless %w(null not_null).include?(comparison_operator)

  { attr_name => attribute_comparison_hash }
end
casted_attr_value(attr_class, val) click to toggle source
# File lib/dyna_model/table.rb, line 544
# Runs +val+ through attr_class.type_cast and then attr_class.serialize.
#
# Returns nil without calling serialize when type_cast yields nil.
def casted_attr_value(attr_class, val)
  typed = attr_class.type_cast(val)
  typed.nil? ? nil : attr_class.serialize(typed)
end