From 1e86378d491ac2dcb01fb3ac0da720df2bff5873 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 14 Mar 2013 13:54:38 +0100 Subject: ipaserver.install.certs: Introduce NSSDatabase as a more generic certutil wrapper The CertDB class was meant to be a wrapper around NSS databases, certutil, pk12util, etc. Unfortunately, over time it grew too dependent on the particular scenarios it is used in. Introduce a new class that has no knowledge about IPA configuration, and move generic code to it. In the future, generic code should be moved to NSSDatabase, code for the self-signed CA should be removed, and IPA-specific code may stay in CertDB (which calls NSSDatabase). --- ipaserver/install/certs.py | 286 ++++++++++++++++++++++++++++++--------------- 1 file changed, 191 insertions(+), 95 deletions(-) (limited to 'ipaserver/install/certs.py') diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 2e78e03e..1e718bb3 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -188,8 +188,184 @@ def next_replica(serial_file=CA_SERIALNO): return str(serial) + +class NSSDatabase(object): + """A general-purpose wrapper around a NSS cert database + + For permanent NSS databases, pass the cert DB directory to __init__ + + For temporary databases, do not pass nssdir, and call close() when done + to remove the DB. Alternatively, a NSSDatabase can be used as a + context manager that calls close() automatically. + """ + # Traditionally, we used CertDB for our NSS DB operations, but that class + # got too tied to IPA server details, killing reusability. + # BaseCertDB is a class that knows nothing about IPA. + # Generic NSS DB code should be moved here. + def __init__(self, nssdir=None): + if nssdir is None: + self.secdir = tempfile.mkdtemp() + self._is_temporary = True + else: + self.secdir = nssdir + self._is_temporary = False + + def close(self): + if self._is_temporary: + shutil.rmtree(self.secdir) + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + self.close() + + def run_certutil(self, args, stdin=None): + new_args = ["/usr/bin/certutil", "-d", self.secdir] + new_args = new_args + args + return ipautil.run(new_args, stdin) + + def create_db(self, password_filename): + """Create cert DB + + :param password_filename: Name of file containing the database password + """ + self.run_certutil(["-N", "-f", password_filename]) + + def list_certs(self): + """Return nicknames and cert flags for all certs in the database + + :return: List of (name, trust_flags) tuples + """ + certs, stderr, returncode = self.run_certutil(["-L"]) + certs = certs.splitlines() + + # FIXME, this relies on NSS never changing the formatting of certutil + certlist = [] + for cert in certs: + nickname = cert[0:61] + trust = cert[61:] + if re.match(r'\w*,\w*,\w*', trust): + certlist.append((nickname.strip(), trust.strip())) + + return tuple(certlist) + + def find_server_certs(self): + """Return nicknames and cert flags for server certs in the database + + Server certs have an "u" character in the trust flags. + + :return: List of (name, trust_flags) tuples + """ + server_certs = [] + for name, flags in self.list_certs(): + if 'u' in flags: + server_certs.append((name, flags)) + + return server_certs + + def get_trust_chain(self, nickname): + """Return names of certs in a given cert's trust chain + + :param nickname: Name of the cert + :return: List of certificate names + """ + root_nicknames = [] + chain, stderr, returncode = self.run_certutil([ + "-O", "-n", nickname]) + chain = chain.splitlines() + + for c in chain: + m = re.match('\s*"(.*)" \[.*', c) + if m: + root_nicknames.append(m.groups()[0]) + + return root_nicknames + + def import_pkcs12(self, pkcs12_filename, db_password_filename, + pkcs_password_filename=None): + args = ["/usr/bin/pk12util", "-d", self.secdir, + "-i", pkcs12_filename, + "-k", db_password_filename, '-v'] + if pkcs_password_filename: + args = args + ["-w", pkcs_password_filename] + try: + ipautil.run(args) + except ipautil.CalledProcessError, e: + if e.returncode == 17: + raise RuntimeError("incorrect password for pkcs#12 file") + else: + raise RuntimeError("unknown error import pkcs#12 file") + + def find_root_cert_from_pkcs12(self, pkcs12_fname, passwd_fname=None): + """Given a PKCS#12 file, try to find any certificates that do + not have a key. The assumption is that these are the root CAs. + """ + args = ["/usr/bin/pk12util", "-d", self.secdir, + "-l", pkcs12_fname, + "-k", passwd_fname] + if passwd_fname: + args = args + ["-w", passwd_fname] + try: + (stdout, stderr, returncode) = ipautil.run(args) + except ipautil.CalledProcessError, e: + if e.returncode == 17: + raise RuntimeError("incorrect password for pkcs#12 file") + else: + raise RuntimeError("unknown error using pkcs#12 file") + + lines = stdout.split('\n') + + # A simple state machine. + # 1 = looking for a line starting with 'Certificate' + # 2 = looking for the Friendly name (nickname) + nicknames = [] + state = 1 + for line in lines: + if state == 2: + m = re.match("\W+Friendly Name: (.*)", line) + if m: + nicknames.append( m.groups(0)[0]) + state = 1 + if line == "Certificate:": + state = 2 + elif not line.startswith(' '): + # Top-level item that is not a certificate + state = 1 + + return nicknames + + def trust_root_cert(self, root_nickname): + if root_nickname[:7] == "Builtin": + root_logger.debug( + "No need to add trust for built-in root CAs, skipping %s" % + root_nickname) + else: + try: + self.run_certutil(["-M", "-n", root_nickname, + "-t", "CT,CT,"]) + except ipautil.CalledProcessError, e: + raise RuntimeError( + "Setting trust on %s failed" % root_nickname) + + def export_pem_cert(self, nickname, location): + """Export the given cert to PEM file in the given location""" + cert, err, returncode = self.run_certutil(["-L", "-n", nickname, "-a"]) + with open(location, "w+") as fd: + fd.write(cert) + os.chmod(location, 0444) + + class CertDB(object): + """An IPA-server-specific wrapper around NSS + + This class knows IPA-specific details such as nssdir location, or the + CA cert name. + """ + # TODO: Remove all selfsign code def __init__(self, realm, nssdir=NSS_DIR, fstore=None, host_name=None, subject_base=None): + self.nssdb = NSSDatabase(nssdir) + self.secdir = nssdir self.realm = realm @@ -298,9 +474,7 @@ class CertDB(object): return sha1(ipautil.ipa_generate_password()).hexdigest() def run_certutil(self, args, stdin=None): - new_args = ["/usr/bin/certutil", "-d", self.secdir] - new_args = new_args + args - return ipautil.run(new_args, stdin) + return self.nssdb.run_certutil(args, stdin) def run_signtool(self, args, stdin=None): if not self.self_signed_ca: @@ -334,29 +508,14 @@ class CertDB(object): ipautil.backup_file(self.certdb_fname) ipautil.backup_file(self.keydb_fname) ipautil.backup_file(self.secmod_fname) - self.run_certutil(["-N", - "-f", self.passwd_fname]) + self.nssdb.create_db(self.passwd_fname) self.set_perms(self.passwd_fname, write=True) def list_certs(self): """ Return a tuple of tuples containing (nickname, trust) """ - p = subprocess.Popen(["/usr/bin/certutil", "-d", self.secdir, - "-L"], stdout=subprocess.PIPE) - - certs = p.stdout.read() - certs = certs.split("\n") - - # FIXME, this relies on NSS never changing the formatting of certutil - certlist = [] - for cert in certs: - nickname = cert[0:61] - trust = cert[61:] - if re.match(r'\w+,\w+,\w+', trust): - certlist.append((nickname.strip(), trust)) - - return tuple(certlist) + return self.nssdb.list_certs() def has_nickname(self, nickname): """ @@ -810,26 +969,7 @@ class CertDB(object): Given a nickname, return a list of the certificates that make up the trust chain. """ - root_nicknames = [] - p = subprocess.Popen(["/usr/bin/certutil", "-d", self.secdir, - "-O", "-n", nickname], stdout=subprocess.PIPE) - - chain = p.stdout.read() - chain = chain.split("\n") - - for c in chain: - m = re.match('\s*"(.*)" \[.*', c) - if m: - root_nicknames.append(m.groups()[0]) - - if len(root_nicknames) > 1: - # If you pass in the name of a CA to get the chain it may only - # return 1 (self-signed). Return that. - try: - root_nicknames.remove(nickname) - except ValueError: - # The nickname wasn't in the list - pass + root_nicknames = self.nssdb.get_trust_chain(nickname) # Try to work around a change in the F-11 certutil where untrusted # CA's are not shown in the chain. This will make a default IPA @@ -876,55 +1016,20 @@ class CertDB(object): def trust_root_cert(self, root_nickname): if root_nickname is None: - root_logger.debug("Unable to identify root certificate to trust. Continueing but things are likely to fail.") + root_logger.debug("Unable to identify root certificate to trust. Continuing but things are likely to fail.") return - if root_nickname[:7] == "Builtin": - root_logger.debug("No need to add trust for built-in root CA's, skipping %s" % root_nickname) - else: - try: - self.run_certutil(["-M", "-n", root_nickname, - "-t", "CT,CT,"]) - except ipautil.CalledProcessError, e: - root_logger.error("Setting trust on %s failed" % root_nickname) + try: + self.nssdb.trust_root_cert(root_nickname) + except RuntimeError: + pass def find_server_certs(self): - p = subprocess.Popen(["/usr/bin/certutil", "-d", self.secdir, - "-L"], stdout=subprocess.PIPE) - - certs = p.stdout.read() - - certs = certs.split("\n") - - server_certs = [] - - for cert in certs: - fields = cert.split() - if not len(fields): - continue - flags = fields[-1] - if 'u' in flags: - name = " ".join(fields[0:-1]) - # NSS 3.12 added a header to the certutil output - if name == "Certificate Nickname Trust": - continue - server_certs.append((name, flags)) - - return server_certs + return self.nssdb.find_server_certs() def import_pkcs12(self, pkcs12_fname, passwd_fname=None): - args = ["/usr/bin/pk12util", "-d", self.secdir, - "-i", pkcs12_fname, - "-k", self.passwd_fname] - if passwd_fname: - args = args + ["-w", passwd_fname] - try: - ipautil.run(args) - except ipautil.CalledProcessError, e: - if e.returncode == 17: - raise RuntimeError("incorrect password") - else: - raise RuntimeError("unknown error import pkcs#12 file") + return self.nssdb.import_pkcs12(pkcs12_fname, self.passwd_fname, + pkcs_password_filename=passwd_fname) def export_pkcs12(self, pkcs12_fname, pkcs12_pwd_fname, nickname=None): if nickname is None: @@ -1100,18 +1205,9 @@ class CertDB(object): "-in", p12_fname, "-out", pem_fname, "-passin", "file:" + p12_pwd_fname]) - def backup_files(self): - self.fstore.backup_file(self.noise_fname) - self.fstore.backup_file(self.passwd_fname) - self.fstore.backup_file(self.certdb_fname) - self.fstore.backup_file(self.keydb_fname) - self.fstore.backup_file(self.secmod_fname) - self.fstore.backup_file(self.cacert_fname) - self.fstore.backup_file(self.pk12_fname) - self.fstore.backup_file(self.pin_fname) - self.fstore.backup_file(self.certreq_fname) - self.fstore.backup_file(self.certder_fname) - def publish_ca_cert(self, location): shutil.copy(self.cacert_fname, location) os.chmod(location, 0444) + + def export_pem_cert(self, nickname, location): + return self.nssdb.export_pem_cert(nickname, location) -- cgit