diff options
author | Jan Cholasta <jcholast@redhat.com> | 2014-09-18 11:19:46 +0200 |
---|---|---|
committer | Martin Kosek <mkosek@redhat.com> | 2014-09-30 10:01:38 +0200 |
commit | 86c534df7de353662078feb9128e8125d89474f1 (patch) | |
tree | 756ffb60f84879e7c7f47f6490274037391b48a5 | |
parent | 231f57cedb4fea26d3317fe2b1f30d043c7d2524 (diff) | |
download | freeipa-86c534df7de353662078feb9128e8125d89474f1.tar.gz freeipa-86c534df7de353662078feb9128e8125d89474f1.tar.xz freeipa-86c534df7de353662078feb9128e8125d89474f1.zip |
Move NSSDatabase from ipaserver.certs to ipapython.certdb
https://fedorahosted.org/freeipa/ticket/4416
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
-rw-r--r-- | ipapython/certdb.py | 451 | ||||
-rw-r--r-- | ipaserver/install/certs.py | 448 |
2 files changed, 452 insertions, 447 deletions
diff --git a/ipapython/certdb.py b/ipapython/certdb.py index 426c80996..e190a7093 100644 --- a/ipapython/certdb.py +++ b/ipapython/certdb.py @@ -18,8 +18,14 @@ # import os +import re +import tempfile +import shutil +from nss import nss +from nss.error import NSPRError from ipaplatform.paths import paths +from ipapython.ipa_log_manager import root_logger from ipapython import ipautil CA_NICKNAME_FMT = "%s IPA CA" @@ -48,3 +54,448 @@ def create_ipa_nssdb(): os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'cert8.db'), 0644) os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'key3.db'), 0644) os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'secmod.db'), 0644) + + +def find_cert_from_txt(cert, start=0): + """ + Given a cert blob (str) which may or may not contian leading and + trailing text, pull out just the certificate part. This will return + the FIRST cert in a stream of data. + + Returns a tuple (certificate, last position in cert) + """ + s = cert.find('-----BEGIN CERTIFICATE-----', start) + e = cert.find('-----END CERTIFICATE-----', s) + if e > 0: + e = e + 25 + + if s < 0 or e < 0: + raise RuntimeError("Unable to find certificate") + + cert = cert[s:e] + return (cert, e) + + +class NSSDatabase(object): + """A general-purpose wrapper around a NSS cert database + + For permanent NSS databases, pass the cert DB directory to __init__ + + For temporary databases, do not pass nssdir, and call close() when done + to remove the DB. Alternatively, a NSSDatabase can be used as a + context manager that calls close() automatically. + """ + # Traditionally, we used CertDB for our NSS DB operations, but that class + # got too tied to IPA server details, killing reusability. + # BaseCertDB is a class that knows nothing about IPA. + # Generic NSS DB code should be moved here. + def __init__(self, nssdir=None): + if nssdir is None: + self.secdir = tempfile.mkdtemp() + self._is_temporary = True + else: + self.secdir = nssdir + self._is_temporary = False + + def close(self): + if self._is_temporary: + shutil.rmtree(self.secdir) + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + self.close() + + def run_certutil(self, args, stdin=None): + new_args = [paths.CERTUTIL, "-d", self.secdir] + new_args = new_args + args + return ipautil.run(new_args, stdin) + + def create_db(self, password_filename): + """Create cert DB + + :param password_filename: Name of file containing the database password + """ + self.run_certutil(["-N", "-f", password_filename]) + + def list_certs(self): + """Return nicknames and cert flags for all certs in the database + + :return: List of (name, trust_flags) tuples + """ + certs, stderr, returncode = self.run_certutil(["-L"]) + certs = certs.splitlines() + + # FIXME, this relies on NSS never changing the formatting of certutil + certlist = [] + for cert in certs: + match = re.match(r'^(.+?)\s+(\w*,\w*,\w*)\s*$', cert) + if match: + certlist.append(match.groups()) + + return tuple(certlist) + + def find_server_certs(self): + """Return nicknames and cert flags for server certs in the database + + Server certs have an "u" character in the trust flags. + + :return: List of (name, trust_flags) tuples + """ + server_certs = [] + for name, flags in self.list_certs(): + if 'u' in flags: + server_certs.append((name, flags)) + + return server_certs + + def get_trust_chain(self, nickname): + """Return names of certs in a given cert's trust chain + + :param nickname: Name of the cert + :return: List of certificate names + """ + root_nicknames = [] + chain, stderr, returncode = self.run_certutil([ + "-O", "-n", nickname]) + chain = chain.splitlines() + + for c in chain: + m = re.match('\s*"(.*)" \[.*', c) + if m: + root_nicknames.append(m.groups()[0]) + + return root_nicknames + + def import_pkcs12(self, pkcs12_filename, db_password_filename, + pkcs12_passwd=None): + args = [paths.PK12UTIL, "-d", self.secdir, + "-i", pkcs12_filename, + "-k", db_password_filename, '-v'] + if pkcs12_passwd is not None: + pkcs12_passwd = pkcs12_passwd + '\n' + args = args + ["-w", paths.DEV_STDIN] + try: + ipautil.run(args, stdin=pkcs12_passwd) + except ipautil.CalledProcessError, e: + if e.returncode == 17: + raise RuntimeError("incorrect password for pkcs#12 file %s" % + pkcs12_filename) + elif e.returncode == 10: + raise RuntimeError("Failed to open %s" % pkcs12_filename) + else: + raise RuntimeError("unknown error import pkcs#12 file %s" % + pkcs12_filename) + + def import_files(self, files, db_password_filename, import_keys=False, + key_password=None, key_nickname=None): + """ + Import certificates and a single private key from multiple files + + The files may be in PEM and DER certificate, PKCS#7 certificate chain, + PKCS#8 and raw private key and PKCS#12 formats. + + :param files: Names of files to import + :param db_password_filename: Name of file containing the database + password + :param import_keys: Whether to import private keys + :param key_password: Password to decrypt private keys + :param key_nickname: Nickname of the private key to import from PKCS#12 + files + """ + key_file = None + extracted_key = None + extracted_certs = '' + + for filename in files: + try: + with open(filename, 'rb') as f: + data = f.read() + except IOError as e: + raise RuntimeError( + "Failed to open %s: %s" % (filename, e.strerror)) + + # Try to parse the file as PEM file + matches = list(re.finditer( + r'-----BEGIN (.+?)-----(.*?)-----END \1-----', data, re.DOTALL)) + if matches: + loaded = False + for match in matches: + body = match.group() + label = match.group(1) + line = len(data[:match.start() + 1].splitlines()) + + if label in ('CERTIFICATE', 'X509 CERTIFICATE', + 'X.509 CERTIFICATE'): + try: + x509.load_certificate(match.group(2)) + except NSPRError as e: + if label != 'CERTIFICATE': + root_logger.warning( + "Skipping certificate in %s at line %s: %s", + filename, line, e) + continue + else: + extracted_certs += body + '\n' + loaded = True + continue + + if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'): + args = [ + paths.OPENSSL, 'pkcs7', + '-print_certs', + ] + try: + stdout, stderr, rc = ipautil.run(args, stdin=body) + except ipautil.CalledProcessError as e: + if label == 'CERTIFICATE': + root_logger.warning( + "Skipping certificate in %s at line %s: %s", + filename, line, e) + else: + root_logger.warning( + "Skipping PKCS#7 in %s at line %s: %s", + filename, line, e) + continue + else: + extracted_certs += stdout + '\n' + loaded = True + continue + + if label in ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY', + 'RSA PRIVATE KEY', 'DSA PRIVATE KEY', + 'EC PRIVATE KEY'): + if not import_keys: + continue + + if key_file: + raise RuntimeError( + "Can't load private key from both %s and %s" % + (key_file, filename)) + + args = [ + paths.OPENSSL, 'pkcs8', + '-topk8', + '-passout', 'file:' + db_password_filename, + ] + if ((label != 'PRIVATE KEY' and key_password) or + label == 'ENCRYPTED PRIVATE KEY'): + key_pwdfile = ipautil.write_tmp_file(key_password) + args += [ + '-passin', 'file:' + key_pwdfile.name, + ] + try: + stdout, stderr, rc = ipautil.run(args, stdin=body) + except ipautil.CalledProcessError as e: + root_logger.warning( + "Skipping private key in %s at line %s: %s", + filename, line, e) + continue + else: + extracted_key = stdout + key_file = filename + loaded = True + continue + if loaded: + continue + raise RuntimeError("Failed to load %s" % filename) + + # Try to load the file as DER certificate + try: + x509.load_certificate(data, x509.DER) + except NSPRError: + pass + else: + data = x509.make_pem(base64.b64encode(data)) + extracted_certs += data + '\n' + continue + + # Try to import the file as PKCS#12 file + if import_keys: + try: + self.import_pkcs12( + filename, db_password_filename, key_password) + except RuntimeError: + pass + else: + if key_file: + raise RuntimeError( + "Can't load private key from both %s and %s" % + (key_file, filename)) + key_file = filename + + server_certs = self.find_server_certs() + if key_nickname: + for nickname, trust_flags in server_certs: + if nickname == key_nickname: + break + else: + raise RuntimeError( + "Server certificate \"%s\" not found in %s" % + (key_nickname, filename)) + else: + if len(server_certs) > 1: + raise RuntimeError( + "%s server certificates found in %s, " + "expecting only one" % + (len(server_certs), filename)) + + continue + + raise RuntimeError("Failed to load %s" % filename) + + if import_keys and not key_file: + raise RuntimeError( + "No server certificates found in %s" % (', '.join(files))) + + nss_certs = x509.load_certificate_list(extracted_certs) + nss_cert = None + for nss_cert in nss_certs: + nickname = str(nss_cert.subject) + self.add_cert(nss_cert.der_data, nickname, ',,') + del nss_certs, nss_cert + + if extracted_key: + in_file = ipautil.write_tmp_file(extracted_certs + extracted_key) + out_file = tempfile.NamedTemporaryFile() + out_password = ipautil.ipa_generate_password() + out_pwdfile = ipautil.write_tmp_file(out_password) + args = [ + paths.OPENSSL, 'pkcs12', + '-export', + '-in', in_file.name, + '-out', out_file.name, + '-passin', 'file:' + db_password_filename, + '-passout', 'file:' + out_pwdfile.name, + ] + try: + ipautil.run(args) + except ipautil.CalledProcessError as e: + raise RuntimeError( + "No matching certificate found for private key from %s" % + key_file) + + self.import_pkcs12(out_file.name, db_password_filename, + out_password) + + def trust_root_cert(self, root_nickname, trust_flags=None): + if root_nickname[:7] == "Builtin": + root_logger.debug( + "No need to add trust for built-in root CAs, skipping %s" % + root_nickname) + else: + if trust_flags is None: + trust_flags = 'C,,' + try: + self.run_certutil(["-M", "-n", root_nickname, + "-t", trust_flags]) + except ipautil.CalledProcessError, e: + raise RuntimeError( + "Setting trust on %s failed" % root_nickname) + + def get_cert(self, nickname, pem=False): + args = ['-L', '-n', nickname] + if pem: + args.append('-a') + else: + args.append('-r') + try: + cert, err, returncode = self.run_certutil(args) + except ipautil.CalledProcessError: + raise RuntimeError("Failed to get %s" % nickname) + return cert + + def export_pem_cert(self, nickname, location): + """Export the given cert to PEM file in the given location""" + cert = self.get_cert(nickname) + with open(location, "w+") as fd: + fd.write(cert) + os.chmod(location, 0444) + + def import_pem_cert(self, nickname, flags, location): + """Import a cert form the given PEM file. + + The file must contain exactly one certificate. + """ + try: + with open(location) as fd: + certs = fd.read() + except IOError as e: + raise RuntimeError( + "Failed to open %s: %s" % (location, e.strerror) + ) + + cert, st = find_cert_from_txt(certs) + self.add_cert(cert, nickname, flags, pem=True) + + try: + find_cert_from_txt(certs, st) + except RuntimeError: + pass + else: + raise ValueError('%s contains more than one certificate' % + location) + + def add_cert(self, cert, nick, flags, pem=False): + args = ["-A", "-n", nick, "-t", flags] + if pem: + args.append("-a") + self.run_certutil(args, stdin=cert) + + def delete_cert(self, nick): + self.run_certutil(["-D", "-n", nick]) + + def verify_server_cert_validity(self, nickname, hostname): + """Verify a certificate is valid for a SSL server with given hostname + + Raises a ValueError if the certificate is invalid. + """ + certdb = cert = None + if nss.nss_is_initialized(): + nss.nss_shutdown() + nss.nss_init(self.secdir) + try: + certdb = nss.get_default_certdb() + cert = nss.find_cert_from_nickname(nickname) + intended_usage = nss.certificateUsageSSLServer + try: + approved_usage = cert.verify_now(certdb, True, intended_usage) + except NSPRError, e: + if e.errno != -8102: + raise ValueError(e.strerror) + approved_usage = 0 + if not approved_usage & intended_usage: + raise ValueError('invalid for a SSL server') + if not cert.verify_hostname(hostname): + raise ValueError('invalid for server %s' % hostname) + finally: + del certdb, cert + nss.nss_shutdown() + + return None + + def verify_ca_cert_validity(self, nickname): + certdb = cert = None + if nss.nss_is_initialized(): + nss.nss_shutdown() + nss.nss_init(self.secdir) + try: + certdb = nss.get_default_certdb() + cert = nss.find_cert_from_nickname(nickname) + if not cert.subject: + raise ValueError("has empty subject") + if not cert.is_ca_cert(): + raise ValueError("not a CA certificate") + intended_usage = nss.certificateUsageSSLCA + try: + approved_usage = cert.verify_now(certdb, True, intended_usage) + except NSPRError, e: + if e.errno != -8102: # SEC_ERROR_INADEQUATE_KEY_USAGE + raise ValueError(e.strerror) + approved_usage = 0 + if approved_usage & intended_usage != intended_usage: + raise ValueError('invalid for a CA') + finally: + del certdb, cert + nss.nss_shutdown() diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 55feb6596..5399a0fa5 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -19,7 +19,6 @@ import os import stat -import re import sys import tempfile import shutil @@ -28,15 +27,12 @@ import pwd import base64 from hashlib import sha1 -from nss import nss -from nss.error import NSPRError - from ipapython.ipa_log_manager import root_logger from ipapython import dogtag from ipapython import sysrestore from ipapython import ipautil from ipapython import certmonger -from ipapython.certdb import get_ca_nickname +from ipapython.certdb import get_ca_nickname, find_cert_from_txt, NSSDatabase from ipapython.dn import DN from ipalib import pkcs10, x509, api from ipalib.errors import CertificateOperationError @@ -48,23 +44,6 @@ from ipaplatform.paths import paths # where apache can reach NSS_DIR = paths.HTTPD_ALIAS_DIR -def find_cert_from_txt(cert, start=0): - """ - Given a cert blob (str) which may or may not contian leading and - trailing text, pull out just the certificate part. This will return - the FIRST cert in a stream of data. - - Returns a tuple (certificate, last position in cert) - """ - s = cert.find('-----BEGIN CERTIFICATE-----', start) - e = cert.find('-----END CERTIFICATE-----', s) - if e > 0: e = e + 25 - - if s < 0 or e < 0: - raise RuntimeError("Unable to find certificate") - - cert = cert[s:e] - return (cert, e) def get_cert_nickname(cert): """ @@ -83,431 +62,6 @@ def get_cert_nickname(cert): return (str(dn[0]), dn) -class NSSDatabase(object): - """A general-purpose wrapper around a NSS cert database - - For permanent NSS databases, pass the cert DB directory to __init__ - - For temporary databases, do not pass nssdir, and call close() when done - to remove the DB. Alternatively, a NSSDatabase can be used as a - context manager that calls close() automatically. - """ - # Traditionally, we used CertDB for our NSS DB operations, but that class - # got too tied to IPA server details, killing reusability. - # BaseCertDB is a class that knows nothing about IPA. - # Generic NSS DB code should be moved here. - def __init__(self, nssdir=None): - if nssdir is None: - self.secdir = tempfile.mkdtemp() - self._is_temporary = True - else: - self.secdir = nssdir - self._is_temporary = False - - def close(self): - if self._is_temporary: - shutil.rmtree(self.secdir) - - def __enter__(self): - return self - - def __exit__(self, type, value, tb): - self.close() - - def run_certutil(self, args, stdin=None): - new_args = [paths.CERTUTIL, "-d", self.secdir] - new_args = new_args + args - return ipautil.run(new_args, stdin) - - def create_db(self, password_filename): - """Create cert DB - - :param password_filename: Name of file containing the database password - """ - self.run_certutil(["-N", "-f", password_filename]) - - def list_certs(self): - """Return nicknames and cert flags for all certs in the database - - :return: List of (name, trust_flags) tuples - """ - certs, stderr, returncode = self.run_certutil(["-L"]) - certs = certs.splitlines() - - # FIXME, this relies on NSS never changing the formatting of certutil - certlist = [] - for cert in certs: - match = re.match(r'^(.+?)\s+(\w*,\w*,\w*)\s*$', cert) - if match: - certlist.append(match.groups()) - - return tuple(certlist) - - def find_server_certs(self): - """Return nicknames and cert flags for server certs in the database - - Server certs have an "u" character in the trust flags. - - :return: List of (name, trust_flags) tuples - """ - server_certs = [] - for name, flags in self.list_certs(): - if 'u' in flags: - server_certs.append((name, flags)) - - return server_certs - - def get_trust_chain(self, nickname): - """Return names of certs in a given cert's trust chain - - :param nickname: Name of the cert - :return: List of certificate names - """ - root_nicknames = [] - chain, stderr, returncode = self.run_certutil([ - "-O", "-n", nickname]) - chain = chain.splitlines() - - for c in chain: - m = re.match('\s*"(.*)" \[.*', c) - if m: - root_nicknames.append(m.groups()[0]) - - return root_nicknames - - def import_pkcs12(self, pkcs12_filename, db_password_filename, - pkcs12_passwd=None): - args = [paths.PK12UTIL, "-d", self.secdir, - "-i", pkcs12_filename, - "-k", db_password_filename, '-v'] - if pkcs12_passwd is not None: - pkcs12_passwd = pkcs12_passwd + '\n' - args = args + ["-w", paths.DEV_STDIN] - try: - ipautil.run(args, stdin=pkcs12_passwd) - except ipautil.CalledProcessError, e: - if e.returncode == 17: - raise RuntimeError("incorrect password for pkcs#12 file %s" % - pkcs12_filename) - elif e.returncode == 10: - raise RuntimeError("Failed to open %s" % pkcs12_filename) - else: - raise RuntimeError("unknown error import pkcs#12 file %s" % - pkcs12_filename) - - def import_files(self, files, db_password_filename, import_keys=False, - key_password=None, key_nickname=None): - """ - Import certificates and a single private key from multiple files - - The files may be in PEM and DER certificate, PKCS#7 certificate chain, - PKCS#8 and raw private key and PKCS#12 formats. - - :param files: Names of files to import - :param db_password_filename: Name of file containing the database - password - :param import_keys: Whether to import private keys - :param key_password: Password to decrypt private keys - :param key_nickname: Nickname of the private key to import from PKCS#12 - files - """ - key_file = None - extracted_key = None - extracted_certs = '' - - for filename in files: - try: - with open(filename, 'rb') as f: - data = f.read() - except IOError as e: - raise RuntimeError( - "Failed to open %s: %s" % (filename, e.strerror)) - - # Try to parse the file as PEM file - matches = list(re.finditer( - r'-----BEGIN (.+?)-----(.*?)-----END \1-----', data, re.DOTALL)) - if matches: - loaded = False - for match in matches: - body = match.group() - label = match.group(1) - line = len(data[:match.start() + 1].splitlines()) - - if label in ('CERTIFICATE', 'X509 CERTIFICATE', - 'X.509 CERTIFICATE'): - try: - x509.load_certificate(match.group(2)) - except NSPRError as e: - if label != 'CERTIFICATE': - root_logger.warning( - "Skipping certificate in %s at line %s: %s", - filename, line, e) - continue - else: - extracted_certs += body + '\n' - loaded = True - continue - - if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'): - args = [ - paths.OPENSSL, 'pkcs7', - '-print_certs', - ] - try: - stdout, stderr, rc = ipautil.run(args, stdin=body) - except ipautil.CalledProcessError as e: - if label == 'CERTIFICATE': - root_logger.warning( - "Skipping certificate in %s at line %s: %s", - filename, line, e) - else: - root_logger.warning( - "Skipping PKCS#7 in %s at line %s: %s", - filename, line, e) - continue - else: - extracted_certs += stdout + '\n' - loaded = True - continue - - if label in ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY', - 'RSA PRIVATE KEY', 'DSA PRIVATE KEY', - 'EC PRIVATE KEY'): - if not import_keys: - continue - - if key_file: - raise RuntimeError( - "Can't load private key from both %s and %s" % - (key_file, filename)) - - args = [ - paths.OPENSSL, 'pkcs8', - '-topk8', - '-passout', 'file:' + db_password_filename, - ] - if ((label != 'PRIVATE KEY' and key_password) or - label == 'ENCRYPTED PRIVATE KEY'): - key_pwdfile = ipautil.write_tmp_file(key_password) - args += [ - '-passin', 'file:' + key_pwdfile.name, - ] - try: - stdout, stderr, rc = ipautil.run(args, stdin=body) - except ipautil.CalledProcessError as e: - root_logger.warning( - "Skipping private key in %s at line %s: %s", - filename, line, e) - continue - else: - extracted_key = stdout - key_file = filename - loaded = True - continue - if loaded: - continue - raise RuntimeError("Failed to load %s" % filename) - - # Try to load the file as DER certificate - try: - x509.load_certificate(data, x509.DER) - except NSPRError: - pass - else: - data = x509.make_pem(base64.b64encode(data)) - extracted_certs += data + '\n' - continue - - # Try to import the file as PKCS#12 file - if import_keys: - try: - self.import_pkcs12( - filename, db_password_filename, key_password) - except RuntimeError: - pass - else: - if key_file: - raise RuntimeError( - "Can't load private key from both %s and %s" % - (key_file, filename)) - key_file = filename - - server_certs = self.find_server_certs() - if key_nickname: - for nickname, trust_flags in server_certs: - if nickname == key_nickname: - break - else: - raise RuntimeError( - "Server certificate \"%s\" not found in %s" % - (key_nickname, filename)) - else: - if len(server_certs) > 1: - raise RuntimeError( - "%s server certificates found in %s, " - "expecting only one" % - (len(server_certs), filename)) - - continue - - raise RuntimeError("Failed to load %s" % filename) - - if import_keys and not key_file: - raise RuntimeError( - "No server certificates found in %s" % (', '.join(files))) - - nss_certs = x509.load_certificate_list(extracted_certs) - nss_cert = None - for nss_cert in nss_certs: - nickname = str(nss_cert.subject) - self.add_cert(nss_cert.der_data, nickname, ',,') - del nss_certs, nss_cert - - if extracted_key: - in_file = ipautil.write_tmp_file(extracted_certs + extracted_key) - out_file = tempfile.NamedTemporaryFile() - out_password = ipautil.ipa_generate_password() - out_pwdfile = ipautil.write_tmp_file(out_password) - args = [ - paths.OPENSSL, 'pkcs12', - '-export', - '-in', in_file.name, - '-out', out_file.name, - '-passin', 'file:' + db_password_filename, - '-passout', 'file:' + out_pwdfile.name, - ] - try: - ipautil.run(args) - except ipautil.CalledProcessError as e: - raise RuntimeError( - "No matching certificate found for private key from %s" % - key_file) - - self.import_pkcs12(out_file.name, db_password_filename, - out_password) - - def trust_root_cert(self, root_nickname, trust_flags=None): - if root_nickname[:7] == "Builtin": - root_logger.debug( - "No need to add trust for built-in root CAs, skipping %s" % - root_nickname) - else: - if trust_flags is None: - trust_flags = 'C,,' - try: - self.run_certutil(["-M", "-n", root_nickname, - "-t", trust_flags]) - except ipautil.CalledProcessError, e: - raise RuntimeError( - "Setting trust on %s failed" % root_nickname) - - def get_cert(self, nickname, pem=False): - args = ['-L', '-n', nickname] - if pem: - args.append('-a') - else: - args.append('-r') - try: - cert, err, returncode = self.run_certutil(args) - except ipautil.CalledProcessError: - raise RuntimeError("Failed to get %s" % nickname) - return cert - - def export_pem_cert(self, nickname, location): - """Export the given cert to PEM file in the given location""" - cert = self.get_cert(nickname) - with open(location, "w+") as fd: - fd.write(cert) - os.chmod(location, 0444) - - def import_pem_cert(self, nickname, flags, location): - """Import a cert form the given PEM file. - - The file must contain exactly one certificate. - """ - try: - with open(location) as fd: - certs = fd.read() - except IOError as e: - raise RuntimeError( - "Failed to open %s: %s" % (location, e.strerror) - ) - - cert, st = find_cert_from_txt(certs) - self.add_cert(cert, nickname, flags, pem=True) - - try: - find_cert_from_txt(certs, st) - except RuntimeError: - pass - else: - raise ValueError('%s contains more than one certificate' % - location) - - def add_cert(self, cert, nick, flags, pem=False): - args = ["-A", "-n", nick, "-t", flags] - if pem: - args.append("-a") - self.run_certutil(args, stdin=cert) - - def delete_cert(self, nick): - self.run_certutil(["-D", "-n", nick]) - - def verify_server_cert_validity(self, nickname, hostname): - """Verify a certificate is valid for a SSL server with given hostname - - Raises a ValueError if the certificate is invalid. - """ - certdb = cert = None - if nss.nss_is_initialized(): - nss.nss_shutdown() - nss.nss_init(self.secdir) - try: - certdb = nss.get_default_certdb() - cert = nss.find_cert_from_nickname(nickname) - intended_usage = nss.certificateUsageSSLServer - try: - approved_usage = cert.verify_now(certdb, True, intended_usage) - except NSPRError, e: - if e.errno != -8102: - raise ValueError(e.strerror) - approved_usage = 0 - if not approved_usage & intended_usage: - raise ValueError('invalid for a SSL server') - if not cert.verify_hostname(hostname): - raise ValueError('invalid for server %s' % hostname) - finally: - del certdb, cert - nss.nss_shutdown() - - return None - - def verify_ca_cert_validity(self, nickname): - certdb = cert = None - if nss.nss_is_initialized(): - nss.nss_shutdown() - nss.nss_init(self.secdir) - try: - certdb = nss.get_default_certdb() - cert = nss.find_cert_from_nickname(nickname) - if not cert.subject: - raise ValueError("has empty subject") - if not cert.is_ca_cert(): - raise ValueError("not a CA certificate") - intended_usage = nss.certificateUsageSSLCA - try: - approved_usage = cert.verify_now(certdb, True, intended_usage) - except NSPRError, e: - if e.errno != -8102: # SEC_ERROR_INADEQUATE_KEY_USAGE - raise ValueError(e.strerror) - approved_usage = 0 - if approved_usage & intended_usage != intended_usage: - raise ValueError('invalid for a CA') - finally: - del certdb, cert - nss.nss_shutdown() - - class CertDB(object): """An IPA-server-specific wrapper around NSS |