summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/cert.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipalib/plugins/cert.py')
-rw-r--r--ipalib/plugins/cert.py78
1 files changed, 71 insertions, 7 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 1681a22f6..f446b36b5 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -29,8 +29,11 @@ if api.env.enable_ra is not True:
from ipalib import Command, Str, Int, Bytes, Flag
from ipalib import errors
from ipalib.plugins.virtual import *
+from ipalib.plugins.service import split_principal
import base64
from OpenSSL import crypto
+from ipalib.request import context
+from ipapython import dnsclient
def get_serial(certificate):
"""
@@ -49,6 +52,22 @@ def get_serial(certificate):
return serial
+def get_csr_hostname(csr):
+ """
+ Return the value of CN in the subject of the request
+ """
+ try:
+ der = base64.b64decode(csr)
+ request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der)
+ sub = request.get_subject().get_components()
+ for s in sub:
+ if s[0].lower() == "cn":
+ return s[1]
+ except crypto.Error, e:
+ raise errors.GenericError(format='Unable to decode CSR: %s' % str(e))
+
+ return None
+
def validate_csr(ugettext, csr):
"""
For now just verify that it is properly base64-encoded.
@@ -61,7 +80,7 @@ def validate_csr(ugettext, csr):
class cert_request(VirtualCommand):
"""
- Submit a certificate singing request.
+ Submit a certificate signing request.
"""
takes_args = (Str('csr', validate_csr),)
@@ -83,7 +102,6 @@ class cert_request(VirtualCommand):
)
def execute(self, csr, **kw):
- super(cert_request, self).execute()
skw = {"all": True}
principal = kw.get('principal')
add = kw.get('add')
@@ -91,6 +109,47 @@ class cert_request(VirtualCommand):
del kw['add']
service = None
+ # Can this user request certs?
+ self.check_access()
+
+ # FIXME: add support for subject alt name
+ # Is this cert for this principal?
+ subject_host = get_csr_hostname(csr)
+
+ # Ensure that the hostname in the CSR matches the principal
+ (servicename, hostname, realm) = split_principal(principal)
+ if subject_host.lower() != hostname.lower():
+ raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname))
+
+ # Get the IP address of the machine that submitted the request. We
+ # will compare this to the subjectname of the CSR.
+ client_ip = getattr(context, 'client_ip')
+ rhost = None
+ if client_ip not in (None, ''):
+ rev = client_ip.split('.')
+ if len(rev) == 0:
+ rev = client_ip.split(':')
+ rev.reverse()
+ addr = "%s.in-addr.arpa." % ".".join(rev)
+ else:
+ rev.reverse()
+ addr = "%s.in-addr.arpa." % ".".join(rev)
+ rs = dnsclient.query(addr, dnsclient.DNS_C_IN, dnsclient.DNS_T_PTR)
+ if len(rs) == 0:
+ raise errors.ACIError(info='DNS lookup on client failed for IP %s' % client_ip)
+ for rsn in rs:
+ if rsn.dns_type == dnsclient.DNS_T_PTR:
+ rhost = rsn
+ break
+
+ if rhost is None:
+ raise errors.ACIError(info='DNS lookup on client failed for IP %s' % client_ip)
+
+ client_hostname = rhost.rdata.ptrdname
+ if subject_host.lower() != client_hostname.lower():
+ self.log.debug("IPA: hostname in subject of request '%s' does not match requesting hostname '%s'" % (subject_host, client_hostname))
+ self.check_access(operation="request certificate different host")
+
# See if the service exists and punt if it doesn't and we aren't
# going to add it
try:
@@ -98,6 +157,8 @@ class cert_request(VirtualCommand):
if 'usercertificate' in service:
# FIXME, what to do here? Do we revoke the old cert?
raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate']))
+ if not can_write(dn, "usercertificate"):
+ raise errors.ACIError(info='You need to be a member of the serviceadmin role to update services')
except errors.NotFound, e:
if not add:
@@ -110,7 +171,10 @@ class cert_request(VirtualCommand):
# either exists or we should add it.
if result.get('status') == '0':
if service is None:
- service = api.Command['service_add'](principal, **{})
+ try:
+ service = api.Command['service_add'](principal, **{})
+ except errors.ACIError:
+ raise errors.ACIError(info='You need to be a member of the serviceadmin role to add services')
skw = {"usercertificate": str(result.get('certificate'))}
api.Command['service_mod'](principal, **skw)
@@ -162,7 +226,7 @@ class cert_status(VirtualCommand):
def execute(self, request_id, **kw):
- super(cert_status, self).execute()
+ self.check_access()
return self.Backend.ra.check_request_status(request_id)
def output_for_cli(self, textui, result, *args, **kw):
@@ -183,7 +247,7 @@ class cert_get(VirtualCommand):
operation="retrieve certificate"
def execute(self, serial_number):
- super(cert_get, self).execute()
+ self.check_access()
return self.Backend.ra.get_certificate(serial_number)
def output_for_cli(self, textui, result, *args, **kw):
@@ -215,7 +279,7 @@ class cert_revoke(VirtualCommand):
def execute(self, serial_number, **kw):
- super(cert_revoke, self).execute()
+ self.check_access()
return self.Backend.ra.revoke_certificate(serial_number, **kw)
def output_for_cli(self, textui, result, *args, **kw):
@@ -236,7 +300,7 @@ class cert_remove_hold(VirtualCommand):
operation = "certificate remove hold"
def execute(self, serial_number, **kw):
- super(cert_remove_hold, self).execute()
+ self.check_access()
return self.Backend.ra.take_certificate_off_hold(serial_number)
def output_for_cli(self, textui, result, *args, **kw):