sampledoc

Source code for Bcfg2.Server.Lint.RequiredAttrs

import os
import re
import lxml.etree
import Bcfg2.Server.Lint
import Bcfg2.Client.Tools.VCS
from Bcfg2.Server.Plugins.Packages import Apt, Yum
from Bcfg2.Client.Tools.POSIX.base import device_map
try:
    from Bcfg2.Server.Plugins.Bundler import BundleTemplateFile
    HAS_GENSHI = True
except ImportError:
    HAS_GENSHI = False


# format verifying functions.  TODO: These should be moved into XML
# schemas where possible.
def is_filename(val):
    """ Return True if val is a string describing a valid full path.

    :param val: The attribute value to check
    :type val: string
    :returns: bool - True if ``val`` starts with ``/`` and is longer
              than one character (so a bare ``/`` is rejected)
    """
    return val.startswith("/") and len(val) > 1


def is_selinux_type(val):
    """ Return a true value if val is a string describing a valid
    (although not necessarily existent) SELinux type.

    The pattern is only anchored at the start of the string, so any
    text following the ``_t`` suffix is accepted.

    :param val: The attribute value to check
    :type val: string
    :returns: a regex match object on success, None otherwise
    """
    return re.match(r'^[a-z_]+_t', val)


def is_selinux_user(val):
    """ Return a true value if val is a string describing a valid
    (although not necessarily existent) SELinux user.

    The pattern is only anchored at the start of the string, so any
    text following the ``_u`` suffix is accepted.

    :param val: The attribute value to check
    :type val: string
    :returns: a regex match object on success, None otherwise
    """
    return re.match(r'^[a-z_]+_u', val)


def is_selinux_role(val):
    """ Return a true value if val is a string describing a valid
    (although not necessarily existent) SELinux role.

    NOTE(review): assumes roles follow the conventional ``_r`` suffix
    in the same way users are matched on ``_u`` above -- confirm.

    :param val: A single role name
    :type val: string
    :returns: a regex match object on success, None otherwise
    """
    return re.match(r'^[a-z_]+_r', val)


def is_selinux_roles(val):
    """ Return True if val is a whitespace-separated list of one or
    more valid SELinux roles (see :func:`is_selinux_role`).

    :param val: The attribute value to check
    :type val: string
    :returns: bool
    """
    roles = val.split()
    if not roles:
        return False
    for role in roles:
        if not is_selinux_role(role):
            return False
    return True


def is_octal_mode(val):
    """ Return a true value if val is a string describing a valid
    octal permissions mode, i.e. exactly three or four octal digits.

    :param val: The attribute value to check
    :type val: string
    :returns: a regex match object on success, None otherwise
    """
    # anchored at both ends so that e.g. "0644junk" or "77777" is
    # rejected rather than matched on its first few characters
    return re.match(r'^[0-7]{3,4}$', val)


def is_username(val):
    """ Return a true value if val is a string giving either a
    non-negative integer uid, or a valid Unix username.

    A username must start with an ASCII letter or underscore and may
    be followed by up to 30 letters, digits, hyphens or underscores.

    :param val: The attribute value to check
    :type val: string
    :returns: a regex match object on success, None otherwise
    """
    # note: [A-Za-z_] rather than [A-z]; the latter also matches the
    # punctuation characters that sit between 'Z' and 'a' in ASCII
    return re.match(r'^([A-Za-z_][-_A-Za-z0-9]{0,30}|\d+)$', val)


def is_device_mode(val):
    """ Return a true value if val is a string consisting solely of
    decimal digits (i.e., describing a non-negative integer).

    :param val: The attribute value to check
    :type val: string
    :returns: a regex match object on success, None otherwise
    """
    return re.match(r'^\d+$', val)


def is_acl_perms(val):
    """ Return a true value if val is a string describing ACL
    permissions, either as a single octal digit or as up to three
    characters drawn from ``r``, ``w``, ``x`` and ``-``.

    :param val: The attribute value to check
    :type val: string
    :returns: a regex match object on success, None otherwise
    """
    return re.match(r'^([0-7]|[rwx\-]{0,3})$', val)


class RequiredAttrs(Bcfg2.Server.Lint.ServerPlugin):
    """ Verify attributes for configuration entries that cannot be
    verified with an XML schema alone.

    ``self.required_attrs`` maps an entry tag (with any ``Bound``
    prefix removed) to a dict keyed by the entry's ``type``
    attribute.  The key ``None`` is used for entries that have no
    ``type`` attribute, and ``__any__`` is the fallback for types
    that are not listed explicitly.  Each value is a dict mapping a
    required attribute name to either ``None`` (the attribute only
    has to be present) or a callable that returns a true value when
    the attribute value is well-formed.  The pseudo-attribute
    ``__text__`` describes the text content of the entry. """

    def __init__(self, *args, **kwargs):
        Bcfg2.Server.Lint.ServerPlugin.__init__(self, *args, **kwargs)
        self.required_attrs = dict(
            Path=dict(
                device=dict(name=is_filename,
                            owner=is_username,
                            group=is_username,
                            dev_type=lambda v: v in device_map),
                directory=dict(name=is_filename,
                               owner=is_username,
                               group=is_username,
                               mode=is_octal_mode),
                file=dict(name=is_filename,
                          owner=is_username,
                          group=is_username,
                          mode=is_octal_mode,
                          __text__=None),
                hardlink=dict(name=is_filename, to=is_filename),
                symlink=dict(name=is_filename),
                ignore=dict(name=is_filename),
                nonexistent=dict(name=is_filename),
                permissions=dict(name=is_filename,
                                 owner=is_username,
                                 group=is_username,
                                 mode=is_octal_mode),
                vcs=dict(vcstype=lambda v: (v != 'Path' and
                                            hasattr(Bcfg2.Client.Tools.VCS.VCS,
                                                    "Install%s" % v)),
                         revision=None,
                         sourceurl=None)),
            Service={"__any__": dict(name=None),
                     "smf": dict(name=None, FMRI=None)},
            Action={None: dict(name=None,
                               timing=lambda v: v in ['pre', 'post', 'both'],
                               when=lambda v: v in ['modified', 'always'],
                               status=lambda v: v in ['ignore', 'check'],
                               command=None)},
            ACL=dict(
                default=dict(scope=lambda v: v in ['user', 'group'],
                             perms=is_acl_perms),
                access=dict(scope=lambda v: v in ['user', 'group'],
                            perms=is_acl_perms),
                mask=dict(perms=is_acl_perms)),
            Package={"__any__": dict(name=None)},
            SEBoolean={None: dict(name=None,
                                  value=lambda v: v in ['on', 'off'])},
            SEModule={None: dict(name=None, __text__=None)},
            SEPort={
                None: dict(name=lambda v: re.match(r'^\d+(-\d+)?/(tcp|udp)',
                                                   v),
                           selinuxtype=is_selinux_type)},
            SEFcontext={None: dict(name=None, selinuxtype=is_selinux_type)},
            SENode={None: dict(name=lambda v: "/" in v,
                               selinuxtype=is_selinux_type,
                               proto=lambda v: v in ['ipv6', 'ipv4'])},
            SELogin={None: dict(name=is_username,
                                selinuxuser=is_selinux_user)},
            SEUser={None: dict(name=is_selinux_user,
                               roles=is_selinux_roles,
                               prefix=None)},
            SEInterface={None: dict(name=None, selinuxtype=is_selinux_type)},
            SEPermissive={None: dict(name=is_selinux_type)},
            POSIXGroup={None: dict(name=is_username)},
            POSIXUser={None: dict(name=is_username)})

    def Run(self):
        """ Run all checks.  Packages sources are always checked;
        Rules and bundles are skipped when the Defaults plugin is
        enabled. """
        self.check_packages()
        if "Defaults" in self.core.plugins:
            self.logger.info("Defaults plugin enabled; skipping required "
                             "attribute checks")
        else:
            self.check_rules()
            self.check_bundles()

    @classmethod
    def Errors(cls):
        """ Return a dict mapping each error name this plugin can
        emit to its default severity. """
        return {"missing-elements": "error",
                "unknown-entry-type": "error",
                "unknown-entry-tag": "error",
                "required-attrs-missing": "error",
                "required-attr-format": "error",
                "extra-attrs": "warning"}

    def check_default_acl(self, path):
        """ Check that a default ACL contains either no entries or
        minimum required entries.

        :param path: The Path entry whose child ACL tags are checked
        :type path: lxml.etree._Element
        """
        queries = ("ACL[@type='default' and @scope='user' and @user='']",
                   "ACL[@type='default' and @scope='group' and @group='']",
                   "ACL[@type='default' and @scope='other']")
        defaults = 0
        for query in queries:
            if path.xpath(query):
                defaults += 1
        # either none or all three of the base default ACLs are
        # acceptable; anything in between is incomplete
        if 0 < defaults < 3:
            self.LintError(
                "missing-elements",
                "A Path must have either no default ACLs or at"
                " least default:user::, default:group:: and"
                " default:other::")

    def check_packages(self):
        """ Check Packages sources for Source entries with missing
        attributes. """
        if 'Packages' not in self.core.plugins:
            return

        for source in self.core.plugins['Packages'].sources:
            if isinstance(source, Yum.YumSource):
                # Yum sources may additionally be identified by pulp_id
                if (not source.pulp_id and not source.url and
                    not source.rawurl):
                    self.LintError(
                        "required-attrs-missing",
                        "A %s source must have either a url, rawurl, or "
                        "pulp_id attribute: %s" %
                        (source.ptype, self.RenderXML(source.xsource)))
            elif not source.url and not source.rawurl:
                self.LintError(
                    "required-attrs-missing",
                    "A %s source must have either a url or rawurl attribute: "
                    "%s" %
                    (source.ptype, self.RenderXML(source.xsource)))

            if (not isinstance(source, Apt.AptSource) and
                source.recommended):
                self.LintError(
                    "extra-attrs",
                    "The recommended attribute is not supported on %s sources:"
                    " %s" %
                    (source.ptype, self.RenderXML(source.xsource)))

    def check_rules(self):
        """ check Rules for Path entries with missing attrs """
        if 'Rules' not in self.core.plugins:
            return

        for rules in self.core.plugins['Rules'].entries.values():
            xdata = rules.pnode.data
            for path in xdata.xpath("//Path"):
                self.check_entry(path, os.path.join(self.config['repo'],
                                                    rules.name))

    def check_bundles(self):
        """ Check bundles for BoundPath entries with missing
        attrs. """
        if 'Bundler' not in self.core.plugins:
            return

        for bundle in self.core.plugins['Bundler'].entries.values():
            if (self.HandlesFile(bundle.name) and
                (not HAS_GENSHI or
                 not isinstance(bundle, BundleTemplateFile))):
                try:
                    xdata = lxml.etree.XML(bundle.data)
                except (lxml.etree.XMLSyntaxError, AttributeError):
                    # fall back to parsing the template file on disk
                    xdata = \
                        lxml.etree.parse(bundle.template.filepath).getroot()

                for path in \
                        xdata.xpath("//*[substring(name(), 1, 5) = 'Bound']"):
                    self.check_entry(path, bundle.name)

                # ensure that abstract Package tags have either name
                # or group specified
                for package in xdata.xpath("//Package"):
                    if ('name' not in package.attrib and
                        'group' not in package.attrib):
                        self.LintError(
                            "required-attrs-missing",
                            "Package tags require either a 'name' or 'group' "
                            "attribute: \n%s" % self.RenderXML(package))

    def check_entry(self, entry, filename):
        """ Generic entry check.

        Looks up the attribute specification for the entry's tag and
        type in ``self.required_attrs`` and reports missing or
        malformed attributes (and text content) via ``LintError``.
        Nothing is done if ``filename`` is not handled by this run.

        :param entry: The XML entry to check for missing attributes.
        :type entry: lxml.etree._Element
        :param filename: The filename the entry came from
        :type filename: string
        """
        if not self.HandlesFile(filename):
            return

        name = entry.get('name')
        tag = entry.tag
        if tag.startswith("Bound"):
            tag = tag[5:]
        if tag not in self.required_attrs:
            self.LintError("unknown-entry-tag",
                           "Unknown entry tag '%s': %s" %
                           (tag, self.RenderXML(entry)))
            return

        etype = entry.get('type')
        if etype in self.required_attrs[tag]:
            spec = self.required_attrs[tag][etype]
        elif '__any__' in self.required_attrs[tag]:
            spec = self.required_attrs[tag]['__any__']
        else:
            self.LintError("unknown-entry-type",
                           "Unknown %s type %s: %s" %
                           (tag, etype, self.RenderXML(entry)))
            return

        # work on a copy: the per-entry adjustments below must not leak
        # into the shared specification used for subsequent entries
        required_attrs = dict(spec)
        attrs = set(entry.attrib.keys())

        if 'dev_type' in required_attrs:
            dev_type = entry.get('dev_type')
            if dev_type in ['block', 'char']:
                # check if major/minor are specified
                required_attrs['major'] = is_device_mode
                required_attrs['minor'] = is_device_mode

        if tag == 'Path':
            self.check_default_acl(entry)

        if tag == 'ACL' and 'scope' in required_attrs:
            # an ACL must carry an attribute named after its scope; a
            # missing scope is already reported as a missing attribute
            scope = entry.get('scope')
            if scope is not None:
                required_attrs[scope] = is_username

        if '__text__' in required_attrs:
            # __text__ describes the element text, not a real attribute,
            # so take it out before the attribute checks below
            text_fmt = required_attrs.pop('__text__')
            if (not entry.text and
                entry.get('empty', 'false').lower() != 'true'):
                self.LintError("required-attrs-missing",
                               "Text missing for %s %s in %s: %s" %
                               (tag, name, filename,
                                self.RenderXML(entry)))
            if text_fmt is not None and not text_fmt(entry.text):
                self.LintError(
                    "required-attr-format",
                    "Text content of %s %s in %s is malformed\n%s" %
                    (tag, name, filename, self.RenderXML(entry)))

        if not attrs.issuperset(required_attrs.keys()):
            # sorted so that the message is stable between runs
            missing = sorted(set(required_attrs.keys()).difference(attrs))
            self.LintError(
                "required-attrs-missing",
                "The following required attribute(s) are missing for %s "
                "%s in %s: %s\n%s" %
                (tag, name, filename, ", ".join(missing),
                 self.RenderXML(entry)))

        for attr, fmt in required_attrs.items():
            if fmt and attr in attrs and not fmt(entry.attrib[attr]):
                self.LintError(
                    "required-attr-format",
                    "The %s attribute of %s %s in %s is malformed\n%s" %
                    (attr, tag, name, filename, self.RenderXML(entry)))