Source code for snapm.manager._manager

# Copyright Red Hat
#
# snapm/manager/_manager.py - Snapshot Manager
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
Manager interface and plugin infrastructure.
"""
from subprocess import run, CalledProcessError
import logging
from time import time
from math import floor
from stat import S_ISBLK
from os import stat
from os.path import exists, ismount, normpath, samefile

from snapm.manager.signals import suspend_signals
from snapm.manager.boot import (
    BootCache,
    create_snapset_boot_entry,
    delete_snapset_boot_entry,
    create_snapset_revert_entry,
    delete_snapset_revert_entry,
    check_boom_config,
)

from snapm import (
    SNAPM_DEBUG_MANAGER,
    SNAPM_VALID_NAME_CHARS,
    SnapmError,
    SnapmCalloutError,
    SnapmNoSpaceError,
    SnapmNoProviderError,
    SnapmExistsError,
    SnapmBusyError,
    SnapmPathError,
    SnapmNotFoundError,
    SnapmInvalidIdentifierError,
    SnapmPluginError,
    SnapmStateError,
    SnapmRecursionError,
    SnapmArgumentError,
    Selection,
    bool_to_yes_no,
    is_size_policy,
    SnapStatus,
    SnapshotSet,
)
from ._loader import load_plugins


_log = logging.getLogger(__name__)
_log.set_debug_mask(SNAPM_DEBUG_MANAGER)

_log_debug = _log.debug
_log_debug_manager = _log.debug_masked
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error

JOURNALCTL_CMD = "journalctl"


# pylint: disable=too-many-return-statements
def select_snapshot_set(select, snapshot_set):
    """
    Test SnapshotSet against Selection criteria.

    Test the supplied ``SnapshotSet`` against the selection criteria
    in ``s`` and return ``True`` if it passes, or ``False``
    otherwise.

    :param s: The selection criteria
    :param be: The SnapshotSet to test
    :rtype: bool
    :returns: True if SnapshotSet passes selection or ``False``
              otherwise.
    """
    if select.name and select.name != snapshot_set.name:
        return False
    if select.uuid and select.uuid != snapshot_set.uuid:
        return False
    if select.timestamp and select.timestamp != snapshot_set.timestamp:
        return False
    if select.nr_snapshots and select.nr_snapshots != snapshot_set.nr_snapshots:
        return False
    if select.mount_points:
        s_mount_points = sorted(select.mount_points)
        mount_points = sorted(snapshot_set.mount_points)
        if s_mount_points != mount_points:
            return False

    return True


def select_snapshot(select, snapshot):
    """
    Test SnapshotSet against Selection criteria.

    Test the supplied ``SnapshotSet`` against the selection criteria
    in ``s`` and return ``True`` if it passes, or ``False``
    otherwise.

    :param s: The selection criteria
    :param be: The SnapshotSet to test
    :rtype: bool
    :returns: True if SnapshotSet passes selection or ``False``
              otherwise.
    """
    if not select_snapshot_set(select, snapshot.snapshot_set):
        return False
    if select.snapshot_uuid and select.snapshot_uuid != snapshot.uuid:
        return False
    if select.snapshot_name and select.snapshot_name != snapshot.name:
        return False

    return True


def _suspend_journal():
    """
    Suspend journal writes to /var before creating snapshots.
    """
    try:
        run([JOURNALCTL_CMD, "--flush"], check=True)
        run([JOURNALCTL_CMD, "--relinquish-var"], check=True)
    except CalledProcessError as err:  # pragma: no cover
        raise SnapmCalloutError(
            f"Error calling journalctl to flush journal: {err}"
        ) from err


def _resume_journal():
    """
    Resume journal writes to /var after creating snapshots.
    """
    try:
        run([JOURNALCTL_CMD, "--flush"], check=True)
    except CalledProcessError as err:  # pragma: no cover
        raise SnapmCalloutError(
            f"Error calling journalctl to flush journal: {err}"
        ) from err


def _parse_source_specs(source_specs, default_size_policy):
    """
    Parse and normalize source paths and size policies.

    :param source_specs: A list of mount point or block device paths and
                         optional size policy strings.
    :returns: A tuple (sources, size_policies) containing a list of
              normalized mount point and block device paths and a dictionary
              mapping source paths to size policies.
    """
    sources = []
    size_policies = {}

    # Parse size policies and normalise mount paths
    for spec in source_specs:
        if ":" in spec:
            (source, policy) = spec.rsplit(":", maxsplit=1)
            if not is_size_policy(policy):
                source = f"{source}:{policy}"
                policy = default_size_policy
        else:
            (source, policy) = (spec, default_size_policy)
        source = normpath(source)
        sources.append(source)
        size_policies[source] = policy
    return (sources, size_policies)


def _check_revert_snapshot_set(snapset):
    """
    Check that all snapshots in a snapshot set can be reverted.
    Returns if all checks pass or raises an exception otherwise.

    :returns: None
    :raises: ``NotImplementedError``, ``SnapmBusyError`` or
             ``SnapmPluginError``
    """
    if snapset.status == SnapStatus.INVALID:
        raise SnapmStateError(
            f"Cannot revert snapset {snapset.name} with invalid snapshots"
        )

    # Check for ability to revert all snapshots in set
    for snapshot in snapset.snapshots:
        try:
            snapshot.check_revert()
        except NotImplementedError as err:
            _log_error(
                "Snapshot provider %s does not support revert",
                snapshot.provider.name,
            )
            raise SnapmPluginError from err
        except SnapmBusyError as err:
            _log_error(
                "Revert already in progress for snapshot origin %s",
                snapshot.origin,
            )
            raise SnapmBusyError(
                f"A revert has already started for snapshot set {snapset.name}"
            ) from err
        except SnapmPluginError as err:
            _log_error(
                "Revert prechecks failed for snapshot %s (%s)",
                snapshot.name,
                snapshot.provider.name,
            )
            raise err


def _check_snapset_status(snapset, operation):
    """
    Check that a snapshot set status is not ``SnapStatus.INVALID`` or
    ``SnapStatus.REVERTING`` before carrying out ``operation``.
    """
    if snapset.status in (SnapStatus.INVALID, SnapStatus.REVERTING):
        if snapset.status == SnapStatus.INVALID:
            _log_error("Cannot operate on invalid snapshot set '%s'", snapset.name)
        if snapset.status == SnapStatus.REVERTING:
            _log_error("Cannot operate on reverting snapshot set '%s'", snapset.name)
        raise SnapmStateError(
            f"Failed to {operation} snapset '{snapset.name}': status is '{snapset.status}'"
        )


def _find_mount_point_for_devpath(devpath):
    """
    Return the first mount point found in /proc/mounts that corresponds to
    ``device``, or the empty string if no mount point can be found.
    """
    with open("/proc/mounts", "r", encoding="utf8") as mounts:
        for line in mounts:
            fields = line.split(" ")
            if exists(fields[0]):
                if samefile(devpath, fields[0]):
                    return fields[1]
    return ""


[docs] class Manager: """ Snapshot Manager high level interface. """ plugins = [] snapshot_sets = [] by_name = {} by_uuid = {}
[docs] @suspend_signals def __init__(self): self.plugins = [] self.snapshot_sets = [] self.by_name = {} self.by_uuid = {} check_boom_config() self._boot_cache = BootCache() plugin_classes = load_plugins() for plugin_class in plugin_classes: _log_debug("Loading plugin class '%s'", plugin_class.__name__) try: plugin = plugin_class(_log) self.plugins.append(plugin) except SnapmNotFoundError as err: _log_debug( "Plugin dependencies missing: %s (%s), skipping.", plugin_class.__name__, err, ) except SnapmPluginError as err: _log_error("Disabling plugin %s: %s", plugin_class.__name__, err) self.discover_snapshot_sets()
[docs] def _find_and_verify_plugins( self, sources, size_policies, _requested_provider=None ): """ Find snapshot provider plugins for each source in ``sources`` and verify that a provider exists for each source present. :param sources: A list of source mount point or block device paths. :param size_policies: A dictionary mapping sources to size policies. :returns: A dictionary mapping sources to plugins. """ # Initialise provider mapping. provider_map = {k: None for k in sources} # Find provider plugins for sources for source in sources: if not exists(source): _log_error("No such file or directory: %s", source) raise SnapmNotFoundError(f"Source path '{source}' does not exist") _log_debug( "Probing plugins for %s with size policy %s", source, size_policies[source], ) if not ismount(source) and not S_ISBLK(stat(source).st_mode): raise SnapmPathError( f"Path '{source}' is not a block device or mount point" ) for plugin in self.plugins: if plugin.can_snapshot(source): provider_map[source] = plugin # Verify each mount point has a provider plugin for source in provider_map: if provider_map[source] is None: raise SnapmNoProviderError( f"Could not find snapshot provider for {source}" ) return provider_map
[docs] def _snapset_from_name_or_uuid(self, name=None, uuid=None): """ Look a snapshot set up by ``name`` or ``uuid``. Returns a ``SnapshotSet`` correponding to either ``name`` or ``uuid`, or raises ``SnapmError`` on error. :param name: The name of the snapshot set to look up. :param uuid: The UUID of the snapshot set to look up. :returns: A ``SnapshotSet`` corresponding to the given name or UUID. :raises: ``SnapmNotFoundError`` is the name or UUID cannot be found. ``SnapmInvalidIdentifierError`` if the name and UUID do not match. """ if name is not None: if name not in self.by_name: raise SnapmNotFoundError(f"Could not find snapshot set named {name}") snapset = self.by_name[name] elif uuid is not None: if uuid not in self.by_uuid: raise SnapmNotFoundError( f"Could not find snapshot set with uuid {uuid}" ) snapset = self.by_uuid[uuid] else: raise SnapmNotFoundError("A snapshot set name or UUID is required") if name and uuid: if self.by_name[name] != self.by_uuid[uuid]: raise SnapmInvalidIdentifierError( f"Conflicting name and UUID: {str(uuid)} does not match '{name}'" ) return snapset
[docs] def _check_recursion(self, origins): """ Verify that each entry in ``origins`` corresponds to a device that is not a snapshot belonging to another snapshot set. :param origins: A list of origin devices to check. :raises: ``SnapmRecursionError`` if an origin device is a snapshot. """ snapshot_devices = [ snapshot.devpath for snapset in self.snapshot_sets for snapshot in snapset.snapshots ] for source, device in origins.items(): if device in snapshot_devices: raise SnapmRecursionError( "Snapshots of snapshots are not supported: " f"{source} corresponds to snapshot device {device}" )
[docs] def discover_snapshot_sets(self): """ Discover snapshot sets by calling into each plugin to find individual snapshots and then aggregating them together into snapshot sets. Initialises the ``snapshot_sets``, ``by_name`` and ``by_uuid`` members with the discovered snapshot sets. """ self.snapshot_sets = [] self.by_name = {} self.by_uuid = {} snapshots = [] snapset_names = set() self._boot_cache.refresh_cache() _log_debug("Discovering snapshot sets for %s plugins", len(self.plugins)) for plugin in self.plugins: snapshots.extend(plugin.discover_snapshots()) _log_debug("Discovered %s managed snapshots", len(snapshots)) for snapshot in snapshots: snapset_names.add(snapshot.snapset_name) for snapset_name in snapset_names: set_snapshots = [ snap for snap in snapshots if snap.snapset_name == snapset_name ] set_timestamp = set_snapshots[0].timestamp for snap in set_snapshots: if snap.timestamp != set_timestamp: _log_warn( "Snapshot set '%s' has inconsistent timestamps", snapset_name ) continue snapset = SnapshotSet(snapset_name, set_timestamp, set_snapshots) # Associate snapset with boot entry if present if snapset.name in self._boot_cache.entry_cache: snapset.boot_entry = self._boot_cache.entry_cache[snapset.name] elif str(snapset.uuid) in self._boot_cache.entry_cache: snapset.boot_entry = self._boot_cache.entry_cache[str(snapset.uuid)] # Associate snapset with revert entry if present if snapset.name in self._boot_cache.revert_cache: snapset.revert_entry = self._boot_cache.revert_cache[snapset.name] elif str(snapset.uuid) in self._boot_cache.revert_cache: snapset.revert_entry = self._boot_cache.revert_cache[str(snapset.uuid)] self.snapshot_sets.append(snapset) self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset for snapshot in snapset.snapshots: snapshot.snapshot_set = snapset self.snapshot_sets.sort(key=lambda ss: ss.name) _log_debug("Discovered %d snapshot sets", len(self.snapshot_sets))
[docs] def find_snapshot_sets(self, selection=None): """ Find snapshot sets matching selection criteria. :param selection: Selection criteria to apply. :returns: A list of matching ``SnapshotSet`` objects. """ matches = [] # Use null selection criteria if unspecified selection = selection if selection else Selection() selection.check_valid_selection(snapshot_set=True) _log_debug("Finding snapshot sets for %s", repr(selection)) for snapset in self.snapshot_sets: if select_snapshot_set(selection, snapset): matches.append(snapset) _log_debug("Found %d snapshot sets", len(matches)) return matches
[docs] def find_snapshots(self, selection=None): """ Find snapshots matching selection criteria. :param selection: Selection criteria to apply. :returns: A list of matching ``Snapshot`` objects. """ matches = [] # Use null selection criteria if unspecified selection = selection if selection else Selection() selection.check_valid_selection(snapshot=True, snapshot_set=True) _log_debug("Finding snapshots for %s", repr(selection)) for snapset in self.snapshot_sets: for snapshot in snapset.snapshots: if select_snapshot(selection, snapshot): matches.append(snapshot) _log_debug("Found %d snapshots", len(matches)) return matches
[docs] def _validate_snapset_name(self, name): """ Validate a snapshot set name. Returns if ``name`` is a valid snapshot set name or raises an appropriate exception otherwise. :param name: The snapshot set name to validate. :raises: ``SnapmExistsError`` if the name is already in use, or ``SnapmInvalidIdentifierError`` if the name fails validation. """ if name in self.by_name: raise SnapmExistsError(f"Snapshot set named '{name}' already exists") for char in name: # Underscore is specifically disallowed in snapset names. if char == "_" or char not in SNAPM_VALID_NAME_CHARS: raise SnapmInvalidIdentifierError( f"Snapshot set name cannot include '{char}'" )
# pylint: disable=too-many-branches,too-many-locals,too-many-statements
[docs] @suspend_signals def create_snapshot_set( self, name, source_specs, default_size_policy=None, boot=False, revert=False ): """ Create a snapshot set of the supplied mount points with the name ``name``. :param name: The name of the snapshot set. :param source_specs: A list of mount point and block device paths to include in the set. :param default_size_policy: A default size policy to use for the set. :raises: ``SnapmExistsError`` if the name is already in use, or ``SnapmInvalidIdentifierError`` if the name fails validation. """ self._validate_snapset_name(name) # Parse size policies and normalise mount paths (sources, size_policies) = _parse_source_specs( source_specs, default_size_policy ) # Initialise provider mapping. provider_map = self._find_and_verify_plugins(sources, size_policies, None) for provider in set(provider_map.values()): provider.start_transaction() timestamp = floor(time()) origins = {} mounts = {} for source in provider_map: if S_ISBLK(stat(source).st_mode): mounts[source] = _find_mount_point_for_devpath(source) origins[source] = source mount = mounts[source] if mount in provider_map: raise SnapmInvalidIdentifierError( f"Duplicate snapshot source {source} already added to {name} as {mount}" ) else: mount = source origins[source] = provider_map[source].origin_from_mount_point(mount) try: provider_map[source].check_create_snapshot( origins[source], name, timestamp, mount, size_policies[source] ) except SnapmInvalidIdentifierError as err: _log_error( "Error creating %s snapshot: %s", provider_map[source].name, err ) raise SnapmInvalidIdentifierError( f"Snapset name {name} too long" ) from err except SnapmNoSpaceError as err: _log_error( "Error creating %s snapshot: %s", provider_map[source].name, err ) raise SnapmNoSpaceError( f"Insufficient free space for snapshot set {name}" ) from err self._check_recursion(origins) _suspend_journal() snapshots = [] for source in provider_map: if S_ISBLK(stat(source).st_mode): mount = mounts[source] else: mount = source try: snapshots.append( provider_map[source].create_snapshot( origins[source], name, timestamp, mount, size_policies[source] ) ) except SnapmError as err: _log_error("Error creating snapshot set member %s: %s", name, err) _resume_journal() for snapshot in snapshots: snapshot.delete() raise SnapmPluginError( f"Could not create all snapshots for set {name}" ) from err _resume_journal() for provider in set(provider_map.values()): provider.end_transaction() snapset = SnapshotSet(name, timestamp, snapshots) if boot or revert: _log_info( "Autoactivation required for bootable snapshot set '%s'", snapset.name ) self._set_autoactivate(snapset, auto=True) snapset.activate() if boot: try: create_snapset_boot_entry(snapset) self._boot_cache.refresh_cache() except (OSError, ValueError) as err: _log_error("Failed to create snapshot set boot entry: %s", err) snapset.delete() raise SnapmCalloutError from err if revert: try: create_snapset_revert_entry(snapset) self._boot_cache.refresh_cache() except (OSError, ValueError) as err: _log_error("Failed to create snapshot set revert boot entry: %s", err) snapset.delete() raise SnapmCalloutError from err self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset self.snapshot_sets.append(snapset) return snapset
[docs] @suspend_signals def rename_snapshot_set(self, old_name, new_name): """ Rename snapshot set ``old_name`` as ``new_name``. :param old_name: The name of the snapshot set to be renamed. :param new_name: The new name of the snapshot set. :raises: ``SnapmExistsError`` if the name is already in use, or ``SnapmInvalidIdentifierError`` if the name fails validation. """ if old_name not in self.by_name: raise SnapmNotFoundError(f"Cannot find snapshot set named {old_name}") self._validate_snapset_name(new_name) snapset = self.by_name[old_name] _check_snapset_status(snapset, "rename") # Remove references to old set self.by_name.pop(snapset.name) self.by_uuid.pop(snapset.uuid) self.snapshot_sets.remove(snapset) try: snapset.rename(new_name) except SnapmError as err: self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset self.snapshot_sets.append(snapset) raise err self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset self.snapshot_sets.append(snapset) return snapset
[docs] @suspend_signals def delete_snapshot_sets(self, selection): """ Remove snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to remove. """ sets = self.find_snapshot_sets(selection=selection) deleted = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: delete_snapset_boot_entry(snapset) delete_snapset_revert_entry(snapset) snapset.delete() self.snapshot_sets.remove(snapset) self.by_name.pop(snapset.name) self.by_uuid.pop(snapset.uuid) deleted += 1 self._boot_cache.refresh_cache() return deleted
# pylint: disable=too-many-branches
[docs] @suspend_signals def resize_snapshot_set( self, source_specs, name=None, uuid=None, default_size_policy=None ): """ Resize snapshot set named ``name`` or having UUID ``uuid``. Request to resize each snapshot included in ``source_specs`` according to the given size policy, or apply ``default_size_policy`` if set. :param name: The name of the snapshot set to resize. :param uuid: The UUID of the snapshot set to resize. :param source_specs: A list of mount points and optional size policies. :param default_size_policy: A default size policy to apply to the resize. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) if source_specs: # Parse size policies and normalise mount paths (sources, size_policies) = _parse_source_specs( source_specs, default_size_policy ) else: sources = [snapshot.source for snapshot in snapset.snapshots] size_policies = {source: default_size_policy for source in sources} snapset.resize(sources, size_policies)
[docs] @suspend_signals def revert_snapshot_set(self, name=None, uuid=None): """ Revert snapshot set named ``name`` or having UUID ``uuid``. Request to revert each snapshot origin within each snapshot set to the state at the time the snapshot was taken. :param name: The name of the snapshot set to revert. :param uuid: The UUID of the snapshot set to revert. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) _check_revert_snapshot_set(snapset) # Snapshot boot entry becomes invalid as soon as revert is initiated. delete_snapset_boot_entry(snapset) snapset.revert() self._boot_cache.refresh_cache() return snapset
[docs] def revert_snapshot_sets(self, selection): """ Revert snapshot sets matching selection criteria ``selection``. Request to revert each snapshot origin within each snapshot set to the state at the time the snapshot was taken. :param selection: Selection criteria for snapshot sets to revert. """ sets = self.find_snapshot_sets(selection=selection) reverted = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: self.revert_snapshot_set(name=snapset.name) reverted += 1 return reverted
[docs] @suspend_signals def activate_snapshot_sets(self, selection): """ Activate snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to activate. """ sets = self.find_snapshot_sets(selection=selection) activated = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: _check_snapset_status(snapset, "activate") snapset.activate() activated += 1 return activated
[docs] @suspend_signals def deactivate_snapshot_sets(self, selection): """ Deactivate snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to deactivate. """ sets = self.find_snapshot_sets(selection=selection) deactivated = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: _check_snapset_status(snapset, "deactivate") snapset.deactivate() deactivated += 1 return deactivated
[docs] def _set_autoactivate(self, snapset, auto=False): """ Set autoactivation for ``snapset``. :param snapset: The ``SnapshotSet`` object to operate on. :param auto: ``True`` to enable autoactivation or ``False`` otherwise. """ _check_snapset_status(snapset, "set autoactivate status for") state = bool_to_yes_no(auto) _log_info("Setting autoactivation=%s for snapshot set %s", state, snapset.name) snapset.autoactivate = auto
[docs] @suspend_signals def set_autoactivate(self, selection, auto=False): """ Set autoactivation state for snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to set autoactivation. :param auto: ``True`` to enable autoactivation or ``False`` otherwise. """ sets = self.find_snapshot_sets(selection=selection) changed = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: self._set_autoactivate(snapset, auto=auto) changed += 1 return changed
[docs] @suspend_signals def split_snapshot_set(self, name, new_name, source_specs): """ Split the snapshot set named `name` into two snapshot sets, with `new_name` containing the listed `sources` and `name` containing all remaining snapshots. If `new_name` is `None` the listed snapshot are removed from `name` and permanently deleted. This operation cannot be undone. :param name: The name of the snapshot set to split :param new_name: The name of the newly split off snapshot set :param sources: The list of sources to include in the new snapshot set """ if name not in self.by_name: raise SnapmNotFoundError(f"Cannot find snapshot set named {name}") if new_name: self._validate_snapset_name(new_name) op = f"{'split' if new_name else 'prune'}" _log_info("Attempting to %s snapshot set '%s'", op, name) snapset = self.by_name[name] _check_snapset_status(snapset, op) if not source_specs: raise SnapmArgumentError( f"SnapshotSet {op} requires at least one source argument" ) # Parse size policies and normalise mount paths (sources, size_policies) = _parse_source_specs(source_specs, None) if any(size_policies[source] is not None for source in sources): err_sources = ", ".join( [source for source in source_specs if ":" in source] ) raise SnapmArgumentError( f"SnapshotSet {op} does not support size policies: {err_sources}" ) missing_sources = [ source for source in sources if source not in snapset.sources ] if missing_sources: raise SnapmNotFoundError( f"SnapshotSet '{snapset.name}' does not contain {', '.join(missing_sources)}" ) timestamp = snapset.timestamp split_snapshots = [ snapshot for snapshot in snapset.snapshots if snapshot.source in sources ] keep_snapshots = [ snapshot for snapshot in snapset.snapshots if snapshot.source not in sources ] if not keep_snapshots: op_str = "Splitting" if new_name else "Pruning" raise SnapmArgumentError( f"{op_str} {', '.join((snap.source for snap in split_snapshots))} " f"from {snapset.name} would leave {snapset.name} empty" ) _log_debug( "Splitting snapshot%s %s from snapshot set '%s'", "s" if len(split_snapshots) > 1 else "", ", ".join((snap.source for snap in split_snapshots)), snapset.name, ) self.by_name.pop(snapset.name) self.by_uuid.pop(snapset.uuid) self.snapshot_sets.remove(snapset) # Build new SnapshotSet with old name to start with, then use # SnapshotSet.rename() to propagate new_name to members. new_set = SnapshotSet(snapset.name, timestamp, split_snapshots) # Update original snapshot set and re-add to lookup tables orig_set = SnapshotSet(snapset.name, timestamp, keep_snapshots) self.by_name[orig_set.name] = orig_set self.by_uuid[orig_set.uuid] = orig_set self.snapshot_sets.append(orig_set) if new_name: # Attempt to rename members to new_name try: new_set.rename(new_name) except SnapmError as err: self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset self.snapshot_sets.append(snapset) raise err else: try: # Delete the pruned members new_set.delete() except SnapmError as err: _log_error( "Failed to clean up pruned snapshot set members: %s (%s)", err, ", ".join(snapshot.name for snapshot in new_set.snapshots), ) return orig_set # Add new snapshot set to lookup tables self.by_name[new_set.name] = new_set self.by_uuid[new_set.uuid] = new_set self.snapshot_sets.append(new_set) return new_set
[docs] @suspend_signals def create_snapshot_set_boot_entry(self, name=None, uuid=None): """ Create a snapshot boot entry for the specified snapshot set. :param name: The name of the snapshot set. :param uuid: The UUID of the snapshot set. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) snapset.autoactivate = True if not snapset.autoactivate: raise SnapmPluginError( "Could not enable autoactivation for all snapshots in snapshot " f"set {snapset.name}" ) if snapset.boot_entry is not None: raise SnapmExistsError( f"Boot entry already associated with snapshot set {snapset.name}" ) create_snapset_boot_entry(snapset) self._boot_cache.refresh_cache()
[docs] @suspend_signals def create_snapshot_set_revert_entry(self, name=None, uuid=None): """ Create a revert boot entry for the specified snapshot set. :param name: The name of the snapshot set. :param uuid: The UUID of the snapshot set. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) if snapset.revert_entry is not None: raise SnapmExistsError( f"Revert entry already associated with snapshot set {snapset.name}" ) create_snapset_revert_entry(snapset) self._boot_cache.refresh_cache()
__all__ = [ "Manager", ]