diff options
author | Fraser Tweedale <ftweedal@redhat.com> | 2015-05-14 01:45:16 -0400 |
---|---|---|
committer | Jan Cholasta <jcholast@redhat.com> | 2015-06-04 08:27:33 +0000 |
commit | a931d3edc00f7578223df2afeebdf2da3dd85a68 (patch) | |
tree | 915b0121e8f9882e6b9666b16a29db8331598f6b /ipalib | |
parent | 979947f7f21749b45176c39f66060564e19466e3 (diff) | |
download | freeipa-a931d3edc00f7578223df2afeebdf2da3dd85a68.tar.gz freeipa-a931d3edc00f7578223df2afeebdf2da3dd85a68.tar.xz freeipa-a931d3edc00f7578223df2afeebdf2da3dd85a68.zip |
Update cert-request to support user certs and profiles
Part of: https://fedorahosted.org/freeipa/ticket/57
Part of: https://fedorahosted.org/freeipa/ticket/4938
Reviewed-By: Martin Basti <mbasti@redhat.com>
Diffstat (limited to 'ipalib')
-rw-r--r-- | ipalib/pkcs10.py | 1 | ||||
-rw-r--r-- | ipalib/plugins/cert.py | 220 |
2 files changed, 133 insertions, 88 deletions
diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py index f35e200a2..6299dfea4 100644 --- a/ipalib/pkcs10.py +++ b/ipalib/pkcs10.py @@ -30,6 +30,7 @@ PEM = 0 DER = 1 SAN_DNSNAME = 'DNS name' +SAN_RFC822NAME = 'RFC822 Name' SAN_OTHERNAME_UPN = 'Other Name (OID.1.3.6.1.4.1.311.20.2.3)' SAN_OTHERNAME_KRB5PRINCIPALNAME = 'Other Name (OID.1.3.6.1.5.2.2)' diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index e4cb6dc0a..d12290017 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -31,7 +31,8 @@ from ipalib import ngettext from ipalib.plugable import Registry from ipalib.plugins.virtual import * from ipalib.plugins.baseldap import pkey_to_value -from ipalib.plugins.service import split_principal +from ipalib.plugins.service import split_any_principal +from ipalib.plugins.certprofile import validate_profile_id import base64 import traceback from ipalib.text import _ @@ -122,6 +123,8 @@ http://www.ietf.org/rfc/rfc5280.txt """) +USER, HOST, SERVICE = range(3) + register = Registry() def validate_pkidate(ugettext, value): @@ -232,7 +235,7 @@ class cert_request(VirtualCommand): takes_options = ( Str('principal', label=_('Principal'), - doc=_('Service principal for this certificate (e.g. HTTP/test.example.com)'), + doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'), ), Str('request_type', default=u'pkcs10', @@ -243,6 +246,10 @@ class cert_request(VirtualCommand): default=False, autofill=True ), + Str('profile_id?', validate_profile_id, + label=_("Profile ID"), + doc=_("Certificate Profile to use"), + ) ) has_output_params = ( @@ -294,10 +301,9 @@ class cert_request(VirtualCommand): ca_enabled_check() ldap = self.api.Backend.ldap2 - principal = kw.get('principal') add = kw.get('add') request_type = kw.get('request_type') - service = None + profile_id = kw.get('profile_id', self.Backend.ra.DEFAULT_PROFILE) """ Access control is partially handled by the ACI titled @@ -310,9 +316,28 @@ class cert_request(VirtualCommand): taskgroup (directly or indirectly via role membership). """ - bind_principal = getattr(context, 'principal') - # Can this user request certs? - if not bind_principal.startswith('host/'): + principal_string = kw.get('principal') + principal = split_any_principal(principal_string) + servicename, principal_name, realm = principal + if servicename is None: + principal_type = USER + elif servicename == 'host': + principal_type = HOST + else: + principal_type = SERVICE + + bind_principal = split_any_principal(getattr(context, 'principal')) + bind_service, bind_name, bind_realm = bind_principal + + if bind_service is None: + bind_principal_type = USER + elif bind_service == 'host': + bind_principal_type = HOST + else: + bind_principal_type = SERVICE + + if bind_principal != principal and bind_principal_type != HOST: + # Can the bound principal request certs for another principal? self.check_access() try: @@ -323,57 +348,71 @@ class cert_request(VirtualCommand): raise errors.CertificateOperationError( error=_("Failure decoding Certificate Signing Request: %s") % e) - if not bind_principal.startswith('host/'): + # host principals may bypass allowed ext check + if bind_principal_type != HOST: for ext in extensions: operation = self._allowed_extensions.get(ext) if operation: self.check_access(operation) - # Ensure that the hostname in the CSR matches the principal - subject_host = subject.common_name #pylint: disable=E1101 - if not subject_host: + dn = None + principal_obj = None + # See if the service exists and punt if it doesn't and we aren't + # going to add it + try: + if principal_type == SERVICE: + principal_obj = api.Command['service_show'](principal_string, all=True) + elif principal_type == HOST: + principal_obj = api.Command['host_show'](principal_name, all=True) + elif principal_type == USER: + principal_obj = api.Command['user_show'](principal_name, all=True) + except errors.NotFound as e: + if principal_type == SERVICE and add: + principal_obj = api.Command['service_add'](principal_string, force=True) + else: + raise errors.NotFound( + reason=_("The principal for this request doesn't exist.")) + principal_obj = principal_obj['result'] + dn = principal_obj['dn'] + + # Ensure that the DN in the CSR matches the principal + cn = subject.common_name #pylint: disable=E1101 + if not cn: raise errors.ValidationError(name='csr', - error=_("No hostname was found in subject of request.")) + error=_("No Common Name was found in subject of request.")) - (servicename, hostname, realm) = split_principal(principal) - if subject_host.lower() != hostname.lower(): - raise errors.ACIError( - info=_("hostname in subject of request '%(subject_host)s' " - "does not match principal hostname '%(hostname)s'") % dict( - subject_host=subject_host, hostname=hostname)) + if principal_type in (SERVICE, HOST): + if cn.lower() != principal_name.lower(): + raise errors.ACIError( + info=_("hostname in subject of request '%(cn)s' " + "does not match principal hostname '%(hostname)s'") + % dict(cn=cn, hostname=principal_name)) + elif principal_type == USER: + # check user name + if cn != principal_name: + raise errors.ValidationError( + name='csr', + error=_( + "DN commonName does not match " + "any of user's email addresses") + ) + + # check email address + mail = subject.email_address #pylint: disable=E1101 + if mail is not None and mail not in principal_obj.get('mail', []): + raise errors.ValidationError( + name='csr', + error=_( + "DN emailAddress does not match " + "any of user's email addresses") + ) for ext in extensions: if ext not in self._allowed_extensions: raise errors.ValidationError( name='csr', error=_("extension %s is forbidden") % ext) - for name_type, name in subjectaltname: - if name_type not in (pkcs10.SAN_DNSNAME, - pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME, - pkcs10.SAN_OTHERNAME_UPN): - raise errors.ValidationError( - name='csr', - error=_("subject alt name type %s is forbidden") % - name_type) - - dn = None - service = None - # See if the service exists and punt if it doesn't and we aren't - # going to add it - try: - if servicename != 'host': - service = api.Command['service_show'](principal, all=True) - else: - service = api.Command['host_show'](hostname, all=True) - except errors.NotFound, e: - if not add: - raise errors.NotFound(reason=_("The service principal for " - "this request doesn't exist.")) - service = api.Command['service_add'](principal, force=True) - service = service['result'] - dn = service['dn'] - - # We got this far so the service entry exists, can we write it? + # We got this far so the principal entry exists, can we write it? if not ldap.can_write(dn, "usercertificate"): raise errors.ACIError(info=_("Insufficient 'write' privilege " "to the 'userCertificate' attribute of entry '%s'.") % dn) @@ -382,13 +421,20 @@ class cert_request(VirtualCommand): for name_type, name in subjectaltname: if name_type == pkcs10.SAN_DNSNAME: name = unicode(name) + alt_principal_obj = None try: - if servicename == 'host': - altservice = api.Command['host_show'](name, all=True) - else: + if principal_type == HOST: + alt_principal_obj = api.Command['host_show'](name, all=True) + elif principal_type == SERVICE: altprincipal = '%s/%s@%s' % (servicename, name, realm) - altservice = api.Command['service_show']( + alt_principal_obj = api.Command['service_show']( altprincipal, all=True) + elif principal_type == USER: + raise errors.ValidationError( + name='csr', + error=_("subject alt name type %s is forbidden " + "for user principals") % name_type + ) except errors.NotFound: # We don't want to issue any certificates referencing # machines we don't know about. Nothing is stored in this @@ -396,47 +442,41 @@ class cert_request(VirtualCommand): raise errors.NotFound(reason=_('The service principal for ' 'subject alt name %s in certificate request does not ' 'exist') % name) - altdn = altservice['result']['dn'] - if not ldap.can_write(altdn, "usercertificate"): - raise errors.ACIError(info=_( - "Insufficient privilege to create a certificate with " - "subject alt name '%s'.") % name) + if alt_principal_obj is not None: + altdn = alt_principal_obj['result']['dn'] + if not ldap.can_write(altdn, "usercertificate"): + raise errors.ACIError(info=_( + "Insufficient privilege to create a certificate " + "with subject alt name '%s'.") % name) elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME, pkcs10.SAN_OTHERNAME_UPN): - if name != principal: + if name != principal_string: raise errors.ACIError( info=_("Principal '%s' in subject alt name does not " - "match requested service principal") % name) + "match requested principal") % name) + elif name_type == pkcs10.SAN_RFC822NAME: + if principal_type == USER: + if name not in principal_obj.get('mail', []): + raise errors.ValidationError( + name='csr', + error=_( + "RFC822Name does not match " + "any of user's email addresses") + ) + else: + raise errors.ValidationError( + name='csr', + error=_("subject alt name type %s is forbidden " + "for non-user principals") % name_type + ) else: raise errors.ACIError( info=_("Subject alt name type %s is forbidden") % name_type) - if 'usercertificate' in service: - serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER) - # revoke the certificate and remove it from the service - # entry before proceeding. First we retrieve the certificate to - # see if it is already revoked, if not then we revoke it. - try: - result = api.Command['cert_show'](unicode(serial))['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke'](unicode(serial), revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement get - pass - if not principal.startswith('host/'): - api.Command['service_mod'](principal, usercertificate=None) - else: - hostname = get_host_from_principal(principal) - api.Command['host_mod'](hostname, usercertificate=None) - # Request the certificate result = self.Backend.ra.request_certificate( - csr, 'caIPAserviceCert', request_type=request_type) + csr, profile_id, request_type=request_type) cert = x509.load_certificate(result['certificate']) result['issuer'] = unicode(cert.issuer) result['valid_not_before'] = unicode(cert.valid_not_before_str) @@ -444,15 +484,19 @@ class cert_request(VirtualCommand): result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) - # Success? Then add it to the service entry. - if 'certificate' in result: - if not principal.startswith('host/'): - skw = {"usercertificate": str(result.get('certificate'))} - api.Command['service_mod'](principal, **skw) - else: - hostname = get_host_from_principal(principal) - skw = {"usercertificate": str(result.get('certificate'))} - api.Command['host_mod'](hostname, **skw) + # Success? Then add it to the principal's entry + # (unless the profile tells us not to) + profile = api.Command['certprofile_show'](profile_id) + store = profile['result']['ipacertprofilestoreissued'][0] == 'TRUE' + if store and 'certificate' in result: + cert = str(result.get('certificate')) + kwargs = dict(addattr=u'usercertificate={}'.format(cert)) + if principal_type == SERVICE: + api.Command['service_mod'](principal_string, **kwargs) + elif principal_type == HOST: + api.Command['host_mod'](principal_name, **kwargs) + elif principal_type == USER: + api.Command['user_mod'](principal_name, **kwargs) return dict( result=result |