diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-11-24 16:07:44 -0500 |
---|---|---|
committer | Jason Gerard DeRose <jderose@redhat.com> | 2009-11-30 18:10:09 -0700 |
commit | ab1667f3c1607a22c6df49ceba58274347bc5826 (patch) | |
tree | bc2e6102d3d9cd103d2418ad5372e164e0e7533d /ipalib/plugins/cert.py | |
parent | 7c2c2d6130648fb6dd7c0e52d802cc6eff39ef95 (diff) | |
download | freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.gz freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.xz freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.zip |
Use pyasn1-based PKCS#10 and X509v3 parsers instead of pyOpenSSL.
The pyOpenSSL PKCS#10 parser doesn't support attributes so we can't identify
requests with subject alt names.
Subject alt names are only allowed if:
- the host for the alt name exists in IPA
- if binding as host principal, the host is in the services managedBy attr
Diffstat (limited to 'ipalib/plugins/cert.py')
-rw-r--r-- | ipalib/plugins/cert.py | 90 |
1 files changed, 69 insertions, 21 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 48ceaa6d7..ba088dd96 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -28,12 +28,16 @@ if api.env.enable_ra is not True: raise SkipPluginModule(reason='env.enable_ra is not True') from ipalib import Command, Str, Int, Bytes, Flag, File from ipalib import errors +from ipalib import pkcs10 +from ipalib import x509 from ipalib.plugins.virtual import * from ipalib.plugins.service import split_principal import base64 -from OpenSSL import crypto from ipalib.request import context from ipapython import dnsclient +from pyasn1.error import PyAsn1Error +import logging +import traceback def get_serial(certificate): """ @@ -45,9 +49,8 @@ def get_serial(certificate): if type(certificate) in (list, tuple): certificate = certificate[0] try: - x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate) - serial = str(x509.get_serial_number()) - except crypto.Error: + serial = str(x509.get_serial_number(certificate)) + except PyAsn1Error: raise errors.GenericError(format='Unable to decode certificate in entry') return serial @@ -57,25 +60,49 @@ def get_csr_hostname(csr): Return the value of CN in the subject of the request """ try: - der = base64.b64decode(csr) - request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der) + request = pkcs10.load_certificate_request(csr) sub = request.get_subject().get_components() for s in sub: if s[0].lower() == "cn": return s[1] - except crypto.Error, e: - raise errors.GenericError(format='Unable to decode CSR: %s' % str(e)) + except PyAsn1Error: + # The ASN.1 decoding errors tend to be long and involved and the + # last bit is generally not interesting. We need the whole traceback. + logging.error('Unable to decode CSR\n%s', traceback.format_exc()) + raise errors.GenericError(format='Failure decoding Certificate Signing Request') return None +def get_subjectaltname(csr): + """ + Return the value of the subject alt name, if any + """ + try: + request = pkcs10.load_certificate_request(csr) + except PyAsn1Error: + # The ASN.1 decoding errors tend to be long and involved and the + # last bit is generally not interesting. We need the whole traceback. + logging.error('Unable to decode CSR\n%s', traceback.format_exc()) + raise errors.GenericError(format='Failure decoding Certificate Signing Request') + return request.get_subjectaltname() + def validate_csr(ugettext, csr): """ - For now just verify that it is properly base64-encoded. + Ensure the CSR is base64-encoded and can be decoded by our PKCS#10 + parser. """ try: - base64.b64decode(csr) - except Exception, e: + request = pkcs10.load_certificate_request(csr) + + # Explicitly request the attributes. This fires off additional + # decoding to get things like the subjectAltName. + attrs = request.get_attributes() + except TypeError, e: raise errors.Base64DecodeError(reason=str(e)) + except PyAsn1Error: + raise errors.GenericError(format='Failure decoding Certificate Signing Request') + except Exception, e: + raise errors.GenericError(format='Failure decoding Certificate Signing Request: %s' % str(e)) class cert_request(VirtualCommand): @@ -107,38 +134,43 @@ class cert_request(VirtualCommand): def execute(self, csr, **kw): ldap = self.api.Backend.ldap2 - skw = {"all": True} principal = kw.get('principal') add = kw.get('add') del kw['principal'] del kw['add'] service = None - # We just want the CSR bits, make sure there is nothing else - s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----") - e = csr.find("-----END NEW CERTIFICATE REQUEST-----") - if s >= 0: - csr = csr[s+40:e] + """ + Access control is partially handled by the ACI titled + 'Hosts can modify service userCertificate'. This is for the case + where a machine binds using a host/ prinicpal. It can only do the + request if the target hostname is in the managedBy attribute which + is managed using the add/del member commands. + + Binding with a user principal one needs to be in the request_certs + taskgroup (directly or indirectly via role membership). + """ # Can this user request certs? self.check_access() # FIXME: add support for subject alt name - # Is this cert for this principal? - subject_host = get_csr_hostname(csr) # Ensure that the hostname in the CSR matches the principal + subject_host = get_csr_hostname(csr) (servicename, hostname, realm) = split_principal(principal) if subject_host.lower() != hostname.lower(): raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname)) + dn = None + service = None # See if the service exists and punt if it doesn't and we aren't # going to add it try: - (dn, service) = api.Command['service_show'](principal, **skw) + (dn, service) = api.Command['service_show'](principal, all=True, raw=True) if 'usercertificate' in service: # FIXME, what to do here? Do we revoke the old cert? - raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate'])) + raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(base64.b64encode(service['usercertificate'][0]))) except errors.NotFound, e: if not add: raise errors.NotFound(reason="The service principal for this request doesn't exist.") @@ -151,6 +183,22 @@ class cert_request(VirtualCommand): if not ldap.can_write(dn, "usercertificate"): raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn) + # Validate the subject alt name, if any + subjectaltname = get_subjectaltname(csr) + if subjectaltname is not None: + for name in subjectaltname: + try: + (hostdn, hostentry) = api.Command['host_show'](name, all=True, raw=True) + except errors.NotFound: + # We don't want to issue any certificates referencing + # machines we don't know about. Nothing is stored in this + # host record related to this certificate. + raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name) + authprincipal = getattr(context, 'principal') + if authprincipal.startswith("host/"): + if not hostdn in service.get('managedby', []): + raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name) + # Request the certificate result = self.Backend.ra.request_certificate(csr, **kw) |