diff options
author | Milan KubĂk <mkubik@redhat.com> | 2015-11-19 16:07:29 +0100 |
---|---|---|
committer | Martin Basti <mbasti@redhat.com> | 2015-12-02 17:12:24 +0100 |
commit | 17f9ca154b47f1e21797d25435e25676fdca284c (patch) | |
tree | fd3f5b2976acd3ca0718c88dbbe35782982be2b6 /ipatests | |
parent | b8c619a7139bd7b65caa03b68431e22791ff19bf (diff) | |
download | freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.gz freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.xz freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.zip |
Separated Tracker implementations into standalone package
The previous way of implementing trackers in the module with
the test caused circular imports. The separate package resolves
this issue.
https://fedorahosted.org/freeipa/ticket/5467
Reviewed-By: Ales 'alich' Marecek <amarecek@redhat.com>
Diffstat (limited to 'ipatests')
-rw-r--r-- | ipatests/setup.py.in | 3 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_caacl_plugin.py | 367 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_caacl_profile_enforcement.py | 4 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_certprofile_plugin.py | 126 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_group_plugin.py | 192 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_host_plugin.py | 144 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_stageuser_plugin.py | 247 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_user_plugin.py | 335 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/__init__.py | 0 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/base.py (renamed from ipatests/test_xmlrpc/ldaptracker.py) | 0 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/caacl_plugin.py | 367 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/certprofile_plugin.py | 133 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/group_plugin.py | 196 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/host_plugin.py | 154 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/stageuser_plugin.py | 267 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/tracker/user_plugin.py | 340 | ||||
-rw-r--r-- | ipatests/util.py | 6 |
17 files changed, 1480 insertions, 1401 deletions
diff --git a/ipatests/setup.py.in b/ipatests/setup.py.in index afc77ad56..ce1efb761 100644 --- a/ipatests/setup.py.in +++ b/ipatests/setup.py.in @@ -76,7 +76,8 @@ def setup_package(): "ipatests.test_ipaserver.test_install", "ipatests.test_pkcs10", "ipatests.test_webui", - "ipatests.test_xmlrpc"], + "ipatests.test_xmlrpc", + "ipatests.test_xmlrpc.tracker"], scripts=['ipa-run-tests', 'ipa-test-config', 'ipa-test-task'], package_data = { 'ipatests': ['pytest.ini'], diff --git a/ipatests/test_xmlrpc/test_caacl_plugin.py b/ipatests/test_xmlrpc/test_caacl_plugin.py index 8b156a65a..d5ded1951 100644 --- a/ipatests/test_xmlrpc/test_caacl_plugin.py +++ b/ipatests/test_xmlrpc/test_caacl_plugin.py @@ -9,373 +9,12 @@ Test the `ipalib.plugins.caacl` module. import pytest from ipalib import errors -from ipatests.test_xmlrpc.ldaptracker import Tracker -from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test, fuzzy_caacldn, - fuzzy_uuid, fuzzy_ipauniqueid) - -from ipatests.test_xmlrpc import objectclasses -from ipatests.util import assert_deepequal +from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test # reuse the fixture from ipatests.test_xmlrpc.test_certprofile_plugin import default_profile -from ipatests.test_xmlrpc.test_stageuser_plugin import StageUserTracker - - -class CAACLTracker(Tracker): - """Tracker class for CA ACL LDAP object. - - The class implements methods required by the base class - to help with basic CRUD operations. - - Methods for adding and deleting actual member entries into an ACL - do not have check methods as these would make the class - unnecessarily complicated. The checks are implemented as - a standalone test suite. However, this makes the test crucial - in debugging more complicated test cases. Since the add/remove - operation won't be checked right away by the tracker, a problem - in this operation may propagate into more complicated test case. - - It is possible to pass a list of member uids to these methods. - - The test uses instances of Fuzzy class to compare results as they - are in the UUID format. The dn and rdn properties were modified - to reflect this as well. - """ - - member_keys = { - u'memberuser_user', u'memberuser_group', - u'memberhost_host', u'memberhost_hostgroup', - u'memberservice_service', - u'ipamembercertprofile_certprofile'} - category_keys = { - u'ipacacategory', u'ipacertprofilecategory', u'usercategory', - u'hostcategory', u'servicecategory'} - retrieve_keys = { - u'dn', u'cn', u'description', u'ipaenabledflag', - u'ipamemberca', u'ipamembercertprofile', u'memberuser', - u'memberhost', u'memberservice'} | member_keys | category_keys - retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'} - create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory', - u'usercategory', u'hostcategory', u'ipacacategory', - u'servicecategory', u'ipaenabledflag', u'objectclass', - u'ipauniqueid'} - update_keys = create_keys - {u'dn'} - - def __init__(self, name, ipacertprofile_category=None, user_category=None, - service_category=None, host_category=None, description=None, - default_version=None): - super(CAACLTracker, self).__init__(default_version=default_version) - - self._name = name - self.description = description - self._categories = dict( - ipacertprofilecategory=ipacertprofile_category, - usercategory=user_category, - servicecategory=service_category, - hostcategory=host_category) - - self.dn = fuzzy_caacldn - - @property - def name(self): - return self._name - - @property - def rdn(self): - return fuzzy_ipauniqueid - - @property - def categories(self): - """To be used in track_create""" - return {cat: v for cat, v in self._categories.items() if v} - - @property - def create_categories(self): - """ Return the categories set on create. - Unused categories are left out. - """ - return {cat: [v] for cat, v in self.categories.items() if v} - - def make_create_command(self, force=True): - return self.make_command(u'caacl_add', self.name, - description=self.description, - **self.categories) - - def check_create(self, result): - assert_deepequal(dict( - value=self.name, - summary=u'Added CA ACL "{}"'.format(self.name), - result=dict(self.filter_attrs(self.create_keys)) - ), result) - - def track_create(self): - self.attrs = dict( - dn=self.dn, - ipauniqueid=[fuzzy_uuid], - cn=[self.name], - objectclass=objectclasses.caacl, - ipaenabledflag=[u'TRUE']) - - self.attrs.update(self.create_categories) - if self.description: - self.attrs.update({u'description', [self.description]}) - - self.exists = True - - def make_delete_command(self): - return self.make_command('caacl_del', self.name) - - def check_delete(self, result): - assert_deepequal(dict( - value=[self.name], - summary=u'Deleted CA ACL "{}"'.format(self.name), - result=dict(failed=[]) - ), result) - - def make_retrieve_command(self, all=False, raw=False): - return self.make_command('caacl_show', self.name, all=all, raw=raw) - - def check_retrieve(self, result, all=False, raw=False): - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - assert_deepequal(dict( - value=self.name, - summary=None, - result=expected - ), result) - - def make_find_command(self, *args, **kwargs): - return self.make_command('caacl_find', *args, **kwargs) - - def check_find(self, result, all=False, raw=False): - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - assert_deepequal(dict( - count=1, - truncated=False, - summary=u'1 CA ACL matched', - result=[expected] - ), result) - - def make_update_command(self, updates): - return self.make_command('caacl_mod', self.name, **updates) - - def update(self, updates, expected_updates=None, silent=False): - """If removing a category, delete it from tracker as well""" - # filter out empty categories and track changes - - filtered_updates = dict() - for key, value in updates.items(): - if key in self.category_keys: - if not value: - try: - del self.attrs[key] - except IndexError: - if silent: - pass - else: - # if there is a value, prepare the pair for update - filtered_updates.update({key: value}) - else: - filtered_updates.update({key: value}) - - if expected_updates is None: - expected_updates = {} - - command = self.make_update_command(updates) - - try: - result = command() - except errors.EmptyModlist: - if silent: - self.attrs.update(filtered_updates) - self.attrs.update(expected_updates) - self.check_update(result, - extra_keys=set(self.update_keys) | - set(expected_updates.keys())) - - def check_update(self, result, extra_keys=()): - assert_deepequal(dict( - value=self.name, - summary=u'Modified CA ACL "{}"'.format(self.name), - result=self.filter_attrs(self.update_keys | set(extra_keys)) - ), result) - - # Helper methods for caacl subcommands. The check methods will be - # implemented in standalone test - # - # The methods implemented here will be: - # caacl_{add,remove}_{host, service, certprofile, user [, subca]} - - def _add_acl_component(self, command_name, keys, track): - """ Add a resource into ACL rule and track it. - - command_name - the name in the API - keys = { - 'tracker_attr': { - 'api_key': 'value' - } - } - - e.g. - - keys = { - 'memberhost_host': { - 'host': 'hostname' - }, - 'memberhost_hostgroup': { - 'hostgroup': 'hostgroup_name' - } - } - """ - - if not self.exists: - raise errors.NotFound(reason="The tracked entry doesn't exist.") - - command = self.make_command(command_name, self.name) - command_options = dict() - - # track - for tracker_attr in keys: - api_options = keys[tracker_attr] - if track: - for option in api_options: - try: - if type(option) in (list, tuple): - self.attrs[tracker_attr].extend(api_options[option]) - else: - self.attrs[tracker_attr].append(api_options[option]) - except KeyError: - if type(option) in (list, tuple): - self.attrs[tracker_attr] = api_options[option] - else: - self.attrs[tracker_attr] = [api_options[option]] - # prepare options for the command call - command_options.update(api_options) - - return command(**command_options) - - def _remove_acl_component(self, command_name, keys, track): - """ Remove a resource from ACL rule and track it. - - command_name - the name in the API - keys = { - 'tracker_attr': { - 'api_key': 'value' - } - } - - e.g. - - keys = { - 'memberhost_host': { - 'host': 'hostname' - }, - 'memberhost_hostgroup': { - 'hostgroup': 'hostgroup_name' - } - } - """ - command = self.make_command(command_name, self.name) - command_options = dict() - - for tracker_attr in keys: - api_options = keys[tracker_attr] - if track: - for option in api_options: - if type(option) in (list, tuple): - for item in option: - self.attrs[tracker_attr].remove(item) - else: - self.attrs[tracker_attr].remove(api_options[option]) - if len(self.attrs[tracker_attr]) == 0: - del self.attrs[tracker_attr] - command_options.update(api_options) - - return command(**command_options) - - def add_host(self, host=None, hostgroup=None, track=True): - """Associates an host or hostgroup entry with the ACL. - - The command takes an unicode string with the name - of the entry (RDN). - - It is the responsibility of a test writer to provide - the correct value, object type as the method does not - verify whether the entry exists. - - The method can add only one entry of each type - in one call. - """ - - options = { - u'memberhost_host': {u'host': host}, - u'memberhost_hostgroup': {u'hostgroup': hostgroup}} - - return self._add_acl_component(u'caacl_add_host', options, track) - - def remove_host(self, host=None, hostgroup=None, track=True): - options = { - u'memberhost_host': {u'host': host}, - u'memberhost_hostgroup': {u'hostgroup': hostgroup}} - - return self._remove_acl_component(u'caacl_remove_host', options, track) - - def add_user(self, user=None, group=None, track=True): - options = { - u'memberuser_user': {u'user': user}, - u'memberuser_group': {u'group': group}} - - return self._add_acl_component(u'caacl_add_user', options, track) - - def remove_user(self, user=None, group=None, track=True): - options = { - u'memberuser_user': {u'user': user}, - u'memberuser_group': {u'group': group}} - - return self._remove_acl_component(u'caacl_remove_user', options, track) - - def add_service(self, service=None, track=True): - options = { - u'memberservice_service': {u'service': service}} - - return self._add_acl_component(u'caacl_add_service', options, track) - - def remove_service(self, service=None, track=True): - options = { - u'memberservice_service': {u'service': service}} - - return self._remove_acl_component(u'caacl_remove_service', options, track) - - def add_profile(self, certprofile=None, track=True): - options = { - u'ipamembercertprofile_certprofile': - {u'certprofile': certprofile}} - - return self._add_acl_component(u'caacl_add_profile', options, track) - - def remove_profile(self, certprofile=None, track=True): - options = { - u'ipamembercertprofile_certprofile': - {u'certprofile': certprofile}} - - return self._remove_acl_component(u'caacl_remove_profile', options, track) - - def enable(self): - command = self.make_command(u'caacl_enable', self.name) - self.attrs.update({u'ipaenabledflag': [u'TRUE']}) - command() - - def disable(self): - command = self.make_command(u'caacl_disable', self.name) - self.attrs.update({u'ipaenabledflag': [u'FALSE']}) - command() +from ipatests.test_xmlrpc.tracker.caacl_plugin import CAACLTracker +from ipatests.test_xmlrpc.tracker.stageuser_plugin import StageUserTracker @pytest.fixture(scope='class') diff --git a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py index 78262ae8c..9de257a26 100644 --- a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py +++ b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py @@ -11,8 +11,8 @@ from ipalib import api, errors from ipatests.util import ( prepare_config, unlock_principal_password, change_principal) from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test -from ipatests.test_xmlrpc.test_certprofile_plugin import CertprofileTracker -from ipatests.test_xmlrpc.test_caacl_plugin import CAACLTracker +from ipatests.test_xmlrpc.tracker.certprofile_plugin import CertprofileTracker +from ipatests.test_xmlrpc.tracker.caacl_plugin import CAACLTracker from ipapython.ipautil import run diff --git a/ipatests/test_xmlrpc/test_certprofile_plugin.py b/ipatests/test_xmlrpc/test_certprofile_plugin.py index 1f06f99f5..66a72de3e 100644 --- a/ipatests/test_xmlrpc/test_certprofile_plugin.py +++ b/ipatests/test_xmlrpc/test_certprofile_plugin.py @@ -12,133 +12,9 @@ import os import pytest from ipalib import api, errors -from ipapython.dn import DN from ipatests.util import prepare_config -from ipatests.test_xmlrpc.ldaptracker import Tracker from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test, raises_exact -from ipatests.test_xmlrpc import objectclasses -from ipatests.util import assert_deepequal - - -class CertprofileTracker(Tracker): - """Tracker class for certprofile plugin. - """ - - retrieve_keys = { - 'dn', 'cn', 'description', 'ipacertprofilestoreissued' - } - retrieve_all_keys = retrieve_keys | {'objectclass'} - create_keys = retrieve_keys | {'objectclass'} - update_keys = retrieve_keys - {'dn'} - managedby_keys = retrieve_keys - allowedto_keys = retrieve_keys - - def __init__(self, name, store=False, desc='dummy description', - profile=None, default_version=None): - super(CertprofileTracker, self).__init__( - default_version=default_version - ) - - self.store = store - self.description = desc - self._profile_path = profile - - self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca', - self.api.env.basedn) - - @property - def profile(self): - if not self._profile_path: - return None - - if os.path.isabs(self._profile_path): - path = self._profile_path - else: - path = os.path.join(os.path.dirname(__file__), - self._profile_path) - - with open(path, 'r') as f: - content = f.read() - return unicode(content) - - def make_create_command(self, force=True): - if not self.profile: - raise RuntimeError('Tracker object without path to profile ' - 'cannot be used to create profile entry.') - - return self.make_command('certprofile_import', self.name, - description=self.description, - ipacertprofilestoreissued=self.store, - file=self.profile) - - def check_create(self, result): - assert_deepequal(dict( - value=self.name, - summary=u'Imported profile "{}"'.format(self.name), - result=dict(self.filter_attrs(self.create_keys)) - ), result) - - def track_create(self): - self.attrs = dict( - dn=unicode(self.dn), - cn=[self.name], - description=[self.description], - ipacertprofilestoreissued=[unicode(self.store).upper()], - objectclass=objectclasses.certprofile - ) - self.exists = True - - def make_delete_command(self): - return self.make_command('certprofile_del', self.name) - - def check_delete(self, result): - assert_deepequal(dict( - value=[self.name], # correctly a list? - summary=u'Deleted profile "{}"'.format(self.name), - result=dict(failed=[]), - ), result) - - def make_retrieve_command(self, all=False, raw=False, **options): - return self.make_command('certprofile_show', self.name, all=all, - raw=raw, **options) - - def check_retrieve(self, result, all=False, raw=False): - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - assert_deepequal(dict( - value=self.name, - summary=None, - result=expected, - ), result) - - def make_find_command(self, *args, **kwargs): - return self.make_command('certprofile_find', *args, **kwargs) - - def check_find(self, result, all=False, raw=False): - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - assert_deepequal(dict( - count=1, - truncated=False, - summary=u'1 profile matched', - result=[expected] - ), result) - - def make_update_command(self, updates): - return self.make_command('certprofile_mod', self.name, **updates) - - def check_update(self, result, extra_keys=()): - assert_deepequal(dict( - value=self.name, - summary=u'Modified Certificate Profile "{}"'.format(self.name), - result=self.filter_attrs(self.update_keys | set(extra_keys)) - ), result) +from ipatests.test_xmlrpc.tracker.certprofile_plugin import CertprofileTracker IPA_CERT_SUBJ_BASE = ( diff --git a/ipatests/test_xmlrpc/test_group_plugin.py b/ipatests/test_xmlrpc/test_group_plugin.py index ed38c696e..f2bd0f4b9 100644 --- a/ipatests/test_xmlrpc/test_group_plugin.py +++ b/ipatests/test_xmlrpc/test_group_plugin.py @@ -26,13 +26,12 @@ import pytest from ipalib import api, errors from ipatests.test_xmlrpc import objectclasses -from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci, +from ipatests.test_xmlrpc.xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci, add_sid, add_oc, XMLRPC_test, raises_exact) from ipapython.dn import DN from ipatests.test_xmlrpc.test_user_plugin import get_user_result -from ipatests.test_xmlrpc.ldaptracker import Tracker -from ipatests.test_xmlrpc.test_user_plugin import UserTracker +from ipatests.test_xmlrpc.tracker.user_plugin import UserTracker from ipatests.util import assert_deepequal @@ -1161,190 +1160,3 @@ class test_group_full_set_of_objectclass_not_available_post_detach(Declarative): ), ), ] - - -class GroupTracker(Tracker): - """ Class for host plugin like tests """ - retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user', - u'member_group'} - retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'} - - create_keys = retrieve_all_keys - update_keys = retrieve_keys - {u'dn'} - - add_member_keys = retrieve_keys | {u'description'} - - def __init__(self, name): - super(GroupTracker, self).__init__(default_version=None) - self.cn = name - self.dn = get_group_dn(name) - - def make_create_command(self, nonposix=False, external=False, - force=True): - """ Make function that creates a group using 'group-add' """ - return self.make_command('group_add', self.cn, - nonposix=nonposix, external=external) - - def make_delete_command(self): - """ Make function that deletes a group using 'group-del' """ - return self.make_command('group_del', self.cn) - - def make_retrieve_command(self, all=False, raw=False): - """ Make function that retrieves a group using 'group-show' """ - return self.make_command('group_show', self.cn, all=all) - - def make_find_command(self, *args, **kwargs): - """ Make function that searches for a group using 'group-find' """ - return self.make_command('group_find', *args, **kwargs) - - def make_update_command(self, updates): - """ Make function that updates a group using 'group-mod' """ - return self.make_command('group_mod', self.cn, **updates) - - def make_add_member_command(self, options={}): - """ Make function that adds a member to a group - Attention: only works for one user OR group! """ - if u'user' in options: - self.attrs[u'member_user'] = [options[u'user']] - elif u'group' in options: - self.attrs[u'member_group'] = [options[u'group']] - self.adds = options - - return self.make_command('group_add_member', self.cn, **options) - - def make_remove_member_command(self, options={}): - """ Make function that removes a member from a group - Attention: only works for one user OR group! """ - if u'user' in options: - del self.attrs[u'member_user'] - elif u'group' in options: - del self.attrs[u'member_group'] - return self.make_command('group_remove_member', self.cn, **options) - - def make_detach_command(self): - """ Make function that detaches a managed group using - 'group-detach' """ - self.exists = True - return self.make_command('group_detach', self.cn) - - def track_create(self): - """ Updates expected state for group creation""" - self.attrs = dict( - dn=get_group_dn(self.cn), - cn=[self.cn], - gidnumber=[fuzzy_digits], - ipauniqueid=[fuzzy_uuid], - objectclass=objectclasses.posixgroup, - ) - self.exists = True - - def check_create(self, result): - """ Checks 'group_add' command result """ - assert_deepequal(dict( - value=self.cn, - summary=u'Added group "%s"' % self.cn, - result=self.filter_attrs(self.create_keys) - ), result) - - def check_delete(self, result): - """ Checks 'group_del' command result """ - assert_deepequal(dict( - value=[self.cn], - summary=u'Deleted group "%s"' % self.cn, - result=dict(failed=[]), - ), result) - - def check_retrieve(self, result, all=False, raw=False): - """ Checks 'group_show' command result """ - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - assert_deepequal(dict( - value=self.cn, - summary=None, - result=expected - ), result) - - def check_find(self, result, all=False, raw=False): - """ Checks 'group_find' command result """ - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - assert_deepequal(dict( - count=1, - truncated=False, - summary=u'1 group matched', - result=[expected], - ), result) - - def check_update(self, result, extra_keys={}): - """ Checks 'group_mod' command result """ - assert_deepequal(dict( - value=self.cn, - summary=u'Modified group "%s"' % self.cn, - result=self.filter_attrs(self.update_keys | set(extra_keys)) - ), result) - - def check_add_member(self, result): - """ Checks 'group_add_member' command result """ - assert_deepequal(dict( - completed=1, - failed={u'member': {u'group': (), u'user': ()}}, - result=self.filter_attrs(self.add_member_keys) - ), result) - - def check_add_member_negative(self, result): - """ Checks 'group_add_member' command result when expected result - is failure of the operation""" - if u'member_user' in self.attrs: - del self.attrs[u'member_user'] - elif u'member_group' in self.attrs: - del self.attrs[u'member_group'] - - expected = dict( - completed=0, - failed={u'member': {u'group': (), u'user': ()}}, - result=self.filter_attrs(self.add_member_keys) - ) - if u'user' in self.adds: - expected[u'failed'][u'member'][u'user'] = [( - self.adds[u'user'], u'no such entry')] - elif u'group' in self.adds: - expected[u'failed'][u'member'][u'group'] = [( - self.adds[u'group'], u'no such entry')] - - assert_deepequal(expected, result) - - def check_remove_member(self, result): - """ Checks 'group_remove_member' command result """ - assert_deepequal(dict( - completed=1, - failed={u'member': {u'group': (), u'user': ()}}, - result=self.filter_attrs(self.add_member_keys) - ), result) - - def check_detach(self, result): - """ Checks 'group_detach' command result """ - assert_deepequal(dict( - value=self.cn, - summary=u'Detached group "%s" from user "%s"' % ( - self.cn, self.cn), - result=True - ), result) - - def make_fixture_detach(self, request): - """Make a pytest fixture for this tracker - - The fixture ensures the plugin entry does not exist before - and after the tests that use itself. - """ - def cleanup(): - pass - - request.addfinalizer(cleanup) - - return self diff --git a/ipatests/test_xmlrpc/test_host_plugin.py b/ipatests/test_xmlrpc/test_host_plugin.py index 868c09ce8..be2b2d6af 100644 --- a/ipatests/test_xmlrpc/test_host_plugin.py +++ b/ipatests/test_xmlrpc/test_host_plugin.py @@ -34,12 +34,12 @@ from ipapython import ipautil from ipalib import api, errors, x509 from ipapython.dn import DN from ipapython.dnsutil import DNSName -from ipatests.test_xmlrpc.ldaptracker import Tracker from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test, fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer, fuzzy_hex, raises_exact) from ipatests.test_xmlrpc.test_user_plugin import get_group_dn from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.tracker.host_plugin import HostTracker from ipatests.test_xmlrpc.testcert import get_testcert from ipatests.util import assert_deepequal @@ -99,148 +99,6 @@ host_cert = get_testcert(DN(('CN', api.env.host), x509.subject_base()), 'host/%s@%s' % (api.env.host, api.env.realm)) -class HostTracker(Tracker): - """Wraps and tracks modifications to a Host object - - Implements the helper functions for host plugin. - - The HostTracker object stores information about the host, e.g. - ``fqdn`` and ``dn``. - """ - retrieve_keys = { - 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host', - 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint', - 'serial_number', 'serial_number_hex', 'sha1_fingerprint', - 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before', - 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user', - 'ipaallowedtoperform_read_keys_group', - 'ipaallowedtoperform_read_keys_host', - 'ipaallowedtoperform_read_keys_hostgroup', - 'ipaallowedtoperform_write_keys_user', - 'ipaallowedtoperform_write_keys_group', - 'ipaallowedtoperform_write_keys_host', - 'ipaallowedtoperform_write_keys_hostgroup'} - retrieve_all_keys = retrieve_keys | { - u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid', - u'managing_host', u'objectclass', u'serverhostname'} - create_keys = retrieve_keys | {'objectclass', 'ipauniqueid', - 'randompassword'} - update_keys = retrieve_keys - {'dn'} - managedby_keys = retrieve_keys - {'has_keytab', 'has_password'} - allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'} - - def __init__(self, name, fqdn=None, default_version=None): - super(HostTracker, self).__init__(default_version=default_version) - - self.shortname = name - if fqdn: - self.fqdn = fqdn - else: - self.fqdn = u'%s.%s' % (name, self.api.env.domain) - self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts', - self.api.env.basedn) - - self.description = u'Test host <%s>' % name - self.location = u'Undisclosed location <%s>' % name - - def make_create_command(self, force=True): - """Make function that creates this host using host_add""" - return self.make_command('host_add', self.fqdn, - description=self.description, - l=self.location, - force=force) - - def make_delete_command(self): - """Make function that deletes the host using host_del""" - return self.make_command('host_del', self.fqdn) - - def make_retrieve_command(self, all=False, raw=False): - """Make function that retrieves the host using host_show""" - return self.make_command('host_show', self.fqdn, all=all, raw=raw) - - def make_find_command(self, *args, **kwargs): - """Make function that finds hosts using host_find - - Note that the fqdn (or other search terms) needs to be specified - in arguments. - """ - return self.make_command('host_find', *args, **kwargs) - - def make_update_command(self, updates): - """Make function that modifies the host using host_mod""" - return self.make_command('host_mod', self.fqdn, **updates) - - def track_create(self): - """Update expected state for host creation""" - self.attrs = dict( - dn=self.dn, - fqdn=[self.fqdn], - description=[self.description], - l=[self.location], - krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[self.fqdn], - has_keytab=False, - has_password=False, - cn=[self.fqdn], - ipakrbokasdelegate=False, - ipakrbrequirespreauth=True, - managing_host=[self.fqdn], - serverhostname=[self.shortname], - ) - self.exists = True - - def check_create(self, result): - """Check `host_add` command result""" - assert_deepequal(dict( - value=self.fqdn, - summary=u'Added host "%s"' % self.fqdn, - result=self.filter_attrs(self.create_keys), - ), result) - - def check_delete(self, result): - """Check `host_del` command result""" - assert_deepequal(dict( - value=[self.fqdn], - summary=u'Deleted host "%s"' % self.fqdn, - result=dict(failed=[]), - ), result) - - def check_retrieve(self, result, all=False, raw=False): - """Check `host_show` command result""" - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - assert_deepequal(dict( - value=self.fqdn, - summary=None, - result=expected, - ), result) - - def check_find(self, result, all=False, raw=False): - """Check `host_find` command result""" - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - assert_deepequal(dict( - count=1, - truncated=False, - summary=u'1 host matched', - result=[expected], - ), result) - - def check_update(self, result, extra_keys=()): - """Check `host_update` command result""" - assert_deepequal(dict( - value=self.fqdn, - summary=u'Modified host "%s"' % self.fqdn, - result=self.filter_attrs(self.update_keys | set(extra_keys)) - ), result) - - @pytest.fixture(scope='class') def host(request): tracker = HostTracker(name=u'testhost1') diff --git a/ipatests/test_xmlrpc/test_stageuser_plugin.py b/ipatests/test_xmlrpc/test_stageuser_plugin.py index 43c59b7c7..4eb968451 100644 --- a/ipatests/test_xmlrpc/test_stageuser_plugin.py +++ b/ipatests/test_xmlrpc/test_stageuser_plugin.py @@ -17,17 +17,17 @@ import six from ipalib import api, errors -from ipatests.test_xmlrpc.ldaptracker import Tracker from ipatests.test_xmlrpc import objectclasses from ipatests.test_xmlrpc.xmlrpc_test import ( XMLRPC_test, fuzzy_digits, fuzzy_uuid, fuzzy_password, fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact) from ipatests.util import ( - assert_equal, assert_deepequal, assert_not_equal, raises) + assert_equal, assert_deepequal, assert_not_equal, raises, get_user_dn) from ipapython.dn import DN -from ipatests.test_xmlrpc.test_user_plugin import UserTracker, get_user_dn -from ipatests.test_xmlrpc.test_group_plugin import GroupTracker +from ipatests.test_xmlrpc.tracker.user_plugin import UserTracker +from ipatests.test_xmlrpc.tracker.group_plugin import GroupTracker +from ipatests.test_xmlrpc.tracker.stageuser_plugin import StageUserTracker if six.PY3: unicode = str @@ -84,245 +84,6 @@ options_ok = [ ] -class StageUserTracker(Tracker): - """ Tracker class for staged user LDAP object - - Implements helper functions for host plugin. - StageUserTracker object stores information about the user. - """ - - retrieve_keys = { - u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell', - u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber', - u'title', u'memberof', u'nsaccountlock', u'memberofindirect', - u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink', - u'ipatokenradiususername', u'krbprincipalexpiration', - u'usercertificate', u'dn', u'has_keytab', u'has_password', - u'street', u'postalcode', u'facsimiletelephonenumber', - u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l', - u'st', u'mobile', u'pager', } - retrieve_all_keys = retrieve_keys | { - u'cn', u'ipauniqueid', u'objectclass', u'description', - u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} - - create_keys = retrieve_all_keys | { - u'objectclass', u'ipauniqueid', u'randompassword', - u'userpassword', u'krbextradata', u'krblastpwdchange', - u'krbpasswordexpiration', u'krbprincipalkey'} - - update_keys = retrieve_keys - {u'dn', u'nsaccountlock'} - activate_keys = retrieve_keys | { - u'has_keytab', u'has_password', u'nsaccountlock'} - - def __init__(self, name, givenname, sn, **kwargs): - super(StageUserTracker, self).__init__(default_version=None) - self.uid = name - self.givenname = givenname - self.sn = sn - self.dn = DN( - ('uid', self.uid), api.env.container_stageuser, api.env.basedn) - - self.kwargs = kwargs - - def make_create_command(self, options=None, force=None): - """ Make function that creates a staged user using stageuser-add """ - if options is not None: - self.kwargs = options - return self.make_command('stageuser_add', self.uid, - givenname=self.givenname, - sn=self.sn, **self.kwargs) - - def make_delete_command(self): - """ Make function that deletes a staged user using stageuser-del """ - return self.make_command('stageuser_del', self.uid) - - def make_retrieve_command(self, all=False, raw=False): - """ Make function that retrieves a staged user using stageuser-show """ - return self.make_command('stageuser_show', self.uid, all=all) - - def make_find_command(self, *args, **kwargs): - """ Make function that finds staged user using stageuser-find """ - return self.make_command('stageuser_find', *args, **kwargs) - - def make_update_command(self, updates): - """ Make function that updates staged user using stageuser-mod """ - return self.make_command('stageuser_mod', self.uid, **updates) - - def make_activate_command(self): - """ Make function that activates staged user - using stageuser-activate """ - return self.make_command('stageuser_activate', self.uid) - - def track_create(self): - """ Update expected state for staged user creation """ - self.attrs = dict( - dn=self.dn, - uid=[self.uid], - givenname=[self.givenname], - sn=[self.sn], - homedirectory=[u'/home/%s' % self.uid], - displayname=[u'%s %s' % (self.givenname, self.sn)], - cn=[u'%s %s' % (self.givenname, self.sn)], - initials=[u'%s%s' % (self.givenname[0], self.sn[0])], - objectclass=objectclasses.user_base, - description=[u'__no_upg__'], - ipauniqueid=[u'autogenerate'], - uidnumber=[u'-1'], - gidnumber=[u'-1'], - krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], - mail=[u'%s@%s' % (self.uid, self.api.env.domain)], - gecos=[u'%s %s' % (self.givenname, self.sn)], - loginshell=[u'/bin/sh'], - has_keytab=False, - has_password=False, - nsaccountlock=[u'true'], - ) - - for key in self.kwargs: - if key == u'krbprincipalname': - self.attrs[key] = [u'%s@%s' % ( - (self.kwargs[key].split('@'))[0].lower(), - (self.kwargs[key].split('@'))[1])] - elif key == u'manager': - self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))] - elif key == u'ipasshpubkey': - self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp] - self.attrs[key] = [self.kwargs[key]] - elif key == u'random' or key == u'userpassword': - self.attrs[u'krbextradata'] = [fuzzy_string] - self.attrs[u'krbpasswordexpiration'] = [ - fuzzy_dergeneralizedtime] - self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime] - self.attrs[u'krbprincipalkey'] = [fuzzy_string] - self.attrs[u'userpassword'] = [fuzzy_string] - self.attrs[u'has_keytab'] = True - self.attrs[u'has_password'] = True - if key == u'random': - self.attrs[u'randompassword'] = fuzzy_string - else: - self.attrs[key] = [self.kwargs[key]] - - self.exists = True - - def check_create(self, result): - """ Check 'stageuser-add' command result """ - assert_deepequal(dict( - value=self.uid, - summary=u'Added stage user "%s"' % self.uid, - result=self.filter_attrs(self.create_keys), - ), result) - - def check_delete(self, result): - """ Check 'stageuser-del' command result """ - assert_deepequal(dict( - value=[self.uid], - summary=u'Deleted stage user "%s"' % self.uid, - result=dict(failed=[]), - ), result) - - def check_retrieve(self, result, all=False, raw=False): - """ Check 'stageuser-show' command result """ - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - # small override because stageuser-find returns different - # type of nsaccountlock value than DS, but overall the value - # fits expected result - if expected[u'nsaccountlock'] == [u'true']: - expected[u'nsaccountlock'] = True - elif expected[u'nsaccountlock'] == [u'false']: - expected[u'nsaccountlock'] = False - - assert_deepequal(dict( - value=self.uid, - summary=None, - result=expected, - ), result) - - def check_find(self, result, all=False, raw=False): - """ Check 'stageuser-find' command result """ - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - # small override because stageuser-find returns different - # type of nsaccountlock value than DS, but overall the value - # fits expected result - if expected[u'nsaccountlock'] == [u'true']: - expected[u'nsaccountlock'] = True - elif expected[u'nsaccountlock'] == [u'false']: - expected[u'nsaccountlock'] = False - - assert_deepequal(dict( - count=1, - truncated=False, - summary=u'1 user matched', - result=[expected], - ), result) - - def check_find_nomatch(self, result): - """ Check 'stageuser-find' command result when no match is expected """ - assert_deepequal(dict( - count=0, - truncated=False, - summary=u'0 users matched', - result=[], - ), result) - - def check_update(self, result, extra_keys=()): - """ Check 'stageuser-mod' command result """ - assert_deepequal(dict( - value=self.uid, - summary=u'Modified stage user "%s"' % self.uid, - result=self.filter_attrs(self.update_keys | set(extra_keys)) - ), result) - - def check_restore_preserved(self, result): - assert_deepequal(dict( - value=[self.uid], - summary=u'Staged user account "%s"' % self.uid, - result=dict(failed=[]), - ), result) - - def make_fixture_activate(self, request): - """Make a pytest fixture for a staged user that is to be activated - - The fixture ensures the plugin entry does not exist before - and after the tests that use it. It takes into account - that the staged user no longer exists after activation, - therefore the fixture verifies after the tests - that the staged user doesn't exist instead of deleting it. - """ - del_command = self.make_delete_command() - try: - del_command() - except errors.NotFound: - pass - - def finish(): - with raises_exact(errors.NotFound( - reason=u'%s: stage user not found' % self.uid)): - del_command() - - request.addfinalizer(finish) - - return self - - def create_from_preserved(self, user): - """ Copies values from preserved user - helper function for - restoration tests """ - self.attrs = user.attrs - self.uid = user.uid - self.givenname = user.givenname - self.sn = user.sn - self.dn = DN( - ('uid', self.uid), api.env.container_stageuser, api.env.basedn) - self.attrs[u'dn'] = self.dn - - @pytest.fixture(scope='class') def stageduser(request): tracker = StageUserTracker(name=u'suser1', givenname=u'staged', sn=u'user') diff --git a/ipatests/test_xmlrpc/test_user_plugin.py b/ipatests/test_xmlrpc/test_user_plugin.py index 81185e449..084fb83c4 100644 --- a/ipatests/test_xmlrpc/test_user_plugin.py +++ b/ipatests/test_xmlrpc/test_user_plugin.py @@ -23,7 +23,6 @@ Test the `ipalib/plugins/user.py` module. """ -import functools import datetime import ldap import re @@ -31,12 +30,11 @@ import re from ipalib import api, errors from ipatests.test_xmlrpc import objectclasses from ipatests.util import ( - assert_equal, assert_not_equal, raises, assert_deepequal) + assert_equal, assert_not_equal, raises) from xmlrpc_test import ( XMLRPC_test, Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_password, - fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact) + fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc) from ipapython.dn import DN -from ipatests.test_xmlrpc.ldaptracker import Tracker import pytest user1 = u'tuser1' @@ -1654,332 +1652,3 @@ class test_denied_bind_with_expired_principal(XMLRPC_test): krbprincipalexpiration=principal_expiration_string) self.connection.simple_bind_s(str(get_user_dn(user1)), self.password) - - -class UserTracker(Tracker): - """ Class for host plugin like tests """ - - retrieve_keys = { - u'uid', u'givenname', u'sn', u'homedirectory', - u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou', - u'telephonenumber', u'title', u'memberof', - u'memberofindirect', u'ipauserauthtype', u'userclass', - u'ipatokenradiusconfiglink', u'ipatokenradiususername', - u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab', - u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber', - u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock', - u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata', - u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st' - } - - retrieve_all_keys = retrieve_keys | { - u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry', - u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} - - retrieve_preserved_keys = retrieve_keys - {u'memberof_group'} - retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'} - - create_keys = retrieve_all_keys | { - u'randompassword', u'mepmanagedentry', - u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange', - u'krbprincipalkey', u'randompassword', u'userpassword' - } - update_keys = retrieve_keys - {u'dn'} - activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password', - u'nsaccountlock', u'sshpubkeyfp'} - - find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'} - find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'} - - def __init__(self, name, givenname, sn, **kwargs): - super(UserTracker, self).__init__(default_version=None) - self.uid = name - self.givenname = givenname - self.sn = sn - self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn) - - self.kwargs = kwargs - - def make_create_command(self, force=None): - """ Make function that crates a user using user-add """ - return self.make_command( - 'user_add', self.uid, - givenname=self.givenname, - sn=self.sn, **self.kwargs - ) - - def make_delete_command(self, no_preserve=True, preserve=False): - """ Make function that deletes a user using user-del """ - - if preserve and not no_preserve: - # necessary to change some user attributes due to moving - # to different container - self.attrs[u'dn'] = DN( - ('uid', self.uid), - api.env.container_deleteuser, - api.env.basedn - ) - self.attrs[u'objectclass'] = objectclasses.user_base - - return self.make_command( - 'user_del', self.uid, - no_preserve=no_preserve, - preserve=preserve - ) - - def make_retrieve_command(self, all=False, raw=False): - """ Make function that retrieves a user using user-show """ - return self.make_command('user_show', self.uid, all=all) - - def make_find_command(self, *args, **kwargs): - """ Make function that finds user using user-find """ - return self.make_command('user_find', *args, **kwargs) - - def make_update_command(self, updates): - """ Make function that updates user using user-mod """ - return self.make_command('user_mod', self.uid, **updates) - - def make_undelete_command(self): - """ Make function that activates preserved user using user-undel """ - return self.make_command('user_undel', self.uid) - - def make_enable_command(self): - """ Make function that enables user using user-enable """ - return self.make_command('user_enable', self.uid) - - def make_stage_command(self): - """ Make function that restores preserved user by moving it to - staged container """ - return self.make_command('user_stage', self.uid) - - def track_create(self): - """ Update expected state for user creation """ - self.attrs = dict( - dn=self.dn, - uid=[self.uid], - givenname=[self.givenname], - sn=[self.sn], - homedirectory=[u'/home/%s' % self.uid], - displayname=[u'%s %s' % (self.givenname, self.sn)], - cn=[u'%s %s' % (self.givenname, self.sn)], - initials=[u'%s%s' % (self.givenname[0], self.sn[0])], - objectclass=objectclasses.user, - description=[u'__no_upg__'], - ipauniqueid=[fuzzy_uuid], - uidnumber=[fuzzy_digits], - gidnumber=[fuzzy_digits], - krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], - mail=[u'%s@%s' % (self.uid, self.api.env.domain)], - gecos=[u'%s %s' % (self.givenname, self.sn)], - loginshell=[u'/bin/sh'], - has_keytab=False, - has_password=False, - mepmanagedentry=[get_group_dn(self.uid)], - memberof_group=[u'ipausers'], - ) - - for key in self.kwargs: - if key == u'krbprincipalname': - self.attrs[key] = [u'%s@%s' % ( - (self.kwargs[key].split('@'))[0].lower(), - (self.kwargs[key].split('@'))[1] - )] - else: - self.attrs[key] = [self.kwargs[key]] - - self.exists = True - - def check_create(self, result): - """ Check 'user-add' command result """ - assert_deepequal(dict( - value=self.uid, - summary=u'Added user "%s"' % self.uid, - result=self.filter_attrs(self.create_keys), - ), result) - - def check_delete(self, result): - """ Check 'user-del' command result """ - assert_deepequal(dict( - value=[self.uid], - summary=u'Deleted user "%s"' % self.uid, - result=dict(failed=[]), - ), result) - - def check_retrieve(self, result, all=False): - """ Check 'user-show' command result """ - - if u'preserved' in self.attrs and self.attrs[u'preserved']: - self.retrieve_all_keys = self.retrieve_preserved_all_keys - self.retrieve_keys = self.retrieve_preserved_keys - elif u'preserved' not in self.attrs and all: - self.attrs[u'preserved'] = False - - if all: - expected = self.filter_attrs(self.retrieve_all_keys) - else: - expected = self.filter_attrs(self.retrieve_keys) - - # small override because stageuser-find returns different type - # of nsaccountlock value than DS, but overall the value fits - # expected result - if u'nsaccountlock' in expected: - if expected[u'nsaccountlock'] == [u'true']: - expected[u'nsaccountlock'] = True - elif expected[u'nsaccountlock'] == [u'false']: - expected[u'nsaccountlock'] = False - - assert_deepequal(dict( - value=self.uid, - summary=None, - result=expected, - ), result) - - def check_find(self, result, all=False, raw=False): - """ Check 'user-find' command result """ - self.attrs[u'nsaccountlock'] = True - self.attrs[u'preserved'] = True - - if all: - expected = self.filter_attrs(self.find_all_keys) - else: - expected = self.filter_attrs(self.find_keys) - - assert_deepequal(dict( - count=1, - truncated=False, - summary=u'1 user matched', - result=[expected], - ), result) - - def check_find_nomatch(self, result): - """ Check 'user-find' command result when no user should be found """ - assert_deepequal(dict( - count=0, - truncated=False, - summary=u'0 users matched', - result=[], - ), result) - - def check_update(self, result, extra_keys=()): - """ Check 'user-mod' command result """ - assert_deepequal(dict( - value=self.uid, - summary=u'Modified user "%s"' % self.uid, - result=self.filter_attrs(self.update_keys | set(extra_keys)) - ), result) - - def create_from_staged(self, stageduser): - """ Copies attributes from staged user - helper function for - activation tests """ - self.attrs = stageduser.attrs - self.uid = stageduser.uid - self.givenname = stageduser.givenname - self.sn = stageduser.sn - - self.attrs[u'mepmanagedentry'] = None - self.attrs[u'dn'] = self.dn - self.attrs[u'ipauniqueid'] = [fuzzy_uuid] - self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % ( - api.env.container_group, api.env.basedn - )] - self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % ( - self.uid, api.env.container_group, api.env.basedn - )] - self.attrs[u'objectclass'] = objectclasses.user - if self.attrs[u'gidnumber'] == [u'-1']: - self.attrs[u'gidnumber'] = [fuzzy_digits] - if self.attrs[u'uidnumber'] == [u'-1']: - self.attrs[u'uidnumber'] = [fuzzy_digits] - - if u'ipasshpubkey' in self.kwargs: - self.attrs[u'ipasshpubkey'] = [str( - self.kwargs[u'ipasshpubkey'] - )] - - def check_activate(self, result): - """ Check 'stageuser-activate' command result """ - expected = dict( - value=self.uid, - summary=u'Stage user %s activated' % self.uid, - result=self.filter_attrs(self.activate_keys)) - - # work around to eliminate inconsistency in returned objectclass - # (case sensitive assertion) - expected['result']['objectclass'] = [item.lower() for item in - expected['result']['objectclass']] - result['result']['objectclass'] = [item.lower() for item in - result['result']['objectclass']] - - assert_deepequal(expected, result) - - self.exists = True - - def check_undel(self, result): - """ Check 'user-undel' command result """ - assert_deepequal(dict( - value=self.uid, - summary=u'Undeleted user account "%s"' % self.uid, - result=True - ), result) - - def track_delete(self, preserve=False): - """Update expected state for host deletion""" - if preserve: - self.exists = True - if u'memberof_group' in self.attrs: - del self.attrs[u'memberof_group'] - self.attrs[u'nsaccountlock'] = True - self.attrs[u'preserved'] = True - else: - self.exists = False - self.attrs = {} - - def make_preserved_user(self): - """ 'Creates' a preserved user necessary for some tests """ - self.ensure_exists() - self.track_delete(preserve=True) - command = self.make_delete_command(no_preserve=False, preserve=True) - result = command() - self.check_delete(result) - - def check_attr_preservation(self, expected): - """ Verifies that ipaUniqueID, uidNumber and gidNumber are - preserved upon reactivation. Also verifies that resulting - active user is a member of ipausers group only.""" - command = self.make_retrieve_command(all=True) - result = command() - - assert_deepequal(dict( - ipauniqueid=result[u'result'][u'ipauniqueid'], - uidnumber=result[u'result'][u'uidnumber'], - gidnumber=result[u'result'][u'gidnumber'] - ), expected) - - if (u'memberof_group' not in result[u'result'] or - result[u'result'][u'memberof_group'] != (u'ipausers',)): - assert False - - def make_fixture_restore(self, request): - """Make a pytest fixture for a preserved user that is to be moved to - staged area. - - The fixture ensures the plugin entry does not exist before - and after the tests that use it. It takes into account - that the preserved user no longer exists after restoring it, - therefore the fixture verifies after the tests - that the preserved user doesn't exist instead of deleting it. - """ - del_command = self.make_delete_command() - try: - del_command() - except errors.NotFound: - pass - - def finish(): - with raises_exact(errors.NotFound( - reason=u'%s: user not found' % self.uid)): - del_command() - - request.addfinalizer(finish) - - return self diff --git a/ipatests/test_xmlrpc/tracker/__init__.py b/ipatests/test_xmlrpc/tracker/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/__init__.py diff --git a/ipatests/test_xmlrpc/ldaptracker.py b/ipatests/test_xmlrpc/tracker/base.py index acd382dd3..acd382dd3 100644 --- a/ipatests/test_xmlrpc/ldaptracker.py +++ b/ipatests/test_xmlrpc/tracker/base.py diff --git a/ipatests/test_xmlrpc/tracker/caacl_plugin.py b/ipatests/test_xmlrpc/tracker/caacl_plugin.py new file mode 100644 index 000000000..afe7ee0c0 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/caacl_plugin.py @@ -0,0 +1,367 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from ipalib import errors +from ipatests.util import assert_deepequal +from ipatests.test_xmlrpc.xmlrpc_test import (fuzzy_caacldn, fuzzy_uuid, + fuzzy_ipauniqueid) +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.tracker.base import Tracker + + +class CAACLTracker(Tracker): + """Tracker class for CA ACL LDAP object. + + The class implements methods required by the base class + to help with basic CRUD operations. + + Methods for adding and deleting actual member entries into an ACL + do not have check methods as these would make the class + unnecessarily complicated. The checks are implemented as + a standalone test suite. However, this makes the test crucial + in debugging more complicated test cases. Since the add/remove + operation won't be checked right away by the tracker, a problem + in this operation may propagate into more complicated test case. + + It is possible to pass a list of member uids to these methods. + + The test uses instances of Fuzzy class to compare results as they + are in the UUID format. The dn and rdn properties were modified + to reflect this as well. + """ + + member_keys = { + u'memberuser_user', u'memberuser_group', + u'memberhost_host', u'memberhost_hostgroup', + u'memberservice_service', + u'ipamembercertprofile_certprofile'} + category_keys = { + u'ipacacategory', u'ipacertprofilecategory', u'usercategory', + u'hostcategory', u'servicecategory'} + retrieve_keys = { + u'dn', u'cn', u'description', u'ipaenabledflag', + u'ipamemberca', u'ipamembercertprofile', u'memberuser', + u'memberhost', u'memberservice'} | member_keys | category_keys + retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'} + create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory', + u'usercategory', u'hostcategory', u'ipacacategory', + u'servicecategory', u'ipaenabledflag', u'objectclass', + u'ipauniqueid'} + update_keys = create_keys - {u'dn'} + + def __init__(self, name, ipacertprofile_category=None, user_category=None, + service_category=None, host_category=None, description=None, + default_version=None): + super(CAACLTracker, self).__init__(default_version=default_version) + + self._name = name + self.description = description + self._categories = dict( + ipacertprofilecategory=ipacertprofile_category, + usercategory=user_category, + servicecategory=service_category, + hostcategory=host_category) + + self.dn = fuzzy_caacldn + + @property + def name(self): + return self._name + + @property + def rdn(self): + return fuzzy_ipauniqueid + + @property + def categories(self): + """To be used in track_create""" + return {cat: v for cat, v in self._categories.items() if v} + + @property + def create_categories(self): + """ Return the categories set on create. + Unused categories are left out. + """ + return {cat: [v] for cat, v in self.categories.items() if v} + + def make_create_command(self, force=True): + return self.make_command(u'caacl_add', self.name, + description=self.description, + **self.categories) + + def check_create(self, result): + assert_deepequal(dict( + value=self.name, + summary=u'Added CA ACL "{}"'.format(self.name), + result=dict(self.filter_attrs(self.create_keys)) + ), result) + + def track_create(self): + self.attrs = dict( + dn=self.dn, + ipauniqueid=[fuzzy_uuid], + cn=[self.name], + objectclass=objectclasses.caacl, + ipaenabledflag=[u'TRUE']) + + self.attrs.update(self.create_categories) + if self.description: + self.attrs.update({u'description', [self.description]}) + + self.exists = True + + def make_delete_command(self): + return self.make_command('caacl_del', self.name) + + def check_delete(self, result): + assert_deepequal(dict( + value=[self.name], + summary=u'Deleted CA ACL "{}"'.format(self.name), + result=dict(failed=[]) + ), result) + + def make_retrieve_command(self, all=False, raw=False): + return self.make_command('caacl_show', self.name, all=all, raw=raw) + + def check_retrieve(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + value=self.name, + summary=None, + result=expected + ), result) + + def make_find_command(self, *args, **kwargs): + return self.make_command('caacl_find', *args, **kwargs) + + def check_find(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 CA ACL matched', + result=[expected] + ), result) + + def make_update_command(self, updates): + return self.make_command('caacl_mod', self.name, **updates) + + def update(self, updates, expected_updates=None, silent=False): + """If removing a category, delete it from tracker as well""" + # filter out empty categories and track changes + + filtered_updates = dict() + for key, value in updates.items(): + if key in self.category_keys: + if not value: + try: + del self.attrs[key] + except IndexError: + if silent: + pass + else: + # if there is a value, prepare the pair for update + filtered_updates.update({key: value}) + else: + filtered_updates.update({key: value}) + + if expected_updates is None: + expected_updates = {} + + command = self.make_update_command(updates) + + try: + result = command() + except errors.EmptyModlist: + if silent: + self.attrs.update(filtered_updates) + self.attrs.update(expected_updates) + self.check_update(result, + extra_keys=set(self.update_keys) | + set(expected_updates.keys())) + + def check_update(self, result, extra_keys=()): + assert_deepequal(dict( + value=self.name, + summary=u'Modified CA ACL "{}"'.format(self.name), + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + # Helper methods for caacl subcommands. The check methods will be + # implemented in standalone test + # + # The methods implemented here will be: + # caacl_{add,remove}_{host, service, certprofile, user [, subca]} + + def _add_acl_component(self, command_name, keys, track): + """ Add a resource into ACL rule and track it. + + command_name - the name in the API + keys = { + 'tracker_attr': { + 'api_key': 'value' + } + } + + e.g. + + keys = { + 'memberhost_host': { + 'host': 'hostname' + }, + 'memberhost_hostgroup': { + 'hostgroup': 'hostgroup_name' + } + } + """ + + if not self.exists: + raise errors.NotFound(reason="The tracked entry doesn't exist.") + + command = self.make_command(command_name, self.name) + command_options = dict() + + # track + for tracker_attr in keys: + api_options = keys[tracker_attr] + if track: + for option in api_options: + try: + if type(option) in (list, tuple): + self.attrs[tracker_attr].extend(api_options[option]) + else: + self.attrs[tracker_attr].append(api_options[option]) + except KeyError: + if type(option) in (list, tuple): + self.attrs[tracker_attr] = api_options[option] + else: + self.attrs[tracker_attr] = [api_options[option]] + # prepare options for the command call + command_options.update(api_options) + + return command(**command_options) + + def _remove_acl_component(self, command_name, keys, track): + """ Remove a resource from ACL rule and track it. + + command_name - the name in the API + keys = { + 'tracker_attr': { + 'api_key': 'value' + } + } + + e.g. + + keys = { + 'memberhost_host': { + 'host': 'hostname' + }, + 'memberhost_hostgroup': { + 'hostgroup': 'hostgroup_name' + } + } + """ + command = self.make_command(command_name, self.name) + command_options = dict() + + for tracker_attr in keys: + api_options = keys[tracker_attr] + if track: + for option in api_options: + if type(option) in (list, tuple): + for item in option: + self.attrs[tracker_attr].remove(item) + else: + self.attrs[tracker_attr].remove(api_options[option]) + if len(self.attrs[tracker_attr]) == 0: + del self.attrs[tracker_attr] + command_options.update(api_options) + + return command(**command_options) + + def add_host(self, host=None, hostgroup=None, track=True): + """Associates an host or hostgroup entry with the ACL. + + The command takes an unicode string with the name + of the entry (RDN). + + It is the responsibility of a test writer to provide + the correct value, object type as the method does not + verify whether the entry exists. + + The method can add only one entry of each type + in one call. + """ + + options = { + u'memberhost_host': {u'host': host}, + u'memberhost_hostgroup': {u'hostgroup': hostgroup}} + + return self._add_acl_component(u'caacl_add_host', options, track) + + def remove_host(self, host=None, hostgroup=None, track=True): + options = { + u'memberhost_host': {u'host': host}, + u'memberhost_hostgroup': {u'hostgroup': hostgroup}} + + return self._remove_acl_component(u'caacl_remove_host', options, track) + + def add_user(self, user=None, group=None, track=True): + options = { + u'memberuser_user': {u'user': user}, + u'memberuser_group': {u'group': group}} + + return self._add_acl_component(u'caacl_add_user', options, track) + + def remove_user(self, user=None, group=None, track=True): + options = { + u'memberuser_user': {u'user': user}, + u'memberuser_group': {u'group': group}} + + return self._remove_acl_component(u'caacl_remove_user', options, track) + + def add_service(self, service=None, track=True): + options = { + u'memberservice_service': {u'service': service}} + + return self._add_acl_component(u'caacl_add_service', options, track) + + def remove_service(self, service=None, track=True): + options = { + u'memberservice_service': {u'service': service}} + + return self._remove_acl_component(u'caacl_remove_service', options, track) + + def add_profile(self, certprofile=None, track=True): + options = { + u'ipamembercertprofile_certprofile': + {u'certprofile': certprofile}} + + return self._add_acl_component(u'caacl_add_profile', options, track) + + def remove_profile(self, certprofile=None, track=True): + options = { + u'ipamembercertprofile_certprofile': + {u'certprofile': certprofile}} + + return self._remove_acl_component(u'caacl_remove_profile', options, track) + + def enable(self): + command = self.make_command(u'caacl_enable', self.name) + self.attrs.update({u'ipaenabledflag': [u'TRUE']}) + command() + + def disable(self): + command = self.make_command(u'caacl_disable', self.name) + self.attrs.update({u'ipaenabledflag': [u'FALSE']}) + command() diff --git a/ipatests/test_xmlrpc/tracker/certprofile_plugin.py b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py new file mode 100644 index 000000000..eeb27eb14 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +import os + + +from ipapython.dn import DN +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.test_xmlrpc import objectclasses +from ipatests.util import assert_deepequal + + +class CertprofileTracker(Tracker): + """Tracker class for certprofile plugin. + """ + + retrieve_keys = { + 'dn', 'cn', 'description', 'ipacertprofilestoreissued' + } + retrieve_all_keys = retrieve_keys | {'objectclass'} + create_keys = retrieve_keys | {'objectclass'} + update_keys = retrieve_keys - {'dn'} + managedby_keys = retrieve_keys + allowedto_keys = retrieve_keys + + def __init__(self, name, store=False, desc='dummy description', + profile=None, default_version=None): + super(CertprofileTracker, self).__init__( + default_version=default_version + ) + + self.store = store + self.description = desc + self._profile_path = profile + + self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca', + self.api.env.basedn) + + @property + def profile(self): + if not self._profile_path: + return None + + if os.path.isabs(self._profile_path): + path = self._profile_path + else: + path = os.path.join(os.path.dirname(__file__), + self._profile_path) + + with open(path, 'r') as f: + content = f.read() + return unicode(content) + + def make_create_command(self, force=True): + if not self.profile: + raise RuntimeError('Tracker object without path to profile ' + 'cannot be used to create profile entry.') + + return self.make_command('certprofile_import', self.name, + description=self.description, + ipacertprofilestoreissued=self.store, + file=self.profile) + + def check_create(self, result): + assert_deepequal(dict( + value=self.name, + summary=u'Imported profile "{}"'.format(self.name), + result=dict(self.filter_attrs(self.create_keys)) + ), result) + + def track_create(self): + self.attrs = dict( + dn=unicode(self.dn), + cn=[self.name], + description=[self.description], + ipacertprofilestoreissued=[unicode(self.store).upper()], + objectclass=objectclasses.certprofile + ) + self.exists = True + + def make_delete_command(self): + return self.make_command('certprofile_del', self.name) + + def check_delete(self, result): + assert_deepequal(dict( + value=[self.name], # correctly a list? + summary=u'Deleted profile "{}"'.format(self.name), + result=dict(failed=[]), + ), result) + + def make_retrieve_command(self, all=False, raw=False, **options): + return self.make_command('certprofile_show', self.name, all=all, + raw=raw, **options) + + def check_retrieve(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + value=self.name, + summary=None, + result=expected, + ), result) + + def make_find_command(self, *args, **kwargs): + return self.make_command('certprofile_find', *args, **kwargs) + + def check_find(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 profile matched', + result=[expected] + ), result) + + def make_update_command(self, updates): + return self.make_command('certprofile_mod', self.name, **updates) + + def check_update(self, result, extra_keys=()): + assert_deepequal(dict( + value=self.name, + summary=u'Modified Certificate Profile "{}"'.format(self.name), + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) diff --git a/ipatests/test_xmlrpc/tracker/group_plugin.py b/ipatests/test_xmlrpc/tracker/group_plugin.py new file mode 100644 index 000000000..c47ce8ecf --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/group_plugin.py @@ -0,0 +1,196 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid + +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.util import assert_deepequal, get_group_dn + + +class GroupTracker(Tracker): + """ Class for host plugin like tests """ + retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user', + u'member_group'} + retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'} + + create_keys = retrieve_all_keys + update_keys = retrieve_keys - {u'dn'} + + add_member_keys = retrieve_keys | {u'description'} + + def __init__(self, name): + super(GroupTracker, self).__init__(default_version=None) + self.cn = name + self.dn = get_group_dn(name) + + def make_create_command(self, nonposix=False, external=False, + force=True): + """ Make function that creates a group using 'group-add' """ + return self.make_command('group_add', self.cn, + nonposix=nonposix, external=external) + + def make_delete_command(self): + """ Make function that deletes a group using 'group-del' """ + return self.make_command('group_del', self.cn) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a group using 'group-show' """ + return self.make_command('group_show', self.cn, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that searches for a group using 'group-find' """ + return self.make_command('group_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates a group using 'group-mod' """ + return self.make_command('group_mod', self.cn, **updates) + + def make_add_member_command(self, options={}): + """ Make function that adds a member to a group + Attention: only works for one user OR group! """ + if u'user' in options: + self.attrs[u'member_user'] = [options[u'user']] + elif u'group' in options: + self.attrs[u'member_group'] = [options[u'group']] + self.adds = options + + return self.make_command('group_add_member', self.cn, **options) + + def make_remove_member_command(self, options={}): + """ Make function that removes a member from a group + Attention: only works for one user OR group! """ + if u'user' in options: + del self.attrs[u'member_user'] + elif u'group' in options: + del self.attrs[u'member_group'] + return self.make_command('group_remove_member', self.cn, **options) + + def make_detach_command(self): + """ Make function that detaches a managed group using + 'group-detach' """ + self.exists = True + return self.make_command('group_detach', self.cn) + + def track_create(self): + """ Updates expected state for group creation""" + self.attrs = dict( + dn=get_group_dn(self.cn), + cn=[self.cn], + gidnumber=[fuzzy_digits], + ipauniqueid=[fuzzy_uuid], + objectclass=objectclasses.posixgroup, + ) + self.exists = True + + def check_create(self, result): + """ Checks 'group_add' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Added group "%s"' % self.cn, + result=self.filter_attrs(self.create_keys) + ), result) + + def check_delete(self, result): + """ Checks 'group_del' command result """ + assert_deepequal(dict( + value=[self.cn], + summary=u'Deleted group "%s"' % self.cn, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """ Checks 'group_show' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + value=self.cn, + summary=None, + result=expected + ), result) + + def check_find(self, result, all=False, raw=False): + """ Checks 'group_find' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 group matched', + result=[expected], + ), result) + + def check_update(self, result, extra_keys={}): + """ Checks 'group_mod' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Modified group "%s"' % self.cn, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def check_add_member(self, result): + """ Checks 'group_add_member' command result """ + assert_deepequal(dict( + completed=1, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ), result) + + def check_add_member_negative(self, result): + """ Checks 'group_add_member' command result when expected result + is failure of the operation""" + if u'member_user' in self.attrs: + del self.attrs[u'member_user'] + elif u'member_group' in self.attrs: + del self.attrs[u'member_group'] + + expected = dict( + completed=0, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ) + if u'user' in self.adds: + expected[u'failed'][u'member'][u'user'] = [( + self.adds[u'user'], u'no such entry')] + elif u'group' in self.adds: + expected[u'failed'][u'member'][u'group'] = [( + self.adds[u'group'], u'no such entry')] + + assert_deepequal(expected, result) + + def check_remove_member(self, result): + """ Checks 'group_remove_member' command result """ + assert_deepequal(dict( + completed=1, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ), result) + + def check_detach(self, result): + """ Checks 'group_detach' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Detached group "%s" from user "%s"' % ( + self.cn, self.cn), + result=True + ), result) + + def make_fixture_detach(self, request): + """Make a pytest fixture for this tracker + + The fixture ensures the plugin entry does not exist before + and after the tests that use itself. + """ + def cleanup(): + pass + + request.addfinalizer(cleanup) + + return self diff --git a/ipatests/test_xmlrpc/tracker/host_plugin.py b/ipatests/test_xmlrpc/tracker/host_plugin.py new file mode 100644 index 000000000..bf199f4f5 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/host_plugin.py @@ -0,0 +1,154 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from __future__ import print_function + + +from ipapython.dn import DN +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_uuid +from ipatests.test_xmlrpc import objectclasses +from ipatests.util import assert_deepequal + + +class HostTracker(Tracker): + """Wraps and tracks modifications to a Host object + + Implements the helper functions for host plugin. + + The HostTracker object stores information about the host, e.g. + ``fqdn`` and ``dn``. + """ + retrieve_keys = { + 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host', + 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint', + 'serial_number', 'serial_number_hex', 'sha1_fingerprint', + 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before', + 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user', + 'ipaallowedtoperform_read_keys_group', + 'ipaallowedtoperform_read_keys_host', + 'ipaallowedtoperform_read_keys_hostgroup', + 'ipaallowedtoperform_write_keys_user', + 'ipaallowedtoperform_write_keys_group', + 'ipaallowedtoperform_write_keys_host', + 'ipaallowedtoperform_write_keys_hostgroup'} + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid', + u'managing_host', u'objectclass', u'serverhostname'} + create_keys = retrieve_keys | {'objectclass', 'ipauniqueid', + 'randompassword'} + update_keys = retrieve_keys - {'dn'} + managedby_keys = retrieve_keys - {'has_keytab', 'has_password'} + allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'} + + def __init__(self, name, fqdn=None, default_version=None): + super(HostTracker, self).__init__(default_version=default_version) + + self.shortname = name + if fqdn: + self.fqdn = fqdn + else: + self.fqdn = u'%s.%s' % (name, self.api.env.domain) + self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts', + self.api.env.basedn) + + self.description = u'Test host <%s>' % name + self.location = u'Undisclosed location <%s>' % name + + def make_create_command(self, force=True): + """Make function that creates this host using host_add""" + return self.make_command('host_add', self.fqdn, + description=self.description, + l=self.location, + force=force) + + def make_delete_command(self): + """Make function that deletes the host using host_del""" + return self.make_command('host_del', self.fqdn) + + def make_retrieve_command(self, all=False, raw=False): + """Make function that retrieves the host using host_show""" + return self.make_command('host_show', self.fqdn, all=all, raw=raw) + + def make_find_command(self, *args, **kwargs): + """Make function that finds hosts using host_find + + Note that the fqdn (or other search terms) needs to be specified + in arguments. + """ + return self.make_command('host_find', *args, **kwargs) + + def make_update_command(self, updates): + """Make function that modifies the host using host_mod""" + return self.make_command('host_mod', self.fqdn, **updates) + + def track_create(self): + """Update expected state for host creation""" + self.attrs = dict( + dn=self.dn, + fqdn=[self.fqdn], + description=[self.description], + l=[self.location], + krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)], + objectclass=objectclasses.host, + ipauniqueid=[fuzzy_uuid], + managedby_host=[self.fqdn], + has_keytab=False, + has_password=False, + cn=[self.fqdn], + ipakrbokasdelegate=False, + ipakrbrequirespreauth=True, + managing_host=[self.fqdn], + serverhostname=[self.shortname], + ) + self.exists = True + + def check_create(self, result): + """Check `host_add` command result""" + assert_deepequal(dict( + value=self.fqdn, + summary=u'Added host "%s"' % self.fqdn, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """Check `host_del` command result""" + assert_deepequal(dict( + value=[self.fqdn], + summary=u'Deleted host "%s"' % self.fqdn, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """Check `host_show` command result""" + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + assert_deepequal(dict( + value=self.fqdn, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """Check `host_find` command result""" + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 host matched', + result=[expected], + ), result) + + def check_update(self, result, extra_keys=()): + """Check `host_update` command result""" + assert_deepequal(dict( + value=self.fqdn, + summary=u'Modified host "%s"' % self.fqdn, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) diff --git a/ipatests/test_xmlrpc/tracker/stageuser_plugin.py b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py new file mode 100644 index 000000000..09ad282e4 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py @@ -0,0 +1,267 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +import six + +from ipalib import api, errors + +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.xmlrpc_test import ( + fuzzy_string, fuzzy_dergeneralizedtime, raises_exact) + +from ipatests.util import assert_deepequal, get_user_dn +from ipapython.dn import DN + +if six.PY3: + unicode = str + +sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X' + 'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI' + 'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2' + 'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm' + 'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01' + '9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF' + '0L public key test') +sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B ' + 'public key test (ssh-rsa)') + + +class StageUserTracker(Tracker): + """ Tracker class for staged user LDAP object + + Implements helper functions for host plugin. + StageUserTracker object stores information about the user. + """ + + retrieve_keys = { + u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell', + u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber', + u'title', u'memberof', u'nsaccountlock', u'memberofindirect', + u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink', + u'ipatokenradiususername', u'krbprincipalexpiration', + u'usercertificate', u'dn', u'has_keytab', u'has_password', + u'street', u'postalcode', u'facsimiletelephonenumber', + u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l', + u'st', u'mobile', u'pager', } + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipauniqueid', u'objectclass', u'description', + u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} + + create_keys = retrieve_all_keys | { + u'objectclass', u'ipauniqueid', u'randompassword', + u'userpassword', u'krbextradata', u'krblastpwdchange', + u'krbpasswordexpiration', u'krbprincipalkey'} + + update_keys = retrieve_keys - {u'dn', u'nsaccountlock'} + activate_keys = retrieve_keys | { + u'has_keytab', u'has_password', u'nsaccountlock'} + + def __init__(self, name, givenname, sn, **kwargs): + super(StageUserTracker, self).__init__(default_version=None) + self.uid = name + self.givenname = givenname + self.sn = sn + self.dn = DN( + ('uid', self.uid), api.env.container_stageuser, api.env.basedn) + + self.kwargs = kwargs + + def make_create_command(self, options=None, force=None): + """ Make function that creates a staged user using stageuser-add """ + if options is not None: + self.kwargs = options + return self.make_command('stageuser_add', self.uid, + givenname=self.givenname, + sn=self.sn, **self.kwargs) + + def make_delete_command(self): + """ Make function that deletes a staged user using stageuser-del """ + return self.make_command('stageuser_del', self.uid) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a staged user using stageuser-show """ + return self.make_command('stageuser_show', self.uid, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that finds staged user using stageuser-find """ + return self.make_command('stageuser_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates staged user using stageuser-mod """ + return self.make_command('stageuser_mod', self.uid, **updates) + + def make_activate_command(self): + """ Make function that activates staged user + using stageuser-activate """ + return self.make_command('stageuser_activate', self.uid) + + def track_create(self): + """ Update expected state for staged user creation """ + self.attrs = dict( + dn=self.dn, + uid=[self.uid], + givenname=[self.givenname], + sn=[self.sn], + homedirectory=[u'/home/%s' % self.uid], + displayname=[u'%s %s' % (self.givenname, self.sn)], + cn=[u'%s %s' % (self.givenname, self.sn)], + initials=[u'%s%s' % (self.givenname[0], self.sn[0])], + objectclass=objectclasses.user_base, + description=[u'__no_upg__'], + ipauniqueid=[u'autogenerate'], + uidnumber=[u'-1'], + gidnumber=[u'-1'], + krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], + mail=[u'%s@%s' % (self.uid, self.api.env.domain)], + gecos=[u'%s %s' % (self.givenname, self.sn)], + loginshell=[u'/bin/sh'], + has_keytab=False, + has_password=False, + nsaccountlock=[u'true'], + ) + + for key in self.kwargs: + if key == u'krbprincipalname': + self.attrs[key] = [u'%s@%s' % ( + (self.kwargs[key].split('@'))[0].lower(), + (self.kwargs[key].split('@'))[1])] + elif key == u'manager': + self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))] + elif key == u'ipasshpubkey': + self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp] + self.attrs[key] = [self.kwargs[key]] + elif key == u'random' or key == u'userpassword': + self.attrs[u'krbextradata'] = [fuzzy_string] + self.attrs[u'krbpasswordexpiration'] = [ + fuzzy_dergeneralizedtime] + self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime] + self.attrs[u'krbprincipalkey'] = [fuzzy_string] + self.attrs[u'userpassword'] = [fuzzy_string] + self.attrs[u'has_keytab'] = True + self.attrs[u'has_password'] = True + if key == u'random': + self.attrs[u'randompassword'] = fuzzy_string + else: + self.attrs[key] = [self.kwargs[key]] + + self.exists = True + + def check_create(self, result): + """ Check 'stageuser-add' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Added stage user "%s"' % self.uid, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """ Check 'stageuser-del' command result """ + assert_deepequal(dict( + value=[self.uid], + summary=u'Deleted stage user "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """ Check 'stageuser-show' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different + # type of nsaccountlock value than DS, but overall the value + # fits expected result + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + value=self.uid, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """ Check 'stageuser-find' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different + # type of nsaccountlock value than DS, but overall the value + # fits expected result + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 user matched', + result=[expected], + ), result) + + def check_find_nomatch(self, result): + """ Check 'stageuser-find' command result when no match is expected """ + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 users matched', + result=[], + ), result) + + def check_update(self, result, extra_keys=()): + """ Check 'stageuser-mod' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Modified stage user "%s"' % self.uid, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def check_restore_preserved(self, result): + assert_deepequal(dict( + value=[self.uid], + summary=u'Staged user account "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def make_fixture_activate(self, request): + """Make a pytest fixture for a staged user that is to be activated + + The fixture ensures the plugin entry does not exist before + and after the tests that use it. It takes into account + that the staged user no longer exists after activation, + therefore the fixture verifies after the tests + that the staged user doesn't exist instead of deleting it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass + + def finish(): + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % self.uid)): + del_command() + + request.addfinalizer(finish) + + return self + + def create_from_preserved(self, user): + """ Copies values from preserved user - helper function for + restoration tests """ + self.attrs = user.attrs + self.uid = user.uid + self.givenname = user.givenname + self.sn = user.sn + self.dn = DN( + ('uid', self.uid), api.env.container_stageuser, api.env.basedn) + self.attrs[u'dn'] = self.dn diff --git a/ipatests/test_xmlrpc/tracker/user_plugin.py b/ipatests/test_xmlrpc/tracker/user_plugin.py new file mode 100644 index 000000000..af7d85836 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/user_plugin.py @@ -0,0 +1,340 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from ipalib import api, errors +from ipapython.dn import DN + +from ipatests.util import assert_deepequal, get_group_dn +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid, raises_exact +from ipatests.test_xmlrpc.tracker.base import Tracker + + +class UserTracker(Tracker): + """ Class for host plugin like tests """ + + retrieve_keys = { + u'uid', u'givenname', u'sn', u'homedirectory', + u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou', + u'telephonenumber', u'title', u'memberof', + u'memberofindirect', u'ipauserauthtype', u'userclass', + u'ipatokenradiusconfiglink', u'ipatokenradiususername', + u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab', + u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber', + u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock', + u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata', + u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st' + } + + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry', + u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} + + retrieve_preserved_keys = retrieve_keys - {u'memberof_group'} + retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'} + + create_keys = retrieve_all_keys | { + u'randompassword', u'mepmanagedentry', + u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange', + u'krbprincipalkey', u'randompassword', u'userpassword' + } + update_keys = retrieve_keys - {u'dn'} + activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password', + u'nsaccountlock', u'sshpubkeyfp'} + + find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'} + find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'} + + def __init__(self, name, givenname, sn, **kwargs): + super(UserTracker, self).__init__(default_version=None) + self.uid = name + self.givenname = givenname + self.sn = sn + self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn) + + self.kwargs = kwargs + + def make_create_command(self, force=None): + """ Make function that crates a user using user-add """ + return self.make_command( + 'user_add', self.uid, + givenname=self.givenname, + sn=self.sn, **self.kwargs + ) + + def make_delete_command(self, no_preserve=True, preserve=False): + """ Make function that deletes a user using user-del """ + + if preserve and not no_preserve: + # necessary to change some user attributes due to moving + # to different container + self.attrs[u'dn'] = DN( + ('uid', self.uid), + api.env.container_deleteuser, + api.env.basedn + ) + self.attrs[u'objectclass'] = objectclasses.user_base + + return self.make_command( + 'user_del', self.uid, + no_preserve=no_preserve, + preserve=preserve + ) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a user using user-show """ + return self.make_command('user_show', self.uid, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that finds user using user-find """ + return self.make_command('user_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates user using user-mod """ + return self.make_command('user_mod', self.uid, **updates) + + def make_undelete_command(self): + """ Make function that activates preserved user using user-undel """ + return self.make_command('user_undel', self.uid) + + def make_enable_command(self): + """ Make function that enables user using user-enable """ + return self.make_command('user_enable', self.uid) + + def make_stage_command(self): + """ Make function that restores preserved user by moving it to + staged container """ + return self.make_command('user_stage', self.uid) + + def track_create(self): + """ Update expected state for user creation """ + self.attrs = dict( + dn=self.dn, + uid=[self.uid], + givenname=[self.givenname], + sn=[self.sn], + homedirectory=[u'/home/%s' % self.uid], + displayname=[u'%s %s' % (self.givenname, self.sn)], + cn=[u'%s %s' % (self.givenname, self.sn)], + initials=[u'%s%s' % (self.givenname[0], self.sn[0])], + objectclass=objectclasses.user, + description=[u'__no_upg__'], + ipauniqueid=[fuzzy_uuid], + uidnumber=[fuzzy_digits], + gidnumber=[fuzzy_digits], + krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], + mail=[u'%s@%s' % (self.uid, self.api.env.domain)], + gecos=[u'%s %s' % (self.givenname, self.sn)], + loginshell=[u'/bin/sh'], + has_keytab=False, + has_password=False, + mepmanagedentry=[get_group_dn(self.uid)], + memberof_group=[u'ipausers'], + ) + + for key in self.kwargs: + if key == u'krbprincipalname': + self.attrs[key] = [u'%s@%s' % ( + (self.kwargs[key].split('@'))[0].lower(), + (self.kwargs[key].split('@'))[1] + )] + else: + self.attrs[key] = [self.kwargs[key]] + + self.exists = True + + def check_create(self, result): + """ Check 'user-add' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Added user "%s"' % self.uid, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """ Check 'user-del' command result """ + assert_deepequal(dict( + value=[self.uid], + summary=u'Deleted user "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False): + """ Check 'user-show' command result """ + + if u'preserved' in self.attrs and self.attrs[u'preserved']: + self.retrieve_all_keys = self.retrieve_preserved_all_keys + self.retrieve_keys = self.retrieve_preserved_keys + elif u'preserved' not in self.attrs and all: + self.attrs[u'preserved'] = False + + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different type + # of nsaccountlock value than DS, but overall the value fits + # expected result + if u'nsaccountlock' in expected: + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + value=self.uid, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """ Check 'user-find' command result """ + self.attrs[u'nsaccountlock'] = True + self.attrs[u'preserved'] = True + + if all: + expected = self.filter_attrs(self.find_all_keys) + else: + expected = self.filter_attrs(self.find_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 user matched', + result=[expected], + ), result) + + def check_find_nomatch(self, result): + """ Check 'user-find' command result when no user should be found """ + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 users matched', + result=[], + ), result) + + def check_update(self, result, extra_keys=()): + """ Check 'user-mod' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Modified user "%s"' % self.uid, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def create_from_staged(self, stageduser): + """ Copies attributes from staged user - helper function for + activation tests """ + self.attrs = stageduser.attrs + self.uid = stageduser.uid + self.givenname = stageduser.givenname + self.sn = stageduser.sn + + self.attrs[u'mepmanagedentry'] = None + self.attrs[u'dn'] = self.dn + self.attrs[u'ipauniqueid'] = [fuzzy_uuid] + self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % ( + api.env.container_group, api.env.basedn + )] + self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % ( + self.uid, api.env.container_group, api.env.basedn + )] + self.attrs[u'objectclass'] = objectclasses.user + if self.attrs[u'gidnumber'] == [u'-1']: + self.attrs[u'gidnumber'] = [fuzzy_digits] + if self.attrs[u'uidnumber'] == [u'-1']: + self.attrs[u'uidnumber'] = [fuzzy_digits] + + if u'ipasshpubkey' in self.kwargs: + self.attrs[u'ipasshpubkey'] = [str( + self.kwargs[u'ipasshpubkey'] + )] + + def check_activate(self, result): + """ Check 'stageuser-activate' command result """ + expected = dict( + value=self.uid, + summary=u'Stage user %s activated' % self.uid, + result=self.filter_attrs(self.activate_keys)) + + # work around to eliminate inconsistency in returned objectclass + # (case sensitive assertion) + expected['result']['objectclass'] = [item.lower() for item in + expected['result']['objectclass']] + result['result']['objectclass'] = [item.lower() for item in + result['result']['objectclass']] + + assert_deepequal(expected, result) + + self.exists = True + + def check_undel(self, result): + """ Check 'user-undel' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Undeleted user account "%s"' % self.uid, + result=True + ), result) + + def track_delete(self, preserve=False): + """Update expected state for host deletion""" + if preserve: + self.exists = True + if u'memberof_group' in self.attrs: + del self.attrs[u'memberof_group'] + self.attrs[u'nsaccountlock'] = True + self.attrs[u'preserved'] = True + else: + self.exists = False + self.attrs = {} + + def make_preserved_user(self): + """ 'Creates' a preserved user necessary for some tests """ + self.ensure_exists() + self.track_delete(preserve=True) + command = self.make_delete_command(no_preserve=False, preserve=True) + result = command() + self.check_delete(result) + + def check_attr_preservation(self, expected): + """ Verifies that ipaUniqueID, uidNumber and gidNumber are + preserved upon reactivation. Also verifies that resulting + active user is a member of ipausers group only.""" + command = self.make_retrieve_command(all=True) + result = command() + + assert_deepequal(dict( + ipauniqueid=result[u'result'][u'ipauniqueid'], + uidnumber=result[u'result'][u'uidnumber'], + gidnumber=result[u'result'][u'gidnumber'] + ), expected) + + if (u'memberof_group' not in result[u'result'] or + result[u'result'][u'memberof_group'] != (u'ipausers',)): + assert False + + def make_fixture_restore(self, request): + """Make a pytest fixture for a preserved user that is to be moved to + staged area. + + The fixture ensures the plugin entry does not exist before + and after the tests that use it. It takes into account + that the preserved user no longer exists after restoring it, + therefore the fixture verifies after the tests + that the preserved user doesn't exist instead of deleting it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass + + def finish(): + with raises_exact(errors.NotFound( + reason=u'%s: user not found' % self.uid)): + del_command() + + request.addfinalizer(finish) + + return self diff --git a/ipatests/util.py b/ipatests/util.py index c3c69816e..6aefe74d3 100644 --- a/ipatests/util.py +++ b/ipatests/util.py @@ -706,3 +706,9 @@ def change_principal(user, password, client=None, path=None): client.Backend.rpcclient.disconnect() client.Backend.rpcclient.connect() + +def get_group_dn(cn): + return DN(('cn', cn), api.env.container_group, api.env.basedn) + +def get_user_dn(uid): + return DN(('uid', uid), api.env.container_user, api.env.basedn) |