diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-10-20 11:59:07 -0400 |
---|---|---|
committer | Jason Gerard DeRose <jderose@redhat.com> | 2009-10-21 03:22:44 -0600 |
commit | 453a19fcaca9c2be1e3d0e78b734bd05e7d50764 (patch) | |
tree | 76d5a8516f1d515e74da848050eae32732a64fad /ipalib | |
parent | aa2183578cb58d9f55b5f1b64c13627b88dae37c (diff) | |
download | freeipa-453a19fcaca9c2be1e3d0e78b734bd05e7d50764.tar.gz freeipa-453a19fcaca9c2be1e3d0e78b734bd05e7d50764.tar.xz freeipa-453a19fcaca9c2be1e3d0e78b734bd05e7d50764.zip |
First pass at enforcing certificates be requested from same host
We want to only allow a machine to request a certificate for itself, not for
other machines. I've added a new taksgroup which will allow this.
The requesting IP is resolved and compared to the subject of the CSR to
determine if they are the same host. The same is done with the service
principal. Subject alt names are not queried yet.
This does not yet grant machines actual permission to request certificates
yet, that is still limited to the taskgroup request_certs.
Diffstat (limited to 'ipalib')
-rw-r--r-- | ipalib/backend.py | 5 | ||||
-rw-r--r-- | ipalib/plugins/cert.py | 78 | ||||
-rw-r--r-- | ipalib/plugins/virtual.py | 37 |
3 files changed, 91 insertions, 29 deletions
diff --git a/ipalib/backend.py b/ipalib/backend.py index b123ed140..7c964b799 100644 --- a/ipalib/backend.py +++ b/ipalib/backend.py @@ -97,10 +97,15 @@ class Executioner(Backend): def create_context(self, ccache=None, client_ip=None): + """ + client_ip: The IP address of the remote client. + """ if self.env.in_server: self.Backend.ldap2.connect(ccache=ccache) else: self.Backend.xmlclient.connect() + if client_ip is not None: + setattr(context, "client_ip", client_ip) def destroy_context(self): destroy_context() diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 1681a22f6..f446b36b5 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -29,8 +29,11 @@ if api.env.enable_ra is not True: from ipalib import Command, Str, Int, Bytes, Flag from ipalib import errors from ipalib.plugins.virtual import * +from ipalib.plugins.service import split_principal import base64 from OpenSSL import crypto +from ipalib.request import context +from ipapython import dnsclient def get_serial(certificate): """ @@ -49,6 +52,22 @@ def get_serial(certificate): return serial +def get_csr_hostname(csr): + """ + Return the value of CN in the subject of the request + """ + try: + der = base64.b64decode(csr) + request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der) + sub = request.get_subject().get_components() + for s in sub: + if s[0].lower() == "cn": + return s[1] + except crypto.Error, e: + raise errors.GenericError(format='Unable to decode CSR: %s' % str(e)) + + return None + def validate_csr(ugettext, csr): """ For now just verify that it is properly base64-encoded. @@ -61,7 +80,7 @@ def validate_csr(ugettext, csr): class cert_request(VirtualCommand): """ - Submit a certificate singing request. + Submit a certificate signing request. """ takes_args = (Str('csr', validate_csr),) @@ -83,7 +102,6 @@ class cert_request(VirtualCommand): ) def execute(self, csr, **kw): - super(cert_request, self).execute() skw = {"all": True} principal = kw.get('principal') add = kw.get('add') @@ -91,6 +109,47 @@ class cert_request(VirtualCommand): del kw['add'] service = None + # Can this user request certs? + self.check_access() + + # FIXME: add support for subject alt name + # Is this cert for this principal? + subject_host = get_csr_hostname(csr) + + # Ensure that the hostname in the CSR matches the principal + (servicename, hostname, realm) = split_principal(principal) + if subject_host.lower() != hostname.lower(): + raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname)) + + # Get the IP address of the machine that submitted the request. We + # will compare this to the subjectname of the CSR. + client_ip = getattr(context, 'client_ip') + rhost = None + if client_ip not in (None, ''): + rev = client_ip.split('.') + if len(rev) == 0: + rev = client_ip.split(':') + rev.reverse() + addr = "%s.in-addr.arpa." % ".".join(rev) + else: + rev.reverse() + addr = "%s.in-addr.arpa." % ".".join(rev) + rs = dnsclient.query(addr, dnsclient.DNS_C_IN, dnsclient.DNS_T_PTR) + if len(rs) == 0: + raise errors.ACIError(info='DNS lookup on client failed for IP %s' % client_ip) + for rsn in rs: + if rsn.dns_type == dnsclient.DNS_T_PTR: + rhost = rsn + break + + if rhost is None: + raise errors.ACIError(info='DNS lookup on client failed for IP %s' % client_ip) + + client_hostname = rhost.rdata.ptrdname + if subject_host.lower() != client_hostname.lower(): + self.log.debug("IPA: hostname in subject of request '%s' does not match requesting hostname '%s'" % (subject_host, client_hostname)) + self.check_access(operation="request certificate different host") + # See if the service exists and punt if it doesn't and we aren't # going to add it try: @@ -98,6 +157,8 @@ class cert_request(VirtualCommand): if 'usercertificate' in service: # FIXME, what to do here? Do we revoke the old cert? raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate'])) + if not can_write(dn, "usercertificate"): + raise errors.ACIError(info='You need to be a member of the serviceadmin role to update services') except errors.NotFound, e: if not add: @@ -110,7 +171,10 @@ class cert_request(VirtualCommand): # either exists or we should add it. if result.get('status') == '0': if service is None: - service = api.Command['service_add'](principal, **{}) + try: + service = api.Command['service_add'](principal, **{}) + except errors.ACIError: + raise errors.ACIError(info='You need to be a member of the serviceadmin role to add services') skw = {"usercertificate": str(result.get('certificate'))} api.Command['service_mod'](principal, **skw) @@ -162,7 +226,7 @@ class cert_status(VirtualCommand): def execute(self, request_id, **kw): - super(cert_status, self).execute() + self.check_access() return self.Backend.ra.check_request_status(request_id) def output_for_cli(self, textui, result, *args, **kw): @@ -183,7 +247,7 @@ class cert_get(VirtualCommand): operation="retrieve certificate" def execute(self, serial_number): - super(cert_get, self).execute() + self.check_access() return self.Backend.ra.get_certificate(serial_number) def output_for_cli(self, textui, result, *args, **kw): @@ -215,7 +279,7 @@ class cert_revoke(VirtualCommand): def execute(self, serial_number, **kw): - super(cert_revoke, self).execute() + self.check_access() return self.Backend.ra.revoke_certificate(serial_number, **kw) def output_for_cli(self, textui, result, *args, **kw): @@ -236,7 +300,7 @@ class cert_remove_hold(VirtualCommand): operation = "certificate remove hold" def execute(self, serial_number, **kw): - super(cert_remove_hold, self).execute() + self.check_access() return self.Backend.ra.take_certificate_off_hold(serial_number) def output_for_cli(self, textui, result, *args, **kw): diff --git a/ipalib/plugins/virtual.py b/ipalib/plugins/virtual.py index d21a58f12..3ac96301e 100644 --- a/ipalib/plugins/virtual.py +++ b/ipalib/plugins/virtual.py @@ -40,34 +40,27 @@ class VirtualCommand(Command): """ operation = None - def execute(self, *args, **kw): + def check_access(self, operation=None): """ - Perform the LDAP query to determine authorization. + Perform an LDAP query to determine authorization. - This should be executed via super() before any actual work is done. + This should be executed before any actual work is done. """ - if self.operation is None: + if self.operation is None and operation is None: raise errors.ACIError(info='operation not defined') + if operation is None: + operation = self.operation + ldap = self.api.Backend.ldap2 - self.log.info("IPA: virtual verify %s" % self.operation) + self.log.info("IPA: virtual verify %s" % operation) - operationdn = "cn=%s,%s,%s" % (self.operation, self.api.env.container_virtual, self.api.env.basedn) + operationdn = "cn=%s,%s,%s" % (operation, self.api.env.container_virtual, self.api.env.basedn) - # By adding this unknown objectclass we do several things. - # DS checks ACIs before the objectclass so we can test for ACI - # errors to know if we have rights. If we do have rights then the - # update will fail anyway with a Database error because of an - # unknown objectclass, so we can catch that gracefully as well. try: - updatekw = {'objectclass': ['somerandomunknownclass']} - ldap.update(operationdn, **updatekw) - except errors.ACIError, e: - self.log.debug("%s" % str(e)) - raise errors.ACIError(info='not allowed to perform this command') - except errors.ObjectclassViolation: - return - except Exception, e: - # Something unexpected happened. Log it and deny access to be safe. - self.log.info("Virtual verify failed: %s %s" % (type(e), str(e))) - raise errors.ACIError(info='not allowed to perform this command') + if not ldap.can_write(operationdn, "objectclass"): + raise errors.ACIError(info='not allowed to perform this command') + except errors.NotFound: + raise errors.ACIError(info='No such virtual command') + + return True |