module Elasticsearch::Model::Adapter::DynaModelAdapter::Importing
Public Instance Methods
__find_in_batches(options={}) { |batch_for_bulk| ... }
click to toggle source
# File lib/dyna_model/adapters/elasticsearch/dyna_model_adapter.rb, line 33
#
# Scans the model's table page by page and yields each page as a bulk-index
# payload, sleeping between pages to keep read usage under roughly half of
# the table's provisioned read capacity.
#
# @param options [Hash]
# @option options [Integer] :batch_size items requested per scan page;
#   defaults to half of the provisioned read capacity units (never below 1)
# @option options [Object] :scan_filter passed through unchanged to +scan+
# @yield [batch_for_bulk] once per page that has results
# @yieldparam batch_for_bulk [Array<Hash>] one
#   <tt>{ index: { _id: ..., data: ... } }</tt> entry per scanned record
# @return [nil]
# @raise [RuntimeError] when the table schema exposes no read capacity units
def __find_in_batches(options={}, &block)
  # Nil-safe lookup so a missing throughput entry reaches the explicit error
  # below instead of failing with NoMethodError.
  throughput = dynamo_db_table.table_schema[:provisioned_throughput]
  read_provision = throughput && throughput[:read_capacity_units]
  raise "read_provision not set for class!" unless read_provision

  # Budget half of the provisioned read capacity for this scan.
  read_budget = read_provision / 2.0
  # Clamp to 1: with a single provisioned read unit the halved value floors to 0,
  # which is not a usable page size.
  default_batch_size = [read_budget.floor, 1].max
  batch_size = options[:batch_size] || default_batch_size
  puts "Indexing via scan with batch size of #{batch_size}..."

  scan_idx = 0
  last_evaluated_key = nil
  loop do
    puts "Batch iteration #{scan_idx+1}..."
    scan_options = {
      batch: batch_size,
      manual_batching: true,
      return_consumed_capacity: :total
    }
    # Resume from where the previous page stopped.
    scan_options[:exclusive_start_key] = last_evaluated_key if last_evaluated_key
    scan_options[:scan_filter] = options[:scan_filter] if options[:scan_filter]

    results_hash = scan(scan_options)
    # A nil scan result is treated as the end of the scan.
    break unless results_hash

    results = results_hash[:results]
    unless results.blank?
      puts "Indexing #{results.size} results..."
      batch_for_bulk = results.map do |record|
        { index: { _id: record.id, data: record.__elasticsearch__.as_indexed_json } }
      end
      yield batch_for_bulk
    end

    # No continuation key means the scan is complete.
    last_evaluated_key = results_hash[:last_evaluated_key]
    break unless last_evaluated_key

    # More results to scan, so sleep to throttle.
    # Local Dynamo is not returning consumed_capacity 2014-01-12
    consumed_capacity = results_hash[:consumed_capacity]
    if consumed_capacity
      # try to keep read usage under 50% of read_provision
      sleep_time = consumed_capacity[:capacity_units].to_f / read_budget
      puts "Sleeping for #{sleep_time}..."
      sleep(sleep_time)
    end

    scan_idx += 1
  end
end