summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMilan KubĂ­k <mkubik@redhat.com>2015-11-19 16:07:29 +0100
committerMartin Basti <mbasti@redhat.com>2015-12-02 17:12:24 +0100
commit17f9ca154b47f1e21797d25435e25676fdca284c (patch)
treefd3f5b2976acd3ca0718c88dbbe35782982be2b6
parentb8c619a7139bd7b65caa03b68431e22791ff19bf (diff)
Separated Tracker implementations into standalone package
The previous way of implementing trackers in the module with the test caused circular imports. The separate package resolves this issue. https://fedorahosted.org/freeipa/ticket/5467 Reviewed-By: Ales 'alich' Marecek <amarecek@redhat.com>
-rw-r--r--ipatests/setup.py.in3
-rw-r--r--ipatests/test_xmlrpc/test_caacl_plugin.py367
-rw-r--r--ipatests/test_xmlrpc/test_caacl_profile_enforcement.py4
-rw-r--r--ipatests/test_xmlrpc/test_certprofile_plugin.py126
-rw-r--r--ipatests/test_xmlrpc/test_group_plugin.py192
-rw-r--r--ipatests/test_xmlrpc/test_host_plugin.py144
-rw-r--r--ipatests/test_xmlrpc/test_stageuser_plugin.py247
-rw-r--r--ipatests/test_xmlrpc/test_user_plugin.py335
-rw-r--r--ipatests/test_xmlrpc/tracker/__init__.py0
-rw-r--r--ipatests/test_xmlrpc/tracker/base.py (renamed from ipatests/test_xmlrpc/ldaptracker.py)0
-rw-r--r--ipatests/test_xmlrpc/tracker/caacl_plugin.py367
-rw-r--r--ipatests/test_xmlrpc/tracker/certprofile_plugin.py133
-rw-r--r--ipatests/test_xmlrpc/tracker/group_plugin.py196
-rw-r--r--ipatests/test_xmlrpc/tracker/host_plugin.py154
-rw-r--r--ipatests/test_xmlrpc/tracker/stageuser_plugin.py267
-rw-r--r--ipatests/test_xmlrpc/tracker/user_plugin.py340
-rw-r--r--ipatests/util.py6
17 files changed, 1480 insertions, 1401 deletions
diff --git a/ipatests/setup.py.in b/ipatests/setup.py.in
index afc77ad56..ce1efb761 100644
--- a/ipatests/setup.py.in
+++ b/ipatests/setup.py.in
@@ -76,7 +76,8 @@ def setup_package():
"ipatests.test_ipaserver.test_install",
"ipatests.test_pkcs10",
"ipatests.test_webui",
- "ipatests.test_xmlrpc"],
+ "ipatests.test_xmlrpc",
+ "ipatests.test_xmlrpc.tracker"],
scripts=['ipa-run-tests', 'ipa-test-config', 'ipa-test-task'],
package_data = {
'ipatests': ['pytest.ini'],
diff --git a/ipatests/test_xmlrpc/test_caacl_plugin.py b/ipatests/test_xmlrpc/test_caacl_plugin.py
index 8b156a65a..d5ded1951 100644
--- a/ipatests/test_xmlrpc/test_caacl_plugin.py
+++ b/ipatests/test_xmlrpc/test_caacl_plugin.py
@@ -9,373 +9,12 @@ Test the `ipalib.plugins.caacl` module.
import pytest
from ipalib import errors
-from ipatests.test_xmlrpc.ldaptracker import Tracker
-from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test, fuzzy_caacldn,
- fuzzy_uuid, fuzzy_ipauniqueid)
-
-from ipatests.test_xmlrpc import objectclasses
-from ipatests.util import assert_deepequal
+from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
# reuse the fixture
from ipatests.test_xmlrpc.test_certprofile_plugin import default_profile
-from ipatests.test_xmlrpc.test_stageuser_plugin import StageUserTracker
-
-
-class CAACLTracker(Tracker):
- """Tracker class for CA ACL LDAP object.
-
- The class implements methods required by the base class
- to help with basic CRUD operations.
-
- Methods for adding and deleting actual member entries into an ACL
- do not have check methods as these would make the class
- unnecessarily complicated. The checks are implemented as
- a standalone test suite. However, this makes the test crucial
- in debugging more complicated test cases. Since the add/remove
- operation won't be checked right away by the tracker, a problem
- in this operation may propagate into more complicated test case.
-
- It is possible to pass a list of member uids to these methods.
-
- The test uses instances of Fuzzy class to compare results as they
- are in the UUID format. The dn and rdn properties were modified
- to reflect this as well.
- """
-
- member_keys = {
- u'memberuser_user', u'memberuser_group',
- u'memberhost_host', u'memberhost_hostgroup',
- u'memberservice_service',
- u'ipamembercertprofile_certprofile'}
- category_keys = {
- u'ipacacategory', u'ipacertprofilecategory', u'usercategory',
- u'hostcategory', u'servicecategory'}
- retrieve_keys = {
- u'dn', u'cn', u'description', u'ipaenabledflag',
- u'ipamemberca', u'ipamembercertprofile', u'memberuser',
- u'memberhost', u'memberservice'} | member_keys | category_keys
- retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'}
- create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory',
- u'usercategory', u'hostcategory', u'ipacacategory',
- u'servicecategory', u'ipaenabledflag', u'objectclass',
- u'ipauniqueid'}
- update_keys = create_keys - {u'dn'}
-
- def __init__(self, name, ipacertprofile_category=None, user_category=None,
- service_category=None, host_category=None, description=None,
- default_version=None):
- super(CAACLTracker, self).__init__(default_version=default_version)
-
- self._name = name
- self.description = description
- self._categories = dict(
- ipacertprofilecategory=ipacertprofile_category,
- usercategory=user_category,
- servicecategory=service_category,
- hostcategory=host_category)
-
- self.dn = fuzzy_caacldn
-
- @property
- def name(self):
- return self._name
-
- @property
- def rdn(self):
- return fuzzy_ipauniqueid
-
- @property
- def categories(self):
- """To be used in track_create"""
- return {cat: v for cat, v in self._categories.items() if v}
-
- @property
- def create_categories(self):
- """ Return the categories set on create.
- Unused categories are left out.
- """
- return {cat: [v] for cat, v in self.categories.items() if v}
-
- def make_create_command(self, force=True):
- return self.make_command(u'caacl_add', self.name,
- description=self.description,
- **self.categories)
-
- def check_create(self, result):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Added CA ACL "{}"'.format(self.name),
- result=dict(self.filter_attrs(self.create_keys))
- ), result)
-
- def track_create(self):
- self.attrs = dict(
- dn=self.dn,
- ipauniqueid=[fuzzy_uuid],
- cn=[self.name],
- objectclass=objectclasses.caacl,
- ipaenabledflag=[u'TRUE'])
-
- self.attrs.update(self.create_categories)
- if self.description:
- self.attrs.update({u'description', [self.description]})
-
- self.exists = True
-
- def make_delete_command(self):
- return self.make_command('caacl_del', self.name)
-
- def check_delete(self, result):
- assert_deepequal(dict(
- value=[self.name],
- summary=u'Deleted CA ACL "{}"'.format(self.name),
- result=dict(failed=[])
- ), result)
-
- def make_retrieve_command(self, all=False, raw=False):
- return self.make_command('caacl_show', self.name, all=all, raw=raw)
-
- def check_retrieve(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- value=self.name,
- summary=None,
- result=expected
- ), result)
-
- def make_find_command(self, *args, **kwargs):
- return self.make_command('caacl_find', *args, **kwargs)
-
- def check_find(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 CA ACL matched',
- result=[expected]
- ), result)
-
- def make_update_command(self, updates):
- return self.make_command('caacl_mod', self.name, **updates)
-
- def update(self, updates, expected_updates=None, silent=False):
- """If removing a category, delete it from tracker as well"""
- # filter out empty categories and track changes
-
- filtered_updates = dict()
- for key, value in updates.items():
- if key in self.category_keys:
- if not value:
- try:
- del self.attrs[key]
- except IndexError:
- if silent:
- pass
- else:
- # if there is a value, prepare the pair for update
- filtered_updates.update({key: value})
- else:
- filtered_updates.update({key: value})
-
- if expected_updates is None:
- expected_updates = {}
-
- command = self.make_update_command(updates)
-
- try:
- result = command()
- except errors.EmptyModlist:
- if silent:
- self.attrs.update(filtered_updates)
- self.attrs.update(expected_updates)
- self.check_update(result,
- extra_keys=set(self.update_keys) |
- set(expected_updates.keys()))
-
- def check_update(self, result, extra_keys=()):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Modified CA ACL "{}"'.format(self.name),
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- # Helper methods for caacl subcommands. The check methods will be
- # implemented in standalone test
- #
- # The methods implemented here will be:
- # caacl_{add,remove}_{host, service, certprofile, user [, subca]}
-
- def _add_acl_component(self, command_name, keys, track):
- """ Add a resource into ACL rule and track it.
-
- command_name - the name in the API
- keys = {
- 'tracker_attr': {
- 'api_key': 'value'
- }
- }
-
- e.g.
-
- keys = {
- 'memberhost_host': {
- 'host': 'hostname'
- },
- 'memberhost_hostgroup': {
- 'hostgroup': 'hostgroup_name'
- }
- }
- """
-
- if not self.exists:
- raise errors.NotFound(reason="The tracked entry doesn't exist.")
-
- command = self.make_command(command_name, self.name)
- command_options = dict()
-
- # track
- for tracker_attr in keys:
- api_options = keys[tracker_attr]
- if track:
- for option in api_options:
- try:
- if type(option) in (list, tuple):
- self.attrs[tracker_attr].extend(api_options[option])
- else:
- self.attrs[tracker_attr].append(api_options[option])
- except KeyError:
- if type(option) in (list, tuple):
- self.attrs[tracker_attr] = api_options[option]
- else:
- self.attrs[tracker_attr] = [api_options[option]]
- # prepare options for the command call
- command_options.update(api_options)
-
- return command(**command_options)
-
- def _remove_acl_component(self, command_name, keys, track):
- """ Remove a resource from ACL rule and track it.
-
- command_name - the name in the API
- keys = {
- 'tracker_attr': {
- 'api_key': 'value'
- }
- }
-
- e.g.
-
- keys = {
- 'memberhost_host': {
- 'host': 'hostname'
- },
- 'memberhost_hostgroup': {
- 'hostgroup': 'hostgroup_name'
- }
- }
- """
- command = self.make_command(command_name, self.name)
- command_options = dict()
-
- for tracker_attr in keys:
- api_options = keys[tracker_attr]
- if track:
- for option in api_options:
- if type(option) in (list, tuple):
- for item in option:
- self.attrs[tracker_attr].remove(item)
- else:
- self.attrs[tracker_attr].remove(api_options[option])
- if len(self.attrs[tracker_attr]) == 0:
- del self.attrs[tracker_attr]
- command_options.update(api_options)
-
- return command(**command_options)
-
- def add_host(self, host=None, hostgroup=None, track=True):
- """Associates an host or hostgroup entry with the ACL.
-
- The command takes an unicode string with the name
- of the entry (RDN).
-
- It is the responsibility of a test writer to provide
- the correct value, object type as the method does not
- verify whether the entry exists.
-
- The method can add only one entry of each type
- in one call.
- """
-
- options = {
- u'memberhost_host': {u'host': host},
- u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
-
- return self._add_acl_component(u'caacl_add_host', options, track)
-
- def remove_host(self, host=None, hostgroup=None, track=True):
- options = {
- u'memberhost_host': {u'host': host},
- u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
-
- return self._remove_acl_component(u'caacl_remove_host', options, track)
-
- def add_user(self, user=None, group=None, track=True):
- options = {
- u'memberuser_user': {u'user': user},
- u'memberuser_group': {u'group': group}}
-
- return self._add_acl_component(u'caacl_add_user', options, track)
-
- def remove_user(self, user=None, group=None, track=True):
- options = {
- u'memberuser_user': {u'user': user},
- u'memberuser_group': {u'group': group}}
-
- return self._remove_acl_component(u'caacl_remove_user', options, track)
-
- def add_service(self, service=None, track=True):
- options = {
- u'memberservice_service': {u'service': service}}
-
- return self._add_acl_component(u'caacl_add_service', options, track)
-
- def remove_service(self, service=None, track=True):
- options = {
- u'memberservice_service': {u'service': service}}
-
- return self._remove_acl_component(u'caacl_remove_service', options, track)
-
- def add_profile(self, certprofile=None, track=True):
- options = {
- u'ipamembercertprofile_certprofile':
- {u'certprofile': certprofile}}
-
- return self._add_acl_component(u'caacl_add_profile', options, track)
-
- def remove_profile(self, certprofile=None, track=True):
- options = {
- u'ipamembercertprofile_certprofile':
- {u'certprofile': certprofile}}
-
- return self._remove_acl_component(u'caacl_remove_profile', options, track)
-
- def enable(self):
- command = self.make_command(u'caacl_enable', self.name)
- self.attrs.update({u'ipaenabledflag': [u'TRUE']})
- command()
-
- def disable(self):
- command = self.make_command(u'caacl_disable', self.name)
- self.attrs.update({u'ipaenabledflag': [u'FALSE']})
- command()
+from ipatests.test_xmlrpc.tracker.caacl_plugin import CAACLTracker
+from ipatests.test_xmlrpc.tracker.stageuser_plugin import StageUserTracker
@pytest.fixture(scope='class')
diff --git a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
index 78262ae8c..9de257a26 100644
--- a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
+++ b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
@@ -11,8 +11,8 @@ from ipalib import api, errors
from ipatests.util import (
prepare_config, unlock_principal_password, change_principal)
from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
-from ipatests.test_xmlrpc.test_certprofile_plugin import CertprofileTracker
-from ipatests.test_xmlrpc.test_caacl_plugin import CAACLTracker
+from ipatests.test_xmlrpc.tracker.certprofile_plugin import CertprofileTracker
+from ipatests.test_xmlrpc.tracker.caacl_plugin import CAACLTracker
from ipapython.ipautil import run
diff --git a/ipatests/test_xmlrpc/test_certprofile_plugin.py b/ipatests/test_xmlrpc/test_certprofile_plugin.py
index 1f06f99f5..66a72de3e 100644
--- a/ipatests/test_xmlrpc/test_certprofile_plugin.py
+++ b/ipatests/test_xmlrpc/test_certprofile_plugin.py
@@ -12,133 +12,9 @@ import os
import pytest
from ipalib import api, errors
-from ipapython.dn import DN
from ipatests.util import prepare_config
-from ipatests.test_xmlrpc.ldaptracker import Tracker
from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test, raises_exact
-from ipatests.test_xmlrpc import objectclasses
-from ipatests.util import assert_deepequal
-
-
-class CertprofileTracker(Tracker):
- """Tracker class for certprofile plugin.
- """
-
- retrieve_keys = {
- 'dn', 'cn', 'description', 'ipacertprofilestoreissued'
- }
- retrieve_all_keys = retrieve_keys | {'objectclass'}
- create_keys = retrieve_keys | {'objectclass'}
- update_keys = retrieve_keys - {'dn'}
- managedby_keys = retrieve_keys
- allowedto_keys = retrieve_keys
-
- def __init__(self, name, store=False, desc='dummy description',
- profile=None, default_version=None):
- super(CertprofileTracker, self).__init__(
- default_version=default_version
- )
-
- self.store = store
- self.description = desc
- self._profile_path = profile
-
- self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca',
- self.api.env.basedn)
-
- @property
- def profile(self):
- if not self._profile_path:
- return None
-
- if os.path.isabs(self._profile_path):
- path = self._profile_path
- else:
- path = os.path.join(os.path.dirname(__file__),
- self._profile_path)
-
- with open(path, 'r') as f:
- content = f.read()
- return unicode(content)
-
- def make_create_command(self, force=True):
- if not self.profile:
- raise RuntimeError('Tracker object without path to profile '
- 'cannot be used to create profile entry.')
-
- return self.make_command('certprofile_import', self.name,
- description=self.description,
- ipacertprofilestoreissued=self.store,
- file=self.profile)
-
- def check_create(self, result):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Imported profile "{}"'.format(self.name),
- result=dict(self.filter_attrs(self.create_keys))
- ), result)
-
- def track_create(self):
- self.attrs = dict(
- dn=unicode(self.dn),
- cn=[self.name],
- description=[self.description],
- ipacertprofilestoreissued=[unicode(self.store).upper()],
- objectclass=objectclasses.certprofile
- )
- self.exists = True
-
- def make_delete_command(self):
- return self.make_command('certprofile_del', self.name)
-
- def check_delete(self, result):
- assert_deepequal(dict(
- value=[self.name], # correctly a list?
- summary=u'Deleted profile "{}"'.format(self.name),
- result=dict(failed=[]),
- ), result)
-
- def make_retrieve_command(self, all=False, raw=False, **options):
- return self.make_command('certprofile_show', self.name, all=all,
- raw=raw, **options)
-
- def check_retrieve(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- value=self.name,
- summary=None,
- result=expected,
- ), result)
-
- def make_find_command(self, *args, **kwargs):
- return self.make_command('certprofile_find', *args, **kwargs)
-
- def check_find(self, result, all=False, raw=False):
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 profile matched',
- result=[expected]
- ), result)
-
- def make_update_command(self, updates):
- return self.make_command('certprofile_mod', self.name, **updates)
-
- def check_update(self, result, extra_keys=()):
- assert_deepequal(dict(
- value=self.name,
- summary=u'Modified Certificate Profile "{}"'.format(self.name),
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
+from ipatests.test_xmlrpc.tracker.certprofile_plugin import CertprofileTracker
IPA_CERT_SUBJ_BASE = (
diff --git a/ipatests/test_xmlrpc/test_group_plugin.py b/ipatests/test_xmlrpc/test_group_plugin.py
index ed38c696e..f2bd0f4b9 100644
--- a/ipatests/test_xmlrpc/test_group_plugin.py
+++ b/ipatests/test_xmlrpc/test_group_plugin.py
@@ -26,13 +26,12 @@ import pytest
from ipalib import api, errors
from ipatests.test_xmlrpc import objectclasses
-from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci,
+from ipatests.test_xmlrpc.xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_set_ci,
add_sid, add_oc, XMLRPC_test, raises_exact)
from ipapython.dn import DN
from ipatests.test_xmlrpc.test_user_plugin import get_user_result
-from ipatests.test_xmlrpc.ldaptracker import Tracker
-from ipatests.test_xmlrpc.test_user_plugin import UserTracker
+from ipatests.test_xmlrpc.tracker.user_plugin import UserTracker
from ipatests.util import assert_deepequal
@@ -1161,190 +1160,3 @@ class test_group_full_set_of_objectclass_not_available_post_detach(Declarative):
),
),
]
-
-
-class GroupTracker(Tracker):
- """ Class for host plugin like tests """
- retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user',
- u'member_group'}
- retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'}
-
- create_keys = retrieve_all_keys
- update_keys = retrieve_keys - {u'dn'}
-
- add_member_keys = retrieve_keys | {u'description'}
-
- def __init__(self, name):
- super(GroupTracker, self).__init__(default_version=None)
- self.cn = name
- self.dn = get_group_dn(name)
-
- def make_create_command(self, nonposix=False, external=False,
- force=True):
- """ Make function that creates a group using 'group-add' """
- return self.make_command('group_add', self.cn,
- nonposix=nonposix, external=external)
-
- def make_delete_command(self):
- """ Make function that deletes a group using 'group-del' """
- return self.make_command('group_del', self.cn)
-
- def make_retrieve_command(self, all=False, raw=False):
- """ Make function that retrieves a group using 'group-show' """
- return self.make_command('group_show', self.cn, all=all)
-
- def make_find_command(self, *args, **kwargs):
- """ Make function that searches for a group using 'group-find' """
- return self.make_command('group_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """ Make function that updates a group using 'group-mod' """
- return self.make_command('group_mod', self.cn, **updates)
-
- def make_add_member_command(self, options={}):
- """ Make function that adds a member to a group
- Attention: only works for one user OR group! """
- if u'user' in options:
- self.attrs[u'member_user'] = [options[u'user']]
- elif u'group' in options:
- self.attrs[u'member_group'] = [options[u'group']]
- self.adds = options
-
- return self.make_command('group_add_member', self.cn, **options)
-
- def make_remove_member_command(self, options={}):
- """ Make function that removes a member from a group
- Attention: only works for one user OR group! """
- if u'user' in options:
- del self.attrs[u'member_user']
- elif u'group' in options:
- del self.attrs[u'member_group']
- return self.make_command('group_remove_member', self.cn, **options)
-
- def make_detach_command(self):
- """ Make function that detaches a managed group using
- 'group-detach' """
- self.exists = True
- return self.make_command('group_detach', self.cn)
-
- def track_create(self):
- """ Updates expected state for group creation"""
- self.attrs = dict(
- dn=get_group_dn(self.cn),
- cn=[self.cn],
- gidnumber=[fuzzy_digits],
- ipauniqueid=[fuzzy_uuid],
- objectclass=objectclasses.posixgroup,
- )
- self.exists = True
-
- def check_create(self, result):
- """ Checks 'group_add' command result """
- assert_deepequal(dict(
- value=self.cn,
- summary=u'Added group "%s"' % self.cn,
- result=self.filter_attrs(self.create_keys)
- ), result)
-
- def check_delete(self, result):
- """ Checks 'group_del' command result """
- assert_deepequal(dict(
- value=[self.cn],
- summary=u'Deleted group "%s"' % self.cn,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False, raw=False):
- """ Checks 'group_show' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- value=self.cn,
- summary=None,
- result=expected
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """ Checks 'group_find' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 group matched',
- result=[expected],
- ), result)
-
- def check_update(self, result, extra_keys={}):
- """ Checks 'group_mod' command result """
- assert_deepequal(dict(
- value=self.cn,
- summary=u'Modified group "%s"' % self.cn,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- def check_add_member(self, result):
- """ Checks 'group_add_member' command result """
- assert_deepequal(dict(
- completed=1,
- failed={u'member': {u'group': (), u'user': ()}},
- result=self.filter_attrs(self.add_member_keys)
- ), result)
-
- def check_add_member_negative(self, result):
- """ Checks 'group_add_member' command result when expected result
- is failure of the operation"""
- if u'member_user' in self.attrs:
- del self.attrs[u'member_user']
- elif u'member_group' in self.attrs:
- del self.attrs[u'member_group']
-
- expected = dict(
- completed=0,
- failed={u'member': {u'group': (), u'user': ()}},
- result=self.filter_attrs(self.add_member_keys)
- )
- if u'user' in self.adds:
- expected[u'failed'][u'member'][u'user'] = [(
- self.adds[u'user'], u'no such entry')]
- elif u'group' in self.adds:
- expected[u'failed'][u'member'][u'group'] = [(
- self.adds[u'group'], u'no such entry')]
-
- assert_deepequal(expected, result)
-
- def check_remove_member(self, result):
- """ Checks 'group_remove_member' command result """
- assert_deepequal(dict(
- completed=1,
- failed={u'member': {u'group': (), u'user': ()}},
- result=self.filter_attrs(self.add_member_keys)
- ), result)
-
- def check_detach(self, result):
- """ Checks 'group_detach' command result """
- assert_deepequal(dict(
- value=self.cn,
- summary=u'Detached group "%s" from user "%s"' % (
- self.cn, self.cn),
- result=True
- ), result)
-
- def make_fixture_detach(self, request):
- """Make a pytest fixture for this tracker
-
- The fixture ensures the plugin entry does not exist before
- and after the tests that use itself.
- """
- def cleanup():
- pass
-
- request.addfinalizer(cleanup)
-
- return self
diff --git a/ipatests/test_xmlrpc/test_host_plugin.py b/ipatests/test_xmlrpc/test_host_plugin.py
index 868c09ce8..be2b2d6af 100644
--- a/ipatests/test_xmlrpc/test_host_plugin.py
+++ b/ipatests/test_xmlrpc/test_host_plugin.py
@@ -34,12 +34,12 @@ from ipapython import ipautil
from ipalib import api, errors, x509
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
-from ipatests.test_xmlrpc.ldaptracker import Tracker
from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test,
fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer,
fuzzy_hex, raises_exact)
from ipatests.test_xmlrpc.test_user_plugin import get_group_dn
from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.tracker.host_plugin import HostTracker
from ipatests.test_xmlrpc.testcert import get_testcert
from ipatests.util import assert_deepequal
@@ -99,148 +99,6 @@ host_cert = get_testcert(DN(('CN', api.env.host), x509.subject_base()),
'host/%s@%s' % (api.env.host, api.env.realm))
-class HostTracker(Tracker):
- """Wraps and tracks modifications to a Host object
-
- Implements the helper functions for host plugin.
-
- The HostTracker object stores information about the host, e.g.
- ``fqdn`` and ``dn``.
- """
- retrieve_keys = {
- 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host',
- 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint',
- 'serial_number', 'serial_number_hex', 'sha1_fingerprint',
- 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before',
- 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user',
- 'ipaallowedtoperform_read_keys_group',
- 'ipaallowedtoperform_read_keys_host',
- 'ipaallowedtoperform_read_keys_hostgroup',
- 'ipaallowedtoperform_write_keys_user',
- 'ipaallowedtoperform_write_keys_group',
- 'ipaallowedtoperform_write_keys_host',
- 'ipaallowedtoperform_write_keys_hostgroup'}
- retrieve_all_keys = retrieve_keys | {
- u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid',
- u'managing_host', u'objectclass', u'serverhostname'}
- create_keys = retrieve_keys | {'objectclass', 'ipauniqueid',
- 'randompassword'}
- update_keys = retrieve_keys - {'dn'}
- managedby_keys = retrieve_keys - {'has_keytab', 'has_password'}
- allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
-
- def __init__(self, name, fqdn=None, default_version=None):
- super(HostTracker, self).__init__(default_version=default_version)
-
- self.shortname = name
- if fqdn:
- self.fqdn = fqdn
- else:
- self.fqdn = u'%s.%s' % (name, self.api.env.domain)
- self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts',
- self.api.env.basedn)
-
- self.description = u'Test host <%s>' % name
- self.location = u'Undisclosed location <%s>' % name
-
- def make_create_command(self, force=True):
- """Make function that creates this host using host_add"""
- return self.make_command('host_add', self.fqdn,
- description=self.description,
- l=self.location,
- force=force)
-
- def make_delete_command(self):
- """Make function that deletes the host using host_del"""
- return self.make_command('host_del', self.fqdn)
-
- def make_retrieve_command(self, all=False, raw=False):
- """Make function that retrieves the host using host_show"""
- return self.make_command('host_show', self.fqdn, all=all, raw=raw)
-
- def make_find_command(self, *args, **kwargs):
- """Make function that finds hosts using host_find
-
- Note that the fqdn (or other search terms) needs to be specified
- in arguments.
- """
- return self.make_command('host_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """Make function that modifies the host using host_mod"""
- return self.make_command('host_mod', self.fqdn, **updates)
-
- def track_create(self):
- """Update expected state for host creation"""
- self.attrs = dict(
- dn=self.dn,
- fqdn=[self.fqdn],
- description=[self.description],
- l=[self.location],
- krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)],
- objectclass=objectclasses.host,
- ipauniqueid=[fuzzy_uuid],
- managedby_host=[self.fqdn],
- has_keytab=False,
- has_password=False,
- cn=[self.fqdn],
- ipakrbokasdelegate=False,
- ipakrbrequirespreauth=True,
- managing_host=[self.fqdn],
- serverhostname=[self.shortname],
- )
- self.exists = True
-
- def check_create(self, result):
- """Check `host_add` command result"""
- assert_deepequal(dict(
- value=self.fqdn,
- summary=u'Added host "%s"' % self.fqdn,
- result=self.filter_attrs(self.create_keys),
- ), result)
-
- def check_delete(self, result):
- """Check `host_del` command result"""
- assert_deepequal(dict(
- value=[self.fqdn],
- summary=u'Deleted host "%s"' % self.fqdn,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False, raw=False):
- """Check `host_show` command result"""
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
- assert_deepequal(dict(
- value=self.fqdn,
- summary=None,
- result=expected,
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """Check `host_find` command result"""
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 host matched',
- result=[expected],
- ), result)
-
- def check_update(self, result, extra_keys=()):
- """Check `host_update` command result"""
- assert_deepequal(dict(
- value=self.fqdn,
- summary=u'Modified host "%s"' % self.fqdn,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
-
@pytest.fixture(scope='class')
def host(request):
tracker = HostTracker(name=u'testhost1')
diff --git a/ipatests/test_xmlrpc/test_stageuser_plugin.py b/ipatests/test_xmlrpc/test_stageuser_plugin.py
index 43c59b7c7..4eb968451 100644
--- a/ipatests/test_xmlrpc/test_stageuser_plugin.py
+++ b/ipatests/test_xmlrpc/test_stageuser_plugin.py
@@ -17,17 +17,17 @@ import six
from ipalib import api, errors
-from ipatests.test_xmlrpc.ldaptracker import Tracker
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.xmlrpc_test import (
XMLRPC_test, fuzzy_digits, fuzzy_uuid, fuzzy_password, fuzzy_string,
fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact)
from ipatests.util import (
- assert_equal, assert_deepequal, assert_not_equal, raises)
+ assert_equal, assert_deepequal, assert_not_equal, raises, get_user_dn)
from ipapython.dn import DN
-from ipatests.test_xmlrpc.test_user_plugin import UserTracker, get_user_dn
-from ipatests.test_xmlrpc.test_group_plugin import GroupTracker
+from ipatests.test_xmlrpc.tracker.user_plugin import UserTracker
+from ipatests.test_xmlrpc.tracker.group_plugin import GroupTracker
+from ipatests.test_xmlrpc.tracker.stageuser_plugin import StageUserTracker
if six.PY3:
unicode = str
@@ -84,245 +84,6 @@ options_ok = [
]
-class StageUserTracker(Tracker):
- """ Tracker class for staged user LDAP object
-
- Implements helper functions for host plugin.
- StageUserTracker object stores information about the user.
- """
-
- retrieve_keys = {
- u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
- u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber',
- u'title', u'memberof', u'nsaccountlock', u'memberofindirect',
- u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink',
- u'ipatokenradiususername', u'krbprincipalexpiration',
- u'usercertificate', u'dn', u'has_keytab', u'has_password',
- u'street', u'postalcode', u'facsimiletelephonenumber',
- u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l',
- u'st', u'mobile', u'pager', }
- retrieve_all_keys = retrieve_keys | {
- u'cn', u'ipauniqueid', u'objectclass', u'description',
- u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
-
- create_keys = retrieve_all_keys | {
- u'objectclass', u'ipauniqueid', u'randompassword',
- u'userpassword', u'krbextradata', u'krblastpwdchange',
- u'krbpasswordexpiration', u'krbprincipalkey'}
-
- update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
- activate_keys = retrieve_keys | {
- u'has_keytab', u'has_password', u'nsaccountlock'}
-
- def __init__(self, name, givenname, sn, **kwargs):
- super(StageUserTracker, self).__init__(default_version=None)
- self.uid = name
- self.givenname = givenname
- self.sn = sn
- self.dn = DN(
- ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
-
- self.kwargs = kwargs
-
- def make_create_command(self, options=None, force=None):
- """ Make function that creates a staged user using stageuser-add """
- if options is not None:
- self.kwargs = options
- return self.make_command('stageuser_add', self.uid,
- givenname=self.givenname,
- sn=self.sn, **self.kwargs)
-
- def make_delete_command(self):
- """ Make function that deletes a staged user using stageuser-del """
- return self.make_command('stageuser_del', self.uid)
-
- def make_retrieve_command(self, all=False, raw=False):
- """ Make function that retrieves a staged user using stageuser-show """
- return self.make_command('stageuser_show', self.uid, all=all)
-
- def make_find_command(self, *args, **kwargs):
- """ Make function that finds staged user using stageuser-find """
- return self.make_command('stageuser_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """ Make function that updates staged user using stageuser-mod """
- return self.make_command('stageuser_mod', self.uid, **updates)
-
- def make_activate_command(self):
- """ Make function that activates staged user
- using stageuser-activate """
- return self.make_command('stageuser_activate', self.uid)
-
- def track_create(self):
- """ Update expected state for staged user creation """
- self.attrs = dict(
- dn=self.dn,
- uid=[self.uid],
- givenname=[self.givenname],
- sn=[self.sn],
- homedirectory=[u'/home/%s' % self.uid],
- displayname=[u'%s %s' % (self.givenname, self.sn)],
- cn=[u'%s %s' % (self.givenname, self.sn)],
- initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
- objectclass=objectclasses.user_base,
- description=[u'__no_upg__'],
- ipauniqueid=[u'autogenerate'],
- uidnumber=[u'-1'],
- gidnumber=[u'-1'],
- krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
- mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
- gecos=[u'%s %s' % (self.givenname, self.sn)],
- loginshell=[u'/bin/sh'],
- has_keytab=False,
- has_password=False,
- nsaccountlock=[u'true'],
- )
-
- for key in self.kwargs:
- if key == u'krbprincipalname':
- self.attrs[key] = [u'%s@%s' % (
- (self.kwargs[key].split('@'))[0].lower(),
- (self.kwargs[key].split('@'))[1])]
- elif key == u'manager':
- self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))]
- elif key == u'ipasshpubkey':
- self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp]
- self.attrs[key] = [self.kwargs[key]]
- elif key == u'random' or key == u'userpassword':
- self.attrs[u'krbextradata'] = [fuzzy_string]
- self.attrs[u'krbpasswordexpiration'] = [
- fuzzy_dergeneralizedtime]
- self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime]
- self.attrs[u'krbprincipalkey'] = [fuzzy_string]
- self.attrs[u'userpassword'] = [fuzzy_string]
- self.attrs[u'has_keytab'] = True
- self.attrs[u'has_password'] = True
- if key == u'random':
- self.attrs[u'randompassword'] = fuzzy_string
- else:
- self.attrs[key] = [self.kwargs[key]]
-
- self.exists = True
-
- def check_create(self, result):
- """ Check 'stageuser-add' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Added stage user "%s"' % self.uid,
- result=self.filter_attrs(self.create_keys),
- ), result)
-
- def check_delete(self, result):
- """ Check 'stageuser-del' command result """
- assert_deepequal(dict(
- value=[self.uid],
- summary=u'Deleted stage user "%s"' % self.uid,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False, raw=False):
- """ Check 'stageuser-show' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- # small override because stageuser-find returns different
- # type of nsaccountlock value than DS, but overall the value
- # fits expected result
- if expected[u'nsaccountlock'] == [u'true']:
- expected[u'nsaccountlock'] = True
- elif expected[u'nsaccountlock'] == [u'false']:
- expected[u'nsaccountlock'] = False
-
- assert_deepequal(dict(
- value=self.uid,
- summary=None,
- result=expected,
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """ Check 'stageuser-find' command result """
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- # small override because stageuser-find returns different
- # type of nsaccountlock value than DS, but overall the value
- # fits expected result
- if expected[u'nsaccountlock'] == [u'true']:
- expected[u'nsaccountlock'] = True
- elif expected[u'nsaccountlock'] == [u'false']:
- expected[u'nsaccountlock'] = False
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 user matched',
- result=[expected],
- ), result)
-
- def check_find_nomatch(self, result):
- """ Check 'stageuser-find' command result when no match is expected """
- assert_deepequal(dict(
- count=0,
- truncated=False,
- summary=u'0 users matched',
- result=[],
- ), result)
-
- def check_update(self, result, extra_keys=()):
- """ Check 'stageuser-mod' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Modified stage user "%s"' % self.uid,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- def check_restore_preserved(self, result):
- assert_deepequal(dict(
- value=[self.uid],
- summary=u'Staged user account "%s"' % self.uid,
- result=dict(failed=[]),
- ), result)
-
- def make_fixture_activate(self, request):
- """Make a pytest fixture for a staged user that is to be activated
-
- The fixture ensures the plugin entry does not exist before
- and after the tests that use it. It takes into account
- that the staged user no longer exists after activation,
- therefore the fixture verifies after the tests
- that the staged user doesn't exist instead of deleting it.
- """
- del_command = self.make_delete_command()
- try:
- del_command()
- except errors.NotFound:
- pass
-
- def finish():
- with raises_exact(errors.NotFound(
- reason=u'%s: stage user not found' % self.uid)):
- del_command()
-
- request.addfinalizer(finish)
-
- return self
-
- def create_from_preserved(self, user):
- """ Copies values from preserved user - helper function for
- restoration tests """
- self.attrs = user.attrs
- self.uid = user.uid
- self.givenname = user.givenname
- self.sn = user.sn
- self.dn = DN(
- ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
- self.attrs[u'dn'] = self.dn
-
-
@pytest.fixture(scope='class')
def stageduser(request):
tracker = StageUserTracker(name=u'suser1', givenname=u'staged', sn=u'user')
diff --git a/ipatests/test_xmlrpc/test_user_plugin.py b/ipatests/test_xmlrpc/test_user_plugin.py
index 81185e449..084fb83c4 100644
--- a/ipatests/test_xmlrpc/test_user_plugin.py
+++ b/ipatests/test_xmlrpc/test_user_plugin.py
@@ -23,7 +23,6 @@
Test the `ipalib/plugins/user.py` module.
"""
-import functools
import datetime
import ldap
import re
@@ -31,12 +30,11 @@ import re
from ipalib import api, errors
from ipatests.test_xmlrpc import objectclasses
from ipatests.util import (
- assert_equal, assert_not_equal, raises, assert_deepequal)
+ assert_equal, assert_not_equal, raises)
from xmlrpc_test import (
XMLRPC_test, Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_password,
- fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact)
+ fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc)
from ipapython.dn import DN
-from ipatests.test_xmlrpc.ldaptracker import Tracker
import pytest
user1 = u'tuser1'
@@ -1654,332 +1652,3 @@ class test_denied_bind_with_expired_principal(XMLRPC_test):
krbprincipalexpiration=principal_expiration_string)
self.connection.simple_bind_s(str(get_user_dn(user1)), self.password)
-
-
-class UserTracker(Tracker):
- """ Class for host plugin like tests """
-
- retrieve_keys = {
- u'uid', u'givenname', u'sn', u'homedirectory',
- u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou',
- u'telephonenumber', u'title', u'memberof',
- u'memberofindirect', u'ipauserauthtype', u'userclass',
- u'ipatokenradiusconfiglink', u'ipatokenradiususername',
- u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab',
- u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber',
- u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock',
- u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata',
- u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st'
- }
-
- retrieve_all_keys = retrieve_keys | {
- u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry',
- u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
-
- retrieve_preserved_keys = retrieve_keys - {u'memberof_group'}
- retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'}
-
- create_keys = retrieve_all_keys | {
- u'randompassword', u'mepmanagedentry',
- u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange',
- u'krbprincipalkey', u'randompassword', u'userpassword'
- }
- update_keys = retrieve_keys - {u'dn'}
- activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password',
- u'nsaccountlock', u'sshpubkeyfp'}
-
- find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
- find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'}
-
- def __init__(self, name, givenname, sn, **kwargs):
- super(UserTracker, self).__init__(default_version=None)
- self.uid = name
- self.givenname = givenname
- self.sn = sn
- self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn)
-
- self.kwargs = kwargs
-
- def make_create_command(self, force=None):
- """ Make function that crates a user using user-add """
- return self.make_command(
- 'user_add', self.uid,
- givenname=self.givenname,
- sn=self.sn, **self.kwargs
- )
-
- def make_delete_command(self, no_preserve=True, preserve=False):
- """ Make function that deletes a user using user-del """
-
- if preserve and not no_preserve:
- # necessary to change some user attributes due to moving
- # to different container
- self.attrs[u'dn'] = DN(
- ('uid', self.uid),
- api.env.container_deleteuser,
- api.env.basedn
- )
- self.attrs[u'objectclass'] = objectclasses.user_base
-
- return self.make_command(
- 'user_del', self.uid,
- no_preserve=no_preserve,
- preserve=preserve
- )
-
- def make_retrieve_command(self, all=False, raw=False):
- """ Make function that retrieves a user using user-show """
- return self.make_command('user_show', self.uid, all=all)
-
- def make_find_command(self, *args, **kwargs):
- """ Make function that finds user using user-find """
- return self.make_command('user_find', *args, **kwargs)
-
- def make_update_command(self, updates):
- """ Make function that updates user using user-mod """
- return self.make_command('user_mod', self.uid, **updates)
-
- def make_undelete_command(self):
- """ Make function that activates preserved user using user-undel """
- return self.make_command('user_undel', self.uid)
-
- def make_enable_command(self):
- """ Make function that enables user using user-enable """
- return self.make_command('user_enable', self.uid)
-
- def make_stage_command(self):
- """ Make function that restores preserved user by moving it to
- staged container """
- return self.make_command('user_stage', self.uid)
-
- def track_create(self):
- """ Update expected state for user creation """
- self.attrs = dict(
- dn=self.dn,
- uid=[self.uid],
- givenname=[self.givenname],
- sn=[self.sn],
- homedirectory=[u'/home/%s' % self.uid],
- displayname=[u'%s %s' % (self.givenname, self.sn)],
- cn=[u'%s %s' % (self.givenname, self.sn)],
- initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
- objectclass=objectclasses.user,
- description=[u'__no_upg__'],
- ipauniqueid=[fuzzy_uuid],
- uidnumber=[fuzzy_digits],
- gidnumber=[fuzzy_digits],
- krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
- mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
- gecos=[u'%s %s' % (self.givenname, self.sn)],
- loginshell=[u'/bin/sh'],
- has_keytab=False,
- has_password=False,
- mepmanagedentry=[get_group_dn(self.uid)],
- memberof_group=[u'ipausers'],
- )
-
- for key in self.kwargs:
- if key == u'krbprincipalname':
- self.attrs[key] = [u'%s@%s' % (
- (self.kwargs[key].split('@'))[0].lower(),
- (self.kwargs[key].split('@'))[1]
- )]
- else:
- self.attrs[key] = [self.kwargs[key]]
-
- self.exists = True
-
- def check_create(self, result):
- """ Check 'user-add' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Added user "%s"' % self.uid,
- result=self.filter_attrs(self.create_keys),
- ), result)
-
- def check_delete(self, result):
- """ Check 'user-del' command result """
- assert_deepequal(dict(
- value=[self.uid],
- summary=u'Deleted user "%s"' % self.uid,
- result=dict(failed=[]),
- ), result)
-
- def check_retrieve(self, result, all=False):
- """ Check 'user-show' command result """
-
- if u'preserved' in self.attrs and self.attrs[u'preserved']:
- self.retrieve_all_keys = self.retrieve_preserved_all_keys
- self.retrieve_keys = self.retrieve_preserved_keys
- elif u'preserved' not in self.attrs and all:
- self.attrs[u'preserved'] = False
-
- if all:
- expected = self.filter_attrs(self.retrieve_all_keys)
- else:
- expected = self.filter_attrs(self.retrieve_keys)
-
- # small override because stageuser-find returns different type
- # of nsaccountlock value than DS, but overall the value fits
- # expected result
- if u'nsaccountlock' in expected:
- if expected[u'nsaccountlock'] == [u'true']:
- expected[u'nsaccountlock'] = True
- elif expected[u'nsaccountlock'] == [u'false']:
- expected[u'nsaccountlock'] = False
-
- assert_deepequal(dict(
- value=self.uid,
- summary=None,
- result=expected,
- ), result)
-
- def check_find(self, result, all=False, raw=False):
- """ Check 'user-find' command result """
- self.attrs[u'nsaccountlock'] = True
- self.attrs[u'preserved'] = True
-
- if all:
- expected = self.filter_attrs(self.find_all_keys)
- else:
- expected = self.filter_attrs(self.find_keys)
-
- assert_deepequal(dict(
- count=1,
- truncated=False,
- summary=u'1 user matched',
- result=[expected],
- ), result)
-
- def check_find_nomatch(self, result):
- """ Check 'user-find' command result when no user should be found """
- assert_deepequal(dict(
- count=0,
- truncated=False,
- summary=u'0 users matched',
- result=[],
- ), result)
-
- def check_update(self, result, extra_keys=()):
- """ Check 'user-mod' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Modified user "%s"' % self.uid,
- result=self.filter_attrs(self.update_keys | set(extra_keys))
- ), result)
-
- def create_from_staged(self, stageduser):
- """ Copies attributes from staged user - helper function for
- activation tests """
- self.attrs = stageduser.attrs
- self.uid = stageduser.uid
- self.givenname = stageduser.givenname
- self.sn = stageduser.sn
-
- self.attrs[u'mepmanagedentry'] = None
- self.attrs[u'dn'] = self.dn
- self.attrs[u'ipauniqueid'] = [fuzzy_uuid]
- self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % (
- api.env.container_group, api.env.basedn
- )]
- self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % (
- self.uid, api.env.container_group, api.env.basedn
- )]
- self.attrs[u'objectclass'] = objectclasses.user
- if self.attrs[u'gidnumber'] == [u'-1']:
- self.attrs[u'gidnumber'] = [fuzzy_digits]
- if self.attrs[u'uidnumber'] == [u'-1']:
- self.attrs[u'uidnumber'] = [fuzzy_digits]
-
- if u'ipasshpubkey' in self.kwargs:
- self.attrs[u'ipasshpubkey'] = [str(
- self.kwargs[u'ipasshpubkey']
- )]
-
- def check_activate(self, result):
- """ Check 'stageuser-activate' command result """
- expected = dict(
- value=self.uid,
- summary=u'Stage user %s activated' % self.uid,
- result=self.filter_attrs(self.activate_keys))
-
- # work around to eliminate inconsistency in returned objectclass
- # (case sensitive assertion)
- expected['result']['objectclass'] = [item.lower() for item in
- expected['result']['objectclass']]
- result['result']['objectclass'] = [item.lower() for item in
- result['result']['objectclass']]
-
- assert_deepequal(expected, result)
-
- self.exists = True
-
- def check_undel(self, result):
- """ Check 'user-undel' command result """
- assert_deepequal(dict(
- value=self.uid,
- summary=u'Undeleted user account "%s"' % self.uid,
- result=True
- ), result)
-
- def track_delete(self, preserve=False):
- """Update expected state for host deletion"""
- if preserve:
- self.exists = True
- if u'memberof_group' in self.attrs:
- del self.attrs[u'memberof_group']
- self.attrs[u'nsaccountlock'] = True
- self.attrs[u'preserved'] = True
- else:
- self.exists = False
- self.attrs = {}
-
- def make_preserved_user(self):
- """ 'Creates' a preserved user necessary for some tests """
- self.ensure_exists()
- self.track_delete(preserve=True)
- command = self.make_delete_command(no_preserve=False, preserve=True)
- result = command()
- self.check_delete(result)
-
- def check_attr_preservation(self, expected):
- """ Verifies that ipaUniqueID, uidNumber and gidNumber are
- preserved upon reactivation. Also verifies that resulting
- active user is a member of ipausers group only."""
- command = self.make_retrieve_command(all=True)
- result = command()
-
- assert_deepequal(dict(
- ipauniqueid=result[u'result'][u'ipauniqueid'],
- uidnumber=result[u'result'][u'uidnumber'],
- gidnumber=result[u'result'][u'gidnumber']
- ), expected)
-
- if (u'memberof_group' not in result[u'result'] or
- result[u'result'][u'memberof_group'] != (u'ipausers',)):
- assert False
-
- def make_fixture_restore(self, request):
- """Make a pytest fixture for a preserved user that is to be moved to
- staged area.
-
- The fixture ensures the plugin entry does not exist before
- and after the tests that use it. It takes into account
- that the preserved user no longer exists after restoring it,
- therefore the fixture verifies after the tests
- that the preserved user doesn't exist instead of deleting it.
- """
- del_command = self.make_delete_command()
- try:
- del_command()
- except errors.NotFound:
- pass
-
- def finish():
- with raises_exact(errors.NotFound(
- reason=u'%s: user not found' % self.uid)):
- del_command()
-
- request.addfinalizer(finish)
-
- return self
diff --git a/ipatests/test_xmlrpc/tracker/__init__.py b/ipatests/test_xmlrpc/tracker/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/__init__.py
diff --git a/ipatests/test_xmlrpc/ldaptracker.py b/ipatests/test_xmlrpc/tracker/base.py
index acd382dd3..acd382dd3 100644
--- a/ipatests/test_xmlrpc/ldaptracker.py
+++ b/ipatests/test_xmlrpc/tracker/base.py
diff --git a/ipatests/test_xmlrpc/tracker/caacl_plugin.py b/ipatests/test_xmlrpc/tracker/caacl_plugin.py
new file mode 100644
index 000000000..afe7ee0c0
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/caacl_plugin.py
@@ -0,0 +1,367 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipalib import errors
+from ipatests.util import assert_deepequal
+from ipatests.test_xmlrpc.xmlrpc_test import (fuzzy_caacldn, fuzzy_uuid,
+ fuzzy_ipauniqueid)
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.tracker.base import Tracker
+
+
+class CAACLTracker(Tracker):
+ """Tracker class for CA ACL LDAP object.
+
+ The class implements methods required by the base class
+ to help with basic CRUD operations.
+
+ Methods for adding and deleting actual member entries into an ACL
+ do not have check methods as these would make the class
+ unnecessarily complicated. The checks are implemented as
+ a standalone test suite. However, this makes the test crucial
+ in debugging more complicated test cases. Since the add/remove
+ operation won't be checked right away by the tracker, a problem
+ in this operation may propagate into more complicated test case.
+
+ It is possible to pass a list of member uids to these methods.
+
+ The test uses instances of Fuzzy class to compare results as they
+ are in the UUID format. The dn and rdn properties were modified
+ to reflect this as well.
+ """
+
+ member_keys = {
+ u'memberuser_user', u'memberuser_group',
+ u'memberhost_host', u'memberhost_hostgroup',
+ u'memberservice_service',
+ u'ipamembercertprofile_certprofile'}
+ category_keys = {
+ u'ipacacategory', u'ipacertprofilecategory', u'usercategory',
+ u'hostcategory', u'servicecategory'}
+ retrieve_keys = {
+ u'dn', u'cn', u'description', u'ipaenabledflag',
+ u'ipamemberca', u'ipamembercertprofile', u'memberuser',
+ u'memberhost', u'memberservice'} | member_keys | category_keys
+ retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'}
+ create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory',
+ u'usercategory', u'hostcategory', u'ipacacategory',
+ u'servicecategory', u'ipaenabledflag', u'objectclass',
+ u'ipauniqueid'}
+ update_keys = create_keys - {u'dn'}
+
+ def __init__(self, name, ipacertprofile_category=None, user_category=None,
+ service_category=None, host_category=None, description=None,
+ default_version=None):
+ super(CAACLTracker, self).__init__(default_version=default_version)
+
+ self._name = name
+ self.description = description
+ self._categories = dict(
+ ipacertprofilecategory=ipacertprofile_category,
+ usercategory=user_category,
+ servicecategory=service_category,
+ hostcategory=host_category)
+
+ self.dn = fuzzy_caacldn
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def rdn(self):
+ return fuzzy_ipauniqueid
+
+ @property
+ def categories(self):
+ """To be used in track_create"""
+ return {cat: v for cat, v in self._categories.items() if v}
+
+ @property
+ def create_categories(self):
+ """ Return the categories set on create.
+ Unused categories are left out.
+ """
+ return {cat: [v] for cat, v in self.categories.items() if v}
+
+ def make_create_command(self, force=True):
+ return self.make_command(u'caacl_add', self.name,
+ description=self.description,
+ **self.categories)
+
+ def check_create(self, result):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Added CA ACL "{}"'.format(self.name),
+ result=dict(self.filter_attrs(self.create_keys))
+ ), result)
+
+ def track_create(self):
+ self.attrs = dict(
+ dn=self.dn,
+ ipauniqueid=[fuzzy_uuid],
+ cn=[self.name],
+ objectclass=objectclasses.caacl,
+ ipaenabledflag=[u'TRUE'])
+
+ self.attrs.update(self.create_categories)
+ if self.description:
+ self.attrs.update({u'description', [self.description]})
+
+ self.exists = True
+
+ def make_delete_command(self):
+ return self.make_command('caacl_del', self.name)
+
+ def check_delete(self, result):
+ assert_deepequal(dict(
+ value=[self.name],
+ summary=u'Deleted CA ACL "{}"'.format(self.name),
+ result=dict(failed=[])
+ ), result)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ return self.make_command('caacl_show', self.name, all=all, raw=raw)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.name,
+ summary=None,
+ result=expected
+ ), result)
+
+ def make_find_command(self, *args, **kwargs):
+ return self.make_command('caacl_find', *args, **kwargs)
+
+ def check_find(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 CA ACL matched',
+ result=[expected]
+ ), result)
+
+ def make_update_command(self, updates):
+ return self.make_command('caacl_mod', self.name, **updates)
+
+ def update(self, updates, expected_updates=None, silent=False):
+ """If removing a category, delete it from tracker as well"""
+ # filter out empty categories and track changes
+
+ filtered_updates = dict()
+ for key, value in updates.items():
+ if key in self.category_keys:
+ if not value:
+ try:
+ del self.attrs[key]
+ except IndexError:
+ if silent:
+ pass
+ else:
+ # if there is a value, prepare the pair for update
+ filtered_updates.update({key: value})
+ else:
+ filtered_updates.update({key: value})
+
+ if expected_updates is None:
+ expected_updates = {}
+
+ command = self.make_update_command(updates)
+
+ try:
+ result = command()
+ except errors.EmptyModlist:
+ if silent:
+ self.attrs.update(filtered_updates)
+ self.attrs.update(expected_updates)
+ self.check_update(result,
+ extra_keys=set(self.update_keys) |
+ set(expected_updates.keys()))
+
+ def check_update(self, result, extra_keys=()):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Modified CA ACL "{}"'.format(self.name),
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ # Helper methods for caacl subcommands. The check methods will be
+ # implemented in standalone test
+ #
+ # The methods implemented here will be:
+ # caacl_{add,remove}_{host, service, certprofile, user [, subca]}
+
+ def _add_acl_component(self, command_name, keys, track):
+ """ Add a resource into ACL rule and track it.
+
+ command_name - the name in the API
+ keys = {
+ 'tracker_attr': {
+ 'api_key': 'value'
+ }
+ }
+
+ e.g.
+
+ keys = {
+ 'memberhost_host': {
+ 'host': 'hostname'
+ },
+ 'memberhost_hostgroup': {
+ 'hostgroup': 'hostgroup_name'
+ }
+ }
+ """
+
+ if not self.exists:
+ raise errors.NotFound(reason="The tracked entry doesn't exist.")
+
+ command = self.make_command(command_name, self.name)
+ command_options = dict()
+
+ # track
+ for tracker_attr in keys:
+ api_options = keys[tracker_attr]
+ if track:
+ for option in api_options:
+ try:
+ if type(option) in (list, tuple):
+ self.attrs[tracker_attr].extend(api_options[option])
+ else:
+ self.attrs[tracker_attr].append(api_options[option])
+ except KeyError:
+ if type(option) in (list, tuple):
+ self.attrs[tracker_attr] = api_options[option]
+ else:
+ self.attrs[tracker_attr] = [api_options[option]]
+ # prepare options for the command call
+ command_options.update(api_options)
+
+ return command(**command_options)
+
+ def _remove_acl_component(self, command_name, keys, track):
+ """ Remove a resource from ACL rule and track it.
+
+ command_name - the name in the API
+ keys = {
+ 'tracker_attr': {
+ 'api_key': 'value'
+ }
+ }
+
+ e.g.
+
+ keys = {
+ 'memberhost_host': {
+ 'host': 'hostname'
+ },
+ 'memberhost_hostgroup': {
+ 'hostgroup': 'hostgroup_name'
+ }
+ }
+ """
+ command = self.make_command(command_name, self.name)
+ command_options = dict()
+
+ for tracker_attr in keys:
+ api_options = keys[tracker_attr]
+ if track:
+ for option in api_options:
+ if type(option) in (list, tuple):
+ for item in option:
+ self.attrs[tracker_attr].remove(item)
+ else:
+ self.attrs[tracker_attr].remove(api_options[option])
+ if len(self.attrs[tracker_attr]) == 0:
+ del self.attrs[tracker_attr]
+ command_options.update(api_options)
+
+ return command(**command_options)
+
+ def add_host(self, host=None, hostgroup=None, track=True):
+ """Associates an host or hostgroup entry with the ACL.
+
+ The command takes an unicode string with the name
+ of the entry (RDN).
+
+ It is the responsibility of a test writer to provide
+ the correct value, object type as the method does not
+ verify whether the entry exists.
+
+ The method can add only one entry of each type
+ in one call.
+ """
+
+ options = {
+ u'memberhost_host': {u'host': host},
+ u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
+
+ return self._add_acl_component(u'caacl_add_host', options, track)
+
+ def remove_host(self, host=None, hostgroup=None, track=True):
+ options = {
+ u'memberhost_host': {u'host': host},
+ u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
+
+ return self._remove_acl_component(u'caacl_remove_host', options, track)
+
+ def add_user(self, user=None, group=None, track=True):
+ options = {
+ u'memberuser_user': {u'user': user},
+ u'memberuser_group': {u'group': group}}
+
+ return self._add_acl_component(u'caacl_add_user', options, track)
+
+ def remove_user(self, user=None, group=None, track=True):
+ options = {
+ u'memberuser_user': {u'user': user},
+ u'memberuser_group': {u'group': group}}
+
+ return self._remove_acl_component(u'caacl_remove_user', options, track)
+
+ def add_service(self, service=None, track=True):
+ options = {
+ u'memberservice_service': {u'service': service}}
+
+ return self._add_acl_component(u'caacl_add_service', options, track)
+
+ def remove_service(self, service=None, track=True):
+ options = {
+ u'memberservice_service': {u'service': service}}
+
+ return self._remove_acl_component(u'caacl_remove_service', options, track)
+
+ def add_profile(self, certprofile=None, track=True):
+ options = {
+ u'ipamembercertprofile_certprofile':
+ {u'certprofile': certprofile}}
+
+ return self._add_acl_component(u'caacl_add_profile', options, track)
+
+ def remove_profile(self, certprofile=None, track=True):
+ options = {
+ u'ipamembercertprofile_certprofile':
+ {u'certprofile': certprofile}}
+
+ return self._remove_acl_component(u'caacl_remove_profile', options, track)
+
+ def enable(self):
+ command = self.make_command(u'caacl_enable', self.name)
+ self.attrs.update({u'ipaenabledflag': [u'TRUE']})
+ command()
+
+ def disable(self):
+ command = self.make_command(u'caacl_disable', self.name)
+ self.attrs.update({u'ipaenabledflag': [u'FALSE']})
+ command()
diff --git a/ipatests/test_xmlrpc/tracker/certprofile_plugin.py b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py
new file mode 100644
index 000000000..eeb27eb14
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py
@@ -0,0 +1,133 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+import os
+
+
+from ipapython.dn import DN
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.util import assert_deepequal
+
+
+class CertprofileTracker(Tracker):
+ """Tracker class for certprofile plugin.
+ """
+
+ retrieve_keys = {
+ 'dn', 'cn', 'description', 'ipacertprofilestoreissued'
+ }
+ retrieve_all_keys = retrieve_keys | {'objectclass'}
+ create_keys = retrieve_keys | {'objectclass'}
+ update_keys = retrieve_keys - {'dn'}
+ managedby_keys = retrieve_keys
+ allowedto_keys = retrieve_keys
+
+ def __init__(self, name, store=False, desc='dummy description',
+ profile=None, default_version=None):
+ super(CertprofileTracker, self).__init__(
+ default_version=default_version
+ )
+
+ self.store = store
+ self.description = desc
+ self._profile_path = profile
+
+ self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca',
+ self.api.env.basedn)
+
+ @property
+ def profile(self):
+ if not self._profile_path:
+ return None
+
+ if os.path.isabs(self._profile_path):
+ path = self._profile_path
+ else:
+ path = os.path.join(os.path.dirname(__file__),
+ self._profile_path)
+
+ with open(path, 'r') as f:
+ content = f.read()
+ return unicode(content)
+
+ def make_create_command(self, force=True):
+ if not self.profile:
+ raise RuntimeError('Tracker object without path to profile '
+ 'cannot be used to create profile entry.')
+
+ return self.make_command('certprofile_import', self.name,
+ description=self.description,
+ ipacertprofilestoreissued=self.store,
+ file=self.profile)
+
+ def check_create(self, result):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Imported profile "{}"'.format(self.name),
+ result=dict(self.filter_attrs(self.create_keys))
+ ), result)
+
+ def track_create(self):
+ self.attrs = dict(
+ dn=unicode(self.dn),
+ cn=[self.name],
+ description=[self.description],
+ ipacertprofilestoreissued=[unicode(self.store).upper()],
+ objectclass=objectclasses.certprofile
+ )
+ self.exists = True
+
+ def make_delete_command(self):
+ return self.make_command('certprofile_del', self.name)
+
+ def check_delete(self, result):
+ assert_deepequal(dict(
+ value=[self.name], # correctly a list?
+ summary=u'Deleted profile "{}"'.format(self.name),
+ result=dict(failed=[]),
+ ), result)
+
+ def make_retrieve_command(self, all=False, raw=False, **options):
+ return self.make_command('certprofile_show', self.name, all=all,
+ raw=raw, **options)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.name,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def make_find_command(self, *args, **kwargs):
+ return self.make_command('certprofile_find', *args, **kwargs)
+
+ def check_find(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 profile matched',
+ result=[expected]
+ ), result)
+
+ def make_update_command(self, updates):
+ return self.make_command('certprofile_mod', self.name, **updates)
+
+ def check_update(self, result, extra_keys=()):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Modified Certificate Profile "{}"'.format(self.name),
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
diff --git a/ipatests/test_xmlrpc/tracker/group_plugin.py b/ipatests/test_xmlrpc/tracker/group_plugin.py
new file mode 100644
index 000000000..c47ce8ecf
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/group_plugin.py
@@ -0,0 +1,196 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid
+
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.util import assert_deepequal, get_group_dn
+
+
+class GroupTracker(Tracker):
+ """ Class for host plugin like tests """
+ retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user',
+ u'member_group'}
+ retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'}
+
+ create_keys = retrieve_all_keys
+ update_keys = retrieve_keys - {u'dn'}
+
+ add_member_keys = retrieve_keys | {u'description'}
+
+ def __init__(self, name):
+ super(GroupTracker, self).__init__(default_version=None)
+ self.cn = name
+ self.dn = get_group_dn(name)
+
+ def make_create_command(self, nonposix=False, external=False,
+ force=True):
+ """ Make function that creates a group using 'group-add' """
+ return self.make_command('group_add', self.cn,
+ nonposix=nonposix, external=external)
+
+ def make_delete_command(self):
+ """ Make function that deletes a group using 'group-del' """
+ return self.make_command('group_del', self.cn)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a group using 'group-show' """
+ return self.make_command('group_show', self.cn, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that searches for a group using 'group-find' """
+ return self.make_command('group_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates a group using 'group-mod' """
+ return self.make_command('group_mod', self.cn, **updates)
+
+ def make_add_member_command(self, options={}):
+ """ Make function that adds a member to a group
+ Attention: only works for one user OR group! """
+ if u'user' in options:
+ self.attrs[u'member_user'] = [options[u'user']]
+ elif u'group' in options:
+ self.attrs[u'member_group'] = [options[u'group']]
+ self.adds = options
+
+ return self.make_command('group_add_member', self.cn, **options)
+
+ def make_remove_member_command(self, options={}):
+ """ Make function that removes a member from a group
+ Attention: only works for one user OR group! """
+ if u'user' in options:
+ del self.attrs[u'member_user']
+ elif u'group' in options:
+ del self.attrs[u'member_group']
+ return self.make_command('group_remove_member', self.cn, **options)
+
+ def make_detach_command(self):
+ """ Make function that detaches a managed group using
+ 'group-detach' """
+ self.exists = True
+ return self.make_command('group_detach', self.cn)
+
+ def track_create(self):
+ """ Updates expected state for group creation"""
+ self.attrs = dict(
+ dn=get_group_dn(self.cn),
+ cn=[self.cn],
+ gidnumber=[fuzzy_digits],
+ ipauniqueid=[fuzzy_uuid],
+ objectclass=objectclasses.posixgroup,
+ )
+ self.exists = True
+
+ def check_create(self, result):
+ """ Checks 'group_add' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Added group "%s"' % self.cn,
+ result=self.filter_attrs(self.create_keys)
+ ), result)
+
+ def check_delete(self, result):
+ """ Checks 'group_del' command result """
+ assert_deepequal(dict(
+ value=[self.cn],
+ summary=u'Deleted group "%s"' % self.cn,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """ Checks 'group_show' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=None,
+ result=expected
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Checks 'group_find' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 group matched',
+ result=[expected],
+ ), result)
+
+ def check_update(self, result, extra_keys={}):
+ """ Checks 'group_mod' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Modified group "%s"' % self.cn,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def check_add_member(self, result):
+ """ Checks 'group_add_member' command result """
+ assert_deepequal(dict(
+ completed=1,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ ), result)
+
+ def check_add_member_negative(self, result):
+ """ Checks 'group_add_member' command result when expected result
+ is failure of the operation"""
+ if u'member_user' in self.attrs:
+ del self.attrs[u'member_user']
+ elif u'member_group' in self.attrs:
+ del self.attrs[u'member_group']
+
+ expected = dict(
+ completed=0,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ )
+ if u'user' in self.adds:
+ expected[u'failed'][u'member'][u'user'] = [(
+ self.adds[u'user'], u'no such entry')]
+ elif u'group' in self.adds:
+ expected[u'failed'][u'member'][u'group'] = [(
+ self.adds[u'group'], u'no such entry')]
+
+ assert_deepequal(expected, result)
+
+ def check_remove_member(self, result):
+ """ Checks 'group_remove_member' command result """
+ assert_deepequal(dict(
+ completed=1,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ ), result)
+
+ def check_detach(self, result):
+ """ Checks 'group_detach' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Detached group "%s" from user "%s"' % (
+ self.cn, self.cn),
+ result=True
+ ), result)
+
+ def make_fixture_detach(self, request):
+ """Make a pytest fixture for this tracker
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use itself.
+ """
+ def cleanup():
+ pass
+
+ request.addfinalizer(cleanup)
+
+ return self
diff --git a/ipatests/test_xmlrpc/tracker/host_plugin.py b/ipatests/test_xmlrpc/tracker/host_plugin.py
new file mode 100644
index 000000000..bf199f4f5
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/host_plugin.py
@@ -0,0 +1,154 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from __future__ import print_function
+
+
+from ipapython.dn import DN
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_uuid
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.util import assert_deepequal
+
+
+class HostTracker(Tracker):
+ """Wraps and tracks modifications to a Host object
+
+ Implements the helper functions for host plugin.
+
+ The HostTracker object stores information about the host, e.g.
+ ``fqdn`` and ``dn``.
+ """
+ retrieve_keys = {
+ 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host',
+ 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint',
+ 'serial_number', 'serial_number_hex', 'sha1_fingerprint',
+ 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before',
+ 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user',
+ 'ipaallowedtoperform_read_keys_group',
+ 'ipaallowedtoperform_read_keys_host',
+ 'ipaallowedtoperform_read_keys_hostgroup',
+ 'ipaallowedtoperform_write_keys_user',
+ 'ipaallowedtoperform_write_keys_group',
+ 'ipaallowedtoperform_write_keys_host',
+ 'ipaallowedtoperform_write_keys_hostgroup'}
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid',
+ u'managing_host', u'objectclass', u'serverhostname'}
+ create_keys = retrieve_keys | {'objectclass', 'ipauniqueid',
+ 'randompassword'}
+ update_keys = retrieve_keys - {'dn'}
+ managedby_keys = retrieve_keys - {'has_keytab', 'has_password'}
+ allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
+
+ def __init__(self, name, fqdn=None, default_version=None):
+ super(HostTracker, self).__init__(default_version=default_version)
+
+ self.shortname = name
+ if fqdn:
+ self.fqdn = fqdn
+ else:
+ self.fqdn = u'%s.%s' % (name, self.api.env.domain)
+ self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts',
+ self.api.env.basedn)
+
+ self.description = u'Test host <%s>' % name
+ self.location = u'Undisclosed location <%s>' % name
+
+ def make_create_command(self, force=True):
+ """Make function that creates this host using host_add"""
+ return self.make_command('host_add', self.fqdn,
+ description=self.description,
+ l=self.location,
+ force=force)
+
+ def make_delete_command(self):
+ """Make function that deletes the host using host_del"""
+ return self.make_command('host_del', self.fqdn)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """Make function that retrieves the host using host_show"""
+ return self.make_command('host_show', self.fqdn, all=all, raw=raw)
+
+ def make_find_command(self, *args, **kwargs):
+ """Make function that finds hosts using host_find
+
+ Note that the fqdn (or other search terms) needs to be specified
+ in arguments.
+ """
+ return self.make_command('host_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """Make function that modifies the host using host_mod"""
+ return self.make_command('host_mod', self.fqdn, **updates)
+
+ def track_create(self):
+ """Update expected state for host creation"""
+ self.attrs = dict(
+ dn=self.dn,
+ fqdn=[self.fqdn],
+ description=[self.description],
+ l=[self.location],
+ krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)],
+ objectclass=objectclasses.host,
+ ipauniqueid=[fuzzy_uuid],
+ managedby_host=[self.fqdn],
+ has_keytab=False,
+ has_password=False,
+ cn=[self.fqdn],
+ ipakrbokasdelegate=False,
+ ipakrbrequirespreauth=True,
+ managing_host=[self.fqdn],
+ serverhostname=[self.shortname],
+ )
+ self.exists = True
+
+ def check_create(self, result):
+ """Check `host_add` command result"""
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=u'Added host "%s"' % self.fqdn,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """Check `host_del` command result"""
+ assert_deepequal(dict(
+ value=[self.fqdn],
+ summary=u'Deleted host "%s"' % self.fqdn,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """Check `host_show` command result"""
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """Check `host_find` command result"""
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 host matched',
+ result=[expected],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """Check `host_update` command result"""
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=u'Modified host "%s"' % self.fqdn,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
diff --git a/ipatests/test_xmlrpc/tracker/stageuser_plugin.py b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
new file mode 100644
index 000000000..09ad282e4
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
@@ -0,0 +1,267 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+import six
+
+from ipalib import api, errors
+
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import (
+ fuzzy_string, fuzzy_dergeneralizedtime, raises_exact)
+
+from ipatests.util import assert_deepequal, get_user_dn
+from ipapython.dn import DN
+
+if six.PY3:
+ unicode = str
+
+sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X'
+ 'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI'
+ 'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2'
+ 'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm'
+ 'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01'
+ '9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF'
+ '0L public key test')
+sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B '
+ 'public key test (ssh-rsa)')
+
+
+class StageUserTracker(Tracker):
+ """ Tracker class for staged user LDAP object
+
+ Implements helper functions for host plugin.
+ StageUserTracker object stores information about the user.
+ """
+
+ retrieve_keys = {
+ u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
+ u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber',
+ u'title', u'memberof', u'nsaccountlock', u'memberofindirect',
+ u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink',
+ u'ipatokenradiususername', u'krbprincipalexpiration',
+ u'usercertificate', u'dn', u'has_keytab', u'has_password',
+ u'street', u'postalcode', u'facsimiletelephonenumber',
+ u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l',
+ u'st', u'mobile', u'pager', }
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipauniqueid', u'objectclass', u'description',
+ u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+
+ create_keys = retrieve_all_keys | {
+ u'objectclass', u'ipauniqueid', u'randompassword',
+ u'userpassword', u'krbextradata', u'krblastpwdchange',
+ u'krbpasswordexpiration', u'krbprincipalkey'}
+
+ update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
+ activate_keys = retrieve_keys | {
+ u'has_keytab', u'has_password', u'nsaccountlock'}
+
+ def __init__(self, name, givenname, sn, **kwargs):
+ super(StageUserTracker, self).__init__(default_version=None)
+ self.uid = name
+ self.givenname = givenname
+ self.sn = sn
+ self.dn = DN(
+ ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
+
+ self.kwargs = kwargs
+
+ def make_create_command(self, options=None, force=None):
+ """ Make function that creates a staged user using stageuser-add """
+ if options is not None:
+ self.kwargs = options
+ return self.make_command('stageuser_add', self.uid,
+ givenname=self.givenname,
+ sn=self.sn, **self.kwargs)
+
+ def make_delete_command(self):
+ """ Make function that deletes a staged user using stageuser-del """
+ return self.make_command('stageuser_del', self.uid)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a staged user using stageuser-show """
+ return self.make_command('stageuser_show', self.uid, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that finds staged user using stageuser-find """
+ return self.make_command('stageuser_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates staged user using stageuser-mod """
+ return self.make_command('stageuser_mod', self.uid, **updates)
+
+ def make_activate_command(self):
+ """ Make function that activates staged user
+ using stageuser-activate """
+ return self.make_command('stageuser_activate', self.uid)
+
+ def track_create(self):
+ """ Update expected state for staged user creation """
+ self.attrs = dict(
+ dn=self.dn,
+ uid=[self.uid],
+ givenname=[self.givenname],
+ sn=[self.sn],
+ homedirectory=[u'/home/%s' % self.uid],
+ displayname=[u'%s %s' % (self.givenname, self.sn)],
+ cn=[u'%s %s' % (self.givenname, self.sn)],
+ initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+ objectclass=objectclasses.user_base,
+ description=[u'__no_upg__'],
+ ipauniqueid=[u'autogenerate'],
+ uidnumber=[u'-1'],
+ gidnumber=[u'-1'],
+ krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+ mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+ gecos=[u'%s %s' % (self.givenname, self.sn)],
+ loginshell=[u'/bin/sh'],
+ has_keytab=False,
+ has_password=False,
+ nsaccountlock=[u'true'],
+ )
+
+ for key in self.kwargs:
+ if key == u'krbprincipalname':
+ self.attrs[key] = [u'%s@%s' % (
+ (self.kwargs[key].split('@'))[0].lower(),
+ (self.kwargs[key].split('@'))[1])]
+ elif key == u'manager':
+ self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))]
+ elif key == u'ipasshpubkey':
+ self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp]
+ self.attrs[key] = [self.kwargs[key]]
+ elif key == u'random' or key == u'userpassword':
+ self.attrs[u'krbextradata'] = [fuzzy_string]
+ self.attrs[u'krbpasswordexpiration'] = [
+ fuzzy_dergeneralizedtime]
+ self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime]
+ self.attrs[u'krbprincipalkey'] = [fuzzy_string]
+ self.attrs[u'userpassword'] = [fuzzy_string]
+ self.attrs[u'has_keytab'] = True
+ self.attrs[u'has_password'] = True
+ if key == u'random':
+ self.attrs[u'randompassword'] = fuzzy_string
+ else:
+ self.attrs[key] = [self.kwargs[key]]
+
+ self.exists = True
+
+ def check_create(self, result):
+ """ Check 'stageuser-add' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Added stage user "%s"' % self.uid,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """ Check 'stageuser-del' command result """
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Deleted stage user "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """ Check 'stageuser-show' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different
+ # type of nsaccountlock value than DS, but overall the value
+ # fits expected result
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Check 'stageuser-find' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different
+ # type of nsaccountlock value than DS, but overall the value
+ # fits expected result
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 user matched',
+ result=[expected],
+ ), result)
+
+ def check_find_nomatch(self, result):
+ """ Check 'stageuser-find' command result when no match is expected """
+ assert_deepequal(dict(
+ count=0,
+ truncated=False,
+ summary=u'0 users matched',
+ result=[],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """ Check 'stageuser-mod' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Modified stage user "%s"' % self.uid,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def check_restore_preserved(self, result):
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Staged user account "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def make_fixture_activate(self, request):
+ """Make a pytest fixture for a staged user that is to be activated
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it. It takes into account
+ that the staged user no longer exists after activation,
+ therefore the fixture verifies after the tests
+ that the staged user doesn't exist instead of deleting it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def finish():
+ with raises_exact(errors.NotFound(
+ reason=u'%s: stage user not found' % self.uid)):
+ del_command()
+
+ request.addfinalizer(finish)
+
+ return self
+
+ def create_from_preserved(self, user):
+ """ Copies values from preserved user - helper function for
+ restoration tests """
+ self.attrs = user.attrs
+ self.uid = user.uid
+ self.givenname = user.givenname
+ self.sn = user.sn
+ self.dn = DN(
+ ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
+ self.attrs[u'dn'] = self.dn
diff --git a/ipatests/test_xmlrpc/tracker/user_plugin.py b/ipatests/test_xmlrpc/tracker/user_plugin.py
new file mode 100644
index 000000000..af7d85836
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/user_plugin.py
@@ -0,0 +1,340 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipalib import api, errors
+from ipapython.dn import DN
+
+from ipatests.util import assert_deepequal, get_group_dn
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid, raises_exact
+from ipatests.test_xmlrpc.tracker.base import Tracker
+
+
+class UserTracker(Tracker):
+ """ Class for host plugin like tests """
+
+ retrieve_keys = {
+ u'uid', u'givenname', u'sn', u'homedirectory',
+ u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou',
+ u'telephonenumber', u'title', u'memberof',
+ u'memberofindirect', u'ipauserauthtype', u'userclass',
+ u'ipatokenradiusconfiglink', u'ipatokenradiususername',
+ u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab',
+ u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber',
+ u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock',
+ u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata',
+ u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st'
+ }
+
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry',
+ u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+
+ retrieve_preserved_keys = retrieve_keys - {u'memberof_group'}
+ retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'}
+
+ create_keys = retrieve_all_keys | {
+ u'randompassword', u'mepmanagedentry',
+ u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange',
+ u'krbprincipalkey', u'randompassword', u'userpassword'
+ }
+ update_keys = retrieve_keys - {u'dn'}
+ activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password',
+ u'nsaccountlock', u'sshpubkeyfp'}
+
+ find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
+ find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'}
+
+ def __init__(self, name, givenname, sn, **kwargs):
+ super(UserTracker, self).__init__(default_version=None)
+ self.uid = name
+ self.givenname = givenname
+ self.sn = sn
+ self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn)
+
+ self.kwargs = kwargs
+
+ def make_create_command(self, force=None):
+ """ Make function that crates a user using user-add """
+ return self.make_command(
+ 'user_add', self.uid,
+ givenname=self.givenname,
+ sn=self.sn, **self.kwargs
+ )
+
+ def make_delete_command(self, no_preserve=True, preserve=False):
+ """ Make function that deletes a user using user-del """
+
+ if preserve and not no_preserve:
+ # necessary to change some user attributes due to moving
+ # to different container
+ self.attrs[u'dn'] = DN(
+ ('uid', self.uid),
+ api.env.container_deleteuser,
+ api.env.basedn
+ )
+ self.attrs[u'objectclass'] = objectclasses.user_base
+
+ return self.make_command(
+ 'user_del', self.uid,
+ no_preserve=no_preserve,
+ preserve=preserve
+ )
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a user using user-show """
+ return self.make_command('user_show', self.uid, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that finds user using user-find """
+ return self.make_command('user_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates user using user-mod """
+ return self.make_command('user_mod', self.uid, **updates)
+
+ def make_undelete_command(self):
+ """ Make function that activates preserved user using user-undel """
+ return self.make_command('user_undel', self.uid)
+
+ def make_enable_command(self):
+ """ Make function that enables user using user-enable """
+ return self.make_command('user_enable', self.uid)
+
+ def make_stage_command(self):
+ """ Make function that restores preserved user by moving it to
+ staged container """
+ return self.make_command('user_stage', self.uid)
+
+ def track_create(self):
+ """ Update expected state for user creation """
+ self.attrs = dict(
+ dn=self.dn,
+ uid=[self.uid],
+ givenname=[self.givenname],
+ sn=[self.sn],
+ homedirectory=[u'/home/%s' % self.uid],
+ displayname=[u'%s %s' % (self.givenname, self.sn)],
+ cn=[u'%s %s' % (self.givenname, self.sn)],
+ initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+ objectclass=objectclasses.user,
+ description=[u'__no_upg__'],
+ ipauniqueid=[fuzzy_uuid],
+ uidnumber=[fuzzy_digits],
+ gidnumber=[fuzzy_digits],
+ krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+ mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+ gecos=[u'%s %s' % (self.givenname, self.sn)],
+ loginshell=[u'/bin/sh'],
+ has_keytab=False,
+ has_password=False,
+ mepmanagedentry=[get_group_dn(self.uid)],
+ memberof_group=[u'ipausers'],
+ )
+
+ for key in self.kwargs:
+ if key == u'krbprincipalname':
+ self.attrs[key] = [u'%s@%s' % (
+ (self.kwargs[key].split('@'))[0].lower(),
+ (self.kwargs[key].split('@'))[1]
+ )]
+ else:
+ self.attrs[key] = [self.kwargs[key]]
+
+ self.exists = True
+
+ def check_create(self, result):
+ """ Check 'user-add' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Added user "%s"' % self.uid,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """ Check 'user-del' command result """
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Deleted user "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False):
+ """ Check 'user-show' command result """
+
+ if u'preserved' in self.attrs and self.attrs[u'preserved']:
+ self.retrieve_all_keys = self.retrieve_preserved_all_keys
+ self.retrieve_keys = self.retrieve_preserved_keys
+ elif u'preserved' not in self.attrs and all:
+ self.attrs[u'preserved'] = False
+
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different type
+ # of nsaccountlock value than DS, but overall the value fits
+ # expected result
+ if u'nsaccountlock' in expected:
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Check 'user-find' command result """
+ self.attrs[u'nsaccountlock'] = True
+ self.attrs[u'preserved'] = True
+
+ if all:
+ expected = self.filter_attrs(self.find_all_keys)
+ else:
+ expected = self.filter_attrs(self.find_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 user matched',
+ result=[expected],
+ ), result)
+
+ def check_find_nomatch(self, result):
+ """ Check 'user-find' command result when no user should be found """
+ assert_deepequal(dict(
+ count=0,
+ truncated=False,
+ summary=u'0 users matched',
+ result=[],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """ Check 'user-mod' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Modified user "%s"' % self.uid,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def create_from_staged(self, stageduser):
+ """ Copies attributes from staged user - helper function for
+ activation tests """
+ self.attrs = stageduser.attrs
+ self.uid = stageduser.uid
+ self.givenname = stageduser.givenname
+ self.sn = stageduser.sn
+
+ self.attrs[u'mepmanagedentry'] = None
+ self.attrs[u'dn'] = self.dn
+ self.attrs[u'ipauniqueid'] = [fuzzy_uuid]
+ self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % (
+ api.env.container_group, api.env.basedn
+ )]
+ self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % (
+ self.uid, api.env.container_group, api.env.basedn
+ )]
+ self.attrs[u'objectclass'] = objectclasses.user
+ if self.attrs[u'gidnumber'] == [u'-1']:
+ self.attrs[u'gidnumber'] = [fuzzy_digits]
+ if self.attrs[u'uidnumber'] == [u'-1']:
+ self.attrs[u'uidnumber'] = [fuzzy_digits]
+
+ if u'ipasshpubkey' in self.kwargs:
+ self.attrs[u'ipasshpubkey'] = [str(
+ self.kwargs[u'ipasshpubkey']
+ )]
+
+ def check_activate(self, result):
+ """ Check 'stageuser-activate' command result """
+ expected = dict(
+ value=self.uid,
+ summary=u'Stage user %s activated' % self.uid,
+ result=self.filter_attrs(self.activate_keys))
+
+ # work around to eliminate inconsistency in returned objectclass
+ # (case sensitive assertion)
+ expected['result']['objectclass'] = [item.lower() for item in
+ expected['result']['objectclass']]
+ result['result']['objectclass'] = [item.lower() for item in
+ result['result']['objectclass']]
+
+ assert_deepequal(expected, result)
+
+ self.exists = True
+
+ def check_undel(self, result):
+ """ Check 'user-undel' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Undeleted user account "%s"' % self.uid,
+ result=True
+ ), result)
+
+ def track_delete(self, preserve=False):
+ """Update expected state for host deletion"""
+ if preserve:
+ self.exists = True
+ if u'memberof_group' in self.attrs:
+ del self.attrs[u'memberof_group']
+ self.attrs[u'nsaccountlock'] = True
+ self.attrs[u'preserved'] = True
+ else:
+ self.exists = False
+ self.attrs = {}
+
+ def make_preserved_user(self):
+ """ 'Creates' a preserved user necessary for some tests """
+ self.ensure_exists()
+ self.track_delete(preserve=True)
+ command = self.make_delete_command(no_preserve=False, preserve=True)
+ result = command()
+ self.check_delete(result)
+
+ def check_attr_preservation(self, expected):
+ """ Verifies that ipaUniqueID, uidNumber and gidNumber are
+ preserved upon reactivation. Also verifies that resulting
+ active user is a member of ipausers group only."""
+ command = self.make_retrieve_command(all=True)
+ result = command()
+
+ assert_deepequal(dict(
+ ipauniqueid=result[u'result'][u'ipauniqueid'],
+ uidnumber=result[u'result'][u'uidnumber'],
+ gidnumber=result[u'result'][u'gidnumber']
+ ), expected)
+
+ if (u'memberof_group' not in result[u'result'] or
+ result[u'result'][u'memberof_group'] != (u'ipausers',)):
+ assert False
+
+ def make_fixture_restore(self, request):
+ """Make a pytest fixture for a preserved user that is to be moved to
+ staged area.
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it. It takes into account
+ that the preserved user no longer exists after restoring it,
+ therefore the fixture verifies after the tests
+ that the preserved user doesn't exist instead of deleting it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def finish():
+ with raises_exact(errors.NotFound(
+ reason=u'%s: user not found' % self.uid)):
+ del_command()
+
+ request.addfinalizer(finish)
+
+ return self
diff --git a/ipatests/util.py b/ipatests/util.py
index c3c69816e..6aefe74d3 100644
--- a/ipatests/util.py
+++ b/ipatests/util.py
@@ -706,3 +706,9 @@ def change_principal(user, password, client=None, path=None):
client.Backend.rpcclient.disconnect()
client.Backend.rpcclient.connect()
+
+def get_group_dn(cn):
+ return DN(('cn', cn), api.env.container_group, api.env.basedn)
+
+def get_user_dn(uid):
+ return DN(('uid', uid), api.env.container_user, api.env.basedn)