summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py22
-rw-r--r--tests/data.py38
-rw-r--r--tests/test_ipalib/__init__.py22
-rw-r--r--tests/test_ipalib/test_backend.py55
-rw-r--r--tests/test_ipalib/test_base.py352
-rw-r--r--tests/test_ipalib/test_cli.py277
-rw-r--r--tests/test_ipalib/test_config.py608
-rw-r--r--tests/test_ipalib/test_crud.py237
-rw-r--r--tests/test_ipalib/test_error2.py371
-rw-r--r--tests/test_ipalib/test_errors.py289
-rw-r--r--tests/test_ipalib/test_frontend.py771
-rw-r--r--tests/test_ipalib/test_parameters.py994
-rw-r--r--tests/test_ipalib/test_plugable.py756
-rw-r--r--tests/test_ipalib/test_request.py161
-rw-r--r--tests/test_ipalib/test_rpc.py249
-rw-r--r--tests/test_ipalib/test_util.py61
-rw-r--r--tests/test_ipaserver/__init__.py22
-rw-r--r--tests/test_ipaserver/test_rpcserver.py79
-rw-r--r--tests/test_ipawebui/__init__.py21
-rw-r--r--tests/test_ipawebui/test_controllers.py70
-rw-r--r--tests/test_util.py148
-rw-r--r--tests/test_xmlrpc/__init__.py22
-rw-r--r--tests/test_xmlrpc/test_automount_plugin.py243
-rw-r--r--tests/test_xmlrpc/test_group_plugin.py178
-rw-r--r--tests/test_xmlrpc/test_host_plugin.py128
-rw-r--r--tests/test_xmlrpc/test_hostgroup_plugin.py149
-rw-r--r--tests/test_xmlrpc/test_netgroup_plugin.py320
-rw-r--r--tests/test_xmlrpc/test_service_plugin.py93
-rw-r--r--tests/test_xmlrpc/test_user_plugin.py151
-rw-r--r--tests/test_xmlrpc/xmlrpc_test.py49
-rw-r--r--tests/util.py391
31 files changed, 7327 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 00000000..4550e6bc
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,22 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Package containing all unit tests.
+"""
diff --git a/tests/data.py b/tests/data.py
new file mode 100644
index 00000000..cf646ea9
--- /dev/null
+++ b/tests/data.py
@@ -0,0 +1,38 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Data frequently used in the unit tests, especially Unicode related tests.
+"""
+
+import struct
+
+
+# A string that should have bytes 'x\00' through '\xff':
+binary_bytes = ''.join(struct.pack('B', d) for d in xrange(256))
+assert '\x00' in binary_bytes and '\xff' in binary_bytes
+assert type(binary_bytes) is str and len(binary_bytes) == 256
+
+# A UTF-8 encoded str:
+utf8_bytes = '\xd0\x9f\xd0\xb0\xd0\xb2\xd0\xb5\xd0\xbb'
+
+# The same UTF-8 data decoded (a unicode instance):
+unicode_str = u'\u041f\u0430\u0432\u0435\u043b'
+assert utf8_bytes.decode('UTF-8') == unicode_str
+assert unicode_str.encode('UTF-8') == utf8_bytes
diff --git a/tests/test_ipalib/__init__.py b/tests/test_ipalib/__init__.py
new file mode 100644
index 00000000..113881eb
--- /dev/null
+++ b/tests/test_ipalib/__init__.py
@@ -0,0 +1,22 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Sub-package containing unit tests for `ipalib` package.
+"""
diff --git a/tests/test_ipalib/test_backend.py b/tests/test_ipalib/test_backend.py
new file mode 100644
index 00000000..88bd2da4
--- /dev/null
+++ b/tests/test_ipalib/test_backend.py
@@ -0,0 +1,55 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.backend` module.
+"""
+
+from ipalib import backend, plugable, errors
+from tests.util import ClassChecker, raises
+
+
+class test_Backend(ClassChecker):
+ """
+ Test the `ipalib.backend.Backend` class.
+ """
+
+ _cls = backend.Backend
+
+ def test_class(self):
+ assert self.cls.__bases__ == (plugable.Plugin,)
+ assert self.cls.__proxy__ is False
+
+
+class test_Context(ClassChecker):
+ """
+ Test the `ipalib.backend.Context` class.
+ """
+
+ _cls = backend.Context
+
+ def test_get_value(self):
+ """
+ Test the `ipalib.backend.Context.get_value` method.
+ """
+ class Subclass(self.cls):
+ pass
+ o = Subclass()
+ e = raises(NotImplementedError, o.get_value)
+ assert str(e) == 'Subclass.get_value()'
diff --git a/tests/test_ipalib/test_base.py b/tests/test_ipalib/test_base.py
new file mode 100644
index 00000000..ce88f23f
--- /dev/null
+++ b/tests/test_ipalib/test_base.py
@@ -0,0 +1,352 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.base` module.
+"""
+
+from tests.util import ClassChecker, raises
+from ipalib.constants import NAME_REGEX, NAME_ERROR
+from ipalib.constants import TYPE_ERROR, SET_ERROR, DEL_ERROR, OVERRIDE_ERROR
+from ipalib import base
+
+
+class test_ReadOnly(ClassChecker):
+ """
+ Test the `ipalib.base.ReadOnly` class
+ """
+ _cls = base.ReadOnly
+
+ def test_lock(self):
+ """
+ Test the `ipalib.base.ReadOnly.__lock__` method.
+ """
+ o = self.cls()
+ assert o._ReadOnly__locked is False
+ o.__lock__()
+ assert o._ReadOnly__locked is True
+ e = raises(AssertionError, o.__lock__) # Can only be locked once
+ assert str(e) == '__lock__() can only be called once'
+ assert o._ReadOnly__locked is True # This should still be True
+
+ def test_islocked(self):
+ """
+ Test the `ipalib.base.ReadOnly.__islocked__` method.
+ """
+ o = self.cls()
+ assert o.__islocked__() is False
+ o.__lock__()
+ assert o.__islocked__() is True
+
+ def test_setattr(self):
+ """
+ Test the `ipalib.base.ReadOnly.__setattr__` method.
+ """
+ o = self.cls()
+ o.attr1 = 'Hello, world!'
+ assert o.attr1 == 'Hello, world!'
+ o.__lock__()
+ for name in ('attr1', 'attr2'):
+ e = raises(AttributeError, setattr, o, name, 'whatever')
+ assert str(e) == SET_ERROR % ('ReadOnly', name, 'whatever')
+ assert o.attr1 == 'Hello, world!'
+
+ def test_delattr(self):
+ """
+ Test the `ipalib.base.ReadOnly.__delattr__` method.
+ """
+ o = self.cls()
+ o.attr1 = 'Hello, world!'
+ o.attr2 = 'How are you?'
+ assert o.attr1 == 'Hello, world!'
+ assert o.attr2 == 'How are you?'
+ del o.attr1
+ assert not hasattr(o, 'attr1')
+ o.__lock__()
+ e = raises(AttributeError, delattr, o, 'attr2')
+ assert str(e) == DEL_ERROR % ('ReadOnly', 'attr2')
+ assert o.attr2 == 'How are you?'
+
+
+def test_lock():
+ """
+ Test the `ipalib.base.lock` function
+ """
+ f = base.lock
+
+ # Test with ReadOnly instance:
+ o = base.ReadOnly()
+ assert o.__islocked__() is False
+ assert f(o) is o
+ assert o.__islocked__() is True
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'already locked: %r' % o
+
+ # Test with another class implemented locking protocol:
+ class Lockable(object):
+ __locked = False
+ def __lock__(self):
+ self.__locked = True
+ def __islocked__(self):
+ return self.__locked
+ o = Lockable()
+ assert o.__islocked__() is False
+ assert f(o) is o
+ assert o.__islocked__() is True
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'already locked: %r' % o
+
+ # Test with a class incorrectly implementing the locking protocol:
+ class Broken(object):
+ def __lock__(self):
+ pass
+ def __islocked__(self):
+ return False
+ o = Broken()
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'failed to lock: %r' % o
+
+
+def test_islocked():
+ """
+ Test the `ipalib.base.islocked` function.
+ """
+ f = base.islocked
+
+ # Test with ReadOnly instance:
+ o = base.ReadOnly()
+ assert f(o) is False
+ o.__lock__()
+ assert f(o) is True
+
+ # Test with another class implemented locking protocol:
+ class Lockable(object):
+ __locked = False
+ def __lock__(self):
+ self.__locked = True
+ def __islocked__(self):
+ return self.__locked
+ o = Lockable()
+ assert f(o) is False
+ o.__lock__()
+ assert f(o) is True
+
+ # Test with a class incorrectly implementing the locking protocol:
+ class Broken(object):
+ __lock__ = False
+ def __islocked__(self):
+ return False
+ o = Broken()
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'no __lock__() method: %r' % o
+
+
+def test_check_name():
+ """
+ Test the `ipalib.base.check_name` function.
+ """
+ f = base.check_name
+ okay = [
+ 'user_add',
+ 'stuff2junk',
+ 'sixty9',
+ ]
+ nope = [
+ '_user_add',
+ '__user_add',
+ 'user_add_',
+ 'user_add__',
+ '_user_add_',
+ '__user_add__',
+ '60nine',
+ ]
+ for name in okay:
+ assert name is f(name)
+ e = raises(TypeError, f, unicode(name))
+ assert str(e) == TYPE_ERROR % ('name', str, unicode(name), unicode)
+ for name in nope:
+ e = raises(ValueError, f, name)
+ assert str(e) == NAME_ERROR % (NAME_REGEX, name)
+ for name in okay:
+ e = raises(ValueError, f, name.upper())
+ assert str(e) == NAME_ERROR % (NAME_REGEX, name.upper())
+
+
+def membername(i):
+ return 'member%03d' % i
+
+
+class DummyMember(object):
+ def __init__(self, i):
+ self.i = i
+ self.name = membername(i)
+
+
+def gen_members(*indexes):
+ return tuple(DummyMember(i) for i in indexes)
+
+
+class test_NameSpace(ClassChecker):
+ """
+ Test the `ipalib.base.NameSpace` class.
+ """
+ _cls = base.NameSpace
+
+ def new(self, count, sort=True):
+ members = tuple(DummyMember(i) for i in xrange(count, 0, -1))
+ assert len(members) == count
+ o = self.cls(members, sort=sort)
+ return (o, members)
+
+ def test_init(self):
+ """
+ Test the `ipalib.base.NameSpace.__init__` method.
+ """
+ o = self.cls([])
+ assert len(o) == 0
+ assert list(o) == []
+ assert list(o()) == []
+
+ # Test members as attribute and item:
+ for cnt in (3, 42):
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ assert len(members) == cnt
+ for m in members:
+ assert getattr(o, m.name) is m
+ assert o[m.name] is m
+
+ # Test that TypeError is raised if sort is not a bool:
+ e = raises(TypeError, self.cls, [], sort=None)
+ assert str(e) == TYPE_ERROR % ('sort', bool, None, type(None))
+
+ # Test that AttributeError is raised with duplicate member name:
+ members = gen_members(0, 1, 2, 1, 3)
+ e = raises(AttributeError, self.cls, members)
+ assert str(e) == OVERRIDE_ERROR % (
+ 'NameSpace', membername(1), members[1], members[3]
+ )
+
+ def test_len(self):
+ """
+ Test the `ipalib.base.NameSpace.__len__` method.
+ """
+ for count in (5, 18, 127):
+ (o, members) = self.new(count)
+ assert len(o) == count
+ (o, members) = self.new(count, sort=False)
+ assert len(o) == count
+
+ def test_iter(self):
+ """
+ Test the `ipalib.base.NameSpace.__iter__` method.
+ """
+ (o, members) = self.new(25)
+ assert list(o) == sorted(m.name for m in members)
+ (o, members) = self.new(25, sort=False)
+ assert list(o) == list(m.name for m in members)
+
+ def test_call(self):
+ """
+ Test the `ipalib.base.NameSpace.__call__` method.
+ """
+ (o, members) = self.new(25)
+ assert list(o()) == sorted(members, key=lambda m: m.name)
+ (o, members) = self.new(25, sort=False)
+ assert tuple(o()) == members
+
+ def test_contains(self):
+ """
+ Test the `ipalib.base.NameSpace.__contains__` method.
+ """
+ yes = (99, 3, 777)
+ no = (9, 333, 77)
+ for sort in (True, False):
+ members = gen_members(*yes)
+ o = self.cls(members, sort=sort)
+ for i in yes:
+ assert membername(i) in o
+ assert membername(i).upper() not in o
+ for i in no:
+ assert membername(i) not in o
+
+ def test_getitem(self):
+ """
+ Test the `ipalib.base.NameSpace.__getitem__` method.
+ """
+ cnt = 17
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ assert len(members) == cnt
+ if sort is True:
+ members = tuple(sorted(members, key=lambda m: m.name))
+
+ # Test str keys:
+ for m in members:
+ assert o[m.name] is m
+ e = raises(KeyError, o.__getitem__, 'nope')
+
+ # Test int indexes:
+ for i in xrange(cnt):
+ assert o[i] is members[i]
+ e = raises(IndexError, o.__getitem__, cnt)
+
+ # Test negative int indexes:
+ for i in xrange(1, cnt + 1):
+ assert o[-i] is members[-i]
+ e = raises(IndexError, o.__getitem__, -(cnt + 1))
+
+ # Test slicing:
+ assert o[3:] == members[3:]
+ assert o[:10] == members[:10]
+ assert o[3:10] == members[3:10]
+ assert o[-9:] == members[-9:]
+ assert o[:-4] == members[:-4]
+ assert o[-9:-4] == members[-9:-4]
+
+ # Test that TypeError is raised with wrong type
+ e = raises(TypeError, o.__getitem__, 3.0)
+ assert str(e) == TYPE_ERROR % ('key', (str, int, slice), 3.0, float)
+
+ def test_repr(self):
+ """
+ Test the `ipalib.base.NameSpace.__repr__` method.
+ """
+ for cnt in (0, 1, 2):
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ if cnt == 1:
+ assert repr(o) == \
+ 'NameSpace(<%d member>, sort=%r)' % (cnt, sort)
+ else:
+ assert repr(o) == \
+ 'NameSpace(<%d members>, sort=%r)' % (cnt, sort)
+
+ def test_todict(self):
+ """
+ Test the `ipalib.base.NameSpace.__todict__` method.
+ """
+ for cnt in (3, 101):
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ d = o.__todict__()
+ assert d == dict((m.name, m) for m in members)
+
+ # Test that a copy is returned:
+ assert o.__todict__() is not d
diff --git a/tests/test_ipalib/test_cli.py b/tests/test_ipalib/test_cli.py
new file mode 100644
index 00000000..56297fdf
--- /dev/null
+++ b/tests/test_ipalib/test_cli.py
@@ -0,0 +1,277 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.cli` module.
+"""
+
+from tests.util import raises, get_api, ClassChecker
+from ipalib import cli, plugable, frontend, backend
+
+
+class test_textui(ClassChecker):
+ _cls = cli.textui
+
+ def test_max_col_width(self):
+ """
+ Test the `ipalib.cli.textui.max_col_width` method.
+ """
+ o = self.cls()
+ e = raises(TypeError, o.max_col_width, 'hello')
+ assert str(e) == 'rows: need %r or %r; got %r' % (list, tuple, 'hello')
+ rows = [
+ 'hello',
+ 'naughty',
+ 'nurse',
+ ]
+ assert o.max_col_width(rows) == len('naughty')
+ rows = (
+ ( 'a', 'bbb', 'ccccc'),
+ ('aa', 'bbbb', 'cccccc'),
+ )
+ assert o.max_col_width(rows, col=0) == 2
+ assert o.max_col_width(rows, col=1) == 4
+ assert o.max_col_width(rows, col=2) == 6
+
+
+def test_to_cli():
+ """
+ Test the `ipalib.cli.to_cli` function.
+ """
+ f = cli.to_cli
+ assert f('initialize') == 'initialize'
+ assert f('user_add') == 'user-add'
+
+
+def test_from_cli():
+ """
+ Test the `ipalib.cli.from_cli` function.
+ """
+ f = cli.from_cli
+ assert f('initialize') == 'initialize'
+ assert f('user-add') == 'user_add'
+
+
+def get_cmd_name(i):
+ return 'cmd_%d' % i
+
+
+class DummyCommand(object):
+ def __init__(self, name):
+ self.__name = name
+
+ def __get_name(self):
+ return self.__name
+ name = property(__get_name)
+
+
+class DummyAPI(object):
+ def __init__(self, cnt):
+ self.__cmd = plugable.NameSpace(self.__cmd_iter(cnt))
+
+ def __get_cmd(self):
+ return self.__cmd
+ Command = property(__get_cmd)
+
+ def __cmd_iter(self, cnt):
+ for i in xrange(cnt):
+ yield DummyCommand(get_cmd_name(i))
+
+ def finalize(self):
+ pass
+
+ def register(self, *args, **kw):
+ pass
+
+
+config_cli = """
+[global]
+
+from_cli_conf = set in cli.conf
+"""
+
+config_default = """
+[global]
+
+from_default_conf = set in default.conf
+
+# Make sure cli.conf is loaded first:
+from_cli_conf = overridden in default.conf
+"""
+
+
+class test_CLI(ClassChecker):
+ """
+ Test the `ipalib.cli.CLI` class.
+ """
+ _cls = cli.CLI
+
+ def new(self, argv=tuple()):
+ (api, home) = get_api()
+ o = self.cls(api, argv)
+ assert o.api is api
+ return (o, api, home)
+
+ def check_cascade(self, *names):
+ (o, api, home) = self.new()
+ method = getattr(o, names[0])
+ for name in names:
+ assert o.isdone(name) is False
+ method()
+ for name in names:
+ assert o.isdone(name) is True
+ e = raises(StandardError, method)
+ assert str(e) == 'CLI.%s() already called' % names[0]
+
+ def test_init(self):
+ """
+ Test the `ipalib.cli.CLI.__init__` method.
+ """
+ argv = ['-v', 'user-add', '--first=Jonh', '--last=Doe']
+ (o, api, home) = self.new(argv)
+ assert o.api is api
+ assert o.argv == tuple(argv)
+
+ def test_run_real(self):
+ """
+ Test the `ipalib.cli.CLI.run_real` method.
+ """
+ self.check_cascade(
+ 'run_real',
+ 'finalize',
+ 'load_plugins',
+ 'bootstrap',
+ 'parse_globals'
+ )
+
+ def test_finalize(self):
+ """
+ Test the `ipalib.cli.CLI.finalize` method.
+ """
+ self.check_cascade(
+ 'finalize',
+ 'load_plugins',
+ 'bootstrap',
+ 'parse_globals'
+ )
+
+ (o, api, home) = self.new()
+ assert api.isdone('finalize') is False
+ assert 'Command' not in api
+ o.finalize()
+ assert api.isdone('finalize') is True
+ assert list(api.Command) == \
+ sorted(k.__name__ for k in cli.cli_application_commands)
+
+ def test_load_plugins(self):
+ """
+ Test the `ipalib.cli.CLI.load_plugins` method.
+ """
+ self.check_cascade(
+ 'load_plugins',
+ 'bootstrap',
+ 'parse_globals'
+ )
+ (o, api, home) = self.new()
+ assert api.isdone('load_plugins') is False
+ o.load_plugins()
+ assert api.isdone('load_plugins') is True
+
+ def test_bootstrap(self):
+ """
+ Test the `ipalib.cli.CLI.bootstrap` method.
+ """
+ self.check_cascade(
+ 'bootstrap',
+ 'parse_globals'
+ )
+ # Test with empty argv
+ (o, api, home) = self.new()
+ keys = tuple(api.env)
+ assert api.isdone('bootstrap') is False
+ o.bootstrap()
+ assert api.isdone('bootstrap') is True
+ e = raises(StandardError, o.bootstrap)
+ assert str(e) == 'CLI.bootstrap() already called'
+ assert api.env.verbose is False
+ assert api.env.context == 'cli'
+ keys = tuple(api.env)
+ added = (
+ 'my_key',
+ 'from_default_conf',
+ 'from_cli_conf'
+ )
+ for key in added:
+ assert key not in api.env
+ assert key not in keys
+
+ # Test with a populated argv
+ argv = ['-e', 'my_key=my_val,whatever=Hello']
+ (o, api, home) = self.new(argv)
+ home.write(config_default, '.ipa', 'default.conf')
+ home.write(config_cli, '.ipa', 'cli.conf')
+ o.bootstrap()
+ assert api.env.my_key == 'my_val,whatever=Hello'
+ assert api.env.from_default_conf == 'set in default.conf'
+ assert api.env.from_cli_conf == 'set in cli.conf'
+ assert list(api.env) == sorted(keys + added)
+
+ def test_parse_globals(self):
+ """
+ Test the `ipalib.cli.CLI.parse_globals` method.
+ """
+ # Test with empty argv:
+ (o, api, home) = self.new()
+ assert not hasattr(o, 'options')
+ assert not hasattr(o, 'cmd_argv')
+ assert o.isdone('parse_globals') is False
+ o.parse_globals()
+ assert o.isdone('parse_globals') is True
+ assert o.options.prompt_all is False
+ assert o.options.interactive is True
+ assert o.options.verbose is None
+ assert o.options.conf is None
+ assert o.options.env is None
+ assert o.cmd_argv == tuple()
+ e = raises(StandardError, o.parse_globals)
+ assert str(e) == 'CLI.parse_globals() already called'
+
+ # Test with a populated argv:
+ argv = ('-a', '-n', '-v', '-c', '/my/config.conf', '-e', 'my_key=my_val')
+ cmd_argv = ('user-add', '--first', 'John', '--last', 'Doe')
+ (o, api, home) = self.new(argv + cmd_argv)
+ assert not hasattr(o, 'options')
+ assert not hasattr(o, 'cmd_argv')
+ assert o.isdone('parse_globals') is False
+ o.parse_globals()
+ assert o.isdone('parse_globals') is True
+ assert o.options.prompt_all is True
+ assert o.options.interactive is False
+ assert o.options.verbose is True
+ assert o.options.conf == '/my/config.conf'
+ assert o.options.env == ['my_key=my_val']
+ assert o.cmd_argv == cmd_argv
+ e = raises(StandardError, o.parse_globals)
+ assert str(e) == 'CLI.parse_globals() already called'
+
+ # Test with multiple -e args:
+ argv = ('-e', 'key1=val1', '-e', 'key2=val2')
+ (o, api, home) = self.new(argv)
+ o.parse_globals()
+ assert o.options.env == ['key1=val1', 'key2=val2']
diff --git a/tests/test_ipalib/test_config.py b/tests/test_ipalib/test_config.py
new file mode 100644
index 00000000..d3109f7b
--- /dev/null
+++ b/tests/test_ipalib/test_config.py
@@ -0,0 +1,608 @@
+# Authors:
+# Martin Nagy <mnagy@redhat.com>
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.config` module.
+"""
+
+import os
+from os import path
+import sys
+from tests.util import raises, setitem, delitem, ClassChecker
+from tests.util import getitem, setitem, delitem
+from tests.util import TempDir, TempHome
+from ipalib.constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR
+from ipalib.constants import NAME_REGEX, NAME_ERROR
+from ipalib import config, constants, base
+
+
+# Valid environment variables in (key, raw, value) tuples:
+# key: the name of the environment variable
+# raw: the value being set (possibly a string repr)
+# value: the expected value after the lightweight conversion
+good_vars = (
+ ('a_string', 'Hello world!', 'Hello world!'),
+ ('trailing_whitespace', ' value ', 'value'),
+ ('an_int', 42, 42),
+ ('int_repr', ' 42 ', 42),
+ ('a_float', 3.14, 3.14),
+ ('float_repr', ' 3.14 ', 3.14),
+ ('true', True, True),
+ ('true_repr', ' True ', True),
+ ('false', False, False),
+ ('false_repr', ' False ', False),
+ ('none', None, None),
+ ('none_repr', ' None ', None),
+ ('empty', '', None),
+
+ # These verify that the implied conversion is case-sensitive:
+ ('not_true', ' true ', 'true'),
+ ('not_false', ' false ', 'false'),
+ ('not_none', ' none ', 'none'),
+)
+
+
+bad_names = (
+ ('CamelCase', 'value'),
+ ('_leading_underscore', 'value'),
+ ('trailing_underscore_', 'value'),
+)
+
+
+# Random base64-encoded data to simulate a misbehaving config file.
+config_bad = """
+/9j/4AAQSkZJRgABAQEAlgCWAAD//gAIT2xpdmVy/9sAQwAQCwwODAoQDg0OEhEQExgoGhgWFhgx
+IyUdKDozPTw5Mzg3QEhcTkBEV0U3OFBtUVdfYmdoZz5NcXlwZHhcZWdj/8AACwgAlgB0AQERAP/E
+ABsAAAEFAQEAAAAAAAAAAAAAAAQAAQIDBQYH/8QAMhAAAgICAAUDBAIABAcAAAAAAQIAAwQRBRIh
+MUEGE1EiMmFxFIEVI0LBFjNSYnKRof/aAAgBAQAAPwDCtzmNRr1o/MEP1D6f7kdkRakgBsAtoQhk
+xls/y3Z113I11mhiUc1ewCf1Oq4anJgINdhLhQoextfedmYrenfcvdzaFQnYAE08XhONTWEK8+js
+Fpo1oqAKoAA8CWjoJJTHM8kJ5jsiOiszAKD1+IV/hmW76rosbfnlh1Pp3Mah2srCnXQE9YXiel/c
+p5r7uVj2CwxPTuFjjmdLbteNwmrLwsYe3TjsD8cmjKV43ycy+3o76D4llFuXmuCoZEPczXVOSsLv
+f5lgGpNZLxJL2jnvMar0/wAOp6jHDH/uO4RViY9f/KpRdfC6k3R9fRyj+pRZVkWKqF10e+hCKaFq
+XlH/ALlmhK7Met/uUGZ5ow8XL57lU8/Yt4lx4jUOJphLobTe/wDaHeZLxHXtJEya9o5lFzCqpmPY
+CUYoPtDfc9TLj0G5jZvHaMFirAs++oEHq9U4rbNiMp8a6wO/1Zbzn2alC+Nx8P1JfdeBboA+AILx
+rin8pfbA1ynvKuFUXZOXXkLbzOp2R56andL2G45MmO0RPWWLEe8GzaffoKb/ADI44Pt9ZXxAuuFa
+axtgp0BOSPCcviNX8n3Aw8KTNHB4FiY9StkobLWHVSeghq8M4bkAhKKyV6Hl8RV8MwMZG1Uuz3Jn
+IcUQJlMFGlJ6D4hfpymy7iChHKqvVtefxO7Ai1txLBIn7pcojN3jGVhQO0ZgCNfM5ZHycTLycSkr
+yhtqD4Bmrfw5cuqsm6xHXyp1seRLcHCp4dQy1bOzslj1MzeJ5dVFnuMVdgOiHxOWzrmyMg2Nrbde
+k3vR2OTddcd6A5R8GdZqOo67k4wXrLAQPMRKnzImMZEzm+P1nFz6cxQeVujagWR6jsYiqivlH/Ux
+1M+7jWY30i7QHx1gF11tjGyxiSfmVc+503pPidVROHYNNY21b/adVZZySo3uOo1qIZQYd9RCzfYm
+TUk/qW71LjGkTA+IYiZmM1T9N9j8Gee5+McXJem0/Wp8GUK6KOi7b5MgzFjsxpJHZGDKSCOxE3cD
+OvsxbbLc9lsT7Vc73KX4ln3q1ZyVrPx2J/uAjLyan37z7B+Zp4vqPJqKi0K4EvzvUt1qBMdfb+T5
+gycfzkXXuc35InfE6nO8Y9SjFc1Yqh2Hdj2mH/xFxR26XgD/AMRJf45mWMqW5bBD3KqAZlZtb++7
+kEqTsHe//sG1CcTBvy7OWpD+Sewhz8CyKCTYAQPiGV0LVWPdxqQNADQ6zL4nWq2gopU6+ofmA8x3
+1MlvfeIGbnBeCHitRt94IFbRGus2U9H08v13sT+BNHjeX/D4bY4OmP0rPPbHLMWJ2Yy2EDQjVsos
+BdeYDx8wo5L5KpSdLWPAE1+G8NrFtBKgOAXPTf6mzViql5ZBoE87eJZkKbOQ8m+Yjf5EBzcO621y
+GCqD0H41Obzq7U6vzM577HTXgzPPeOIvM1eB59nD8xXVj7bHTr8iej1MtlauvUMNgzi/V2ctliYy
+HYTq37nMExpZXRZYpZVJUdzNjg+FXYwZgdhv6nVVUJU/uH7iNf1CARrtF0IB113M7jTNVjFl2xJA
+5ROey88OrVOugOy67TDs+89NRKdSYILdRC8ZQVJ+PHyJs4fqe3EoFPLzBexPxOdusa2xndiWY7JM
+qMUNrzOTAfHC9XO9/E3vT9blVJB0o2Zu3MAoYrsL13Ii0Muw3XvJG9KkDOeqjf6gWcw5A33XN9nX
+tOeyMRFWy3Jch+bX7mXmCsW/5RBXUoHaOIRi2asAJ0IRbjqzll3o/EAaRiltDojgv2E1aePmhEWq
+rsNHZ7wir1K/8Y1vUCSCAd+IXiZ9b1gLYvN07trXTUD4rxN2TkUgEts8p2NDtD0t5MVGchr2Xe99
+hMPNvD1LX5J2TuZhGyYwBijjfiHU5bJXrnYfqBRtRtSbIBWG3+xI6HiLUWz8xA9RuaVNrMAPfB5x
+r6v9MLr4S1il7LaxyjY69Jl5eG+Kyhiv1jYIMGYMO8etGscKoJJ8Cbp4bVg4ivaq22t3G/tmRYo5
+zyjQ+JRFFET01GB0Yid9YiYh1l9KgEHqT8Tco/hewA/NzgdQdwTNGNTY3uU2crL9HN00ZlovNzfV
+oCanBrBRk1rpCHPUkQjjYoW4GtwAw30MDpuxvbAvpJceR5mXFFEY0W4o4mpg0XNXutQxPUHxLb8q
+7mRDyszLr6esz8u++9wL2LcvQb8RXCkhBV3A6mR5rEVSrdFPT8SBLMdsdmWe6P8AUAx+TB4oooxi
+i1Jmt0+5dfuOLbANB2H6MjzNzc2zv5ji1g2+5/MYnbb+Yh+T0kubUY940UUbUWtRpJN8w1CfebkK
+WfUu+/mDOAGOjsRo0UkIo+pPl6Rckl7ehuR1INGAj9u0kW2nXvK45YlQp1odukaICSAjgSQWf//Z
+"""
+
+
+# A config file that tries to override some standard vars:
+config_override = """
+[global]
+
+key0 = var0
+home = /home/sweet/home
+key1 = var1
+site_packages = planet
+key2 = var2
+key3 = var3
+"""
+
+
+# A config file that tests the automatic type conversion
+config_good = """
+[global]
+
+string = Hello world!
+null = None
+yes = True
+no = False
+number = 42
+floating = 3.14
+"""
+
+
+# A default config file to make sure it does not overwrite the explicit one
+config_default = """
+[global]
+
+yes = Hello
+not_in_other = foo_bar
+"""
+
+
+class test_Env(ClassChecker):
+ """
+ Test the `ipalib.config.Env` class.
+ """
+
+ _cls = config.Env
+
+ def test_init(self):
+ """
+ Test the `ipalib.config.Env.__init__` method.
+ """
+ o = self.cls()
+ assert list(o) == []
+ assert len(o) == 0
+ assert o.__islocked__() is False
+
+ def test_lock(self):
+ """
+ Test the `ipalib.config.Env.__lock__` method.
+ """
+ o = self.cls()
+ assert o.__islocked__() is False
+ o.__lock__()
+ assert o.__islocked__() is True
+ e = raises(StandardError, o.__lock__)
+ assert str(e) == 'Env.__lock__() already called'
+
+ # Also test with base.lock() function:
+ o = self.cls()
+ assert o.__islocked__() is False
+ assert base.lock(o) is o
+ assert o.__islocked__() is True
+ e = raises(AssertionError, base.lock, o)
+ assert str(e) == 'already locked: %r' % o
+
+ def test_islocked(self):
+ """
+ Test the `ipalib.config.Env.__islocked__` method.
+ """
+ o = self.cls()
+ assert o.__islocked__() is False
+ assert base.islocked(o) is False
+ o.__lock__()
+ assert o.__islocked__() is True
+ assert base.islocked(o) is True
+
+ def test_setattr(self):
+ """
+ Test the `ipalib.config.Env.__setattr__` method.
+ """
+ o = self.cls()
+ for (name, raw, value) in good_vars:
+ # Test setting the value:
+ setattr(o, name, raw)
+ result = getattr(o, name)
+ assert type(result) is type(value)
+ assert result == value
+ assert result is o[name]
+
+ # Test that value cannot be overridden once set:
+ e = raises(AttributeError, setattr, o, name, raw)
+ assert str(e) == OVERRIDE_ERROR % ('Env', name, value, raw)
+
+ # Test that values cannot be set once locked:
+ o = self.cls()
+ o.__lock__()
+ for (name, raw, value) in good_vars:
+ e = raises(AttributeError, setattr, o, name, raw)
+ assert str(e) == SET_ERROR % ('Env', name, raw)
+
+ # Test that name is tested with check_name():
+ o = self.cls()
+ for (name, value) in bad_names:
+ e = raises(ValueError, setattr, o, name, value)
+ assert str(e) == NAME_ERROR % (NAME_REGEX, name)
+
+ def test_setitem(self):
+ """
+ Test the `ipalib.config.Env.__setitem__` method.
+ """
+ o = self.cls()
+ for (key, raw, value) in good_vars:
+ # Test setting the value:
+ o[key] = raw
+ result = o[key]
+ assert type(result) is type(value)
+ assert result == value
+ assert result is getattr(o, key)
+
+ # Test that value cannot be overridden once set:
+ e = raises(AttributeError, o.__setitem__, key, raw)
+ assert str(e) == OVERRIDE_ERROR % ('Env', key, value, raw)
+
+ # Test that values cannot be set once locked:
+ o = self.cls()
+ o.__lock__()
+ for (key, raw, value) in good_vars:
+ e = raises(AttributeError, o.__setitem__, key, raw)
+ assert str(e) == SET_ERROR % ('Env', key, raw)
+
+ # Test that name is tested with check_name():
+ o = self.cls()
+ for (key, value) in bad_names:
+ e = raises(ValueError, o.__setitem__, key, value)
+ assert str(e) == NAME_ERROR % (NAME_REGEX, key)
+
+ def test_getitem(self):
+ """
+ Test the `ipalib.config.Env.__getitem__` method.
+ """
+ o = self.cls()
+ value = 'some value'
+ o.key = value
+ assert o.key is value
+ assert o['key'] is value
+ for name in ('one', 'two'):
+ e = raises(KeyError, getitem, o, name)
+ assert str(e) == repr(name)
+
+ def test_delattr(self):
+ """
+ Test the `ipalib.config.Env.__delattr__` method.
+
+ This also tests that ``__delitem__`` is not implemented.
+ """
+ o = self.cls()
+ o.one = 1
+ assert o.one == 1
+ for key in ('one', 'two'):
+ e = raises(AttributeError, delattr, o, key)
+ assert str(e) == DEL_ERROR % ('Env', key)
+ e = raises(AttributeError, delitem, o, key)
+ assert str(e) == '__delitem__'
+
+ def test_contains(self):
+ """
+ Test the `ipalib.config.Env.__contains__` method.
+ """
+ o = self.cls()
+ items = [
+ ('one', 1),
+ ('two', 2),
+ ('three', 3),
+ ('four', 4),
+ ]
+ for (key, value) in items:
+ assert key not in o
+ o[key] = value
+ assert key in o
+
+ def test_len(self):
+ """
+ Test the `ipalib.config.Env.__len__` method.
+ """
+ o = self.cls()
+ assert len(o) == 0
+ for i in xrange(1, 11):
+ key = 'key%d' % i
+ value = 'value %d' % i
+ o[key] = value
+ assert o[key] is value
+ assert len(o) == i
+
+ def test_iter(self):
+ """
+ Test the `ipalib.config.Env.__iter__` method.
+ """
+ o = self.cls()
+ default_keys = tuple(o)
+ keys = ('one', 'two', 'three', 'four', 'five')
+ for key in keys:
+ o[key] = 'the value'
+ assert list(o) == sorted(keys + default_keys)
+
+ def test_merge(self):
+ """
+ Test the `ipalib.config.Env._merge` method.
+ """
+ group1 = (
+ ('key1', 'value 1'),
+ ('key2', 'value 2'),
+ ('key3', 'value 3'),
+ ('key4', 'value 4'),
+ )
+ group2 = (
+ ('key0', 'Value 0'),
+ ('key2', 'Value 2'),
+ ('key4', 'Value 4'),
+ ('key5', 'Value 5'),
+ )
+ o = self.cls()
+ assert o._merge(**dict(group1)) == (4, 4)
+ assert len(o) == 4
+ assert list(o) == list(key for (key, value) in group1)
+ for (key, value) in group1:
+ assert getattr(o, key) is value
+ assert o[key] is value
+ assert o._merge(**dict(group2)) == (2, 4)
+ assert len(o) == 6
+ expected = dict(group2)
+ expected.update(dict(group1))
+ assert list(o) == sorted(expected)
+ assert expected['key2'] == 'value 2' # And not 'Value 2'
+ for (key, value) in expected.iteritems():
+ assert getattr(o, key) is value
+ assert o[key] is value
+ assert o._merge(**expected) == (0, 6)
+ assert len(o) == 6
+ assert list(o) == sorted(expected)
+
+ def test_merge_from_file(self):
+ """
+ Test the `ipalib.config.Env._merge_from_file` method.
+ """
+ tmp = TempDir()
+ assert callable(tmp.join)
+
+ # Test a config file that doesn't exist
+ no_exist = tmp.join('no_exist.conf')
+ assert not path.exists(no_exist)
+ o = self.cls()
+ o._bootstrap()
+ keys = tuple(o)
+ orig = dict((k, o[k]) for k in o)
+ assert o._merge_from_file(no_exist) is None
+ assert tuple(o) == keys
+
+ # Test an empty config file
+ empty = tmp.touch('empty.conf')
+ assert path.isfile(empty)
+ assert o._merge_from_file(empty) == (0, 0)
+ assert tuple(o) == keys
+
+ # Test a mal-formed config file:
+ bad = tmp.join('bad.conf')
+ open(bad, 'w').write(config_bad)
+ assert path.isfile(bad)
+ assert o._merge_from_file(bad) is None
+ assert tuple(o) == keys
+
+ # Test a valid config file that tries to override
+ override = tmp.join('override.conf')
+ open(override, 'w').write(config_override)
+ assert path.isfile(override)
+ assert o._merge_from_file(override) == (4, 6)
+ for (k, v) in orig.items():
+ assert o[k] is v
+ assert list(o) == sorted(keys + ('key0', 'key1', 'key2', 'key3'))
+ for i in xrange(4):
+ assert o['key%d' % i] == ('var%d' % i)
+ keys = tuple(o)
+
+ # Test a valid config file with type conversion
+ good = tmp.join('good.conf')
+ open(good, 'w').write(config_good)
+ assert path.isfile(good)
+ assert o._merge_from_file(good) == (6, 6)
+ added = ('string', 'null', 'yes', 'no', 'number', 'floating')
+ assert list(o) == sorted(keys + added)
+ assert o.string == 'Hello world!'
+ assert o.null is None
+ assert o.yes is True
+ assert o.no is False
+ assert o.number == 42
+ assert o.floating == 3.14
+
+ def new(self):
+ """
+ Set os.environ['HOME'] to a tempdir.
+
+ Returns tuple with new Env instance and the TempHome instance. This
+ helper method is used in testing the bootstrap related methods below.
+ """
+ home = TempHome()
+ return (self.cls(), home)
+
+ def bootstrap(self, **overrides):
+ """
+ Helper method used in testing bootstrap related methods below.
+ """
+ (o, home) = self.new()
+ assert o._isdone('_bootstrap') is False
+ o._bootstrap(**overrides)
+ assert o._isdone('_bootstrap') is True
+ e = raises(StandardError, o._bootstrap)
+ assert str(e) == 'Env._bootstrap() already called'
+ return (o, home)
+
+ def test_bootstrap(self):
+ """
+ Test the `ipalib.config.Env._bootstrap` method.
+ """
+ # Test defaults created by _bootstrap():
+ (o, home) = self.new()
+ o._bootstrap()
+ ipalib = path.dirname(path.abspath(config.__file__))
+ assert o.ipalib == ipalib
+ assert o.site_packages == path.dirname(ipalib)
+ assert o.script == path.abspath(sys.argv[0])
+ assert o.bin == path.dirname(path.abspath(sys.argv[0]))
+ assert o.home == home.path
+ assert o.dot_ipa == home.join('.ipa')
+ assert o.in_tree is False
+ assert o.context == 'default'
+ assert o.conf == '/etc/ipa/default.conf'
+ assert o.conf_default == o.conf
+
+ # Test overriding values created by _bootstrap()
+ (o, home) = self.bootstrap(in_tree='True', context='server')
+ assert o.in_tree is True
+ assert o.context == 'server'
+ assert o.conf == home.join('.ipa', 'server.conf')
+ (o, home) = self.bootstrap(conf='/my/wacky/whatever.conf')
+ assert o.in_tree is False
+ assert o.context == 'default'
+ assert o.conf == '/my/wacky/whatever.conf'
+ assert o.conf_default == '/etc/ipa/default.conf'
+ (o, home) = self.bootstrap(conf_default='/my/wacky/default.conf')
+ assert o.in_tree is False
+ assert o.context == 'default'
+ assert o.conf == '/etc/ipa/default.conf'
+ assert o.conf_default == '/my/wacky/default.conf'
+
+ # Test various overrides and types conversion
+ kw = dict(
+ yes=True,
+ no=False,
+ num=42,
+ msg='Hello, world!',
+ )
+ override = dict(
+ (k, u' %s ' % v) for (k, v) in kw.items()
+ )
+ (o, home) = self.new()
+ for key in kw:
+ assert key not in o
+ o._bootstrap(**override)
+ for (key, value) in kw.items():
+ assert getattr(o, key) == value
+ assert o[key] == value
+
+ def finalize_core(self, **defaults):
+ """
+ Helper method used in testing `Env._finalize_core`.
+ """
+ (o, home) = self.new()
+ assert o._isdone('_finalize_core') is False
+ o._finalize_core(**defaults)
+ assert o._isdone('_finalize_core') is True
+ e = raises(StandardError, o._finalize_core)
+ assert str(e) == 'Env._finalize_core() already called'
+ return (o, home)
+
+ def test_finalize_core(self):
+ """
+ Test the `ipalib.config.Env._finalize_core` method.
+ """
+ # Check that calls cascade up the chain:
+ (o, home) = self.new()
+ assert o._isdone('_bootstrap') is False
+ assert o._isdone('_finalize_core') is False
+ assert o._isdone('_finalize') is False
+ o._finalize_core()
+ assert o._isdone('_bootstrap') is True
+ assert o._isdone('_finalize_core') is True
+ assert o._isdone('_finalize') is False
+
+ # Check that it can't be called twice:
+ e = raises(StandardError, o._finalize_core)
+ assert str(e) == 'Env._finalize_core() already called'
+
+ # Check that _bootstrap() did its job:
+ (o, home) = self.bootstrap()
+ assert 'in_tree' in o
+ assert 'conf' in o
+ assert 'context' in o
+
+ # Check that keys _finalize_core() will set are not set yet:
+ assert 'log' not in o
+ assert 'in_server' not in o
+
+ # Check that _finalize_core() did its job:
+ o._finalize_core()
+ assert 'in_server' in o
+ assert 'log' in o
+ assert o.in_tree is False
+ assert o.context == 'default'
+ assert o.in_server is False
+ assert o.log == '/var/log/ipa/default.log'
+
+ # Check log is in ~/.ipa/log when context='cli'
+ (o, home) = self.bootstrap(context='cli')
+ o._finalize_core()
+ assert o.in_tree is False
+ assert o.log == home.join('.ipa', 'log', 'cli.log')
+
+ # Check **defaults can't set in_server nor log:
+ (o, home) = self.bootstrap(in_server='True')
+ o._finalize_core(in_server=False)
+ assert o.in_server is True
+ (o, home) = self.bootstrap(log='/some/silly/log')
+ o._finalize_core(log='/a/different/log')
+ assert o.log == '/some/silly/log'
+
+ # Test loading config file, plus test some in-tree stuff
+ (o, home) = self.bootstrap(in_tree=True, context='server')
+ for key in ('yes', 'no', 'number'):
+ assert key not in o
+ home.write(config_good, '.ipa', 'server.conf')
+ home.write(config_default, '.ipa', 'default.conf')
+ o._finalize_core()
+ assert o.in_tree is True
+ assert o.context == 'server'
+ assert o.in_server is True
+ assert o.log == home.join('.ipa', 'log', 'server.log')
+ assert o.yes is True
+ assert o.no is False
+ assert o.number == 42
+ assert o.not_in_other == 'foo_bar'
+
+ # Test using DEFAULT_CONFIG:
+ defaults = dict(constants.DEFAULT_CONFIG)
+ (o, home) = self.finalize_core(**defaults)
+ assert list(o) == sorted(defaults)
+ for (key, value) in defaults.items():
+ if value is object:
+ continue
+ assert o[key] is value, value
+
+ def test_finalize(self):
+ """
+ Test the `ipalib.config.Env._finalize` method.
+ """
+ # Check that calls cascade up the chain:
+ (o, home) = self.new()
+ assert o._isdone('_bootstrap') is False
+ assert o._isdone('_finalize_core') is False
+ assert o._isdone('_finalize') is False
+ o._finalize()
+ assert o._isdone('_bootstrap') is True
+ assert o._isdone('_finalize_core') is True
+ assert o._isdone('_finalize') is True
+
+ # Check that it can't be called twice:
+ e = raises(StandardError, o._finalize)
+ assert str(e) == 'Env._finalize() already called'
+
+ # Check that _finalize() calls __lock__()
+ (o, home) = self.new()
+ assert o.__islocked__() is False
+ o._finalize()
+ assert o.__islocked__() is True
+ e = raises(StandardError, o.__lock__)
+ assert str(e) == 'Env.__lock__() already called'
+
+ # Check that **lastchance works
+ (o, home) = self.finalize_core()
+ key = 'just_one_more_key'
+ value = 'with one more value'
+ lastchance = {key: value}
+ assert key not in o
+ assert o._isdone('_finalize') is False
+ o._finalize(**lastchance)
+ assert key in o
+ assert o[key] is value
diff --git a/tests/test_ipalib/test_crud.py b/tests/test_ipalib/test_crud.py
new file mode 100644
index 00000000..ad391e2e
--- /dev/null
+++ b/tests/test_ipalib/test_crud.py
@@ -0,0 +1,237 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.crud` module.
+"""
+
+from tests.util import read_only, raises, get_api, ClassChecker
+from ipalib import crud, frontend, plugable, config
+
+
+class CrudChecker(ClassChecker):
+ """
+ Class for testing base classes in `ipalib.crud`.
+ """
+
+ def get_api(self, args=tuple(), options={}):
+ """
+ Return a finalized `ipalib.plugable.API` instance.
+ """
+ assert self.cls.__bases__ == (frontend.Method,)
+ (api, home) = get_api()
+ class user(frontend.Object):
+ takes_params = (
+ 'givenname',
+ 'sn',
+ frontend.Param('uid', primary_key=True),
+ 'initials',
+ )
+ class user_verb(self.cls):
+ takes_args = args
+ takes_options = options
+ api.register(user)
+ api.register(user_verb)
+ api.finalize()
+ return api
+
+
+class test_Add(CrudChecker):
+ """
+ Test the `ipalib.crud.Add` class.
+ """
+
+ _cls = crud.Add
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Add.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+ api = self.get_api(args=('extra?',))
+ assert list(api.Method.user_verb.args) == ['uid', 'extra']
+ assert api.Method.user_verb.args.uid.required is True
+ assert api.Method.user_verb.args.extra.required is False
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Add.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials']
+ for param in api.Method.user_verb.options():
+ assert param.required is True
+ api = self.get_api(options=('extra?',))
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials', 'extra']
+ assert api.Method.user_verb.options.extra.required is False
+
+
+class test_Get(CrudChecker):
+ """
+ Test the `ipalib.crud.Get` class.
+ """
+
+ _cls = crud.Get
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Get.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Get.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == []
+ assert len(api.Method.user_verb.options) == 0
+
+
+class test_Del(CrudChecker):
+ """
+ Test the `ipalib.crud.Del` class.
+ """
+
+ _cls = crud.Del
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Del.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Del.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == []
+ assert len(api.Method.user_verb.options) == 0
+
+
+class test_Mod(CrudChecker):
+ """
+ Test the `ipalib.crud.Mod` class.
+ """
+
+ _cls = crud.Mod
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Mod.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Mod.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials']
+ for param in api.Method.user_verb.options():
+ assert param.required is False
+
+
+class test_Find(CrudChecker):
+ """
+ Test the `ipalib.crud.Find` class.
+ """
+
+ _cls = crud.Find
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Find.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Find.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials']
+ for param in api.Method.user_verb.options():
+ assert param.required is False
+
+
+class test_CrudBackend(ClassChecker):
+ """
+ Test the `ipalib.crud.CrudBackend` class.
+ """
+
+ _cls = crud.CrudBackend
+
+ def get_subcls(self):
+ class ldap(self.cls):
+ pass
+ return ldap
+
+ def check_method(self, name, *args):
+ o = self.cls()
+ e = raises(NotImplementedError, getattr(o, name), *args)
+ assert str(e) == 'CrudBackend.%s()' % name
+ sub = self.subcls()
+ e = raises(NotImplementedError, getattr(sub, name), *args)
+ assert str(e) == 'ldap.%s()' % name
+
+ def test_create(self):
+ """
+ Test the `ipalib.crud.CrudBackend.create` method.
+ """
+ self.check_method('create')
+
+ def test_retrieve(self):
+ """
+ Test the `ipalib.crud.CrudBackend.retrieve` method.
+ """
+ self.check_method('retrieve', 'primary key', 'attribute')
+
+ def test_update(self):
+ """
+ Test the `ipalib.crud.CrudBackend.update` method.
+ """
+ self.check_method('update', 'primary key')
+
+ def test_delete(self):
+ """
+ Test the `ipalib.crud.CrudBackend.delete` method.
+ """
+ self.check_method('delete', 'primary key')
+
+ def test_search(self):
+ """
+ Test the `ipalib.crud.CrudBackend.search` method.
+ """
+ self.check_method('search')
diff --git a/tests/test_ipalib/test_error2.py b/tests/test_ipalib/test_error2.py
new file mode 100644
index 00000000..cd13ba77
--- /dev/null
+++ b/tests/test_ipalib/test_error2.py
@@ -0,0 +1,371 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.error2` module.
+"""
+
+import re
+import inspect
+from tests.util import assert_equal, raises, dummy_ugettext
+from ipalib import errors2, request
+from ipalib.constants import TYPE_ERROR
+
+
+class PrivateExceptionTester(object):
+ _klass = None
+ __klass = None
+
+ def __get_klass(self):
+ if self.__klass is None:
+ self.__klass = self._klass
+ assert issubclass(self.__klass, StandardError)
+ assert issubclass(self.__klass, errors2.PrivateError)
+ assert not issubclass(self.__klass, errors2.PublicError)
+ return self.__klass
+ klass = property(__get_klass)
+
+ def new(self, **kw):
+ for (key, value) in kw.iteritems():
+ assert not hasattr(self.klass, key), key
+ inst = self.klass(**kw)
+ assert isinstance(inst, StandardError)
+ assert isinstance(inst, errors2.PrivateError)
+ assert isinstance(inst, self.klass)
+ assert not isinstance(inst, errors2.PublicError)
+ for (key, value) in kw.iteritems():
+ assert getattr(inst, key) is value
+ assert str(inst) == self.klass.format % kw
+ assert inst.message == str(inst)
+ return inst
+
+
+class test_PrivateError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors2.PrivateError` exception.
+ """
+ _klass = errors2.PrivateError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors2.PrivateError.__init__` method.
+ """
+ inst = self.klass(key1='Value 1', key2='Value 2')
+ assert inst.key1 == 'Value 1'
+ assert inst.key2 == 'Value 2'
+ assert str(inst) == ''
+
+ # Test subclass and use of format:
+ class subclass(self.klass):
+ format = '%(true)r %(text)r %(number)r'
+
+ kw = dict(true=True, text='Hello!', number=18)
+ inst = subclass(**kw)
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+ assert str(inst) == subclass.format % kw
+
+ # Test via PrivateExceptionTester.new()
+ inst = self.new(**kw)
+ assert isinstance(inst, self.klass)
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+
+
+class test_SubprocessError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors2.SubprocessError` exception.
+ """
+
+ _klass = errors2.SubprocessError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors2.SubprocessError.__init__` method.
+ """
+ inst = self.new(returncode=1, argv=('/bin/false',))
+ assert inst.returncode == 1
+ assert inst.argv == ('/bin/false',)
+ assert str(inst) == "return code 1 from ('/bin/false',)"
+ assert inst.message == str(inst)
+
+
+class test_PluginSubclassError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors2.PluginSubclassError` exception.
+ """
+
+ _klass = errors2.PluginSubclassError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors2.PluginSubclassError.__init__` method.
+ """
+ inst = self.new(plugin='bad', bases=('base1', 'base2'))
+ assert inst.plugin == 'bad'
+ assert inst.bases == ('base1', 'base2')
+ assert str(inst) == \
+ "'bad' not subclass of any base in ('base1', 'base2')"
+ assert inst.message == str(inst)
+
+
+class test_PluginDuplicateError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors2.PluginDuplicateError` exception.
+ """
+
+ _klass = errors2.PluginDuplicateError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors2.PluginDuplicateError.__init__` method.
+ """
+ inst = self.new(plugin='my_plugin')
+ assert inst.plugin == 'my_plugin'
+ assert str(inst) == "'my_plugin' was already registered"
+ assert inst.message == str(inst)
+
+
+class test_PluginOverrideError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors2.PluginOverrideError` exception.
+ """
+
+ _klass = errors2.PluginOverrideError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors2.PluginOverrideError.__init__` method.
+ """
+ inst = self.new(base='Base', name='cmd', plugin='my_cmd')
+ assert inst.base == 'Base'
+ assert inst.name == 'cmd'
+ assert inst.plugin == 'my_cmd'
+ assert str(inst) == "unexpected override of Base.cmd with 'my_cmd'"
+ assert inst.message == str(inst)
+
+
+class test_PluginMissingOverrideError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors2.PluginMissingOverrideError` exception.
+ """
+
+ _klass = errors2.PluginMissingOverrideError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors2.PluginMissingOverrideError.__init__` method.
+ """
+ inst = self.new(base='Base', name='cmd', plugin='my_cmd')
+ assert inst.base == 'Base'
+ assert inst.name == 'cmd'
+ assert inst.plugin == 'my_cmd'
+ assert str(inst) == "Base.cmd not registered, cannot override with 'my_cmd'"
+ assert inst.message == str(inst)
+
+
+
+##############################################################################
+# Unit tests for public errors:
+
+class PublicExceptionTester(object):
+ _klass = None
+ __klass = None
+
+ def __get_klass(self):
+ if self.__klass is None:
+ self.__klass = self._klass
+ assert issubclass(self.__klass, StandardError)
+ assert issubclass(self.__klass, errors2.PublicError)
+ assert not issubclass(self.__klass, errors2.PrivateError)
+ assert type(self.__klass.errno) is int
+ assert 900 <= self.__klass.errno <= 5999
+ return self.__klass
+ klass = property(__get_klass)
+
+ def new(self, format=None, message=None, **kw):
+ # Test that TypeError is raised if message isn't unicode:
+ e = raises(TypeError, self.klass, message='The message')
+ assert str(e) == TYPE_ERROR % ('message', unicode, 'The message', str)
+
+ # Test the instance:
+ for (key, value) in kw.iteritems():
+ assert not hasattr(self.klass, key), key
+ inst = self.klass(format=format, message=message, **kw)
+ assert isinstance(inst, StandardError)
+ assert isinstance(inst, errors2.PublicError)
+ assert isinstance(inst, self.klass)
+ assert not isinstance(inst, errors2.PrivateError)
+ for (key, value) in kw.iteritems():
+ assert getattr(inst, key) is value
+ return inst
+
+
+class test_PublicError(PublicExceptionTester):
+ """
+ Test the `ipalib.errors2.PublicError` exception.
+ """
+ _klass = errors2.PublicError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors2.PublicError.__init__` method.
+ """
+ context = request.context
+ message = u'The translated, interpolated message'
+ format = 'key=%(key1)r and key2=%(key2)r'
+ uformat = u'Translated key=%(key1)r and key2=%(key2)r'
+ val1 = 'Value 1'
+ val2 = 'Value 2'
+ kw = dict(key1=val1, key2=val2)
+
+ assert not hasattr(context, 'ugettext')
+
+ # Test with format=str, message=None
+ dummy = dummy_ugettext(uformat)
+ context.ugettext = dummy
+ inst = self.klass(format, **kw)
+ assert dummy.message is format # Means ugettext() called
+ assert inst.format is format
+ assert_equal(inst.message, format % kw)
+ assert_equal(inst.strerror, uformat % kw)
+ assert inst.forwarded is False
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+ # Test with format=None, message=unicode
+ dummy = dummy_ugettext(uformat)
+ context.ugettext = dummy
+ inst = self.klass(message=message, **kw)
+ assert not hasattr(dummy, 'message') # Means ugettext() not called
+ assert inst.format is None
+ assert inst.message is message
+ assert inst.strerror is message
+ assert inst.forwarded is True
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+ # Test with format=None, message=str
+ e = raises(TypeError, self.klass, message='the message', **kw)
+ assert str(e) == TYPE_ERROR % ('message', unicode, 'the message', str)
+
+ # Test with format=None, message=None
+ e = raises(ValueError, self.klass, **kw)
+ assert str(e) == \
+ 'PublicError.format is None yet format=None, message=None'
+
+
+ ######################################
+ # Test via PublicExceptionTester.new()
+
+ # Test with format=str, message=None
+ dummy = dummy_ugettext(uformat)
+ context.ugettext = dummy
+ inst = self.new(format, **kw)
+ assert isinstance(inst, self.klass)
+ assert dummy.message is format # Means ugettext() called
+ assert inst.format is format
+ assert_equal(inst.message, format % kw)
+ assert_equal(inst.strerror, uformat % kw)
+ assert inst.forwarded is False
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+ # Test with format=None, message=unicode
+ dummy = dummy_ugettext(uformat)
+ context.ugettext = dummy
+ inst = self.new(message=message, **kw)
+ assert isinstance(inst, self.klass)
+ assert not hasattr(dummy, 'message') # Means ugettext() not called
+ assert inst.format is None
+ assert inst.message is message
+ assert inst.strerror is message
+ assert inst.forwarded is True
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+
+ ##################
+ # Test a subclass:
+ class subclass(self.klass):
+ format = '%(true)r %(text)r %(number)r'
+
+ uformat = u'Translated %(true)r %(text)r %(number)r'
+ kw = dict(true=True, text='Hello!', number=18)
+
+ dummy = dummy_ugettext(uformat)
+ context.ugettext = dummy
+
+ # Test with format=str, message=None
+ e = raises(ValueError, subclass, format, **kw)
+ assert str(e) == 'non-generic %r needs format=None; got format=%r' % (
+ 'subclass', format)
+
+ # Test with format=None, message=None:
+ inst = subclass(**kw)
+ assert dummy.message is subclass.format # Means ugettext() called
+ assert inst.format is subclass.format
+ assert_equal(inst.message, subclass.format % kw)
+ assert_equal(inst.strerror, uformat % kw)
+ assert inst.forwarded is False
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+
+ # Test with format=None, message=unicode:
+ dummy = dummy_ugettext(uformat)
+ context.ugettext = dummy
+ inst = subclass(message=message, **kw)
+ assert not hasattr(dummy, 'message') # Means ugettext() not called
+ assert inst.format is subclass.format
+ assert inst.message is message
+ assert inst.strerror is message
+ assert inst.forwarded is True
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+ del context.ugettext
+
+
+def test_public_errors():
+ """
+ Test the `ipalib.errors2.public_errors` module variable.
+ """
+ i = 0
+ for klass in errors2.public_errors:
+ assert issubclass(klass, StandardError)
+ assert issubclass(klass, errors2.PublicError)
+ assert not issubclass(klass, errors2.PrivateError)
+ assert type(klass.errno) is int
+ assert 900 <= klass.errno <= 5999
+ doc = inspect.getdoc(klass)
+ assert doc is not None, 'need class docstring for %s' % klass.__name__
+ m = re.match(r'^\*{2}(\d+)\*{2} ', doc)
+ assert m is not None, "need '**ERRNO**' in %s docstring" % klass.__name__
+ errno = int(m.group(1))
+ assert errno == klass.errno, (
+ 'docstring=%r but errno=%r in %s' % (errno, klass.errno, klass.__name__)
+ )
+
+ # Test format
+ if klass.format is not None:
+ assert klass.format is errors2.__messages[i]
+ i += 1
diff --git a/tests/test_ipalib/test_errors.py b/tests/test_ipalib/test_errors.py
new file mode 100644
index 00000000..f1dd5dc8
--- /dev/null
+++ b/tests/test_ipalib/test_errors.py
@@ -0,0 +1,289 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.errors` module.
+"""
+
+from tests.util import raises, ClassChecker
+from ipalib import errors
+
+
+type_format = '%s: need a %r; got %r'
+
+
+def check_TypeError(f, value, type_, name, **kw):
+ e = raises(TypeError, f, value, type_, name, **kw)
+ assert e.value is value
+ assert e.type is type_
+ assert e.name is name
+ assert str(e) == type_format % (name, type_, value)
+
+
+def test_raise_TypeError():
+ """
+ Test the `ipalib.errors.raise_TypeError` function.
+ """
+ f = errors.raise_TypeError
+ value = 'Hello.'
+ type_ = unicode
+ name = 'message'
+
+ check_TypeError(f, value, type_, name)
+
+ # name not an str
+ fail_name = 42
+ e = raises(AssertionError, f, value, type_, fail_name)
+ assert str(e) == type_format % ('name', str, fail_name), str(e)
+
+ # type_ not a type:
+ fail_type = unicode()
+ e = raises(AssertionError, f, value, fail_type, name)
+ assert str(e) == type_format % ('type_', type, fail_type)
+
+ # type(value) is type_:
+ fail_value = u'How are you?'
+ e = raises(AssertionError, f, fail_value, type_, name)
+ assert str(e) == 'value: %r is a %r' % (fail_value, type_)
+
+
+def test_check_type():
+ """
+ Test the `ipalib.errors.check_type` function.
+ """
+ f = errors.check_type
+ value = 'How are you?'
+ type_ = str
+ name = 'greeting'
+
+ # Should pass:
+ assert value is f(value, type_, name)
+ assert None is f(None, type_, name, allow_none=True)
+
+ # Should raise TypeError
+ check_TypeError(f, None, type_, name)
+ check_TypeError(f, value, basestring, name)
+ check_TypeError(f, value, unicode, name)
+
+ # name not an str
+ fail_name = unicode(name)
+ e = raises(AssertionError, f, value, type_, fail_name)
+ assert str(e) == type_format % ('name', str, fail_name)
+
+ # type_ not a type:
+ fail_type = 42
+ e = raises(AssertionError, f, value, fail_type, name)
+ assert str(e) == type_format % ('type_', type, fail_type)
+
+ # allow_none not a bool:
+ fail_bool = 0
+ e = raises(AssertionError, f, value, type_, name, allow_none=fail_bool)
+ assert str(e) == type_format % ('allow_none', bool, fail_bool)
+
+
+def test_check_isinstance():
+ """
+ Test the `ipalib.errors.check_isinstance` function.
+ """
+ f = errors.check_isinstance
+ value = 'How are you?'
+ type_ = str
+ name = 'greeting'
+
+ # Should pass:
+ assert value is f(value, type_, name)
+ assert value is f(value, basestring, name)
+ assert None is f(None, type_, name, allow_none=True)
+
+ # Should raise TypeError
+ check_TypeError(f, None, type_, name)
+ check_TypeError(f, value, unicode, name)
+
+ # name not an str
+ fail_name = unicode(name)
+ e = raises(AssertionError, f, value, type_, fail_name)
+ assert str(e) == type_format % ('name', str, fail_name)
+
+ # type_ not a type:
+ fail_type = 42
+ e = raises(AssertionError, f, value, fail_type, name)
+ assert str(e) == type_format % ('type_', type, fail_type)
+
+ # allow_none not a bool:
+ fail_bool = 0
+ e = raises(AssertionError, f, value, type_, name, allow_none=fail_bool)
+ assert str(e) == type_format % ('allow_none', bool, fail_bool)
+
+
+class test_IPAError(ClassChecker):
+ """
+ Test the `ipalib.errors.IPAError` exception.
+ """
+ _cls = errors.IPAError
+
+ def test_class(self):
+ """
+ Test the `ipalib.errors.IPAError` exception.
+ """
+ assert self.cls.__bases__ == (StandardError,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.IPAError.__init__` method.
+ """
+ args = ('one fish', 'two fish')
+ e = self.cls(*args)
+ assert e.args == args
+ assert self.cls().args == tuple()
+
+ def test_str(self):
+ """
+ Test the `ipalib.errors.IPAError.__str__` method.
+ """
+ f = 'The %s color is %s.'
+ class custom_error(self.cls):
+ format = f
+ for args in [('sexiest', 'red'), ('most-batman-like', 'black')]:
+ e = custom_error(*args)
+ assert e.args == args
+ assert str(e) == f % args
+
+
+class test_ValidationError(ClassChecker):
+ """
+ Test the `ipalib.errors.ValidationError` exception.
+ """
+ _cls = errors.ValidationError
+
+ def test_class(self):
+ """
+ Test the `ipalib.errors.ValidationError` exception.
+ """
+ assert self.cls.__bases__ == (errors.IPAError,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.ValidationError.__init__` method.
+ """
+ name = 'login'
+ value = 'Whatever'
+ error = 'Must be lowercase.'
+ for index in (None, 3):
+ e = self.cls(name, value, error, index=index)
+ assert e.name is name
+ assert e.value is value
+ assert e.error is error
+ assert e.index is index
+ assert str(e) == 'invalid %r value %r: %s' % (name, value, error)
+ # Check that index default is None:
+ assert self.cls(name, value, error).index is None
+ # Check non str name raises AssertionError:
+ raises(AssertionError, self.cls, unicode(name), value, error)
+ # Check non int index raises AssertionError:
+ raises(AssertionError, self.cls, name, value, error, index=5.0)
+ # Check negative index raises AssertionError:
+ raises(AssertionError, self.cls, name, value, error, index=-2)
+
+
+class test_ConversionError(ClassChecker):
+ """
+ Test the `ipalib.errors.ConversionError` exception.
+ """
+ _cls = errors.ConversionError
+
+ def test_class(self):
+ """
+ Test the `ipalib.errors.ConversionError` exception.
+ """
+ assert self.cls.__bases__ == (errors.ValidationError,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.ConversionError.__init__` method.
+ """
+ name = 'some_arg'
+ value = '42.0'
+ class type_(object):
+ conversion_error = 'Not an integer'
+ for index in (None, 7):
+ e = self.cls(name, value, type_, index=index)
+ assert e.name is name
+ assert e.value is value
+ assert e.type is type_
+ assert e.error is type_.conversion_error
+ assert e.index is index
+ assert str(e) == 'invalid %r value %r: %s' % (name, value,
+ type_.conversion_error)
+ # Check that index default is None:
+ assert self.cls(name, value, type_).index is None
+
+
+class test_RuleError(ClassChecker):
+ """
+ Test the `ipalib.errors.RuleError` exception.
+ """
+ _cls = errors.RuleError
+
+ def test_class(self):
+ """
+ Test the `ipalib.errors.RuleError` exception.
+ """
+ assert self.cls.__bases__ == (errors.ValidationError,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.RuleError.__init__` method.
+ """
+ name = 'whatever'
+ value = 'The smallest weird number.'
+ def my_rule(value):
+ return 'Value is bad.'
+ error = my_rule(value)
+ for index in (None, 42):
+ e = self.cls(name, value, error, my_rule, index=index)
+ assert e.name is name
+ assert e.value is value
+ assert e.error is error
+ assert e.rule is my_rule
+ # Check that index default is None:
+ assert self.cls(name, value, error, my_rule).index is None
+
+
+class test_RequirementError(ClassChecker):
+ """
+ Test the `ipalib.errors.RequirementError` exception.
+ """
+ _cls = errors.RequirementError
+
+ def test_class(self):
+ """
+ Test the `ipalib.errors.RequirementError` exception.
+ """
+ assert self.cls.__bases__ == (errors.ValidationError,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.RequirementError.__init__` method.
+ """
+ name = 'givenname'
+ e = self.cls(name)
+ assert e.name is name
+ assert e.value is None
+ assert e.error == 'Required'
+ assert e.index is None
diff --git a/tests/test_ipalib/test_frontend.py b/tests/test_ipalib/test_frontend.py
new file mode 100644
index 00000000..071a70fd
--- /dev/null
+++ b/tests/test_ipalib/test_frontend.py
@@ -0,0 +1,771 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.frontend` module.
+"""
+
+from tests.util import raises, getitem, no_set, no_del, read_only
+from tests.util import check_TypeError, ClassChecker, create_test_api
+from tests.util import assert_equal
+from ipalib.constants import TYPE_ERROR
+from ipalib import frontend, backend, plugable, errors2, errors, parameters, config
+
+
+def test_RULE_FLAG():
+ assert frontend.RULE_FLAG == 'validation_rule'
+
+
+def test_rule():
+ """
+ Test the `ipalib.frontend.rule` function.
+ """
+ flag = frontend.RULE_FLAG
+ rule = frontend.rule
+ def my_func():
+ pass
+ assert not hasattr(my_func, flag)
+ rule(my_func)
+ assert getattr(my_func, flag) is True
+ @rule
+ def my_func2():
+ pass
+ assert getattr(my_func2, flag) is True
+
+
+def test_is_rule():
+ """
+ Test the `ipalib.frontend.is_rule` function.
+ """
+ is_rule = frontend.is_rule
+ flag = frontend.RULE_FLAG
+
+ class no_call(object):
+ def __init__(self, value):
+ if value is not None:
+ assert value in (True, False)
+ setattr(self, flag, value)
+
+ class call(no_call):
+ def __call__(self):
+ pass
+
+ assert is_rule(call(True))
+ assert not is_rule(no_call(True))
+ assert not is_rule(call(False))
+ assert not is_rule(call(None))
+
+
+class test_Command(ClassChecker):
+ """
+ Test the `ipalib.frontend.Command` class.
+ """
+
+ _cls = frontend.Command
+
+ def get_subcls(self):
+ """
+ Return a standard subclass of `ipalib.frontend.Command`.
+ """
+ class Rule(object):
+ def __init__(self, name):
+ self.name = name
+
+ def __call__(self, _, value):
+ if value != self.name:
+ return _('must equal %r') % self.name
+
+ default_from = parameters.DefaultFrom(
+ lambda arg: arg,
+ 'default_from'
+ )
+ normalizer = lambda value: value.lower()
+
+ class example(self.cls):
+ takes_options = (
+ parameters.Str('option0', Rule('option0'),
+ normalizer=normalizer,
+ default_from=default_from,
+ ),
+ parameters.Str('option1', Rule('option1'),
+ normalizer=normalizer,
+ default_from=default_from,
+ ),
+ )
+ return example
+
+ def get_instance(self, args=tuple(), options=tuple()):
+ """
+ Helper method used to test args and options.
+ """
+ class example(self.cls):
+ takes_args = args
+ takes_options = options
+ o = example()
+ o.finalize()
+ return o
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Command` class.
+ """
+ assert self.cls.__bases__ == (plugable.Plugin,)
+ assert self.cls.takes_options == tuple()
+ assert self.cls.takes_args == tuple()
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.frontend.Command.get_args` method.
+ """
+ assert list(self.cls().get_args()) == []
+ args = ('login', 'stuff')
+ o = self.get_instance(args=args)
+ assert o.get_args() is args
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.frontend.Command.get_options` method.
+ """
+ assert list(self.cls().get_options()) == []
+ options = ('verbose', 'debug')
+ o = self.get_instance(options=options)
+ assert o.get_options() is options
+
+ def test_args(self):
+ """
+ Test the ``ipalib.frontend.Command.args`` instance attribute.
+ """
+ assert 'args' in self.cls.__public__ # Public
+ assert self.cls().args is None
+ o = self.cls()
+ o.finalize()
+ assert type(o.args) is plugable.NameSpace
+ assert len(o.args) == 0
+ args = ('destination', 'source?')
+ ns = self.get_instance(args=args).args
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == len(args)
+ assert list(ns) == ['destination', 'source']
+ assert type(ns.destination) is parameters.Str
+ assert type(ns.source) is parameters.Str
+ assert ns.destination.required is True
+ assert ns.destination.multivalue is False
+ assert ns.source.required is False
+ assert ns.source.multivalue is False
+
+ # Test TypeError:
+ e = raises(TypeError, self.get_instance, args=(u'whatever',))
+ assert str(e) == TYPE_ERROR % (
+ 'spec', (str, parameters.Param), u'whatever', unicode)
+
+ # Test ValueError, required after optional:
+ e = raises(ValueError, self.get_instance, args=('arg1?', 'arg2'))
+ assert str(e) == 'arg2: required argument after optional'
+
+ # Test ValueError, scalar after multivalue:
+ e = raises(ValueError, self.get_instance, args=('arg1+', 'arg2'))
+ assert str(e) == 'arg2: only final argument can be multivalue'
+
+ def test_max_args(self):
+ """
+ Test the ``ipalib.frontend.Command.max_args`` instance attribute.
+ """
+ o = self.get_instance()
+ assert o.max_args == 0
+ o = self.get_instance(args=('one?',))
+ assert o.max_args == 1
+ o = self.get_instance(args=('one', 'two?'))
+ assert o.max_args == 2
+ o = self.get_instance(args=('one', 'multi+',))
+ assert o.max_args is None
+ o = self.get_instance(args=('one', 'multi*',))
+ assert o.max_args is None
+
+ def test_options(self):
+ """
+ Test the ``ipalib.frontend.Command.options`` instance attribute.
+ """
+ assert 'options' in self.cls.__public__ # Public
+ assert self.cls().options is None
+ o = self.cls()
+ o.finalize()
+ assert type(o.options) is plugable.NameSpace
+ assert len(o.options) == 0
+ options = ('target', 'files*')
+ ns = self.get_instance(options=options).options
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == len(options)
+ assert list(ns) == ['target', 'files']
+ assert type(ns.target) is parameters.Str
+ assert type(ns.files) is parameters.Str
+ assert ns.target.required is True
+ assert ns.target.multivalue is False
+ assert ns.files.required is False
+ assert ns.files.multivalue is True
+
+ def test_convert(self):
+ """
+ Test the `ipalib.frontend.Command.convert` method.
+ """
+ assert 'convert' in self.cls.__public__ # Public
+ kw = dict(
+ option0=u'1.5',
+ option1=u'7',
+ )
+ o = self.subcls()
+ o.finalize()
+ for (key, value) in o.convert(**kw).iteritems():
+ assert_equal(unicode(kw[key]), value)
+
+ def test_normalize(self):
+ """
+ Test the `ipalib.frontend.Command.normalize` method.
+ """
+ assert 'normalize' in self.cls.__public__ # Public
+ kw = dict(
+ option0=u'OPTION0',
+ option1=u'OPTION1',
+ )
+ norm = dict((k, v.lower()) for (k, v) in kw.items())
+ sub = self.subcls()
+ sub.finalize()
+ assert sub.normalize(**kw) == norm
+
+ def test_get_default(self):
+ """
+ Test the `ipalib.frontend.Command.get_default` method.
+ """
+ assert 'get_default' in self.cls.__public__ # Public
+ # FIXME: Add an updated unit tests for get_default()
+
+ def test_validate(self):
+ """
+ Test the `ipalib.frontend.Command.validate` method.
+ """
+ assert 'validate' in self.cls.__public__ # Public
+
+ sub = self.subcls()
+ sub.finalize()
+
+ # Check with valid values
+ okay = dict(
+ option0=u'option0',
+ option1=u'option1',
+ another_option='some value',
+ )
+ sub.validate(**okay)
+
+ # Check with an invalid value
+ fail = dict(okay)
+ fail['option0'] = u'whatever'
+ e = raises(errors2.ValidationError, sub.validate, **fail)
+ assert_equal(e.name, 'option0')
+ assert_equal(e.value, u'whatever')
+ assert_equal(e.error, u"must equal 'option0'")
+ assert e.rule.__class__.__name__ == 'Rule'
+ assert e.index is None
+
+ # Check with a missing required arg
+ fail = dict(okay)
+ fail.pop('option1')
+ e = raises(errors.RequirementError, sub.validate, **fail)
+ assert e.name == 'option1'
+ assert e.value is None
+ assert e.index is None
+
+ def test_execute(self):
+ """
+ Test the `ipalib.frontend.Command.execute` method.
+ """
+ assert 'execute' in self.cls.__public__ # Public
+ o = self.cls()
+ e = raises(NotImplementedError, o.execute)
+ assert str(e) == 'Command.execute()'
+
+ def test_args_to_kw(self):
+ """
+ Test the `ipalib.frontend.Command.args_to_kw` method.
+ """
+ assert 'args_to_kw' in self.cls.__public__ # Public
+ o = self.get_instance(args=('one', 'two?'))
+ assert o.args_to_kw(1) == dict(one=1)
+ assert o.args_to_kw(1, 2) == dict(one=1, two=2)
+
+ o = self.get_instance(args=('one', 'two*'))
+ assert o.args_to_kw(1) == dict(one=1)
+ assert o.args_to_kw(1, 2) == dict(one=1, two=(2,))
+ assert o.args_to_kw(1, 2, 3) == dict(one=1, two=(2, 3))
+
+ o = self.get_instance(args=('one', 'two+'))
+ assert o.args_to_kw(1) == dict(one=1)
+ assert o.args_to_kw(1, 2) == dict(one=1, two=(2,))
+ assert o.args_to_kw(1, 2, 3) == dict(one=1, two=(2, 3))
+
+ o = self.get_instance()
+ e = raises(errors.ArgumentError, o.args_to_kw, 1)
+ assert str(e) == 'example takes no arguments'
+
+ o = self.get_instance(args=('one?',))
+ e = raises(errors.ArgumentError, o.args_to_kw, 1, 2)
+ assert str(e) == 'example takes at most 1 argument'
+
+ o = self.get_instance(args=('one', 'two?'))
+ e = raises(errors.ArgumentError, o.args_to_kw, 1, 2, 3)
+ assert str(e) == 'example takes at most 2 arguments'
+
+ def test_params_2_args_options(self):
+ """
+ Test the `ipalib.frontend.Command.params_2_args_options` method.
+ """
+ assert 'params_2_args_options' in self.cls.__public__ # Public
+ o = self.get_instance(args=['one'], options=['two'])
+ assert o.params_2_args_options({}) == ((None,), dict(two=None))
+ assert o.params_2_args_options(dict(one=1)) == ((1,), dict(two=None))
+ assert o.params_2_args_options(dict(two=2)) == ((None,), dict(two=2))
+ assert o.params_2_args_options(dict(two=2, one=1)) == \
+ ((1,), dict(two=2))
+
+ def test_run(self):
+ """
+ Test the `ipalib.frontend.Command.run` method.
+ """
+ class my_cmd(self.cls):
+ def execute(self, *args, **kw):
+ return ('execute', args, kw)
+
+ def forward(self, *args, **kw):
+ return ('forward', args, kw)
+
+ args = ('Hello,', 'world,')
+ kw = dict(how_are='you', on_this='fine day?')
+
+ # Test in server context:
+ (api, home) = create_test_api(in_server=True)
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ assert o.run.im_func is self.cls.run.im_func
+ assert ('execute', args, kw) == o.run(*args, **kw)
+ assert o.run.im_func is my_cmd.execute.im_func
+
+ # Test in non-server context
+ (api, home) = create_test_api(in_server=False)
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ assert o.run.im_func is self.cls.run.im_func
+ assert ('forward', args, kw) == o.run(*args, **kw)
+ assert o.run.im_func is my_cmd.forward.im_func
+
+
+class test_LocalOrRemote(ClassChecker):
+ """
+ Test the `ipalib.frontend.LocalOrRemote` class.
+ """
+ _cls = frontend.LocalOrRemote
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.LocalOrRemote.__init__` method.
+ """
+ o = self.cls()
+ o.finalize()
+ assert list(o.args) == []
+ assert list(o.options) == ['server']
+ op = o.options.server
+ assert op.required is False
+ assert op.default is False
+
+ def test_run(self):
+ """
+ Test the `ipalib.frontend.LocalOrRemote.run` method.
+ """
+ class example(self.cls):
+ takes_args = ['key?']
+
+ def forward(self, *args, **options):
+ return ('forward', args, options)
+
+ def execute(self, *args, **options):
+ return ('execute', args, options)
+
+ # Test when in_server=False:
+ (api, home) = create_test_api(in_server=False)
+ api.register(example)
+ api.finalize()
+ cmd = api.Command.example
+ assert cmd() == ('execute', (None,), dict(server=False))
+ assert cmd(u'var') == ('execute', (u'var',), dict(server=False))
+ assert cmd(server=True) == ('forward', (None,), dict(server=True))
+ assert cmd(u'var', server=True) == \
+ ('forward', (u'var',), dict(server=True))
+
+ # Test when in_server=True (should always call execute):
+ (api, home) = create_test_api(in_server=True)
+ api.register(example)
+ api.finalize()
+ cmd = api.Command.example
+ assert cmd() == ('execute', (None,), dict(server=False))
+ assert cmd(u'var') == ('execute', (u'var',), dict(server=False))
+ assert cmd(server=True) == ('execute', (None,), dict(server=True))
+ assert cmd(u'var', server=True) == \
+ ('execute', (u'var',), dict(server=True))
+
+
+class test_Object(ClassChecker):
+ """
+ Test the `ipalib.frontend.Object` class.
+ """
+ _cls = frontend.Object
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Object` class.
+ """
+ assert self.cls.__bases__ == (plugable.Plugin,)
+ assert self.cls.backend is None
+ assert self.cls.methods is None
+ assert self.cls.properties is None
+ assert self.cls.params is None
+ assert self.cls.params_minus_pk is None
+ assert self.cls.takes_params == tuple()
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Object.__init__` method.
+ """
+ o = self.cls()
+ assert o.backend is None
+ assert o.methods is None
+ assert o.properties is None
+ assert o.params is None
+ assert o.params_minus_pk is None
+ assert o.properties is None
+
+ def test_set_api(self):
+ """
+ Test the `ipalib.frontend.Object.set_api` method.
+ """
+ # Setup for test:
+ class DummyAttribute(object):
+ def __init__(self, obj_name, attr_name, name=None):
+ self.obj_name = obj_name
+ self.attr_name = attr_name
+ if name is None:
+ self.name = '%s_%s' % (obj_name, attr_name)
+ else:
+ self.name = name
+ self.param = frontend.create_param(attr_name)
+
+ def __clone__(self, attr_name):
+ return self.__class__(
+ self.obj_name,
+ self.attr_name,
+ getattr(self, attr_name)
+ )
+
+ def get_attributes(cnt, format):
+ for name in ['other', 'user', 'another']:
+ for i in xrange(cnt):
+ yield DummyAttribute(name, format % i)
+
+ cnt = 10
+ formats = dict(
+ methods='method_%d',
+ properties='property_%d',
+ )
+
+
+ _d = dict(
+ Method=plugable.NameSpace(
+ get_attributes(cnt, formats['methods'])
+ ),
+ Property=plugable.NameSpace(
+ get_attributes(cnt, formats['properties'])
+ ),
+ )
+ api = plugable.MagicDict(_d)
+ assert len(api.Method) == cnt * 3
+ assert len(api.Property) == cnt * 3
+
+ class user(self.cls):
+ pass
+
+ # Actually perform test:
+ o = user()
+ o.set_api(api)
+ assert read_only(o, 'api') is api
+ for name in ['methods', 'properties']:
+ namespace = getattr(o, name)
+ assert isinstance(namespace, plugable.NameSpace)
+ assert len(namespace) == cnt
+ f = formats[name]
+ for i in xrange(cnt):
+ attr_name = f % i
+ attr = namespace[attr_name]
+ assert isinstance(attr, DummyAttribute)
+ assert attr is getattr(namespace, attr_name)
+ assert attr.obj_name == 'user'
+ assert attr.attr_name == attr_name
+ assert attr.name == attr_name
+
+ # Test params instance attribute
+ o = self.cls()
+ o.set_api(api)
+ ns = o.params
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == 0
+ class example(self.cls):
+ takes_params = ('banana', 'apple')
+ o = example()
+ o.set_api(api)
+ ns = o.params
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == 2, repr(ns)
+ assert list(ns) == ['banana', 'apple']
+ for p in ns():
+ assert type(p) is parameters.Str
+ assert p.required is True
+ assert p.multivalue is False
+
+ def test_primary_key(self):
+ """
+ Test the `ipalib.frontend.Object.primary_key` attribute.
+ """
+ (api, home) = create_test_api()
+ api.finalize()
+
+ # Test with no primary keys:
+ class example1(self.cls):
+ takes_params = (
+ 'one',
+ 'two',
+ )
+ o = example1()
+ o.set_api(api)
+ assert o.primary_key is None
+ assert o.params_minus_pk is None
+
+ # Test with 1 primary key:
+ class example2(self.cls):
+ takes_params = (
+ 'one',
+ 'two',
+ parameters.Str('three', primary_key=True),
+ 'four',
+ )
+ o = example2()
+ o.set_api(api)
+ pk = o.primary_key
+ assert type(pk) is parameters.Str
+ assert pk.name == 'three'
+ assert pk.primary_key is True
+ assert o.params[2] is o.primary_key
+ assert isinstance(o.params_minus_pk, plugable.NameSpace)
+ assert list(o.params_minus_pk) == ['one', 'two', 'four']
+
+ # Test with multiple primary_key:
+ class example3(self.cls):
+ takes_params = (
+ parameters.Str('one', primary_key=True),
+ parameters.Str('two', primary_key=True),
+ 'three',
+ parameters.Str('four', primary_key=True),
+ )
+ o = example3()
+ e = raises(ValueError, o.set_api, api)
+ assert str(e) == \
+ 'example3 (Object) has multiple primary keys: one, two, four'
+
+ def test_backend(self):
+ """
+ Test the `ipalib.frontend.Object.backend` attribute.
+ """
+ (api, home) = create_test_api()
+ class ldap(backend.Backend):
+ whatever = 'It worked!'
+ api.register(ldap)
+ class user(frontend.Object):
+ backend_name = 'ldap'
+ api.register(user)
+ api.finalize()
+ b = api.Object.user.backend
+ assert isinstance(b, ldap)
+ assert b.whatever == 'It worked!'
+
+ def test_get_dn(self):
+ """
+ Test the `ipalib.frontend.Object.get_dn` method.
+ """
+ assert 'get_dn' in self.cls.__public__ # Public
+ o = self.cls()
+ e = raises(NotImplementedError, o.get_dn, 'primary key')
+ assert str(e) == 'Object.get_dn()'
+ class user(self.cls):
+ pass
+ o = user()
+ e = raises(NotImplementedError, o.get_dn, 'primary key')
+ assert str(e) == 'user.get_dn()'
+
+
+class test_Attribute(ClassChecker):
+ """
+ Test the `ipalib.frontend.Attribute` class.
+ """
+ _cls = frontend.Attribute
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Attribute` class.
+ """
+ assert self.cls.__bases__ == (plugable.Plugin,)
+ assert type(self.cls.obj) is property
+ assert type(self.cls.obj_name) is property
+ assert type(self.cls.attr_name) is property
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Attribute.__init__` method.
+ """
+ class user_add(self.cls):
+ pass
+ o = user_add()
+ assert read_only(o, 'obj') is None
+ assert read_only(o, 'obj_name') == 'user'
+ assert read_only(o, 'attr_name') == 'add'
+
+ def test_set_api(self):
+ """
+ Test the `ipalib.frontend.Attribute.set_api` method.
+ """
+ user_obj = 'The user frontend.Object instance'
+ class api(object):
+ Object = dict(user=user_obj)
+ class user_add(self.cls):
+ pass
+ o = user_add()
+ assert read_only(o, 'api') is None
+ assert read_only(o, 'obj') is None
+ o.set_api(api)
+ assert read_only(o, 'api') is api
+ assert read_only(o, 'obj') is user_obj
+
+
+class test_Method(ClassChecker):
+ """
+ Test the `ipalib.frontend.Method` class.
+ """
+ _cls = frontend.Method
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Method` class.
+ """
+ assert self.cls.__bases__ == (frontend.Attribute, frontend.Command)
+ assert self.cls.implements(frontend.Command)
+ assert self.cls.implements(frontend.Attribute)
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Method.__init__` method.
+ """
+ class user_add(self.cls):
+ pass
+ o = user_add()
+ assert o.name == 'user_add'
+ assert o.obj_name == 'user'
+ assert o.attr_name == 'add'
+ assert frontend.Command.implemented_by(o)
+ assert frontend.Attribute.implemented_by(o)
+
+
+class test_Property(ClassChecker):
+ """
+ Test the `ipalib.frontend.Property` class.
+ """
+ _cls = frontend.Property
+
+ def get_subcls(self):
+ """
+ Return a standard subclass of `ipalib.frontend.Property`.
+ """
+ class user_givenname(self.cls):
+ 'User first name'
+
+ @frontend.rule
+ def rule0_lowercase(self, value):
+ if not value.islower():
+ return 'Must be lowercase'
+ return user_givenname
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Property` class.
+ """
+ assert self.cls.__bases__ == (frontend.Attribute,)
+ assert self.cls.klass is parameters.Str
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Property.__init__` method.
+ """
+ o = self.subcls()
+ assert len(o.rules) == 1
+ assert o.rules[0].__name__ == 'rule0_lowercase'
+ param = o.param
+ assert isinstance(param, parameters.Str)
+ assert param.name == 'givenname'
+ assert param.doc == 'User first name'
+
+
+class test_Application(ClassChecker):
+ """
+ Test the `ipalib.frontend.Application` class.
+ """
+ _cls = frontend.Application
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Application` class.
+ """
+ assert self.cls.__bases__ == (frontend.Command,)
+ assert type(self.cls.application) is property
+
+ def test_application(self):
+ """
+ Test the `ipalib.frontend.Application.application` property.
+ """
+ assert 'application' in self.cls.__public__ # Public
+ assert 'set_application' in self.cls.__public__ # Public
+ app = 'The external application'
+ class example(self.cls):
+ 'A subclass'
+ for o in (self.cls(), example()):
+ assert read_only(o, 'application') is None
+ e = raises(TypeError, o.set_application, None)
+ assert str(e) == (
+ '%s.application cannot be None' % o.__class__.__name__
+ )
+ o.set_application(app)
+ assert read_only(o, 'application') is app
+ e = raises(AttributeError, o.set_application, app)
+ assert str(e) == (
+ '%s.application can only be set once' % o.__class__.__name__
+ )
+ assert read_only(o, 'application') is app
diff --git a/tests/test_ipalib/test_parameters.py b/tests/test_ipalib/test_parameters.py
new file mode 100644
index 00000000..261e1481
--- /dev/null
+++ b/tests/test_ipalib/test_parameters.py
@@ -0,0 +1,994 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.parameters` module.
+"""
+
+from types import NoneType
+from inspect import isclass
+from tests.util import raises, ClassChecker, read_only
+from tests.util import dummy_ugettext, assert_equal
+from tests.data import binary_bytes, utf8_bytes, unicode_str
+from ipalib import parameters, request, errors2
+from ipalib.constants import TYPE_ERROR, CALLABLE_ERROR, NULLS
+
+
+class test_DefaultFrom(ClassChecker):
+ """
+ Test the `ipalib.parameters.DefaultFrom` class.
+ """
+ _cls = parameters.DefaultFrom
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.DefaultFrom.__init__` method.
+ """
+ def callback(*args):
+ return args
+ keys = ('givenname', 'sn')
+ o = self.cls(callback, *keys)
+ assert read_only(o, 'callback') is callback
+ assert read_only(o, 'keys') == keys
+ lam = lambda first, last: first[0] + last
+ o = self.cls(lam)
+ assert read_only(o, 'keys') == ('first', 'last')
+
+ # Test that TypeError is raised when callback isn't callable:
+ e = raises(TypeError, self.cls, 'whatever')
+ assert str(e) == CALLABLE_ERROR % ('callback', 'whatever', str)
+
+ # Test that TypeError is raised when a key isn't an str:
+ e = raises(TypeError, self.cls, callback, 'givenname', 17)
+ assert str(e) == TYPE_ERROR % ('keys', str, 17, int)
+
+ def test_call(self):
+ """
+ Test the `ipalib.parameters.DefaultFrom.__call__` method.
+ """
+ def callback(givenname, sn):
+ return givenname[0] + sn[0]
+ keys = ('givenname', 'sn')
+ o = self.cls(callback, *keys)
+ kw = dict(
+ givenname='John',
+ sn='Public',
+ hello='world',
+ )
+ assert o(**kw) == 'JP'
+ assert o() is None
+ for key in ('givenname', 'sn'):
+ kw_copy = dict(kw)
+ del kw_copy[key]
+ assert o(**kw_copy) is None
+
+ # Test using implied keys:
+ o = self.cls(lambda first, last: first[0] + last)
+ assert o(first='john', last='doe') == 'jdoe'
+ assert o(first='', last='doe') is None
+ assert o(one='john', two='doe') is None
+
+ # Test that co_varnames slice is used:
+ def callback2(first, last):
+ letter = first[0]
+ return letter + last
+ o = self.cls(callback2)
+ assert o.keys == ('first', 'last')
+ assert o(first='john', last='doe') == 'jdoe'
+
+
+def test_parse_param_spec():
+ """
+ Test the `ipalib.parameters.parse_param_spec` function.
+ """
+ f = parameters.parse_param_spec
+ assert f('name') == ('name', dict(required=True, multivalue=False))
+ assert f('name?') == ('name', dict(required=False, multivalue=False))
+ assert f('name*') == ('name', dict(required=False, multivalue=True))
+ assert f('name+') == ('name', dict(required=True, multivalue=True))
+
+ # Make sure other "funny" endings are *not* treated special:
+ assert f('name^') == ('name^', dict(required=True, multivalue=False))
+
+ # Test that TypeError is raised if spec isn't an str:
+ e = raises(TypeError, f, u'name?')
+ assert str(e) == TYPE_ERROR % ('spec', str, u'name?', unicode)
+
+ # Test that ValueError is raised if len(spec) < 2:
+ e = raises(ValueError, f, 'n')
+ assert str(e) == "spec must be at least 2 characters; got 'n'"
+
+
+class DummyRule(object):
+ def __init__(self, error=None):
+ assert error is None or type(error) is unicode
+ self.error = error
+ self.reset()
+
+ def __call__(self, *args):
+ self.calls.append(args)
+ return self.error
+
+ def reset(self):
+ self.calls = []
+
+
+class test_Param(ClassChecker):
+ """
+ Test the `ipalib.parameters.Param` class.
+ """
+ _cls = parameters.Param
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Param.__init__` method.
+ """
+ name = 'my_param'
+ o = self.cls(name)
+ assert o.param_spec is name
+ assert o.name is name
+ assert o.nice == "Param('my_param')"
+ assert o.__islocked__() is True
+
+ # Test default rules:
+ assert o.rules == tuple()
+ assert o.class_rules == tuple()
+ assert o.all_rules == tuple()
+
+ # Test default kwarg values:
+ assert o.cli_name is name
+ assert o.label is None
+ assert o.doc == ''
+ assert o.required is True
+ assert o.multivalue is False
+ assert o.primary_key is False
+ assert o.normalizer is None
+ assert o.default is None
+ assert o.default_from is None
+ assert o.create_default is None
+ assert o._get_default is None
+ assert o.autofill is False
+ assert o.query is False
+ assert o.flags == frozenset()
+
+ # Test that ValueError is raised when a kwarg from a subclass
+ # conflicts with an attribute:
+ class Subclass(self.cls):
+ kwargs = self.cls.kwargs + (
+ ('convert', callable, None),
+ )
+ e = raises(ValueError, Subclass, name)
+ assert str(e) == "kwarg 'convert' conflicts with attribute on Subclass"
+
+ # Test type validation of keyword arguments:
+ class Subclass(self.cls):
+ kwargs = self.cls.kwargs + (
+ ('extra1', bool, True),
+ ('extra2', str, 'Hello'),
+ ('extra3', (int, float), 42),
+ ('extra4', callable, lambda whatever: whatever + 7),
+ )
+ o = Subclass('my_param') # Test with no **kw:
+ for (key, kind, default) in o.kwargs:
+ # Test with a type invalid for all:
+ value = object()
+ kw = {key: value}
+ e = raises(TypeError, Subclass, 'my_param', **kw)
+ if kind is callable:
+ assert str(e) == CALLABLE_ERROR % (key, value, type(value))
+ else:
+ assert str(e) == TYPE_ERROR % (key, kind, value, type(value))
+ # Test with None:
+ kw = {key: None}
+ Subclass('my_param', **kw)
+
+ # Test when using unknown kwargs:
+ e = raises(TypeError, self.cls, 'my_param',
+ flags=['hello', 'world'],
+ whatever=u'Hooray!',
+ )
+ assert str(e) == \
+ "Param('my_param'): takes no such kwargs: 'whatever'"
+ e = raises(TypeError, self.cls, 'my_param', great='Yes', ape='he is!')
+ assert str(e) == \
+ "Param('my_param'): takes no such kwargs: 'ape', 'great'"
+
+ # Test that ValueError is raised if you provide both default_from and
+ # create_default:
+ e = raises(ValueError, self.cls, 'my_param',
+ default_from=lambda first, last: first[0] + last,
+ create_default=lambda **kw: 'The Default'
+ )
+ assert str(e) == '%s: cannot have both %r and %r' % (
+ "Param('my_param')", 'default_from', 'create_default',
+ )
+
+ # Test that _get_default gets set:
+ call1 = lambda first, last: first[0] + last
+ call2 = lambda **kw: 'The Default'
+ o = self.cls('my_param', default_from=call1)
+ assert o.default_from.callback is call1
+ assert o._get_default is o.default_from
+ o = self.cls('my_param', create_default=call2)
+ assert o.create_default is call2
+ assert o._get_default is call2
+
+ def test_repr(self):
+ """
+ Test the `ipalib.parameters.Param.__repr__` method.
+ """
+ for name in ['name', 'name?', 'name*', 'name+']:
+ o = self.cls(name)
+ assert repr(o) == 'Param(%r)' % name
+ o = self.cls('name', required=False)
+ assert repr(o) == "Param('name', required=False)"
+ o = self.cls('name', multivalue=True)
+ assert repr(o) == "Param('name', multivalue=True)"
+
+ def test_clone(self):
+ """
+ Test the `ipalib.parameters.Param.clone` method.
+ """
+ # Test with the defaults
+ orig = self.cls('my_param')
+ clone = orig.clone()
+ assert clone is not orig
+ assert type(clone) is self.cls
+ assert clone.name is orig.name
+ for (key, kind, default) in self.cls.kwargs:
+ assert getattr(clone, key) is getattr(orig, key)
+
+ # Test with a param spec:
+ orig = self.cls('my_param*')
+ assert orig.param_spec == 'my_param*'
+ clone = orig.clone()
+ assert clone.param_spec == 'my_param'
+ assert clone is not orig
+ assert type(clone) is self.cls
+ for (key, kind, default) in self.cls.kwargs:
+ assert getattr(clone, key) is getattr(orig, key)
+
+ # Test with overrides:
+ orig = self.cls('my_param*')
+ assert orig.required is False
+ assert orig.multivalue is True
+ clone = orig.clone(required=True)
+ assert clone is not orig
+ assert type(clone) is self.cls
+ assert clone.required is True
+ assert clone.multivalue is True
+ assert clone.param_spec == 'my_param'
+ assert clone.name == 'my_param'
+
+ def test_get_label(self):
+ """
+ Test the `ipalib.parameters.get_label` method.
+ """
+ context = request.context
+ cli_name = 'the_cli_name'
+ message = 'The Label'
+ label = lambda _: _(message)
+ o = self.cls('name', cli_name=cli_name, label=label)
+ assert o.label is label
+
+ ## Scenario 1: label=callable (a lambda form)
+
+ # Test with no context.ugettext:
+ assert not hasattr(context, 'ugettext')
+ assert_equal(o.get_label(), u'The Label')
+
+ # Test with dummy context.ugettext:
+ assert not hasattr(context, 'ugettext')
+ dummy = dummy_ugettext()
+ context.ugettext = dummy
+ assert o.get_label() is dummy.translation
+ assert dummy.message is message
+ del context.ugettext
+
+ ## Scenario 2: label=None
+ o = self.cls('name', cli_name=cli_name)
+ assert o.label is None
+
+ # Test with no context.ugettext:
+ assert not hasattr(context, 'ugettext')
+ assert_equal(o.get_label(), u'the_cli_name')
+
+ # Test with dummy context.ugettext:
+ assert not hasattr(context, 'ugettext')
+ dummy = dummy_ugettext()
+ context.ugettext = dummy
+ assert_equal(o.get_label(), u'the_cli_name')
+ assert not hasattr(dummy, 'message')
+
+ # Cleanup
+ del context.ugettext
+ assert not hasattr(context, 'ugettext')
+
+ def test_convert(self):
+ """
+ Test the `ipalib.parameters.Param.convert` method.
+ """
+ okay = ('Hello', u'Hello', 0, 4.2, True, False)
+ class Subclass(self.cls):
+ def _convert_scalar(self, value, index=None):
+ return value
+
+ # Test when multivalue=False:
+ o = Subclass('my_param')
+ for value in NULLS:
+ assert o.convert(value) is None
+ for value in okay:
+ assert o.convert(value) is value
+
+ # Test when multivalue=True:
+ o = Subclass('my_param', multivalue=True)
+ for value in NULLS:
+ assert o.convert(value) is None
+ assert o.convert(okay) == okay
+ assert o.convert(NULLS) is None
+ assert o.convert(okay + NULLS) == okay
+ assert o.convert(NULLS + okay) == okay
+ for value in okay:
+ assert o.convert(value) == (value,)
+ assert o.convert([None, value]) == (value,)
+ assert o.convert([value, None]) == (value,)
+
+ def test_convert_scalar(self):
+ """
+ Test the `ipalib.parameters.Param._convert_scalar` method.
+ """
+ dummy = dummy_ugettext()
+
+ # Test with correct type:
+ o = self.cls('my_param')
+ assert o._convert_scalar(None) is None
+ assert dummy.called() is False
+ # Test with incorrect type
+ e = raises(errors2.ConversionError, o._convert_scalar, 'hello', index=17)
+
+ def test_validate(self):
+ """
+ Test the `ipalib.parameters.Param.validate` method.
+ """
+
+ # Test in default state (with no rules, no kwarg):
+ o = self.cls('my_param')
+ e = raises(errors2.RequirementError, o.validate, None)
+ assert e.name == 'my_param'
+
+ # Test with required=False
+ o = self.cls('my_param', required=False)
+ assert o.required is False
+ assert o.validate(None) is None
+
+ # Test with query=True:
+ o = self.cls('my_param', query=True)
+ assert o.query is True
+ e = raises(errors2.RequirementError, o.validate, None)
+ assert_equal(e.name, 'my_param')
+
+ # Test with multivalue=True:
+ o = self.cls('my_param', multivalue=True)
+ e = raises(TypeError, o.validate, [])
+ assert str(e) == TYPE_ERROR % ('value', tuple, [], list)
+ e = raises(ValueError, o.validate, tuple())
+ assert str(e) == 'value: empty tuple must be converted to None'
+
+ # Test with wrong (scalar) type:
+ e = raises(TypeError, o.validate, (None, None, 42, None))
+ assert str(e) == TYPE_ERROR % ('value[2]', NoneType, 42, int)
+ o = self.cls('my_param')
+ e = raises(TypeError, o.validate, 'Hello')
+ assert str(e) == TYPE_ERROR % ('value', NoneType, 'Hello', str)
+
+ class Example(self.cls):
+ type = int
+
+ # Test with some rules and multivalue=False
+ pass1 = DummyRule()
+ pass2 = DummyRule()
+ fail = DummyRule(u'no good')
+ o = Example('example', pass1, pass2)
+ assert o.multivalue is False
+ assert o.validate(11) is None
+ assert pass1.calls == [(request.ugettext, 11)]
+ assert pass2.calls == [(request.ugettext, 11)]
+ pass1.reset()
+ pass2.reset()
+ o = Example('example', pass1, pass2, fail)
+ e = raises(errors2.ValidationError, o.validate, 42)
+ assert e.name == 'example'
+ assert e.error == u'no good'
+ assert e.index is None
+ assert pass1.calls == [(request.ugettext, 42)]
+ assert pass2.calls == [(request.ugettext, 42)]
+ assert fail.calls == [(request.ugettext, 42)]
+
+ # Test with some rules and multivalue=True
+ pass1 = DummyRule()
+ pass2 = DummyRule()
+ fail = DummyRule(u'this one is not good')
+ o = Example('example', pass1, pass2, multivalue=True)
+ assert o.multivalue is True
+ assert o.validate((3, 9)) is None
+ assert pass1.calls == [
+ (request.ugettext, 3),
+ (request.ugettext, 9),
+ ]
+ assert pass2.calls == [
+ (request.ugettext, 3),
+ (request.ugettext, 9),
+ ]
+ pass1.reset()
+ pass2.reset()
+ o = Example('multi_example', pass1, pass2, fail, multivalue=True)
+ assert o.multivalue is True
+ e = raises(errors2.ValidationError, o.validate, (3, 9))
+ assert e.name == 'multi_example'
+ assert e.error == u'this one is not good'
+ assert e.index == 0
+ assert pass1.calls == [(request.ugettext, 3)]
+ assert pass2.calls == [(request.ugettext, 3)]
+ assert fail.calls == [(request.ugettext, 3)]
+
+ def test_validate_scalar(self):
+ """
+ Test the `ipalib.parameters.Param._validate_scalar` method.
+ """
+ class MyParam(self.cls):
+ type = bool
+ okay = DummyRule()
+ o = MyParam('my_param', okay)
+
+ # Test that TypeError is appropriately raised:
+ e = raises(TypeError, o._validate_scalar, 0)
+ assert str(e) == TYPE_ERROR % ('value', bool, 0, int)
+ e = raises(TypeError, o._validate_scalar, 'Hi', index=4)
+ assert str(e) == TYPE_ERROR % ('value[4]', bool, 'Hi', str)
+ e = raises(TypeError, o._validate_scalar, True, index=3.0)
+ assert str(e) == TYPE_ERROR % ('index', int, 3.0, float)
+
+ # Test with passing rule:
+ assert o._validate_scalar(True, index=None) is None
+ assert o._validate_scalar(False, index=None) is None
+ assert okay.calls == [
+ (request.ugettext, True),
+ (request.ugettext, False),
+ ]
+
+ # Test with a failing rule:
+ okay = DummyRule()
+ fail = DummyRule(u'this describes the error')
+ o = MyParam('my_param', okay, fail)
+ e = raises(errors2.ValidationError, o._validate_scalar, True)
+ assert e.name == 'my_param'
+ assert e.error == u'this describes the error'
+ assert e.index is None
+ e = raises(errors2.ValidationError, o._validate_scalar, False, index=2)
+ assert e.name == 'my_param'
+ assert e.error == u'this describes the error'
+ assert e.index == 2
+ assert okay.calls == [
+ (request.ugettext, True),
+ (request.ugettext, False),
+ ]
+ assert fail.calls == [
+ (request.ugettext, True),
+ (request.ugettext, False),
+ ]
+
+ def test_get_default(self):
+ """
+ Test the `ipalib.parameters.Param._get_default` method.
+ """
+ class PassThrough(object):
+ value = None
+
+ def __call__(self, value):
+ assert self.value is None
+ assert value is not None
+ self.value = value
+ return value
+
+ def reset(self):
+ assert self.value is not None
+ self.value = None
+
+ class Str(self.cls):
+ type = unicode
+
+ def __init__(self, name, **kw):
+ self._convert_scalar = PassThrough()
+ super(Str, self).__init__(name, **kw)
+
+ # Test with only a static default:
+ o = Str('my_str',
+ normalizer=PassThrough(),
+ default=u'Static Default',
+ )
+ assert_equal(o.get_default(), u'Static Default')
+ assert o._convert_scalar.value is None
+ assert o.normalizer.value is None
+
+ # Test with default_from:
+ o = Str('my_str',
+ normalizer=PassThrough(),
+ default=u'Static Default',
+ default_from=lambda first, last: first[0] + last,
+ )
+ assert_equal(o.get_default(), u'Static Default')
+ assert o._convert_scalar.value is None
+ assert o.normalizer.value is None
+ default = o.get_default(first=u'john', last='doe')
+ assert_equal(default, u'jdoe')
+ assert o._convert_scalar.value is default
+ assert o.normalizer.value is default
+
+ # Test with create_default:
+ o = Str('my_str',
+ normalizer=PassThrough(),
+ default=u'Static Default',
+ create_default=lambda **kw: u'The created default',
+ )
+ default = o.get_default(first=u'john', last='doe')
+ assert_equal(default, u'The created default')
+ assert o._convert_scalar.value is default
+ assert o.normalizer.value is default
+
+
+class test_Flag(ClassChecker):
+ """
+ Test the `ipalib.parameters.Flag` class.
+ """
+ _cls = parameters.Flag
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Flag.__init__` method.
+ """
+ # Test with no kwargs:
+ o = self.cls('my_flag')
+ assert o.type is bool
+ assert isinstance(o, parameters.Bool)
+ assert o.autofill is True
+ assert o.default is False
+
+ # Test that TypeError is raise if default is not a bool:
+ e = raises(TypeError, self.cls, 'my_flag', default=None)
+ assert str(e) == TYPE_ERROR % ('default', bool, None, NoneType)
+
+ # Test with autofill=False, default=True
+ o = self.cls('my_flag', autofill=False, default=True)
+ assert o.autofill is True
+ assert o.default is True
+
+ # Test when cloning:
+ orig = self.cls('my_flag')
+ for clone in [orig.clone(), orig.clone(autofill=False)]:
+ assert clone.autofill is True
+ assert clone.default is False
+ assert clone is not orig
+ assert type(clone) is self.cls
+
+ # Test when cloning with default=True/False
+ orig = self.cls('my_flag')
+ assert orig.clone().default is False
+ assert orig.clone(default=True).default is True
+ orig = self.cls('my_flag', default=True)
+ assert orig.clone().default is True
+ assert orig.clone(default=False).default is False
+
+
+class test_Data(ClassChecker):
+ """
+ Test the `ipalib.parameters.Data` class.
+ """
+ _cls = parameters.Data
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Data.__init__` method.
+ """
+ o = self.cls('my_data')
+ assert o.type is NoneType
+ assert o.rules == tuple()
+ assert o.class_rules == tuple()
+ assert o.all_rules == tuple()
+ assert o.minlength is None
+ assert o.maxlength is None
+ assert o.length is None
+ assert not hasattr(o, 'pattern')
+
+ # Test mixing length with minlength or maxlength:
+ o = self.cls('my_data', length=5)
+ assert o.length == 5
+ permutations = [
+ dict(minlength=3),
+ dict(maxlength=7),
+ dict(minlength=3, maxlength=7),
+ ]
+ for kw in permutations:
+ o = self.cls('my_data', **kw)
+ for (key, value) in kw.iteritems():
+ assert getattr(o, key) == value
+ e = raises(ValueError, self.cls, 'my_data', length=5, **kw)
+ assert str(e) == \
+ "Data('my_data'): cannot mix length with minlength or maxlength"
+
+ # Test when minlength or maxlength are less than 1:
+ e = raises(ValueError, self.cls, 'my_data', minlength=0)
+ assert str(e) == "Data('my_data'): minlength must be >= 1; got 0"
+ e = raises(ValueError, self.cls, 'my_data', maxlength=0)
+ assert str(e) == "Data('my_data'): maxlength must be >= 1; got 0"
+
+ # Test when minlength > maxlength:
+ e = raises(ValueError, self.cls, 'my_data', minlength=22, maxlength=15)
+ assert str(e) == \
+ "Data('my_data'): minlength > maxlength (minlength=22, maxlength=15)"
+
+ # Test when minlength == maxlength
+ e = raises(ValueError, self.cls, 'my_data', minlength=7, maxlength=7)
+ assert str(e) == \
+ "Data('my_data'): minlength == maxlength; use length=7 instead"
+
+
+class test_Bytes(ClassChecker):
+ """
+ Test the `ipalib.parameters.Bytes` class.
+ """
+ _cls = parameters.Bytes
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Bytes.__init__` method.
+ """
+ o = self.cls('my_bytes')
+ assert o.type is str
+ assert o.rules == tuple()
+ assert o.class_rules == tuple()
+ assert o.all_rules == tuple()
+ assert o.minlength is None
+ assert o.maxlength is None
+ assert o.length is None
+ assert o.pattern is None
+
+ # Test mixing length with minlength or maxlength:
+ o = self.cls('my_bytes', length=5)
+ assert o.length == 5
+ assert len(o.class_rules) == 1
+ assert len(o.rules) == 0
+ assert len(o.all_rules) == 1
+ permutations = [
+ dict(minlength=3),
+ dict(maxlength=7),
+ dict(minlength=3, maxlength=7),
+ ]
+ for kw in permutations:
+ o = self.cls('my_bytes', **kw)
+ assert len(o.class_rules) == len(kw)
+ assert len(o.rules) == 0
+ assert len(o.all_rules) == len(kw)
+ for (key, value) in kw.iteritems():
+ assert getattr(o, key) == value
+ e = raises(ValueError, self.cls, 'my_bytes', length=5, **kw)
+ assert str(e) == \
+ "Bytes('my_bytes'): cannot mix length with minlength or maxlength"
+
+ # Test when minlength or maxlength are less than 1:
+ e = raises(ValueError, self.cls, 'my_bytes', minlength=0)
+ assert str(e) == "Bytes('my_bytes'): minlength must be >= 1; got 0"
+ e = raises(ValueError, self.cls, 'my_bytes', maxlength=0)
+ assert str(e) == "Bytes('my_bytes'): maxlength must be >= 1; got 0"
+
+ # Test when minlength > maxlength:
+ e = raises(ValueError, self.cls, 'my_bytes', minlength=22, maxlength=15)
+ assert str(e) == \
+ "Bytes('my_bytes'): minlength > maxlength (minlength=22, maxlength=15)"
+
+ # Test when minlength == maxlength
+ e = raises(ValueError, self.cls, 'my_bytes', minlength=7, maxlength=7)
+ assert str(e) == \
+ "Bytes('my_bytes'): minlength == maxlength; use length=7 instead"
+
+ def test_rule_minlength(self):
+ """
+ Test the `ipalib.parameters.Bytes._rule_minlength` method.
+ """
+ o = self.cls('my_bytes', minlength=3)
+ assert o.minlength == 3
+ rule = o._rule_minlength
+ translation = u'minlength=%(minlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in ('abc', 'four', '12345'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in ('', 'a', '12'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(minlength=3)
+ )
+ assert dummy.message == 'must be at least %(minlength)d bytes'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_maxlength(self):
+ """
+ Test the `ipalib.parameters.Bytes._rule_maxlength` method.
+ """
+ o = self.cls('my_bytes', maxlength=4)
+ assert o.maxlength == 4
+ rule = o._rule_maxlength
+ translation = u'maxlength=%(maxlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in ('ab', '123', 'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in ('12345', 'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(maxlength=4)
+ )
+ assert dummy.message == 'can be at most %(maxlength)d bytes'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_length(self):
+ """
+ Test the `ipalib.parameters.Bytes._rule_length` method.
+ """
+ o = self.cls('my_bytes', length=4)
+ assert o.length == 4
+ rule = o._rule_length
+ translation = u'length=%(length)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in ('1234', 'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in ('ab', '123', '12345', 'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(length=4),
+ )
+ assert dummy.message == 'must be exactly %(length)d bytes'
+ assert dummy.called() is True
+ dummy.reset()
+
+
+class test_Str(ClassChecker):
+ """
+ Test the `ipalib.parameters.Str` class.
+ """
+ _cls = parameters.Str
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Str.__init__` method.
+ """
+ o = self.cls('my_str')
+ assert o.type is unicode
+ assert o.minlength is None
+ assert o.maxlength is None
+ assert o.length is None
+ assert o.pattern is None
+
+ def test_convert_scalar(self):
+ """
+ Test the `ipalib.parameters.Str._convert_scalar` method.
+ """
+ o = self.cls('my_str')
+ mthd = o._convert_scalar
+ for value in (u'Hello', 42, 1.2):
+ assert mthd(value) == unicode(value)
+ for value in [True, 'Hello', (u'Hello',), [42.3], dict(one=1)]:
+ e = raises(errors2.ConversionError, mthd, value)
+ assert e.name == 'my_str'
+ assert e.index is None
+ assert_equal(e.error, u'must be Unicode text')
+ e = raises(errors2.ConversionError, mthd, value, index=18)
+ assert e.name == 'my_str'
+ assert e.index == 18
+ assert_equal(e.error, u'must be Unicode text')
+
+ def test_rule_minlength(self):
+ """
+ Test the `ipalib.parameters.Str._rule_minlength` method.
+ """
+ o = self.cls('my_str', minlength=3)
+ assert o.minlength == 3
+ rule = o._rule_minlength
+ translation = u'minlength=%(minlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (u'abc', u'four', u'12345'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (u'', u'a', u'12'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(minlength=3)
+ )
+ assert dummy.message == 'must be at least %(minlength)d characters'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_maxlength(self):
+ """
+ Test the `ipalib.parameters.Str._rule_maxlength` method.
+ """
+ o = self.cls('my_str', maxlength=4)
+ assert o.maxlength == 4
+ rule = o._rule_maxlength
+ translation = u'maxlength=%(maxlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (u'ab', u'123', u'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (u'12345', u'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(maxlength=4)
+ )
+ assert dummy.message == 'can be at most %(maxlength)d characters'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_length(self):
+ """
+ Test the `ipalib.parameters.Str._rule_length` method.
+ """
+ o = self.cls('my_str', length=4)
+ assert o.length == 4
+ rule = o._rule_length
+ translation = u'length=%(length)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (u'1234', u'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (u'ab', u'123', u'12345', u'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(length=4),
+ )
+ assert dummy.message == 'must be exactly %(length)d characters'
+ assert dummy.called() is True
+ dummy.reset()
+
+
+class test_StrEnum(ClassChecker):
+ """
+ Test the `ipalib.parameters.StrEnum` class.
+ """
+ _cls = parameters.StrEnum
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.StrEnum.__init__` method.
+ """
+ values = (u'Hello', u'naughty', u'nurse!')
+ o = self.cls('my_strenum', values=values)
+ assert o.type is unicode
+ assert o.values is values
+ assert o.class_rules == (o._rule_values,)
+ assert o.rules == tuple()
+ assert o.all_rules == (o._rule_values,)
+
+ badvalues = (u'Hello', 'naughty', u'nurse!')
+ e = raises(TypeError, self.cls, 'my_enum', values=badvalues)
+ assert str(e) == TYPE_ERROR % (
+ "StrEnum('my_enum') values[1]", unicode, 'naughty', str
+ )
+
+ def test_rules_values(self):
+ """
+ Test the `ipalib.parameters.StrEnum._rule_values` method.
+ """
+ values = (u'Hello', u'naughty', u'nurse!')
+ o = self.cls('my_enum', values=values)
+ rule = o._rule_values
+ translation = u'values=%(values)s'
+ dummy = dummy_ugettext(translation)
+
+ # Test with passing values:
+ for v in values:
+ assert rule(dummy, v) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for val in (u'Howdy', u'quiet', u'library!'):
+ assert_equal(
+ rule(dummy, val),
+ translation % dict(values=values),
+ )
+ assert_equal(dummy.message, 'must be one of %(values)r')
+ dummy.reset()
+
+
+def test_create_param():
+ """
+ Test the `ipalib.parameters.create_param` function.
+ """
+ f = parameters.create_param
+
+ # Test that Param instances are returned unchanged:
+ params = (
+ parameters.Param('one?'),
+ parameters.Int('two+'),
+ parameters.Str('three*'),
+ parameters.Bytes('four'),
+ )
+ for p in params:
+ assert f(p) is p
+
+ # Test that the spec creates an Str instance:
+ for spec in ('one?', 'two+', 'three*', 'four'):
+ (name, kw) = parameters.parse_param_spec(spec)
+ p = f(spec)
+ assert p.param_spec is spec
+ assert p.name == name
+ assert p.required is kw['required']
+ assert p.multivalue is kw['multivalue']
+
+ # Test that TypeError is raised when spec is neither a Param nor a str:
+ for spec in (u'one', 42, parameters.Param, parameters.Str):
+ e = raises(TypeError, f, spec)
+ assert str(e) == \
+ TYPE_ERROR % ('spec', (str, parameters.Param), spec, type(spec))
+
+
+def test_messages():
+ """
+ Test module level message in `ipalib.parameters`.
+ """
+ for name in dir(parameters):
+ if name.startswith('_'):
+ continue
+ attr = getattr(parameters, name)
+ if not (isclass(attr) and issubclass(attr, parameters.Param)):
+ continue
+ assert type(attr.type_error) is str
+ assert attr.type_error in parameters.__messages
diff --git a/tests/test_ipalib/test_plugable.py b/tests/test_ipalib/test_plugable.py
new file mode 100644
index 00000000..c6c84fa1
--- /dev/null
+++ b/tests/test_ipalib/test_plugable.py
@@ -0,0 +1,756 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.plugable` module.
+"""
+
+import inspect
+from tests.util import raises, no_set, no_del, read_only
+from tests.util import getitem, setitem, delitem
+from tests.util import ClassChecker, create_test_api
+from ipalib import plugable, errors, errors2
+
+
+class test_SetProxy(ClassChecker):
+ """
+ Test the `ipalib.plugable.SetProxy` class.
+ """
+ _cls = plugable.SetProxy
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.SetProxy` class.
+ """
+ assert self.cls.__bases__ == (plugable.ReadOnly,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.SetProxy.__init__` method.
+ """
+ okay = (set, frozenset, dict)
+ fail = (list, tuple)
+ for t in okay:
+ self.cls(t())
+ raises(TypeError, self.cls, t)
+ for t in fail:
+ raises(TypeError, self.cls, t())
+ raises(TypeError, self.cls, t)
+
+ def test_SetProxy(self):
+ """
+ Test container emulation of `ipalib.plugable.SetProxy` class.
+ """
+ def get_key(i):
+ return 'key_%d' % i
+
+ cnt = 10
+ target = set()
+ proxy = self.cls(target)
+ for i in xrange(cnt):
+ key = get_key(i)
+
+ # Check initial state
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert key not in proxy
+ assert key not in target
+
+ # Add and test again
+ target.add(key)
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert key in proxy
+ assert key in target
+
+
+class test_DictProxy(ClassChecker):
+ """
+ Test the `ipalib.plugable.DictProxy` class.
+ """
+ _cls = plugable.DictProxy
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.DictProxy` class.
+ """
+ assert self.cls.__bases__ == (plugable.SetProxy,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.DictProxy.__init__` method.
+ """
+ self.cls(dict())
+ raises(TypeError, self.cls, dict)
+ fail = (set, frozenset, list, tuple)
+ for t in fail:
+ raises(TypeError, self.cls, t())
+ raises(TypeError, self.cls, t)
+
+ def test_DictProxy(self):
+ """
+ Test container emulation of `ipalib.plugable.DictProxy` class.
+ """
+ def get_kv(i):
+ return (
+ 'key_%d' % i,
+ 'val_%d' % i,
+ )
+ cnt = 10
+ target = dict()
+ proxy = self.cls(target)
+ for i in xrange(cnt):
+ (key, val) = get_kv(i)
+
+ # Check initial state
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert list(proxy()) == [target[k] for k in sorted(target)]
+ assert key not in proxy
+ raises(KeyError, getitem, proxy, key)
+
+ # Add and test again
+ target[key] = val
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert list(proxy()) == [target[k] for k in sorted(target)]
+
+ # Verify TypeError is raised trying to set/del via proxy
+ raises(TypeError, setitem, proxy, key, val)
+ raises(TypeError, delitem, proxy, key)
+
+
+class test_MagicDict(ClassChecker):
+ """
+ Test the `ipalib.plugable.MagicDict` class.
+ """
+ _cls = plugable.MagicDict
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.MagicDict` class.
+ """
+ assert self.cls.__bases__ == (plugable.DictProxy,)
+ for non_dict in ('hello', 69, object):
+ raises(TypeError, self.cls, non_dict)
+
+ def test_MagicDict(self):
+ """
+ Test container emulation of `ipalib.plugable.MagicDict` class.
+ """
+ cnt = 10
+ keys = []
+ d = dict()
+ dictproxy = self.cls(d)
+ for i in xrange(cnt):
+ key = 'key_%d' % i
+ val = 'val_%d' % i
+ keys.append(key)
+
+ # Test thet key does not yet exist
+ assert len(dictproxy) == i
+ assert key not in dictproxy
+ assert not hasattr(dictproxy, key)
+ raises(KeyError, getitem, dictproxy, key)
+ raises(AttributeError, getattr, dictproxy, key)
+
+ # Test that items/attributes cannot be set on dictproxy:
+ raises(TypeError, setitem, dictproxy, key, val)
+ raises(AttributeError, setattr, dictproxy, key, val)
+
+ # Test that additions in d are reflected in dictproxy:
+ d[key] = val
+ assert len(dictproxy) == i + 1
+ assert key in dictproxy
+ assert hasattr(dictproxy, key)
+ assert dictproxy[key] is val
+ assert read_only(dictproxy, key) is val
+
+ # Test __iter__
+ assert list(dictproxy) == keys
+
+ for key in keys:
+ # Test that items cannot be deleted through dictproxy:
+ raises(TypeError, delitem, dictproxy, key)
+ raises(AttributeError, delattr, dictproxy, key)
+
+ # Test that deletions in d are reflected in dictproxy
+ del d[key]
+ assert len(dictproxy) == len(d)
+ assert key not in dictproxy
+ raises(KeyError, getitem, dictproxy, key)
+ raises(AttributeError, getattr, dictproxy, key)
+
+
+class test_Plugin(ClassChecker):
+ """
+ Test the `ipalib.plugable.Plugin` class.
+ """
+ _cls = plugable.Plugin
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.Plugin` class.
+ """
+ assert self.cls.__bases__ == (plugable.ReadOnly,)
+ assert self.cls.__public__ == frozenset()
+ assert type(self.cls.api) is property
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.Plugin.__init__` method.
+ """
+ o = self.cls()
+ assert o.name == 'Plugin'
+ assert o.module == 'ipalib.plugable'
+ assert o.fullname == 'ipalib.plugable.Plugin'
+ assert o.doc == inspect.getdoc(self.cls)
+ class some_subclass(self.cls):
+ """
+ Do sub-classy things.
+
+ Although it doesn't know how to comport itself and is not for mixed
+ company, this class *is* useful as we all need a little sub-class
+ now and then.
+
+ One more paragraph.
+ """
+ o = some_subclass()
+ assert o.name == 'some_subclass'
+ assert o.module == __name__
+ assert o.fullname == '%s.some_subclass' % __name__
+ assert o.doc == inspect.getdoc(some_subclass)
+ assert o.summary == 'Do sub-classy things.'
+ class another_subclass(self.cls):
+ pass
+ o = another_subclass()
+ assert o.doc is None
+ assert o.summary == '<%s>' % o.fullname
+
+ # Test that Plugin makes sure the subclass hasn't defined attributes
+ # whose names conflict with the logger methods set in Plugin.__init__():
+ class check(self.cls):
+ info = 'whatever'
+ e = raises(StandardError, check)
+ assert str(e) == \
+ "check.info attribute ('whatever') conflicts with Plugin logger"
+
+ def test_implements(self):
+ """
+ Test the `ipalib.plugable.Plugin.implements` classmethod.
+ """
+ class example(self.cls):
+ __public__ = frozenset((
+ 'some_method',
+ 'some_property',
+ ))
+ class superset(self.cls):
+ __public__ = frozenset((
+ 'some_method',
+ 'some_property',
+ 'another_property',
+ ))
+ class subset(self.cls):
+ __public__ = frozenset((
+ 'some_property',
+ ))
+ class any_object(object):
+ __public__ = frozenset((
+ 'some_method',
+ 'some_property',
+ ))
+
+ for ex in (example, example()):
+ # Test using str:
+ assert ex.implements('some_method')
+ assert not ex.implements('another_method')
+
+ # Test using frozenset:
+ assert ex.implements(frozenset(['some_method']))
+ assert not ex.implements(
+ frozenset(['some_method', 'another_method'])
+ )
+
+ # Test using another object/class with __public__ frozenset:
+ assert ex.implements(example)
+ assert ex.implements(example())
+
+ assert ex.implements(subset)
+ assert not subset.implements(ex)
+
+ assert not ex.implements(superset)
+ assert superset.implements(ex)
+
+ assert ex.implements(any_object)
+ assert ex.implements(any_object())
+
+ def test_implemented_by(self):
+ """
+ Test the `ipalib.plugable.Plugin.implemented_by` classmethod.
+ """
+ class base(self.cls):
+ __public__ = frozenset((
+ 'attr0',
+ 'attr1',
+ 'attr2',
+ ))
+
+ class okay(base):
+ def attr0(self):
+ pass
+ def __get_attr1(self):
+ assert False # Make sure property isn't accesed on instance
+ attr1 = property(__get_attr1)
+ attr2 = 'hello world'
+ another_attr = 'whatever'
+
+ class fail(base):
+ def __init__(self):
+ # Check that class, not instance is inspected:
+ self.attr2 = 'hello world'
+ def attr0(self):
+ pass
+ def __get_attr1(self):
+ assert False # Make sure property isn't accesed on instance
+ attr1 = property(__get_attr1)
+ another_attr = 'whatever'
+
+ # Test that AssertionError is raised trying to pass something not
+ # subclass nor instance of base:
+ raises(AssertionError, base.implemented_by, object)
+
+ # Test on subclass with needed attributes:
+ assert base.implemented_by(okay) is True
+ assert base.implemented_by(okay()) is True
+
+ # Test on subclass *without* needed attributes:
+ assert base.implemented_by(fail) is False
+ assert base.implemented_by(fail()) is False
+
+ def test_set_api(self):
+ """
+ Test the `ipalib.plugable.Plugin.set_api` method.
+ """
+ api = 'the api instance'
+ o = self.cls()
+ assert o.api is None
+ e = raises(AssertionError, o.set_api, None)
+ assert str(e) == 'set_api() argument cannot be None'
+ o.set_api(api)
+ assert o.api is api
+ e = raises(AssertionError, o.set_api, api)
+ assert str(e) == 'set_api() can only be called once'
+
+ def test_finalize(self):
+ """
+ Test the `ipalib.plugable.Plugin.finalize` method.
+ """
+ o = self.cls()
+ assert not o.__islocked__()
+ o.finalize()
+ assert o.__islocked__()
+
+ def test_call(self):
+ """
+ Test the `ipalib.plugable.Plugin.call` method.
+ """
+ o = self.cls()
+ o.call('/bin/true') is None
+ e = raises(errors2.SubprocessError, o.call, '/bin/false')
+ assert e.returncode == 1
+ assert e.argv == ('/bin/false',)
+
+
+class test_PluginProxy(ClassChecker):
+ """
+ Test the `ipalib.plugable.PluginProxy` class.
+ """
+ _cls = plugable.PluginProxy
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.PluginProxy` class.
+ """
+ assert self.cls.__bases__ == (plugable.SetProxy,)
+
+ def test_proxy(self):
+ """
+ Test proxy behaviour of `ipalib.plugable.PluginProxy` instance.
+ """
+ # Setup:
+ class base(object):
+ __public__ = frozenset((
+ 'public_0',
+ 'public_1',
+ '__call__',
+ ))
+
+ def public_0(self):
+ return 'public_0'
+
+ def public_1(self):
+ return 'public_1'
+
+ def __call__(self, caller):
+ return 'ya called it, %s.' % caller
+
+ def private_0(self):
+ return 'private_0'
+
+ def private_1(self):
+ return 'private_1'
+
+ class plugin(base):
+ name = 'user_add'
+ attr_name = 'add'
+ doc = 'add a new user'
+
+ # Test that TypeError is raised when base is not a class:
+ raises(TypeError, self.cls, base(), None)
+
+ # Test that ValueError is raised when target is not instance of base:
+ raises(ValueError, self.cls, base, object())
+
+ # Test with correct arguments:
+ i = plugin()
+ p = self.cls(base, i)
+ assert read_only(p, 'name') is plugin.name
+ assert read_only(p, 'doc') == plugin.doc
+ assert list(p) == sorted(base.__public__)
+
+ # Test normal methods:
+ for n in xrange(2):
+ pub = 'public_%d' % n
+ priv = 'private_%d' % n
+ assert getattr(i, pub)() == pub
+ assert getattr(p, pub)() == pub
+ assert hasattr(p, pub)
+ assert getattr(i, priv)() == priv
+ assert not hasattr(p, priv)
+
+ # Test __call__:
+ value = 'ya called it, dude.'
+ assert i('dude') == value
+ assert p('dude') == value
+ assert callable(p)
+
+ # Test name_attr='name' kw arg
+ i = plugin()
+ p = self.cls(base, i, 'attr_name')
+ assert read_only(p, 'name') == 'add'
+
+ def test_implements(self):
+ """
+ Test the `ipalib.plugable.PluginProxy.implements` method.
+ """
+ class base(object):
+ __public__ = frozenset()
+ name = 'base'
+ doc = 'doc'
+ @classmethod
+ def implements(cls, arg):
+ return arg + 7
+
+ class sub(base):
+ @classmethod
+ def implements(cls, arg):
+ """
+ Defined to make sure base.implements() is called, not
+ target.implements()
+ """
+ return arg
+
+ o = sub()
+ p = self.cls(base, o)
+ assert p.implements(3) == 10
+
+ def test_clone(self):
+ """
+ Test the `ipalib.plugable.PluginProxy.__clone__` method.
+ """
+ class base(object):
+ __public__ = frozenset()
+ class sub(base):
+ name = 'some_name'
+ doc = 'doc'
+ label = 'another_name'
+
+ p = self.cls(base, sub())
+ assert read_only(p, 'name') == 'some_name'
+ c = p.__clone__('label')
+ assert isinstance(c, self.cls)
+ assert c is not p
+ assert read_only(c, 'name') == 'another_name'
+
+
+def test_Registrar():
+ """
+ Test the `ipalib.plugable.Registrar` class
+ """
+ class Base1(object):
+ pass
+ class Base2(object):
+ pass
+ class Base3(object):
+ pass
+ class plugin1(Base1):
+ pass
+ class plugin2(Base2):
+ pass
+ class plugin3(Base3):
+ pass
+
+ # Test creation of Registrar:
+ r = plugable.Registrar(Base1, Base2)
+
+ # Test __iter__:
+ assert list(r) == ['Base1', 'Base2']
+
+ # Test __hasitem__, __getitem__:
+ for base in [Base1, Base2]:
+ name = base.__name__
+ assert name in r
+ assert r[name] is base
+ magic = getattr(r, name)
+ assert type(magic) is plugable.MagicDict
+ assert len(magic) == 0
+
+ # Check that TypeError is raised trying to register something that isn't
+ # a class:
+ p = plugin1()
+ e = raises(TypeError, r, p)
+ assert str(e) == 'plugin must be a class; got %r' % p
+
+ # Check that SubclassError is raised trying to register a class that is
+ # not a subclass of an allowed base:
+ e = raises(errors2.PluginSubclassError, r, plugin3)
+ assert e.plugin is plugin3
+
+ # Check that registration works
+ r(plugin1)
+ assert len(r.Base1) == 1
+ assert r.Base1['plugin1'] is plugin1
+ assert r.Base1.plugin1 is plugin1
+
+ # Check that DuplicateError is raised trying to register exact class
+ # again:
+ e = raises(errors2.PluginDuplicateError, r, plugin1)
+ assert e.plugin is plugin1
+
+ # Check that OverrideError is raised trying to register class with same
+ # name and same base:
+ orig1 = plugin1
+ class base1_extended(Base1):
+ pass
+ class plugin1(base1_extended):
+ pass
+ e = raises(errors2.PluginOverrideError, r, plugin1)
+ assert e.base == 'Base1'
+ assert e.name == 'plugin1'
+ assert e.plugin is plugin1
+
+ # Check that overriding works
+ r(plugin1, override=True)
+ assert len(r.Base1) == 1
+ assert r.Base1.plugin1 is plugin1
+ assert r.Base1.plugin1 is not orig1
+
+ # Check that MissingOverrideError is raised trying to override a name
+ # not yet registerd:
+ e = raises(errors2.PluginMissingOverrideError, r, plugin2, override=True)
+ assert e.base == 'Base2'
+ assert e.name == 'plugin2'
+ assert e.plugin is plugin2
+
+ # Test that another plugin can be registered:
+ assert len(r.Base2) == 0
+ r(plugin2)
+ assert len(r.Base2) == 1
+ assert r.Base2.plugin2 is plugin2
+
+ # Setup to test more registration:
+ class plugin1a(Base1):
+ pass
+ r(plugin1a)
+
+ class plugin1b(Base1):
+ pass
+ r(plugin1b)
+
+ class plugin2a(Base2):
+ pass
+ r(plugin2a)
+
+ class plugin2b(Base2):
+ pass
+ r(plugin2b)
+
+ # Again test __hasitem__, __getitem__:
+ for base in [Base1, Base2]:
+ name = base.__name__
+ assert name in r
+ assert r[name] is base
+ magic = getattr(r, name)
+ assert len(magic) == 3
+ for key in magic:
+ klass = magic[key]
+ assert getattr(magic, key) is klass
+ assert issubclass(klass, base)
+
+
+class test_API(ClassChecker):
+ """
+ Test the `ipalib.plugable.API` class.
+ """
+
+ _cls = plugable.API
+
+ def test_API(self):
+ """
+ Test the `ipalib.plugable.API` class.
+ """
+ assert issubclass(plugable.API, plugable.ReadOnly)
+
+ # Setup the test bases, create the API:
+ class base0(plugable.Plugin):
+ __public__ = frozenset((
+ 'method',
+ ))
+
+ def method(self, n):
+ return n
+
+ class base1(plugable.Plugin):
+ __public__ = frozenset((
+ 'method',
+ ))
+
+ def method(self, n):
+ return n + 1
+
+ api = plugable.API(base0, base1)
+ api.env.mode = 'unit_test'
+ api.env.in_tree = True
+ r = api.register
+ assert isinstance(r, plugable.Registrar)
+ assert read_only(api, 'register') is r
+
+ class base0_plugin0(base0):
+ pass
+ r(base0_plugin0)
+
+ class base0_plugin1(base0):
+ pass
+ r(base0_plugin1)
+
+ class base0_plugin2(base0):
+ pass
+ r(base0_plugin2)
+
+ class base1_plugin0(base1):
+ pass
+ r(base1_plugin0)
+
+ class base1_plugin1(base1):
+ pass
+ r(base1_plugin1)
+
+ class base1_plugin2(base1):
+ pass
+ r(base1_plugin2)
+
+ # Test API instance:
+ assert api.isdone('bootstrap') is False
+ assert api.isdone('finalize') is False
+ api.finalize()
+ assert api.isdone('bootstrap') is True
+ assert api.isdone('finalize') is True
+
+ def get_base(b):
+ return 'base%d' % b
+
+ def get_plugin(b, p):
+ return 'base%d_plugin%d' % (b, p)
+
+ for b in xrange(2):
+ base_name = get_base(b)
+ ns = getattr(api, base_name)
+ assert isinstance(ns, plugable.NameSpace)
+ assert read_only(api, base_name) is ns
+ assert len(ns) == 3
+ for p in xrange(3):
+ plugin_name = get_plugin(b, p)
+ proxy = ns[plugin_name]
+ assert isinstance(proxy, plugable.PluginProxy)
+ assert proxy.name == plugin_name
+ assert read_only(ns, plugin_name) is proxy
+ assert read_only(proxy, 'method')(7) == 7 + b
+
+ # Test that calling finilize again raises AssertionError:
+ e = raises(StandardError, api.finalize)
+ assert str(e) == 'API.finalize() already called', str(e)
+
+ # Test with base class that doesn't request a proxy
+ class NoProxy(plugable.Plugin):
+ __proxy__ = False
+ api = plugable.API(NoProxy)
+ api.env.mode = 'unit_test'
+ class plugin0(NoProxy):
+ pass
+ api.register(plugin0)
+ class plugin1(NoProxy):
+ pass
+ api.register(plugin1)
+ api.finalize()
+ names = ['plugin0', 'plugin1']
+ assert list(api.NoProxy) == names
+ for name in names:
+ plugin = api.NoProxy[name]
+ assert getattr(api.NoProxy, name) is plugin
+ assert isinstance(plugin, plugable.Plugin)
+ assert not isinstance(plugin, plugable.PluginProxy)
+
+ def test_bootstrap(self):
+ """
+ Test the `ipalib.plugable.API.bootstrap` method.
+ """
+ (o, home) = create_test_api()
+ assert o.env._isdone('_bootstrap') is False
+ assert o.env._isdone('_finalize_core') is False
+ assert o.isdone('bootstrap') is False
+ o.bootstrap(my_test_override='Hello, world!')
+ assert o.isdone('bootstrap') is True
+ assert o.env._isdone('_bootstrap') is True
+ assert o.env._isdone('_finalize_core') is True
+ assert o.env.my_test_override == 'Hello, world!'
+ e = raises(StandardError, o.bootstrap)
+ assert str(e) == 'API.bootstrap() already called'
+
+ def test_load_plugins(self):
+ """
+ Test the `ipalib.plugable.API.load_plugins` method.
+ """
+ (o, home) = create_test_api()
+ assert o.isdone('bootstrap') is False
+ assert o.isdone('load_plugins') is False
+ o.load_plugins()
+ assert o.isdone('bootstrap') is True
+ assert o.isdone('load_plugins') is True
+ e = raises(StandardError, o.load_plugins)
+ assert str(e) == 'API.load_plugins() already called'
diff --git a/tests/test_ipalib/test_request.py b/tests/test_ipalib/test_request.py
new file mode 100644
index 00000000..f26c270a
--- /dev/null
+++ b/tests/test_ipalib/test_request.py
@@ -0,0 +1,161 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty contextrmation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.request` module.
+"""
+
+import threading
+import locale
+from tests.util import raises, assert_equal
+from tests.util import TempDir, dummy_ugettext, dummy_ungettext
+from ipalib.constants import OVERRIDE_ERROR
+from ipalib import request
+
+
+def test_ugettext():
+ """
+ Test the `ipalib.request.ugettext` function.
+ """
+ f = request.ugettext
+ context = request.context
+ message = 'Hello, world!'
+
+ # Test with no context.ugettext:
+ assert not hasattr(context, 'ugettext')
+ assert_equal(f(message), u'Hello, world!')
+
+ # Test with dummy context.ugettext:
+ assert not hasattr(context, 'ugettext')
+ dummy = dummy_ugettext()
+ context.ugettext = dummy
+ assert f(message) is dummy.translation
+ assert dummy.message is message
+
+ # Cleanup
+ del context.ugettext
+ assert not hasattr(context, 'ugettext')
+
+
+def test_ungettext():
+ """
+ Test the `ipalib.request.ungettext` function.
+ """
+ f = request.ungettext
+ context = request.context
+ singular = 'Goose'
+ plural = 'Geese'
+
+ # Test with no context.ungettext:
+ assert not hasattr(context, 'ungettext')
+ assert_equal(f(singular, plural, 1), u'Goose')
+ assert_equal(f(singular, plural, 2), u'Geese')
+
+ # Test singular with dummy context.ungettext
+ assert not hasattr(context, 'ungettext')
+ dummy = dummy_ungettext()
+ context.ungettext = dummy
+ assert f(singular, plural, 1) is dummy.translation_singular
+ assert dummy.singular is singular
+ assert dummy.plural is plural
+ assert dummy.n == 1
+ del context.ungettext
+ assert not hasattr(context, 'ungettext')
+
+ # Test plural with dummy context.ungettext
+ assert not hasattr(context, 'ungettext')
+ dummy = dummy_ungettext()
+ context.ungettext = dummy
+ assert f(singular, plural, 2) is dummy.translation_plural
+ assert dummy.singular is singular
+ assert dummy.plural is plural
+ assert dummy.n == 2
+ del context.ungettext
+ assert not hasattr(context, 'ungettext')
+
+
+def test_set_languages():
+ """
+ Test the `ipalib.request.set_languages` function.
+ """
+ f = request.set_languages
+ c = request.context
+ langs = ('ru', 'en')
+
+ # Test that StandardError is raised if languages has already been set:
+ assert not hasattr(c, 'languages')
+ c.languages = None
+ e = raises(StandardError, f, *langs)
+ assert str(e) == OVERRIDE_ERROR % ('context', 'languages', None, langs)
+ del c.languages
+
+ # Test setting the languages:
+ assert not hasattr(c, 'languages')
+ f(*langs)
+ assert c.languages == langs
+ del c.languages
+
+ # Test setting language from locale.getdefaultlocale()
+ assert not hasattr(c, 'languages')
+ f()
+ assert c.languages == locale.getdefaultlocale()[:1]
+ del c.languages
+ assert not hasattr(c, 'languages')
+
+
+def test_create_translation():
+ """
+ Test the `ipalib.request.create_translation` function.
+ """
+ f = request.create_translation
+ c = request.context
+ t = TempDir()
+
+ # Test that StandardError is raised if ugettext or ungettext:
+ assert not (hasattr(c, 'ugettext') or hasattr(c, 'ungettext'))
+ for name in ('ugettext', 'ungettext'):
+ setattr(c, name, None)
+ e = raises(StandardError, f, 'ipa', None)
+ assert str(e) == (
+ 'create_translation() already called in thread %r' %
+ threading.currentThread().getName()
+ )
+ delattr(c, name)
+
+ # Test using default language:
+ assert not hasattr(c, 'ugettext')
+ assert not hasattr(c, 'ungettext')
+ assert not hasattr(c, 'languages')
+ f('ipa', t.path)
+ assert hasattr(c, 'ugettext')
+ assert hasattr(c, 'ungettext')
+ assert c.languages == locale.getdefaultlocale()[:1]
+ del c.ugettext
+ del c.ungettext
+ del c.languages
+
+ # Test using explicit languages:
+ langs = ('de', 'es')
+ f('ipa', t.path, *langs)
+ assert hasattr(c, 'ugettext')
+ assert hasattr(c, 'ungettext')
+ assert c.languages == langs
+ del c.ugettext
+ del c.ungettext
+ del c.languages
diff --git a/tests/test_ipalib/test_rpc.py b/tests/test_ipalib/test_rpc.py
new file mode 100644
index 00000000..296e9bc1
--- /dev/null
+++ b/tests/test_ipalib/test_rpc.py
@@ -0,0 +1,249 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.rpc` module.
+"""
+
+import threading
+from xmlrpclib import Binary, Fault, dumps, loads
+from tests.util import raises, assert_equal, PluginTester, DummyClass
+from tests.data import binary_bytes, utf8_bytes, unicode_str
+from ipalib.frontend import Command
+from ipalib.request import context
+from ipalib import rpc, errors2
+
+
+std_compound = (binary_bytes, utf8_bytes, unicode_str)
+
+
+def dump_n_load(value):
+ (param, method) = loads(
+ dumps((value,), allow_none=True)
+ )
+ return param[0]
+
+
+def round_trip(value):
+ return rpc.xml_unwrap(
+ dump_n_load(rpc.xml_wrap(value))
+ )
+
+
+def test_round_trip():
+ """
+ Test `ipalib.rpc.xml_wrap` and `ipalib.rpc.xml_unwrap`.
+
+ This tests the two functions together with ``xmlrpclib.dumps()`` and
+ ``xmlrpclib.loads()`` in a full wrap/dumps/loads/unwrap round trip.
+ """
+ # We first test that our assumptions about xmlrpclib module in the Python
+ # standard library are correct:
+ assert_equal(dump_n_load(utf8_bytes), unicode_str)
+ assert_equal(dump_n_load(unicode_str), unicode_str)
+ assert_equal(dump_n_load(Binary(binary_bytes)).data, binary_bytes)
+ assert isinstance(dump_n_load(Binary(binary_bytes)), Binary)
+ assert type(dump_n_load('hello')) is str
+ assert type(dump_n_load(u'hello')) is str
+ assert_equal(dump_n_load(''), '')
+ assert_equal(dump_n_load(u''), '')
+ assert dump_n_load(None) is None
+
+ # Now we test our wrap and unwrap methods in combination with dumps, loads:
+ # All str should come back str (because they get wrapped in
+ # xmlrpclib.Binary(). All unicode should come back unicode because str
+ # explicity get decoded by rpc.xml_unwrap() if they weren't already
+ # decoded by xmlrpclib.loads().
+ assert_equal(round_trip(utf8_bytes), utf8_bytes)
+ assert_equal(round_trip(unicode_str), unicode_str)
+ assert_equal(round_trip(binary_bytes), binary_bytes)
+ assert type(round_trip('hello')) is str
+ assert type(round_trip(u'hello')) is unicode
+ assert_equal(round_trip(''), '')
+ assert_equal(round_trip(u''), u'')
+ assert round_trip(None) is None
+ compound = [utf8_bytes, None, binary_bytes, (None, unicode_str),
+ dict(utf8=utf8_bytes, chars=unicode_str, data=binary_bytes)
+ ]
+ assert round_trip(compound) == tuple(compound)
+
+
+def test_xml_wrap():
+ """
+ Test the `ipalib.rpc.xml_wrap` function.
+ """
+ f = rpc.xml_wrap
+ assert f([]) == tuple()
+ assert f({}) == dict()
+ b = f('hello')
+ assert isinstance(b, Binary)
+ assert b.data == 'hello'
+ u = f(u'hello')
+ assert type(u) is unicode
+ assert u == u'hello'
+ value = f([dict(one=False, two=u'hello'), None, 'hello'])
+
+
+def test_xml_unwrap():
+ """
+ Test the `ipalib.rpc.xml_unwrap` function.
+ """
+ f = rpc.xml_unwrap
+ assert f([]) == tuple()
+ assert f({}) == dict()
+ value = f(Binary(utf8_bytes))
+ assert type(value) is str
+ assert value == utf8_bytes
+ assert f(utf8_bytes) == unicode_str
+ assert f(unicode_str) == unicode_str
+ value = f([True, Binary('hello'), dict(one=1, two=utf8_bytes, three=None)])
+ assert value == (True, 'hello', dict(one=1, two=unicode_str, three=None))
+ assert type(value[1]) is str
+ assert type(value[2]['two']) is unicode
+
+
+def test_xml_dumps():
+ """
+ Test the `ipalib.rpc.xml_dumps` function.
+ """
+ f = rpc.xml_dumps
+ params = (binary_bytes, utf8_bytes, unicode_str, None)
+
+ # Test serializing an RPC request:
+ data = f(params, 'the_method')
+ (p, m) = loads(data)
+ assert_equal(m, u'the_method')
+ assert type(p) is tuple
+ assert rpc.xml_unwrap(p) == params
+
+ # Test serializing an RPC response:
+ data = f((params,), methodresponse=True)
+ (tup, m) = loads(data)
+ assert m is None
+ assert len(tup) == 1
+ assert type(tup) is tuple
+ assert rpc.xml_unwrap(tup[0]) == params
+
+ # Test serializing an RPC response containing a Fault:
+ fault = Fault(69, unicode_str)
+ data = f(fault, methodresponse=True)
+ e = raises(Fault, loads, data)
+ assert e.faultCode == 69
+ assert_equal(e.faultString, unicode_str)
+
+
+def test_xml_loads():
+ """
+ Test the `ipalib.rpc.xml_loads` function.
+ """
+ f = rpc.xml_loads
+ params = (binary_bytes, utf8_bytes, unicode_str, None)
+ wrapped = rpc.xml_wrap(params)
+
+ # Test un-serializing an RPC request:
+ data = dumps(wrapped, 'the_method', allow_none=True)
+ (p, m) = f(data)
+ assert_equal(m, u'the_method')
+ assert_equal(p, params)
+
+ # Test un-serializing an RPC response:
+ data = dumps((wrapped,), methodresponse=True, allow_none=True)
+ (tup, m) = f(data)
+ assert m is None
+ assert len(tup) == 1
+ assert type(tup) is tuple
+ assert_equal(tup[0], params)
+
+ # Test un-serializing an RPC response containing a Fault:
+ fault = Fault(69, unicode_str)
+ data = dumps(fault, methodresponse=True, allow_none=True)
+ e = raises(Fault, f, data)
+ assert e.faultCode == 69
+ assert_equal(e.faultString, unicode_str)
+
+
+class test_xmlclient(PluginTester):
+ """
+ Test the `ipalib.rpc.xmlclient` plugin.
+ """
+ _plugin = rpc.xmlclient
+
+ def test_forward(self):
+ """
+ Test the `ipalib.rpc.xmlclient.forward` method.
+ """
+ class user_add(Command):
+ pass
+
+ # Test that ValueError is raised when forwarding a command that is not
+ # in api.Command:
+ (o, api, home) = self.instance('Backend', in_server=False)
+ e = raises(ValueError, o.forward, 'user_add')
+ assert str(e) == '%s.forward(): %r not in api.Command' % (
+ 'xmlclient', 'user_add'
+ )
+
+ # Test that StandardError is raised when context.xmlconn does not exist:
+ (o, api, home) = self.instance('Backend', user_add, in_server=False)
+ e = raises(StandardError, o.forward, 'user_add')
+ assert str(e) == '%s.forward(%r): need context.xmlconn in thread %r' % (
+ 'xmlclient', 'user_add', threading.currentThread().getName()
+ )
+
+ args = (binary_bytes, utf8_bytes, unicode_str)
+ kw = dict(one=binary_bytes, two=utf8_bytes, three=unicode_str)
+ params = args + (kw,)
+ result = (unicode_str, binary_bytes, utf8_bytes)
+ context.xmlconn = DummyClass(
+ (
+ 'user_add',
+ (rpc.xml_wrap(params),),
+ {},
+ rpc.xml_wrap(result),
+ ),
+ (
+ 'user_add',
+ (rpc.xml_wrap(params),),
+ {},
+ Fault(3005, u"'four' is required"), # RequirementError
+ ),
+ (
+ 'user_add',
+ (rpc.xml_wrap(params),),
+ {},
+ Fault(700, u'no such error'), # There is no error 700
+ ),
+
+ )
+
+ # Test with a successful return value:
+ assert o.forward('user_add', *args, **kw) == result
+
+ # Test with an errno the client knows:
+ e = raises(errors2.RequirementError, o.forward, 'user_add', *args, **kw)
+ assert_equal(e.message, u"'four' is required")
+
+ # Test with an errno the client doesn't know
+ e = raises(errors2.UnknownError, o.forward, 'user_add', *args, **kw)
+ assert_equal(e.code, 700)
+ assert_equal(e.error, u'no such error')
+
+ assert context.xmlconn._calledall() is True
+
+ del context.xmlconn
diff --git a/tests/test_ipalib/test_util.py b/tests/test_ipalib/test_util.py
new file mode 100644
index 00000000..6729fcda
--- /dev/null
+++ b/tests/test_ipalib/test_util.py
@@ -0,0 +1,61 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.util` module.
+"""
+
+from tests.util import raises
+from ipalib import util
+
+
+def test_xmlrpc_marshal():
+ """
+ Test the `ipalib.util.xmlrpc_marshal` function.
+ """
+ f = util.xmlrpc_marshal
+ assert f() == ({},)
+ assert f('one', 'two') == ({}, 'one', 'two')
+ assert f(one=1, two=2) == (dict(one=1, two=2),)
+ assert f('one', 'two', three=3, four=4) == \
+ (dict(three=3, four=4), 'one', 'two')
+
+
+def test_xmlrpc_unmarshal():
+ """
+ Test the `ipalib.util.xmlrpc_unmarshal` function.
+ """
+ f = util.xmlrpc_unmarshal
+ assert f() == (tuple(), {})
+ assert f({}, 'one', 'two') == (('one', 'two'), {})
+ assert f(dict(one=1, two=2)) == (tuple(), dict(one=1, two=2))
+ assert f(dict(three=3, four=4), 'one', 'two') == \
+ (('one', 'two'), dict(three=3, four=4))
+
+
+def test_make_repr():
+ """
+ Test the `ipalib.util.make_repr` function.
+ """
+ f = util.make_repr
+ assert f('my') == 'my()'
+ assert f('my', True, u'hello') == "my(True, u'hello')"
+ assert f('my', one=1, two='two') == "my(one=1, two='two')"
+ assert f('my', None, 3, dog='animal', apple='fruit') == \
+ "my(None, 3, apple='fruit', dog='animal')"
diff --git a/tests/test_ipaserver/__init__.py b/tests/test_ipaserver/__init__.py
new file mode 100644
index 00000000..56a6c533
--- /dev/null
+++ b/tests/test_ipaserver/__init__.py
@@ -0,0 +1,22 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Sub-package containing unit tests for `ipaserver` package.
+"""
diff --git a/tests/test_ipaserver/test_rpcserver.py b/tests/test_ipaserver/test_rpcserver.py
new file mode 100644
index 00000000..48c1d36e
--- /dev/null
+++ b/tests/test_ipaserver/test_rpcserver.py
@@ -0,0 +1,79 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipaserver.rpc` module.
+"""
+
+from tests.util import create_test_api, raises, PluginTester
+from tests.data import unicode_str
+from ipalib import errors2, Command
+from ipaserver import rpcserver
+
+
+def test_params_2_args_options():
+ """
+ Test the `ipaserver.rpcserver.params_2_args_options` function.
+ """
+ f = rpcserver.params_2_args_options
+ args = ('Hello', u'world!')
+ options = dict(one=1, two=u'Two', three='Three')
+ assert f(tuple()) == (tuple(), dict())
+ assert f(args) == (args, dict())
+ assert f((options,)) == (tuple(), options)
+ assert f(args + (options,)) == (args, options)
+ assert f((options,) + args) == ((options,) + args, dict())
+
+
+class test_xmlserver(PluginTester):
+ """
+ Test the `ipaserver.rpcserver.xmlserver` plugin.
+ """
+
+ _plugin = rpcserver.xmlserver
+
+ def test_dispatch(self):
+ """
+ Test the `ipaserver.rpcserver.xmlserver.dispatch` method.
+ """
+ (o, api, home) = self.instance('Backend', in_server=True)
+ e = raises(errors2.CommandError, o.dispatch, 'echo', tuple())
+ assert e.name == 'echo'
+
+ class echo(Command):
+ takes_args = ['arg1', 'arg2+']
+ takes_options = ['option1?', 'option2?']
+ def execute(self, *args, **options):
+ assert type(args[1]) is tuple
+ return args + (options,)
+
+ (o, api, home) = self.instance('Backend', echo, in_server=True)
+ def call(params):
+ response = o.dispatch('echo', params)
+ assert type(response) is tuple and len(response) == 1
+ return response[0]
+ arg1 = unicode_str
+ arg2 = (u'Hello', unicode_str, u'world!')
+ options = dict(option1=u'How are you?', option2=unicode_str)
+ assert call((arg1, arg2, options)) == (arg1, arg2, options)
+ assert call((arg1,) + arg2 + (options,)) == (arg1, arg2, options)
+
+
+ def test_execute(self):
+ (o, api, home) = self.instance('Backend', in_server=True)
diff --git a/tests/test_ipawebui/__init__.py b/tests/test_ipawebui/__init__.py
new file mode 100644
index 00000000..f739a856
--- /dev/null
+++ b/tests/test_ipawebui/__init__.py
@@ -0,0 +1,21 @@
+# Authors: Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Sub-package containing unit tests for `ipawebui` package.
+"""
diff --git a/tests/test_ipawebui/test_controllers.py b/tests/test_ipawebui/test_controllers.py
new file mode 100644
index 00000000..e236d1a0
--- /dev/null
+++ b/tests/test_ipawebui/test_controllers.py
@@ -0,0 +1,70 @@
+# Authors: Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipawebui.controller` module.
+"""
+
+from ipawebui import controller
+
+
+
+class test_Controller(object):
+ """
+ Test the `controller.Controller` class.
+ """
+
+ def test_init(self):
+ """
+ Test the `ipawebui.controller.Controller.__init__()` method.
+ """
+ o = controller.Controller()
+ assert o.template is None
+ template = 'The template.'
+ o = controller.Controller(template)
+ assert o.template is template
+
+ def test_output_xhtml(self):
+ """
+ Test the `ipawebui.controller.Controller.output_xhtml` method.
+ """
+ class Template(object):
+ def __init__(self):
+ self.calls = 0
+ self.kw = {}
+
+ def serialize(self, **kw):
+ self.calls += 1
+ self.kw = kw
+ return dict(kw)
+
+ d = dict(output='xhtml-strict', format='pretty')
+ t = Template()
+ o = controller.Controller(t)
+ assert o.output_xhtml() == d
+ assert t.calls == 1
+
+ def test_output_json(self):
+ """
+ Test the `ipawebui.controller.Controller.output_json` method.
+ """
+ o = controller.Controller()
+ assert o.output_json() == '{}'
+ e = '{\n "age": 27, \n "first": "John", \n "last": "Doe"\n}'
+ j = o.output_json(last='Doe', first='John', age=27)
+ assert j == e
diff --git a/tests/test_util.py b/tests/test_util.py
new file mode 100644
index 00000000..7d7038c1
--- /dev/null
+++ b/tests/test_util.py
@@ -0,0 +1,148 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `tests.util` module.
+"""
+
+import util
+
+
+class Prop(object):
+ def __init__(self, *ops):
+ self.__ops = frozenset(ops)
+ self.__prop = 'prop value'
+
+ def __get_prop(self):
+ if 'get' not in self.__ops:
+ raise AttributeError('get prop')
+ return self.__prop
+
+ def __set_prop(self, value):
+ if 'set' not in self.__ops:
+ raise AttributeError('set prop')
+ self.__prop = value
+
+ def __del_prop(self):
+ if 'del' not in self.__ops:
+ raise AttributeError('del prop')
+ self.__prop = None
+
+ prop = property(__get_prop, __set_prop, __del_prop)
+
+
+def test_yes_raised():
+ f = util.raises
+
+ class SomeError(Exception):
+ pass
+
+ class AnotherError(Exception):
+ pass
+
+ def callback1():
+ 'raises correct exception'
+ raise SomeError()
+
+ def callback2():
+ 'raises wrong exception'
+ raise AnotherError()
+
+ def callback3():
+ 'raises no exception'
+
+ f(SomeError, callback1)
+
+ raised = False
+ try:
+ f(SomeError, callback2)
+ except AnotherError:
+ raised = True
+ assert raised
+
+ raised = False
+ try:
+ f(SomeError, callback3)
+ except util.ExceptionNotRaised:
+ raised = True
+ assert raised
+
+
+def test_no_set():
+ # Tests that it works when prop cannot be set:
+ util.no_set(Prop('get', 'del'), 'prop')
+
+ # Tests that ExceptionNotRaised is raised when prop *can* be set:
+ raised = False
+ try:
+ util.no_set(Prop('set'), 'prop')
+ except util.ExceptionNotRaised:
+ raised = True
+ assert raised
+
+
+def test_no_del():
+ # Tests that it works when prop cannot be deleted:
+ util.no_del(Prop('get', 'set'), 'prop')
+
+ # Tests that ExceptionNotRaised is raised when prop *can* be set:
+ raised = False
+ try:
+ util.no_del(Prop('del'), 'prop')
+ except util.ExceptionNotRaised:
+ raised = True
+ assert raised
+
+
+def test_read_only():
+ # Test that it works when prop is read only:
+ assert util.read_only(Prop('get'), 'prop') == 'prop value'
+
+ # Test that ExceptionNotRaised is raised when prop can be set:
+ raised = False
+ try:
+ util.read_only(Prop('get', 'set'), 'prop')
+ except util.ExceptionNotRaised:
+ raised = True
+ assert raised
+
+ # Test that ExceptionNotRaised is raised when prop can be deleted:
+ raised = False
+ try:
+ util.read_only(Prop('get', 'del'), 'prop')
+ except util.ExceptionNotRaised:
+ raised = True
+ assert raised
+
+ # Test that ExceptionNotRaised is raised when prop can be both set and
+ # deleted:
+ raised = False
+ try:
+ util.read_only(Prop('get', 'del'), 'prop')
+ except util.ExceptionNotRaised:
+ raised = True
+ assert raised
+
+ # Test that AttributeError is raised when prop can't be read:
+ raised = False
+ try:
+ util.read_only(Prop(), 'prop')
+ except AttributeError:
+ raised = True
+ assert raised
diff --git a/tests/test_xmlrpc/__init__.py b/tests/test_xmlrpc/__init__.py
new file mode 100644
index 00000000..043007b5
--- /dev/null
+++ b/tests/test_xmlrpc/__init__.py
@@ -0,0 +1,22 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Sub-package containing unit tests for `xmlrpc` package.
+"""
diff --git a/tests/test_xmlrpc/test_automount_plugin.py b/tests/test_xmlrpc/test_automount_plugin.py
new file mode 100644
index 00000000..522ee689
--- /dev/null
+++ b/tests/test_xmlrpc/test_automount_plugin.py
@@ -0,0 +1,243 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_automount' module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+class test_Service(XMLRPC_test):
+ """
+ Test the `f_automount` plugin.
+ """
+ mapname='testmap'
+ keyname='testkey'
+ keyname2='secondkey'
+ description='description of map'
+ info='ro'
+ map_kw={'automountmapname': mapname, 'description': description}
+ key_kw={'automountmapname': mapname, 'automountkey': keyname, 'automountinformation': info}
+ key_kw2={'automountmapname': mapname, 'automountkey': keyname2, 'automountinformation': info}
+
+ def test_add_1map(self):
+ """
+ Test adding a map `xmlrpc.automount_addmap` method.
+ """
+ res = api.Command['automount_addmap'](**self.map_kw)
+ assert res
+ assert res.get('automountmapname','') == self.mapname
+
+ def test_add_2key(self):
+ """
+ Test adding a key using `xmlrpc.automount_addkey` method.
+ """
+ res = api.Command['automount_addkey'](**self.key_kw2)
+ assert res
+ assert res.get('automountkey','') == self.keyname2
+
+ def test_add_3key(self):
+ """
+ Test adding a key using `xmlrpc.automount_addkey` method.
+ """
+ res = api.Command['automount_addkey'](**self.key_kw)
+ assert res
+ assert res.get('automountkey','') == self.keyname
+
+ def test_add_4key(self):
+ """
+ Test adding a duplicate key using `xmlrpc.automount_addkey` method.
+ """
+ try:
+ res = api.Command['automount_addkey'](**self.key_kw)
+ except errors.DuplicateEntry:
+ pass
+ else:
+ assert False
+
+ def test_doshowmap(self):
+ """
+ Test the `xmlrpc.automount_showmap` method.
+ """
+ res = api.Command['automount_showmap'](self.mapname)
+ assert res
+ assert res.get('automountmapname','') == self.mapname
+
+ def test_findmap(self):
+ """
+ Test the `xmlrpc.automount_findmap` method.
+ """
+ res = api.Command['automount_findmap'](self.mapname)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('automountmapname','') == self.mapname
+
+ def test_doshowkey(self):
+ """
+ Test the `xmlrpc.automount_showkey` method.
+ """
+ showkey_kw={'automountmapname': self.mapname, 'automountkey': self.keyname}
+ res = api.Command['automount_showkey'](**showkey_kw)
+ assert res
+ assert res.get('automountkey','') == self.keyname
+ assert res.get('automountinformation','') == self.info
+
+ def test_findkey(self):
+ """
+ Test the `xmlrpc.automount_findkey` method.
+ """
+ res = api.Command['automount_findkey'](self.keyname)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('automountkey','') == self.keyname
+ assert res[1].get('automountinformation','') == self.info
+
+ def test_modkey(self):
+ """
+ Test the `xmlrpc.automount_modkey` method.
+ """
+ self.key_kw['automountinformation'] = 'rw'
+ self.key_kw['description'] = 'new description'
+ res = api.Command['automount_modkey'](**self.key_kw)
+ assert res
+ assert res.get('automountkey','') == self.keyname
+ assert res.get('automountinformation','') == 'rw'
+ assert res.get('description','') == 'new description'
+
+ def test_modmap(self):
+ """
+ Test the `xmlrpc.automount_modmap` method.
+ """
+ self.map_kw['description'] = 'new description'
+ res = api.Command['automount_modmap'](**self.map_kw)
+ assert res
+ assert res.get('automountmapname','') == self.mapname
+ assert res.get('description','') == 'new description'
+
+ def test_remove1key(self):
+ """
+ Test the `xmlrpc.automount_delkey` method.
+ """
+ delkey_kw={'automountmapname': self.mapname, 'automountkey': self.keyname}
+ res = api.Command['automount_delkey'](**delkey_kw)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['automount_showkey'](**delkey_kw)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ def test_remove2map(self):
+ """
+ Test the `xmlrpc.automount_delmap` method.
+ """
+ res = api.Command['automount_delmap'](self.mapname)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['automount_showmap'](self.mapname)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ def test_remove3map(self):
+ """
+ Test that the `xmlrpc.automount_delmap` method removes all keys
+ """
+ # Verify that the second key we added is gone
+ key_kw={'automountmapname': self.mapname, 'automountkey': self.keyname2}
+ try:
+ res = api.Command['automount_showkey'](**key_kw)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+class test_Indirect(XMLRPC_test):
+ """
+ Test the `f_automount` plugin Indirect map function.
+ """
+ mapname='auto.home'
+ keyname='/home'
+ parentmap='auto.master'
+ description='Home directories'
+ map_kw={'automountkey': keyname, 'parentmap': parentmap, 'description': description}
+
+ def test_add_indirect(self):
+ """
+ Test adding an indirect map.
+ """
+ res = api.Command['automount_addindirectmap'](self.mapname, **self.map_kw)
+ assert res
+ assert res.get('automountinformation','') == self.mapname
+
+ def test_doshowkey(self):
+ """
+ Test the `xmlrpc.automount_showkey` method.
+ """
+ showkey_kw={'automountmapname': self.parentmap, 'automountkey': self.keyname}
+ res = api.Command['automount_showkey'](**showkey_kw)
+ assert res
+ assert res.get('automountkey','') == self.keyname
+
+ def test_remove_key(self):
+ """
+ Remove the indirect key /home
+ """
+ delkey_kw={'automountmapname': self.parentmap, 'automountkey': self.keyname}
+ res = api.Command['automount_delkey'](**delkey_kw)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['automount_showkey'](**delkey_kw)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ def test_remove_map(self):
+ """
+ Remove the indirect map for auto.home
+ """
+ res = api.Command['automount_delmap'](self.mapname)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['automount_showmap'](self.mapname)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
diff --git a/tests/test_xmlrpc/test_group_plugin.py b/tests/test_xmlrpc/test_group_plugin.py
new file mode 100644
index 00000000..2b16cc8a
--- /dev/null
+++ b/tests/test_xmlrpc/test_group_plugin.py
@@ -0,0 +1,178 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_group` module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+class test_Group(XMLRPC_test):
+ """
+ Test the `f_group` plugin.
+ """
+ cn='testgroup'
+ cn2='testgroup2'
+ description='This is a test'
+ kw={'description':description,'cn':cn}
+
+ def test_add(self):
+ """
+ Test the `xmlrpc.group_add` method.
+ """
+ res = api.Command['group_add'](**self.kw)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn
+
+ def test_add2(self):
+ """
+ Test the `xmlrpc.group_add` method duplicate detection.
+ """
+ try:
+ res = api.Command['group_add'](**self.kw)
+ except errors.DuplicateEntry:
+ pass
+
+ def test_add2(self):
+ """
+ Test the `xmlrpc.group_add` method.
+ """
+ self.kw['cn'] = self.cn2
+ res = api.Command['group_add'](**self.kw)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn2
+
+ def test_add_member(self):
+ """
+ Test the `xmlrpc.group_add_member` method.
+ """
+ kw={}
+ kw['groups'] = self.cn2
+ res = api.Command['group_add_member'](self.cn, **kw)
+ assert res == []
+
+ def test_add_member2(self):
+ """
+ Test the `xmlrpc.group_add_member` with a non-existent member
+ """
+ kw={}
+ kw['groups'] = "notfound"
+ res = api.Command['group_add_member'](self.cn, **kw)
+ # an error isn't thrown, the list of failed members is returned
+ assert res != []
+
+ def test_doshow(self):
+ """
+ Test the `xmlrpc.group_show` method.
+ """
+ res = api.Command['group_show'](self.cn)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn
+ assert res.get('member','').startswith('cn=%s' % self.cn2)
+
+ def test_find(self):
+ """
+ Test the `xmlrpc.group_find` method.
+ """
+ res = api.Command['group_find'](self.cn)
+ assert res
+ assert len(res) == 3
+ assert res[1].get('description','') == self.description
+ assert res[1].get('cn','') == self.cn
+
+ def test_mod(self):
+ """
+ Test the `xmlrpc.group_mod` method.
+ """
+ modkw = self.kw
+ modkw['cn'] = self.cn
+ modkw['description'] = 'New description'
+ res = api.Command['group_mod'](**modkw)
+ assert res
+ assert res.get('description','') == 'New description'
+
+ # Ok, double-check that it was changed
+ res = api.Command['group_show'](self.cn)
+ assert res
+ assert res.get('description','') == 'New description'
+ assert res.get('cn','') == self.cn
+
+ def test_remove_member(self):
+ """
+ Test the `xmlrpc.group_remove_member` method.
+ """
+ kw={}
+ kw['groups'] = self.cn2
+ res = api.Command['group_remove_member'](self.cn, **kw)
+
+ res = api.Command['group_show'](self.cn)
+ assert res
+ assert res.get('member','') == ''
+
+ def test_remove_member2(self):
+ """
+ Test the `xmlrpc.group_remove_member` method with non-member
+ """
+ kw={}
+ kw['groups'] = "notfound"
+ # an error isn't thrown, the list of failed members is returned
+ res = api.Command['group_remove_member'](self.cn, **kw)
+ assert res != []
+
+ def test_remove_x(self):
+ """
+ Test the `xmlrpc.group_del` method.
+ """
+ res = api.Command['group_del'](self.cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['group_show'](self.cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ def test_remove_x2(self):
+ """
+ Test the `xmlrpc.group_del` method.
+ """
+ res = api.Command['group_del'](self.cn2)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['group_show'](self.cn2)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
diff --git a/tests/test_xmlrpc/test_host_plugin.py b/tests/test_xmlrpc/test_host_plugin.py
new file mode 100644
index 00000000..515cd703
--- /dev/null
+++ b/tests/test_xmlrpc/test_host_plugin.py
@@ -0,0 +1,128 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_host` module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+class test_Host(XMLRPC_test):
+ """
+ Test the `f_host` plugin.
+ """
+ cn='ipaexample.%s' % api.env.domain
+ description='Test host'
+ localityname='Undisclosed location'
+ kw={'cn': cn, 'description': description, 'localityname': localityname}
+
+ def test_add(self):
+ """
+ Test the `xmlrpc.host_add` method.
+ """
+ res = api.Command['host_add'](**self.kw)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn
+ assert res.get('l','') == self.localityname
+
+ def test_doshow_all(self):
+ """
+ Test the `xmlrpc.host_show` method with all attributes.
+ """
+ kw={'cn':self.cn, 'all': True}
+ res = api.Command['host_show'](**kw)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn
+ assert res.get('l','') == self.localityname
+
+ def test_doshow_minimal(self):
+ """
+ Test the `xmlrpc.host_show` method with default attributes.
+ """
+ kw={'cn':self.cn}
+ res = api.Command['host_show'](**kw)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn
+ assert res.get('localityname','') == self.localityname
+
+ def test_find_all(self):
+ """
+ Test the `xmlrpc.host_find` method with all attributes.
+ """
+ kw={'cn':self.cn, 'all': True}
+ res = api.Command['host_find'](**kw)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('description','') == self.description
+ assert res[1].get('cn','') == self.cn
+ assert res[1].get('l','') == self.localityname
+
+ def test_find_minimal(self):
+ """
+ Test the `xmlrpc.host_find` method with default attributes.
+ """
+ res = api.Command['host_find'](self.cn)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('description','') == self.description
+ assert res[1].get('cn','') == self.cn
+ assert res[1].get('localityname','') == self.localityname
+
+ def test_mod(self):
+ """
+ Test the `xmlrpc.host_mod` method.
+ """
+ newdesc='Updated host'
+ modkw={'cn': self.cn, 'description': newdesc}
+ res = api.Command['host_mod'](**modkw)
+ assert res
+ assert res.get('description','') == newdesc
+
+ # Ok, double-check that it was changed
+ res = api.Command['host_show'](self.cn)
+ assert res
+ assert res.get('description','') == newdesc
+ assert res.get('cn','') == self.cn
+
+ def test_remove(self):
+ """
+ Test the `xmlrpc.host_del` method.
+ """
+ res = api.Command['host_del'](self.cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['host_show'](self.cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
diff --git a/tests/test_xmlrpc/test_hostgroup_plugin.py b/tests/test_xmlrpc/test_hostgroup_plugin.py
new file mode 100644
index 00000000..9180c1dd
--- /dev/null
+++ b/tests/test_xmlrpc/test_hostgroup_plugin.py
@@ -0,0 +1,149 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_hostgroup` module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+class test_Host(XMLRPC_test):
+ """
+ Test the `f_hostgroup` plugin.
+ """
+ cn='testgroup'
+ description='Test host group'
+ kw={'cn': cn, 'description': description}
+
+ host_cn='ipaexample.%s' % api.env.domain
+ host_description='Test host'
+ host_localityname='Undisclosed location'
+
+ def test_add(self):
+ """
+ Test the `xmlrpc.hostgroup_add` method.
+ """
+ res = api.Command['hostgroup_add'](**self.kw)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn
+
+ def test_addhost(self):
+ """
+ Add a host to test add/remove member.
+ """
+ kw={'cn': self.host_cn, 'description': self.host_description, 'localityname': self.host_localityname}
+ res = api.Command['host_add'](**kw)
+ assert res
+ assert res.get('description','') == self.host_description
+ assert res.get('cn','') == self.host_cn
+
+ def test_addmember(self):
+ """
+ Test the `xmlrpc.hostgroup_add_member` method.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['hostgroup_add_member'](self.cn, **kw)
+ assert res == []
+
+ def test_doshow(self):
+ """
+ Test the `xmlrpc.hostgroup_show` method.
+ """
+ res = api.Command['hostgroup_show'](self.cn)
+ assert res
+ assert res.get('description','') == self.description
+ assert res.get('cn','') == self.cn
+ assert res.get('member','').startswith('cn=%s' % self.host_cn)
+
+ def test_find(self):
+ """
+ Test the `xmlrpc.hostgroup_find` method.
+ """
+ res = api.Command['hostgroup_find'](self.cn)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('description','') == self.description
+ assert res[1].get('cn','') == self.cn
+ assert res[1].get('member','').startswith('cn=%s' % self.host_cn)
+
+ def test_mod(self):
+ """
+ Test the `xmlrpc.hostgroup_mod` method.
+ """
+ newdesc='Updated host group'
+ modkw={'cn': self.cn, 'description': newdesc}
+ res = api.Command['hostgroup_mod'](**modkw)
+ assert res
+ assert res.get('description','') == newdesc
+
+ # Ok, double-check that it was changed
+ res = api.Command['hostgroup_show'](self.cn)
+ assert res
+ assert res.get('description','') == newdesc
+ assert res.get('cn','') == self.cn
+
+ def test_member_remove(self):
+ """
+ Test the `xmlrpc.hostgroup_remove_member` method.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['hostgroup_remove_member'](self.cn, **kw)
+ assert res == []
+
+ def test_remove(self):
+ """
+ Test the `xmlrpc.hostgroup_del` method.
+ """
+ res = api.Command['hostgroup_del'](self.cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['hostgroup_show'](self.cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ def test_removehost(self):
+ """
+ Test the `xmlrpc.host_del` method.
+ """
+ res = api.Command['host_del'](self.host_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['host_show'](self.host_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
diff --git a/tests/test_xmlrpc/test_netgroup_plugin.py b/tests/test_xmlrpc/test_netgroup_plugin.py
new file mode 100644
index 00000000..3d3e4aff
--- /dev/null
+++ b/tests/test_xmlrpc/test_netgroup_plugin.py
@@ -0,0 +1,320 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_netgroup` module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+def is_member_of(members, candidate):
+ if not isinstance(members, list):
+ members = [members]
+ for m in members:
+ if m.startswith(candidate):
+ return True
+ return False
+
+class test_Netgroup(XMLRPC_test):
+ """
+ Test the `f_netgroup` plugin.
+ """
+ ng_cn='ng1'
+ ng_description='Netgroup'
+ ng_kw={'cn': ng_cn, 'description': ng_description}
+
+ host_cn='ipaexample.%s' % api.env.domain
+ host_description='Test host'
+ host_localityname='Undisclosed location'
+ host_kw={'cn': host_cn, 'description': host_description, 'localityname': host_localityname}
+
+ hg_cn='ng1'
+ hg_description='Netgroup'
+ hg_kw={'cn': hg_cn, 'description': hg_description}
+
+ user_uid='jexample'
+ user_givenname='Jim'
+ user_sn='Example'
+ user_home='/home/%s' % user_uid
+ user_kw={'givenname':user_givenname,'sn':user_sn,'uid':user_uid,'homedirectory':user_home}
+
+ group_cn='testgroup'
+ group_description='This is a test'
+ group_kw={'description':group_description,'cn':group_cn}
+
+ def test_add(self):
+ """
+ Test the `xmlrpc.netgroup_add` method.
+ """
+ res = api.Command['netgroup_add'](**self.ng_kw)
+ assert res
+ assert res.get('description','') == self.ng_description
+ assert res.get('cn','') == self.ng_cn
+
+ def test_adddata(self):
+ """
+ Add the data needed to do additional testing.
+ """
+
+ # Add a host
+ res = api.Command['host_add'](**self.host_kw)
+ assert res
+ assert res.get('description','') == self.host_description
+ assert res.get('cn','') == self.host_cn
+
+ # Add a hostgroup
+ res = api.Command['hostgroup_add'](**self.hg_kw)
+ assert res
+ assert res.get('description','') == self.hg_description
+ assert res.get('cn','') == self.hg_cn
+
+ # Add a user
+ res = api.Command['user_add'](**self.user_kw)
+ assert res
+ assert res.get('givenname','') == self.user_givenname
+ assert res.get('uid','') == self.user_uid
+
+ # Add a group
+ res = api.Command['group_add'](**self.group_kw)
+ assert res
+ assert res.get('description','') == self.group_description
+ assert res.get('cn','') == self.group_cn
+
+ def test_addmembers(self):
+ """
+ Test the `xmlrpc.netgroup_add_member` method.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+
+ def test_addmembers2(self):
+ """
+ Test the `xmlrpc.netgroup_add_member` method again to test dupes.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.host_cn)
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.hg_cn)
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'uid=%s' % self.user_uid)
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.group_cn)
+
+ def test_addexternalmembers(self):
+ """
+ Test adding external hosts
+ """
+ kw={}
+ kw['hosts'] = "nosuchhost"
+ res = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert res == []
+ res = api.Command['netgroup_show'](self.ng_cn)
+ assert res
+ assert is_member_of(res.get('externalhost',[]), kw['hosts'])
+
+ def test_doshow(self):
+ """
+ Test the `xmlrpc.netgroup_show` method.
+ """
+ res = api.Command['netgroup_show'](self.ng_cn)
+ assert res
+ assert res.get('description','') == self.ng_description
+ assert res.get('cn','') == self.ng_cn
+ assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.host_cn)
+ assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.hg_cn)
+ assert is_member_of(res.get('memberuser',[]), 'uid=%s' % self.user_uid)
+ assert is_member_of(res.get('memberuser',[]), 'cn=%s' % self.group_cn)
+
+ def test_find(self):
+ """
+ Test the `xmlrpc.hostgroup_find` method.
+ """
+ res = api.Command['netgroup_find'](self.ng_cn)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('description','') == self.ng_description
+ assert res[1].get('cn','') == self.ng_cn
+
+ def test_mod(self):
+ """
+ Test the `xmlrpc.hostgroup_mod` method.
+ """
+ newdesc='Updated host group'
+ modkw={'cn': self.ng_cn, 'description': newdesc}
+ res = api.Command['netgroup_mod'](**modkw)
+ assert res
+ assert res.get('description','') == newdesc
+
+ # Ok, double-check that it was changed
+ res = api.Command['netgroup_show'](self.ng_cn)
+ assert res
+ assert res.get('description','') == newdesc
+ assert res.get('cn','') == self.ng_cn
+
+ def test_member_remove(self):
+ """
+ Test the `xmlrpc.hostgroup_remove_member` method.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert res == []
+
+ def test_member_remove2(self):
+ """
+ Test the `xmlrpc.netgroup_remove_member` method again to test not found.
+ """
+ kw={}
+ kw['hosts'] = self.host_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.host_cn)
+
+ kw={}
+ kw['hostgroups'] = self.hg_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.hg_cn)
+
+ kw={}
+ kw['users'] = self.user_uid
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'uid=%s' % self.user_uid)
+
+ kw={}
+ kw['groups'] = self.group_cn
+ res = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert is_member_of(res, 'cn=%s' % self.group_cn)
+
+ def test_remove(self):
+ """
+ Test the `xmlrpc.netgroup_del` method.
+ """
+ res = api.Command['netgroup_del'](self.ng_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['netgroup_show'](self.ng_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ def test_removedata(self):
+ """
+ Remove the test data we added
+ """
+ # Remove the host
+ res = api.Command['host_del'](self.host_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['host_show'](self.host_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ # Remove the hostgroup
+ res = api.Command['hostgroup_del'](self.hg_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['hostgroup_show'](self.hg_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ # Remove the user
+ res = api.Command['user_del'](self.user_uid)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['user_show'](self.user_uid)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
+
+ # Remove the group
+ res = api.Command['group_del'](self.group_cn)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['group_show'](self.group_cn)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
diff --git a/tests/test_xmlrpc/test_service_plugin.py b/tests/test_xmlrpc/test_service_plugin.py
new file mode 100644
index 00000000..0a843d36
--- /dev/null
+++ b/tests/test_xmlrpc/test_service_plugin.py
@@ -0,0 +1,93 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_service` module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+class test_Service(XMLRPC_test):
+ """
+ Test the `f_service` plugin.
+ """
+ principal='HTTP/ipatest.%s@%s' % (api.env.domain, api.env.realm)
+ hostprincipal='host/ipatest.%s@%s' % (api.env.domain, api.env.realm)
+ kw={'principal':principal}
+
+ def test_add(self):
+ """
+ Test adding a HTTP principal using the `xmlrpc.service_add` method.
+ """
+ res = api.Command['service_add'](**self.kw)
+ assert res
+ assert res.get('krbprincipalname','') == self.principal
+
+ def test_add_host(self):
+ """
+ Test adding a host principal using `xmlrpc.service_add` method.
+ """
+ kw={'principal':self.hostprincipal}
+ try:
+ res = api.Command['service_add'](**kw)
+ except errors.HostService:
+ pass
+ else:
+ assert False
+
+ def test_doshow(self):
+ """
+ Test the `xmlrpc.service_show` method.
+ """
+ res = api.Command['service_show'](self.principal)
+ assert res
+ assert res.get('krbprincipalname','') == self.principal
+
+ def test_find(self):
+ """
+ Test the `xmlrpc.service_find` method.
+ """
+ res = api.Command['service_find'](self.principal)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('krbprincipalname','') == self.principal
+
+ def test_remove(self):
+ """
+ Test the `xmlrpc.service_del` method.
+ """
+ res = api.Command['service_del'](self.principal)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['service_show'](self.principal)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
diff --git a/tests/test_xmlrpc/test_user_plugin.py b/tests/test_xmlrpc/test_user_plugin.py
new file mode 100644
index 00000000..0189aa5a
--- /dev/null
+++ b/tests/test_xmlrpc/test_user_plugin.py
@@ -0,0 +1,151 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib/plugins/f_user` module.
+"""
+
+import sys
+from xmlrpc_test import XMLRPC_test
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+class test_User(XMLRPC_test):
+ """
+ Test the `f_user` plugin.
+ """
+ uid='jexample'
+ givenname='Jim'
+ sn='Example'
+ home='/home/%s' % uid
+ principalname='%s@%s' % (uid, api.env.realm)
+ kw={'givenname':givenname,'sn':sn,'uid':uid,'homedirectory':home}
+
+ def test_add(self):
+ """
+ Test the `xmlrpc.user_add` method.
+ """
+ res = api.Command['user_add'](**self.kw)
+ assert res
+ assert res.get('givenname','') == self.givenname
+ assert res.get('sn','') == self.sn
+ assert res.get('uid','') == self.uid
+ assert res.get('homedirectory','') == self.home
+
+ def test_add2(self):
+ """
+ Test the `xmlrpc.user_add` method duplicate detection.
+ """
+ try:
+ res = api.Command['user_add'](**self.kw)
+ except errors.DuplicateEntry:
+ pass
+
+ def test_doshow(self):
+ """
+ Test the `xmlrpc.user_show` method.
+ """
+ kw={'uid':self.uid, 'all': True}
+ res = api.Command['user_show'](**kw)
+ assert res
+ assert res.get('givenname','') == self.givenname
+ assert res.get('sn','') == self.sn
+ assert res.get('uid','') == self.uid
+ assert res.get('homedirectory','') == self.home
+ assert res.get('krbprincipalname','') == self.principalname
+
+ def test_find_all(self):
+ """
+ Test the `xmlrpc.user_find` method with all attributes.
+ """
+ kw={'uid':self.uid, 'all': True}
+ res = api.Command['user_find'](**kw)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('givenname','') == self.givenname
+ assert res[1].get('sn','') == self.sn
+ assert res[1].get('uid','') == self.uid
+ assert res[1].get('homedirectory','') == self.home
+ assert res[1].get('krbprincipalname','') == self.principalname
+
+ def test_find_minimal(self):
+ """
+ Test the `xmlrpc.user_find` method with minimal attributes.
+ """
+ res = api.Command['user_find'](self.uid)
+ assert res
+ assert len(res) == 2
+ assert res[1].get('givenname','') == self.givenname
+ assert res[1].get('sn','') == self.sn
+ assert res[1].get('uid','') == self.uid
+ assert res[1].get('homedirectory','') == self.home
+ assert res[1].get('krbprincipalname', None) == None
+
+ def test_lock(self):
+ """
+ Test the `xmlrpc.user_lock` method.
+ """
+ res = api.Command['user_lock'](self.uid)
+ assert res == True
+
+ def test_lockoff(self):
+ """
+ Test the `xmlrpc.user_unlock` method.
+ """
+ res = api.Command['user_unlock'](self.uid)
+ assert res == True
+
+ def test_mod(self):
+ """
+ Test the `xmlrpc.user_mod` method.
+ """
+ modkw = self.kw
+ modkw['givenname'] = 'Finkle'
+ res = api.Command['user_mod'](**modkw)
+ assert res
+ assert res.get('givenname','') == 'Finkle'
+ assert res.get('sn','') == self.sn
+
+ # Ok, double-check that it was changed
+ res = api.Command['user_show'](self.uid)
+ assert res
+ assert res.get('givenname','') == 'Finkle'
+ assert res.get('sn','') == self.sn
+ assert res.get('uid','') == self.uid
+
+ def test_remove(self):
+ """
+ Test the `xmlrpc.user_del` method.
+ """
+ res = api.Command['user_del'](self.uid)
+ assert res == True
+
+ # Verify that it is gone
+ try:
+ res = api.Command['user_show'](self.uid)
+ except errors.NotFound:
+ pass
+ else:
+ assert False
diff --git a/tests/test_xmlrpc/xmlrpc_test.py b/tests/test_xmlrpc/xmlrpc_test.py
new file mode 100644
index 00000000..744c0c27
--- /dev/null
+++ b/tests/test_xmlrpc/xmlrpc_test.py
@@ -0,0 +1,49 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Base class for all XML-RPC tests
+"""
+
+import sys
+import socket
+import nose
+from ipalib import api
+from ipalib import errors
+from ipalib.cli import CLI
+
+try:
+ api.finalize()
+except StandardError:
+ pass
+
+class XMLRPC_test:
+ """
+ Base class for all XML-RPC plugin tests
+ """
+
+ def setUp(self):
+ # FIXME: changing Plugin.name from a property to an instance attribute
+ # somehow broke this.
+ try:
+ res = api.Command['user_show']('notfound')
+ except socket.error:
+ raise nose.SkipTest
+ except errors.NotFound:
+ pass
diff --git a/tests/util.py b/tests/util.py
new file mode 100644
index 00000000..f5899dfa
--- /dev/null
+++ b/tests/util.py
@@ -0,0 +1,391 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Common utility functions and classes for unit tests.
+"""
+
+import inspect
+import os
+from os import path
+import tempfile
+import shutil
+import ipalib
+from ipalib.plugable import Plugin
+from ipalib.request import context
+
+
+
+class TempDir(object):
+ def __init__(self):
+ self.__path = tempfile.mkdtemp(prefix='ipa.tests.')
+ assert self.path == self.__path
+
+ def __get_path(self):
+ assert path.abspath(self.__path) == self.__path
+ assert self.__path.startswith('/tmp/ipa.tests.')
+ assert path.isdir(self.__path) and not path.islink(self.__path)
+ return self.__path
+ path = property(__get_path)
+
+ def rmtree(self):
+ if self.__path is not None:
+ shutil.rmtree(self.path)
+ self.__path = None
+
+ def makedirs(self, *parts):
+ d = self.join(*parts)
+ if not path.exists(d):
+ os.makedirs(d)
+ assert path.isdir(d) and not path.islink(d)
+ return d
+
+ def touch(self, *parts):
+ d = self.makedirs(*parts[:-1])
+ f = path.join(d, parts[-1])
+ assert not path.exists(f)
+ open(f, 'w').close()
+ assert path.isfile(f) and not path.islink(f)
+ return f
+
+ def write(self, content, *parts):
+ d = self.makedirs(*parts[:-1])
+ f = path.join(d, parts[-1])
+ assert not path.exists(f)
+ open(f, 'w').write(content)
+ assert path.isfile(f) and not path.islink(f)
+ return f
+
+ def join(self, *parts):
+ return path.join(self.path, *parts)
+
+ def __del__(self):
+ self.rmtree()
+
+
+class TempHome(TempDir):
+ def __init__(self):
+ super(TempHome, self).__init__()
+ self.__home = os.environ['HOME']
+ os.environ['HOME'] = self.path
+
+
+class ExceptionNotRaised(Exception):
+ """
+ Exception raised when an *expected* exception is *not* raised during a
+ unit test.
+ """
+ msg = 'expected %s'
+
+ def __init__(self, expected):
+ self.expected = expected
+
+ def __str__(self):
+ return self.msg % self.expected.__name__
+
+
+def assert_equal(val1, val2):
+ """
+ Assert ``val1`` and ``val2`` are the same type and of equal value.
+ """
+ assert type(val1) is type(val2), '%r != %r' % (val1, val2)
+ assert val1 == val2, '%r != %r' % (val1, val2)
+
+
+def raises(exception, callback, *args, **kw):
+ """
+ Tests that the expected exception is raised; raises ExceptionNotRaised
+ if test fails.
+ """
+ raised = False
+ try:
+ callback(*args, **kw)
+ except exception, e:
+ raised = True
+ if not raised:
+ raise ExceptionNotRaised(exception)
+ return e
+
+
+def getitem(obj, key):
+ """
+ Works like getattr but for dictionary interface. Use this in combination
+ with raises() to test that, for example, KeyError is raised.
+ """
+ return obj[key]
+
+
+def setitem(obj, key, value):
+ """
+ Works like setattr but for dictionary interface. Use this in combination
+ with raises() to test that, for example, TypeError is raised.
+ """
+ obj[key] = value
+
+
+def delitem(obj, key):
+ """
+ Works like delattr but for dictionary interface. Use this in combination
+ with raises() to test that, for example, TypeError is raised.
+ """
+ del obj[key]
+
+
+def no_set(obj, name, value='some_new_obj'):
+ """
+ Tests that attribute cannot be set.
+ """
+ raises(AttributeError, setattr, obj, name, value)
+
+
+def no_del(obj, name):
+ """
+ Tests that attribute cannot be deleted.
+ """
+ raises(AttributeError, delattr, obj, name)
+
+
+def read_only(obj, name, value='some_new_obj'):
+ """
+ Tests that attribute is read-only. Returns attribute.
+ """
+ # Test that it cannot be set:
+ no_set(obj, name, value)
+
+ # Test that it cannot be deleted:
+ no_del(obj, name)
+
+ # Return the attribute
+ return getattr(obj, name)
+
+
+def is_prop(prop):
+ return type(prop) is property
+
+
+class ClassChecker(object):
+ __cls = None
+ __subcls = None
+
+ def __get_cls(self):
+ if self.__cls is None:
+ self.__cls = self._cls
+ assert inspect.isclass(self.__cls)
+ return self.__cls
+ cls = property(__get_cls)
+
+ def __get_subcls(self):
+ if self.__subcls is None:
+ self.__subcls = self.get_subcls()
+ assert inspect.isclass(self.__subcls)
+ return self.__subcls
+ subcls = property(__get_subcls)
+
+ def get_subcls(self):
+ raise NotImplementedError(
+ self.__class__.__name__,
+ 'get_subcls()'
+ )
+
+ def tearDown(self):
+ """
+ nose tear-down fixture.
+ """
+ for name in ('ugettext', 'ungettext'):
+ if hasattr(context, name):
+ delattr(context, name)
+
+
+
+
+
+
+
+def check_TypeError(value, type_, name, callback, *args, **kw):
+ """
+ Tests a standard TypeError raised with `errors.raise_TypeError`.
+ """
+ e = raises(TypeError, callback, *args, **kw)
+ assert e.value is value
+ assert e.type is type_
+ assert e.name == name
+ assert type(e.name) is str
+ assert str(e) == ipalib.errors.TYPE_FORMAT % (name, type_, value)
+ return e
+
+
+def get_api(**kw):
+ """
+ Returns (api, home) tuple.
+
+ This function returns a tuple containing an `ipalib.plugable.API`
+ instance and a `TempHome` instance.
+ """
+ home = TempHome()
+ api = ipalib.create_api(mode='unit_test')
+ api.env.in_tree = True
+ for (key, value) in kw.iteritems():
+ api.env[key] = value
+ return (api, home)
+
+
+def create_test_api(**kw):
+ """
+ Returns (api, home) tuple.
+
+ This function returns a tuple containing an `ipalib.plugable.API`
+ instance and a `TempHome` instance.
+ """
+ home = TempHome()
+ api = ipalib.create_api(mode='unit_test')
+ api.env.in_tree = True
+ for (key, value) in kw.iteritems():
+ api.env[key] = value
+ return (api, home)
+
+
+class PluginTester(object):
+ __plugin = None
+
+ def __get_plugin(self):
+ if self.__plugin is None:
+ self.__plugin = self._plugin
+ assert issubclass(self.__plugin, Plugin)
+ return self.__plugin
+ plugin = property(__get_plugin)
+
+ def register(self, *plugins, **kw):
+ """
+ Create a testing api and register ``self.plugin``.
+
+ This method returns an (api, home) tuple.
+
+ :param plugins: Additional \*plugins to register.
+ :param kw: Additional \**kw args to pass to `create_test_api`.
+ """
+ (api, home) = create_test_api(**kw)
+ api.register(self.plugin)
+ for p in plugins:
+ api.register(p)
+ return (api, home)
+
+ def finalize(self, *plugins, **kw):
+ (api, home) = self.register(*plugins, **kw)
+ api.finalize()
+ return (api, home)
+
+ def instance(self, namespace, *plugins, **kw):
+ (api, home) = self.finalize(*plugins, **kw)
+ o = api[namespace][self.plugin.__name__]
+ return (o, api, home)
+
+
+class dummy_ugettext(object):
+ __called = False
+
+ def __init__(self, translation=None):
+ if translation is None:
+ translation = u'The translation'
+ self.translation = translation
+ assert type(self.translation) is unicode
+
+ def __call__(self, message):
+ assert self.__called is False
+ self.__called = True
+ assert type(message) is str
+ assert not hasattr(self, 'message')
+ self.message = message
+ assert type(self.translation) is unicode
+ return self.translation
+
+ def called(self):
+ return self.__called
+
+ def reset(self):
+ assert type(self.translation) is unicode
+ assert type(self.message) is str
+ del self.message
+ assert self.__called is True
+ self.__called = False
+
+
+class dummy_ungettext(object):
+ __called = False
+
+ def __init__(self):
+ self.translation_singular = u'The singular translation'
+ self.translation_plural = u'The plural translation'
+
+ def __call__(self, singular, plural, n):
+ assert type(singular) is str
+ assert type(plural) is str
+ assert type(n) is int
+ assert self.__called is False
+ self.__called = True
+ self.singular = singular
+ self.plural = plural
+ self.n = n
+ if n == 1:
+ return self.translation_singular
+ return self.translation_plural
+
+
+class DummyMethod(object):
+ def __init__(self, callback, name):
+ self.__callback = callback
+ self.__name = name
+
+ def __call__(self, *args, **kw):
+ return self.__callback(self.__name, args, kw)
+
+
+class DummyClass(object):
+ def __init__(self, *calls):
+ self.__calls = calls
+ self.__i = 0
+ for (name, args, kw, result) in calls:
+ method = DummyMethod(self.__process, name)
+ setattr(self, name, method)
+
+ def __process(self, name_, args_, kw_):
+ if self.__i >= len(self.__calls):
+ raise AssertionError(
+ 'extra call: %s, %r, %r' % (name, args, kw)
+ )
+ (name, args, kw, result) = self.__calls[self.__i]
+ self.__i += 1
+ i = self.__i
+ if name_ != name:
+ raise AssertionError(
+ 'call %d should be to method %r; got %r' % (i, name, name_)
+ )
+ if args_ != args:
+ raise AssertionError(
+ 'call %d to %r should have args %r; got %r' % (i, name, args, args_)
+ )
+ if kw_ != kw:
+ raise AssertionError(
+ 'call %d to %r should have kw %r, got %r' % (i, name, kw, kw_)
+ )
+ if isinstance(result, Exception):
+ raise result
+ return result
+
+ def _calledall(self):
+ return self.__i == len(self.__calls)