diff options
author | Pavel Zuna <pzuna@redhat.com> | 2009-06-16 13:16:03 +0200 |
---|---|---|
committer | Rob Crittenden <rcritten@redhat.com> | 2009-07-02 13:33:00 -0400 |
commit | 4b993782e6c171ffd159c8ed8489fcb787886322 (patch) | |
tree | 74dff5891f5b47ff3daff3b8543e015fb297fa8b /ipalib/plugins/aci.py | |
parent | 9352d2fc10dde6ed8cc57f2acee3e27ed7e340c2 (diff) | |
download | freeipa-4b993782e6c171ffd159c8ed8489fcb787886322.tar.gz freeipa-4b993782e6c171ffd159c8ed8489fcb787886322.tar.xz freeipa-4b993782e6c171ffd159c8ed8489fcb787886322.zip |
Delete plugins using old LDAP backend.
Diffstat (limited to 'ipalib/plugins/aci.py')
-rw-r--r-- | ipalib/plugins/aci.py | 462 |
1 files changed, 0 insertions, 462 deletions
diff --git a/ipalib/plugins/aci.py b/ipalib/plugins/aci.py deleted file mode 100644 index 5e1eb0dba..000000000 --- a/ipalib/plugins/aci.py +++ /dev/null @@ -1,462 +0,0 @@ -# Authors: -# Rob Crittenden <rcritten@redhat.com> -# -# Copyright (C) 2009 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -""" -Front-end plugins for managing DS ACIs -""" - -from ipalib import api, crud, errors -from ipalib import Object, Command # Plugin base classes -from ipalib import Str, Flag, Int, StrEnum # Parameter types -from ipalib.aci import ACI - -type_map = { - 'user': 'ldap:///uid=*,%s,%s' % (api.env.container_user, api.env.basedn), - 'group': 'ldap:///cn=*,%s,%s' % (api.env.container_group, api.env.basedn), - 'host': 'ldap:///cn=*,%s,%s' % (api.env.container_host, api.env.basedn) -} - -def make_aci(current, aciname, kw): - try: - taskgroup = api.Command['taskgroup_show'](kw['taskgroup']) - except errors.NotFound: - # The task group doesn't exist, let's be helpful and add it - tgkw = {'description':aciname} - taskgroup = api.Command['taskgroup_add'](kw['taskgroup'], **tgkw) - - a = ACI(current) - a.name = aciname - a.permissions = kw['permissions'].replace(' ','').split(',') - a.set_bindrule("groupdn = \"ldap:///%s\"" % taskgroup['dn']) - if kw.get('attrs', None): - a.set_target_attr(kw['attrs'].split(',')) - if kw.get('memberof', None): - group = api.Command['group_show'](kw['memberof']) - a.set_target_filter("memberOf=%s" % group['dn'].decode('UTF-8')) - if kw.get('type', None): - target = type_map[kw.get('type')] - a.set_target(target) - if kw.get('targetgroup', None): - # Purposely no try here so we'll raise a NotFound - group = api.Command['group_show'](kw.get('targetgroup')) - target = "ldap:///%s" % group.get('dn') - a.set_target(target) - if kw.get('subtree',None): - # See if the subtree is a full URI - target = kw.get('subtree') - if not target.startswith("ldap:///"): - target = "ldap:///" + target - a.set_target(target) - - return a - -def search_by_name(acis, aciname): - """ - Find an aci using the name field. - - Must be an exact match of the entire name. - """ - for a in acis: - try: - t = ACI(a) - if t.name == aciname: - return str(t) - except SyntaxError, e: - # FIXME: need to log syntax errors, ignore for now - pass - - raise errors.NotFound(reason="Unable to find aci %s" % aciname) - -def search_by_attr(acis, attrlist): - """ - Find an aci by targetattr. - - Returns an ACI list of all acis the attribute appears in. - """ - results = [] - for a in acis: - try: - t = ACI(a) - for attr in attrlist: - attr = attr.lower() - for v in t.target['targetattr'].get('expression'): - if attr == v.lower(): - results.append(str(t)) - except SyntaxError, e: - # FIXME: need to log syntax errors, ignore for now - pass - - if results: - return results - - raise errors.NotFound(reason="Unable to find any ACIs with attribute %s" % ",".join(attrlist)) - -def search_by_taskgroup(acis, tgdn): - """ - Find an aci by taskgroup. This searches the ACI bind rule. - - Returns an ACI list of all acis that match. - """ - results = [] - for a in acis: - try: - t = ACI(a) - if t.bindrule['expression'] == "ldap:///" + tgdn: - results.append(str(t)) - except SyntaxError, e: - # FIXME: need to log syntax errors, ignore for now - pass - - if results: - return results - - raise errors.NotFound(reason="taskgroup %s not found" % tgdn) - -def search_by_perm(acis, permlist): - """ - Find an aci by permissions - - Returns an ACI list of all acis the permission appears in. - """ - results = [] - for a in acis: - try: - t = ACI(a) - for perm in permlist: - if perm.lower() in t.permissions: - results.append(str(t)) - except SyntaxError, e: - # FIXME: need to log syntax errors, ignore for now - pass - - if results: - return results - - raise errors.NotFound(reason="No ACIs with permissions %s found" % ",".join(permlist)) - -def search_by_memberof(acis, memberoffilter): - """ - Find an aci by memberof - - Returns an ACI list of all acis that has a matching memberOf as a - targetfilter. - """ - results = [] - memberoffilter = memberoffilter.lower() - for a in acis: - try: - t = ACI(a) - try: - if memberoffilter == t.target['targetfilter'].get('expression').lower(): - results.append(str(t)) - except KeyError: - pass - except SyntaxError, e: - # FIXME: need to log syntax errors, ignore for now - pass - - if results: - return results - - raise errors.NotFound(reason="Nothing found for %s" % memberoffilter) - -class aci(Object): - """ - ACI object. - """ - takes_params = ( - Str('aciname', - doc='Name of ACI', - primary_key=True, - ), - Str('taskgroup', - doc='Name of taskgroup this ACI grants access to', - ), - StrEnum('permissions', - doc='Permissions to grant: read, write, add, delete, selfwrite, all', - values=(u'read', u'write', u'add', u'delete', u'selfwrite', u'all') - ), - Str('attrs?', - doc='Comma-separated list of attributes', - ), - StrEnum('type?', - doc='type of IPA object: user, group, host', - values=(u'user', u'group') - ), - Str('memberof?', - doc='member of a group', - ), - Str('filter?', - doc='A legal LDAP filter (ou=Engineering)', - ), - Str('subtree?', - doc='A subtree to apply the ACI to', - ), - Str('targetgroup?', - doc='Apply the ACI to a specific group', - ), - ) -api.register(aci) - - -class aci_add(crud.Create): - """ - Add a new aci. - """ - - def execute(self, aciname, **kw): - """ - Execute the aci-add operation. - - Returns the entry as it will be created in LDAP. - - :param aciname: The name of the ACI being added. - :param kw: Keyword arguments for the other LDAP attributes. - """ - assert 'aciname' not in kw - ldap = self.api.Backend.ldap - - newaci = make_aci(None, aciname, kw) - - currentaci = ldap.retrieve(self.api.env.basedn, ['aci']) - - acilist = currentaci.get('aci') - for a in acilist: - try: - b = ACI(a) - if newaci.isequal(b): - raise errors.DuplicateEntry() - except SyntaxError: - pass - acilist.append(str(newaci)) - kwupdate = {'aci': acilist} - - return ldap.update(currentaci.get('dn'), **kwupdate) - -api.register(aci_add) - - -class aci_del(crud.Delete): - 'Delete an existing aci.' - """ - Remove an aci by name. - """ - - def execute(self, aciname, **kw): - """ - Execute the aci-del operation. - - :param aciname: The name of the ACI being added. - :param kw: unused - """ - assert 'aciname' not in kw - ldap = self.api.Backend.ldap - - currentaci = ldap.retrieve(self.api.env.basedn, ['aci']) - acilist = currentaci.get('aci') - a = search_by_name(acilist, aciname) - i = acilist.index(a) - del acilist[i] - - kwupdate = {'aci': acilist} - - return ldap.update(currentaci.get('dn'), **kwupdate) - - def output_for_cli(self, textui, result, aciname): - """ - Output result of this command to command line interface. - """ - textui.print_plain('Deleted aci "%s"' % aciname) - -api.register(aci_del) - - -class aci_mod(crud.Update): - 'Edit an existing aci.' - def execute(self, aciname, **kw): - return "Not implemented" - def output_for_cli(self, textui, result, aciname, **options): - textui.print_plain(result) -api.register(aci_mod) - - -class aci_find(crud.Search): - 'Search for a aci.' - takes_options = ( - Str('bindrule?', - doc='The bindrule (e.g. ldap:///self)' - ), - Flag('and?', - doc='Consider multiple options to be \"and\" so all are required.') - ) - def execute(self, term, **kw): - ldap = self.api.Backend.ldap - currentaci = ldap.retrieve(self.api.env.basedn, ['aci']) - currentaci = currentaci.get('aci') - results = [] - - # aciname - if kw.get('aciname'): - try: - a = search_by_name(currentaci, kw.get('aciname')) - results = [a] - if kw.get('and'): - currentaci = results - except errors.NotFound: - if kw.get('and'): - results = [] - currentaci = [] - pass - - # attributes - if kw.get('attrs'): - try: - attrs = kw.get('attrs') - attrs = attrs.replace(' ','').split(',') - a=search_by_attr(currentaci, attrs) - if kw.get('and'): - results = a - currentaci = results - else: - results = results + a - except errors.NotFound: - if kw.get('and'): - results = [] - currentaci = [] - pass - - # taskgroup - if kw.get('taskgroup'): - try: - tg = api.Command['taskgroup_show'](kw.get('taskgroup')) - except errors.NotFound: - # FIXME, need more precise error - raise - try: - a=search_by_taskgroup(currentaci, tg.get('dn')) - if kw.get('and'): - results = a - currentaci = results - else: - results = results + a - except errors.NotFound: - if kw.get('and'): - results = [] - currentaci = [] - pass - - # permissions - if kw.get('permissions'): - try: - permissions = kw.get('permissions') - permissions = permissions.replace(' ','').split(',') - a=search_by_perm(currentaci, permissions) - if kw.get('and'): - results = a - currentaci = results - else: - results = results + a - except errors.NotFound: - if kw.get('and'): - results = [] - currentaci = [] - pass - - # memberOf - if kw.get('memberof'): - try: - group = api.Command['group_show'](kw['memberof']) - memberof = "(memberOf=%s)" % group['dn'].decode('UTF-8') - a=search_by_memberof(currentaci, memberof) - results = results + a - if kw.get('and'): - currentaci = results - except errors.NotFound: - if kw.get('and'): - results = [] - currentaci = [] - pass - -# TODO -# --type=STR type of IPA object: user, group, host -# --filter=STR A legal LDAP filter (ou=Engineering) -# --subtree=STR A subtree to apply the ACI to -# --bindrule=STR A subtree to apply the ACI to - - # Make sure we have no dupes in the list - results = list(set(results)) - - # the first entry contains the count - counter = len(results) - return [counter] + results - - def output_for_cli(self, textui, result, term, **options): - counter = result[0] - acis = result[1:] - if counter == 0 or len(acis) == 0: - textui.print_plain("No entries found") - return - textui.print_name(self.name) - for a in acis: - textui.print_plain(a) - textui.print_count(acis, '%d acis matched') - -api.register(aci_find) - - -class aci_show(crud.Retrieve): - 'Examine an existing aci.' - def execute(self, aciname, **kw): - """ - Execute the aci-show operation. - - Returns the entry - - :param uid: The login name of the user to retrieve. - :param kw: unused - """ - ldap = self.api.Backend.ldap - currentaci = ldap.retrieve(self.api.env.basedn, ['aci']) - - a = search_by_name(currentaci.get('aci'), aciname) - return str(a) - - def output_for_cli(self, textui, result, aciname, **options): - textui.print_plain(result) - -api.register(aci_show) - - -class aci_showall(Command): - 'Examine all existing acis.' - def execute(self): - """ - Execute the aci-show operation. - - Returns the entry - - :param uid: The login name of the user to retrieve. - :param kw: unused - """ - ldap = self.api.Backend.ldap - return ldap.retrieve(self.api.env.basedn, ['aci']) - def output_for_cli(self, textui, result, **options): - textui.print_entry(result) - -api.register(aci_showall) |