module ElasticRecord::Index::Documents
Public Instance Methods
bulk(options = {}) { || ... }
click to toggle source
# File lib/elastic_record/index/documents.rb, line 89 def bulk(options = {}, &block) if current_bulk_batch yield else start_new_bulk_batch(options, &block) end end
bulk_add(batch, index_name: alias_name)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 97 def bulk_add(batch, index_name: alias_name) bulk do batch.each do |record| index_record(record, index_name: index_name) end end end
current_bulk_batch()
click to toggle source
# File lib/elastic_record/index/documents.rb, line 105 def current_bulk_batch connection.bulk_actions end
delete_by_query(query)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 79 def delete_by_query(query) scroll_enumerator = build_scroll_enumerator search: query scroll_enumerator.each_slice do |hits| bulk do hits.each { |hit| delete_document hit['_id'] } end end end
delete_document(id, parent: nil, index_name: alias_name)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 63 def delete_document(id, parent: nil, index_name: alias_name) raise "Cannot delete document with empty id" if id.blank? index_name ||= alias_name if batch = current_bulk_batch instructions = { _index: index_name, _id: id, retry_on_conflict: 3 } instructions[:parent] = parent if parent batch << { delete: instructions } else path = "/#{index_name}/_doc/#{id}" path << "&parent=#{parent}" if parent connection.json_delete path end end
index_document(id, document, parent: nil, index_name: alias_name)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 26 def index_document(id, document, parent: nil, index_name: alias_name) if batch = current_bulk_batch instructions = { _index: index_name, _id: id } instructions[:parent] = parent if parent batch << { index: instructions } batch << document else path = "/#{index_name}/_doc/#{id}" path << "?parent=#{parent}" if parent if id connection.json_put path, document else connection.json_post path, document end end end
index_record(record, index_name: alias_name)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 6 def index_record(record, index_name: alias_name) unless disabled index_document( record.try(:id), record.as_search_document, index_name: index_name ) end end
update_document(id, document, parent: nil, index_name: alias_name)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 45 def update_document(id, document, parent: nil, index_name: alias_name) raise "Cannot update a document with empty id" if id.blank? params = {doc: document, doc_as_upsert: true} if batch = current_bulk_batch instructions = { _index: index_name, _id: id, retry_on_conflict: 3 } instructions[:parent] = parent if parent batch << { update: instructions } batch << params else path = "/#{index_name}/_doc/#{id}/_update?retry_on_conflict=3" path << "&parent=#{parent}" if parent connection.json_post path, params end end
update_record(record, index_name: alias_name)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 16 def update_record(record, index_name: alias_name) unless disabled update_document( record.id, record.as_partial_update_document, index_name: index_name ) end end
Private Instance Methods
start_new_bulk_batch(options) { || ... }
click to toggle source
# File lib/elastic_record/index/documents.rb, line 111 def start_new_bulk_batch(options, &block) connection.bulk_actions = [] yield if current_bulk_batch.any? body = current_bulk_batch.map { |action| "#{JSON.generate(action)}\n" }.join results = connection.json_post("/_bulk?#{options.to_query}", body) verify_bulk_results(results) end ensure connection.bulk_actions = nil end
verify_bulk_results(results)
click to toggle source
# File lib/elastic_record/index/documents.rb, line 125 def verify_bulk_results(results) return unless results.is_a?(Hash) errors = results['items'].select do |item| item.values.first['error'] end raise ElasticRecord::BulkError.new(errors) unless errors.empty? end