From d6fb110b77e2c585f0bfc5eb11b0187a43263fa1 Mon Sep 17 00:00:00 2001 From: Jan Cholasta Date: Wed, 18 Jun 2014 09:02:03 +0200 Subject: Support requests with SAN in cert-request. For each SAN in a request there must be a matching service entry writable by the requestor. Users can request certificates with SAN only if they have "Request Certificate With SubjectAltName" permission. https://fedorahosted.org/freeipa/ticket/3977 Reviewed-By: Martin Kosek --- freeipa.spec.in | 2 +- install/updates/40-delegation.update | 15 +++++ ipalib/pkcs10.py | 116 ++++++++++++++++++++++++++++++++--- ipalib/plugins/cert.py | 103 +++++++++++++++++++++---------- 4 files changed, 193 insertions(+), 43 deletions(-) diff --git a/freeipa.spec.in b/freeipa.spec.in index b719b4a21..0d2221d09 100644 --- a/freeipa.spec.in +++ b/freeipa.spec.in @@ -299,7 +299,7 @@ Requires: gnupg Requires: iproute Requires: keyutils Requires: pyOpenSSL -Requires: python-nss +Requires: python-nss >= 0.15 Requires: python-lxml Requires: python-netaddr Requires: libipa_hbac-python diff --git a/install/updates/40-delegation.update b/install/updates/40-delegation.update index 7d65e9e19..8c711612c 100644 --- a/install/updates/40-delegation.update +++ b/install/updates/40-delegation.update @@ -308,6 +308,21 @@ default:objectClass: top default:objectClass: nsContainer default:cn: certificate remove hold +dn: cn=request certificate with subjectaltname,cn=virtual operations,cn=etc,$SUFFIX +default:objectClass: top +default:objectClass: nsContainer +default:cn: request certificate with subjectaltname + +dn: cn=Request Certificate with SubjectAltName,cn=permissions,cn=pbac,$SUFFIX +default:objectClass: top +default:objectClass: groupofnames +default:objectClass: ipapermission +default:cn: Request Certificate with SubjectAltName +default:member: cn=Certificate Administrators,cn=privileges,cn=pbac,$SUFFIX + +dn: $SUFFIX +add:aci:'(targetattr = "objectclass")(target = "ldap:///cn=request certificate with subjectaltname,cn=virtual operations,cn=etc,$SUFFIX" )(version 3.0; acl "permission:Request Certificate with SubjectAltName"; allow (write) groupdn = "ldap:///cn=Request Certificate with SubjectAltName,cn=permissions,cn=pbac,$SUFFIX";)' + # Read privileges dn: cn=RBAC Readers,cn=privileges,cn=pbac,$SUFFIX diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py index 5958d5a21..f35e200a2 100644 --- a/ipalib/pkcs10.py +++ b/ipalib/pkcs10.py @@ -21,7 +21,7 @@ import os import sys import base64 import nss.nss as nss -from pyasn1.type import univ, namedtype, tag +from pyasn1.type import univ, char, namedtype, tag from pyasn1.codec.der import decoder from ipapython import ipautil from ipalib import api @@ -29,6 +29,10 @@ from ipalib import api PEM = 0 DER = 1 +SAN_DNSNAME = 'DNS name' +SAN_OTHERNAME_UPN = 'Other Name (OID.1.3.6.1.4.1.311.20.2.3)' +SAN_OTHERNAME_KRB5PRINCIPALNAME = 'Other Name (OID.1.3.6.1.5.2.2)' + def get_subject(csr, datatype=PEM): """ Given a CSR return the subject value. @@ -41,6 +45,89 @@ def get_subject(csr, datatype=PEM): finally: del request +def get_extensions(csr, datatype=PEM): + """ + Given a CSR return OIDs of certificate extensions. + + The return value is a tuple of strings + """ + request = load_certificate_request(csr, datatype) + return tuple(nss.oid_dotted_decimal(ext.oid_tag)[4:] + for ext in request.extensions) + +class _PrincipalName(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('name-type', univ.Integer().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)) + ), + namedtype.NamedType('name-string', univ.SequenceOf(char.GeneralString()).subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)) + ), + ) + +class _KRB5PrincipalName(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('realm', char.GeneralString().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)) + ), + namedtype.NamedType('principalName', _PrincipalName().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)) + ), + ) + +def _decode_krb5principalname(data): + principal = decoder.decode(data, asn1Spec=_KRB5PrincipalName())[0] + realm = (str(principal['realm']).replace('\\', '\\\\') + .replace('@', '\\@')) + name = principal['principalName']['name-string'] + name = '/'.join(str(n).replace('\\', '\\\\') + .replace('/', '\\/') + .replace('@', '\\@') for n in name) + name = '%s@%s' % (name, realm) + return name + +class _AnotherName(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type-id', univ.ObjectIdentifier()), + namedtype.NamedType('value', univ.Any().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)) + ), + ) + +class _GeneralName(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('otherName', _AnotherName().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)) + ), + namedtype.NamedType('rfc822Name', char.IA5String().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)) + ), + namedtype.NamedType('dNSName', char.IA5String().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)) + ), + namedtype.NamedType('x400Address', univ.Sequence().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)) + ), + namedtype.NamedType('directoryName', univ.Choice().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)) + ), + namedtype.NamedType('ediPartyName', univ.Sequence().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 5)) + ), + namedtype.NamedType('uniformResourceIdentifier', char.IA5String().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6)) + ), + namedtype.NamedType('iPAddress', univ.OctetString().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 7)) + ), + namedtype.NamedType('registeredID', univ.ObjectIdentifier().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 8)) + ), + ) + +class _SubjectAltName(univ.SequenceOf): + componentType = _GeneralName() + def get_subjectaltname(csr, datatype=PEM): """ Given a CSR return the subjectaltname value, if any. @@ -48,13 +135,26 @@ def get_subjectaltname(csr, datatype=PEM): The return value is a tuple of strings or None """ request = load_certificate_request(csr, datatype) - try: - for extension in request.extensions: - if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: - return nss.x509_alt_name(extension.value) - finally: - del request - return None + for extension in request.extensions: + if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: + break + else: + return None + del request + + nss_names = nss.x509_alt_name(extension.value, nss.AsObject) + asn1_names = decoder.decode(extension.value.data, + asn1Spec=_SubjectAltName())[0] + names = [] + for nss_name, asn1_name in zip(nss_names, asn1_names): + name_type = nss_name.type_string + if name_type == SAN_OTHERNAME_KRB5PRINCIPALNAME: + name = _decode_krb5principalname(asn1_name['otherName']['value']) + else: + name = nss_name.name + names.append((name_type, name)) + + return tuple(names) # Unfortunately, NSS can only parse the extension request attribute, so # we have to parse friendly name ourselves (see RFC 2986) diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index b1fa32b7c..e4918a480 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -42,6 +42,7 @@ from ipalib import output from ipalib.plugins.service import validate_principal import nss.nss as nss from nss.error import NSPRError +from pyasn1.error import PyAsn1Error __doc__ = _(""" IPA certificate operations @@ -136,17 +137,6 @@ def validate_pkidate(ugettext, value): return None -def get_csr_hostname(csr): - """ - Return the value of CN in the subject of the request or None - """ - try: - subject = pkcs10.get_subject(csr) - return subject.common_name #pylint: disable=E1101 - except NSPRError, nsprerr: - raise errors.CertificateOperationError( - error=_('Failure decoding Certificate Signing Request: %s') % nsprerr) - def validate_csr(ugettext, csr): """ Ensure the CSR is base64-encoded and can be decoded by our PKCS#10 @@ -290,6 +280,14 @@ class cert_request(VirtualCommand): ), ) + _allowed_extensions = { + '2.5.29.14': None, # Subject Key Identifier + '2.5.29.15': None, # Key Usage + '2.5.29.17': 'request certificate with subjectaltname', + '2.5.29.19': None, # Basic Constraints + '2.5.29.37': None, # Extended Key Usage + } + def execute(self, csr, **kw): ldap = self.api.Backend.ldap2 principal = kw.get('principal') @@ -313,10 +311,22 @@ class cert_request(VirtualCommand): if not bind_principal.startswith('host/'): self.check_access() - # FIXME: add support for subject alt name + try: + subject = pkcs10.get_subject(csr) + extensions = pkcs10.get_extensions(csr) + subjectaltname = pkcs10.get_subjectaltname(csr) or () + except (NSPRError, PyAsn1Error), e: + raise errors.CertificateOperationError( + error=_("Failure decoding Certificate Signing Request: %s") % e) + + if not bind_principal.startswith('host/'): + for ext in extensions: + operation = self._allowed_extensions.get(ext) + if operation: + self.check_access(operation) # Ensure that the hostname in the CSR matches the principal - subject_host = get_csr_hostname(csr) + subject_host = subject.common_name #pylint: disable=E1101 if not subject_host: raise errors.ValidationError(name='csr', error=_("No hostname was found in subject of request.")) @@ -328,28 +338,40 @@ class cert_request(VirtualCommand): "does not match principal hostname '%(hostname)s'") % dict( subject_host=subject_host, hostname=hostname)) + for ext in extensions: + if ext not in self._allowed_extensions: + raise errors.ValidationError( + name='csr', error=_("extension %s is forbidden") % ext) + + for name_type, name in subjectaltname: + if name_type not in (pkcs10.SAN_DNSNAME, + pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME, + pkcs10.SAN_OTHERNAME_UPN): + raise errors.ValidationError( + name='csr', + error=_("subject alt name type %s is forbidden") % + name_type) + dn = None service = None # See if the service exists and punt if it doesn't and we aren't # going to add it try: - if not principal.startswith('host/'): - service = api.Command['service_show'](principal, all=True)['result'] - dn = service['dn'] + if servicename != 'host': + service = api.Command['service_show'](principal, all=True) else: - hostname = get_host_from_principal(principal) - service = api.Command['host_show'](hostname, all=True)['result'] - dn = service['dn'] + service = api.Command['host_show'](hostname, all=True) except errors.NotFound, e: if not add: raise errors.NotFound(reason=_("The service principal for " "this request doesn't exist.")) try: - service = api.Command['service_add'](principal, **{'force': True})['result'] - dn = service['dn'] + service = api.Command['service_add'](principal, force=True) except errors.ACIError: raise errors.ACIError(info=_('You need to be a member of ' 'the serviceadmin role to add services')) + service = service['result'] + dn = service['dn'] # We got this far so the service entry exists, can we write it? if not ldap.can_write(dn, "usercertificate"): @@ -357,25 +379,38 @@ class cert_request(VirtualCommand): "to the 'userCertificate' attribute of entry '%s'.") % dn) # Validate the subject alt name, if any - subjectaltname = pkcs10.get_subjectaltname(csr) - if subjectaltname is not None: - for name in subjectaltname: + for name_type, name in subjectaltname: + if name_type == pkcs10.SAN_DNSNAME: name = unicode(name) try: - hostentry = api.Command['host_show'](name, all=True)['result'] - hostdn = hostentry['dn'] + if servicename == 'host': + altservice = api.Command['host_show'](name, all=True) + else: + altprincipal = '%s/%s@%s' % (servicename, name, realm) + altservice = api.Command['service_show']( + altprincipal, all=True) except errors.NotFound: # We don't want to issue any certificates referencing # machines we don't know about. Nothing is stored in this # host record related to this certificate. - raise errors.NotFound(reason=_('no host record for ' - 'subject alt name %s in certificate request') % name) - authprincipal = getattr(context, 'principal') - if authprincipal.startswith("host/"): - if not hostdn in service.get('managedby_host', []): - raise errors.ACIError(info=_( - "Insufficient privilege to create a certificate " - "with subject alt name '%s'.") % name) + raise errors.NotFound(reason=_('The service principal for ' + 'subject alt name %s in certificate request does not ' + 'exist') % name) + altdn = altservice['result']['dn'] + if not ldap.can_write(altdn, "usercertificate"): + raise errors.ACIError(info=_( + "Insufficient privilege to create a certificate with " + "subject alt name '%s'.") % name) + elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME, + pkcs10.SAN_OTHERNAME_UPN): + if name != principal: + raise errors.ACIError( + info=_("Principal '%s' in subject alt name does not " + "match requested service principal") % name) + else: + raise errors.ACIError( + info=_("Subject alt name type %s is forbidden") % + name_type) if 'usercertificate' in service: serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER) -- cgit