summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--API.txt3
-rw-r--r--ipalib/pkcs10.py1
-rw-r--r--ipalib/plugins/cert.py220
3 files changed, 135 insertions, 89 deletions
diff --git a/API.txt b/API.txt
index abd9407af..7574bc900 100644
--- a/API.txt
+++ b/API.txt
@@ -485,10 +485,11 @@ arg: Str('serial_number')
option: Str('version?', exclude='webui')
output: Output('result', None, None)
command: cert_request
-args: 1,4,1
+args: 1,5,1
arg: File('csr', cli_name='csr_file')
option: Flag('add', autofill=True, default=False)
option: Str('principal')
+option: Str('profile_id?')
option: Str('request_type', autofill=True, default=u'pkcs10')
option: Str('version?', exclude='webui')
output: Output('result', <type 'dict'>, None)
diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py
index f35e200a2..6299dfea4 100644
--- a/ipalib/pkcs10.py
+++ b/ipalib/pkcs10.py
@@ -30,6 +30,7 @@ PEM = 0
DER = 1
SAN_DNSNAME = 'DNS name'
+SAN_RFC822NAME = 'RFC822 Name'
SAN_OTHERNAME_UPN = 'Other Name (OID.1.3.6.1.4.1.311.20.2.3)'
SAN_OTHERNAME_KRB5PRINCIPALNAME = 'Other Name (OID.1.3.6.1.5.2.2)'
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index e4cb6dc0a..d12290017 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -31,7 +31,8 @@ from ipalib import ngettext
from ipalib.plugable import Registry
from ipalib.plugins.virtual import *
from ipalib.plugins.baseldap import pkey_to_value
-from ipalib.plugins.service import split_principal
+from ipalib.plugins.service import split_any_principal
+from ipalib.plugins.certprofile import validate_profile_id
import base64
import traceback
from ipalib.text import _
@@ -122,6 +123,8 @@ http://www.ietf.org/rfc/rfc5280.txt
""")
+USER, HOST, SERVICE = range(3)
+
register = Registry()
def validate_pkidate(ugettext, value):
@@ -232,7 +235,7 @@ class cert_request(VirtualCommand):
takes_options = (
Str('principal',
label=_('Principal'),
- doc=_('Service principal for this certificate (e.g. HTTP/test.example.com)'),
+ doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'),
),
Str('request_type',
default=u'pkcs10',
@@ -243,6 +246,10 @@ class cert_request(VirtualCommand):
default=False,
autofill=True
),
+ Str('profile_id?', validate_profile_id,
+ label=_("Profile ID"),
+ doc=_("Certificate Profile to use"),
+ )
)
has_output_params = (
@@ -294,10 +301,9 @@ class cert_request(VirtualCommand):
ca_enabled_check()
ldap = self.api.Backend.ldap2
- principal = kw.get('principal')
add = kw.get('add')
request_type = kw.get('request_type')
- service = None
+ profile_id = kw.get('profile_id', self.Backend.ra.DEFAULT_PROFILE)
"""
Access control is partially handled by the ACI titled
@@ -310,9 +316,28 @@ class cert_request(VirtualCommand):
taskgroup (directly or indirectly via role membership).
"""
- bind_principal = getattr(context, 'principal')
- # Can this user request certs?
- if not bind_principal.startswith('host/'):
+ principal_string = kw.get('principal')
+ principal = split_any_principal(principal_string)
+ servicename, principal_name, realm = principal
+ if servicename is None:
+ principal_type = USER
+ elif servicename == 'host':
+ principal_type = HOST
+ else:
+ principal_type = SERVICE
+
+ bind_principal = split_any_principal(getattr(context, 'principal'))
+ bind_service, bind_name, bind_realm = bind_principal
+
+ if bind_service is None:
+ bind_principal_type = USER
+ elif bind_service == 'host':
+ bind_principal_type = HOST
+ else:
+ bind_principal_type = SERVICE
+
+ if bind_principal != principal and bind_principal_type != HOST:
+ # Can the bound principal request certs for another principal?
self.check_access()
try:
@@ -323,57 +348,71 @@ class cert_request(VirtualCommand):
raise errors.CertificateOperationError(
error=_("Failure decoding Certificate Signing Request: %s") % e)
- if not bind_principal.startswith('host/'):
+ # host principals may bypass allowed ext check
+ if bind_principal_type != HOST:
for ext in extensions:
operation = self._allowed_extensions.get(ext)
if operation:
self.check_access(operation)
- # Ensure that the hostname in the CSR matches the principal
- subject_host = subject.common_name #pylint: disable=E1101
- if not subject_host:
+ dn = None
+ principal_obj = None
+ # See if the service exists and punt if it doesn't and we aren't
+ # going to add it
+ try:
+ if principal_type == SERVICE:
+ principal_obj = api.Command['service_show'](principal_string, all=True)
+ elif principal_type == HOST:
+ principal_obj = api.Command['host_show'](principal_name, all=True)
+ elif principal_type == USER:
+ principal_obj = api.Command['user_show'](principal_name, all=True)
+ except errors.NotFound as e:
+ if principal_type == SERVICE and add:
+ principal_obj = api.Command['service_add'](principal_string, force=True)
+ else:
+ raise errors.NotFound(
+ reason=_("The principal for this request doesn't exist."))
+ principal_obj = principal_obj['result']
+ dn = principal_obj['dn']
+
+ # Ensure that the DN in the CSR matches the principal
+ cn = subject.common_name #pylint: disable=E1101
+ if not cn:
raise errors.ValidationError(name='csr',
- error=_("No hostname was found in subject of request."))
+ error=_("No Common Name was found in subject of request."))
- (servicename, hostname, realm) = split_principal(principal)
- if subject_host.lower() != hostname.lower():
- raise errors.ACIError(
- info=_("hostname in subject of request '%(subject_host)s' "
- "does not match principal hostname '%(hostname)s'") % dict(
- subject_host=subject_host, hostname=hostname))
+ if principal_type in (SERVICE, HOST):
+ if cn.lower() != principal_name.lower():
+ raise errors.ACIError(
+ info=_("hostname in subject of request '%(cn)s' "
+ "does not match principal hostname '%(hostname)s'")
+ % dict(cn=cn, hostname=principal_name))
+ elif principal_type == USER:
+ # check user name
+ if cn != principal_name:
+ raise errors.ValidationError(
+ name='csr',
+ error=_(
+ "DN commonName does not match "
+ "any of user's email addresses")
+ )
+
+ # check email address
+ mail = subject.email_address #pylint: disable=E1101
+ if mail is not None and mail not in principal_obj.get('mail', []):
+ raise errors.ValidationError(
+ name='csr',
+ error=_(
+ "DN emailAddress does not match "
+ "any of user's email addresses")
+ )
for ext in extensions:
if ext not in self._allowed_extensions:
raise errors.ValidationError(
name='csr', error=_("extension %s is forbidden") % ext)
- for name_type, name in subjectaltname:
- if name_type not in (pkcs10.SAN_DNSNAME,
- pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME,
- pkcs10.SAN_OTHERNAME_UPN):
- raise errors.ValidationError(
- name='csr',
- error=_("subject alt name type %s is forbidden") %
- name_type)
-
- dn = None
- service = None
- # See if the service exists and punt if it doesn't and we aren't
- # going to add it
- try:
- if servicename != 'host':
- service = api.Command['service_show'](principal, all=True)
- else:
- service = api.Command['host_show'](hostname, all=True)
- except errors.NotFound, e:
- if not add:
- raise errors.NotFound(reason=_("The service principal for "
- "this request doesn't exist."))
- service = api.Command['service_add'](principal, force=True)
- service = service['result']
- dn = service['dn']
-
- # We got this far so the service entry exists, can we write it?
+ # We got this far so the principal entry exists, can we write it?
if not ldap.can_write(dn, "usercertificate"):
raise errors.ACIError(info=_("Insufficient 'write' privilege "
"to the 'userCertificate' attribute of entry '%s'.") % dn)
@@ -382,13 +421,20 @@ class cert_request(VirtualCommand):
for name_type, name in subjectaltname:
if name_type == pkcs10.SAN_DNSNAME:
name = unicode(name)
+ alt_principal_obj = None
try:
- if servicename == 'host':
- altservice = api.Command['host_show'](name, all=True)
- else:
+ if principal_type == HOST:
+ alt_principal_obj = api.Command['host_show'](name, all=True)
+ elif principal_type == SERVICE:
altprincipal = '%s/%s@%s' % (servicename, name, realm)
- altservice = api.Command['service_show'](
+ alt_principal_obj = api.Command['service_show'](
altprincipal, all=True)
+ elif principal_type == USER:
+ raise errors.ValidationError(
+ name='csr',
+ error=_("subject alt name type %s is forbidden "
+ "for user principals") % name_type
+ )
except errors.NotFound:
# We don't want to issue any certificates referencing
# machines we don't know about. Nothing is stored in this
@@ -396,47 +442,41 @@ class cert_request(VirtualCommand):
raise errors.NotFound(reason=_('The service principal for '
'subject alt name %s in certificate request does not '
'exist') % name)
- altdn = altservice['result']['dn']
- if not ldap.can_write(altdn, "usercertificate"):
- raise errors.ACIError(info=_(
- "Insufficient privilege to create a certificate with "
- "subject alt name '%s'.") % name)
+ if alt_principal_obj is not None:
+ altdn = alt_principal_obj['result']['dn']
+ if not ldap.can_write(altdn, "usercertificate"):
+ raise errors.ACIError(info=_(
+ "Insufficient privilege to create a certificate "
+ "with subject alt name '%s'.") % name)
elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME,
pkcs10.SAN_OTHERNAME_UPN):
- if name != principal:
+ if name != principal_string:
raise errors.ACIError(
info=_("Principal '%s' in subject alt name does not "
- "match requested service principal") % name)
+ "match requested principal") % name)
+ elif name_type == pkcs10.SAN_RFC822NAME:
+ if principal_type == USER:
+ if name not in principal_obj.get('mail', []):
+ raise errors.ValidationError(
+ name='csr',
+ error=_(
+ "RFC822Name does not match "
+ "any of user's email addresses")
+ )
+ else:
+ raise errors.ValidationError(
+ name='csr',
+ error=_("subject alt name type %s is forbidden "
+ "for non-user principals") % name_type
+ )
else:
raise errors.ACIError(
info=_("Subject alt name type %s is forbidden") %
name_type)
- if 'usercertificate' in service:
- serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)
- # revoke the certificate and remove it from the service
- # entry before proceeding. First we retrieve the certificate to
- # see if it is already revoked, if not then we revoke it.
- try:
- result = api.Command['cert_show'](unicode(serial))['result']
- if 'revocation_reason' not in result:
- try:
- api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except errors.NotImplementedError:
- # some CA's might not implement get
- pass
- if not principal.startswith('host/'):
- api.Command['service_mod'](principal, usercertificate=None)
- else:
- hostname = get_host_from_principal(principal)
- api.Command['host_mod'](hostname, usercertificate=None)
-
# Request the certificate
result = self.Backend.ra.request_certificate(
- csr, 'caIPAserviceCert', request_type=request_type)
+ csr, profile_id, request_type=request_type)
cert = x509.load_certificate(result['certificate'])
result['issuer'] = unicode(cert.issuer)
result['valid_not_before'] = unicode(cert.valid_not_before_str)
@@ -444,15 +484,19 @@ class cert_request(VirtualCommand):
result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
- # Success? Then add it to the service entry.
- if 'certificate' in result:
- if not principal.startswith('host/'):
- skw = {"usercertificate": str(result.get('certificate'))}
- api.Command['service_mod'](principal, **skw)
- else:
- hostname = get_host_from_principal(principal)
- skw = {"usercertificate": str(result.get('certificate'))}
- api.Command['host_mod'](hostname, **skw)
+ # Success? Then add it to the principal's entry
+ # (unless the profile tells us not to)
+ profile = api.Command['certprofile_show'](profile_id)
+ store = profile['result']['ipacertprofilestoreissued'][0] == 'TRUE'
+ if store and 'certificate' in result:
+ cert = str(result.get('certificate'))
+ kwargs = dict(addattr=u'usercertificate={}'.format(cert))
+ if principal_type == SERVICE:
+ api.Command['service_mod'](principal_string, **kwargs)
+ elif principal_type == HOST:
+ api.Command['host_mod'](principal_name, **kwargs)
+ elif principal_type == USER:
+ api.Command['user_mod'](principal_name, **kwargs)
return dict(
result=result