Source code for snapm.manager._manager

# Copyright Red Hat
#
# snapm/manager/_manager.py - Snapshot Manager
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: GPL-2.0-only
"""
Manager interface and plugin infrastructure.
"""
from subprocess import run, CalledProcessError
import logging
from time import time
from math import floor
from stat import S_ISBLK
from os import stat
from os.path import exists, ismount, normpath, samefile
import fnmatch
import inspect
import os

import snapm.manager.plugins
from snapm.manager.signals import suspend_signals
from snapm.manager.boot import (
    BootCache,
    create_snapset_boot_entry,
    delete_snapset_boot_entry,
    create_snapset_revert_entry,
    delete_snapset_revert_entry,
    check_boom_config,
)

from snapm import (
    SNAPM_DEBUG_MANAGER,
    SnapmError,
    SnapmCalloutError,
    SnapmNoSpaceError,
    SnapmNoProviderError,
    SnapmExistsError,
    SnapmBusyError,
    SnapmPathError,
    SnapmNotFoundError,
    SnapmInvalidIdentifierError,
    SnapmPluginError,
    SnapmStateError,
    SnapmRecursionError,
    Selection,
    is_size_policy,
    SnapStatus,
    SnapshotSet,
    Snapshot,
)


_log = logging.getLogger(__name__)
_log.set_debug_mask(SNAPM_DEBUG_MANAGER)

_log_debug = _log.debug
_log_debug_manager = _log.debug_masked
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error

JOURNALCTL_CMD = "journalctl"


class PluginRegistry(type):
    """
    Metaclass for Plugin classes.
    """

    plugins = []

    def __init__(cls, name, _bases, _attrs):
        super().__init__(name, _bases, _attrs)
        if name != "Plugin" and not name.startswith("_"):
            _log_debug("Loaded plugin %s version: %s", cls.name, cls.version)
            PluginRegistry.plugins.append(cls)


[docs] class Plugin(metaclass=PluginRegistry): """ Abstract base class for snapshot manager plugins. """ name = "plugin" version = "0.1.0" snapshot_class = Snapshot
[docs] def __init__(self, logger): self.size_map = None self.logger = logger
[docs] def _log_error(self, *args): """ Log at error level. """ self.logger.error(*args)
[docs] def _log_warn(self, *args): """ Log at warning level. """ self.logger.warning(*args)
[docs] def _log_info(self, *args): """ Log at info level. """ self.logger.info(*args)
[docs] def _log_debug(self, *args): """ Log at debug level. """ self.logger.debug(*args)
[docs] def info(self): """ Return plugin name and version. """ return {"name": self.name, "version": self.version}
[docs] def start_transaction(self): """ Begin a snapshot set creation transaction in this plugin. """ self.size_map = {}
[docs] def end_transaction(self): """ End a snapshot set creation transaction in this plugin. """ self.size_map = None
[docs] def discover_snapshots(self): """ Discover snapshots managed by this plugin class. Returns a list of objects that are a subclass of ``Snapshot``. :returns: A list of snapshots discovered by this plugin class. """ raise NotImplementedError
[docs] def can_snapshot(self, source): """ Test whether this plugin can snapshot the specified mount point or block device. :param source: The block device or mount point path to test. :returns: ``True`` if this plugin can snapshot the file system mounted at ``mount_point``, or ``False`` otherwise. """ raise NotImplementedError
[docs] def check_create_snapshot( self, origin, snapset_name, timestamp, mount_point, size_policy ): """ Perform pre-creation checks before creating a snapshot. :param origin: The origin volume for the snapshot. :param snapset_name: The name of the snapshot set to be created. :param timestamp: The snapshot set timestamp. :param mount_point: The mount point path for this snapshot. :raises: ``SnapmNoSpaceError`` if there is insufficient free space to create the snapshot. """ raise NotImplementedError
[docs] def create_snapshot( self, origin, snapset_name, timestamp, mount_point, size_policy ): """ Create a snapshot of ``origin`` in the snapset named ``snapset_name``. :param origin: The origin volume for the snapshot. :param snapset_name: The name of the snapshot set to be created. :param timestamp: The snapshot set timestamp. :param mount_point: The mount point path for this snapshot. :raises: ``SnapmNoSpaceError`` if there is insufficient free space to create the snapshot. """ raise NotImplementedError
[docs] def rename_snapshot(self, old_name, origin, snapset_name, timestamp, mount_point): """ Rename the snapshot named ``old_name`` according to the provided snapshot field values. :param old_name: The original name of the snapshot to be renamed. :param origin: The origin volume for the snapshot. :param snapset_name: The new name of the snapshot set. :param timestamp: The snapshot set timestamp. :param mount_point: The mount point of the snapshot. """ raise NotImplementedError
[docs] def check_resize_snapshot(self, name, origin, mount_point, size_policy): """ Chcek whether this snapshot can be resized to the requested ``size_policy``. This method returns if the resize can be satisfied and raises an exception if not. :returns: None :raises: ``SnapmNoSpaceError`` if insufficient space is available to satisfy the requested size policy or ``SnapmPluginError`` if another reason prevents the snapshot from being resized. """ raise NotImplementedError
[docs] def resize_snapshot(self, name, origin, mount_point, size_policy): """ Attempt to resize the snapshot ``name`` to the requested ``size_policy``. This method returns if the resize can be satisfied and raises an exception if not. :returns: None :raises: ``SnapmNoSpaceError`` if insufficient space is available to satisfy the requested size policy or ``SnapmPluginError`` if another reason prevents the snapshot from being resized. """ raise NotImplementedError
[docs] def check_revert_snapshot(self, name, origin): """ Check whether this snapshot can be reverted or not. This method returns if the current snapshot can be reverted and raises an exception if not. :returns: None :raises: ``NotImplementedError`` if this plugin does not support the revert operation, ``SnapmBusyError`` if the snapshot is already in the process of being reverted to another snapshot state or ``SnapmPluginError`` if another reason prevents the snapshot from being merged. """ raise NotImplementedError
[docs] def revert_snapshot(self, name): """ Request to revert a snapshot and revert the content of the origin volume to its state at the time of the snapshot. This may be deferred until the next device activation or mount operation for the respective volume. :param name: The name of the snapshot to revert. """ raise NotImplementedError
[docs] def delete_snapshot(self, name): """ Delete the snapshot named ``name`` :param name: The name of the snapshot to be removed. """ raise NotImplementedError
[docs] def activate_snapshot(self, name): """ Activate the snapshot named ``name`` :param name: The name of the snapshot to be activated. """ raise NotImplementedError
[docs] def deactivate_snapshot(self, name): """ Deactivate the snapshot named ``name`` :param name: The name of the snapshot to be deactivated. """ raise NotImplementedError
[docs] def set_autoactivate(self, name, auto=False): """ Set the autoactivation state of the snapshot named ``name``. :param name: The name of the snapshot to be modified. :param auto: ``True`` to enable autoactivation or ``False`` otherwise. """ raise NotImplementedError
[docs] def origin_from_mount_point(self, mount_point): """ Return a string representing the origin from a given mount point path. :param mount_point: The mount point path. """ raise NotImplementedError
def find(file_pattern, top_dir, max_depth=None, path_pattern=None): """ Generator function to find files recursively. Usage:: for filename in find("*.properties", "/var/log/foobar"): print filename """ if max_depth: base_depth = os.path.dirname(top_dir).count(os.path.sep) max_depth += base_depth for path, dirlist, filelist in os.walk(top_dir): if max_depth and path.count(os.path.sep) >= max_depth: del dirlist[:] if path_pattern and not fnmatch.fnmatch(path, path_pattern): continue for name in fnmatch.filter(filelist, file_pattern): yield os.path.join(path, name) def _plugin_name(path): """ Returns the plugin module name given the path. """ base = os.path.basename(path) name, _ = os.path.splitext(base) return name def _get_plugins_from_list(pluglist): """ Get list of plugin names from file list. :param pluglist: list of candidate plugin files. """ plugins = [ _plugin_name(plugin) for plugin in pluglist if "__init__" not in plugin and plugin.endswith(".py") ] plugins.sort() return plugins def _find_plugins_in_dir(path): """ Find possible plugin files in ``path``. :param path: The search directory for plugin files. """ _log_debug_manager("Finding plugins in %s", path) if os.path.exists(path): py_files = list(find("[a-zA-Z]*.py", path)) pnames = _get_plugins_from_list(py_files) _log_debug_manager("Found plugin modules: %s", ", ".join(pnames)) if pnames: return pnames return [] class ImporterHelper: """ Provides a list of modules that can be imported in a package. Importable modules are located along the module __path__ list and modules are files that end in .py. """ def __init__(self, package): """ package is a package module import my.package.module helper = ImporterHelper(my.package.module) """ self.package = package def get_modules(self): """ Get list of importable modules. Returns the list of importable modules in the configured python package. """ plugins = [] for path in self.package.__path__: if os.path.isdir(path): plugins.extend(_find_plugins_in_dir(path)) return plugins def import_module(module_fqname, superclasses=None): """ Import a single module. Imports the module module_fqname and returns a list of defined classes from that module. If superclasses is defined then the classes returned will be subclasses of the specified superclass or superclasses. If superclasses is plural it must be a tuple of classes. """ module_name = module_fqname.rpartition(".")[-1] try: module = __import__(module_fqname, globals(), locals(), [module_name]) # pylint: disable=broad-except except Exception as err: _log_error("Error importing %s plugin module: %s", module_fqname, err) return [] modules = [ class_ for cname, class_ in inspect.getmembers(module, inspect.isclass) if class_.__module__ == module_fqname ] if superclasses: modules = [m for m in modules if issubclass(m, superclasses)] return modules def import_plugin(name, superclasses=None): """ Import name as a module and return a list of all classes defined in that module. superclasses should be a tuple of valid superclasses to import, this defaults to (Plugin,). """ plugin_fqname = f"snapm.manager.plugins.{name}" if not superclasses: superclasses = (Plugin,) _log_debug("Importing plugin module %s", plugin_fqname) return import_module(plugin_fqname, superclasses) def load_plugins(): """ Attempt to load plugin modules. """ helper = ImporterHelper(snapm.manager.plugins) plugins = helper.get_modules() _log_debug( "Importing %d modules from %s", len(plugins), snapm.manager.plugins.__name__ ) for plug in plugins: plugbase, _ = os.path.splitext(plug) if not plugbase.startswith("_"): import_plugin(plugbase) def select_snapshot_set(select, snapshot_set): """ Test SnapshotSet against Selection criteria. Test the supplied ``SnapshotSet`` against the selection criteria in ``s`` and return ``True`` if it passes, or ``False`` otherwise. :param s: The selection criteria :param be: The SnapshotSet to test :rtype: bool :returns: True if SnapshotSet passes selection or ``False`` otherwise. """ if select.name and select.name != snapshot_set.name: return False if select.uuid and select.uuid != snapshot_set.uuid: return False if select.timestamp and select.timestamp != snapshot_set.timestamp: return False if select.nr_snapshots and select.nr_snapshots != snapshot_set.nr_snapshots: return False if select.mount_points: s_mount_points = sorted(select.mount_points) mount_points = sorted(snapshot_set.mount_points) if s_mount_points != mount_points: return False return True def select_snapshot(select, snapshot): """ Test SnapshotSet against Selection criteria. Test the supplied ``SnapshotSet`` against the selection criteria in ``s`` and return ``True`` if it passes, or ``False`` otherwise. :param s: The selection criteria :param be: The SnapshotSet to test :rtype: bool :returns: True if SnapshotSet passes selection or ``False`` otherwise. """ if not select_snapshot_set(select, snapshot.snapshot_set): return False if select.snapshot_uuid and select.snapshot_uuid != snapshot.uuid: return False if select.snapshot_name and select.snapshot_name != snapshot.name: return False return True def _suspend_journal(): """ Suspend journal writes to /var before creating snapshots. """ try: run([JOURNALCTL_CMD, "--flush"], check=True) run([JOURNALCTL_CMD, "--relinquish-var"], check=True) except CalledProcessError as err: # pragma: no cover raise SnapmCalloutError( f"Error calling journalctl to flush journal: {err}" ) from err def _resume_journal(): """ Resume journal writes to /var after creating snapshots. """ try: run([JOURNALCTL_CMD, "--flush"], check=True) except CalledProcessError as err: # pragma: no cover raise SnapmCalloutError( f"Error calling journalctl to flush journal: {err}" ) from err def _parse_source_specs(source_specs, default_size_policy): """ Parse and normalize source paths and size policies. :param source_specs: A list of mount point or block device paths and optional size policy strings. :returns: A tuple (sources, size_policies) containing a list of normalized mount point and block device paths and a dictionary mapping source paths to size policies. """ sources = [] size_policies = {} # Parse size policies and normalise mount paths for spec in source_specs: if ":" in spec: (source, policy) = spec.rsplit(":", maxsplit=1) if not is_size_policy(policy): source = f"{source}:{policy}" policy = default_size_policy else: (source, policy) = (spec, default_size_policy) source = normpath(source) sources.append(source) size_policies[source] = policy return (sources, size_policies) def _check_revert_snapshot_set(snapset): """ Check that all snapshots in a snapshot set can be reverted. Returns if all checks pass or raises an exception otherwise. :returns: None :raises: ``NotImplementedError``, ``SnapmBusyError`` or ``SnapmPluginError`` """ if snapset.status == SnapStatus.INVALID: raise SnapmStateError( f"Cannot revert snapset {snapset.name} with invalid snapshots" ) # Check for ability to revert all snapshots in set for snapshot in snapset.snapshots: try: snapshot.check_revert() except NotImplementedError as err: _log_error( "Snapshot provider %s does not support revert", snapshot.provider.name, ) raise SnapmPluginError from err except SnapmBusyError as err: _log_error( "Revert already in progress for snapshot origin %s", snapshot.origin, ) raise SnapmBusyError( f"A revert has already started for snapshot set {snapset.name}" ) from err except SnapmPluginError as err: _log_error( "Revert prechecks failed for snapshot %s (%s)", snapshot.name, snapshot.provider.name, ) raise err def _check_snapset_status(snapset, operation): """ Check that a snapshot set status is not ``SnapStatus.INVALID`` or ``SnapStatus.REVERTING`` before carrying out ``operation``. """ if snapset.status in (SnapStatus.INVALID, SnapStatus.REVERTING): if snapset.status == SnapStatus.INVALID: _log_error("Cannot operate on invalid snapshot set '%s'", snapset.name) if snapset.status == SnapStatus.REVERTING: _log_error("Cannot operate on reverting snapshot set '%s'", snapset.name) raise SnapmStateError(f"Failed to {operation} snapset '{snapset.name}'") def _find_mount_point_for_devpath(devpath): """ Return the first mount point found in /proc/mounts that corresponds to ``device``, or the empty string if no mount point can be found. """ with open("/proc/mounts", "r", encoding="utf8") as mounts: for line in mounts: fields = line.split(" ") if exists(fields[0]): if samefile(devpath, fields[0]): return fields[1] return ""
[docs] class Manager: """ Snapshot Manager high level interface. """ plugins = [] snapshot_sets = [] by_name = {} by_uuid = {}
[docs] @suspend_signals def __init__(self): self.plugins = [] self.snapshot_sets = [] self.by_name = {} self.by_uuid = {} check_boom_config() self._boot_cache = BootCache() load_plugins() for plugin_class in PluginRegistry.plugins: try: plugin = plugin_class(_log) self.plugins.append(plugin) except SnapmNotFoundError as err: _log_debug( "Plugin dependencies missing: %s (%s), skipping.", plugin_class.__name__, err, ) except SnapmPluginError as err: _log_error("Disabling plugin %s: %s", plugin_class.__name__, err) self.discover_snapshot_sets()
[docs] def _find_and_verify_plugins( self, sources, size_policies, _requested_provider=None ): """ Find snapshot provider plugins for each source in ``sources`` and verify that a provider exists for each source present. :param sources: A list of source mount point or block device paths. :param size_policies: A dictionary mapping sources to size policies. :returns: A dictionary mapping sources to plugins. """ # Initialise provider mapping. provider_map = {k: None for k in sources} # Find provider plugins for sources for source in sources: _log_debug( "Probing plugins for %s with size policy %s", source, size_policies[source], ) if not ismount(source) and not S_ISBLK(stat(source).st_mode): raise SnapmPathError( f"Path '{source}' is not a block device or mount point" ) for plugin in self.plugins: if plugin.can_snapshot(source): provider_map[source] = plugin # Verify each mount point has a provider plugin for source in provider_map: if provider_map[source] is None: raise SnapmNoProviderError( f"Could not find snapshot provider for {source}" ) return provider_map
[docs] def _snapset_from_name_or_uuid(self, name=None, uuid=None): """ Look a snapshot set up by ``name`` or ``uuid``. Returns a ``SnapshotSet`` correponding to either ``name`` or ``uuid`, or raises ``SnapmError`` on error. :param name: The name of the snapshot set to look up. :param uuid: The UUID of the snapshot set to look up. :returns: A ``SnapshotSet`` corresponding to the given name or UUID. :raises: ``SnapmNotFoundError`` is the name or UUID cannot be found. ``SnapmInvalidIdentifierError`` if the name and UUID do not match. """ if name is not None: if name not in self.by_name: raise SnapmNotFoundError(f"Could not find snapshot set named {name}") snapset = self.by_name[name] elif uuid is not None: if uuid not in self.by_uuid: raise SnapmNotFoundError( f"Could not find snapshot set with uuid {uuid}" ) snapset = self.by_uuid[uuid] else: raise SnapmNotFoundError("A snapshot set name or UUID is required") if name and uuid: if self.by_name[name] != self.by_uuid[uuid]: raise SnapmInvalidIdentifierError( f"Conflicting name and UUID: {str(uuid)} does not match '{name}'" ) return snapset
[docs] def _check_recursion(self, origins): """ Verify that each entry in ``origins`` corresponds to a device that is not a snapshot belonging to another snapshot set. :param origins: A list of origin devices to check. :raises: ``SnapmRecursionError`` if an origin device is a snapshot. """ snapshot_devices = [ snapshot.devpath for snapset in self.snapshot_sets for snapshot in snapset.snapshots ] for source, device in origins.items(): if device in snapshot_devices: raise SnapmRecursionError( "Snapshots of snapshots are not supported: " f"{source} corresponds to snapshot device {device}" )
[docs] def discover_snapshot_sets(self): """ Discover snapshot sets by calling into each plugin to find individual snapshots and then aggregating them together into snapshot sets. Initialises the ``snapshot_sets``, ``by_name`` and ``by_uuid`` members with the discovered snapshot sets. """ self.snapshot_sets = [] self.by_name = {} self.by_uuid = {} snapshots = [] snapset_names = set() self._boot_cache.refresh_cache() _log_debug("Discovering snapshot sets for %s plugins", len(self.plugins)) for plugin in self.plugins: snapshots.extend(plugin.discover_snapshots()) _log_debug("Discovered %s managed snapshots", len(snapshots)) for snapshot in snapshots: snapset_names.add(snapshot.snapset_name) for snapset_name in snapset_names: set_snapshots = [ snap for snap in snapshots if snap.snapset_name == snapset_name ] set_timestamp = set_snapshots[0].timestamp for snap in set_snapshots: if snap.timestamp != set_timestamp: _log_warn( "Snapshot set '%s' has inconsistent timestamps", snapset_name ) continue snapset = SnapshotSet(snapset_name, set_timestamp, set_snapshots) # Associate snapset with boot entry if present if snapset.name in self._boot_cache.entry_cache: snapset.boot_entry = self._boot_cache.entry_cache[snapset.name] elif str(snapset.uuid) in self._boot_cache.entry_cache: snapset.boot_entry = self._boot_cache.entry_cache[str(snapset.uuid)] # Associate snapset with revert entry if present if snapset.name in self._boot_cache.revert_cache: snapset.revert_entry = self._boot_cache.revert_cache[snapset.name] elif str(snapset.uuid) in self._boot_cache.revert_cache: snapset.revert_entry = self._boot_cache.revert_cache[str(snapset.uuid)] self.snapshot_sets.append(snapset) self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset for snapshot in snapset.snapshots: snapshot.snapshot_set = snapset self.snapshot_sets.sort(key=lambda ss: ss.name) _log_debug("Discovered %d snapshot sets", len(self.snapshot_sets))
[docs] def find_snapshot_sets(self, selection=None): """ Find snapshot sets matching selection criteria. :param selection: Selection criteria to apply. :returns: A list of matching ``SnapshotSet`` objects. """ matches = [] # Use null selection criteria if unspecified selection = selection if selection else Selection() selection.check_valid_selection(snapshot_set=True) _log_debug("Finding snapshot sets for %s", repr(selection)) for snapset in self.snapshot_sets: if select_snapshot_set(selection, snapset): matches.append(snapset) _log_debug("Found %d snapshot sets", len(matches)) return matches
[docs] def find_snapshots(self, selection=None): """ Find snapshots matching selection criteria. :param selection: Selection criteria to apply. :returns: A list of matching ``Snapshot`` objects. """ matches = [] # Use null selection criteria if unspecified selection = selection if selection else Selection() selection.check_valid_selection(snapshot=True, snapshot_set=True) _log_debug("Finding snapshots for %s", repr(selection)) for snapset in self.snapshot_sets: for snapshot in snapset.snapshots: if select_snapshot(selection, snapshot): matches.append(snapshot) _log_debug("Found %d snapshots", len(matches)) return matches
[docs] def _validate_snapset_name(self, name): """ Validate a snapshot set name. Returns if ``name`` is a valid snapshot set name or raises an appropriate exception otherwise. :param name: The snapshot set name to validate. :raises: ``SnapmExistsError`` if the name is already in use, or ``SnapmInvalidIdentifierError`` if the name fails validation. """ invalid_chars = ["/", "\\", "_", " "] if name in self.by_name: raise SnapmExistsError(f"Snapshot set named '{name}' already exists") for char in invalid_chars: if char in name: raise SnapmInvalidIdentifierError( f"Snapshot set name cannot include '{char}'" )
# pylint: disable=too-many-branches,too-many-locals,too-many-statements
[docs] @suspend_signals def create_snapshot_set( self, name, source_specs, default_size_policy=None, boot=False, revert=False ): """ Create a snapshot set of the supplied mount points with the name ``name``. :param name: The name of the snapshot set. :param source_specs: A list of mount point and block device paths to include in the set. :param default_size_policy: A default size policy to use for the set. :raises: ``SnapmExistsError`` if the name is already in use, or ``SnapmInvalidIdentifierError`` if the name fails validation. """ self._validate_snapset_name(name) # Parse size policies and normalise mount paths (sources, size_policies) = _parse_source_specs( source_specs, default_size_policy ) # Initialise provider mapping. provider_map = self._find_and_verify_plugins(sources, size_policies, None) for provider in set(provider_map.values()): provider.start_transaction() timestamp = floor(time()) origins = {} mounts = {} for source in provider_map: if S_ISBLK(stat(source).st_mode): mounts[source] = _find_mount_point_for_devpath(source) origins[source] = source mount = mounts[source] if mount in provider_map: raise SnapmInvalidIdentifierError( f"Duplicate snapshot source {source} already added to {name} as {mount}" ) else: mount = source origins[source] = provider_map[source].origin_from_mount_point(mount) try: provider_map[source].check_create_snapshot( origins[source], name, timestamp, mount, size_policies[source] ) except SnapmInvalidIdentifierError as err: _log_error( "Error creating %s snapshot: %s", provider_map[source].name, err ) raise SnapmInvalidIdentifierError( f"Snapset name {name} too long" ) from err except SnapmNoSpaceError as err: _log_error( "Error creating %s snapshot: %s", provider_map[source].name, err ) raise SnapmNoSpaceError( f"Insufficient free space for snapshot set {name}" ) from err self._check_recursion(origins) _suspend_journal() snapshots = [] for source in provider_map: if S_ISBLK(stat(source).st_mode): mount = mounts[source] else: mount = source try: snapshots.append( provider_map[source].create_snapshot( origins[source], name, timestamp, mount, size_policies[source] ) ) except SnapmError as err: _log_error("Error creating snapshot set member %s: %s", name, err) _resume_journal() for snapshot in snapshots: snapshot.delete() raise SnapmPluginError( f"Could not create all snapshots for set {name}" ) from err _resume_journal() for provider in set(provider_map.values()): provider.end_transaction() snapset = SnapshotSet(name, timestamp, snapshots) for snapshot in snapset.snapshots: snapshot.snapshot_set = snapset if boot or revert: snapshot.autoactivate = True snapshot.activate() if boot: try: create_snapset_boot_entry(snapset) except (OSError, ValueError) as err: _log_error("Failed to create snapshot set boot entry: %s", err) snapset.delete() raise SnapmCalloutError from err if revert: try: create_snapset_revert_entry(snapset) except (OSError, ValueError) as err: _log_error("Failed to create snapshot set revert boot entry: %s", err) snapset.delete() raise SnapmCalloutError from err self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset self.snapshot_sets.append(snapset) return snapset
[docs] @suspend_signals def rename_snapshot_set(self, old_name, new_name): """ Rename snapshot set ``old_name`` as ``new_name``. :param old_name: The name of the snapshot set to be renamed. :param new_name: The new name of the snapshot set. :raises: ``SnapmExistsError`` if the name is already in use, or ``SnapmInvalidIdentifierError`` if the name fails validation. """ if old_name not in self.by_name: raise SnapmNotFoundError(f"Cannot find snapshot set named {old_name}") self._validate_snapset_name(new_name) snapset = self.by_name[old_name] _check_snapset_status(snapset, "rename") # Remove references to old set self.by_name.pop(snapset.name) self.by_uuid.pop(snapset.uuid) self.snapshot_sets.remove(snapset) try: snapset.rename(new_name) except SnapmError as err: self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset self.snapshot_sets.append(snapset) raise err self.by_name[snapset.name] = snapset self.by_uuid[snapset.uuid] = snapset self.snapshot_sets.append(snapset) return snapset
[docs] @suspend_signals def delete_snapshot_sets(self, selection): """ Remove snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to remove. """ sets = self.find_snapshot_sets(selection=selection) deleted = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: delete_snapset_boot_entry(snapset) delete_snapset_revert_entry(snapset) snapset.delete() self.snapshot_sets.remove(snapset) self.by_name.pop(snapset.name) self.by_uuid.pop(snapset.uuid) deleted += 1 self._boot_cache.refresh_cache() return deleted
# pylint: disable=too-many-branches
[docs] @suspend_signals def resize_snapshot_set( self, source_specs, name=None, uuid=None, default_size_policy=None ): """ Resize snapshot set named ``name`` or having UUID ``uuid``. Request to resize each snapshot included in ``source_specs`` according to the given size policy, or apply ``default_size_policy`` if set. :param name: The name of the snapshot set to resize. :param uuid: The UUID of the snapshot set to resize. :param source_specs: A list of mount points and optional size policies. :param default_size_policy: A default size policy to apply to the resize. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) if source_specs: # Parse size policies and normalise mount paths (sources, size_policies) = _parse_source_specs( source_specs, default_size_policy ) else: sources = [snapshot.source for snapshot in snapset.snapshots] size_policies = {source: default_size_policy for source in sources} snapset.resize(sources, size_policies)
[docs] @suspend_signals def revert_snapshot_set(self, name=None, uuid=None): """ Revert snapshot set named ``name`` or having UUID ``uuid``. Request to revert each snapshot origin within each snapshot set to the state at the time the snapshot was taken. :param name: The name of the snapshot set to revert. :param uuid: The UUID of the snapshot set to revert. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) _check_revert_snapshot_set(snapset) # Snapshot boot entry becomes invalid as soon as revert is initiated. delete_snapset_boot_entry(snapset) snapset.revert() self._boot_cache.refresh_cache() return snapset
[docs] def revert_snapshot_sets(self, selection): """ Revert snapshot sets matching selection criteria ``selection``. Request to revert each snapshot origin within each snapshot set to the state at the time the snapshot was taken. :param selection: Selection criteria for snapshot sets to revert. """ sets = self.find_snapshot_sets(selection=selection) reverted = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: self.revert_snapshot_set(name=snapset.name) reverted += 1 return reverted
[docs] @suspend_signals def activate_snapshot_sets(self, selection): """ Activate snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to activate. """ sets = self.find_snapshot_sets(selection=selection) activated = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: _check_snapset_status(snapset, "activate") snapset.activate() activated += 1 return activated
[docs] @suspend_signals def deactivate_snapshot_sets(self, selection): """ Deactivate snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to deactivate. """ sets = self.find_snapshot_sets(selection=selection) deactivated = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: _check_snapset_status(snapset, "deactivate") snapset.deactivate() deactivated += 1 return deactivated
[docs] @suspend_signals def set_autoactivate(self, selection, auto=False): """ Set autoactivation state for snapshot sets matching selection criteria ``selection``. :param selection: Selection criteria for snapshot sets to set autoactivation. :param auto: ``True`` to enable autoactivation or ``False`` otherwise. """ sets = self.find_snapshot_sets(selection=selection) changed = 0 if not sets: raise SnapmNotFoundError( f"Could not find snapshot sets matching {selection}" ) for snapset in sets: _check_snapset_status(snapset, "set autoactivate status for") snapset.autoactivate = auto changed += 1 return changed
[docs] @suspend_signals def create_snapshot_set_boot_entry(self, name=None, uuid=None): """ Create a snapshot boot entry for the specified snapshot set. :param name: The name of the snapshot set. :param uuid: The UUID of the snapshot set. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) snapset.autoactivate = True if not snapset.autoactivate: raise SnapmPluginError( "Could not enable autoactivation for all snapshots in snapshot " f"set {snapset.name}" ) if snapset.boot_entry is not None: raise SnapmExistsError( f"Boot entry already associated with snapshot set {snapset.name}" ) create_snapset_boot_entry(snapset) self._boot_cache.refresh_cache()
[docs] @suspend_signals def create_snapshot_set_revert_entry(self, name=None, uuid=None): """ Create a revert boot entry for the specified snapshot set. :param name: The name of the snapshot set. :param uuid: The UUID of the snapshot set. """ snapset = self._snapset_from_name_or_uuid(name=name, uuid=uuid) if snapset.revert_entry is not None: raise SnapmExistsError( f"Revert entry already associated with snapshot set {snapset.name}" ) create_snapset_revert_entry(snapset) self._boot_cache.refresh_cache()
__all__ = [ "Plugin", "Manager", ]