summaryrefslogtreecommitdiffstats
path: root/ipatests
diff options
context:
space:
mode:
authorMilan KubĂ­k <mkubik@redhat.com>2015-11-19 16:07:29 +0100
committerMartin Basti <mbasti@redhat.com>2015-12-02 17:12:24 +0100
commit17f9ca154b47f1e21797d25435e25676fdca284c (patch)
treefd3f5b2976acd3ca0718c88dbbe35782982be2b6 /ipatests
parentb8c619a7139bd7b65caa03b68431e22791ff19bf (diff)
downloadfreeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.gz
freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.xz
freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.zip
Separated Tracker implementations into standalone package
The previous way of implementing trackers in the module with the test caused circular imports. The separate package resolves this issue. https://fedorahosted.org/freeipa/ticket/5467 Reviewed-By: Ales 'alich' Marecek <amarecek@redhat.com>
Diffstat (limited to 'ipatests')
-rw-r--r--ipatests/setup.py.in3
-rw-r--r--ipatests/test_xmlrpc/test_caacl_plugin.py367
-rw-r--r--ipatests/test_xmlrpc/test_caacl_profile_enforcement.py4
-rw-r--r--ipatests/test_xmlrpc/test_certprofile_plugin.py126
-rw-r--r--ipatests/test_xmlrpc/test_group_plugin.py192
-rw-r--r--ipatests/test_xmlrpc/test_host_plugin.py144
-rw-r--r--ipatests/test_xmlrpc/test_stageuser_plugin.py247
-rw-r--r--ipatests/test_xmlrpc/test_user_plugin.py335
-rw-r--r--ipatests/test_xmlrpc/tracker/__init__.py0
-rw-r--r--ipatests/test_xmlrpc/tracker/base.py (renamed from ipatests/test_xmlrpc/ldaptracker.py)0
-rw-r--r--ipatests/test_xmlrpc/tracker/caacl_plugin.py367
-rw-r--r--ipatests/test_xmlrpc/tracker/certprofile_plugin.py133
-rw-r--r--ipatests/test_xmlrpc/tracker/group_plugin.py196
-rw-r--r--ipatests/test_xmlrpc/tracker/host_plugin.py154
-rw-r--r--ipatests/test_xmlrpc/tracker/stageuser_plugin.py267
-rw-r--r--ipatests/test_xmlrpc/tracker/user_plugin.py340
-rw-r--r--ipatests/util.py6
17 files changed, 1480 insertions, 1401 deletions
diff --git a/ipatests/setup.py.in b/ipatests/setup.py.in
index afc77ad56..ce1efb761 100644
--- a/ipatests/setup.py.in
+++ b/ipatests/setup.py.in
@@ -76,7 +76,8 @@ def setup_package():
"ipatests.test_ipaserver.test_install",
"ipatests.test_pkcs10",
"ipatests.test_webui",
- "ipatests.test_xmlrpc"],
+ "ipatests.test_xmlrpc",
+ "ipatests.test_xmlrpc.tracker"],
scripts=['ipa-run-tests', 'ipa-test-config', 'ipa-test-task'],
package_data = {
'ipatests': ['pytest.ini'],
diff --git a/ipatests/test_xmlrpc/test_caacl_plugin.py b/ipatests/test_xmlrpc/test_caacl_plugin.py
index 8b156a65a..d5ded1951 100644
--- a/ipatests/test_xmlrpc/test_caacl_plugin.py
+++ b/ipatests/test_xmlrpc/test_caacl_plugin.py
@@ -9,373 +9,12 @@ Test the `ipalib.plugins.caacl` module.
import pytest
from ipalib import errors
-from ipatests.test_xmlrpc.ldaptracker import Tracker
-from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test, fuzzy_caacldn,
- fuzzy_uuid, fuzzy_ipauniqueid)
-
-from ipatests.test_xmlrpc import objectclasses
-from ipatests.util import assert_deepequal
+from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
# reuse the fixture
from ipatests.test_xmlrpc.test_certprofile_plugin import default_profile
-from ipatests.test_xmlrpc.test_stageuser_plugin import StageUserTracker
-
-
-class CAACLTracker(Tracker):
- """Tracker class for CA ACL LDAP object.
-
- The class implements methods required by the base class
- to help with basic CRUD operations.
-
- Methods for adding and deleting actual member entries into an ACL
- do not have check methods as these would make the class
- unnecessarily complicated. The checks are implemented as
- a standalone test suite. However, this makes the test crucial
- in debugging more complicated test cases. Since the add/remove
- operation won't be checked right away by the tracker, a problem
- in this operation may propagate into more complicated test case.
-
- It is possible to pass a list of member uids to these methods.
-
- The test uses instances of Fuzzy class to compare results as they
- are in the UUID format. The dn and rdn properties were modified
- to reflect this as well.
- """
-
- member_keys = {
- u'memberuser_user', u'memberuser_group',
- u'memberhost_host', u'memberhost_hostgroup',
- u'memberservice_service',
- u'ipamembercertprofile_certprofile'}
- category_keys = {
- u'ipacacategory', u'ipacertprofilecategory', u'usercategory',
- u'hostcategory', u'servicecategory'}
- retrieve_keys = {
- u'dn', u'cn', u'description', u'ipaenabledflag',
- u'ipamemberca', u'ipamembercertprofile', u'memberuser',
- u'memberhost', u'memberservice'} | member_keys | category_keys
- retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'}
- create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory',
- u'usercategory', u'hostcategory', u'ipacacategory',
- u'servicecategory', u'ipaenabledflag', u'objectclass',
- u'ipauniqueid'}
- update_keys = create_keys - {u'dn'}
-
- def __init__(self, name, ipacertprofile_category=None, user_category=None,
- service_category=None, host_category=None, description=None,
- default_version=None):
- super(CAACLTracker, self).__init__(default_version=default_version)
-
- self._name = name
- self.description = description
- self._categories = dict(
- ipacertprofilecategory=ipacertprofile_category,
- usercategory=user_category,
- servicecategory=service_category,
- hostcategory=host_category)
-
- self.dn = fuzzy_caacldn
-
- @property
- def name(self):
- return self._name
-
- @property
- def rdn(self):
- return fuzzy_ipauniqueid
-
- @property
- def categories(self):
- """To be used in track_create"""
- return {cat: v for cat, v in self._categories.items() if v}
-
- @property
- def create_categories(self):
- """ Return the categories set on create.
- Unused categories are left out.
- """
- return {cat: [v] for cat, v in self.categories.items() if v}
-
- def make_create_command(self, force=True):
- return self.make_command(u'caacl_add', self.name,
- description=self.description,
- **self.categories)
-
- def check_create(self, result):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Added CA ACL "{}"'.format(self.name),
- result=dict(self.filter_attrs(self.create_keys))
- ), result)
-
- def track_create(self):
- self.attrs = dict(
- dn=self.dn,
- ipauniqueid=[fuzzy_uuid],
- cn=[self.name],
- objectclass=objectclasses.caacl,
- ipaenabledflag=[u'TRUE'])
-
- self.attrs.update(self.create_categories)
- if self.description:
- self.attrs.update({u'description', [self.description]})
-
- self.exists = True
-
- def make_delete_command(self):
- return self.make_command('caacl_del', self.name)
-
- def check_delete(self, result):
- assert_deepequal(dict(
- value=[self.name],
- summary=u'Deleted CA ACL "{}"'.format(self.name),
- result=dict(failed=[])
- ), result)
-
- def make_retrieve_command(self, all=False, raw=False):
- return self.make_command('caacl_show', self.name, all=all, raw=raw)
-
- def check_retrieve(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- value=self.name,
- summary=None,
- result=expected
- ), result)
-
- def make_find_command(self, *args, **kwargs):
- return self.make_command('caacl_find', *args, **kwargs)
-
- def check_find(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 CA ACL matched',
- result=[expected]
- ), result)
-
- def make_update_command(self, updates):
- return self.make_command('caacl_mod', self.name, **updates)
-
- def update(self, updates, expected_updates=None, silent=False):
- """If removing a category, delete it from tracker as well"""
- # filter out empty categories and track changes
-
- filtered_updates = dict()
- for key, value in updates.items():
- if key in self.category_keys:
- if not value:
- try:
- del self.attrs[key]
- except IndexError:
- if silent:
- pass
- else:
- # if there is a value, prepare the pair for update
- filtered_updates.update({key: value})
- else:
- filtered_updates.update({key: value})
-
- if expected_updates is None:
- expected_updates = {}
-
- command = self.make_update_command(updates)
-
- try:
- result = command()
- except errors.EmptyModlist:
- if silent:
- self.attrs.update(filtered_updates)
- self.attrs.update(expected_updates)
- self.check_update(result,
- extra_keys=set(self.update_keys) |
- set(expected_updates.keys()))
-
- def check_update(self, result, extra_keys=()):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Modified CA ACL "{}"'.format(self.name),
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- # Helper methods for caacl subcommands. The check methods will be
- # implemented in standalone test
- #
- # The methods implemented here will be:
- # caacl_{add,remove}_{host, service, certprofile, user [, subca]}
-
- def _add_acl_component(self, command_name, keys, track):
- """ Add a resource into ACL rule and track it.
-
- command_name - the name in the API
- keys = {
- 'tracker_attr': {
- 'api_key': 'value'
- }
- }
-
- e.g.
-
- keys = {
- 'memberhost_host': {
- 'host': 'hostname'
- },
- 'memberhost_hostgroup': {
- 'hostgroup': 'hostgroup_name'
- }
- }
- """
-
- if not self.exists:
- raise errors.NotFound(reason="The tracked entry doesn't exist.")
-
- command = self.make_command(command_name, self.name)
- command_options = dict()
-
- # track
- for tracker_attr in keys:
- api_options = keys[tracker_attr]
- if track:
- for option in api_options:
- try:
- if type(option) in (list, tuple):
- self.attrs[tracker_attr].extend(api_options[option])
- else:
- self.attrs[tracker_attr].append(api_options[option])
- except KeyError:
- if type(option) in (list, tuple):
- self.attrs[tracker_attr] = api_options[option]
- else:
- self.attrs[tracker_attr] = [api_options[option]]
- # prepare options for the command call
- command_options.update(api_options)
-
- return command(**command_options)
-
- def _remove_acl_component(self, command_name, keys, track):
- """ Remove a resource from ACL rule and track it.
-
- command_name - the name in the API
- keys = {
- 'tracker_attr': {
- 'api_key': 'value'
- }
- }
-
- e.g.
-
- keys = {
- 'memberhost_host': {
- 'host': 'hostname'
- },
- 'memberhost_hostgroup': {
- 'hostgroup': 'hostgroup_name'
- }
- }
- """
- command = self.make_command(command_name, self.name)
- command_options = dict()
-
- for tracker_attr in keys:
- api_options = keys[tracker_attr]
- if track:
- for option in api_options:
- if type(option) in (list, tuple):
- for item in option:
- self.attrs[tracker_attr].remove(item)
- else:
- self.attrs[tracker_attr].remove(api_options[option])
- if len(self.attrs[tracker_attr]) == 0:
- del self.attrs[tracker_attr]
- command_options.update(api_options)
-
- return command(**command_options)
-
- def add_host(self, host=None, hostgroup=None, track=True):
- """Associates an host or hostgroup entry with the ACL.
-
- The command takes an unicode string with the name
- of the entry (RDN).
-
- It is the responsibility of a test writer to provide
- the correct value, object type as the method does not
- verify whether the entry exists.
-
- The method can add only one entry of each type
- in one call.
- """
-
- options = {
- u'memberhost_host': {u'host': host},
- u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
-
- return self._add_acl_component(u'caacl_add_host', options, track)
-
- def remove_host(self, host=None, hostgroup=None, track=True):
- options = {
- u'memberhost_host': {u'host': host},
- u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
-
- return self._remove_acl_component(u'caacl_remove_host', options, track)
-
- def add_user(self, user=None, group=None, track=True):
- options = {
- u'memberuser_user': {u'user': user},
- u'memberuser_group': {u'group': group}}
-
- return self._add_acl_component(u'caacl_add_user', options, track)
-
- def remove_user(self, user=None, group=None, track=True):
- options = {
- u'memberuser_user': {u'user': user},
- u'memberuser_group': {u'group': group}}
-
- return self._remove_acl_component(u'caacl_remove_user', options, track)
-
- def add_service(self, service=None, track=True):
- options = {
- u'memberservice_service': {u'service': service}}
-
- return self._add_acl_component(u'caacl_add_service', options, track)
-
- def remove_service(self, service=None, track=True):
- options = {
- u'memberservice_service': {u'service': service}}
-
- return self._remove_acl_component(u'caacl_remove_service', options, track)
-
- def add_profile(self, certprofile=None, track=True):
- options = {
- u'ipamembercertprofile_certprofile':
- {u'certprofile': certprofile}}
-
- return self._add_acl_component(u'caacl_add_profile', options, track)
-
- def remove_profile(self, certprofile=None, track=True):
- options = {
- u'ipamembercertprofile_certprofile':
- {u'certprofile': certprofile}}
-
- return self._remove_acl_component(u'caacl_remove_profile', options, track)
-
- def enable(self):
- command = self.make_command(u'caacl_enable', self.name)
- self.attrs.update({u'ipaenabledflag': [u'TRUE']})
- command()
-
- def disable(self):
- command = self.make_command(u'caacl_disable', self.name)
- self.attrs.update({u'ipaenabledflag': [u'FALSE']})
- command()
+from ipatests.test_xmlrpc.tracker.caacl_plugin import CAACLTracker
+from ipatests.test_xmlrpc.tracker.stageuser_plugin import StageUserTracker
@pytest.fixture(scope='class')
diff --git a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
index 78262ae8c..9de257a26 100644
--- a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
+++ b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
@@ -11,8 +11,8 @@ from ipalib import api, errors
from ipatests.util import (
prepare_config, unlock_principal_password, change_principal)
from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
-from ipatests.test_xmlrpc.test_certprofile_plugin import CertprofileTracker
-from ipatests.test_xmlrpc.test_caacl_plugin import CAACLTracker
+from ipatests.test_xmlrpc.tracker.certprofile_plugin import CertprofileTracker
+from ipatests.test_xmlrpc.tracker.caacl_plugin import CAACLTracker
from ipapython.ipautil import run
diff --git a/ipatests/test_xmlrpc/test_certprofile_plugin.py b/ipatests/test_xmlrpc/test_certprofile_plugin.py
index 1f06f99f5..66a72de3e 100644
--- a/ipatests/test_xmlrpc/test_certprofile_plugin.py
+++ b/ipatests/test_xmlrpc/test_certprofile_plugin.py
@@ -12,133 +12,9 @@ import os
import pytest
from ipalib import api, errors
-from ipapython.dn import DN
from ipatests.util import prepare_config
-from ipatests.test_xmlrpc.ldaptracker import Tracker
from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test, raises_exact
-from ipatests.test_xmlrpc import objectclasses
-from ipatests.util import assert_deepequal
-
-
-class CertprofileTracker(Tracker):
- """Tracker class for certprofile plugin.
- """
-
- retrieve_keys = {
- 'dn', 'cn', 'description', 'ipacertprofilestoreissued'
- }
- retrieve_all_keys = retrieve_keys | {'objectclass'}
- create_keys = retrieve_keys | {'objectclass'}
- update_keys = retrieve_keys - {'dn'}
- managedby_keys = retrieve_keys
- allowedto_keys = retrieve_keys
-
- def __init__(self, name, store=False, desc='dummy description',
- profile=None, default_version=None):
- super(CertprofileTracker, self).__init__(
- default_version=default_version
- )
-
- self.store = store
- self.description = desc
- self._profile_path = profile
-
- self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca',
- self.api.env.basedn)
-
- @property
- def profile(self):
- if not self._profile_path:
- return None
-
- if os.path.isabs(self._profile_path):
- path = self._profile_path
- else:
- path = os.path.join(os.path.dirname(__file__),
- self._profile_path)
-
- with open(path, 'r') as f:
- content = f.read()
- return unicode(content)
-
- def make_create_command(self, force=True):
- if not self.profile:
- raise RuntimeError('Tracker object without path to profile '
- 'cannot be used to create profile entry.')
-
- return self.make_command('certprofile_import', self.name,
- description=self.description,
- ipacertprofilestoreissued=self.store,
- file=self.profile)
-
- def check_create(self, result):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Imported profile "{}"'.format(self.name),
- result=dict(self.filter_attrs(self.create_keys))
- ), result)
-
- def track_create(self):
- self.attrs = dict(
- dn=unicode(self.dn),
- cn=[self.name],
- description=[self.description],
- ipacertprofilestoreissued=[unicode(self.store).upper()],
- objectclass=objectclasses.certprofile
- )
- self.exists = True
-
- def make_delete_command(self):
- return self.make_command('certprofile_del', self.name)
-
- def check_delete(self, result):
- assert_deepequal(dict(
- value=[self.name], # correctly a list?
- summary=u'Deleted profile "{}"'.format(self.name),
- result=dict(failed=[]),
- ), result)
-
- def make_retrieve_command(self, all=False, raw=False, **options):
- return self.make_command('certprofile_show', self.name, all=all,
- raw=raw, **options)
-
- def check_retrieve(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- value=self.name,
- summary=None,
- result=expected,
- ), result)
-
- def make_find_command(self, *args, **kwargs):
- return self.make_command('certprofile_find', *args, **kwargs)
-
- def check_find(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 profile matched',
- result=[expected]
- ), result)
-
- def make_update_command(self, updates):
- return self.make_command('certprofile_mod', self.name, **updates)
-
- def check_update(self, result, extra_keys=()):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Modified Certificate Profile "{}"'.format(self.name),
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
+from ipatests.test_xmlrpc.tracker.certprofile_plugin import CertprofileTracker
IPA_CERT_SUBJ_BASE = (
diff --git a/ipatests/test_xmlrpc/test_group_plugin.py b/ipatests/test_xmlrpc/test_group_plugin.py
index ed38c696e..f2bd0f4b9 100644
--- a/ipatests/test_xmlrpc/test_group_plugin.py
+++ b/ipatests/test_xmlrpc/test_group_plugin.py
@@ -26,13 +26,12 @@ import pytest
from ipalib import api, errors
from ipatests.test_xmlrpc import objectclasses
-from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci,
+from ipatests.test_xmlrpc.xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci,
add_sid, add_oc, XMLRPC_test, raises_exact)
from ipapython.dn import DN
from ipatests.test_xmlrpc.test_user_plugin import get_user_result
-from ipatests.test_xmlrpc.ldaptracker import Tracker
-from ipatests.test_xmlrpc.test_user_plugin import UserTracker
+from ipatests.test_xmlrpc.tracker.user_plugin import UserTracker
from ipatests.util import assert_deepequal
@@ -1161,190 +1160,3 @@ class test_group_full_set_of_objectclass_not_available_post_detach(Declarative):
),
),
]
-
-
-class GroupTracker(Tracker):
- """ Class for host plugin like tests """
- retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user',
- u'member_group'}
- retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'}
-
- create_keys = retrieve_all_keys
- update_keys = retrieve_keys - {u'dn'}
-
- add_member_keys = retrieve_keys | {u'description'}
-
- def __init__(self, name):
- super(GroupTracker, self).__init__(default_version=None)
- self.cn = name
- self.dn = get_group_dn(name)
-
- def make_create_command(self, nonposix=False, external=False,
- force=True):
- """ Make function that creates a group using 'group-add' """
- return self.make_command('group_add', self.cn,
- nonposix=nonposix, external=external)
-
- def make_delete_command(self):
- """ Make function that deletes a group using 'group-del' """
- return self.make_command('group_del', self.cn)
-
- def make_retrieve_command(self, all=False, raw=False):
- """ Make function that retrieves a group using 'group-show' """
- return self.make_command('group_show', self.cn, all=all)
-
- def make_find_command(self, *args, **kwargs):
- """ Make function that searches for a group using 'group-find' """
- return self.make_command('group_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """ Make function that updates a group using 'group-mod' """
- return self.make_command('group_mod', self.cn, **updates)
-
- def make_add_member_command(self, options={}):
- """ Make function that adds a member to a group
- Attention: only works for one user OR group! """
- if u'user' in options:
- self.attrs[u'member_user'] = [options[u'user']]
- elif u'group' in options:
- self.attrs[u'member_group'] = [options[u'group']]
- self.adds = options
-
- return self.make_command('group_add_member', self.cn, **options)
-
- def make_remove_member_command(self, options={}):
- """ Make function that removes a member from a group
- Attention: only works for one user OR group! """
- if u'user' in options:
- del self.attrs[u'member_user']
- elif u'group' in options:
- del self.attrs[u'member_group']
- return self.make_command('group_remove_member', self.cn, **options)
-
- def make_detach_command(self):
- """ Make function that detaches a managed group using
- 'group-detach' """
- self.exists = True
- return self.make_command('group_detach', self.cn)
-
- def track_create(self):
- """ Updates expected state for group creation"""
- self.attrs = dict(
- dn=get_group_dn(self.cn),
- cn=[self.cn],
- gidnumber=[fuzzy_digits],
- ipauniqueid=[fuzzy_uuid],
- objectclass=objectclasses.posixgroup,
- )
- self.exists = True
-
- def check_create(self, result):
- """ Checks 'group_add' command result """
- assert_deepequal(dict(
- value=self.cn,
- summary=u'Added group "%s"' % self.cn,
- result=self.filter_attrs(self.create_keys)
- ), result)
-
- def check_delete(self, result):
- """ Checks 'group_del' command result """
- assert_deepequal(dict(
- value=[self.cn],
- summary=u'Deleted group "%s"' % self.cn,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False, raw=False):
- """ Checks 'group_show' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- value=self.cn,
- summary=None,
- result=expected
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """ Checks 'group_find' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 group matched',
- result=[expected],
- ), result)
-
- def check_update(self, result, extra_keys={}):
- """ Checks 'group_mod' command result """
- assert_deepequal(dict(
- value=self.cn,
- summary=u'Modified group "%s"' % self.cn,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- def check_add_member(self, result):
- """ Checks 'group_add_member' command result """
- assert_deepequal(dict(
- completed=1,
- failed={u'member': {u'group': (), u'user': ()}},
- result=self.filter_attrs(self.add_member_keys)
- ), result)
-
- def check_add_member_negative(self, result):
- """ Checks 'group_add_member' command result when expected result
- is failure of the operation"""
- if u'member_user' in self.attrs:
- del self.attrs[u'member_user']
- elif u'member_group' in self.attrs:
- del self.attrs[u'member_group']
-
- expected = dict(
- completed=0,
- failed={u'member': {u'group': (), u'user': ()}},
- result=self.filter_attrs(self.add_member_keys)
- )
- if u'user' in self.adds:
- expected[u'failed'][u'member'][u'user'] = [(
- self.adds[u'user'], u'no such entry')]
- elif u'group' in self.adds:
- expected[u'failed'][u'member'][u'group'] = [(
- self.adds[u'group'], u'no such entry')]
-
- assert_deepequal(expected, result)
-
- def check_remove_member(self, result):
- """ Checks 'group_remove_member' command result """
- assert_deepequal(dict(
- completed=1,
- failed={u'member': {u'group': (), u'user': ()}},
- result=self.filter_attrs(self.add_member_keys)
- ), result)
-
- def check_detach(self, result):
- """ Checks 'group_detach' command result """
- assert_deepequal(dict(
- value=self.cn,
- summary=u'Detached group "%s" from user "%s"' % (
- self.cn, self.cn),
- result=True
- ), result)
-
- def make_fixture_detach(self, request):
- """Make a pytest fixture for this tracker
-
- The fixture ensures the plugin entry does not exist before
- and after the tests that use itself.
- """
- def cleanup():
- pass
-
- request.addfinalizer(cleanup)
-
- return self
diff --git a/ipatests/test_xmlrpc/test_host_plugin.py b/ipatests/test_xmlrpc/test_host_plugin.py
index 868c09ce8..be2b2d6af 100644
--- a/ipatests/test_xmlrpc/test_host_plugin.py
+++ b/ipatests/test_xmlrpc/test_host_plugin.py
@@ -34,12 +34,12 @@ from ipapython import ipautil
from ipalib import api, errors, x509
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
-from ipatests.test_xmlrpc.ldaptracker import Tracker
from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test,
fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer,
fuzzy_hex, raises_exact)
from ipatests.test_xmlrpc.test_user_plugin import get_group_dn
from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.tracker.host_plugin import HostTracker
from ipatests.test_xmlrpc.testcert import get_testcert
from ipatests.util import assert_deepequal
@@ -99,148 +99,6 @@ host_cert = get_testcert(DN(('CN', api.env.host), x509.subject_base()),
'host/%s@%s' % (api.env.host, api.env.realm))
-class HostTracker(Tracker):
- """Wraps and tracks modifications to a Host object
-
- Implements the helper functions for host plugin.
-
- The HostTracker object stores information about the host, e.g.
- ``fqdn`` and ``dn``.
- """
- retrieve_keys = {
- 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host',
- 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint',
- 'serial_number', 'serial_number_hex', 'sha1_fingerprint',
- 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before',
- 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user',
- 'ipaallowedtoperform_read_keys_group',
- 'ipaallowedtoperform_read_keys_host',
- 'ipaallowedtoperform_read_keys_hostgroup',
- 'ipaallowedtoperform_write_keys_user',
- 'ipaallowedtoperform_write_keys_group',
- 'ipaallowedtoperform_write_keys_host',
- 'ipaallowedtoperform_write_keys_hostgroup'}
- retrieve_all_keys = retrieve_keys | {
- u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid',
- u'managing_host', u'objectclass', u'serverhostname'}
- create_keys = retrieve_keys | {'objectclass', 'ipauniqueid',
- 'randompassword'}
- update_keys = retrieve_keys - {'dn'}
- managedby_keys = retrieve_keys - {'has_keytab', 'has_password'}
- allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
-
- def __init__(self, name, fqdn=None, default_version=None):
- super(HostTracker, self).__init__(default_version=default_version)
-
- self.shortname = name
- if fqdn:
- self.fqdn = fqdn
- else:
- self.fqdn = u'%s.%s' % (name, self.api.env.domain)
- self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts',
- self.api.env.basedn)
-
- self.description = u'Test host <%s>' % name
- self.location = u'Undisclosed location <%s>' % name
-
- def make_create_command(self, force=True):
- """Make function that creates this host using host_add"""
- return self.make_command('host_add', self.fqdn,
- description=self.description,
- l=self.location,
- force=force)
-
- def make_delete_command(self):
- """Make function that deletes the host using host_del"""
- return self.make_command('host_del', self.fqdn)
-
- def make_retrieve_command(self, all=False, raw=False):
- """Make function that retrieves the host using host_show"""
- return self.make_command('host_show', self.fqdn, all=all, raw=raw)
-
- def make_find_command(self, *args, **kwargs):
- """Make function that finds hosts using host_find
-
- Note that the fqdn (or other search terms) needs to be specified
- in arguments.
- """
- return self.make_command('host_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """Make function that modifies the host using host_mod"""
- return self.make_command('host_mod', self.fqdn, **updates)
-
- def track_create(self):
- """Update expected state for host creation"""
- self.attrs = dict(
- dn=self.dn,
- fqdn=[self.fqdn],
- description=[self.description],
- l=[self.location],
- krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)],
- objectclass=objectclasses.host,
- ipauniqueid=[fuzzy_uuid],
- managedby_host=[self.fqdn],
- has_keytab=False,
- has_password=False,
- cn=[self.fqdn],
- ipakrbokasdelegate=False,
- ipakrbrequirespreauth=True,
- managing_host=[self.fqdn],
- serverhostname=[self.shortname],
- )
- self.exists = True
-
- def check_create(self, result):
- """Check `host_add` command result"""
- assert_deepequal(dict(
- value=self.fqdn,
- summary=u'Added host "%s"' % self.fqdn,
- result=self.filter_attrs(self.create_keys),
- ), result)
-
- def check_delete(self, result):
- """Check `host_del` command result"""
- assert_deepequal(dict(
- value=[self.fqdn],
- summary=u'Deleted host "%s"' % self.fqdn,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False, raw=False):
- """Check `host_show` command result"""
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
- assert_deepequal(dict(
- value=self.fqdn,
- summary=None,
- result=expected,
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """Check `host_find` command result"""
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 host matched',
- result=[expected],
- ), result)
-
- def check_update(self, result, extra_keys=()):
- """Check `host_update` command result"""
- assert_deepequal(dict(
- value=self.fqdn,
- summary=u'Modified host "%s"' % self.fqdn,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
-
@pytest.fixture(scope='class')
def host(request):
tracker = HostTracker(name=u'testhost1')
diff --git a/ipatests/test_xmlrpc/test_stageuser_plugin.py b/ipatests/test_xmlrpc/test_stageuser_plugin.py
index 43c59b7c7..4eb968451 100644
--- a/ipatests/test_xmlrpc/test_stageuser_plugin.py
+++ b/ipatests/test_xmlrpc/test_stageuser_plugin.py
@@ -17,17 +17,17 @@ import six
from ipalib import api, errors
-from ipatests.test_xmlrpc.ldaptracker import Tracker
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.xmlrpc_test import (
XMLRPC_test, fuzzy_digits, fuzzy_uuid, fuzzy_password, fuzzy_string,
fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact)
from ipatests.util import (
- assert_equal, assert_deepequal, assert_not_equal, raises)
+ assert_equal, assert_deepequal, assert_not_equal, raises, get_user_dn)
from ipapython.dn import DN
-from ipatests.test_xmlrpc.test_user_plugin import UserTracker, get_user_dn
-from ipatests.test_xmlrpc.test_group_plugin import GroupTracker
+from ipatests.test_xmlrpc.tracker.user_plugin import UserTracker
+from ipatests.test_xmlrpc.tracker.group_plugin import GroupTracker
+from ipatests.test_xmlrpc.tracker.stageuser_plugin import StageUserTracker
if six.PY3:
unicode = str
@@ -84,245 +84,6 @@ options_ok = [
]
-class StageUserTracker(Tracker):
- """ Tracker class for staged user LDAP object
-
- Implements helper functions for host plugin.
- StageUserTracker object stores information about the user.
- """
-
- retrieve_keys = {
- u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
- u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber',
- u'title', u'memberof', u'nsaccountlock', u'memberofindirect',
- u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink',
- u'ipatokenradiususername', u'krbprincipalexpiration',
- u'usercertificate', u'dn', u'has_keytab', u'has_password',
- u'street', u'postalcode', u'facsimiletelephonenumber',
- u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l',
- u'st', u'mobile', u'pager', }
- retrieve_all_keys = retrieve_keys | {
- u'cn', u'ipauniqueid', u'objectclass', u'description',
- u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
-
- create_keys = retrieve_all_keys | {
- u'objectclass', u'ipauniqueid', u'randompassword',
- u'userpassword', u'krbextradata', u'krblastpwdchange',
- u'krbpasswordexpiration', u'krbprincipalkey'}
-
- update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
- activate_keys = retrieve_keys | {
- u'has_keytab', u'has_password', u'nsaccountlock'}
-
- def __init__(self, name, givenname, sn, **kwargs):
- super(StageUserTracker, self).__init__(default_version=None)
- self.uid = name
- self.givenname = givenname
- self.sn = sn
- self.dn = DN(
- ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
-
- self.kwargs = kwargs
-
- def make_create_command(self, options=None, force=None):
- """ Make function that creates a staged user using stageuser-add """
- if options is not None:
- self.kwargs = options
- return self.make_command('stageuser_add', self.uid,
- givenname=self.givenname,
- sn=self.sn, **self.kwargs)
-
- def make_delete_command(self):
- """ Make function that deletes a staged user using stageuser-del """
- return self.make_command('stageuser_del', self.uid)
-
- def make_retrieve_command(self, all=False, raw=False):
- """ Make function that retrieves a staged user using stageuser-show """
- return self.make_command('stageuser_show', self.uid, all=all)
-
- def make_find_command(self, *args, **kwargs):
- """ Make function that finds staged user using stageuser-find """
- return self.make_command('stageuser_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """ Make function that updates staged user using stageuser-mod """
- return self.make_command('stageuser_mod', self.uid, **updates)
-
- def make_activate_command(self):
- """ Make function that activates staged user
- using stageuser-activate """
- return self.make_command('stageuser_activate', self.uid)
-
- def track_create(self):
- """ Update expected state for staged user creation """
- self.attrs = dict(
- dn=self.dn,
- uid=[self.uid],
- givenname=[self.givenname],
- sn=[self.sn],
- homedirectory=[u'/home/%s' % self.uid],
- displayname=[u'%s %s' % (self.givenname, self.sn)],
- cn=[u'%s %s' % (self.givenname, self.sn)],
- initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
- objectclass=objectclasses.user_base,
- description=[u'__no_upg__'],
- ipauniqueid=[u'autogenerate'],
- uidnumber=[u'-1'],
- gidnumber=[u'-1'],
- krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
- mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
- gecos=[u'%s %s' % (self.givenname, self.sn)],
- loginshell=[u'/bin/sh'],
- has_keytab=False,
- has_password=False,
- nsaccountlock=[u'true'],
- )
-
- for key in self.kwargs:
- if key == u'krbprincipalname':
- self.attrs[key] = [u'%s@%s' % (
- (self.kwargs[key].split('@'))[0].lower(),
- (self.kwargs[key].split('@'))[1])]
- elif key == u'manager':
- self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))]
- elif key == u'ipasshpubkey':
- self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp]
- self.attrs[key] = [self.kwargs[key]]
- elif key == u'random' or key == u'userpassword':
- self.attrs[u'krbextradata'] = [fuzzy_string]
- self.attrs[u'krbpasswordexpiration'] = [
- fuzzy_dergeneralizedtime]
- self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime]
- self.attrs[u'krbprincipalkey'] = [fuzzy_string]
- self.attrs[u'userpassword'] = [fuzzy_string]
- self.attrs[u'has_keytab'] = True
- self.attrs[u'has_password'] = True
- if key == u'random':
- self.attrs[u'randompassword'] = fuzzy_string
- else:
- self.attrs[key] = [self.kwargs[key]]
-
- self.exists = True
-
- def check_create(self, result):
- """ Check 'stageuser-add' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Added stage user "%s"' % self.uid,
- result=self.filter_attrs(self.create_keys),
- ), result)
-
- def check_delete(self, result):
- """ Check 'stageuser-del' command result """
- assert_deepequal(dict(
- value=[self.uid],
- summary=u'Deleted stage user "%s"' % self.uid,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False, raw=False):
- """ Check 'stageuser-show' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- # small override because stageuser-find returns different
- # type of nsaccountlock value than DS, but overall the value
- # fits expected result
- if expected[u'nsaccountlock'] == [u'true']:
- expected[u'nsaccountlock'] = True
- elif expected[u'nsaccountlock'] == [u'false']:
- expected[u'nsaccountlock'] = False
-
- assert_deepequal(dict(
- value=self.uid,
- summary=None,
- result=expected,
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """ Check 'stageuser-find' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- # small override because stageuser-find returns different
- # type of nsaccountlock value than DS, but overall the value
- # fits expected result
- if expected[u'nsaccountlock'] == [u'true']:
- expected[u'nsaccountlock'] = True
- elif expected[u'nsaccountlock'] == [u'false']:
- expected[u'nsaccountlock'] = False
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 user matched',
- result=[expected],
- ), result)
-
- def check_find_nomatch(self, result):
- """ Check 'stageuser-find' command result when no match is expected """
- assert_deepequal(dict(
- count=0,
- truncated=False,
- summary=u'0 users matched',
- result=[],
- ), result)
-
- def check_update(self, result, extra_keys=()):
- """ Check 'stageuser-mod' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Modified stage user "%s"' % self.uid,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- def check_restore_preserved(self, result):
- assert_deepequal(dict(
- value=[self.uid],
- summary=u'Staged user account "%s"' % self.uid,
- result=dict(failed=[]),
- ), result)
-
- def make_fixture_activate(self, request):
- """Make a pytest fixture for a staged user that is to be activated
-
- The fixture ensures the plugin entry does not exist before
- and after the tests that use it. It takes into account
- that the staged user no longer exists after activation,
- therefore the fixture verifies after the tests
- that the staged user doesn't exist instead of deleting it.
- """
- del_command = self.make_delete_command()
- try:
- del_command()
- except errors.NotFound:
- pass
-
- def finish():
- with raises_exact(errors.NotFound(
- reason=u'%s: stage user not found' % self.uid)):
- del_command()
-
- request.addfinalizer(finish)
-
- return self
-
- def create_from_preserved(self, user):
- """ Copies values from preserved user - helper function for
- restoration tests """
- self.attrs = user.attrs
- self.uid = user.uid
- self.givenname = user.givenname
- self.sn = user.sn
- self.dn = DN(
- ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
- self.attrs[u'dn'] = self.dn
-
-
@pytest.fixture(scope='class')
def stageduser(request):
tracker = StageUserTracker(name=u'suser1', givenname=u'staged', sn=u'user')
diff --git a/ipatests/test_xmlrpc/test_user_plugin.py b/ipatests/test_xmlrpc/test_user_plugin.py
index 81185e449..084fb83c4 100644
--- a/ipatests/test_xmlrpc/test_user_plugin.py
+++ b/ipatests/test_xmlrpc/test_user_plugin.py
@@ -23,7 +23,6 @@
Test the `ipalib/plugins/user.py` module.
"""
-import functools
import datetime
import ldap
import re
@@ -31,12 +30,11 @@ import re
from ipalib import api, errors
from ipatests.test_xmlrpc import objectclasses
from ipatests.util import (
- assert_equal, assert_not_equal, raises, assert_deepequal)
+ assert_equal, assert_not_equal, raises)
from xmlrpc_test import (
XMLRPC_test, Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_password,
- fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact)
+ fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc)
from ipapython.dn import DN
-from ipatests.test_xmlrpc.ldaptracker import Tracker
import pytest
user1 = u'tuser1'
@@ -1654,332 +1652,3 @@ class test_denied_bind_with_expired_principal(XMLRPC_test):
krbprincipalexpiration=principal_expiration_string)
self.connection.simple_bind_s(str(get_user_dn(user1)), self.password)
-
-
-class UserTracker(Tracker):
- """ Class for host plugin like tests """
-
- retrieve_keys = {
- u'uid', u'givenname', u'sn', u'homedirectory',
- u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou',
- u'telephonenumber', u'title', u'memberof',
- u'memberofindirect', u'ipauserauthtype', u'userclass',
- u'ipatokenradiusconfiglink', u'ipatokenradiususername',
- u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab',
- u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber',
- u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock',
- u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata',
- u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st'
- }
-
- retrieve_all_keys = retrieve_keys | {
- u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry',
- u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
-
- retrieve_preserved_keys = retrieve_keys - {u'memberof_group'}
- retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'}
-
- create_keys = retrieve_all_keys | {
- u'randompassword', u'mepmanagedentry',
- u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange',
- u'krbprincipalkey', u'randompassword', u'userpassword'
- }
- update_keys = retrieve_keys - {u'dn'}
- activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password',
- u'nsaccountlock', u'sshpubkeyfp'}
-
- find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
- find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'}
-
- def __init__(self, name, givenname, sn, **kwargs):
- super(UserTracker, self).__init__(default_version=None)
- self.uid = name
- self.givenname = givenname
- self.sn = sn
- self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn)
-
- self.kwargs = kwargs
-
- def make_create_command(self, force=None):
- """ Make function that crates a user using user-add """
- return self.make_command(
- 'user_add', self.uid,
- givenname=self.givenname,
- sn=self.sn, **self.kwargs
- )
-
- def make_delete_command(self, no_preserve=True, preserve=False):
- """ Make function that deletes a user using user-del """
-
- if preserve and not no_preserve:
- # necessary to change some user attributes due to moving
- # to different container
- self.attrs[u'dn'] = DN(
- ('uid', self.uid),
- api.env.container_deleteuser,
- api.env.basedn
- )
- self.attrs[u'objectclass'] = objectclasses.user_base
-
- return self.make_command(
- 'user_del', self.uid,
- no_preserve=no_preserve,
- preserve=preserve
- )
-
- def make_retrieve_command(self, all=False, raw=False):
- """ Make function that retrieves a user using user-show """
- return self.make_command('user_show', self.uid, all=all)
-
- def make_find_command(self, *args, **kwargs):
- """ Make function that finds user using user-find """
- return self.make_command('user_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """ Make function that updates user using user-mod """
- return self.make_command('user_mod', self.uid, **updates)
-
- def make_undelete_command(self):
- """ Make function that activates preserved user using user-undel """
- return self.make_command('user_undel', self.uid)
-
- def make_enable_command(self):
- """ Make function that enables user using user-enable """
- return self.make_command('user_enable', self.uid)
-
- def make_stage_command(self):
- """ Make function that restores preserved user by moving it to
- staged container """
- return self.make_command('user_stage', self.uid)
-
- def track_create(self):
- """ Update expected state for user creation """
- self.attrs = dict(
- dn=self.dn,
- uid=[self.uid],
- givenname=[self.givenname],
- sn=[self.sn],
- homedirectory=[u'/home/%s' % self.uid],
- displayname=[u'%s %s' % (self.givenname, self.sn)],
- cn=[u'%s %s' % (self.givenname, self.sn)],
- initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
- objectclass=objectclasses.user,
- description=[u'__no_upg__'],
- ipauniqueid=[fuzzy_uuid],
- uidnumber=[fuzzy_digits],
- gidnumber=[fuzzy_digits],
- krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
- mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
- gecos=[u'%s %s' % (self.givenname, self.sn)],
- loginshell=[u'/bin/sh'],
- has_keytab=False,
- has_password=False,
- mepmanagedentry=[get_group_dn(self.uid)],
- memberof_group=[u'ipausers'],
- )
-
- for key in self.kwargs:
- if key == u'krbprincipalname':
- self.attrs[key] = [u'%s@%s' % (
- (self.kwargs[key].split('@'))[0].lower(),
- (self.kwargs[key].split('@'))[1]
- )]
- else:
- self.attrs[key] = [self.kwargs[key]]
-
- self.exists = True
-
- def check_create(self, result):
- """ Check 'user-add' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Added user "%s"' % self.uid,
- result=self.filter_attrs(self.create_keys),
- ), result)
-
- def check_delete(self, result):
- """ Check 'user-del' command result """
- assert_deepequal(dict(
- value=[self.uid],
- summary=u'Deleted user "%s"' % self.uid,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False):
- """ Check 'user-show' command result """
-
- if u'preserved' in self.attrs and self.attrs[u'preserved']:
- self.retrieve_all_keys = self.retrieve_preserved_all_keys
- self.retrieve_keys = self.retrieve_preserved_keys
- elif u'preserved' not in self.attrs and all:
- self.attrs[u'preserved'] = False
-
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- # small override because stageuser-find returns different type
- # of nsaccountlock value than DS, but overall the value fits
- # expected result
- if u'nsaccountlock' in expected:
- if expected[u'nsaccountlock'] == [u'true']:
- expected[u'nsaccountlock'] = True
- elif expected[u'nsaccountlock'] == [u'false']:
- expected[u'nsaccountlock'] = False
-
- assert_deepequal(dict(
- value=self.uid,
- summary=None,
- result=expected,
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """ Check 'user-find' command result """
- self.attrs[u'nsaccountlock'] = True
- self.attrs[u'preserved'] = True
-
- if all:
- expected = self.filter_attrs(self.find_all_keys)
- else:
- expected = self.filter_attrs(self.find_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 user matched',
- result=[expected],
- ), result)
-
- def check_find_nomatch(self, result):
- """ Check 'user-find' command result when no user should be found """
- assert_deepequal(dict(
- count=0,
- truncated=False,
- summary=u'0 users matched',
- result=[],
- ), result)
-
- def check_update(self, result, extra_keys=()):
- """ Check 'user-mod' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Modified user "%s"' % self.uid,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- def create_from_staged(self, stageduser):
- """ Copies attributes from staged user - helper function for
- activation tests """
- self.attrs = stageduser.attrs
- self.uid = stageduser.uid
- self.givenname = stageduser.givenname
- self.sn = stageduser.sn
-
- self.attrs[u'mepmanagedentry'] = None
- self.attrs[u'dn'] = self.dn
- self.attrs[u'ipauniqueid'] = [fuzzy_uuid]
- self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % (
- api.env.container_group, api.env.basedn
- )]
- self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % (
- self.uid, api.env.container_group, api.env.basedn
- )]
- self.attrs[u'objectclass'] = objectclasses.user
- if self.attrs[u'gidnumber'] == [u'-1']:
- self.attrs[u'gidnumber'] = [fuzzy_digits]
- if self.attrs[u'uidnumber'] == [u'-1']:
- self.attrs[u'uidnumber'] = [fuzzy_digits]
-
- if u'ipasshpubkey' in self.kwargs:
- self.attrs[u'ipasshpubkey'] = [str(
- self.kwargs[u'ipasshpubkey']
- )]
-
- def check_activate(self, result):
- """ Check 'stageuser-activate' command result """
- expected = dict(
- value=self.uid,
- summary=u'Stage user %s activated' % self.uid,
- result=self.filter_attrs(self.activate_keys))
-
- # work around to eliminate inconsistency in returned objectclass
- # (case sensitive assertion)
- expected['result']['objectclass'] = [item.lower() for item in
- expected['result']['objectclass']]
- result['result']['objectclass'] = [item.lower() for item in
- result['result']['objectclass']]
-
- assert_deepequal(expected, result)
-
- self.exists = True
-
- def check_undel(self, result):
- """ Check 'user-undel' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Undeleted user account "%s"' % self.uid,
- result=True
- ), result)
-
- def track_delete(self, preserve=False):
- """Update expected state for host deletion"""
- if preserve:
- self.exists = True
- if u'memberof_group' in self.attrs:
- del self.attrs[u'memberof_group']
- self.attrs[u'nsaccountlock'] = True
- self.attrs[u'preserved'] = True
- else:
- self.exists = False
- self.attrs = {}
-
- def make_preserved_user(self):
- """ 'Creates' a preserved user necessary for some tests """
- self.ensure_exists()
- self.track_delete(preserve=True)
- command = self.make_delete_command(no_preserve=False, preserve=True)
- result = command()
- self.check_delete(result)
-
- def check_attr_preservation(self, expected):
- """ Verifies that ipaUniqueID, uidNumber and gidNumber are
- preserved upon reactivation. Also verifies that resulting
- active user is a member of ipausers group only."""
- command = self.make_retrieve_command(all=True)
- result = command()
-
- assert_deepequal(dict(
- ipauniqueid=result[u'result'][u'ipauniqueid'],
- uidnumber=result[u'result'][u'uidnumber'],
- gidnumber=result[u'result'][u'gidnumber']
- ), expected)
-
- if (u'memberof_group' not in result[u'result'] or
- result[u'result'][u'memberof_group'] != (u'ipausers',)):
- assert False
-
- def make_fixture_restore(self, request):
- """Make a pytest fixture for a preserved user that is to be moved to
- staged area.
-
- The fixture ensures the plugin entry does not exist before
- and after the tests that use it. It takes into account
- that the preserved user no longer exists after restoring it,
- therefore the fixture verifies after the tests
- that the preserved user doesn't exist instead of deleting it.
- """
- del_command = self.make_delete_command()
- try:
- del_command()
- except errors.NotFound:
- pass
-
- def finish():
- with raises_exact(errors.NotFound(
- reason=u'%s: user not found' % self.uid)):
- del_command()
-
- request.addfinalizer(finish)
-
- return self
diff --git a/ipatests/test_xmlrpc/tracker/__init__.py b/ipatests/test_xmlrpc/tracker/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/__init__.py
diff --git a/ipatests/test_xmlrpc/ldaptracker.py b/ipatests/test_xmlrpc/tracker/base.py
index acd382dd3..acd382dd3 100644
--- a/ipatests/test_xmlrpc/ldaptracker.py
+++ b/ipatests/test_xmlrpc/tracker/base.py
diff --git a/ipatests/test_xmlrpc/tracker/caacl_plugin.py b/ipatests/test_xmlrpc/tracker/caacl_plugin.py
new file mode 100644
index 000000000..afe7ee0c0
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/caacl_plugin.py
@@ -0,0 +1,367 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipalib import errors
+from ipatests.util import assert_deepequal
+from ipatests.test_xmlrpc.xmlrpc_test import (fuzzy_caacldn, fuzzy_uuid,
+ fuzzy_ipauniqueid)
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.tracker.base import Tracker
+
+
+class CAACLTracker(Tracker):
+ """Tracker class for CA ACL LDAP object.
+
+ The class implements methods required by the base class
+ to help with basic CRUD operations.
+
+ Methods for adding and deleting actual member entries into an ACL
+ do not have check methods as these would make the class
+ unnecessarily complicated. The checks are implemented as
+ a standalone test suite. However, this makes the test crucial
+ in debugging more complicated test cases. Since the add/remove
+ operation won't be checked right away by the tracker, a problem
+ in this operation may propagate into more complicated test case.
+
+ It is possible to pass a list of member uids to these methods.
+
+ The test uses instances of Fuzzy class to compare results as they
+ are in the UUID format. The dn and rdn properties were modified
+ to reflect this as well.
+ """
+
+ member_keys = {
+ u'memberuser_user', u'memberuser_group',
+ u'memberhost_host', u'memberhost_hostgroup',
+ u'memberservice_service',
+ u'ipamembercertprofile_certprofile'}
+ category_keys = {
+ u'ipacacategory', u'ipacertprofilecategory', u'usercategory',
+ u'hostcategory', u'servicecategory'}
+ retrieve_keys = {
+ u'dn', u'cn', u'description', u'ipaenabledflag',
+ u'ipamemberca', u'ipamembercertprofile', u'memberuser',
+ u'memberhost', u'memberservice'} | member_keys | category_keys
+ retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'}
+ create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory',
+ u'usercategory', u'hostcategory', u'ipacacategory',
+ u'servicecategory', u'ipaenabledflag', u'objectclass',
+ u'ipauniqueid'}
+ update_keys = create_keys - {u'dn'}
+
+ def __init__(self, name, ipacertprofile_category=None, user_category=None,
+ service_category=None, host_category=None, description=None,
+ default_version=None):
+ super(CAACLTracker, self).__init__(default_version=default_version)
+
+ self._name = name
+ self.description = description
+ self._categories = dict(
+ ipacertprofilecategory=ipacertprofile_category,
+ usercategory=user_category,
+ servicecategory=service_category,
+ hostcategory=host_category)
+
+ self.dn = fuzzy_caacldn
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def rdn(self):
+ return fuzzy_ipauniqueid
+
+ @property
+ def categories(self):
+ """To be used in track_create"""
+ return {cat: v for cat, v in self._categories.items() if v}
+
+ @property
+ def create_categories(self):
+ """ Return the categories set on create.
+ Unused categories are left out.
+ """
+ return {cat: [v] for cat, v in self.categories.items() if v}
+
+ def make_create_command(self, force=True):
+ return self.make_command(u'caacl_add', self.name,
+ description=self.description,
+ **self.categories)
+
+ def check_create(self, result):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Added CA ACL "{}"'.format(self.name),
+ result=dict(self.filter_attrs(self.create_keys))
+ ), result)
+
+ def track_create(self):
+ self.attrs = dict(
+ dn=self.dn,
+ ipauniqueid=[fuzzy_uuid],
+ cn=[self.name],
+ objectclass=objectclasses.caacl,
+ ipaenabledflag=[u'TRUE'])
+
+ self.attrs.update(self.create_categories)
+ if self.description:
+ self.attrs.update({u'description', [self.description]})
+
+ self.exists = True
+
+ def make_delete_command(self):
+ return self.make_command('caacl_del', self.name)
+
+ def check_delete(self, result):
+ assert_deepequal(dict(
+ value=[self.name],
+ summary=u'Deleted CA ACL "{}"'.format(self.name),
+ result=dict(failed=[])
+ ), result)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ return self.make_command('caacl_show', self.name, all=all, raw=raw)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.name,
+ summary=None,
+ result=expected
+ ), result)
+
+ def make_find_command(self, *args, **kwargs):
+ return self.make_command('caacl_find', *args, **kwargs)
+
+ def check_find(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 CA ACL matched',
+ result=[expected]
+ ), result)
+
+ def make_update_command(self, updates):
+ return self.make_command('caacl_mod', self.name, **updates)
+
+ def update(self, updates, expected_updates=None, silent=False):
+ """If removing a category, delete it from tracker as well"""
+ # filter out empty categories and track changes
+
+ filtered_updates = dict()
+ for key, value in updates.items():
+ if key in self.category_keys:
+ if not value:
+ try:
+ del self.attrs[key]
+ except IndexError:
+ if silent:
+ pass
+ else:
+ # if there is a value, prepare the pair for update
+ filtered_updates.update({key: value})
+ else:
+ filtered_updates.update({key: value})
+
+ if expected_updates is None:
+ expected_updates = {}
+
+ command = self.make_update_command(updates)
+
+ try:
+ result = command()
+ except errors.EmptyModlist:
+ if silent:
+ self.attrs.update(filtered_updates)
+ self.attrs.update(expected_updates)
+ self.check_update(result,
+ extra_keys=set(self.update_keys) |
+ set(expected_updates.keys()))
+
+ def check_update(self, result, extra_keys=()):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Modified CA ACL "{}"'.format(self.name),
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ # Helper methods for caacl subcommands. The check methods will be
+ # implemented in standalone test
+ #
+ # The methods implemented here will be:
+ # caacl_{add,remove}_{host, service, certprofile, user [, subca]}
+
+ def _add_acl_component(self, command_name, keys, track):
+ """ Add a resource into ACL rule and track it.
+
+ command_name - the name in the API
+ keys = {
+ 'tracker_attr': {
+ 'api_key': 'value'
+ }
+ }
+
+ e.g.
+
+ keys = {
+ 'memberhost_host': {
+ 'host': 'hostname'
+ },
+ 'memberhost_hostgroup': {
+ 'hostgroup': 'hostgroup_name'
+ }
+ }
+ """
+
+ if not self.exists:
+ raise errors.NotFound(reason="The tracked entry doesn't exist.")
+
+ command = self.make_command(command_name, self.name)
+ command_options = dict()
+
+ # track
+ for tracker_attr in keys:
+ api_options = keys[tracker_attr]
+ if track:
+ for option in api_options:
+ try:
+ if type(option) in (list, tuple):
+ self.attrs[tracker_attr].extend(api_options[option])
+ else:
+ self.attrs[tracker_attr].append(api_options[option])
+ except KeyError:
+ if type(option) in (list, tuple):
+ self.attrs[tracker_attr] = api_options[option]
+ else:
+ self.attrs[tracker_attr] = [api_options[option]]
+ # prepare options for the command call
+ command_options.update(api_options)
+
+ return command(**command_options)
+
+ def _remove_acl_component(self, command_name, keys, track):
+ """ Remove a resource from ACL rule and track it.
+
+ command_name - the name in the API
+ keys = {
+ 'tracker_attr': {
+ 'api_key': 'value'
+ }
+ }
+
+ e.g.
+
+ keys = {
+ 'memberhost_host': {
+ 'host': 'hostname'
+ },
+ 'memberhost_hostgroup': {
+ 'hostgroup': 'hostgroup_name'
+ }
+ }
+ """
+ command = self.make_command(command_name, self.name)
+ command_options = dict()
+
+ for tracker_attr in keys:
+ api_options = keys[tracker_attr]
+ if track:
+ for option in api_options:
+ if type(option) in (list, tuple):
+ for item in option:
+ self.attrs[tracker_attr].remove(item)
+ else:
+ self.attrs[tracker_attr].remove(api_options[option])
+ if len(self.attrs[tracker_attr]) == 0:
+ del self.attrs[tracker_attr]
+ command_options.update(api_options)
+
+ return command(**command_options)
+
+ def add_host(self, host=None, hostgroup=None, track=True):
+ """Associates an host or hostgroup entry with the ACL.
+
+ The command takes an unicode string with the name
+ of the entry (RDN).
+
+ It is the responsibility of a test writer to provide
+ the correct value, object type as the method does not
+ verify whether the entry exists.
+
+ The method can add only one entry of each type
+ in one call.
+ """
+
+ options = {
+ u'memberhost_host': {u'host': host},
+ u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
+
+ return self._add_acl_component(u'caacl_add_host', options, track)
+
+ def remove_host(self, host=None, hostgroup=None, track=True):
+ options = {
+ u'memberhost_host': {u'host': host},
+ u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
+
+ return self._remove_acl_component(u'caacl_remove_host', options, track)
+
+ def add_user(self, user=None, group=None, track=True):
+ options = {
+ u'memberuser_user': {u'user': user},
+ u'memberuser_group': {u'group': group}}
+
+ return self._add_acl_component(u'caacl_add_user', options, track)
+
+ def remove_user(self, user=None, group=None, track=True):
+ options = {
+ u'memberuser_user': {u'user': user},
+ u'memberuser_group': {u'group': group}}
+
+ return self._remove_acl_component(u'caacl_remove_user', options, track)
+
+ def add_service(self, service=None, track=True):
+ options = {
+ u'memberservice_service': {u'service': service}}
+
+ return self._add_acl_component(u'caacl_add_service', options, track)
+
+ def remove_service(self, service=None, track=True):
+ options = {
+ u'memberservice_service': {u'service': service}}
+
+ return self._remove_acl_component(u'caacl_remove_service', options, track)
+
+ def add_profile(self, certprofile=None, track=True):
+ options = {
+ u'ipamembercertprofile_certprofile':
+ {u'certprofile': certprofile}}
+
+ return self._add_acl_component(u'caacl_add_profile', options, track)
+
+ def remove_profile(self, certprofile=None, track=True):
+ options = {
+ u'ipamembercertprofile_certprofile':
+ {u'certprofile': certprofile}}
+
+ return self._remove_acl_component(u'caacl_remove_profile', options, track)
+
+ def enable(self):
+ command = self.make_command(u'caacl_enable', self.name)
+ self.attrs.update({u'ipaenabledflag': [u'TRUE']})
+ command()
+
+ def disable(self):
+ command = self.make_command(u'caacl_disable', self.name)
+ self.attrs.update({u'ipaenabledflag': [u'FALSE']})
+ command()
diff --git a/ipatests/test_xmlrpc/tracker/certprofile_plugin.py b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py
new file mode 100644
index 000000000..eeb27eb14
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py
@@ -0,0 +1,133 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+import os
+
+
+from ipapython.dn import DN
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.util import assert_deepequal
+
+
+class CertprofileTracker(Tracker):
+ """Tracker class for certprofile plugin.
+ """
+
+ retrieve_keys = {
+ 'dn', 'cn', 'description', 'ipacertprofilestoreissued'
+ }
+ retrieve_all_keys = retrieve_keys | {'objectclass'}
+ create_keys = retrieve_keys | {'objectclass'}
+ update_keys = retrieve_keys - {'dn'}
+ managedby_keys = retrieve_keys
+ allowedto_keys = retrieve_keys
+
+ def __init__(self, name, store=False, desc='dummy description',
+ profile=None, default_version=None):
+ super(CertprofileTracker, self).__init__(
+ default_version=default_version
+ )
+
+ self.store = store
+ self.description = desc
+ self._profile_path = profile
+
+ self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca',
+ self.api.env.basedn)
+
+ @property
+ def profile(self):
+ if not self._profile_path:
+ return None
+
+ if os.path.isabs(self._profile_path):
+ path = self._profile_path
+ else:
+ path = os.path.join(os.path.dirname(__file__),
+ self._profile_path)
+
+ with open(path, 'r') as f:
+ content = f.read()
+ return unicode(content)
+
+ def make_create_command(self, force=True):
+ if not self.profile:
+ raise RuntimeError('Tracker object without path to profile '
+ 'cannot be used to create profile entry.')
+
+ return self.make_command('certprofile_import', self.name,
+ description=self.description,
+ ipacertprofilestoreissued=self.store,
+ file=self.profile)
+
+ def check_create(self, result):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Imported profile "{}"'.format(self.name),
+ result=dict(self.filter_attrs(self.create_keys))
+ ), result)
+
+ def track_create(self):
+ self.attrs = dict(
+ dn=unicode(self.dn),
+ cn=[self.name],
+ description=[self.description],
+ ipacertprofilestoreissued=[unicode(self.store).upper()],
+ objectclass=objectclasses.certprofile
+ )
+ self.exists = True
+
+ def make_delete_command(self):
+ return self.make_command('certprofile_del', self.name)
+
+ def check_delete(self, result):
+ assert_deepequal(dict(
+ value=[self.name], # correctly a list?
+ summary=u'Deleted profile "{}"'.format(self.name),
+ result=dict(failed=[]),
+ ), result)
+
+ def make_retrieve_command(self, all=False, raw=False, **options):
+ return self.make_command('certprofile_show', self.name, all=all,
+ raw=raw, **options)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.name,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def make_find_command(self, *args, **kwargs):
+ return self.make_command('certprofile_find', *args, **kwargs)
+
+ def check_find(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 profile matched',
+ result=[expected]
+ ), result)
+
+ def make_update_command(self, updates):
+ return self.make_command('certprofile_mod', self.name, **updates)
+
+ def check_update(self, result, extra_keys=()):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Modified Certificate Profile "{}"'.format(self.name),
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
diff --git a/ipatests/test_xmlrpc/tracker/group_plugin.py b/ipatests/test_xmlrpc/tracker/group_plugin.py
new file mode 100644
index 000000000..c47ce8ecf
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/group_plugin.py
@@ -0,0 +1,196 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid
+
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.util import assert_deepequal, get_group_dn
+
+
+class GroupTracker(Tracker):
+ """ Class for host plugin like tests """
+ retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user',
+ u'member_group'}
+ retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'}
+
+ create_keys = retrieve_all_keys
+ update_keys = retrieve_keys - {u'dn'}
+
+ add_member_keys = retrieve_keys | {u'description'}
+
+ def __init__(self, name):
+ super(GroupTracker, self).__init__(default_version=None)
+ self.cn = name
+ self.dn = get_group_dn(name)
+
+ def make_create_command(self, nonposix=False, external=False,
+ force=True):
+ """ Make function that creates a group using 'group-add' """
+ return self.make_command('group_add', self.cn,
+ nonposix=nonposix, external=external)
+
+ def make_delete_command(self):
+ """ Make function that deletes a group using 'group-del' """
+ return self.make_command('group_del', self.cn)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a group using 'group-show' """
+ return self.make_command('group_show', self.cn, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that searches for a group using 'group-find' """
+ return self.make_command('group_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates a group using 'group-mod' """
+ return self.make_command('group_mod', self.cn, **updates)
+
+ def make_add_member_command(self, options={}):
+ """ Make function that adds a member to a group
+ Attention: only works for one user OR group! """
+ if u'user' in options:
+ self.attrs[u'member_user'] = [options[u'user']]
+ elif u'group' in options:
+ self.attrs[u'member_group'] = [options[u'group']]
+ self.adds = options
+
+ return self.make_command('group_add_member', self.cn, **options)
+
+ def make_remove_member_command(self, options={}):
+ """ Make function that removes a member from a group
+ Attention: only works for one user OR group! """
+ if u'user' in options:
+ del self.attrs[u'member_user']
+ elif u'group' in options:
+ del self.attrs[u'member_group']
+ return self.make_command('group_remove_member', self.cn, **options)
+
+ def make_detach_command(self):
+ """ Make function that detaches a managed group using
+ 'group-detach' """
+ self.exists = True
+ return self.make_command('group_detach', self.cn)
+
+ def track_create(self):
+ """ Updates expected state for group creation"""
+ self.attrs = dict(
+ dn=get_group_dn(self.cn),
+ cn=[self.cn],
+ gidnumber=[fuzzy_digits],
+ ipauniqueid=[fuzzy_uuid],
+ objectclass=objectclasses.posixgroup,
+ )
+ self.exists = True
+
+ def check_create(self, result):
+ """ Checks 'group_add' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Added group "%s"' % self.cn,
+ result=self.filter_attrs(self.create_keys)
+ ), result)
+
+ def check_delete(self, result):
+ """ Checks 'group_del' command result """
+ assert_deepequal(dict(
+ value=[self.cn],
+ summary=u'Deleted group "%s"' % self.cn,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """ Checks 'group_show' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=None,
+ result=expected
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Checks 'group_find' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 group matched',
+ result=[expected],
+ ), result)
+
+ def check_update(self, result, extra_keys={}):
+ """ Checks 'group_mod' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Modified group "%s"' % self.cn,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def check_add_member(self, result):
+ """ Checks 'group_add_member' command result """
+ assert_deepequal(dict(
+ completed=1,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ ), result)
+
+ def check_add_member_negative(self, result):
+ """ Checks 'group_add_member' command result when expected result
+ is failure of the operation"""
+ if u'member_user' in self.attrs:
+ del self.attrs[u'member_user']
+ elif u'member_group' in self.attrs:
+ del self.attrs[u'member_group']
+
+ expected = dict(
+ completed=0,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ )
+ if u'user' in self.adds:
+ expected[u'failed'][u'member'][u'user'] = [(
+ self.adds[u'user'], u'no such entry')]
+ elif u'group' in self.adds:
+ expected[u'failed'][u'member'][u'group'] = [(
+ self.adds[u'group'], u'no such entry')]
+
+ assert_deepequal(expected, result)
+
+ def check_remove_member(self, result):
+ """ Checks 'group_remove_member' command result """
+ assert_deepequal(dict(
+ completed=1,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ ), result)
+
+ def check_detach(self, result):
+ """ Checks 'group_detach' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Detached group "%s" from user "%s"' % (
+ self.cn, self.cn),
+ result=True
+ ), result)
+
+ def make_fixture_detach(self, request):
+ """Make a pytest fixture for this tracker
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use itself.
+ """
+ def cleanup():
+ pass
+
+ request.addfinalizer(cleanup)
+
+ return self
diff --git a/ipatests/test_xmlrpc/tracker/host_plugin.py b/ipatests/test_xmlrpc/tracker/host_plugin.py
new file mode 100644
index 000000000..bf199f4f5
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/host_plugin.py
@@ -0,0 +1,154 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from __future__ import print_function
+
+
+from ipapython.dn import DN
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_uuid
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.util import assert_deepequal
+
+
+class HostTracker(Tracker):
+ """Wraps and tracks modifications to a Host object
+
+ Implements the helper functions for host plugin.
+
+ The HostTracker object stores information about the host, e.g.
+ ``fqdn`` and ``dn``.
+ """
+ retrieve_keys = {
+ 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host',
+ 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint',
+ 'serial_number', 'serial_number_hex', 'sha1_fingerprint',
+ 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before',
+ 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user',
+ 'ipaallowedtoperform_read_keys_group',
+ 'ipaallowedtoperform_read_keys_host',
+ 'ipaallowedtoperform_read_keys_hostgroup',
+ 'ipaallowedtoperform_write_keys_user',
+ 'ipaallowedtoperform_write_keys_group',
+ 'ipaallowedtoperform_write_keys_host',
+ 'ipaallowedtoperform_write_keys_hostgroup'}
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid',
+ u'managing_host', u'objectclass', u'serverhostname'}
+ create_keys = retrieve_keys | {'objectclass', 'ipauniqueid',
+ 'randompassword'}
+ update_keys = retrieve_keys - {'dn'}
+ managedby_keys = retrieve_keys - {'has_keytab', 'has_password'}
+ allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
+
+ def __init__(self, name, fqdn=None, default_version=None):
+ super(HostTracker, self).__init__(default_version=default_version)
+
+ self.shortname = name
+ if fqdn:
+ self.fqdn = fqdn
+ else:
+ self.fqdn = u'%s.%s' % (name, self.api.env.domain)
+ self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts',
+ self.api.env.basedn)
+
+ self.description = u'Test host <%s>' % name
+ self.location = u'Undisclosed location <%s>' % name
+
+ def make_create_command(self, force=True):
+ """Make function that creates this host using host_add"""
+ return self.make_command('host_add', self.fqdn,
+ description=self.description,
+ l=self.location,
+ force=force)
+
+ def make_delete_command(self):
+ """Make function that deletes the host using host_del"""
+ return self.make_command('host_del', self.fqdn)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """Make function that retrieves the host using host_show"""
+ return self.make_command('host_show', self.fqdn, all=all, raw=raw)
+
+ def make_find_command(self, *args, **kwargs):
+ """Make function that finds hosts using host_find
+
+ Note that the fqdn (or other search terms) needs to be specified
+ in arguments.
+ """
+ return self.make_command('host_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """Make function that modifies the host using host_mod"""
+ return self.make_command('host_mod', self.fqdn, **updates)
+
+ def track_create(self):
+ """Update expected state for host creation"""
+ self.attrs = dict(
+ dn=self.dn,
+ fqdn=[self.fqdn],
+ description=[self.description],
+ l=[self.location],
+ krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)],
+ objectclass=objectclasses.host,
+ ipauniqueid=[fuzzy_uuid],
+ managedby_host=[self.fqdn],
+ has_keytab=False,
+ has_password=False,
+ cn=[self.fqdn],
+ ipakrbokasdelegate=False,
+ ipakrbrequirespreauth=True,
+ managing_host=[self.fqdn],
+ serverhostname=[self.shortname],
+ )
+ self.exists = True
+
+ def check_create(self, result):
+ """Check `host_add` command result"""
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=u'Added host "%s"' % self.fqdn,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """Check `host_del` command result"""
+ assert_deepequal(dict(
+ value=[self.fqdn],
+ summary=u'Deleted host "%s"' % self.fqdn,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """Check `host_show` command result"""
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """Check `host_find` command result"""
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 host matched',
+ result=[expected],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """Check `host_update` command result"""
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=u'Modified host "%s"' % self.fqdn,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
diff --git a/ipatests/test_xmlrpc/tracker/stageuser_plugin.py b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
new file mode 100644
index 000000000..09ad282e4
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
@@ -0,0 +1,267 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+import six
+
+from ipalib import api, errors
+
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import (
+ fuzzy_string, fuzzy_dergeneralizedtime, raises_exact)
+
+from ipatests.util import assert_deepequal, get_user_dn
+from ipapython.dn import DN
+
+if six.PY3:
+ unicode = str
+
+sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X'
+ 'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI'
+ 'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2'
+ 'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm'
+ 'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01'
+ '9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF'
+ '0L public key test')
+sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B '
+ 'public key test (ssh-rsa)')
+
+
+class StageUserTracker(Tracker):
+ """ Tracker class for staged user LDAP object
+
+ Implements helper functions for host plugin.
+ StageUserTracker object stores information about the user.
+ """
+
+ retrieve_keys = {
+ u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
+ u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber',
+ u'title', u'memberof', u'nsaccountlock', u'memberofindirect',
+ u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink',
+ u'ipatokenradiususername', u'krbprincipalexpiration',
+ u'usercertificate', u'dn', u'has_keytab', u'has_password',
+ u'street', u'postalcode', u'facsimiletelephonenumber',
+ u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l',
+ u'st', u'mobile', u'pager', }
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipauniqueid', u'objectclass', u'description',
+ u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+
+ create_keys = retrieve_all_keys | {
+ u'objectclass', u'ipauniqueid', u'randompassword',
+ u'userpassword', u'krbextradata', u'krblastpwdchange',
+ u'krbpasswordexpiration', u'krbprincipalkey'}
+
+ update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
+ activate_keys = retrieve_keys | {
+ u'has_keytab', u'has_password', u'nsaccountlock'}
+
+ def __init__(self, name, givenname, sn, **kwargs):
+ super(StageUserTracker, self).__init__(default_version=None)
+ self.uid = name
+ self.givenname = givenname
+ self.sn = sn
+ self.dn = DN(
+ ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
+
+ self.kwargs = kwargs
+
+ def make_create_command(self, options=None, force=None):
+ """ Make function that creates a staged user using stageuser-add """
+ if options is not None:
+ self.kwargs = options
+ return self.make_command('stageuser_add', self.uid,
+ givenname=self.givenname,
+ sn=self.sn, **self.kwargs)
+
+ def make_delete_command(self):
+ """ Make function that deletes a staged user using stageuser-del """
+ return self.make_command('stageuser_del', self.uid)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a staged user using stageuser-show """
+ return self.make_command('stageuser_show', self.uid, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that finds staged user using stageuser-find """
+ return self.make_command('stageuser_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates staged user using stageuser-mod """
+ return self.make_command('stageuser_mod', self.uid, **updates)
+
+ def make_activate_command(self):
+ """ Make function that activates staged user
+ using stageuser-activate """
+ return self.make_command('stageuser_activate', self.uid)
+
+ def track_create(self):
+ """ Update expected state for staged user creation """
+ self.attrs = dict(
+ dn=self.dn,
+ uid=[self.uid],
+ givenname=[self.givenname],
+ sn=[self.sn],
+ homedirectory=[u'/home/%s' % self.uid],
+ displayname=[u'%s %s' % (self.givenname, self.sn)],
+ cn=[u'%s %s' % (self.givenname, self.sn)],
+ initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+ objectclass=objectclasses.user_base,
+ description=[u'__no_upg__'],
+ ipauniqueid=[u'autogenerate'],
+ uidnumber=[u'-1'],
+ gidnumber=[u'-1'],
+ krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+ mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+ gecos=[u'%s %s' % (self.givenname, self.sn)],
+ loginshell=[u'/bin/sh'],
+ has_keytab=False,
+ has_password=False,
+ nsaccountlock=[u'true'],
+ )
+
+ for key in self.kwargs:
+ if key == u'krbprincipalname':
+ self.attrs[key] = [u'%s@%s' % (
+ (self.kwargs[key].split('@'))[0].lower(),
+ (self.kwargs[key].split('@'))[1])]
+ elif key == u'manager':
+ self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))]
+ elif key == u'ipasshpubkey':
+ self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp]
+ self.attrs[key] = [self.kwargs[key]]
+ elif key == u'random' or key == u'userpassword':
+ self.attrs[u'krbextradata'] = [fuzzy_string]
+ self.attrs[u'krbpasswordexpiration'] = [
+ fuzzy_dergeneralizedtime]
+ self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime]
+ self.attrs[u'krbprincipalkey'] = [fuzzy_string]
+ self.attrs[u'userpassword'] = [fuzzy_string]
+ self.attrs[u'has_keytab'] = True
+ self.attrs[u'has_password'] = True
+ if key == u'random':
+ self.attrs[u'randompassword'] = fuzzy_string
+ else:
+ self.attrs[key] = [self.kwargs[key]]
+
+ self.exists = True
+
+ def check_create(self, result):
+ """ Check 'stageuser-add' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Added stage user "%s"' % self.uid,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """ Check 'stageuser-del' command result """
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Deleted stage user "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """ Check 'stageuser-show' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different
+ # type of nsaccountlock value than DS, but overall the value
+ # fits expected result
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Check 'stageuser-find' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different
+ # type of nsaccountlock value than DS, but overall the value
+ # fits expected result
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 user matched',
+ result=[expected],
+ ), result)
+
+ def check_find_nomatch(self, result):
+ """ Check 'stageuser-find' command result when no match is expected """
+ assert_deepequal(dict(
+ count=0,
+ truncated=False,
+ summary=u'0 users matched',
+ result=[],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """ Check 'stageuser-mod' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Modified stage user "%s"' % self.uid,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def check_restore_preserved(self, result):
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Staged user account "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def make_fixture_activate(self, request):
+ """Make a pytest fixture for a staged user that is to be activated
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it. It takes into account
+ that the staged user no longer exists after activation,
+ therefore the fixture verifies after the tests
+ that the staged user doesn't exist instead of deleting it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def finish():
+ with raises_exact(errors.NotFound(
+ reason=u'%s: stage user not found' % self.uid)):
+ del_command()
+
+ request.addfinalizer(finish)
+
+ return self
+
+ def create_from_preserved(self, user):
+ """ Copies values from preserved user - helper function for
+ restoration tests """
+ self.attrs = user.attrs
+ self.uid = user.uid
+ self.givenname = user.givenname
+ self.sn = user.sn
+ self.dn = DN(
+ ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
+ self.attrs[u'dn'] = self.dn
diff --git a/ipatests/test_xmlrpc/tracker/user_plugin.py b/ipatests/test_xmlrpc/tracker/user_plugin.py
new file mode 100644
index 000000000..af7d85836
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/user_plugin.py
@@ -0,0 +1,340 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipalib import api, errors
+from ipapython.dn import DN
+
+from ipatests.util import assert_deepequal, get_group_dn
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid, raises_exact
+from ipatests.test_xmlrpc.tracker.base import Tracker
+
+
+class UserTracker(Tracker):
+ """ Class for host plugin like tests """
+
+ retrieve_keys = {
+ u'uid', u'givenname', u'sn', u'homedirectory',
+ u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou',
+ u'telephonenumber', u'title', u'memberof',
+ u'memberofindirect', u'ipauserauthtype', u'userclass',
+ u'ipatokenradiusconfiglink', u'ipatokenradiususername',
+ u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab',
+ u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber',
+ u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock',
+ u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata',
+ u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st'
+ }
+
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry',
+ u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+
+ retrieve_preserved_keys = retrieve_keys - {u'memberof_group'}
+ retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'}
+
+ create_keys = retrieve_all_keys | {
+ u'randompassword', u'mepmanagedentry',
+ u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange',
+ u'krbprincipalkey', u'randompassword', u'userpassword'
+ }
+ update_keys = retrieve_keys - {u'dn'}
+ activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password',
+ u'nsaccountlock', u'sshpubkeyfp'}
+
+ find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
+ find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'}
+
+ def __init__(self, name, givenname, sn, **kwargs):
+ super(UserTracker, self).__init__(default_version=None)
+ self.uid = name
+ self.givenname = givenname
+ self.sn = sn
+ self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn)
+
+ self.kwargs = kwargs
+
+ def make_create_command(self, force=None):
+ """ Make function that crates a user using user-add """
+ return self.make_command(
+ 'user_add', self.uid,
+ givenname=self.givenname,
+ sn=self.sn, **self.kwargs
+ )
+
+ def make_delete_command(self, no_preserve=True, preserve=False):
+ """ Make function that deletes a user using user-del """
+
+ if preserve and not no_preserve:
+ # necessary to change some user attributes due to moving
+ # to different container
+ self.attrs[u'dn'] = DN(
+ ('uid', self.uid),
+ api.env.container_deleteuser,
+ api.env.basedn
+ )
+ self.attrs[u'objectclass'] = objectclasses.user_base
+
+ return self.make_command(
+ 'user_del', self.uid,
+ no_preserve=no_preserve,
+ preserve=preserve
+ )
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a user using user-show """
+ return self.make_command('user_show', self.uid, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that finds user using user-find """
+ return self.make_command('user_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates user using user-mod """
+ return self.make_command('user_mod', self.uid, **updates)
+
+ def make_undelete_command(self):
+ """ Make function that activates preserved user using user-undel """
+ return self.make_command('user_undel', self.uid)
+
+ def make_enable_command(self):
+ """ Make function that enables user using user-enable """
+ return self.make_command('user_enable', self.uid)
+
+ def make_stage_command(self):
+ """ Make function that restores preserved user by moving it to
+ staged container """
+ return self.make_command('user_stage', self.uid)
+
+ def track_create(self):
+ """ Update expected state for user creation """
+ self.attrs = dict(
+ dn=self.dn,
+ uid=[self.uid],
+ givenname=[self.givenname],
+ sn=[self.sn],
+ homedirectory=[u'/home/%s' % self.uid],
+ displayname=[u'%s %s' % (self.givenname, self.sn)],
+ cn=[u'%s %s' % (self.givenname, self.sn)],
+ initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+ objectclass=objectclasses.user,
+ description=[u'__no_upg__'],
+ ipauniqueid=[fuzzy_uuid],
+ uidnumber=[fuzzy_digits],
+ gidnumber=[fuzzy_digits],
+ krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+ mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+ gecos=[u'%s %s' % (self.givenname, self.sn)],
+ loginshell=[u'/bin/sh'],
+ has_keytab=False,
+ has_password=False,
+ mepmanagedentry=[get_group_dn(self.uid)],
+ memberof_group=[u'ipausers'],
+ )
+
+ for key in self.kwargs:
+ if key == u'krbprincipalname':
+ self.attrs[key] = [u'%s@%s' % (
+ (self.kwargs[key].split('@'))[0].lower(),
+ (self.kwargs[key].split('@'))[1]
+ )]
+ else:
+ self.attrs[key] = [self.kwargs[key]]
+
+ self.exists = True
+
+ def check_create(self, result):
+ """ Check 'user-add' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Added user "%s"' % self.uid,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """ Check 'user-del' command result """
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Deleted user "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False):
+ """ Check 'user-show' command result """
+
+ if u'preserved' in self.attrs and self.attrs[u'preserved']:
+ self.retrieve_all_keys = self.retrieve_preserved_all_keys
+ self.retrieve_keys = self.retrieve_preserved_keys
+ elif u'preserved' not in self.attrs and all:
+ self.attrs[u'preserved'] = False
+
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different type
+ # of nsaccountlock value than DS, but overall the value fits
+ # expected result
+ if u'nsaccountlock' in expected:
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Check 'user-find' command result """
+ self.attrs[u'nsaccountlock'] = True
+ self.attrs[u'preserved'] = True
+
+ if all:
+ expected = self.filter_attrs(self.find_all_keys)
+ else:
+ expected = self.filter_attrs(self.find_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 user matched',
+ result=[expected],
+ ), result)
+
+ def check_find_nomatch(self, result):
+ """ Check 'user-find' command result when no user should be found """
+ assert_deepequal(dict(
+ count=0,
+ truncated=False,
+ summary=u'0 users matched',
+ result=[],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """ Check 'user-mod' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Modified user "%s"' % self.uid,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def create_from_staged(self, stageduser):
+ """ Copies attributes from staged user - helper function for
+ activation tests """
+ self.attrs = stageduser.attrs
+ self.uid = stageduser.uid
+ self.givenname = stageduser.givenname
+ self.sn = stageduser.sn
+
+ self.attrs[u'mepmanagedentry'] = None
+ self.attrs[u'dn'] = self.dn
+ self.attrs[u'ipauniqueid'] = [fuzzy_uuid]
+ self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % (
+ api.env.container_group, api.env.basedn
+ )]
+ self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % (
+ self.uid, api.env.container_group, api.env.basedn
+ )]
+ self.attrs[u'objectclass'] = objectclasses.user
+ if self.attrs[u'gidnumber'] == [u'-1']:
+ self.attrs[u'gidnumber'] = [fuzzy_digits]
+ if self.attrs[u'uidnumber'] == [u'-1']:
+ self.attrs[u'uidnumber'] = [fuzzy_digits]
+
+ if u'ipasshpubkey' in self.kwargs:
+ self.attrs[u'ipasshpubkey'] = [str(
+ self.kwargs[u'ipasshpubkey']
+ )]
+
+ def check_activate(self, result):
+ """ Check 'stageuser-activate' command result """
+ expected = dict(
+ value=self.uid,
+ summary=u'Stage user %s activated' % self.uid,
+ result=self.filter_attrs(self.activate_keys))
+
+ # work around to eliminate inconsistency in returned objectclass
+ # (case sensitive assertion)
+ expected['result']['objectclass'] = [item.lower() for item in
+ expected['result']['objectclass']]
+ result['result']['objectclass'] = [item.lower() for item in
+ result['result']['objectclass']]
+
+ assert_deepequal(expected, result)
+
+ self.exists = True
+
+ def check_undel(self, result):
+ """ Check 'user-undel' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Undeleted user account "%s"' % self.uid,
+ result=True
+ ), result)
+
+ def track_delete(self, preserve=False):
+ """Update expected state for host deletion"""
+ if preserve:
+ self.exists = True
+ if u'memberof_group' in self.attrs:
+ del self.attrs[u'memberof_group']
+ self.attrs[u'nsaccountlock'] = True
+ self.attrs[u'preserved'] = True
+ else:
+ self.exists = False
+ self.attrs = {}
+
+ def make_preserved_user(self):
+ """ 'Creates' a preserved user necessary for some tests """
+ self.ensure_exists()
+ self.track_delete(preserve=True)
+ command = self.make_delete_command(no_preserve=False, preserve=True)
+ result = command()
+ self.check_delete(result)
+
+ def check_attr_preservation(self, expected):
+ """ Verifies that ipaUniqueID, uidNumber and gidNumber are
+ preserved upon reactivation. Also verifies that resulting
+ active user is a member of ipausers group only."""
+ command = self.make_retrieve_command(all=True)
+ result = command()
+
+ assert_deepequal(dict(
+ ipauniqueid=result[u'result'][u'ipauniqueid'],
+ uidnumber=result[u'result'][u'uidnumber'],
+ gidnumber=result[u'result'][u'gidnumber']
+ ), expected)
+
+ if (u'memberof_group' not in result[u'result'] or
+ result[u'result'][u'memberof_group'] != (u'ipausers',)):
+ assert False
+
+ def make_fixture_restore(self, request):
+ """Make a pytest fixture for a preserved user that is to be moved to
+ staged area.
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it. It takes into account
+ that the preserved user no longer exists after restoring it,
+ therefore the fixture verifies after the tests
+ that the preserved user doesn't exist instead of deleting it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def finish():
+ with raises_exact(errors.NotFound(
+ reason=u'%s: user not found' % self.uid)):
+ del_command()
+
+ request.addfinalizer(finish)
+
+ return self
diff --git a/ipatests/util.py b/ipatests/util.py
index c3c69816e..6aefe74d3 100644
--- a/ipatests/util.py
+++ b/ipatests/util.py
@@ -706,3 +706,9 @@ def change_principal(user, password, client=None, path=None):
client.Backend.rpcclient.disconnect()
client.Backend.rpcclient.connect()
+
+def get_group_dn(cn):
+ return DN(('cn', cn), api.env.container_group, api.env.basedn)
+
+def get_user_dn(uid):
+ return DN(('uid', uid), api.env.container_user, api.env.basedn)