summaryrefslogtreecommitdiffstats
path: root/tests/test_xmlrpc/xmlrpc_test.py
diff options
context:
space:
mode:
authorJason Gerard DeRose <jderose@redhat.com>2009-12-09 09:09:53 -0700
committerJason Gerard DeRose <jderose@redhat.com>2009-12-10 08:29:15 -0700
commitb6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7 (patch)
tree7e5329a51af169ce34a7d275a1bbd63c1e31c026 /tests/test_xmlrpc/xmlrpc_test.py
parentd08b8858ddc3bf6265f6ea8acae6661b9fff5112 (diff)
downloadfreeipa-b6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7.tar.gz
freeipa-b6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7.tar.xz
freeipa-b6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7.zip
Take 2: Extensible return values and validation; steps toward a single output_for_cli(); enable more webUI stuff
Diffstat (limited to 'tests/test_xmlrpc/xmlrpc_test.py')
-rw-r--r--tests/test_xmlrpc/xmlrpc_test.py213
1 files changed, 199 insertions, 14 deletions
diff --git a/tests/test_xmlrpc/xmlrpc_test.py b/tests/test_xmlrpc/xmlrpc_test.py
index 9c41d053f..a51a82bb3 100644
--- a/tests/test_xmlrpc/xmlrpc_test.py
+++ b/tests/test_xmlrpc/xmlrpc_test.py
@@ -24,18 +24,54 @@ Base class for all XML-RPC tests
import sys
import socket
import nose
+from tests.util import assert_deepequal
from ipalib import api, request
from ipalib import errors
-def assert_attr_equal(entry_attrs, attr, value):
- assert value in entry_attrs.get(attr, [])
+try:
+ if not api.Backend.xmlclient.isconnected():
+ api.Backend.xmlclient.connect()
+ res = api.Command['user_show'](u'notfound')
+except errors.NetworkError:
+ server_available = False
+except errors.NotFound:
+ server_available = True
-def assert_is_member(entry_attrs, value, member_attr='member'):
- for m in entry_attrs[member_attr]:
- if m.startswith(value):
+
+
+def assert_attr_equal(entry, key, value):
+ if type(entry) is not dict:
+ raise AssertionError(
+ 'assert_attr_equal: entry must be a %r; got a %r: %r' % (
+ dict, type(entry), entry)
+ )
+ if key not in entry:
+ raise AssertionError(
+ 'assert_attr_equal: entry has no key %r: %r' % (key, entry)
+ )
+ if value not in entry[key]:
+ raise AssertionError(
+ 'assert_attr_equal: %r: %r not in %r' % (key, value, entry[key])
+ )
+
+
+def assert_is_member(entry, value, key='member'):
+ if type(entry) is not dict:
+ raise AssertionError(
+ 'assert_is_member: entry must be a %r; got a %r: %r' % (
+ dict, type(entry), entry)
+ )
+ if key not in entry:
+ raise AssertionError(
+ 'assert_is_member: entry has no key %r: %r' % (key, entry)
+ )
+ for member in entry[key]:
+ if member.startswith(value):
return
- assert False
+ raise AssertionError(
+ 'assert_is_member: %r: %r not in %r' % (key, value, entry[key])
+ )
# Initialize the API. We do this here so that one can run the tests
@@ -49,14 +85,12 @@ class XMLRPC_test(object):
"""
def setUp(self):
- try:
- if not api.Backend.xmlclient.isconnected():
- api.Backend.xmlclient.connect()
- res = api.Command['user_show'](u'notfound')
- except errors.NetworkError:
- raise nose.SkipTest()
- except errors.NotFound:
- pass
+ if not server_available:
+ raise nose.SkipTest(
+ 'Server not available: %r' % api.env.xmlrpc_uri
+ )
+ if not api.Backend.xmlclient.isconnected():
+ api.Backend.xmlclient.connect()
def tearDown(self):
"""
@@ -64,3 +98,154 @@ class XMLRPC_test(object):
"""
request.destroy_context()
+ def failsafe_add(self, obj, pk, **options):
+ """
+ Delete possible leftover entry first, then add.
+
+ This helps speed us up when a partial test failure has left LDAP in a
+ dirty state.
+
+ :param obj: An Object like api.Object.user
+ :param pk: The primary key of the entry to be created
+ :param options: Kwargs to be passed to obj.add()
+ """
+ try:
+ obj.methods['del'](pk)
+ except errors.NotFound:
+ pass
+ return obj.methods['add'](pk, **options)
+
+
+IGNORE = """Command %r is missing attribute %r in output entry.
+ args = %r
+ options = %r
+ entry = %r"""
+
+
+EXPECTED = """Expected %r to raise %s.
+ args = %r
+ options = %r
+ output = %r"""
+
+
+UNEXPECTED = """Expected %r to raise %s, but caught different.
+ args = %r
+ options = %r
+ %s: %s"""
+
+
+KWARGS = """Command %r raised %s with wrong kwargs.
+ args = %r
+ options = %r
+ kw_expected = %r
+ kw_got = %r"""
+
+
+class Declarative(XMLRPC_test):
+ cleanup_commands = tuple()
+ tests = tuple()
+
+ def cleanup_generate(self, stage):
+ for command in self.cleanup_commands:
+ func = lambda: self.cleanup(command)
+ func.description = '%s %s-cleanup: %r' % (
+ self.__class__.__name__, stage, command
+ )
+ yield (func,)
+
+ def cleanup(self, command):
+ (cmd, args, options) = command
+ if cmd not in api.Command:
+ raise nose.SkipTest(
+ 'cleanup command %r not in api.Command' % cmd
+ )
+ try:
+ api.Command[cmd](*args, **options)
+ except errors.NotFound:
+ pass
+
+ def test_generator(self):
+ """
+ Iterate through tests.
+
+ nose reports each one as a seperate test.
+ """
+
+ # Iterate through pre-cleanup:
+ for tup in self.cleanup_generate('pre'):
+ yield tup
+
+ # Iterate through the tests:
+ name = self.__class__.__name__
+ for (i, test) in enumerate(self.tests):
+ nice = '%s[%d]: %s: %s' % (
+ name, i, test['command'][0], test.get('desc', '')
+ )
+ func = lambda: self.check(nice, test)
+ func.description = nice
+ yield (func,)
+
+ # Iterate through post-cleanup:
+ for tup in self.cleanup_generate('post'):
+ yield tup
+
+ def check(self, nice, test):
+ (cmd, args, options) = test['command']
+ if cmd not in api.Command:
+ raise nose.SkipTest('%r not in api.Command' % cmd)
+ expected = test['expected']
+ ignore_values = test.get('ignore_values')
+ if isinstance(expected, errors.PublicError):
+ self.check_exception(nice, cmd, args, options, expected)
+ else:
+ self.check_output(nice, cmd, args, options, expected, ignore_values)
+
+ def check_exception(self, nice, cmd, args, options, expected):
+ klass = expected.__class__
+ name = klass.__name__
+ try:
+ output = api.Command[cmd](*args, **options)
+ except StandardError, e:
+ pass
+ else:
+ raise AssertionError(
+ EXPECTED % (cmd, name, args, options, output)
+ )
+ if not isinstance(e, klass):
+ raise AssertionError(
+ UNEXPECTED % (cmd, name, args, options, e.__class__.__name__, e)
+ )
+ # FIXME: the XML-RPC transport doesn't allow us to return structured
+ # information through the exception, so we can't test the kw on the
+ # client side. However, if we switch to using JSON-RPC for the default
+ # transport, the exception is a free-form data structure (dict).
+# if e.kw != expected.kw:
+# raise AssertionError(
+# KWARGS % (cmd, name, args, options, expected.kw, e.kw)
+# )
+
+ def check_output(self, nice, cmd, args, options, expected, ignore_values):
+ got = api.Command[cmd](*args, **options)
+ result = got['result']
+ if ignore_values:
+ if isinstance(result, dict):
+ self.clean_entry(
+ nice, cmd, args, options, result, ignore_values
+ )
+ elif isinstance(result, (list, tuple)):
+ for entry in result:
+ self.clean_entry(
+ nice, cmd, args, options, entry, ignore_values
+ )
+ assert_deepequal(expected, got, nice)
+
+ def clean_entry(self, nice, cmd, args, options, entry, ignore_values):
+ """
+ Remove attributes like 'ipauniqueid' whose value is unpredictable.
+ """
+ for key in ignore_values:
+ if key not in entry:
+ raise AssertionError(
+ IGNORE % (cmd, key, args, options, entry)
+ )
+ entry.pop(key)