module InventoryRefresh::SaveCollection::Saver::PartialUpsertHelper
Private Instance Methods
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 234 def assign_partial_row_version_attributes!(full_row_version_attr, partial_row_version_attr, hash, all_attribute_keys) hash[comparable_max_column_name(all_attribute_keys)] = hash.delete(full_row_version_attr) return if hash[partial_row_version_attr].blank? # Lets clean to only what we save, since when we build the skeletal object, we can set more hash[partial_row_version_attr] = hash[partial_row_version_attr].slice(*all_attribute_keys) end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 142 def comparable_max_column_name(all_attribute_keys) if supports_remote_data_timestamp?(all_attribute_keys) :resource_timestamps_max elsif supports_remote_data_version?(all_attribute_keys) :resource_counters_max end end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 203 def comparable_timestamp(timestamp) # Lets cast all timestamps to to_f, rounding the time comparing precision to miliseconds, that should be # enough, since we are the ones setting the record version in collector. Otherwise we will have hard time with # doing equality, since the value changes going through DB (DB. cuts it at 5 decimal places) if timestamp.kind_of?(String) Time.use_zone('UTC') { Time.zone.parse(timestamp) }.to_f.round(3) elsif timestamp.kind_of?(Time) timestamp.in_time_zone('UTC').to_f.round(3) else timestamp end end
Saves partial records using upsert, taking records from skeletal_primary_index. This is used both for skeletal precreate as well as for saving partial rows.
@param all_attribute_keys [Set] Superset of all keys of all records being saved
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 35 def create_or_update_partial_records(all_attribute_keys) skeletal_inventory_objects_index, skeletal_attributes_index = load_partial_attributes(all_attribute_keys) indexed_inventory_objects, hashes = process_partial_data(skeletal_inventory_objects_index, skeletal_attributes_index, all_attribute_keys) return if hashes.blank? processed_record_refs = skeletal_precreate!(hashes, all_attribute_keys) hashes_for_update = hashes_for_update(hashes, processed_record_refs) partial_updates!(hashes_for_update, all_attribute_keys) if inventory_collection.dependees.present? # We need to get primary keys of the created objects, but only if there are dependees that would use them map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, nil, :on_conflict => :do_nothing) end end
Batch upserts 1 data column of the row, plus the internal columns
@param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row @param hashes [Array<InventoryRefresh::InventoryObject>] Array of InventoryObject
objects we will be inserting
into the DB
@param on_conflict [Symbol, NilClass] defines behavior on conflict with unique index constraint, allowed values
are :do_update, :do_nothing, nil
@param column_name [Symbol] Name of the data column we will be upserting
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 197 def create_partial!(all_attribute_keys, hashes, on_conflict: nil, column_name: nil) get_connection.execute( build_insert_query(all_attribute_keys, hashes, :on_conflict => on_conflict, :mode => :partial, :column_name => column_name) ) end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 132 def fill_comparables_max!(batch, all_attribute_keys, column_name) comparables_max_name = comparable_max_column_name(all_attribute_keys) # We need to set correct timestamps_max for this particular attribute, based on what is in timestamps batch.each do |x| next unless x[:__non_serialized_versions][column_name] x[comparables_max_name] = x[:__non_serialized_versions][column_name] end end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 83 def hashes_for_update(hashes, processed_record_refs) indexed_hashes = hashes.each_with_object({}) { |hash, obj| obj[unique_index_columns.map { |x| hash[x] }] = hash } indexed_hashes.except!(*processed_record_refs) hashes_for_update = indexed_hashes.values # We need only skeletal records with timestamp. We can't save the ones without timestamp, because e.g. skeletal # precreate would be updating records with default values, that are not correct. hashes_for_update.select { |x| x[:resource_timestamps_max] || x[:resource_counters_max] } end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 57 def load_partial_attributes(all_attribute_keys) skeletal_attributes_index = {} skeletal_inventory_objects_index = {} inventory_collection.skeletal_primary_index.each_value do |inventory_object| attributes = inventory_object.class.attributes_with_keys(inventory_object.data, inventory_collection, all_attribute_keys, inventory_object) index = build_stringified_reference(attributes, unique_index_keys) skeletal_attributes_index[index] = attributes skeletal_inventory_objects_index[index] = inventory_object end if supports_remote_data_timestamp?(all_attribute_keys) all_attribute_keys << :resource_timestamps all_attribute_keys << :resource_timestamps_max elsif supports_remote_data_version?(all_attribute_keys) all_attribute_keys << :resource_counters all_attribute_keys << :resource_counters_max end # We cannot set the resource_version doing partial update all_attribute_keys.delete(resource_version_column) return skeletal_inventory_objects_index, skeletal_attributes_index end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 121 def partial_update!(batch, all_attribute_keys, column_name, results) fill_comparables_max!(batch, all_attribute_keys, column_name) result = create_partial!((inventory_collection.base_columns + [column_name]).to_set & all_attribute_keys, batch, :on_conflict => :do_update, :column_name => column_name) result.each do |res| results[res["id"]] = res end end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 108 def partial_updates!(hashes, all_attribute_keys) results = {} (all_attribute_keys - inventory_collection.base_columns).each do |column_name| filtered = hashes.select { |x| x.key?(column_name) } filtered.each_slice(batch_size_for_persisting) do |batch| partial_update!(batch, all_attribute_keys, column_name, results) end end inventory_collection.store_updated_records(results.values) end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 167 def prepare_partial_hash(hash, all_attribute_keys, create_time) # Partial create or update must never set a timestamp for the whole row timestamps = if supports_remote_data_timestamp?(all_attribute_keys) && supports_column?(:resource_timestamps_max) assign_partial_row_version_attributes!(:resource_timestamp, :resource_timestamps, hash, all_attribute_keys) elsif supports_remote_data_version?(all_attribute_keys) && supports_column?(:resource_counters_max) assign_partial_row_version_attributes!(:resource_counter, :resource_counters, hash, all_attribute_keys) end # Transform hash to DB format hash = transform_to_hash!(all_attribute_keys, hash) assign_attributes_for_create!(hash, create_time) hash[:__non_serialized_versions] = timestamps # store non serialized timestamps for the partial updates hash end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 150 def process_partial_data(skeletal_inventory_objects_index, skeletal_attributes_index, all_attribute_keys) indexed_inventory_objects = {} hashes = [] create_time = time_now skeletal_inventory_objects_index.each do |index, inventory_object| hash = prepare_partial_hash(skeletal_attributes_index.delete(index), all_attribute_keys, create_time) next unless assert_referential_integrity(hash) hashes << hash # Index on Unique Columns values, so we can easily fill in the :id later indexed_inventory_objects[unique_index_columns.map { |x| hash[x] }] = inventory_object end return indexed_inventory_objects, hashes end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 93 def skeletal_precreate!(hashes, all_attribute_keys) processed_record_refs = [] # First, lets try to create all partial records hashes.each_slice(batch_size_for_persisting) do |batch| result = create_partial!(all_attribute_keys, batch, :on_conflict => :do_nothing) inventory_collection.store_created_records(result) # Store refs of created records, so we can ignore them for update result.each { |hash| processed_record_refs << unique_index_columns.map { |x| hash[x.to_s] } } end processed_record_refs end
Taking result from update or upsert of the row. The records that were not saved will be turned into skeletal records and we will save them attribute by attribute.
@param hash [Hash{String => InventoryObject}>] Hash with indexed data we want to save @param result [Array<Hash>] Result from the DB containing the data that were actually saved @param all_unique_columns [Boolean] True if index is consisted from all columns of the unique index. False if
index is just made from manager_ref turned in DB column names.
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 13 def skeletonize_ignored_records!(hash, result, all_unique_columns: false) updated = if all_unique_columns result.map { |x| unique_index_columns_to_s.map { |key| x[key] } } else result.map { |x| db_columns_index(x, :pure_sql => true) } end updated.each { |x| hash.delete(x) } # Now lets skeletonize all inventory_objects that were not saved by update or upsert. Old rows that can't be # saved are not being sent here. We have only rows that are new, but become old as we send the query (so other # parallel process saved the data in the meantime). Or if some attributes are newer than the whole row # being sent. hash.each_key do |db_index| inventory_collection.skeletal_primary_index.skeletonize_primary_index(hash[db_index].manager_uuid) end end
# File lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb, line 217 def skeletonize_or_skip_record(record_version, hash_version, record_versions_max, inventory_object) record_version = comparable_timestamp(record_version) record_versions_max = comparable_timestamp(record_versions_max) if record_versions_max hash_version = comparable_timestamp(hash_version) # Skip updating this record, because it is old return true if record_version && hash_version && record_version >= hash_version # Some column has bigger version than the whole row, we need to store the row partially if record_versions_max && hash_version && record_versions_max > hash_version inventory_collection.skeletal_primary_index.skeletonize_primary_index(inventory_object.manager_uuid) return true end false end