summaryrefslogtreecommitdiffstats
path: root/ipalib
diff options
context:
space:
mode:
authorStanislav Laznicka <slaznick@redhat.com>2017-06-16 10:18:07 +0200
committerPavel Vomacka <pvomacka@redhat.com>2017-07-27 10:28:58 +0200
commitb5732efda6d27f588adbc3f41259bc4511716f43 (patch)
tree3bd131be4ccef3c31acb84b8c823df8660c9b266 /ipalib
parent4375ef860fdd8221baeff23e88f9217b0cddc5ac (diff)
downloadfreeipa-b5732efda6d27f588adbc3f41259bc4511716f43.tar.gz
freeipa-b5732efda6d27f588adbc3f41259bc4511716f43.tar.xz
freeipa-b5732efda6d27f588adbc3f41259bc4511716f43.zip
x509: Make certificates represented as objects
https://pagure.io/freeipa/issue/4985 Reviewed-By: Fraser Tweedale <ftweedal@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com> Reviewed-By: Martin Basti <mbasti@redhat.com>
Diffstat (limited to 'ipalib')
-rw-r--r--ipalib/install/certstore.py50
-rw-r--r--ipalib/x509.py532
2 files changed, 357 insertions, 225 deletions
diff --git a/ipalib/install/certstore.py b/ipalib/install/certstore.py
index 0d0902fc1..89302f6d8 100644
--- a/ipalib/install/certstore.py
+++ b/ipalib/install/certstore.py
@@ -28,13 +28,13 @@ from ipapython.dn import DN
from ipapython.certdb import get_ca_nickname, TrustFlags
from ipalib import errors, x509
-def _parse_cert(dercert):
+
+def _parse_cert(cert):
try:
- cert = x509.load_der_x509_certificate(dercert)
subject = DN(cert.subject)
issuer = DN(cert.issuer)
serial_number = cert.serial_number
- public_key_info = x509.get_der_public_key_info(dercert, x509.DER)
+ public_key_info = cert.public_key_info_bytes
except (ValueError, PyAsn1Error) as e:
raise ValueError("failed to decode certificate: %s" % e)
@@ -45,15 +45,15 @@ def _parse_cert(dercert):
return subject, issuer_serial, public_key_info
-def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage):
+def init_ca_entry(entry, cert, nickname, trusted, ext_key_usage):
"""
Initialize certificate store entry for a CA certificate.
"""
- subject, issuer_serial, public_key = _parse_cert(dercert)
+ subject, issuer_serial, public_key = _parse_cert(cert)
if ext_key_usage is not None:
try:
- cert_eku = x509.get_ext_key_usage(dercert, x509.DER)
+ cert_eku = cert.extended_key_usage
except ValueError as e:
raise ValueError("failed to decode certificate: %s" % e)
if cert_eku is not None:
@@ -68,7 +68,7 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage):
entry['ipaCertSubject'] = [subject]
entry['ipaCertIssuerSerial'] = [issuer_serial]
entry['ipaPublicKey'] = [public_key]
- entry['cACertificate;binary'] = [dercert]
+ entry['cACertificate;binary'] = [cert.public_bytes(x509.Encoding.DER)]
if trusted is not None:
entry['ipaKeyTrust'] = ['trusted' if trusted else 'distrusted']
@@ -79,11 +79,12 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage):
entry['ipaKeyExtUsage'] = ext_key_usage
-def update_compat_ca(ldap, base_dn, dercert):
+def update_compat_ca(ldap, base_dn, cert):
"""
Update the CA certificate in cn=CAcert,cn=ipa,cn=etc,SUFFIX.
"""
dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), base_dn)
+ dercert = cert.public_bytes(x509.Encoding.DER)
try:
entry = ldap.get_entry(dn, attrs_list=['cACertificate;binary'])
entry.single_value['cACertificate;binary'] = dercert
@@ -152,12 +153,12 @@ def add_ca_cert(ldap, base_dn, dercert, nickname, trusted=None,
clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
-def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
+def update_ca_cert(ldap, base_dn, cert, trusted=None, ext_key_usage=None,
config_ipa=False, config_compat=False):
"""
Update existing entry for a CA certificate in the certificate store.
"""
- subject, issuer_serial, public_key = _parse_cert(dercert)
+ subject, issuer_serial, public_key = _parse_cert(cert)
filter = ldap.make_filter({'ipaCertSubject': subject})
result, _truncated = ldap.find_entries(
@@ -172,7 +173,7 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
for old_cert in entry['cACertificate;binary']:
# Check if we are adding a new cert
- if old_cert == dercert:
+ if old_cert == cert:
break
else:
# We are adding a new cert, validate it
@@ -181,7 +182,8 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
if entry.single_value['ipaPublicKey'] != public_key:
raise ValueError("subject public key info mismatch")
entry['ipaCertIssuerSerial'].append(issuer_serial)
- entry['cACertificate;binary'].append(dercert)
+ entry['cACertificate;binary'].append(
+ cert.public_bytes(x509.Encoding.DER))
# Update key trust
if trusted is not None:
@@ -217,22 +219,24 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
entry.setdefault('ipaConfigString', []).append('compatCA')
if is_compat or config_compat:
- update_compat_ca(ldap, base_dn, dercert)
+ update_compat_ca(ldap, base_dn, cert)
ldap.update_entry(entry)
clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
-def put_ca_cert(ldap, base_dn, dercert, nickname, trusted=None,
+def put_ca_cert(ldap, base_dn, cert, nickname, trusted=None,
ext_key_usage=None, config_ipa=False, config_compat=False):
"""
Add or update entry for a CA certificate in the certificate store.
+
+ :param cert: IPACertificate
"""
try:
- update_ca_cert(ldap, base_dn, dercert, trusted, ext_key_usage,
+ update_ca_cert(ldap, base_dn, cert, trusted, ext_key_usage,
config_ipa=config_ipa, config_compat=config_compat)
except errors.NotFound:
- add_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage,
+ add_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage,
config_ipa=config_ipa, config_compat=config_compat)
except errors.EmptyModlist:
pass
@@ -306,11 +310,12 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
for cert in entry.get('cACertificate;binary', []):
try:
- _parse_cert(cert)
+ cert_obj = x509.load_der_x509_certificate(cert)
+ _parse_cert(cert_obj)
except ValueError:
certs = []
break
- certs.append((cert, nickname, trusted, ext_key_usage))
+ certs.append((cert_obj, nickname, trusted, ext_key_usage))
except errors.NotFound:
try:
ldap.get_entry(container_dn, [''])
@@ -319,7 +324,8 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
dn = DN(('cn', 'CAcert'), config_dn)
entry = ldap.get_entry(dn, ['cACertificate;binary'])
- cert = entry.single_value['cACertificate;binary']
+ cert = x509.load_der_x509_certificate(
+ entry.single_value['cACertificate;binary'])
try:
subject, _issuer_serial, _public_key_info = _parse_cert(cert)
except ValueError:
@@ -354,16 +360,18 @@ def key_policy_to_trust_flags(trusted, ca, ext_key_usage):
return TrustFlags(False, trusted, ca, ext_key_usage)
-def put_ca_cert_nss(ldap, base_dn, dercert, nickname, trust_flags,
+def put_ca_cert_nss(ldap, base_dn, cert, nickname, trust_flags,
config_ipa=False, config_compat=False):
"""
Add or update entry for a CA certificate in the certificate store.
+
+ :param cert: IPACertificate
"""
trusted, ca, ext_key_usage = trust_flags_to_key_policy(trust_flags)
if ca is False:
raise ValueError("must be CA certificate")
- put_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage,
+ put_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage,
config_ipa, config_compat)
diff --git a/ipalib/x509.py b/ipalib/x509.py
index 5bec5ae59..dc7a2eb3c 100644
--- a/ipalib/x509.py
+++ b/ipalib/x509.py
@@ -39,10 +39,15 @@ import ssl
import base64
import re
+from cryptography import x509 as crypto_x509
+from cryptography import utils as crypto_utils
from cryptography.hazmat.backends import default_backend
-import cryptography.x509
+from cryptography.hazmat.primitives.serialization import (
+ Encoding, PublicFormat
+)
from pyasn1.type import univ, char, namedtype, tag
from pyasn1.codec.der import decoder, encoder
+# from pyasn1.codec.native import decoder, encoder
from pyasn1_modules import rfc2315, rfc2459
import six
@@ -101,26 +106,319 @@ def strip_header(pem):
return pem
+@crypto_utils.register_interface(crypto_x509.Certificate)
+class IPACertificate(object):
+ """
+ A proxy class wrapping a python-cryptography certificate representation for
+ FreeIPA purposes
+ """
+ def __init__(self, cert, backend=None):
+ """
+ :param cert: A python-cryptography Certificate object
+ :param backend: A python-cryptography Backend object
+ """
+ self._cert = cert
+ self.backend = default_backend() if backend is None else backend()
+
+ # initialize the certificate fields
+ # we have to do it this way so that some systems don't explode since
+ # some field types encode-decoding is not strongly defined
+ self._subject = self.__get_der_field('subject')
+ self._issuer = self.__get_der_field('issuer')
+
+ def __getstate__(self):
+ state = {
+ '_cert': self.public_bytes(Encoding.DER),
+ '_subject': self.subject_bytes,
+ '_issuer': self.issuer_bytes,
+ }
+ return state
+
+ def __setstate__(self, state):
+ self._subject = state['_subject']
+ self._issuer = state['_issuer']
+ self._cert = crypto_x509.load_der_x509_certificate(
+ state['_cert'], backend=default_backend())
+
+ def __eq__(self, other):
+ """
+ Checks equality.
+
+ :param other: either cryptography.Certificate or IPACertificate or
+ bytes representing a DER-formatted certificate
+ """
+ if (isinstance(other, (crypto_x509.Certificate, IPACertificate))):
+ return (self.public_bytes(Encoding.DER) ==
+ other.public_bytes(Encoding.DER))
+ elif isinstance(other, bytes):
+ return self.public_bytes(Encoding.DER) == other
+ else:
+ return False
+
+ def __ne__(self, other):
+ """
+ Checks not equal.
+ """
+ return not self.__eq__(other)
+
+ def __hash__(self):
+ """
+ Computes a hash of the wrapped cryptography.Certificate.
+ """
+ return hash(self._cert)
+
+ def __encode_extension(self, oid, critical, value):
+ # TODO: have another proxy for crypto_x509.Extension which would
+ # provide public_bytes on the top of what python-cryptography has
+ ext = rfc2459.Extension()
+ # TODO: this does not have to be so weird, pyasn1 now has codecs
+ # which are capable of providing python-native types
+ ext['extnID'] = univ.ObjectIdentifier(oid)
+ ext['critical'] = univ.Boolean(critical)
+ ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value)))
+ ext = encoder.encode(ext)
+ return ext
+
+ def __get_pyasn1_field(self, field):
+ """
+ :returns: a field of the certificate in pyasn1 representation
+ """
+ cert_bytes = self.tbs_certificate_bytes
+ cert = decoder.decode(cert_bytes, rfc2459.TBSCertificate())[0]
+ field = cert[field]
+ return field
+
+ def __get_der_field(self, field):
+ """
+ :field: the name of the field of the certificate
+ :returns: bytes representing the value of a certificate field
+ """
+ return encoder.encode(self.__get_pyasn1_field(field))
+
+ def public_bytes(self, encoding):
+ """
+ Serializes the certificate to PEM or DER format.
+ """
+ return self._cert.public_bytes(encoding)
+
+ def is_self_signed(self):
+ """
+ :returns: True if this certificate is self-signed, False otherwise
+ """
+ return self._cert.issuer == self._cert.subject
+
+ def fingerprint(self, algorithm):
+ """
+ Counts fingerprint of the wrapped cryptography.Certificate
+ """
+ return self._cert.fingerprint(algorithm)
+
+ @property
+ def serial_number(self):
+ return self._cert.serial_number
+
+ @property
+ def version(self):
+ return self._cert.version
+
+ @property
+ def subject(self):
+ return self._cert.subject
+
+ @property
+ def subject_bytes(self):
+ return self._subject
+
+ @property
+ def signature_hash_algorithm(self):
+ """
+ Returns a HashAlgorithm corresponding to the type of the digest signed
+ in the certificate.
+ """
+ return self._cert.signature_hash_algorithm
+
+ @property
+ def signature_algorithm_oid(self):
+ """
+ Returns the ObjectIdentifier of the signature algorithm.
+ """
+ return self._cert.signature_algorithm_oid
+
+ @property
+ def signature(self):
+ """
+ Returns the signature bytes.
+ """
+ return self._cert.signature
+
+ @property
+ def issuer(self):
+ return self._cert.issuer
+
+ @property
+ def issuer_bytes(self):
+ return self._issuer
+
+ @property
+ def not_valid_before(self):
+ return self._cert.not_valid_before
+
+ @property
+ def not_valid_after(self):
+ return self._cert.not_valid_after
+
+ @property
+ def tbs_certificate_bytes(self):
+ return self._cert.tbs_certificate_bytes
+
+ @property
+ def extensions(self):
+ # TODO: own Extension and Extensions classes proxying
+ # python-cryptography
+ return self._cert.extensions
+
+ def public_key(self):
+ return self._cert.public_key()
+
+ @property
+ def public_key_info_bytes(self):
+ return self._cert.public_key().public_bytes(
+ encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo)
+
+ @property
+ def extended_key_usage(self):
+ try:
+ ext_key_usage = self._cert.extensions.get_extension_for_oid(
+ crypto_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
+ except crypto_x509.ExtensionNotFound:
+ return None
+
+ return set(oid.dotted_string for oid in ext_key_usage)
+
+ @property
+ def extended_key_usage_bytes(self):
+ ekurfc = rfc2459.ExtKeyUsageSyntax()
+ eku = self.extended_key_usage or {EKU_PLACEHOLDER}
+ for i, oid in enumerate(eku):
+ ekurfc[i] = univ.ObjectIdentifier(oid)
+ ekurfc = encoder.encode(ekurfc)
+ return self.__encode_extension('2.5.29.37', EKU_ANY not in eku, ekurfc)
+
+ @property
+ def san_general_names(self):
+ """
+ Return SAN general names from a python-cryptography
+ certificate object. If the SAN extension is not present,
+ return an empty sequence.
+
+ Because python-cryptography does not yet provide a way to
+ handle unrecognised critical extensions (which may occur),
+ we must parse the certificate and extract the General Names.
+ For uniformity with other code, we manually construct values
+ of python-crytography GeneralName subtypes.
+
+ python-cryptography does not yet provide types for
+ ediPartyName or x400Address, so we drop these name types.
+
+ otherNames are NOT instantiated to more specific types where
+ the type is known. Use ``process_othernames`` to do that.
+
+ When python-cryptography can handle certs with unrecognised
+ critical extensions and implements ediPartyName and
+ x400Address, this function (and helpers) will be redundant
+ and should go away.
+
+ """
+ gns = self.__pyasn1_get_san_general_names()
+
+ GENERAL_NAME_CONSTRUCTORS = {
+ 'rfc822Name': lambda x: crypto_x509.RFC822Name(unicode(x)),
+ 'dNSName': lambda x: crypto_x509.DNSName(unicode(x)),
+ 'directoryName': _pyasn1_to_cryptography_directoryname,
+ 'registeredID': _pyasn1_to_cryptography_registeredid,
+ 'iPAddress': _pyasn1_to_cryptography_ipaddress,
+ 'uniformResourceIdentifier':
+ lambda x: crypto_x509.UniformResourceIdentifier(unicode(x)),
+ 'otherName': _pyasn1_to_cryptography_othername,
+ }
+
+ result = []
+
+ for gn in gns:
+ gn_type = gn.getName()
+ if gn_type in GENERAL_NAME_CONSTRUCTORS:
+ result.append(
+ GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
+
+ return result
+
+ def __pyasn1_get_san_general_names(self):
+ # pyasn1 returns None when the key is not present in the certificate
+ # but we need an iterable
+ extensions = self.__get_pyasn1_field('extensions') or []
+ OID_SAN = univ.ObjectIdentifier('2.5.29.17')
+ gns = []
+ for ext in extensions:
+ if ext['extnID'] == OID_SAN:
+ der = decoder.decode(
+ ext['extnValue'], asn1Spec=univ.OctetString())[0]
+ gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
+ break
+ return gns
+
+ @property
+ def san_a_label_dns_names(self):
+ gns = self.__pyasn1_get_san_general_names()
+ result = []
+
+ for gn in gns:
+ if gn.getName() == 'dNSName':
+ result.append(unicode(gn.getComponent()))
+
+ return result
+
+ def match_hostname(self, hostname):
+ match_cert = {}
+
+ match_cert['subject'] = match_subject = []
+ for rdn in self._cert.subject.rdns:
+ match_rdn = []
+ for ava in rdn:
+ if ava.oid == crypto_x509.oid.NameOID.COMMON_NAME:
+ match_rdn.append(('commonName', ava.value))
+ match_subject.append(match_rdn)
+
+ values = self.san_a_label_dns_names
+ if values:
+ match_cert['subjectAltName'] = match_san = []
+ for value in values:
+ match_san.append(('DNS', value))
+
+ ssl.match_hostname(match_cert, DNSName(hostname).ToASCII())
+
+
def load_pem_x509_certificate(data):
"""
Load an X.509 certificate in PEM format.
- :returns: a python-cryptography ``Certificate`` object.
+ :returns: a ``IPACertificate`` object.
:raises: ``ValueError`` if unable to load the certificate.
"""
- return crypto_x509.load_pem_x509_certificate(data,
- backend=default_backend())
+ return IPACertificate(
+ crypto_x509.load_pem_x509_certificate(data, backend=default_backend())
+ )
def load_der_x509_certificate(data):
"""
Load an X.509 certificate in DER format.
- :returns: a python-cryptography ``Certificate`` object.
+ :returns: a ``IPACertificate`` object.
:raises: ``ValueError`` if unable to load the certificate.
"""
- return crypto_x509.load_der_x509_certificate(data,
- backend=default_backend())
+ return IPACertificate(
+ crypto_x509.load_der_x509_certificate(data, backend=default_backend())
+ )
def load_certificate_from_file(filename, dbdir=None):
@@ -138,7 +436,6 @@ def load_certificate_list(data):
Load a certificate list from a sequence of concatenated PEMs.
Return a list of python-cryptography ``Certificate`` objects.
-
"""
certs = PEM_REGEX.findall(data)
return [load_pem_x509_certificate(cert) for cert in certs]
@@ -155,11 +452,11 @@ def load_certificate_list_from_file(filename):
return load_certificate_list(f.read())
-def pkcs7_to_pems(data, datatype=PEM):
+def pkcs7_to_certs(data, datatype=PEM):
"""
Extract certificates from a PKCS #7 object.
- Return a ``list`` of X.509 PEM strings.
+ :returns: a ``list`` of ``IPACertificate`` objects.
"""
if datatype == PEM:
match = re.match(
@@ -187,62 +484,12 @@ def pkcs7_to_pems(data, datatype=PEM):
for certificate in signed_data['certificates']:
certificate = encoder.encode(certificate)
- certificate = base64.b64encode(certificate)
- certificate = make_pem(certificate)
+ certificate = load_der_x509_certificate(certificate)
result.append(certificate)
return result
-def is_self_signed(certificate, datatype=PEM):
- cert = load_certificate(certificate, datatype)
- return cert.issuer == cert.subject
-
-
-def _get_der_field(cert, datatype, dbdir, field):
- cert = normalize_certificate(cert)
- cert = decoder.decode(cert, rfc2459.Certificate())[0]
- field = cert['tbsCertificate'][field]
- field = encoder.encode(field)
- return field
-
-def get_der_subject(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'subject')
-
-def get_der_issuer(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'issuer')
-
-def get_der_serial_number(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'serialNumber')
-
-def get_der_public_key_info(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'subjectPublicKeyInfo')
-
-
-def get_ext_key_usage(certificate, datatype=PEM):
- cert = load_certificate(certificate, datatype)
- try:
- eku = cert.extensions.get_extension_for_oid(
- cryptography.x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
- except cryptography.x509.ExtensionNotFound:
- return None
-
- return set(oid.dotted_string for oid in eku)
-
-
-def make_pem(data):
- """
- Convert a raw base64-encoded blob into something that looks like a PE
- file with lines split to 64 characters and proper headers.
- """
- if isinstance(data, bytes):
- data = data.decode('ascii')
- pemcert = '\r\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
- return '-----BEGIN CERTIFICATE-----\n' + \
- pemcert + \
- '\n-----END CERTIFICATE-----'
-
-
def ensure_der_format(rawcert):
"""
Incoming certificates should be DER-encoded. If not it is converted to
@@ -254,8 +501,6 @@ def ensure_der_format(rawcert):
if not rawcert:
return None
- rawcert = strip_header(rawcert)
-
try:
if isinstance(rawcert, bytes):
# base64 must work with utf-8, otherwise it is raw bin certificate
@@ -299,58 +544,37 @@ def validate_der_x509_certificate(cert):
raise errors.CertificateFormatError(error=str(e))
-def write_certificate(rawcert, filename):
+def write_certificate(cert, filename):
"""
Write the certificate to a file in PEM format.
The cert value can be either DER or PEM-encoded, it will be normalized
to DER regardless, then back out to PEM.
"""
- dercert = normalize_certificate(rawcert)
try:
- fp = open(filename, 'w')
- fp.write(make_pem(base64.b64encode(dercert)))
- fp.close()
+ with open(filename, 'wb') as fp:
+ fp.write(cert.public_bytes(Encoding.PEM))
except (IOError, OSError) as e:
raise errors.FileError(reason=str(e))
-def write_certificate_list(rawcerts, filename):
+
+def write_certificate_list(certs, filename):
"""
Write a list of certificates to a file in PEM format.
- The cert values can be either DER or PEM-encoded, they will be normalized
- to DER regardless, then back out to PEM.
+ :param certs: a list of IPACertificate objects to be written to a file
+ :param filename: a path to the file the certificates should be written into
"""
- dercerts = [normalize_certificate(rawcert) for rawcert in rawcerts]
try:
- with open(filename, 'w') as f:
- for cert in dercerts:
- cert = base64.b64encode(cert)
- cert = make_pem(cert)
- f.write(cert + '\n')
+ with open(filename, 'wb') as f:
+ for cert in certs:
+ f.write(cert.public_bytes(Encoding.PEM))
except (IOError, OSError) as e:
raise errors.FileError(reason=str(e))
-def _encode_extension(oid, critical, value):
- ext = rfc2459.Extension()
- ext['extnID'] = univ.ObjectIdentifier(oid)
- ext['critical'] = univ.Boolean(critical)
- ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value)))
- ext = encoder.encode(ext)
- return ext
-
-
-def encode_ext_key_usage(ext_key_usage):
- eku = rfc2459.ExtKeyUsageSyntax()
- for i, oid in enumerate(ext_key_usage):
- eku[i] = univ.ObjectIdentifier(oid)
- eku = encoder.encode(eku)
- return _encode_extension('2.5.29.37', EKU_ANY not in ext_key_usage, eku)
-
-
class _PrincipalName(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('name-type', univ.Integer().subtype(
@@ -385,13 +609,13 @@ def _decode_krb5principalname(data):
return name
-class KRB5PrincipalName(cryptography.x509.general_name.OtherName):
+class KRB5PrincipalName(crypto_x509.general_name.OtherName):
def __init__(self, type_id, value):
super(KRB5PrincipalName, self).__init__(type_id, value)
self.name = _decode_krb5principalname(value)
-class UPN(cryptography.x509.general_name.OtherName):
+class UPN(crypto_x509.general_name.OtherName):
def __init__(self, type_id, value):
super(UPN, self).__init__(type_id, value)
self.name = unicode(
@@ -411,128 +635,48 @@ def process_othernames(gns):
"""
for gn in gns:
- if isinstance(gn, cryptography.x509.general_name.OtherName):
+ if isinstance(gn, crypto_x509.general_name.OtherName):
cls = OTHERNAME_CLASS_MAP.get(
gn.type_id.dotted_string,
- cryptography.x509.general_name.OtherName)
+ crypto_x509.general_name.OtherName)
yield cls(gn.type_id, gn.value)
else:
yield gn
-def _pyasn1_get_san_general_names(cert):
- tbs = decoder.decode(
- cert.tbs_certificate_bytes,
- asn1Spec=rfc2459.TBSCertificate()
- )[0]
- OID_SAN = univ.ObjectIdentifier('2.5.29.17')
- # One would expect KeyError or empty iterable when the key ('extensions'
- # in this particular case) is not pressent in the certificate but pyasn1
- # returns None here
- extensions = tbs['extensions'] or []
- gns = []
- for ext in extensions:
- if ext['extnID'] == OID_SAN:
- der = decoder.decode(
- ext['extnValue'], asn1Spec=univ.OctetString())[0]
- gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
- break
-
- return gns
-
-
-def get_san_general_names(cert):
- """
- Return SAN general names from a python-cryptography
- certificate object. If the SAN extension is not present,
- return an empty sequence.
-
- Because python-cryptography does not yet provide a way to
- handle unrecognised critical extensions (which may occur),
- we must parse the certificate and extract the General Names.
- For uniformity with other code, we manually construct values
- of python-crytography GeneralName subtypes.
-
- python-cryptography does not yet provide types for
- ediPartyName or x400Address, so we drop these name types.
-
- otherNames are NOT instantiated to more specific types where
- the type is known. Use ``process_othernames`` to do that.
-
- When python-cryptography can handle certs with unrecognised
- critical extensions and implements ediPartyName and
- x400Address, this function (and helpers) will be redundant
- and should go away.
-
- """
- gns = _pyasn1_get_san_general_names(cert)
-
- GENERAL_NAME_CONSTRUCTORS = {
- 'rfc822Name': lambda x: cryptography.x509.RFC822Name(unicode(x)),
- 'dNSName': lambda x: cryptography.x509.DNSName(unicode(x)),
- 'directoryName': _pyasn1_to_cryptography_directoryname,
- 'registeredID': _pyasn1_to_cryptography_registeredid,
- 'iPAddress': _pyasn1_to_cryptography_ipaddress,
- 'uniformResourceIdentifier':
- lambda x: cryptography.x509.UniformResourceIdentifier(unicode(x)),
- 'otherName': _pyasn1_to_cryptography_othername,
- }
-
- result = []
-
- for gn in gns:
- gn_type = gn.getName()
- if gn_type in GENERAL_NAME_CONSTRUCTORS:
- result.append(
- GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
-
- return result
-
-
def _pyasn1_to_cryptography_directoryname(dn):
attrs = []
# Name is CHOICE { RDNSequence } (only one possibility)
for rdn in dn.getComponent():
for ava in rdn:
- attr = cryptography.x509.NameAttribute(
+ attr = crypto_x509.NameAttribute(
_pyasn1_to_cryptography_oid(ava['type']),
unicode(decoder.decode(ava['value'])[0])
)
attrs.append(attr)
- return cryptography.x509.DirectoryName(cryptography.x509.Name(attrs))
+ return crypto_x509.DirectoryName(crypto_x509.Name(attrs))
def _pyasn1_to_cryptography_registeredid(oid):
- return cryptography.x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
+ return crypto_x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
def _pyasn1_to_cryptography_ipaddress(octet_string):
- return cryptography.x509.IPAddress(
+ return crypto_x509.IPAddress(
ipaddress.ip_address(bytes(octet_string)))
def _pyasn1_to_cryptography_othername(on):
- return cryptography.x509.OtherName(
+ return crypto_x509.OtherName(
_pyasn1_to_cryptography_oid(on['type-id']),
bytes(on['value'])
)
def _pyasn1_to_cryptography_oid(oid):
- return cryptography.x509.ObjectIdentifier(str(oid))
-
-
-def get_san_a_label_dns_names(cert):
- gns = _pyasn1_get_san_general_names(cert)
- result = []
-
- for gn in gns:
- if gn.getName() == 'dNSName':
- result.append(unicode(gn.getComponent()))
-
- return result
+ return crypto_x509.ObjectIdentifier(str(oid))
def chunk(size, s):
@@ -574,23 +718,3 @@ def format_datetime(t):
if t.tzinfo is None:
t = t.replace(tzinfo=UTC())
return unicode(t.strftime("%a %b %d %H:%M:%S %Y %Z"))
-
-
-def match_hostname(cert, hostname):
- match_cert = {}
-
- match_cert['subject'] = match_subject = []
- for rdn in cert.subject.rdns:
- match_rdn = []
- for ava in rdn:
- if ava.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
- match_rdn.append(('commonName', ava.value))
- match_subject.append(match_rdn)
-
- values = get_san_a_label_dns_names(cert)
- if values:
- match_cert['subjectAltName'] = match_san = []
- for value in values:
- match_san.append(('DNS', value))
-
- ssl.match_hostname(match_cert, DNSName(hostname).ToASCII())