diff options
author | Stanislav Laznicka <slaznick@redhat.com> | 2017-06-16 10:18:07 +0200 |
---|---|---|
committer | Pavel Vomacka <pvomacka@redhat.com> | 2017-07-27 10:28:58 +0200 |
commit | b5732efda6d27f588adbc3f41259bc4511716f43 (patch) | |
tree | 3bd131be4ccef3c31acb84b8c823df8660c9b266 /ipalib | |
parent | 4375ef860fdd8221baeff23e88f9217b0cddc5ac (diff) | |
download | freeipa-b5732efda6d27f588adbc3f41259bc4511716f43.tar.gz freeipa-b5732efda6d27f588adbc3f41259bc4511716f43.tar.xz freeipa-b5732efda6d27f588adbc3f41259bc4511716f43.zip |
x509: Make certificates represented as objects
https://pagure.io/freeipa/issue/4985
Reviewed-By: Fraser Tweedale <ftweedal@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
Reviewed-By: Martin Basti <mbasti@redhat.com>
Diffstat (limited to 'ipalib')
-rw-r--r-- | ipalib/install/certstore.py | 50 | ||||
-rw-r--r-- | ipalib/x509.py | 532 |
2 files changed, 357 insertions, 225 deletions
diff --git a/ipalib/install/certstore.py b/ipalib/install/certstore.py index 0d0902fc1..89302f6d8 100644 --- a/ipalib/install/certstore.py +++ b/ipalib/install/certstore.py @@ -28,13 +28,13 @@ from ipapython.dn import DN from ipapython.certdb import get_ca_nickname, TrustFlags from ipalib import errors, x509 -def _parse_cert(dercert): + +def _parse_cert(cert): try: - cert = x509.load_der_x509_certificate(dercert) subject = DN(cert.subject) issuer = DN(cert.issuer) serial_number = cert.serial_number - public_key_info = x509.get_der_public_key_info(dercert, x509.DER) + public_key_info = cert.public_key_info_bytes except (ValueError, PyAsn1Error) as e: raise ValueError("failed to decode certificate: %s" % e) @@ -45,15 +45,15 @@ def _parse_cert(dercert): return subject, issuer_serial, public_key_info -def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage): +def init_ca_entry(entry, cert, nickname, trusted, ext_key_usage): """ Initialize certificate store entry for a CA certificate. """ - subject, issuer_serial, public_key = _parse_cert(dercert) + subject, issuer_serial, public_key = _parse_cert(cert) if ext_key_usage is not None: try: - cert_eku = x509.get_ext_key_usage(dercert, x509.DER) + cert_eku = cert.extended_key_usage except ValueError as e: raise ValueError("failed to decode certificate: %s" % e) if cert_eku is not None: @@ -68,7 +68,7 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage): entry['ipaCertSubject'] = [subject] entry['ipaCertIssuerSerial'] = [issuer_serial] entry['ipaPublicKey'] = [public_key] - entry['cACertificate;binary'] = [dercert] + entry['cACertificate;binary'] = [cert.public_bytes(x509.Encoding.DER)] if trusted is not None: entry['ipaKeyTrust'] = ['trusted' if trusted else 'distrusted'] @@ -79,11 +79,12 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage): entry['ipaKeyExtUsage'] = ext_key_usage -def update_compat_ca(ldap, base_dn, dercert): +def update_compat_ca(ldap, base_dn, cert): """ Update the CA certificate in cn=CAcert,cn=ipa,cn=etc,SUFFIX. """ dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), base_dn) + dercert = cert.public_bytes(x509.Encoding.DER) try: entry = ldap.get_entry(dn, attrs_list=['cACertificate;binary']) entry.single_value['cACertificate;binary'] = dercert @@ -152,12 +153,12 @@ def add_ca_cert(ldap, base_dn, dercert, nickname, trusted=None, clean_old_config(ldap, base_dn, dn, config_ipa, config_compat) -def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, +def update_ca_cert(ldap, base_dn, cert, trusted=None, ext_key_usage=None, config_ipa=False, config_compat=False): """ Update existing entry for a CA certificate in the certificate store. """ - subject, issuer_serial, public_key = _parse_cert(dercert) + subject, issuer_serial, public_key = _parse_cert(cert) filter = ldap.make_filter({'ipaCertSubject': subject}) result, _truncated = ldap.find_entries( @@ -172,7 +173,7 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, for old_cert in entry['cACertificate;binary']: # Check if we are adding a new cert - if old_cert == dercert: + if old_cert == cert: break else: # We are adding a new cert, validate it @@ -181,7 +182,8 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, if entry.single_value['ipaPublicKey'] != public_key: raise ValueError("subject public key info mismatch") entry['ipaCertIssuerSerial'].append(issuer_serial) - entry['cACertificate;binary'].append(dercert) + entry['cACertificate;binary'].append( + cert.public_bytes(x509.Encoding.DER)) # Update key trust if trusted is not None: @@ -217,22 +219,24 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None, entry.setdefault('ipaConfigString', []).append('compatCA') if is_compat or config_compat: - update_compat_ca(ldap, base_dn, dercert) + update_compat_ca(ldap, base_dn, cert) ldap.update_entry(entry) clean_old_config(ldap, base_dn, dn, config_ipa, config_compat) -def put_ca_cert(ldap, base_dn, dercert, nickname, trusted=None, +def put_ca_cert(ldap, base_dn, cert, nickname, trusted=None, ext_key_usage=None, config_ipa=False, config_compat=False): """ Add or update entry for a CA certificate in the certificate store. + + :param cert: IPACertificate """ try: - update_ca_cert(ldap, base_dn, dercert, trusted, ext_key_usage, + update_ca_cert(ldap, base_dn, cert, trusted, ext_key_usage, config_ipa=config_ipa, config_compat=config_compat) except errors.NotFound: - add_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage, + add_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage, config_ipa=config_ipa, config_compat=config_compat) except errors.EmptyModlist: pass @@ -306,11 +310,12 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, for cert in entry.get('cACertificate;binary', []): try: - _parse_cert(cert) + cert_obj = x509.load_der_x509_certificate(cert) + _parse_cert(cert_obj) except ValueError: certs = [] break - certs.append((cert, nickname, trusted, ext_key_usage)) + certs.append((cert_obj, nickname, trusted, ext_key_usage)) except errors.NotFound: try: ldap.get_entry(container_dn, ['']) @@ -319,7 +324,8 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca, dn = DN(('cn', 'CAcert'), config_dn) entry = ldap.get_entry(dn, ['cACertificate;binary']) - cert = entry.single_value['cACertificate;binary'] + cert = x509.load_der_x509_certificate( + entry.single_value['cACertificate;binary']) try: subject, _issuer_serial, _public_key_info = _parse_cert(cert) except ValueError: @@ -354,16 +360,18 @@ def key_policy_to_trust_flags(trusted, ca, ext_key_usage): return TrustFlags(False, trusted, ca, ext_key_usage) -def put_ca_cert_nss(ldap, base_dn, dercert, nickname, trust_flags, +def put_ca_cert_nss(ldap, base_dn, cert, nickname, trust_flags, config_ipa=False, config_compat=False): """ Add or update entry for a CA certificate in the certificate store. + + :param cert: IPACertificate """ trusted, ca, ext_key_usage = trust_flags_to_key_policy(trust_flags) if ca is False: raise ValueError("must be CA certificate") - put_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage, + put_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage, config_ipa, config_compat) diff --git a/ipalib/x509.py b/ipalib/x509.py index 5bec5ae59..dc7a2eb3c 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -39,10 +39,15 @@ import ssl import base64 import re +from cryptography import x509 as crypto_x509 +from cryptography import utils as crypto_utils from cryptography.hazmat.backends import default_backend -import cryptography.x509 +from cryptography.hazmat.primitives.serialization import ( + Encoding, PublicFormat +) from pyasn1.type import univ, char, namedtype, tag from pyasn1.codec.der import decoder, encoder +# from pyasn1.codec.native import decoder, encoder from pyasn1_modules import rfc2315, rfc2459 import six @@ -101,26 +106,319 @@ def strip_header(pem): return pem +@crypto_utils.register_interface(crypto_x509.Certificate) +class IPACertificate(object): + """ + A proxy class wrapping a python-cryptography certificate representation for + FreeIPA purposes + """ + def __init__(self, cert, backend=None): + """ + :param cert: A python-cryptography Certificate object + :param backend: A python-cryptography Backend object + """ + self._cert = cert + self.backend = default_backend() if backend is None else backend() + + # initialize the certificate fields + # we have to do it this way so that some systems don't explode since + # some field types encode-decoding is not strongly defined + self._subject = self.__get_der_field('subject') + self._issuer = self.__get_der_field('issuer') + + def __getstate__(self): + state = { + '_cert': self.public_bytes(Encoding.DER), + '_subject': self.subject_bytes, + '_issuer': self.issuer_bytes, + } + return state + + def __setstate__(self, state): + self._subject = state['_subject'] + self._issuer = state['_issuer'] + self._cert = crypto_x509.load_der_x509_certificate( + state['_cert'], backend=default_backend()) + + def __eq__(self, other): + """ + Checks equality. + + :param other: either cryptography.Certificate or IPACertificate or + bytes representing a DER-formatted certificate + """ + if (isinstance(other, (crypto_x509.Certificate, IPACertificate))): + return (self.public_bytes(Encoding.DER) == + other.public_bytes(Encoding.DER)) + elif isinstance(other, bytes): + return self.public_bytes(Encoding.DER) == other + else: + return False + + def __ne__(self, other): + """ + Checks not equal. + """ + return not self.__eq__(other) + + def __hash__(self): + """ + Computes a hash of the wrapped cryptography.Certificate. + """ + return hash(self._cert) + + def __encode_extension(self, oid, critical, value): + # TODO: have another proxy for crypto_x509.Extension which would + # provide public_bytes on the top of what python-cryptography has + ext = rfc2459.Extension() + # TODO: this does not have to be so weird, pyasn1 now has codecs + # which are capable of providing python-native types + ext['extnID'] = univ.ObjectIdentifier(oid) + ext['critical'] = univ.Boolean(critical) + ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value))) + ext = encoder.encode(ext) + return ext + + def __get_pyasn1_field(self, field): + """ + :returns: a field of the certificate in pyasn1 representation + """ + cert_bytes = self.tbs_certificate_bytes + cert = decoder.decode(cert_bytes, rfc2459.TBSCertificate())[0] + field = cert[field] + return field + + def __get_der_field(self, field): + """ + :field: the name of the field of the certificate + :returns: bytes representing the value of a certificate field + """ + return encoder.encode(self.__get_pyasn1_field(field)) + + def public_bytes(self, encoding): + """ + Serializes the certificate to PEM or DER format. + """ + return self._cert.public_bytes(encoding) + + def is_self_signed(self): + """ + :returns: True if this certificate is self-signed, False otherwise + """ + return self._cert.issuer == self._cert.subject + + def fingerprint(self, algorithm): + """ + Counts fingerprint of the wrapped cryptography.Certificate + """ + return self._cert.fingerprint(algorithm) + + @property + def serial_number(self): + return self._cert.serial_number + + @property + def version(self): + return self._cert.version + + @property + def subject(self): + return self._cert.subject + + @property + def subject_bytes(self): + return self._subject + + @property + def signature_hash_algorithm(self): + """ + Returns a HashAlgorithm corresponding to the type of the digest signed + in the certificate. + """ + return self._cert.signature_hash_algorithm + + @property + def signature_algorithm_oid(self): + """ + Returns the ObjectIdentifier of the signature algorithm. + """ + return self._cert.signature_algorithm_oid + + @property + def signature(self): + """ + Returns the signature bytes. + """ + return self._cert.signature + + @property + def issuer(self): + return self._cert.issuer + + @property + def issuer_bytes(self): + return self._issuer + + @property + def not_valid_before(self): + return self._cert.not_valid_before + + @property + def not_valid_after(self): + return self._cert.not_valid_after + + @property + def tbs_certificate_bytes(self): + return self._cert.tbs_certificate_bytes + + @property + def extensions(self): + # TODO: own Extension and Extensions classes proxying + # python-cryptography + return self._cert.extensions + + def public_key(self): + return self._cert.public_key() + + @property + def public_key_info_bytes(self): + return self._cert.public_key().public_bytes( + encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo) + + @property + def extended_key_usage(self): + try: + ext_key_usage = self._cert.extensions.get_extension_for_oid( + crypto_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value + except crypto_x509.ExtensionNotFound: + return None + + return set(oid.dotted_string for oid in ext_key_usage) + + @property + def extended_key_usage_bytes(self): + ekurfc = rfc2459.ExtKeyUsageSyntax() + eku = self.extended_key_usage or {EKU_PLACEHOLDER} + for i, oid in enumerate(eku): + ekurfc[i] = univ.ObjectIdentifier(oid) + ekurfc = encoder.encode(ekurfc) + return self.__encode_extension('2.5.29.37', EKU_ANY not in eku, ekurfc) + + @property + def san_general_names(self): + """ + Return SAN general names from a python-cryptography + certificate object. If the SAN extension is not present, + return an empty sequence. + + Because python-cryptography does not yet provide a way to + handle unrecognised critical extensions (which may occur), + we must parse the certificate and extract the General Names. + For uniformity with other code, we manually construct values + of python-crytography GeneralName subtypes. + + python-cryptography does not yet provide types for + ediPartyName or x400Address, so we drop these name types. + + otherNames are NOT instantiated to more specific types where + the type is known. Use ``process_othernames`` to do that. + + When python-cryptography can handle certs with unrecognised + critical extensions and implements ediPartyName and + x400Address, this function (and helpers) will be redundant + and should go away. + + """ + gns = self.__pyasn1_get_san_general_names() + + GENERAL_NAME_CONSTRUCTORS = { + 'rfc822Name': lambda x: crypto_x509.RFC822Name(unicode(x)), + 'dNSName': lambda x: crypto_x509.DNSName(unicode(x)), + 'directoryName': _pyasn1_to_cryptography_directoryname, + 'registeredID': _pyasn1_to_cryptography_registeredid, + 'iPAddress': _pyasn1_to_cryptography_ipaddress, + 'uniformResourceIdentifier': + lambda x: crypto_x509.UniformResourceIdentifier(unicode(x)), + 'otherName': _pyasn1_to_cryptography_othername, + } + + result = [] + + for gn in gns: + gn_type = gn.getName() + if gn_type in GENERAL_NAME_CONSTRUCTORS: + result.append( + GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent())) + + return result + + def __pyasn1_get_san_general_names(self): + # pyasn1 returns None when the key is not present in the certificate + # but we need an iterable + extensions = self.__get_pyasn1_field('extensions') or [] + OID_SAN = univ.ObjectIdentifier('2.5.29.17') + gns = [] + for ext in extensions: + if ext['extnID'] == OID_SAN: + der = decoder.decode( + ext['extnValue'], asn1Spec=univ.OctetString())[0] + gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0] + break + return gns + + @property + def san_a_label_dns_names(self): + gns = self.__pyasn1_get_san_general_names() + result = [] + + for gn in gns: + if gn.getName() == 'dNSName': + result.append(unicode(gn.getComponent())) + + return result + + def match_hostname(self, hostname): + match_cert = {} + + match_cert['subject'] = match_subject = [] + for rdn in self._cert.subject.rdns: + match_rdn = [] + for ava in rdn: + if ava.oid == crypto_x509.oid.NameOID.COMMON_NAME: + match_rdn.append(('commonName', ava.value)) + match_subject.append(match_rdn) + + values = self.san_a_label_dns_names + if values: + match_cert['subjectAltName'] = match_san = [] + for value in values: + match_san.append(('DNS', value)) + + ssl.match_hostname(match_cert, DNSName(hostname).ToASCII()) + + def load_pem_x509_certificate(data): """ Load an X.509 certificate in PEM format. - :returns: a python-cryptography ``Certificate`` object. + :returns: a ``IPACertificate`` object. :raises: ``ValueError`` if unable to load the certificate. """ - return crypto_x509.load_pem_x509_certificate(data, - backend=default_backend()) + return IPACertificate( + crypto_x509.load_pem_x509_certificate(data, backend=default_backend()) + ) def load_der_x509_certificate(data): """ Load an X.509 certificate in DER format. - :returns: a python-cryptography ``Certificate`` object. + :returns: a ``IPACertificate`` object. :raises: ``ValueError`` if unable to load the certificate. """ - return crypto_x509.load_der_x509_certificate(data, - backend=default_backend()) + return IPACertificate( + crypto_x509.load_der_x509_certificate(data, backend=default_backend()) + ) def load_certificate_from_file(filename, dbdir=None): @@ -138,7 +436,6 @@ def load_certificate_list(data): Load a certificate list from a sequence of concatenated PEMs. Return a list of python-cryptography ``Certificate`` objects. - """ certs = PEM_REGEX.findall(data) return [load_pem_x509_certificate(cert) for cert in certs] @@ -155,11 +452,11 @@ def load_certificate_list_from_file(filename): return load_certificate_list(f.read()) -def pkcs7_to_pems(data, datatype=PEM): +def pkcs7_to_certs(data, datatype=PEM): """ Extract certificates from a PKCS #7 object. - Return a ``list`` of X.509 PEM strings. + :returns: a ``list`` of ``IPACertificate`` objects. """ if datatype == PEM: match = re.match( @@ -187,62 +484,12 @@ def pkcs7_to_pems(data, datatype=PEM): for certificate in signed_data['certificates']: certificate = encoder.encode(certificate) - certificate = base64.b64encode(certificate) - certificate = make_pem(certificate) + certificate = load_der_x509_certificate(certificate) result.append(certificate) return result -def is_self_signed(certificate, datatype=PEM): - cert = load_certificate(certificate, datatype) - return cert.issuer == cert.subject - - -def _get_der_field(cert, datatype, dbdir, field): - cert = normalize_certificate(cert) - cert = decoder.decode(cert, rfc2459.Certificate())[0] - field = cert['tbsCertificate'][field] - field = encoder.encode(field) - return field - -def get_der_subject(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'subject') - -def get_der_issuer(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'issuer') - -def get_der_serial_number(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'serialNumber') - -def get_der_public_key_info(cert, datatype=PEM, dbdir=None): - return _get_der_field(cert, datatype, dbdir, 'subjectPublicKeyInfo') - - -def get_ext_key_usage(certificate, datatype=PEM): - cert = load_certificate(certificate, datatype) - try: - eku = cert.extensions.get_extension_for_oid( - cryptography.x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value - except cryptography.x509.ExtensionNotFound: - return None - - return set(oid.dotted_string for oid in eku) - - -def make_pem(data): - """ - Convert a raw base64-encoded blob into something that looks like a PE - file with lines split to 64 characters and proper headers. - """ - if isinstance(data, bytes): - data = data.decode('ascii') - pemcert = '\r\n'.join([data[x:x+64] for x in range(0, len(data), 64)]) - return '-----BEGIN CERTIFICATE-----\n' + \ - pemcert + \ - '\n-----END CERTIFICATE-----' - - def ensure_der_format(rawcert): """ Incoming certificates should be DER-encoded. If not it is converted to @@ -254,8 +501,6 @@ def ensure_der_format(rawcert): if not rawcert: return None - rawcert = strip_header(rawcert) - try: if isinstance(rawcert, bytes): # base64 must work with utf-8, otherwise it is raw bin certificate @@ -299,58 +544,37 @@ def validate_der_x509_certificate(cert): raise errors.CertificateFormatError(error=str(e)) -def write_certificate(rawcert, filename): +def write_certificate(cert, filename): """ Write the certificate to a file in PEM format. The cert value can be either DER or PEM-encoded, it will be normalized to DER regardless, then back out to PEM. """ - dercert = normalize_certificate(rawcert) try: - fp = open(filename, 'w') - fp.write(make_pem(base64.b64encode(dercert))) - fp.close() + with open(filename, 'wb') as fp: + fp.write(cert.public_bytes(Encoding.PEM)) except (IOError, OSError) as e: raise errors.FileError(reason=str(e)) -def write_certificate_list(rawcerts, filename): + +def write_certificate_list(certs, filename): """ Write a list of certificates to a file in PEM format. - The cert values can be either DER or PEM-encoded, they will be normalized - to DER regardless, then back out to PEM. + :param certs: a list of IPACertificate objects to be written to a file + :param filename: a path to the file the certificates should be written into """ - dercerts = [normalize_certificate(rawcert) for rawcert in rawcerts] try: - with open(filename, 'w') as f: - for cert in dercerts: - cert = base64.b64encode(cert) - cert = make_pem(cert) - f.write(cert + '\n') + with open(filename, 'wb') as f: + for cert in certs: + f.write(cert.public_bytes(Encoding.PEM)) except (IOError, OSError) as e: raise errors.FileError(reason=str(e)) -def _encode_extension(oid, critical, value): - ext = rfc2459.Extension() - ext['extnID'] = univ.ObjectIdentifier(oid) - ext['critical'] = univ.Boolean(critical) - ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value))) - ext = encoder.encode(ext) - return ext - - -def encode_ext_key_usage(ext_key_usage): - eku = rfc2459.ExtKeyUsageSyntax() - for i, oid in enumerate(ext_key_usage): - eku[i] = univ.ObjectIdentifier(oid) - eku = encoder.encode(eku) - return _encode_extension('2.5.29.37', EKU_ANY not in ext_key_usage, eku) - - class _PrincipalName(univ.Sequence): componentType = namedtype.NamedTypes( namedtype.NamedType('name-type', univ.Integer().subtype( @@ -385,13 +609,13 @@ def _decode_krb5principalname(data): return name -class KRB5PrincipalName(cryptography.x509.general_name.OtherName): +class KRB5PrincipalName(crypto_x509.general_name.OtherName): def __init__(self, type_id, value): super(KRB5PrincipalName, self).__init__(type_id, value) self.name = _decode_krb5principalname(value) -class UPN(cryptography.x509.general_name.OtherName): +class UPN(crypto_x509.general_name.OtherName): def __init__(self, type_id, value): super(UPN, self).__init__(type_id, value) self.name = unicode( @@ -411,128 +635,48 @@ def process_othernames(gns): """ for gn in gns: - if isinstance(gn, cryptography.x509.general_name.OtherName): + if isinstance(gn, crypto_x509.general_name.OtherName): cls = OTHERNAME_CLASS_MAP.get( gn.type_id.dotted_string, - cryptography.x509.general_name.OtherName) + crypto_x509.general_name.OtherName) yield cls(gn.type_id, gn.value) else: yield gn -def _pyasn1_get_san_general_names(cert): - tbs = decoder.decode( - cert.tbs_certificate_bytes, - asn1Spec=rfc2459.TBSCertificate() - )[0] - OID_SAN = univ.ObjectIdentifier('2.5.29.17') - # One would expect KeyError or empty iterable when the key ('extensions' - # in this particular case) is not pressent in the certificate but pyasn1 - # returns None here - extensions = tbs['extensions'] or [] - gns = [] - for ext in extensions: - if ext['extnID'] == OID_SAN: - der = decoder.decode( - ext['extnValue'], asn1Spec=univ.OctetString())[0] - gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0] - break - - return gns - - -def get_san_general_names(cert): - """ - Return SAN general names from a python-cryptography - certificate object. If the SAN extension is not present, - return an empty sequence. - - Because python-cryptography does not yet provide a way to - handle unrecognised critical extensions (which may occur), - we must parse the certificate and extract the General Names. - For uniformity with other code, we manually construct values - of python-crytography GeneralName subtypes. - - python-cryptography does not yet provide types for - ediPartyName or x400Address, so we drop these name types. - - otherNames are NOT instantiated to more specific types where - the type is known. Use ``process_othernames`` to do that. - - When python-cryptography can handle certs with unrecognised - critical extensions and implements ediPartyName and - x400Address, this function (and helpers) will be redundant - and should go away. - - """ - gns = _pyasn1_get_san_general_names(cert) - - GENERAL_NAME_CONSTRUCTORS = { - 'rfc822Name': lambda x: cryptography.x509.RFC822Name(unicode(x)), - 'dNSName': lambda x: cryptography.x509.DNSName(unicode(x)), - 'directoryName': _pyasn1_to_cryptography_directoryname, - 'registeredID': _pyasn1_to_cryptography_registeredid, - 'iPAddress': _pyasn1_to_cryptography_ipaddress, - 'uniformResourceIdentifier': - lambda x: cryptography.x509.UniformResourceIdentifier(unicode(x)), - 'otherName': _pyasn1_to_cryptography_othername, - } - - result = [] - - for gn in gns: - gn_type = gn.getName() - if gn_type in GENERAL_NAME_CONSTRUCTORS: - result.append( - GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent())) - - return result - - def _pyasn1_to_cryptography_directoryname(dn): attrs = [] # Name is CHOICE { RDNSequence } (only one possibility) for rdn in dn.getComponent(): for ava in rdn: - attr = cryptography.x509.NameAttribute( + attr = crypto_x509.NameAttribute( _pyasn1_to_cryptography_oid(ava['type']), unicode(decoder.decode(ava['value'])[0]) ) attrs.append(attr) - return cryptography.x509.DirectoryName(cryptography.x509.Name(attrs)) + return crypto_x509.DirectoryName(crypto_x509.Name(attrs)) def _pyasn1_to_cryptography_registeredid(oid): - return cryptography.x509.RegisteredID(_pyasn1_to_cryptography_oid(oid)) + return crypto_x509.RegisteredID(_pyasn1_to_cryptography_oid(oid)) def _pyasn1_to_cryptography_ipaddress(octet_string): - return cryptography.x509.IPAddress( + return crypto_x509.IPAddress( ipaddress.ip_address(bytes(octet_string))) def _pyasn1_to_cryptography_othername(on): - return cryptography.x509.OtherName( + return crypto_x509.OtherName( _pyasn1_to_cryptography_oid(on['type-id']), bytes(on['value']) ) def _pyasn1_to_cryptography_oid(oid): - return cryptography.x509.ObjectIdentifier(str(oid)) - - -def get_san_a_label_dns_names(cert): - gns = _pyasn1_get_san_general_names(cert) - result = [] - - for gn in gns: - if gn.getName() == 'dNSName': - result.append(unicode(gn.getComponent())) - - return result + return crypto_x509.ObjectIdentifier(str(oid)) def chunk(size, s): @@ -574,23 +718,3 @@ def format_datetime(t): if t.tzinfo is None: t = t.replace(tzinfo=UTC()) return unicode(t.strftime("%a %b %d %H:%M:%S %Y %Z")) - - -def match_hostname(cert, hostname): - match_cert = {} - - match_cert['subject'] = match_subject = [] - for rdn in cert.subject.rdns: - match_rdn = [] - for ava in rdn: - if ava.oid == cryptography.x509.oid.NameOID.COMMON_NAME: - match_rdn.append(('commonName', ava.value)) - match_subject.append(match_rdn) - - values = get_san_a_label_dns_names(cert) - if values: - match_cert['subjectAltName'] = match_san = [] - for value in values: - match_san.append(('DNS', value)) - - ssl.match_hostname(match_cert, DNSName(hostname).ToASCII()) |