summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/cert.py
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2009-10-20 11:59:07 -0400
committerJason Gerard DeRose <jderose@redhat.com>2009-10-21 03:22:44 -0600
commit453a19fcaca9c2be1e3d0e78b734bd05e7d50764 (patch)
tree76d5a8516f1d515e74da848050eae32732a64fad /ipalib/plugins/cert.py
parentaa2183578cb58d9f55b5f1b64c13627b88dae37c (diff)
downloadfreeipa-453a19fcaca9c2be1e3d0e78b734bd05e7d50764.tar.gz
freeipa-453a19fcaca9c2be1e3d0e78b734bd05e7d50764.tar.xz
freeipa-453a19fcaca9c2be1e3d0e78b734bd05e7d50764.zip
First pass at enforcing certificates be requested from same host
We want to only allow a machine to request a certificate for itself, not for other machines. I've added a new taksgroup which will allow this. The requesting IP is resolved and compared to the subject of the CSR to determine if they are the same host. The same is done with the service principal. Subject alt names are not queried yet. This does not yet grant machines actual permission to request certificates yet, that is still limited to the taskgroup request_certs.
Diffstat (limited to 'ipalib/plugins/cert.py')
-rw-r--r--ipalib/plugins/cert.py78
1 files changed, 71 insertions, 7 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 1681a22f6..f446b36b5 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -29,8 +29,11 @@ if api.env.enable_ra is not True:
from ipalib import Command, Str, Int, Bytes, Flag
from ipalib import errors
from ipalib.plugins.virtual import *
+from ipalib.plugins.service import split_principal
import base64
from OpenSSL import crypto
+from ipalib.request import context
+from ipapython import dnsclient
def get_serial(certificate):
"""
@@ -49,6 +52,22 @@ def get_serial(certificate):
return serial
+def get_csr_hostname(csr):
+ """
+ Return the value of CN in the subject of the request
+ """
+ try:
+ der = base64.b64decode(csr)
+ request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der)
+ sub = request.get_subject().get_components()
+ for s in sub:
+ if s[0].lower() == "cn":
+ return s[1]
+ except crypto.Error, e:
+ raise errors.GenericError(format='Unable to decode CSR: %s' % str(e))
+
+ return None
+
def validate_csr(ugettext, csr):
"""
For now just verify that it is properly base64-encoded.
@@ -61,7 +80,7 @@ def validate_csr(ugettext, csr):
class cert_request(VirtualCommand):
"""
- Submit a certificate singing request.
+ Submit a certificate signing request.
"""
takes_args = (Str('csr', validate_csr),)
@@ -83,7 +102,6 @@ class cert_request(VirtualCommand):
)
def execute(self, csr, **kw):
- super(cert_request, self).execute()
skw = {"all": True}
principal = kw.get('principal')
add = kw.get('add')
@@ -91,6 +109,47 @@ class cert_request(VirtualCommand):
del kw['add']
service = None
+ # Can this user request certs?
+ self.check_access()
+
+ # FIXME: add support for subject alt name
+ # Is this cert for this principal?
+ subject_host = get_csr_hostname(csr)
+
+ # Ensure that the hostname in the CSR matches the principal
+ (servicename, hostname, realm) = split_principal(principal)
+ if subject_host.lower() != hostname.lower():
+ raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname))
+
+ # Get the IP address of the machine that submitted the request. We
+ # will compare this to the subjectname of the CSR.
+ client_ip = getattr(context, 'client_ip')
+ rhost = None
+ if client_ip not in (None, ''):
+ rev = client_ip.split('.')
+ if len(rev) == 0:
+ rev = client_ip.split(':')
+ rev.reverse()
+ addr = "%s.in-addr.arpa." % ".".join(rev)
+ else:
+ rev.reverse()
+ addr = "%s.in-addr.arpa." % ".".join(rev)
+ rs = dnsclient.query(addr, dnsclient.DNS_C_IN, dnsclient.DNS_T_PTR)
+ if len(rs) == 0:
+ raise errors.ACIError(info='DNS lookup on client failed for IP %s' % client_ip)
+ for rsn in rs:
+ if rsn.dns_type == dnsclient.DNS_T_PTR:
+ rhost = rsn
+ break
+
+ if rhost is None:
+ raise errors.ACIError(info='DNS lookup on client failed for IP %s' % client_ip)
+
+ client_hostname = rhost.rdata.ptrdname
+ if subject_host.lower() != client_hostname.lower():
+ self.log.debug("IPA: hostname in subject of request '%s' does not match requesting hostname '%s'" % (subject_host, client_hostname))
+ self.check_access(operation="request certificate different host")
+
# See if the service exists and punt if it doesn't and we aren't
# going to add it
try:
@@ -98,6 +157,8 @@ class cert_request(VirtualCommand):
if 'usercertificate' in service:
# FIXME, what to do here? Do we revoke the old cert?
raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate']))
+ if not can_write(dn, "usercertificate"):
+ raise errors.ACIError(info='You need to be a member of the serviceadmin role to update services')
except errors.NotFound, e:
if not add:
@@ -110,7 +171,10 @@ class cert_request(VirtualCommand):
# either exists or we should add it.
if result.get('status') == '0':
if service is None:
- service = api.Command['service_add'](principal, **{})
+ try:
+ service = api.Command['service_add'](principal, **{})
+ except errors.ACIError:
+ raise errors.ACIError(info='You need to be a member of the serviceadmin role to add services')
skw = {"usercertificate": str(result.get('certificate'))}
api.Command['service_mod'](principal, **skw)
@@ -162,7 +226,7 @@ class cert_status(VirtualCommand):
def execute(self, request_id, **kw):
- super(cert_status, self).execute()
+ self.check_access()
return self.Backend.ra.check_request_status(request_id)
def output_for_cli(self, textui, result, *args, **kw):
@@ -183,7 +247,7 @@ class cert_get(VirtualCommand):
operation="retrieve certificate"
def execute(self, serial_number):
- super(cert_get, self).execute()
+ self.check_access()
return self.Backend.ra.get_certificate(serial_number)
def output_for_cli(self, textui, result, *args, **kw):
@@ -215,7 +279,7 @@ class cert_revoke(VirtualCommand):
def execute(self, serial_number, **kw):
- super(cert_revoke, self).execute()
+ self.check_access()
return self.Backend.ra.revoke_certificate(serial_number, **kw)
def output_for_cli(self, textui, result, *args, **kw):
@@ -236,7 +300,7 @@ class cert_remove_hold(VirtualCommand):
operation = "certificate remove hold"
def execute(self, serial_number, **kw):
- super(cert_remove_hold, self).execute()
+ self.check_access()
return self.Backend.ra.take_certificate_off_hold(serial_number)
def output_for_cli(self, textui, result, *args, **kw):