diff options
-rw-r--r-- | ipalib/plugins/aci.py | 76 | ||||
-rw-r--r-- | tests/test_xmlrpc/test_aci_plugin.py | 257 |
2 files changed, 265 insertions, 68 deletions
diff --git a/ipalib/plugins/aci.py b/ipalib/plugins/aci.py index ab69c9a3c..f64589833 100644 --- a/ipalib/plugins/aci.py +++ b/ipalib/plugins/aci.py @@ -145,7 +145,7 @@ def _make_aci(current, aciname, kw): if 'attrs' in kw: a.set_target_attr(kw['attrs']) if 'memberof' in kw: - (dn, entry_attrs) = api.Command['group_show'](kw['memberof']) + entry_attrs = api.Command['group_show'](kw['memberof'])['result'] a.set_target_filter('memberOf=%s' % dn) if 'filter' in kw: a.set_target_filter(kw['filter']) @@ -154,7 +154,7 @@ def _make_aci(current, aciname, kw): a.set_target(target) if 'targetgroup' in kw: # Purposely no try here so we'll raise a NotFound - (dn, entry_attrs) = api.Command['group_show'](kw['targetgroup']) + entry_attrs = api.Command['group_show'](kw['targetgroup'])['result'] target = 'ldap:///%s' % dn a.set_target(target) if 'subtree' in kw: @@ -166,6 +166,53 @@ def _make_aci(current, aciname, kw): return a +def _aci_to_kw(ldap, a): + """Convert an ACI into its equivalent keywords. + + This is used for the modify operation so we can merge the + incoming kw and existing ACI and pass the result to + _make_aci(). + """ + kw = {} + kw['aciname'] = a.name + kw['permissions'] = tuple(a.permissions) + if 'targetattr' in a.target: + kw['attrs'] = tuple(a.target['targetattr']['expression']) + if 'targetfilter' in a.target: + target = a.target['targetfilter']['expression'] + if target.startswith('memberOf'): + kw['memberof'] = target + else: + kw['filter'] = target + if 'target' in a.target: + target = a.target['target']['expression'] + found = False + for k in _type_map.keys(): + if _type_map[k] == target: + kw['type'] = unicode(k) + found = True + break; + if not found: + if target.startswith('('): + kw['filter'] = target + else: + # See if the target is a group. If so we set the + # targetgroup attr, otherwise we consider it a subtree + if api.env.container_group in target: + kw['targetgroup'] = target + else: + kw['subtree'] = target + + groupdn = a.bindrule['expression'] + groupdn = groupdn.replace('ldap:///','') + (dn, entry_attrs) = ldap.get_entry(groupdn, ['cn']) + if api.env.container_taskgroup in dn: + kw['taskgroup'] = entry_attrs['cn'][0] + else: + kw['group'] = entry_attrs['cn'][0] + + return kw + def _convert_strings_to_acis(acistrs): acis = [] for a in acistrs: @@ -362,18 +409,23 @@ class aci_mod(crud.Update): acis = _convert_strings_to_acis(entry_attrs.get('aci', [])) aci = _find_aci_by_name(acis, aciname) - kw.setdefault('aciname', aci.name) - kw.setdefault('taskgroup', aci.bindrule['expression']) - kw.setdefault('permissions', aci.permissions) - kw.setdefault('attrs', aci.target['targetattr']['expression']) - if 'type' not in kw and 'targetgroup' not in kw and 'subtree' not in kw: - kw['subtree'] = aci.target['target']['expression'] - if 'memberof' not in kw and 'filter' not in kw: - kw['filter'] = aci.target['targetfilter']['expression'] + # The strategy here is to convert the ACI we're updating back into + # a series of keywords. Then we replace any keywords that have been + # updated and convert that back into an ACI and write it out. + newkw = _aci_to_kw(ldap, aci) + for k in kw.keys(): + newkw[k] = kw[k] + if 'aciname' in newkw: + del newkw['aciname'] self.api.Command['aci_del'](aciname) - return self.api.Command['aci_add'](aciname, **kw) + result = self.api.Command['aci_add'](aciname, **newkw)['result'] + + return dict( + result=result, + value=aciname, + ) def output_for_cli(self, textui, result, aciname, **options): """ @@ -451,7 +503,7 @@ class aci_find(crud.Search): try: self.api.Command['taskgroup_show']( kw['taskgroup'] - )['result'] + ) except errors.NotFound: pass else: diff --git a/tests/test_xmlrpc/test_aci_plugin.py b/tests/test_xmlrpc/test_aci_plugin.py index c42f1abd8..14d3c8950 100644 --- a/tests/test_xmlrpc/test_aci_plugin.py +++ b/tests/test_xmlrpc/test_aci_plugin.py @@ -1,7 +1,7 @@ # Authors: # Rob Crittenden <rcritten@redhat.com> # -# Copyright (C) 2009 Red Hat +# Copyright (C) 2010 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or @@ -16,62 +16,207 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + """ Test the `ipalib/plugins/aci.py` module. """ -import sys -from xmlrpc_test import XMLRPC_test, assert_attr_equal -from ipalib import api -from ipalib import errors - - -class test_aci(XMLRPC_test): - """ - Test the `aci` plugin. - """ - aciname = u'acitest' - taskgroup = u'testtaskgroup' - kw = {'permissions': u'add', 'type': u'user', 'taskgroup': taskgroup } - aci = u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "acitest";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn) - - def test_1_aci_add(self): - """ - Test adding an aci using the `xmlrpc.aci_add` method. - """ - result = api.Command['aci_add'](self.aciname, **self.kw)['result'] - - assert result == self.aci - - def test_2_aci_show(self): - """ - Test showing an aci using the `xmlrpc.aci_show` method. - """ - result = api.Command['aci_show'](self.aciname)['result'] - - assert result == self.aci - - def test_3_aci_find(self): - """ - Test showing an aci using the `xmlrpc.aci_show` method. - """ - outcome = api.Command['aci_find'](self.aciname) - result = outcome['result'] - count = outcome['count'] - - assert count == 1 - assert result[0] == self.aci - - def test_4_aci_del(self): - """ - Remove the second test policy with `xmlrpc.aci_del`. - """ - assert api.Command['aci_del'](self.aciname)['result'] is True - - # Verify that it is gone - try: - api.Command['aci_show'](self.aciname) - except errors.NotFound: - pass - else: - assert False +from ipalib import api, errors +from tests.test_xmlrpc import objectclasses +from xmlrpc_test import Declarative, fuzzy_digits, fuzzy_uuid + + +aci1=u'test1' +taskgroup = u'testtaskgroup' + + +class test_aci(Declarative): + + cleanup_commands = [ + ('aci_del', [aci1], {}), + ] + + tests = [ + + dict( + desc='Try to retrieve non-existent %r' % aci1, + command=('aci_show', [aci1], {}), + expected=errors.NotFound(reason='no such entry'), + ), + + + dict( + desc='Try to update non-existent %r' % aci1, + command=('aci_mod', [aci1], dict(permissions=u'write')), + expected=errors.NotFound(reason='no such entry'), + ), + + + dict( + desc='Try to delete non-existent %r' % aci1, + command=('aci_del', [aci1], {}), + expected=errors.NotFound(reason='no such entry'), + ), + + + dict( + desc='Create %r' % aci1, + command=( + 'aci_add', [aci1], dict(permissions=u'add', type=u'user', taskgroup=taskgroup) + ), + expected=dict( + value=aci1, + summary=u'Created ACI "test1"', + result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn), + ), + ), + + + dict( + desc='Try to create duplicate %r' % aci1, + command=( + 'aci_add', [aci1], dict(permissions=u'add', type=u'user', taskgroup=taskgroup) + ), + expected=errors.DuplicateEntry(), + ), + + + dict( + desc='Retrieve %r' % aci1, + command=( + 'aci_show', [aci1], {} + ), + expected=dict( + value=aci1, + summary=None, + result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn), + ), + ), + + + dict( + desc='Search for %r with all=True' % aci1, + command=( + 'aci_find', [aci1], {'all': True} + ), + expected=dict( + result=[ + u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn) + ], + summary=u'1 ACI matched', + count=1, + ), + ), + + + dict( + desc='Search for %r with minimal attributes' % aci1, + command=( + 'aci_find', [aci1], {} + ), + expected=dict( + result=[ + u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn) + ], + summary=u'1 ACI matched', + count=1, + ), + ), + + + dict( + desc='Update permissions in %r' % aci1, + command=( + 'aci_mod', [aci1], dict(permissions=u'add,write') + ), + expected=dict( + value=aci1, + summary=u'Updated ACI "test1"', + result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn), + ), + ), + + + dict( + desc='Retrieve %r to verify update' % aci1, + command=('aci_show', [aci1], {}), + expected=dict( + value=aci1, + summary=None, + result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn), + ), + + ), + + dict( + desc='Update attributes in %r' % aci1, + command=( + 'aci_mod', [aci1], dict(attrs=u'cn, sn,givenName') + ), + expected=dict( + value=aci1, + summary=u'Updated ACI "test1"', + result=u'(targetattr = "cn || sn || givenName")(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn), + ), + ), + + + dict( + desc='Update type in %r' % aci1, + command=( + 'aci_mod', [aci1], dict(type=u'group') + ), + expected=dict( + value=aci1, + summary=u'Updated ACI "test1"', + result=u'(targetattr = "cn || sn || givenName")(target = "ldap:///cn=*,cn=groups,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn), + ), + ), + + + dict( + desc='Update memberOf in %r' % aci1, + command=( + 'aci_mod', [aci1], dict(memberof=u'ipausers') + ), + expected=dict( + value=aci1, + summary=u'Updated ACI "test1"', + result=u'(targetattr = "cn || sn || givenName")(targetfilter = "(memberOf=cn=testtaskgroup,cn=taskgroups,cn=accounts,%s)")(target = "ldap:///cn=*,cn=groups,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn, api.env.basedn), + ), + ), + + + dict( + desc='Delete %r' % aci1, + command=('aci_del', [aci1], {}), + expected=dict( + result=True, + summary=u'Deleted ACI "test1"', + value=aci1, + ), + ), + + + dict( + desc='Try to delete non-existent %r' % aci1, + command=('aci_del', [aci1], {}), + expected=errors.NotFound(reason='no such entry'), + ), + + + dict( + desc='Try to retrieve non-existent %r' % aci1, + command=('aci_show', [aci1], {}), + expected=errors.NotFound(reason='no such entry'), + ), + + + dict( + desc='Try to update non-existent %r' % aci1, + command=('aci_mod', [aci1], dict(givenname=u'Foo')), + expected=errors.NotFound(reason='no such entry'), + ), + + + ] |