summaryrefslogtreecommitdiffstats
path: root/ipaserver/plugins/cert.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipaserver/plugins/cert.py')
-rw-r--r--ipaserver/plugins/cert.py86
1 files changed, 68 insertions, 18 deletions
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index e4efa7d37..81872cffd 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -144,11 +144,12 @@ http://www.ietf.org/rfc/rfc5280.txt
""")
-USER, HOST, SERVICE = range(3)
+USER, HOST, KRBTGT, SERVICE = range(4)
PRINCIPAL_TYPE_STRING_MAP = {
USER: _('user'),
HOST: _('host'),
+ KRBTGT: _('krbtgt'),
SERVICE: _('service'),
}
@@ -216,6 +217,13 @@ def caacl_check(principal_type, principal, ca, profile_id):
)
+def ca_kdc_check(ldap, hostname):
+ result = api.Command.config_show()['result']
+ if hostname not in result['ipa_master_server']:
+ raise errors.ACIError(info=_(
+ "Host '%(hostname)s' is not a KDC") % dict(hostname=hostname))
+
+
def validate_certificate(value):
return x509.validate_certificate(value, x509.DER)
@@ -533,6 +541,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
ca_enabled_check()
ldap = self.api.Backend.ldap2
+ realm = unicode(self.api.env.realm)
add = kw.get('add')
request_type = kw.get('request_type')
profile_id = kw.get('profile_id', self.Backend.ra.DEFAULT_PROFILE)
@@ -563,11 +572,16 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
principal_type = USER
elif principal.is_host:
principal_type = HOST
+ elif principal.service_name == 'krbtgt':
+ principal_type = KRBTGT
+ if profile_id != self.Backend.ra.KDC_PROFILE:
+ raise errors.ACIError(
+ info=_("krbtgt certs can use only the %s profile") % (
+ self.Backend.ra.KDC_PROFILE))
else:
principal_type = SERVICE
- bind_principal = kerberos.Principal(
- getattr(context, 'principal'))
+ bind_principal = kerberos.Principal(getattr(context, 'principal'))
bind_principal_string = unicode(bind_principal)
if bind_principal.is_user:
@@ -589,7 +603,10 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
bypass_caacl = False
if not bypass_caacl:
- caacl_check(principal_type, principal, ca, profile_id)
+ if principal_type == KRBTGT:
+ ca_kdc_check(ldap, bind_principal.hostname)
+ else:
+ caacl_check(principal_type, principal, ca, profile_id)
try:
csr_obj = pkcs10.load_certificate_request(csr)
@@ -616,6 +633,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
try:
if principal_type == SERVICE:
principal_obj = api.Command['service_show'](principal_string, all=True)
+ elif principal_type == KRBTGT:
+ # Allow only our own realm krbtgt for now, no trusted realm's.
+ if principal != kerberos.Principal((u'krbtgt', realm),
+ realm=realm):
+ raise errors.NotFound("Not our realm's krbtgt")
elif principal_type == HOST:
principal_obj = api.Command['host_show'](
principal.hostname, all=True)
@@ -635,8 +657,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
else:
raise errors.NotFound(
reason=_("The principal for this request doesn't exist."))
- principal_obj = principal_obj['result']
- dn = principal_obj['dn']
+ if principal_obj:
+ principal_obj = principal_obj['result']
+ dn = principal_obj['dn']
# Ensure that the DN in the CSR matches the principal
#
@@ -656,6 +679,13 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
"hostname in subject of request '%(cn)s' does not "
"match name or aliases of principal '%(principal)s'"
) % dict(cn=cn, principal=principal))
+ elif principal_type == KRBTGT and not bypass_caacl:
+ if cn.lower() != bind_principal.hostname.lower():
+ raise errors.ACIError(
+ info=_("hostname in subject of request '%(cn)s' "
+ "does not match principal hostname "
+ "'%(hostname)s'") % dict(
+ cn=cn, hostname=bind_principal.hostname))
elif principal_type == USER:
# check user name
if cn != principal.username:
@@ -677,10 +707,12 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
"any of user's email addresses")
)
- # We got this far so the principal entry exists, can we write it?
- if not ldap.can_write(dn, "usercertificate"):
- raise errors.ACIError(info=_("Insufficient 'write' privilege "
- "to the 'userCertificate' attribute of entry '%s'.") % dn)
+ if principal_type != KRBTGT:
+ # We got this far so the principal entry exists, can we write it?
+ if not ldap.can_write(dn, "usercertificate"):
+ raise errors.ACIError(
+ info=_("Insufficient 'write' privilege to the "
+ "'userCertificate' attribute of entry '%s'.") % dn)
# Validate the subject alt name, if any
generalnames = []
@@ -711,6 +743,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
if principal_type == HOST:
alt_principal_obj = api.Command['host_show'](
name, all=True)
+ elif principal_type == KRBTGT:
+ alt_principal = kerberos.Principal(
+ (u'host', name), principal.realm)
elif principal_type == SERVICE:
alt_principal_obj = api.Command['service_show'](
alt_principal, all=True)
@@ -722,17 +757,26 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
'subject alt name %s in certificate request does not '
'exist') % name)
- # we found an alternative principal;
- # now check write access and caacl
- altdn = alt_principal_obj['result']['dn']
- if not ldap.can_write(altdn, "usercertificate"):
- raise errors.ACIError(info=_(
- "Insufficient privilege to create a certificate "
- "with subject alt name '%s'.") % name)
+ if alt_principal_obj is not None:
+ # we found an alternative principal;
+ # now check write access and caacl
+ altdn = alt_principal_obj['result']['dn']
+ if not ldap.can_write(altdn, "usercertificate"):
+ raise errors.ACIError(info=_(
+ "Insufficient privilege to create a certificate "
+ "with subject alt name '%s'.") % name)
if not bypass_caacl:
- caacl_check(principal_type, alt_principal, ca, profile_id)
+ if principal_type == KRBTGT:
+ ca_kdc_check(ldap, alt_principal.hostname)
+ else:
+ caacl_check(principal_type, alt_principal, ca,
+ profile_id)
elif isinstance(gn, (x509.KRB5PrincipalName, x509.UPN)):
+ if principal_type == KRBTGT:
+ principal_obj = dict()
+ principal_obj['krbprincipalname'] = [
+ kerberos.Principal((u'krbtgt', realm), realm)]
if not _principal_name_matches_principal(
gn.name, principal_obj):
raise errors.ValidationError(
@@ -793,6 +837,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
api.Command['host_mod'](principal.hostname, **kwargs)
elif principal_type == USER:
api.Command['user_mod'](principal.username, **kwargs)
+ elif principal_type == KRBTGT:
+ self.log.error("Profiles used to store cert should't be "
+ "used for krbtgt certificates")
return dict(
result=result,
@@ -810,6 +857,9 @@ def _dns_name_matches_principal(name, principal, principal_obj):
:return: True if name matches, otherwise False
"""
+ if principal_obj is None:
+ return False
+
for alias in principal_obj.get('krbprincipalname', []):
# we can only compare them if both subject principal and
# the alias are service or host principals