class Searchkick::BulkIndexer

Attributes

index[R]

Public Class Methods

new(index) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 5
# index - the Searchkick::Index this indexer writes to (exposed via the
#         index reader and used by every bulk operation below)
def initialize(index)
  @index = index
end

Public Instance Methods

batches_left() click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 54
# Number of async reindex batches still pending, read from the Redis
# tracking set maintained by bulk_reindex_job.
def batches_left
  Searchkick.with_redis do |redis|
    redis.scard(batches_key)
  end
end
bulk_delete(records) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 46
# Queues delete operations for the given records, skipping any without an id
# (never-persisted records have nothing in the index to delete).
def bulk_delete(records)
  persisted = records.reject { |record| record.id.blank? }
  payload = persisted.map { |record| RecordData.new(index, record).delete_data }
  Searchkick.indexer.queue(payload)
end
bulk_index(records) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 42
# Queues full index operations for the given records.
def bulk_index(records)
  payload = records.map { |record| RecordData.new(index, record).index_data }
  Searchkick.indexer.queue(payload)
end
bulk_update(records, method_name) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 50
# Queues partial-update operations for the given records, using
# method_name to build each update document.
def bulk_update(records, method_name)
  payload = records.map { |record| RecordData.new(index, record).update_data(method_name) }
  Searchkick.indexer.queue(payload)
end
import_scope(relation, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false, scope: nil) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 9
# Indexes every record in the given relation/scope.
#
# relation     - model class or relation to index
# resume:      - continue a previous run; skips ids at or below the current
#                doc count in the index (see the where clause below)
# method_name: - when set, performs partial updates with this method instead
#                of full indexing
# async:       - enqueue Searchkick::BulkReindexJob instead of indexing inline
# batch:       - treat +relation+ as one pre-sliced batch; index it immediately
# batch_id:    - Redis batch-set member to remove once the batch completes
# full:        - combined with async:, split the whole table into id-range jobs
# scope:       - name of a scope to send to +relation+ (overrides search_import)
def import_scope(relation, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false, scope: nil)
  # an explicit scope wins over the model's default search_import scope
  if scope
    relation = relation.send(scope)
  elsif relation.respond_to?(:search_import)
    relation = relation.search_import
  end

  if batch
    import_or_update relation.to_a, method_name, async
    # mark this batch as finished in the Redis tracking set
    Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id
  elsif full && async
    full_reindex_async(relation)
  elsif relation.respond_to?(:find_in_batches)
    if resume
      # use total docs instead of max id since there's not a great way
      # to get the max _id without scripting since it's a string

      # TODO use primary key and prefix with table name
      relation = relation.where("id > ?", total_docs)
    end

    # async jobs only need ids, so drop column loading and eager loading
    relation = relation.select("id").except(:includes, :preload) if async

    relation.find_in_batches batch_size: batch_size do |items|
      import_or_update items, method_name, async
    end
  else
    # no find_in_batches (e.g. Mongoid criteria) — batch manually via cursor
    each_batch(relation) do |items|
      import_or_update items, method_name, async
    end
  end
end

Private Instance Methods

batch_size() click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 167
# Records per bulk request. Reads the index's :batch_size option once and
# memoizes the result; defaults to 1000 when unset.
def batch_size
  @batch_size ||= begin
    configured = index.options[:batch_size]
    configured || 1000
  end
end
batches_key() click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 163
# Redis set key used to track pending reindex batches for this index.
def batches_key
  ["searchkick", "reindex", index.name, "batches"].join(":")
end
bulk_reindex_job(scope, batch_id, options) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 140
# Registers batch_id in the Redis tracking set, then enqueues a
# BulkReindexJob carrying the model/index names plus any extra options
# (id range or explicit record ids).
def bulk_reindex_job(scope, batch_id, options)
  Searchkick.with_redis { |redis| redis.sadd(batches_key, batch_id) }
  payload = {
    class_name: scope.model_name.name,
    index_name: index.name,
    batch_id: batch_id
  }.merge(options)
  Searchkick::BulkReindexJob.perform_later(payload)
end
each_batch(scope) { |items| ... } click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 126
# Yields records from scope in arrays of up to batch_size, streaming via
# each so a cursor-backed scope is never fully loaded into memory.
# https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
# use cursor for Mongoid
def each_batch(scope)
  buffer = []
  scope.all.each do |record|
    buffer << record
    next unless buffer.length == batch_size
    yield buffer
    buffer = []
  end
  # flush the final partial batch
  yield buffer if buffer.any?
end
full_reindex_async(scope) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 85
# Splits a full reindex of scope into Searchkick::BulkReindexJob jobs.
#
# With a numeric primary key, jobs cover fixed id ranges so the table is
# never loaded in full here. Otherwise each job carries an explicit list of
# record ids gathered in batches.
def full_reindex_async(scope)
  if scope.respond_to?(:primary_key)
    # TODO expire Redis key
    primary_key = scope.primary_key

    starting_id =
      begin
        scope.minimum(primary_key)
      rescue ActiveRecord::StatementInvalid
        # false (deliberately not nil) so the non-numeric branch below runs
        false
      end

    if starting_id.nil?
      # no records, do nothing
    elsif starting_id.is_a?(Numeric)
      # enqueue one job per contiguous id range [min_id, min_id + batch_size - 1]
      max_id = scope.maximum(primary_key)
      batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil

      batches_count.times do |i|
        batch_id = i + 1
        min_id = starting_id + (i * batch_size)
        bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1
      end
    else
      # non-numeric keys (or the query above failed): enqueue explicit id lists
      scope.find_in_batches(batch_size: batch_size).each_with_index do |batch, i|
        batch_id = i + 1

        bulk_reindex_job scope, batch_id, record_ids: batch.map { |record| record.id.to_s }
      end
    end
  else
    # no primary_key method (e.g. Mongoid) — stream batches and enqueue id lists
    batch_id = 1
    # TODO remove any eager loading
    scope = scope.only(:_id) if scope.respond_to?(:only)
    each_batch(scope) do |items|
      bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s }
      batch_id += 1
    end
  end
end
import_or_update(records, method_name, async) click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 60
# Indexes (or partially updates) records either inline or via a background
# job. Inline indexing filters by should_index? and retries once on
# transient transport errors.
def import_or_update(records, method_name, async)
  return unless records.any?

  if async
    Searchkick::BulkReindexJob.perform_later(
      class_name: records.first.class.name,
      record_ids: records.map(&:id),
      index_name: index.name,
      method_name: method_name ? method_name.to_s : nil
    )
    return
  end

  indexable = records.select(&:should_index?)
  return unless indexable.any?

  with_retries do
    # call out to index for ActiveSupport notifications
    if method_name
      index.bulk_update(indexable, method_name)
    else
      index.bulk_index(indexable)
    end
  end
end
with_retries() { || ... } click to toggle source
# File lib/searchkick/bulk_indexer.rb, line 149
# Runs the given block, retrying once on Faraday::ClientError (transient
# Elasticsearch transport failures). A failure after the single retry is
# re-raised to the caller.
#
# Fix: use bare `raise` instead of `raise e` inside the rescue — it
# re-raises the current exception with its original backtrace intact.
def with_retries
  retries = 0

  begin
    yield
  rescue Faraday::ClientError
    if retries < 1
      retries += 1
      retry
    end
    raise
  end
end