diff options
| author | Fraser Tweedale <ftweedal@redhat.com> | 2016-10-13 17:12:31 +1000 |
|---|---|---|
| committer | David Kupka <dkupka@redhat.com> | 2016-11-10 10:21:47 +0100 |
| commit | db116f73fe5fc199bb2e28103cf5e3e2a24eab4c (patch) | |
| tree | ff1a043b376ec4d98b6399040a868e8b45725ee0 /ipaserver | |
| parent | c57dc890b2bf447ab575f2e91249179bce3f05d5 (diff) | |
x509: use python-cryptography to process certs
Update x509.load_certificate and related functions to return
python-cryptography ``Certificate`` objects. Update the call sites
accordingly, including removal of NSS initialisation code.
Also update GeneralName parsing code to return python-cryptography
GeneralName values, for consistency with other code that processes
GeneralNames. The new function, `get_san_general_names`, and
associated helper functions, can be removed when python-cryptography
provides a way to deal with unrecognised critical extensions.
Part of: https://fedorahosted.org/freeipa/ticket/6398
Reviewed-By: Jan Cholasta <jcholast@redhat.com>
Reviewed-By: Florence Blanc-Renaud <frenaud@redhat.com>
Diffstat (limited to 'ipaserver')
| -rw-r--r-- | ipaserver/install/ca.py | 2 | ||||
| -rw-r--r-- | ipaserver/install/cainstance.py | 25 | ||||
| -rw-r--r-- | ipaserver/install/certs.py | 9 | ||||
| -rw-r--r-- | ipaserver/install/installutils.py | 14 | ||||
| -rw-r--r-- | ipaserver/install/ipa_cacert_manage.py | 103 | ||||
| -rw-r--r-- | ipaserver/install/krainstance.py | 2 | ||||
| -rw-r--r-- | ipaserver/plugins/cert.py | 115 | ||||
| -rw-r--r-- | ipaserver/plugins/service.py | 20 |
8 files changed, 146 insertions, 144 deletions
diff --git a/ipaserver/install/ca.py b/ipaserver/install/ca.py index 88ec6277f..921e49495 100644 --- a/ipaserver/install/ca.py +++ b/ipaserver/install/ca.py @@ -102,7 +102,7 @@ def install_check(standalone, replica_config, options): cert = db.get_cert_from_db(nickname) if not cert: continue - subject = DN(str(x509.get_subject(cert))) + subject = DN(x509.load_certificate(cert).subject) if subject in (DN('CN=Certificate Authority', subject_base), DN('CN=IPA RA', subject_base)): raise ScriptError( diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py index 1b7acef70..7b26e749e 100644 --- a/ipaserver/install/cainstance.py +++ b/ipaserver/install/cainstance.py @@ -554,9 +554,9 @@ class CAInstance(DogtagInstance): config.set("CA", "pki_req_ext_data", "1E0A00530075006200430041") elif self.external == 2: - cert = x509.load_certificate_from_file(self.cert_file) cert_file = tempfile.NamedTemporaryFile() - x509.write_certificate(cert.der_data, cert_file.name) + with open(self.cert_file) as f: + x509.write_certificate(f.read(), cert_file.name) cert_file.flush() result = ipautil.run( @@ -778,7 +778,7 @@ class CAInstance(DogtagInstance): userstate=["1"], userCertificate=[cert_data], description=['2;%s;%s;%s' % ( - cert.serial_number, + cert.serial, DN(('CN', 'Certificate Authority'), self.subject_base), DN(('CN', 'IPA RA'), self.subject_base))]) conn.add_entry(entry) @@ -1674,8 +1674,9 @@ def update_people_entry(dercert): is needed when a certificate is renewed. """ def make_filter(dercert): - subject = x509.get_subject(dercert, datatype=x509.DER) - issuer = x509.get_issuer(dercert, datatype=x509.DER) + cert = x509.load_certificate(dercert, datatype=x509.DER) + subject = DN(cert.subject) + issuer = DN(cert.issuer) return ldap2.ldap2.combine_filters( [ ldap2.ldap2.make_filter({'objectClass': 'inetOrgPerson'}), @@ -1686,9 +1687,10 @@ def update_people_entry(dercert): ldap2.ldap2.MATCH_ALL) def make_entry(dercert, entry): - serial_number = x509.get_serial_number(dercert, datatype=x509.DER) - subject = x509.get_subject(dercert, datatype=x509.DER) - issuer = x509.get_issuer(dercert, datatype=x509.DER) + cert = x509.load_certificate(dercert, datatype=x509.DER) + serial_number = cert.serial + subject = DN(cert.subject) + issuer = DN(cert.issuer) entry['usercertificate'].append(dercert) entry['description'] = '2;%d;%s;%s' % (serial_number, issuer, subject) return entry @@ -1702,15 +1704,16 @@ def update_authority_entry(dercert): serial number to match the given cert. """ def make_filter(dercert): - subject = x509.get_subject(dercert, datatype=x509.DER) + cert = x509.load_certificate(dercert, datatype=x509.DER) + subject = str(DN(cert.subject)) return ldap2.ldap2.make_filter( dict(objectclass='authority', authoritydn=subject), rules=ldap2.ldap2.MATCH_ALL, ) def make_entry(dercert, entry): - serial_number = x509.get_serial_number(dercert, datatype=x509.DER) - entry['authoritySerial'] = serial_number + cert = x509.load_certificate(dercert, datatype=x509.DER) + entry['authoritySerial'] = cert.serial return entry return __update_entry_from_cert(make_filter, make_entry, dercert) diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 31fd36cc3..a73025099 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -60,9 +60,8 @@ def get_cert_nickname(cert): representation of the first RDN in the subject and subject_dn is a DN object. """ - nsscert = x509.load_certificate(cert) - subject = str(nsscert.subject) - dn = DN(subject) + cert_obj = x509.load_certificate(cert) + dn = DN(cert_obj.subject) return (str(dn[0]), dn) @@ -304,8 +303,8 @@ class CertDB(object): return cert = self.get_cert_from_db(nickname) - nsscert = x509.load_certificate(cert, dbdir=self.secdir) - subject = str(nsscert.subject) + cert_obj = x509.load_certificate(cert) + subject = str(DN(cert_obj.subject)) certmonger.add_principal(request_id, principal) certmonger.add_subject(request_id, subject) diff --git a/ipaserver/install/installutils.py b/ipaserver/install/installutils.py index fb9579a07..bee501a6e 100644 --- a/ipaserver/install/installutils.py +++ b/ipaserver/install/installutils.py @@ -921,10 +921,9 @@ def load_pkcs12(cert_files, key_password, key_nickname, ca_cert_files, if ca_cert is None: ca_cert = cert - nss_cert = x509.load_certificate(cert, x509.DER) - subject = DN(str(nss_cert.subject)) - issuer = DN(str(nss_cert.issuer)) - del nss_cert + cert_obj = x509.load_certificate(cert, x509.DER) + subject = DN(cert_obj.subject) + issuer = DN(cert_obj.issuer) if subject == issuer: break @@ -1046,10 +1045,9 @@ def load_external_cert(files, subject_base): for nickname, _trust_flags in nssdb.list_certs(): cert = nssdb.get_cert(nickname, pem=True) - nss_cert = x509.load_certificate(cert) - subject = DN(str(nss_cert.subject)) - issuer = DN(str(nss_cert.issuer)) - del nss_cert + cert_obj = x509.load_certificate(cert) + subject = DN(cert_obj.subject) + issuer = DN(cert_obj.issuer) cache[nickname] = (cert, subject, issuer) if subject == ca_subject: diff --git a/ipaserver/install/ipa_cacert_manage.py b/ipaserver/install/ipa_cacert_manage.py index af08ba68c..0dcb70fea 100644 --- a/ipaserver/install/ipa_cacert_manage.py +++ b/ipaserver/install/ipa_cacert_manage.py @@ -21,8 +21,7 @@ from __future__ import print_function import os from optparse import OptionGroup -from nss import nss -from nss.error import NSPRError +from cryptography.hazmat.primitives import serialization import gssapi from ipapython import admintool, certmonger, ipautil @@ -187,7 +186,7 @@ class CACertManage(admintool.AdminTool): "--external-cert-file=/path/to/signed_certificate " "--external-cert-file=/path/to/external_ca_certificate") - def renew_external_step_2(self, ca, old_cert): + def renew_external_step_2(self, ca, old_cert_der): print("Importing the renewed CA certificate, please wait") options = self.options @@ -195,55 +194,54 @@ class CACertManage(admintool.AdminTool): cert_file, ca_file = installutils.load_external_cert( options.external_cert_files, x509.subject_base()) - nss_cert = None - nss.nss_init(paths.PKI_TOMCAT_ALIAS_DIR) - try: - nss_cert = x509.load_certificate(old_cert, x509.DER) - subject = nss_cert.subject - der_subject = x509.get_der_subject(old_cert, x509.DER) - #pylint: disable=E1101 - pkinfo = nss_cert.subject_public_key_info.format() - #pylint: enable=E1101 - - nss_cert = x509.load_certificate_from_file(cert_file.name) - cert = nss_cert.der_data - if nss_cert.subject != subject: - raise admintool.ScriptError( - "Subject name mismatch (visit " - "http://www.freeipa.org/page/Troubleshooting for " - "troubleshooting guide)") - if x509.get_der_subject(cert, x509.DER) != der_subject: - raise admintool.ScriptError( - "Subject name encoding mismatch (visit " - "http://www.freeipa.org/page/Troubleshooting for " - "troubleshooting guide)") - #pylint: disable=E1101 - if nss_cert.subject_public_key_info.format() != pkinfo: - raise admintool.ScriptError( - "Subject public key info mismatch (visit " - "http://www.freeipa.org/page/Troubleshooting for " - "troubleshooting guide)") - #pylint: enable=E1101 - finally: - del nss_cert - nss.nss_shutdown() + old_cert_obj = x509.load_certificate(old_cert_der, x509.DER) + old_der_subject = x509.get_der_subject(old_cert_der, x509.DER) + old_spki = old_cert_obj.public_key().public_bytes( + serialization.Encoding.DER, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + + with open(cert_file.name) as f: + new_cert_data = f.read() + new_cert_der = x509.normalize_certificate(new_cert_data) + new_cert_obj = x509.load_certificate(new_cert_der, x509.DER) + new_der_subject = x509.get_der_subject(new_cert_der, x509.DER) + new_spki = new_cert_obj.public_key().public_bytes( + serialization.Encoding.DER, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + + if new_cert_obj.subject != old_cert_obj.subject: + raise admintool.ScriptError( + "Subject name mismatch (visit " + "http://www.freeipa.org/page/Troubleshooting for " + "troubleshooting guide)") + if new_der_subject != old_der_subject: + raise admintool.ScriptError( + "Subject name encoding mismatch (visit " + "http://www.freeipa.org/page/Troubleshooting for " + "troubleshooting guide)") + if new_spki != old_spki: + raise admintool.ScriptError( + "Subject public key info mismatch (visit " + "http://www.freeipa.org/page/Troubleshooting for " + "troubleshooting guide)") with certs.NSSDatabase() as tmpdb: pw = ipautil.write_tmp_file(ipautil.ipa_generate_password()) tmpdb.create_db(pw.name) - tmpdb.add_cert(old_cert, 'IPA CA', 'C,,') + tmpdb.add_cert(old_cert_der, 'IPA CA', 'C,,') try: - tmpdb.add_cert(cert, 'IPA CA', 'C,,') + tmpdb.add_cert(new_cert_der, 'IPA CA', 'C,,') except ipautil.CalledProcessError as e: raise admintool.ScriptError( "Not compatible with the current CA certificate: %s" % e) ca_certs = x509.load_certificate_list_from_file(ca_file.name) for ca_cert in ca_certs: - tmpdb.add_cert(ca_cert.der_data, str(ca_cert.subject), 'C,,') - del ca_certs - del ca_cert + data = ca_cert.public_bytes(serialization.Encoding.DER) + tmpdb.add_cert(data, str(DN(ca_cert.subject)), 'C,,') try: tmpdb.verify_ca_cert_validity('IPA CA') @@ -266,14 +264,14 @@ class CACertManage(admintool.AdminTool): ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn) try: entry = conn.get_entry(dn, ['usercertificate']) - entry['usercertificate'] = [cert] + entry['usercertificate'] = [new_cert_der] conn.update_entry(entry) except errors.NotFound: entry = conn.make_entry( dn, objectclass=['top', 'pkiuser', 'nscontainer'], cn=[self.cert_nickname], - usercertificate=[cert]) + usercertificate=[new_cert_der]) conn.add_entry(entry) except errors.EmptyModlist: pass @@ -313,21 +311,16 @@ class CACertManage(admintool.AdminTool): options = self.options cert_filename = self.args[1] - nss_cert = None try: - try: - nss_cert = x509.load_certificate_from_file(cert_filename) - except IOError as e: - raise admintool.ScriptError( - "Can't open \"%s\": %s" % (cert_filename, e)) - except (TypeError, NSPRError, ValueError) as e: - raise admintool.ScriptError("Not a valid certificate: %s" % e) - subject = nss_cert.subject - cert = nss_cert.der_data - finally: - del nss_cert + cert_obj = x509.load_certificate_from_file(cert_filename) + except IOError as e: + raise admintool.ScriptError( + "Can't open \"%s\": %s" % (cert_filename, e)) + except (TypeError, ValueError) as e: + raise admintool.ScriptError("Not a valid certificate: %s" % e) + cert = cert_obj.public_bytes(serialization.Encoding.DER) - nickname = options.nickname or str(subject) + nickname = options.nickname or str(DN(cert_obj.subject)) ca_certs = certstore.get_ca_certs_nss(api.Backend.ldap2, api.env.basedn, diff --git a/ipaserver/install/krainstance.py b/ipaserver/install/krainstance.py index 315057808..77f23c1c3 100644 --- a/ipaserver/install/krainstance.py +++ b/ipaserver/install/krainstance.py @@ -297,7 +297,7 @@ class KRAInstance(DogtagInstance): usertype=["undefined"], userCertificate=[cert_data], description=['2;%s;%s;%s' % ( - cert.serial_number, + cert.serial, DN(('CN', 'Certificate Authority'), self.subject_base), DN(('CN', 'IPA RA'), self.subject_base))]) conn.add_entry(entry) diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index a534c4d26..4362d8268 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -22,11 +22,11 @@ import base64 import collections import datetime +from operator import attrgetter import os import cryptography.x509 -from nss import nss -from nss.error import NSPRError +from cryptography.hazmat.primitives import hashes import six from ipalib import Command, Str, Int, Flag @@ -224,7 +224,7 @@ def bind_principal_can_manage_cert(cert): """Check that the bind principal can manage the given cert. ``cert`` - An NSS certificate object. + A python-cryptography ``Certificate`` object. """ bind_principal = kerberos.Principal(getattr(context, 'principal')) @@ -233,9 +233,14 @@ def bind_principal_can_manage_cert(cert): hostname = bind_principal.hostname - # If we have a hostname we want to verify that the subject - # of the certificate matches it. - return hostname == cert.subject.common_name #pylint: disable=E1101 + # Verify that hostname matches subject of cert. + # We check the "most-specific" CN value. + cns = cert.subject.get_attributes_for_oid( + cryptography.x509.oid.NameOID.COMMON_NAME) + if len(cns) == 0: + return False # no CN in subject + else: + return hostname == cns[-1].value class BaseCertObject(Object): @@ -370,30 +375,27 @@ class BaseCertObject(Object): attribute. """ - cert = obj.get('certificate') - if cert is not None: - cert = x509.load_certificate(cert) - obj['subject'] = DN(unicode(cert.subject)) - obj['issuer'] = DN(unicode(cert.issuer)) - obj['serial_number'] = cert.serial_number - obj['valid_not_before'] = unicode(cert.valid_not_before_str) - obj['valid_not_after'] = unicode(cert.valid_not_after_str) + if 'certificate' in obj: + cert = x509.load_certificate(obj['certificate']) + obj['subject'] = DN(cert.subject) + obj['issuer'] = DN(cert.issuer) + obj['serial_number'] = cert.serial + obj['valid_not_before'] = x509.format_datetime( + cert.not_valid_before) + obj['valid_not_after'] = x509.format_datetime( + cert.not_valid_after) if full: obj['md5_fingerprint'] = x509.to_hex_with_colons( - nss.md5_digest(cert.der_data)) + cert.fingerprint(hashes.MD5())) obj['sha1_fingerprint'] = x509.to_hex_with_colons( - nss.sha1_digest(cert.der_data)) + cert.fingerprint(hashes.SHA1())) - try: - ext_san = cert.get_extension(nss.SEC_OID_X509_SUBJECT_ALT_NAME) - general_names = x509.decode_generalnames(ext_san.value) - except KeyError: - general_names = [] + general_names = x509.process_othernames( + x509.get_san_general_names(cert)) - for name_type, _desc, name, der_name in general_names: + for gn in general_names: try: - self._add_san_attribute( - obj, full, name_type, name, der_name) + self._add_san_attribute(obj, full, gn) except Exception: # Invalid GeneralName (i.e. not a valid X.509 cert); # don't fail but log something about it @@ -404,45 +406,52 @@ class BaseCertObject(Object): if serial_number is not None: obj['serial_number_hex'] = u'0x%X' % serial_number - - def _add_san_attribute( - self, obj, full, name_type, name, der_name): + def _add_san_attribute(self, obj, full, gn): name_type_map = { - nss.certRFC822Name: 'san_rfc822name', - nss.certDNSName: 'san_dnsname', - nss.certX400Address: 'san_x400address', - nss.certDirectoryName: 'san_directoryname', - nss.certEDIPartyName: 'san_edipartyname', - nss.certURI: 'san_uri', - nss.certIPAddress: 'san_ipaddress', - nss.certRegisterID: 'san_oid', - (nss.certOtherName, x509.SAN_UPN): 'san_other_upn', - (nss.certOtherName, x509.SAN_KRB5PRINCIPALNAME): 'san_other_kpn', + cryptography.x509.RFC822Name: + ('san_rfc822name', attrgetter('value')), + cryptography.x509.DNSName: ('san_dnsname', attrgetter('value')), + # cryptography.x509.???: 'san_x400address', + cryptography.x509.DirectoryName: + ('san_directoryname', lambda x: DN(x.value)), + # cryptography.x509.???: 'san_edipartyname', + cryptography.x509.UniformResourceIdentifier: + ('san_uri', attrgetter('value')), + cryptography.x509.IPAddress: + ('san_ipaddress', attrgetter('value')), + cryptography.x509.RegisteredID: + ('san_oid', attrgetter('value.dotted_string')), + cryptography.x509.OtherName: ('san_other', _format_othername), + x509.UPN: ('san_other_upn', attrgetter('name')), + x509.KRB5PrincipalName: ('san_other_kpn', attrgetter('name')), } default_attrs = { 'san_rfc822name', 'san_dnsname', 'san_other_upn', 'san_other_kpn', } - attr_name = name_type_map.get(name_type, 'san_other') + if type(gn) not in name_type_map: + return + + attr_name, format_name = name_type_map[type(gn)] if full or attr_name in default_attrs: - if attr_name != 'san_other': - name_formatted = name - else: - # display as "OID : b64(DER)" - name_formatted = u'{}:{}'.format( - name_type[1], base64.b64encode(der_name)) - attr_value = self.params[attr_name].type(name_formatted) + attr_value = self.params[attr_name].type(format_name(gn)) obj.setdefault(attr_name, []).append(attr_value) if full and attr_name.startswith('san_other_'): # also include known otherName in generic otherName attribute - name_formatted = u'{}:{}'.format( - name_type[1], base64.b64encode(der_name)) - attr_value = self.params['san_other'].type(name_formatted) + attr_value = self.params['san_other'].type(_format_othername(gn)) obj.setdefault('san_other', []).append(attr_value) +def _format_othername(on): + """Format a python-cryptography OtherName for display.""" + return u'{}:{}'.format( + on.type_id.dotted_string, + base64.b64encode(on.value) + ) + + class BaseCertMethod(Method): def get_options(self): yield self.obj.params['cacn'].clone(query=True) @@ -909,7 +918,7 @@ class cert_show(Retrieve, CertMethod, VirtualCommand): raise acierr # pylint: disable=E0702 ca_obj = api.Command.ca_show(options['cacn'])['result'] - if DN(unicode(cert.issuer)) != DN(ca_obj['ipacasubjectdn'][0]): + if DN(cert.issuer) != DN(ca_obj['ipacasubjectdn'][0]): # DN of cert differs from what we requested raise errors.NotFound( reason=_("Certificate with serial number %(serial)s " @@ -1132,16 +1141,16 @@ class cert_find(Search, CertMethod): def _get_cert_key(self, cert): try: - nss_cert = x509.load_certificate(cert, x509.DER) - except NSPRError as e: + cert_obj = x509.load_certificate(cert, x509.DER) + except ValueError as e: message = messages.SearchResultTruncated( reason=_("failed to load certificate: %s") % e, ) self.add_message(message) - raise ValueError("failed to load certificate") + raise - return (DN(unicode(nss_cert.issuer)), nss_cert.serial_number) + return (DN(cert_obj.issuer), cert_obj.serial) def _get_cert_obj(self, cert, all, raw, pkey_only): obj = {'certificate': unicode(base64.b64encode(cert))} diff --git a/ipaserver/plugins/service.py b/ipaserver/plugins/service.py index a39ba3249..ddae37fec 100644 --- a/ipaserver/plugins/service.py +++ b/ipaserver/plugins/service.py @@ -19,6 +19,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from cryptography.hazmat.primitives import hashes import six from ipalib import api, errors, messages @@ -49,8 +50,6 @@ from ipalib import output from ipapython import kerberos from ipapython.dn import DN -import nss.nss as nss - if six.PY3: unicode = str @@ -268,16 +267,17 @@ def set_certificate_attrs(entry_attrs): cert = entry_attrs['usercertificate'] cert = x509.normalize_certificate(cert) cert = x509.load_certificate(cert, datatype=x509.DER) - entry_attrs['subject'] = unicode(cert.subject) - entry_attrs['serial_number'] = unicode(cert.serial_number) - entry_attrs['serial_number_hex'] = u'0x%X' % cert.serial_number - entry_attrs['issuer'] = unicode(cert.issuer) - entry_attrs['valid_not_before'] = unicode(cert.valid_not_before_str) - entry_attrs['valid_not_after'] = unicode(cert.valid_not_after_str) + entry_attrs['subject'] = unicode(DN(cert.subject)) + entry_attrs['serial_number'] = unicode(cert.serial) + entry_attrs['serial_number_hex'] = u'0x%X' % cert.serial + entry_attrs['issuer'] = unicode(DN(cert.issuer)) + entry_attrs['valid_not_before'] = x509.format_datetime( + cert.not_valid_before) + entry_attrs['valid_not_after'] = x509.format_datetime(cert.not_valid_after) entry_attrs['md5_fingerprint'] = x509.to_hex_with_colons( - nss.md5_digest(cert.der_data)) + cert.fingerprint(hashes.MD5())) entry_attrs['sha1_fingerprint'] = x509.to_hex_with_colons( - nss.sha1_digest(cert.der_data)) + cert.fingerprint(hashes.SHA1())) def check_required_principal(ldap, principal): """ |
