From be82f941d038bda2430eedaad73102684557dada Mon Sep 17 00:00:00 2001 From: Pavel Zuna Date: Tue, 15 Sep 2009 13:04:09 +0200 Subject: Make the service plugin use baseldap classes. --- ipalib/plugins/service.py | 271 +++++++++++----------------------------------- 1 file changed, 66 insertions(+), 205 deletions(-) (limited to 'ipalib/plugins') diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py index 70812579d..782948a5a 100644 --- a/ipalib/plugins/service.py +++ b/ipalib/plugins/service.py @@ -25,24 +25,22 @@ import base64 from OpenSSL import crypto -from ipalib import api, crud, errors -from ipalib import Object +from ipalib import api, errors from ipalib import Str, Flag, Bytes -from ipalib import uuid +from ipalib.plugins.baseldap import * -_container_dn = api.env.container_service -_default_attributes = ['krbprincipalname', 'usercertificate'] def get_serial(certificate): """ - Given a certificate, return the serial number in that cert + Given a certificate, return the serial number in that cert. """ try: x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate) serial = str(x509.get_serial_number()) except crypto.Error: - raise errors.GenericError(format='Unable to decode certificate in entry') - + raise errors.GenericError( + format='Unable to decode certificate in entry' + ) return serial def split_principal(principal): @@ -94,60 +92,63 @@ def validate_certificate(ugettext, cert): raise errors.Base64DecodeError(reason=str(e)) -class service(Object): +class service(LDAPObject): """ Service object. """ + container_dn = api.env.container_service + object_name = 'service' + object_name_plural = 'services' + object_class = [ + 'krbprincipal', 'krbprincipalaux', 'krbticketpolicyaux', 'ipaobject', + 'ipaservice', 'pkiuser' + ] + default_attributes = ['krbprincipalname', 'usercertificate'] + uuid_attribute = 'ipauniqueid' + attribute_names = { + 'krbprincipalname': 'kerberos principal', + 'usercertificate': 'user certificate', + 'ipauniqueid': 'unique identifier', + } + takes_params = ( Str('krbprincipalname', validate_principal, cli_name='principal', - doc='Service principal', + doc='service principal', primary_key=True, normalizer=lambda value: normalize_principal(value), ), Bytes('usercertificate?', validate_certificate, cli_name='certificate', - doc='Base-64 encoded server certificate', + doc='base-64 encoded server certificate', ), ) api.register(service) -class service_add(crud.Create): +class service_add(LDAPCreate): """ Add new service. """ takes_options = ( Flag('force', - doc='Force principal name even if not in DNS', + doc='force principal name even if not in DNS', ), ) - def execute(self, principal, **kw): - """ - Execute the service-add operation. - - The dn should not be passed as a keyword argument as it is constructed - by this method. - - Returns the entry as it will be created in LDAP. - - :param principal: The service to be added in the form: service/hostname - :param kw: Keyword arguments for the other LDAP attributes. - """ - assert 'krbprincipalname' not in kw - ldap = self.api.Backend.ldap2 - # FIXME: should be in a normalizer. Need to fix normalizers to work - # on non-unicode data - if kw.get('usercertificate'): - kw['usercertificate'] = base64.b64decode(kw['usercertificate']) - - (service, hostname, realm) = split_principal(principal) - - if service.lower() == 'host' and not kw['force']: + def pre_callback(self, ldap, dn, entry_attrs, *keys, **options): + (service, hostname, realm) = split_principal(keys[-1]) + if service.lower() == 'host' and not options['force']: raise errors.HostService() - # FIXME: once DNS client is done + cert = entry_attrs.get('usercertificate') + if cert: + # FIXME: should be in a normalizer: need to fix normalizers + # to work on non-unicode data + entry_attrs['usercertificate'] = base64.b64decode(cert) + # FIXME: shouldn't we request signing at this point? + + # TODO: once DNS client is done (code below for reference only!) # if not kw['force']: # fqdn = hostname + '.' # rs = dnsclient.query(fqdn, dnsclient.DNS_C_IN, dnsclient.DNS_T_A) @@ -161,115 +162,53 @@ class service_add(crud.Create): # 'IPA: found %d records for '%s'" % (len(rs), hostname) # ) - entry_attrs = self.args_options_2_entry(principal, **kw) - entry_attrs['objectclass'] = [ - 'krbprincipal', 'krbprincipalaux', 'krbticketpolicyaux', - 'ipaobject', 'ipaservice', 'pkiuser' - ] - entry_attrs['ipauniqueid'] = str(uuid.uuid1()) - dn = ldap.make_dn(entry_attrs, 'krbprincipalname', _container_dn) - - ldap.add_entry(dn, entry_attrs) - - return ldap.get_entry(dn, entry_attrs.keys()) - - def output_for_cli(self, textui, result, principal, **options): - (dn, entry_attrs) = result - - textui.print_name(self.name) - textui.print_attribute('dn', dn) - textui.print_entry(entry_attrs) - textui.print_dashed('Created service "%s".' % principal) + return dn api.register(service_add) -class service_del(crud.Delete): +class service_del(LDAPDelete): """ Delete an existing service. """ - def execute(self, principal, **kw): - """ - Delete a service principal. - - principal is the krbprincipalname of the entry to delete. - - This should be called with much care. - - :param principal: The service to be added in the form: service/hostname - :param kw: not used - """ - ldap = self.api.Backend.ldap2 - - (dn, entry_attrs) = ldap.find_entry_by_attr( - 'krbprincipalname', principal, 'ipaservice' - ) - - if 'usercertificate' in entry_attrs: - serial = get_serial(entry_attrs['usercertificate']) - api.Command['cert_revoke'](unicode(serial), revocation_reason=5) - - ldap.delete_entry(dn) - - return True - - def output_for_cli(self, textui, result, principal, **options): - textui.print_name(self.name) - textui.print_dashed('Deleted service "%s".' % principal) + def pre_callback(self, ldap, dn, *keys, **options): + if self.api.env.enable_ra: + (dn, entry_attrs) = ldap.get_entry(dn, ['usercertificate']) + cert = entry_attrs.get('usercertificate') + if cert: + serial = unicode(get_serial(cert)) + self.api.Command['cert_revoke'](serial, revocation_reason=5) + return dn api.register(service_del) -class service_mod(crud.Update): +class service_mod(LDAPUpdate): """ Modify service. """ - def execute(self, principal, **options): - ldap = self.api.Backend.ldap2 - # FIXME, should be in a normalizer. Need to fix normalizers to work - # on non-unicode data. - if options.get('usercertificate'): - options['usercertificate'] = base64.b64decode(options['usercertificate']) - - entry_attrs = self.args_options_2_entry(*tuple(), **options) - dn = ldap.make_dn_from_attr('krbprincipalname', principal, _container_dn) - - (dn, old_entry_attrs) = ldap.get_entry(dn) - if 'usercertificate' in old_entry_attrs and 'usercertificate' in options: - # FIXME, what to do here? Do we revoke the old cert? - raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(old_entry_attrs['usercertificate'])) - - try: - ldap.update_entry(dn, entry_attrs) - except errors.EmptyModlist: - pass - - return ldap.get_entry(dn, entry_attrs.keys()) - - def output_to_cli(self, textui, result, principal, **options): - (dn, entry_attrs) = result - - textui.print_name(self.name) - textui.print_attribute('dn', dn) - textui.print_entry(entry_attrs) - textui.print_dashed('Modified service "%s".' % principal) + def pre_callback(self, ldap, dn, entry_attrs, *keys, **options): + cert = entry_attrs.get('usercertificate') + if cert: + (dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate']) + if 'usercertificate' in entry_attrs_old: + # FIXME: what to do here? do we revoke the old cert? + fmt = 'entry already has a certificate, serial number: %s' % ( + get_serial(entry_attrs_old['usercertificate']) + ) + raise errors.GenericError(format=fmt) + # FIXME: should be in normalizer; see service_add + entry_attrs['usercertificate'] = base64.b64decode(cert) + return dn api.register(service_mod) -class service_find(crud.Search): +class service_find(LDAPSearch): """ Search for services. """ - takes_options = ( - Flag('all', - doc='Retrieve all attributes' - ), - ) - - def execute(self, term, **kw): - ldap = self.api.Backend.ldap2 - + def pre_callback(self, ldap, filter, attrs_list, base_dn, *args, **options): # lisp style! custom_filter = '(&(objectclass=ipaService)' \ '(!(objectClass=posixAccount))' \ @@ -278,95 +217,17 @@ class service_find(crud.Search): '(krbprincipalname=krbtgt/*))' \ ')' \ ')' - - search_kw = self.args_options_2_entry(**kw) - search_kw['objectclass'] = 'krbprincipal' - filter = ldap.make_filter(search_kw, rules=ldap.MATCH_ALL) - - search_kw = {} - for a in _default_attributes: - search_kw[a] = term - term_filter = ldap.make_filter(search_kw, exact=False) - - filter = ldap.combine_filters( - (custom_filter, filter, term_filter), rules=ldap.MATCH_ALL + return ldap.combine_filters( + (custom_filter, filter), rules=ldap.MATCH_ALL ) - if kw['all']: - attrs_list = ['*'] - else: - attrs_list = _default_attributes - - try: - (entries, truncated) = ldap.find_entries( - filter, attrs_list, _container_dn - ) - except errors.NotFound: - (entries, truncated) = (tuple(), False) - - return (entries, truncated) - - def output_for_cli(self, textui, result, principal, **options): - (entries, truncated) = result - - textui.print_name(self.name) - for (dn, entry_attrs) in entries: - textui.print_attribute('dn', dn) - textui.print_entry(entry_attrs) - textui.print_plain('') - textui.print_count( - len(entries), '%i service matched.', '%i services matched.' - ) - if truncated: - textui.print_dashed('These results are truncated.', below=False) - textui.print_dashed( - 'Please refine your search and try again.', above=False - ) - api.register(service_find) -class service_show(crud.Retrieve): +class service_show(LDAPRetrieve): """ Display service. """ - takes_options = ( - Flag('all', - doc='Retrieve all attributes' - ), - ) - - def execute(self, principal, **kw): - """ - Execute the service-show operation. - - The dn should not be passed as a keyword argument as it is constructed - by this method. - - Returns the entry - - :param principal: The service principal to retrieve - :param kw: Not used. - """ - ldap = self.api.Backend.ldap2 - - dn = ldap.make_dn_from_attr( - 'krbprincipalname', principal, _container_dn - ) - - if kw['all']: - attrs_list = ['*'] - else: - attrs_list = _default_attributes - - return ldap.get_entry(dn, attrs_list) - - def output_for_cli(self, textui, result, principal, **options): - (dn, entry_attrs) = result - - textui.print_name(self.name) - textui.print_attribute('dn', dn) - textui.print_entry(entry_attrs) api.register(service_show) -- cgit