class Zelastic::IndexManager
rubocop:disable Metrics/AbcSize
Attributes
client[R]
config[R]
Public Class Methods
new(config, client: nil)
click to toggle source
# File lib/zelastic/index_manager.rb, line 8 def initialize(config, client: nil) @config = config @client = client || config.clients.first end
Public Instance Methods
cleanup_old_indices()
click to toggle source
# File lib/zelastic/index_manager.rb, line 70 def cleanup_old_indices logger.info('Cleaning up old indices in Elasticsearch') current_index = client.indices.get_alias(name: config.read_alias).keys.first logger.info("Currently used index is #{current_index}") indices_to_delete = client .cat .indices(format: :json) .map { |index| index['index'] } .select { |name| name.start_with?(config.read_alias) } .reject { |name| name == current_index } if indices_to_delete.none? logger.info('Nothing to do: no old indices') return end logger.info( "Deleting #{indices_to_delete.count} old indices: #{indices_to_delete.join(', ')}" ) client.indices.delete(index: indices_to_delete) end
create_index(unique_name)
click to toggle source
# File lib/zelastic/index_manager.rb, line 13 def create_index(unique_name) index_name = index_name_from_unique(unique_name) client.indices.create(index: index_name, body: config.index_definition) client.indices.put_alias(index: index_name, name: config.write_alias) end
populate_index(unique_name = nil, batch_size: 3000)
click to toggle source
# File lib/zelastic/index_manager.rb, line 20 def populate_index(unique_name = nil, batch_size: 3000) index_name = index_name_from_unique(unique_name) config.data_source.find_in_batches(batch_size: batch_size).with_index do |batch, i| logger.info(populate_index_log(batch_size: batch_size, batch_number: i + 1)) indexer.index_batch(batch, client: client, index_name: index_name) end end
stop_dual_writes()
click to toggle source
# File lib/zelastic/index_manager.rb, line 48 def stop_dual_writes logger.info('Stopping dual writes - making index read and write aliases the same') current_index = client.indices.get_alias(name: config.read_alias).keys.first logger.info("Currently used index is #{current_index}") other_write_indices = client.indices.get_alias(name: config.write_alias).keys .reject { |name| name == current_index } if other_write_indices.none? logger.info("No write indexes that aren't the read index. Nothing to do!") return end logger.info("Stopping writes to #{other_write_indices.count} old ES indices: " \ "#{other_write_indices.join(', ')}") actions = other_write_indices.map do |index| { remove: { index: index, alias: config.write_alias } } end client.indices.update_aliases(body: { actions: actions }) end
switch_read_index(new_name)
click to toggle source
# File lib/zelastic/index_manager.rb, line 29 def switch_read_index(new_name) new_index = [config.read_alias, new_name].join('_') old_index = if client.indices.exists_alias?(name: config.read_alias) client.indices.get_alias(name: config.read_alias).keys.first end remove_action = ({ remove: { index: old_index, alias: config.read_alias } } if old_index) client.indices.update_aliases(body: { actions: [ remove_action, { add: { index: new_index, alias: config.read_alias } } ].compact }) end
Private Instance Methods
count_params()
click to toggle source
# File lib/zelastic/index_manager.rb, line 123 def count_params { index: config.read_alias, type: config.type? ? config.type : nil }.compact end
current_index_exists?()
click to toggle source
# File lib/zelastic/index_manager.rb, line 134 def current_index_exists? client.indices.exists?(index: config.read_alias) end
current_index_size()
click to toggle source
# File lib/zelastic/index_manager.rb, line 119 def current_index_size @current_index_size ||= client.count(**count_params)['count'] end
index_name_from_unique(unique_name)
click to toggle source
# File lib/zelastic/index_manager.rb, line 102 def index_name_from_unique(unique_name) if unique_name [config.read_alias, unique_name].join('_') else config.write_alias end end
indexed_percent(batch_size, batch_number)
click to toggle source
# File lib/zelastic/index_manager.rb, line 130 def indexed_percent(batch_size, batch_number) (batch_size * batch_number.to_f / current_index_size * 100).round(2) end
indexer()
click to toggle source
# File lib/zelastic/index_manager.rb, line 98 def indexer @indexer ||= Indexer.new(config) end
populate_index_log(batch_size:, batch_number:)
click to toggle source
# File lib/zelastic/index_manager.rb, line 110 def populate_index_log(batch_size:, batch_number:) progress = if current_index_exists? "ESTIMATED: #{indexed_percent(batch_size, batch_number)}%" else 'First index' end "ES: (#{progress}) Indexing #{config.type} records" end