# Copyright Red Hat
#
# snapm/manager/plugins/__init__.py - Snapshot Manager plugins
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: GPL-2.0-only
"""
Snapshot manager plugin helpers.
"""
import os
from os.path import sep as path_sep, ismount
_MOUNT_SEPARATOR = "-"
_ESCAPED_MOUNT_SEPARATOR = "--"
_PROC_MOUNTS = "/proc/self/mounts"
DEV_PREFIX = "/dev"
DEV_MAPPER_PREFIX = "/dev/mapper/"
# dmsetup options
DMSETUP_CMD = "dmsetup"
DMSETUP_SPLITNAME = "splitname"
DMSETUP_INFO = "info"
DMSETUP_LVM_SUBSYS = "LVM"
DMSETUP_NO_HEADINGS = "--noheadings"
DMSETUP_COLUMNS = "--columns"
DMSETUP_FIELDS_VG_LV = "-ovg_name,lv_name"
DMSETUP_FIELDS_UUID = "-ouuid"
DMSETUP_REPORT_SEP = ":"
# Fields: origin_name-snapset_snapset-name_timestamp_mount-point
SNAPSHOT_NAME_FORMAT = "%s-snapset_%s_%d_%s"
_bad_chars = [
":",
"?",
"@",
"[",
"]",
"#",
"<",
">",
"{",
"}",
"~",
";",
"%",
"$",
"£",
'"',
"!",
"*",
"&",
"^",
"`",
"'",
]
def _escape_bad_chars(path):
"""
Encode illegal characters in mount point path.
:param path: The path to escape.
:returns: The escaped path.
"""
escaped = ""
for char in path:
if char in _bad_chars:
escaped = escaped + "." + char.encode("utf8").hex()
else:
if char == ".":
escaped = escaped + ".."
else:
escaped = escaped + char
return escaped
def _unescape_bad_chars(path):
"""
Decode illegal characters in mount point path.
:param path: The path to unescape.
:returns: The unescaped path.
"""
def decode_byte(byte):
return bytearray.fromhex(byte).decode("utf8")
unescaped = ""
i = 0
while i < len(path):
if path[i] == ".":
if path[i + 1] == ".":
unescaped = unescaped + "."
i += 1
else:
unescaped = unescaped + decode_byte(path[i + 1 : i + 3])
i += 2
else:
unescaped = unescaped + path[i]
i += 1
return unescaped
[docs]
def encode_mount_point(mount_point):
"""
Encode mount point paths.
Encode a mount point for use in a snapshot name by replacing the path
separator with '-'.
"""
if mount_point == "/":
return _MOUNT_SEPARATOR
mount_point = _escape_bad_chars(mount_point)
parts = [
part.replace(_MOUNT_SEPARATOR, _ESCAPED_MOUNT_SEPARATOR)
for part in mount_point.split(path_sep)
]
return _MOUNT_SEPARATOR.join(parts)
def _split_mount_separators(mount_str):
"""
Split string by mount separator handling escaped separators.
Split the mount string ``mount_str`` at ``_MOUNT_SEPARATOR`` boundaries,
into path components, ignoring ``_ESCAPED_MOUNT_SEPARATOR``.
"""
result = []
current = ""
i = 0
while i < len(mount_str):
if mount_str[i] == _MOUNT_SEPARATOR:
if i < len(mount_str) - 1 and mount_str[i + 1] == _MOUNT_SEPARATOR:
current += _ESCAPED_MOUNT_SEPARATOR
i += 1
else:
result.append(current)
current = ""
else:
current += mount_str[i]
i += 1
result.append(current) # Append the remaining segment
return result
[docs]
def decode_mount_point(mount_str):
"""
Parse the mount point component of a snapshot name.
"""
mount_str = _unescape_bad_chars(mount_str)
mount_parts = _split_mount_separators(mount_str)
unescaped_parts = [
part.replace(_ESCAPED_MOUNT_SEPARATOR, _MOUNT_SEPARATOR) for part in mount_parts
]
return path_sep.join(unescaped_parts)
[docs]
def parse_snapshot_name(full_name, origin):
"""
Attempt to parse a snapshot set snapshot name.
Returns a tuple of (snapset_name, timestamp, mount_point) if ``full_name``
is a valid snapset snapshot name, or ``None`` otherwise.
"""
if not full_name.startswith(origin):
return None
base = full_name.removeprefix(origin + "-")
if "_" not in base:
return None
fields = base.split("_", maxsplit=3)
if len(fields) != 4:
return None
if fields[0] != "snapset":
return None
snapset_name = fields[1]
timestamp = int(fields[2])
mount_point = decode_mount_point(fields[3])
# (snapset_name, timestamp, mount)
return (snapset_name, timestamp, mount_point)
def _parse_proc_mounts_line(mount_line):
"""
Parse ``/proc/mounts`` line.
Parse a line of ``/proc/mounts`` data and return a tuple
containing the device and mount point.
"""
(device, mount_point, _) = mount_line.split(maxsplit=2)
return (device, mount_point)
[docs]
def device_from_mount_point(mount_point):
"""
Convert a mount point path to a corresponding device path.
Return the device corresponding to the file system mount point
``mount_point`` according to /proc/self/mounts.
"""
# Ignore any trailing '/' in mount point path
if mount_point != path_sep:
mount_point = mount_point.rstrip(path_sep)
with open(_PROC_MOUNTS, "r", encoding="utf8") as mounts:
for line in mounts.readlines():
(device, mount_path) = _parse_proc_mounts_line(line)
if mount_path == mount_point:
return device
raise KeyError(f"Mount point {mount_point} not found")
[docs]
def mount_point_space_used(mount_point):
"""
Retrieve space usage for a mount point path.
Return the space used on the file system mounted at ``mount_point``
as a value in bytes.
"""
if not mount_point or not ismount(mount_point):
return -1
stat = os.statvfs(mount_point)
return (stat.f_blocks - stat.f_bfree) * stat.f_frsize
__all__ = [
"DEV_PREFIX",
"DEV_MAPPER_PREFIX",
"DMSETUP_CMD",
"DMSETUP_SPLITNAME",
"DMSETUP_INFO",
"DMSETUP_LVM_SUBSYS",
"DMSETUP_NO_HEADINGS",
"DMSETUP_COLUMNS",
"DMSETUP_FIELDS_VG_LV",
"DMSETUP_FIELDS_UUID",
"DMSETUP_REPORT_SEP",
"encode_mount_point",
"decode_mount_point",
"parse_snapshot_name",
"device_from_mount_point",
"mount_point_space_used",
"format_snapshot_name",
]