summaryrefslogtreecommitdiffstats
path: root/ipaserver/plugins/cert.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipaserver/plugins/cert.py')
-rw-r--r--ipaserver/plugins/cert.py88
1 files changed, 39 insertions, 49 deletions
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index 4cd2ab096..2f7904cd7 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -38,18 +38,18 @@ from ipalib import ngettext
from ipalib.constants import IPA_CA_CN
from ipalib.crud import Create, PKQuery, Retrieve, Search
from ipalib.frontend import Method, Object
-from ipalib.parameters import Bytes, DateTime, DNParam
+from ipalib.parameters import Bytes, DateTime, DNParam, Principal
from ipalib.plugable import Registry
from .virtual import VirtualCommand
from .baseldap import pkey_to_value
-from .service import split_any_principal
from .certprofile import validate_profile_id
from .caacl import acl_evaluate
from ipalib.text import _
from ipalib.request import context
from ipalib import output
-from .service import validate_principal
+from ipapython import kerberos
from ipapython.dn import DN
+from ipaserver.plugins.service import normalize_principal, validate_realm
if six.PY3:
unicode = str
@@ -216,35 +216,21 @@ def normalize_serial_number(num):
return unicode(num)
-def get_host_from_principal(principal):
- """
- Given a principal with or without a realm return the
- host portion.
- """
- validate_principal(None, principal)
- realm = principal.find('@')
- slash = principal.find('/')
- if realm == -1:
- realm = len(principal)
- hostname = principal[slash+1:realm]
-
- return hostname
-
def ca_enabled_check():
if not api.Command.ca_is_enabled()['result']:
raise errors.NotFound(reason=_('CA is not configured'))
-def caacl_check(principal_type, principal_string, ca, profile_id):
+def caacl_check(principal_type, principal, ca, profile_id):
principal_type_map = {USER: 'user', HOST: 'host', SERVICE: 'service'}
if not acl_evaluate(
principal_type_map[principal_type],
- principal_string, ca, profile_id):
+ principal, ca, profile_id):
raise errors.ACIError(info=_(
"Principal '%(principal)s' "
"is not permitted to use CA '%(ca)s' "
"with profile '%(profile_id)s' for certificate issuance."
) % dict(
- principal=principal_string,
+ principal=unicode(principal),
ca=ca,
profile_id=profile_id
)
@@ -386,10 +372,12 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
operation="request certificate"
takes_options = (
- Str(
+ Principal(
'principal',
+ validate_realm,
label=_('Principal'),
doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'),
+ normalizer=normalize_principal
),
Flag(
'add',
@@ -432,27 +420,29 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
taskgroup (directly or indirectly via role membership).
"""
- principal_string = kw.get('principal')
- principal = split_any_principal(principal_string)
- servicename, principal_name, realm = principal
- if servicename is None:
+ principal = kw.get('principal')
+ principal_string = unicode(principal)
+
+ if principal.is_user:
principal_type = USER
- elif servicename == 'host':
+ elif principal.is_host:
principal_type = HOST
else:
principal_type = SERVICE
- bind_principal = split_any_principal(getattr(context, 'principal'))
- bind_service, bind_name, bind_realm = bind_principal
+ bind_principal = kerberos.Principal(
+ getattr(context, 'principal'))
+ bind_principal_string = unicode(bind_principal)
- if bind_service is None:
+ if bind_principal.is_user:
bind_principal_type = USER
- elif bind_service == 'host':
+ elif bind_principal.is_host:
bind_principal_type = HOST
else:
bind_principal_type = SERVICE
- if bind_principal != principal and bind_principal_type != HOST:
+ if (bind_principal_string != principal_string and
+ bind_principal_type != HOST):
# Can the bound principal request certs for another principal?
self.check_access()
@@ -463,7 +453,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
bypass_caacl = False
if not bypass_caacl:
- caacl_check(principal_type, principal_string, ca, profile_id)
+ caacl_check(principal_type, principal, ca, profile_id)
try:
subject = pkcs10.get_subject(csr)
@@ -474,7 +464,8 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
error=_("Failure decoding Certificate Signing Request: %s") % e)
# self-service and host principals may bypass SAN permission check
- if bind_principal != principal and bind_principal_type != HOST:
+ if (bind_principal_string != principal_string
+ and bind_principal_type != HOST):
if '2.5.29.17' in extensions:
self.check_access('request certificate with subjectaltname')
@@ -486,9 +477,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
if principal_type == SERVICE:
principal_obj = api.Command['service_show'](principal_string, all=True)
elif principal_type == HOST:
- principal_obj = api.Command['host_show'](principal_name, all=True)
+ principal_obj = api.Command['host_show'](
+ principal.hostname, all=True)
elif principal_type == USER:
- principal_obj = api.Command['user_show'](principal_name, all=True)
+ principal_obj = api.Command['user_show'](
+ principal.username, all=True)
except errors.NotFound as e:
if add:
if principal_type == SERVICE:
@@ -512,14 +505,14 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
error=_("No Common Name was found in subject of request."))
if principal_type in (SERVICE, HOST):
- if cn.lower() != principal_name.lower():
+ if cn.lower() != principal.hostname.lower():
raise errors.ACIError(
info=_("hostname in subject of request '%(cn)s' "
"does not match principal hostname '%(hostname)s'")
- % dict(cn=cn, hostname=principal_name))
+ % dict(cn=cn, hostname=principal.hostname))
elif principal_type == USER:
# check user name
- if cn != principal_name:
+ if cn != principal.username:
raise errors.ValidationError(
name='csr',
error=_("DN commonName does not match user's login")
@@ -545,13 +538,11 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
if name_type == pkcs10.SAN_DNSNAME:
name = unicode(name)
alt_principal_obj = None
- alt_principal_string = None
+ alt_principal_string = unicode(principal)
try:
if principal_type == HOST:
- alt_principal_string = 'host/%s@%s' % (name, realm)
alt_principal_obj = api.Command['host_show'](name, all=True)
elif principal_type == SERVICE:
- alt_principal_string = '%s/%s@%s' % (servicename, name, realm)
alt_principal_obj = api.Command['service_show'](
alt_principal_string, all=True)
elif principal_type == USER:
@@ -574,11 +565,10 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
"Insufficient privilege to create a certificate "
"with subject alt name '%s'.") % name)
if alt_principal_string is not None and not bypass_caacl:
- caacl_check(
- principal_type, alt_principal_string, ca, profile_id)
+ caacl_check(principal_type, principal, ca, profile_id)
elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME,
pkcs10.SAN_OTHERNAME_UPN):
- if split_any_principal(name) != principal:
+ if name != principal_string:
raise errors.ACIError(
info=_("Principal '%s' in subject alt name does not "
"match requested principal") % name)
@@ -619,9 +609,9 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
if principal_type == SERVICE:
api.Command['service_mod'](principal_string, **kwargs)
elif principal_type == HOST:
- api.Command['host_mod'](principal_name, **kwargs)
+ api.Command['host_mod'](principal.hostname, **kwargs)
elif principal_type == USER:
- api.Command['user_mod'](principal_name, **kwargs)
+ api.Command['user_mod'](principal.username, **kwargs)
return dict(
result=result,
@@ -748,10 +738,10 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
self.check_access()
except errors.ACIError as acierr:
self.debug("Not granted by ACI to retrieve certificate, looking at principal")
- bind_principal = getattr(context, 'principal')
- if not bind_principal.startswith('host/'):
+ bind_principal = kerberos.Principal(getattr(context, 'principal'))
+ if not bind_principal.is_host:
raise acierr
- hostname = get_host_from_principal(bind_principal)
+ hostname = bind_principal.hostname
ca_obj = api.Command.ca_show(options['cacn'])['result']
issuer_dn = ca_obj['ipacasubjectdn'][0]