diff options
Diffstat (limited to 'ipaserver/plugins/cert.py')
-rw-r--r-- | ipaserver/plugins/cert.py | 86 |
1 files changed, 68 insertions, 18 deletions
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index e4efa7d37..81872cffd 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -144,11 +144,12 @@ http://www.ietf.org/rfc/rfc5280.txt """) -USER, HOST, SERVICE = range(3) +USER, HOST, KRBTGT, SERVICE = range(4) PRINCIPAL_TYPE_STRING_MAP = { USER: _('user'), HOST: _('host'), + KRBTGT: _('krbtgt'), SERVICE: _('service'), } @@ -216,6 +217,13 @@ def caacl_check(principal_type, principal, ca, profile_id): ) +def ca_kdc_check(ldap, hostname): + result = api.Command.config_show()['result'] + if hostname not in result['ipa_master_server']: + raise errors.ACIError(info=_( + "Host '%(hostname)s' is not a KDC") % dict(hostname=hostname)) + + def validate_certificate(value): return x509.validate_certificate(value, x509.DER) @@ -533,6 +541,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): ca_enabled_check() ldap = self.api.Backend.ldap2 + realm = unicode(self.api.env.realm) add = kw.get('add') request_type = kw.get('request_type') profile_id = kw.get('profile_id', self.Backend.ra.DEFAULT_PROFILE) @@ -563,11 +572,16 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): principal_type = USER elif principal.is_host: principal_type = HOST + elif principal.service_name == 'krbtgt': + principal_type = KRBTGT + if profile_id != self.Backend.ra.KDC_PROFILE: + raise errors.ACIError( + info=_("krbtgt certs can use only the %s profile") % ( + self.Backend.ra.KDC_PROFILE)) else: principal_type = SERVICE - bind_principal = kerberos.Principal( - getattr(context, 'principal')) + bind_principal = kerberos.Principal(getattr(context, 'principal')) bind_principal_string = unicode(bind_principal) if bind_principal.is_user: @@ -589,7 +603,10 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): bypass_caacl = False if not bypass_caacl: - caacl_check(principal_type, principal, ca, profile_id) + if principal_type == KRBTGT: + ca_kdc_check(ldap, bind_principal.hostname) + else: + caacl_check(principal_type, principal, ca, profile_id) try: csr_obj = pkcs10.load_certificate_request(csr) @@ -616,6 +633,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): try: if principal_type == SERVICE: principal_obj = api.Command['service_show'](principal_string, all=True) + elif principal_type == KRBTGT: + # Allow only our own realm krbtgt for now, no trusted realm's. + if principal != kerberos.Principal((u'krbtgt', realm), + realm=realm): + raise errors.NotFound("Not our realm's krbtgt") elif principal_type == HOST: principal_obj = api.Command['host_show']( principal.hostname, all=True) @@ -635,8 +657,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): else: raise errors.NotFound( reason=_("The principal for this request doesn't exist.")) - principal_obj = principal_obj['result'] - dn = principal_obj['dn'] + if principal_obj: + principal_obj = principal_obj['result'] + dn = principal_obj['dn'] # Ensure that the DN in the CSR matches the principal # @@ -656,6 +679,13 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): "hostname in subject of request '%(cn)s' does not " "match name or aliases of principal '%(principal)s'" ) % dict(cn=cn, principal=principal)) + elif principal_type == KRBTGT and not bypass_caacl: + if cn.lower() != bind_principal.hostname.lower(): + raise errors.ACIError( + info=_("hostname in subject of request '%(cn)s' " + "does not match principal hostname " + "'%(hostname)s'") % dict( + cn=cn, hostname=bind_principal.hostname)) elif principal_type == USER: # check user name if cn != principal.username: @@ -677,10 +707,12 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): "any of user's email addresses") ) - # We got this far so the principal entry exists, can we write it? - if not ldap.can_write(dn, "usercertificate"): - raise errors.ACIError(info=_("Insufficient 'write' privilege " - "to the 'userCertificate' attribute of entry '%s'.") % dn) + if principal_type != KRBTGT: + # We got this far so the principal entry exists, can we write it? + if not ldap.can_write(dn, "usercertificate"): + raise errors.ACIError( + info=_("Insufficient 'write' privilege to the " + "'userCertificate' attribute of entry '%s'.") % dn) # Validate the subject alt name, if any generalnames = [] @@ -711,6 +743,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if principal_type == HOST: alt_principal_obj = api.Command['host_show']( name, all=True) + elif principal_type == KRBTGT: + alt_principal = kerberos.Principal( + (u'host', name), principal.realm) elif principal_type == SERVICE: alt_principal_obj = api.Command['service_show']( alt_principal, all=True) @@ -722,17 +757,26 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): 'subject alt name %s in certificate request does not ' 'exist') % name) - # we found an alternative principal; - # now check write access and caacl - altdn = alt_principal_obj['result']['dn'] - if not ldap.can_write(altdn, "usercertificate"): - raise errors.ACIError(info=_( - "Insufficient privilege to create a certificate " - "with subject alt name '%s'.") % name) + if alt_principal_obj is not None: + # we found an alternative principal; + # now check write access and caacl + altdn = alt_principal_obj['result']['dn'] + if not ldap.can_write(altdn, "usercertificate"): + raise errors.ACIError(info=_( + "Insufficient privilege to create a certificate " + "with subject alt name '%s'.") % name) if not bypass_caacl: - caacl_check(principal_type, alt_principal, ca, profile_id) + if principal_type == KRBTGT: + ca_kdc_check(ldap, alt_principal.hostname) + else: + caacl_check(principal_type, alt_principal, ca, + profile_id) elif isinstance(gn, (x509.KRB5PrincipalName, x509.UPN)): + if principal_type == KRBTGT: + principal_obj = dict() + principal_obj['krbprincipalname'] = [ + kerberos.Principal((u'krbtgt', realm), realm)] if not _principal_name_matches_principal( gn.name, principal_obj): raise errors.ValidationError( @@ -793,6 +837,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): api.Command['host_mod'](principal.hostname, **kwargs) elif principal_type == USER: api.Command['user_mod'](principal.username, **kwargs) + elif principal_type == KRBTGT: + self.log.error("Profiles used to store cert should't be " + "used for krbtgt certificates") return dict( result=result, @@ -810,6 +857,9 @@ def _dns_name_matches_principal(name, principal, principal_obj): :return: True if name matches, otherwise False """ + if principal_obj is None: + return False + for alias in principal_obj.get('krbprincipalname', []): # we can only compare them if both subject principal and # the alias are service or host principals |