class RotatingEsLoader

:nodoc

Constants

DEFAULT_SLICE_SIZE
MAX_INDEX_AGE

Retention window in days: for indices whose datestamp is at most this many days old, the newest index from each day is kept; all older indices are deleted.

Attributes

es_major_version[RW]
slice_size[RW]

Public Class Methods

new(opts)
Calls superclass method EsClient::new
# File lib/rotating_es_loader.rb, line 15
# Validates the options, connects through the EsClient superclass and records
# the cluster's major version.
#
# @param opts [Hash] requires :credentials, :url and :index_definitions (a Hash
#   keyed by index key; every definition must carry a :datasource that responds
#   to #each). Optional :slice_size (bulk batch size, default DEFAULT_SLICE_SIZE)
#   and :datasources.
# @raise [RuntimeError] when a required option is missing or a definition's
#   :datasource does not respond to #each.
# @raise [URI::InvalidURIError] when :url cannot be parsed.
def initialize(opts)
  raise('no credentials provided') unless opts[:credentials]
  raise('no url provided') unless opts[:url]
  raise('no definitions provided') unless opts[:index_definitions].is_a?(Hash)
  # Parsed only to fail fast on a malformed URL; the superclass receives the
  # original string, so the parse result itself is not needed.
  URI.parse(opts[:url])

  super(
    url: opts[:url],
    credentials: opts[:credentials]
  )

  @index_definitions = opts[:index_definitions]
  @slice_size = opts[:slice_size] || DEFAULT_SLICE_SIZE

  # NOTE(review): @logger is presumably initialized by EsClient#initialize — confirm.
  @logger.debug("index keys: #{index_keys}")
  # NOTE(review): @datasources is not read by any method visible here; documents
  # come from each definition's :datasource. Kept for compatibility.
  @datasources = opts[:datasources]

  index_keys.each do |key|
    raise("No datasource for #{key}") unless @index_definitions[key][:datasource].respond_to?(:each)
  end

  # Mapping and bulk request formats depend on the cluster's major version.
  es_info = client.info
  @es_major_version = es_info['version']['number'].split('.').first.to_i
end

Public Instance Methods

create_documents()
# File lib/rotating_es_loader.rb, line 73
# Bulk-loads every configured datasource into its index for this run.
#
# A document type is passed only when the cluster supports mapping types
# (multitype_support?). document_type_for raises on ES 6+, so calling it
# unconditionally made loading impossible on those clusters.
#
# @return the index_keys collection (result of #each).
def create_documents
  index_keys.each do |k|
    create_documents_for_type(
      name: get_index_name(k),
      data: datasource_for(k),
      type: multitype_support? ? document_type_for(k) : nil
    )
  end
end
create_documents_for_type(name:, data:, type: nil)
# File lib/rotating_es_loader.rb, line 83
# Streams +data+ into index +name+ with the bulk API, @slice_size records per
# request.
#
# @param name [String] target index name.
# @param data [#each] records; each record's :id becomes the document _id and
#   the whole record is sent as the document source.
# @param type [String, nil] document type. It is sent only when given and the
#   cluster still has mapping types (major version <= 5, matching
#   multitype_support?).
# Bulk responses that report errors are logged as warnings, not raised.
def create_documents_for_type(name:, data:, type: nil)
  @logger.info("Creating documents in index #{name} in batches of #{@slice_size}")
  # lazy keeps the datasource from being materialized in full before slicing.
  data.lazy.each_slice(@slice_size).each_with_index do |slice, slice_num|
    @logger.debug("batch #{slice_num}: #{slice.size} docs")
    body = slice.flat_map do |rec|
      action = { _index: name, _id: rec[:id] }
      # Pre-6 clusters need _type on every bulk action, not only ES 5.
      action[:_type] = type if type && es_major_version <= 5
      [{ index: action }, rec]
    end

    result = client.bulk(body: body)
    @logger.warn("ERRORS: #{JSON.pretty_generate(result)}") if result['errors']
  end
end
create_index(name:, key:)
# File lib/rotating_es_loader.rb, line 188
# Creates index +name+ using the settings and version-adjusted mappings
# configured for +key+.
#
# The request is logged at debug level. The old code printed it to stdout via
# `tap { puts ... }` and logged "creating index" twice.
#
# @return the client's create-index response.
def create_index(name:, key:)
  @logger.debug("creating index #{name}")

  mappings = mappings_adjusted_for_es_version(key)
  @logger.debug("mappings: #{mappings.to_json}")

  request = {
    index: name,
    body: {
      settings: settings_for(key),
      mappings: mappings
    }
  }
  @logger.debug("create index request: #{JSON.pretty_generate(request)}")

  client.indices.create(request)
end
create_indices()
# File lib/rotating_es_loader.rb, line 103
# Creates one fresh index per configured index key, named by get_index_name.
#
# @return the index_keys collection (result of #each).
def create_indices
  index_keys.each do |index_key|
    index_name = get_index_name(index_key)
    create_index(name: index_name, key: index_key)
  end
end
datasource_for(key)
# File lib/rotating_es_loader.rb, line 58
# The :datasource entry of the index definition registered under +key+.
def datasource_for(key)
  definition = @index_definitions[key]
  definition[:datasource]
end
delete_old_indices()
# File lib/rotating_es_loader.rb, line 126
# Prunes superseded indices for every configured index key.
#
# Only indices named "<key>-YYYYMMDD…" (the get_index_name format) belong to a
# key. They are grouped by key_age (days). For ages up to MAX_INDEX_AGE, the
# last index of each day in name order is kept. Every other index is deleted.
def delete_old_indices
  existing_names = client.indices.get(index: '_all').keys

  @logger.debug("Existing indexes: #{existing_names}")

  index_keys.each do |index|
    # The old substring test let key :product claim indices belonging to
    # :product_catalog or "product-catalog", and possibly delete their live index.
    owned = /\A#{Regexp.escape(index.to_s)}-\d{8}(?:-|\z)/
    names = existing_names.select { |name| name.match?(owned) }.sort

    keys_to_delete = names.group_by { |name| key_age(name) }.flat_map do |age, same_age|
      # The date-epoch-pid suffix makes name order follow creation order, so
      # dropping the last entry keeps the newest index of that day.
      age <= MAX_INDEX_AGE ? same_age[0...-1] : same_age
    end

    next if keys_to_delete.empty?

    @logger.debug("Deleting indexes #{keys_to_delete.join(', ')}")
    client.indices.delete index: keys_to_delete
  end
end
document_type_for(key)
# File lib/rotating_es_loader.rb, line 40
# The :type entry of the index definition for +key+.
#
# Mapping types exist only on clusters that pass multitype_support?. Elsewhere
# this raises RuntimeError instead of returning a type.
def document_type_for(key)
  unless multitype_support?
    raise "document type not supported for ES #{es_major_version}"
  end

  definition = @index_definitions[key]
  definition[:type]
end
execute()
# File lib/rotating_es_loader.rb, line 62
# Performs one full rotation, in order:
# 1. build fresh indices,
# 2. load documents into them,
# 3. repoint each alias at its new index,
# 4. prune superseded indices.
# Stops at the first step that raises. Returns the result of the final step.
def execute
  %i[create_indices create_documents swap_aliases delete_old_indices]
    .map { |step| send(step) }
    .last
end
get_index_name(key)
# File lib/rotating_es_loader.rb, line 118
# Name of the concrete index backing +key+ for this loader instance, in the
# form "<key>-<YYYYMMDD>-<epoch seconds>-<pid>".
#
# The name is memoized per key. create_indices, create_documents and
# swap_aliases each ask for it at different moments. Reading Time.now afresh
# every call let those steps disagree as soon as a second ticked over, so
# documents and the alias would target an index other than the one created.
# A new loader instance produces new names.
#
# @raise [RuntimeError] when +key+ is not one of index_keys.
def get_index_name(key)
  raise("provided key #{key} is not a valid index") unless index_keys.include?(key)

  @index_names ||= {}
  # TODO: make it more sequential, so that it sorts correctly
  @index_names[key] ||= "#{key}-#{Date.today.strftime('%Y%m%d')}-#{Time.now.to_i}-#{Process.pid}"
end
index_keys()
# File lib/rotating_es_loader.rb, line 46
# Keys of the configured index definitions, in definition order. Each key is
# also the alias name swap_aliases maintains.
def index_keys
  @index_definitions.each_key.to_a
end
key_age(key)
# File lib/rotating_es_loader.rb, line 109
# Age in whole days of the datestamp embedded in an index name.
#
# @param key [String] an index name such as "products-20240101-1704067200-4242"
#   (the get_index_name format).
# @return [Integer] days between that datestamp and today. Returns 0 when the
#   name has no eight-digit segment or that segment is not a valid date.
def key_age(key)
  # Scan from the right so index keys that themselves contain '-' (for example
  # "product-catalog") do not shift the datestamp away from position 1.
  stamp = key.to_s.split('-').reverse_each.find { |segment| segment.match?(/\A\d{8}\z/) }
  return 0 unless stamp

  (Date.today - Date.strptime(stamp, '%Y%m%d')).to_i
rescue ArgumentError
  # Eight digits that are not a calendar date (e.g. "99999999").
  0
end
mappings_adjusted_for_es_version(key)
# File lib/rotating_es_loader.rb, line 176
# Builds the +mappings+ section of the create-index request for +key+.
#
# Before ES 6 the properties are nested under a mapping type named after +key+.
# From ES 6 on they sit at the top level. A missing mapping is logged and
# treated as empty properties. Previously `mappings_for(key) || @logger.warn(...)`
# used warn's return value (true for a stdlib Logger) as the properties.
# NOTE(review): ES 6.x create-index may still expect a type name unless
# include_type_name=false is sent (6.7+) — confirm the versions targeted.
#
# @return [Hash]
def mappings_adjusted_for_es_version(key)
  mapping_for_key = mappings_for(key)
  unless mapping_for_key
    @logger.warn("mappings does not contain a mapping for #{key}")
    mapping_for_key = {}
  end

  if es_major_version < 6
    { key => { properties: mapping_for_key } }
  else
    { properties: mapping_for_key }
  end
end
mappings_for(key)
# File lib/rotating_es_loader.rb, line 50
# The raw :mappings entry (field properties) of the index definition for +key+.
def mappings_for(key)
  definition = @index_definitions[key]
  definition[:mappings]
end
multitype_support?()
# File lib/rotating_es_loader.rb, line 69
# True when the connected cluster's major version is 5 or lower, the releases
# for which this class still sends mapping/document types.
def multitype_support?
  es_major_version <= 5
end
settings_for(key)
# File lib/rotating_es_loader.rb, line 54
# The :settings entry of the index definition for +key+, sent verbatim as the
# create-index settings.
def settings_for(key)
  definition = @index_definitions[key]
  definition[:settings]
end
swap_aliases()
# File lib/rotating_es_loader.rb, line 149
# Points each index-key alias at this run's index in a single update_aliases
# call per alias. The call removes the alias from every index currently holding
# it and adds it to the new index.
#
# A failing get_alias lookup (e.g. the alias does not exist yet) is logged as a
# warning, and the alias is then only added.
def swap_aliases
  index_keys.each do |alias_name|
    target_index = get_index_name(alias_name)
    current_holders = []

    @logger.debug("fetching any indices attached to alias #{alias_name}")
    begin
      current_holders = client.indices.get_alias(name: alias_name).keys
    rescue StandardError => e
      @logger.warn(e)
    end

    # Removals come first, most recently listed holder first, then the add.
    removals = current_holders.reverse.map do |holder|
      { remove: { index: holder, alias: alias_name } }
    end
    actions = removals + [{ add: { index: target_index, alias: alias_name } }]

    @logger.debug('update_aliases actions: ' + actions.to_json)

    client.indices.update_aliases body: { actions: actions }
  end
end