class Zelastic::Indexer

Attributes

config[R]

Public Class Methods

new(config) click to toggle source
# File lib/zelastic/indexer.rb, line 16
def initialize(config)
  @config = config
end

Public Instance Methods

delete_by_id(id) click to toggle source
# File lib/zelastic/indexer.rb, line 37
def delete_by_id(id)
  delete_by_ids([id])
end
delete_by_ids(ids) click to toggle source
# File lib/zelastic/indexer.rb, line 41
def delete_by_ids(ids)
  logger.info('ES: Deleting batch records')

  execute_bulk do |index_name|
    ids.map do |id|
      delete_params = { _index: index_name, _id: id }
      delete_params[:_type] = config.type if config.type?

      { delete: delete_params }
    end
  end
end
delete_by_query(query) click to toggle source
# File lib/zelastic/indexer.rb, line 54
def delete_by_query(query)
  logger.info('ES: Deleting batch records')

  config.clients.map do |client|
    client.delete_by_query(index: config.write_alias, body: { query: query })
  end
end
index_batch(batch, client: nil, index_name: nil, refresh: false) click to toggle source
# File lib/zelastic/indexer.rb, line 20
def index_batch(batch, client: nil, index_name: nil, refresh: false)
  version = current_version
  execute_bulk(client: client, index_name: index_name, refresh: refresh) do |index|
    batch.map do |record|
      index_command(index: index, version: version, record: record)
    end
  end
end
index_record(record, refresh: false) click to toggle source
# File lib/zelastic/indexer.rb, line 29
def index_record(record, refresh: false)
  version = current_version

  execute_bulk(refresh: refresh) do |index_name|
    [index_command(index: index_name, version: version, record: record)]
  end
end

Private Instance Methods

check_errors!(result) click to toggle source
# File lib/zelastic/indexer.rb, line 108
def check_errors!(result)
  return false unless result['errors']

  errors = result['items']
           .map { |item| item['error'] || item.fetch('index', {})['error'] }
           .compact

  ignorable_errors, important_errors = errors
                                       .partition { |error| ignorable_error?(error) }

  logger.warn("Ignoring #{ignorable_errors.count} version conflicts") if ignorable_errors.any?

  return unless important_errors.any?

  raise IndexingError, important_errors
end
current_version() click to toggle source
# File lib/zelastic/indexer.rb, line 67
def current_version
  config.data_source.connection
        .select_one('SELECT txid_snapshot_xmax(txid_current_snapshot()) as xmax')
        .fetch('xmax')
end
execute_bulk(client: nil, index_name: nil, refresh: false) { |index| ... } click to toggle source
# File lib/zelastic/indexer.rb, line 94
def execute_bulk(client: nil, index_name: nil, refresh: false)
  clients = Array(client || config.clients)

  clients.map do |current_client|
    indices = Array(index_name || write_indices(current_client))

    commands = indices.flat_map { |index| yield(index) }

    current_client.bulk(body: commands, refresh: refresh).tap do |result|
      check_errors!(result)
    end
  end
end
ignorable_error?(error) click to toggle source
# File lib/zelastic/indexer.rb, line 125
def ignorable_error?(error)
  # rubocop:disable Layout/LineLength
  regexp =
    if config.type?
      /^\[#{config.type}\]\[\d+\]: version conflict, current version \[\d+\] is higher or equal to the one provided \[\d+\]$/
    else
      /^\[\d+\]: version conflict, current version \[\d+\] is higher or equal to the one provided \[\d+\]$/
    end
  # rubocop:enable Layout/LineLength
  error['type'] == 'version_conflict_engine_exception' &&
    error['reason'] =~ regexp
end
index_command(index:, version:, record:) click to toggle source
# File lib/zelastic/indexer.rb, line 77
def index_command(index:, version:, record:)
  version_params =
    if config.type?
      { _version: version, _version_type: :external, _type: config.type }
    else
      { version: version, version_type: :external }
    end

  {
    index: {
      _index: index,
      _id: record.id,
      data: config.index_data(record)
    }.merge(version_params)
  }
end
write_indices(client) click to toggle source
# File lib/zelastic/indexer.rb, line 73
def write_indices(client)
  client.indices.get_alias(name: config.write_alias).keys
end