From c2af032c0333f7e210c54369159d1d9f5e3fec74 Mon Sep 17 00:00:00 2001 From: Martin Babinsky Date: Thu, 23 Jun 2016 18:54:49 +0200 Subject: Migrate management framework plugins to use Principal parameter All plugins will now use this parameter and common code for all operations on Kerberos principals. Additional semantic validators and normalizers were added to determine or append a correct realm so that the previous behavior is kept intact. https://fedorahosted.org/freeipa/ticket/3864 Reviewed-By: David Kupka Reviewed-By: Jan Cholasta --- ipaserver/plugins/cert.py | 88 +++++++++++++++++++++-------------------------- 1 file changed, 39 insertions(+), 49 deletions(-) (limited to 'ipaserver/plugins/cert.py') diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index 4cd2ab096..2f7904cd7 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -38,18 +38,18 @@ from ipalib import ngettext from ipalib.constants import IPA_CA_CN from ipalib.crud import Create, PKQuery, Retrieve, Search from ipalib.frontend import Method, Object -from ipalib.parameters import Bytes, DateTime, DNParam +from ipalib.parameters import Bytes, DateTime, DNParam, Principal from ipalib.plugable import Registry from .virtual import VirtualCommand from .baseldap import pkey_to_value -from .service import split_any_principal from .certprofile import validate_profile_id from .caacl import acl_evaluate from ipalib.text import _ from ipalib.request import context from ipalib import output -from .service import validate_principal +from ipapython import kerberos from ipapython.dn import DN +from ipaserver.plugins.service import normalize_principal, validate_realm if six.PY3: unicode = str @@ -216,35 +216,21 @@ def normalize_serial_number(num): return unicode(num) -def get_host_from_principal(principal): - """ - Given a principal with or without a realm return the - host portion. - """ - validate_principal(None, principal) - realm = principal.find('@') - slash = principal.find('/') - if realm == -1: - realm = len(principal) - hostname = principal[slash+1:realm] - - return hostname - def ca_enabled_check(): if not api.Command.ca_is_enabled()['result']: raise errors.NotFound(reason=_('CA is not configured')) -def caacl_check(principal_type, principal_string, ca, profile_id): +def caacl_check(principal_type, principal, ca, profile_id): principal_type_map = {USER: 'user', HOST: 'host', SERVICE: 'service'} if not acl_evaluate( principal_type_map[principal_type], - principal_string, ca, profile_id): + principal, ca, profile_id): raise errors.ACIError(info=_( "Principal '%(principal)s' " "is not permitted to use CA '%(ca)s' " "with profile '%(profile_id)s' for certificate issuance." ) % dict( - principal=principal_string, + principal=unicode(principal), ca=ca, profile_id=profile_id ) @@ -386,10 +372,12 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): operation="request certificate" takes_options = ( - Str( + Principal( 'principal', + validate_realm, label=_('Principal'), doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'), + normalizer=normalize_principal ), Flag( 'add', @@ -432,27 +420,29 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): taskgroup (directly or indirectly via role membership). """ - principal_string = kw.get('principal') - principal = split_any_principal(principal_string) - servicename, principal_name, realm = principal - if servicename is None: + principal = kw.get('principal') + principal_string = unicode(principal) + + if principal.is_user: principal_type = USER - elif servicename == 'host': + elif principal.is_host: principal_type = HOST else: principal_type = SERVICE - bind_principal = split_any_principal(getattr(context, 'principal')) - bind_service, bind_name, bind_realm = bind_principal + bind_principal = kerberos.Principal( + getattr(context, 'principal')) + bind_principal_string = unicode(bind_principal) - if bind_service is None: + if bind_principal.is_user: bind_principal_type = USER - elif bind_service == 'host': + elif bind_principal.is_host: bind_principal_type = HOST else: bind_principal_type = SERVICE - if bind_principal != principal and bind_principal_type != HOST: + if (bind_principal_string != principal_string and + bind_principal_type != HOST): # Can the bound principal request certs for another principal? self.check_access() @@ -463,7 +453,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): bypass_caacl = False if not bypass_caacl: - caacl_check(principal_type, principal_string, ca, profile_id) + caacl_check(principal_type, principal, ca, profile_id) try: subject = pkcs10.get_subject(csr) @@ -474,7 +464,8 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): error=_("Failure decoding Certificate Signing Request: %s") % e) # self-service and host principals may bypass SAN permission check - if bind_principal != principal and bind_principal_type != HOST: + if (bind_principal_string != principal_string + and bind_principal_type != HOST): if '2.5.29.17' in extensions: self.check_access('request certificate with subjectaltname') @@ -486,9 +477,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if principal_type == SERVICE: principal_obj = api.Command['service_show'](principal_string, all=True) elif principal_type == HOST: - principal_obj = api.Command['host_show'](principal_name, all=True) + principal_obj = api.Command['host_show']( + principal.hostname, all=True) elif principal_type == USER: - principal_obj = api.Command['user_show'](principal_name, all=True) + principal_obj = api.Command['user_show']( + principal.username, all=True) except errors.NotFound as e: if add: if principal_type == SERVICE: @@ -512,14 +505,14 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): error=_("No Common Name was found in subject of request.")) if principal_type in (SERVICE, HOST): - if cn.lower() != principal_name.lower(): + if cn.lower() != principal.hostname.lower(): raise errors.ACIError( info=_("hostname in subject of request '%(cn)s' " "does not match principal hostname '%(hostname)s'") - % dict(cn=cn, hostname=principal_name)) + % dict(cn=cn, hostname=principal.hostname)) elif principal_type == USER: # check user name - if cn != principal_name: + if cn != principal.username: raise errors.ValidationError( name='csr', error=_("DN commonName does not match user's login") @@ -545,13 +538,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if name_type == pkcs10.SAN_DNSNAME: name = unicode(name) alt_principal_obj = None - alt_principal_string = None + alt_principal_string = unicode(principal) try: if principal_type == HOST: - alt_principal_string = 'host/%s@%s' % (name, realm) alt_principal_obj = api.Command['host_show'](name, all=True) elif principal_type == SERVICE: - alt_principal_string = '%s/%s@%s' % (servicename, name, realm) alt_principal_obj = api.Command['service_show']( alt_principal_string, all=True) elif principal_type == USER: @@ -574,11 +565,10 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): "Insufficient privilege to create a certificate " "with subject alt name '%s'.") % name) if alt_principal_string is not None and not bypass_caacl: - caacl_check( - principal_type, alt_principal_string, ca, profile_id) + caacl_check(principal_type, principal, ca, profile_id) elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME, pkcs10.SAN_OTHERNAME_UPN): - if split_any_principal(name) != principal: + if name != principal_string: raise errors.ACIError( info=_("Principal '%s' in subject alt name does not " "match requested principal") % name) @@ -619,9 +609,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if principal_type == SERVICE: api.Command['service_mod'](principal_string, **kwargs) elif principal_type == HOST: - api.Command['host_mod'](principal_name, **kwargs) + api.Command['host_mod'](principal.hostname, **kwargs) elif principal_type == USER: - api.Command['user_mod'](principal_name, **kwargs) + api.Command['user_mod'](principal.username, **kwargs) return dict( result=result, @@ -748,10 +738,10 @@ class cert_show(Retrieve, CertMethod, VirtualCommand): self.check_access() except errors.ACIError as acierr: self.debug("Not granted by ACI to retrieve certificate, looking at principal") - bind_principal = getattr(context, 'principal') - if not bind_principal.startswith('host/'): + bind_principal = kerberos.Principal(getattr(context, 'principal')) + if not bind_principal.is_host: raise acierr - hostname = get_host_from_principal(bind_principal) + hostname = bind_principal.hostname ca_obj = api.Command.ca_show(options['cacn'])['result'] issuer_dn = ca_obj['ipacasubjectdn'][0] -- cgit