diff options
author | Martin Babinsky <mbabinsk@redhat.com> | 2015-06-23 13:40:30 +0200 |
---|---|---|
committer | Jan Cholasta <jcholast@redhat.com> | 2015-07-02 14:43:44 +0000 |
commit | 53b11b611766d79015e17298f2354b7688437e20 (patch) | |
tree | 7849c9900371bbb83545d83a4b3680d931a63cc9 | |
parent | 93dab56ebfa6801e4f032af4a57b7b2179ba29ff (diff) | |
download | freeipa-53b11b611766d79015e17298f2354b7688437e20.tar.gz freeipa-53b11b611766d79015e17298f2354b7688437e20.tar.xz freeipa-53b11b611766d79015e17298f2354b7688437e20.zip |
reworked certificate normalization and revocation
Validation of certificate is now handled by `x509.validate_certificate'.
Revocation of the host and service certificates was factored out to a separate
function.
Part of http://www.freeipa.org/page/V4/User_Certificates
Reviewed-By: Jan Cholasta <jcholast@redhat.com>
-rw-r--r-- | ipalib/plugins/host.py | 75 | ||||
-rw-r--r-- | ipalib/plugins/service.py | 112 | ||||
-rw-r--r-- | ipalib/x509.py | 14 |
3 files changed, 55 insertions, 146 deletions
diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py index e81dca94e..f5871f81e 100644 --- a/ipalib/plugins/host.py +++ b/ipalib/plugins/host.py @@ -786,30 +786,8 @@ class host_del(LDAPDelete): entry_attrs = ldap.get_entry(dn, ['usercertificate']) except errors.NotFound: self.obj.handle_not_found(*keys) - for cert in entry_attrs.get('usercertificate', []): - cert = x509.normalize_certificate(cert) - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - try: - result = api.Command['cert_show'](serial)['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke'](serial, - revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except NSPRError, nsprerr: - if nsprerr.errno == -8183: - # If we can't decode the cert them proceed with - # removing the host. - self.log.info("Problem decoding certificate %s" % - nsprerr.args[1]) - else: - raise nsprerr + + revoke_certs(entry_attrs.get('usercertificate', []), self.log) return dn @@ -879,29 +857,8 @@ class host_mod(LDAPUpdate): old_certs = entry_attrs_old.get('usercertificate', []) old_certs_der = map(x509.normalize_certificate, old_certs) removed_certs_der = set(old_certs_der) - set(certs_der) - for cert in removed_certs_der: - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - try: - result = api.Command['cert_show'](serial)['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke']( - serial, revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except NSPRError, nsprerr: - if nsprerr.errno == -8183: - # If we can't decode the cert them proceed with - # modifying the host. - self.log.info("Problem decoding certificate %s" % - nsprerr.args[1]) - else: - raise nsprerr + revoke_certs(removed_certs_der, self.log) + if certs: entry_attrs['usercertificate'] = certs_der @@ -1163,31 +1120,9 @@ class host_disable(LDAPQuery): self.obj.handle_not_found(*keys) if self.api.Command.ca_is_enabled()['result']: certs = entry_attrs.get('usercertificate', []) - for cert in map(x509.normalize_certificate, certs): - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - try: - result = api.Command['cert_show'](serial)['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke'](serial, - revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except NSPRError, nsprerr: - if nsprerr.errno == -8183: - # If we can't decode the cert them proceed with - # disabling the host. - self.log.info("Problem decoding certificate %s" % - nsprerr.args[1]) - else: - raise nsprerr if certs: + revoke_certs(certs, self.log) # Remove the usercertificate altogether entry_attrs['usercertificate'] = None ldap.update_entry(entry_attrs) diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py index 166d978a2..18d7b3e54 100644 --- a/ipalib/plugins/service.py +++ b/ipalib/plugins/service.py @@ -238,16 +238,42 @@ def normalize_principal(principal): def validate_certificate(ugettext, cert): """ - For now just verify that it is properly base64-encoded. + Check whether the certificate is properly encoded to DER """ - if cert and util.isvalid_base64(cert): + if api.env.in_server: + x509.validate_certificate(cert, datatype=x509.DER) + + +def revoke_certs(certs, logger=None): + """ + revoke the certificates removed from host/service entry + """ + for cert in certs: try: - base64.b64decode(cert) - except Exception, e: - raise errors.Base64DecodeError(reason=str(e)) - else: - # We'll assume this is DER data - pass + cert = x509.normalize_certificate(cert) + except errors.CertificateFormatError as e: + if logger is not None: + logger.info("Problem decoding certificate: %s" % e) + + serial = unicode(x509.get_serial_number(cert, x509.DER)) + + try: + result = api.Command['cert_show'](unicode(serial))['result'] + except errors.CertificateOperationError: + continue + if 'revocation_reason' in result: + continue + if x509.normalize_certificate(result['certificate']) != cert: + continue + + try: + api.Command['cert_revoke'](unicode(serial), + revocation_reason=4) + except errors.NotImplementedError: + # some CA's might not implement revoke + pass + + def set_certificate_attrs(entry_attrs): """ @@ -566,27 +592,8 @@ class service_del(LDAPDelete): entry_attrs = ldap.get_entry(dn, ['usercertificate']) except errors.NotFound: self.obj.handle_not_found(*keys) - for cert in entry_attrs.get('usercertificate', []): - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - try: - result = api.Command['cert_show'](unicode(serial))['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke'](unicode(serial), revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except NSPRError, nsprerr: - if nsprerr.errno == -8183: - # If we can't decode the cert them proceed with - # removing the service. - self.log.info("Problem decoding certificate %s" % nsprerr.args[1]) - else: - raise nsprerr + revoke_certs(entry_attrs.get('usercertificate', []), self.log) + return dn @@ -622,29 +629,8 @@ class service_mod(LDAPUpdate): old_certs = entry_attrs_old.get('usercertificate', []) old_certs_der = map(x509.normalize_certificate, old_certs) removed_certs_der = set(old_certs_der) - set(certs_der) - for cert in removed_certs_der: - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - try: - result = api.Command['cert_show'](serial)['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke']( - serial, revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except NSPRError, nsprerr: - if nsprerr.errno == -8183: - # If we can't decode the cert them proceed with - # modifying the host. - self.log.info("Problem decoding certificate %s" % - nsprerr.args[1]) - else: - raise nsprerr + revoke_certs(removed_certs_der, self.log) + if certs: entry_attrs['usercertificate'] = certs_der @@ -854,29 +840,9 @@ class service_disable(LDAPQuery): if self.api.Command.ca_is_enabled()['result']: certs = entry_attrs.get('usercertificate', []) - for cert in map(x509.normalize_certificate, certs): - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - try: - result = api.Command['cert_show'](unicode(serial))['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke'](unicode(serial), revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except NSPRError, nsprerr: - if nsprerr.errno == -8183: - # If we can't decode the cert them proceed with - # disabling the service - self.log.info("Problem decoding certificate %s" % nsprerr.args[1]) - else: - raise nsprerr if len(certs) > 0: + revoke_certs(certs, self.log) # Remove the usercertificate altogether entry_attrs['usercertificate'] = None ldap.update_entry(entry_attrs) diff --git a/ipalib/x509.py b/ipalib/x509.py index a87dbf413..edd73ebdc 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -294,16 +294,24 @@ def normalize_certificate(rawcert): # was base64-encoded and now its not or it came in as DER format. # Let's decode it and see. Fetching the serial number will pass the # certificate through the NSS DER parser. + validate_certificate(dercert, datatype=DER) + + return dercert + + +def validate_certificate(cert, datatype=PEM, dbdir=None): + """ + Perform certificate validation by trying to load it into NSS database + """ try: - serial = unicode(get_serial_number(dercert, DER)) - except NSPRError, nsprerr: + load_certificate(cert, datatype=datatype, dbdir=dbdir) + except NSPRError as nsprerr: if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER raise errors.CertificateFormatError( error=_('improperly formatted DER-encoded certificate')) else: raise errors.CertificateFormatError(error=str(nsprerr)) - return dercert def write_certificate(rawcert, filename): """ |