diff options
| author | Milan KubĂk <mkubik@redhat.com> | 2015-11-19 16:07:29 +0100 |
|---|---|---|
| committer | Martin Basti <mbasti@redhat.com> | 2015-12-02 17:12:24 +0100 |
| commit | 17f9ca154b47f1e21797d25435e25676fdca284c (patch) | |
| tree | fd3f5b2976acd3ca0718c88dbbe35782982be2b6 /ipatests/test_xmlrpc/tracker | |
| parent | b8c619a7139bd7b65caa03b68431e22791ff19bf (diff) | |
| download | freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.gz freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.tar.xz freeipa-17f9ca154b47f1e21797d25435e25676fdca284c.zip | |
Separated Tracker implementations into standalone package
The previous way of implementing trackers in the module with
the test caused circular imports. The separate package resolves
this issue.
https://fedorahosted.org/freeipa/ticket/5467
Reviewed-By: Ales 'alich' Marecek <amarecek@redhat.com>
Diffstat (limited to 'ipatests/test_xmlrpc/tracker')
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/__init__.py | 0 | ||||
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/base.py | 289 | ||||
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/caacl_plugin.py | 367 | ||||
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/certprofile_plugin.py | 133 | ||||
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/group_plugin.py | 196 | ||||
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/host_plugin.py | 154 | ||||
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/stageuser_plugin.py | 267 | ||||
| -rw-r--r-- | ipatests/test_xmlrpc/tracker/user_plugin.py | 340 |
8 files changed, 1746 insertions, 0 deletions
diff --git a/ipatests/test_xmlrpc/tracker/__init__.py b/ipatests/test_xmlrpc/tracker/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/__init__.py diff --git a/ipatests/test_xmlrpc/tracker/base.py b/ipatests/test_xmlrpc/tracker/base.py new file mode 100644 index 000000000..acd382dd3 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/base.py @@ -0,0 +1,289 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +""" +Implements a base class to track changes to an LDAP object. +""" +from __future__ import print_function + +import functools + +from ipalib import api, errors +from ipapython.dn import DN +from ipapython.version import API_VERSION +from ipatests.util import Fuzzy + + +class Tracker(object): + """Wraps and tracks modifications to a plugin LDAP entry object + + Stores a copy of state of a plugin entry object and allows checking that + the state in the database is the same as expected. + This allows creating independent tests: the individual tests check + that the relevant changes have been made. At the same time + the entry doesn't need to be recreated and cleaned up for each test. + + Two attributes are used for tracking: ``exists`` (true if the entry is + supposed to exist) and ``attrs`` (a dict of LDAP attributes that are + expected to be returned from IPA commands). + + For commonly used operations, there is a helper method, e.g. + ``create``, ``update``, or ``find``, that does these steps: + + * ensure the entry exists (or does not exist, for "create") + * store the expected modifications + * get the IPA command to run, and run it + * check that the result matches the expected state + + Tests that require customization of these steps are expected to do them + manually, using lower-level methods. + Especially the first step (ensure the entry exists) is important for + achieving independent tests. + + The Tracker object also stores information about the entry, e.g. + ``dn``, ``rdn`` and ``name`` which is derived from DN property. + + To use this class, the programer must subclass it and provide the + implementation of following methods: + + * make_*_command -- implementing the API call for particular plugin + and operation (add, delete, ...) + These methods should use the make_command method + * check_* commands -- an assertion for a plugin command (CRUD) + * track_create -- to make an internal representation of the + entry + + Apart from overriding these methods, the subclass must provide the + distinguished name of the entry in `self.dn` property. + + It is also required to override the class variables defining the sets + of ldap attributes/keys for these operations specific to the plugin + being implemented. Take the host plugin test for an example. + + The implementation of these methods is not strictly enforced. + A missing method will cause a NotImplementedError during runtime + as a result. + """ + retrieve_keys = None + retrieve_all_keys = None + create_keys = None + update_keys = None + managedby_keys = None + allowedto_keys = None + + _override_me_msg = "This method needs to be overridden in a subclass" + + def __init__(self, default_version=None): + self.api = api + self.default_version = default_version or API_VERSION + self._dn = None + + self.exists = False + + @property + def dn(self): + """A property containing the distinguished name of the entry.""" + if not self._dn: + raise ValueError('The DN must be set in the init method.') + return self._dn + + @dn.setter + def dn(self, value): + if not (isinstance(value, DN) or isinstance(value, Fuzzy)): + raise ValueError('The value must be an instance of DN or Fuzzy.') + self._dn = value + + @property + def rdn(self): + return self.dn[0] + + @property + def name(self): + """Property holding the name of the entry in LDAP. + + This property is computed in runtime. + """ + return self.rdn.value + + def filter_attrs(self, keys): + """Return a dict of expected attrs, filtered by the given keys""" + if not self.attrs: + raise RuntimeError('The tracker instance has no attributes.') + return {k: v for k, v in self.attrs.items() if k in keys} + + def run_command(self, name, *args, **options): + """Run the given IPA command + + Logs the command using print for easier debugging + """ + cmd = self.api.Command[name] + + options.setdefault('version', self.default_version) + + args_repr = ', '.join( + [repr(a) for a in args] + + ['%s=%r' % item for item in list(options.items())]) + try: + result = cmd(*args, **options) + except Exception as e: + print('Ran command: %s(%s): %s: %s' % (cmd, args_repr, + type(e).__name__, e)) + raise + else: + print('Ran command: %s(%s): OK' % (cmd, args_repr)) + return result + + def make_command(self, name, *args, **options): + """Make a functools.partial function to run the given command""" + return functools.partial(self.run_command, name, *args, **options) + + def make_fixture(self, request): + """Make a pytest fixture for this tracker + + The fixture ensures the plugin entry does not exist before + and after the tests that use it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass + + def cleanup(): + existed = self.exists + try: + del_command() + except errors.NotFound: + if existed: + raise + self.exists = False + + request.addfinalizer(cleanup) + + return self + + def ensure_exists(self): + """If the entry does not exist (according to tracker state), create it + """ + if not self.exists: + self.create(force=True) + + def ensure_missing(self): + """If the entry exists (according to tracker state), delete it + """ + if self.exists: + self.delete() + + def make_create_command(self, force=True): + """Make function that creates the plugin entry object.""" + raise NotImplementedError(self._override_me_msg) + + def make_delete_command(self): + """Make function that deletes the plugin entry object.""" + raise NotImplementedError(self._override_me_msg) + + def make_retrieve_command(self, all=False, raw=False): + """Make function that retrieves the entry using ${CMD}_show""" + raise NotImplementedError(self._override_me_msg) + + def make_find_command(self, *args, **kwargs): + """Make function that finds the entry using ${CMD}_find + + Note that the name (or other search terms) needs to be specified + in arguments. + """ + raise NotImplementedError(self._override_me_msg) + + def make_update_command(self, updates): + """Make function that modifies the entry using ${CMD}_mod""" + raise NotImplementedError(self._override_me_msg) + + def create(self, force=True): + """Helper function to create an entry and check the result""" + self.ensure_missing() + self.track_create() + command = self.make_create_command(force=force) + result = command() + self.check_create(result) + + def track_create(self): + """Update expected state for host creation + + The method should look similar to the following + example of host plugin. + + self.attrs = dict( + dn=self.dn, + fqdn=[self.fqdn], + description=[self.description], + ... # all required attributes + ) + self.exists = True + """ + raise NotImplementedError(self._override_me_msg) + + def check_create(self, result): + """Check plugin's add command result""" + raise NotImplementedError(self._override_me_msg) + + def delete(self): + """Helper function to delete a host and check the result""" + self.ensure_exists() + self.track_delete() + command = self.make_delete_command() + result = command() + self.check_delete(result) + + def track_delete(self): + """Update expected state for host deletion""" + self.exists = False + self.attrs = {} + + def check_delete(self, result): + """Check plugin's `del` command result""" + raise NotImplementedError(self._override_me_msg) + + def retrieve(self, all=False, raw=False): + """Helper function to retrieve an entry and check the result""" + self.ensure_exists() + command = self.make_retrieve_command(all=all, raw=raw) + result = command() + self.check_retrieve(result, all=all, raw=raw) + + def check_retrieve(self, result, all=False, raw=False): + """Check the plugin's `show` command result""" + raise NotImplementedError(self._override_me_msg) + + def find(self, all=False, raw=False): + """Helper function to search for this hosts and check the result""" + self.ensure_exists() + command = self.make_find_command(self.name, all=all, raw=raw) + result = command() + self.check_find(result, all=all, raw=raw) + + def check_find(self, result, all=False, raw=False): + """Check the plugin's `find` command result""" + raise NotImplementedError(self._override_me_msg) + + def update(self, updates, expected_updates=None): + """Helper function to update this hosts and check the result + + The ``updates`` are used as options to the *_mod command, + and the self.attrs is updated with this dict. + Additionally, self.attrs is updated with ``expected_updates``. + """ + if expected_updates is None: + expected_updates = {} + + self.ensure_exists() + command = self.make_update_command(updates) + result = command() + self.attrs.update(updates) + self.attrs.update(expected_updates) + self.check_update(result, extra_keys=set(updates.keys()) | + set(expected_updates.keys())) + + def check_update(self, result, extra_keys=()): + """Check the plugin's `find` command result""" + raise NotImplementedError(self._override_me_msg) diff --git a/ipatests/test_xmlrpc/tracker/caacl_plugin.py b/ipatests/test_xmlrpc/tracker/caacl_plugin.py new file mode 100644 index 000000000..afe7ee0c0 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/caacl_plugin.py @@ -0,0 +1,367 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from ipalib import errors +from ipatests.util import assert_deepequal +from ipatests.test_xmlrpc.xmlrpc_test import (fuzzy_caacldn, fuzzy_uuid, + fuzzy_ipauniqueid) +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.tracker.base import Tracker + + +class CAACLTracker(Tracker): + """Tracker class for CA ACL LDAP object. + + The class implements methods required by the base class + to help with basic CRUD operations. + + Methods for adding and deleting actual member entries into an ACL + do not have check methods as these would make the class + unnecessarily complicated. The checks are implemented as + a standalone test suite. However, this makes the test crucial + in debugging more complicated test cases. Since the add/remove + operation won't be checked right away by the tracker, a problem + in this operation may propagate into more complicated test case. + + It is possible to pass a list of member uids to these methods. + + The test uses instances of Fuzzy class to compare results as they + are in the UUID format. The dn and rdn properties were modified + to reflect this as well. + """ + + member_keys = { + u'memberuser_user', u'memberuser_group', + u'memberhost_host', u'memberhost_hostgroup', + u'memberservice_service', + u'ipamembercertprofile_certprofile'} + category_keys = { + u'ipacacategory', u'ipacertprofilecategory', u'usercategory', + u'hostcategory', u'servicecategory'} + retrieve_keys = { + u'dn', u'cn', u'description', u'ipaenabledflag', + u'ipamemberca', u'ipamembercertprofile', u'memberuser', + u'memberhost', u'memberservice'} | member_keys | category_keys + retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'} + create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory', + u'usercategory', u'hostcategory', u'ipacacategory', + u'servicecategory', u'ipaenabledflag', u'objectclass', + u'ipauniqueid'} + update_keys = create_keys - {u'dn'} + + def __init__(self, name, ipacertprofile_category=None, user_category=None, + service_category=None, host_category=None, description=None, + default_version=None): + super(CAACLTracker, self).__init__(default_version=default_version) + + self._name = name + self.description = description + self._categories = dict( + ipacertprofilecategory=ipacertprofile_category, + usercategory=user_category, + servicecategory=service_category, + hostcategory=host_category) + + self.dn = fuzzy_caacldn + + @property + def name(self): + return self._name + + @property + def rdn(self): + return fuzzy_ipauniqueid + + @property + def categories(self): + """To be used in track_create""" + return {cat: v for cat, v in self._categories.items() if v} + + @property + def create_categories(self): + """ Return the categories set on create. + Unused categories are left out. + """ + return {cat: [v] for cat, v in self.categories.items() if v} + + def make_create_command(self, force=True): + return self.make_command(u'caacl_add', self.name, + description=self.description, + **self.categories) + + def check_create(self, result): + assert_deepequal(dict( + value=self.name, + summary=u'Added CA ACL "{}"'.format(self.name), + result=dict(self.filter_attrs(self.create_keys)) + ), result) + + def track_create(self): + self.attrs = dict( + dn=self.dn, + ipauniqueid=[fuzzy_uuid], + cn=[self.name], + objectclass=objectclasses.caacl, + ipaenabledflag=[u'TRUE']) + + self.attrs.update(self.create_categories) + if self.description: + self.attrs.update({u'description', [self.description]}) + + self.exists = True + + def make_delete_command(self): + return self.make_command('caacl_del', self.name) + + def check_delete(self, result): + assert_deepequal(dict( + value=[self.name], + summary=u'Deleted CA ACL "{}"'.format(self.name), + result=dict(failed=[]) + ), result) + + def make_retrieve_command(self, all=False, raw=False): + return self.make_command('caacl_show', self.name, all=all, raw=raw) + + def check_retrieve(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + value=self.name, + summary=None, + result=expected + ), result) + + def make_find_command(self, *args, **kwargs): + return self.make_command('caacl_find', *args, **kwargs) + + def check_find(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 CA ACL matched', + result=[expected] + ), result) + + def make_update_command(self, updates): + return self.make_command('caacl_mod', self.name, **updates) + + def update(self, updates, expected_updates=None, silent=False): + """If removing a category, delete it from tracker as well""" + # filter out empty categories and track changes + + filtered_updates = dict() + for key, value in updates.items(): + if key in self.category_keys: + if not value: + try: + del self.attrs[key] + except IndexError: + if silent: + pass + else: + # if there is a value, prepare the pair for update + filtered_updates.update({key: value}) + else: + filtered_updates.update({key: value}) + + if expected_updates is None: + expected_updates = {} + + command = self.make_update_command(updates) + + try: + result = command() + except errors.EmptyModlist: + if silent: + self.attrs.update(filtered_updates) + self.attrs.update(expected_updates) + self.check_update(result, + extra_keys=set(self.update_keys) | + set(expected_updates.keys())) + + def check_update(self, result, extra_keys=()): + assert_deepequal(dict( + value=self.name, + summary=u'Modified CA ACL "{}"'.format(self.name), + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + # Helper methods for caacl subcommands. The check methods will be + # implemented in standalone test + # + # The methods implemented here will be: + # caacl_{add,remove}_{host, service, certprofile, user [, subca]} + + def _add_acl_component(self, command_name, keys, track): + """ Add a resource into ACL rule and track it. + + command_name - the name in the API + keys = { + 'tracker_attr': { + 'api_key': 'value' + } + } + + e.g. + + keys = { + 'memberhost_host': { + 'host': 'hostname' + }, + 'memberhost_hostgroup': { + 'hostgroup': 'hostgroup_name' + } + } + """ + + if not self.exists: + raise errors.NotFound(reason="The tracked entry doesn't exist.") + + command = self.make_command(command_name, self.name) + command_options = dict() + + # track + for tracker_attr in keys: + api_options = keys[tracker_attr] + if track: + for option in api_options: + try: + if type(option) in (list, tuple): + self.attrs[tracker_attr].extend(api_options[option]) + else: + self.attrs[tracker_attr].append(api_options[option]) + except KeyError: + if type(option) in (list, tuple): + self.attrs[tracker_attr] = api_options[option] + else: + self.attrs[tracker_attr] = [api_options[option]] + # prepare options for the command call + command_options.update(api_options) + + return command(**command_options) + + def _remove_acl_component(self, command_name, keys, track): + """ Remove a resource from ACL rule and track it. + + command_name - the name in the API + keys = { + 'tracker_attr': { + 'api_key': 'value' + } + } + + e.g. + + keys = { + 'memberhost_host': { + 'host': 'hostname' + }, + 'memberhost_hostgroup': { + 'hostgroup': 'hostgroup_name' + } + } + """ + command = self.make_command(command_name, self.name) + command_options = dict() + + for tracker_attr in keys: + api_options = keys[tracker_attr] + if track: + for option in api_options: + if type(option) in (list, tuple): + for item in option: + self.attrs[tracker_attr].remove(item) + else: + self.attrs[tracker_attr].remove(api_options[option]) + if len(self.attrs[tracker_attr]) == 0: + del self.attrs[tracker_attr] + command_options.update(api_options) + + return command(**command_options) + + def add_host(self, host=None, hostgroup=None, track=True): + """Associates an host or hostgroup entry with the ACL. + + The command takes an unicode string with the name + of the entry (RDN). + + It is the responsibility of a test writer to provide + the correct value, object type as the method does not + verify whether the entry exists. + + The method can add only one entry of each type + in one call. + """ + + options = { + u'memberhost_host': {u'host': host}, + u'memberhost_hostgroup': {u'hostgroup': hostgroup}} + + return self._add_acl_component(u'caacl_add_host', options, track) + + def remove_host(self, host=None, hostgroup=None, track=True): + options = { + u'memberhost_host': {u'host': host}, + u'memberhost_hostgroup': {u'hostgroup': hostgroup}} + + return self._remove_acl_component(u'caacl_remove_host', options, track) + + def add_user(self, user=None, group=None, track=True): + options = { + u'memberuser_user': {u'user': user}, + u'memberuser_group': {u'group': group}} + + return self._add_acl_component(u'caacl_add_user', options, track) + + def remove_user(self, user=None, group=None, track=True): + options = { + u'memberuser_user': {u'user': user}, + u'memberuser_group': {u'group': group}} + + return self._remove_acl_component(u'caacl_remove_user', options, track) + + def add_service(self, service=None, track=True): + options = { + u'memberservice_service': {u'service': service}} + + return self._add_acl_component(u'caacl_add_service', options, track) + + def remove_service(self, service=None, track=True): + options = { + u'memberservice_service': {u'service': service}} + + return self._remove_acl_component(u'caacl_remove_service', options, track) + + def add_profile(self, certprofile=None, track=True): + options = { + u'ipamembercertprofile_certprofile': + {u'certprofile': certprofile}} + + return self._add_acl_component(u'caacl_add_profile', options, track) + + def remove_profile(self, certprofile=None, track=True): + options = { + u'ipamembercertprofile_certprofile': + {u'certprofile': certprofile}} + + return self._remove_acl_component(u'caacl_remove_profile', options, track) + + def enable(self): + command = self.make_command(u'caacl_enable', self.name) + self.attrs.update({u'ipaenabledflag': [u'TRUE']}) + command() + + def disable(self): + command = self.make_command(u'caacl_disable', self.name) + self.attrs.update({u'ipaenabledflag': [u'FALSE']}) + command() diff --git a/ipatests/test_xmlrpc/tracker/certprofile_plugin.py b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py new file mode 100644 index 000000000..eeb27eb14 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/certprofile_plugin.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +import os + + +from ipapython.dn import DN +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.test_xmlrpc import objectclasses +from ipatests.util import assert_deepequal + + +class CertprofileTracker(Tracker): + """Tracker class for certprofile plugin. + """ + + retrieve_keys = { + 'dn', 'cn', 'description', 'ipacertprofilestoreissued' + } + retrieve_all_keys = retrieve_keys | {'objectclass'} + create_keys = retrieve_keys | {'objectclass'} + update_keys = retrieve_keys - {'dn'} + managedby_keys = retrieve_keys + allowedto_keys = retrieve_keys + + def __init__(self, name, store=False, desc='dummy description', + profile=None, default_version=None): + super(CertprofileTracker, self).__init__( + default_version=default_version + ) + + self.store = store + self.description = desc + self._profile_path = profile + + self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca', + self.api.env.basedn) + + @property + def profile(self): + if not self._profile_path: + return None + + if os.path.isabs(self._profile_path): + path = self._profile_path + else: + path = os.path.join(os.path.dirname(__file__), + self._profile_path) + + with open(path, 'r') as f: + content = f.read() + return unicode(content) + + def make_create_command(self, force=True): + if not self.profile: + raise RuntimeError('Tracker object without path to profile ' + 'cannot be used to create profile entry.') + + return self.make_command('certprofile_import', self.name, + description=self.description, + ipacertprofilestoreissued=self.store, + file=self.profile) + + def check_create(self, result): + assert_deepequal(dict( + value=self.name, + summary=u'Imported profile "{}"'.format(self.name), + result=dict(self.filter_attrs(self.create_keys)) + ), result) + + def track_create(self): + self.attrs = dict( + dn=unicode(self.dn), + cn=[self.name], + description=[self.description], + ipacertprofilestoreissued=[unicode(self.store).upper()], + objectclass=objectclasses.certprofile + ) + self.exists = True + + def make_delete_command(self): + return self.make_command('certprofile_del', self.name) + + def check_delete(self, result): + assert_deepequal(dict( + value=[self.name], # correctly a list? + summary=u'Deleted profile "{}"'.format(self.name), + result=dict(failed=[]), + ), result) + + def make_retrieve_command(self, all=False, raw=False, **options): + return self.make_command('certprofile_show', self.name, all=all, + raw=raw, **options) + + def check_retrieve(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + value=self.name, + summary=None, + result=expected, + ), result) + + def make_find_command(self, *args, **kwargs): + return self.make_command('certprofile_find', *args, **kwargs) + + def check_find(self, result, all=False, raw=False): + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 profile matched', + result=[expected] + ), result) + + def make_update_command(self, updates): + return self.make_command('certprofile_mod', self.name, **updates) + + def check_update(self, result, extra_keys=()): + assert_deepequal(dict( + value=self.name, + summary=u'Modified Certificate Profile "{}"'.format(self.name), + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) diff --git a/ipatests/test_xmlrpc/tracker/group_plugin.py b/ipatests/test_xmlrpc/tracker/group_plugin.py new file mode 100644 index 000000000..c47ce8ecf --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/group_plugin.py @@ -0,0 +1,196 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid + +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.util import assert_deepequal, get_group_dn + + +class GroupTracker(Tracker): + """ Class for host plugin like tests """ + retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user', + u'member_group'} + retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'} + + create_keys = retrieve_all_keys + update_keys = retrieve_keys - {u'dn'} + + add_member_keys = retrieve_keys | {u'description'} + + def __init__(self, name): + super(GroupTracker, self).__init__(default_version=None) + self.cn = name + self.dn = get_group_dn(name) + + def make_create_command(self, nonposix=False, external=False, + force=True): + """ Make function that creates a group using 'group-add' """ + return self.make_command('group_add', self.cn, + nonposix=nonposix, external=external) + + def make_delete_command(self): + """ Make function that deletes a group using 'group-del' """ + return self.make_command('group_del', self.cn) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a group using 'group-show' """ + return self.make_command('group_show', self.cn, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that searches for a group using 'group-find' """ + return self.make_command('group_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates a group using 'group-mod' """ + return self.make_command('group_mod', self.cn, **updates) + + def make_add_member_command(self, options={}): + """ Make function that adds a member to a group + Attention: only works for one user OR group! """ + if u'user' in options: + self.attrs[u'member_user'] = [options[u'user']] + elif u'group' in options: + self.attrs[u'member_group'] = [options[u'group']] + self.adds = options + + return self.make_command('group_add_member', self.cn, **options) + + def make_remove_member_command(self, options={}): + """ Make function that removes a member from a group + Attention: only works for one user OR group! """ + if u'user' in options: + del self.attrs[u'member_user'] + elif u'group' in options: + del self.attrs[u'member_group'] + return self.make_command('group_remove_member', self.cn, **options) + + def make_detach_command(self): + """ Make function that detaches a managed group using + 'group-detach' """ + self.exists = True + return self.make_command('group_detach', self.cn) + + def track_create(self): + """ Updates expected state for group creation""" + self.attrs = dict( + dn=get_group_dn(self.cn), + cn=[self.cn], + gidnumber=[fuzzy_digits], + ipauniqueid=[fuzzy_uuid], + objectclass=objectclasses.posixgroup, + ) + self.exists = True + + def check_create(self, result): + """ Checks 'group_add' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Added group "%s"' % self.cn, + result=self.filter_attrs(self.create_keys) + ), result) + + def check_delete(self, result): + """ Checks 'group_del' command result """ + assert_deepequal(dict( + value=[self.cn], + summary=u'Deleted group "%s"' % self.cn, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """ Checks 'group_show' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + value=self.cn, + summary=None, + result=expected + ), result) + + def check_find(self, result, all=False, raw=False): + """ Checks 'group_find' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 group matched', + result=[expected], + ), result) + + def check_update(self, result, extra_keys={}): + """ Checks 'group_mod' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Modified group "%s"' % self.cn, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def check_add_member(self, result): + """ Checks 'group_add_member' command result """ + assert_deepequal(dict( + completed=1, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ), result) + + def check_add_member_negative(self, result): + """ Checks 'group_add_member' command result when expected result + is failure of the operation""" + if u'member_user' in self.attrs: + del self.attrs[u'member_user'] + elif u'member_group' in self.attrs: + del self.attrs[u'member_group'] + + expected = dict( + completed=0, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ) + if u'user' in self.adds: + expected[u'failed'][u'member'][u'user'] = [( + self.adds[u'user'], u'no such entry')] + elif u'group' in self.adds: + expected[u'failed'][u'member'][u'group'] = [( + self.adds[u'group'], u'no such entry')] + + assert_deepequal(expected, result) + + def check_remove_member(self, result): + """ Checks 'group_remove_member' command result """ + assert_deepequal(dict( + completed=1, + failed={u'member': {u'group': (), u'user': ()}}, + result=self.filter_attrs(self.add_member_keys) + ), result) + + def check_detach(self, result): + """ Checks 'group_detach' command result """ + assert_deepequal(dict( + value=self.cn, + summary=u'Detached group "%s" from user "%s"' % ( + self.cn, self.cn), + result=True + ), result) + + def make_fixture_detach(self, request): + """Make a pytest fixture for this tracker + + The fixture ensures the plugin entry does not exist before + and after the tests that use itself. + """ + def cleanup(): + pass + + request.addfinalizer(cleanup) + + return self diff --git a/ipatests/test_xmlrpc/tracker/host_plugin.py b/ipatests/test_xmlrpc/tracker/host_plugin.py new file mode 100644 index 000000000..bf199f4f5 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/host_plugin.py @@ -0,0 +1,154 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from __future__ import print_function + + +from ipapython.dn import DN +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_uuid +from ipatests.test_xmlrpc import objectclasses +from ipatests.util import assert_deepequal + + +class HostTracker(Tracker): + """Wraps and tracks modifications to a Host object + + Implements the helper functions for host plugin. + + The HostTracker object stores information about the host, e.g. + ``fqdn`` and ``dn``. + """ + retrieve_keys = { + 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host', + 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint', + 'serial_number', 'serial_number_hex', 'sha1_fingerprint', + 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before', + 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user', + 'ipaallowedtoperform_read_keys_group', + 'ipaallowedtoperform_read_keys_host', + 'ipaallowedtoperform_read_keys_hostgroup', + 'ipaallowedtoperform_write_keys_user', + 'ipaallowedtoperform_write_keys_group', + 'ipaallowedtoperform_write_keys_host', + 'ipaallowedtoperform_write_keys_hostgroup'} + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid', + u'managing_host', u'objectclass', u'serverhostname'} + create_keys = retrieve_keys | {'objectclass', 'ipauniqueid', + 'randompassword'} + update_keys = retrieve_keys - {'dn'} + managedby_keys = retrieve_keys - {'has_keytab', 'has_password'} + allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'} + + def __init__(self, name, fqdn=None, default_version=None): + super(HostTracker, self).__init__(default_version=default_version) + + self.shortname = name + if fqdn: + self.fqdn = fqdn + else: + self.fqdn = u'%s.%s' % (name, self.api.env.domain) + self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts', + self.api.env.basedn) + + self.description = u'Test host <%s>' % name + self.location = u'Undisclosed location <%s>' % name + + def make_create_command(self, force=True): + """Make function that creates this host using host_add""" + return self.make_command('host_add', self.fqdn, + description=self.description, + l=self.location, + force=force) + + def make_delete_command(self): + """Make function that deletes the host using host_del""" + return self.make_command('host_del', self.fqdn) + + def make_retrieve_command(self, all=False, raw=False): + """Make function that retrieves the host using host_show""" + return self.make_command('host_show', self.fqdn, all=all, raw=raw) + + def make_find_command(self, *args, **kwargs): + """Make function that finds hosts using host_find + + Note that the fqdn (or other search terms) needs to be specified + in arguments. + """ + return self.make_command('host_find', *args, **kwargs) + + def make_update_command(self, updates): + """Make function that modifies the host using host_mod""" + return self.make_command('host_mod', self.fqdn, **updates) + + def track_create(self): + """Update expected state for host creation""" + self.attrs = dict( + dn=self.dn, + fqdn=[self.fqdn], + description=[self.description], + l=[self.location], + krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)], + objectclass=objectclasses.host, + ipauniqueid=[fuzzy_uuid], + managedby_host=[self.fqdn], + has_keytab=False, + has_password=False, + cn=[self.fqdn], + ipakrbokasdelegate=False, + ipakrbrequirespreauth=True, + managing_host=[self.fqdn], + serverhostname=[self.shortname], + ) + self.exists = True + + def check_create(self, result): + """Check `host_add` command result""" + assert_deepequal(dict( + value=self.fqdn, + summary=u'Added host "%s"' % self.fqdn, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """Check `host_del` command result""" + assert_deepequal(dict( + value=[self.fqdn], + summary=u'Deleted host "%s"' % self.fqdn, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """Check `host_show` command result""" + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + assert_deepequal(dict( + value=self.fqdn, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """Check `host_find` command result""" + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 host matched', + result=[expected], + ), result) + + def check_update(self, result, extra_keys=()): + """Check `host_update` command result""" + assert_deepequal(dict( + value=self.fqdn, + summary=u'Modified host "%s"' % self.fqdn, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) diff --git a/ipatests/test_xmlrpc/tracker/stageuser_plugin.py b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py new file mode 100644 index 000000000..09ad282e4 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py @@ -0,0 +1,267 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +import six + +from ipalib import api, errors + +from ipatests.test_xmlrpc.tracker.base import Tracker +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.xmlrpc_test import ( + fuzzy_string, fuzzy_dergeneralizedtime, raises_exact) + +from ipatests.util import assert_deepequal, get_user_dn +from ipapython.dn import DN + +if six.PY3: + unicode = str + +sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X' + 'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI' + 'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2' + 'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm' + 'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01' + '9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF' + '0L public key test') +sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B ' + 'public key test (ssh-rsa)') + + +class StageUserTracker(Tracker): + """ Tracker class for staged user LDAP object + + Implements helper functions for host plugin. + StageUserTracker object stores information about the user. + """ + + retrieve_keys = { + u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell', + u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber', + u'title', u'memberof', u'nsaccountlock', u'memberofindirect', + u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink', + u'ipatokenradiususername', u'krbprincipalexpiration', + u'usercertificate', u'dn', u'has_keytab', u'has_password', + u'street', u'postalcode', u'facsimiletelephonenumber', + u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l', + u'st', u'mobile', u'pager', } + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipauniqueid', u'objectclass', u'description', + u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} + + create_keys = retrieve_all_keys | { + u'objectclass', u'ipauniqueid', u'randompassword', + u'userpassword', u'krbextradata', u'krblastpwdchange', + u'krbpasswordexpiration', u'krbprincipalkey'} + + update_keys = retrieve_keys - {u'dn', u'nsaccountlock'} + activate_keys = retrieve_keys | { + u'has_keytab', u'has_password', u'nsaccountlock'} + + def __init__(self, name, givenname, sn, **kwargs): + super(StageUserTracker, self).__init__(default_version=None) + self.uid = name + self.givenname = givenname + self.sn = sn + self.dn = DN( + ('uid', self.uid), api.env.container_stageuser, api.env.basedn) + + self.kwargs = kwargs + + def make_create_command(self, options=None, force=None): + """ Make function that creates a staged user using stageuser-add """ + if options is not None: + self.kwargs = options + return self.make_command('stageuser_add', self.uid, + givenname=self.givenname, + sn=self.sn, **self.kwargs) + + def make_delete_command(self): + """ Make function that deletes a staged user using stageuser-del """ + return self.make_command('stageuser_del', self.uid) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a staged user using stageuser-show """ + return self.make_command('stageuser_show', self.uid, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that finds staged user using stageuser-find """ + return self.make_command('stageuser_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates staged user using stageuser-mod """ + return self.make_command('stageuser_mod', self.uid, **updates) + + def make_activate_command(self): + """ Make function that activates staged user + using stageuser-activate """ + return self.make_command('stageuser_activate', self.uid) + + def track_create(self): + """ Update expected state for staged user creation """ + self.attrs = dict( + dn=self.dn, + uid=[self.uid], + givenname=[self.givenname], + sn=[self.sn], + homedirectory=[u'/home/%s' % self.uid], + displayname=[u'%s %s' % (self.givenname, self.sn)], + cn=[u'%s %s' % (self.givenname, self.sn)], + initials=[u'%s%s' % (self.givenname[0], self.sn[0])], + objectclass=objectclasses.user_base, + description=[u'__no_upg__'], + ipauniqueid=[u'autogenerate'], + uidnumber=[u'-1'], + gidnumber=[u'-1'], + krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], + mail=[u'%s@%s' % (self.uid, self.api.env.domain)], + gecos=[u'%s %s' % (self.givenname, self.sn)], + loginshell=[u'/bin/sh'], + has_keytab=False, + has_password=False, + nsaccountlock=[u'true'], + ) + + for key in self.kwargs: + if key == u'krbprincipalname': + self.attrs[key] = [u'%s@%s' % ( + (self.kwargs[key].split('@'))[0].lower(), + (self.kwargs[key].split('@'))[1])] + elif key == u'manager': + self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))] + elif key == u'ipasshpubkey': + self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp] + self.attrs[key] = [self.kwargs[key]] + elif key == u'random' or key == u'userpassword': + self.attrs[u'krbextradata'] = [fuzzy_string] + self.attrs[u'krbpasswordexpiration'] = [ + fuzzy_dergeneralizedtime] + self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime] + self.attrs[u'krbprincipalkey'] = [fuzzy_string] + self.attrs[u'userpassword'] = [fuzzy_string] + self.attrs[u'has_keytab'] = True + self.attrs[u'has_password'] = True + if key == u'random': + self.attrs[u'randompassword'] = fuzzy_string + else: + self.attrs[key] = [self.kwargs[key]] + + self.exists = True + + def check_create(self, result): + """ Check 'stageuser-add' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Added stage user "%s"' % self.uid, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """ Check 'stageuser-del' command result """ + assert_deepequal(dict( + value=[self.uid], + summary=u'Deleted stage user "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False, raw=False): + """ Check 'stageuser-show' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different + # type of nsaccountlock value than DS, but overall the value + # fits expected result + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + value=self.uid, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """ Check 'stageuser-find' command result """ + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different + # type of nsaccountlock value than DS, but overall the value + # fits expected result + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 user matched', + result=[expected], + ), result) + + def check_find_nomatch(self, result): + """ Check 'stageuser-find' command result when no match is expected """ + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 users matched', + result=[], + ), result) + + def check_update(self, result, extra_keys=()): + """ Check 'stageuser-mod' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Modified stage user "%s"' % self.uid, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def check_restore_preserved(self, result): + assert_deepequal(dict( + value=[self.uid], + summary=u'Staged user account "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def make_fixture_activate(self, request): + """Make a pytest fixture for a staged user that is to be activated + + The fixture ensures the plugin entry does not exist before + and after the tests that use it. It takes into account + that the staged user no longer exists after activation, + therefore the fixture verifies after the tests + that the staged user doesn't exist instead of deleting it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass + + def finish(): + with raises_exact(errors.NotFound( + reason=u'%s: stage user not found' % self.uid)): + del_command() + + request.addfinalizer(finish) + + return self + + def create_from_preserved(self, user): + """ Copies values from preserved user - helper function for + restoration tests """ + self.attrs = user.attrs + self.uid = user.uid + self.givenname = user.givenname + self.sn = user.sn + self.dn = DN( + ('uid', self.uid), api.env.container_stageuser, api.env.basedn) + self.attrs[u'dn'] = self.dn diff --git a/ipatests/test_xmlrpc/tracker/user_plugin.py b/ipatests/test_xmlrpc/tracker/user_plugin.py new file mode 100644 index 000000000..af7d85836 --- /dev/null +++ b/ipatests/test_xmlrpc/tracker/user_plugin.py @@ -0,0 +1,340 @@ +# +# Copyright (C) 2015 FreeIPA Contributors see COPYING for license +# + +from ipalib import api, errors +from ipapython.dn import DN + +from ipatests.util import assert_deepequal, get_group_dn +from ipatests.test_xmlrpc import objectclasses +from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid, raises_exact +from ipatests.test_xmlrpc.tracker.base import Tracker + + +class UserTracker(Tracker): + """ Class for host plugin like tests """ + + retrieve_keys = { + u'uid', u'givenname', u'sn', u'homedirectory', + u'loginshell', u'uidnumber', u'gidnumber', u'mail', u'ou', + u'telephonenumber', u'title', u'memberof', + u'memberofindirect', u'ipauserauthtype', u'userclass', + u'ipatokenradiusconfiglink', u'ipatokenradiususername', + u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab', + u'has_password', u'street', u'postalcode', u'facsimiletelephonenumber', + u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock', + u'preserved', u'memberof_group', u'l', u'mobile', u'krbextradata', + u'krblastpwdchange', u'krbpasswordexpiration', u'pager', u'st' + } + + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry', + u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'} + + retrieve_preserved_keys = retrieve_keys - {u'memberof_group'} + retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'} + + create_keys = retrieve_all_keys | { + u'randompassword', u'mepmanagedentry', + u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange', + u'krbprincipalkey', u'randompassword', u'userpassword' + } + update_keys = retrieve_keys - {u'dn'} + activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password', + u'nsaccountlock', u'sshpubkeyfp'} + + find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'} + find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'} + + def __init__(self, name, givenname, sn, **kwargs): + super(UserTracker, self).__init__(default_version=None) + self.uid = name + self.givenname = givenname + self.sn = sn + self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn) + + self.kwargs = kwargs + + def make_create_command(self, force=None): + """ Make function that crates a user using user-add """ + return self.make_command( + 'user_add', self.uid, + givenname=self.givenname, + sn=self.sn, **self.kwargs + ) + + def make_delete_command(self, no_preserve=True, preserve=False): + """ Make function that deletes a user using user-del """ + + if preserve and not no_preserve: + # necessary to change some user attributes due to moving + # to different container + self.attrs[u'dn'] = DN( + ('uid', self.uid), + api.env.container_deleteuser, + api.env.basedn + ) + self.attrs[u'objectclass'] = objectclasses.user_base + + return self.make_command( + 'user_del', self.uid, + no_preserve=no_preserve, + preserve=preserve + ) + + def make_retrieve_command(self, all=False, raw=False): + """ Make function that retrieves a user using user-show """ + return self.make_command('user_show', self.uid, all=all) + + def make_find_command(self, *args, **kwargs): + """ Make function that finds user using user-find """ + return self.make_command('user_find', *args, **kwargs) + + def make_update_command(self, updates): + """ Make function that updates user using user-mod """ + return self.make_command('user_mod', self.uid, **updates) + + def make_undelete_command(self): + """ Make function that activates preserved user using user-undel """ + return self.make_command('user_undel', self.uid) + + def make_enable_command(self): + """ Make function that enables user using user-enable """ + return self.make_command('user_enable', self.uid) + + def make_stage_command(self): + """ Make function that restores preserved user by moving it to + staged container """ + return self.make_command('user_stage', self.uid) + + def track_create(self): + """ Update expected state for user creation """ + self.attrs = dict( + dn=self.dn, + uid=[self.uid], + givenname=[self.givenname], + sn=[self.sn], + homedirectory=[u'/home/%s' % self.uid], + displayname=[u'%s %s' % (self.givenname, self.sn)], + cn=[u'%s %s' % (self.givenname, self.sn)], + initials=[u'%s%s' % (self.givenname[0], self.sn[0])], + objectclass=objectclasses.user, + description=[u'__no_upg__'], + ipauniqueid=[fuzzy_uuid], + uidnumber=[fuzzy_digits], + gidnumber=[fuzzy_digits], + krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)], + mail=[u'%s@%s' % (self.uid, self.api.env.domain)], + gecos=[u'%s %s' % (self.givenname, self.sn)], + loginshell=[u'/bin/sh'], + has_keytab=False, + has_password=False, + mepmanagedentry=[get_group_dn(self.uid)], + memberof_group=[u'ipausers'], + ) + + for key in self.kwargs: + if key == u'krbprincipalname': + self.attrs[key] = [u'%s@%s' % ( + (self.kwargs[key].split('@'))[0].lower(), + (self.kwargs[key].split('@'))[1] + )] + else: + self.attrs[key] = [self.kwargs[key]] + + self.exists = True + + def check_create(self, result): + """ Check 'user-add' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Added user "%s"' % self.uid, + result=self.filter_attrs(self.create_keys), + ), result) + + def check_delete(self, result): + """ Check 'user-del' command result """ + assert_deepequal(dict( + value=[self.uid], + summary=u'Deleted user "%s"' % self.uid, + result=dict(failed=[]), + ), result) + + def check_retrieve(self, result, all=False): + """ Check 'user-show' command result """ + + if u'preserved' in self.attrs and self.attrs[u'preserved']: + self.retrieve_all_keys = self.retrieve_preserved_all_keys + self.retrieve_keys = self.retrieve_preserved_keys + elif u'preserved' not in self.attrs and all: + self.attrs[u'preserved'] = False + + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) + + # small override because stageuser-find returns different type + # of nsaccountlock value than DS, but overall the value fits + # expected result + if u'nsaccountlock' in expected: + if expected[u'nsaccountlock'] == [u'true']: + expected[u'nsaccountlock'] = True + elif expected[u'nsaccountlock'] == [u'false']: + expected[u'nsaccountlock'] = False + + assert_deepequal(dict( + value=self.uid, + summary=None, + result=expected, + ), result) + + def check_find(self, result, all=False, raw=False): + """ Check 'user-find' command result """ + self.attrs[u'nsaccountlock'] = True + self.attrs[u'preserved'] = True + + if all: + expected = self.filter_attrs(self.find_all_keys) + else: + expected = self.filter_attrs(self.find_keys) + + assert_deepequal(dict( + count=1, + truncated=False, + summary=u'1 user matched', + result=[expected], + ), result) + + def check_find_nomatch(self, result): + """ Check 'user-find' command result when no user should be found """ + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 users matched', + result=[], + ), result) + + def check_update(self, result, extra_keys=()): + """ Check 'user-mod' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Modified user "%s"' % self.uid, + result=self.filter_attrs(self.update_keys | set(extra_keys)) + ), result) + + def create_from_staged(self, stageduser): + """ Copies attributes from staged user - helper function for + activation tests """ + self.attrs = stageduser.attrs + self.uid = stageduser.uid + self.givenname = stageduser.givenname + self.sn = stageduser.sn + + self.attrs[u'mepmanagedentry'] = None + self.attrs[u'dn'] = self.dn + self.attrs[u'ipauniqueid'] = [fuzzy_uuid] + self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % ( + api.env.container_group, api.env.basedn + )] + self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % ( + self.uid, api.env.container_group, api.env.basedn + )] + self.attrs[u'objectclass'] = objectclasses.user + if self.attrs[u'gidnumber'] == [u'-1']: + self.attrs[u'gidnumber'] = [fuzzy_digits] + if self.attrs[u'uidnumber'] == [u'-1']: + self.attrs[u'uidnumber'] = [fuzzy_digits] + + if u'ipasshpubkey' in self.kwargs: + self.attrs[u'ipasshpubkey'] = [str( + self.kwargs[u'ipasshpubkey'] + )] + + def check_activate(self, result): + """ Check 'stageuser-activate' command result """ + expected = dict( + value=self.uid, + summary=u'Stage user %s activated' % self.uid, + result=self.filter_attrs(self.activate_keys)) + + # work around to eliminate inconsistency in returned objectclass + # (case sensitive assertion) + expected['result']['objectclass'] = [item.lower() for item in + expected['result']['objectclass']] + result['result']['objectclass'] = [item.lower() for item in + result['result']['objectclass']] + + assert_deepequal(expected, result) + + self.exists = True + + def check_undel(self, result): + """ Check 'user-undel' command result """ + assert_deepequal(dict( + value=self.uid, + summary=u'Undeleted user account "%s"' % self.uid, + result=True + ), result) + + def track_delete(self, preserve=False): + """Update expected state for host deletion""" + if preserve: + self.exists = True + if u'memberof_group' in self.attrs: + del self.attrs[u'memberof_group'] + self.attrs[u'nsaccountlock'] = True + self.attrs[u'preserved'] = True + else: + self.exists = False + self.attrs = {} + + def make_preserved_user(self): + """ 'Creates' a preserved user necessary for some tests """ + self.ensure_exists() + self.track_delete(preserve=True) + command = self.make_delete_command(no_preserve=False, preserve=True) + result = command() + self.check_delete(result) + + def check_attr_preservation(self, expected): + """ Verifies that ipaUniqueID, uidNumber and gidNumber are + preserved upon reactivation. Also verifies that resulting + active user is a member of ipausers group only.""" + command = self.make_retrieve_command(all=True) + result = command() + + assert_deepequal(dict( + ipauniqueid=result[u'result'][u'ipauniqueid'], + uidnumber=result[u'result'][u'uidnumber'], + gidnumber=result[u'result'][u'gidnumber'] + ), expected) + + if (u'memberof_group' not in result[u'result'] or + result[u'result'][u'memberof_group'] != (u'ipausers',)): + assert False + + def make_fixture_restore(self, request): + """Make a pytest fixture for a preserved user that is to be moved to + staged area. + + The fixture ensures the plugin entry does not exist before + and after the tests that use it. It takes into account + that the preserved user no longer exists after restoring it, + therefore the fixture verifies after the tests + that the preserved user doesn't exist instead of deleting it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass + + def finish(): + with raises_exact(errors.NotFound( + reason=u'%s: user not found' % self.uid)): + del_command() + + request.addfinalizer(finish) + + return self |
