diff options
author | Petr Viktorin <pviktori@redhat.com> | 2013-05-21 13:40:27 +0200 |
---|---|---|
committer | Martin Kosek <mkosek@redhat.com> | 2013-06-17 19:22:50 +0200 |
commit | c60142efda817f030a7495cd6fe4a19953e55afa (patch) | |
tree | 31a840ceddd4381311bbc879f9851bb71a8e2ffa /ipatests/test_xmlrpc/xmlrpc_test.py | |
parent | 6d66e826c1c248dffc80056b20c1e4b74b04d46f (diff) | |
download | freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.gz freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.xz freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.zip |
Make an ipa-tests package
Rename the 'tests' directory to 'ipa-tests', and create an ipa-tests RPM
containing the test suite
Part of the work for: https://fedorahosted.org/freeipa/ticket/3654
Diffstat (limited to 'ipatests/test_xmlrpc/xmlrpc_test.py')
-rw-r--r-- | ipatests/test_xmlrpc/xmlrpc_test.py | 329 |
1 files changed, 329 insertions, 0 deletions
diff --git a/ipatests/test_xmlrpc/xmlrpc_test.py b/ipatests/test_xmlrpc/xmlrpc_test.py new file mode 100644 index 000000000..bfe8efa46 --- /dev/null +++ b/ipatests/test_xmlrpc/xmlrpc_test.py @@ -0,0 +1,329 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Base class for all XML-RPC tests +""" + +import sys +import socket +import nose +from ipatests.util import assert_deepequal, Fuzzy +from ipalib import api, request, errors +from ipalib.x509 import valid_issuer +from ipapython.version import API_VERSION + + +# Matches a gidnumber like '1391016742' +# FIXME: Does it make more sense to return gidnumber, uidnumber, etc. as `int` +# or `long`? If not, we still need to return them as `unicode` instead of `str`. +fuzzy_digits = Fuzzy('^\d+$', type=basestring) + +uuid_re = '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}' + +# Matches an ipauniqueid like u'784d85fd-eae7-11de-9d01-54520012478b' +fuzzy_uuid = Fuzzy('^%s$' % uuid_re) + +# Matches trusted domain GUID, like u'463bf2be-3456-4a57-979e-120304f2a0eb' +fuzzy_guid = fuzzy_uuid + +# Matches SID of a trusted domain +# SID syntax: http://msdn.microsoft.com/en-us/library/ff632068.aspx +_sid_identifier_authority = '(0x[0-9a-f]{1,12}|[0-9]{1,10})' +fuzzy_domain_sid = Fuzzy( + '^S-1-5-21-%(idauth)s-%(idauth)s-%(idauth)s$' % dict(idauth=_sid_identifier_authority) +) +fuzzy_user_or_group_sid = Fuzzy( + '^S-1-5-21-%(idauth)s-%(idauth)s-%(idauth)s-%(idauth)s$' % dict(idauth=_sid_identifier_authority) +) + +# Matches netgroup dn. Note (?i) at the beginning of the regexp is the ingnore case flag +fuzzy_netgroupdn = Fuzzy( + '(?i)ipauniqueid=%s,cn=ng,cn=alt,%s' % (uuid_re, api.env.basedn) +) + +# Matches sudocmd dn +fuzzy_sudocmddn = Fuzzy( + '(?i)ipauniqueid=%s,cn=sudocmds,cn=sudo,%s' % (uuid_re, api.env.basedn) +) + +# Matches a hash signature, not enforcing length +fuzzy_hash = Fuzzy('^([a-f0-9][a-f0-9]:)+[a-f0-9][a-f0-9]$', type=basestring) + +# Matches a date, like Tue Apr 26 17:45:35 2016 UTC +fuzzy_date = Fuzzy('^[a-zA-Z]{3} [a-zA-Z]{3} \d{2} \d{2}:\d{2}:\d{2} \d{4} UTC$') + +fuzzy_issuer = Fuzzy(type=basestring, test=lambda issuer: valid_issuer(issuer)) + +fuzzy_hex = Fuzzy('^0x[0-9a-fA-F]+$', type=basestring) + +# Matches password - password consists of all printable characters without whitespaces +# The only exception is space, but space cannot be at the beggingin or end of the pwd +fuzzy_password = Fuzzy('^\S([\S ]*\S)*$') + +# Matches generalized time value. Time format is: %Y%m%d%H%M%SZ +fuzzy_dergeneralizedtime = Fuzzy('^[0-9]{14}Z$') + +# match any string +fuzzy_string = Fuzzy(type=basestring) + +# case insensitive match of sets +def fuzzy_set_ci(s): + return Fuzzy(test=lambda other: set(x.lower() for x in other) == set(y.lower() for y in s)) + +try: + if not api.Backend.xmlclient.isconnected(): + api.Backend.xmlclient.connect(fallback=False) + res = api.Command['user_show'](u'notfound') +except errors.NetworkError: + server_available = False +except IOError: + server_available = False +except errors.NotFound: + server_available = True + + + +def assert_attr_equal(entry, key, value): + if type(entry) is not dict: + raise AssertionError( + 'assert_attr_equal: entry must be a %r; got a %r: %r' % ( + dict, type(entry), entry) + ) + if key not in entry: + raise AssertionError( + 'assert_attr_equal: entry has no key %r: %r' % (key, entry) + ) + if value not in entry[key]: + raise AssertionError( + 'assert_attr_equal: %r: %r not in %r' % (key, value, entry[key]) + ) + + +def assert_is_member(entry, value, key='member'): + if type(entry) is not dict: + raise AssertionError( + 'assert_is_member: entry must be a %r; got a %r: %r' % ( + dict, type(entry), entry) + ) + if key not in entry: + raise AssertionError( + 'assert_is_member: entry has no key %r: %r' % (key, entry) + ) + for member in entry[key]: + if member.startswith(value): + return + raise AssertionError( + 'assert_is_member: %r: %r not in %r' % (key, value, entry[key]) + ) + + +# Initialize the API. We do this here so that one can run the tests +# individually instead of at the top-level. If API.bootstrap() +# has already been called we continue gracefully. Other errors will be +# raised. + +class XMLRPC_test(object): + """ + Base class for all XML-RPC plugin tests + """ + + @classmethod + def setUpClass(cls): + if not server_available: + raise nose.SkipTest('%r: Server not available: %r' % + (cls.__module__, api.env.xmlrpc_uri)) + + def setUp(self): + if not api.Backend.xmlclient.isconnected(): + api.Backend.xmlclient.connect(fallback=False) + + def tearDown(self): + """ + nose tear-down fixture. + """ + request.destroy_context() + + def failsafe_add(self, obj, pk, **options): + """ + Delete possible leftover entry first, then add. + + This helps speed us up when a partial test failure has left LDAP in a + dirty state. + + :param obj: An Object like api.Object.user + :param pk: The primary key of the entry to be created + :param options: Kwargs to be passed to obj.add() + """ + try: + obj.methods['del'](pk) + except errors.NotFound: + pass + return obj.methods['add'](pk, **options) + + +IGNORE = """Command %r is missing attribute %r in output entry. + args = %r + options = %r + entry = %r""" + + +EXPECTED = """Expected %r to raise %s. + args = %r + options = %r + output = %r""" + + +UNEXPECTED = """Expected %r to raise %s, but caught different. + args = %r + options = %r + %s: %s""" + + +KWARGS = """Command %r raised %s with wrong kwargs. + args = %r + options = %r + kw_expected = %r + kw_got = %r""" + + +class Declarative(XMLRPC_test): + """A declarative-style test suite + + A Declarative test suite is controlled by the ``tests`` and + ``cleanup_commands`` class variables. + + The ``tests`` is a list of dictionaries with the following keys: + + ``desc`` + A name/description of the test + ``command`` + A (command, args, kwargs) triple specifying the command to run + ``expected`` + Can be either an ``errors.PublicError`` instance, in which case + the command must fail with the given error; or the + expected result. + The result is checked with ``tests.util.assert_deepequal``. + ``extra_check`` (optional) + A checking function that is called with the response. It must + return true for the test to pass. + + The ``cleanup_commands`` is a list of (command, args, kwargs) + triples. These are commands get run both before and after tests, + and must not fail. + """ + + cleanup_commands = tuple() + tests = tuple() + + def cleanup_generate(self, stage): + for (i, command) in enumerate(self.cleanup_commands): + func = lambda: self.cleanup(command) + func.description = '%s %s-cleanup[%d]: %r' % ( + self.__class__.__name__, stage, i, command + ) + yield (func,) + + def cleanup(self, command): + (cmd, args, options) = command + if cmd not in api.Command: + raise nose.SkipTest( + 'cleanup command %r not in api.Command' % cmd + ) + try: + api.Command[cmd](*args, **options) + except (errors.NotFound, errors.EmptyModlist): + pass + + def test_generator(self): + """ + Iterate through tests. + + nose reports each one as a seperate test. + """ + + # Iterate through pre-cleanup: + for tup in self.cleanup_generate('pre'): + yield tup + + # Iterate through the tests: + name = self.__class__.__name__ + for (i, test) in enumerate(self.tests): + nice = '%s[%d]: %s: %s' % ( + name, i, test['command'][0], test.get('desc', '') + ) + func = lambda: self.check(nice, **test) + func.description = nice + yield (func,) + + # Iterate through post-cleanup: + for tup in self.cleanup_generate('post'): + yield tup + + def check(self, nice, desc, command, expected, extra_check=None): + (cmd, args, options) = command + options.setdefault('version', API_VERSION) + if cmd not in api.Command: + raise nose.SkipTest('%r not in api.Command' % cmd) + if isinstance(expected, errors.PublicError): + self.check_exception(nice, cmd, args, options, expected) + elif hasattr(expected, '__call__'): + self.check_callable(nice, cmd, args, options, expected) + else: + self.check_output(nice, cmd, args, options, expected, extra_check) + + def check_exception(self, nice, cmd, args, options, expected): + klass = expected.__class__ + name = klass.__name__ + try: + output = api.Command[cmd](*args, **options) + except StandardError, e: + pass + else: + raise AssertionError( + EXPECTED % (cmd, name, args, options, output) + ) + if not isinstance(e, klass): + raise AssertionError( + UNEXPECTED % (cmd, name, args, options, e.__class__.__name__, e) + ) + # FIXME: the XML-RPC transport doesn't allow us to return structured + # information through the exception, so we can't test the kw on the + # client side. However, if we switch to using JSON-RPC for the default + # transport, the exception is a free-form data structure (dict). + # For now just compare the strings + assert_deepequal(expected.strerror, e.strerror) + + def check_callable(self, nice, cmd, args, options, expected): + output = dict() + e = None + try: + output = api.Command[cmd](*args, **options) + except StandardError, e: + pass + if not expected(e, output): + raise AssertionError( + UNEXPECTED % (cmd, args, options, e.__class__.__name__, e) + ) + + def check_output(self, nice, cmd, args, options, expected, extra_check): + got = api.Command[cmd](*args, **options) + assert_deepequal(expected, got, nice) + if extra_check and not extra_check(got): + raise AssertionError('Extra check %s failed' % extra_check) |