From 608851d3f86a9082b394c30fe0c7a7b33d43f363 Mon Sep 17 00:00:00 2001
From: Jan Cholasta
Date: Mon, 13 Oct 2014 14:30:15 +0200
Subject: Check LDAP instead of local configuration to see if IPA CA is enabled

The check is done using a new hidden command ca_is_enabled.

https://fedorahosted.org/freeipa/ticket/4621

Reviewed-By: David Kupka
---
 ipalib/plugins/cert.py | 38 ++++++++++++++++++++++++++++++++++----
 ipalib/plugins/host.py | 6 +++---
 ipalib/plugins/service.py | 4 ++--
 3 files changed, 39 insertions(+), 9 deletions(-)
(limited to 'ipalib/plugins')

diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 679ac14a6..7e2c77622 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -19,13 +19,10 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see .
-from ipalib import api, SkipPluginModule
-if api.env.enable_ra is not True:
- # In this case, abort loading this plugin module...
- raise SkipPluginModule(reason='env.enable_ra is not True')
 import os
 import time
 from ipalib import Command, Str, Int, Bytes, Flag, File
+from ipalib import api
 from ipalib import errors
 from ipalib import pkcs10
 from ipalib import x509
@@ -33,6 +30,7 @@ from ipalib import util
 from ipalib import ngettext
 from ipalib.plugable import Registry
 from ipalib.plugins.virtual import *
+from ipalib.plugins.baseldap import pkey_to_value
 from ipalib.plugins.service import split_principal
 import base64
 import traceback
@@ -214,6 +212,10 @@ def get_host_from_principal(principal):
 return hostname
+def ca_enabled_check():
+ if not api.Command.ca_is_enabled()['result']:
+ raise errors.NotFound(reason=_('CA is not configured'))
+
 @register()
 class cert_request(VirtualCommand):
 __doc__ = _('Submit a certificate signing request.')
@@ -289,6 +291,8 @@ class cert_request(VirtualCommand):
 }
 def execute(self, csr, **kw):
+ ca_enabled_check()
+
 ldap = self.api.Backend.ldap2
 principal = kw.get('principal')
 add = kw.get('add')
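Review bot: `ca_enabled_check()` in the @@ -214 hunk has no docstring and reports "CA not configured" with `errors.NotFound`, the same exception type the cert commands already raise when a certificate or request does not exist, so a caller cannot tell the two conditions apart by type. A one-line docstring such as `"""Raise NotFound unless at least one IPA server has a CA service entry in LDAP."""` would at least make that choice explicit at the definition. Unlike the removed `api.env.enable_ra` test, each call is a fresh `ca_is_enabled` command invocation and therefore an LDAP search, so every cert command now pays one extra round trip per call; if that matters, this helper is the single place where the answer could later be memoized per request.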
@@ -475,6 +479,7 @@ class cert_status(VirtualCommand):
 def execute(self, request_id, **kw):
+ ca_enabled_check()
 self.check_access()
 return dict(
 result=self.Backend.ra.check_request_status(request_id)
@@ -536,6 +541,7 @@ class cert_show(VirtualCommand):
 operation="retrieve certificate"
 def execute(self, serial_number, **options):
+ ca_enabled_check()
 hostname = None
 try:
 self.check_access()
@@ -603,6 +609,7 @@ class cert_revoke(VirtualCommand):
 )
 def execute(self, serial_number, **kw):
+ ca_enabled_check()
 hostname = None
 try:
 self.check_access()
@@ -641,6 +648,7 @@ class cert_remove_hold(VirtualCommand):
 operation = "certificate remove hold"
 def execute(self, serial_number, **kw):
+ ca_enabled_check()
 self.check_access()
 return dict(
 result=self.Backend.ra.take_certificate_off_hold(serial_number)
@@ -740,6 +748,7 @@ class cert_find(Command):
 )
 def execute(self, **options):
+ ca_enabled_check()
 ret = dict(
 result=self.Backend.ra.find(options)
 )
@@ -747,3 +756,24 @@ class cert_find(Command):
 ret['truncated'] = False
 return ret
+
+@register()
+class ca_is_enabled(Command):
+ """
+ Checks if any of the servers has the CA service enabled.
+ """
+ NO_CLI = True
+ has_output = output.standard_value
+
+ def execute(self, *args, **options):
+ base_dn = DN(('cn', 'masters'), ('cn', 'ipa'), ('cn', 'etc'),
+ self.api.env.basedn)
+ filter = '(&(objectClass=ipaConfigObject)(cn=CA))'
+ try:
+ self.api.Backend.ldap2.find_entries(
+ base_dn=base_dn, filter=filter, attrs_list=[])
+ except errors.NotFound:
+ result = False
+ else:
+ result = True
+ return dict(result=result, value=pkey_to_value(None, options))
diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py
index bbee09395..91fb75b87 100644
--- a/ipalib/plugins/host.py
+++ b/ipalib/plugins/host.py
@@ -721,7 +721,7 @@ class host_del(LDAPDelete):
 **delkw)
 break
- if self.api.env.enable_ra:
+ if self.api.Command.ca_is_enabled()['result']:
 try:
 entry_attrs = ldap.get_entry(dn, ['usercertificate'])
 except errors.NotFound:
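Review bot: In the @@ -475 hunk `ca_enabled_check()` is placed before `self.check_access()`, so the LDAP probe for the CA entry now runs before the caller's authorization for the virtual operation is verified; the cert_show (@@ -536), cert_revoke (@@ -603) and cert_remove_hold (@@ -641) hunks order the two calls the same way. Doing the access check first keeps the previous fail-fast behaviour for unauthorized callers and spares them the extra search; in cert_status and cert_remove_hold that is a plain swap of the two adjacent lines, while in cert_show and cert_revoke the call would move to just after the `self.check_access()` line inside the `try`. This assumes `check_access` does not itself depend on the CA being present, which is not visible from these hunks; cert_find (@@ -740) has no access check in view and is unaffected.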
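Review bot: `ca_is_enabled.execute` in the @@ -747 hunk maps only `errors.NotFound` to `result=False`, and the search returns no entries both when no `cn=CA` config object exists and when the bound identity cannot read under `cn=masters,cn=ipa,cn=etc` (this depends on the ACIs on that subtree, which I cannot see), so a principal without that read permission would see every cert command fail with "CA is not configured" rather than an access error; it is worth confirming the subtree is readable by everyone allowed to use the cert commands and stating that assumption in the class docstring. The local `filter` shadows the builtin; `ldap_filter` would read more clearly. Since only existence matters, limiting the search to a single result, if the backend's find_entries accepts a size limit, would avoid returning one entry per CA server, although `attrs_list=[]` already keeps the reply small. The docstring also says "enabled" while the filter only tests that the config object exists; if those entries carry a separate enabled/configured marker, the filter should include it or the docstring should say "configured".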
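Review bot: `self.api.Command.ca_is_enabled()['result']` in the @@ -721 hunk of host.py is repeated verbatim in host_mod (@@ -806), host_disable (@@ -1084), service_del (@@ -486) and service_disable (@@ -676), and unlike the old `self.api.env.enable_ra` attribute each evaluation is a command call that performs an LDAP search. A small boolean helper beside `ca_enabled_check` in cert.py, `def ca_enabled(): return api.Command.ca_is_enabled()['result']`, would give these five sites one name to call and one place to add caching later (the name `ca_is_enabled` itself is already taken by the command class in that module). Each site is now also a point where an LDAP error other than NotFound can propagate, where the old attribute test could not fail; the enclosing methods start and end outside the hunks, so whether that is handled by the callers is not visible here.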
@@ -806,7 +806,7 @@ class host_mod(LDAPUpdate):
 entry_attrs['objectclass'] = obj_classes
 cert = x509.normalize_certificate(entry_attrs.get('usercertificate'))
 if cert:
- if self.api.env.enable_ra:
+ if self.api.Command.ca_is_enabled()['result']:
 x509.verify_cert_subject(ldap, keys[-1], cert)
 entry_attrs_old = ldap.get_entry(dn, ['usercertificate'])
 oldcert = entry_attrs_old.single_value.get('usercertificate')
@@ -1084,7 +1084,7 @@ class host_disable(LDAPQuery):
 self.obj.handle_not_found(*keys)
 cert = entry_attrs.single_value.get('usercertificate')
 if cert:
- if self.api.env.enable_ra:
+ if self.api.Command.ca_is_enabled()['result']:
 cert = x509.normalize_certificate(cert)
 try:
 serial = unicode(x509.get_serial_number(cert, x509.DER))
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 3ca5066f3..55f412625 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -486,7 +486,7 @@ class service_del(LDAPDelete):
 # custom services allow them to manage them.
 (service, hostname, realm) = split_principal(keys[-1])
 check_required_principal(ldap, hostname, service)
- if self.api.env.enable_ra:
+ if self.api.Command.ca_is_enabled()['result']:
 try:
 entry_attrs = ldap.get_entry(dn, ['usercertificate'])
 except errors.NotFound:
@@ -676,7 +676,7 @@ class service_disable(LDAPQuery):
 done_work = False
 if 'usercertificate' in entry_attrs:
- if self.api.env.enable_ra:
+ if self.api.Command.ca_is_enabled()['result']:
 cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
 try:
 serial = unicode(x509.get_serial_number(cert, x509.DER))
-- cgit