module Elasticsearch::Model::Adapter::DynaModelAdapter::Importing

Public Instance Methods

__find_in_batches(options={}) { |batch_for_bulk| ... }
# File lib/dyna_model/adapters/elasticsearch/dyna_model_adapter.rb, line 33
# Scans the model's DynamoDB table page by page and yields each non-empty
# page as an array of Elasticsearch bulk "index" actions.
#
# Reads are throttled against the table's provisioned read capacity. Unless
# overridden, each scan page is capped at half the provisioned read units.
# After every page that reports consumed capacity and has more data to
# follow, the method sleeps so that read usage stays under about 50% of the
# provision.
#
# @param options [Hash]
# @option options [Integer] :batch_size page size passed to +scan+ as +:batch+;
#   defaults to half the table's read capacity units, never less than 1
# @option options [Object] :scan_filter forwarded unchanged to +scan+
# @yieldparam batch_for_bulk [Array<Hash>] one
#   <tt>{ index: { _id: record.id, data: record.__elasticsearch__.as_indexed_json } }</tt>
#   entry per record in the page; empty pages are not yielded
# @raise [RuntimeError] if the table schema has no read capacity units
def __find_in_batches(options={}, &block)
  # Guard the nested lookup so a schema without throughput info reaches the
  # intended error instead of a NoMethodError on nil.
  throughput = self.dynamo_db_table.table_schema[:provisioned_throughput]
  read_provision = throughput && throughput[:read_capacity_units]
  raise "read_provision not set for class!" unless read_provision

  # Default page size is half the read provision, clamped to at least 1 so a
  # table with a single read unit does not request zero-item pages.
  default_batch_size = [(read_provision / 2.0).floor, 1].max
  batch_size = options[:batch_size] || default_batch_size
  puts "Indexing via scan with batch size of #{batch_size}..."

  scan_idx = 0
  results_hash = {}
  # Always scan once, then keep going while DynamoDB reports more pages.
  while scan_idx == 0 || (results_hash && results_hash[:last_evaluated_key])
    puts "Batch iteration #{scan_idx+1}..."
    scan_options = {
      batch: batch_size,
      manual_batching: true,
      # Ask for the capacity used by this page so the loop can throttle below.
      return_consumed_capacity: :total
    }
    scan_options[:exclusive_start_key] = results_hash[:last_evaluated_key] if results_hash[:last_evaluated_key]
    scan_options[:scan_filter] = options[:scan_filter] if options[:scan_filter]
    results_hash = self.scan(scan_options)

    unless results_hash[:results].blank?
      puts "Indexing #{results_hash[:results].size} results..."
      batch_for_bulk = results_hash[:results].map do |record|
        { index: { _id: record.id, data: record.__elasticsearch__.as_indexed_json } }
      end
      yield batch_for_bulk
    end

    # If more results remain, sleep to throttle. Local DynamoDB was not
    # returning consumed_capacity (noted 2014-01-12), so throttling is skipped
    # then. It is also skipped for a non-positive provision, which would
    # otherwise divide by zero and produce an infinite/NaN sleep.
    if results_hash[:last_evaluated_key] && results_hash[:consumed_capacity] && read_provision > 0
      # consumed units / (half the provisioned units per second) = seconds to
      # wait so that read usage stays under 50% of read_provision.
      sleep_time = results_hash[:consumed_capacity][:capacity_units].to_f / (read_provision / 2.0)
      puts "Sleeping for #{sleep_time}..."
      sleep(sleep_time)
    end

    scan_idx += 1
  end
end