diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-01-16 10:34:13 -0500 |
---|---|---|
committer | Rob Crittenden <rcritten@redhat.com> | 2009-01-19 10:40:12 -0500 |
commit | 98ab09fafc7e24fb32b44e691eb6d6c9464197d5 (patch) | |
tree | 539cbf03d8a8bba9d0df3e38fc29b7e6d08f967b | |
parent | c892051006ed27de8833822431b1f9adc976942c (diff) | |
download | freeipa-98ab09fafc7e24fb32b44e691eb6d6c9464197d5.tar.gz freeipa-98ab09fafc7e24fb32b44e691eb6d6c9464197d5.tar.xz freeipa-98ab09fafc7e24fb32b44e691eb6d6c9464197d5.zip |
Initial implementation of netgroups
-rw-r--r-- | ipalib/plugins/f_netgroup.py | 461 | ||||
-rw-r--r-- | tests/test_xmlrpc/test_netgroup_plugin.py | 320 |
2 files changed, 781 insertions, 0 deletions
diff --git a/ipalib/plugins/f_netgroup.py b/ipalib/plugins/f_netgroup.py new file mode 100644 index 000000000..6ee55b0db --- /dev/null +++ b/ipalib/plugins/f_netgroup.py @@ -0,0 +1,461 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Frontend plugin for netgroups. +""" + +from ipalib import api, crud, errors +from ipalib import Object, Command # Plugin base classes +from ipalib import Str # Parameter types +from ipalib import uuid + +netgroup_base = "cn=ng, cn=alt" +netgroup_filter = "ipaNISNetgroup" +hostgroup_filter = "groupofnames)(!(objectclass=posixGroup)" + +def get_members(members): + """ + Return a list of members. + + It is possible that the value passed in is None. + """ + if members: + members = members.split(',') + else: + members = [] + + return members + +def find_members(ldap, failed, members, attribute, filter=None): + """ + Return 2 lists: one a list of DNs found, one a list of errors + """ + found = [] + for m in members: + if not m: continue + try: + member_dn = ldap.find_entry_dn(attribute, m, filter) + found.append(member_dn) + except errors.NotFound: + failed.append(m) + continue + + return found, failed + +def add_members(ldap, completed, members, dn, memberattr): + add_failed = [] + for member_dn in members: + try: + ldap.add_member_to_group(member_dn, dn, memberattr) + completed+=1 + except: + add_failed.append(member_dn) + + return completed, add_failed + +def add_external(ldap, completed, members, cn): + failed = [] + netgroup = api.Command['netgroup_show'](cn) + external = netgroup.get('externalhost', []) + if not isinstance(external, list): + external = [external] + external_len = len(external) + for m in members: + if not m in external: + external.append(m) + completed+=1 + else: + failed.append(m) + if len(external) > external_len: + kw = {'externalhost': external} + ldap.update(netgroup['dn'], **kw) + + return completed, failed + +def remove_members(ldap, completed, members, dn, memberattr): + remove_failed = [] + for member_dn in members: + try: + ldap.remove_member_from_group(member_dn, dn, memberattr) + completed+=1 + except: + remove_failed.append(member_dn) + + return completed, remove_failed + +def remove_external(ldap, completed, members, cn): + failed = [] + netgroup = api.Command['netgroup_show'](cn) + external = netgroup.get('externalhost', []) + if not isinstance(external, list): + external = [external] + external_len = len(external) + for m in members: + try: + external.remove(m) + completed+=1 + except ValueError: + failed.append(m) + if len(external) < external_len: + kw = {'externalhost': external} + ldap.update(netgroup['dn'], **kw) + + return completed, failed + +class netgroup(Object): + """ + netgroups object. + """ + takes_params = ( + Str('cn', + cli_name='name', + primary_key=True + ), + Str('description', + doc='Description', + ), + Str('nisdomainname?', + cli_name='domainname', + doc='Domain name', + ), + ) +api.register(netgroup) + + +class netgroup_add(crud.Add): + 'Add a new netgroup.' + + def execute(self, cn, **kw): + """ + Execute the netgroup-add operation. + + The dn should not be passed as a keyword argument as it is constructed + by this method. + + Returns the entry as it will be created in LDAP. + + :param cn: The name of the netgroup + :param kw: Keyword arguments for the other LDAP attributes. + """ + self.log.info("IPA: netgroup-add '%s'" % cn) + assert 'cn' not in kw + assert 'dn' not in kw + ldap = self.api.Backend.ldap + kw['cn'] = cn +# kw['dn'] = ldap.make_netgroup_dn() + kw['ipauniqueid'] = str(uuid.uuid1()) + kw['dn'] = "ipauniqueid=%s,%s,%s" % (kw['ipauniqueid'], netgroup_base, api.env.basedn) + + if not kw.get('nisdomainname', False): + kw['nisdomainname'] = api.env.domain + + # some required objectclasses + kw['objectClass'] = ['top', 'ipaAssociation', 'ipaNISNetgroup'] + + return ldap.create(**kw) + + def output_for_cli(self, textui, result, *args, **options): + """ + Output result of this command to command line interface. + """ + textui.print_name(self.name) + textui.print_entry(result) + textui.print_dashed('Added netgroup "%s"' % result.get('cn')) + +api.register(netgroup_add) + + +class netgroup_del(crud.Del): + 'Delete an existing netgroup.' + + def execute(self, cn, **kw): + """Delete a netgroup. + + cn is the cn of the netgroup to delete + + The memberOf plugin handles removing the netgroup from any other + groups. + + :param cn: The name of the netgroup being removed. + :param kw: Not used. + """ + self.log.info("IPA: netgroup-del '%s'" % cn) + + ldap = self.api.Backend.ldap + dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base) + return ldap.delete(dn) + + def output_for_cli(self, textui, result, cn): + """ + Output result of this command to command line interface. + """ + textui.print_plain('Deleted net group "%s"' % cn) + +api.register(netgroup_del) + + +class netgroup_mod(crud.Mod): + 'Edit an existing netgroup.' + def execute(self, cn, **kw): + """ + Execute the netgroup-mod operation. + + The dn should not be passed as a keyword argument as it is constructed + by this method. + + Returns the entry + + :param cn: The name of the netgroup to retrieve. + :param kw: Keyword arguments for the other LDAP attributes. + """ + self.log.info("IPA: netgroup-mod '%s'" % cn) + assert 'cn' not in kw + assert 'dn' not in kw + ldap = self.api.Backend.ldap + dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base) + return ldap.update(dn, **kw) + + def output_for_cli(self, textui, result, cn, **options): + """ + Output result of this command to command line interface. + """ + textui.print_name(self.name) + textui.print_entry(result) + textui.print_dashed('Updated netgroup "%s"' % result['cn']) + +api.register(netgroup_mod) + + +class netgroup_find(crud.Find): + 'Search the netgroups.' + def execute(self, term, **kw): + ldap = self.api.Backend.ldap + + search_fields = ['ipauniqueid','description','nisdomainname','cn'] + + search_kw = {} + for s in search_fields: + search_kw[s] = term + + search_kw['objectclass'] = netgroup_filter + search_kw['base'] = netgroup_base + return ldap.search(**search_kw) + + def output_for_cli(self, textui, result, *args, **options): + counter = result[0] + groups = result[1:] + if counter == 0 or len(groups) == 0: + textui.print_plain("No entries found") + return + if len(groups) == 1: + textui.print_entry(groups[0]) + return + textui.print_name(self.name) + for g in groups: + textui.print_entry(g) + textui.print_plain('') + if counter == -1: + textui.print_plain('These results are truncated.') + textui.print_plain('Please refine your search and try again.') + textui.print_count(groups, '%d netgroups matched') + +api.register(netgroup_find) + + +class netgroup_show(crud.Get): + 'Examine an existing netgroup.' + def execute(self, cn, **kw): + """ + Execute the netgroup-show operation. + + The dn should not be passed as a keyword argument as it is constructed + by this method. + + Returns the entry + + :param cn: The name of the netgroup to retrieve. + :param kw: Unused + """ + ldap = self.api.Backend.ldap + dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base) + return ldap.retrieve(dn) + + def output_for_cli(self, textui, result, *args, **options): + textui.print_entry(result) + +api.register(netgroup_show) + +class netgroup_add_member(Command): + 'Add a member to a group.' + takes_args = ( + Str('cn', + cli_name='name', + primary_key=True + ), + ) + takes_options = ( + Str('hosts?', doc='comma-separated list of hosts to add'), + Str('hostgroups?', doc='comma-separated list of host groups to add'), + Str('users?', doc='comma-separated list of users to add'), + Str('groups?', doc='comma-separated list of groups to add'), + ) + + def execute(self, cn, **kw): + """ + Execute the netgroup-add-member operation. + + Returns the updated group entry + + :param cn: The netgroup name to add new members to. + :param kw: hosts is a comma-separated list of hosts to add + :param kw: hostgroups is a comma-separated list of host groups to add + :param kw: users is a comma-separated list of users to add + :param kw: groups is a comma-separated list of host to add + """ + ldap = self.api.Backend.ldap + dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base) + add_failed = [] + to_add = [] + completed = 0 + + # Hosts + members = get_members(kw.get('hosts', '')) + (to_add, add_failed) = find_members(ldap, add_failed, members, "cn", "ipaHost") + + # If a host is not found we'll consider it an externalHost. It will + # be up to the user to handle typos + if add_failed: + (completed, failed) = add_external(ldap, completed, add_failed, cn) + add_failed = failed + + (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberhost') + add_failed+=failed + + # Host groups + members = get_members(kw.get('hostgroups', '')) + (to_add, add_failed) = find_members(ldap, add_failed, members, "cn", hostgroup_filter) + (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberhost') + add_failed+=failed + + # User + members = get_members(kw.get('users', '')) + (to_add, add_failed) = find_members(ldap, add_failed, members, "uid") + (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberuser') + add_failed+=failed + + # Groups + members = get_members(kw.get('groups', '')) + (to_add, add_failed) = find_members(ldap, add_failed, members, "cn", "posixGroup") + (completed, failed) = add_members(ldap, completed, to_add, dn, 'memberuser') + add_failed+=failed + + return add_failed + + def output_for_cli(self, textui, result, *args, **options): + """ + Output result of this command to command line interface. + """ + if result: + textui.print_plain("These entries failed to add to the group:") + for a in result: + print "\t'%s'" % a + else: + textui.print_plain("netgroup membership updated.") + +api.register(netgroup_add_member) + + +class netgroup_remove_member(Command): + 'Remove a member from a group.' + takes_args = ( + Str('cn', + cli_name='name', + primary_key=True + ), + ) + takes_options = ( + Str('hosts?', doc='comma-separated list of hosts to remove'), + Str('hostgroups?', doc='comma-separated list of groups to remove'), + Str('users?', doc='comma-separated list of users to remove'), + Str('groups?', doc='comma-separated list of groups to remove'), + ) + def execute(self, cn, **kw): + """ + Execute the group-remove-member operation. + + Returns the members that could not be added + + :param cn: The group name to add new members to. + :param kw: hosts is a comma-separated list of hosts to remove + :param kw: hostgroups is a comma-separated list of host groups to remove + :param kw: users is a comma-separated list of users to remove + :param kw: groups is a comma-separated list of host to remove + """ + ldap = self.api.Backend.ldap + dn = ldap.find_entry_dn("cn", cn, netgroup_filter, netgroup_base) + remove_failed = [] + to_remove = [] + completed = 0 + + # Hosts + members = get_members(kw.get('hosts', '')) + (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "cn", "ipaHost") + + # If a host is not found we'll consider it an externalHost. It will + # be up to the user to handle typos + if remove_failed: + (completed, failed) = remove_external(ldap, completed, remove_failed, cn) + remove_failed = failed + + (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberhost') + remove_failed+=failed + + # Host groups + members = get_members(kw.get('hostgroups', '')) + (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "cn", hostgroup_filter) + (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberhost') + remove_failed+=failed + + # User + members = get_members(kw.get('users', '')) + (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "uid") + (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberuser') + remove_failed+=failed + + # Groups + members = get_members(kw.get('groups', '')) + (to_remove, remove_failed) = find_members(ldap, remove_failed, members, "cn", "posixGroup") + (completed, failed) = remove_members(ldap, completed, to_remove, dn, 'memberuser') + remove_failed+=failed + + return remove_failed + + def output_for_cli(self, textui, result, *args, **options): + """ + Output result of this command to command line interface. + """ + if result: + textui.print_plain("These entries failed to be removed from the group:") + for a in result: + print "\t'%s'" % a + else: + textui.print_plain("netgroup membership updated.") + +api.register(netgroup_remove_member) diff --git a/tests/test_xmlrpc/test_netgroup_plugin.py b/tests/test_xmlrpc/test_netgroup_plugin.py new file mode 100644 index 000000000..3d3e4afff --- /dev/null +++ b/tests/test_xmlrpc/test_netgroup_plugin.py @@ -0,0 +1,320 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_netgroup` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +def is_member_of(members, candidate): + if not isinstance(members, list): + members = [members] + for m in members: + if m.startswith(candidate): + return True + return False + +class test_Netgroup(XMLRPC_test): + """ + Test the `f_netgroup` plugin. + """ + ng_cn='ng1' + ng_description='Netgroup' + ng_kw={'cn': ng_cn, 'description': ng_description} + + host_cn='ipaexample.%s' % api.env.domain + host_description='Test host' + host_localityname='Undisclosed location' + host_kw={'cn': host_cn, 'description': host_description, 'localityname': host_localityname} + + hg_cn='ng1' + hg_description='Netgroup' + hg_kw={'cn': hg_cn, 'description': hg_description} + + user_uid='jexample' + user_givenname='Jim' + user_sn='Example' + user_home='/home/%s' % user_uid + user_kw={'givenname':user_givenname,'sn':user_sn,'uid':user_uid,'homedirectory':user_home} + + group_cn='testgroup' + group_description='This is a test' + group_kw={'description':group_description,'cn':group_cn} + + def test_add(self): + """ + Test the `xmlrpc.netgroup_add` method. + """ + res = api.Command['netgroup_add'](**self.ng_kw) + assert res + assert res.get('description','') == self.ng_description + assert res.get('cn','') == self.ng_cn + + def test_adddata(self): + """ + Add the data needed to do additional testing. + """ + + # Add a host + res = api.Command['host_add'](**self.host_kw) + assert res + assert res.get('description','') == self.host_description + assert res.get('cn','') == self.host_cn + + # Add a hostgroup + res = api.Command['hostgroup_add'](**self.hg_kw) + assert res + assert res.get('description','') == self.hg_description + assert res.get('cn','') == self.hg_cn + + # Add a user + res = api.Command['user_add'](**self.user_kw) + assert res + assert res.get('givenname','') == self.user_givenname + assert res.get('uid','') == self.user_uid + + # Add a group + res = api.Command['group_add'](**self.group_kw) + assert res + assert res.get('description','') == self.group_description + assert res.get('cn','') == self.group_cn + + def test_addmembers(self): + """ + Test the `xmlrpc.netgroup_add_member` method. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + def test_addmembers2(self): + """ + Test the `xmlrpc.netgroup_add_member` method again to test dupes. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.host_cn) + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.hg_cn) + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'uid=%s' % self.user_uid) + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.group_cn) + + def test_addexternalmembers(self): + """ + Test adding external hosts + """ + kw={} + kw['hosts'] = "nosuchhost" + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + res = api.Command['netgroup_show'](self.ng_cn) + assert res + assert is_member_of(res.get('externalhost',[]), kw['hosts']) + + def test_doshow(self): + """ + Test the `xmlrpc.netgroup_show` method. + """ + res = api.Command['netgroup_show'](self.ng_cn) + assert res + assert res.get('description','') == self.ng_description + assert res.get('cn','') == self.ng_cn + assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.host_cn) + assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.hg_cn) + assert is_member_of(res.get('memberuser',[]), 'uid=%s' % self.user_uid) + assert is_member_of(res.get('memberuser',[]), 'cn=%s' % self.group_cn) + + def test_find(self): + """ + Test the `xmlrpc.hostgroup_find` method. + """ + res = api.Command['netgroup_find'](self.ng_cn) + assert res + assert len(res) == 2 + assert res[1].get('description','') == self.ng_description + assert res[1].get('cn','') == self.ng_cn + + def test_mod(self): + """ + Test the `xmlrpc.hostgroup_mod` method. + """ + newdesc='Updated host group' + modkw={'cn': self.ng_cn, 'description': newdesc} + res = api.Command['netgroup_mod'](**modkw) + assert res + assert res.get('description','') == newdesc + + # Ok, double-check that it was changed + res = api.Command['netgroup_show'](self.ng_cn) + assert res + assert res.get('description','') == newdesc + assert res.get('cn','') == self.ng_cn + + def test_member_remove(self): + """ + Test the `xmlrpc.hostgroup_remove_member` method. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + def test_member_remove2(self): + """ + Test the `xmlrpc.netgroup_remove_member` method again to test not found. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.host_cn) + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.hg_cn) + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'uid=%s' % self.user_uid) + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.group_cn) + + def test_remove(self): + """ + Test the `xmlrpc.netgroup_del` method. + """ + res = api.Command['netgroup_del'](self.ng_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['netgroup_show'](self.ng_cn) + except errors.NotFound: + pass + else: + assert False + + def test_removedata(self): + """ + Remove the test data we added + """ + # Remove the host + res = api.Command['host_del'](self.host_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['host_show'](self.host_cn) + except errors.NotFound: + pass + else: + assert False + + # Remove the hostgroup + res = api.Command['hostgroup_del'](self.hg_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['hostgroup_show'](self.hg_cn) + except errors.NotFound: + pass + else: + assert False + + # Remove the user + res = api.Command['user_del'](self.user_uid) + assert res == True + + # Verify that it is gone + try: + res = api.Command['user_show'](self.user_uid) + except errors.NotFound: + pass + else: + assert False + + # Remove the group + res = api.Command['group_del'](self.group_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['group_show'](self.group_cn) + except errors.NotFound: + pass + else: + assert False |