class Elasticsearch::Extensions::Reindex::Reindex
Copy documents from one index into another
@example Copy documents to another index
client  = Elasticsearch::Client.new
reindex = Elasticsearch::Extensions::Reindex.new \
            source: { index: 'test1', client: client },
            dest:   { index: 'test2' }

reindex.perform
@example Copy documents to a different cluster
source_client      = Elasticsearch::Client.new url: 'http://localhost:9200'
destination_client = Elasticsearch::Client.new url: 'http://localhost:9250'

reindex = Elasticsearch::Extensions::Reindex.new \
            source: { index: 'test', client: source_client },
            dest:   { index: 'test', client: destination_client }

reindex.perform
@example Transform the documents during re-indexing
reindex = Elasticsearch::Extensions::Reindex.new \
            source:    { index: 'test1', client: client },
            dest:      { index: 'test2' },
            transform: lambda { |doc| doc['_source']['category'].upcase! }
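The `:transform` lambda receives each search hit as a Hash and is expected to mutate it in place; judging by `__store_batch`, its return value is ignored. A minimal sketch applying the lambda from the example to a hand-written hit (the document contents are made up for illustration):

```ruby
# The transform from the example: upcase the category in place.
transform = lambda { |doc| doc['_source']['category'].upcase! }

# A hypothetical hit, shaped like an entry of response['hits']['hits'].
hit = {
  '_index'  => 'test1',
  '_id'     => '1',
  '_score'  => 1.0,
  '_source' => { 'title' => 'Test', 'category' => 'books' }
}

transform.call(hit)              # mutates hit['_source'] directly
puts hit['_source']['category']  # prints BOOKS
```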
The reindexing process works by “scrolling” the source index and sending batches of documents via the “Bulk” API to the destination index/cluster.
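The scroll-and-batch loop can be illustrated without a running cluster. The sketch below uses a hypothetical in-memory `FakeSource` class whose `search` and `scroll` methods return Hashes shaped like Elasticsearch responses (`'_scroll_id'`, `'hits' => { 'hits' => [...] }`); the block passed to `scroll_in_batches` (also hypothetical) stands in for the Bulk request. It shows the batching pattern only and is not the extension's code.

```ruby
# Hypothetical stand-in for a client: serves documents page by page,
# mimicking the shape of Elasticsearch search/scroll responses.
class FakeSource
  def initialize(docs)
    @docs = docs
  end

  # First page; the "scroll id" simply encodes "next_offset:page_size".
  def search(size:)
    page(0, size)
  end

  # Subsequent pages, continuing from the offset encoded in scroll_id.
  def scroll(scroll_id:)
    offset, size = scroll_id.split(':').map(&:to_i)
    page(offset, size)
  end

  private

  def page(offset, size)
    hits = @docs[offset, size] || []
    { '_scroll_id' => "#{offset + size}:#{size}", 'hits' => { 'hits' => hits } }
  end
end

# Scroll through the source and yield each non-empty batch,
# where the real code would send a Bulk request to the destination.
def scroll_in_batches(source, batch_size)
  response = source.search(size: batch_size)
  loop do
    documents = response['hits']['hits']
    break if documents.empty?
    yield documents
    response = source.scroll(scroll_id: response['_scroll_id'])
  end
end

docs    = (1..5).map { |i| { '_id' => i.to_s, '_source' => { 'n' => i } } }
batches = []
scroll_in_batches(FakeSource.new(docs), 2) { |batch| batches << batch.map { |d| d['_id'] } }
p batches # prints [["1", "2"], ["3", "4"], ["5"]]
```

The loop stops on the first empty page, which is the same termination condition `perform` uses.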
@option arguments [Hash] :source The source index/cluster definition (Required)
@option arguments [Hash] :dest The destination index/cluster definition (Required)
@option arguments [Proc] :transform A block which will be executed for each document
@option arguments [Integer] :batch_size The size of the batch for the scroll operation (Default: 1000)
@option arguments [String] :scroll The timeout for the scroll operation (Default: '5m')
@option arguments [Boolean] :refresh Whether to refresh the destination index after the operation is completed (Default: false)
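Options you omit fall back to the defaults above through a shallow `Hash#merge`, so any value you pass wins key by key. A minimal sketch of that behaviour, mirroring the merge in `initialize` (the `DEFAULTS` constant name is only for illustration):

```ruby
DEFAULTS = { batch_size: 1000, scroll: '5m', refresh: false }.freeze

# Caller-supplied options override the defaults key by key.
options = DEFAULTS.merge({
  source:     { index: 'test1' },
  dest:       { index: 'test2' },
  batch_size: 500
})

p options[:batch_size] # prints 500
p options[:scroll]     # prints "5m"
```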
Be aware that if you want to change the destination index settings and/or mappings, you have to do so in advance by using the “Indices Create” API.
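A sketch of preparing the destination index before calling `perform`. It assumes the elasticsearch-ruby `indices.create` method with `index:` and `body:` arguments; the shard count and the `category` mapping are illustrative only, and the typeless `mappings: { properties: ... }` form applies to Elasticsearch 7 and later (older versions expect a mapping type name). A hypothetical recording stub replaces the real client so the example runs without a cluster:

```ruby
# Create the destination index with explicit settings and mappings,
# so that reindexed documents use the intended configuration.
def prepare_destination(client, index)
  client.indices.create(
    index: index,
    body: {
      settings: { number_of_shards: 1 },
      mappings: { properties: { category: { type: 'keyword' } } }
    }
  )
end

# Hypothetical stub that records the call instead of contacting a cluster.
# Against a real cluster, pass an Elasticsearch::Client instead.
RecordingIndices = Struct.new(:calls) do
  def create(**args)
    calls << args
    { 'acknowledged' => true }
  end
end
RecordingClient = Struct.new(:indices)

client = RecordingClient.new(RecordingIndices.new([]))
prepare_destination(client, 'test2')
p client.indices.calls.first[:index] # prints "test2"
```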
Note that Elasticsearch 2.3.x and higher versions provide a native “Reindex” API, which will be more performant than this Ruby implementation.
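For comparison, a sketch of delegating the same copy to the server-side Reindex API through the `reindex` method of the elasticsearch-api gem. The helper name `native_reindex` and the `StubClient` class are hypothetical, and the stub's return value is made up; with a real `Elasticsearch::Client` the copy runs inside the cluster instead of through Ruby:

```ruby
# Ask Elasticsearch to copy one index into another on the server side,
# instead of scrolling and bulk-indexing from Ruby.
def native_reindex(client, from, to)
  client.reindex(body: { source: { index: from }, dest: { index: to } })
end

# Hypothetical stub standing in for Elasticsearch::Client.
class StubClient
  attr_reader :last_request

  def reindex(**args)
    @last_request = args
    { 'total' => 0, 'failures' => [] }
  end
end

client = StubClient.new
native_reindex(client, 'test1', 'test2')
p client.last_request
```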
@see www.rubydoc.info/gems/elasticsearch-api/Elasticsearch/API/Actions#reindex-instance_method
Attributes
Public Class Methods
# File lib/elasticsearch/extensions/reindex.rb, line 114
def initialize(arguments={})
  [
    [:source, :index],
    [:source, :client],
    [:dest,   :index]
  ].each do |required_option|
    value = required_option.reduce(arguments) { |sum, o| sum = sum[o] ? sum[o] : {} }

    raise ArgumentError, "Required argument '#{Hash[*required_option]}' missing" if \
      value.respond_to?(:empty?) ? value.empty? : value.nil?
  end

  @arguments = {
    batch_size: 1000,
    scroll:     '5m',
    refresh:    false
  }.merge(arguments)

  arguments[:dest][:client] ||= arguments[:source][:client]
end
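The required-option check walks each key path with `reduce`, substituting an empty Hash whenever a key is missing, so a missing `:dest` and a missing `dest[:index]` end up as the same empty value and raise the same error. A standalone sketch of that check (the method name `check_required!` is hypothetical):

```ruby
REQUIRED = [[:source, :index], [:source, :client], [:dest, :index]].freeze

# Raise ArgumentError for the first required key path that is missing or empty.
def check_required!(arguments)
  REQUIRED.each do |path|
    value   = path.reduce(arguments) { |hash, key| hash[key] ? hash[key] : {} }
    missing = value.respond_to?(:empty?) ? value.empty? : value.nil?
    raise ArgumentError, "Required argument '#{Hash[*path]}' missing" if missing
  end
end

begin
  # :dest is present but has no :index, so the last path fails.
  check_required!({ source: { index: 'test1', client: Object.new }, dest: {} })
rescue ArgumentError => e
  puts e.message
end
```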
Public Instance Methods
# File lib/elasticsearch/extensions/reindex.rb, line 169
def __store_batch(documents)
  body = documents.map do |doc|
    doc['_index'] = arguments[:dest][:index]

    arguments[:transform].call(doc) if arguments[:transform]

    doc['data'] = doc['_source']
    doc.delete('_score')
    doc.delete('_source')

    { index: doc }
  end

  arguments[:dest][:client].bulk body: body
end
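Each search hit becomes an `index` action whose `data` key carries the document body, relying on the elasticsearch-ruby `bulk` convention of accepting the source under `data` next to the metadata. A pure-Ruby sketch of the same mapping (the name `to_bulk_action` is hypothetical), handy for inspecting what would be sent:

```ruby
# Convert one search hit into a bulk "index" action for the destination index.
def to_bulk_action(hit, dest_index, transform = nil)
  doc = hit.dup                        # shallow copy: nested '_source' is still shared
  doc['_index'] = dest_index
  transform.call(doc) if transform     # return value is ignored, as in __store_batch
  doc['data'] = doc.delete('_source')  # document body for the bulk request
  doc.delete('_score')                 # search-only metadata, not part of a bulk action
  { index: doc }
end

hit = { '_index' => 'test1', '_id' => '1',
        '_score' => 1.0, '_source' => { 'title' => 'Test' } }
action = to_bulk_action(hit, 'test2')
p action
```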
Performs the operation
@return [Hash] A Hash with the information about the operation outcome
# File lib/elasticsearch/extensions/reindex.rb, line 140
def perform
  output = { errors: 0 }

  # Open the scroll and fetch the first batch
  response = arguments[:source][:client].search(
    index:  arguments[:source][:index],
    scroll: arguments[:scroll],
    size:   arguments[:batch_size]
  )

  documents = response['hits']['hits']

  unless documents.empty?
    bulk_response = __store_batch(documents)
    # Count items whose result carries an 'error' key
    output[:errors] += bulk_response['items'].count { |item| item.values.first['error'] }
  end

  # Keep scrolling until a batch comes back empty
  while response = arguments[:source][:client].scroll(scroll_id: response['_scroll_id'], scroll: arguments[:scroll]) do
    documents = response['hits']['hits']
    break if documents.empty?

    bulk_response = __store_batch(documents)
    output[:errors] += bulk_response['items'].count { |item| item.values.first['error'] }
  end

  arguments[:dest][:client].indices.refresh index: arguments[:dest][:index] if arguments[:refresh]

  output
end
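`perform` counts a bulk item as failed when the single value of that item (keyed by the action name, such as `index`) has an `error` key. A sketch of that count against a hand-written response fragment; the shape follows the Bulk API's `items` array, while the values are made up:

```ruby
# A made-up fragment of a Bulk API response: one success, one failure.
bulk_response = {
  'errors' => true,
  'items'  => [
    { 'index' => { '_index' => 'test2', '_id' => '1', 'status' => 201 } },
    { 'index' => { '_index' => 'test2', '_id' => '2', 'status' => 400,
                   'error'  => { 'type' => 'mapper_parsing_exception' } } }
  ]
}

# Each item is a one-entry Hash; its value holds the per-document result.
errors = bulk_response['items'].count { |item| item.values.first['error'] }
puts errors # prints 1
```

The returned Hash therefore reports failed documents, not failed batches.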