summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJan Cholasta <jcholast@redhat.com>2014-09-18 11:19:46 +0200
committerMartin Kosek <mkosek@redhat.com>2014-09-30 10:01:38 +0200
commit86c534df7de353662078feb9128e8125d89474f1 (patch)
tree756ffb60f84879e7c7f47f6490274037391b48a5
parent231f57cedb4fea26d3317fe2b1f30d043c7d2524 (diff)
downloadfreeipa-86c534df7de353662078feb9128e8125d89474f1.tar.gz
freeipa-86c534df7de353662078feb9128e8125d89474f1.tar.xz
freeipa-86c534df7de353662078feb9128e8125d89474f1.zip
Move NSSDatabase from ipaserver.certs to ipapython.certdb
https://fedorahosted.org/freeipa/ticket/4416 Reviewed-By: Rob Crittenden <rcritten@redhat.com>
-rw-r--r--ipapython/certdb.py451
-rw-r--r--ipaserver/install/certs.py448
2 files changed, 452 insertions, 447 deletions
diff --git a/ipapython/certdb.py b/ipapython/certdb.py
index 426c80996..e190a7093 100644
--- a/ipapython/certdb.py
+++ b/ipapython/certdb.py
@@ -18,8 +18,14 @@
#
import os
+import re
+import tempfile
+import shutil
+from nss import nss
+from nss.error import NSPRError
from ipaplatform.paths import paths
+from ipapython.ipa_log_manager import root_logger
from ipapython import ipautil
CA_NICKNAME_FMT = "%s IPA CA"
@@ -48,3 +54,448 @@ def create_ipa_nssdb():
os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'cert8.db'), 0644)
os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'key3.db'), 0644)
os.chmod(os.path.join(paths.IPA_NSSDB_DIR, 'secmod.db'), 0644)
+
+
+def find_cert_from_txt(cert, start=0):
+ """
+ Given a cert blob (str) which may or may not contian leading and
+ trailing text, pull out just the certificate part. This will return
+ the FIRST cert in a stream of data.
+
+ Returns a tuple (certificate, last position in cert)
+ """
+ s = cert.find('-----BEGIN CERTIFICATE-----', start)
+ e = cert.find('-----END CERTIFICATE-----', s)
+ if e > 0:
+ e = e + 25
+
+ if s < 0 or e < 0:
+ raise RuntimeError("Unable to find certificate")
+
+ cert = cert[s:e]
+ return (cert, e)
+
+
+class NSSDatabase(object):
+ """A general-purpose wrapper around a NSS cert database
+
+ For permanent NSS databases, pass the cert DB directory to __init__
+
+ For temporary databases, do not pass nssdir, and call close() when done
+ to remove the DB. Alternatively, a NSSDatabase can be used as a
+ context manager that calls close() automatically.
+ """
+ # Traditionally, we used CertDB for our NSS DB operations, but that class
+ # got too tied to IPA server details, killing reusability.
+ # BaseCertDB is a class that knows nothing about IPA.
+ # Generic NSS DB code should be moved here.
+ def __init__(self, nssdir=None):
+ if nssdir is None:
+ self.secdir = tempfile.mkdtemp()
+ self._is_temporary = True
+ else:
+ self.secdir = nssdir
+ self._is_temporary = False
+
+ def close(self):
+ if self._is_temporary:
+ shutil.rmtree(self.secdir)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, tb):
+ self.close()
+
+ def run_certutil(self, args, stdin=None):
+ new_args = [paths.CERTUTIL, "-d", self.secdir]
+ new_args = new_args + args
+ return ipautil.run(new_args, stdin)
+
+ def create_db(self, password_filename):
+ """Create cert DB
+
+ :param password_filename: Name of file containing the database password
+ """
+ self.run_certutil(["-N", "-f", password_filename])
+
+ def list_certs(self):
+ """Return nicknames and cert flags for all certs in the database
+
+ :return: List of (name, trust_flags) tuples
+ """
+ certs, stderr, returncode = self.run_certutil(["-L"])
+ certs = certs.splitlines()
+
+ # FIXME, this relies on NSS never changing the formatting of certutil
+ certlist = []
+ for cert in certs:
+ match = re.match(r'^(.+?)\s+(\w*,\w*,\w*)\s*$', cert)
+ if match:
+ certlist.append(match.groups())
+
+ return tuple(certlist)
+
+ def find_server_certs(self):
+ """Return nicknames and cert flags for server certs in the database
+
+ Server certs have an "u" character in the trust flags.
+
+ :return: List of (name, trust_flags) tuples
+ """
+ server_certs = []
+ for name, flags in self.list_certs():
+ if 'u' in flags:
+ server_certs.append((name, flags))
+
+ return server_certs
+
+ def get_trust_chain(self, nickname):
+ """Return names of certs in a given cert's trust chain
+
+ :param nickname: Name of the cert
+ :return: List of certificate names
+ """
+ root_nicknames = []
+ chain, stderr, returncode = self.run_certutil([
+ "-O", "-n", nickname])
+ chain = chain.splitlines()
+
+ for c in chain:
+ m = re.match('\s*"(.*)" \[.*', c)
+ if m:
+ root_nicknames.append(m.groups()[0])
+
+ return root_nicknames
+
+ def import_pkcs12(self, pkcs12_filename, db_password_filename,
+ pkcs12_passwd=None):
+ args = [paths.PK12UTIL, "-d", self.secdir,
+ "-i", pkcs12_filename,
+ "-k", db_password_filename, '-v']
+ if pkcs12_passwd is not None:
+ pkcs12_passwd = pkcs12_passwd + '\n'
+ args = args + ["-w", paths.DEV_STDIN]
+ try:
+ ipautil.run(args, stdin=pkcs12_passwd)
+ except ipautil.CalledProcessError, e:
+ if e.returncode == 17:
+ raise RuntimeError("incorrect password for pkcs#12 file %s" %
+ pkcs12_filename)
+ elif e.returncode == 10:
+ raise RuntimeError("Failed to open %s" % pkcs12_filename)
+ else:
+ raise RuntimeError("unknown error import pkcs#12 file %s" %
+ pkcs12_filename)
+
+ def import_files(self, files, db_password_filename, import_keys=False,
+ key_password=None, key_nickname=None):
+ """
+ Import certificates and a single private key from multiple files
+
+ The files may be in PEM and DER certificate, PKCS#7 certificate chain,
+ PKCS#8 and raw private key and PKCS#12 formats.
+
+ :param files: Names of files to import
+ :param db_password_filename: Name of file containing the database
+ password
+ :param import_keys: Whether to import private keys
+ :param key_password: Password to decrypt private keys
+ :param key_nickname: Nickname of the private key to import from PKCS#12
+ files
+ """
+ key_file = None
+ extracted_key = None
+ extracted_certs = ''
+
+ for filename in files:
+ try:
+ with open(filename, 'rb') as f:
+ data = f.read()
+ except IOError as e:
+ raise RuntimeError(
+ "Failed to open %s: %s" % (filename, e.strerror))
+
+ # Try to parse the file as PEM file
+ matches = list(re.finditer(
+ r'-----BEGIN (.+?)-----(.*?)-----END \1-----', data, re.DOTALL))
+ if matches:
+ loaded = False
+ for match in matches:
+ body = match.group()
+ label = match.group(1)
+ line = len(data[:match.start() + 1].splitlines())
+
+ if label in ('CERTIFICATE', 'X509 CERTIFICATE',
+ 'X.509 CERTIFICATE'):
+ try:
+ x509.load_certificate(match.group(2))
+ except NSPRError as e:
+ if label != 'CERTIFICATE':
+ root_logger.warning(
+ "Skipping certificate in %s at line %s: %s",
+ filename, line, e)
+ continue
+ else:
+ extracted_certs += body + '\n'
+ loaded = True
+ continue
+
+ if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'):
+ args = [
+ paths.OPENSSL, 'pkcs7',
+ '-print_certs',
+ ]
+ try:
+ stdout, stderr, rc = ipautil.run(args, stdin=body)
+ except ipautil.CalledProcessError as e:
+ if label == 'CERTIFICATE':
+ root_logger.warning(
+ "Skipping certificate in %s at line %s: %s",
+ filename, line, e)
+ else:
+ root_logger.warning(
+ "Skipping PKCS#7 in %s at line %s: %s",
+ filename, line, e)
+ continue
+ else:
+ extracted_certs += stdout + '\n'
+ loaded = True
+ continue
+
+ if label in ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY',
+ 'RSA PRIVATE KEY', 'DSA PRIVATE KEY',
+ 'EC PRIVATE KEY'):
+ if not import_keys:
+ continue
+
+ if key_file:
+ raise RuntimeError(
+ "Can't load private key from both %s and %s" %
+ (key_file, filename))
+
+ args = [
+ paths.OPENSSL, 'pkcs8',
+ '-topk8',
+ '-passout', 'file:' + db_password_filename,
+ ]
+ if ((label != 'PRIVATE KEY' and key_password) or
+ label == 'ENCRYPTED PRIVATE KEY'):
+ key_pwdfile = ipautil.write_tmp_file(key_password)
+ args += [
+ '-passin', 'file:' + key_pwdfile.name,
+ ]
+ try:
+ stdout, stderr, rc = ipautil.run(args, stdin=body)
+ except ipautil.CalledProcessError as e:
+ root_logger.warning(
+ "Skipping private key in %s at line %s: %s",
+ filename, line, e)
+ continue
+ else:
+ extracted_key = stdout
+ key_file = filename
+ loaded = True
+ continue
+ if loaded:
+ continue
+ raise RuntimeError("Failed to load %s" % filename)
+
+ # Try to load the file as DER certificate
+ try:
+ x509.load_certificate(data, x509.DER)
+ except NSPRError:
+ pass
+ else:
+ data = x509.make_pem(base64.b64encode(data))
+ extracted_certs += data + '\n'
+ continue
+
+ # Try to import the file as PKCS#12 file
+ if import_keys:
+ try:
+ self.import_pkcs12(
+ filename, db_password_filename, key_password)
+ except RuntimeError:
+ pass
+ else:
+ if key_file:
+ raise RuntimeError(
+ "Can't load private key from both %s and %s" %
+ (key_file, filename))
+ key_file = filename
+
+ server_certs = self.find_server_certs()
+ if key_nickname:
+ for nickname, trust_flags in server_certs:
+ if nickname == key_nickname:
+ break
+ else:
+ raise RuntimeError(
+ "Server certificate \"%s\" not found in %s" %
+ (key_nickname, filename))
+ else:
+ if len(server_certs) > 1:
+ raise RuntimeError(
+ "%s server certificates found in %s, "
+ "expecting only one" %
+ (len(server_certs), filename))
+
+ continue
+
+ raise RuntimeError("Failed to load %s" % filename)
+
+ if import_keys and not key_file:
+ raise RuntimeError(
+ "No server certificates found in %s" % (', '.join(files)))
+
+ nss_certs = x509.load_certificate_list(extracted_certs)
+ nss_cert = None
+ for nss_cert in nss_certs:
+ nickname = str(nss_cert.subject)
+ self.add_cert(nss_cert.der_data, nickname, ',,')
+ del nss_certs, nss_cert
+
+ if extracted_key:
+ in_file = ipautil.write_tmp_file(extracted_certs + extracted_key)
+ out_file = tempfile.NamedTemporaryFile()
+ out_password = ipautil.ipa_generate_password()
+ out_pwdfile = ipautil.write_tmp_file(out_password)
+ args = [
+ paths.OPENSSL, 'pkcs12',
+ '-export',
+ '-in', in_file.name,
+ '-out', out_file.name,
+ '-passin', 'file:' + db_password_filename,
+ '-passout', 'file:' + out_pwdfile.name,
+ ]
+ try:
+ ipautil.run(args)
+ except ipautil.CalledProcessError as e:
+ raise RuntimeError(
+ "No matching certificate found for private key from %s" %
+ key_file)
+
+ self.import_pkcs12(out_file.name, db_password_filename,
+ out_password)
+
+ def trust_root_cert(self, root_nickname, trust_flags=None):
+ if root_nickname[:7] == "Builtin":
+ root_logger.debug(
+ "No need to add trust for built-in root CAs, skipping %s" %
+ root_nickname)
+ else:
+ if trust_flags is None:
+ trust_flags = 'C,,'
+ try:
+ self.run_certutil(["-M", "-n", root_nickname,
+ "-t", trust_flags])
+ except ipautil.CalledProcessError, e:
+ raise RuntimeError(
+ "Setting trust on %s failed" % root_nickname)
+
+ def get_cert(self, nickname, pem=False):
+ args = ['-L', '-n', nickname]
+ if pem:
+ args.append('-a')
+ else:
+ args.append('-r')
+ try:
+ cert, err, returncode = self.run_certutil(args)
+ except ipautil.CalledProcessError:
+ raise RuntimeError("Failed to get %s" % nickname)
+ return cert
+
+ def export_pem_cert(self, nickname, location):
+ """Export the given cert to PEM file in the given location"""
+ cert = self.get_cert(nickname)
+ with open(location, "w+") as fd:
+ fd.write(cert)
+ os.chmod(location, 0444)
+
+ def import_pem_cert(self, nickname, flags, location):
+ """Import a cert form the given PEM file.
+
+ The file must contain exactly one certificate.
+ """
+ try:
+ with open(location) as fd:
+ certs = fd.read()
+ except IOError as e:
+ raise RuntimeError(
+ "Failed to open %s: %s" % (location, e.strerror)
+ )
+
+ cert, st = find_cert_from_txt(certs)
+ self.add_cert(cert, nickname, flags, pem=True)
+
+ try:
+ find_cert_from_txt(certs, st)
+ except RuntimeError:
+ pass
+ else:
+ raise ValueError('%s contains more than one certificate' %
+ location)
+
+ def add_cert(self, cert, nick, flags, pem=False):
+ args = ["-A", "-n", nick, "-t", flags]
+ if pem:
+ args.append("-a")
+ self.run_certutil(args, stdin=cert)
+
+ def delete_cert(self, nick):
+ self.run_certutil(["-D", "-n", nick])
+
+ def verify_server_cert_validity(self, nickname, hostname):
+ """Verify a certificate is valid for a SSL server with given hostname
+
+ Raises a ValueError if the certificate is invalid.
+ """
+ certdb = cert = None
+ if nss.nss_is_initialized():
+ nss.nss_shutdown()
+ nss.nss_init(self.secdir)
+ try:
+ certdb = nss.get_default_certdb()
+ cert = nss.find_cert_from_nickname(nickname)
+ intended_usage = nss.certificateUsageSSLServer
+ try:
+ approved_usage = cert.verify_now(certdb, True, intended_usage)
+ except NSPRError, e:
+ if e.errno != -8102:
+ raise ValueError(e.strerror)
+ approved_usage = 0
+ if not approved_usage & intended_usage:
+ raise ValueError('invalid for a SSL server')
+ if not cert.verify_hostname(hostname):
+ raise ValueError('invalid for server %s' % hostname)
+ finally:
+ del certdb, cert
+ nss.nss_shutdown()
+
+ return None
+
+ def verify_ca_cert_validity(self, nickname):
+ certdb = cert = None
+ if nss.nss_is_initialized():
+ nss.nss_shutdown()
+ nss.nss_init(self.secdir)
+ try:
+ certdb = nss.get_default_certdb()
+ cert = nss.find_cert_from_nickname(nickname)
+ if not cert.subject:
+ raise ValueError("has empty subject")
+ if not cert.is_ca_cert():
+ raise ValueError("not a CA certificate")
+ intended_usage = nss.certificateUsageSSLCA
+ try:
+ approved_usage = cert.verify_now(certdb, True, intended_usage)
+ except NSPRError, e:
+ if e.errno != -8102: # SEC_ERROR_INADEQUATE_KEY_USAGE
+ raise ValueError(e.strerror)
+ approved_usage = 0
+ if approved_usage & intended_usage != intended_usage:
+ raise ValueError('invalid for a CA')
+ finally:
+ del certdb, cert
+ nss.nss_shutdown()
diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py
index 55feb6596..5399a0fa5 100644
--- a/ipaserver/install/certs.py
+++ b/ipaserver/install/certs.py
@@ -19,7 +19,6 @@
import os
import stat
-import re
import sys
import tempfile
import shutil
@@ -28,15 +27,12 @@ import pwd
import base64
from hashlib import sha1
-from nss import nss
-from nss.error import NSPRError
-
from ipapython.ipa_log_manager import root_logger
from ipapython import dogtag
from ipapython import sysrestore
from ipapython import ipautil
from ipapython import certmonger
-from ipapython.certdb import get_ca_nickname
+from ipapython.certdb import get_ca_nickname, find_cert_from_txt, NSSDatabase
from ipapython.dn import DN
from ipalib import pkcs10, x509, api
from ipalib.errors import CertificateOperationError
@@ -48,23 +44,6 @@ from ipaplatform.paths import paths
# where apache can reach
NSS_DIR = paths.HTTPD_ALIAS_DIR
-def find_cert_from_txt(cert, start=0):
- """
- Given a cert blob (str) which may or may not contian leading and
- trailing text, pull out just the certificate part. This will return
- the FIRST cert in a stream of data.
-
- Returns a tuple (certificate, last position in cert)
- """
- s = cert.find('-----BEGIN CERTIFICATE-----', start)
- e = cert.find('-----END CERTIFICATE-----', s)
- if e > 0: e = e + 25
-
- if s < 0 or e < 0:
- raise RuntimeError("Unable to find certificate")
-
- cert = cert[s:e]
- return (cert, e)
def get_cert_nickname(cert):
"""
@@ -83,431 +62,6 @@ def get_cert_nickname(cert):
return (str(dn[0]), dn)
-class NSSDatabase(object):
- """A general-purpose wrapper around a NSS cert database
-
- For permanent NSS databases, pass the cert DB directory to __init__
-
- For temporary databases, do not pass nssdir, and call close() when done
- to remove the DB. Alternatively, a NSSDatabase can be used as a
- context manager that calls close() automatically.
- """
- # Traditionally, we used CertDB for our NSS DB operations, but that class
- # got too tied to IPA server details, killing reusability.
- # BaseCertDB is a class that knows nothing about IPA.
- # Generic NSS DB code should be moved here.
- def __init__(self, nssdir=None):
- if nssdir is None:
- self.secdir = tempfile.mkdtemp()
- self._is_temporary = True
- else:
- self.secdir = nssdir
- self._is_temporary = False
-
- def close(self):
- if self._is_temporary:
- shutil.rmtree(self.secdir)
-
- def __enter__(self):
- return self
-
- def __exit__(self, type, value, tb):
- self.close()
-
- def run_certutil(self, args, stdin=None):
- new_args = [paths.CERTUTIL, "-d", self.secdir]
- new_args = new_args + args
- return ipautil.run(new_args, stdin)
-
- def create_db(self, password_filename):
- """Create cert DB
-
- :param password_filename: Name of file containing the database password
- """
- self.run_certutil(["-N", "-f", password_filename])
-
- def list_certs(self):
- """Return nicknames and cert flags for all certs in the database
-
- :return: List of (name, trust_flags) tuples
- """
- certs, stderr, returncode = self.run_certutil(["-L"])
- certs = certs.splitlines()
-
- # FIXME, this relies on NSS never changing the formatting of certutil
- certlist = []
- for cert in certs:
- match = re.match(r'^(.+?)\s+(\w*,\w*,\w*)\s*$', cert)
- if match:
- certlist.append(match.groups())
-
- return tuple(certlist)
-
- def find_server_certs(self):
- """Return nicknames and cert flags for server certs in the database
-
- Server certs have an "u" character in the trust flags.
-
- :return: List of (name, trust_flags) tuples
- """
- server_certs = []
- for name, flags in self.list_certs():
- if 'u' in flags:
- server_certs.append((name, flags))
-
- return server_certs
-
- def get_trust_chain(self, nickname):
- """Return names of certs in a given cert's trust chain
-
- :param nickname: Name of the cert
- :return: List of certificate names
- """
- root_nicknames = []
- chain, stderr, returncode = self.run_certutil([
- "-O", "-n", nickname])
- chain = chain.splitlines()
-
- for c in chain:
- m = re.match('\s*"(.*)" \[.*', c)
- if m:
- root_nicknames.append(m.groups()[0])
-
- return root_nicknames
-
- def import_pkcs12(self, pkcs12_filename, db_password_filename,
- pkcs12_passwd=None):
- args = [paths.PK12UTIL, "-d", self.secdir,
- "-i", pkcs12_filename,
- "-k", db_password_filename, '-v']
- if pkcs12_passwd is not None:
- pkcs12_passwd = pkcs12_passwd + '\n'
- args = args + ["-w", paths.DEV_STDIN]
- try:
- ipautil.run(args, stdin=pkcs12_passwd)
- except ipautil.CalledProcessError, e:
- if e.returncode == 17:
- raise RuntimeError("incorrect password for pkcs#12 file %s" %
- pkcs12_filename)
- elif e.returncode == 10:
- raise RuntimeError("Failed to open %s" % pkcs12_filename)
- else:
- raise RuntimeError("unknown error import pkcs#12 file %s" %
- pkcs12_filename)
-
- def import_files(self, files, db_password_filename, import_keys=False,
- key_password=None, key_nickname=None):
- """
- Import certificates and a single private key from multiple files
-
- The files may be in PEM and DER certificate, PKCS#7 certificate chain,
- PKCS#8 and raw private key and PKCS#12 formats.
-
- :param files: Names of files to import
- :param db_password_filename: Name of file containing the database
- password
- :param import_keys: Whether to import private keys
- :param key_password: Password to decrypt private keys
- :param key_nickname: Nickname of the private key to import from PKCS#12
- files
- """
- key_file = None
- extracted_key = None
- extracted_certs = ''
-
- for filename in files:
- try:
- with open(filename, 'rb') as f:
- data = f.read()
- except IOError as e:
- raise RuntimeError(
- "Failed to open %s: %s" % (filename, e.strerror))
-
- # Try to parse the file as PEM file
- matches = list(re.finditer(
- r'-----BEGIN (.+?)-----(.*?)-----END \1-----', data, re.DOTALL))
- if matches:
- loaded = False
- for match in matches:
- body = match.group()
- label = match.group(1)
- line = len(data[:match.start() + 1].splitlines())
-
- if label in ('CERTIFICATE', 'X509 CERTIFICATE',
- 'X.509 CERTIFICATE'):
- try:
- x509.load_certificate(match.group(2))
- except NSPRError as e:
- if label != 'CERTIFICATE':
- root_logger.warning(
- "Skipping certificate in %s at line %s: %s",
- filename, line, e)
- continue
- else:
- extracted_certs += body + '\n'
- loaded = True
- continue
-
- if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'):
- args = [
- paths.OPENSSL, 'pkcs7',
- '-print_certs',
- ]
- try:
- stdout, stderr, rc = ipautil.run(args, stdin=body)
- except ipautil.CalledProcessError as e:
- if label == 'CERTIFICATE':
- root_logger.warning(
- "Skipping certificate in %s at line %s: %s",
- filename, line, e)
- else:
- root_logger.warning(
- "Skipping PKCS#7 in %s at line %s: %s",
- filename, line, e)
- continue
- else:
- extracted_certs += stdout + '\n'
- loaded = True
- continue
-
- if label in ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY',
- 'RSA PRIVATE KEY', 'DSA PRIVATE KEY',
- 'EC PRIVATE KEY'):
- if not import_keys:
- continue
-
- if key_file:
- raise RuntimeError(
- "Can't load private key from both %s and %s" %
- (key_file, filename))
-
- args = [
- paths.OPENSSL, 'pkcs8',
- '-topk8',
- '-passout', 'file:' + db_password_filename,
- ]
- if ((label != 'PRIVATE KEY' and key_password) or
- label == 'ENCRYPTED PRIVATE KEY'):
- key_pwdfile = ipautil.write_tmp_file(key_password)
- args += [
- '-passin', 'file:' + key_pwdfile.name,
- ]
- try:
- stdout, stderr, rc = ipautil.run(args, stdin=body)
- except ipautil.CalledProcessError as e:
- root_logger.warning(
- "Skipping private key in %s at line %s: %s",
- filename, line, e)
- continue
- else:
- extracted_key = stdout
- key_file = filename
- loaded = True
- continue
- if loaded:
- continue
- raise RuntimeError("Failed to load %s" % filename)
-
- # Try to load the file as DER certificate
- try:
- x509.load_certificate(data, x509.DER)
- except NSPRError:
- pass
- else:
- data = x509.make_pem(base64.b64encode(data))
- extracted_certs += data + '\n'
- continue
-
- # Try to import the file as PKCS#12 file
- if import_keys:
- try:
- self.import_pkcs12(
- filename, db_password_filename, key_password)
- except RuntimeError:
- pass
- else:
- if key_file:
- raise RuntimeError(
- "Can't load private key from both %s and %s" %
- (key_file, filename))
- key_file = filename
-
- server_certs = self.find_server_certs()
- if key_nickname:
- for nickname, trust_flags in server_certs:
- if nickname == key_nickname:
- break
- else:
- raise RuntimeError(
- "Server certificate \"%s\" not found in %s" %
- (key_nickname, filename))
- else:
- if len(server_certs) > 1:
- raise RuntimeError(
- "%s server certificates found in %s, "
- "expecting only one" %
- (len(server_certs), filename))
-
- continue
-
- raise RuntimeError("Failed to load %s" % filename)
-
- if import_keys and not key_file:
- raise RuntimeError(
- "No server certificates found in %s" % (', '.join(files)))
-
- nss_certs = x509.load_certificate_list(extracted_certs)
- nss_cert = None
- for nss_cert in nss_certs:
- nickname = str(nss_cert.subject)
- self.add_cert(nss_cert.der_data, nickname, ',,')
- del nss_certs, nss_cert
-
- if extracted_key:
- in_file = ipautil.write_tmp_file(extracted_certs + extracted_key)
- out_file = tempfile.NamedTemporaryFile()
- out_password = ipautil.ipa_generate_password()
- out_pwdfile = ipautil.write_tmp_file(out_password)
- args = [
- paths.OPENSSL, 'pkcs12',
- '-export',
- '-in', in_file.name,
- '-out', out_file.name,
- '-passin', 'file:' + db_password_filename,
- '-passout', 'file:' + out_pwdfile.name,
- ]
- try:
- ipautil.run(args)
- except ipautil.CalledProcessError as e:
- raise RuntimeError(
- "No matching certificate found for private key from %s" %
- key_file)
-
- self.import_pkcs12(out_file.name, db_password_filename,
- out_password)
-
- def trust_root_cert(self, root_nickname, trust_flags=None):
- if root_nickname[:7] == "Builtin":
- root_logger.debug(
- "No need to add trust for built-in root CAs, skipping %s" %
- root_nickname)
- else:
- if trust_flags is None:
- trust_flags = 'C,,'
- try:
- self.run_certutil(["-M", "-n", root_nickname,
- "-t", trust_flags])
- except ipautil.CalledProcessError, e:
- raise RuntimeError(
- "Setting trust on %s failed" % root_nickname)
-
- def get_cert(self, nickname, pem=False):
- args = ['-L', '-n', nickname]
- if pem:
- args.append('-a')
- else:
- args.append('-r')
- try:
- cert, err, returncode = self.run_certutil(args)
- except ipautil.CalledProcessError:
- raise RuntimeError("Failed to get %s" % nickname)
- return cert
-
- def export_pem_cert(self, nickname, location):
- """Export the given cert to PEM file in the given location"""
- cert = self.get_cert(nickname)
- with open(location, "w+") as fd:
- fd.write(cert)
- os.chmod(location, 0444)
-
- def import_pem_cert(self, nickname, flags, location):
- """Import a cert form the given PEM file.
-
- The file must contain exactly one certificate.
- """
- try:
- with open(location) as fd:
- certs = fd.read()
- except IOError as e:
- raise RuntimeError(
- "Failed to open %s: %s" % (location, e.strerror)
- )
-
- cert, st = find_cert_from_txt(certs)
- self.add_cert(cert, nickname, flags, pem=True)
-
- try:
- find_cert_from_txt(certs, st)
- except RuntimeError:
- pass
- else:
- raise ValueError('%s contains more than one certificate' %
- location)
-
- def add_cert(self, cert, nick, flags, pem=False):
- args = ["-A", "-n", nick, "-t", flags]
- if pem:
- args.append("-a")
- self.run_certutil(args, stdin=cert)
-
- def delete_cert(self, nick):
- self.run_certutil(["-D", "-n", nick])
-
- def verify_server_cert_validity(self, nickname, hostname):
- """Verify a certificate is valid for a SSL server with given hostname
-
- Raises a ValueError if the certificate is invalid.
- """
- certdb = cert = None
- if nss.nss_is_initialized():
- nss.nss_shutdown()
- nss.nss_init(self.secdir)
- try:
- certdb = nss.get_default_certdb()
- cert = nss.find_cert_from_nickname(nickname)
- intended_usage = nss.certificateUsageSSLServer
- try:
- approved_usage = cert.verify_now(certdb, True, intended_usage)
- except NSPRError, e:
- if e.errno != -8102:
- raise ValueError(e.strerror)
- approved_usage = 0
- if not approved_usage & intended_usage:
- raise ValueError('invalid for a SSL server')
- if not cert.verify_hostname(hostname):
- raise ValueError('invalid for server %s' % hostname)
- finally:
- del certdb, cert
- nss.nss_shutdown()
-
- return None
-
- def verify_ca_cert_validity(self, nickname):
- certdb = cert = None
- if nss.nss_is_initialized():
- nss.nss_shutdown()
- nss.nss_init(self.secdir)
- try:
- certdb = nss.get_default_certdb()
- cert = nss.find_cert_from_nickname(nickname)
- if not cert.subject:
- raise ValueError("has empty subject")
- if not cert.is_ca_cert():
- raise ValueError("not a CA certificate")
- intended_usage = nss.certificateUsageSSLCA
- try:
- approved_usage = cert.verify_now(certdb, True, intended_usage)
- except NSPRError, e:
- if e.errno != -8102: # SEC_ERROR_INADEQUATE_KEY_USAGE
- raise ValueError(e.strerror)
- approved_usage = 0
- if approved_usage & intended_usage != intended_usage:
- raise ValueError('invalid for a CA')
- finally:
- del certdb, cert
- nss.nss_shutdown()
-
-
class CertDB(object):
"""An IPA-server-specific wrapper around NSS