summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'ipalib/plugins')
-rw-r--r--ipalib/plugins/host.py75
-rw-r--r--ipalib/plugins/service.py112
2 files changed, 44 insertions, 143 deletions
diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py
index e81dca94e..f5871f81e 100644
--- a/ipalib/plugins/host.py
+++ b/ipalib/plugins/host.py
@@ -786,30 +786,8 @@ class host_del(LDAPDelete):
entry_attrs = ldap.get_entry(dn, ['usercertificate'])
except errors.NotFound:
self.obj.handle_not_found(*keys)
- for cert in entry_attrs.get('usercertificate', []):
- cert = x509.normalize_certificate(cert)
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- try:
- result = api.Command['cert_show'](serial)['result']
- if 'revocation_reason' not in result:
- try:
- api.Command['cert_revoke'](serial,
- revocation_reason=4)
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183:
- # If we can't decode the cert them proceed with
- # removing the host.
- self.log.info("Problem decoding certificate %s" %
- nsprerr.args[1])
- else:
- raise nsprerr
+
+ revoke_certs(entry_attrs.get('usercertificate', []), self.log)
return dn
@@ -879,29 +857,8 @@ class host_mod(LDAPUpdate):
old_certs = entry_attrs_old.get('usercertificate', [])
old_certs_der = map(x509.normalize_certificate, old_certs)
removed_certs_der = set(old_certs_der) - set(certs_der)
- for cert in removed_certs_der:
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- try:
- result = api.Command['cert_show'](serial)['result']
- if 'revocation_reason' not in result:
- try:
- api.Command['cert_revoke'](
- serial, revocation_reason=4)
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183:
- # If we can't decode the cert them proceed with
- # modifying the host.
- self.log.info("Problem decoding certificate %s" %
- nsprerr.args[1])
- else:
- raise nsprerr
+ revoke_certs(removed_certs_der, self.log)
+
if certs:
entry_attrs['usercertificate'] = certs_der
@@ -1163,31 +1120,9 @@ class host_disable(LDAPQuery):
self.obj.handle_not_found(*keys)
if self.api.Command.ca_is_enabled()['result']:
certs = entry_attrs.get('usercertificate', [])
- for cert in map(x509.normalize_certificate, certs):
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- try:
- result = api.Command['cert_show'](serial)['result']
- if 'revocation_reason' not in result:
- try:
- api.Command['cert_revoke'](serial,
- revocation_reason=4)
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183:
- # If we can't decode the cert them proceed with
- # disabling the host.
- self.log.info("Problem decoding certificate %s" %
- nsprerr.args[1])
- else:
- raise nsprerr
if certs:
+ revoke_certs(certs, self.log)
# Remove the usercertificate altogether
entry_attrs['usercertificate'] = None
ldap.update_entry(entry_attrs)
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 166d978a2..18d7b3e54 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -238,16 +238,42 @@ def normalize_principal(principal):
def validate_certificate(ugettext, cert):
"""
- For now just verify that it is properly base64-encoded.
+ Check whether the certificate is properly encoded to DER
"""
- if cert and util.isvalid_base64(cert):
+ if api.env.in_server:
+ x509.validate_certificate(cert, datatype=x509.DER)
+
+
+def revoke_certs(certs, logger=None):
+ """
+ revoke the certificates removed from host/service entry
+ """
+ for cert in certs:
try:
- base64.b64decode(cert)
- except Exception, e:
- raise errors.Base64DecodeError(reason=str(e))
- else:
- # We'll assume this is DER data
- pass
+ cert = x509.normalize_certificate(cert)
+ except errors.CertificateFormatError as e:
+ if logger is not None:
+ logger.info("Problem decoding certificate: %s" % e)
+
+ serial = unicode(x509.get_serial_number(cert, x509.DER))
+
+ try:
+ result = api.Command['cert_show'](unicode(serial))['result']
+ except errors.CertificateOperationError:
+ continue
+ if 'revocation_reason' in result:
+ continue
+ if x509.normalize_certificate(result['certificate']) != cert:
+ continue
+
+ try:
+ api.Command['cert_revoke'](unicode(serial),
+ revocation_reason=4)
+ except errors.NotImplementedError:
+ # some CA's might not implement revoke
+ pass
+
+
def set_certificate_attrs(entry_attrs):
"""
@@ -566,27 +592,8 @@ class service_del(LDAPDelete):
entry_attrs = ldap.get_entry(dn, ['usercertificate'])
except errors.NotFound:
self.obj.handle_not_found(*keys)
- for cert in entry_attrs.get('usercertificate', []):
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- try:
- result = api.Command['cert_show'](unicode(serial))['result']
- if 'revocation_reason' not in result:
- try:
- api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183:
- # If we can't decode the cert them proceed with
- # removing the service.
- self.log.info("Problem decoding certificate %s" % nsprerr.args[1])
- else:
- raise nsprerr
+ revoke_certs(entry_attrs.get('usercertificate', []), self.log)
+
return dn
@@ -622,29 +629,8 @@ class service_mod(LDAPUpdate):
old_certs = entry_attrs_old.get('usercertificate', [])
old_certs_der = map(x509.normalize_certificate, old_certs)
removed_certs_der = set(old_certs_der) - set(certs_der)
- for cert in removed_certs_der:
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- try:
- result = api.Command['cert_show'](serial)['result']
- if 'revocation_reason' not in result:
- try:
- api.Command['cert_revoke'](
- serial, revocation_reason=4)
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183:
- # If we can't decode the cert them proceed with
- # modifying the host.
- self.log.info("Problem decoding certificate %s" %
- nsprerr.args[1])
- else:
- raise nsprerr
+ revoke_certs(removed_certs_der, self.log)
+
if certs:
entry_attrs['usercertificate'] = certs_der
@@ -854,29 +840,9 @@ class service_disable(LDAPQuery):
if self.api.Command.ca_is_enabled()['result']:
certs = entry_attrs.get('usercertificate', [])
- for cert in map(x509.normalize_certificate, certs):
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- try:
- result = api.Command['cert_show'](unicode(serial))['result']
- if 'revocation_reason' not in result:
- try:
- api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except errors.NotImplementedError:
- # some CA's might not implement revoke
- pass
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183:
- # If we can't decode the cert them proceed with
- # disabling the service
- self.log.info("Problem decoding certificate %s" % nsprerr.args[1])
- else:
- raise nsprerr
if len(certs) > 0:
+ revoke_certs(certs, self.log)
# Remove the usercertificate altogether
entry_attrs['usercertificate'] = None
ldap.update_entry(entry_attrs)