module Mindex::Index::ClassMethods
Public Instance Methods
connection_settings(options = {})
click to toggle source
# File lib/mindex/index.rb, line 9 def connection_settings(options = {}) @connection_settings = options end
doc_type()
click to toggle source
# File lib/mindex/index.rb, line 40 def doc_type @index_label || name.demodulize.tableize end
drop_items(ids)
click to toggle source
# File lib/mindex/index.rb, line 60 def drop_items(ids) bulk_data = [ids].flatten.map { |id| { delete: { _index: index_alias, _type: doc_type, _id: id } } } elasticsearch.indices.client.bulk(body: bulk_data) if bulk_data.any? end
elasticsearch()
click to toggle source
# File lib/mindex/index.rb, line 92 def elasticsearch Elasticsearch.connect(@connection_settings) end
Also aliased as: es
index_alias()
click to toggle source
# File lib/mindex/index.rb, line 30 def index_alias [@index_prefix, (@index_label || name.demodulize.tableize)].compact.join('_').underscore end
index_config(settings:, mappings:)
click to toggle source
# File lib/mindex/index.rb, line 13 def index_config(settings:, mappings:) @index_settings = settings @index_mappings = mappings end
index_create(move_or_create_index_alias: true)
click to toggle source
# File lib/mindex/index.rb, line 53 def index_create(move_or_create_index_alias: true) index_name = new_index_name elasticsearch.indices.create(index: index_name, body: { settings: @index_settings || {}, mappings: @index_mappings || {} }) add_index_alias(target: index_name) if move_or_create_index_alias index_name end
index_exist?(name = nil)
click to toggle source
# File lib/mindex/index.rb, line 48 def index_exist?(name = nil) return false if name.nil? && index_name.nil? elasticsearch.indices.exists(index: name || index_name) end
index_label(label)
click to toggle source
# File lib/mindex/index.rb, line 22 def index_label(label) @index_label = label end
index_name()
click to toggle source
# File lib/mindex/index.rb, line 34 def index_name elasticsearch.indices.get_alias(name: index_alias).keys.first rescue ::Elasticsearch::Transport::Transport::Errors::NotFound nil end
index_num_threads(count)
click to toggle source
# File lib/mindex/index.rb, line 26 def index_num_threads(count) @index_num_threads = count end
index_prefix(prefix)
click to toggle source
# File lib/mindex/index.rb, line 18 def index_prefix(prefix) @index_prefix = prefix end
index_refresh()
click to toggle source
# File lib/mindex/index.rb, line 44 def index_refresh elasticsearch.indices.refresh(index: index_name) end
recreate_index(options = {})
click to toggle source
# File lib/mindex/index.rb, line 80 def recreate_index(options = {}) started_at = Time.now index_name = index_create(move_or_create_index_alias: false) index_queue(index: index_name) do |queue| scroll(options) do |items| queue << items end end add_index_alias(target: index_name) reindex(options.merge(started_at: started_at)) end
reindex(options = {})
click to toggle source
# File lib/mindex/index.rb, line 71 def reindex(options = {}) index_create unless index_exist? index_queue do |queue| scroll(options) do |items| queue << items end end end
reindex_items(ids)
click to toggle source
# File lib/mindex/index.rb, line 65 def reindex_items(ids) index_queue(threads: 1) do |queue| queue << fetch(ids) end end
Private Instance Methods
add_index_alias(target:)
click to toggle source
# File lib/mindex/index.rb, line 129 def add_index_alias(target:) actions = [{ add: { index: target, alias: index_alias } }] if elasticsearch.indices.exists_alias(name: index_alias) actions += elasticsearch.indices.get_alias(name: index_alias).keys.map do |index_name| { remove: { index: index_name, alias: index_alias } } end end elasticsearch.indices.update_aliases(body: { actions: actions }) end
index_queue(index: nil, threads: nil) { |queue| ... }
click to toggle source
# File lib/mindex/index.rb, line 99 def index_queue(index: nil, threads: nil) # rubocop:disable Metrics/AbcSize index ||= index_name num_threads = threads || @index_num_threads || 4 queue = SizedQueue.new(num_threads * 2) threads = num_threads.times.map do thread = Thread.new do until (items = queue.pop) == :stop bulk_data = [] [items].flatten.each do |item| bulk_data << { index: { _index: index, _type: doc_type, _id: (item[:id] || item['id']), data: item } } end elasticsearch.indices.client.bulk(body: bulk_data) if bulk_data.any? end end thread.abort_on_exception = true thread end yield queue index ensure num_threads.times { queue << :stop } threads.each(&:join) end
new_index_name()
click to toggle source
# File lib/mindex/index.rb, line 125 def new_index_name "#{index_alias}-v#{DateTime.now.strftime('%Q')}" # rubocop:disable Style/DateTime end