summaryrefslogtreecommitdiffstats
path: root/ipatests
diff options
context:
space:
mode:
authorMilan KubĂ­k <mkubik@redhat.com>2015-05-25 16:05:46 +0200
committerPetr Vobornik <pvoborni@redhat.com>2015-06-05 19:06:20 +0200
commitd25a45a9f99aa5d841f47baa0332f49223ecffca (patch)
tree2b19f88766d0b51d2139a9a9d82c38229675adce /ipatests
parent0138595f581d4b8ed76622d067e8dba0c29fd62f (diff)
downloadfreeipa-d25a45a9f99aa5d841f47baa0332f49223ecffca.tar.gz
freeipa-d25a45a9f99aa5d841f47baa0332f49223ecffca.tar.xz
freeipa-d25a45a9f99aa5d841f47baa0332f49223ecffca.zip
Abstract the HostTracker class from host plugin test
Implements a base class to help test LDAP based plugins. The class has been decoupled from the original host plugin test and moved to separate module ipatests.test_xmlrpc.ldaptracker. https://fedorahosted.org/freeipa/ticket/5032 Reviewed-By: David Kupka <dkupka@redhat.com>
Diffstat (limited to 'ipatests')
-rw-r--r--ipatests/test_xmlrpc/ldaptracker.py287
-rw-r--r--ipatests/test_xmlrpc/test_host_plugin.py155
2 files changed, 292 insertions, 150 deletions
diff --git a/ipatests/test_xmlrpc/ldaptracker.py b/ipatests/test_xmlrpc/ldaptracker.py
new file mode 100644
index 000000000..974124404
--- /dev/null
+++ b/ipatests/test_xmlrpc/ldaptracker.py
@@ -0,0 +1,287 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+"""
+Implements a base class to track changes to an LDAP object.
+"""
+
+import functools
+
+from ipalib import api, errors
+from ipapython.dn import DN
+from ipapython.version import API_VERSION
+
+
+class Tracker(object):
+ """Wraps and tracks modifications to a plugin LDAP entry object
+
+ Stores a copy of state of a plugin entry object and allows checking that
+ the state in the database is the same as expected.
+ This allows creating independent tests: the individual tests check
+ that the relevant changes have been made. At the same time
+ the entry doesn't need to be recreated and cleaned up for each test.
+
+ Two attributes are used for tracking: ``exists`` (true if the entry is
+ supposed to exist) and ``attrs`` (a dict of LDAP attributes that are
+ expected to be returned from IPA commands).
+
+ For commonly used operations, there is a helper method, e.g.
+ ``create``, ``update``, or ``find``, that does these steps:
+
+ * ensure the entry exists (or does not exist, for "create")
+ * store the expected modifications
+ * get the IPA command to run, and run it
+ * check that the result matches the expected state
+
+ Tests that require customization of these steps are expected to do them
+ manually, using lower-level methods.
+ Especially the first step (ensure the entry exists) is important for
+ achieving independent tests.
+
+ The Tracker object also stores information about the entry, e.g.
+ ``dn``, ``rdn`` and ``name`` which is derived from DN property.
+
+ To use this class, the programer must subclass it and provide the
+ implementation of following methods:
+
+ * make_*_command -- implementing the API call for particular plugin
+ and operation (add, delete, ...)
+ These methods should use the make_command method
+ * check_* commands -- an assertion for a plugin command (CRUD)
+ * track_create -- to make an internal representation of the
+ entry
+
+ Apart from overriding these methods, the subclass must provide the
+ distinguished name of the entry in `self.dn` property.
+
+ It is also required to override the class variables defining the sets
+ of ldap attributes/keys for these operations specific to the plugin
+ being implemented. Take the host plugin test for an example.
+
+ The implementation of these methods is not strictly enforced.
+ A missing method will cause a NotImplementedError during runtime
+ as a result.
+ """
+ retrieve_keys = None
+ retrieve_all_keys = None
+ create_keys = None
+ update_keys = None
+ managedby_keys = None
+ allowedto_keys = None
+
+ _override_me_msg = "This method needs to be overriden in a subclass"
+
+ def __init__(self, default_version=None):
+ self.api = api
+ self.default_version = default_version or API_VERSION
+ self._dn = None
+
+ self.exists = False
+
+ @property
+ def dn(self):
+ """A property containing the distinguished name of the entry."""
+ if not self._dn:
+ raise ValueError('The DN must be set in the init method.')
+ return self._dn
+
+ @dn.setter
+ def dn(self, value):
+ if not isinstance(value, DN):
+ raise ValueError('The value must be an instance of DN.')
+ self._dn = value
+
+ @property
+ def rdn(self):
+ return self.dn[0]
+
+ @property
+ def name(self):
+ """Property holding the name of the entry in LDAP.
+
+ This property is computed in runtime.
+ """
+ return self.rdn.value
+
+ def filter_attrs(self, keys):
+ """Return a dict of expected attrs, filtered by the given keys"""
+ if not self.attrs:
+ raise RuntimeError('The tracker instance has no attributes.')
+ return {k: v for k, v in self.attrs.items() if k in keys}
+
+ def run_command(self, name, *args, **options):
+ """Run the given IPA command
+
+ Logs the command using print for easier debugging
+ """
+ cmd = self.api.Command[name]
+
+ options.setdefault('version', self.default_version)
+
+ args_repr = ', '.join(
+ [repr(a) for a in args] +
+ ['%s=%r' % item for item in options.items()])
+ try:
+ result = cmd(*args, **options)
+ except Exception as e:
+ print 'Ran command: %s(%s): %s: %s' % (cmd, args_repr,
+ type(e).__name__, e)
+ raise
+ else:
+ print 'Ran command: %s(%s): OK' % (cmd, args_repr)
+ return result
+
+ def make_command(self, name, *args, **options):
+ """Make a functools.partial function to run the given command"""
+ return functools.partial(self.run_command, name, *args, **options)
+
+ def make_fixture(self, request):
+ """Make a pytest fixture for this tracker
+
+ The fixture ensures the plugin entry does not exist before
+ and after the tests that use it.
+ """
+ del_command = self.make_delete_command()
+ try:
+ del_command()
+ except errors.NotFound:
+ pass
+
+ def cleanup():
+ existed = self.exists
+ try:
+ del_command()
+ except errors.NotFound:
+ if existed:
+ raise
+ self.exists = False
+
+ request.addfinalizer(cleanup)
+
+ return self
+
+ def ensure_exists(self):
+ """If the entry does not exist (according to tracker state), create it
+ """
+ if not self.exists:
+ self.create(force=True)
+
+ def ensure_missing(self):
+ """If the entry exists (according to tracker state), delete it
+ """
+ if self.exists:
+ self.delete()
+
+ def make_create_command(self, force=True):
+ """Make function that creates the plugin entry object."""
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_delete_command(self):
+ """Make function that deletes the plugin entry object."""
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_retrieve_command(self, all=False, raw=False):
+ """Make function that retrieves the entry using ${CMD}_show"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_find_command(self, *args, **kwargs):
+ """Make function that finds the entry using ${CMD}_find
+
+ Note that the name (or other search terms) needs to be specified
+ in arguments.
+ """
+ raise NotImplementedError(self._override_me_msg)
+
+ def make_update_command(self, updates):
+ """Make function that modifies the entry using ${CMD}_mod"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def create(self, force=True):
+ """Helper function to create an entry and check the result"""
+ self.ensure_missing()
+ self.track_create()
+ command = self.make_create_command(force=force)
+ result = command()
+ self.check_create(result)
+
+ def track_create(self):
+ """Update expected state for host creation
+
+ The method should look similar to the following
+ example of host plugin.
+
+ self.attrs = dict(
+ dn=self.dn,
+ fqdn=[self.fqdn],
+ description=[self.description],
+ ... # all required attributes
+ )
+ self.exists = True
+ """
+ raise NotImplementedError(self._override_me_msg)
+
+ def check_create(self, result):
+ """Check plugin's add command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def delete(self):
+ """Helper function to delete a host and check the result"""
+ self.ensure_exists()
+ self.track_delete()
+ command = self.make_delete_command()
+ result = command()
+ self.check_delete(result)
+
+ def track_delete(self):
+ """Update expected state for host deletion"""
+ self.exists = False
+ self.attrs = {}
+
+ def check_delete(self, result):
+ """Check plugin's `del` command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def retrieve(self, all=False, raw=False):
+ """Helper function to retrieve an entry and check the result"""
+ self.ensure_exists()
+ command = self.make_retrieve_command(all=all, raw=raw)
+ result = command()
+ self.check_retrieve(result, all=all, raw=raw)
+
+ def check_retrieve(self, result, all=False, raw=False):
+ """Check the plugin's `show` command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def find(self, all=False, raw=False):
+ """Helper function to search for this hosts and check the result"""
+ self.ensure_exists()
+ command = self.make_find_command(self.name, all=all, raw=raw)
+ result = command()
+ self.check_find(result, all=all, raw=raw)
+
+ def check_find(self, result, all=False, raw=False):
+ """Check the plugin's `find` command result"""
+ raise NotImplementedError(self._override_me_msg)
+
+ def update(self, updates, expected_updates=None):
+ """Helper function to update this hosts and check the result
+
+ The ``updates`` are used as options to the *_mod command,
+ and the self.attrs is updated with this dict.
+ Additionally, self.attrs is updated with ``expected_updates``.
+ """
+ if expected_updates is None:
+ expected_updates = {}
+
+ self.ensure_exists()
+ command = self.make_update_command(updates)
+ result = command()
+ self.attrs.update(updates)
+ self.attrs.update(expected_updates)
+ self.check_update(result, extra_keys=set(updates.keys()) |
+ set(expected_updates.keys()))
+
+ def check_update(self, result, extra_keys=()):
+ """Check the plugin's `find` command result"""
+ raise NotImplementedError(self._override_me_msg)
diff --git a/ipatests/test_xmlrpc/test_host_plugin.py b/ipatests/test_xmlrpc/test_host_plugin.py
index 2b256772e..e46e502d2 100644
--- a/ipatests/test_xmlrpc/test_host_plugin.py
+++ b/ipatests/test_xmlrpc/test_host_plugin.py
@@ -26,7 +26,6 @@ Test the `ipalib.plugins.host` module.
import os
import tempfile
import base64
-import functools
import pytest
@@ -34,6 +33,7 @@ from ipapython import ipautil
from ipalib import api, errors, x509
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
+from ipatests.test_xmlrpc.ldaptracker import Tracker
from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test,
fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer,
fuzzy_hex, raises_exact)
@@ -41,7 +41,6 @@ from ipatests.test_xmlrpc.test_user_plugin import get_group_dn
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.testcert import get_testcert
from ipatests.util import assert_deepequal
-from ipapython.version import API_VERSION
# Constants DNS integration tests
# TODO: Use tracker fixtures for zones/records/users/groups
@@ -96,33 +95,12 @@ hostgroup1_dn = DN(('cn',hostgroup1),('cn','hostgroups'),('cn','accounts'),
api.env.basedn)
-class HostTracker(object):
+class HostTracker(Tracker):
"""Wraps and tracks modifications to a Host object
- Stores a copy of state of a Host object, and allows checking that
- the state in the database is the same as expected.
- This allows creating independent tests: the individual tests check
- that the relevant changes have been made. At the same time
- the Host doesn't heet to be recreated and cleaned up for each test.
+ Implements the helper functions for host plugin.
- Two attributes are used for tracking: ``exists`` (true if the Host is
- supposed to exist) and ``attrs`` (a dict of LDAP attributes that are
- expected to be returned from IPA commands).
-
- For commonly used operations, there is a helper method, e.g.
- ``create``, ``update``, or ``find``, that does these steps:
-
- * ensure the Host exists (or does not exist, for "create")
- * store the expected modifications
- * get the IPA command to run, and run it
- * check that the result matches the expected state
-
- Tests that require customization of these steps are expected to do them
- manually, using lower-level methods.
- Especially the first step (ensure the Host exists) is important for
- achieving independent tests.
-
- The HostTracker object also stores information about the host, e.g.
+ The HostTracker object stores information about the host, e.g.
``fqdn`` and ``dn``.
"""
retrieve_keys = {
@@ -148,8 +126,7 @@ class HostTracker(object):
allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
def __init__(self, name, fqdn=None, default_version=None):
- self.api = api
- self.default_version = default_version or API_VERSION
+ super(HostTracker, self).__init__(default_version=default_version)
self.shortname = name
if fqdn:
@@ -162,75 +139,6 @@ class HostTracker(object):
self.description = u'Test host <%s>' % name
self.location = u'Undisclosed location <%s>' % name
- self.exists = False
-
- def filter_attrs(self, keys):
- """Return a dict of expected attrs, filtered by the given keys"""
- return {k: v for k, v in self.attrs.items() if k in keys}
-
- def run_command(self, name, *args, **options):
- """Run the given IPA command
-
- Logs the command using print for easier debugging
- """
- cmd = self.api.Command[name]
-
- options.setdefault('version', self.default_version)
-
- args_repr = ', '.join(
- [repr(a) for a in args] +
- ['%s=%r' % item for item in options.items()])
- try:
- result = cmd(*args, **options)
- except Exception as e:
- print 'Ran command: %s(%s): %s: %s' % (cmd, args_repr,
- type(e).__name__, e)
- raise
- else:
- print 'Ran command: %s(%s): OK' % (cmd, args_repr)
- return result
-
- def make_command(self, name, *args, **options):
- """Make a functools.partial function to run the given command"""
- return functools.partial(self.run_command, name, *args, **options)
-
- def make_fixture(self, request):
- """Make a pytest fixture for this tracker
-
- The fixture ensures the host does not exist before and after the tests
- that use it.
- """
- del_command = self.make_delete_command()
- try:
- del_command()
- except errors.NotFound:
- pass
-
- def cleanup():
- existed = self.exists
- try:
- del_command()
- except errors.NotFound:
- if existed:
- raise
- self.exists = False
-
- request.addfinalizer(cleanup)
-
- return self
-
- def ensure_exists(self):
- """If the host does not exist (according to tracker state), create it
- """
- if not self.exists:
- self.create(force=True)
-
- def ensure_missing(self):
- """If the host exists (according to tracker state), delete it
- """
- if self.exists:
- self.delete()
-
def make_create_command(self, force=True):
"""Make function that creates this host using host_add"""
return self.make_command('host_add', self.fqdn,
@@ -258,14 +166,6 @@ class HostTracker(object):
"""Make function that modifies the host using host_mod"""
return self.make_command('host_mod', self.fqdn, **updates)
- def create(self, force=True):
- """Helper function to create a host and check the result"""
- self.ensure_missing()
- self.track_create()
- command = self.make_create_command(force=force)
- result = command()
- self.check_create(result)
-
def track_create(self):
"""Update expected state for host creation"""
self.attrs = dict(
@@ -295,19 +195,6 @@ class HostTracker(object):
result=self.filter_attrs(self.create_keys),
), result)
- def delete(self):
- """Helper function to delete a host and check the result"""
- self.ensure_exists()
- self.track_delete()
- command = self.make_delete_command()
- result = command()
- self.check_delete(result)
-
- def track_delete(self):
- """Update expected state for host deletion"""
- self.exists = False
- self.attrs = {}
-
def check_delete(self, result):
"""Check `host_del` command result"""
assert_deepequal(dict(
@@ -316,13 +203,6 @@ class HostTracker(object):
result=dict(failed=[]),
), result)
- def retrieve(self, all=False, raw=False):
- """Helper function to retrieve a host and check the result"""
- self.ensure_exists()
- command = self.make_retrieve_command(all=all, raw=raw)
- result = command()
- self.check_retrieve(result, all=all, raw=raw)
-
def check_retrieve(self, result, all=False, raw=False):
"""Check `host_show` command result"""
if all:
@@ -335,13 +215,6 @@ class HostTracker(object):
result=expected,
), result)
- def find(self, all=False, raw=False):
- """Helper function to search for this hosts and check the result"""
- self.ensure_exists()
- command = self.make_find_command(self.fqdn, all=all, raw=raw)
- result = command()
- self.check_find(result, all=all, raw=raw)
-
def check_find(self, result, all=False, raw=False):
"""Check `host_find` command result"""
if all:
@@ -355,24 +228,6 @@ class HostTracker(object):
result=[expected],
), result)
- def update(self, updates, expected_updates=None):
- """Helper function to update this hosts and check the result
-
- The ``updates`` are used as options to the *_mod command,
- and the self.attrs is updated with this dict.
- Additionally, self.attrs is updated with ``expected_updates``.
- """
- if expected_updates is None:
- expected_updates = {}
-
- self.ensure_exists()
- command = self.make_update_command(updates)
- result = command()
- self.attrs.update(updates)
- self.attrs.update(expected_updates)
- self.check_update(result, extra_keys=set(updates.keys()) |
- set(expected_updates.keys()))
-
def check_update(self, result, extra_keys=()):
"""Check `host_update` command result"""
assert_deepequal(dict(