class ODBA::Cache
Constants
- COUNT_FILE
- LOCK_FILE
File lock exclusive control between processes, not threads, to create safely a new odba_id Sometimes several update jobs (processes) to the same database at the same time
Attributes
Public Instance Methods
Returns all objects designated by bulk_fetch_ids and registers odba_caller for each of them. Objects which are not yet loaded are loaded from ODBA#storage.
# File lib/odba/cache.rb, line 45 def bulk_fetch(bulk_fetch_ids, odba_caller) instances = [] loaded_ids = [] bulk_fetch_ids.each { |id| if(entry = fetch_cache_entry(id)) entry.odba_add_reference(odba_caller) instances.push(entry.odba_object) loaded_ids.push(id) end } bulk_fetch_ids -= loaded_ids unless(bulk_fetch_ids.empty?) rows = ODBA.storage.bulk_restore(bulk_fetch_ids) instances += bulk_restore(rows, odba_caller) end instances end
overrides the ODBA_PREFETCH constant and @odba_prefetch instance variable in Persistable
. Use this if a secondary client is more memory-bound than performance-bound.
# File lib/odba/cache.rb, line 121 def clean_prefetched(flag=true) if(@clean_prefetched = flag) clean end end
Get number of instances of a class
# File lib/odba/cache.rb, line 221 def count(klass) ODBA.storage.extent_count(klass) end
Creates or recreates automatically defined indices
# File lib/odba/cache.rb, line 131 def create_deferred_indices(drop_existing = false) @deferred_indices.each { |definition| name = definition.index_name if(drop_existing && self.indices.include?(name)) drop_index(name) end unless(self.indices.include?(name)) index = create_index(definition) if(index.target_klass.respond_to?(:odba_extent)) index.fill(index.target_klass.odba_extent) end end } end
Creates a new index according to IndexDefinition
# File lib/odba/cache.rb, line 146 def create_index(index_definition, origin_module=Object) transaction { klass = if(index_definition.fulltext) FulltextIndex elsif(index_definition.resolve_search_term.is_a?(Hash)) ConditionIndex else Index end index = klass.new(index_definition, origin_module) indices.store(index_definition.index_name, index) indices.odba_store_unsaved index } end
Permanently deletes object from the database and deconnects all connected Persistables
# File lib/odba/cache.rb, line 163 def delete(odba_object) odba_id = odba_object.odba_id name = odba_object.odba_name odba_object.odba_notify_observers(:delete, odba_id, odba_object.object_id) rows = ODBA.storage.retrieve_connected_objects(odba_id) rows.each { |row| id = row.first # Self-Referencing objects don't have to be resaved begin if(connected_object = fetch(id, nil)) connected_object.odba_cut_connection(odba_object) connected_object.odba_isolated_store end rescue OdbaError warn "OdbaError ### deleting #{odba_object.class}:#{odba_id}" warn " ### while looking for connected object #{id}" end } delete_cache_entry(odba_id) delete_cache_entry(name) ODBA.storage.delete_persistable(odba_id) delete_index_element(odba_object) odba_object end
# File lib/odba/cache.rb, line 187 def delete_cache_entry(key) @cache_mutex.synchronize { @fetched.delete(key) @prefetched.delete(key) } end
Permanently deletes the index named index_name
# File lib/odba/cache.rb, line 200 def drop_index(index_name) transaction { ODBA.storage.drop_index(index_name) self.delete(self.indices[index_name]) } end
Queue an index for creation by setup
# File lib/odba/cache.rb, line 213 def ensure_index_deferred(index_definition) @deferred_indices.push(index_definition) end
Get all instances of a class (- a limited extent)
# File lib/odba/cache.rb, line 217 def extent(klass, odba_caller=nil) bulk_fetch(ODBA.storage.extent_ids(klass), odba_caller) end
Fetch a Persistable
identified by odba_id. Registers odba_caller with the CacheEntry
. Loads the Persistable
if it is not already loaded.
# File lib/odba/cache.rb, line 226 def fetch(odba_id, odba_caller=nil) fetch_or_do(odba_id, odba_caller) { load_object(odba_id, odba_caller) } end
# File lib/odba/cache.rb, line 338 def fill_index(index_name, targets) self.indices[index_name].fill(targets) end
Checks wether the object identified by odba_id has been loaded.
# File lib/odba/cache.rb, line 342 def include?(odba_id) @fetched.include?(odba_id) || @prefetched.include?(odba_id) end
# File lib/odba/cache.rb, line 345 def index_keys(index_name, length=nil) index = indices.fetch(index_name) index.keys(length) end
# File lib/odba/cache.rb, line 349 def index_matches(index_name, substring, limit=nil, offset=0) index = indices.fetch(index_name) index.matches substring, limit, offset end
Returns a Hash-table containing all stored indices.
# File lib/odba/cache.rb, line 354 def indices @indices ||= fetch_named('__cache_server_indices__', self) { {} } end
# File lib/odba/cache.rb, line 359 def invalidate(odba_id) ## when finalizers are run, no other threads will be scheduled, # therefore we don't need to @cache_mutex.synchronize @fetched.delete odba_id @prefetched.delete odba_id end
# File lib/odba/cache.rb, line 365 def invalidate!(*odba_ids) odba_ids.each do |odba_id| if entry = fetch_cache_entry(odba_id) entry.odba_retire :force => true end invalidate odba_id end end
# File lib/odba/cache.rb, line 377 def lock(dbname) lock_file = LOCK_FILE + "." + dbname open(lock_file, 'a') do |st| st.flock(File::LOCK_EX) yield st.flock(File::LOCK_UN) end end
# File lib/odba/cache.rb, line 385 def new_id(dbname, odba_storage) count_file = COUNT_FILE + "." + dbname count = nil lock(dbname) do unless File.exist?(count_file) open(count_file, "w") do |out| out.print odba_storage.max_id end end count = File.read(count_file).to_i count += 1 open(count_file, "w") do |out| out.print count end odba_storage.update_max_id(count) end count end
Returns the next valid odba_id
# File lib/odba/cache.rb, line 404 def next_id if @file_lock dbname = ODBA.storage.instance_variable_get('@dbi').dbi_args.first.split(/:/).last id = new_id(dbname, ODBA.storage) else id = ODBA.storage.next_id end @peers.each do |peer| peer.reserve_next_id id rescue DRb::DRbError end id rescue OdbaDuplicateIdError retry end
Use this to load all prefetchable Persistables from the db at once
# File lib/odba/cache.rb, line 419 def prefetch bulk_restore(ODBA.storage.restore_prefetchable) end
prints loading statistics to $stdout
# File lib/odba/cache.rb, line 423 def print_stats fmh = " %-20s | %10s | %5s | %6s | %6s | %6s | %-20s\n" fmt = " %-20s | %10.3f | %5i | %6.3f | %6.3f | %6.3f | %s\n" head = sprintf(fmh, "class", "total", "count", "min", "max", "avg", "callers") line = "-" * head.length puts line print head puts line @loading_stats.sort_by { |key, val| val[:total_time] }.reverse.each { |key, val| key = key.to_s if(key.length > 20) key = key[-20,20] end avg = val[:total_time] / val[:count] printf(fmt, key, val[:total_time], val[:count], val[:times].min, val[:times].max, avg, val[:callers].join(',')) } puts line $stdout.flush end
Register a peer that has access to the same DB backend
# File lib/odba/cache.rb, line 448 def register_peer peer @peers.push(peer).uniq! end
Reserve an id with all registered peers
# File lib/odba/cache.rb, line 452 def reserve_next_id id ODBA.storage.reserve_next_id id end
Clears the loading statistics
# File lib/odba/cache.rb, line 456 def reset_stats @loading_stats.clear end
Find objects in an index
# File lib/odba/cache.rb, line 460 def retrieve_from_index(index_name, search_term, meta=nil) index = indices.fetch(index_name) ids = index.fetch_ids(search_term, meta) if meta.respond_to?(:error_limit) && (limit = meta.error_limit) \ && (size = ids.size) > limit.to_i error = OdbaResultLimitError.new error.limit = limit error.size = size error.index = index_name error.search_term = search_term error.meta = meta raise error end bulk_fetch(ids, nil) end
Create necessary DB-Structure / other storage-setup
# File lib/odba/cache.rb, line 476 def setup ODBA.storage.setup self.indices.each_key { |index_name| ODBA.storage.ensure_target_id_index(index_name) } create_deferred_indices nil end
Returns the total number of cached objects
# File lib/odba/cache.rb, line 485 def size @prefetched.size + @fetched.size end
Store a Persistable
object in the database
# File lib/odba/cache.rb, line 503 def store(object) odba_id = object.odba_id name = object.odba_name object.odba_notify_observers(:store, odba_id, object.object_id) if(ids = Thread.current[:txids]) ids.unshift([odba_id,name]) end ## get target_ids before anything else target_ids = object.odba_target_ids changes = store_collection_elements(object) prefetchable = object.odba_prefetch? dump = object.odba_isolated_dump ODBA.storage.store(odba_id, dump, name, prefetchable, object.class) store_object_connections(odba_id, target_ids) update_references(target_ids, object) object = store_cache_entry(odba_id, object, name) update_indices(object) @peers.each do |peer| peer.invalidate! odba_id rescue DRb::DRbError end object end
Executes the block in a transaction. If the transaction fails, all affected Persistable
objects are reloaded from the db (which by then has also performed a rollback). Rollback is quite inefficient at this time.
# File lib/odba/cache.rb, line 562 def transaction(&block) Thread.current[:txids] = [] ODBA.storage.transaction(&block) rescue Exception => excp transaction_rollback raise excp ensure Thread.current[:txids] = nil end
Unregister a peer
# File lib/odba/cache.rb, line 588 def unregister_peer peer @peers.delete peer end
Private Instance Methods
# File lib/odba/cache.rb, line 606 def load_object(odba_id, odba_caller) start = Time.now if(@debug) dump = ODBA.storage.restore(odba_id) odba_obj = restore_object(odba_id, dump, odba_caller) return odba_obj unless(@debug) stats = (@loading_stats[odba_obj.class] ||= { :count => 0, :times => [], :total_time => 0, :callers => [], }) stats[:count] += 1 time = Time.now - start stats[:times].push(time) stats[:total_time] += time stats[:callers].push(odba_caller.class).uniq! if(time > 2) names = [] odba_caller.instance_variables.each { |name| if(odba_caller.instance_variable_get(name).odba_id == odba_id) names.push(name) end } printf("long load-time (%4.2fs) for odba_id %i: %s#%s\n", time, odba_id, odba_caller, names.join(',')) end odba_obj end
# File lib/odba/cache.rb, line 631 def restore(dump) odba_obj = ODBA.marshaller.load(dump) unless(odba_obj.is_a?(Persistable)) odba_obj.extend(Persistable) end collection = fetch_collection(odba_obj) odba_obj.odba_restore(collection) [odba_obj, collection] end
# File lib/odba/cache.rb, line 640 def restore_object(odba_id, dump, odba_caller) if(dump.nil?) raise OdbaError, "Unknown odba_id #{odba_id}" end fetch_or_restore(odba_id, dump, odba_caller) end