module Elasticsearch::Index::Transfer

Constants

BATCH_SIZE
VERSION

Attributes

access_key_id[RW]
batch_size[RW]
bucket[RW]
host[RW]
index[RW]
port[RW]
prefix[RW]
region[RW]
secret_access_key[RW]

Public Class Methods

elasticsearch_extract(source_client, source_options, target_client, target_options)
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 11
def self.elasticsearch_extract source_client, source_options, target_client, target_options
  elasticsearch_extract_params(source_options)
  target_options[:index] = target_options[:index] || @index # copying the source index name
  client = Elasticsearch::Client.new(host: @host, port: @port)
  batch_no = 0

  # copy the index configuration (settings, mappings, aliases) to the target
  settings = client.indices.get_settings(index: @index)[@index]
  settings["settings"]["index"] = settings["settings"]["index"].select{|key, value| ["number_of_shards", "number_of_replicas"].include?(key)}
  mapping = client.indices.get_mapping(index: @index)[@index]
  aliases = client.indices.get_alias(index: @index)[@index]

  # Transferring the first batch
  response = client.search index: @index, scroll: '5m', body: {size: @batch_size, sort: ['_doc']}
  total = response["hits"]["total"]
  batch_size = @batch_size
  batch_count = total/batch_size

  _configs = {index: @index, body: mapping.merge(aliases).merge(settings), batch_count: batch_count, total: total}
  send("#{target_client}_write_settings", target_options, _configs)

  data = process_hits(response["hits"]["hits"])
  send("#{target_client}_ingest", target_options, data, batch_no)
  
  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Total Document count - #{total}"
  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Batch-(#{batch_no}/#{batch_count}) transfered successfully."
  
  # Transferring the subsequent batches
  batch_no = batch_no + 1
  while response = client.scroll(body: {scroll_id: response['_scroll_id']}, scroll: '5m') and not response['hits']['hits'].empty? do
    data = process_hits(response["hits"]["hits"])
    send("#{target_client}_ingest", target_options, data, batch_no)
    puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Batch-(#{batch_no}/#{batch_count}) transfered successfully."
    batch_no = batch_no + 1
  end
  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Done."
end
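
The number of batches follows from integer division of the document count by the batch size, and the scroll loop keeps pulling pages of @batch_size documents until the hits run out. A rough illustration of the arithmetic with hypothetical numbers:

# Illustration only (not part of the gem): how batch_count relates to the progress output.
total       = 12_500              # documents reported by the initial search
batch_size  = 5_000               # @batch_size, defaulting to BATCH_SIZE
batch_count = total / batch_size  # => 2 (integer division)
# Batches emitted: batch-0 from the first search, then batch-1 and batch-2 from the
# scroll, so the progress lines count from Batch-(0/2) up to Batch-(2/2).
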
elasticsearch_ingest(options, data, batch_no)
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 49
def self.elasticsearch_ingest options, data, batch_no
  elasticsearch_extract_params(options)
  client = Elasticsearch::Client.new(host: @host, port: @port)
  # Point each bulk action at the target index before indexing.
  data.each do |record|
    record["index"]["_index"] = @index
  end
  client.bulk(body: data)
end
elasticsearch_write_settings(options, _configs)
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 58
def self.elasticsearch_write_settings options, _configs
  elasticsearch_extract_params(options)
  client = Elasticsearch::Client.new(host: @host, port: @port)
  begin
    client.indices.create(index: @index, body: _configs["body"])
  rescue
    raise "Index(#{@index}) is already present on (#{@host})"
  end
end
execute(options)
# File lib/elasticsearch/index/transfer.rb, line 9
def self.execute options
  source = options.fetch(:source) rescue (raise "key(source) missing in the options")
  target = options.fetch(:target) rescue (raise "key(target) missing in the options")

  source_client = source.keys.first
  target_client = target.keys.first

  send("#{source_client}_extract", source_client, source[source_client], target_client, target[target_client])
  true
end
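
execute dispatches on the first key of the source and target hashes, so those keys must name a supported client (elasticsearch or s3) and their values must carry the options read by elasticsearch_extract_params / s3_extract_params. A minimal sketch with hypothetical hosts, credentials and names:

require 'elasticsearch/index/transfer'

# Back up a local index to S3 (all values are placeholders).
Elasticsearch::Index::Transfer.execute(
  source: { elasticsearch: { host: 'localhost', port: 9200, index: 'products' } },
  target: { s3: { region: 'us-east-1',
                  access_key_id: ENV['AWS_ACCESS_KEY_ID'],
                  secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
                  bucket: 'my-es-backups',
                  prefix: 'backups/products' } }
)

# Restoring swaps the two hashes: an s3 source and an elasticsearch target.
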
s3_extract(source_client, source_options, target_client, target_options)
# File lib/elasticsearch/index/transfer/s3.rb, line 9
def self.s3_extract source_client, source_options, target_client, target_options
  s3_extract_params(source_options)
  client = Aws::S3::Client.new(access_key_id: @access_key_id, secret_access_key: @secret_access_key, region: @region)
  begin
    client.head_bucket({bucket: @bucket})
  rescue Aws::S3::Errors::NotFound
    raise "Bucket - #{@bucket} not found."
  end
  begin
    response = client.get_object(bucket: @bucket, key: "#{@prefix}_config.json").body.read
  rescue Aws::S3::Errors::NoSuchKey
    raise "_config.json not found. es-backup does not exists at specified location(S3:#{@bucket}/#{prefix})"
  end
  
  # recreate the stored index configuration on the target
  _configs = JSON.parse(response)
  target_options[:index] = target_options[:index] || _configs["index"] # if a target index is not provided, use the stored index name
  send("#{target_client}_write_settings", target_options, _configs)
  puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Total Document count - #{_configs['total']}"

  # Transferring the index data
  batch_count = _configs["batch_count"]
  (0..batch_count).each do |batch_no|
    # puts "processing batch_no = #{batch_no}..."
    data = JSON.parse(client.get_object(bucket: @bucket, key: "#{@prefix}batch-#{batch_no}.json").body.read)
    send("#{target_client}_ingest", target_options, data, batch_no)
    puts "Elasticsearch Transfer(#{source_client}-to-#{target_client}): Batch-(#{batch_no}/#{batch_count}) transfered successfully."
  end
end
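
s3_extract expects the layout written by the S3 target methods: a <prefix>_config.json object mirroring the _configs hash built in elasticsearch_extract, plus one <prefix>batch-<n>.json object per batch. A sketch of the parsed _configs with hypothetical values:

# Hypothetical contents of <prefix>_config.json after JSON.parse (illustrative values only):
_configs = {
  "index"       => "products",  # source index name, reused when no target index is given
  "body"        => {            # passed to indices.create on an Elasticsearch target
    "mappings"  => {},          # copied mapping (truncated here)
    "aliases"   => {},          # copied aliases (truncated here)
    "settings"  => { "index" => { "number_of_shards" => "5", "number_of_replicas" => "1" } }
  },
  "batch_count" => 2,           # total / batch_size, integer division
  "total"       => 12500        # document count at extraction time
}
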
s3_ingest(options, data, batch_no)
# File lib/elasticsearch/index/transfer/s3.rb, line 39
def self.s3_ingest options, data, batch_no
  s3_extract_params(options)
  client = Aws::S3::Client.new(access_key_id: @access_key_id, secret_access_key: @secret_access_key, region: @region)
  client.put_object(body: data.to_json,
                    bucket: @bucket,
                    key: "#{@prefix}batch-#{batch_no}.json")
end
s3_write_settings(options, _configs)
# File lib/elasticsearch/index/transfer/s3.rb, line 47
def self.s3_write_settings options, _configs
  s3_extract_params(options)
  client = Aws::S3::Client.new(access_key_id: @access_key_id, secret_access_key: @secret_access_key, region: @region)
  begin
    client.head_bucket({bucket: @bucket})
  rescue Aws::S3::Errors::NotFound
    raise "Bucket - #{@bucket} not found. Please create the bucket if it does not exists"
  end
  client.put_object(body: _configs.to_json,
                    bucket: @bucket,
                    key: "#{@prefix}_config.json")
end

Private Class Methods

elasticsearch_extract_params(options)
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 69
def self.elasticsearch_extract_params options
  @host  = options.fetch(:host) rescue (raise "elasticsearch-host is missing from the options")
  @port  = options.fetch(:port) rescue (raise "elasticsearch-port is missing from the options")
  @index = options.fetch(:index) rescue (raise "elasticsearch-index is missing from the options")
  @batch_size = options.fetch(:batch_size) rescue BATCH_SIZE # default batch size is 5000
end
process_hits(hits)
# File lib/elasticsearch/index/transfer/elasticsearch.rb, line 76
def self.process_hits hits
  # Convert raw scroll hits into bulk "index" actions: drop scoring/sort metadata
  # and move "_source" under the "data" key used by the Ruby client's bulk helper.
  docs = []
  hits.each do |hit|
    hit.delete("_score")
    hit.delete("sort")
    hit["data"] = hit.delete("_source")
    doc = {}
    doc["index"] = hit
    docs << doc
  end
  docs
end
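
To illustrate the shape process_hits produces, a scroll hit (hypothetical document below) becomes an "index" action with the original _source carried under the "data" key, which the Ruby client's bulk helper expands into an action line plus document:

# Hypothetical scroll hit (illustrative values):
hit = {
  "_index"  => "products",
  "_type"   => "_doc",
  "_id"     => "42",
  "_score"  => nil,
  "_source" => { "name" => "widget", "price" => 9.99 },
  "sort"    => [3]
}

process_hits([hit])
# => [{ "index" => { "_index" => "products", "_type" => "_doc", "_id" => "42",
#                    "data" => { "name" => "widget", "price" => 9.99 } } }]
# elasticsearch_ingest later rewrites "_index" to the target index before calling client.bulk.
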
s3_extract_params(options)
# File lib/elasticsearch/index/transfer/s3.rb, line 61
def self.s3_extract_params options
  @region = options.fetch(:region)
  @access_key_id = options.fetch(:access_key_id)
  @secret_access_key = options.fetch(:secret_access_key)
  @bucket = options.fetch(:bucket)
  @prefix = options.fetch(:prefix) rescue ""
  # Ensure a non-empty prefix ends with "/" so object keys are grouped under it.
  @prefix = "#{@prefix}/" if not @prefix.empty? and @prefix[-1] != '/'
end
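
The prefix normalization groups all objects for one transfer under a pseudo-directory in the bucket. For example, with a hypothetical prefix:

# A prefix of "backups/products" is normalized to "backups/products/":
prefix     = "backups/products/"
config_key = "#{prefix}_config.json"   # => "backups/products/_config.json"
batch_key  = "#{prefix}batch-0.json"   # => "backups/products/batch-0.json"
# An empty prefix leaves the objects at the root of the bucket.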