From a14c4b5001b09002ae5a6ef239fe9050c7de96f9 Mon Sep 17 00:00:00 2001 From: Lenka Doudova Date: Mon, 24 Aug 2015 12:04:03 +0200 Subject: Automated test for stageuser plugin Ticket: https://fedorahosted.org/freeipa/ticket/3813 Test plan: http://www.freeipa.org/page/V4/User_Life-Cycle_Management/Test_Plan Reviewed-By: Martin Basti Reviewed-By: Thierry Bordaz --- ipatests/test_xmlrpc/test_group_plugin.py | 201 +++++- ipatests/test_xmlrpc/test_stageuser_plugin.py | 888 ++++++++++++++++++++++++++ ipatests/test_xmlrpc/test_user_plugin.py | 338 +++++++++- 3 files changed, 1421 insertions(+), 6 deletions(-) create mode 100644 ipatests/test_xmlrpc/test_stageuser_plugin.py (limited to 'ipatests/test_xmlrpc') diff --git a/ipatests/test_xmlrpc/test_group_plugin.py b/ipatests/test_xmlrpc/test_group_plugin.py index f7f619680..d668b399e 100644 --- a/ipatests/test_xmlrpc/test_group_plugin.py +++ b/ipatests/test_xmlrpc/test_group_plugin.py @@ -21,13 +21,21 @@ Test the `ipalib/plugins/group.py` module. """ +import functools +import pytest + from ipalib import api, errors from ipatests.test_xmlrpc import objectclasses -from ipatests.test_xmlrpc.xmlrpc_test import ( - Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci, add_sid, add_oc) +from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci, + add_sid, add_oc, XMLRPC_test, raises_exact) from ipapython.dn import DN from ipatests.test_xmlrpc.test_user_plugin import get_user_result +from ipatests.test_xmlrpc.ldaptracker import Tracker +from ipatests.test_xmlrpc.test_user_plugin import UserTracker +from ipatests.util import assert_deepequal + + group1 = u'testgroup1' group2 = u'testgroup2' group3 = u'testgroup3' @@ -1146,4 +1154,191 @@ class test_group_full_set_of_objectclass_not_available_post_detach(Declarative): }, ), ), - ] \ No newline at end of file + ] + + +class GroupTracker(Tracker): + """ Class for host plugin like tests """ + retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user', + u'member_group'} + retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'} + + create_keys = retrieve_all_keys + update_keys = retrieve_keys - {u'dn'} + + add_member_keys = retrieve_keys | {u'description'} + + def __init__(self, name): + super(GroupTracker, self).__init__(default_version=None) + self.cn = name + self.dn = get_group_dn(name) + + def make_create_command(self, nonposix=False, external=False, + force=True): + """ Make function that creates a group using 'group-add' """ + return self.make_command('group_add', self.cn, + nonposix=nonposix, external=external) + + def make_delete_command(self): + """ Make function that deletes a group using 'group-del' """ + return self.make_command('group_del', self.cn) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a group using 'group-show' """ + return self.make_command('group_show', self.cn, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that searches for a group using 'group-find' """ + return self.make_command('group_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates a group using 'group-mod' """ + return self.make_command('group_mod', self.cn, **updates) + + def make_add_member_command(self, options={}): + """ Make function that adds a member to a group + Attention: only works for one user OR group! """ + if u'user' in options: + self.attrs[u'member_user'] = [options[u'user']] + elif u'group' in options: + self.attrs[u'member_group'] = [options[u'group']] + self.adds = options + + return self.make_command('group_add_member', self.cn, **options) + + def make_remove_member_command(self, options={}): + """ Make function that removes a member from a group + Attention: only works for one user OR group! """ + if u'user' in options: + del self.attrs[u'member_user'] + elif u'group' in options: + del self.attrs[u'member_group'] + return self.make_command('group_remove_member', self.cn, **options) + + def make_detach_command(self): + """ Make function that detaches a managed group using + 'group-detach' """ + self.exists = True + return self.make_command('group_detach', self.cn) + + def track_create(self): + """ Updates expected state for group creation""" + self.attrs = dict( + dn=get_group_dn(self.cn), + cn=[self.cn], + gidnumber=[fuzzy_digits], + ipauniqueid=[fuzzy_uuid], + objectclass=objectclasses.posixgroup, + ) + self.exists = True + + def check_create(self, result): + """ Checks 'group_add' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Added group "%s"' % self.cn, + result=self.filter_attrs(self.create_keys) + ), result) + + def check_delete(self, result): + """ Checks 'group_del' command result """ + assert_deepequal(dict( + value=[self.cn], + summary=u'Deleted group "%s"' % self.cn, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """ Checks 'group_show' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + value=self.cn, + summary=None, + result=expected + ), result) + + def check_find(self, result, all=False, raw=False): + """ Checks 'group_find' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 group matched', + result=[expected], + ), result) + + def check_update(self, result, extra_keys={}): + """ Checks 'group_mod' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Modified group "%s"' % self.cn, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def check_add_member(self, result): + """ Checks 'group_add_member' command result """ + assert_deepequal(dict( + completed=1, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ), result) + + def check_add_member_negative(self, result): + """ Checks 'group_add_member' command result when expected result + is failure of the operation""" + if u'member_user' in self.attrs: + del self.attrs[u'member_user'] + elif u'member_group' in self.attrs: + del self.attrs[u'member_group'] + + expected = dict( + completed=0, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ) + if u'user' in self.adds: + expected[u'failed'][u'member'][u'user'] = [( + self.adds[u'user'], u'no such entry')] + elif u'group' in self.adds: + expected[u'failed'][u'member'][u'group'] = [( + self.adds[u'group'], u'no such entry')] + + assert_deepequal(expected, result) + + def check_remove_member(self, result): + """ Checks 'group_remove_member' command result """ + assert_deepequal(dict( + completed=1, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ), result) + + def check_detach(self, result): + """ Checks 'group_detach' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Detached group "%s" from user "%s"' % ( + self.cn, self.cn), + result=True + ), result) + + def make_fixture_detach(self, request): + """Make a pytest fixture for this tracker + + The fixture ensures the plugin entry does not exist before + and after the tests that use itself. + """ + def cleanup(): + pass + + request.addfinalizer(cleanup) + + return self diff --git a/ipatests/test_xmlrpc/test_stageuser_plugin.py b/ipatests/test_xmlrpc/test_stageuser_plugin.py new file mode 100644 index 000000000..515191a45 --- /dev/null +++ b/ipatests/test_xmlrpc/test_stageuser_plugin.py @@ -0,0 +1,888 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +""" +Test the `ipalib/plugins/stageuser.py` module. +""" + + +import datetime +import ldap +import re +import functools +import pytest + +from ipalib import api, errors + +from ipatests.test_xmlrpc.ldaptracker import Tracker +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.xmlrpc_test import ( + XMLRPC_test, fuzzy_digits, fuzzy_uuid, fuzzy_password, fuzzy_string, + fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact) + +from ipatests.util import ( + assert_equal, assert_deepequal, assert_not_equal, raises) +from ipapython.dn import DN +from ipatests.test_xmlrpc.test_user_plugin import UserTracker, get_user_dn +from ipatests.test_xmlrpc.test_group_plugin import GroupTracker + +validuser1 = u'tuser1' +validuser2 = u'tuser2' + +uid = u'123' +gid = u'456' +invalidrealm1 = u'suser1@NOTFOUND.ORG' +invalidrealm2 = u'suser1@BAD@NOTFOUND.ORG' + +invaliduser1 = u'+tuser1' +invaliduser2 = u'tuser1234567890123456789012345678901234567890' + +sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X' + 'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI' + 'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2' + 'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm' + 'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01' + '9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF' + '0L public key test') +sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B ' + 'public key test (ssh-rsa)') + +options_ok = [ + {u'cn': u'name'}, + {u'initials': u'in'}, + {u'displayname': u'display'}, + {u'homedirectory': u'/home/homedir'}, + {u'gecos': u'gecos'}, + {u'loginshell': u'/bin/shell'}, + {u'mail': u'email@email.email'}, + {u'title': u'newbie'}, + {u'krbprincipalname': u'kerberos@%s' % api.env.realm}, + {u'krbprincipalname': u'KERBEROS@%s' % api.env.realm}, + {u'street': u'first street'}, + {u'l': u'prague'}, + {u'st': u'czech'}, + {u'postalcode': u'12345'}, + {u'telephonenumber': u'123456789'}, + {u'facsimiletelephonenumber': u'123456789'}, + {u'mobile': u'123456789'}, + {u'pager': u'123456789'}, + {u'ou': u'engineering'}, + {u'carlicense': u'abc1234'}, + {u'ipasshpubkey': sshpubkey}, + {u'manager': u'auser1'}, + {u'uidnumber': uid}, + {u'gidnumber': gid}, + {u'uidnumber': uid, u'gidnumber': gid}, + {u'userpassword': u'Secret123'}, + {u'random': True}, + ] + + +class StageUserTracker(Tracker): + """ Tracker class for staged user LDAP object + + Implements helper functions for host plugin. + StageUserTracker object stores information about the user. + """ + + retrieve_keys = { + u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell', + u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber', + u'title', u'memberof', u'nsaccountlock', u'memberofindirect', + u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink', + u'ipatokenradiususername', u'krbprincipalexpiration', + u'usercertificate', u'dn', u'has_keytab', u'has_password', + u'street', u'postalcode', u'facsimiletelephonenumber', + u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l', + u'st', u'mobile', u'pager', } + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipauniqueid', u'objectclass', u'description', + u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} + + create_keys = retrieve_all_keys | { + u'objectclass', u'ipauniqueid', u'randompassword', + u'userpassword', u'krbextradata', u'krblastpwdchange', + u'krbpasswordexpiration', u'krbprincipalkey'} + + update_keys = retrieve_keys - {u'dn', u'nsaccountlock'} + activate_keys = retrieve_keys | { + u'has_keytab', u'has_password', u'nsaccountlock'} + + def __init__(self, name, givenname, sn, **kwargs): + super(StageUserTracker, self).__init__(default_version=None) + self.uid = name + self.givenname = givenname + self.sn = sn + self.dn = DN( + ('uid', self.uid), api.env.container_stageuser, api.env.basedn) + + self.kwargs = kwargs + + def make_create_command(self, options=None, force=None): + """ Make function that creates a staged user using stageuser-add """ + if options is not None: + self.kwargs = options + return self.make_command('stageuser_add', self.uid, + givenname=self.givenname, + sn=self.sn, **self.kwargs) + + def make_delete_command(self): + """ Make function that deletes a staged user using stageuser-del """ + return self.make_command('stageuser_del', self.uid) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a staged user using stageuser-show """ + return self.make_command('stageuser_show', self.uid, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that finds staged user using stageuser-find """ + return self.make_command('stageuser_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates staged user using stageuser-mod """ + return self.make_command('stageuser_mod', self.uid, **updates) + + def make_activate_command(self): + """ Make function that activates staged user + using stageuser-activate """ + return self.make_command('stageuser_activate', self.uid) + + def track_create(self): + """ Update expected state for staged user creation """ + self.attrs = dict( + dn=self.dn, + uid=[self.uid], + givenname=[self.givenname], + sn=[self.sn], + homedirectory=[u'/home/%s' % self.uid], + displayname=[u'%s %s' % (self.givenname, self.sn)], + cn=[u'%s %s' % (self.givenname, self.sn)], + initials=[u'%s%s' % (self.givenname[0], self.sn[0])], + objectclass=objectclasses.user_base, + description=[u'__no_upg__'], + ipauniqueid=[u'autogenerate'], + uidnumber=[u'-1'], + gidnumber=[u'-1'], + krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], + mail=[u'%s@%s' % (self.uid, self.api.env.domain)], + gecos=[u'%s %s' % (self.givenname, self.sn)], + loginshell=[u'/bin/sh'], + has_keytab=False, + has_password=False, + nsaccountlock=[u'true'], + ) + + for key in self.kwargs: + if key == u'krbprincipalname': + self.attrs[key] = [u'%s@%s' % ( + (self.kwargs[key].split('@'))[0].lower(), + (self.kwargs[key].split('@'))[1])] + elif key == u'manager': + self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))] + elif key == u'ipasshpubkey': + self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp] + self.attrs[key] = [self.kwargs[key]] + elif key == u'random' or key == u'userpassword': + self.attrs[u'krbextradata'] = [fuzzy_string] + self.attrs[u'krbpasswordexpiration'] = [ + fuzzy_dergeneralizedtime] + self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime] + self.attrs[u'krbprincipalkey'] = [fuzzy_string] + self.attrs[u'userpassword'] = [fuzzy_string] + self.attrs[u'has_keytab'] = True + self.attrs[u'has_password'] = True + if key == u'random': + self.attrs[u'randompassword'] = fuzzy_string + else: + self.attrs[key] = [self.kwargs[key]] + + self.exists = True + + def check_create(self, result): + """ Check 'stageuser-add' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Added stage user "%s"' % self.uid, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """ Check 'stageuser-del' command result """ + assert_deepequal(dict( + value=[self.uid], + summary=u'Deleted stage user "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """ Check 'stageuser-show' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different + # type of nsaccountlock value than DS, but overall the value + # fits expected result + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + value=self.uid, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """ Check 'stageuser-find' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different + # type of nsaccountlock value than DS, but overall the value + # fits expected result + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 user matched', + result=[expected], + ), result) + + def check_find_nomatch(self, result): + """ Check 'stageuser-find' command result when no match is expected """ + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 users matched', + result=[], + ), result) + + def check_update(self, result, extra_keys=()): + """ Check 'stageuser-mod' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Modified stage user "%s"' % self.uid, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def check_restore_preserved(self, result): + assert_deepequal(dict( + value=[self.uid], + summary=u'Staged user account "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def make_fixture_activate(self, request): + """Make a pytest fixture for a staged user that is to be activated + + The fixture ensures the plugin entry does not exist before + and after the tests that use it. It takes into account + that the staged user no longer exists after activation, + therefore the fixture verifies after the tests + that the staged user doesn't exist instead of deleting it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass + + def finish(): + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % self.uid)): + del_command() + + request.addfinalizer(finish) + + return self + + def create_from_preserved(self, user): + """ Copies values from preserved user - helper function for + restoration tests """ + self.attrs = user.attrs + self.uid = user.uid + self.givenname = user.givenname + self.sn = user.sn + self.dn = DN( + ('uid', self.uid), api.env.container_stageuser, api.env.basedn) + self.attrs[u'dn'] = self.dn + + +@pytest.fixture(scope='class') +def stageduser(request): + tracker = StageUserTracker(name=u'suser1', givenname=u'staged', sn=u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class', params=options_ok) +def stageduser2(request): + tracker = StageUserTracker(u'suser2', u'staged', u'user', **request.param) + return tracker.make_fixture_activate(request) + + +@pytest.fixture(scope='class') +def stageduser3(request): + tracker = StageUserTracker(name=u'suser3', givenname=u'staged', sn=u'user') + return tracker.make_fixture_activate(request) + + +@pytest.fixture(scope='class') +def stageduser4(request): + tracker = StageUserTracker(u'tuser', u'test', u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def user(request): + tracker = UserTracker(u'auser1', u'active', u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def user2(request): + tracker = UserTracker(u'suser3', u'staged', u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def user3(request): + tracker = UserTracker(u'auser2', u'active', u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def user4(request): + tracker = UserTracker(u'tuser', u'test', u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def user5(request): + tracker = UserTracker(u'tuser', u'test', u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def user6(request): + tracker = UserTracker(u'suser2', u'staged', u'user') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def user7(request): + tracker = UserTracker(u'puser1', u'preserved', u'user') + return tracker.make_fixture_restore(request) + + +class TestNonexistentStagedUser(XMLRPC_test): + def test_retrieve_nonexistent(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_retrieve_command() + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % stageduser.uid)): + command() + + def test_delete_nonexistent(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_delete_command() + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % stageduser.uid)): + command() + + def test_update_nonexistent(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_update_command( + updates=dict(givenname=u'changed')) + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % stageduser.uid)): + command() + + def test_find_nonexistent(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_find_command(uid=stageduser.uid) + result = command() + stageduser.check_find_nomatch(result) + + def test_activate_nonexistent(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_activate_command() + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % stageduser.uid)): + command() + + +class TestStagedUser(XMLRPC_test): + def test_create_duplicate(self, stageduser): + stageduser.ensure_exists() + command = stageduser.make_create_command() + with raises_exact(errors.DuplicateEntry( + message=u'stage user with name "%s" already exists' % + stageduser.uid)): + command() + + def test_activate(self, stageduser3, user2): + stageduser3.ensure_exists() + user2.ensure_missing() + user2 = UserTracker( + stageduser3.uid, stageduser3.givenname, stageduser3.sn) + user2.create_from_staged(stageduser3) + command = stageduser3.make_activate_command() + result = command() + user2.check_activate(result) + + command = stageduser3.make_retrieve_command() + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % stageduser3.uid)): + command() + user2.delete() + + def test_show_stageduser(self, stageduser): + stageduser.retrieve() + + def test_showall_stageduser(self, stageduser): + stageduser.retrieve(all=True) + + def test_create_attr(self, stageduser2, user, user6): + """ Tests creating a user with various valid attributes listed + in 'options_ok' list""" + # create staged user with specified parameters + user.ensure_exists() # necessary for manager test + stageduser2.ensure_missing() + command = stageduser2.make_create_command() + result = command() + stageduser2.track_create() + stageduser2.check_create(result) + + # activate user, verify that specified values were preserved + # after activation + user6.ensure_missing() + user6 = UserTracker( + stageduser2.uid, stageduser2.givenname, + stageduser2.sn, **stageduser2.kwargs) + user6.create_from_staged(stageduser2) + command = stageduser2.make_activate_command() + result = command() + user6.check_activate(result) + + # verify the staged user does not exist after activation + command = stageduser2.make_retrieve_command() + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % stageduser2.uid)): + command() + + user6.delete() + + def test_delete_stageduser(self, stageduser): + stageduser.delete() + + def test_find_stageduser(self, stageduser): + stageduser.find() + + def test_findall_stageduser(self, stageduser): + stageduser.find(all=True) + + def test_update_stageduser(self, stageduser): + stageduser.update(updates=dict(givenname=u'changed',), + expected_updates=dict(givenname=[u'changed'],)) + stageduser.retrieve() + + def test_update_uid(self, stageduser): + stageduser.update(updates=dict(uidnumber=uid), + expected_updates=dict(uidnumber=[uid])) + stageduser.retrieve() + + def test_update_gid(self, stageduser): + stageduser.update(updates=dict(uidnumber=gid), + expected_updates=dict(uidnumber=[gid])) + stageduser.retrieve() + + def test_update_uid_gid(self, stageduser): + stageduser.update(updates=dict(uidnumber=uid, gidnumber=gid), + expected_updates=dict( + uidnumber=[uid], gidnumber=[gid])) + stageduser.retrieve() + + +class TestCreateInvalidAttributes(XMLRPC_test): + def test_create_invalid_uid(self): + invalid = StageUserTracker(invaliduser1, u'invalid', u'user') + command = invalid.make_create_command() + with raises_exact(errors.ValidationError( + name='login', + error=u"may only include letters, numbers, _, -, . and $")): + command() + + def test_create_long_uid(self): + invalid = StageUserTracker(invaliduser2, u'invalid', u'user') + command = invalid.make_create_command() + with raises_exact(errors.ValidationError( + name='login', + error=u"can be at most 32 characters")): + command() + + def test_create_uid_string(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_create_command( + options={u'uidnumber': u'text'}) + with raises_exact(errors.ConversionError( + message=u'invalid \'uid\': must be an integer')): + command() + + def test_create_gid_string(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_create_command( + options={u'gidnumber': u'text'}) + with raises_exact(errors.ConversionError( + message=u'invalid \'gidnumber\': must be an integer')): + command() + + def test_create_uid_negative(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_create_command( + options={u'uidnumber': u'-123'}) + with raises_exact(errors.ValidationError( + message=u'invalid \'uid\': must be at least 1')): + command() + + def test_create_gid_negative(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_create_command( + options={u'gidnumber': u'-123'}) + with raises_exact(errors.ValidationError( + message=u'invalid \'gidnumber\': must be at least 1')): + command() + + def test_create_krbprincipal_bad_realm(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_create_command( + options={u'krbprincipalname': invalidrealm1}) + with raises_exact(errors.RealmMismatch( + message=u'The realm for the principal does not match ' + 'the realm for this IPA server')): + command() + + def test_create_krbprincipal_malformed(self, stageduser): + stageduser.ensure_missing() + command = stageduser.make_create_command( + options={u'krbprincipalname': invalidrealm2}) + with raises_exact(errors.MalformedUserPrincipal( + message=u'Principal is not of the form user@REALM: \'%s\'' % + invalidrealm2)): + command() + + +class TestUpdateInvalidAttributes(XMLRPC_test): + def test_update_uid_string(self, stageduser): + stageduser.ensure_exists() + command = stageduser.make_update_command( + updates={u'uidnumber': u'text'}) + with raises_exact(errors.ConversionError( + message=u'invalid \'uid\': must be an integer')): + command() + + def test_update_gid_string(self, stageduser): + stageduser.ensure_exists() + command = stageduser.make_update_command( + updates={u'gidnumber': u'text'}) + with raises_exact(errors.ConversionError( + message=u'invalid \'gidnumber\': must be an integer')): + command() + + def test_update_uid_negative(self, stageduser): + stageduser.ensure_exists() + command = stageduser.make_update_command( + updates={u'uidnumber': u'-123'}) + with raises_exact(errors.ValidationError( + message=u'invalid \'uid\': must be at least 1')): + command() + + def test_update_gid_negative(self, stageduser): + stageduser.ensure_exists() + command = stageduser.make_update_command( + updates={u'gidnumber': u'-123'}) + with raises_exact(errors.ValidationError( + message=u'invalid \'gidnumber\': must be at least 1')): + command() + + +class TestActive(XMLRPC_test): + def test_delete(self, user): + user.ensure_exists() + user.track_delete() + command = user.make_delete_command() + result = command() + user.check_delete(result) + + def test_delete_nopreserve(self, user): + user.ensure_exists() + user.track_delete() + command = user.make_delete_command(no_preserve=True) + result = command() + user.check_delete(result) + + def test_delete_preserve_nopreserve(self, user): + user.ensure_exists() + command = user.make_delete_command(no_preserve=True, preserve=True) + with raises_exact(errors.MutuallyExclusiveError( + message=u'preserve and no-preserve cannot be both set')): + command() + + def test_delete_preserve(self, user): + user.ensure_exists() + user.track_delete() + command = user.make_delete_command(no_preserve=False, preserve=True) + result = command() + user.check_delete(result) + + command = user.make_delete_command() + result = command() + user.check_delete(result) + + command = user.make_retrieve_command() + with raises_exact(errors.NotFound( + reason=u'%s: user not found' % user.uid)): + command() + + +class TestPreserved(XMLRPC_test): + def test_search_preserved_invalid(self, user): + user.make_preserved_user() + + command = user.make_find_command(uid=user.uid) + result = command() + user.check_find_nomatch(result) + user.delete() + + def test_search_preserved_valid(self, user): + user.make_preserved_user() + + command = user.make_find_command( + uid=user.uid, preserved=True, all=False) + result = command() + user.check_find(result, all=False) + user.delete() + + def test_search_preserved_valid_all(self, user): + user.make_preserved_user() + + command = user.make_find_command( + uid=user.uid, preserved=True, all=True) + result = command() + user.check_find(result, all=True) + user.delete() + + def test_retrieve_preserved(self, user): + user.make_preserved_user() + + command = user.make_retrieve_command() + result = command() + user.check_retrieve(result) + user.delete() + + def test_permanently_delete_preserved_user(self, user): + user.make_preserved_user() + user.delete() + + command = user.make_retrieve_command() + with raises_exact(errors.NotFound( + reason=u'%s: user not found' % user.uid)): + command() + + def test_enable_preserved(self, user): + user.make_preserved_user() + command = user.make_enable_command() + with raises_exact(errors.MidairCollision( + message=u'change collided with another change')): + command() + user.delete() + + def test_reactivate_preserved(self, user): + user.make_preserved_user() + + command = user.make_retrieve_command(all=True) + result = command() + attr_check = { + u'ipauniqueid': result[u'result'][u'ipauniqueid'], + u'uidnumber': result[u'result'][u'uidnumber'], + u'gidnumber': result[u'result'][u'gidnumber'] + } + + command = user.make_undelete_command() + result = command() + user.check_undel(result) + user.check_attr_preservation(attr_check) + + user.delete() + + def test_staged_from_preserved(self, user7, stageduser): + user7.make_preserved_user() + + stageduser.ensure_missing() + stageduser = StageUserTracker(user7.uid, user7.givenname, user7.sn) + stageduser.create_from_preserved(user7) + command = user7.make_stage_command() + result = command() + stageduser.check_restore_preserved(result) + stageduser.exists = True + + command = user7.make_retrieve_command() + with raises_exact(errors.NotFound( + reason=u'%s: user not found' % stageduser.uid)): + command() + + command = stageduser.make_retrieve_command() + result = command() + stageduser.check_retrieve(result) + + stageduser.delete() + + +class TestManagers(XMLRPC_test): + def test_staged_manager(self, user, stageduser): + user.ensure_exists() + stageduser.ensure_exists() + + command = user.make_update_command( + updates=dict(manager=stageduser.uid)) + with raises_exact(errors.NotFound( + reason=u'manager %s not found' % stageduser.uid)): + command() + user.delete() + stageduser.delete() + + def test_preserved_manager(self, user, user3): + user.ensure_exists() + user3.make_preserved_user() + + command = user.make_update_command(updates=dict(manager=user3.uid)) + with raises_exact(errors.NotFound( + reason=u'manager %s not found' % user3.uid)): + command() + + user3.delete() + + def test_delete_manager_preserved(self, user, user3): + user3.ensure_exists() + + user.update( + updates=dict(manager=user3.uid), + expected_updates=dict(manager=[user3.uid], nsaccountlock=False)) + + user3.make_preserved_user() + del user.attrs[u'manager'] + + command = user.make_retrieve_command(all=True) + result = command() + user.check_retrieve(result, all=True) + + # verify whether user has a manager attribute + if u'manager' in result['result']: + assert False + + +class TestDuplicates(XMLRPC_test): + def test_active_same_as_preserved(self, user4, user5): + user4.ensure_missing() + user5.make_preserved_user() + command = user4.make_create_command() + with raises_exact(errors.DuplicateEntry( + message=u'user with name "%s" already exists' % user4.uid)): + command() + user5.delete() + + def test_staged_same_as_active(self, user4, stageduser4): + user4.ensure_exists() + stageduser4.create() # can be created + + command = stageduser4.make_activate_command() + with raises_exact(errors.DuplicateEntry( + message=u'active user with name "%s" already exists' % + user4.uid)): + command() # cannot be activated + + user4.delete() + stageduser4.delete() + + def test_staged_same_as_preserved(self, user5, stageduser4): + user5.make_preserved_user() + stageduser4.create() # can be created + + command = stageduser4.make_activate_command() + with raises_exact(errors.DuplicateEntry( + message=u'This entry already exists')): + command() # cannot be activated + + user5.delete() + stageduser4.delete() + + def test_active_same_as_staged(self, user4, stageduser4): + user4.ensure_missing() + stageduser4.ensure_exists() + command = user4.make_create_command() + result = command() + user4.track_create() + user4.check_create(result) # can be created + + command = stageduser4.make_activate_command() + with raises_exact(errors.DuplicateEntry( + message=u'active user with name "%s" already exists' % + user4.uid)): + command() # cannot be activated + + +@pytest.fixture(scope='class') +def group(request): + tracker = GroupTracker(u'testgroup') + return tracker.make_fixture(request) + + +class TestGroups(XMLRPC_test): + def test_stageduser_membership(self, stageduser, group): + stageduser.ensure_exists() + group.ensure_exists() + command = group.make_add_member_command( + options={u'user': stageduser.uid}) + result = command() + group.check_add_member_negative(result) + + def test_remove_preserved_from_group(self, user, group): + user.ensure_exists() + group.ensure_exists() + command = group.make_add_member_command(options={u'user': user.uid}) + result = command() + group.check_add_member(result) + + command = group.make_retrieve_command() + result = command() + group.check_retrieve(result) + + command = user.make_delete_command(no_preserve=False, preserve=True) + result = command() + user.check_delete(result) + + command = group.make_retrieve_command() + result = command() + + if (u'member_user' in result[u'result'] and + user.uid in result['result']['member_user']): + assert False + + user.delete() + group.delete() + + def test_preserveduser_membership(self, user, group): + user.make_preserved_user() + group.ensure_exists() + command = group.make_add_member_command(options={u'user': user.uid}) + result = command() + group.check_add_member_negative(result) diff --git a/ipatests/test_xmlrpc/test_user_plugin.py b/ipatests/test_xmlrpc/test_user_plugin.py index cc01eb2ee..1809f3d0a 100644 --- a/ipatests/test_xmlrpc/test_user_plugin.py +++ b/ipatests/test_xmlrpc/test_user_plugin.py @@ -23,17 +23,20 @@ Test the `ipalib/plugins/user.py` module. """ +import functools import datetime import ldap import re from ipalib import api, errors from ipatests.test_xmlrpc import objectclasses -from ipatests.util import assert_equal, assert_not_equal, raises -from ipatests.test_xmlrpc.xmlrpc_test import ( +from ipatests.util import ( + assert_equal, assert_not_equal, raises, assert_deepequal) +from xmlrpc_test import ( XMLRPC_test, Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_password, - fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc) + fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact) from ipapython.dn import DN +from ipatests.test_xmlrpc.ldaptracker import Tracker user1 = u'tuser1' user2 = u'tuser2' @@ -1643,3 +1646,332 @@ class test_denied_bind_with_expired_principal(XMLRPC_test): krbprincipalexpiration=principal_expiration_string) self.connection.simple_bind_s(str(get_user_dn(user1)), self.password) + + +class UserTracker(Tracker): + """ Class for host plugin like tests """ + + retrieve_keys = { + u'uid', u'givenname', u'sn', u'homedirectory', + u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou', + u'telephonenumber', u'title', u'memberof', + u'memberofindirect', u'ipauserauthtype', u'userclass', + u'ipatokenradiusconfiglink', u'ipatokenradiususername', + u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab', + u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber', + u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock', + u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata', + u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st' + } + + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry', + u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} + + retrieve_preserved_keys = retrieve_keys - {u'memberof_group'} + retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'} + + create_keys = retrieve_all_keys | { + u'randompassword', u'mepmanagedentry', + u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange', + u'krbprincipalkey', u'randompassword', u'userpassword' + } + update_keys = retrieve_keys - {u'dn'} + activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password', + u'nsaccountlock', u'sshpubkeyfp'} + + find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'} + find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'} + + def __init__(self, name, givenname, sn, **kwargs): + super(UserTracker, self).__init__(default_version=None) + self.uid = name + self.givenname = givenname + self.sn = sn + self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn) + + self.kwargs = kwargs + + def make_create_command(self, force=None): + """ Make function that crates a user using user-add """ + return self.make_command( + 'user_add', self.uid, + givenname=self.givenname, + sn=self.sn, **self.kwargs + ) + + def make_delete_command(self, no_preserve=True, preserve=False): + """ Make function that deletes a user using user-del """ + + if preserve and not no_preserve: + # necessary to change some user attributes due to moving + # to different container + self.attrs[u'dn'] = DN( + ('uid', self.uid), + api.env.container_deleteuser, + api.env.basedn + ) + self.attrs[u'objectclass'] = objectclasses.user_base + + return self.make_command( + 'user_del', self.uid, + no_preserve=no_preserve, + preserve=preserve + ) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a user using user-show """ + return self.make_command('user_show', self.uid, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that finds user using user-find """ + return self.make_command('user_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates user using user-mod """ + return self.make_command('user_mod', self.uid, **updates) + + def make_undelete_command(self): + """ Make function that activates preserved user using user-undel """ + return self.make_command('user_undel', self.uid) + + def make_enable_command(self): + """ Make function that enables user using user-enable """ + return self.make_command('user_enable', self.uid) + + def make_stage_command(self): + """ Make function that restores preserved user by moving it to + staged container """ + return self.make_command('user_stage', self.uid) + + def track_create(self): + """ Update expected state for user creation """ + self.attrs = dict( + dn=self.dn, + uid=[self.uid], + givenname=[self.givenname], + sn=[self.sn], + homedirectory=[u'/home/%s' % self.uid], + displayname=[u'%s %s' % (self.givenname, self.sn)], + cn=[u'%s %s' % (self.givenname, self.sn)], + initials=[u'%s%s' % (self.givenname[0], self.sn[0])], + objectclass=objectclasses.user, + description=[u'__no_upg__'], + ipauniqueid=[fuzzy_uuid], + uidnumber=[fuzzy_digits], + gidnumber=[fuzzy_digits], + krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], + mail=[u'%s@%s' % (self.uid, self.api.env.domain)], + gecos=[u'%s %s' % (self.givenname, self.sn)], + loginshell=[u'/bin/sh'], + has_keytab=False, + has_password=False, + mepmanagedentry=[get_group_dn(self.uid)], + memberof_group=[u'ipausers'], + ) + + for key in self.kwargs: + if key == u'krbprincipalname': + self.attrs[key] = [u'%s@%s' % ( + (self.kwargs[key].split('@'))[0].lower(), + (self.kwargs[key].split('@'))[1] + )] + else: + self.attrs[key] = [self.kwargs[key]] + + self.exists = True + + def check_create(self, result): + """ Check 'user-add' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Added user "%s"' % self.uid, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """ Check 'user-del' command result """ + assert_deepequal(dict( + value=[self.uid], + summary=u'Deleted user "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False): + """ Check 'user-show' command result """ + + if u'preserved' in self.attrs and self.attrs[u'preserved']: + self.retrieve_all_keys = self.retrieve_preserved_all_keys + self.retrieve_keys = self.retrieve_preserved_keys + elif u'preserved' not in self.attrs and all: + self.attrs[u'preserved'] = False + + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different type + # of nsaccountlock value than DS, but overall the value fits + # expected result + if u'nsaccountlock' in expected: + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + value=self.uid, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """ Check 'user-find' command result """ + self.attrs[u'nsaccountlock'] = True + self.attrs[u'preserved'] = True + + if all: + expected = self.filter_attrs(self.find_all_keys) + else: + expected = self.filter_attrs(self.find_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 user matched', + result=[expected], + ), result) + + def check_find_nomatch(self, result): + """ Check 'user-find' command result when no user should be found """ + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 users matched', + result=[], + ), result) + + def check_update(self, result, extra_keys=()): + """ Check 'user-mod' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Modified user "%s"' % self.uid, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def create_from_staged(self, stageduser): + """ Copies attributes from staged user - helper function for + activation tests """ + self.attrs = stageduser.attrs + self.uid = stageduser.uid + self.givenname = stageduser.givenname + self.sn = stageduser.sn + + self.attrs[u'mepmanagedentry'] = None + self.attrs[u'dn'] = self.dn + self.attrs[u'ipauniqueid'] = [fuzzy_uuid] + self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % ( + api.env.container_group, api.env.basedn + )] + self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % ( + self.uid, api.env.container_group, api.env.basedn + )] + self.attrs[u'objectclass'] = objectclasses.user + if self.attrs[u'gidnumber'] == [u'-1']: + self.attrs[u'gidnumber'] = [fuzzy_digits] + if self.attrs[u'uidnumber'] == [u'-1']: + self.attrs[u'uidnumber'] = [fuzzy_digits] + + if u'ipasshpubkey' in self.kwargs: + self.attrs[u'ipasshpubkey'] = [str( + self.kwargs[u'ipasshpubkey'] + )] + + def check_activate(self, result): + """ Check 'stageuser-activate' command result """ + expected = dict( + value=self.uid, + summary=u'Stage user %s activated' % self.uid, + result=self.filter_attrs(self.activate_keys)) + + # work around to eliminate inconsistency in returned objectclass + # (case sensitive assertion) + expected['result']['objectclass'] = [item.lower() for item in + expected['result']['objectclass']] + result['result']['objectclass'] = [item.lower() for item in + result['result']['objectclass']] + + assert_deepequal(expected, result) + + self.exists = True + + def check_undel(self, result): + """ Check 'user-undel' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Undeleted user account "%s"' % self.uid, + result=True + ), result) + + def track_delete(self, preserve=False): + """Update expected state for host deletion""" + if preserve: + self.exists = True + if u'memberof_group' in self.attrs: + del self.attrs[u'memberof_group'] + self.attrs[u'nsaccountlock'] = True + self.attrs[u'preserved'] = True + else: + self.exists = False + self.attrs = {} + + def make_preserved_user(self): + """ 'Creates' a preserved user necessary for some tests """ + self.ensure_exists() + self.track_delete(preserve=True) + command = self.make_delete_command(no_preserve=False, preserve=True) + result = command() + self.check_delete(result) + + def check_attr_preservation(self, expected): + """ Verifies that ipaUniqueID, uidNumber and gidNumber are + preserved upon reactivation. Also verifies that resulting + active user is a member of ipausers group only.""" + command = self.make_retrieve_command(all=True) + result = command() + + assert_deepequal(dict( + ipauniqueid=result[u'result'][u'ipauniqueid'], + uidnumber=result[u'result'][u'uidnumber'], + gidnumber=result[u'result'][u'gidnumber'] + ), expected) + + if (u'memberof_group' not in result[u'result'] or + result[u'result'][u'memberof_group'] != (u'ipausers',)): + assert False + + def make_fixture_restore(self, request): + """Make a pytest fixture for a preserved user that is to be moved to + staged area. + + The fixture ensures the plugin entry does not exist before + and after the tests that use it. It takes into account + that the preserved user no longer exists after restoring it, + therefore the fixture verifies after the tests + that the preserved user doesn't exist instead of deleting it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass + + def finish(): + with raises_exact(errors.NotFound( + reason=u'no such entry')): + del_command() + + request.addfinalizer(finish) + + return self -- cgit