class Zelastic::IndexManager

rubocop:disable Metrics/AbcSize

Attributes

client[R]
config[R]

Public Class Methods

new(config, client: nil) click to toggle source
# File lib/zelastic/index_manager.rb, line 8
def initialize(config, client: nil)
  @config = config
  @client = client || config.clients.first
end

Public Instance Methods

cleanup_old_indices() click to toggle source
# File lib/zelastic/index_manager.rb, line 70
def cleanup_old_indices
  logger.info('Cleaning up old indices in Elasticsearch')
  current_index = client.indices.get_alias(name: config.read_alias).keys.first

  logger.info("Currently used index is #{current_index}")

  indices_to_delete = client
                      .cat
                      .indices(format: :json)
                      .map { |index| index['index'] }
                      .select { |name| name.start_with?(config.read_alias) }
                      .reject { |name| name == current_index }

  if indices_to_delete.none?
    logger.info('Nothing to do: no old indices')
    return
  end
  logger.info(
    "Deleting #{indices_to_delete.count} old indices: #{indices_to_delete.join(', ')}"
  )
  client.indices.delete(index: indices_to_delete)
end
create_index(unique_name) click to toggle source
# File lib/zelastic/index_manager.rb, line 13
def create_index(unique_name)
  index_name = index_name_from_unique(unique_name)

  client.indices.create(index: index_name, body: config.index_definition)
  client.indices.put_alias(index: index_name, name: config.write_alias)
end
populate_index(unique_name = nil, batch_size: 3000) click to toggle source
# File lib/zelastic/index_manager.rb, line 20
def populate_index(unique_name = nil, batch_size: 3000)
  index_name = index_name_from_unique(unique_name)

  config.data_source.find_in_batches(batch_size: batch_size).with_index do |batch, i|
    logger.info(populate_index_log(batch_size: batch_size, batch_number: i + 1))
    indexer.index_batch(batch, client: client, index_name: index_name)
  end
end
stop_dual_writes() click to toggle source
# File lib/zelastic/index_manager.rb, line 48
def stop_dual_writes
  logger.info('Stopping dual writes - making index read and write aliases the same')
  current_index = client.indices.get_alias(name: config.read_alias).keys.first

  logger.info("Currently used index is #{current_index}")

  other_write_indices = client.indices.get_alias(name: config.write_alias).keys
                              .reject { |name| name == current_index }

  if other_write_indices.none?
    logger.info("No write indexes that aren't the read index. Nothing to do!")
    return
  end
  logger.info("Stopping writes to #{other_write_indices.count} old ES indices: " \
                    "#{other_write_indices.join(', ')}")

  actions = other_write_indices.map do |index|
    { remove: { index: index, alias: config.write_alias } }
  end
  client.indices.update_aliases(body: { actions: actions })
end
switch_read_index(new_name) click to toggle source
# File lib/zelastic/index_manager.rb, line 29
def switch_read_index(new_name)
  new_index = [config.read_alias, new_name].join('_')

  old_index =
    if client.indices.exists_alias?(name: config.read_alias)
      client.indices.get_alias(name: config.read_alias).keys.first
    end

  remove_action =
    ({ remove: { index: old_index, alias: config.read_alias } } if old_index)

  client.indices.update_aliases(body: {
                                  actions: [
                                    remove_action,
                                    { add: { index: new_index, alias: config.read_alias } }
                                  ].compact
                                })
end

Private Instance Methods

count_params() click to toggle source
# File lib/zelastic/index_manager.rb, line 123
def count_params
  {
    index: config.read_alias,
    type: config.type? ? config.type : nil
  }.compact
end
current_index_exists?() click to toggle source
# File lib/zelastic/index_manager.rb, line 134
def current_index_exists?
  client.indices.exists?(index: config.read_alias)
end
current_index_size() click to toggle source
# File lib/zelastic/index_manager.rb, line 119
def current_index_size
  @current_index_size ||= client.count(**count_params)['count']
end
index_name_from_unique(unique_name) click to toggle source
# File lib/zelastic/index_manager.rb, line 102
def index_name_from_unique(unique_name)
  if unique_name
    [config.read_alias, unique_name].join('_')
  else
    config.write_alias
  end
end
indexed_percent(batch_size, batch_number) click to toggle source
# File lib/zelastic/index_manager.rb, line 130
def indexed_percent(batch_size, batch_number)
  (batch_size * batch_number.to_f / current_index_size * 100).round(2)
end
indexer() click to toggle source
# File lib/zelastic/index_manager.rb, line 98
def indexer
  @indexer ||= Indexer.new(config)
end
populate_index_log(batch_size:, batch_number:) click to toggle source
# File lib/zelastic/index_manager.rb, line 110
def populate_index_log(batch_size:, batch_number:)
  progress = if current_index_exists?
               "ESTIMATED: #{indexed_percent(batch_size, batch_number)}%"
             else
               'First index'
             end
  "ES: (#{progress}) Indexing #{config.type} records"
end