module Elasticsearch::Index::Transfer
Constants
- BATCH_SIZE
- VERSION
Attributes
access_key_id[RW]
batch_size[RW]
bucket[RW]
host[RW]
index[RW]
port[RW]
prefix[RW]
region[RW]
secret_access_key[RW]
Public Class Methods
elasticsearch_extract(source_client, source_options, target_client, target_options)
click to toggle source
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 11
#
# Reads an Elasticsearch index with the scroll API and hands its settings,
# mappings, aliases and documents to the target backend.
#
# source_client  - name of the source backend (only used in log output here).
# source_options - Hash passed to elasticsearch_extract_params.
# target_client  - name of the target backend; "<target_client>_write_settings"
#                  and "<target_client>_ingest" are invoked through send.
# target_options - Hash of target options; :index is defaulted in place to the
#                  source index name when it is not set.
#
# The config handed to the target carries :batch_count as the number of the
# LAST batch written (batches are numbered from 0), so a reader can iterate
# 0..batch_count without requesting a batch that was never produced.
def self.elasticsearch_extract(source_client, source_options, target_client, target_options)
  elasticsearch_extract_params(source_options)
  target_options[:index] = target_options[:index] || @index # copying the source index name
  client = Elasticsearch::Client.new(host: @host, port: @port)

  # The target hooks may call *_extract_params again and overwrite the shared
  # instance variables, so keep the source values in locals.
  source_index = @index
  batch_size = @batch_size
  batch_no = 0

  # config setup into the target
  settings = client.indices.get_settings(index: source_index)[source_index]
  settings["settings"]["index"] = settings["settings"]["index"].select do |key, _value|
    %w[number_of_shards number_of_replicas].include?(key)
  end
  mapping = client.indices.get_mapping(index: source_index)[source_index]
  aliases = client.indices.get_alias(index: source_index)[source_index]

  # Transferring the first batch
  response = client.search(index: source_index, scroll: '5m', body: { size: batch_size, sort: ['_doc'] })
  total = response["hits"]["total"]
  # Newer Elasticsearch versions report the total as a hash with a "value" key.
  total = total["value"] if total.is_a?(Hash)
  # Number of the last batch: plain total / batch_size is one too many when
  # total is an exact multiple of batch_size.
  batch_count = total.zero? ? 0 : (total - 1) / batch_size
  _configs = { index: source_index, body: mapping.merge(aliases).merge(settings), batch_count: batch_count, total: total }
  send("#{target_client}_write_settings", target_options, _configs)

  data = process_hits(response["hits"]["hits"])
  send("#{target_client}_ingest", target_options, data, batch_no)
  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Total Document count - #{total}"
  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Batch-(#{batch_no}/#{batch_count}) transfered successfully."

  # Transferring the subsequent batches
  batch_no += 1
  loop do
    response = client.scroll(body: { scroll_id: response['_scroll_id'] }, scroll: '5m')
    break if !response || response['hits']['hits'].empty?

    data = process_hits(response["hits"]["hits"])
    send("#{target_client}_ingest", target_options, data, batch_no)
    puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Batch-(#{batch_no}/#{batch_count}) transfered successfully."
    batch_no += 1
  end

  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Done."
end
elasticsearch_ingest(options, data, batch_no)
click to toggle source
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 49
#
# Bulk-indexes one batch of documents into the target index.
#
# options  - Hash passed to elasticsearch_extract_params (sets @host, @port, @index).
# data     - Array of bulk operations shaped like { "index" => { ... } };
#            each operation's "_index" is rewritten in place to the target index.
# batch_no - number of the batch, used only in the warning message.
#
# Returns the bulk API response, or nil when the batch is empty.
def self.elasticsearch_ingest(options, data, batch_no)
  elasticsearch_extract_params(options)
  # An empty batch (e.g. an empty source index) carries no operations, so
  # there is nothing to send. NOTE(review): Elasticsearch is expected to
  # reject an empty bulk body - confirm against the deployed version.
  return if data.empty?

  client = Elasticsearch::Client.new(host: @host, port: @port)
  data.each { |record| record["index"]["_index"] = @index }
  response = client.bulk(body: data)

  # The bulk API reports per-document failures in the response body rather
  # than by raising, so surface them instead of dropping them silently.
  if response.is_a?(Hash) && response["errors"]
    failed = Array(response["items"]).count do |item|
      result = item.is_a?(Hash) ? item.values.first : nil
      result.is_a?(Hash) && result["error"]
    end
    warn "Elasticsearch Transfer: Batch-(#{batch_no}) had #{failed} document(s) rejected by (#{@host})"
  end

  response
end
elasticsearch_write_settings(options, _configs)
click to toggle source
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 58
#
# Creates the target index from the transferred configuration.
#
# options  - Hash passed to elasticsearch_extract_params (sets @host, @port, @index).
# _configs - Hash holding the index body under "body" (configs parsed from
#            JSON) or :body (configs built in-process with symbol keys).
#
# Returns the create-index response.
# Raises RuntimeError when the index already exists or cannot be created.
def self.elasticsearch_write_settings(options, _configs)
  elasticsearch_extract_params(options)
  client = Elasticsearch::Client.new(host: @host, port: @port)
  # Accept both key styles; reading only "body" silently dropped the
  # settings/mappings/aliases of configs built with symbol keys.
  body = _configs["body"] || _configs[:body]

  already_present = false
  response = nil
  begin
    if client.indices.exists?(index: @index)
      already_present = true
    else
      response = client.indices.create(index: @index, body: body)
    end
  rescue StandardError => e
    # Keep the real cause instead of reporting every failure as "already present".
    raise "Index(#{@index}) could not be created on (#{@host}): #{e.message}"
  end

  raise "Index(#{@index}) is already present on (#{@host})" if already_present

  response
end
execute(options)
click to toggle source
# File lib/elasticsearch/index/transfer.rb, line 9
#
# Entry point: runs a transfer from the source backend to the target backend.
#
# options - Hash with :source and :target keys. Each value is a Hash whose
#           first key names the backend and whose value holds that backend's
#           options, e.g. { source: { elasticsearch: {...} }, target: { s3: {...} } }.
#
# Returns true once "<source>_extract" has finished.
# Raises RuntimeError when a key is missing or a backend is not supported.
def self.execute(options)
  has_key = ->(key) { options.respond_to?(:key?) && options.key?(key) }
  raise "key(source) missing in the options" unless has_key.call(:source)
  raise "key(target) missing in the options" unless has_key.call(:target)

  source = options[:source]
  target = options[:target]
  source_client = source.keys.first
  target_client = target.keys.first

  # Check every hook that will be dispatched through send before any work
  # starts, so an unknown backend fails fast instead of mid-transfer.
  unless respond_to?("#{source_client}_extract", true)
    raise "source client(#{source_client}) is not supported"
  end
  %w[write_settings ingest].each do |hook|
    next if respond_to?("#{target_client}_#{hook}", true)

    raise "target client(#{target_client}) is not supported"
  end

  send("#{source_client}_extract", source_client, source[source_client], target_client, target[target_client])
  true
end
s3_extract(source_client, source_options, target_client, target_options)
click to toggle source
# File lib/elasticsearch/index/transfer/s3.rb, line 9
#
# Restores a backup stored in S3 ("<prefix>_config.json" plus
# "<prefix>batch-<n>.json" objects) into the target backend.
#
# source_client  - name of the source backend (only used in log output here).
# source_options - Hash passed to s3_extract_params.
# target_client  - name of the target backend; "<target_client>_write_settings"
#                  and "<target_client>_ingest" are invoked through send.
# target_options - Hash of target options; :index is defaulted in place to the
#                  index name stored in the config when it is not set.
#
# Raises RuntimeError when the bucket or the config object is missing, and
# Aws::S3::Errors::NoSuchKey when a batch object is missing.
def self.s3_extract(source_client, source_options, target_client, target_options)
  s3_extract_params(source_options)
  # The target hooks may call s3_extract_params with the target's options and
  # overwrite @bucket/@prefix, so pin the source location in locals.
  bucket = @bucket
  prefix = @prefix
  client = Aws::S3::Client.new(access_key_id: @access_key_id, secret_access_key: @secret_access_key, region: @region)

  begin
    client.head_bucket({ bucket: bucket })
  rescue Aws::S3::Errors::NotFound
    raise "Bucket - #{bucket} not found."
  end

  begin
    response = client.get_object(bucket: bucket, key: "#{prefix}_config.json").body.read
  rescue Aws::S3::Errors::NoSuchKey
    raise "_config.json not found. es-backup does not exists at specified location(S3:#{bucket}/#{prefix})"
  end

  # config setup into the target
  _configs = JSON.parse(response)
  target_options[:index] = target_options[:index] || _configs["index"] # fall back to the stored index name
  send("#{target_client}_write_settings", target_options, _configs)
  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Total Document count - #{_configs['total']}"

  # Transferring the index data
  batch_count = _configs["batch_count"]
  transferred = 0
  (0..batch_count).each do |batch_no|
    begin
      raw = client.get_object(bucket: bucket, key: "#{prefix}batch-#{batch_no}.json").body.read
    rescue Aws::S3::Errors::NoSuchKey
      # A config whose batch_count was computed as total / batch_size names one
      # batch too many when total is an exact multiple of the batch size. Only
      # tolerate that final gap when every document has already been moved.
      break if batch_no == batch_count && batch_no.positive? && transferred == _configs["total"]

      raise
    end

    data = JSON.parse(raw)
    send("#{target_client}_ingest", target_options, data, batch_no)
    transferred += data.size
    puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Batch-(#{batch_no}/#{batch_count}) transfered successfully."
  end
end
s3_ingest(options, data, batch_no)
click to toggle source
# File lib/elasticsearch/index/transfer/s3.rb, line 39
#
# Uploads one batch to S3 as JSON under "<prefix>batch-<batch_no>.json".
#
# options  - Hash passed to s3_extract_params (sets credentials, @bucket, @prefix).
# data     - the batch to store; serialised with to_json.
# batch_no - number of the batch, used in the object key.
#
# Returns the put_object response.
def self.s3_ingest(options, data, batch_no)
  s3_extract_params(options)

  credentials = {
    access_key_id: @access_key_id,
    secret_access_key: @secret_access_key,
    region: @region
  }
  s3 = Aws::S3::Client.new(**credentials)

  upload = {
    body: data.to_json,
    bucket: @bucket,
    key: "#{@prefix}batch-#{batch_no}.json"
  }
  s3.put_object(**upload)
end
s3_write_settings(options, _configs)
click to toggle source
# File lib/elasticsearch/index/transfer/s3.rb, line 47
#
# Stores the index configuration in S3 as JSON under "<prefix>_config.json".
#
# options  - Hash passed to s3_extract_params (sets credentials, @bucket, @prefix).
# _configs - the configuration to store; serialised with to_json.
#
# Returns the put_object response.
# Raises RuntimeError when head_bucket reports the bucket as not found.
def self.s3_write_settings(options, _configs)
  s3_extract_params(options)

  credentials = {
    access_key_id: @access_key_id,
    secret_access_key: @secret_access_key,
    region: @region
  }
  s3 = Aws::S3::Client.new(**credentials)

  # Only the existence probe is guarded; errors from the upload itself propagate.
  begin
    s3.head_bucket({ bucket: @bucket })
  rescue Aws::S3::Errors::NotFound
    raise "Bucket - #{@bucket} not found. Please create the bucket if it does not exists"
  end

  config_key = "#{@prefix}_config.json"
  s3.put_object(body: _configs.to_json, bucket: @bucket, key: config_key)
end
Private Class Methods
elasticsearch_extract_params(options)
click to toggle source
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 69
#
# Copies the Elasticsearch connection options into @host, @port, @index and
# @batch_size.
#
# options - Hash with :host, :port and :index (all required) and an optional
#           :batch_size; a missing or nil :batch_size falls back to BATCH_SIZE.
#
# Raises RuntimeError naming the first required option that is missing.
def self.elasticsearch_extract_params(options)
  # A nil options hash is reported as a missing host rather than as a NoMethodError.
  options ||= {}
  @host = options.fetch(:host) { raise "elasticsearch-host is missing from the options" }
  @port = options.fetch(:port) { raise "elasticsearch-port is missing from the options" }
  @index = options.fetch(:index) { raise "elasticsearch-index is missing from the options" }
  # fetch with a default only rescues the "key absent" case; the || also
  # covers an explicit batch_size: nil.
  @batch_size = options.fetch(:batch_size, nil) || BATCH_SIZE
end
process_hits(hits)
click to toggle source
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 76
#
# Converts search hits into bulk "index" operations.
#
# hits - Array of hit Hashes with string keys.
#
# Each result is { "index" => meta }, where meta is the hit without its
# "_score" and "sort" entries and with "_source" moved under "data" (nil when
# the hit has no "_source"). The hits passed in are left untouched.
#
# Returns a new Array of operation Hashes.
def self.process_hits(hits)
  hits.map do |hit|
    meta = hit.reject { |key, _value| %w[_score sort _source].include?(key) }
    meta["data"] = hit["_source"]
    { "index" => meta }
  end
end
s3_extract_params(options)
click to toggle source
# File lib/elasticsearch/index/transfer/s3.rb, line 61
#
# Copies the S3 options into @region, @access_key_id, @secret_access_key,
# @bucket and @prefix.
#
# options - Hash with :region, :access_key_id, :secret_access_key and :bucket
#           (all required) and an optional :prefix.
#
# A missing or nil :prefix becomes ""; any other prefix is stored with exactly
# one trailing "/" appended when it does not already end in one.
#
# Raises KeyError when a required option is missing.
def self.s3_extract_params(options)
  @region = options.fetch(:region)
  @access_key_id = options.fetch(:access_key_id)
  @secret_access_key = options.fetch(:secret_access_key)
  @bucket = options.fetch(:bucket)

  # to_s turns an explicit prefix: nil into "" instead of failing on nil.empty?
  prefix = options.fetch(:prefix, "").to_s
  @prefix = (prefix.empty? || prefix.end_with?("/")) ? prefix : "#{prefix}/"
end