class Longleaf::SequelIndexDriver

Driver for interacting with RDBM based metadata index using the Sequel ORM gem. Users must create the database and credentials for connecting to it in advance, if using a database application that requires creation of databases (ie, not sqlite). The default database name is 'longleaf_metadata_index' but may be overridden.

See the Sequel documentation for details about accepted connection parameters: github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc

Public Class Methods

new(app_config, adapter, conn_details, page_size: nil) click to toggle source

Initialize the index driver

@param app_config [ApplicationConfigManager] the application configuration manager @param adapter [String] name of the database adapter to use. @param conn_details Details about the configuration and connection to the database used for the index.

If a string is provided, it will be used as the connection URL and must identify the adapter.
If a hash is provided, it used as the parameters for the database connection.

@param page_size [Integer] number of results to retrieve per query when getting candidates

# File lib/longleaf/indexing/sequel_index_driver.rb, line 33
def initialize(app_config, adapter, conn_details, page_size: nil)
  Sequel.default_timezone = :utc
  @app_config = app_config
  @adapter = adapter
  @conn_details = conn_details
  # Digest of the app config file so we can tell if it changes
  @config_md5 = app_config.config_md5
  @page_size = page_size.nil? || page_size <= 0 ? DEFAULT_PAGE_SIZE : page_size

  if @conn_details.is_a?(Hash)
    # Add in the adapter name
    @conn_details['adapter'] = adapter unless @conn_details.key?('adapter')
    # Add in default database name if none was specified
    @conn_details['database'] = INDEX_DB_NAME unless @conn_details.key?('database')
  end
end

Public Instance Methods

clear_index(older_than = nil) click to toggle source

Remove all entries from the index @param older_than [Time] Optional. If provided, only entries that have not been indexed

since before the provided time will be deleted.
# File lib/longleaf/indexing/sequel_index_driver.rb, line 153
def clear_index(older_than = nil)
  if older_than.nil?
    preserve_tbl.delete
  else
    older_than_timestamp = older_than.utc.strftime(TIMESTAMP_FORMAT)
    preserve_tbl.where { updated < older_than_timestamp }.delete
  end
end
delay_until_timestamp(md_rec) click to toggle source

@return The first failure timestamp for any service, or nil if there were none.

# File lib/longleaf/indexing/sequel_index_driver.rb, line 126
def delay_until_timestamp(md_rec)
  md_rec.list_services.each do |service_name|
    service_rec = md_rec.service(service_name)
    return service_rec.failure_timestamp unless service_rec.failure_timestamp.nil?
  end
  # return lowest possible date
  return minimum_timestamp
end
each_registered_path(file_selector, older_than: nil, &block) click to toggle source

Calls the provided block once per each registered file path registered. Must be passed a block. @param file_selector [FileSelector] selector for what paths to search for files @param older_than [Time] Optional. If provided, only files that have not been

indexed since before this timestamp will be returned.
# File lib/longleaf/indexing/sequel_index_driver.rb, line 250
def each_registered_path(file_selector, older_than: nil, &block)
  dataset = add_path_restrictions(registered_dataset, file_selector)
      .select(:file_path)
  if !older_than.nil?
    older_than_timestamp = older_than.utc.strftime(TIMESTAMP_FORMAT)
    dataset = dataset.where { updated < older_than_timestamp }
  end
  # Yield to the provided block once per row return
  dataset.paged_each(:rows_per_fetch => @page_size) do |row|
    block.call(row[:file_path])
  end
end
first_service_execution_timestamp(expected_services, md_rec) click to toggle source

Find the earliest service execution time for any services expected to be run for the specified file.

@param expected_services [Array] list of ServiceDefinition objects expected for specified file. @param md_rec [MetadataRecord] metadata record for the file being evaluated @return The timestamp of the earliest service execution time for the file described by md_rec, in iso8601 format.

Returns nil if no services are expected all services have already run and do not have a next occurrence, or
the file is deregistered.
# File lib/longleaf/indexing/sequel_index_driver.rb, line 105
def first_service_execution_timestamp(expected_services, md_rec)
  current_time = Time.now.utc.iso8601(3)
  if md_rec.deregistered?
    return nil
  end

  service_times = Array.new

  present_services = md_rec.list_services

  expected_services.each do |service_def|
    service_name = service_def.name

    next_run = ServiceDateHelper.next_run_needed(md_rec, service_def)
    service_times << next_run unless next_run.nil?
  end
  # Return the lowest service execution time
  service_times.min
end
index(file_rec) click to toggle source

Index the provided file_rec and its metadata

@param file_rec [FileRecord] file record to index

# File lib/longleaf/indexing/sequel_index_driver.rb, line 59
def index(file_rec)
  file_path = file_rec.path
  md_rec = file_rec.metadata_record
  storage_loc = file_rec.storage_location
  service_manager = @app_config.service_manager

  # Produce a list of service definitions which should apply to the file
  expected_services = service_manager.list_service_definitions(
      location: storage_loc.name)

  first_timestamp = first_service_execution_timestamp(expected_services, md_rec)
  delay_until_timestamp = delay_until_timestamp(md_rec)

  first_timestamp = convert_iso8601_to_timestamp(first_timestamp)
  delay_until_timestamp = convert_iso8601_to_timestamp(delay_until_timestamp)
  now_stamp = Time.now.utc.strftime(TIMESTAMP_FORMAT)

  if @adapter == :mysql || @adapter == :mysql2
    preserve_tbl.on_duplicate_key_update
        .insert(file_path: file_path,
            storage_location: storage_loc.name,
            service_time: first_timestamp,
            delay_until_time: delay_until_timestamp,
            updated: now_stamp)
  else
    preserve_tbl.insert_conflict(target: :file_path,
        update: {
            storage_location: storage_loc.name,
            service_time: first_timestamp,
            delay_until_time: delay_until_timestamp,
            updated: now_stamp } )
        .insert(file_path: file_path,
            storage_location: storage_loc.name,
            service_time: first_timestamp,
            delay_until_time: delay_until_timestamp,
            updated: now_stamp)
  end
end
is_stale?() click to toggle source

Returns true if the application configuration does not match the configuration used for the last reindex.

# File lib/longleaf/indexing/sequel_index_driver.rb, line 52
def is_stale?
  db_conn[INDEX_STATE_TBL].where(config_md5: @config_md5).count == 0
end
paths_with_stale_services(file_selector, stale_datetime) click to toggle source

Retrieves page of file paths which have one or more services which need to run. @param file_selector [FileSelector] selector for what paths to search for files @param stale_datetime [DateTime] find file_paths with services needing to be run before this value @return [Array] array of file paths that need one or more services run.

# File lib/longleaf/indexing/sequel_index_driver.rb, line 220
def paths_with_stale_services(file_selector, stale_datetime)
  if @preserve_dataset.nil?
    @preserve_dataset = db_conn
        .from(PRESERVE_TBL)
        .exclude(service_time: nil)
        .limit(@page_size)
        .order(Sequel.asc(:service_time))
  end

  # retrieve and return a page of results
  ds = add_path_restrictions(@preserve_dataset, file_selector)
      .where { service_time <= stale_datetime }
      .where { delay_until_time < stale_datetime }
      .select_map(:file_path)
end
registered_paths(file_selector) click to toggle source

Retrieves a page of paths for registered files. @param file_selector [FileSelector] selector for what paths to search for files @return [Array] array of file paths that are registered

# File lib/longleaf/indexing/sequel_index_driver.rb, line 239
def registered_paths(file_selector)
  # retrieve and return a page of results
  add_path_restrictions(registered_dataset, file_selector)
      .select_map(:file_path)
end
remove(remove_me) click to toggle source

Remove an entry from the index @param remove_me The record to remove from the index. May be a FileRecord or a String.

# File lib/longleaf/indexing/sequel_index_driver.rb, line 137
def remove(remove_me)
  if remove_me.is_a?(FileRecord)
    path = remove_me.path
  else
    path = remove_me
  end

  result = preserve_tbl.where(file_path: path).delete
  if result == 0
    logger.warn("Could not remove #{path} from the index, path was not present.")
  end
end
setup_index() click to toggle source

Initialize the index's database using the provided configuration

# File lib/longleaf/indexing/sequel_index_driver.rb, line 163
def setup_index
  # Create the table for tracking when files will need preservation services run on them.
  case @adapter
  when :mysql, :mysql2
    # mysql does not support 'text' fields as primary keys
    db_conn.create_table!(PRESERVE_TBL) do
      String :file_path, primary_key: true, size: 768
      column :storage_location, 'varchar(128)'
      column :service_time, 'timestamp(3)', { :null => true }
      column :delay_until_time, 'timestamp(3)'
      column :updated, 'timestamp(3)'
    end
  else
    db_conn.create_table!(PRESERVE_TBL) do
      String :file_path, primary_key: true, text: true
      column :storage_location, 'varchar(128)'
      column :service_time, 'timestamp(3)', { :null => true }
      column :delay_until_time, 'timestamp(3)'
      column :updated, 'timestamp(3)'
    end
  end

  # Setup database indexes
  case @adapter
  when :postgres
    db_conn.run("CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path text_pattern_ops)")
  when :sqlite, :amalgalite
    db_conn.run("CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path collate nocase)")
  end
  db_conn.run("CREATE INDEX service_times_storage_location_index ON preserve_service_times (storage_location)")

  # Create table for tracking the state of the index
  db_conn.create_table!(INDEX_STATE_TBL) do
    String :config_md5
    DateTime :last_reindexed
    String :longleaf_version
  end

  # Prepopulate the index state information
  update_index_state
end
update_index_state() click to toggle source

Updates the state information for the index to indicate that the index has been refreshed or is in sync with the application's configuration.

# File lib/longleaf/indexing/sequel_index_driver.rb, line 207
def update_index_state
  index_state_tbl = db_conn[INDEX_STATE_TBL]
  index_state_tbl.delete
  index_state_tbl.insert(
      config_md5: @config_md5,
      last_reindexed: Time.now.utc,
      longleaf_version: Longleaf::VERSION)
end

Private Instance Methods

add_path_restrictions(dataset, file_selector) click to toggle source
# File lib/longleaf/indexing/sequel_index_driver.rb, line 274
def add_path_restrictions(dataset, file_selector)
  if file_selector.specificity == FileSelector::SPECIFICITY_STORAGE_LOCATION
    dataset.where(storage_location: file_selector.storage_locations)
  else
    # Reformat all selected paths into LIKE partial string matches
    path_conds = file_selector.target_paths.map { |path| path.end_with?('/') ? path + '%' : path }
    dataset.where(Sequel.like(:file_path, *path_conds))
  end
end
convert_iso8601_to_timestamp(iso8601) click to toggle source
# File lib/longleaf/indexing/sequel_index_driver.rb, line 284
def convert_iso8601_to_timestamp(iso8601)
  return nil if iso8601.nil?
  Time.iso8601(iso8601).strftime(TIMESTAMP_FORMAT)
end
db_conn() click to toggle source
# File lib/longleaf/indexing/sequel_index_driver.rb, line 264
def db_conn
  @connection = Sequel.connect(@conn_details) if @connection.nil?
  @connection
end
minimum_timestamp() click to toggle source
# File lib/longleaf/indexing/sequel_index_driver.rb, line 289
def minimum_timestamp
  if @min_timestamp.nil?
    @min_timestamp = ServiceDateHelper.formatted_timestamp(Time.at(0).utc)
  end
  @min_timestamp
end
preserve_tbl() click to toggle source
# File lib/longleaf/indexing/sequel_index_driver.rb, line 269
def preserve_tbl
  @preserve_tbl = db_conn[PRESERVE_TBL] if @preserve_tbl.nil?
  @preserve_tbl
end
registered_dataset() click to toggle source
# File lib/longleaf/indexing/sequel_index_driver.rb, line 296
def registered_dataset
  if @registered_dataset.nil?
    @registered_dataset = db_conn
        .from(PRESERVE_TBL)
        .limit(@page_size)
        .order(Sequel.asc(:service_time))
  end
  @registered_dataset
end