summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/cert.py
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2009-11-24 16:07:44 -0500
committerJason Gerard DeRose <jderose@redhat.com>2009-11-30 18:10:09 -0700
commitab1667f3c1607a22c6df49ceba58274347bc5826 (patch)
treebc2e6102d3d9cd103d2418ad5372e164e0e7533d /ipalib/plugins/cert.py
parent7c2c2d6130648fb6dd7c0e52d802cc6eff39ef95 (diff)
downloadfreeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.gz
freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.xz
freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.zip
Use pyasn1-based PKCS#10 and X509v3 parsers instead of pyOpenSSL.
The pyOpenSSL PKCS#10 parser doesn't support attributes so we can't identify requests with subject alt names. Subject alt names are only allowed if: - the host for the alt name exists in IPA - if binding as host principal, the host is in the services managedBy attr
Diffstat (limited to 'ipalib/plugins/cert.py')
-rw-r--r--ipalib/plugins/cert.py90
1 files changed, 69 insertions, 21 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 48ceaa6d7..ba088dd96 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -28,12 +28,16 @@ if api.env.enable_ra is not True:
raise SkipPluginModule(reason='env.enable_ra is not True')
from ipalib import Command, Str, Int, Bytes, Flag, File
from ipalib import errors
+from ipalib import pkcs10
+from ipalib import x509
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
import base64
-from OpenSSL import crypto
from ipalib.request import context
from ipapython import dnsclient
+from pyasn1.error import PyAsn1Error
+import logging
+import traceback
def get_serial(certificate):
"""
@@ -45,9 +49,8 @@ def get_serial(certificate):
if type(certificate) in (list, tuple):
certificate = certificate[0]
try:
- x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate)
- serial = str(x509.get_serial_number())
- except crypto.Error:
+ serial = str(x509.get_serial_number(certificate))
+ except PyAsn1Error:
raise errors.GenericError(format='Unable to decode certificate in entry')
return serial
@@ -57,25 +60,49 @@ def get_csr_hostname(csr):
Return the value of CN in the subject of the request
"""
try:
- der = base64.b64decode(csr)
- request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der)
+ request = pkcs10.load_certificate_request(csr)
sub = request.get_subject().get_components()
for s in sub:
if s[0].lower() == "cn":
return s[1]
- except crypto.Error, e:
- raise errors.GenericError(format='Unable to decode CSR: %s' % str(e))
+ except PyAsn1Error:
+ # The ASN.1 decoding errors tend to be long and involved and the
+ # last bit is generally not interesting. We need the whole traceback.
+ logging.error('Unable to decode CSR\n%s', traceback.format_exc())
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request')
return None
+def get_subjectaltname(csr):
+ """
+ Return the value of the subject alt name, if any
+ """
+ try:
+ request = pkcs10.load_certificate_request(csr)
+ except PyAsn1Error:
+ # The ASN.1 decoding errors tend to be long and involved and the
+ # last bit is generally not interesting. We need the whole traceback.
+ logging.error('Unable to decode CSR\n%s', traceback.format_exc())
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request')
+ return request.get_subjectaltname()
+
def validate_csr(ugettext, csr):
"""
- For now just verify that it is properly base64-encoded.
+ Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
+ parser.
"""
try:
- base64.b64decode(csr)
- except Exception, e:
+ request = pkcs10.load_certificate_request(csr)
+
+ # Explicitly request the attributes. This fires off additional
+ # decoding to get things like the subjectAltName.
+ attrs = request.get_attributes()
+ except TypeError, e:
raise errors.Base64DecodeError(reason=str(e))
+ except PyAsn1Error:
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request')
+ except Exception, e:
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request: %s' % str(e))
class cert_request(VirtualCommand):
@@ -107,38 +134,43 @@ class cert_request(VirtualCommand):
def execute(self, csr, **kw):
ldap = self.api.Backend.ldap2
- skw = {"all": True}
principal = kw.get('principal')
add = kw.get('add')
del kw['principal']
del kw['add']
service = None
- # We just want the CSR bits, make sure there is nothing else
- s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----")
- e = csr.find("-----END NEW CERTIFICATE REQUEST-----")
- if s >= 0:
- csr = csr[s+40:e]
+ """
+ Access control is partially handled by the ACI titled
+ 'Hosts can modify service userCertificate'. This is for the case
+ where a machine binds using a host/ prinicpal. It can only do the
+ request if the target hostname is in the managedBy attribute which
+ is managed using the add/del member commands.
+
+ Binding with a user principal one needs to be in the request_certs
+ taskgroup (directly or indirectly via role membership).
+ """
# Can this user request certs?
self.check_access()
# FIXME: add support for subject alt name
- # Is this cert for this principal?
- subject_host = get_csr_hostname(csr)
# Ensure that the hostname in the CSR matches the principal
+ subject_host = get_csr_hostname(csr)
(servicename, hostname, realm) = split_principal(principal)
if subject_host.lower() != hostname.lower():
raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname))
+ dn = None
+ service = None
# See if the service exists and punt if it doesn't and we aren't
# going to add it
try:
- (dn, service) = api.Command['service_show'](principal, **skw)
+ (dn, service) = api.Command['service_show'](principal, all=True, raw=True)
if 'usercertificate' in service:
# FIXME, what to do here? Do we revoke the old cert?
- raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate']))
+ raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(base64.b64encode(service['usercertificate'][0])))
except errors.NotFound, e:
if not add:
raise errors.NotFound(reason="The service principal for this request doesn't exist.")
@@ -151,6 +183,22 @@ class cert_request(VirtualCommand):
if not ldap.can_write(dn, "usercertificate"):
raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)
+ # Validate the subject alt name, if any
+ subjectaltname = get_subjectaltname(csr)
+ if subjectaltname is not None:
+ for name in subjectaltname:
+ try:
+ (hostdn, hostentry) = api.Command['host_show'](name, all=True, raw=True)
+ except errors.NotFound:
+ # We don't want to issue any certificates referencing
+ # machines we don't know about. Nothing is stored in this
+ # host record related to this certificate.
+ raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name)
+ authprincipal = getattr(context, 'principal')
+ if authprincipal.startswith("host/"):
+ if not hostdn in service.get('managedby', []):
+ raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)
+
# Request the certificate
result = self.Backend.ra.request_certificate(csr, **kw)