Source code for pki

# Authors:
#     Endi S. Dewata <edewata@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
#  along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright (C) 2013 Red Hat, Inc.
# All rights reserved.
#
"""
This module contains top-level classes and functions used by the Dogtag project.
"""
from __future__ import absolute_import
from __future__ import print_function

from functools import wraps
import cryptography.x509
import json
import ldap.dn
import logging
import os
import random
import re
import string
import subprocess
import sys

import requests
import six


CONF_DIR = '/etc/pki'
SHARE_DIR = '/usr/share/pki'
BASE_DIR = '/var/lib'
LOG_DIR = '/var/log/pki'

PACKAGE_VERSION = SHARE_DIR + '/VERSION'
CERT_HEADER = "-----BEGIN CERTIFICATE-----"
CERT_FOOTER = "-----END CERTIFICATE-----"

# Valid punctuation characters for random password.
# This is based on string.punctuation except:
#  - equal sign since it's used as delimiter in password.conf
#  - backslash since it's causing SSL handshake failure
#  - it should be relatively safe in an XML attribute
PUNCTUATIONS = '!#*+,-./:;^_|~'
GEN_PASS_CHARSET = string.digits + string.ascii_lowercase + string.ascii_uppercase + PUNCTUATIONS

# Map X.509 attribute OID to its short name.
# See also:
# - https://github.com/pyca/cryptography/blob/master/src/cryptography/x509/name.py
# - https://github.com/freeipa/freeipa/blob/master/ipapython/dn.py
ATTR_NAME_BY_OID = {
    cryptography.x509.oid.NameOID.COMMON_NAME: 'CN',
    cryptography.x509.oid.NameOID.COUNTRY_NAME: 'C',
    cryptography.x509.oid.NameOID.LOCALITY_NAME: 'L',
    cryptography.x509.oid.NameOID.STATE_OR_PROVINCE_NAME: 'ST',
    cryptography.x509.oid.NameOID.ORGANIZATION_NAME: 'O',
    cryptography.x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME: 'OU',
    cryptography.x509.oid.NameOID.SERIAL_NUMBER: 'serialNumber',
    cryptography.x509.oid.NameOID.SURNAME: 'SN',
    cryptography.x509.oid.NameOID.GIVEN_NAME: 'givenName',
    cryptography.x509.oid.NameOID.TITLE: 'title',
    cryptography.x509.oid.NameOID.GENERATION_QUALIFIER: 'generationQualifier',
    cryptography.x509.oid.NameOID.DN_QUALIFIER: 'dnQualifier',
    cryptography.x509.oid.NameOID.PSEUDONYM: 'pseudonym',
    cryptography.x509.oid.NameOID.DOMAIN_COMPONENT: 'DC',
    cryptography.x509.oid.NameOID.EMAIL_ADDRESS: 'E',
    cryptography.x509.oid.NameOID.JURISDICTION_COUNTRY_NAME:
        'incorporationCountry',
    cryptography.x509.oid.NameOID.JURISDICTION_LOCALITY_NAME:
        'incorporationLocality',
    cryptography.x509.oid.NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME:
        'incorporationState',
    cryptography.x509.oid.NameOID.BUSINESS_CATEGORY: 'businessCategory',
    cryptography.x509.ObjectIdentifier('2.5.4.9'): 'STREET',
    cryptography.x509.ObjectIdentifier('2.5.4.17'): 'postalCode',
    cryptography.x509.ObjectIdentifier('0.9.2342.19200300.100.1.1'): 'UID',
}

# Retry-able connection errors, see https://github.com/dogtagpki/pki/issues/3091
RETRYABLE_EXCEPTIONS = (
    requests.exceptions.ConnectionError,  # connection failed
    requests.exceptions.Timeout,  # connection or read time out
)

logger = logging.getLogger(__name__)


[docs] def convert_x509_name_to_dn(name): """ Convert X.509 Name into NSS-style DN string. See also: - https://cryptography.io/en/latest/x509/reference.html#cryptography.x509.Name - https://cryptography.io/en/latest/x509/reference.html#cryptography.x509.RelativeDistinguishedName - https://cryptography.io/en/latest/x509/reference.html#cryptography.x509.NameAttribute - https://cryptography.io/en/latest/x509/reference.html#cryptography.x509.ObjectIdentifier :param name: X.509 Name :type name: cryptography.x509.Name :returns: str -- DN string. """ # noqa: E501 # Do not use cryptography.x509.Name.rfc4514_string() since # it generates a DN with reversed RDN order: # dn = name.rfc4514_string() dn = None for attr in name: # Do not use cryptography.x509.RelativeDistinguishedName.rfc4514_string() # since it may generate an RDN with incorrect attribute names: # rdn = attr.rfc4514_string() oid = attr.oid attr_name = ATTR_NAME_BY_OID.get(oid, oid.dotted_string) attr_value = ldap.dn.escape_dn_chars(attr.value) rdn = '%s=%s' % (attr_name, attr_value) if dn: dn = rdn + ',' + dn else: dn = rdn return dn
[docs] def specification_version(): """ Return specification version. :returns: str -- specification version """ return get_info('Specification-Version')
[docs] def implementation_version(): """ Return implementation version. :returns: str -- implementation version """ return get_info('Implementation-Version')
[docs] def get_info(name): with open(PACKAGE_VERSION, 'r', encoding='utf-8') as input_file: for line in input_file: line = line.strip('\n') # parse <key>: <value> match = re.match(r'^\s*(\S*)\s*:\s*(.*)\s*$', line) if not match: continue key = match.group(1) value = match.group(2) if key.lower() == name.lower(): return value raise Exception('Property not found: %s' % name)
[docs] def generate_password(charset=GEN_PASS_CHARSET, length=12): """ This function generates FIPS-compliant password. See sftk_newPinCheck() in the following file: https://dxr.mozilla.org/nss/source/nss/lib/softoken/fipstokn.c The minimum password length is FIPS_MIN_PIN Unicode characters. The password must contain at least 3 character classes: * digits (string.digits) * ASCII lowercase letters (string.ascii_lowercase) * ASCII uppercase letters (string.ascii_uppercase) * ASCII non-alphanumeric characters (PUNCTUATIONS) * non-ASCII characters If an ASCII uppercase letter is the first character of the password, the uppercase letter is not counted toward its character class. If a digit is the last character of the password, the digit is not counted toward its character class. The FIPS_MIN_PIN is defined in the following file: https://dxr.mozilla.org/nss/source/nss/lib/softoken/pkcs11i.h #define FIPS_MIN_PIN 7 """ rnd = random.SystemRandom() chars = [] # add 1 random char from each present char class to meet # the minimum number of char class requirement if string.digits in charset: chars.append(rnd.choice(string.digits)) if string.ascii_lowercase in charset: chars.append(rnd.choice(string.ascii_lowercase)) if string.ascii_uppercase in charset: chars.append(rnd.choice(string.ascii_uppercase)) if PUNCTUATIONS in charset: chars.append(rnd.choice(PUNCTUATIONS)) # extend chars to specified length via any valid character classes chars.extend(rnd.choice(charset) for i in range(length - len(chars))) # randomize the char order rnd.shuffle(chars) # final password is `length` chars password = ''.join(chars) return password
[docs] class FIPS:
[docs] @staticmethod def is_enabled(): # Check if /proc/sys/crypto/fips_enabled exists if not os.path.exists('/proc/sys/crypto/fips_enabled'): return False # Check to see if FIPS is enabled on this system command = ['sysctl', 'crypto.fips_enabled', '-bn'] with open(os.devnull, 'w', encoding='utf-8') as fnull: output = subprocess.check_output(command, stderr=fnull).decode('utf-8') if output != '0': return True else: return False
# pylint: disable=R0903
[docs] class Attribute(object): """ Class representing a key/value pair. This object is the basis of the representation of a ResourceMessage. """ def __init__(self, name, value): """ Constructor """ self.name = name self.value = value
# pylint: disable=R0903
[docs] class AttributeList(object): """ Class representing a list of attributes. This class is needed because of a JavaMapper used in the REST API. """ # pylint: disable=C0103 def __init__(self): """ Constructor """ self.Attribute = []
[docs] class ResourceMessage(object): """ This class is the basis for the various types of key requests. It is essentially a list of attributes. """ # pylint: disable=C0103 def __init__(self, class_name): """ Constructor """ self.Attributes = AttributeList() self.ClassName = class_name
[docs] def add_attribute(self, name, value): """ Add an attribute to the list. :param name: name of attribute to add :type name: str :param value: value to add :type value: str :returns: None """ attr = Attribute(name, value) self.Attributes.Attribute.append(attr)
[docs] def get_attribute_value(self, name): """ Get the value of a given attribute. :param name: name of attribute to retrieve :type name: str :returns: str -- value of parameter """ for attr in self.Attributes.Attribute: if attr.name == name: return attr.value return None
[docs] class PKIException(Exception, ResourceMessage): """ Base exception class for REST Interface """ def __init__(self, message, exception=None, code=None, class_name=None): """ Constructor """ Exception.__init__(self, message) ResourceMessage.__init__(self, class_name) self.code = code self.message = message self.exception = exception
[docs] @classmethod def from_json(cls, json_value): """ Construct PKIException from JSON. :param json_value: JSON representation of the exception. :type json_value: str :return: pki.PKIException """ ret = cls( message=json_value['Message'], code=json_value['Code'], class_name=json_value['ClassName'] ) for attr in json_value['Attributes']['Attribute']: ret.add_attribute(attr["name"], attr["value"]) return ret
[docs] class BadRequestException(PKIException): """ Bad Request Exception: return code = 400 """
[docs] class ConflictingOperationException(PKIException): """ Conflicting Operation Exception: return code = 409 """
[docs] class ForbiddenException(PKIException): """ Forbidden Exception: return code = 403 """
[docs] class HTTPGoneException(PKIException): """ Gone Exception: return code = 410 """
[docs] class ResourceNotFoundException(PKIException): """ Not Found Exception: return code = 404 """
[docs] class UnauthorizedException(PKIException): """ Unauthorized Exception: return code = 401 """
[docs] class CertNotFoundException(ResourceNotFoundException): """ Cert Not Found Exception: return code = 404 """
[docs] class GroupNotFoundException(ResourceNotFoundException): """ Group Not Found Exception: return code = 404 """
[docs] class KeyNotFoundException(ResourceNotFoundException): """ Key Not Found Exception: return code 404 """
[docs] class ProfileNotFoundException(ResourceNotFoundException): """ Profile Not Found Exception: return code = 404 """
[docs] class RequestNotFoundException(ResourceNotFoundException): """ Request Not Found Exception: return code = 404 """
[docs] class UserNotFoundException(ResourceNotFoundException): """ User Not Found Exception: return code = 404 """
# Mapping from Java Server exception classes to python exception classes EXCEPTION_MAPPINGS = { "com.netscape.certsrv.base.BadRequestException": BadRequestException, "com.netscape.certsrv.base.ConflictingOperationException": ConflictingOperationException, "com.netscape.certsrv.base.ForbiddenException": ForbiddenException, "com.netscape.certsrv.base.HTTPGoneException": HTTPGoneException, "com.netscape.certsrv.base.ResourceNotFoundException": ResourceNotFoundException, "com.netscape.certsrv.cert.CertNotFoundException": CertNotFoundException, "com.netscape.certsrv.group.GroupNotFoundException": GroupNotFoundException, "com.netscape.certsrv.key.KeyNotFoundException": KeyNotFoundException, "com.netscape.certsrv.profile.ProfileNotFoundException": ProfileNotFoundException, "com.netscape.certsrv.request.RequestNotFoundException": RequestNotFoundException, "com.netscape.certsrv.base.UserNotFoundException": UserNotFoundException, "com.netscape.certsrv.base.PKIException": PKIException}
[docs] def handle_exceptions(): """ Decorator handling exceptions from REST methods. """ def exceptions_decorator(fn_call): """ The actual decorator handler.""" @wraps(fn_call) def handler(inst, *args, **kwargs): """ Decorator to catch and re-throw PKIExceptions.""" try: return fn_call(inst, *args, **kwargs) except requests.exceptions.HTTPError: # store exception information. json may raise another # exception. We want to re-raise the HTTPError. exc_type, exc_val, exc_tb = sys.exc_info() try: response = exc_val.response json_response = response.json() logger.debug('Response:\n%s', json.dumps(json_response, indent=4)) except ValueError: # json raises ValueError. simplejson raises # JSONDecodeError, which is a subclass of ValueError. # re-raise original exception six.reraise(exc_type, exc_val, exc_tb) else: # clear reference cycle exc_type = exc_val = exc_tb = None clazz = json_response.get('ClassName') if clazz and clazz in EXCEPTION_MAPPINGS: exception_class = EXCEPTION_MAPPINGS[clazz] pki_exception = exception_class.from_json(json_response) raise pki_exception return handler return exceptions_decorator
[docs] class PropertyFile(object): """ Class to manage property files The contents of the property file are maintained internally as a list of properties. Properties are strings of the format <name> <delimiter> <value> where '=' is the default delimiter. The value can optionally be quoted. """ def __init__(self, filename, delimiter='=', quote=None): """ Constructor """ if quote and len(quote) > 1: raise Exception('Invalid quote character: ' + quote) self.filename = filename self.delimiter = delimiter self.quote = quote self.lines = []
[docs] def read(self): """ Read from property file into the list of properties maintained by this object. :return: None """ self.lines = [] if not os.path.exists(self.filename): return # read all lines and preserve the original order with open(self.filename, 'r', encoding='utf-8') as f_in: for line in f_in: line = line.strip('\n') self.lines.append(line)
[docs] def write(self): """ Write the list of properties maintained by this object to the property file. :return: None """ # write all lines in the original order with open(self.filename, 'w', encoding='utf-8') as f_out: for line in self.lines: f_out.write(line + '\n')
[docs] def show(self): """ Print the contents of the list of properties maintained by this object to STDOUT. :return: None """ for line in self.lines: print(line)
[docs] def insert_line(self, index, line): """ Insert property into the list of properties maintained by this object at the specified location (index). :param index: point at which to insert value. :type index: int :param line: value to be inserted. :type line: str :return: None """ self.lines.insert(index, line)
[docs] def remove_line(self, index): """ Remove property at specified index from the properties list. :param index: location of property to be removed. :type index: int :return: None """ self.lines.pop(index)
[docs] def index(self, name): """ Find the index (position) of a property in a property file. :param name: name of property :type name: str :return: int -- index of property. """ for i, line in enumerate(self.lines): # parse <key> <delimiter> <value> match = re.match(r'^\s*([^%s]*)\s*%s\s*(.*)\s*$' % (self.delimiter, self.delimiter), line) if not match: continue key = match.group(1) if key.lower() == name.lower(): return i return -1
[docs] def get(self, name): """ Get the value of the specified property. :param name: name of property to be fetched. :type name: str :return: str -- value of property """ for line in self.lines: # parse <key> <delimiter> <value> match = re.match(r'^\s*([^%s]*)\s*%s\s*(.*)\s*$' % (self.delimiter, self.delimiter), line) if not match: continue key = match.group(1) value = match.group(2) if key.lower() != name.lower(): continue # remove quotes if any if self.quote and value.startswith(self.quote) and value.endswith(self.quote): value = value[1:-1] return value return None
[docs] def set(self, name, value, index=None): """ Set value of specified property. :param name: name of property to set. :type name: str :param value: value to set :type value: str :param index: (optional) position of property :type index: int :return: None """ if self.quote: value = self.quote + value + self.quote for i, line in enumerate(self.lines): # parse <key> <delimiter> <value> match = re.match(r'^\s*([^%s]*)\s*%s\s*(.*)\s*$' % (self.delimiter, self.delimiter), line) if not match: continue key = match.group(1) if key.lower() == name.lower(): self.lines[i] = key + self.delimiter + value return if index is None: self.lines.append(name + self.delimiter + value) else: self.insert_line(index, name + self.delimiter + value)
[docs] def remove(self, name): """ Remove property from list of properties maintained by this object. :param name: name of property to be removed. :type name: str :returns: None """ for i, line in enumerate(self.lines): # parse <key> <delimiter> <value> match = re.match(r'^\s*([^%s]*)\s*%s\s*(.*)\s*$' % (self.delimiter, self.delimiter), line) if not match: continue key = match.group(1) value = match.group(2) if key.lower() != name.lower(): continue self.lines.pop(i) # remove quotes if any if self.quote and value.startswith(self.quote) and value.endswith(self.quote): value = value[1:-1] return value return None