diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-01-22 10:42:41 -0500 |
---|---|---|
committer | Rob Crittenden <rcritten@redhat.com> | 2009-01-22 10:42:41 -0500 |
commit | c2967a675a288e7d31374229fd974d0cb9966f2c (patch) | |
tree | 58be8ca6319f4660d9f18b97a37b9c0c56104d02 /tests | |
parent | 2b8b87b4d6c3b4389a0a7bf48c225035c53e7ad1 (diff) | |
parent | 5d82e3b35a8fb2d4c25f282cddad557a7650197c (diff) | |
download | freeipa.git-c2967a675a288e7d31374229fd974d0cb9966f2c.tar.gz freeipa.git-c2967a675a288e7d31374229fd974d0cb9966f2c.tar.xz freeipa.git-c2967a675a288e7d31374229fd974d0cb9966f2c.zip |
Merge branch 'master' of git://fedorapeople.org/~jderose/freeipa2
Diffstat (limited to 'tests')
31 files changed, 7327 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..4550e6bc --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,22 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Package containing all unit tests. +""" diff --git a/tests/data.py b/tests/data.py new file mode 100644 index 00000000..cf646ea9 --- /dev/null +++ b/tests/data.py @@ -0,0 +1,38 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Data frequently used in the unit tests, especially Unicode related tests. +""" + +import struct + + +# A string that should have bytes 'x\00' through '\xff': +binary_bytes = ''.join(struct.pack('B', d) for d in xrange(256)) +assert '\x00' in binary_bytes and '\xff' in binary_bytes +assert type(binary_bytes) is str and len(binary_bytes) == 256 + +# A UTF-8 encoded str: +utf8_bytes = '\xd0\x9f\xd0\xb0\xd0\xb2\xd0\xb5\xd0\xbb' + +# The same UTF-8 data decoded (a unicode instance): +unicode_str = u'\u041f\u0430\u0432\u0435\u043b' +assert utf8_bytes.decode('UTF-8') == unicode_str +assert unicode_str.encode('UTF-8') == utf8_bytes diff --git a/tests/test_ipalib/__init__.py b/tests/test_ipalib/__init__.py new file mode 100644 index 00000000..113881eb --- /dev/null +++ b/tests/test_ipalib/__init__.py @@ -0,0 +1,22 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Sub-package containing unit tests for `ipalib` package. +""" diff --git a/tests/test_ipalib/test_backend.py b/tests/test_ipalib/test_backend.py new file mode 100644 index 00000000..88bd2da4 --- /dev/null +++ b/tests/test_ipalib/test_backend.py @@ -0,0 +1,55 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.backend` module. +""" + +from ipalib import backend, plugable, errors +from tests.util import ClassChecker, raises + + +class test_Backend(ClassChecker): + """ + Test the `ipalib.backend.Backend` class. + """ + + _cls = backend.Backend + + def test_class(self): + assert self.cls.__bases__ == (plugable.Plugin,) + assert self.cls.__proxy__ is False + + +class test_Context(ClassChecker): + """ + Test the `ipalib.backend.Context` class. + """ + + _cls = backend.Context + + def test_get_value(self): + """ + Test the `ipalib.backend.Context.get_value` method. + """ + class Subclass(self.cls): + pass + o = Subclass() + e = raises(NotImplementedError, o.get_value) + assert str(e) == 'Subclass.get_value()' diff --git a/tests/test_ipalib/test_base.py b/tests/test_ipalib/test_base.py new file mode 100644 index 00000000..ce88f23f --- /dev/null +++ b/tests/test_ipalib/test_base.py @@ -0,0 +1,352 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.base` module. +""" + +from tests.util import ClassChecker, raises +from ipalib.constants import NAME_REGEX, NAME_ERROR +from ipalib.constants import TYPE_ERROR, SET_ERROR, DEL_ERROR, OVERRIDE_ERROR +from ipalib import base + + +class test_ReadOnly(ClassChecker): + """ + Test the `ipalib.base.ReadOnly` class + """ + _cls = base.ReadOnly + + def test_lock(self): + """ + Test the `ipalib.base.ReadOnly.__lock__` method. + """ + o = self.cls() + assert o._ReadOnly__locked is False + o.__lock__() + assert o._ReadOnly__locked is True + e = raises(AssertionError, o.__lock__) # Can only be locked once + assert str(e) == '__lock__() can only be called once' + assert o._ReadOnly__locked is True # This should still be True + + def test_islocked(self): + """ + Test the `ipalib.base.ReadOnly.__islocked__` method. + """ + o = self.cls() + assert o.__islocked__() is False + o.__lock__() + assert o.__islocked__() is True + + def test_setattr(self): + """ + Test the `ipalib.base.ReadOnly.__setattr__` method. + """ + o = self.cls() + o.attr1 = 'Hello, world!' + assert o.attr1 == 'Hello, world!' + o.__lock__() + for name in ('attr1', 'attr2'): + e = raises(AttributeError, setattr, o, name, 'whatever') + assert str(e) == SET_ERROR % ('ReadOnly', name, 'whatever') + assert o.attr1 == 'Hello, world!' + + def test_delattr(self): + """ + Test the `ipalib.base.ReadOnly.__delattr__` method. + """ + o = self.cls() + o.attr1 = 'Hello, world!' + o.attr2 = 'How are you?' + assert o.attr1 == 'Hello, world!' + assert o.attr2 == 'How are you?' + del o.attr1 + assert not hasattr(o, 'attr1') + o.__lock__() + e = raises(AttributeError, delattr, o, 'attr2') + assert str(e) == DEL_ERROR % ('ReadOnly', 'attr2') + assert o.attr2 == 'How are you?' + + +def test_lock(): + """ + Test the `ipalib.base.lock` function + """ + f = base.lock + + # Test with ReadOnly instance: + o = base.ReadOnly() + assert o.__islocked__() is False + assert f(o) is o + assert o.__islocked__() is True + e = raises(AssertionError, f, o) + assert str(e) == 'already locked: %r' % o + + # Test with another class implemented locking protocol: + class Lockable(object): + __locked = False + def __lock__(self): + self.__locked = True + def __islocked__(self): + return self.__locked + o = Lockable() + assert o.__islocked__() is False + assert f(o) is o + assert o.__islocked__() is True + e = raises(AssertionError, f, o) + assert str(e) == 'already locked: %r' % o + + # Test with a class incorrectly implementing the locking protocol: + class Broken(object): + def __lock__(self): + pass + def __islocked__(self): + return False + o = Broken() + e = raises(AssertionError, f, o) + assert str(e) == 'failed to lock: %r' % o + + +def test_islocked(): + """ + Test the `ipalib.base.islocked` function. + """ + f = base.islocked + + # Test with ReadOnly instance: + o = base.ReadOnly() + assert f(o) is False + o.__lock__() + assert f(o) is True + + # Test with another class implemented locking protocol: + class Lockable(object): + __locked = False + def __lock__(self): + self.__locked = True + def __islocked__(self): + return self.__locked + o = Lockable() + assert f(o) is False + o.__lock__() + assert f(o) is True + + # Test with a class incorrectly implementing the locking protocol: + class Broken(object): + __lock__ = False + def __islocked__(self): + return False + o = Broken() + e = raises(AssertionError, f, o) + assert str(e) == 'no __lock__() method: %r' % o + + +def test_check_name(): + """ + Test the `ipalib.base.check_name` function. + """ + f = base.check_name + okay = [ + 'user_add', + 'stuff2junk', + 'sixty9', + ] + nope = [ + '_user_add', + '__user_add', + 'user_add_', + 'user_add__', + '_user_add_', + '__user_add__', + '60nine', + ] + for name in okay: + assert name is f(name) + e = raises(TypeError, f, unicode(name)) + assert str(e) == TYPE_ERROR % ('name', str, unicode(name), unicode) + for name in nope: + e = raises(ValueError, f, name) + assert str(e) == NAME_ERROR % (NAME_REGEX, name) + for name in okay: + e = raises(ValueError, f, name.upper()) + assert str(e) == NAME_ERROR % (NAME_REGEX, name.upper()) + + +def membername(i): + return 'member%03d' % i + + +class DummyMember(object): + def __init__(self, i): + self.i = i + self.name = membername(i) + + +def gen_members(*indexes): + return tuple(DummyMember(i) for i in indexes) + + +class test_NameSpace(ClassChecker): + """ + Test the `ipalib.base.NameSpace` class. + """ + _cls = base.NameSpace + + def new(self, count, sort=True): + members = tuple(DummyMember(i) for i in xrange(count, 0, -1)) + assert len(members) == count + o = self.cls(members, sort=sort) + return (o, members) + + def test_init(self): + """ + Test the `ipalib.base.NameSpace.__init__` method. + """ + o = self.cls([]) + assert len(o) == 0 + assert list(o) == [] + assert list(o()) == [] + + # Test members as attribute and item: + for cnt in (3, 42): + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + assert len(members) == cnt + for m in members: + assert getattr(o, m.name) is m + assert o[m.name] is m + + # Test that TypeError is raised if sort is not a bool: + e = raises(TypeError, self.cls, [], sort=None) + assert str(e) == TYPE_ERROR % ('sort', bool, None, type(None)) + + # Test that AttributeError is raised with duplicate member name: + members = gen_members(0, 1, 2, 1, 3) + e = raises(AttributeError, self.cls, members) + assert str(e) == OVERRIDE_ERROR % ( + 'NameSpace', membername(1), members[1], members[3] + ) + + def test_len(self): + """ + Test the `ipalib.base.NameSpace.__len__` method. + """ + for count in (5, 18, 127): + (o, members) = self.new(count) + assert len(o) == count + (o, members) = self.new(count, sort=False) + assert len(o) == count + + def test_iter(self): + """ + Test the `ipalib.base.NameSpace.__iter__` method. + """ + (o, members) = self.new(25) + assert list(o) == sorted(m.name for m in members) + (o, members) = self.new(25, sort=False) + assert list(o) == list(m.name for m in members) + + def test_call(self): + """ + Test the `ipalib.base.NameSpace.__call__` method. + """ + (o, members) = self.new(25) + assert list(o()) == sorted(members, key=lambda m: m.name) + (o, members) = self.new(25, sort=False) + assert tuple(o()) == members + + def test_contains(self): + """ + Test the `ipalib.base.NameSpace.__contains__` method. + """ + yes = (99, 3, 777) + no = (9, 333, 77) + for sort in (True, False): + members = gen_members(*yes) + o = self.cls(members, sort=sort) + for i in yes: + assert membername(i) in o + assert membername(i).upper() not in o + for i in no: + assert membername(i) not in o + + def test_getitem(self): + """ + Test the `ipalib.base.NameSpace.__getitem__` method. + """ + cnt = 17 + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + assert len(members) == cnt + if sort is True: + members = tuple(sorted(members, key=lambda m: m.name)) + + # Test str keys: + for m in members: + assert o[m.name] is m + e = raises(KeyError, o.__getitem__, 'nope') + + # Test int indexes: + for i in xrange(cnt): + assert o[i] is members[i] + e = raises(IndexError, o.__getitem__, cnt) + + # Test negative int indexes: + for i in xrange(1, cnt + 1): + assert o[-i] is members[-i] + e = raises(IndexError, o.__getitem__, -(cnt + 1)) + + # Test slicing: + assert o[3:] == members[3:] + assert o[:10] == members[:10] + assert o[3:10] == members[3:10] + assert o[-9:] == members[-9:] + assert o[:-4] == members[:-4] + assert o[-9:-4] == members[-9:-4] + + # Test that TypeError is raised with wrong type + e = raises(TypeError, o.__getitem__, 3.0) + assert str(e) == TYPE_ERROR % ('key', (str, int, slice), 3.0, float) + + def test_repr(self): + """ + Test the `ipalib.base.NameSpace.__repr__` method. + """ + for cnt in (0, 1, 2): + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + if cnt == 1: + assert repr(o) == \ + 'NameSpace(<%d member>, sort=%r)' % (cnt, sort) + else: + assert repr(o) == \ + 'NameSpace(<%d members>, sort=%r)' % (cnt, sort) + + def test_todict(self): + """ + Test the `ipalib.base.NameSpace.__todict__` method. + """ + for cnt in (3, 101): + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + d = o.__todict__() + assert d == dict((m.name, m) for m in members) + + # Test that a copy is returned: + assert o.__todict__() is not d diff --git a/tests/test_ipalib/test_cli.py b/tests/test_ipalib/test_cli.py new file mode 100644 index 00000000..56297fdf --- /dev/null +++ b/tests/test_ipalib/test_cli.py @@ -0,0 +1,277 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.cli` module. +""" + +from tests.util import raises, get_api, ClassChecker +from ipalib import cli, plugable, frontend, backend + + +class test_textui(ClassChecker): + _cls = cli.textui + + def test_max_col_width(self): + """ + Test the `ipalib.cli.textui.max_col_width` method. + """ + o = self.cls() + e = raises(TypeError, o.max_col_width, 'hello') + assert str(e) == 'rows: need %r or %r; got %r' % (list, tuple, 'hello') + rows = [ + 'hello', + 'naughty', + 'nurse', + ] + assert o.max_col_width(rows) == len('naughty') + rows = ( + ( 'a', 'bbb', 'ccccc'), + ('aa', 'bbbb', 'cccccc'), + ) + assert o.max_col_width(rows, col=0) == 2 + assert o.max_col_width(rows, col=1) == 4 + assert o.max_col_width(rows, col=2) == 6 + + +def test_to_cli(): + """ + Test the `ipalib.cli.to_cli` function. + """ + f = cli.to_cli + assert f('initialize') == 'initialize' + assert f('user_add') == 'user-add' + + +def test_from_cli(): + """ + Test the `ipalib.cli.from_cli` function. + """ + f = cli.from_cli + assert f('initialize') == 'initialize' + assert f('user-add') == 'user_add' + + +def get_cmd_name(i): + return 'cmd_%d' % i + + +class DummyCommand(object): + def __init__(self, name): + self.__name = name + + def __get_name(self): + return self.__name + name = property(__get_name) + + +class DummyAPI(object): + def __init__(self, cnt): + self.__cmd = plugable.NameSpace(self.__cmd_iter(cnt)) + + def __get_cmd(self): + return self.__cmd + Command = property(__get_cmd) + + def __cmd_iter(self, cnt): + for i in xrange(cnt): + yield DummyCommand(get_cmd_name(i)) + + def finalize(self): + pass + + def register(self, *args, **kw): + pass + + +config_cli = """ +[global] + +from_cli_conf = set in cli.conf +""" + +config_default = """ +[global] + +from_default_conf = set in default.conf + +# Make sure cli.conf is loaded first: +from_cli_conf = overridden in default.conf +""" + + +class test_CLI(ClassChecker): + """ + Test the `ipalib.cli.CLI` class. + """ + _cls = cli.CLI + + def new(self, argv=tuple()): + (api, home) = get_api() + o = self.cls(api, argv) + assert o.api is api + return (o, api, home) + + def check_cascade(self, *names): + (o, api, home) = self.new() + method = getattr(o, names[0]) + for name in names: + assert o.isdone(name) is False + method() + for name in names: + assert o.isdone(name) is True + e = raises(StandardError, method) + assert str(e) == 'CLI.%s() already called' % names[0] + + def test_init(self): + """ + Test the `ipalib.cli.CLI.__init__` method. + """ + argv = ['-v', 'user-add', '--first=Jonh', '--last=Doe'] + (o, api, home) = self.new(argv) + assert o.api is api + assert o.argv == tuple(argv) + + def test_run_real(self): + """ + Test the `ipalib.cli.CLI.run_real` method. + """ + self.check_cascade( + 'run_real', + 'finalize', + 'load_plugins', + 'bootstrap', + 'parse_globals' + ) + + def test_finalize(self): + """ + Test the `ipalib.cli.CLI.finalize` method. + """ + self.check_cascade( + 'finalize', + 'load_plugins', + 'bootstrap', + 'parse_globals' + ) + + (o, api, home) = self.new() + assert api.isdone('finalize') is False + assert 'Command' not in api + o.finalize() + assert api.isdone('finalize') is True + assert list(api.Command) == \ + sorted(k.__name__ for k in cli.cli_application_commands) + + def test_load_plugins(self): + """ + Test the `ipalib.cli.CLI.load_plugins` method. + """ + self.check_cascade( + 'load_plugins', + 'bootstrap', + 'parse_globals' + ) + (o, api, home) = self.new() + assert api.isdone('load_plugins') is False + o.load_plugins() + assert api.isdone('load_plugins') is True + + def test_bootstrap(self): + """ + Test the `ipalib.cli.CLI.bootstrap` method. + """ + self.check_cascade( + 'bootstrap', + 'parse_globals' + ) + # Test with empty argv + (o, api, home) = self.new() + keys = tuple(api.env) + assert api.isdone('bootstrap') is False + o.bootstrap() + assert api.isdone('bootstrap') is True + e = raises(StandardError, o.bootstrap) + assert str(e) == 'CLI.bootstrap() already called' + assert api.env.verbose is False + assert api.env.context == 'cli' + keys = tuple(api.env) + added = ( + 'my_key', + 'from_default_conf', + 'from_cli_conf' + ) + for key in added: + assert key not in api.env + assert key not in keys + + # Test with a populated argv + argv = ['-e', 'my_key=my_val,whatever=Hello'] + (o, api, home) = self.new(argv) + home.write(config_default, '.ipa', 'default.conf') + home.write(config_cli, '.ipa', 'cli.conf') + o.bootstrap() + assert api.env.my_key == 'my_val,whatever=Hello' + assert api.env.from_default_conf == 'set in default.conf' + assert api.env.from_cli_conf == 'set in cli.conf' + assert list(api.env) == sorted(keys + added) + + def test_parse_globals(self): + """ + Test the `ipalib.cli.CLI.parse_globals` method. + """ + # Test with empty argv: + (o, api, home) = self.new() + assert not hasattr(o, 'options') + assert not hasattr(o, 'cmd_argv') + assert o.isdone('parse_globals') is False + o.parse_globals() + assert o.isdone('parse_globals') is True + assert o.options.prompt_all is False + assert o.options.interactive is True + assert o.options.verbose is None + assert o.options.conf is None + assert o.options.env is None + assert o.cmd_argv == tuple() + e = raises(StandardError, o.parse_globals) + assert str(e) == 'CLI.parse_globals() already called' + + # Test with a populated argv: + argv = ('-a', '-n', '-v', '-c', '/my/config.conf', '-e', 'my_key=my_val') + cmd_argv = ('user-add', '--first', 'John', '--last', 'Doe') + (o, api, home) = self.new(argv + cmd_argv) + assert not hasattr(o, 'options') + assert not hasattr(o, 'cmd_argv') + assert o.isdone('parse_globals') is False + o.parse_globals() + assert o.isdone('parse_globals') is True + assert o.options.prompt_all is True + assert o.options.interactive is False + assert o.options.verbose is True + assert o.options.conf == '/my/config.conf' + assert o.options.env == ['my_key=my_val'] + assert o.cmd_argv == cmd_argv + e = raises(StandardError, o.parse_globals) + assert str(e) == 'CLI.parse_globals() already called' + + # Test with multiple -e args: + argv = ('-e', 'key1=val1', '-e', 'key2=val2') + (o, api, home) = self.new(argv) + o.parse_globals() + assert o.options.env == ['key1=val1', 'key2=val2'] diff --git a/tests/test_ipalib/test_config.py b/tests/test_ipalib/test_config.py new file mode 100644 index 00000000..d3109f7b --- /dev/null +++ b/tests/test_ipalib/test_config.py @@ -0,0 +1,608 @@ +# Authors: +# Martin Nagy <mnagy@redhat.com> +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.config` module. +""" + +import os +from os import path +import sys +from tests.util import raises, setitem, delitem, ClassChecker +from tests.util import getitem, setitem, delitem +from tests.util import TempDir, TempHome +from ipalib.constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR +from ipalib.constants import NAME_REGEX, NAME_ERROR +from ipalib import config, constants, base + + +# Valid environment variables in (key, raw, value) tuples: +# key: the name of the environment variable +# raw: the value being set (possibly a string repr) +# value: the expected value after the lightweight conversion +good_vars = ( + ('a_string', 'Hello world!', 'Hello world!'), + ('trailing_whitespace', ' value ', 'value'), + ('an_int', 42, 42), + ('int_repr', ' 42 ', 42), + ('a_float', 3.14, 3.14), + ('float_repr', ' 3.14 ', 3.14), + ('true', True, True), + ('true_repr', ' True ', True), + ('false', False, False), + ('false_repr', ' False ', False), + ('none', None, None), + ('none_repr', ' None ', None), + ('empty', '', None), + + # These verify that the implied conversion is case-sensitive: + ('not_true', ' true ', 'true'), + ('not_false', ' false ', 'false'), + ('not_none', ' none ', 'none'), +) + + +bad_names = ( + ('CamelCase', 'value'), + ('_leading_underscore', 'value'), + ('trailing_underscore_', 'value'), +) + + +# Random base64-encoded data to simulate a misbehaving config file. +config_bad = """ +/9j/4AAQSkZJRgABAQEAlgCWAAD//gAIT2xpdmVy/9sAQwAQCwwODAoQDg0OEhEQExgoGhgWFhgx +IyUdKDozPTw5Mzg3QEhcTkBEV0U3OFBtUVdfYmdoZz5NcXlwZHhcZWdj/8AACwgAlgB0AQERAP/E +ABsAAAEFAQEAAAAAAAAAAAAAAAQAAQIDBQYH/8QAMhAAAgICAAUDBAIABAcAAAAAAQIAAwQRBRIh +MUEGE1EiMmFxFIEVI0LBFjNSYnKRof/aAAgBAQAAPwDCtzmNRr1o/MEP1D6f7kdkRakgBsAtoQhk +xls/y3Z113I11mhiUc1ewCf1Oq4anJgINdhLhQoextfedmYrenfcvdzaFQnYAE08XhONTWEK8+js +Fpo1oqAKoAA8CWjoJJTHM8kJ5jsiOiszAKD1+IV/hmW76rosbfnlh1Pp3Mah2srCnXQE9YXiel/c +p5r7uVj2CwxPTuFjjmdLbteNwmrLwsYe3TjsD8cmjKV43ycy+3o76D4llFuXmuCoZEPczXVOSsLv +f5lgGpNZLxJL2jnvMar0/wAOp6jHDH/uO4RViY9f/KpRdfC6k3R9fRyj+pRZVkWKqF10e+hCKaFq +XlH/ALlmhK7Met/uUGZ5ow8XL57lU8/Yt4lx4jUOJphLobTe/wDaHeZLxHXtJEya9o5lFzCqpmPY +CUYoPtDfc9TLj0G5jZvHaMFirAs++oEHq9U4rbNiMp8a6wO/1Zbzn2alC+Nx8P1JfdeBboA+AILx +rin8pfbA1ynvKuFUXZOXXkLbzOp2R56andL2G45MmO0RPWWLEe8GzaffoKb/ADI44Pt9ZXxAuuFa +axtgp0BOSPCcviNX8n3Aw8KTNHB4FiY9StkobLWHVSeghq8M4bkAhKKyV6Hl8RV8MwMZG1Uuz3Jn +IcUQJlMFGlJ6D4hfpymy7iChHKqvVtefxO7Ai1txLBIn7pcojN3jGVhQO0ZgCNfM5ZHycTLycSkr +yhtqD4Bmrfw5cuqsm6xHXyp1seRLcHCp4dQy1bOzslj1MzeJ5dVFnuMVdgOiHxOWzrmyMg2Nrbde +k3vR2OTddcd6A5R8GdZqOo67k4wXrLAQPMRKnzImMZEzm+P1nFz6cxQeVujagWR6jsYiqivlH/Ux +1M+7jWY30i7QHx1gF11tjGyxiSfmVc+503pPidVROHYNNY21b/adVZZySo3uOo1qIZQYd9RCzfYm +TUk/qW71LjGkTA+IYiZmM1T9N9j8Gee5+McXJem0/Wp8GUK6KOi7b5MgzFjsxpJHZGDKSCOxE3cD +OvsxbbLc9lsT7Vc73KX4ln3q1ZyVrPx2J/uAjLyan37z7B+Zp4vqPJqKi0K4EvzvUt1qBMdfb+T5 +gycfzkXXuc35InfE6nO8Y9SjFc1Yqh2Hdj2mH/xFxR26XgD/AMRJf45mWMqW5bBD3KqAZlZtb++7 +kEqTsHe//sG1CcTBvy7OWpD+Sewhz8CyKCTYAQPiGV0LVWPdxqQNADQ6zL4nWq2gopU6+ofmA8x3 +1MlvfeIGbnBeCHitRt94IFbRGus2U9H08v13sT+BNHjeX/D4bY4OmP0rPPbHLMWJ2Yy2EDQjVsos +BdeYDx8wo5L5KpSdLWPAE1+G8NrFtBKgOAXPTf6mzViql5ZBoE87eJZkKbOQ8m+Yjf5EBzcO621y +GCqD0H41Obzq7U6vzM577HTXgzPPeOIvM1eB59nD8xXVj7bHTr8iej1MtlauvUMNgzi/V2ctliYy +HYTq37nMExpZXRZYpZVJUdzNjg+FXYwZgdhv6nVVUJU/uH7iNf1CARrtF0IB113M7jTNVjFl2xJA +5ROey88OrVOugOy67TDs+89NRKdSYILdRC8ZQVJ+PHyJs4fqe3EoFPLzBexPxOdusa2xndiWY7JM +qMUNrzOTAfHC9XO9/E3vT9blVJB0o2Zu3MAoYrsL13Ii0Muw3XvJG9KkDOeqjf6gWcw5A33XN9nX +tOeyMRFWy3Jch+bX7mXmCsW/5RBXUoHaOIRi2asAJ0IRbjqzll3o/EAaRiltDojgv2E1aePmhEWq +rsNHZ7wir1K/8Y1vUCSCAd+IXiZ9b1gLYvN07trXTUD4rxN2TkUgEts8p2NDtD0t5MVGchr2Xe99 +hMPNvD1LX5J2TuZhGyYwBijjfiHU5bJXrnYfqBRtRtSbIBWG3+xI6HiLUWz8xA9RuaVNrMAPfB5x +r6v9MLr4S1il7LaxyjY69Jl5eG+Kyhiv1jYIMGYMO8etGscKoJJ8Cbp4bVg4ivaq22t3G/tmRYo5 +zyjQ+JRFFET01GB0Yid9YiYh1l9KgEHqT8Tco/hewA/NzgdQdwTNGNTY3uU2crL9HN00ZlovNzfV +oCanBrBRk1rpCHPUkQjjYoW4GtwAw30MDpuxvbAvpJceR5mXFFEY0W4o4mpg0XNXutQxPUHxLb8q +7mRDyszLr6esz8u++9wL2LcvQb8RXCkhBV3A6mR5rEVSrdFPT8SBLMdsdmWe6P8AUAx+TB4oooxi +i1Jmt0+5dfuOLbANB2H6MjzNzc2zv5ji1g2+5/MYnbb+Yh+T0kubUY940UUbUWtRpJN8w1CfebkK +WfUu+/mDOAGOjsRo0UkIo+pPl6Rckl7ehuR1INGAj9u0kW2nXvK45YlQp1odukaICSAjgSQWf//Z +""" + + +# A config file that tries to override some standard vars: +config_override = """ +[global] + +key0 = var0 +home = /home/sweet/home +key1 = var1 +site_packages = planet +key2 = var2 +key3 = var3 +""" + + +# A config file that tests the automatic type conversion +config_good = """ +[global] + +string = Hello world! +null = None +yes = True +no = False +number = 42 +floating = 3.14 +""" + + +# A default config file to make sure it does not overwrite the explicit one +config_default = """ +[global] + +yes = Hello +not_in_other = foo_bar +""" + + +class test_Env(ClassChecker): + """ + Test the `ipalib.config.Env` class. + """ + + _cls = config.Env + + def test_init(self): + """ + Test the `ipalib.config.Env.__init__` method. + """ + o = self.cls() + assert list(o) == [] + assert len(o) == 0 + assert o.__islocked__() is False + + def test_lock(self): + """ + Test the `ipalib.config.Env.__lock__` method. + """ + o = self.cls() + assert o.__islocked__() is False + o.__lock__() + assert o.__islocked__() is True + e = raises(StandardError, o.__lock__) + assert str(e) == 'Env.__lock__() already called' + + # Also test with base.lock() function: + o = self.cls() + assert o.__islocked__() is False + assert base.lock(o) is o + assert o.__islocked__() is True + e = raises(AssertionError, base.lock, o) + assert str(e) == 'already locked: %r' % o + + def test_islocked(self): + """ + Test the `ipalib.config.Env.__islocked__` method. + """ + o = self.cls() + assert o.__islocked__() is False + assert base.islocked(o) is False + o.__lock__() + assert o.__islocked__() is True + assert base.islocked(o) is True + + def test_setattr(self): + """ + Test the `ipalib.config.Env.__setattr__` method. + """ + o = self.cls() + for (name, raw, value) in good_vars: + # Test setting the value: + setattr(o, name, raw) + result = getattr(o, name) + assert type(result) is type(value) + assert result == value + assert result is o[name] + + # Test that value cannot be overridden once set: + e = raises(AttributeError, setattr, o, name, raw) + assert str(e) == OVERRIDE_ERROR % ('Env', name, value, raw) + + # Test that values cannot be set once locked: + o = self.cls() + o.__lock__() + for (name, raw, value) in good_vars: + e = raises(AttributeError, setattr, o, name, raw) + assert str(e) == SET_ERROR % ('Env', name, raw) + + # Test that name is tested with check_name(): + o = self.cls() + for (name, value) in bad_names: + e = raises(ValueError, setattr, o, name, value) + assert str(e) == NAME_ERROR % (NAME_REGEX, name) + + def test_setitem(self): + """ + Test the `ipalib.config.Env.__setitem__` method. + """ + o = self.cls() + for (key, raw, value) in good_vars: + # Test setting the value: + o[key] = raw + result = o[key] + assert type(result) is type(value) + assert result == value + assert result is getattr(o, key) + + # Test that value cannot be overridden once set: + e = raises(AttributeError, o.__setitem__, key, raw) + assert str(e) == OVERRIDE_ERROR % ('Env', key, value, raw) + + # Test that values cannot be set once locked: + o = self.cls() + o.__lock__() + for (key, raw, value) in good_vars: + e = raises(AttributeError, o.__setitem__, key, raw) + assert str(e) == SET_ERROR % ('Env', key, raw) + + # Test that name is tested with check_name(): + o = self.cls() + for (key, value) in bad_names: + e = raises(ValueError, o.__setitem__, key, value) + assert str(e) == NAME_ERROR % (NAME_REGEX, key) + + def test_getitem(self): + """ + Test the `ipalib.config.Env.__getitem__` method. + """ + o = self.cls() + value = 'some value' + o.key = value + assert o.key is value + assert o['key'] is value + for name in ('one', 'two'): + e = raises(KeyError, getitem, o, name) + assert str(e) == repr(name) + + def test_delattr(self): + """ + Test the `ipalib.config.Env.__delattr__` method. + + This also tests that ``__delitem__`` is not implemented. + """ + o = self.cls() + o.one = 1 + assert o.one == 1 + for key in ('one', 'two'): + e = raises(AttributeError, delattr, o, key) + assert str(e) == DEL_ERROR % ('Env', key) + e = raises(AttributeError, delitem, o, key) + assert str(e) == '__delitem__' + + def test_contains(self): + """ + Test the `ipalib.config.Env.__contains__` method. + """ + o = self.cls() + items = [ + ('one', 1), + ('two', 2), + ('three', 3), + ('four', 4), + ] + for (key, value) in items: + assert key not in o + o[key] = value + assert key in o + + def test_len(self): + """ + Test the `ipalib.config.Env.__len__` method. + """ + o = self.cls() + assert len(o) == 0 + for i in xrange(1, 11): + key = 'key%d' % i + value = 'value %d' % i + o[key] = value + assert o[key] is value + assert len(o) == i + + def test_iter(self): + """ + Test the `ipalib.config.Env.__iter__` method. + """ + o = self.cls() + default_keys = tuple(o) + keys = ('one', 'two', 'three', 'four', 'five') + for key in keys: + o[key] = 'the value' + assert list(o) == sorted(keys + default_keys) + + def test_merge(self): + """ + Test the `ipalib.config.Env._merge` method. + """ + group1 = ( + ('key1', 'value 1'), + ('key2', 'value 2'), + ('key3', 'value 3'), + ('key4', 'value 4'), + ) + group2 = ( + ('key0', 'Value 0'), + ('key2', 'Value 2'), + ('key4', 'Value 4'), + ('key5', 'Value 5'), + ) + o = self.cls() + assert o._merge(**dict(group1)) == (4, 4) + assert len(o) == 4 + assert list(o) == list(key for (key, value) in group1) + for (key, value) in group1: + assert getattr(o, key) is value + assert o[key] is value + assert o._merge(**dict(group2)) == (2, 4) + assert len(o) == 6 + expected = dict(group2) + expected.update(dict(group1)) + assert list(o) == sorted(expected) + assert expected['key2'] == 'value 2' # And not 'Value 2' + for (key, value) in expected.iteritems(): + assert getattr(o, key) is value + assert o[key] is value + assert o._merge(**expected) == (0, 6) + assert len(o) == 6 + assert list(o) == sorted(expected) + + def test_merge_from_file(self): + """ + Test the `ipalib.config.Env._merge_from_file` method. + """ + tmp = TempDir() + assert callable(tmp.join) + + # Test a config file that doesn't exist + no_exist = tmp.join('no_exist.conf') + assert not path.exists(no_exist) + o = self.cls() + o._bootstrap() + keys = tuple(o) + orig = dict((k, o[k]) for k in o) + assert o._merge_from_file(no_exist) is None + assert tuple(o) == keys + + # Test an empty config file + empty = tmp.touch('empty.conf') + assert path.isfile(empty) + assert o._merge_from_file(empty) == (0, 0) + assert tuple(o) == keys + + # Test a mal-formed config file: + bad = tmp.join('bad.conf') + open(bad, 'w').write(config_bad) + assert path.isfile(bad) + assert o._merge_from_file(bad) is None + assert tuple(o) == keys + + # Test a valid config file that tries to override + override = tmp.join('override.conf') + open(override, 'w').write(config_override) + assert path.isfile(override) + assert o._merge_from_file(override) == (4, 6) + for (k, v) in orig.items(): + assert o[k] is v + assert list(o) == sorted(keys + ('key0', 'key1', 'key2', 'key3')) + for i in xrange(4): + assert o['key%d' % i] == ('var%d' % i) + keys = tuple(o) + + # Test a valid config file with type conversion + good = tmp.join('good.conf') + open(good, 'w').write(config_good) + assert path.isfile(good) + assert o._merge_from_file(good) == (6, 6) + added = ('string', 'null', 'yes', 'no', 'number', 'floating') + assert list(o) == sorted(keys + added) + assert o.string == 'Hello world!' + assert o.null is None + assert o.yes is True + assert o.no is False + assert o.number == 42 + assert o.floating == 3.14 + + def new(self): + """ + Set os.environ['HOME'] to a tempdir. + + Returns tuple with new Env instance and the TempHome instance. This + helper method is used in testing the bootstrap related methods below. + """ + home = TempHome() + return (self.cls(), home) + + def bootstrap(self, **overrides): + """ + Helper method used in testing bootstrap related methods below. + """ + (o, home) = self.new() + assert o._isdone('_bootstrap') is False + o._bootstrap(**overrides) + assert o._isdone('_bootstrap') is True + e = raises(StandardError, o._bootstrap) + assert str(e) == 'Env._bootstrap() already called' + return (o, home) + + def test_bootstrap(self): + """ + Test the `ipalib.config.Env._bootstrap` method. + """ + # Test defaults created by _bootstrap(): + (o, home) = self.new() + o._bootstrap() + ipalib = path.dirname(path.abspath(config.__file__)) + assert o.ipalib == ipalib + assert o.site_packages == path.dirname(ipalib) + assert o.script == path.abspath(sys.argv[0]) + assert o.bin == path.dirname(path.abspath(sys.argv[0])) + assert o.home == home.path + assert o.dot_ipa == home.join('.ipa') + assert o.in_tree is False + assert o.context == 'default' + assert o.conf == '/etc/ipa/default.conf' + assert o.conf_default == o.conf + + # Test overriding values created by _bootstrap() + (o, home) = self.bootstrap(in_tree='True', context='server') + assert o.in_tree is True + assert o.context == 'server' + assert o.conf == home.join('.ipa', 'server.conf') + (o, home) = self.bootstrap(conf='/my/wacky/whatever.conf') + assert o.in_tree is False + assert o.context == 'default' + assert o.conf == '/my/wacky/whatever.conf' + assert o.conf_default == '/etc/ipa/default.conf' + (o, home) = self.bootstrap(conf_default='/my/wacky/default.conf') + assert o.in_tree is False + assert o.context == 'default' + assert o.conf == '/etc/ipa/default.conf' + assert o.conf_default == '/my/wacky/default.conf' + + # Test various overrides and types conversion + kw = dict( + yes=True, + no=False, + num=42, + msg='Hello, world!', + ) + override = dict( + (k, u' %s ' % v) for (k, v) in kw.items() + ) + (o, home) = self.new() + for key in kw: + assert key not in o + o._bootstrap(**override) + for (key, value) in kw.items(): + assert getattr(o, key) == value + assert o[key] == value + + def finalize_core(self, **defaults): + """ + Helper method used in testing `Env._finalize_core`. + """ + (o, home) = self.new() + assert o._isdone('_finalize_core') is False + o._finalize_core(**defaults) + assert o._isdone('_finalize_core') is True + e = raises(StandardError, o._finalize_core) + assert str(e) == 'Env._finalize_core() already called' + return (o, home) + + def test_finalize_core(self): + """ + Test the `ipalib.config.Env._finalize_core` method. + """ + # Check that calls cascade up the chain: + (o, home) = self.new() + assert o._isdone('_bootstrap') is False + assert o._isdone('_finalize_core') is False + assert o._isdone('_finalize') is False + o._finalize_core() + assert o._isdone('_bootstrap') is True + assert o._isdone('_finalize_core') is True + assert o._isdone('_finalize') is False + + # Check that it can't be called twice: + e = raises(StandardError, o._finalize_core) + assert str(e) == 'Env._finalize_core() already called' + + # Check that _bootstrap() did its job: + (o, home) = self.bootstrap() + assert 'in_tree' in o + assert 'conf' in o + assert 'context' in o + + # Check that keys _finalize_core() will set are not set yet: + assert 'log' not in o + assert 'in_server' not in o + + # Check that _finalize_core() did its job: + o._finalize_core() + assert 'in_server' in o + assert 'log' in o + assert o.in_tree is False + assert o.context == 'default' + assert o.in_server is False + assert o.log == '/var/log/ipa/default.log' + + # Check log is in ~/.ipa/log when context='cli' + (o, home) = self.bootstrap(context='cli') + o._finalize_core() + assert o.in_tree is False + assert o.log == home.join('.ipa', 'log', 'cli.log') + + # Check **defaults can't set in_server nor log: + (o, home) = self.bootstrap(in_server='True') + o._finalize_core(in_server=False) + assert o.in_server is True + (o, home) = self.bootstrap(log='/some/silly/log') + o._finalize_core(log='/a/different/log') + assert o.log == '/some/silly/log' + + # Test loading config file, plus test some in-tree stuff + (o, home) = self.bootstrap(in_tree=True, context='server') + for key in ('yes', 'no', 'number'): + assert key not in o + home.write(config_good, '.ipa', 'server.conf') + home.write(config_default, '.ipa', 'default.conf') + o._finalize_core() + assert o.in_tree is True + assert o.context == 'server' + assert o.in_server is True + assert o.log == home.join('.ipa', 'log', 'server.log') + assert o.yes is True + assert o.no is False + assert o.number == 42 + assert o.not_in_other == 'foo_bar' + + # Test using DEFAULT_CONFIG: + defaults = dict(constants.DEFAULT_CONFIG) + (o, home) = self.finalize_core(**defaults) + assert list(o) == sorted(defaults) + for (key, value) in defaults.items(): + if value is object: + continue + assert o[key] is value, value + + def test_finalize(self): + """ + Test the `ipalib.config.Env._finalize` method. + """ + # Check that calls cascade up the chain: + (o, home) = self.new() + assert o._isdone('_bootstrap') is False + assert o._isdone('_finalize_core') is False + assert o._isdone('_finalize') is False + o._finalize() + assert o._isdone('_bootstrap') is True + assert o._isdone('_finalize_core') is True + assert o._isdone('_finalize') is True + + # Check that it can't be called twice: + e = raises(StandardError, o._finalize) + assert str(e) == 'Env._finalize() already called' + + # Check that _finalize() calls __lock__() + (o, home) = self.new() + assert o.__islocked__() is False + o._finalize() + assert o.__islocked__() is True + e = raises(StandardError, o.__lock__) + assert str(e) == 'Env.__lock__() already called' + + # Check that **lastchance works + (o, home) = self.finalize_core() + key = 'just_one_more_key' + value = 'with one more value' + lastchance = {key: value} + assert key not in o + assert o._isdone('_finalize') is False + o._finalize(**lastchance) + assert key in o + assert o[key] is value diff --git a/tests/test_ipalib/test_crud.py b/tests/test_ipalib/test_crud.py new file mode 100644 index 00000000..ad391e2e --- /dev/null +++ b/tests/test_ipalib/test_crud.py @@ -0,0 +1,237 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.crud` module. +""" + +from tests.util import read_only, raises, get_api, ClassChecker +from ipalib import crud, frontend, plugable, config + + +class CrudChecker(ClassChecker): + """ + Class for testing base classes in `ipalib.crud`. + """ + + def get_api(self, args=tuple(), options={}): + """ + Return a finalized `ipalib.plugable.API` instance. + """ + assert self.cls.__bases__ == (frontend.Method,) + (api, home) = get_api() + class user(frontend.Object): + takes_params = ( + 'givenname', + 'sn', + frontend.Param('uid', primary_key=True), + 'initials', + ) + class user_verb(self.cls): + takes_args = args + takes_options = options + api.register(user) + api.register(user_verb) + api.finalize() + return api + + +class test_Add(CrudChecker): + """ + Test the `ipalib.crud.Add` class. + """ + + _cls = crud.Add + + def test_get_args(self): + """ + Test the `ipalib.crud.Add.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + api = self.get_api(args=('extra?',)) + assert list(api.Method.user_verb.args) == ['uid', 'extra'] + assert api.Method.user_verb.args.uid.required is True + assert api.Method.user_verb.args.extra.required is False + + def test_get_options(self): + """ + Test the `ipalib.crud.Add.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == \ + ['givenname', 'sn', 'initials'] + for param in api.Method.user_verb.options(): + assert param.required is True + api = self.get_api(options=('extra?',)) + assert list(api.Method.user_verb.options) == \ + ['givenname', 'sn', 'initials', 'extra'] + assert api.Method.user_verb.options.extra.required is False + + +class test_Get(CrudChecker): + """ + Test the `ipalib.crud.Get` class. + """ + + _cls = crud.Get + + def test_get_args(self): + """ + Test the `ipalib.crud.Get.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Get.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == [] + assert len(api.Method.user_verb.options) == 0 + + +class test_Del(CrudChecker): + """ + Test the `ipalib.crud.Del` class. + """ + + _cls = crud.Del + + def test_get_args(self): + """ + Test the `ipalib.crud.Del.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Del.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == [] + assert len(api.Method.user_verb.options) == 0 + + +class test_Mod(CrudChecker): + """ + Test the `ipalib.crud.Mod` class. + """ + + _cls = crud.Mod + + def test_get_args(self): + """ + Test the `ipalib.crud.Mod.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Mod.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == \ + ['givenname', 'sn', 'initials'] + for param in api.Method.user_verb.options(): + assert param.required is False + + +class test_Find(CrudChecker): + """ + Test the `ipalib.crud.Find` class. + """ + + _cls = crud.Find + + def test_get_args(self): + """ + Test the `ipalib.crud.Find.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Find.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == \ + ['givenname', 'sn', 'initials'] + for param in api.Method.user_verb.options(): + assert param.required is False + + +class test_CrudBackend(ClassChecker): + """ + Test the `ipalib.crud.CrudBackend` class. + """ + + _cls = crud.CrudBackend + + def get_subcls(self): + class ldap(self.cls): + pass + return ldap + + def check_method(self, name, *args): + o = self.cls() + e = raises(NotImplementedError, getattr(o, name), *args) + assert str(e) == 'CrudBackend.%s()' % name + sub = self.subcls() + e = raises(NotImplementedError, getattr(sub, name), *args) + assert str(e) == 'ldap.%s()' % name + + def test_create(self): + """ + Test the `ipalib.crud.CrudBackend.create` method. + """ + self.check_method('create') + + def test_retrieve(self): + """ + Test the `ipalib.crud.CrudBackend.retrieve` method. + """ + self.check_method('retrieve', 'primary key', 'attribute') + + def test_update(self): + """ + Test the `ipalib.crud.CrudBackend.update` method. + """ + self.check_method('update', 'primary key') + + def test_delete(self): + """ + Test the `ipalib.crud.CrudBackend.delete` method. + """ + self.check_method('delete', 'primary key') + + def test_search(self): + """ + Test the `ipalib.crud.CrudBackend.search` method. + """ + self.check_method('search') diff --git a/tests/test_ipalib/test_error2.py b/tests/test_ipalib/test_error2.py new file mode 100644 index 00000000..cd13ba77 --- /dev/null +++ b/tests/test_ipalib/test_error2.py @@ -0,0 +1,371 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.error2` module. +""" + +import re +import inspect +from tests.util import assert_equal, raises, dummy_ugettext +from ipalib import errors2, request +from ipalib.constants import TYPE_ERROR + + +class PrivateExceptionTester(object): + _klass = None + __klass = None + + def __get_klass(self): + if self.__klass is None: + self.__klass = self._klass + assert issubclass(self.__klass, StandardError) + assert issubclass(self.__klass, errors2.PrivateError) + assert not issubclass(self.__klass, errors2.PublicError) + return self.__klass + klass = property(__get_klass) + + def new(self, **kw): + for (key, value) in kw.iteritems(): + assert not hasattr(self.klass, key), key + inst = self.klass(**kw) + assert isinstance(inst, StandardError) + assert isinstance(inst, errors2.PrivateError) + assert isinstance(inst, self.klass) + assert not isinstance(inst, errors2.PublicError) + for (key, value) in kw.iteritems(): + assert getattr(inst, key) is value + assert str(inst) == self.klass.format % kw + assert inst.message == str(inst) + return inst + + +class test_PrivateError(PrivateExceptionTester): + """ + Test the `ipalib.errors2.PrivateError` exception. + """ + _klass = errors2.PrivateError + + def test_init(self): + """ + Test the `ipalib.errors2.PrivateError.__init__` method. + """ + inst = self.klass(key1='Value 1', key2='Value 2') + assert inst.key1 == 'Value 1' + assert inst.key2 == 'Value 2' + assert str(inst) == '' + + # Test subclass and use of format: + class subclass(self.klass): + format = '%(true)r %(text)r %(number)r' + + kw = dict(true=True, text='Hello!', number=18) + inst = subclass(**kw) + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + assert str(inst) == subclass.format % kw + + # Test via PrivateExceptionTester.new() + inst = self.new(**kw) + assert isinstance(inst, self.klass) + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + + +class test_SubprocessError(PrivateExceptionTester): + """ + Test the `ipalib.errors2.SubprocessError` exception. + """ + + _klass = errors2.SubprocessError + + def test_init(self): + """ + Test the `ipalib.errors2.SubprocessError.__init__` method. + """ + inst = self.new(returncode=1, argv=('/bin/false',)) + assert inst.returncode == 1 + assert inst.argv == ('/bin/false',) + assert str(inst) == "return code 1 from ('/bin/false',)" + assert inst.message == str(inst) + + +class test_PluginSubclassError(PrivateExceptionTester): + """ + Test the `ipalib.errors2.PluginSubclassError` exception. + """ + + _klass = errors2.PluginSubclassError + + def test_init(self): + """ + Test the `ipalib.errors2.PluginSubclassError.__init__` method. + """ + inst = self.new(plugin='bad', bases=('base1', 'base2')) + assert inst.plugin == 'bad' + assert inst.bases == ('base1', 'base2') + assert str(inst) == \ + "'bad' not subclass of any base in ('base1', 'base2')" + assert inst.message == str(inst) + + +class test_PluginDuplicateError(PrivateExceptionTester): + """ + Test the `ipalib.errors2.PluginDuplicateError` exception. + """ + + _klass = errors2.PluginDuplicateError + + def test_init(self): + """ + Test the `ipalib.errors2.PluginDuplicateError.__init__` method. + """ + inst = self.new(plugin='my_plugin') + assert inst.plugin == 'my_plugin' + assert str(inst) == "'my_plugin' was already registered" + assert inst.message == str(inst) + + +class test_PluginOverrideError(PrivateExceptionTester): + """ + Test the `ipalib.errors2.PluginOverrideError` exception. + """ + + _klass = errors2.PluginOverrideError + + def test_init(self): + """ + Test the `ipalib.errors2.PluginOverrideError.__init__` method. + """ + inst = self.new(base='Base', name='cmd', plugin='my_cmd') + assert inst.base == 'Base' + assert inst.name == 'cmd' + assert inst.plugin == 'my_cmd' + assert str(inst) == "unexpected override of Base.cmd with 'my_cmd'" + assert inst.message == str(inst) + + +class test_PluginMissingOverrideError(PrivateExceptionTester): + """ + Test the `ipalib.errors2.PluginMissingOverrideError` exception. + """ + + _klass = errors2.PluginMissingOverrideError + + def test_init(self): + """ + Test the `ipalib.errors2.PluginMissingOverrideError.__init__` method. + """ + inst = self.new(base='Base', name='cmd', plugin='my_cmd') + assert inst.base == 'Base' + assert inst.name == 'cmd' + assert inst.plugin == 'my_cmd' + assert str(inst) == "Base.cmd not registered, cannot override with 'my_cmd'" + assert inst.message == str(inst) + + + +############################################################################## +# Unit tests for public errors: + +class PublicExceptionTester(object): + _klass = None + __klass = None + + def __get_klass(self): + if self.__klass is None: + self.__klass = self._klass + assert issubclass(self.__klass, StandardError) + assert issubclass(self.__klass, errors2.PublicError) + assert not issubclass(self.__klass, errors2.PrivateError) + assert type(self.__klass.errno) is int + assert 900 <= self.__klass.errno <= 5999 + return self.__klass + klass = property(__get_klass) + + def new(self, format=None, message=None, **kw): + # Test that TypeError is raised if message isn't unicode: + e = raises(TypeError, self.klass, message='The message') + assert str(e) == TYPE_ERROR % ('message', unicode, 'The message', str) + + # Test the instance: + for (key, value) in kw.iteritems(): + assert not hasattr(self.klass, key), key + inst = self.klass(format=format, message=message, **kw) + assert isinstance(inst, StandardError) + assert isinstance(inst, errors2.PublicError) + assert isinstance(inst, self.klass) + assert not isinstance(inst, errors2.PrivateError) + for (key, value) in kw.iteritems(): + assert getattr(inst, key) is value + return inst + + +class test_PublicError(PublicExceptionTester): + """ + Test the `ipalib.errors2.PublicError` exception. + """ + _klass = errors2.PublicError + + def test_init(self): + """ + Test the `ipalib.errors2.PublicError.__init__` method. + """ + context = request.context + message = u'The translated, interpolated message' + format = 'key=%(key1)r and key2=%(key2)r' + uformat = u'Translated key=%(key1)r and key2=%(key2)r' + val1 = 'Value 1' + val2 = 'Value 2' + kw = dict(key1=val1, key2=val2) + + assert not hasattr(context, 'ugettext') + + # Test with format=str, message=None + dummy = dummy_ugettext(uformat) + context.ugettext = dummy + inst = self.klass(format, **kw) + assert dummy.message is format # Means ugettext() called + assert inst.format is format + assert_equal(inst.message, format % kw) + assert_equal(inst.strerror, uformat % kw) + assert inst.forwarded is False + assert inst.key1 is val1 + assert inst.key2 is val2 + + # Test with format=None, message=unicode + dummy = dummy_ugettext(uformat) + context.ugettext = dummy + inst = self.klass(message=message, **kw) + assert not hasattr(dummy, 'message') # Means ugettext() not called + assert inst.format is None + assert inst.message is message + assert inst.strerror is message + assert inst.forwarded is True + assert inst.key1 is val1 + assert inst.key2 is val2 + + # Test with format=None, message=str + e = raises(TypeError, self.klass, message='the message', **kw) + assert str(e) == TYPE_ERROR % ('message', unicode, 'the message', str) + + # Test with format=None, message=None + e = raises(ValueError, self.klass, **kw) + assert str(e) == \ + 'PublicError.format is None yet format=None, message=None' + + + ###################################### + # Test via PublicExceptionTester.new() + + # Test with format=str, message=None + dummy = dummy_ugettext(uformat) + context.ugettext = dummy + inst = self.new(format, **kw) + assert isinstance(inst, self.klass) + assert dummy.message is format # Means ugettext() called + assert inst.format is format + assert_equal(inst.message, format % kw) + assert_equal(inst.strerror, uformat % kw) + assert inst.forwarded is False + assert inst.key1 is val1 + assert inst.key2 is val2 + + # Test with format=None, message=unicode + dummy = dummy_ugettext(uformat) + context.ugettext = dummy + inst = self.new(message=message, **kw) + assert isinstance(inst, self.klass) + assert not hasattr(dummy, 'message') # Means ugettext() not called + assert inst.format is None + assert inst.message is message + assert inst.strerror is message + assert inst.forwarded is True + assert inst.key1 is val1 + assert inst.key2 is val2 + + + ################## + # Test a subclass: + class subclass(self.klass): + format = '%(true)r %(text)r %(number)r' + + uformat = u'Translated %(true)r %(text)r %(number)r' + kw = dict(true=True, text='Hello!', number=18) + + dummy = dummy_ugettext(uformat) + context.ugettext = dummy + + # Test with format=str, message=None + e = raises(ValueError, subclass, format, **kw) + assert str(e) == 'non-generic %r needs format=None; got format=%r' % ( + 'subclass', format) + + # Test with format=None, message=None: + inst = subclass(**kw) + assert dummy.message is subclass.format # Means ugettext() called + assert inst.format is subclass.format + assert_equal(inst.message, subclass.format % kw) + assert_equal(inst.strerror, uformat % kw) + assert inst.forwarded is False + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + + # Test with format=None, message=unicode: + dummy = dummy_ugettext(uformat) + context.ugettext = dummy + inst = subclass(message=message, **kw) + assert not hasattr(dummy, 'message') # Means ugettext() not called + assert inst.format is subclass.format + assert inst.message is message + assert inst.strerror is message + assert inst.forwarded is True + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + del context.ugettext + + +def test_public_errors(): + """ + Test the `ipalib.errors2.public_errors` module variable. + """ + i = 0 + for klass in errors2.public_errors: + assert issubclass(klass, StandardError) + assert issubclass(klass, errors2.PublicError) + assert not issubclass(klass, errors2.PrivateError) + assert type(klass.errno) is int + assert 900 <= klass.errno <= 5999 + doc = inspect.getdoc(klass) + assert doc is not None, 'need class docstring for %s' % klass.__name__ + m = re.match(r'^\*{2}(\d+)\*{2} ', doc) + assert m is not None, "need '**ERRNO**' in %s docstring" % klass.__name__ + errno = int(m.group(1)) + assert errno == klass.errno, ( + 'docstring=%r but errno=%r in %s' % (errno, klass.errno, klass.__name__) + ) + + # Test format + if klass.format is not None: + assert klass.format is errors2.__messages[i] + i += 1 diff --git a/tests/test_ipalib/test_errors.py b/tests/test_ipalib/test_errors.py new file mode 100644 index 00000000..f1dd5dc8 --- /dev/null +++ b/tests/test_ipalib/test_errors.py @@ -0,0 +1,289 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.errors` module. +""" + +from tests.util import raises, ClassChecker +from ipalib import errors + + +type_format = '%s: need a %r; got %r' + + +def check_TypeError(f, value, type_, name, **kw): + e = raises(TypeError, f, value, type_, name, **kw) + assert e.value is value + assert e.type is type_ + assert e.name is name + assert str(e) == type_format % (name, type_, value) + + +def test_raise_TypeError(): + """ + Test the `ipalib.errors.raise_TypeError` function. + """ + f = errors.raise_TypeError + value = 'Hello.' + type_ = unicode + name = 'message' + + check_TypeError(f, value, type_, name) + + # name not an str + fail_name = 42 + e = raises(AssertionError, f, value, type_, fail_name) + assert str(e) == type_format % ('name', str, fail_name), str(e) + + # type_ not a type: + fail_type = unicode() + e = raises(AssertionError, f, value, fail_type, name) + assert str(e) == type_format % ('type_', type, fail_type) + + # type(value) is type_: + fail_value = u'How are you?' + e = raises(AssertionError, f, fail_value, type_, name) + assert str(e) == 'value: %r is a %r' % (fail_value, type_) + + +def test_check_type(): + """ + Test the `ipalib.errors.check_type` function. + """ + f = errors.check_type + value = 'How are you?' + type_ = str + name = 'greeting' + + # Should pass: + assert value is f(value, type_, name) + assert None is f(None, type_, name, allow_none=True) + + # Should raise TypeError + check_TypeError(f, None, type_, name) + check_TypeError(f, value, basestring, name) + check_TypeError(f, value, unicode, name) + + # name not an str + fail_name = unicode(name) + e = raises(AssertionError, f, value, type_, fail_name) + assert str(e) == type_format % ('name', str, fail_name) + + # type_ not a type: + fail_type = 42 + e = raises(AssertionError, f, value, fail_type, name) + assert str(e) == type_format % ('type_', type, fail_type) + + # allow_none not a bool: + fail_bool = 0 + e = raises(AssertionError, f, value, type_, name, allow_none=fail_bool) + assert str(e) == type_format % ('allow_none', bool, fail_bool) + + +def test_check_isinstance(): + """ + Test the `ipalib.errors.check_isinstance` function. + """ + f = errors.check_isinstance + value = 'How are you?' + type_ = str + name = 'greeting' + + # Should pass: + assert value is f(value, type_, name) + assert value is f(value, basestring, name) + assert None is f(None, type_, name, allow_none=True) + + # Should raise TypeError + check_TypeError(f, None, type_, name) + check_TypeError(f, value, unicode, name) + + # name not an str + fail_name = unicode(name) + e = raises(AssertionError, f, value, type_, fail_name) + assert str(e) == type_format % ('name', str, fail_name) + + # type_ not a type: + fail_type = 42 + e = raises(AssertionError, f, value, fail_type, name) + assert str(e) == type_format % ('type_', type, fail_type) + + # allow_none not a bool: + fail_bool = 0 + e = raises(AssertionError, f, value, type_, name, allow_none=fail_bool) + assert str(e) == type_format % ('allow_none', bool, fail_bool) + + +class test_IPAError(ClassChecker): + """ + Test the `ipalib.errors.IPAError` exception. + """ + _cls = errors.IPAError + + def test_class(self): + """ + Test the `ipalib.errors.IPAError` exception. + """ + assert self.cls.__bases__ == (StandardError,) + + def test_init(self): + """ + Test the `ipalib.errors.IPAError.__init__` method. + """ + args = ('one fish', 'two fish') + e = self.cls(*args) + assert e.args == args + assert self.cls().args == tuple() + + def test_str(self): + """ + Test the `ipalib.errors.IPAError.__str__` method. + """ + f = 'The %s color is %s.' + class custom_error(self.cls): + format = f + for args in [('sexiest', 'red'), ('most-batman-like', 'black')]: + e = custom_error(*args) + assert e.args == args + assert str(e) == f % args + + +class test_ValidationError(ClassChecker): + """ + Test the `ipalib.errors.ValidationError` exception. + """ + _cls = errors.ValidationError + + def test_class(self): + """ + Test the `ipalib.errors.ValidationError` exception. + """ + assert self.cls.__bases__ == (errors.IPAError,) + + def test_init(self): + """ + Test the `ipalib.errors.ValidationError.__init__` method. + """ + name = 'login' + value = 'Whatever' + error = 'Must be lowercase.' + for index in (None, 3): + e = self.cls(name, value, error, index=index) + assert e.name is name + assert e.value is value + assert e.error is error + assert e.index is index + assert str(e) == 'invalid %r value %r: %s' % (name, value, error) + # Check that index default is None: + assert self.cls(name, value, error).index is None + # Check non str name raises AssertionError: + raises(AssertionError, self.cls, unicode(name), value, error) + # Check non int index raises AssertionError: + raises(AssertionError, self.cls, name, value, error, index=5.0) + # Check negative index raises AssertionError: + raises(AssertionError, self.cls, name, value, error, index=-2) + + +class test_ConversionError(ClassChecker): + """ + Test the `ipalib.errors.ConversionError` exception. + """ + _cls = errors.ConversionError + + def test_class(self): + """ + Test the `ipalib.errors.ConversionError` exception. + """ + assert self.cls.__bases__ == (errors.ValidationError,) + + def test_init(self): + """ + Test the `ipalib.errors.ConversionError.__init__` method. + """ + name = 'some_arg' + value = '42.0' + class type_(object): + conversion_error = 'Not an integer' + for index in (None, 7): + e = self.cls(name, value, type_, index=index) + assert e.name is name + assert e.value is value + assert e.type is type_ + assert e.error is type_.conversion_error + assert e.index is index + assert str(e) == 'invalid %r value %r: %s' % (name, value, + type_.conversion_error) + # Check that index default is None: + assert self.cls(name, value, type_).index is None + + +class test_RuleError(ClassChecker): + """ + Test the `ipalib.errors.RuleError` exception. + """ + _cls = errors.RuleError + + def test_class(self): + """ + Test the `ipalib.errors.RuleError` exception. + """ + assert self.cls.__bases__ == (errors.ValidationError,) + + def test_init(self): + """ + Test the `ipalib.errors.RuleError.__init__` method. + """ + name = 'whatever' + value = 'The smallest weird number.' + def my_rule(value): + return 'Value is bad.' + error = my_rule(value) + for index in (None, 42): + e = self.cls(name, value, error, my_rule, index=index) + assert e.name is name + assert e.value is value + assert e.error is error + assert e.rule is my_rule + # Check that index default is None: + assert self.cls(name, value, error, my_rule).index is None + + +class test_RequirementError(ClassChecker): + """ + Test the `ipalib.errors.RequirementError` exception. + """ + _cls = errors.RequirementError + + def test_class(self): + """ + Test the `ipalib.errors.RequirementError` exception. + """ + assert self.cls.__bases__ == (errors.ValidationError,) + + def test_init(self): + """ + Test the `ipalib.errors.RequirementError.__init__` method. + """ + name = 'givenname' + e = self.cls(name) + assert e.name is name + assert e.value is None + assert e.error == 'Required' + assert e.index is None diff --git a/tests/test_ipalib/test_frontend.py b/tests/test_ipalib/test_frontend.py new file mode 100644 index 00000000..071a70fd --- /dev/null +++ b/tests/test_ipalib/test_frontend.py @@ -0,0 +1,771 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.frontend` module. +""" + +from tests.util import raises, getitem, no_set, no_del, read_only +from tests.util import check_TypeError, ClassChecker, create_test_api +from tests.util import assert_equal +from ipalib.constants import TYPE_ERROR +from ipalib import frontend, backend, plugable, errors2, errors, parameters, config + + +def test_RULE_FLAG(): + assert frontend.RULE_FLAG == 'validation_rule' + + +def test_rule(): + """ + Test the `ipalib.frontend.rule` function. + """ + flag = frontend.RULE_FLAG + rule = frontend.rule + def my_func(): + pass + assert not hasattr(my_func, flag) + rule(my_func) + assert getattr(my_func, flag) is True + @rule + def my_func2(): + pass + assert getattr(my_func2, flag) is True + + +def test_is_rule(): + """ + Test the `ipalib.frontend.is_rule` function. + """ + is_rule = frontend.is_rule + flag = frontend.RULE_FLAG + + class no_call(object): + def __init__(self, value): + if value is not None: + assert value in (True, False) + setattr(self, flag, value) + + class call(no_call): + def __call__(self): + pass + + assert is_rule(call(True)) + assert not is_rule(no_call(True)) + assert not is_rule(call(False)) + assert not is_rule(call(None)) + + +class test_Command(ClassChecker): + """ + Test the `ipalib.frontend.Command` class. + """ + + _cls = frontend.Command + + def get_subcls(self): + """ + Return a standard subclass of `ipalib.frontend.Command`. + """ + class Rule(object): + def __init__(self, name): + self.name = name + + def __call__(self, _, value): + if value != self.name: + return _('must equal %r') % self.name + + default_from = parameters.DefaultFrom( + lambda arg: arg, + 'default_from' + ) + normalizer = lambda value: value.lower() + + class example(self.cls): + takes_options = ( + parameters.Str('option0', Rule('option0'), + normalizer=normalizer, + default_from=default_from, + ), + parameters.Str('option1', Rule('option1'), + normalizer=normalizer, + default_from=default_from, + ), + ) + return example + + def get_instance(self, args=tuple(), options=tuple()): + """ + Helper method used to test args and options. + """ + class example(self.cls): + takes_args = args + takes_options = options + o = example() + o.finalize() + return o + + def test_class(self): + """ + Test the `ipalib.frontend.Command` class. + """ + assert self.cls.__bases__ == (plugable.Plugin,) + assert self.cls.takes_options == tuple() + assert self.cls.takes_args == tuple() + + def test_get_args(self): + """ + Test the `ipalib.frontend.Command.get_args` method. + """ + assert list(self.cls().get_args()) == [] + args = ('login', 'stuff') + o = self.get_instance(args=args) + assert o.get_args() is args + + def test_get_options(self): + """ + Test the `ipalib.frontend.Command.get_options` method. + """ + assert list(self.cls().get_options()) == [] + options = ('verbose', 'debug') + o = self.get_instance(options=options) + assert o.get_options() is options + + def test_args(self): + """ + Test the ``ipalib.frontend.Command.args`` instance attribute. + """ + assert 'args' in self.cls.__public__ # Public + assert self.cls().args is None + o = self.cls() + o.finalize() + assert type(o.args) is plugable.NameSpace + assert len(o.args) == 0 + args = ('destination', 'source?') + ns = self.get_instance(args=args).args + assert type(ns) is plugable.NameSpace + assert len(ns) == len(args) + assert list(ns) == ['destination', 'source'] + assert type(ns.destination) is parameters.Str + assert type(ns.source) is parameters.Str + assert ns.destination.required is True + assert ns.destination.multivalue is False + assert ns.source.required is False + assert ns.source.multivalue is False + + # Test TypeError: + e = raises(TypeError, self.get_instance, args=(u'whatever',)) + assert str(e) == TYPE_ERROR % ( + 'spec', (str, parameters.Param), u'whatever', unicode) + + # Test ValueError, required after optional: + e = raises(ValueError, self.get_instance, args=('arg1?', 'arg2')) + assert str(e) == 'arg2: required argument after optional' + + # Test ValueError, scalar after multivalue: + e = raises(ValueError, self.get_instance, args=('arg1+', 'arg2')) + assert str(e) == 'arg2: only final argument can be multivalue' + + def test_max_args(self): + """ + Test the ``ipalib.frontend.Command.max_args`` instance attribute. + """ + o = self.get_instance() + assert o.max_args == 0 + o = self.get_instance(args=('one?',)) + assert o.max_args == 1 + o = self.get_instance(args=('one', 'two?')) + assert o.max_args == 2 + o = self.get_instance(args=('one', 'multi+',)) + assert o.max_args is None + o = self.get_instance(args=('one', 'multi*',)) + assert o.max_args is None + + def test_options(self): + """ + Test the ``ipalib.frontend.Command.options`` instance attribute. + """ + assert 'options' in self.cls.__public__ # Public + assert self.cls().options is None + o = self.cls() + o.finalize() + assert type(o.options) is plugable.NameSpace + assert len(o.options) == 0 + options = ('target', 'files*') + ns = self.get_instance(options=options).options + assert type(ns) is plugable.NameSpace + assert len(ns) == len(options) + assert list(ns) == ['target', 'files'] + assert type(ns.target) is parameters.Str + assert type(ns.files) is parameters.Str + assert ns.target.required is True + assert ns.target.multivalue is False + assert ns.files.required is False + assert ns.files.multivalue is True + + def test_convert(self): + """ + Test the `ipalib.frontend.Command.convert` method. + """ + assert 'convert' in self.cls.__public__ # Public + kw = dict( + option0=u'1.5', + option1=u'7', + ) + o = self.subcls() + o.finalize() + for (key, value) in o.convert(**kw).iteritems(): + assert_equal(unicode(kw[key]), value) + + def test_normalize(self): + """ + Test the `ipalib.frontend.Command.normalize` method. + """ + assert 'normalize' in self.cls.__public__ # Public + kw = dict( + option0=u'OPTION0', + option1=u'OPTION1', + ) + norm = dict((k, v.lower()) for (k, v) in kw.items()) + sub = self.subcls() + sub.finalize() + assert sub.normalize(**kw) == norm + + def test_get_default(self): + """ + Test the `ipalib.frontend.Command.get_default` method. + """ + assert 'get_default' in self.cls.__public__ # Public + # FIXME: Add an updated unit tests for get_default() + + def test_validate(self): + """ + Test the `ipalib.frontend.Command.validate` method. + """ + assert 'validate' in self.cls.__public__ # Public + + sub = self.subcls() + sub.finalize() + + # Check with valid values + okay = dict( + option0=u'option0', + option1=u'option1', + another_option='some value', + ) + sub.validate(**okay) + + # Check with an invalid value + fail = dict(okay) + fail['option0'] = u'whatever' + e = raises(errors2.ValidationError, sub.validate, **fail) + assert_equal(e.name, 'option0') + assert_equal(e.value, u'whatever') + assert_equal(e.error, u"must equal 'option0'") + assert e.rule.__class__.__name__ == 'Rule' + assert e.index is None + + # Check with a missing required arg + fail = dict(okay) + fail.pop('option1') + e = raises(errors.RequirementError, sub.validate, **fail) + assert e.name == 'option1' + assert e.value is None + assert e.index is None + + def test_execute(self): + """ + Test the `ipalib.frontend.Command.execute` method. + """ + assert 'execute' in self.cls.__public__ # Public + o = self.cls() + e = raises(NotImplementedError, o.execute) + assert str(e) == 'Command.execute()' + + def test_args_to_kw(self): + """ + Test the `ipalib.frontend.Command.args_to_kw` method. + """ + assert 'args_to_kw' in self.cls.__public__ # Public + o = self.get_instance(args=('one', 'two?')) + assert o.args_to_kw(1) == dict(one=1) + assert o.args_to_kw(1, 2) == dict(one=1, two=2) + + o = self.get_instance(args=('one', 'two*')) + assert o.args_to_kw(1) == dict(one=1) + assert o.args_to_kw(1, 2) == dict(one=1, two=(2,)) + assert o.args_to_kw(1, 2, 3) == dict(one=1, two=(2, 3)) + + o = self.get_instance(args=('one', 'two+')) + assert o.args_to_kw(1) == dict(one=1) + assert o.args_to_kw(1, 2) == dict(one=1, two=(2,)) + assert o.args_to_kw(1, 2, 3) == dict(one=1, two=(2, 3)) + + o = self.get_instance() + e = raises(errors.ArgumentError, o.args_to_kw, 1) + assert str(e) == 'example takes no arguments' + + o = self.get_instance(args=('one?',)) + e = raises(errors.ArgumentError, o.args_to_kw, 1, 2) + assert str(e) == 'example takes at most 1 argument' + + o = self.get_instance(args=('one', 'two?')) + e = raises(errors.ArgumentError, o.args_to_kw, 1, 2, 3) + assert str(e) == 'example takes at most 2 arguments' + + def test_params_2_args_options(self): + """ + Test the `ipalib.frontend.Command.params_2_args_options` method. + """ + assert 'params_2_args_options' in self.cls.__public__ # Public + o = self.get_instance(args=['one'], options=['two']) + assert o.params_2_args_options({}) == ((None,), dict(two=None)) + assert o.params_2_args_options(dict(one=1)) == ((1,), dict(two=None)) + assert o.params_2_args_options(dict(two=2)) == ((None,), dict(two=2)) + assert o.params_2_args_options(dict(two=2, one=1)) == \ + ((1,), dict(two=2)) + + def test_run(self): + """ + Test the `ipalib.frontend.Command.run` method. + """ + class my_cmd(self.cls): + def execute(self, *args, **kw): + return ('execute', args, kw) + + def forward(self, *args, **kw): + return ('forward', args, kw) + + args = ('Hello,', 'world,') + kw = dict(how_are='you', on_this='fine day?') + + # Test in server context: + (api, home) = create_test_api(in_server=True) + api.finalize() + o = my_cmd() + o.set_api(api) + assert o.run.im_func is self.cls.run.im_func + assert ('execute', args, kw) == o.run(*args, **kw) + assert o.run.im_func is my_cmd.execute.im_func + + # Test in non-server context + (api, home) = create_test_api(in_server=False) + api.finalize() + o = my_cmd() + o.set_api(api) + assert o.run.im_func is self.cls.run.im_func + assert ('forward', args, kw) == o.run(*args, **kw) + assert o.run.im_func is my_cmd.forward.im_func + + +class test_LocalOrRemote(ClassChecker): + """ + Test the `ipalib.frontend.LocalOrRemote` class. + """ + _cls = frontend.LocalOrRemote + + def test_init(self): + """ + Test the `ipalib.frontend.LocalOrRemote.__init__` method. + """ + o = self.cls() + o.finalize() + assert list(o.args) == [] + assert list(o.options) == ['server'] + op = o.options.server + assert op.required is False + assert op.default is False + + def test_run(self): + """ + Test the `ipalib.frontend.LocalOrRemote.run` method. + """ + class example(self.cls): + takes_args = ['key?'] + + def forward(self, *args, **options): + return ('forward', args, options) + + def execute(self, *args, **options): + return ('execute', args, options) + + # Test when in_server=False: + (api, home) = create_test_api(in_server=False) + api.register(example) + api.finalize() + cmd = api.Command.example + assert cmd() == ('execute', (None,), dict(server=False)) + assert cmd(u'var') == ('execute', (u'var',), dict(server=False)) + assert cmd(server=True) == ('forward', (None,), dict(server=True)) + assert cmd(u'var', server=True) == \ + ('forward', (u'var',), dict(server=True)) + + # Test when in_server=True (should always call execute): + (api, home) = create_test_api(in_server=True) + api.register(example) + api.finalize() + cmd = api.Command.example + assert cmd() == ('execute', (None,), dict(server=False)) + assert cmd(u'var') == ('execute', (u'var',), dict(server=False)) + assert cmd(server=True) == ('execute', (None,), dict(server=True)) + assert cmd(u'var', server=True) == \ + ('execute', (u'var',), dict(server=True)) + + +class test_Object(ClassChecker): + """ + Test the `ipalib.frontend.Object` class. + """ + _cls = frontend.Object + + def test_class(self): + """ + Test the `ipalib.frontend.Object` class. + """ + assert self.cls.__bases__ == (plugable.Plugin,) + assert self.cls.backend is None + assert self.cls.methods is None + assert self.cls.properties is None + assert self.cls.params is None + assert self.cls.params_minus_pk is None + assert self.cls.takes_params == tuple() + + def test_init(self): + """ + Test the `ipalib.frontend.Object.__init__` method. + """ + o = self.cls() + assert o.backend is None + assert o.methods is None + assert o.properties is None + assert o.params is None + assert o.params_minus_pk is None + assert o.properties is None + + def test_set_api(self): + """ + Test the `ipalib.frontend.Object.set_api` method. + """ + # Setup for test: + class DummyAttribute(object): + def __init__(self, obj_name, attr_name, name=None): + self.obj_name = obj_name + self.attr_name = attr_name + if name is None: + self.name = '%s_%s' % (obj_name, attr_name) + else: + self.name = name + self.param = frontend.create_param(attr_name) + + def __clone__(self, attr_name): + return self.__class__( + self.obj_name, + self.attr_name, + getattr(self, attr_name) + ) + + def get_attributes(cnt, format): + for name in ['other', 'user', 'another']: + for i in xrange(cnt): + yield DummyAttribute(name, format % i) + + cnt = 10 + formats = dict( + methods='method_%d', + properties='property_%d', + ) + + + _d = dict( + Method=plugable.NameSpace( + get_attributes(cnt, formats['methods']) + ), + Property=plugable.NameSpace( + get_attributes(cnt, formats['properties']) + ), + ) + api = plugable.MagicDict(_d) + assert len(api.Method) == cnt * 3 + assert len(api.Property) == cnt * 3 + + class user(self.cls): + pass + + # Actually perform test: + o = user() + o.set_api(api) + assert read_only(o, 'api') is api + for name in ['methods', 'properties']: + namespace = getattr(o, name) + assert isinstance(namespace, plugable.NameSpace) + assert len(namespace) == cnt + f = formats[name] + for i in xrange(cnt): + attr_name = f % i + attr = namespace[attr_name] + assert isinstance(attr, DummyAttribute) + assert attr is getattr(namespace, attr_name) + assert attr.obj_name == 'user' + assert attr.attr_name == attr_name + assert attr.name == attr_name + + # Test params instance attribute + o = self.cls() + o.set_api(api) + ns = o.params + assert type(ns) is plugable.NameSpace + assert len(ns) == 0 + class example(self.cls): + takes_params = ('banana', 'apple') + o = example() + o.set_api(api) + ns = o.params + assert type(ns) is plugable.NameSpace + assert len(ns) == 2, repr(ns) + assert list(ns) == ['banana', 'apple'] + for p in ns(): + assert type(p) is parameters.Str + assert p.required is True + assert p.multivalue is False + + def test_primary_key(self): + """ + Test the `ipalib.frontend.Object.primary_key` attribute. + """ + (api, home) = create_test_api() + api.finalize() + + # Test with no primary keys: + class example1(self.cls): + takes_params = ( + 'one', + 'two', + ) + o = example1() + o.set_api(api) + assert o.primary_key is None + assert o.params_minus_pk is None + + # Test with 1 primary key: + class example2(self.cls): + takes_params = ( + 'one', + 'two', + parameters.Str('three', primary_key=True), + 'four', + ) + o = example2() + o.set_api(api) + pk = o.primary_key + assert type(pk) is parameters.Str + assert pk.name == 'three' + assert pk.primary_key is True + assert o.params[2] is o.primary_key + assert isinstance(o.params_minus_pk, plugable.NameSpace) + assert list(o.params_minus_pk) == ['one', 'two', 'four'] + + # Test with multiple primary_key: + class example3(self.cls): + takes_params = ( + parameters.Str('one', primary_key=True), + parameters.Str('two', primary_key=True), + 'three', + parameters.Str('four', primary_key=True), + ) + o = example3() + e = raises(ValueError, o.set_api, api) + assert str(e) == \ + 'example3 (Object) has multiple primary keys: one, two, four' + + def test_backend(self): + """ + Test the `ipalib.frontend.Object.backend` attribute. + """ + (api, home) = create_test_api() + class ldap(backend.Backend): + whatever = 'It worked!' + api.register(ldap) + class user(frontend.Object): + backend_name = 'ldap' + api.register(user) + api.finalize() + b = api.Object.user.backend + assert isinstance(b, ldap) + assert b.whatever == 'It worked!' + + def test_get_dn(self): + """ + Test the `ipalib.frontend.Object.get_dn` method. + """ + assert 'get_dn' in self.cls.__public__ # Public + o = self.cls() + e = raises(NotImplementedError, o.get_dn, 'primary key') + assert str(e) == 'Object.get_dn()' + class user(self.cls): + pass + o = user() + e = raises(NotImplementedError, o.get_dn, 'primary key') + assert str(e) == 'user.get_dn()' + + +class test_Attribute(ClassChecker): + """ + Test the `ipalib.frontend.Attribute` class. + """ + _cls = frontend.Attribute + + def test_class(self): + """ + Test the `ipalib.frontend.Attribute` class. + """ + assert self.cls.__bases__ == (plugable.Plugin,) + assert type(self.cls.obj) is property + assert type(self.cls.obj_name) is property + assert type(self.cls.attr_name) is property + + def test_init(self): + """ + Test the `ipalib.frontend.Attribute.__init__` method. + """ + class user_add(self.cls): + pass + o = user_add() + assert read_only(o, 'obj') is None + assert read_only(o, 'obj_name') == 'user' + assert read_only(o, 'attr_name') == 'add' + + def test_set_api(self): + """ + Test the `ipalib.frontend.Attribute.set_api` method. + """ + user_obj = 'The user frontend.Object instance' + class api(object): + Object = dict(user=user_obj) + class user_add(self.cls): + pass + o = user_add() + assert read_only(o, 'api') is None + assert read_only(o, 'obj') is None + o.set_api(api) + assert read_only(o, 'api') is api + assert read_only(o, 'obj') is user_obj + + +class test_Method(ClassChecker): + """ + Test the `ipalib.frontend.Method` class. + """ + _cls = frontend.Method + + def test_class(self): + """ + Test the `ipalib.frontend.Method` class. + """ + assert self.cls.__bases__ == (frontend.Attribute, frontend.Command) + assert self.cls.implements(frontend.Command) + assert self.cls.implements(frontend.Attribute) + + def test_init(self): + """ + Test the `ipalib.frontend.Method.__init__` method. + """ + class user_add(self.cls): + pass + o = user_add() + assert o.name == 'user_add' + assert o.obj_name == 'user' + assert o.attr_name == 'add' + assert frontend.Command.implemented_by(o) + assert frontend.Attribute.implemented_by(o) + + +class test_Property(ClassChecker): + """ + Test the `ipalib.frontend.Property` class. + """ + _cls = frontend.Property + + def get_subcls(self): + """ + Return a standard subclass of `ipalib.frontend.Property`. + """ + class user_givenname(self.cls): + 'User first name' + + @frontend.rule + def rule0_lowercase(self, value): + if not value.islower(): + return 'Must be lowercase' + return user_givenname + + def test_class(self): + """ + Test the `ipalib.frontend.Property` class. + """ + assert self.cls.__bases__ == (frontend.Attribute,) + assert self.cls.klass is parameters.Str + + def test_init(self): + """ + Test the `ipalib.frontend.Property.__init__` method. + """ + o = self.subcls() + assert len(o.rules) == 1 + assert o.rules[0].__name__ == 'rule0_lowercase' + param = o.param + assert isinstance(param, parameters.Str) + assert param.name == 'givenname' + assert param.doc == 'User first name' + + +class test_Application(ClassChecker): + """ + Test the `ipalib.frontend.Application` class. + """ + _cls = frontend.Application + + def test_class(self): + """ + Test the `ipalib.frontend.Application` class. + """ + assert self.cls.__bases__ == (frontend.Command,) + assert type(self.cls.application) is property + + def test_application(self): + """ + Test the `ipalib.frontend.Application.application` property. + """ + assert 'application' in self.cls.__public__ # Public + assert 'set_application' in self.cls.__public__ # Public + app = 'The external application' + class example(self.cls): + 'A subclass' + for o in (self.cls(), example()): + assert read_only(o, 'application') is None + e = raises(TypeError, o.set_application, None) + assert str(e) == ( + '%s.application cannot be None' % o.__class__.__name__ + ) + o.set_application(app) + assert read_only(o, 'application') is app + e = raises(AttributeError, o.set_application, app) + assert str(e) == ( + '%s.application can only be set once' % o.__class__.__name__ + ) + assert read_only(o, 'application') is app diff --git a/tests/test_ipalib/test_parameters.py b/tests/test_ipalib/test_parameters.py new file mode 100644 index 00000000..261e1481 --- /dev/null +++ b/tests/test_ipalib/test_parameters.py @@ -0,0 +1,994 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.parameters` module. +""" + +from types import NoneType +from inspect import isclass +from tests.util import raises, ClassChecker, read_only +from tests.util import dummy_ugettext, assert_equal +from tests.data import binary_bytes, utf8_bytes, unicode_str +from ipalib import parameters, request, errors2 +from ipalib.constants import TYPE_ERROR, CALLABLE_ERROR, NULLS + + +class test_DefaultFrom(ClassChecker): + """ + Test the `ipalib.parameters.DefaultFrom` class. + """ + _cls = parameters.DefaultFrom + + def test_init(self): + """ + Test the `ipalib.parameters.DefaultFrom.__init__` method. + """ + def callback(*args): + return args + keys = ('givenname', 'sn') + o = self.cls(callback, *keys) + assert read_only(o, 'callback') is callback + assert read_only(o, 'keys') == keys + lam = lambda first, last: first[0] + last + o = self.cls(lam) + assert read_only(o, 'keys') == ('first', 'last') + + # Test that TypeError is raised when callback isn't callable: + e = raises(TypeError, self.cls, 'whatever') + assert str(e) == CALLABLE_ERROR % ('callback', 'whatever', str) + + # Test that TypeError is raised when a key isn't an str: + e = raises(TypeError, self.cls, callback, 'givenname', 17) + assert str(e) == TYPE_ERROR % ('keys', str, 17, int) + + def test_call(self): + """ + Test the `ipalib.parameters.DefaultFrom.__call__` method. + """ + def callback(givenname, sn): + return givenname[0] + sn[0] + keys = ('givenname', 'sn') + o = self.cls(callback, *keys) + kw = dict( + givenname='John', + sn='Public', + hello='world', + ) + assert o(**kw) == 'JP' + assert o() is None + for key in ('givenname', 'sn'): + kw_copy = dict(kw) + del kw_copy[key] + assert o(**kw_copy) is None + + # Test using implied keys: + o = self.cls(lambda first, last: first[0] + last) + assert o(first='john', last='doe') == 'jdoe' + assert o(first='', last='doe') is None + assert o(one='john', two='doe') is None + + # Test that co_varnames slice is used: + def callback2(first, last): + letter = first[0] + return letter + last + o = self.cls(callback2) + assert o.keys == ('first', 'last') + assert o(first='john', last='doe') == 'jdoe' + + +def test_parse_param_spec(): + """ + Test the `ipalib.parameters.parse_param_spec` function. + """ + f = parameters.parse_param_spec + assert f('name') == ('name', dict(required=True, multivalue=False)) + assert f('name?') == ('name', dict(required=False, multivalue=False)) + assert f('name*') == ('name', dict(required=False, multivalue=True)) + assert f('name+') == ('name', dict(required=True, multivalue=True)) + + # Make sure other "funny" endings are *not* treated special: + assert f('name^') == ('name^', dict(required=True, multivalue=False)) + + # Test that TypeError is raised if spec isn't an str: + e = raises(TypeError, f, u'name?') + assert str(e) == TYPE_ERROR % ('spec', str, u'name?', unicode) + + # Test that ValueError is raised if len(spec) < 2: + e = raises(ValueError, f, 'n') + assert str(e) == "spec must be at least 2 characters; got 'n'" + + +class DummyRule(object): + def __init__(self, error=None): + assert error is None or type(error) is unicode + self.error = error + self.reset() + + def __call__(self, *args): + self.calls.append(args) + return self.error + + def reset(self): + self.calls = [] + + +class test_Param(ClassChecker): + """ + Test the `ipalib.parameters.Param` class. + """ + _cls = parameters.Param + + def test_init(self): + """ + Test the `ipalib.parameters.Param.__init__` method. + """ + name = 'my_param' + o = self.cls(name) + assert o.param_spec is name + assert o.name is name + assert o.nice == "Param('my_param')" + assert o.__islocked__() is True + + # Test default rules: + assert o.rules == tuple() + assert o.class_rules == tuple() + assert o.all_rules == tuple() + + # Test default kwarg values: + assert o.cli_name is name + assert o.label is None + assert o.doc == '' + assert o.required is True + assert o.multivalue is False + assert o.primary_key is False + assert o.normalizer is None + assert o.default is None + assert o.default_from is None + assert o.create_default is None + assert o._get_default is None + assert o.autofill is False + assert o.query is False + assert o.flags == frozenset() + + # Test that ValueError is raised when a kwarg from a subclass + # conflicts with an attribute: + class Subclass(self.cls): + kwargs = self.cls.kwargs + ( + ('convert', callable, None), + ) + e = raises(ValueError, Subclass, name) + assert str(e) == "kwarg 'convert' conflicts with attribute on Subclass" + + # Test type validation of keyword arguments: + class Subclass(self.cls): + kwargs = self.cls.kwargs + ( + ('extra1', bool, True), + ('extra2', str, 'Hello'), + ('extra3', (int, float), 42), + ('extra4', callable, lambda whatever: whatever + 7), + ) + o = Subclass('my_param') # Test with no **kw: + for (key, kind, default) in o.kwargs: + # Test with a type invalid for all: + value = object() + kw = {key: value} + e = raises(TypeError, Subclass, 'my_param', **kw) + if kind is callable: + assert str(e) == CALLABLE_ERROR % (key, value, type(value)) + else: + assert str(e) == TYPE_ERROR % (key, kind, value, type(value)) + # Test with None: + kw = {key: None} + Subclass('my_param', **kw) + + # Test when using unknown kwargs: + e = raises(TypeError, self.cls, 'my_param', + flags=['hello', 'world'], + whatever=u'Hooray!', + ) + assert str(e) == \ + "Param('my_param'): takes no such kwargs: 'whatever'" + e = raises(TypeError, self.cls, 'my_param', great='Yes', ape='he is!') + assert str(e) == \ + "Param('my_param'): takes no such kwargs: 'ape', 'great'" + + # Test that ValueError is raised if you provide both default_from and + # create_default: + e = raises(ValueError, self.cls, 'my_param', + default_from=lambda first, last: first[0] + last, + create_default=lambda **kw: 'The Default' + ) + assert str(e) == '%s: cannot have both %r and %r' % ( + "Param('my_param')", 'default_from', 'create_default', + ) + + # Test that _get_default gets set: + call1 = lambda first, last: first[0] + last + call2 = lambda **kw: 'The Default' + o = self.cls('my_param', default_from=call1) + assert o.default_from.callback is call1 + assert o._get_default is o.default_from + o = self.cls('my_param', create_default=call2) + assert o.create_default is call2 + assert o._get_default is call2 + + def test_repr(self): + """ + Test the `ipalib.parameters.Param.__repr__` method. + """ + for name in ['name', 'name?', 'name*', 'name+']: + o = self.cls(name) + assert repr(o) == 'Param(%r)' % name + o = self.cls('name', required=False) + assert repr(o) == "Param('name', required=False)" + o = self.cls('name', multivalue=True) + assert repr(o) == "Param('name', multivalue=True)" + + def test_clone(self): + """ + Test the `ipalib.parameters.Param.clone` method. + """ + # Test with the defaults + orig = self.cls('my_param') + clone = orig.clone() + assert clone is not orig + assert type(clone) is self.cls + assert clone.name is orig.name + for (key, kind, default) in self.cls.kwargs: + assert getattr(clone, key) is getattr(orig, key) + + # Test with a param spec: + orig = self.cls('my_param*') + assert orig.param_spec == 'my_param*' + clone = orig.clone() + assert clone.param_spec == 'my_param' + assert clone is not orig + assert type(clone) is self.cls + for (key, kind, default) in self.cls.kwargs: + assert getattr(clone, key) is getattr(orig, key) + + # Test with overrides: + orig = self.cls('my_param*') + assert orig.required is False + assert orig.multivalue is True + clone = orig.clone(required=True) + assert clone is not orig + assert type(clone) is self.cls + assert clone.required is True + assert clone.multivalue is True + assert clone.param_spec == 'my_param' + assert clone.name == 'my_param' + + def test_get_label(self): + """ + Test the `ipalib.parameters.get_label` method. + """ + context = request.context + cli_name = 'the_cli_name' + message = 'The Label' + label = lambda _: _(message) + o = self.cls('name', cli_name=cli_name, label=label) + assert o.label is label + + ## Scenario 1: label=callable (a lambda form) + + # Test with no context.ugettext: + assert not hasattr(context, 'ugettext') + assert_equal(o.get_label(), u'The Label') + + # Test with dummy context.ugettext: + assert not hasattr(context, 'ugettext') + dummy = dummy_ugettext() + context.ugettext = dummy + assert o.get_label() is dummy.translation + assert dummy.message is message + del context.ugettext + + ## Scenario 2: label=None + o = self.cls('name', cli_name=cli_name) + assert o.label is None + + # Test with no context.ugettext: + assert not hasattr(context, 'ugettext') + assert_equal(o.get_label(), u'the_cli_name') + + # Test with dummy context.ugettext: + assert not hasattr(context, 'ugettext') + dummy = dummy_ugettext() + context.ugettext = dummy + assert_equal(o.get_label(), u'the_cli_name') + assert not hasattr(dummy, 'message') + + # Cleanup + del context.ugettext + assert not hasattr(context, 'ugettext') + + def test_convert(self): + """ + Test the `ipalib.parameters.Param.convert` method. + """ + okay = ('Hello', u'Hello', 0, 4.2, True, False) + class Subclass(self.cls): + def _convert_scalar(self, value, index=None): + return value + + # Test when multivalue=False: + o = Subclass('my_param') + for value in NULLS: + assert o.convert(value) is None + for value in okay: + assert o.convert(value) is value + + # Test when multivalue=True: + o = Subclass('my_param', multivalue=True) + for value in NULLS: + assert o.convert(value) is None + assert o.convert(okay) == okay + assert o.convert(NULLS) is None + assert o.convert(okay + NULLS) == okay + assert o.convert(NULLS + okay) == okay + for value in okay: + assert o.convert(value) == (value,) + assert o.convert([None, value]) == (value,) + assert o.convert([value, None]) == (value,) + + def test_convert_scalar(self): + """ + Test the `ipalib.parameters.Param._convert_scalar` method. + """ + dummy = dummy_ugettext() + + # Test with correct type: + o = self.cls('my_param') + assert o._convert_scalar(None) is None + assert dummy.called() is False + # Test with incorrect type + e = raises(errors2.ConversionError, o._convert_scalar, 'hello', index=17) + + def test_validate(self): + """ + Test the `ipalib.parameters.Param.validate` method. + """ + + # Test in default state (with no rules, no kwarg): + o = self.cls('my_param') + e = raises(errors2.RequirementError, o.validate, None) + assert e.name == 'my_param' + + # Test with required=False + o = self.cls('my_param', required=False) + assert o.required is False + assert o.validate(None) is None + + # Test with query=True: + o = self.cls('my_param', query=True) + assert o.query is True + e = raises(errors2.RequirementError, o.validate, None) + assert_equal(e.name, 'my_param') + + # Test with multivalue=True: + o = self.cls('my_param', multivalue=True) + e = raises(TypeError, o.validate, []) + assert str(e) == TYPE_ERROR % ('value', tuple, [], list) + e = raises(ValueError, o.validate, tuple()) + assert str(e) == 'value: empty tuple must be converted to None' + + # Test with wrong (scalar) type: + e = raises(TypeError, o.validate, (None, None, 42, None)) + assert str(e) == TYPE_ERROR % ('value[2]', NoneType, 42, int) + o = self.cls('my_param') + e = raises(TypeError, o.validate, 'Hello') + assert str(e) == TYPE_ERROR % ('value', NoneType, 'Hello', str) + + class Example(self.cls): + type = int + + # Test with some rules and multivalue=False + pass1 = DummyRule() + pass2 = DummyRule() + fail = DummyRule(u'no good') + o = Example('example', pass1, pass2) + assert o.multivalue is False + assert o.validate(11) is None + assert pass1.calls == [(request.ugettext, 11)] + assert pass2.calls == [(request.ugettext, 11)] + pass1.reset() + pass2.reset() + o = Example('example', pass1, pass2, fail) + e = raises(errors2.ValidationError, o.validate, 42) + assert e.name == 'example' + assert e.error == u'no good' + assert e.index is None + assert pass1.calls == [(request.ugettext, 42)] + assert pass2.calls == [(request.ugettext, 42)] + assert fail.calls == [(request.ugettext, 42)] + + # Test with some rules and multivalue=True + pass1 = DummyRule() + pass2 = DummyRule() + fail = DummyRule(u'this one is not good') + o = Example('example', pass1, pass2, multivalue=True) + assert o.multivalue is True + assert o.validate((3, 9)) is None + assert pass1.calls == [ + (request.ugettext, 3), + (request.ugettext, 9), + ] + assert pass2.calls == [ + (request.ugettext, 3), + (request.ugettext, 9), + ] + pass1.reset() + pass2.reset() + o = Example('multi_example', pass1, pass2, fail, multivalue=True) + assert o.multivalue is True + e = raises(errors2.ValidationError, o.validate, (3, 9)) + assert e.name == 'multi_example' + assert e.error == u'this one is not good' + assert e.index == 0 + assert pass1.calls == [(request.ugettext, 3)] + assert pass2.calls == [(request.ugettext, 3)] + assert fail.calls == [(request.ugettext, 3)] + + def test_validate_scalar(self): + """ + Test the `ipalib.parameters.Param._validate_scalar` method. + """ + class MyParam(self.cls): + type = bool + okay = DummyRule() + o = MyParam('my_param', okay) + + # Test that TypeError is appropriately raised: + e = raises(TypeError, o._validate_scalar, 0) + assert str(e) == TYPE_ERROR % ('value', bool, 0, int) + e = raises(TypeError, o._validate_scalar, 'Hi', index=4) + assert str(e) == TYPE_ERROR % ('value[4]', bool, 'Hi', str) + e = raises(TypeError, o._validate_scalar, True, index=3.0) + assert str(e) == TYPE_ERROR % ('index', int, 3.0, float) + + # Test with passing rule: + assert o._validate_scalar(True, index=None) is None + assert o._validate_scalar(False, index=None) is None + assert okay.calls == [ + (request.ugettext, True), + (request.ugettext, False), + ] + + # Test with a failing rule: + okay = DummyRule() + fail = DummyRule(u'this describes the error') + o = MyParam('my_param', okay, fail) + e = raises(errors2.ValidationError, o._validate_scalar, True) + assert e.name == 'my_param' + assert e.error == u'this describes the error' + assert e.index is None + e = raises(errors2.ValidationError, o._validate_scalar, False, index=2) + assert e.name == 'my_param' + assert e.error == u'this describes the error' + assert e.index == 2 + assert okay.calls == [ + (request.ugettext, True), + (request.ugettext, False), + ] + assert fail.calls == [ + (request.ugettext, True), + (request.ugettext, False), + ] + + def test_get_default(self): + """ + Test the `ipalib.parameters.Param._get_default` method. + """ + class PassThrough(object): + value = None + + def __call__(self, value): + assert self.value is None + assert value is not None + self.value = value + return value + + def reset(self): + assert self.value is not None + self.value = None + + class Str(self.cls): + type = unicode + + def __init__(self, name, **kw): + self._convert_scalar = PassThrough() + super(Str, self).__init__(name, **kw) + + # Test with only a static default: + o = Str('my_str', + normalizer=PassThrough(), + default=u'Static Default', + ) + assert_equal(o.get_default(), u'Static Default') + assert o._convert_scalar.value is None + assert o.normalizer.value is None + + # Test with default_from: + o = Str('my_str', + normalizer=PassThrough(), + default=u'Static Default', + default_from=lambda first, last: first[0] + last, + ) + assert_equal(o.get_default(), u'Static Default') + assert o._convert_scalar.value is None + assert o.normalizer.value is None + default = o.get_default(first=u'john', last='doe') + assert_equal(default, u'jdoe') + assert o._convert_scalar.value is default + assert o.normalizer.value is default + + # Test with create_default: + o = Str('my_str', + normalizer=PassThrough(), + default=u'Static Default', + create_default=lambda **kw: u'The created default', + ) + default = o.get_default(first=u'john', last='doe') + assert_equal(default, u'The created default') + assert o._convert_scalar.value is default + assert o.normalizer.value is default + + +class test_Flag(ClassChecker): + """ + Test the `ipalib.parameters.Flag` class. + """ + _cls = parameters.Flag + + def test_init(self): + """ + Test the `ipalib.parameters.Flag.__init__` method. + """ + # Test with no kwargs: + o = self.cls('my_flag') + assert o.type is bool + assert isinstance(o, parameters.Bool) + assert o.autofill is True + assert o.default is False + + # Test that TypeError is raise if default is not a bool: + e = raises(TypeError, self.cls, 'my_flag', default=None) + assert str(e) == TYPE_ERROR % ('default', bool, None, NoneType) + + # Test with autofill=False, default=True + o = self.cls('my_flag', autofill=False, default=True) + assert o.autofill is True + assert o.default is True + + # Test when cloning: + orig = self.cls('my_flag') + for clone in [orig.clone(), orig.clone(autofill=False)]: + assert clone.autofill is True + assert clone.default is False + assert clone is not orig + assert type(clone) is self.cls + + # Test when cloning with default=True/False + orig = self.cls('my_flag') + assert orig.clone().default is False + assert orig.clone(default=True).default is True + orig = self.cls('my_flag', default=True) + assert orig.clone().default is True + assert orig.clone(default=False).default is False + + +class test_Data(ClassChecker): + """ + Test the `ipalib.parameters.Data` class. + """ + _cls = parameters.Data + + def test_init(self): + """ + Test the `ipalib.parameters.Data.__init__` method. + """ + o = self.cls('my_data') + assert o.type is NoneType + assert o.rules == tuple() + assert o.class_rules == tuple() + assert o.all_rules == tuple() + assert o.minlength is None + assert o.maxlength is None + assert o.length is None + assert not hasattr(o, 'pattern') + + # Test mixing length with minlength or maxlength: + o = self.cls('my_data', length=5) + assert o.length == 5 + permutations = [ + dict(minlength=3), + dict(maxlength=7), + dict(minlength=3, maxlength=7), + ] + for kw in permutations: + o = self.cls('my_data', **kw) + for (key, value) in kw.iteritems(): + assert getattr(o, key) == value + e = raises(ValueError, self.cls, 'my_data', length=5, **kw) + assert str(e) == \ + "Data('my_data'): cannot mix length with minlength or maxlength" + + # Test when minlength or maxlength are less than 1: + e = raises(ValueError, self.cls, 'my_data', minlength=0) + assert str(e) == "Data('my_data'): minlength must be >= 1; got 0" + e = raises(ValueError, self.cls, 'my_data', maxlength=0) + assert str(e) == "Data('my_data'): maxlength must be >= 1; got 0" + + # Test when minlength > maxlength: + e = raises(ValueError, self.cls, 'my_data', minlength=22, maxlength=15) + assert str(e) == \ + "Data('my_data'): minlength > maxlength (minlength=22, maxlength=15)" + + # Test when minlength == maxlength + e = raises(ValueError, self.cls, 'my_data', minlength=7, maxlength=7) + assert str(e) == \ + "Data('my_data'): minlength == maxlength; use length=7 instead" + + +class test_Bytes(ClassChecker): + """ + Test the `ipalib.parameters.Bytes` class. + """ + _cls = parameters.Bytes + + def test_init(self): + """ + Test the `ipalib.parameters.Bytes.__init__` method. + """ + o = self.cls('my_bytes') + assert o.type is str + assert o.rules == tuple() + assert o.class_rules == tuple() + assert o.all_rules == tuple() + assert o.minlength is None + assert o.maxlength is None + assert o.length is None + assert o.pattern is None + + # Test mixing length with minlength or maxlength: + o = self.cls('my_bytes', length=5) + assert o.length == 5 + assert len(o.class_rules) == 1 + assert len(o.rules) == 0 + assert len(o.all_rules) == 1 + permutations = [ + dict(minlength=3), + dict(maxlength=7), + dict(minlength=3, maxlength=7), + ] + for kw in permutations: + o = self.cls('my_bytes', **kw) + assert len(o.class_rules) == len(kw) + assert len(o.rules) == 0 + assert len(o.all_rules) == len(kw) + for (key, value) in kw.iteritems(): + assert getattr(o, key) == value + e = raises(ValueError, self.cls, 'my_bytes', length=5, **kw) + assert str(e) == \ + "Bytes('my_bytes'): cannot mix length with minlength or maxlength" + + # Test when minlength or maxlength are less than 1: + e = raises(ValueError, self.cls, 'my_bytes', minlength=0) + assert str(e) == "Bytes('my_bytes'): minlength must be >= 1; got 0" + e = raises(ValueError, self.cls, 'my_bytes', maxlength=0) + assert str(e) == "Bytes('my_bytes'): maxlength must be >= 1; got 0" + + # Test when minlength > maxlength: + e = raises(ValueError, self.cls, 'my_bytes', minlength=22, maxlength=15) + assert str(e) == \ + "Bytes('my_bytes'): minlength > maxlength (minlength=22, maxlength=15)" + + # Test when minlength == maxlength + e = raises(ValueError, self.cls, 'my_bytes', minlength=7, maxlength=7) + assert str(e) == \ + "Bytes('my_bytes'): minlength == maxlength; use length=7 instead" + + def test_rule_minlength(self): + """ + Test the `ipalib.parameters.Bytes._rule_minlength` method. + """ + o = self.cls('my_bytes', minlength=3) + assert o.minlength == 3 + rule = o._rule_minlength + translation = u'minlength=%(minlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in ('abc', 'four', '12345'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in ('', 'a', '12'): + assert_equal( + rule(dummy, value), + translation % dict(minlength=3) + ) + assert dummy.message == 'must be at least %(minlength)d bytes' + assert dummy.called() is True + dummy.reset() + + def test_rule_maxlength(self): + """ + Test the `ipalib.parameters.Bytes._rule_maxlength` method. + """ + o = self.cls('my_bytes', maxlength=4) + assert o.maxlength == 4 + rule = o._rule_maxlength + translation = u'maxlength=%(maxlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in ('ab', '123', 'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in ('12345', 'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(maxlength=4) + ) + assert dummy.message == 'can be at most %(maxlength)d bytes' + assert dummy.called() is True + dummy.reset() + + def test_rule_length(self): + """ + Test the `ipalib.parameters.Bytes._rule_length` method. + """ + o = self.cls('my_bytes', length=4) + assert o.length == 4 + rule = o._rule_length + translation = u'length=%(length)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in ('1234', 'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in ('ab', '123', '12345', 'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(length=4), + ) + assert dummy.message == 'must be exactly %(length)d bytes' + assert dummy.called() is True + dummy.reset() + + +class test_Str(ClassChecker): + """ + Test the `ipalib.parameters.Str` class. + """ + _cls = parameters.Str + + def test_init(self): + """ + Test the `ipalib.parameters.Str.__init__` method. + """ + o = self.cls('my_str') + assert o.type is unicode + assert o.minlength is None + assert o.maxlength is None + assert o.length is None + assert o.pattern is None + + def test_convert_scalar(self): + """ + Test the `ipalib.parameters.Str._convert_scalar` method. + """ + o = self.cls('my_str') + mthd = o._convert_scalar + for value in (u'Hello', 42, 1.2): + assert mthd(value) == unicode(value) + for value in [True, 'Hello', (u'Hello',), [42.3], dict(one=1)]: + e = raises(errors2.ConversionError, mthd, value) + assert e.name == 'my_str' + assert e.index is None + assert_equal(e.error, u'must be Unicode text') + e = raises(errors2.ConversionError, mthd, value, index=18) + assert e.name == 'my_str' + assert e.index == 18 + assert_equal(e.error, u'must be Unicode text') + + def test_rule_minlength(self): + """ + Test the `ipalib.parameters.Str._rule_minlength` method. + """ + o = self.cls('my_str', minlength=3) + assert o.minlength == 3 + rule = o._rule_minlength + translation = u'minlength=%(minlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (u'abc', u'four', u'12345'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (u'', u'a', u'12'): + assert_equal( + rule(dummy, value), + translation % dict(minlength=3) + ) + assert dummy.message == 'must be at least %(minlength)d characters' + assert dummy.called() is True + dummy.reset() + + def test_rule_maxlength(self): + """ + Test the `ipalib.parameters.Str._rule_maxlength` method. + """ + o = self.cls('my_str', maxlength=4) + assert o.maxlength == 4 + rule = o._rule_maxlength + translation = u'maxlength=%(maxlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (u'ab', u'123', u'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (u'12345', u'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(maxlength=4) + ) + assert dummy.message == 'can be at most %(maxlength)d characters' + assert dummy.called() is True + dummy.reset() + + def test_rule_length(self): + """ + Test the `ipalib.parameters.Str._rule_length` method. + """ + o = self.cls('my_str', length=4) + assert o.length == 4 + rule = o._rule_length + translation = u'length=%(length)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (u'1234', u'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (u'ab', u'123', u'12345', u'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(length=4), + ) + assert dummy.message == 'must be exactly %(length)d characters' + assert dummy.called() is True + dummy.reset() + + +class test_StrEnum(ClassChecker): + """ + Test the `ipalib.parameters.StrEnum` class. + """ + _cls = parameters.StrEnum + + def test_init(self): + """ + Test the `ipalib.parameters.StrEnum.__init__` method. + """ + values = (u'Hello', u'naughty', u'nurse!') + o = self.cls('my_strenum', values=values) + assert o.type is unicode + assert o.values is values + assert o.class_rules == (o._rule_values,) + assert o.rules == tuple() + assert o.all_rules == (o._rule_values,) + + badvalues = (u'Hello', 'naughty', u'nurse!') + e = raises(TypeError, self.cls, 'my_enum', values=badvalues) + assert str(e) == TYPE_ERROR % ( + "StrEnum('my_enum') values[1]", unicode, 'naughty', str + ) + + def test_rules_values(self): + """ + Test the `ipalib.parameters.StrEnum._rule_values` method. + """ + values = (u'Hello', u'naughty', u'nurse!') + o = self.cls('my_enum', values=values) + rule = o._rule_values + translation = u'values=%(values)s' + dummy = dummy_ugettext(translation) + + # Test with passing values: + for v in values: + assert rule(dummy, v) is None + assert dummy.called() is False + + # Test with failing values: + for val in (u'Howdy', u'quiet', u'library!'): + assert_equal( + rule(dummy, val), + translation % dict(values=values), + ) + assert_equal(dummy.message, 'must be one of %(values)r') + dummy.reset() + + +def test_create_param(): + """ + Test the `ipalib.parameters.create_param` function. + """ + f = parameters.create_param + + # Test that Param instances are returned unchanged: + params = ( + parameters.Param('one?'), + parameters.Int('two+'), + parameters.Str('three*'), + parameters.Bytes('four'), + ) + for p in params: + assert f(p) is p + + # Test that the spec creates an Str instance: + for spec in ('one?', 'two+', 'three*', 'four'): + (name, kw) = parameters.parse_param_spec(spec) + p = f(spec) + assert p.param_spec is spec + assert p.name == name + assert p.required is kw['required'] + assert p.multivalue is kw['multivalue'] + + # Test that TypeError is raised when spec is neither a Param nor a str: + for spec in (u'one', 42, parameters.Param, parameters.Str): + e = raises(TypeError, f, spec) + assert str(e) == \ + TYPE_ERROR % ('spec', (str, parameters.Param), spec, type(spec)) + + +def test_messages(): + """ + Test module level message in `ipalib.parameters`. + """ + for name in dir(parameters): + if name.startswith('_'): + continue + attr = getattr(parameters, name) + if not (isclass(attr) and issubclass(attr, parameters.Param)): + continue + assert type(attr.type_error) is str + assert attr.type_error in parameters.__messages diff --git a/tests/test_ipalib/test_plugable.py b/tests/test_ipalib/test_plugable.py new file mode 100644 index 00000000..c6c84fa1 --- /dev/null +++ b/tests/test_ipalib/test_plugable.py @@ -0,0 +1,756 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.plugable` module. +""" + +import inspect +from tests.util import raises, no_set, no_del, read_only +from tests.util import getitem, setitem, delitem +from tests.util import ClassChecker, create_test_api +from ipalib import plugable, errors, errors2 + + +class test_SetProxy(ClassChecker): + """ + Test the `ipalib.plugable.SetProxy` class. + """ + _cls = plugable.SetProxy + + def test_class(self): + """ + Test the `ipalib.plugable.SetProxy` class. + """ + assert self.cls.__bases__ == (plugable.ReadOnly,) + + def test_init(self): + """ + Test the `ipalib.plugable.SetProxy.__init__` method. + """ + okay = (set, frozenset, dict) + fail = (list, tuple) + for t in okay: + self.cls(t()) + raises(TypeError, self.cls, t) + for t in fail: + raises(TypeError, self.cls, t()) + raises(TypeError, self.cls, t) + + def test_SetProxy(self): + """ + Test container emulation of `ipalib.plugable.SetProxy` class. + """ + def get_key(i): + return 'key_%d' % i + + cnt = 10 + target = set() + proxy = self.cls(target) + for i in xrange(cnt): + key = get_key(i) + + # Check initial state + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert key not in proxy + assert key not in target + + # Add and test again + target.add(key) + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert key in proxy + assert key in target + + +class test_DictProxy(ClassChecker): + """ + Test the `ipalib.plugable.DictProxy` class. + """ + _cls = plugable.DictProxy + + def test_class(self): + """ + Test the `ipalib.plugable.DictProxy` class. + """ + assert self.cls.__bases__ == (plugable.SetProxy,) + + def test_init(self): + """ + Test the `ipalib.plugable.DictProxy.__init__` method. + """ + self.cls(dict()) + raises(TypeError, self.cls, dict) + fail = (set, frozenset, list, tuple) + for t in fail: + raises(TypeError, self.cls, t()) + raises(TypeError, self.cls, t) + + def test_DictProxy(self): + """ + Test container emulation of `ipalib.plugable.DictProxy` class. + """ + def get_kv(i): + return ( + 'key_%d' % i, + 'val_%d' % i, + ) + cnt = 10 + target = dict() + proxy = self.cls(target) + for i in xrange(cnt): + (key, val) = get_kv(i) + + # Check initial state + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert list(proxy()) == [target[k] for k in sorted(target)] + assert key not in proxy + raises(KeyError, getitem, proxy, key) + + # Add and test again + target[key] = val + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert list(proxy()) == [target[k] for k in sorted(target)] + + # Verify TypeError is raised trying to set/del via proxy + raises(TypeError, setitem, proxy, key, val) + raises(TypeError, delitem, proxy, key) + + +class test_MagicDict(ClassChecker): + """ + Test the `ipalib.plugable.MagicDict` class. + """ + _cls = plugable.MagicDict + + def test_class(self): + """ + Test the `ipalib.plugable.MagicDict` class. + """ + assert self.cls.__bases__ == (plugable.DictProxy,) + for non_dict in ('hello', 69, object): + raises(TypeError, self.cls, non_dict) + + def test_MagicDict(self): + """ + Test container emulation of `ipalib.plugable.MagicDict` class. + """ + cnt = 10 + keys = [] + d = dict() + dictproxy = self.cls(d) + for i in xrange(cnt): + key = 'key_%d' % i + val = 'val_%d' % i + keys.append(key) + + # Test thet key does not yet exist + assert len(dictproxy) == i + assert key not in dictproxy + assert not hasattr(dictproxy, key) + raises(KeyError, getitem, dictproxy, key) + raises(AttributeError, getattr, dictproxy, key) + + # Test that items/attributes cannot be set on dictproxy: + raises(TypeError, setitem, dictproxy, key, val) + raises(AttributeError, setattr, dictproxy, key, val) + + # Test that additions in d are reflected in dictproxy: + d[key] = val + assert len(dictproxy) == i + 1 + assert key in dictproxy + assert hasattr(dictproxy, key) + assert dictproxy[key] is val + assert read_only(dictproxy, key) is val + + # Test __iter__ + assert list(dictproxy) == keys + + for key in keys: + # Test that items cannot be deleted through dictproxy: + raises(TypeError, delitem, dictproxy, key) + raises(AttributeError, delattr, dictproxy, key) + + # Test that deletions in d are reflected in dictproxy + del d[key] + assert len(dictproxy) == len(d) + assert key not in dictproxy + raises(KeyError, getitem, dictproxy, key) + raises(AttributeError, getattr, dictproxy, key) + + +class test_Plugin(ClassChecker): + """ + Test the `ipalib.plugable.Plugin` class. + """ + _cls = plugable.Plugin + + def test_class(self): + """ + Test the `ipalib.plugable.Plugin` class. + """ + assert self.cls.__bases__ == (plugable.ReadOnly,) + assert self.cls.__public__ == frozenset() + assert type(self.cls.api) is property + + def test_init(self): + """ + Test the `ipalib.plugable.Plugin.__init__` method. + """ + o = self.cls() + assert o.name == 'Plugin' + assert o.module == 'ipalib.plugable' + assert o.fullname == 'ipalib.plugable.Plugin' + assert o.doc == inspect.getdoc(self.cls) + class some_subclass(self.cls): + """ + Do sub-classy things. + + Although it doesn't know how to comport itself and is not for mixed + company, this class *is* useful as we all need a little sub-class + now and then. + + One more paragraph. + """ + o = some_subclass() + assert o.name == 'some_subclass' + assert o.module == __name__ + assert o.fullname == '%s.some_subclass' % __name__ + assert o.doc == inspect.getdoc(some_subclass) + assert o.summary == 'Do sub-classy things.' + class another_subclass(self.cls): + pass + o = another_subclass() + assert o.doc is None + assert o.summary == '<%s>' % o.fullname + + # Test that Plugin makes sure the subclass hasn't defined attributes + # whose names conflict with the logger methods set in Plugin.__init__(): + class check(self.cls): + info = 'whatever' + e = raises(StandardError, check) + assert str(e) == \ + "check.info attribute ('whatever') conflicts with Plugin logger" + + def test_implements(self): + """ + Test the `ipalib.plugable.Plugin.implements` classmethod. + """ + class example(self.cls): + __public__ = frozenset(( + 'some_method', + 'some_property', + )) + class superset(self.cls): + __public__ = frozenset(( + 'some_method', + 'some_property', + 'another_property', + )) + class subset(self.cls): + __public__ = frozenset(( + 'some_property', + )) + class any_object(object): + __public__ = frozenset(( + 'some_method', + 'some_property', + )) + + for ex in (example, example()): + # Test using str: + assert ex.implements('some_method') + assert not ex.implements('another_method') + + # Test using frozenset: + assert ex.implements(frozenset(['some_method'])) + assert not ex.implements( + frozenset(['some_method', 'another_method']) + ) + + # Test using another object/class with __public__ frozenset: + assert ex.implements(example) + assert ex.implements(example()) + + assert ex.implements(subset) + assert not subset.implements(ex) + + assert not ex.implements(superset) + assert superset.implements(ex) + + assert ex.implements(any_object) + assert ex.implements(any_object()) + + def test_implemented_by(self): + """ + Test the `ipalib.plugable.Plugin.implemented_by` classmethod. + """ + class base(self.cls): + __public__ = frozenset(( + 'attr0', + 'attr1', + 'attr2', + )) + + class okay(base): + def attr0(self): + pass + def __get_attr1(self): + assert False # Make sure property isn't accesed on instance + attr1 = property(__get_attr1) + attr2 = 'hello world' + another_attr = 'whatever' + + class fail(base): + def __init__(self): + # Check that class, not instance is inspected: + self.attr2 = 'hello world' + def attr0(self): + pass + def __get_attr1(self): + assert False # Make sure property isn't accesed on instance + attr1 = property(__get_attr1) + another_attr = 'whatever' + + # Test that AssertionError is raised trying to pass something not + # subclass nor instance of base: + raises(AssertionError, base.implemented_by, object) + + # Test on subclass with needed attributes: + assert base.implemented_by(okay) is True + assert base.implemented_by(okay()) is True + + # Test on subclass *without* needed attributes: + assert base.implemented_by(fail) is False + assert base.implemented_by(fail()) is False + + def test_set_api(self): + """ + Test the `ipalib.plugable.Plugin.set_api` method. + """ + api = 'the api instance' + o = self.cls() + assert o.api is None + e = raises(AssertionError, o.set_api, None) + assert str(e) == 'set_api() argument cannot be None' + o.set_api(api) + assert o.api is api + e = raises(AssertionError, o.set_api, api) + assert str(e) == 'set_api() can only be called once' + + def test_finalize(self): + """ + Test the `ipalib.plugable.Plugin.finalize` method. + """ + o = self.cls() + assert not o.__islocked__() + o.finalize() + assert o.__islocked__() + + def test_call(self): + """ + Test the `ipalib.plugable.Plugin.call` method. + """ + o = self.cls() + o.call('/bin/true') is None + e = raises(errors2.SubprocessError, o.call, '/bin/false') + assert e.returncode == 1 + assert e.argv == ('/bin/false',) + + +class test_PluginProxy(ClassChecker): + """ + Test the `ipalib.plugable.PluginProxy` class. + """ + _cls = plugable.PluginProxy + + def test_class(self): + """ + Test the `ipalib.plugable.PluginProxy` class. + """ + assert self.cls.__bases__ == (plugable.SetProxy,) + + def test_proxy(self): + """ + Test proxy behaviour of `ipalib.plugable.PluginProxy` instance. + """ + # Setup: + class base(object): + __public__ = frozenset(( + 'public_0', + 'public_1', + '__call__', + )) + + def public_0(self): + return 'public_0' + + def public_1(self): + return 'public_1' + + def __call__(self, caller): + return 'ya called it, %s.' % caller + + def private_0(self): + return 'private_0' + + def private_1(self): + return 'private_1' + + class plugin(base): + name = 'user_add' + attr_name = 'add' + doc = 'add a new user' + + # Test that TypeError is raised when base is not a class: + raises(TypeError, self.cls, base(), None) + + # Test that ValueError is raised when target is not instance of base: + raises(ValueError, self.cls, base, object()) + + # Test with correct arguments: + i = plugin() + p = self.cls(base, i) + assert read_only(p, 'name') is plugin.name + assert read_only(p, 'doc') == plugin.doc + assert list(p) == sorted(base.__public__) + + # Test normal methods: + for n in xrange(2): + pub = 'public_%d' % n + priv = 'private_%d' % n + assert getattr(i, pub)() == pub + assert getattr(p, pub)() == pub + assert hasattr(p, pub) + assert getattr(i, priv)() == priv + assert not hasattr(p, priv) + + # Test __call__: + value = 'ya called it, dude.' + assert i('dude') == value + assert p('dude') == value + assert callable(p) + + # Test name_attr='name' kw arg + i = plugin() + p = self.cls(base, i, 'attr_name') + assert read_only(p, 'name') == 'add' + + def test_implements(self): + """ + Test the `ipalib.plugable.PluginProxy.implements` method. + """ + class base(object): + __public__ = frozenset() + name = 'base' + doc = 'doc' + @classmethod + def implements(cls, arg): + return arg + 7 + + class sub(base): + @classmethod + def implements(cls, arg): + """ + Defined to make sure base.implements() is called, not + target.implements() + """ + return arg + + o = sub() + p = self.cls(base, o) + assert p.implements(3) == 10 + + def test_clone(self): + """ + Test the `ipalib.plugable.PluginProxy.__clone__` method. + """ + class base(object): + __public__ = frozenset() + class sub(base): + name = 'some_name' + doc = 'doc' + label = 'another_name' + + p = self.cls(base, sub()) + assert read_only(p, 'name') == 'some_name' + c = p.__clone__('label') + assert isinstance(c, self.cls) + assert c is not p + assert read_only(c, 'name') == 'another_name' + + +def test_Registrar(): + """ + Test the `ipalib.plugable.Registrar` class + """ + class Base1(object): + pass + class Base2(object): + pass + class Base3(object): + pass + class plugin1(Base1): + pass + class plugin2(Base2): + pass + class plugin3(Base3): + pass + + # Test creation of Registrar: + r = plugable.Registrar(Base1, Base2) + + # Test __iter__: + assert list(r) == ['Base1', 'Base2'] + + # Test __hasitem__, __getitem__: + for base in [Base1, Base2]: + name = base.__name__ + assert name in r + assert r[name] is base + magic = getattr(r, name) + assert type(magic) is plugable.MagicDict + assert len(magic) == 0 + + # Check that TypeError is raised trying to register something that isn't + # a class: + p = plugin1() + e = raises(TypeError, r, p) + assert str(e) == 'plugin must be a class; got %r' % p + + # Check that SubclassError is raised trying to register a class that is + # not a subclass of an allowed base: + e = raises(errors2.PluginSubclassError, r, plugin3) + assert e.plugin is plugin3 + + # Check that registration works + r(plugin1) + assert len(r.Base1) == 1 + assert r.Base1['plugin1'] is plugin1 + assert r.Base1.plugin1 is plugin1 + + # Check that DuplicateError is raised trying to register exact class + # again: + e = raises(errors2.PluginDuplicateError, r, plugin1) + assert e.plugin is plugin1 + + # Check that OverrideError is raised trying to register class with same + # name and same base: + orig1 = plugin1 + class base1_extended(Base1): + pass + class plugin1(base1_extended): + pass + e = raises(errors2.PluginOverrideError, r, plugin1) + assert e.base == 'Base1' + assert e.name == 'plugin1' + assert e.plugin is plugin1 + + # Check that overriding works + r(plugin1, override=True) + assert len(r.Base1) == 1 + assert r.Base1.plugin1 is plugin1 + assert r.Base1.plugin1 is not orig1 + + # Check that MissingOverrideError is raised trying to override a name + # not yet registerd: + e = raises(errors2.PluginMissingOverrideError, r, plugin2, override=True) + assert e.base == 'Base2' + assert e.name == 'plugin2' + assert e.plugin is plugin2 + + # Test that another plugin can be registered: + assert len(r.Base2) == 0 + r(plugin2) + assert len(r.Base2) == 1 + assert r.Base2.plugin2 is plugin2 + + # Setup to test more registration: + class plugin1a(Base1): + pass + r(plugin1a) + + class plugin1b(Base1): + pass + r(plugin1b) + + class plugin2a(Base2): + pass + r(plugin2a) + + class plugin2b(Base2): + pass + r(plugin2b) + + # Again test __hasitem__, __getitem__: + for base in [Base1, Base2]: + name = base.__name__ + assert name in r + assert r[name] is base + magic = getattr(r, name) + assert len(magic) == 3 + for key in magic: + klass = magic[key] + assert getattr(magic, key) is klass + assert issubclass(klass, base) + + +class test_API(ClassChecker): + """ + Test the `ipalib.plugable.API` class. + """ + + _cls = plugable.API + + def test_API(self): + """ + Test the `ipalib.plugable.API` class. + """ + assert issubclass(plugable.API, plugable.ReadOnly) + + # Setup the test bases, create the API: + class base0(plugable.Plugin): + __public__ = frozenset(( + 'method', + )) + + def method(self, n): + return n + + class base1(plugable.Plugin): + __public__ = frozenset(( + 'method', + )) + + def method(self, n): + return n + 1 + + api = plugable.API(base0, base1) + api.env.mode = 'unit_test' + api.env.in_tree = True + r = api.register + assert isinstance(r, plugable.Registrar) + assert read_only(api, 'register') is r + + class base0_plugin0(base0): + pass + r(base0_plugin0) + + class base0_plugin1(base0): + pass + r(base0_plugin1) + + class base0_plugin2(base0): + pass + r(base0_plugin2) + + class base1_plugin0(base1): + pass + r(base1_plugin0) + + class base1_plugin1(base1): + pass + r(base1_plugin1) + + class base1_plugin2(base1): + pass + r(base1_plugin2) + + # Test API instance: + assert api.isdone('bootstrap') is False + assert api.isdone('finalize') is False + api.finalize() + assert api.isdone('bootstrap') is True + assert api.isdone('finalize') is True + + def get_base(b): + return 'base%d' % b + + def get_plugin(b, p): + return 'base%d_plugin%d' % (b, p) + + for b in xrange(2): + base_name = get_base(b) + ns = getattr(api, base_name) + assert isinstance(ns, plugable.NameSpace) + assert read_only(api, base_name) is ns + assert len(ns) == 3 + for p in xrange(3): + plugin_name = get_plugin(b, p) + proxy = ns[plugin_name] + assert isinstance(proxy, plugable.PluginProxy) + assert proxy.name == plugin_name + assert read_only(ns, plugin_name) is proxy + assert read_only(proxy, 'method')(7) == 7 + b + + # Test that calling finilize again raises AssertionError: + e = raises(StandardError, api.finalize) + assert str(e) == 'API.finalize() already called', str(e) + + # Test with base class that doesn't request a proxy + class NoProxy(plugable.Plugin): + __proxy__ = False + api = plugable.API(NoProxy) + api.env.mode = 'unit_test' + class plugin0(NoProxy): + pass + api.register(plugin0) + class plugin1(NoProxy): + pass + api.register(plugin1) + api.finalize() + names = ['plugin0', 'plugin1'] + assert list(api.NoProxy) == names + for name in names: + plugin = api.NoProxy[name] + assert getattr(api.NoProxy, name) is plugin + assert isinstance(plugin, plugable.Plugin) + assert not isinstance(plugin, plugable.PluginProxy) + + def test_bootstrap(self): + """ + Test the `ipalib.plugable.API.bootstrap` method. + """ + (o, home) = create_test_api() + assert o.env._isdone('_bootstrap') is False + assert o.env._isdone('_finalize_core') is False + assert o.isdone('bootstrap') is False + o.bootstrap(my_test_override='Hello, world!') + assert o.isdone('bootstrap') is True + assert o.env._isdone('_bootstrap') is True + assert o.env._isdone('_finalize_core') is True + assert o.env.my_test_override == 'Hello, world!' + e = raises(StandardError, o.bootstrap) + assert str(e) == 'API.bootstrap() already called' + + def test_load_plugins(self): + """ + Test the `ipalib.plugable.API.load_plugins` method. + """ + (o, home) = create_test_api() + assert o.isdone('bootstrap') is False + assert o.isdone('load_plugins') is False + o.load_plugins() + assert o.isdone('bootstrap') is True + assert o.isdone('load_plugins') is True + e = raises(StandardError, o.load_plugins) + assert str(e) == 'API.load_plugins() already called' diff --git a/tests/test_ipalib/test_request.py b/tests/test_ipalib/test_request.py new file mode 100644 index 00000000..f26c270a --- /dev/null +++ b/tests/test_ipalib/test_request.py @@ -0,0 +1,161 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty contextrmation +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.request` module. +""" + +import threading +import locale +from tests.util import raises, assert_equal +from tests.util import TempDir, dummy_ugettext, dummy_ungettext +from ipalib.constants import OVERRIDE_ERROR +from ipalib import request + + +def test_ugettext(): + """ + Test the `ipalib.request.ugettext` function. + """ + f = request.ugettext + context = request.context + message = 'Hello, world!' + + # Test with no context.ugettext: + assert not hasattr(context, 'ugettext') + assert_equal(f(message), u'Hello, world!') + + # Test with dummy context.ugettext: + assert not hasattr(context, 'ugettext') + dummy = dummy_ugettext() + context.ugettext = dummy + assert f(message) is dummy.translation + assert dummy.message is message + + # Cleanup + del context.ugettext + assert not hasattr(context, 'ugettext') + + +def test_ungettext(): + """ + Test the `ipalib.request.ungettext` function. + """ + f = request.ungettext + context = request.context + singular = 'Goose' + plural = 'Geese' + + # Test with no context.ungettext: + assert not hasattr(context, 'ungettext') + assert_equal(f(singular, plural, 1), u'Goose') + assert_equal(f(singular, plural, 2), u'Geese') + + # Test singular with dummy context.ungettext + assert not hasattr(context, 'ungettext') + dummy = dummy_ungettext() + context.ungettext = dummy + assert f(singular, plural, 1) is dummy.translation_singular + assert dummy.singular is singular + assert dummy.plural is plural + assert dummy.n == 1 + del context.ungettext + assert not hasattr(context, 'ungettext') + + # Test plural with dummy context.ungettext + assert not hasattr(context, 'ungettext') + dummy = dummy_ungettext() + context.ungettext = dummy + assert f(singular, plural, 2) is dummy.translation_plural + assert dummy.singular is singular + assert dummy.plural is plural + assert dummy.n == 2 + del context.ungettext + assert not hasattr(context, 'ungettext') + + +def test_set_languages(): + """ + Test the `ipalib.request.set_languages` function. + """ + f = request.set_languages + c = request.context + langs = ('ru', 'en') + + # Test that StandardError is raised if languages has already been set: + assert not hasattr(c, 'languages') + c.languages = None + e = raises(StandardError, f, *langs) + assert str(e) == OVERRIDE_ERROR % ('context', 'languages', None, langs) + del c.languages + + # Test setting the languages: + assert not hasattr(c, 'languages') + f(*langs) + assert c.languages == langs + del c.languages + + # Test setting language from locale.getdefaultlocale() + assert not hasattr(c, 'languages') + f() + assert c.languages == locale.getdefaultlocale()[:1] + del c.languages + assert not hasattr(c, 'languages') + + +def test_create_translation(): + """ + Test the `ipalib.request.create_translation` function. + """ + f = request.create_translation + c = request.context + t = TempDir() + + # Test that StandardError is raised if ugettext or ungettext: + assert not (hasattr(c, 'ugettext') or hasattr(c, 'ungettext')) + for name in ('ugettext', 'ungettext'): + setattr(c, name, None) + e = raises(StandardError, f, 'ipa', None) + assert str(e) == ( + 'create_translation() already called in thread %r' % + threading.currentThread().getName() + ) + delattr(c, name) + + # Test using default language: + assert not hasattr(c, 'ugettext') + assert not hasattr(c, 'ungettext') + assert not hasattr(c, 'languages') + f('ipa', t.path) + assert hasattr(c, 'ugettext') + assert hasattr(c, 'ungettext') + assert c.languages == locale.getdefaultlocale()[:1] + del c.ugettext + del c.ungettext + del c.languages + + # Test using explicit languages: + langs = ('de', 'es') + f('ipa', t.path, *langs) + assert hasattr(c, 'ugettext') + assert hasattr(c, 'ungettext') + assert c.languages == langs + del c.ugettext + del c.ungettext + del c.languages diff --git a/tests/test_ipalib/test_rpc.py b/tests/test_ipalib/test_rpc.py new file mode 100644 index 00000000..296e9bc1 --- /dev/null +++ b/tests/test_ipalib/test_rpc.py @@ -0,0 +1,249 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.rpc` module. +""" + +import threading +from xmlrpclib import Binary, Fault, dumps, loads +from tests.util import raises, assert_equal, PluginTester, DummyClass +from tests.data import binary_bytes, utf8_bytes, unicode_str +from ipalib.frontend import Command +from ipalib.request import context +from ipalib import rpc, errors2 + + +std_compound = (binary_bytes, utf8_bytes, unicode_str) + + +def dump_n_load(value): + (param, method) = loads( + dumps((value,), allow_none=True) + ) + return param[0] + + +def round_trip(value): + return rpc.xml_unwrap( + dump_n_load(rpc.xml_wrap(value)) + ) + + +def test_round_trip(): + """ + Test `ipalib.rpc.xml_wrap` and `ipalib.rpc.xml_unwrap`. + + This tests the two functions together with ``xmlrpclib.dumps()`` and + ``xmlrpclib.loads()`` in a full wrap/dumps/loads/unwrap round trip. + """ + # We first test that our assumptions about xmlrpclib module in the Python + # standard library are correct: + assert_equal(dump_n_load(utf8_bytes), unicode_str) + assert_equal(dump_n_load(unicode_str), unicode_str) + assert_equal(dump_n_load(Binary(binary_bytes)).data, binary_bytes) + assert isinstance(dump_n_load(Binary(binary_bytes)), Binary) + assert type(dump_n_load('hello')) is str + assert type(dump_n_load(u'hello')) is str + assert_equal(dump_n_load(''), '') + assert_equal(dump_n_load(u''), '') + assert dump_n_load(None) is None + + # Now we test our wrap and unwrap methods in combination with dumps, loads: + # All str should come back str (because they get wrapped in + # xmlrpclib.Binary(). All unicode should come back unicode because str + # explicity get decoded by rpc.xml_unwrap() if they weren't already + # decoded by xmlrpclib.loads(). + assert_equal(round_trip(utf8_bytes), utf8_bytes) + assert_equal(round_trip(unicode_str), unicode_str) + assert_equal(round_trip(binary_bytes), binary_bytes) + assert type(round_trip('hello')) is str + assert type(round_trip(u'hello')) is unicode + assert_equal(round_trip(''), '') + assert_equal(round_trip(u''), u'') + assert round_trip(None) is None + compound = [utf8_bytes, None, binary_bytes, (None, unicode_str), + dict(utf8=utf8_bytes, chars=unicode_str, data=binary_bytes) + ] + assert round_trip(compound) == tuple(compound) + + +def test_xml_wrap(): + """ + Test the `ipalib.rpc.xml_wrap` function. + """ + f = rpc.xml_wrap + assert f([]) == tuple() + assert f({}) == dict() + b = f('hello') + assert isinstance(b, Binary) + assert b.data == 'hello' + u = f(u'hello') + assert type(u) is unicode + assert u == u'hello' + value = f([dict(one=False, two=u'hello'), None, 'hello']) + + +def test_xml_unwrap(): + """ + Test the `ipalib.rpc.xml_unwrap` function. + """ + f = rpc.xml_unwrap + assert f([]) == tuple() + assert f({}) == dict() + value = f(Binary(utf8_bytes)) + assert type(value) is str + assert value == utf8_bytes + assert f(utf8_bytes) == unicode_str + assert f(unicode_str) == unicode_str + value = f([True, Binary('hello'), dict(one=1, two=utf8_bytes, three=None)]) + assert value == (True, 'hello', dict(one=1, two=unicode_str, three=None)) + assert type(value[1]) is str + assert type(value[2]['two']) is unicode + + +def test_xml_dumps(): + """ + Test the `ipalib.rpc.xml_dumps` function. + """ + f = rpc.xml_dumps + params = (binary_bytes, utf8_bytes, unicode_str, None) + + # Test serializing an RPC request: + data = f(params, 'the_method') + (p, m) = loads(data) + assert_equal(m, u'the_method') + assert type(p) is tuple + assert rpc.xml_unwrap(p) == params + + # Test serializing an RPC response: + data = f((params,), methodresponse=True) + (tup, m) = loads(data) + assert m is None + assert len(tup) == 1 + assert type(tup) is tuple + assert rpc.xml_unwrap(tup[0]) == params + + # Test serializing an RPC response containing a Fault: + fault = Fault(69, unicode_str) + data = f(fault, methodresponse=True) + e = raises(Fault, loads, data) + assert e.faultCode == 69 + assert_equal(e.faultString, unicode_str) + + +def test_xml_loads(): + """ + Test the `ipalib.rpc.xml_loads` function. + """ + f = rpc.xml_loads + params = (binary_bytes, utf8_bytes, unicode_str, None) + wrapped = rpc.xml_wrap(params) + + # Test un-serializing an RPC request: + data = dumps(wrapped, 'the_method', allow_none=True) + (p, m) = f(data) + assert_equal(m, u'the_method') + assert_equal(p, params) + + # Test un-serializing an RPC response: + data = dumps((wrapped,), methodresponse=True, allow_none=True) + (tup, m) = f(data) + assert m is None + assert len(tup) == 1 + assert type(tup) is tuple + assert_equal(tup[0], params) + + # Test un-serializing an RPC response containing a Fault: + fault = Fault(69, unicode_str) + data = dumps(fault, methodresponse=True, allow_none=True) + e = raises(Fault, f, data) + assert e.faultCode == 69 + assert_equal(e.faultString, unicode_str) + + +class test_xmlclient(PluginTester): + """ + Test the `ipalib.rpc.xmlclient` plugin. + """ + _plugin = rpc.xmlclient + + def test_forward(self): + """ + Test the `ipalib.rpc.xmlclient.forward` method. + """ + class user_add(Command): + pass + + # Test that ValueError is raised when forwarding a command that is not + # in api.Command: + (o, api, home) = self.instance('Backend', in_server=False) + e = raises(ValueError, o.forward, 'user_add') + assert str(e) == '%s.forward(): %r not in api.Command' % ( + 'xmlclient', 'user_add' + ) + + # Test that StandardError is raised when context.xmlconn does not exist: + (o, api, home) = self.instance('Backend', user_add, in_server=False) + e = raises(StandardError, o.forward, 'user_add') + assert str(e) == '%s.forward(%r): need context.xmlconn in thread %r' % ( + 'xmlclient', 'user_add', threading.currentThread().getName() + ) + + args = (binary_bytes, utf8_bytes, unicode_str) + kw = dict(one=binary_bytes, two=utf8_bytes, three=unicode_str) + params = args + (kw,) + result = (unicode_str, binary_bytes, utf8_bytes) + context.xmlconn = DummyClass( + ( + 'user_add', + (rpc.xml_wrap(params),), + {}, + rpc.xml_wrap(result), + ), + ( + 'user_add', + (rpc.xml_wrap(params),), + {}, + Fault(3005, u"'four' is required"), # RequirementError + ), + ( + 'user_add', + (rpc.xml_wrap(params),), + {}, + Fault(700, u'no such error'), # There is no error 700 + ), + + ) + + # Test with a successful return value: + assert o.forward('user_add', *args, **kw) == result + + # Test with an errno the client knows: + e = raises(errors2.RequirementError, o.forward, 'user_add', *args, **kw) + assert_equal(e.message, u"'four' is required") + + # Test with an errno the client doesn't know + e = raises(errors2.UnknownError, o.forward, 'user_add', *args, **kw) + assert_equal(e.code, 700) + assert_equal(e.error, u'no such error') + + assert context.xmlconn._calledall() is True + + del context.xmlconn diff --git a/tests/test_ipalib/test_util.py b/tests/test_ipalib/test_util.py new file mode 100644 index 00000000..6729fcda --- /dev/null +++ b/tests/test_ipalib/test_util.py @@ -0,0 +1,61 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib.util` module. +""" + +from tests.util import raises +from ipalib import util + + +def test_xmlrpc_marshal(): + """ + Test the `ipalib.util.xmlrpc_marshal` function. + """ + f = util.xmlrpc_marshal + assert f() == ({},) + assert f('one', 'two') == ({}, 'one', 'two') + assert f(one=1, two=2) == (dict(one=1, two=2),) + assert f('one', 'two', three=3, four=4) == \ + (dict(three=3, four=4), 'one', 'two') + + +def test_xmlrpc_unmarshal(): + """ + Test the `ipalib.util.xmlrpc_unmarshal` function. + """ + f = util.xmlrpc_unmarshal + assert f() == (tuple(), {}) + assert f({}, 'one', 'two') == (('one', 'two'), {}) + assert f(dict(one=1, two=2)) == (tuple(), dict(one=1, two=2)) + assert f(dict(three=3, four=4), 'one', 'two') == \ + (('one', 'two'), dict(three=3, four=4)) + + +def test_make_repr(): + """ + Test the `ipalib.util.make_repr` function. + """ + f = util.make_repr + assert f('my') == 'my()' + assert f('my', True, u'hello') == "my(True, u'hello')" + assert f('my', one=1, two='two') == "my(one=1, two='two')" + assert f('my', None, 3, dog='animal', apple='fruit') == \ + "my(None, 3, apple='fruit', dog='animal')" diff --git a/tests/test_ipaserver/__init__.py b/tests/test_ipaserver/__init__.py new file mode 100644 index 00000000..56a6c533 --- /dev/null +++ b/tests/test_ipaserver/__init__.py @@ -0,0 +1,22 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Sub-package containing unit tests for `ipaserver` package. +""" diff --git a/tests/test_ipaserver/test_rpcserver.py b/tests/test_ipaserver/test_rpcserver.py new file mode 100644 index 00000000..48c1d36e --- /dev/null +++ b/tests/test_ipaserver/test_rpcserver.py @@ -0,0 +1,79 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipaserver.rpc` module. +""" + +from tests.util import create_test_api, raises, PluginTester +from tests.data import unicode_str +from ipalib import errors2, Command +from ipaserver import rpcserver + + +def test_params_2_args_options(): + """ + Test the `ipaserver.rpcserver.params_2_args_options` function. + """ + f = rpcserver.params_2_args_options + args = ('Hello', u'world!') + options = dict(one=1, two=u'Two', three='Three') + assert f(tuple()) == (tuple(), dict()) + assert f(args) == (args, dict()) + assert f((options,)) == (tuple(), options) + assert f(args + (options,)) == (args, options) + assert f((options,) + args) == ((options,) + args, dict()) + + +class test_xmlserver(PluginTester): + """ + Test the `ipaserver.rpcserver.xmlserver` plugin. + """ + + _plugin = rpcserver.xmlserver + + def test_dispatch(self): + """ + Test the `ipaserver.rpcserver.xmlserver.dispatch` method. + """ + (o, api, home) = self.instance('Backend', in_server=True) + e = raises(errors2.CommandError, o.dispatch, 'echo', tuple()) + assert e.name == 'echo' + + class echo(Command): + takes_args = ['arg1', 'arg2+'] + takes_options = ['option1?', 'option2?'] + def execute(self, *args, **options): + assert type(args[1]) is tuple + return args + (options,) + + (o, api, home) = self.instance('Backend', echo, in_server=True) + def call(params): + response = o.dispatch('echo', params) + assert type(response) is tuple and len(response) == 1 + return response[0] + arg1 = unicode_str + arg2 = (u'Hello', unicode_str, u'world!') + options = dict(option1=u'How are you?', option2=unicode_str) + assert call((arg1, arg2, options)) == (arg1, arg2, options) + assert call((arg1,) + arg2 + (options,)) == (arg1, arg2, options) + + + def test_execute(self): + (o, api, home) = self.instance('Backend', in_server=True) diff --git a/tests/test_ipawebui/__init__.py b/tests/test_ipawebui/__init__.py new file mode 100644 index 00000000..f739a856 --- /dev/null +++ b/tests/test_ipawebui/__init__.py @@ -0,0 +1,21 @@ +# Authors: Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Sub-package containing unit tests for `ipawebui` package. +""" diff --git a/tests/test_ipawebui/test_controllers.py b/tests/test_ipawebui/test_controllers.py new file mode 100644 index 00000000..e236d1a0 --- /dev/null +++ b/tests/test_ipawebui/test_controllers.py @@ -0,0 +1,70 @@ +# Authors: Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipawebui.controller` module. +""" + +from ipawebui import controller + + + +class test_Controller(object): + """ + Test the `controller.Controller` class. + """ + + def test_init(self): + """ + Test the `ipawebui.controller.Controller.__init__()` method. + """ + o = controller.Controller() + assert o.template is None + template = 'The template.' + o = controller.Controller(template) + assert o.template is template + + def test_output_xhtml(self): + """ + Test the `ipawebui.controller.Controller.output_xhtml` method. + """ + class Template(object): + def __init__(self): + self.calls = 0 + self.kw = {} + + def serialize(self, **kw): + self.calls += 1 + self.kw = kw + return dict(kw) + + d = dict(output='xhtml-strict', format='pretty') + t = Template() + o = controller.Controller(t) + assert o.output_xhtml() == d + assert t.calls == 1 + + def test_output_json(self): + """ + Test the `ipawebui.controller.Controller.output_json` method. + """ + o = controller.Controller() + assert o.output_json() == '{}' + e = '{\n "age": 27, \n "first": "John", \n "last": "Doe"\n}' + j = o.output_json(last='Doe', first='John', age=27) + assert j == e diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 00000000..7d7038c1 --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,148 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `tests.util` module. +""" + +import util + + +class Prop(object): + def __init__(self, *ops): + self.__ops = frozenset(ops) + self.__prop = 'prop value' + + def __get_prop(self): + if 'get' not in self.__ops: + raise AttributeError('get prop') + return self.__prop + + def __set_prop(self, value): + if 'set' not in self.__ops: + raise AttributeError('set prop') + self.__prop = value + + def __del_prop(self): + if 'del' not in self.__ops: + raise AttributeError('del prop') + self.__prop = None + + prop = property(__get_prop, __set_prop, __del_prop) + + +def test_yes_raised(): + f = util.raises + + class SomeError(Exception): + pass + + class AnotherError(Exception): + pass + + def callback1(): + 'raises correct exception' + raise SomeError() + + def callback2(): + 'raises wrong exception' + raise AnotherError() + + def callback3(): + 'raises no exception' + + f(SomeError, callback1) + + raised = False + try: + f(SomeError, callback2) + except AnotherError: + raised = True + assert raised + + raised = False + try: + f(SomeError, callback3) + except util.ExceptionNotRaised: + raised = True + assert raised + + +def test_no_set(): + # Tests that it works when prop cannot be set: + util.no_set(Prop('get', 'del'), 'prop') + + # Tests that ExceptionNotRaised is raised when prop *can* be set: + raised = False + try: + util.no_set(Prop('set'), 'prop') + except util.ExceptionNotRaised: + raised = True + assert raised + + +def test_no_del(): + # Tests that it works when prop cannot be deleted: + util.no_del(Prop('get', 'set'), 'prop') + + # Tests that ExceptionNotRaised is raised when prop *can* be set: + raised = False + try: + util.no_del(Prop('del'), 'prop') + except util.ExceptionNotRaised: + raised = True + assert raised + + +def test_read_only(): + # Test that it works when prop is read only: + assert util.read_only(Prop('get'), 'prop') == 'prop value' + + # Test that ExceptionNotRaised is raised when prop can be set: + raised = False + try: + util.read_only(Prop('get', 'set'), 'prop') + except util.ExceptionNotRaised: + raised = True + assert raised + + # Test that ExceptionNotRaised is raised when prop can be deleted: + raised = False + try: + util.read_only(Prop('get', 'del'), 'prop') + except util.ExceptionNotRaised: + raised = True + assert raised + + # Test that ExceptionNotRaised is raised when prop can be both set and + # deleted: + raised = False + try: + util.read_only(Prop('get', 'del'), 'prop') + except util.ExceptionNotRaised: + raised = True + assert raised + + # Test that AttributeError is raised when prop can't be read: + raised = False + try: + util.read_only(Prop(), 'prop') + except AttributeError: + raised = True + assert raised diff --git a/tests/test_xmlrpc/__init__.py b/tests/test_xmlrpc/__init__.py new file mode 100644 index 00000000..043007b5 --- /dev/null +++ b/tests/test_xmlrpc/__init__.py @@ -0,0 +1,22 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Sub-package containing unit tests for `xmlrpc` package. +""" diff --git a/tests/test_xmlrpc/test_automount_plugin.py b/tests/test_xmlrpc/test_automount_plugin.py new file mode 100644 index 00000000..522ee689 --- /dev/null +++ b/tests/test_xmlrpc/test_automount_plugin.py @@ -0,0 +1,243 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_automount' module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +class test_Service(XMLRPC_test): + """ + Test the `f_automount` plugin. + """ + mapname='testmap' + keyname='testkey' + keyname2='secondkey' + description='description of map' + info='ro' + map_kw={'automountmapname': mapname, 'description': description} + key_kw={'automountmapname': mapname, 'automountkey': keyname, 'automountinformation': info} + key_kw2={'automountmapname': mapname, 'automountkey': keyname2, 'automountinformation': info} + + def test_add_1map(self): + """ + Test adding a map `xmlrpc.automount_addmap` method. + """ + res = api.Command['automount_addmap'](**self.map_kw) + assert res + assert res.get('automountmapname','') == self.mapname + + def test_add_2key(self): + """ + Test adding a key using `xmlrpc.automount_addkey` method. + """ + res = api.Command['automount_addkey'](**self.key_kw2) + assert res + assert res.get('automountkey','') == self.keyname2 + + def test_add_3key(self): + """ + Test adding a key using `xmlrpc.automount_addkey` method. + """ + res = api.Command['automount_addkey'](**self.key_kw) + assert res + assert res.get('automountkey','') == self.keyname + + def test_add_4key(self): + """ + Test adding a duplicate key using `xmlrpc.automount_addkey` method. + """ + try: + res = api.Command['automount_addkey'](**self.key_kw) + except errors.DuplicateEntry: + pass + else: + assert False + + def test_doshowmap(self): + """ + Test the `xmlrpc.automount_showmap` method. + """ + res = api.Command['automount_showmap'](self.mapname) + assert res + assert res.get('automountmapname','') == self.mapname + + def test_findmap(self): + """ + Test the `xmlrpc.automount_findmap` method. + """ + res = api.Command['automount_findmap'](self.mapname) + assert res + assert len(res) == 2 + assert res[1].get('automountmapname','') == self.mapname + + def test_doshowkey(self): + """ + Test the `xmlrpc.automount_showkey` method. + """ + showkey_kw={'automountmapname': self.mapname, 'automountkey': self.keyname} + res = api.Command['automount_showkey'](**showkey_kw) + assert res + assert res.get('automountkey','') == self.keyname + assert res.get('automountinformation','') == self.info + + def test_findkey(self): + """ + Test the `xmlrpc.automount_findkey` method. + """ + res = api.Command['automount_findkey'](self.keyname) + assert res + assert len(res) == 2 + assert res[1].get('automountkey','') == self.keyname + assert res[1].get('automountinformation','') == self.info + + def test_modkey(self): + """ + Test the `xmlrpc.automount_modkey` method. + """ + self.key_kw['automountinformation'] = 'rw' + self.key_kw['description'] = 'new description' + res = api.Command['automount_modkey'](**self.key_kw) + assert res + assert res.get('automountkey','') == self.keyname + assert res.get('automountinformation','') == 'rw' + assert res.get('description','') == 'new description' + + def test_modmap(self): + """ + Test the `xmlrpc.automount_modmap` method. + """ + self.map_kw['description'] = 'new description' + res = api.Command['automount_modmap'](**self.map_kw) + assert res + assert res.get('automountmapname','') == self.mapname + assert res.get('description','') == 'new description' + + def test_remove1key(self): + """ + Test the `xmlrpc.automount_delkey` method. + """ + delkey_kw={'automountmapname': self.mapname, 'automountkey': self.keyname} + res = api.Command['automount_delkey'](**delkey_kw) + assert res == True + + # Verify that it is gone + try: + res = api.Command['automount_showkey'](**delkey_kw) + except errors.NotFound: + pass + else: + assert False + + def test_remove2map(self): + """ + Test the `xmlrpc.automount_delmap` method. + """ + res = api.Command['automount_delmap'](self.mapname) + assert res == True + + # Verify that it is gone + try: + res = api.Command['automount_showmap'](self.mapname) + except errors.NotFound: + pass + else: + assert False + + def test_remove3map(self): + """ + Test that the `xmlrpc.automount_delmap` method removes all keys + """ + # Verify that the second key we added is gone + key_kw={'automountmapname': self.mapname, 'automountkey': self.keyname2} + try: + res = api.Command['automount_showkey'](**key_kw) + except errors.NotFound: + pass + else: + assert False + +class test_Indirect(XMLRPC_test): + """ + Test the `f_automount` plugin Indirect map function. + """ + mapname='auto.home' + keyname='/home' + parentmap='auto.master' + description='Home directories' + map_kw={'automountkey': keyname, 'parentmap': parentmap, 'description': description} + + def test_add_indirect(self): + """ + Test adding an indirect map. + """ + res = api.Command['automount_addindirectmap'](self.mapname, **self.map_kw) + assert res + assert res.get('automountinformation','') == self.mapname + + def test_doshowkey(self): + """ + Test the `xmlrpc.automount_showkey` method. + """ + showkey_kw={'automountmapname': self.parentmap, 'automountkey': self.keyname} + res = api.Command['automount_showkey'](**showkey_kw) + assert res + assert res.get('automountkey','') == self.keyname + + def test_remove_key(self): + """ + Remove the indirect key /home + """ + delkey_kw={'automountmapname': self.parentmap, 'automountkey': self.keyname} + res = api.Command['automount_delkey'](**delkey_kw) + assert res == True + + # Verify that it is gone + try: + res = api.Command['automount_showkey'](**delkey_kw) + except errors.NotFound: + pass + else: + assert False + + def test_remove_map(self): + """ + Remove the indirect map for auto.home + """ + res = api.Command['automount_delmap'](self.mapname) + assert res == True + + # Verify that it is gone + try: + res = api.Command['automount_showmap'](self.mapname) + except errors.NotFound: + pass + else: + assert False + diff --git a/tests/test_xmlrpc/test_group_plugin.py b/tests/test_xmlrpc/test_group_plugin.py new file mode 100644 index 00000000..2b16cc8a --- /dev/null +++ b/tests/test_xmlrpc/test_group_plugin.py @@ -0,0 +1,178 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_group` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +class test_Group(XMLRPC_test): + """ + Test the `f_group` plugin. + """ + cn='testgroup' + cn2='testgroup2' + description='This is a test' + kw={'description':description,'cn':cn} + + def test_add(self): + """ + Test the `xmlrpc.group_add` method. + """ + res = api.Command['group_add'](**self.kw) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + + def test_add2(self): + """ + Test the `xmlrpc.group_add` method duplicate detection. + """ + try: + res = api.Command['group_add'](**self.kw) + except errors.DuplicateEntry: + pass + + def test_add2(self): + """ + Test the `xmlrpc.group_add` method. + """ + self.kw['cn'] = self.cn2 + res = api.Command['group_add'](**self.kw) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn2 + + def test_add_member(self): + """ + Test the `xmlrpc.group_add_member` method. + """ + kw={} + kw['groups'] = self.cn2 + res = api.Command['group_add_member'](self.cn, **kw) + assert res == [] + + def test_add_member2(self): + """ + Test the `xmlrpc.group_add_member` with a non-existent member + """ + kw={} + kw['groups'] = "notfound" + res = api.Command['group_add_member'](self.cn, **kw) + # an error isn't thrown, the list of failed members is returned + assert res != [] + + def test_doshow(self): + """ + Test the `xmlrpc.group_show` method. + """ + res = api.Command['group_show'](self.cn) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + assert res.get('member','').startswith('cn=%s' % self.cn2) + + def test_find(self): + """ + Test the `xmlrpc.group_find` method. + """ + res = api.Command['group_find'](self.cn) + assert res + assert len(res) == 3 + assert res[1].get('description','') == self.description + assert res[1].get('cn','') == self.cn + + def test_mod(self): + """ + Test the `xmlrpc.group_mod` method. + """ + modkw = self.kw + modkw['cn'] = self.cn + modkw['description'] = 'New description' + res = api.Command['group_mod'](**modkw) + assert res + assert res.get('description','') == 'New description' + + # Ok, double-check that it was changed + res = api.Command['group_show'](self.cn) + assert res + assert res.get('description','') == 'New description' + assert res.get('cn','') == self.cn + + def test_remove_member(self): + """ + Test the `xmlrpc.group_remove_member` method. + """ + kw={} + kw['groups'] = self.cn2 + res = api.Command['group_remove_member'](self.cn, **kw) + + res = api.Command['group_show'](self.cn) + assert res + assert res.get('member','') == '' + + def test_remove_member2(self): + """ + Test the `xmlrpc.group_remove_member` method with non-member + """ + kw={} + kw['groups'] = "notfound" + # an error isn't thrown, the list of failed members is returned + res = api.Command['group_remove_member'](self.cn, **kw) + assert res != [] + + def test_remove_x(self): + """ + Test the `xmlrpc.group_del` method. + """ + res = api.Command['group_del'](self.cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['group_show'](self.cn) + except errors.NotFound: + pass + else: + assert False + + def test_remove_x2(self): + """ + Test the `xmlrpc.group_del` method. + """ + res = api.Command['group_del'](self.cn2) + assert res == True + + # Verify that it is gone + try: + res = api.Command['group_show'](self.cn2) + except errors.NotFound: + pass + else: + assert False diff --git a/tests/test_xmlrpc/test_host_plugin.py b/tests/test_xmlrpc/test_host_plugin.py new file mode 100644 index 00000000..515cd703 --- /dev/null +++ b/tests/test_xmlrpc/test_host_plugin.py @@ -0,0 +1,128 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_host` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +class test_Host(XMLRPC_test): + """ + Test the `f_host` plugin. + """ + cn='ipaexample.%s' % api.env.domain + description='Test host' + localityname='Undisclosed location' + kw={'cn': cn, 'description': description, 'localityname': localityname} + + def test_add(self): + """ + Test the `xmlrpc.host_add` method. + """ + res = api.Command['host_add'](**self.kw) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + assert res.get('l','') == self.localityname + + def test_doshow_all(self): + """ + Test the `xmlrpc.host_show` method with all attributes. + """ + kw={'cn':self.cn, 'all': True} + res = api.Command['host_show'](**kw) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + assert res.get('l','') == self.localityname + + def test_doshow_minimal(self): + """ + Test the `xmlrpc.host_show` method with default attributes. + """ + kw={'cn':self.cn} + res = api.Command['host_show'](**kw) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + assert res.get('localityname','') == self.localityname + + def test_find_all(self): + """ + Test the `xmlrpc.host_find` method with all attributes. + """ + kw={'cn':self.cn, 'all': True} + res = api.Command['host_find'](**kw) + assert res + assert len(res) == 2 + assert res[1].get('description','') == self.description + assert res[1].get('cn','') == self.cn + assert res[1].get('l','') == self.localityname + + def test_find_minimal(self): + """ + Test the `xmlrpc.host_find` method with default attributes. + """ + res = api.Command['host_find'](self.cn) + assert res + assert len(res) == 2 + assert res[1].get('description','') == self.description + assert res[1].get('cn','') == self.cn + assert res[1].get('localityname','') == self.localityname + + def test_mod(self): + """ + Test the `xmlrpc.host_mod` method. + """ + newdesc='Updated host' + modkw={'cn': self.cn, 'description': newdesc} + res = api.Command['host_mod'](**modkw) + assert res + assert res.get('description','') == newdesc + + # Ok, double-check that it was changed + res = api.Command['host_show'](self.cn) + assert res + assert res.get('description','') == newdesc + assert res.get('cn','') == self.cn + + def test_remove(self): + """ + Test the `xmlrpc.host_del` method. + """ + res = api.Command['host_del'](self.cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['host_show'](self.cn) + except errors.NotFound: + pass + else: + assert False diff --git a/tests/test_xmlrpc/test_hostgroup_plugin.py b/tests/test_xmlrpc/test_hostgroup_plugin.py new file mode 100644 index 00000000..9180c1dd --- /dev/null +++ b/tests/test_xmlrpc/test_hostgroup_plugin.py @@ -0,0 +1,149 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_hostgroup` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +class test_Host(XMLRPC_test): + """ + Test the `f_hostgroup` plugin. + """ + cn='testgroup' + description='Test host group' + kw={'cn': cn, 'description': description} + + host_cn='ipaexample.%s' % api.env.domain + host_description='Test host' + host_localityname='Undisclosed location' + + def test_add(self): + """ + Test the `xmlrpc.hostgroup_add` method. + """ + res = api.Command['hostgroup_add'](**self.kw) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + + def test_addhost(self): + """ + Add a host to test add/remove member. + """ + kw={'cn': self.host_cn, 'description': self.host_description, 'localityname': self.host_localityname} + res = api.Command['host_add'](**kw) + assert res + assert res.get('description','') == self.host_description + assert res.get('cn','') == self.host_cn + + def test_addmember(self): + """ + Test the `xmlrpc.hostgroup_add_member` method. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['hostgroup_add_member'](self.cn, **kw) + assert res == [] + + def test_doshow(self): + """ + Test the `xmlrpc.hostgroup_show` method. + """ + res = api.Command['hostgroup_show'](self.cn) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + assert res.get('member','').startswith('cn=%s' % self.host_cn) + + def test_find(self): + """ + Test the `xmlrpc.hostgroup_find` method. + """ + res = api.Command['hostgroup_find'](self.cn) + assert res + assert len(res) == 2 + assert res[1].get('description','') == self.description + assert res[1].get('cn','') == self.cn + assert res[1].get('member','').startswith('cn=%s' % self.host_cn) + + def test_mod(self): + """ + Test the `xmlrpc.hostgroup_mod` method. + """ + newdesc='Updated host group' + modkw={'cn': self.cn, 'description': newdesc} + res = api.Command['hostgroup_mod'](**modkw) + assert res + assert res.get('description','') == newdesc + + # Ok, double-check that it was changed + res = api.Command['hostgroup_show'](self.cn) + assert res + assert res.get('description','') == newdesc + assert res.get('cn','') == self.cn + + def test_member_remove(self): + """ + Test the `xmlrpc.hostgroup_remove_member` method. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['hostgroup_remove_member'](self.cn, **kw) + assert res == [] + + def test_remove(self): + """ + Test the `xmlrpc.hostgroup_del` method. + """ + res = api.Command['hostgroup_del'](self.cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['hostgroup_show'](self.cn) + except errors.NotFound: + pass + else: + assert False + + def test_removehost(self): + """ + Test the `xmlrpc.host_del` method. + """ + res = api.Command['host_del'](self.host_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['host_show'](self.host_cn) + except errors.NotFound: + pass + else: + assert False diff --git a/tests/test_xmlrpc/test_netgroup_plugin.py b/tests/test_xmlrpc/test_netgroup_plugin.py new file mode 100644 index 00000000..3d3e4aff --- /dev/null +++ b/tests/test_xmlrpc/test_netgroup_plugin.py @@ -0,0 +1,320 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_netgroup` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +def is_member_of(members, candidate): + if not isinstance(members, list): + members = [members] + for m in members: + if m.startswith(candidate): + return True + return False + +class test_Netgroup(XMLRPC_test): + """ + Test the `f_netgroup` plugin. + """ + ng_cn='ng1' + ng_description='Netgroup' + ng_kw={'cn': ng_cn, 'description': ng_description} + + host_cn='ipaexample.%s' % api.env.domain + host_description='Test host' + host_localityname='Undisclosed location' + host_kw={'cn': host_cn, 'description': host_description, 'localityname': host_localityname} + + hg_cn='ng1' + hg_description='Netgroup' + hg_kw={'cn': hg_cn, 'description': hg_description} + + user_uid='jexample' + user_givenname='Jim' + user_sn='Example' + user_home='/home/%s' % user_uid + user_kw={'givenname':user_givenname,'sn':user_sn,'uid':user_uid,'homedirectory':user_home} + + group_cn='testgroup' + group_description='This is a test' + group_kw={'description':group_description,'cn':group_cn} + + def test_add(self): + """ + Test the `xmlrpc.netgroup_add` method. + """ + res = api.Command['netgroup_add'](**self.ng_kw) + assert res + assert res.get('description','') == self.ng_description + assert res.get('cn','') == self.ng_cn + + def test_adddata(self): + """ + Add the data needed to do additional testing. + """ + + # Add a host + res = api.Command['host_add'](**self.host_kw) + assert res + assert res.get('description','') == self.host_description + assert res.get('cn','') == self.host_cn + + # Add a hostgroup + res = api.Command['hostgroup_add'](**self.hg_kw) + assert res + assert res.get('description','') == self.hg_description + assert res.get('cn','') == self.hg_cn + + # Add a user + res = api.Command['user_add'](**self.user_kw) + assert res + assert res.get('givenname','') == self.user_givenname + assert res.get('uid','') == self.user_uid + + # Add a group + res = api.Command['group_add'](**self.group_kw) + assert res + assert res.get('description','') == self.group_description + assert res.get('cn','') == self.group_cn + + def test_addmembers(self): + """ + Test the `xmlrpc.netgroup_add_member` method. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + + def test_addmembers2(self): + """ + Test the `xmlrpc.netgroup_add_member` method again to test dupes. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.host_cn) + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.hg_cn) + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'uid=%s' % self.user_uid) + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.group_cn) + + def test_addexternalmembers(self): + """ + Test adding external hosts + """ + kw={} + kw['hosts'] = "nosuchhost" + res = api.Command['netgroup_add_member'](self.ng_cn, **kw) + assert res == [] + res = api.Command['netgroup_show'](self.ng_cn) + assert res + assert is_member_of(res.get('externalhost',[]), kw['hosts']) + + def test_doshow(self): + """ + Test the `xmlrpc.netgroup_show` method. + """ + res = api.Command['netgroup_show'](self.ng_cn) + assert res + assert res.get('description','') == self.ng_description + assert res.get('cn','') == self.ng_cn + assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.host_cn) + assert is_member_of(res.get('memberhost',[]), 'cn=%s' % self.hg_cn) + assert is_member_of(res.get('memberuser',[]), 'uid=%s' % self.user_uid) + assert is_member_of(res.get('memberuser',[]), 'cn=%s' % self.group_cn) + + def test_find(self): + """ + Test the `xmlrpc.hostgroup_find` method. + """ + res = api.Command['netgroup_find'](self.ng_cn) + assert res + assert len(res) == 2 + assert res[1].get('description','') == self.ng_description + assert res[1].get('cn','') == self.ng_cn + + def test_mod(self): + """ + Test the `xmlrpc.hostgroup_mod` method. + """ + newdesc='Updated host group' + modkw={'cn': self.ng_cn, 'description': newdesc} + res = api.Command['netgroup_mod'](**modkw) + assert res + assert res.get('description','') == newdesc + + # Ok, double-check that it was changed + res = api.Command['netgroup_show'](self.ng_cn) + assert res + assert res.get('description','') == newdesc + assert res.get('cn','') == self.ng_cn + + def test_member_remove(self): + """ + Test the `xmlrpc.hostgroup_remove_member` method. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert res == [] + + def test_member_remove2(self): + """ + Test the `xmlrpc.netgroup_remove_member` method again to test not found. + """ + kw={} + kw['hosts'] = self.host_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.host_cn) + + kw={} + kw['hostgroups'] = self.hg_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.hg_cn) + + kw={} + kw['users'] = self.user_uid + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'uid=%s' % self.user_uid) + + kw={} + kw['groups'] = self.group_cn + res = api.Command['netgroup_remove_member'](self.ng_cn, **kw) + assert is_member_of(res, 'cn=%s' % self.group_cn) + + def test_remove(self): + """ + Test the `xmlrpc.netgroup_del` method. + """ + res = api.Command['netgroup_del'](self.ng_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['netgroup_show'](self.ng_cn) + except errors.NotFound: + pass + else: + assert False + + def test_removedata(self): + """ + Remove the test data we added + """ + # Remove the host + res = api.Command['host_del'](self.host_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['host_show'](self.host_cn) + except errors.NotFound: + pass + else: + assert False + + # Remove the hostgroup + res = api.Command['hostgroup_del'](self.hg_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['hostgroup_show'](self.hg_cn) + except errors.NotFound: + pass + else: + assert False + + # Remove the user + res = api.Command['user_del'](self.user_uid) + assert res == True + + # Verify that it is gone + try: + res = api.Command['user_show'](self.user_uid) + except errors.NotFound: + pass + else: + assert False + + # Remove the group + res = api.Command['group_del'](self.group_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['group_show'](self.group_cn) + except errors.NotFound: + pass + else: + assert False diff --git a/tests/test_xmlrpc/test_service_plugin.py b/tests/test_xmlrpc/test_service_plugin.py new file mode 100644 index 00000000..0a843d36 --- /dev/null +++ b/tests/test_xmlrpc/test_service_plugin.py @@ -0,0 +1,93 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_service` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +class test_Service(XMLRPC_test): + """ + Test the `f_service` plugin. + """ + principal='HTTP/ipatest.%s@%s' % (api.env.domain, api.env.realm) + hostprincipal='host/ipatest.%s@%s' % (api.env.domain, api.env.realm) + kw={'principal':principal} + + def test_add(self): + """ + Test adding a HTTP principal using the `xmlrpc.service_add` method. + """ + res = api.Command['service_add'](**self.kw) + assert res + assert res.get('krbprincipalname','') == self.principal + + def test_add_host(self): + """ + Test adding a host principal using `xmlrpc.service_add` method. + """ + kw={'principal':self.hostprincipal} + try: + res = api.Command['service_add'](**kw) + except errors.HostService: + pass + else: + assert False + + def test_doshow(self): + """ + Test the `xmlrpc.service_show` method. + """ + res = api.Command['service_show'](self.principal) + assert res + assert res.get('krbprincipalname','') == self.principal + + def test_find(self): + """ + Test the `xmlrpc.service_find` method. + """ + res = api.Command['service_find'](self.principal) + assert res + assert len(res) == 2 + assert res[1].get('krbprincipalname','') == self.principal + + def test_remove(self): + """ + Test the `xmlrpc.service_del` method. + """ + res = api.Command['service_del'](self.principal) + assert res == True + + # Verify that it is gone + try: + res = api.Command['service_show'](self.principal) + except errors.NotFound: + pass + else: + assert False diff --git a/tests/test_xmlrpc/test_user_plugin.py b/tests/test_xmlrpc/test_user_plugin.py new file mode 100644 index 00000000..0189aa5a --- /dev/null +++ b/tests/test_xmlrpc/test_user_plugin.py @@ -0,0 +1,151 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/f_user` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +class test_User(XMLRPC_test): + """ + Test the `f_user` plugin. + """ + uid='jexample' + givenname='Jim' + sn='Example' + home='/home/%s' % uid + principalname='%s@%s' % (uid, api.env.realm) + kw={'givenname':givenname,'sn':sn,'uid':uid,'homedirectory':home} + + def test_add(self): + """ + Test the `xmlrpc.user_add` method. + """ + res = api.Command['user_add'](**self.kw) + assert res + assert res.get('givenname','') == self.givenname + assert res.get('sn','') == self.sn + assert res.get('uid','') == self.uid + assert res.get('homedirectory','') == self.home + + def test_add2(self): + """ + Test the `xmlrpc.user_add` method duplicate detection. + """ + try: + res = api.Command['user_add'](**self.kw) + except errors.DuplicateEntry: + pass + + def test_doshow(self): + """ + Test the `xmlrpc.user_show` method. + """ + kw={'uid':self.uid, 'all': True} + res = api.Command['user_show'](**kw) + assert res + assert res.get('givenname','') == self.givenname + assert res.get('sn','') == self.sn + assert res.get('uid','') == self.uid + assert res.get('homedirectory','') == self.home + assert res.get('krbprincipalname','') == self.principalname + + def test_find_all(self): + """ + Test the `xmlrpc.user_find` method with all attributes. + """ + kw={'uid':self.uid, 'all': True} + res = api.Command['user_find'](**kw) + assert res + assert len(res) == 2 + assert res[1].get('givenname','') == self.givenname + assert res[1].get('sn','') == self.sn + assert res[1].get('uid','') == self.uid + assert res[1].get('homedirectory','') == self.home + assert res[1].get('krbprincipalname','') == self.principalname + + def test_find_minimal(self): + """ + Test the `xmlrpc.user_find` method with minimal attributes. + """ + res = api.Command['user_find'](self.uid) + assert res + assert len(res) == 2 + assert res[1].get('givenname','') == self.givenname + assert res[1].get('sn','') == self.sn + assert res[1].get('uid','') == self.uid + assert res[1].get('homedirectory','') == self.home + assert res[1].get('krbprincipalname', None) == None + + def test_lock(self): + """ + Test the `xmlrpc.user_lock` method. + """ + res = api.Command['user_lock'](self.uid) + assert res == True + + def test_lockoff(self): + """ + Test the `xmlrpc.user_unlock` method. + """ + res = api.Command['user_unlock'](self.uid) + assert res == True + + def test_mod(self): + """ + Test the `xmlrpc.user_mod` method. + """ + modkw = self.kw + modkw['givenname'] = 'Finkle' + res = api.Command['user_mod'](**modkw) + assert res + assert res.get('givenname','') == 'Finkle' + assert res.get('sn','') == self.sn + + # Ok, double-check that it was changed + res = api.Command['user_show'](self.uid) + assert res + assert res.get('givenname','') == 'Finkle' + assert res.get('sn','') == self.sn + assert res.get('uid','') == self.uid + + def test_remove(self): + """ + Test the `xmlrpc.user_del` method. + """ + res = api.Command['user_del'](self.uid) + assert res == True + + # Verify that it is gone + try: + res = api.Command['user_show'](self.uid) + except errors.NotFound: + pass + else: + assert False diff --git a/tests/test_xmlrpc/xmlrpc_test.py b/tests/test_xmlrpc/xmlrpc_test.py new file mode 100644 index 00000000..744c0c27 --- /dev/null +++ b/tests/test_xmlrpc/xmlrpc_test.py @@ -0,0 +1,49 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Base class for all XML-RPC tests +""" + +import sys +import socket +import nose +from ipalib import api +from ipalib import errors +from ipalib.cli import CLI + +try: + api.finalize() +except StandardError: + pass + +class XMLRPC_test: + """ + Base class for all XML-RPC plugin tests + """ + + def setUp(self): + # FIXME: changing Plugin.name from a property to an instance attribute + # somehow broke this. + try: + res = api.Command['user_show']('notfound') + except socket.error: + raise nose.SkipTest + except errors.NotFound: + pass diff --git a/tests/util.py b/tests/util.py new file mode 100644 index 00000000..f5899dfa --- /dev/null +++ b/tests/util.py @@ -0,0 +1,391 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Common utility functions and classes for unit tests. +""" + +import inspect +import os +from os import path +import tempfile +import shutil +import ipalib +from ipalib.plugable import Plugin +from ipalib.request import context + + + +class TempDir(object): + def __init__(self): + self.__path = tempfile.mkdtemp(prefix='ipa.tests.') + assert self.path == self.__path + + def __get_path(self): + assert path.abspath(self.__path) == self.__path + assert self.__path.startswith('/tmp/ipa.tests.') + assert path.isdir(self.__path) and not path.islink(self.__path) + return self.__path + path = property(__get_path) + + def rmtree(self): + if self.__path is not None: + shutil.rmtree(self.path) + self.__path = None + + def makedirs(self, *parts): + d = self.join(*parts) + if not path.exists(d): + os.makedirs(d) + assert path.isdir(d) and not path.islink(d) + return d + + def touch(self, *parts): + d = self.makedirs(*parts[:-1]) + f = path.join(d, parts[-1]) + assert not path.exists(f) + open(f, 'w').close() + assert path.isfile(f) and not path.islink(f) + return f + + def write(self, content, *parts): + d = self.makedirs(*parts[:-1]) + f = path.join(d, parts[-1]) + assert not path.exists(f) + open(f, 'w').write(content) + assert path.isfile(f) and not path.islink(f) + return f + + def join(self, *parts): + return path.join(self.path, *parts) + + def __del__(self): + self.rmtree() + + +class TempHome(TempDir): + def __init__(self): + super(TempHome, self).__init__() + self.__home = os.environ['HOME'] + os.environ['HOME'] = self.path + + +class ExceptionNotRaised(Exception): + """ + Exception raised when an *expected* exception is *not* raised during a + unit test. + """ + msg = 'expected %s' + + def __init__(self, expected): + self.expected = expected + + def __str__(self): + return self.msg % self.expected.__name__ + + +def assert_equal(val1, val2): + """ + Assert ``val1`` and ``val2`` are the same type and of equal value. + """ + assert type(val1) is type(val2), '%r != %r' % (val1, val2) + assert val1 == val2, '%r != %r' % (val1, val2) + + +def raises(exception, callback, *args, **kw): + """ + Tests that the expected exception is raised; raises ExceptionNotRaised + if test fails. + """ + raised = False + try: + callback(*args, **kw) + except exception, e: + raised = True + if not raised: + raise ExceptionNotRaised(exception) + return e + + +def getitem(obj, key): + """ + Works like getattr but for dictionary interface. Use this in combination + with raises() to test that, for example, KeyError is raised. + """ + return obj[key] + + +def setitem(obj, key, value): + """ + Works like setattr but for dictionary interface. Use this in combination + with raises() to test that, for example, TypeError is raised. + """ + obj[key] = value + + +def delitem(obj, key): + """ + Works like delattr but for dictionary interface. Use this in combination + with raises() to test that, for example, TypeError is raised. + """ + del obj[key] + + +def no_set(obj, name, value='some_new_obj'): + """ + Tests that attribute cannot be set. + """ + raises(AttributeError, setattr, obj, name, value) + + +def no_del(obj, name): + """ + Tests that attribute cannot be deleted. + """ + raises(AttributeError, delattr, obj, name) + + +def read_only(obj, name, value='some_new_obj'): + """ + Tests that attribute is read-only. Returns attribute. + """ + # Test that it cannot be set: + no_set(obj, name, value) + + # Test that it cannot be deleted: + no_del(obj, name) + + # Return the attribute + return getattr(obj, name) + + +def is_prop(prop): + return type(prop) is property + + +class ClassChecker(object): + __cls = None + __subcls = None + + def __get_cls(self): + if self.__cls is None: + self.__cls = self._cls + assert inspect.isclass(self.__cls) + return self.__cls + cls = property(__get_cls) + + def __get_subcls(self): + if self.__subcls is None: + self.__subcls = self.get_subcls() + assert inspect.isclass(self.__subcls) + return self.__subcls + subcls = property(__get_subcls) + + def get_subcls(self): + raise NotImplementedError( + self.__class__.__name__, + 'get_subcls()' + ) + + def tearDown(self): + """ + nose tear-down fixture. + """ + for name in ('ugettext', 'ungettext'): + if hasattr(context, name): + delattr(context, name) + + + + + + + +def check_TypeError(value, type_, name, callback, *args, **kw): + """ + Tests a standard TypeError raised with `errors.raise_TypeError`. + """ + e = raises(TypeError, callback, *args, **kw) + assert e.value is value + assert e.type is type_ + assert e.name == name + assert type(e.name) is str + assert str(e) == ipalib.errors.TYPE_FORMAT % (name, type_, value) + return e + + +def get_api(**kw): + """ + Returns (api, home) tuple. + + This function returns a tuple containing an `ipalib.plugable.API` + instance and a `TempHome` instance. + """ + home = TempHome() + api = ipalib.create_api(mode='unit_test') + api.env.in_tree = True + for (key, value) in kw.iteritems(): + api.env[key] = value + return (api, home) + + +def create_test_api(**kw): + """ + Returns (api, home) tuple. + + This function returns a tuple containing an `ipalib.plugable.API` + instance and a `TempHome` instance. + """ + home = TempHome() + api = ipalib.create_api(mode='unit_test') + api.env.in_tree = True + for (key, value) in kw.iteritems(): + api.env[key] = value + return (api, home) + + +class PluginTester(object): + __plugin = None + + def __get_plugin(self): + if self.__plugin is None: + self.__plugin = self._plugin + assert issubclass(self.__plugin, Plugin) + return self.__plugin + plugin = property(__get_plugin) + + def register(self, *plugins, **kw): + """ + Create a testing api and register ``self.plugin``. + + This method returns an (api, home) tuple. + + :param plugins: Additional \*plugins to register. + :param kw: Additional \**kw args to pass to `create_test_api`. + """ + (api, home) = create_test_api(**kw) + api.register(self.plugin) + for p in plugins: + api.register(p) + return (api, home) + + def finalize(self, *plugins, **kw): + (api, home) = self.register(*plugins, **kw) + api.finalize() + return (api, home) + + def instance(self, namespace, *plugins, **kw): + (api, home) = self.finalize(*plugins, **kw) + o = api[namespace][self.plugin.__name__] + return (o, api, home) + + +class dummy_ugettext(object): + __called = False + + def __init__(self, translation=None): + if translation is None: + translation = u'The translation' + self.translation = translation + assert type(self.translation) is unicode + + def __call__(self, message): + assert self.__called is False + self.__called = True + assert type(message) is str + assert not hasattr(self, 'message') + self.message = message + assert type(self.translation) is unicode + return self.translation + + def called(self): + return self.__called + + def reset(self): + assert type(self.translation) is unicode + assert type(self.message) is str + del self.message + assert self.__called is True + self.__called = False + + +class dummy_ungettext(object): + __called = False + + def __init__(self): + self.translation_singular = u'The singular translation' + self.translation_plural = u'The plural translation' + + def __call__(self, singular, plural, n): + assert type(singular) is str + assert type(plural) is str + assert type(n) is int + assert self.__called is False + self.__called = True + self.singular = singular + self.plural = plural + self.n = n + if n == 1: + return self.translation_singular + return self.translation_plural + + +class DummyMethod(object): + def __init__(self, callback, name): + self.__callback = callback + self.__name = name + + def __call__(self, *args, **kw): + return self.__callback(self.__name, args, kw) + + +class DummyClass(object): + def __init__(self, *calls): + self.__calls = calls + self.__i = 0 + for (name, args, kw, result) in calls: + method = DummyMethod(self.__process, name) + setattr(self, name, method) + + def __process(self, name_, args_, kw_): + if self.__i >= len(self.__calls): + raise AssertionError( + 'extra call: %s, %r, %r' % (name, args, kw) + ) + (name, args, kw, result) = self.__calls[self.__i] + self.__i += 1 + i = self.__i + if name_ != name: + raise AssertionError( + 'call %d should be to method %r; got %r' % (i, name, name_) + ) + if args_ != args: + raise AssertionError( + 'call %d to %r should have args %r; got %r' % (i, name, args, args_) + ) + if kw_ != kw: + raise AssertionError( + 'call %d to %r should have kw %r, got %r' % (i, name, kw, kw_) + ) + if isinstance(result, Exception): + raise result + return result + + def _calledall(self): + return self.__i == len(self.__calls) |