module Mindex::Index::ClassMethods

Public Instance Methods

connection_settings(options = {}) click to toggle source
# File lib/mindex/index.rb, line 9
def connection_settings(options = {})
  @connection_settings = options
end
doc_type() click to toggle source
# File lib/mindex/index.rb, line 40
def doc_type
  @index_label || name.demodulize.tableize
end
drop_items(ids) click to toggle source
# File lib/mindex/index.rb, line 60
def drop_items(ids)
  bulk_data = [ids].flatten.map { |id| { delete: { _index: index_alias, _type: doc_type, _id: id } } }
  elasticsearch.indices.client.bulk(body: bulk_data) if bulk_data.any?
end
elasticsearch() click to toggle source
# File lib/mindex/index.rb, line 92
def elasticsearch
  Elasticsearch.connect(@connection_settings)
end
Also aliased as: es
es()
Alias for: elasticsearch
index_alias() click to toggle source
# File lib/mindex/index.rb, line 30
def index_alias
  [@index_prefix, (@index_label || name.demodulize.tableize)].compact.join('_').underscore
end
index_config(settings:, mappings:) click to toggle source
# File lib/mindex/index.rb, line 13
def index_config(settings:, mappings:)
  @index_settings = settings
  @index_mappings = mappings
end
index_create(move_or_create_index_alias: true) click to toggle source
# File lib/mindex/index.rb, line 53
def index_create(move_or_create_index_alias: true)
  index_name = new_index_name
  elasticsearch.indices.create(index: index_name, body: { settings: @index_settings || {}, mappings: @index_mappings || {} })
  add_index_alias(target: index_name) if move_or_create_index_alias
  index_name
end
index_exist?(name = nil) click to toggle source
# File lib/mindex/index.rb, line 48
def index_exist?(name = nil)
  return false if name.nil? && index_name.nil?
  elasticsearch.indices.exists(index: name || index_name)
end
index_label(label) click to toggle source
# File lib/mindex/index.rb, line 22
def index_label(label)
  @index_label = label
end
index_name() click to toggle source
# File lib/mindex/index.rb, line 34
def index_name
  elasticsearch.indices.get_alias(name: index_alias).keys.first
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
index_num_threads(count) click to toggle source
# File lib/mindex/index.rb, line 26
def index_num_threads(count)
  @index_num_threads = count
end
index_prefix(prefix) click to toggle source
# File lib/mindex/index.rb, line 18
def index_prefix(prefix)
  @index_prefix = prefix
end
index_refresh() click to toggle source
# File lib/mindex/index.rb, line 44
def index_refresh
  elasticsearch.indices.refresh(index: index_name)
end
recreate_index(options = {}) click to toggle source
# File lib/mindex/index.rb, line 80
def recreate_index(options = {})
  started_at = Time.now
  index_name = index_create(move_or_create_index_alias: false)
  index_queue(index: index_name) do |queue|
    scroll(options) do |items|
      queue << items
    end
  end
  add_index_alias(target: index_name)
  reindex(options.merge(started_at: started_at))
end
reindex(options = {}) click to toggle source
# File lib/mindex/index.rb, line 71
def reindex(options = {})
  index_create unless index_exist?
  index_queue do |queue|
    scroll(options) do |items|
      queue << items
    end
  end
end
reindex_items(ids) click to toggle source
# File lib/mindex/index.rb, line 65
def reindex_items(ids)
  index_queue(threads: 1) do |queue|
    queue << fetch(ids)
  end
end

Private Instance Methods

add_index_alias(target:) click to toggle source
# File lib/mindex/index.rb, line 129
def add_index_alias(target:)
  actions = [{ add: { index: target, alias: index_alias } }]

  if elasticsearch.indices.exists_alias(name: index_alias)
    actions += elasticsearch.indices.get_alias(name: index_alias).keys.map do |index_name|
      { remove: { index: index_name, alias: index_alias } }
    end
  end

  elasticsearch.indices.update_aliases(body: { actions: actions })
end
index_queue(index: nil, threads: nil) { |queue| ... } click to toggle source
# File lib/mindex/index.rb, line 99
def index_queue(index: nil, threads: nil) # rubocop:disable Metrics/AbcSize
  index ||= index_name
  num_threads = threads || @index_num_threads || 4
  queue = SizedQueue.new(num_threads * 2)
  threads = num_threads.times.map do
    thread = Thread.new do
      until (items = queue.pop) == :stop
        bulk_data = []
        [items].flatten.each do |item|
          bulk_data << { index: { _index: index, _type: doc_type, _id: (item[:id] || item['id']), data: item } }
        end
        elasticsearch.indices.client.bulk(body: bulk_data) if bulk_data.any?
      end
    end
    thread.abort_on_exception = true
    thread
  end

  yield queue

  index
ensure
  num_threads.times { queue << :stop }
  threads.each(&:join)
end
new_index_name() click to toggle source
# File lib/mindex/index.rb, line 125
def new_index_name
  "#{index_alias}-v#{DateTime.now.strftime('%Q')}" # rubocop:disable Style/DateTime
end