summaryrefslogtreecommitdiffstats
path: root/ipatests/test_xmlrpc/tracker
diff options
context:
space:
mode:
authorMilan KubĂ­k <mkubik@redhat.com>2015-11-19 16:07:29 +0100
committerMartin Basti <mbasti@redhat.com>2015-12-02 17:12:24 +0100
commit17f9ca154b47f1e21797d25435e25676fdca284c (patch)
treefd3f5b2976acd3ca0718c88dbbe35782982be2b6 /ipatests/test_xmlrpc/tracker
parentb8c619a7139bd7b65caa03b68431e22791ff19bf (diff)
downloadfreeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.gz
freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.xz
freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.zip
Separated Tracker implementations into standalone package
The previous way of implementing trackers in the module with the test caused circular imports. The separate package resolves this issue. https://fedorahosted.org/freeipa/ticket/5467 Reviewed-By: Ales 'alich' Marecek <amarecek@redhat.com>
Diffstat (limited to 'ipatests/test_xmlrpc/tracker')
-rw-r--r--ipatests/test_xmlrpc/tracker/__init__.py0
-rw-r--r--ipatests/test_xmlrpc/tracker/base.py289
-rw-r--r--ipatests/test_xmlrpc/tracker/caacl_plugin.py367
-rw-r--r--ipatests/test_xmlrpc/tracker/certprofile_plugin.py133
-rw-r--r--ipatests/test_xmlrpc/tracker/group_plugin.py196
-rw-r--r--ipatests/test_xmlrpc/tracker/host_plugin.py154
-rw-r--r--ipatests/test_xmlrpc/tracker/stageuser_plugin.py267
-rw-r--r--ipatests/test_xmlrpc/tracker/user_plugin.py340
8 files changed, 1746 insertions, 0 deletions
diff --git a/ipatests/test_xmlrpc/tracker/__init__.py b/ipatests/test_xmlrpc/tracker/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/__init__.py
diff --git a/ipatests/test_xmlrpc/tracker/base.py b/ipatests/test_xmlrpc/tracker/base.py
new file mode 100644
index 000000000..acd382dd3
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/base.py
@@ -0,0 +1,289 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+"""
+Implements a base class to track changes to an LDAP object.
+"""
+from __future__ import print_function
+
+import functools
+
+from ipalib import api, errors
+from ipapython.dn import DN
+from ipapython.version import API_VERSION
+from ipatests.util import Fuzzy
+
+
+class Tracker(object):
+ """Wraps and tracks modifications to a plugin LDAP entry object
+
+ Stores a copy of state of a plugin entry object and allows checking that
+ the state in the database is the same as expected.
+ This allows creating independent tests: the individual tests check
+ that the relevant changes have been made. At the same time
+ the entry doesn't need to be recreated and cleaned up for each test.
+
+ Two attributes are used for tracking: ``exists`` (true if the entry is
+ supposed to exist) and ``attrs`` (a dict of LDAP attributes that are
+ expected to be returned from IPA commands).
+
+ For commonly used operations, there is a helper method, e.g.
+ ``create``, ``update``, or ``find``, that does these steps:
+
+ * ensure the entry exists (or does not exist, for "create")
+ * store the expected modifications
+ * get the IPA command to run, and run it
+ * check that the result matches the expected state
+
+ Tests that require customization of these steps are expected to do them
+ manually, using lower-level methods.
+ Especially the first step (ensure the entry exists) is important for
+ achieving independent tests.
+
+ The Tracker object also stores information about the entry, e.g.
+ ``dn``, ``rdn`` and ``name`` which is derived from DN property.
+
+ To use this class, the programer must subclass it and provide the
+ implementation of following methods:
+
+ * make_*_command -- implementing the API call for particular plugin
+ and operation (add, delete, ...)
+ These methods should use the make_command method
+ * check_* commands -- an assertion for a plugin command (CRUD)
+ * track_create -- to make an internal representation of the
+ entry
+
+ Apart from overriding these methods, the subclass must provide the
+ distinguished name of the entry in `self.dn` property.
+
+ It is also required to override the class variables defining the sets
+ of ldap attributes/keys for these operations specific to the plugin
+ being implemented. Take the host plugin test for an example.
+
+ The implementation of these methods is not strictly enforced.
+ A missing method will cause a NotImplementedError during runtime
+ as a result.
+ """
+ retrieve_keys = None
+ retrieve_all_keys = None
+ create_keys = None
+ update_keys = None
+ managedby_keys = None
+ allowedto_keys = None
+
+ _override_me_msg = "This method needs to be overridden in a subclass"
+
+ def __init__(self, default_version=None):
+ self.api = api
+ self.default_version = default_version or API_VERSION
+ self._dn = None
+
+ self.exists = False
+
+ @property
+ def dn(self):
+ """A property containing the distinguished name of the entry."""
+ if not self._dn:
+ raise ValueError('The DN must be set in the init method.')
+ return self._dn
+
+ @dn.setter
+ def dn(self, value):
+ if not (isinstance(value, DN) or isinstance(value, Fuzzy)):
+ raise ValueError('The value must be an instance of DN or Fuzzy.')
+ self._dn = value
+
+ @property
+ def rdn(self):
+ return self.dn[0]
+
+ @property
+ def name(self):
+ """Property holding the name of the entry in LDAP.
+
+ This property is computed in runtime.
+ """
+ return self.rdn.value
+
+ def filter_attrs(self, keys):
+ """Return a dict of expected attrs, filtered by the given keys"""
+ if not self.attrs:
+ raise RuntimeError('The tracker instance has no attributes.')
+ return {k: v for k, v in self.attrs.items() if k in keys}
+
+ def run_command(self, name, *args, **options):
+ """Run the given IPA command
+
+ Logs the command using print for easier debugging
+ """
+ cmd = self.api.Command[name]
+
+ options.setdefault('version', self.default_version)
+
+ args_repr = ', '.join(
+ [repr(a) for a in args] +
+ ['%s=%r' % item for item in list(options.items())])
+ try:
+ result = cmd(*args, **options)
+ except Exception as e:
+ print('Ran command: %s(%s): %s: %s' % (cmd, args_repr,
+ type(e).__name__, e))
+ raise
+ else:
+ print('Ran command: %s(%s): OK' % (cmd, args_repr))
+ return result
+
+ def make_command(self, name, *args, **options):
+ """Make a functools.partial function to run the given command"""
+ return functools.partial(self.run_command, name, *args, **options)
+
+ def make_fixture(self, request):
+ """Make a pytest fixture for this tracker
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def cleanup():
+ existed = self.exists
+ try:
+ del_command()
+ except errors.NotFound:
+ if existed:
+ raise
+ self.exists = False
+
+ request.addfinalizer(cleanup)
+
+ return self
+
+ def ensure_exists(self):
+ """If the entry does not exist (according to tracker state), create it
+ """
+ if not self.exists:
+ self.create(force=True)
+
+ def ensure_missing(self):
+ """If the entry exists (according to tracker state), delete it
+ """
+ if self.exists:
+ self.delete()
+
+ def make_create_command(self, force=True):
+ """Make function that creates the plugin entry object."""
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_delete_command(self):
+ """Make function that deletes the plugin entry object."""
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """Make function that retrieves the entry using ${CMD}_show"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_find_command(self, *args, **kwargs):
+ """Make function that finds the entry using ${CMD}_find
+
+ Note that the name (or other search terms) needs to be specified
+ in arguments.
+ """
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_update_command(self, updates):
+ """Make function that modifies the entry using ${CMD}_mod"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def create(self, force=True):
+ """Helper function to create an entry and check the result"""
+ self.ensure_missing()
+ self.track_create()
+ command = self.make_create_command(force=force)
+ result = command()
+ self.check_create(result)
+
+ def track_create(self):
+ """Update expected state for host creation
+
+ The method should look similar to the following
+ example of host plugin.
+
+ self.attrs = dict(
+ dn=self.dn,
+ fqdn=[self.fqdn],
+ description=[self.description],
+ ... # all required attributes
+ )
+ self.exists = True
+ """
+ raise NotImplementedError(self._override_me_msg)
+
+ def check_create(self, result):
+ """Check plugin's add command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def delete(self):
+ """Helper function to delete a host and check the result"""
+ self.ensure_exists()
+ self.track_delete()
+ command = self.make_delete_command()
+ result = command()
+ self.check_delete(result)
+
+ def track_delete(self):
+ """Update expected state for host deletion"""
+ self.exists = False
+ self.attrs = {}
+
+ def check_delete(self, result):
+ """Check plugin's `del` command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def retrieve(self, all=False, raw=False):
+ """Helper function to retrieve an entry and check the result"""
+ self.ensure_exists()
+ command = self.make_retrieve_command(all=all, raw=raw)
+ result = command()
+ self.check_retrieve(result, all=all, raw=raw)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """Check the plugin's `show` command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def find(self, all=False, raw=False):
+ """Helper function to search for this hosts and check the result"""
+ self.ensure_exists()
+ command = self.make_find_command(self.name, all=all, raw=raw)
+ result = command()
+ self.check_find(result, all=all, raw=raw)
+
+ def check_find(self, result, all=False, raw=False):
+ """Check the plugin's `find` command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def update(self, updates, expected_updates=None):
+ """Helper function to update this hosts and check the result
+
+ The ``updates`` are used as options to the *_mod command,
+ and the self.attrs is updated with this dict.
+ Additionally, self.attrs is updated with ``expected_updates``.
+ """
+ if expected_updates is None:
+ expected_updates = {}
+
+ self.ensure_exists()
+ command = self.make_update_command(updates)
+ result = command()
+ self.attrs.update(updates)
+ self.attrs.update(expected_updates)
+ self.check_update(result, extra_keys=set(updates.keys()) |
+ set(expected_updates.keys()))
+
+ def check_update(self, result, extra_keys=()):
+ """Check the plugin's `find` command result"""
+ raise NotImplementedError(self._override_me_msg)
diff --git a/ipatests/test_xmlrpc/tracker/caacl_plugin.py b/ipatests/test_xmlrpc/tracker/caacl_plugin.py
new file mode 100644
index 000000000..afe7ee0c0
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/caacl_plugin.py
@@ -0,0 +1,367 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipalib import errors
+from ipatests.util import assert_deepequal
+from ipatests.test_xmlrpc.xmlrpc_test import (fuzzy_caacldn, fuzzy_uuid,
+ fuzzy_ipauniqueid)
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.tracker.base import Tracker
+
+
+class CAACLTracker(Tracker):
+ """Tracker class for CA ACL LDAP object.
+
+ The class implements methods required by the base class
+ to help with basic CRUD operations.
+
+ Methods for adding and deleting actual member entries into an ACL
+ do not have check methods as these would make the class
+ unnecessarily complicated. The checks are implemented as
+ a standalone test suite. However, this makes the test crucial
+ in debugging more complicated test cases. Since the add/remove
+ operation won't be checked right away by the tracker, a problem
+ in this operation may propagate into more complicated test case.
+
+ It is possible to pass a list of member uids to these methods.
+
+ The test uses instances of Fuzzy class to compare results as they
+ are in the UUID format. The dn and rdn properties were modified
+ to reflect this as well.
+ """
+
+ member_keys = {
+ u'memberuser_user', u'memberuser_group',
+ u'memberhost_host', u'memberhost_hostgroup',
+ u'memberservice_service',
+ u'ipamembercertprofile_certprofile'}
+ category_keys = {
+ u'ipacacategory', u'ipacertprofilecategory', u'usercategory',
+ u'hostcategory', u'servicecategory'}
+ retrieve_keys = {
+ u'dn', u'cn', u'description', u'ipaenabledflag',
+ u'ipamemberca', u'ipamembercertprofile', u'memberuser',
+ u'memberhost', u'memberservice'} | member_keys | category_keys
+ retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'}
+ create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory',
+ u'usercategory', u'hostcategory', u'ipacacategory',
+ u'servicecategory', u'ipaenabledflag', u'objectclass',
+ u'ipauniqueid'}
+ update_keys = create_keys - {u'dn'}
+
+ def __init__(self, name, ipacertprofile_category=None, user_category=None,
+ service_category=None, host_category=None, description=None,
+ default_version=None):
+ super(CAACLTracker, self).__init__(default_version=default_version)
+
+ self._name = name
+ self.description = description
+ self._categories = dict(
+ ipacertprofilecategory=ipacertprofile_category,
+ usercategory=user_category,
+ servicecategory=service_category,
+ hostcategory=host_category)
+
+ self.dn = fuzzy_caacldn
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def rdn(self):
+ return fuzzy_ipauniqueid
+
+ @property
+ def categories(self):
+ """To be used in track_create"""
+ return {cat: v for cat, v in self._categories.items() if v}
+
+ @property
+ def create_categories(self):
+ """ Return the categories set on create.
+ Unused categories are left out.
+ """
+ return {cat: [v] for cat, v in self.categories.items() if v}
+
+ def make_create_command(self, force=True):
+ return self.make_command(u'caacl_add', self.name,
+ description=self.description,
+ **self.categories)
+
+ def check_create(self, result):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Added CA ACL "{}"'.format(self.name),
+ result=dict(self.filter_attrs(self.create_keys))
+ ), result)
+
+ def track_create(self):
+ self.attrs = dict(
+ dn=self.dn,
+ ipauniqueid=[fuzzy_uuid],
+ cn=[self.name],
+ objectclass=objectclasses.caacl,
+ ipaenabledflag=[u'TRUE'])
+
+ self.attrs.update(self.create_categories)
+ if self.description:
+ self.attrs.update({u'description', [self.description]})
+
+ self.exists = True
+
+ def make_delete_command(self):
+ return self.make_command('caacl_del', self.name)
+
+ def check_delete(self, result):
+ assert_deepequal(dict(
+ value=[self.name],
+ summary=u'Deleted CA ACL "{}"'.format(self.name),
+ result=dict(failed=[])
+ ), result)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ return self.make_command('caacl_show', self.name, all=all, raw=raw)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.name,
+ summary=None,
+ result=expected
+ ), result)
+
+ def make_find_command(self, *args, **kwargs):
+ return self.make_command('caacl_find', *args, **kwargs)
+
+ def check_find(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 CA ACL matched',
+ result=[expected]
+ ), result)
+
+ def make_update_command(self, updates):
+ return self.make_command('caacl_mod', self.name, **updates)
+
+ def update(self, updates, expected_updates=None, silent=False):
+ """If removing a category, delete it from tracker as well"""
+ # filter out empty categories and track changes
+
+ filtered_updates = dict()
+ for key, value in updates.items():
+ if key in self.category_keys:
+ if not value:
+ try:
+ del self.attrs[key]
+ except IndexError:
+ if silent:
+ pass
+ else:
+ # if there is a value, prepare the pair for update
+ filtered_updates.update({key: value})
+ else:
+ filtered_updates.update({key: value})
+
+ if expected_updates is None:
+ expected_updates = {}
+
+ command = self.make_update_command(updates)
+
+ try:
+ result = command()
+ except errors.EmptyModlist:
+ if silent:
+ self.attrs.update(filtered_updates)
+ self.attrs.update(expected_updates)
+ self.check_update(result,
+ extra_keys=set(self.update_keys) |
+ set(expected_updates.keys()))
+
+ def check_update(self, result, extra_keys=()):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Modified CA ACL "{}"'.format(self.name),
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ # Helper methods for caacl subcommands. The check methods will be
+ # implemented in standalone test
+ #
+ # The methods implemented here will be:
+ # caacl_{add,remove}_{host, service, certprofile, user [, subca]}
+
+ def _add_acl_component(self, command_name, keys, track):
+ """ Add a resource into ACL rule and track it.
+
+ command_name - the name in the API
+ keys = {
+ 'tracker_attr': {
+ 'api_key': 'value'
+ }
+ }
+
+ e.g.
+
+ keys = {
+ 'memberhost_host': {
+ 'host': 'hostname'
+ },
+ 'memberhost_hostgroup': {
+ 'hostgroup': 'hostgroup_name'
+ }
+ }
+ """
+
+ if not self.exists:
+ raise errors.NotFound(reason="The tracked entry doesn't exist.")
+
+ command = self.make_command(command_name, self.name)
+ command_options = dict()
+
+ # track
+ for tracker_attr in keys:
+ api_options = keys[tracker_attr]
+ if track:
+ for option in api_options:
+ try:
+ if type(option) in (list, tuple):
+ self.attrs[tracker_attr].extend(api_options[option])
+ else:
+ self.attrs[tracker_attr].append(api_options[option])
+ except KeyError:
+ if type(option) in (list, tuple):
+ self.attrs[tracker_attr] = api_options[option]
+ else:
+ self.attrs[tracker_attr] = [api_options[option]]
+ # prepare options for the command call
+ command_options.update(api_options)
+
+ return command(**command_options)
+
+ def _remove_acl_component(self, command_name, keys, track):
+ """ Remove a resource from ACL rule and track it.
+
+ command_name - the name in the API
+ keys = {
+ 'tracker_attr': {
+ 'api_key': 'value'
+ }
+ }
+
+ e.g.
+
+ keys = {
+ 'memberhost_host': {
+ 'host': 'hostname'
+ },
+ 'memberhost_hostgroup': {
+ 'hostgroup': 'hostgroup_name'
+ }
+ }
+ """
+ command = self.make_command(command_name, self.name)
+ command_options = dict()
+
+ for tracker_attr in keys:
+ api_options = keys[tracker_attr]
+ if track:
+ for option in api_options:
+ if type(option) in (list, tuple):
+ for item in option:
+ self.attrs[tracker_attr].remove(item)
+ else:
+ self.attrs[tracker_attr].remove(api_options[option])
+ if len(self.attrs[tracker_attr]) == 0:
+ del self.attrs[tracker_attr]
+ command_options.update(api_options)
+
+ return command(**command_options)
+
+ def add_host(self, host=None, hostgroup=None, track=True):
+ """Associates an host or hostgroup entry with the ACL.
+
+ The command takes an unicode string with the name
+ of the entry (RDN).
+
+ It is the responsibility of a test writer to provide
+ the correct value, object type as the method does not
+ verify whether the entry exists.
+
+ The method can add only one entry of each type
+ in one call.
+ """
+
+ options = {
+ u'memberhost_host': {u'host': host},
+ u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
+
+ return self._add_acl_component(u'caacl_add_host', options, track)
+
+ def remove_host(self, host=None, hostgroup=None, track=True):
+ options = {
+ u'memberhost_host': {u'host': host},
+ u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
+
+ return self._remove_acl_component(u'caacl_remove_host', options, track)
+
+ def add_user(self, user=None, group=None, track=True):
+ options = {
+ u'memberuser_user': {u'user': user},
+ u'memberuser_group': {u'group': group}}
+
+ return self._add_acl_component(u'caacl_add_user', options, track)
+
+ def remove_user(self, user=None, group=None, track=True):
+ options = {
+ u'memberuser_user': {u'user': user},
+ u'memberuser_group': {u'group': group}}
+
+ return self._remove_acl_component(u'caacl_remove_user', options, track)
+
+ def add_service(self, service=None, track=True):
+ options = {
+ u'memberservice_service': {u'service': service}}
+
+ return self._add_acl_component(u'caacl_add_service', options, track)
+
+ def remove_service(self, service=None, track=True):
+ options = {
+ u'memberservice_service': {u'service': service}}
+
+ return self._remove_acl_component(u'caacl_remove_service', options, track)
+
+ def add_profile(self, certprofile=None, track=True):
+ options = {
+ u'ipamembercertprofile_certprofile':
+ {u'certprofile': certprofile}}
+
+ return self._add_acl_component(u'caacl_add_profile', options, track)
+
+ def remove_profile(self, certprofile=None, track=True):
+ options = {
+ u'ipamembercertprofile_certprofile':
+ {u'certprofile': certprofile}}
+
+ return self._remove_acl_component(u'caacl_remove_profile', options, track)
+
+ def enable(self):
+ command = self.make_command(u'caacl_enable', self.name)
+ self.attrs.update({u'ipaenabledflag': [u'TRUE']})
+ command()
+
+ def disable(self):
+ command = self.make_command(u'caacl_disable', self.name)
+ self.attrs.update({u'ipaenabledflag': [u'FALSE']})
+ command()
diff --git a/ipatests/test_xmlrpc/tracker/certprofile_plugin.py b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py
new file mode 100644
index 000000000..eeb27eb14
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py
@@ -0,0 +1,133 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+import os
+
+
+from ipapython.dn import DN
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.util import assert_deepequal
+
+
+class CertprofileTracker(Tracker):
+ """Tracker class for certprofile plugin.
+ """
+
+ retrieve_keys = {
+ 'dn', 'cn', 'description', 'ipacertprofilestoreissued'
+ }
+ retrieve_all_keys = retrieve_keys | {'objectclass'}
+ create_keys = retrieve_keys | {'objectclass'}
+ update_keys = retrieve_keys - {'dn'}
+ managedby_keys = retrieve_keys
+ allowedto_keys = retrieve_keys
+
+ def __init__(self, name, store=False, desc='dummy description',
+ profile=None, default_version=None):
+ super(CertprofileTracker, self).__init__(
+ default_version=default_version
+ )
+
+ self.store = store
+ self.description = desc
+ self._profile_path = profile
+
+ self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca',
+ self.api.env.basedn)
+
+ @property
+ def profile(self):
+ if not self._profile_path:
+ return None
+
+ if os.path.isabs(self._profile_path):
+ path = self._profile_path
+ else:
+ path = os.path.join(os.path.dirname(__file__),
+ self._profile_path)
+
+ with open(path, 'r') as f:
+ content = f.read()
+ return unicode(content)
+
+ def make_create_command(self, force=True):
+ if not self.profile:
+ raise RuntimeError('Tracker object without path to profile '
+ 'cannot be used to create profile entry.')
+
+ return self.make_command('certprofile_import', self.name,
+ description=self.description,
+ ipacertprofilestoreissued=self.store,
+ file=self.profile)
+
+ def check_create(self, result):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Imported profile "{}"'.format(self.name),
+ result=dict(self.filter_attrs(self.create_keys))
+ ), result)
+
+ def track_create(self):
+ self.attrs = dict(
+ dn=unicode(self.dn),
+ cn=[self.name],
+ description=[self.description],
+ ipacertprofilestoreissued=[unicode(self.store).upper()],
+ objectclass=objectclasses.certprofile
+ )
+ self.exists = True
+
+ def make_delete_command(self):
+ return self.make_command('certprofile_del', self.name)
+
+ def check_delete(self, result):
+ assert_deepequal(dict(
+ value=[self.name], # correctly a list?
+ summary=u'Deleted profile "{}"'.format(self.name),
+ result=dict(failed=[]),
+ ), result)
+
+ def make_retrieve_command(self, all=False, raw=False, **options):
+ return self.make_command('certprofile_show', self.name, all=all,
+ raw=raw, **options)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.name,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def make_find_command(self, *args, **kwargs):
+ return self.make_command('certprofile_find', *args, **kwargs)
+
+ def check_find(self, result, all=False, raw=False):
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 profile matched',
+ result=[expected]
+ ), result)
+
+ def make_update_command(self, updates):
+ return self.make_command('certprofile_mod', self.name, **updates)
+
+ def check_update(self, result, extra_keys=()):
+ assert_deepequal(dict(
+ value=self.name,
+ summary=u'Modified Certificate Profile "{}"'.format(self.name),
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
diff --git a/ipatests/test_xmlrpc/tracker/group_plugin.py b/ipatests/test_xmlrpc/tracker/group_plugin.py
new file mode 100644
index 000000000..c47ce8ecf
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/group_plugin.py
@@ -0,0 +1,196 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid
+
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.util import assert_deepequal, get_group_dn
+
+
+class GroupTracker(Tracker):
+ """ Class for host plugin like tests """
+ retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user',
+ u'member_group'}
+ retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'}
+
+ create_keys = retrieve_all_keys
+ update_keys = retrieve_keys - {u'dn'}
+
+ add_member_keys = retrieve_keys | {u'description'}
+
+ def __init__(self, name):
+ super(GroupTracker, self).__init__(default_version=None)
+ self.cn = name
+ self.dn = get_group_dn(name)
+
+ def make_create_command(self, nonposix=False, external=False,
+ force=True):
+ """ Make function that creates a group using 'group-add' """
+ return self.make_command('group_add', self.cn,
+ nonposix=nonposix, external=external)
+
+ def make_delete_command(self):
+ """ Make function that deletes a group using 'group-del' """
+ return self.make_command('group_del', self.cn)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a group using 'group-show' """
+ return self.make_command('group_show', self.cn, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that searches for a group using 'group-find' """
+ return self.make_command('group_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates a group using 'group-mod' """
+ return self.make_command('group_mod', self.cn, **updates)
+
+ def make_add_member_command(self, options={}):
+ """ Make function that adds a member to a group
+ Attention: only works for one user OR group! """
+ if u'user' in options:
+ self.attrs[u'member_user'] = [options[u'user']]
+ elif u'group' in options:
+ self.attrs[u'member_group'] = [options[u'group']]
+ self.adds = options
+
+ return self.make_command('group_add_member', self.cn, **options)
+
+ def make_remove_member_command(self, options={}):
+ """ Make function that removes a member from a group
+ Attention: only works for one user OR group! """
+ if u'user' in options:
+ del self.attrs[u'member_user']
+ elif u'group' in options:
+ del self.attrs[u'member_group']
+ return self.make_command('group_remove_member', self.cn, **options)
+
+ def make_detach_command(self):
+ """ Make function that detaches a managed group using
+ 'group-detach' """
+ self.exists = True
+ return self.make_command('group_detach', self.cn)
+
+ def track_create(self):
+ """ Updates expected state for group creation"""
+ self.attrs = dict(
+ dn=get_group_dn(self.cn),
+ cn=[self.cn],
+ gidnumber=[fuzzy_digits],
+ ipauniqueid=[fuzzy_uuid],
+ objectclass=objectclasses.posixgroup,
+ )
+ self.exists = True
+
+ def check_create(self, result):
+ """ Checks 'group_add' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Added group "%s"' % self.cn,
+ result=self.filter_attrs(self.create_keys)
+ ), result)
+
+ def check_delete(self, result):
+ """ Checks 'group_del' command result """
+ assert_deepequal(dict(
+ value=[self.cn],
+ summary=u'Deleted group "%s"' % self.cn,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """ Checks 'group_show' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=None,
+ result=expected
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Checks 'group_find' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 group matched',
+ result=[expected],
+ ), result)
+
+ def check_update(self, result, extra_keys={}):
+ """ Checks 'group_mod' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Modified group "%s"' % self.cn,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def check_add_member(self, result):
+ """ Checks 'group_add_member' command result """
+ assert_deepequal(dict(
+ completed=1,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ ), result)
+
+ def check_add_member_negative(self, result):
+ """ Checks 'group_add_member' command result when expected result
+ is failure of the operation"""
+ if u'member_user' in self.attrs:
+ del self.attrs[u'member_user']
+ elif u'member_group' in self.attrs:
+ del self.attrs[u'member_group']
+
+ expected = dict(
+ completed=0,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ )
+ if u'user' in self.adds:
+ expected[u'failed'][u'member'][u'user'] = [(
+ self.adds[u'user'], u'no such entry')]
+ elif u'group' in self.adds:
+ expected[u'failed'][u'member'][u'group'] = [(
+ self.adds[u'group'], u'no such entry')]
+
+ assert_deepequal(expected, result)
+
+ def check_remove_member(self, result):
+ """ Checks 'group_remove_member' command result """
+ assert_deepequal(dict(
+ completed=1,
+ failed={u'member': {u'group': (), u'user': ()}},
+ result=self.filter_attrs(self.add_member_keys)
+ ), result)
+
+ def check_detach(self, result):
+ """ Checks 'group_detach' command result """
+ assert_deepequal(dict(
+ value=self.cn,
+ summary=u'Detached group "%s" from user "%s"' % (
+ self.cn, self.cn),
+ result=True
+ ), result)
+
+ def make_fixture_detach(self, request):
+ """Make a pytest fixture for this tracker
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use itself.
+ """
+ def cleanup():
+ pass
+
+ request.addfinalizer(cleanup)
+
+ return self
diff --git a/ipatests/test_xmlrpc/tracker/host_plugin.py b/ipatests/test_xmlrpc/tracker/host_plugin.py
new file mode 100644
index 000000000..bf199f4f5
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/host_plugin.py
@@ -0,0 +1,154 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from __future__ import print_function
+
+
+from ipapython.dn import DN
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_uuid
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.util import assert_deepequal
+
+
+class HostTracker(Tracker):
+ """Wraps and tracks modifications to a Host object
+
+ Implements the helper functions for host plugin.
+
+ The HostTracker object stores information about the host, e.g.
+ ``fqdn`` and ``dn``.
+ """
+ retrieve_keys = {
+ 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host',
+ 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint',
+ 'serial_number', 'serial_number_hex', 'sha1_fingerprint',
+ 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before',
+ 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user',
+ 'ipaallowedtoperform_read_keys_group',
+ 'ipaallowedtoperform_read_keys_host',
+ 'ipaallowedtoperform_read_keys_hostgroup',
+ 'ipaallowedtoperform_write_keys_user',
+ 'ipaallowedtoperform_write_keys_group',
+ 'ipaallowedtoperform_write_keys_host',
+ 'ipaallowedtoperform_write_keys_hostgroup'}
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid',
+ u'managing_host', u'objectclass', u'serverhostname'}
+ create_keys = retrieve_keys | {'objectclass', 'ipauniqueid',
+ 'randompassword'}
+ update_keys = retrieve_keys - {'dn'}
+ managedby_keys = retrieve_keys - {'has_keytab', 'has_password'}
+ allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
+
+ def __init__(self, name, fqdn=None, default_version=None):
+ super(HostTracker, self).__init__(default_version=default_version)
+
+ self.shortname = name
+ if fqdn:
+ self.fqdn = fqdn
+ else:
+ self.fqdn = u'%s.%s' % (name, self.api.env.domain)
+ self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts',
+ self.api.env.basedn)
+
+ self.description = u'Test host <%s>' % name
+ self.location = u'Undisclosed location <%s>' % name
+
+ def make_create_command(self, force=True):
+ """Make function that creates this host using host_add"""
+ return self.make_command('host_add', self.fqdn,
+ description=self.description,
+ l=self.location,
+ force=force)
+
+ def make_delete_command(self):
+ """Make function that deletes the host using host_del"""
+ return self.make_command('host_del', self.fqdn)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """Make function that retrieves the host using host_show"""
+ return self.make_command('host_show', self.fqdn, all=all, raw=raw)
+
+ def make_find_command(self, *args, **kwargs):
+ """Make function that finds hosts using host_find
+
+ Note that the fqdn (or other search terms) needs to be specified
+ in arguments.
+ """
+ return self.make_command('host_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """Make function that modifies the host using host_mod"""
+ return self.make_command('host_mod', self.fqdn, **updates)
+
+ def track_create(self):
+ """Update expected state for host creation"""
+ self.attrs = dict(
+ dn=self.dn,
+ fqdn=[self.fqdn],
+ description=[self.description],
+ l=[self.location],
+ krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)],
+ objectclass=objectclasses.host,
+ ipauniqueid=[fuzzy_uuid],
+ managedby_host=[self.fqdn],
+ has_keytab=False,
+ has_password=False,
+ cn=[self.fqdn],
+ ipakrbokasdelegate=False,
+ ipakrbrequirespreauth=True,
+ managing_host=[self.fqdn],
+ serverhostname=[self.shortname],
+ )
+ self.exists = True
+
+ def check_create(self, result):
+ """Check `host_add` command result"""
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=u'Added host "%s"' % self.fqdn,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """Check `host_del` command result"""
+ assert_deepequal(dict(
+ value=[self.fqdn],
+ summary=u'Deleted host "%s"' % self.fqdn,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """Check `host_show` command result"""
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """Check `host_find` command result"""
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 host matched',
+ result=[expected],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """Check `host_update` command result"""
+ assert_deepequal(dict(
+ value=self.fqdn,
+ summary=u'Modified host "%s"' % self.fqdn,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
diff --git a/ipatests/test_xmlrpc/tracker/stageuser_plugin.py b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
new file mode 100644
index 000000000..09ad282e4
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
@@ -0,0 +1,267 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+import six
+
+from ipalib import api, errors
+
+from ipatests.test_xmlrpc.tracker.base import Tracker
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import (
+ fuzzy_string, fuzzy_dergeneralizedtime, raises_exact)
+
+from ipatests.util import assert_deepequal, get_user_dn
+from ipapython.dn import DN
+
+if six.PY3:
+ unicode = str
+
+sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X'
+ 'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI'
+ 'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2'
+ 'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm'
+ 'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01'
+ '9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF'
+ '0L public key test')
+sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B '
+ 'public key test (ssh-rsa)')
+
+
+class StageUserTracker(Tracker):
+ """ Tracker class for staged user LDAP object
+
+ Implements helper functions for host plugin.
+ StageUserTracker object stores information about the user.
+ """
+
+ retrieve_keys = {
+ u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
+ u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber',
+ u'title', u'memberof', u'nsaccountlock', u'memberofindirect',
+ u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink',
+ u'ipatokenradiususername', u'krbprincipalexpiration',
+ u'usercertificate', u'dn', u'has_keytab', u'has_password',
+ u'street', u'postalcode', u'facsimiletelephonenumber',
+ u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l',
+ u'st', u'mobile', u'pager', }
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipauniqueid', u'objectclass', u'description',
+ u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+
+ create_keys = retrieve_all_keys | {
+ u'objectclass', u'ipauniqueid', u'randompassword',
+ u'userpassword', u'krbextradata', u'krblastpwdchange',
+ u'krbpasswordexpiration', u'krbprincipalkey'}
+
+ update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
+ activate_keys = retrieve_keys | {
+ u'has_keytab', u'has_password', u'nsaccountlock'}
+
+ def __init__(self, name, givenname, sn, **kwargs):
+ super(StageUserTracker, self).__init__(default_version=None)
+ self.uid = name
+ self.givenname = givenname
+ self.sn = sn
+ self.dn = DN(
+ ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
+
+ self.kwargs = kwargs
+
+ def make_create_command(self, options=None, force=None):
+ """ Make function that creates a staged user using stageuser-add """
+ if options is not None:
+ self.kwargs = options
+ return self.make_command('stageuser_add', self.uid,
+ givenname=self.givenname,
+ sn=self.sn, **self.kwargs)
+
+ def make_delete_command(self):
+ """ Make function that deletes a staged user using stageuser-del """
+ return self.make_command('stageuser_del', self.uid)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a staged user using stageuser-show """
+ return self.make_command('stageuser_show', self.uid, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that finds staged user using stageuser-find """
+ return self.make_command('stageuser_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates staged user using stageuser-mod """
+ return self.make_command('stageuser_mod', self.uid, **updates)
+
+ def make_activate_command(self):
+ """ Make function that activates staged user
+ using stageuser-activate """
+ return self.make_command('stageuser_activate', self.uid)
+
+ def track_create(self):
+ """ Update expected state for staged user creation """
+ self.attrs = dict(
+ dn=self.dn,
+ uid=[self.uid],
+ givenname=[self.givenname],
+ sn=[self.sn],
+ homedirectory=[u'/home/%s' % self.uid],
+ displayname=[u'%s %s' % (self.givenname, self.sn)],
+ cn=[u'%s %s' % (self.givenname, self.sn)],
+ initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+ objectclass=objectclasses.user_base,
+ description=[u'__no_upg__'],
+ ipauniqueid=[u'autogenerate'],
+ uidnumber=[u'-1'],
+ gidnumber=[u'-1'],
+ krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+ mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+ gecos=[u'%s %s' % (self.givenname, self.sn)],
+ loginshell=[u'/bin/sh'],
+ has_keytab=False,
+ has_password=False,
+ nsaccountlock=[u'true'],
+ )
+
+ for key in self.kwargs:
+ if key == u'krbprincipalname':
+ self.attrs[key] = [u'%s@%s' % (
+ (self.kwargs[key].split('@'))[0].lower(),
+ (self.kwargs[key].split('@'))[1])]
+ elif key == u'manager':
+ self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))]
+ elif key == u'ipasshpubkey':
+ self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp]
+ self.attrs[key] = [self.kwargs[key]]
+ elif key == u'random' or key == u'userpassword':
+ self.attrs[u'krbextradata'] = [fuzzy_string]
+ self.attrs[u'krbpasswordexpiration'] = [
+ fuzzy_dergeneralizedtime]
+ self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime]
+ self.attrs[u'krbprincipalkey'] = [fuzzy_string]
+ self.attrs[u'userpassword'] = [fuzzy_string]
+ self.attrs[u'has_keytab'] = True
+ self.attrs[u'has_password'] = True
+ if key == u'random':
+ self.attrs[u'randompassword'] = fuzzy_string
+ else:
+ self.attrs[key] = [self.kwargs[key]]
+
+ self.exists = True
+
+ def check_create(self, result):
+ """ Check 'stageuser-add' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Added stage user "%s"' % self.uid,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """ Check 'stageuser-del' command result """
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Deleted stage user "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """ Check 'stageuser-show' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different
+ # type of nsaccountlock value than DS, but overall the value
+ # fits expected result
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Check 'stageuser-find' command result """
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different
+ # type of nsaccountlock value than DS, but overall the value
+ # fits expected result
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 user matched',
+ result=[expected],
+ ), result)
+
+ def check_find_nomatch(self, result):
+ """ Check 'stageuser-find' command result when no match is expected """
+ assert_deepequal(dict(
+ count=0,
+ truncated=False,
+ summary=u'0 users matched',
+ result=[],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """ Check 'stageuser-mod' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Modified stage user "%s"' % self.uid,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def check_restore_preserved(self, result):
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Staged user account "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def make_fixture_activate(self, request):
+ """Make a pytest fixture for a staged user that is to be activated
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it. It takes into account
+ that the staged user no longer exists after activation,
+ therefore the fixture verifies after the tests
+ that the staged user doesn't exist instead of deleting it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def finish():
+ with raises_exact(errors.NotFound(
+ reason=u'%s: stage user not found' % self.uid)):
+ del_command()
+
+ request.addfinalizer(finish)
+
+ return self
+
+ def create_from_preserved(self, user):
+ """ Copies values from preserved user - helper function for
+ restoration tests """
+ self.attrs = user.attrs
+ self.uid = user.uid
+ self.givenname = user.givenname
+ self.sn = user.sn
+ self.dn = DN(
+ ('uid', self.uid), api.env.container_stageuser, api.env.basedn)
+ self.attrs[u'dn'] = self.dn
diff --git a/ipatests/test_xmlrpc/tracker/user_plugin.py b/ipatests/test_xmlrpc/tracker/user_plugin.py
new file mode 100644
index 000000000..af7d85836
--- /dev/null
+++ b/ipatests/test_xmlrpc/tracker/user_plugin.py
@@ -0,0 +1,340 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+from ipalib import api, errors
+from ipapython.dn import DN
+
+from ipatests.util import assert_deepequal, get_group_dn
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid, raises_exact
+from ipatests.test_xmlrpc.tracker.base import Tracker
+
+
+class UserTracker(Tracker):
+ """ Class for host plugin like tests """
+
+ retrieve_keys = {
+ u'uid', u'givenname', u'sn', u'homedirectory',
+ u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou',
+ u'telephonenumber', u'title', u'memberof',
+ u'memberofindirect', u'ipauserauthtype', u'userclass',
+ u'ipatokenradiusconfiglink', u'ipatokenradiususername',
+ u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab',
+ u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber',
+ u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock',
+ u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata',
+ u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st'
+ }
+
+ retrieve_all_keys = retrieve_keys | {
+ u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry',
+ u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+
+ retrieve_preserved_keys = retrieve_keys - {u'memberof_group'}
+ retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'}
+
+ create_keys = retrieve_all_keys | {
+ u'randompassword', u'mepmanagedentry',
+ u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange',
+ u'krbprincipalkey', u'randompassword', u'userpassword'
+ }
+ update_keys = retrieve_keys - {u'dn'}
+ activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password',
+ u'nsaccountlock', u'sshpubkeyfp'}
+
+ find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
+ find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'}
+
+ def __init__(self, name, givenname, sn, **kwargs):
+ super(UserTracker, self).__init__(default_version=None)
+ self.uid = name
+ self.givenname = givenname
+ self.sn = sn
+ self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn)
+
+ self.kwargs = kwargs
+
+ def make_create_command(self, force=None):
+ """ Make function that crates a user using user-add """
+ return self.make_command(
+ 'user_add', self.uid,
+ givenname=self.givenname,
+ sn=self.sn, **self.kwargs
+ )
+
+ def make_delete_command(self, no_preserve=True, preserve=False):
+ """ Make function that deletes a user using user-del """
+
+ if preserve and not no_preserve:
+ # necessary to change some user attributes due to moving
+ # to different container
+ self.attrs[u'dn'] = DN(
+ ('uid', self.uid),
+ api.env.container_deleteuser,
+ api.env.basedn
+ )
+ self.attrs[u'objectclass'] = objectclasses.user_base
+
+ return self.make_command(
+ 'user_del', self.uid,
+ no_preserve=no_preserve,
+ preserve=preserve
+ )
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """ Make function that retrieves a user using user-show """
+ return self.make_command('user_show', self.uid, all=all)
+
+ def make_find_command(self, *args, **kwargs):
+ """ Make function that finds user using user-find """
+ return self.make_command('user_find', *args, **kwargs)
+
+ def make_update_command(self, updates):
+ """ Make function that updates user using user-mod """
+ return self.make_command('user_mod', self.uid, **updates)
+
+ def make_undelete_command(self):
+ """ Make function that activates preserved user using user-undel """
+ return self.make_command('user_undel', self.uid)
+
+ def make_enable_command(self):
+ """ Make function that enables user using user-enable """
+ return self.make_command('user_enable', self.uid)
+
+ def make_stage_command(self):
+ """ Make function that restores preserved user by moving it to
+ staged container """
+ return self.make_command('user_stage', self.uid)
+
+ def track_create(self):
+ """ Update expected state for user creation """
+ self.attrs = dict(
+ dn=self.dn,
+ uid=[self.uid],
+ givenname=[self.givenname],
+ sn=[self.sn],
+ homedirectory=[u'/home/%s' % self.uid],
+ displayname=[u'%s %s' % (self.givenname, self.sn)],
+ cn=[u'%s %s' % (self.givenname, self.sn)],
+ initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+ objectclass=objectclasses.user,
+ description=[u'__no_upg__'],
+ ipauniqueid=[fuzzy_uuid],
+ uidnumber=[fuzzy_digits],
+ gidnumber=[fuzzy_digits],
+ krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+ mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+ gecos=[u'%s %s' % (self.givenname, self.sn)],
+ loginshell=[u'/bin/sh'],
+ has_keytab=False,
+ has_password=False,
+ mepmanagedentry=[get_group_dn(self.uid)],
+ memberof_group=[u'ipausers'],
+ )
+
+ for key in self.kwargs:
+ if key == u'krbprincipalname':
+ self.attrs[key] = [u'%s@%s' % (
+ (self.kwargs[key].split('@'))[0].lower(),
+ (self.kwargs[key].split('@'))[1]
+ )]
+ else:
+ self.attrs[key] = [self.kwargs[key]]
+
+ self.exists = True
+
+ def check_create(self, result):
+ """ Check 'user-add' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Added user "%s"' % self.uid,
+ result=self.filter_attrs(self.create_keys),
+ ), result)
+
+ def check_delete(self, result):
+ """ Check 'user-del' command result """
+ assert_deepequal(dict(
+ value=[self.uid],
+ summary=u'Deleted user "%s"' % self.uid,
+ result=dict(failed=[]),
+ ), result)
+
+ def check_retrieve(self, result, all=False):
+ """ Check 'user-show' command result """
+
+ if u'preserved' in self.attrs and self.attrs[u'preserved']:
+ self.retrieve_all_keys = self.retrieve_preserved_all_keys
+ self.retrieve_keys = self.retrieve_preserved_keys
+ elif u'preserved' not in self.attrs and all:
+ self.attrs[u'preserved'] = False
+
+ if all:
+ expected = self.filter_attrs(self.retrieve_all_keys)
+ else:
+ expected = self.filter_attrs(self.retrieve_keys)
+
+ # small override because stageuser-find returns different type
+ # of nsaccountlock value than DS, but overall the value fits
+ # expected result
+ if u'nsaccountlock' in expected:
+ if expected[u'nsaccountlock'] == [u'true']:
+ expected[u'nsaccountlock'] = True
+ elif expected[u'nsaccountlock'] == [u'false']:
+ expected[u'nsaccountlock'] = False
+
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=None,
+ result=expected,
+ ), result)
+
+ def check_find(self, result, all=False, raw=False):
+ """ Check 'user-find' command result """
+ self.attrs[u'nsaccountlock'] = True
+ self.attrs[u'preserved'] = True
+
+ if all:
+ expected = self.filter_attrs(self.find_all_keys)
+ else:
+ expected = self.filter_attrs(self.find_keys)
+
+ assert_deepequal(dict(
+ count=1,
+ truncated=False,
+ summary=u'1 user matched',
+ result=[expected],
+ ), result)
+
+ def check_find_nomatch(self, result):
+ """ Check 'user-find' command result when no user should be found """
+ assert_deepequal(dict(
+ count=0,
+ truncated=False,
+ summary=u'0 users matched',
+ result=[],
+ ), result)
+
+ def check_update(self, result, extra_keys=()):
+ """ Check 'user-mod' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Modified user "%s"' % self.uid,
+ result=self.filter_attrs(self.update_keys | set(extra_keys))
+ ), result)
+
+ def create_from_staged(self, stageduser):
+ """ Copies attributes from staged user - helper function for
+ activation tests """
+ self.attrs = stageduser.attrs
+ self.uid = stageduser.uid
+ self.givenname = stageduser.givenname
+ self.sn = stageduser.sn
+
+ self.attrs[u'mepmanagedentry'] = None
+ self.attrs[u'dn'] = self.dn
+ self.attrs[u'ipauniqueid'] = [fuzzy_uuid]
+ self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % (
+ api.env.container_group, api.env.basedn
+ )]
+ self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % (
+ self.uid, api.env.container_group, api.env.basedn
+ )]
+ self.attrs[u'objectclass'] = objectclasses.user
+ if self.attrs[u'gidnumber'] == [u'-1']:
+ self.attrs[u'gidnumber'] = [fuzzy_digits]
+ if self.attrs[u'uidnumber'] == [u'-1']:
+ self.attrs[u'uidnumber'] = [fuzzy_digits]
+
+ if u'ipasshpubkey' in self.kwargs:
+ self.attrs[u'ipasshpubkey'] = [str(
+ self.kwargs[u'ipasshpubkey']
+ )]
+
+ def check_activate(self, result):
+ """ Check 'stageuser-activate' command result """
+ expected = dict(
+ value=self.uid,
+ summary=u'Stage user %s activated' % self.uid,
+ result=self.filter_attrs(self.activate_keys))
+
+ # work around to eliminate inconsistency in returned objectclass
+ # (case sensitive assertion)
+ expected['result']['objectclass'] = [item.lower() for item in
+ expected['result']['objectclass']]
+ result['result']['objectclass'] = [item.lower() for item in
+ result['result']['objectclass']]
+
+ assert_deepequal(expected, result)
+
+ self.exists = True
+
+ def check_undel(self, result):
+ """ Check 'user-undel' command result """
+ assert_deepequal(dict(
+ value=self.uid,
+ summary=u'Undeleted user account "%s"' % self.uid,
+ result=True
+ ), result)
+
+ def track_delete(self, preserve=False):
+ """Update expected state for host deletion"""
+ if preserve:
+ self.exists = True
+ if u'memberof_group' in self.attrs:
+ del self.attrs[u'memberof_group']
+ self.attrs[u'nsaccountlock'] = True
+ self.attrs[u'preserved'] = True
+ else:
+ self.exists = False
+ self.attrs = {}
+
+ def make_preserved_user(self):
+ """ 'Creates' a preserved user necessary for some tests """
+ self.ensure_exists()
+ self.track_delete(preserve=True)
+ command = self.make_delete_command(no_preserve=False, preserve=True)
+ result = command()
+ self.check_delete(result)
+
+ def check_attr_preservation(self, expected):
+ """ Verifies that ipaUniqueID, uidNumber and gidNumber are
+ preserved upon reactivation. Also verifies that resulting
+ active user is a member of ipausers group only."""
+ command = self.make_retrieve_command(all=True)
+ result = command()
+
+ assert_deepequal(dict(
+ ipauniqueid=result[u'result'][u'ipauniqueid'],
+ uidnumber=result[u'result'][u'uidnumber'],
+ gidnumber=result[u'result'][u'gidnumber']
+ ), expected)
+
+ if (u'memberof_group' not in result[u'result'] or
+ result[u'result'][u'memberof_group'] != (u'ipausers',)):
+ assert False
+
+ def make_fixture_restore(self, request):
+ """Make a pytest fixture for a preserved user that is to be moved to
+ staged area.
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it. It takes into account
+ that the preserved user no longer exists after restoring it,
+ therefore the fixture verifies after the tests
+ that the preserved user doesn't exist instead of deleting it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def finish():
+ with raises_exact(errors.NotFound(
+ reason=u'%s: user not found' % self.uid)):
+ del_command()
+
+ request.addfinalizer(finish)
+
+ return self