diff options
Diffstat (limited to 'ipaserver/plugins/cert.py')
-rw-r--r-- | ipaserver/plugins/cert.py | 88 |
1 files changed, 39 insertions, 49 deletions
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index 4cd2ab096..2f7904cd7 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -38,18 +38,18 @@ from ipalib import ngettext from ipalib.constants import IPA_CA_CN from ipalib.crud import Create, PKQuery, Retrieve, Search from ipalib.frontend import Method, Object -from ipalib.parameters import Bytes, DateTime, DNParam +from ipalib.parameters import Bytes, DateTime, DNParam, Principal from ipalib.plugable import Registry from .virtual import VirtualCommand from .baseldap import pkey_to_value -from .service import split_any_principal from .certprofile import validate_profile_id from .caacl import acl_evaluate from ipalib.text import _ from ipalib.request import context from ipalib import output -from .service import validate_principal +from ipapython import kerberos from ipapython.dn import DN +from ipaserver.plugins.service import normalize_principal, validate_realm if six.PY3: unicode = str @@ -216,35 +216,21 @@ def normalize_serial_number(num): return unicode(num) -def get_host_from_principal(principal): - """ - Given a principal with or without a realm return the - host portion. - """ - validate_principal(None, principal) - realm = principal.find('@') - slash = principal.find('/') - if realm == -1: - realm = len(principal) - hostname = principal[slash+1:realm] - - return hostname - def ca_enabled_check(): if not api.Command.ca_is_enabled()['result']: raise errors.NotFound(reason=_('CA is not configured')) -def caacl_check(principal_type, principal_string, ca, profile_id): +def caacl_check(principal_type, principal, ca, profile_id): principal_type_map = {USER: 'user', HOST: 'host', SERVICE: 'service'} if not acl_evaluate( principal_type_map[principal_type], - principal_string, ca, profile_id): + principal, ca, profile_id): raise errors.ACIError(info=_( "Principal '%(principal)s' " "is not permitted to use CA '%(ca)s' " "with profile '%(profile_id)s' for certificate issuance." ) % dict( - principal=principal_string, + principal=unicode(principal), ca=ca, profile_id=profile_id ) @@ -386,10 +372,12 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): operation="request certificate" takes_options = ( - Str( + Principal( 'principal', + validate_realm, label=_('Principal'), doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'), + normalizer=normalize_principal ), Flag( 'add', @@ -432,27 +420,29 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): taskgroup (directly or indirectly via role membership). """ - principal_string = kw.get('principal') - principal = split_any_principal(principal_string) - servicename, principal_name, realm = principal - if servicename is None: + principal = kw.get('principal') + principal_string = unicode(principal) + + if principal.is_user: principal_type = USER - elif servicename == 'host': + elif principal.is_host: principal_type = HOST else: principal_type = SERVICE - bind_principal = split_any_principal(getattr(context, 'principal')) - bind_service, bind_name, bind_realm = bind_principal + bind_principal = kerberos.Principal( + getattr(context, 'principal')) + bind_principal_string = unicode(bind_principal) - if bind_service is None: + if bind_principal.is_user: bind_principal_type = USER - elif bind_service == 'host': + elif bind_principal.is_host: bind_principal_type = HOST else: bind_principal_type = SERVICE - if bind_principal != principal and bind_principal_type != HOST: + if (bind_principal_string != principal_string and + bind_principal_type != HOST): # Can the bound principal request certs for another principal? self.check_access() @@ -463,7 +453,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): bypass_caacl = False if not bypass_caacl: - caacl_check(principal_type, principal_string, ca, profile_id) + caacl_check(principal_type, principal, ca, profile_id) try: subject = pkcs10.get_subject(csr) @@ -474,7 +464,8 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): error=_("Failure decoding Certificate Signing Request: %s") % e) # self-service and host principals may bypass SAN permission check - if bind_principal != principal and bind_principal_type != HOST: + if (bind_principal_string != principal_string + and bind_principal_type != HOST): if '2.5.29.17' in extensions: self.check_access('request certificate with subjectaltname') @@ -486,9 +477,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if principal_type == SERVICE: principal_obj = api.Command['service_show'](principal_string, all=True) elif principal_type == HOST: - principal_obj = api.Command['host_show'](principal_name, all=True) + principal_obj = api.Command['host_show']( + principal.hostname, all=True) elif principal_type == USER: - principal_obj = api.Command['user_show'](principal_name, all=True) + principal_obj = api.Command['user_show']( + principal.username, all=True) except errors.NotFound as e: if add: if principal_type == SERVICE: @@ -512,14 +505,14 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): error=_("No Common Name was found in subject of request.")) if principal_type in (SERVICE, HOST): - if cn.lower() != principal_name.lower(): + if cn.lower() != principal.hostname.lower(): raise errors.ACIError( info=_("hostname in subject of request '%(cn)s' " "does not match principal hostname '%(hostname)s'") - % dict(cn=cn, hostname=principal_name)) + % dict(cn=cn, hostname=principal.hostname)) elif principal_type == USER: # check user name - if cn != principal_name: + if cn != principal.username: raise errors.ValidationError( name='csr', error=_("DN commonName does not match user's login") @@ -545,13 +538,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if name_type == pkcs10.SAN_DNSNAME: name = unicode(name) alt_principal_obj = None - alt_principal_string = None + alt_principal_string = unicode(principal) try: if principal_type == HOST: - alt_principal_string = 'host/%s@%s' % (name, realm) alt_principal_obj = api.Command['host_show'](name, all=True) elif principal_type == SERVICE: - alt_principal_string = '%s/%s@%s' % (servicename, name, realm) alt_principal_obj = api.Command['service_show']( alt_principal_string, all=True) elif principal_type == USER: @@ -574,11 +565,10 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): "Insufficient privilege to create a certificate " "with subject alt name '%s'.") % name) if alt_principal_string is not None and not bypass_caacl: - caacl_check( - principal_type, alt_principal_string, ca, profile_id) + caacl_check(principal_type, principal, ca, profile_id) elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME, pkcs10.SAN_OTHERNAME_UPN): - if split_any_principal(name) != principal: + if name != principal_string: raise errors.ACIError( info=_("Principal '%s' in subject alt name does not " "match requested principal") % name) @@ -619,9 +609,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand): if principal_type == SERVICE: api.Command['service_mod'](principal_string, **kwargs) elif principal_type == HOST: - api.Command['host_mod'](principal_name, **kwargs) + api.Command['host_mod'](principal.hostname, **kwargs) elif principal_type == USER: - api.Command['user_mod'](principal_name, **kwargs) + api.Command['user_mod'](principal.username, **kwargs) return dict( result=result, @@ -748,10 +738,10 @@ class cert_show(Retrieve, CertMethod, VirtualCommand): self.check_access() except errors.ACIError as acierr: self.debug("Not granted by ACI to retrieve certificate, looking at principal") - bind_principal = getattr(context, 'principal') - if not bind_principal.startswith('host/'): + bind_principal = kerberos.Principal(getattr(context, 'principal')) + if not bind_principal.is_host: raise acierr - hostname = get_host_from_principal(bind_principal) + hostname = bind_principal.hostname ca_obj = api.Command.ca_show(options['cacn'])['result'] issuer_dn = ca_obj['ipacasubjectdn'][0] |