module Elastics::LiveReindex
private module
Attributes
each_change[R]
Public Instance Methods
get_timestamp_from_index(index)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 136 def get_timestamp_from_index(index) index =~ /^(\d{14})_/ timestr = $1 return unless timestr timestr timearr = timestr.scan /.{1,2}/ Time.mktime *timearr.unshift([timearr.shift,timearr.shift].join) end
on_each_change(&block)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 62 def on_each_change(&block) @each_change = block end
on_reindex(&block)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 58 def on_reindex(&block) @reindex = block end
on_stop_indexing(&block)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 67 def on_stop_indexing(&block) @stop_indexing = block end
prefix_index(index)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 111 def prefix_index(index) base = unprefix_index(index) # raise if base is not included in @ensure_indices raise ExtraIndexError, "The index #{base} is missing from the :ensure_indices option. Reindexing aborted." \ if @ensure_indices && !@ensure_indices.include?(base) prefixed = @prefix + base unless @indices.include?(base) unless Elastics.indices_exists(:index => prefixed) Conf.indices.create_index(base, prefixed, :raise => false) # it might trigger an error if 2 threads create the index at the same time if Conf.optimize_indexing @refresh_intervals[index] ||= Elastics.get_index_settings(:index => prefixed)[prefixed]['settings']['index.refresh_interval'] Elastics.put_index_settings(:index => prefixed, :data => {:index => {:refresh_interval => '-1'}}) unless @refresh_intervals[index] == '-1' end end @indices |= [base] end prefixed end
reindex(opts={}) { |self| ... }
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 71 def reindex(opts={}) yield self opts[:verbose] = true unless opts.has_key?(:verbose) perform(opts) end
reindex_indices(opts={}) { |self| ... }
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 77 def reindex_indices(opts={}) yield self if block_given? opts[:verbose] = true unless opts.has_key?(:verbose) opts[:index] ||= opts.delete(:indices) || Conf.indices.keys # we override the on_reindex eventually set on_reindex do migrate_indices(opts) end perform(opts) end
should_prefix_index?()
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 91 def should_prefix_index? Redis.get(:pid) == $$.to_s end
should_track_change?()
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 95 def should_track_change? pid = Redis.get(:pid) !!pid && !(pid == $$.to_s) end
track_change(action, document)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 100 def track_change(action, document) Redis.rpush(:changes, MultiJson.encode([action, document])) end
track_external_change(app_id, action, document)
click to toggle source
use this method when you are tracking the change of another app you must pass the app_id of the app being affected by the change
# File lib/elastics/admin_live_reindex.rb, line 106 def track_external_change(app_id, action, document) return unless Conf.redis Conf.redis.rpush("#{KEYS[:changes]}-#{app_id}", MultiJson.encode([action, document])) end
unprefix_index(index)
click to toggle source
remove the (eventual) prefix
# File lib/elastics/admin_live_reindex.rb, line 132 def unprefix_index(index) index.sub(/^\d{14}_/, '') end
Private Instance Methods
build_bulk_string(action, document)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 291 def build_bulk_string(action, document) result = if @each_change document.extend(Result::Document) document.extend(Result::DocumentLoader) @each_change.call(action, document) else [{ action => document }] end result = [result] unless result.is_a?(Array) bulk_string = '' result.compact.each do |hash| act, doc = hash.to_a.flatten bulk_string << Elastics.build_bulk_string(doc, :action => act) end bulk_string end
build_bulk_string_from_change(change)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 285 def build_bulk_string_from_change(change) action, document = MultiJson.decode(change) return '' unless @indices.include?(unprefix_index(document['_index'])) build_bulk_string(action, document) end
fduration(seconds)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 310 def fduration(seconds) seconds = seconds.round days = (seconds / 86400) hours = (seconds / 3600) % 24 mins = (seconds / 60) % 60 secs = seconds % 60 pluralize = lambda do |count, word| case when count == 1 then "1 #{word}" when count > 1 then "#{count} #{word}s" end end parts = [] parts << pluralize.call(days, 'day') parts << pluralize.call(hours, 'hour') parts << pluralize.call(mins, 'minute') parts << pluralize.call(secs, 'second') last = parts.pop first = parts.compact.join(', ') first = nil if first == '' [first, last].compact.join(' and ') end
index_changes(opts)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 244 def index_changes(opts) left_changes_count = Redis.llen(:changes) || 0 return if left_changes_count == 0 batch_size = opts[:batch_size] || 100 bulk_string = '' Prompter.say_notice "Reindexing #{left_changes_count} live-changes..." if opts[:verbose] until left_changes_count == 0 batch_count = left_changes_count > batch_size ? batch_size : left_changes_count batch_count.times do bulk_string << build_bulk_string_from_change(Redis.lpop(:changes)) left_changes_count -= 1 end Elastics.post_bulk_string(:bulk_string => bulk_string) bulk_string = '' end end
migrate_indices(opts)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 263 def migrate_indices(opts) Conf.http_client.options[:timeout] = opts[:timeout] || 60 opts[:verbose] = true unless opts.has_key?(:verbose) pbar = ProgBar.new(Elastics.count(opts)['count'], nil, "index #{opts[:index].inspect}: ") if opts[:verbose] Elastics.dump_all(opts) do |batch| result = process_and_post_batch(batch) pbar.process_result(result, batch.size) if opts[:verbose] end pbar.finish if opts[:verbose] end
perform(opts={})
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 147 def perform(opts={}) started_at = Time.now Prompter.say_title 'Live-Reindex' if opts[:verbose] if opts[:safe_reindex] == false Conf.logger.warn 'Safe reindex is disabled!' Prompter.say_warning 'WARNING: Safe reindex is disabled!' if opts[:verbose] end Redis.init @indices = [] @refresh_intervals = {} if Conf.optimize_indexing @prefix = Time.now.strftime('%Y%m%d%H%M%S_') @ensure_indices = nil unless opts[:on_stop_indexing] == false || Conf.on_stop_indexing == false @stop_indexing ||= Conf.on_stop_indexing || raise(MissingStopIndexingProcError, 'The on_stop_indexing block is not set.') end raise MissingOnReindexBlockError, 'You must configure an on_reindex block.' \ unless @reindex raise MissingEnsureIndicesError, 'You must pass the :ensure_indices option when you pass the :models option.' \ if opts.has_key?(:models) && !opts.has_key?(:ensure_indices) if opts[:ensure_indices] @ensure_indices = opts.delete(:ensure_indices) @ensure_indices = @ensure_indices.split(',') unless @ensure_indices.is_a?(Array) each_change = @each_change @each_change = nil migrate_indices(:index => @ensure_indices) @each_change = each_change end @reindex.call # try to empty the changes for 10 times before stopping the indexing 10.times{ index_changes(opts) } # optimizing indices if Conf.optimize_indexing @indices.each do |index| prefixed = @prefix + index Prompter.say_notice "Optimizing index #{prefixed}..." if opts[:verbose] # reset the refresh_interval Elastics.put_index_settings(:index => prefixed, :data => {:index => {:refresh_interval => (@refresh_intervals[index] || '1s')}}) # optimize the index Elastics.optimize_index(:index => prefixed, :params => {:max_num_segments => 5}) end index_changes(opts) end # at this point the changes list should be empty or contain the minimum number of changes we could achieve live # the @stop_indexing should ensure to stop/suspend all the actions that would produce changes in the indices being reindexed if @stop_indexing Prompter.say_notice 'Calling on_stop_indexing...' if opts[:verbose] @stop_indexing.call Prompter.say_notice 'Indexing stopped.' if opts[:verbose] else Prompter.say_notice 'No on_stop_indexing provided.' if opts[:verbose] end # if we have still changes, we can index them all, now that the indexing is stopped index_changes(opts) # deletes the old indices and create the aliases to the new @indices.each do |index| prefixed = @prefix + index Prompter.say_notice "Swapping to index #{prefixed}..." if opts[:verbose] Elastics.delete_index :index => index, :raise => false # may not exist Elastics.post_index_aliases :actions => [{ :add => { :alias => index, :index => prefixed } }] end # after the execution of this method the user should deploy the new code and then resume the regular app processing # we redefine this method so it will raise an error if any new live-reindex is attempted during this session. unless opts[:safe_reindex] == false class_eval <<-ruby, __FILE__, __LINE__ def perform(*) raise MultipleReindexError, "Multiple live-reindex attempted! You cannot use any reindexing method multiple times in the same session or you may corrupt your index/indices! The previous reindexing in this session successfully reindexed and swapped the new index/indices: #{@indices.map{|i| @prefix + i}.join(', ')}. You must deploy now, and run the other reindexing in single successive deploys ASAP. Notice that if the code-changes that you are about to deploy rely on the successive reindexings that have been aborted, your app may fail. If you are working in development mode you must restart the session now. The next time you can silence this error by passing :safe_reindex => false" end ruby end rescue Exception # delete all the created indices @indices ||=[] @indices.each do |index| Elastics.delete_index :index => @prefix + index end raise ensure Redis.reset_keys Prompter.say_notice "Elapsed Time: #{fduration(Time.now - started_at)}." end
process_and_post_batch(batch)
click to toggle source
# File lib/elastics/admin_live_reindex.rb, line 277 def process_and_post_batch(batch) bulk_string = '' batch.each do |document| bulk_string << build_bulk_string('index', document) end Elastics.post_bulk_string(:bulk_string => bulk_string) end