# Copyright Red Hat
#
# snapm/report.py - Text reporting
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""The snapm reporting module contains a set of classes for creating
simple text based tabular reports for a user-defined set of object
types and fields. No restrictions are placed on the types of object
that can be reported: users of the ``Report`` classes may define
additional object types outside the ``snapm`` package and include these
types in reports generated by the module.
The fields displayed in a specific report may be selected from the
available set of fields by specifying a simple comma-separated string
list of field names (in display order). In addition, custom multi-column
sorting is possible using a similar string notation.
The ``Report`` module is closely based on the ``device-mapper``
reporting engine and shares many features and behaviours with device
mapper reports.
"""
import logging
import sys
import uuid
from json import dumps
from snapm import size_fmt, SNAPSET_INDEX_NONE
def find_minimum_sha_prefix(shas, min_prefix):
"""Find the minimum SHA prefix length guaranteeing uniqueness.
Find the minimum unique prefix for the set of SHA IDs in the set
``shas``.
:param shas: A set of SHA IDs
:param min_prefix: Initial minimum prefix value
:returns: The minimum unique prefix length for the set
:rtype: int
"""
shas = list(shas)
shas.sort()
for sha in shas:
if shas.index(sha) == len(shas) - 1:
continue
def _next_sha(shas, sha):
return shas[shas.index(sha) + 1]
while sha[:min_prefix] == _next_sha(shas, sha)[:min_prefix]:
min_prefix += 1
return min_prefix
_log = logging.getLogger(__name__)
_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error
_DEFAULT_COLUMNS = 80
REP_NUM = "num"
REP_STR = "str"
REP_SHA = "sha"
REP_IDX = "idx"
REP_TIME = "time"
REP_UUID = "uuid"
REP_SIZE = "size"
REP_STR_LIST = "strlist"
_dtypes = [
REP_NUM,
REP_STR,
REP_SHA,
REP_IDX,
REP_TIME,
REP_UUID,
REP_SIZE,
REP_STR_LIST,
]
_left_align_dtypes = [REP_STR, REP_SHA, REP_TIME, REP_UUID, REP_STR_LIST]
_right_align_dtypes = [REP_NUM, REP_IDX, REP_SIZE]
_DEFAULT_WIDTH = 8
ALIGN_LEFT = "left"
ALIGN_RIGHT = "right"
_align_types = [ALIGN_LEFT, ALIGN_RIGHT]
ASCENDING = "ascending"
DESCENDING = "descending"
STANDARD_QUOTE = "'"
STANDARD_PAIR = "="
MIN_SHA_WIDTH = 7
string_types = (str,)
num_types = (int, float)
# pylint: disable=too-many-instance-attributes
[docs]
class ReportOpts:
"""
ReportOpts()
Options controlling the formatting and output of a report.
"""
# pylint: disable=too-many-arguments
[docs]
def __init__(
self,
columns=_DEFAULT_COLUMNS,
headings=True,
buffered=True,
separator=" ",
field_name_prefix="",
unquoted=True,
aligned=True,
json=False,
columns_as_rows=False,
report_file=sys.stdout,
):
"""Initialise ReportOpts object.
Initialise a ``ReportOpts`` object to control output
of a ``Report``.
:param columns: the number of columns to use for output.
:param headings: a boolean indicating whether to output
column headings for this report.
:param buffered: a boolean indicating whether to buffer
output from this report.
:param report_file: a file to which output will be sent.
:returns: a new ``ReportOpts`` object.
:rtype: ``<class ReportOpts>``
"""
self.columns = columns
self.headings = headings
self.buffered = buffered
self.separator = separator
self.field_name_prefix = field_name_prefix
self.unquoted = unquoted
self.aligned = aligned
self.json = json
self.columns_as_rows = columns_as_rows
self.report_file = report_file
[docs]
def __str__(self):
return (
f"columns={self.columns}\n"
f"headings={self.headings}\n"
f"buffered={self.buffered}\n"
f"separator={self.separator}\n"
f"field_name_prefix={self.field_name_prefix}\n"
f"unquoted={self.unquoted}\n"
f"aligned={self.aligned}\n"
f"json={self.json}\n"
f"columns_as_rows={self.columns_as_rows}\n"
f"report_file={self.report_file}"
)
[docs]
def __eq__(self, other):
if not isinstance(other, ReportOpts):
return False
return all(
[
self.columns == other.columns,
self.headings == other.headings,
self.buffered == other.buffered,
self.separator == other.separator,
self.field_name_prefix == other.field_name_prefix,
self.unquoted == other.unquoted,
self.aligned == other.aligned,
self.json == other.json,
self.columns_as_rows == other.columns_as_rows,
self.report_file == other.report_file,
]
)
# pylint: disable=too-few-public-methods
[docs]
class ReportObjType:
"""
ReportObjType()
Class representing a type of object to be reported on.
Instances of ``ReportObjType`` must specify an identifier,
a description, and a data function that will return the correct
type of object from a compound object containing data objects
of different types. For reports that use only a single object
type the ``data_fn`` member may be simply ``lambda x: x``.
"""
[docs]
def __init__(self, objtype, desc, prefix, data_fn):
"""
Initialise ReportObjType.
Initialise a new ``ReportObjType`` object with the
specified ``objtype``, ``desc``, optional ``prefix`` and
``data_fn``. The ``objtype`` must be an integer power of two
that is unique within a given report. The ``data_fn`` should
accept an object as its only argument and return an object
of the requested type.
"""
if not objtype or objtype < 0:
raise ValueError("ReportObjType objtype cannot be <= 0.")
if not desc:
raise ValueError("ReportObjType desc cannot be empty.")
if not data_fn:
raise ValueError("ReportObjType requires data_fn.")
self.objtype = objtype
self.desc = desc
self.prefix = prefix
self.data_fn = data_fn
[docs]
class FieldType:
"""
FieldType()
The ``FieldType`` class describes the properties of a field
available in a ``Report`` instance.
"""
# pylint: disable=too-many-arguments
[docs]
def __init__(self, objtype, name, head, desc, width, dtype, report_fn, align=None):
"""
Initialise new FieldType object.
Initialise a new ``FieldType`` object with the specified
properties.
:param objtype: The numeric object type ID (power of two)
:param name: The field name used to select display fields
:param desc: A human-readable description of the field
:param width: The default (initial) field width
:param dtype: The Report data type of the field
:param report_fn: The field reporting function
:param align: The field alignment value
:returns: A new ReportFieldType object
:rtype: ReportFieldType
"""
if not objtype:
raise ValueError("'objtype' must be non-zero")
if not name:
raise ValueError("'name' is required")
self.objtype = objtype
self.name = name
self.head = head
self.desc = desc
if dtype not in _dtypes:
raise ValueError(f"Invalid field dtype: {dtype}")
if align and align not in _align_types:
raise ValueError(f"Invalid field alignment: {align}")
self.dtype = dtype
self.report_fn = report_fn
if not align:
if dtype in _left_align_dtypes:
self.align = ALIGN_LEFT
elif dtype in _right_align_dtypes:
self.align = ALIGN_RIGHT
else:
self.align = align
if width < 0:
raise ValueError("Field width cannot be < 0")
self.width = width if width else _DEFAULT_WIDTH
[docs]
class FieldProperties:
"""
Properties of a field instance.
"""
[docs]
def __init__(
self,
field_num=None,
initial_width=0,
width=0,
objtype=None,
dtype=None,
align=None,
hidden=False,
implicit=False,
sort_key=False,
sort_dir=None,
compact_one=False,
compacted=False,
sort_posn=None,
):
self.field_num = field_num
self.initial_width = initial_width
self.width = width
self.objtype = objtype
self.dtype = dtype
self.align = align
#
# Field flags
#
self.hidden = hidden
self.implicit = implicit
self.sort_key = sort_key
self.sort_dir = sort_dir
self.compact_one = compact_one # used for implicit fields
self.compacted = compacted
self.sort_posn = sort_posn
[docs]
class Field:
"""
Field()
A ``Field`` represents an instance of a ``FieldType``
including its associated data values.
"""
#: reference to the containing Report
[docs]
def __init__(self, report, props):
"""
Initialise a new Field object.
Initialise a Field object and configure the supplied
``report`` and ``props`` attributes.
:param report: The Report that owns this field
:param props: The FieldProperties object for this field
"""
self.report = report
self.props = props
self.report_string = ""
self.sort_value = None
[docs]
def report_str(self, value):
"""
Report a string value for this Field object.
Set the value for this field to the supplied ``value``.
:param value: The string value to set
:rtype: None
"""
if not isinstance(value, string_types):
raise TypeError("Value for report_str() must be a string type.")
self.set_value(value, sort_value=value)
[docs]
def report_sha(self, value):
"""
Report a SHA value for this Field object.
Set the value for this field to the supplied ``value``.
:param value: The SHA value to set
:rtype: None
"""
if not isinstance(value, string_types):
raise TypeError("Value for report_sha() must be a string type.")
self.set_value(value, sort_value=value)
[docs]
def report_idx(self, value):
"""
Report an index value for this Field object.
Set the value for this field to the supplied ``value``.
:param value: The index value to set
:rtype: None
"""
if not isinstance(value, num_types):
raise TypeError("Value for report_idx() must me a numeric type.")
if value == SNAPSET_INDEX_NONE:
report_string = "-"
sort_value = value
else:
report_string = str(value)
sort_value = value
self.set_value(report_string, sort_value=sort_value)
[docs]
def report_num(self, value):
"""
Report a numeric value for this Field object.
Set the value for this field to the supplied ``value``.
:param value: The numeric value to set
:rtype: None
"""
if value is not None and not isinstance(value, num_types):
raise TypeError("Value for report_num() must be a numeric type.")
report_string = str(value)
sort_value = value if value is not None else -1
self.set_value(report_string, sort_value=sort_value)
[docs]
def report_time(self, value):
"""
Report a time value for this Field object.
Set the value for this field to the supplied ``value``.
:param value: The time value to set
:rtype: None
"""
if not isinstance(value, string_types):
raise TypeError("Value for report_time() must be a string type.")
self.set_value(value, sort_value=value)
[docs]
def report_uuid(self, value):
"""
Report a uuid value for this Field object.
Set the value for this field to the supplied ``value``.
:param value: The uuid value to set
:rtype: None
"""
if isinstance(value, uuid.UUID):
value = str(value)
else:
raise TypeError("Value for report_uuid() must be a UUID type.")
self.set_value(value, sort_value=value)
[docs]
def report_size(self, value):
"""
Report a size value for this Field object.
Set the value for this field to the supplied ``value``, converted
to a human readable string.
:param value: The size value to set in bytes.
:rtype: None
"""
if isinstance(value, int):
value_str = size_fmt(value)
else:
raise TypeError("Value for report_size() must be an int type.")
self.set_value(value_str, sort_value=value)
[docs]
def report_str_list(self, value):
"""
Report a string list value for this Field object.
Set the value for this field to the strings contained in ``value``.
:param value: A list of strings
:rtype: None
"""
if not isinstance(value, list):
raise TypeError("Value for report_str_list() must be a list type.")
if not all(isinstance(v, string_types) for v in value):
raise TypeError("Value for report_str_list() must be a list of strings.")
value = sorted(value)
list_value = ", ".join(value)
self.set_value(list_value, sort_value=list_value)
[docs]
def set_value(self, report_string, sort_value=None):
"""
Report an arbitrary value for this Field object.
Set the value for this field to the supplied ``value``,
and set the field's ``sort_value`` to the supplied
``sort_value``.
:param report_string: The string value to set
:param sort_value: The sort value
:rtype: None
"""
if report_string is None:
raise ValueError("No value assigned to field.")
self.report_string = report_string
self.sort_value = sort_value if sort_value else report_string
class Row:
"""
Row()
A class representing a single data row making up a report.
"""
def __init__(self, report):
self.report = report
self._fields = []
self.sort_fields = None
def add_field(self, field):
"""
Add a field to this Row.
:param field: The field to be added
:rtype: None
"""
self._fields.append(field)
def __none_returning_fn(_obj):
"""
Dummy data function for special report types.
:returns: None
"""
return None
# Implicit report fields and types
PR_SPECIAL = 0x80000000
_implicit_special_report_types = [
ReportObjType(PR_SPECIAL, "Special", "special_", __none_returning_fn)
]
def __no_report_fn(_field, _data):
"""
Dummy report function for special report types.
:returns: None
"""
return
_SPECIAL_FIELD_HELP_NAME = "help"
_implicit_special_report_fields = [
FieldType(
PR_SPECIAL,
_SPECIAL_FIELD_HELP_NAME,
"Help",
"Show help",
8,
REP_STR,
__no_report_fn,
)
]
# Report class
[docs]
class Report:
"""
Report()
A class representing a configurable text report with multiple
caller-defined fields. An optional title may be provided and the
``fields`` argument must contain a list of ``Field`` objects
describing the required report.
"""
# Implicit field support
_implicit_types = _implicit_special_report_types
_implicit_fields = _implicit_special_report_fields
opts = None
def __help_requested(self):
"""
Check for presence of 'help' fields in output selection.
Check the fields making up this Report and return True
if any valid 'help' field synonym is present.
:returns: True if help was requested or False otherwise
"""
for field_prop in self._field_properties:
if field_prop.implicit:
name = self._implicit_fields[field_prop.field_num].name
if name == _SPECIAL_FIELD_HELP_NAME:
return True
return False
def __get_longest_field_name_len(self, fields):
"""
Find the longest field name length.
:returns: the length of the longest configured field name
"""
max_len = 0
for field in fields:
cur_len = len(field.name)
max_len = cur_len if cur_len > max_len else max_len
for obj_type in self._types:
cur_len = len(obj_type.prefix) + 3
max_len = cur_len if cur_len > max_len else max_len
return max_len
def __display_fields(self, display_field_types):
"""
Display report fields help message.
Display a list of valid fields for this ``Report``.
:param fields: The list of fields to display
:param display_field_types: A boolean controlling whether
field types (str, SHA, num)
are included in help output
"""
out = self.opts.report_file
fields = self._fields
name_len = self.__get_longest_field_name_len(fields)
last_desc = ""
banner = "-" * 79
for field in fields:
obj_type = self.__find_type(field.objtype)
if obj_type:
desc = obj_type.desc
else:
desc = ""
if desc != last_desc:
if last_desc:
out.write(" \n")
desc_len = len(desc) + 7
out.write(f"{desc} Fields\n")
out.write(f"{banner:{desc_len}.{desc_len}}\n")
if display_field_types:
open_type = " ["
field_dtype = field.dtype
close_type = "]"
else:
open_type = field_dtype = close_type = ""
out.write(
f" {field.name:{name_len}} - {field.desc}"
f"{open_type}{field_dtype}{close_type}\n"
)
last_desc = desc
def __find_type(self, report_type):
"""
Resolve numeric type to corresponding ReportObjType.
:param report_type: The numeric report type to look up
:returns: The requested ReportObjType.
:raises: ValueError if no matching type was found.
"""
for obj_type in self._implicit_types:
if obj_type.objtype == report_type:
return obj_type
for obj_type in self._types:
if obj_type.objtype == report_type:
return obj_type
raise ValueError(f"Unknown report object type: {report_type}")
def __copy_field(self, field_num, implicit):
"""
Copy field definition to FieldProperties
Copy values from a FieldType to FieldProperties.
:param field_num: The number of this field (fields order)
:param implicit: True if this field is implicit, else False
"""
return FieldProperties(
field_num=field_num,
width=self._fields[field_num].width,
initial_width=self._fields[field_num].width,
implicit=implicit,
objtype=self.__find_type(self._fields[field_num].objtype),
dtype=self._fields[field_num].dtype,
align=self._fields[field_num].align,
)
def __add_field(self, field_num, implicit):
"""
Add a field to this Report.
Add the specified FieldType to this Report and configure
FieldProperties for it.
:param field_num: The number of this field (fields order)
:param implicit: True if this field is implicit, else False
"""
field_props = self.__copy_field(field_num, implicit)
if field_props.hidden:
self._field_properties.insert(0, field_props)
else:
self._field_properties.append(field_props)
return field_props
def __get_field(self, field_name):
"""
Look up a field by name.
Attempt to find the field named in ``field_name`` in this
Report's tables of implicit and user-defined fields,
returning a ``(field, implicit)`` tuple, where field
contains the requested ``FieldType``, and ``implicit``
is a boolean indicating whether this field is implicit or
not.
:param field_num: The number of this field (fields order)
:param implicit: True if this field is implicit, else False
"""
for field in self._implicit_fields:
if field.name == field_name:
return (self._implicit_fields.index(field), True)
for field in self._fields:
objtype = self.__find_type(field.objtype)
if field_name in (field.name, objtype.prefix + field.name):
return (self._fields.index(field), False)
raise ValueError(f"No matching field name: {field_name}")
def __field_match(self, field_name, type_only):
"""
Attempt to match a field and optionally update report type.
Look up the named field and, if ``type_only`` is True,
update this Report's ``report_types`` mask to include
the field's type identifier. If ``type_only`` is False the
field is also added to this Report's field list.
:param field_name: A string identifying the field
:param type_only: True if this call should only update types
"""
if "_" in field_name and field_name.split("_", maxsplit=1)[1] == "all":
prefix = field_name.split("_", maxsplit=1)[0]
for field in self._fields:
objtype = self.__find_type(field.objtype)
if objtype.prefix[:-1] == prefix:
self.__field_match(prefix + "_" + field.name, type_only)
return None
try:
(f_idx, implicit) = self.__get_field(field_name)
if type_only:
if implicit:
self.report_types |= self._implicit_fields[f_idx].objtype
else:
self.report_types |= self._fields[f_idx].objtype
return None
return self.__add_field(f_idx, implicit)
except ValueError as err:
_log_error("Error adding field %s", field_name)
raise err
return None
def __parse_fields(self, field_format, type_only):
"""
Parse report field list.
Parse ``field_format`` and attempt to match the names of
field names found to registered FieldType fields.
If ``type_only`` is True only the ``report_types`` field
is updated: otherwise the parsed fields are added to the
Report's field list.
:param field_format: The list of fields to parse
:param type_only: True if this call should only update types
"""
for word in field_format.split(","):
# Allow consecutive commas
if not word:
continue
try:
self.__field_match(word, type_only)
except ValueError as err:
self.__display_fields(True)
_log_error("Unrecognised field: %s", word)
raise err
def __add_sort_key(self, field_num, sort, implicit, type_only):
"""
Add a new sort key to this Report
Add the sort key identified by ``field_num`` to this list
of sort keys for this Report.
:param field_num: The field number of the key to add
:param sort: The sort direction for this key
:param implicit: True if field_num is implicit, else False
:param type_only: True if this call should only update types
"""
fields = self._implicit_fields if implicit else self._fields
found = None
for field_props in self._field_properties:
if field_props.implicit == implicit and field_props.field_num == field_num:
found = field_props
if not found:
if type_only:
self.report_types |= fields[field_num].objtype
return
found = self.__add_field(field_num, implicit)
if found.sort_key:
_log_info("Ignoring duplicate sort field: %s", fields[field_num].name)
found.sort_key = True
found.sort_dir = sort
found.sort_posn = self.keys_count
self.keys_count += 1
def __key_match(self, key_name, type_only):
"""
Attempt to match a sort key and update report type.
Look up the named sort key and, if ``type_only`` is True,
update this Report's ``report_types`` mask to include
the field's type identifier. If ``type_only`` is False the
field is also added to this Report's field list.
:param field_name: A string identifying the sort key
:param type_only: True if this call should only update types
"""
sort_dir = None
if not key_name:
raise ValueError("Sort key name cannot be empty")
if key_name.startswith("+"):
sort_dir = ASCENDING
key_name = key_name[1:]
elif key_name.startswith("-"):
sort_dir = DESCENDING
key_name = key_name[1:]
else:
sort_dir = ASCENDING
for field in self._implicit_fields:
fields = self._implicit_fields
if field.name == key_name:
return self.__add_sort_key(
fields.index(field), sort_dir, True, type_only
)
for field in self._fields:
fields = self._fields
objtype = self.__find_type(field.objtype)
if key_name in (field.name, objtype.prefix + field.name):
return self.__add_sort_key(
fields.index(field), sort_dir, False, type_only
)
raise ValueError(f"Unknown sort key name: {key_name}")
def __parse_keys(self, keys, type_only):
"""
Parse report sort key list.
Parse ``keys`` and attempt to match the names of
sort keys found to registered FieldType fields.
If ``type_only`` is True only the ``report_types`` field
is updated: otherwise the parsed fields are added to the
Report's sort key list.
:param field_format: The list of fields to parse
:param type_only: True if this call should only update types
"""
if not keys:
return
for word in keys.split(","):
# Allow consecutive commas
if not word:
continue
try:
self.__key_match(word, type_only)
except ValueError as err:
self.__display_fields(True)
_log_error("Unrecognised field: %s", word)
raise err
# pylint: disable=too-many-arguments
[docs]
def __init__(self, types, fields, output_fields, opts, sort_keys, title):
"""
Initialise Report.
Initialise a new ``Report`` object with the specified fields
and output control options.
:param types: List of ReportObjType used in this report.
:param fields: A list of ``Field`` field descriptions.
:param output_fields: An optional list of output fields to
be rendered by this report.
:param opts: An instance of ``ReportOpts`` or None.
:returns: A new report object.
:rtype: ``Report``.
"""
self.report_types = 0
self.keys_count = 0
self._data = None
self._rows = None
self._field_properties = None
self._header_written = False
self._field_calc_needed = True
self._sort_required = False
self._already_reported = False
self._fields = fields
self._types = types
self._title = title
if opts.buffered:
self._sort_required = True
self.opts = opts if opts else ReportOpts()
self._rows = []
self._field_properties = []
# set field_prefix from type
# canonicalize_field_ids()
if not output_fields:
output_fields = ",".join([field.name for field in fields])
# First pass: set up types
self.__parse_fields(output_fields, 1)
self.__parse_keys(sort_keys, 1)
# Second pass: initialise fields
self.__parse_fields(output_fields, 0)
self.__parse_keys(sort_keys, 0)
if self.__help_requested():
self._already_reported = True
self.__display_fields(display_field_types=True)
print("")
def __recalculate_sha_width(self):
"""
Recalculate minimum SHA field widths.
For each REP_SHA field present, recalculate the minimum
field width required to ensure uniqueness of the displayed
values.
:rtype: None
"""
shas = {}
props_map = {}
for row in self._rows:
for field in row._fields:
if self._fields[field.props.field_num].dtype == REP_SHA:
# Use field_num as index to apply check across rows
num = field.props.field_num
if num not in shas:
shas[num] = set()
props_map[num] = field.props
shas[num].add(field.report_string)
for num, vals in shas.items():
min_prefix = max(MIN_SHA_WIDTH, props_map[num].width)
props_map[num].width = find_minimum_sha_prefix(vals, min_prefix)
def __recalculate_fields(self):
"""
Recalculate field widths.
For each field, recalculate the minimum field width by
finding the longest ``report_string`` value for that field
and updating the dynamic width stored in the corresponding
``FieldProperties`` object.
:rtype: None
"""
for row in self._rows:
for field in row._fields:
if self._sort_required and field.props.sort_key:
row.sort_fields[field.props.sort_posn] = field
if self._fields[field.props.field_num].dtype == REP_SHA:
continue
field_len = len(field.report_string)
field.props.width = max(field_len, field.props.width)
def __report_headings(self):
"""
Output report headings.
Output the column headings for this Report.
:rtype: None
"""
self._header_written = True
if not self.opts.headings:
return
line = ""
props = self._field_properties
for field_props in props:
if field_props.hidden:
continue
fields = self._fields
heading = fields[field_props.field_num].head
if self.opts.aligned:
heading = f"{heading:{field_props.width}}"
line += heading
if props.index(field_props) != (len(props) - 1):
line += self.opts.separator
self.opts.report_file.write(line.strip() + "\n")
def __row_key_fn(self):
"""
Return a Python key function to compare report rows.
The ``cmp`` argument of sorting functions has been removed
in Python 3.x: to maintain similarity with the device-mapper
report library we keep a traditional "cmp"-style function
(that is structured identically to the version in the device
mapper library), and dynamically wrap it in a ``RowKey``
object to conform to the Python sort key model.
:returns: A RowKey object wrapping _row_cmp()
:rtype: RowKey
"""
def _row_cmp(row_a, row_b):
"""
Compare two report rows for sorting.
Compare the report rows ``row_a`` and ``row_b`` and
return a "cmp"-style comparison value:
1 if row_a > row_b
0 if row_a == row_b
-1 if row_b < row_a
Note that the actual comparison direction depends on the
field definitions of the fields being compared, since
each sort key defines its own sort order.
:param row_a: The first row to compare
:param row_b: The second row to compare
"""
for cnt in range(0, row_a.report.keys_count):
sfa = row_a.sort_fields[cnt]
sfb = row_b.sort_fields[cnt]
if sfa.props.dtype == REP_NUM:
num_a = sfa.sort_value
num_b = sfb.sort_value
if num_a == num_b:
continue
if sfa.props.sort_dir == ASCENDING:
return 1 if num_a > num_b else -1
return 1 if num_a < num_b else -1
stra = sfa.sort_value
strb = sfb.sort_value
if stra == strb:
continue
if sfa.props.sort_dir == ASCENDING:
return 1 if stra > strb else -1
return 1 if stra < strb else -1
return 0
class RowKey:
"""
RowKey sort wrapper.
"""
def __init__(self, obj, *_args):
"""
Initialise a new RowKey object.
:param obj: The object to be compared
:returns: None
"""
self.obj = obj
def __lt__(self, other):
"""
Test if less than.
:param other: The other object to be compared
"""
return _row_cmp(self.obj, other.obj) < 0
def __gt__(self, other):
"""
Test if greater than.
:param other: The other object to be compared
"""
return _row_cmp(self.obj, other.obj) > 0
def __eq__(self, other):
"""
Test if equal to.
:param other: The other object to be compared
"""
return _row_cmp(self.obj, other.obj) == 0
def __le__(self, other):
"""
Test if less than or equal to.
:param other: The other object to be compared
"""
return _row_cmp(self.obj, other.obj) <= 0
def __ge__(self, other):
"""
Test if greater than or equal to.
:param other: The other object to be compared
"""
return _row_cmp(self.obj, other.obj) >= 0
def __ne__(self, other):
"""
Test if not equal to.
:param other: The other object to be compared
"""
return _row_cmp(self.obj, other.obj) != 0
return RowKey
def _sort_rows(self):
"""
Sort the rows of this Report.
Sort this report's rows, according to the configured sort
keys.
:returns: None
"""
self._rows.sort(key=self.__row_key_fn())
[docs]
def report_object(self, obj):
"""
Report data for object.
Add a row of data to this ``Report``. The ``data``
argument should be an object of the type understood by this
report's fields. It will be passed in turn to each field to
obtain data for the current row.
:param obj: the object to report on for this row.
"""
if obj is None:
raise ValueError("Cannot report NoneType object.")
if self._already_reported:
return ""
row = Row(self)
fields = self._fields
if self._sort_required:
row.sort_fields = [-1] * self.keys_count
for field_props in self._field_properties:
field = Field(self, field_props)
data = field_props.objtype.data_fn(obj)
if data is None:
raise ValueError(
f"No data assigned to field {fields[field_props.field_num].name}"
)
try:
fields[field_props.field_num].report_fn(field, data)
except ValueError as err:
raise ValueError(
f"No value assigned to field {fields[field_props.field_num].name}"
) from err
row.add_field(field)
self._rows.append(row)
if not self.opts.buffered:
return self.report_output()
return ""
def _output_field(self, field):
"""
Output field data.
Generate string data for one field in a report row.
:field: The field to be output
:returns: The output report string for this field
:rtype: str
"""
fields = self._fields
prefix = self.opts.field_name_prefix
quote = "" if self.opts.unquoted else STANDARD_QUOTE
if prefix:
field_name = fields[field.props.field_num].name
prefix += f"{field_name.upper()}{STANDARD_PAIR}{STANDARD_QUOTE}"
repstr = field.report_string
width = field.props.width
if self.opts.aligned:
align = field.props.align
if not align:
if field.props.dtype in _right_align_dtypes:
align = ALIGN_RIGHT
else:
align = ALIGN_LEFT
if align == ALIGN_LEFT:
repstr = f"{repstr: <{width}.{width}}"
else:
repstr = f"{repstr: >{width}.{width}}"
suffix = quote
return prefix + repstr + suffix
def _output_field_json(self, field):
"""
Output field data as a JSON dictionary element.
:field: The field to be output
:returns: A ``(key, value)`` tuple for this field
:rtype: Tuple
"""
fields = self._fields
obj_type = field.props.objtype
field_name = obj_type.prefix + fields[field.props.field_num].name
field_dtype = fields[field.props.field_num].dtype
if field_dtype == REP_NUM:
value = field.sort_value
elif field_dtype == REP_STR:
if field.report_string == "yes":
value = True
elif field.report_string == "no":
value = False
else:
value = field.report_string
elif field_dtype == REP_STR_LIST:
value = field.report_string.split(", ")
else:
value = field.report_string
return (field_name, value)
def _output_as_rows(self):
"""
Output this report in column format.
Output the data contained in this ``Report`` in column
format, one row per line. If column headings have not been
printed already they will be automatically displayed by this
call.
:returns: None
"""
for field_props in self._field_properties:
if field_props.hidden:
for row in self._rows:
row._fields = row._fields[1:]
fields = self._implicit_fields if field_props.implicit else self._fields
line = ""
if self.opts.headings:
line += fields[field_props.field_num].head + self.opts.separator
for row in self._rows:
field = row._fields[0]
line += self._output_field(field)
line += self.opts.separator
row._fields = row._fields[1:]
self.opts.report_file.write(line.strip() + "\n")
def _output_as_columns(self):
"""
Output this report in column format.
Output the data contained in this ``Report`` in column
format, one row per line. If column headings have not been
printed already they will be automatically displayed by this
call.
:returns: None
"""
if not self._header_written:
self.__report_headings()
for row in self._rows:
do_field_delim = False
line = ""
for field in row._fields:
if field.props.hidden:
continue
if do_field_delim:
line += self.opts.separator
else:
do_field_delim = True
line += self._output_field(field)
self.opts.report_file.write(line.strip() + "\n")
def _output_as_json(self):
"""
Output this report in JSON format.
Output the data contained in this ``Report`` in JSON notation.
Column names are used as the keys for the JSON report.
:returns: None
"""
rows = {f"{self._title}": []}
for row in self._rows:
row_vals = {}
for field in row._fields:
if field.props.hidden:
continue
(key, value) = self._output_field_json(field)
row_vals[key] = value
rows[f"{self._title}"].append(row_vals)
self.opts.report_file.write(dumps(rows, indent=4) + "\n")
[docs]
def report_output(self):
"""
Output report data.
Output this report's data to the configured report file,
using the configured output controls and fields.
On success the number of rows output is returned. On
error an exception is raised.
:returns: the number of rows of output written.
:rtype: ``int``
"""
if self._already_reported:
return ""
if self._field_calc_needed:
self.__recalculate_sha_width()
self.__recalculate_fields()
if self._sort_required:
self._sort_rows()
if self.opts.json:
return self._output_as_json()
if self.opts.columns_as_rows:
return self._output_as_rows()
return self._output_as_columns()
__all__ = [
# Module constants
"REP_NUM",
"REP_STR",
"REP_SHA",
"REP_IDX",
"REP_TIME",
"REP_UUID",
"REP_SIZE",
"REP_STR_LIST",
"ALIGN_LEFT",
"ALIGN_RIGHT",
"ASCENDING",
"DESCENDING",
# Report objects
"ReportOpts",
"ReportObjType",
"Field",
"FieldType",
"FieldProperties",
"Report",
]
# vim: set et ts=4 sw=4 :