class Longleaf::S3ReplicationService

Preservation service which performs replication of a file to one or more s3 destinations.

The service definition must contain one or more destinations, specified with the “to” property. These destinations must be either a known s3 storage location. The s3 client configuration is controlled by the storage location.

Optional service configuration properties:

Attributes

collision_policy[R]

Public Class Methods

new(service_def, app_manager) click to toggle source

Initialize a S3ReplicationService from the given service definition

@param service_def [ServiceDefinition] the configuration for this service @param app_manager [ApplicationConfigManager] the application configuration

# File lib/longleaf/preservation_services/s3_replication_service.rb, line 31
def initialize(service_def, app_manager)
  @service_def = service_def
  @app_manager = app_manager

  # Set and validate the replica collision policy
  @collision_policy = @service_def.properties[SF::COLLISION_PROPERTY] || SF::DEFAULT_COLLISION_POLICY
  if !SF::VALID_COLLISION_POLICIES.include?(@collision_policy)
    raise ArgumentError.new("Service #{service_def.name} received invalid #{SF::COLLISION_PROPERTY}" \
        + " value #{@collision_policy}")
  end

  # Store and validate destinations
  replicate_to = @service_def.properties[SF::REPLICATE_TO]
  if replicate_to.nil? || replicate_to.empty?
    raise ArgumentError.new("Service #{service_def.name} must provide one or more replication destinations.")
  end
  replicate_to = [replicate_to] if replicate_to.is_a?(String)

  loc_manager = app_manager.location_manager
  # Build list of destinations, translating to storage locations when relevant
  @destinations = Array.new
  replicate_to.each do |dest|
    if loc_manager.locations.key?(dest)
      location = loc_manager.locations[dest]
      if location.type != ST::S3_STORAGE_TYPE
        raise ArgumentError.new(
            "Service #{service_def.name} specifies destination #{dest} which is not of type 's3'")
      end
      @destinations << loc_manager.locations[dest]
    else
      raise ArgumentError.new("Service #{service_def.name} specifies unknown storage location '#{dest}'" \
          + " as a replication destination")
    end
  end
end

Public Instance Methods

is_applicable?(event) click to toggle source

Determine if this service is applicable for the provided event, given the configured service definition

@param event [String] name of the event @return [Boolean] returns true if this service is applicable for the provided event

# File lib/longleaf/preservation_services/s3_replication_service.rb, line 112
def is_applicable?(event)
  case event
  when EventNames::PRESERVE
    true
  else
    false
  end
end
perform(file_rec, event) click to toggle source

During a replication event, perform replication of the specified file to all configured destinations as necessary.

@param file_rec [FileRecord] record representing the file to perform the service on. @param event [String] name of the event this service is being invoked by. @raise [PreservationServiceError] if the rsync replication fails

# File lib/longleaf/preservation_services/s3_replication_service.rb, line 73
def perform(file_rec, event)
  if file_rec.storage_location.type == ST::FILESYSTEM_STORAGE_TYPE
    replicate_from_fs(file_rec)
  else
    raise PreservationServiceError.new("Replication from storage location of type " \
        + "#{file_rec.storage_location.type} to s3 is not supported")
  end
end
replicate_from_fs(file_rec) click to toggle source
# File lib/longleaf/preservation_services/s3_replication_service.rb, line 82
def replicate_from_fs(file_rec)
  # Determine the path to the file being replicated relative to its storage location
  rel_path = file_rec.storage_location.relativize(file_rec.path)

  @destinations.each do |destination|
    # Check that the destination is available before attempting to write
    verify_destination_available(destination, file_rec)

    rel_to_bucket = destination.relative_to_bucket_path(rel_path)
    file_obj = destination.s3_bucket.object(rel_to_bucket)
    begin
      file_obj.upload_file(file_rec.physical_path)
    rescue Aws::S3::Errors::BadDigest => e
      raise ChecksumMismatchError.new("Transfer to bucket '#{destination.s3_bucket.name}' failed, " \
          + "MD5 provided did not match the received content for #{file_rec.path}")
    rescue Aws::Errors::ServiceError => e
      raise PreservationServiceError.new("Failed to transfer #{file_rec.path} to bucket " \
          + "'#{destination.s3_bucket.name}': #{e.message}")
    end

    logger.info("Replicated #{file_rec.path} to destination #{file_obj.public_url}")

    # TODO register file in destination
  end
end

Private Instance Methods

verify_destination_available(destination, file_rec) click to toggle source
# File lib/longleaf/preservation_services/s3_replication_service.rb, line 122
def verify_destination_available(destination, file_rec)
  begin
    destination.available?
  rescue StorageLocationUnavailableError => e
    raise StorageLocationUnavailableError.new("Cannot replicate #{file_rec.path} to destination #{destination.name}: " \
        + e.message)
  end
end