From 7f7c247bb5a4b0030d531f4f14c156162e808212 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale Date: Wed, 27 May 2015 08:02:08 -0400 Subject: Support multiple host and service certificates Update the framework to support multiple host and service certificates. host-mod and service-mod revoke existing certificates that are not included in the modified entry. Using addattr=certificate=... will result in no certificates being revoked. The existing behaviour of host-disable, host-del, service-disable and service-del (revoke existing certificate) is preserved but now applies to all certificates in the host or service entry. Also update host-show and service-show to write all the principal's certificates to the file given by the ``--out=FILE`` option. Part of: http://www.freeipa.org/page/V4/User_Certificates https://fedorahosted.org/freeipa/ticket/4238 Reviewed-By: Martin Basti --- ipalib/plugins/host.py | 107 +++++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 49 deletions(-) (limited to 'ipalib/plugins/host.py') diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py index c47439743..9ad087e26 100644 --- a/ipalib/plugins/host.py +++ b/ipalib/plugins/host.py @@ -493,7 +493,7 @@ class host(LDAPObject): label=_('Random password'), flags=('no_create', 'no_update', 'no_search', 'virtual_attribute'), ), - Bytes('usercertificate?', validate_certificate, + Bytes('usercertificate*', validate_certificate, cli_name='certificate', label=_('Certificate'), doc=_('Base-64 encoded server certificate'), @@ -640,11 +640,11 @@ class host_add(LDAPCreate): entry_attrs['userpassword'] = ipa_generate_password(characters=host_pwd_chars) # save the password so it can be displayed in post_callback setattr(context, 'randompassword', entry_attrs['userpassword']) - cert = options.get('usercertificate') - if cert: - cert = x509.normalize_certificate(cert) + certs = options.get('usercertificate', []) + certs_der = map(x509.normalize_certificate, certs) + for cert in certs_der: x509.verify_cert_subject(ldap, keys[-1], cert) - entry_attrs['usercertificate'] = cert + entry_attrs['usercertificate'] = certs_der entry_attrs['managedby'] = dn entry_attrs['objectclass'].append('ieee802device') entry_attrs['objectclass'].append('ipasshhost') @@ -786,8 +786,7 @@ class host_del(LDAPDelete): entry_attrs = ldap.get_entry(dn, ['usercertificate']) except errors.NotFound: self.obj.handle_not_found(*keys) - cert = entry_attrs.single_value.get('usercertificate') - if cert: + for cert in entry_attrs.get('usercertificate', []): cert = x509.normalize_certificate(cert) try: serial = unicode(x509.get_serial_number(cert, x509.DER)) @@ -864,39 +863,43 @@ class host_mod(LDAPUpdate): if 'krbprincipalaux' not in obj_classes: obj_classes.append('krbprincipalaux') entry_attrs['objectclass'] = obj_classes - cert = x509.normalize_certificate(entry_attrs.get('usercertificate')) - if cert: - if self.api.Command.ca_is_enabled()['result']: - x509.verify_cert_subject(ldap, keys[-1], cert) - entry_attrs_old = ldap.get_entry(dn, ['usercertificate']) - oldcert = entry_attrs_old.single_value.get('usercertificate') - if oldcert: - oldcert = x509.normalize_certificate(oldcert) - try: - serial = x509.get_serial_number(oldcert, x509.DER) - serial = unicode(serial) - try: - result = api.Command['cert_show'](serial)['result'] - if 'revocation_reason' not in result: - try: - api.Command['cert_revoke']( - serial, revocation_reason=4) - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except errors.NotImplementedError: - # some CA's might not implement revoke - pass - except NSPRError, nsprerr: - if nsprerr.errno == -8183: - # If we can't decode the cert them proceed with - # modifying the host. - self.log.info("Problem decoding certificate %s" % - nsprerr.args[1]) - else: - raise nsprerr - entry_attrs['usercertificate'] = cert + # verify certificates + certs = entry_attrs.get('usercertificate') or [] + certs_der = map(x509.normalize_certificate, certs) + for cert in certs_der: + x509.verify_cert_subject(ldap, keys[-1], cert) + + # revoke removed certificates + if self.api.Command.ca_is_enabled()['result']: + entry_attrs_old = ldap.get_entry(dn, ['usercertificate']) + old_certs = entry_attrs_old.get('usercertificate', []) + old_certs_der = map(x509.normalize_certificate, old_certs) + removed_certs_der = set(old_certs_der) - set(certs_der) + for cert in removed_certs_der: + try: + serial = unicode(x509.get_serial_number(cert, x509.DER)) + try: + result = api.Command['cert_show'](serial)['result'] + if 'revocation_reason' not in result: + try: + api.Command['cert_revoke']( + serial, revocation_reason=4) + except errors.NotImplementedError: + # some CA's might not implement revoke + pass + except errors.NotImplementedError: + # some CA's might not implement revoke + pass + except NSPRError, nsprerr: + if nsprerr.errno == -8183: + # If we can't decode the cert them proceed with + # modifying the host. + self.log.info("Problem decoding certificate %s" % + nsprerr.args[1]) + else: + raise nsprerr + entry_attrs['usercertificate'] = certs_der if options.get('random'): entry_attrs['userpassword'] = ipa_generate_password(characters=host_pwd_chars) @@ -1093,8 +1096,14 @@ class host_show(LDAPRetrieve): util.check_writable_file(options['out']) result = super(host_show, self).forward(*keys, **options) if 'usercertificate' in result['result']: - x509.write_certificate(result['result']['usercertificate'][0], options['out']) - result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out']) + x509.write_certificate_list( + result['result']['usercertificate'], + options['out'] + ) + result['summary'] = ( + _('Certificate(s) stored in file \'%(file)s\'') + % dict(file=options['out']) + ) return result else: raise errors.NoCertificateError(entry=keys[-1]) @@ -1148,10 +1157,9 @@ class host_disable(LDAPQuery): entry_attrs = ldap.get_entry(dn, ['usercertificate']) except errors.NotFound: self.obj.handle_not_found(*keys) - cert = entry_attrs.single_value.get('usercertificate') - if cert: - if self.api.Command.ca_is_enabled()['result']: - cert = x509.normalize_certificate(cert) + if self.api.Command.ca_is_enabled()['result']: + certs = entry_attrs.get('usercertificate', []) + for cert in map(x509.normalize_certificate, certs): try: serial = unicode(x509.get_serial_number(cert, x509.DER)) try: @@ -1175,10 +1183,11 @@ class host_disable(LDAPQuery): else: raise nsprerr - # Remove the usercertificate altogether - entry_attrs['usercertificate'] = None - ldap.update_entry(entry_attrs) - done_work = True + if certs: + # Remove the usercertificate altogether + entry_attrs['usercertificate'] = None + ldap.update_entry(entry_attrs) + done_work = True self.obj.get_password_attributes(ldap, dn, entry_attrs) if entry_attrs['has_keytab']: -- cgit