class Zelastic::Indexer
Attributes
config[R]
Public Class Methods
new(config)
click to toggle source
# File lib/zelastic/indexer.rb, line 16
# Builds an indexer around a configuration object.
#
# @param config the configuration; stored unchanged in @config, which backs
#   the read-only `config` attribute
def initialize(config)
  @config = config
end
Public Instance Methods
delete_by_id(id)
click to toggle source
# File lib/zelastic/indexer.rb, line 37
# Deletes a single document by delegating to #delete_by_ids with a
# one-element list.
#
# @param id the document id to delete
# @return whatever #delete_by_ids returns
def delete_by_id(id)
  single_id_batch = [id]
  delete_by_ids(single_id_batch)
end
delete_by_ids(ids)
click to toggle source
# File lib/zelastic/indexer.rb, line 41
# Issues one bulk `delete` action per id for every index that #execute_bulk
# yields.
#
# @param ids [Enumerable] document ids to delete
# @return whatever #execute_bulk returns
def delete_by_ids(ids)
  logger.info('ES: Deleting batch records')

  execute_bulk do |index_name|
    ids.map do |id|
      # The `_type` key is only sent when the configuration declares a type.
      type_params = config.type? ? { _type: config.type } : {}
      { delete: { _index: index_name, _id: id }.merge(type_params) }
    end
  end
end
delete_by_query(query)
click to toggle source
# File lib/zelastic/indexer.rb, line 54
# Runs a delete-by-query against the configured write alias on every
# configured client.
#
# @param query the query, sent as the `query` key of the request body
# @return [Array] one `delete_by_query` response per client, in client order
def delete_by_query(query)
  logger.info('ES: Deleting batch records')

  config.clients.map do |es_client|
    target_alias = config.write_alias
    request_body = { query: query }
    es_client.delete_by_query(index: target_alias, body: request_body)
  end
end
index_batch(batch, client: nil, index_name: nil, refresh: false)
click to toggle source
# File lib/zelastic/indexer.rb, line 20
# Indexes a batch of records through a single bulk request per client.
#
# The version is read once, before any command is built, so every record in
# the batch is sent with the same external version.
#
# @param batch [Enumerable] records to index
# @param client optional client, forwarded to #execute_bulk
# @param index_name optional index name, forwarded to #execute_bulk
# @param refresh refresh flag, forwarded to #execute_bulk
# @return whatever #execute_bulk returns
def index_batch(batch, client: nil, index_name: nil, refresh: false)
  snapshot_version = current_version

  execute_bulk(client: client, index_name: index_name, refresh: refresh) do |target_index|
    batch.map { |record| index_command(index: target_index, version: snapshot_version, record: record) }
  end
end
index_record(record, refresh: false)
click to toggle source
# File lib/zelastic/indexer.rb, line 29
# Indexes a single record by sending a one-command bulk request for each
# index that #execute_bulk yields.
#
# @param record the record to index
# @param refresh refresh flag, forwarded to #execute_bulk
# @return whatever #execute_bulk returns
def index_record(record, refresh: false)
  snapshot_version = current_version

  execute_bulk(refresh: refresh) do |index_name|
    command = index_command(index: index_name, version: snapshot_version, record: record)
    [command]
  end
end
Private Instance Methods
check_errors!(result)
click to toggle source
# File lib/zelastic/indexer.rb, line 108
# Inspects a bulk response and raises when it contains errors that are not
# ignorable version conflicts.
#
# Errors are collected from a top-level `error` key on the item or from the
# result of whichever bulk action the item reports (index, create, update or
# delete). Looking only under `index` would let failed `delete` actions, as
# sent by #delete_by_ids, go unnoticed.
#
# @param result [Hash] bulk response with `'errors'` and `'items'` keys
# @return [false] when the response reports no errors
# @return [nil] when every reported error is ignorable
# @raise [IndexingError] with the list of non-ignorable errors
def check_errors!(result)
  return false unless result['errors']

  bulk_actions = %w[index create update delete]
  errors = result['items'].map do |item|
    item['error'] || bulk_actions.map { |action| item.dig(action, 'error') }.compact.first
  end.compact

  ignorable_errors, important_errors = errors.partition { |error| ignorable_error?(error) }

  logger.warn("Ignoring #{ignorable_errors.count} version conflicts") if ignorable_errors.any?

  return unless important_errors.any?

  raise IndexingError, important_errors
end
current_version()
click to toggle source
# File lib/zelastic/indexer.rb, line 67
# Asks the data source's database connection for the `xmax` of the current
# transaction snapshot.
#
# @return the value of the `xmax` column of the single selected row
# @raise [KeyError] if the returned row has no `'xmax'` key
def current_version
  snapshot_row = config.data_source.connection.select_one(
    'SELECT txid_snapshot_xmax(txid_current_snapshot()) as xmax'
  )
  snapshot_row.fetch('xmax')
end
execute_bulk(client: nil, index_name: nil, refresh: false) { |index| ... }
click to toggle source
# File lib/zelastic/indexer.rb, line 94
# Builds bulk commands for each target index and sends them to each client.
#
# @param client optional single client (or list of clients); defaults to
#   `config.clients`
# @param index_name optional index name (or list of names); defaults to the
#   indices returned by #write_indices for the current client
# @param refresh refresh flag passed through to the client's `bulk` call
# @yield [index] called once per target index
# @yieldreturn [Array] the bulk commands for that index
# @return [Array<Hash>] one bulk result per client, in client order
# @raise [IndexingError] via #check_errors! when a response has important errors
def execute_bulk(client: nil, index_name: nil, refresh: false)
  clients = Array(client || config.clients)

  clients.map do |current_client|
    indices = Array(index_name || write_indices(current_client))
    commands = indices.flat_map { |index| yield(index) }

    # Elasticsearch rejects a bulk request that carries no actions, so an empty
    # batch skips the round trip and reports an empty, error-free result.
    empty_result = { 'errors' => false, 'items' => [] }
    next empty_result if commands.empty?

    current_client.bulk(body: commands, refresh: refresh).tap do |result|
      check_errors!(result)
    end
  end
end
ignorable_error?(error)
click to toggle source
# File lib/zelastic/indexer.rb, line 125
# Tells whether a bulk item error is a version conflict that can be ignored.
#
# An error qualifies when its type is `version_conflict_engine_exception`
# and its reason, as a whole string, is the "current version is higher or
# equal to the one provided" message. When the configuration declares a
# type, the reason must also start with that type in brackets.
#
# @param error [Hash] error with `'type'` and `'reason'` keys
# @return [Boolean]
def ignorable_error?(error)
  return false unless error['type'] == 'version_conflict_engine_exception'

  # The configured type is escaped so it is matched literally rather than
  # interpreted as regexp syntax.
  type_prefix = config.type? ? "\\[#{Regexp.escape(config.type.to_s)}\\]" : ''

  # \A and \z anchor the whole reason; ^ and $ would accept a matching line
  # embedded in a longer multi-line message.
  # rubocop:disable Layout/LineLength
  regexp = /\A#{type_prefix}\[\d+\]: version conflict, current version \[\d+\] is higher or equal to the one provided \[\d+\]\z/
  # rubocop:enable Layout/LineLength

  regexp.match?(error['reason'])
end
index_command(index:, version:, record:)
click to toggle source
# File lib/zelastic/indexer.rb, line 77
# Builds the bulk `index` command for one record with external versioning.
#
# When the configuration declares a type, the version keys are sent in their
# underscore-prefixed form together with `_type`; otherwise the unprefixed
# `version` / `version_type` keys are used.
#
# @param index the target index name
# @param version the external version to attach
# @param record a record responding to `id`, also passed to `config.index_data`
# @return [Hash] `{ index: { ... } }`
def index_command(index:, version:, record:)
  versioning = { version: version, version_type: :external }
  versioning = { _version: version, _version_type: :external, _type: config.type } if config.type?

  document = { _index: index, _id: record.id, data: config.index_data(record) }
  { index: document.merge(versioning) }
end
write_indices(client)
click to toggle source
# File lib/zelastic/indexer.rb, line 73
# Looks up the configured write alias on the given client.
#
# @param client a client exposing `indices.get_alias`
# @return [Array] the keys of the `get_alias` response
def write_indices(client)
  alias_lookup = client.indices.get_alias(name: config.write_alias)
  alias_lookup.keys
end