From c60142efda817f030a7495cd6fe4a19953e55afa Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 21 May 2013 13:40:27 +0200 Subject: Make an ipa-tests package Rename the 'tests' directory to 'ipa-tests', and create an ipa-tests RPM containing the test suite Part of the work for: https://fedorahosted.org/freeipa/ticket/3654 --- ipatests/test_ipalib/__init__.py | 22 + ipatests/test_ipalib/test_backend.py | 272 +++++ ipatests/test_ipalib/test_base.py | 352 +++++++ ipatests/test_ipalib/test_capabilities.py | 33 + ipatests/test_ipalib/test_cli.py | 116 +++ ipatests/test_ipalib/test_config.py | 609 ++++++++++++ ipatests/test_ipalib/test_crud.py | 240 +++++ ipatests/test_ipalib/test_errors.py | 374 +++++++ ipatests/test_ipalib/test_frontend.py | 1188 ++++++++++++++++++++++ ipatests/test_ipalib/test_messages.py | 89 ++ ipatests/test_ipalib/test_output.py | 89 ++ ipatests/test_ipalib/test_parameters.py | 1533 +++++++++++++++++++++++++++++ ipatests/test_ipalib/test_plugable.py | 516 ++++++++++ ipatests/test_ipalib/test_rpc.py | 244 +++++ ipatests/test_ipalib/test_text.py | 334 +++++++ ipatests/test_ipalib/test_util.py | 26 + ipatests/test_ipalib/test_x509.py | 139 +++ 17 files changed, 6176 insertions(+) create mode 100644 ipatests/test_ipalib/__init__.py create mode 100644 ipatests/test_ipalib/test_backend.py create mode 100644 ipatests/test_ipalib/test_base.py create mode 100644 ipatests/test_ipalib/test_capabilities.py create mode 100644 ipatests/test_ipalib/test_cli.py create mode 100644 ipatests/test_ipalib/test_config.py create mode 100644 ipatests/test_ipalib/test_crud.py create mode 100644 ipatests/test_ipalib/test_errors.py create mode 100644 ipatests/test_ipalib/test_frontend.py create mode 100644 ipatests/test_ipalib/test_messages.py create mode 100644 ipatests/test_ipalib/test_output.py create mode 100644 ipatests/test_ipalib/test_parameters.py create mode 100644 ipatests/test_ipalib/test_plugable.py create mode 100644 ipatests/test_ipalib/test_rpc.py create mode 100644 ipatests/test_ipalib/test_text.py create mode 100644 ipatests/test_ipalib/test_util.py create mode 100644 ipatests/test_ipalib/test_x509.py (limited to 'ipatests/test_ipalib') diff --git a/ipatests/test_ipalib/__init__.py b/ipatests/test_ipalib/__init__.py new file mode 100644 index 00000000..4e4c605c --- /dev/null +++ b/ipatests/test_ipalib/__init__.py @@ -0,0 +1,22 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Sub-package containing unit tests for `ipalib` package. +""" diff --git a/ipatests/test_ipalib/test_backend.py b/ipatests/test_ipalib/test_backend.py new file mode 100644 index 00000000..3ebed4bb --- /dev/null +++ b/ipatests/test_ipalib/test_backend.py @@ -0,0 +1,272 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.backend` module. +""" + +import threading +from ipatests.util import ClassChecker, raises, create_test_api +from ipatests.data import unicode_str +from ipalib.request import context, Connection +from ipalib.frontend import Command +from ipalib import backend, plugable, errors, base +from ipapython.version import API_VERSION + + + +class test_Backend(ClassChecker): + """ + Test the `ipalib.backend.Backend` class. + """ + + _cls = backend.Backend + + def test_class(self): + assert self.cls.__bases__ == (plugable.Plugin,) + + +class Disconnect(object): + called = False + + def __init__(self, id=None): + self.id = id + + def __call__(self): + assert self.called is False + self.called = True + if self.id is not None: + delattr(context, self.id) + + +class test_Connectible(ClassChecker): + """ + Test the `ipalib.backend.Connectible` class. + """ + + _cls = backend.Connectible + + def test_connect(self): + """ + Test the `ipalib.backend.Connectible.connect` method. + """ + # Test that connection is created: + class example(self.cls): + def create_connection(self, *args, **kw): + object.__setattr__(self, 'args', args) + object.__setattr__(self, 'kw', kw) + return 'The connection.' + o = example() + args = ('Arg1', 'Arg2', 'Arg3') + kw = dict(key1='Val1', key2='Val2', key3='Val3') + assert not hasattr(context, 'example') + assert o.connect(*args, **kw) is None + conn = context.example + assert type(conn) is Connection + assert o.args == args + assert o.kw == kw + assert conn.conn == 'The connection.' + assert conn.disconnect == o.disconnect + + # Test that StandardError is raised if already connected: + m = "connect: 'context.%s' already exists in thread %r" + e = raises(StandardError, o.connect, *args, **kw) + assert str(e) == m % ('example', threading.currentThread().getName()) + + # Double check that it works after deleting context.example: + del context.example + assert o.connect(*args, **kw) is None + + def test_create_connection(self): + """ + Test the `ipalib.backend.Connectible.create_connection` method. + """ + class example(self.cls): + pass + for klass in (self.cls, example): + o = klass() + e = raises(NotImplementedError, o.create_connection) + assert str(e) == '%s.create_connection()' % klass.__name__ + + def test_disconnect(self): + """ + Test the `ipalib.backend.Connectible.disconnect` method. + """ + class example(self.cls): + destroy_connection = Disconnect() + o = example() + + m = "disconnect: 'context.%s' does not exist in thread %r" + e = raises(StandardError, o.disconnect) + assert str(e) == m % ('example', threading.currentThread().getName()) + + context.example = 'The connection.' + assert o.disconnect() is None + assert example.destroy_connection.called is True + + def test_destroy_connection(self): + """ + Test the `ipalib.backend.Connectible.destroy_connection` method. + """ + class example(self.cls): + pass + for klass in (self.cls, example): + o = klass() + e = raises(NotImplementedError, o.destroy_connection) + assert str(e) == '%s.destroy_connection()' % klass.__name__ + + def test_isconnected(self): + """ + Test the `ipalib.backend.Connectible.isconnected` method. + """ + class example(self.cls): + pass + for klass in (self.cls, example): + o = klass() + assert o.isconnected() is False + conn = 'whatever' + setattr(context, klass.__name__, conn) + assert o.isconnected() is True + delattr(context, klass.__name__) + + def test_conn(self): + """ + Test the `ipalib.backend.Connectible.conn` property. + """ + msg = 'no context.%s in thread %r' + class example(self.cls): + pass + for klass in (self.cls, example): + o = klass() + e = raises(AttributeError, getattr, o, 'conn') + assert str(e) == msg % ( + klass.__name__, threading.currentThread().getName() + ) + conn = Connection('The connection.', Disconnect()) + setattr(context, klass.__name__, conn) + assert o.conn is conn.conn + delattr(context, klass.__name__) + + +class test_Executioner(ClassChecker): + """ + Test the `ipalib.backend.Executioner` class. + """ + _cls = backend.Executioner + + def test_execute(self): + """ + Test the `ipalib.backend.Executioner.execute` method. + """ + (api, home) = create_test_api(in_server=True) + + class echo(Command): + takes_args = ('arg1', 'arg2+') + takes_options = ('option1?', 'option2?') + def execute(self, *args, **options): + assert type(args[1]) is tuple + return dict(result=args + (options,)) + api.register(echo) + + class good(Command): + def execute(self, **options): + raise errors.ValidationError( + name='nurse', + error=u'Not naughty!', + ) + api.register(good) + + class bad(Command): + def execute(self, **options): + raise ValueError('This is private.') + api.register(bad) + + class with_name(Command): + """ + Test that a kwarg named 'name' can be used. + """ + takes_options = 'name' + def execute(self, **options): + return dict(result=options['name'].upper()) + api.register(with_name) + + api.finalize() + o = self.cls() + o.set_api(api) + o.finalize() + + # Test that CommandError is raised: + conn = Connection('The connection.', Disconnect('someconn')) + context.someconn = conn + print str(context.__dict__.keys()) + e = raises(errors.CommandError, o.execute, 'nope') + assert e.name == 'nope' + assert conn.disconnect.called is True # Make sure destroy_context() was called + print str(context.__dict__.keys()) + assert context.__dict__.keys() == [] + + # Test with echo command: + arg1 = unicode_str + arg2 = (u'Hello', unicode_str, u'world!') + args = (arg1,) + arg2 + options = dict(option1=u'How are you?', option2=unicode_str, + version=API_VERSION) + + conn = Connection('The connection.', Disconnect('someconn')) + context.someconn = conn + print o.execute('echo', arg1, arg2, **options) + print dict( + result=(arg1, arg2, options) + ) + assert o.execute('echo', arg1, arg2, **options) == dict( + result=(arg1, arg2, options) + ) + assert conn.disconnect.called is True # Make sure destroy_context() was called + assert context.__dict__.keys() == [] + + conn = Connection('The connection.', Disconnect('someconn')) + context.someconn = conn + assert o.execute('echo', *args, **options) == dict( + result=(arg1, arg2, options) + ) + assert conn.disconnect.called is True # Make sure destroy_context() was called + assert context.__dict__.keys() == [] + + # Test with good command: + conn = Connection('The connection.', Disconnect('someconn')) + context.someconn = conn + e = raises(errors.ValidationError, o.execute, 'good') + assert e.name == 'nurse' + assert e.error == u'Not naughty!' + assert conn.disconnect.called is True # Make sure destroy_context() was called + assert context.__dict__.keys() == [] + + # Test with bad command: + conn = Connection('The connection.', Disconnect('someconn')) + context.someconn = conn + e = raises(errors.InternalError, o.execute, 'bad') + assert conn.disconnect.called is True # Make sure destroy_context() was called + assert context.__dict__.keys() == [] + + # Test with option 'name': + conn = Connection('The connection.', Disconnect('someconn')) + context.someconn = conn + expected = dict(result=u'TEST') + assert expected == o.execute('with_name', name=u'test', + version=API_VERSION) diff --git a/ipatests/test_ipalib/test_base.py b/ipatests/test_ipalib/test_base.py new file mode 100644 index 00000000..ef6c180c --- /dev/null +++ b/ipatests/test_ipalib/test_base.py @@ -0,0 +1,352 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.base` module. +""" + +from ipatests.util import ClassChecker, raises +from ipalib.constants import NAME_REGEX, NAME_ERROR +from ipalib.constants import TYPE_ERROR, SET_ERROR, DEL_ERROR, OVERRIDE_ERROR +from ipalib import base + + +class test_ReadOnly(ClassChecker): + """ + Test the `ipalib.base.ReadOnly` class + """ + _cls = base.ReadOnly + + def test_lock(self): + """ + Test the `ipalib.base.ReadOnly.__lock__` method. + """ + o = self.cls() + assert o._ReadOnly__locked is False + o.__lock__() + assert o._ReadOnly__locked is True + e = raises(AssertionError, o.__lock__) # Can only be locked once + assert str(e) == '__lock__() can only be called once' + assert o._ReadOnly__locked is True # This should still be True + + def test_islocked(self): + """ + Test the `ipalib.base.ReadOnly.__islocked__` method. + """ + o = self.cls() + assert o.__islocked__() is False + o.__lock__() + assert o.__islocked__() is True + + def test_setattr(self): + """ + Test the `ipalib.base.ReadOnly.__setattr__` method. + """ + o = self.cls() + o.attr1 = 'Hello, world!' + assert o.attr1 == 'Hello, world!' + o.__lock__() + for name in ('attr1', 'attr2'): + e = raises(AttributeError, setattr, o, name, 'whatever') + assert str(e) == SET_ERROR % ('ReadOnly', name, 'whatever') + assert o.attr1 == 'Hello, world!' + + def test_delattr(self): + """ + Test the `ipalib.base.ReadOnly.__delattr__` method. + """ + o = self.cls() + o.attr1 = 'Hello, world!' + o.attr2 = 'How are you?' + assert o.attr1 == 'Hello, world!' + assert o.attr2 == 'How are you?' + del o.attr1 + assert not hasattr(o, 'attr1') + o.__lock__() + e = raises(AttributeError, delattr, o, 'attr2') + assert str(e) == DEL_ERROR % ('ReadOnly', 'attr2') + assert o.attr2 == 'How are you?' + + +def test_lock(): + """ + Test the `ipalib.base.lock` function + """ + f = base.lock + + # Test with ReadOnly instance: + o = base.ReadOnly() + assert o.__islocked__() is False + assert f(o) is o + assert o.__islocked__() is True + e = raises(AssertionError, f, o) + assert str(e) == 'already locked: %r' % o + + # Test with another class implemented locking protocol: + class Lockable(object): + __locked = False + def __lock__(self): + self.__locked = True + def __islocked__(self): + return self.__locked + o = Lockable() + assert o.__islocked__() is False + assert f(o) is o + assert o.__islocked__() is True + e = raises(AssertionError, f, o) + assert str(e) == 'already locked: %r' % o + + # Test with a class incorrectly implementing the locking protocol: + class Broken(object): + def __lock__(self): + pass + def __islocked__(self): + return False + o = Broken() + e = raises(AssertionError, f, o) + assert str(e) == 'failed to lock: %r' % o + + +def test_islocked(): + """ + Test the `ipalib.base.islocked` function. + """ + f = base.islocked + + # Test with ReadOnly instance: + o = base.ReadOnly() + assert f(o) is False + o.__lock__() + assert f(o) is True + + # Test with another class implemented locking protocol: + class Lockable(object): + __locked = False + def __lock__(self): + self.__locked = True + def __islocked__(self): + return self.__locked + o = Lockable() + assert f(o) is False + o.__lock__() + assert f(o) is True + + # Test with a class incorrectly implementing the locking protocol: + class Broken(object): + __lock__ = False + def __islocked__(self): + return False + o = Broken() + e = raises(AssertionError, f, o) + assert str(e) == 'no __lock__() method: %r' % o + + +def test_check_name(): + """ + Test the `ipalib.base.check_name` function. + """ + f = base.check_name + okay = [ + 'user_add', + 'stuff2junk', + 'sixty9', + ] + nope = [ + '_user_add', + '__user_add', + 'user_add_', + 'user_add__', + '_user_add_', + '__user_add__', + '60nine', + ] + for name in okay: + assert name is f(name) + e = raises(TypeError, f, unicode(name)) + assert str(e) == TYPE_ERROR % ('name', str, unicode(name), unicode) + for name in nope: + e = raises(ValueError, f, name) + assert str(e) == NAME_ERROR % (NAME_REGEX, name) + for name in okay: + e = raises(ValueError, f, name.upper()) + assert str(e) == NAME_ERROR % (NAME_REGEX, name.upper()) + + +def membername(i): + return 'member%03d' % i + + +class DummyMember(object): + def __init__(self, i): + self.i = i + self.name = membername(i) + + +def gen_members(*indexes): + return tuple(DummyMember(i) for i in indexes) + + +class test_NameSpace(ClassChecker): + """ + Test the `ipalib.base.NameSpace` class. + """ + _cls = base.NameSpace + + def new(self, count, sort=True): + members = tuple(DummyMember(i) for i in xrange(count, 0, -1)) + assert len(members) == count + o = self.cls(members, sort=sort) + return (o, members) + + def test_init(self): + """ + Test the `ipalib.base.NameSpace.__init__` method. + """ + o = self.cls([]) + assert len(o) == 0 + assert list(o) == [] + assert list(o()) == [] + + # Test members as attribute and item: + for cnt in (3, 42): + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + assert len(members) == cnt + for m in members: + assert getattr(o, m.name) is m + assert o[m.name] is m + + # Test that TypeError is raised if sort is not a bool: + e = raises(TypeError, self.cls, [], sort=None) + assert str(e) == TYPE_ERROR % ('sort', bool, None, type(None)) + + # Test that AttributeError is raised with duplicate member name: + members = gen_members(0, 1, 2, 1, 3) + e = raises(AttributeError, self.cls, members) + assert str(e) == OVERRIDE_ERROR % ( + 'NameSpace', membername(1), members[1], members[3] + ) + + def test_len(self): + """ + Test the `ipalib.base.NameSpace.__len__` method. + """ + for count in (5, 18, 127): + (o, members) = self.new(count) + assert len(o) == count + (o, members) = self.new(count, sort=False) + assert len(o) == count + + def test_iter(self): + """ + Test the `ipalib.base.NameSpace.__iter__` method. + """ + (o, members) = self.new(25) + assert list(o) == sorted(m.name for m in members) + (o, members) = self.new(25, sort=False) + assert list(o) == list(m.name for m in members) + + def test_call(self): + """ + Test the `ipalib.base.NameSpace.__call__` method. + """ + (o, members) = self.new(25) + assert list(o()) == sorted(members, key=lambda m: m.name) + (o, members) = self.new(25, sort=False) + assert tuple(o()) == members + + def test_contains(self): + """ + Test the `ipalib.base.NameSpace.__contains__` method. + """ + yes = (99, 3, 777) + no = (9, 333, 77) + for sort in (True, False): + members = gen_members(*yes) + o = self.cls(members, sort=sort) + for i in yes: + assert membername(i) in o + assert membername(i).upper() not in o + for i in no: + assert membername(i) not in o + + def test_getitem(self): + """ + Test the `ipalib.base.NameSpace.__getitem__` method. + """ + cnt = 17 + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + assert len(members) == cnt + if sort is True: + members = tuple(sorted(members, key=lambda m: m.name)) + + # Test str keys: + for m in members: + assert o[m.name] is m + e = raises(KeyError, o.__getitem__, 'nope') + + # Test int indexes: + for i in xrange(cnt): + assert o[i] is members[i] + e = raises(IndexError, o.__getitem__, cnt) + + # Test negative int indexes: + for i in xrange(1, cnt + 1): + assert o[-i] is members[-i] + e = raises(IndexError, o.__getitem__, -(cnt + 1)) + + # Test slicing: + assert o[3:] == members[3:] + assert o[:10] == members[:10] + assert o[3:10] == members[3:10] + assert o[-9:] == members[-9:] + assert o[:-4] == members[:-4] + assert o[-9:-4] == members[-9:-4] + + # Test that TypeError is raised with wrong type + e = raises(TypeError, o.__getitem__, 3.0) + assert str(e) == TYPE_ERROR % ('key', (str, int, slice), 3.0, float) + + def test_repr(self): + """ + Test the `ipalib.base.NameSpace.__repr__` method. + """ + for cnt in (0, 1, 2): + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + if cnt == 1: + assert repr(o) == \ + 'NameSpace(<%d member>, sort=%r)' % (cnt, sort) + else: + assert repr(o) == \ + 'NameSpace(<%d members>, sort=%r)' % (cnt, sort) + + def test_todict(self): + """ + Test the `ipalib.base.NameSpace.__todict__` method. + """ + for cnt in (3, 101): + for sort in (True, False): + (o, members) = self.new(cnt, sort=sort) + d = o.__todict__() + assert d == dict((m.name, m) for m in members) + + # Test that a copy is returned: + assert o.__todict__() is not d diff --git a/ipatests/test_ipalib/test_capabilities.py b/ipatests/test_ipalib/test_capabilities.py new file mode 100644 index 00000000..21e53c2d --- /dev/null +++ b/ipatests/test_ipalib/test_capabilities.py @@ -0,0 +1,33 @@ +# Authors: +# Petr Viktorin +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.errors` module. +""" + +from ipalib.capabilities import capabilities, client_has_capability + + +def test_client_has_capability(): + assert capabilities['messages'] == u'2.52' + assert client_has_capability(u'2.52', 'messages') + assert client_has_capability(u'2.60', 'messages') + assert client_has_capability(u'3.0', 'messages') + assert not client_has_capability(u'2.11', 'messages') + assert not client_has_capability(u'0.1', 'messages') diff --git a/ipatests/test_ipalib/test_cli.py b/ipatests/test_ipalib/test_cli.py new file mode 100644 index 00000000..07935c5b --- /dev/null +++ b/ipatests/test_ipalib/test_cli.py @@ -0,0 +1,116 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.cli` module. +""" + +from ipatests.util import raises, get_api, ClassChecker +from ipalib import cli, plugable, frontend, backend + + +class test_textui(ClassChecker): + _cls = cli.textui + + def test_max_col_width(self): + """ + Test the `ipalib.cli.textui.max_col_width` method. + """ + o = self.cls() + e = raises(TypeError, o.max_col_width, 'hello') + assert str(e) == 'rows: need %r or %r; got %r' % (list, tuple, 'hello') + rows = [ + 'hello', + 'naughty', + 'nurse', + ] + assert o.max_col_width(rows) == len('naughty') + rows = ( + ( 'a', 'bbb', 'ccccc'), + ('aa', 'bbbb', 'cccccc'), + ) + assert o.max_col_width(rows, col=0) == 2 + assert o.max_col_width(rows, col=1) == 4 + assert o.max_col_width(rows, col=2) == 6 + + +def test_to_cli(): + """ + Test the `ipalib.cli.to_cli` function. + """ + f = cli.to_cli + assert f('initialize') == 'initialize' + assert f('user_add') == 'user-add' + + +def test_from_cli(): + """ + Test the `ipalib.cli.from_cli` function. + """ + f = cli.from_cli + assert f('initialize') == 'initialize' + assert f('user-add') == 'user_add' + + +def get_cmd_name(i): + return 'cmd_%d' % i + + +class DummyCommand(object): + def __init__(self, name): + self.__name = name + + def __get_name(self): + return self.__name + name = property(__get_name) + + +class DummyAPI(object): + def __init__(self, cnt): + self.__cmd = plugable.NameSpace(self.__cmd_iter(cnt)) + + def __get_cmd(self): + return self.__cmd + Command = property(__get_cmd) + + def __cmd_iter(self, cnt): + for i in xrange(cnt): + yield DummyCommand(get_cmd_name(i)) + + def finalize(self): + pass + + def register(self, *args, **kw): + pass + + +config_cli = """ +[global] + +from_cli_conf = set in cli.conf +""" + +config_default = """ +[global] + +from_default_conf = set in default.conf + +# Make sure cli.conf is loaded first: +from_cli_conf = overridden in default.conf +""" diff --git a/ipatests/test_ipalib/test_config.py b/ipatests/test_ipalib/test_config.py new file mode 100644 index 00000000..f896b893 --- /dev/null +++ b/ipatests/test_ipalib/test_config.py @@ -0,0 +1,609 @@ +# Authors: +# Martin Nagy +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.config` module. +""" + +import os +from os import path +import sys +import socket +from ipatests.util import raises, setitem, delitem, ClassChecker +from ipatests.util import getitem, setitem, delitem +from ipatests.util import TempDir, TempHome +from ipalib.constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR +from ipalib.constants import NAME_REGEX, NAME_ERROR +from ipalib import config, constants, base + + +# Valid environment variables in (key, raw, value) tuples: +# key: the name of the environment variable +# raw: the value being set (possibly a string repr) +# value: the expected value after the lightweight conversion +good_vars = ( + ('a_string', u'Hello world!', u'Hello world!'), + ('trailing_whitespace', u' value ', u'value'), + ('an_int', 42, 42), + ('int_repr', ' 42 ', 42), + ('a_float', 3.14, 3.14), + ('float_repr', ' 3.14 ', 3.14), + ('true', True, True), + ('true_repr', ' True ', True), + ('false', False, False), + ('false_repr', ' False ', False), + ('none', None, None), + ('none_repr', ' None ', None), + ('empty', '', None), + + # These verify that the implied conversion is case-sensitive: + ('not_true', u' true ', u'true'), + ('not_false', u' false ', u'false'), + ('not_none', u' none ', u'none'), +) + + +bad_names = ( + ('CamelCase', u'value'), + ('_leading_underscore', u'value'), + ('trailing_underscore_', u'value'), +) + + +# Random base64-encoded data to simulate a misbehaving config file. +config_bad = """ +/9j/4AAQSkZJRgABAQEAlgCWAAD//gAIT2xpdmVy/9sAQwAQCwwODAoQDg0OEhEQExgoGhgWFhgx +IyUdKDozPTw5Mzg3QEhcTkBEV0U3OFBtUVdfYmdoZz5NcXlwZHhcZWdj/8AACwgAlgB0AQERAP/E +ABsAAAEFAQEAAAAAAAAAAAAAAAQAAQIDBQYH/8QAMhAAAgICAAUDBAIABAcAAAAAAQIAAwQRBRIh +MUEGE1EiMmFxFIEVI0LBFjNSYnKRof/aAAgBAQAAPwDCtzmNRr1o/MEP1D6f7kdkRakgBsAtoQhk +xls/y3Z113I11mhiUc1ewCf1Oq4anJgINdhLhQoextfedmYrenfcvdzaFQnYAE08XhONTWEK8+js +Fpo1oqAKoAA8CWjoJJTHM8kJ5jsiOiszAKD1+IV/hmW76rosbfnlh1Pp3Mah2srCnXQE9YXiel/c +p5r7uVj2CwxPTuFjjmdLbteNwmrLwsYe3TjsD8cmjKV43ycy+3o76D4llFuXmuCoZEPczXVOSsLv +f5lgGpNZLxJL2jnvMar0/wAOp6jHDH/uO4RViY9f/KpRdfC6k3R9fRyj+pRZVkWKqF10e+hCKaFq +XlH/ALlmhK7Met/uUGZ5ow8XL57lU8/Yt4lx4jUOJphLobTe/wDaHeZLxHXtJEya9o5lFzCqpmPY +CUYoPtDfc9TLj0G5jZvHaMFirAs++oEHq9U4rbNiMp8a6wO/1Zbzn2alC+Nx8P1JfdeBboA+AILx +rin8pfbA1ynvKuFUXZOXXkLbzOp2R56andL2G45MmO0RPWWLEe8GzaffoKb/ADI44Pt9ZXxAuuFa +axtgp0BOSPCcviNX8n3Aw8KTNHB4FiY9StkobLWHVSeghq8M4bkAhKKyV6Hl8RV8MwMZG1Uuz3Jn +IcUQJlMFGlJ6D4hfpymy7iChHKqvVtefxO7Ai1txLBIn7pcojN3jGVhQO0ZgCNfM5ZHycTLycSkr +yhtqD4Bmrfw5cuqsm6xHXyp1seRLcHCp4dQy1bOzslj1MzeJ5dVFnuMVdgOiHxOWzrmyMg2Nrbde +k3vR2OTddcd6A5R8GdZqOo67k4wXrLAQPMRKnzImMZEzm+P1nFz6cxQeVujagWR6jsYiqivlH/Ux +1M+7jWY30i7QHx1gF11tjGyxiSfmVc+503pPidVROHYNNY21b/adVZZySo3uOo1qIZQYd9RCzfYm +TUk/qW71LjGkTA+IYiZmM1T9N9j8Gee5+McXJem0/Wp8GUK6KOi7b5MgzFjsxpJHZGDKSCOxE3cD +OvsxbbLc9lsT7Vc73KX4ln3q1ZyVrPx2J/uAjLyan37z7B+Zp4vqPJqKi0K4EvzvUt1qBMdfb+T5 +gycfzkXXuc35InfE6nO8Y9SjFc1Yqh2Hdj2mH/xFxR26XgD/AMRJf45mWMqW5bBD3KqAZlZtb++7 +kEqTsHe//sG1CcTBvy7OWpD+Sewhz8CyKCTYAQPiGV0LVWPdxqQNADQ6zL4nWq2gopU6+ofmA8x3 +1MlvfeIGbnBeCHitRt94IFbRGus2U9H08v13sT+BNHjeX/D4bY4OmP0rPPbHLMWJ2Yy2EDQjVsos +BdeYDx8wo5L5KpSdLWPAE1+G8NrFtBKgOAXPTf6mzViql5ZBoE87eJZkKbOQ8m+Yjf5EBzcO621y +GCqD0H41Obzq7U6vzM577HTXgzPPeOIvM1eB59nD8xXVj7bHTr8iej1MtlauvUMNgzi/V2ctliYy +HYTq37nMExpZXRZYpZVJUdzNjg+FXYwZgdhv6nVVUJU/uH7iNf1CARrtF0IB113M7jTNVjFl2xJA +5ROey88OrVOugOy67TDs+89NRKdSYILdRC8ZQVJ+PHyJs4fqe3EoFPLzBexPxOdusa2xndiWY7JM +qMUNrzOTAfHC9XO9/E3vT9blVJB0o2Zu3MAoYrsL13Ii0Muw3XvJG9KkDOeqjf6gWcw5A33XN9nX +tOeyMRFWy3Jch+bX7mXmCsW/5RBXUoHaOIRi2asAJ0IRbjqzll3o/EAaRiltDojgv2E1aePmhEWq +rsNHZ7wir1K/8Y1vUCSCAd+IXiZ9b1gLYvN07trXTUD4rxN2TkUgEts8p2NDtD0t5MVGchr2Xe99 +hMPNvD1LX5J2TuZhGyYwBijjfiHU5bJXrnYfqBRtRtSbIBWG3+xI6HiLUWz8xA9RuaVNrMAPfB5x +r6v9MLr4S1il7LaxyjY69Jl5eG+Kyhiv1jYIMGYMO8etGscKoJJ8Cbp4bVg4ivaq22t3G/tmRYo5 +zyjQ+JRFFET01GB0Yid9YiYh1l9KgEHqT8Tco/hewA/NzgdQdwTNGNTY3uU2crL9HN00ZlovNzfV +oCanBrBRk1rpCHPUkQjjYoW4GtwAw30MDpuxvbAvpJceR5mXFFEY0W4o4mpg0XNXutQxPUHxLb8q +7mRDyszLr6esz8u++9wL2LcvQb8RXCkhBV3A6mR5rEVSrdFPT8SBLMdsdmWe6P8AUAx+TB4oooxi +i1Jmt0+5dfuOLbANB2H6MjzNzc2zv5ji1g2+5/MYnbb+Yh+T0kubUY940UUbUWtRpJN8w1CfebkK +WfUu+/mDOAGOjsRo0UkIo+pPl6Rckl7ehuR1INGAj9u0kW2nXvK45YlQp1odukaICSAjgSQWf//Z +""" + + +# A config file that tries to override some standard vars: +config_override = """ +[global] + +key0 = var0 +home = /home/sweet/home +key1 = var1 +site_packages = planet +key2 = var2 +key3 = var3 +""" + + +# A config file that tests the automatic type conversion +config_good = """ +[global] + +string = Hello world! +null = None +yes = True +no = False +number = 42 +floating = 3.14 +""" + + +# A default config file to make sure it does not overwrite the explicit one +config_default = """ +[global] + +yes = Hello +not_in_other = foo_bar +""" + + +class test_Env(ClassChecker): + """ + Test the `ipalib.config.Env` class. + """ + + _cls = config.Env + + def test_init(self): + """ + Test the `ipalib.config.Env.__init__` method. + """ + o = self.cls() + assert list(o) == [] + assert len(o) == 0 + assert o.__islocked__() is False + + def test_lock(self): + """ + Test the `ipalib.config.Env.__lock__` method. + """ + o = self.cls() + assert o.__islocked__() is False + o.__lock__() + assert o.__islocked__() is True + e = raises(StandardError, o.__lock__) + assert str(e) == 'Env.__lock__() already called' + + # Also test with base.lock() function: + o = self.cls() + assert o.__islocked__() is False + assert base.lock(o) is o + assert o.__islocked__() is True + e = raises(AssertionError, base.lock, o) + assert str(e) == 'already locked: %r' % o + + def test_islocked(self): + """ + Test the `ipalib.config.Env.__islocked__` method. + """ + o = self.cls() + assert o.__islocked__() is False + assert base.islocked(o) is False + o.__lock__() + assert o.__islocked__() is True + assert base.islocked(o) is True + + def test_setattr(self): + """ + Test the `ipalib.config.Env.__setattr__` method. + """ + o = self.cls() + for (name, raw, value) in good_vars: + # Test setting the value: + setattr(o, name, raw) + result = getattr(o, name) + assert type(result) is type(value) + assert result == value + assert result is o[name] + + # Test that value cannot be overridden once set: + e = raises(AttributeError, setattr, o, name, raw) + assert str(e) == OVERRIDE_ERROR % ('Env', name, value, raw) + + # Test that values cannot be set once locked: + o = self.cls() + o.__lock__() + for (name, raw, value) in good_vars: + e = raises(AttributeError, setattr, o, name, raw) + assert str(e) == SET_ERROR % ('Env', name, raw) + + # Test that name is tested with check_name(): + o = self.cls() + for (name, value) in bad_names: + e = raises(ValueError, setattr, o, name, value) + assert str(e) == NAME_ERROR % (NAME_REGEX, name) + + def test_setitem(self): + """ + Test the `ipalib.config.Env.__setitem__` method. + """ + o = self.cls() + for (key, raw, value) in good_vars: + # Test setting the value: + o[key] = raw + result = o[key] + assert type(result) is type(value) + assert result == value + assert result is getattr(o, key) + + # Test that value cannot be overridden once set: + e = raises(AttributeError, o.__setitem__, key, raw) + assert str(e) == OVERRIDE_ERROR % ('Env', key, value, raw) + + # Test that values cannot be set once locked: + o = self.cls() + o.__lock__() + for (key, raw, value) in good_vars: + e = raises(AttributeError, o.__setitem__, key, raw) + assert str(e) == SET_ERROR % ('Env', key, raw) + + # Test that name is tested with check_name(): + o = self.cls() + for (key, value) in bad_names: + e = raises(ValueError, o.__setitem__, key, value) + assert str(e) == NAME_ERROR % (NAME_REGEX, key) + + def test_getitem(self): + """ + Test the `ipalib.config.Env.__getitem__` method. + """ + o = self.cls() + value = u'some value' + o.key = value + assert o.key is value + assert o['key'] is value + for name in ('one', 'two'): + e = raises(KeyError, getitem, o, name) + assert str(e) == repr(name) + + def test_delattr(self): + """ + Test the `ipalib.config.Env.__delattr__` method. + + This also tests that ``__delitem__`` is not implemented. + """ + o = self.cls() + o.one = 1 + assert o.one == 1 + for key in ('one', 'two'): + e = raises(AttributeError, delattr, o, key) + assert str(e) == DEL_ERROR % ('Env', key) + e = raises(AttributeError, delitem, o, key) + assert str(e) == '__delitem__' + + def test_contains(self): + """ + Test the `ipalib.config.Env.__contains__` method. + """ + o = self.cls() + items = [ + ('one', 1), + ('two', 2), + ('three', 3), + ('four', 4), + ] + for (key, value) in items: + assert key not in o + o[key] = value + assert key in o + + def test_len(self): + """ + Test the `ipalib.config.Env.__len__` method. + """ + o = self.cls() + assert len(o) == 0 + for i in xrange(1, 11): + key = 'key%d' % i + value = u'value %d' % i + o[key] = value + assert o[key] is value + assert len(o) == i + + def test_iter(self): + """ + Test the `ipalib.config.Env.__iter__` method. + """ + o = self.cls() + default_keys = tuple(o) + keys = ('one', 'two', 'three', 'four', 'five') + for key in keys: + o[key] = 'the value' + assert list(o) == sorted(keys + default_keys) + + def test_merge(self): + """ + Test the `ipalib.config.Env._merge` method. + """ + group1 = ( + ('key1', u'value 1'), + ('key2', u'value 2'), + ('key3', u'value 3'), + ('key4', u'value 4'), + ) + group2 = ( + ('key0', u'Value 0'), + ('key2', u'Value 2'), + ('key4', u'Value 4'), + ('key5', u'Value 5'), + ) + o = self.cls() + assert o._merge(**dict(group1)) == (4, 4) + assert len(o) == 4 + assert list(o) == list(key for (key, value) in group1) + for (key, value) in group1: + assert getattr(o, key) is value + assert o[key] is value + assert o._merge(**dict(group2)) == (2, 4) + assert len(o) == 6 + expected = dict(group2) + expected.update(dict(group1)) + assert list(o) == sorted(expected) + assert expected['key2'] == 'value 2' # And not 'Value 2' + for (key, value) in expected.iteritems(): + assert getattr(o, key) is value + assert o[key] is value + assert o._merge(**expected) == (0, 6) + assert len(o) == 6 + assert list(o) == sorted(expected) + + def test_merge_from_file(self): + """ + Test the `ipalib.config.Env._merge_from_file` method. + """ + tmp = TempDir() + assert callable(tmp.join) + + # Test a config file that doesn't exist + no_exist = tmp.join('no_exist.conf') + assert not path.exists(no_exist) + o = self.cls() + o._bootstrap() + keys = tuple(o) + orig = dict((k, o[k]) for k in o) + assert o._merge_from_file(no_exist) is None + assert tuple(o) == keys + + # Test an empty config file + empty = tmp.touch('empty.conf') + assert path.isfile(empty) + assert o._merge_from_file(empty) == (0, 0) + assert tuple(o) == keys + + # Test a mal-formed config file: + bad = tmp.join('bad.conf') + open(bad, 'w').write(config_bad) + assert path.isfile(bad) + assert o._merge_from_file(bad) is None + assert tuple(o) == keys + + # Test a valid config file that tries to override + override = tmp.join('override.conf') + open(override, 'w').write(config_override) + assert path.isfile(override) + assert o._merge_from_file(override) == (4, 6) + for (k, v) in orig.items(): + assert o[k] is v + assert list(o) == sorted(keys + ('key0', 'key1', 'key2', 'key3', 'config_loaded')) + for i in xrange(4): + assert o['key%d' % i] == ('var%d' % i) + keys = tuple(o) + + # Test a valid config file with type conversion + good = tmp.join('good.conf') + open(good, 'w').write(config_good) + assert path.isfile(good) + assert o._merge_from_file(good) == (6, 6) + added = ('string', 'null', 'yes', 'no', 'number', 'floating') + assert list(o) == sorted(keys + added) + assert o.string == 'Hello world!' + assert o.null is None + assert o.yes is True + assert o.no is False + assert o.number == 42 + assert o.floating == 3.14 + + def new(self, in_tree=False): + """ + Set os.environ['HOME'] to a tempdir. + + Returns tuple with new Env instance and the TempHome instance. This + helper method is used in testing the bootstrap related methods below. + """ + home = TempHome() + o = self.cls() + if in_tree: + o.in_tree = True + return (o, home) + + def bootstrap(self, **overrides): + """ + Helper method used in testing bootstrap related methods below. + """ + (o, home) = self.new() + assert o._isdone('_bootstrap') is False + o._bootstrap(**overrides) + assert o._isdone('_bootstrap') is True + e = raises(StandardError, o._bootstrap) + assert str(e) == 'Env._bootstrap() already called' + return (o, home) + + def test_bootstrap(self): + """ + Test the `ipalib.config.Env._bootstrap` method. + """ + # Test defaults created by _bootstrap(): + (o, home) = self.new() + o._bootstrap() + ipalib = path.dirname(path.abspath(config.__file__)) + assert o.ipalib == ipalib + assert o.site_packages == path.dirname(ipalib) + assert o.script == path.abspath(sys.argv[0]) + assert o.bin == path.dirname(path.abspath(sys.argv[0])) + assert o.home == home.path + assert o.dot_ipa == home.join('.ipa') + assert o.in_tree is False + assert o.context == 'default' + assert o.confdir == '/etc/ipa' + assert o.conf == '/etc/ipa/default.conf' + assert o.conf_default == o.conf + + # Test overriding values created by _bootstrap() + (o, home) = self.bootstrap(in_tree='True', context='server') + assert o.in_tree is True + assert o.context == 'server' + assert o.conf == home.join('.ipa', 'server.conf') + (o, home) = self.bootstrap(conf='/my/wacky/whatever.conf') + assert o.in_tree is False + assert o.context == 'default' + assert o.conf == '/my/wacky/whatever.conf' + assert o.conf_default == '/etc/ipa/default.conf' + (o, home) = self.bootstrap(conf_default='/my/wacky/default.conf') + assert o.in_tree is False + assert o.context == 'default' + assert o.conf == '/etc/ipa/default.conf' + assert o.conf_default == '/my/wacky/default.conf' + + # Test various overrides and types conversion + kw = dict( + yes=True, + no=False, + num=42, + msg='Hello, world!', + ) + override = dict( + (k, u' %s ' % v) for (k, v) in kw.items() + ) + (o, home) = self.new() + for key in kw: + assert key not in o + o._bootstrap(**override) + for (key, value) in kw.items(): + assert getattr(o, key) == value + assert o[key] == value + + def finalize_core(self, ctx, **defaults): + """ + Helper method used in testing `Env._finalize_core`. + """ + # We must force in_tree=True so we don't load possible config files in + # /etc/ipa/, whose contents could break this test: + (o, home) = self.new(in_tree=True) + if ctx: + o.context = ctx + + # Check that calls cascade down the chain: + set_here = ('in_server', 'logdir', 'log') + assert o._isdone('_bootstrap') is False + assert o._isdone('_finalize_core') is False + assert o._isdone('_finalize') is False + for key in set_here: + assert key not in o + o._finalize_core(**defaults) + assert o._isdone('_bootstrap') is True + assert o._isdone('_finalize_core') is True + assert o._isdone('_finalize') is False # Should not cascade + for key in set_here: + assert key in o + + # Check that it can't be called twice: + e = raises(StandardError, o._finalize_core) + assert str(e) == 'Env._finalize_core() already called' + + return (o, home) + + def test_finalize_core(self): + """ + Test the `ipalib.config.Env._finalize_core` method. + """ + # Test that correct defaults are generated: + (o, home) = self.finalize_core(None) + assert o.in_server is False + assert o.logdir == home.join('.ipa', 'log') + assert o.log == home.join('.ipa', 'log', 'default.log') + + # Test with context='server' + (o, home) = self.finalize_core('server') + assert o.in_server is True + assert o.logdir == home.join('.ipa', 'log') + assert o.log == home.join('.ipa', 'log', 'server.log') + + # Test that **defaults can't set in_server, logdir, nor log: + (o, home) = self.finalize_core(None, + in_server='IN_SERVER', + logdir='LOGDIR', + log='LOG', + ) + assert o.in_server is False + assert o.logdir == home.join('.ipa', 'log') + assert o.log == home.join('.ipa', 'log', 'default.log') + + # Test loading config file, plus test some in-tree stuff + (o, home) = self.bootstrap(in_tree=True, context='server') + for key in ('yes', 'no', 'number'): + assert key not in o + home.write(config_good, '.ipa', 'server.conf') + home.write(config_default, '.ipa', 'default.conf') + o._finalize_core() + assert o.in_tree is True + assert o.context == 'server' + assert o.in_server is True + assert o.logdir == home.join('.ipa', 'log') + assert o.log == home.join('.ipa', 'log', 'server.log') + assert o.yes is True + assert o.no is False + assert o.number == 42 + assert o.not_in_other == 'foo_bar' + + # Test using DEFAULT_CONFIG: + defaults = dict(constants.DEFAULT_CONFIG) + (o, home) = self.finalize_core(None, **defaults) + assert list(o) == sorted(defaults) + for (key, value) in defaults.items(): + if value is object: + continue + if key == 'mode': + continue + assert o[key] == value, '%r is %r; should be %r' % (key, o[key], value) + + def test_finalize(self): + """ + Test the `ipalib.config.Env._finalize` method. + """ + # Check that calls cascade up the chain: + (o, home) = self.new(in_tree=True) + assert o._isdone('_bootstrap') is False + assert o._isdone('_finalize_core') is False + assert o._isdone('_finalize') is False + o._finalize() + assert o._isdone('_bootstrap') is True + assert o._isdone('_finalize_core') is True + assert o._isdone('_finalize') is True + + # Check that it can't be called twice: + e = raises(StandardError, o._finalize) + assert str(e) == 'Env._finalize() already called' + + # Check that _finalize() calls __lock__() + (o, home) = self.new(in_tree=True) + assert o.__islocked__() is False + o._finalize() + assert o.__islocked__() is True + e = raises(StandardError, o.__lock__) + assert str(e) == 'Env.__lock__() already called' + + # Check that **lastchance works + (o, home) = self.finalize_core(None) + key = 'just_one_more_key' + value = u'with one more value' + lastchance = {key: value} + assert key not in o + assert o._isdone('_finalize') is False + o._finalize(**lastchance) + assert key in o + assert o[key] is value diff --git a/ipatests/test_ipalib/test_crud.py b/ipatests/test_ipalib/test_crud.py new file mode 100644 index 00000000..602f99f2 --- /dev/null +++ b/ipatests/test_ipalib/test_crud.py @@ -0,0 +1,240 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.crud` module. +""" + +from ipatests.util import read_only, raises, get_api, ClassChecker +from ipalib import crud, frontend, plugable, config +from ipalib.parameters import Str + + +class CrudChecker(ClassChecker): + """ + Class for testing base classes in `ipalib.crud`. + """ + + def get_api(self, args=tuple(), options=tuple()): + """ + Return a finalized `ipalib.plugable.API` instance. + """ + (api, home) = get_api() + class user(frontend.Object): + takes_params = ( + 'givenname', + Str('sn', flags='no_update'), + Str('uid', primary_key=True), + 'initials', + Str('uidnumber', flags=['no_create', 'no_search']) + ) + class user_verb(self.cls): + takes_args = args + takes_options = options + api.register(user) + api.register(user_verb) + api.finalize() + return api + + +class test_Create(CrudChecker): + """ + Test the `ipalib.crud.Create` class. + """ + + _cls = crud.Create + + def test_get_args(self): + """ + Test the `ipalib.crud.Create.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Create.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == \ + ['givenname', 'sn', 'initials', 'all', 'raw', 'version'] + for param in api.Method.user_verb.options(): + if param.name != 'version': + assert param.required is True + api = self.get_api(options=('extra?',)) + assert list(api.Method.user_verb.options) == \ + ['givenname', 'sn', 'initials', 'extra', 'all', 'raw', 'version'] + assert api.Method.user_verb.options.extra.required is False + + +class test_Update(CrudChecker): + """ + Test the `ipalib.crud.Update` class. + """ + + _cls = crud.Update + + def test_get_args(self): + """ + Test the `ipalib.crud.Update.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Update.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == \ + ['givenname', 'initials', 'uidnumber', 'all', 'raw', 'version'] + for param in api.Method.user_verb.options(): + if param.name in ['all', 'raw']: + assert param.required is True + else: + assert param.required is False + + +class test_Retrieve(CrudChecker): + """ + Test the `ipalib.crud.Retrieve` class. + """ + + _cls = crud.Retrieve + + def test_get_args(self): + """ + Test the `ipalib.crud.Retrieve.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Retrieve.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == ['all', 'raw', 'version'] + + +class test_Delete(CrudChecker): + """ + Test the `ipalib.crud.Delete` class. + """ + + _cls = crud.Delete + + def test_get_args(self): + """ + Test the `ipalib.crud.Delete.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['uid'] + assert api.Method.user_verb.args.uid.required is True + + def test_get_options(self): + """ + Test the `ipalib.crud.Delete.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == ['version'] + assert len(api.Method.user_verb.options) == 1 + + +class test_Search(CrudChecker): + """ + Test the `ipalib.crud.Search` class. + """ + + _cls = crud.Search + + def test_get_args(self): + """ + Test the `ipalib.crud.Search.get_args` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.args) == ['criteria'] + assert api.Method.user_verb.args.criteria.required is False + + def test_get_options(self): + """ + Test the `ipalib.crud.Search.get_options` method. + """ + api = self.get_api() + assert list(api.Method.user_verb.options) == \ + ['givenname', 'sn', 'uid', 'initials', 'all', 'raw', 'version'] + for param in api.Method.user_verb.options(): + if param.name in ['all', 'raw']: + assert param.required is True + else: + assert param.required is False + + +class test_CrudBackend(ClassChecker): + """ + Test the `ipalib.crud.CrudBackend` class. + """ + + _cls = crud.CrudBackend + + def get_subcls(self): + class ldap(self.cls): + pass + return ldap + + def check_method(self, name, *args): + o = self.cls() + e = raises(NotImplementedError, getattr(o, name), *args) + assert str(e) == 'CrudBackend.%s()' % name + sub = self.subcls() + e = raises(NotImplementedError, getattr(sub, name), *args) + assert str(e) == 'ldap.%s()' % name + + def test_create(self): + """ + Test the `ipalib.crud.CrudBackend.create` method. + """ + self.check_method('create') + + def test_retrieve(self): + """ + Test the `ipalib.crud.CrudBackend.retrieve` method. + """ + self.check_method('retrieve', 'primary key', 'attribute') + + def test_update(self): + """ + Test the `ipalib.crud.CrudBackend.update` method. + """ + self.check_method('update', 'primary key') + + def test_delete(self): + """ + Test the `ipalib.crud.CrudBackend.delete` method. + """ + self.check_method('delete', 'primary key') + + def test_search(self): + """ + Test the `ipalib.crud.CrudBackend.search` method. + """ + self.check_method('search') diff --git a/ipatests/test_ipalib/test_errors.py b/ipatests/test_ipalib/test_errors.py new file mode 100644 index 00000000..258af3b3 --- /dev/null +++ b/ipatests/test_ipalib/test_errors.py @@ -0,0 +1,374 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.errors` module. +""" + +import re +import inspect + +from ipatests.util import assert_equal, raises +from ipalib import errors, text +from ipalib.constants import TYPE_ERROR + + +class PrivateExceptionTester(object): + _klass = None + __klass = None + + def __get_klass(self): + if self.__klass is None: + self.__klass = self._klass + assert issubclass(self.__klass, StandardError) + assert issubclass(self.__klass, errors.PrivateError) + assert not issubclass(self.__klass, errors.PublicError) + return self.__klass + klass = property(__get_klass) + + def new(self, **kw): + for (key, value) in kw.iteritems(): + assert not hasattr(self.klass, key), key + inst = self.klass(**kw) + assert isinstance(inst, StandardError) + assert isinstance(inst, errors.PrivateError) + assert isinstance(inst, self.klass) + assert not isinstance(inst, errors.PublicError) + for (key, value) in kw.iteritems(): + assert getattr(inst, key) is value + assert str(inst) == self.klass.format % kw + assert inst.message == str(inst) + return inst + + +class test_PrivateError(PrivateExceptionTester): + """ + Test the `ipalib.errors.PrivateError` exception. + """ + _klass = errors.PrivateError + + def test_init(self): + """ + Test the `ipalib.errors.PrivateError.__init__` method. + """ + inst = self.klass(key1='Value 1', key2='Value 2') + assert inst.key1 == 'Value 1' + assert inst.key2 == 'Value 2' + assert str(inst) == '' + + # Test subclass and use of format: + class subclass(self.klass): + format = '%(true)r %(text)r %(number)r' + + kw = dict(true=True, text='Hello!', number=18) + inst = subclass(**kw) + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + assert str(inst) == subclass.format % kw + + # Test via PrivateExceptionTester.new() + inst = self.new(**kw) + assert isinstance(inst, self.klass) + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + + +class test_SubprocessError(PrivateExceptionTester): + """ + Test the `ipalib.errors.SubprocessError` exception. + """ + + _klass = errors.SubprocessError + + def test_init(self): + """ + Test the `ipalib.errors.SubprocessError.__init__` method. + """ + inst = self.new(returncode=1, argv=('/bin/false',)) + assert inst.returncode == 1 + assert inst.argv == ('/bin/false',) + assert str(inst) == "return code 1 from ('/bin/false',)" + assert inst.message == str(inst) + + +class test_PluginSubclassError(PrivateExceptionTester): + """ + Test the `ipalib.errors.PluginSubclassError` exception. + """ + + _klass = errors.PluginSubclassError + + def test_init(self): + """ + Test the `ipalib.errors.PluginSubclassError.__init__` method. + """ + inst = self.new(plugin='bad', bases=('base1', 'base2')) + assert inst.plugin == 'bad' + assert inst.bases == ('base1', 'base2') + assert str(inst) == \ + "'bad' not subclass of any base in ('base1', 'base2')" + assert inst.message == str(inst) + + +class test_PluginDuplicateError(PrivateExceptionTester): + """ + Test the `ipalib.errors.PluginDuplicateError` exception. + """ + + _klass = errors.PluginDuplicateError + + def test_init(self): + """ + Test the `ipalib.errors.PluginDuplicateError.__init__` method. + """ + inst = self.new(plugin='my_plugin') + assert inst.plugin == 'my_plugin' + assert str(inst) == "'my_plugin' was already registered" + assert inst.message == str(inst) + + +class test_PluginOverrideError(PrivateExceptionTester): + """ + Test the `ipalib.errors.PluginOverrideError` exception. + """ + + _klass = errors.PluginOverrideError + + def test_init(self): + """ + Test the `ipalib.errors.PluginOverrideError.__init__` method. + """ + inst = self.new(base='Base', name='cmd', plugin='my_cmd') + assert inst.base == 'Base' + assert inst.name == 'cmd' + assert inst.plugin == 'my_cmd' + assert str(inst) == "unexpected override of Base.cmd with 'my_cmd'" + assert inst.message == str(inst) + + +class test_PluginMissingOverrideError(PrivateExceptionTester): + """ + Test the `ipalib.errors.PluginMissingOverrideError` exception. + """ + + _klass = errors.PluginMissingOverrideError + + def test_init(self): + """ + Test the `ipalib.errors.PluginMissingOverrideError.__init__` method. + """ + inst = self.new(base='Base', name='cmd', plugin='my_cmd') + assert inst.base == 'Base' + assert inst.name == 'cmd' + assert inst.plugin == 'my_cmd' + assert str(inst) == "Base.cmd not registered, cannot override with 'my_cmd'" + assert inst.message == str(inst) + + + +############################################################################## +# Unit tests for public errors: + +class PublicExceptionTester(object): + _klass = None + __klass = None + + def __get_klass(self): + if self.__klass is None: + self.__klass = self._klass + assert issubclass(self.__klass, StandardError) + assert issubclass(self.__klass, errors.PublicError) + assert not issubclass(self.__klass, errors.PrivateError) + assert type(self.__klass.errno) is int + assert 900 <= self.__klass.errno <= 5999 + return self.__klass + klass = property(__get_klass) + + def new(self, format=None, message=None, **kw): + # Test that TypeError is raised if message isn't unicode: + e = raises(TypeError, self.klass, message='The message') + assert str(e) == TYPE_ERROR % ('message', unicode, 'The message', str) + + # Test the instance: + for (key, value) in kw.iteritems(): + assert not hasattr(self.klass, key), key + inst = self.klass(format=format, message=message, **kw) + for required_class in self.required_classes: + assert isinstance(inst, required_class) + assert isinstance(inst, self.klass) + assert not isinstance(inst, errors.PrivateError) + for (key, value) in kw.iteritems(): + assert getattr(inst, key) is value + return inst + + +class test_PublicError(PublicExceptionTester): + """ + Test the `ipalib.errors.PublicError` exception. + """ + _klass = errors.PublicError + required_classes = StandardError, errors.PublicError + + def test_init(self): + message = u'The translated, interpolated message' + format = 'key=%(key1)r and key2=%(key2)r' + uformat = u'Translated key=%(key1)r and key2=%(key2)r' + val1 = 'Value 1' + val2 = 'Value 2' + kw = dict(key1=val1, key2=val2) + + # Test with format=str, message=None + inst = self.klass(format, **kw) + assert inst.format is format + assert_equal(inst.message, format % kw) + assert inst.forwarded is False + assert inst.key1 is val1 + assert inst.key2 is val2 + + # Test with format=None, message=unicode + inst = self.klass(message=message, **kw) + assert inst.format is None + assert inst.message is message + assert inst.strerror is message + assert inst.forwarded is True + assert inst.key1 is val1 + assert inst.key2 is val2 + + # Test with format=None, message=str + e = raises(TypeError, self.klass, message='the message', **kw) + assert str(e) == TYPE_ERROR % ('message', unicode, 'the message', str) + + # Test with format=None, message=None + e = raises(ValueError, self.klass, **kw) + assert (str(e) == '%s.format is None yet format=None, message=None' % + self.klass.__name__) + + + ###################################### + # Test via PublicExceptionTester.new() + + # Test with format=str, message=None + inst = self.new(format, **kw) + assert isinstance(inst, self.klass) + assert inst.format is format + assert_equal(inst.message, format % kw) + assert inst.forwarded is False + assert inst.key1 is val1 + assert inst.key2 is val2 + + # Test with format=None, message=unicode + inst = self.new(message=message, **kw) + assert isinstance(inst, self.klass) + assert inst.format is None + assert inst.message is message + assert inst.strerror is message + assert inst.forwarded is True + assert inst.key1 is val1 + assert inst.key2 is val2 + + + ################## + # Test a subclass: + class subclass(self.klass): + format = '%(true)r %(text)r %(number)r' + + uformat = u'Translated %(true)r %(text)r %(number)r' + kw = dict(true=True, text='Hello!', number=18) + + # Test with format=str, message=None + e = raises(ValueError, subclass, format, **kw) + assert str(e) == 'non-generic %r needs format=None; got format=%r' % ( + 'subclass', format) + + # Test with format=None, message=None: + inst = subclass(**kw) + assert inst.format is subclass.format + assert_equal(inst.message, subclass.format % kw) + assert inst.forwarded is False + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + + # Test with format=None, message=unicode: + inst = subclass(message=message, **kw) + assert inst.format is subclass.format + assert inst.message is message + assert inst.strerror is message + assert inst.forwarded is True + assert inst.true is True + assert inst.text is kw['text'] + assert inst.number is kw['number'] + + # Test with instructions: + # first build up "instructions", then get error and search for + # lines of instructions appended to the end of the strerror + # despite the parameter 'instructions' not existing in the format + instructions = u"The quick brown fox jumps over the lazy dog".split() + # this expression checks if each word of instructions + # exists in a string as a separate line, with right order + regexp = re.compile('(?ims).*' + + ''.join(map(lambda x: '(%s).*' % (x), + instructions)) + + '$') + inst = subclass(instructions=instructions, **kw) + assert inst.format is subclass.format + assert_equal(inst.instructions, instructions) + inst_match = regexp.match(inst.strerror).groups() + assert_equal(list(inst_match),list(instructions)) + + +class BaseMessagesTest(object): + """Generic test for all of a module's errors or messages + """ + def test_public_messages(self): + i = 0 + for klass in self.message_list: + for required_class in self.required_classes: + assert issubclass(klass, required_class) + assert type(klass.errno) is int + assert klass.errno in self.errno_range + doc = inspect.getdoc(klass) + assert doc is not None, 'need class docstring for %s' % klass.__name__ + m = re.match(r'^\*{2}(\d+)\*{2} ', doc) + assert m is not None, "need '**ERRNO**' in %s docstring" % klass.__name__ + errno = int(m.group(1)) + assert errno == klass.errno, ( + 'docstring=%r but errno=%r in %s' % (errno, klass.errno, klass.__name__) + ) + self.extratest(klass) + + # Test format + if klass.format is not None: + assert klass.format is self.texts[i] + i += 1 + + def extratest(self, cls): + pass + + +class test_PublicErrors(object): + message_list = errors.public_errors + errno_range = xrange(900, 5999) + required_classes = (StandardError, errors.PublicError) + texts = errors._texts + + def extratest(self, cls): + assert not issubclass(cls, errors.PrivateError) diff --git a/ipatests/test_ipalib/test_frontend.py b/ipatests/test_ipalib/test_frontend.py new file mode 100644 index 00000000..310d7a53 --- /dev/null +++ b/ipatests/test_ipalib/test_frontend.py @@ -0,0 +1,1188 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.frontend` module. +""" + +from ipatests.util import raises, getitem, no_set, no_del, read_only +from ipatests.util import check_TypeError, ClassChecker, create_test_api +from ipatests.util import assert_equal +from ipalib.constants import TYPE_ERROR +from ipalib.base import NameSpace +from ipalib import frontend, backend, plugable, errors, parameters, config +from ipalib import output, messages +from ipalib.parameters import Str +from ipapython.version import API_VERSION + +def test_RULE_FLAG(): + assert frontend.RULE_FLAG == 'validation_rule' + + +def test_rule(): + """ + Test the `ipalib.frontend.rule` function. + """ + flag = frontend.RULE_FLAG + rule = frontend.rule + def my_func(): + pass + assert not hasattr(my_func, flag) + rule(my_func) + assert getattr(my_func, flag) is True + @rule + def my_func2(): + pass + assert getattr(my_func2, flag) is True + + +def test_is_rule(): + """ + Test the `ipalib.frontend.is_rule` function. + """ + is_rule = frontend.is_rule + flag = frontend.RULE_FLAG + + class no_call(object): + def __init__(self, value): + if value is not None: + assert value in (True, False) + setattr(self, flag, value) + + class call(no_call): + def __call__(self): + pass + + assert is_rule(call(True)) + assert not is_rule(no_call(True)) + assert not is_rule(call(False)) + assert not is_rule(call(None)) + + +class test_HasParam(ClassChecker): + """ + Test the `ipalib.frontend.Command` class. + """ + + _cls = frontend.HasParam + + def test_get_param_iterable(self): + """ + Test the `ipalib.frontend.HasParam._get_param_iterable` method. + """ + class WithTuple(self.cls): + takes_stuff = ('one', 'two') + o = WithTuple() + assert o._get_param_iterable('stuff') is WithTuple.takes_stuff + + junk = ('three', 'four') + class WithCallable(self.cls): + def takes_stuff(self): + return junk + o = WithCallable() + assert o._get_param_iterable('stuff') is junk + + class WithParam(self.cls): + takes_stuff = parameters.Str('five') + o = WithParam() + assert o._get_param_iterable('stuff') == (WithParam.takes_stuff,) + + class WithStr(self.cls): + takes_stuff = 'six' + o = WithStr() + assert o._get_param_iterable('stuff') == ('six',) + + class Wrong(self.cls): + takes_stuff = ['seven', 'eight'] + o = Wrong() + e = raises(TypeError, o._get_param_iterable, 'stuff') + assert str(e) == '%s.%s must be a tuple, callable, or spec; got %r' % ( + 'Wrong', 'takes_stuff', Wrong.takes_stuff + ) + + def test_filter_param_by_context(self): + """ + Test the `ipalib.frontend.HasParam._filter_param_by_context` method. + """ + class Example(self.cls): + def get_stuff(self): + return ( + 'one', # Make sure create_param() is called for each spec + 'two', + parameters.Str('three', include='cli'), + parameters.Str('four', exclude='server'), + parameters.Str('five', exclude=['whatever', 'cli']), + ) + o = Example() + + # Test when env is None: + params = list(o._filter_param_by_context('stuff')) + assert list(p.name for p in params) == [ + 'one', 'two', 'three', 'four', 'five' + ] + for p in params: + assert type(p) is parameters.Str + + # Test when env.context == 'cli': + cli = config.Env(context='cli') + assert cli.context == 'cli' + params = list(o._filter_param_by_context('stuff', cli)) + assert list(p.name for p in params) == ['one', 'two', 'three', 'four'] + for p in params: + assert type(p) is parameters.Str + + # Test when env.context == 'server' + server = config.Env(context='server') + assert server.context == 'server' + params = list(o._filter_param_by_context('stuff', server)) + assert list(p.name for p in params) == ['one', 'two', 'five'] + for p in params: + assert type(p) is parameters.Str + + # Test with no get_stuff: + class Missing(self.cls): + pass + o = Missing() + gen = o._filter_param_by_context('stuff') + e = raises(NotImplementedError, list, gen) + assert str(e) == 'Missing.get_stuff()' + + # Test when get_stuff is not callable: + class NotCallable(self.cls): + get_stuff = ('one', 'two') + o = NotCallable() + gen = o._filter_param_by_context('stuff') + e = raises(TypeError, list, gen) + assert str(e) == '%s.%s must be a callable; got %r' % ( + 'NotCallable', 'get_stuff', NotCallable.get_stuff + ) + + +class test_Command(ClassChecker): + """ + Test the `ipalib.frontend.Command` class. + """ + + _cls = frontend.Command + + def get_subcls(self): + """ + Return a standard subclass of `ipalib.frontend.Command`. + """ + class Rule(object): + def __init__(self, name): + self.name = name + + def __call__(self, _, value): + if value != self.name: + return _('must equal %r') % self.name + + default_from = parameters.DefaultFrom( + lambda arg: arg, + 'default_from' + ) + normalizer = lambda value: value.lower() + + class example(self.cls): + takes_options = ( + parameters.Str('option0', Rule('option0'), + normalizer=normalizer, + default_from=default_from, + ), + parameters.Str('option1', Rule('option1'), + normalizer=normalizer, + default_from=default_from, + ), + ) + return example + + def get_instance(self, args=tuple(), options=tuple()): + """ + Helper method used to test args and options. + """ + class example(self.cls): + takes_args = args + takes_options = options + o = example() + o.finalize() + return o + + def test_class(self): + """ + Test the `ipalib.frontend.Command` class. + """ + assert self.cls.takes_options == tuple() + assert self.cls.takes_args == tuple() + + def test_get_args(self): + """ + Test the `ipalib.frontend.Command.get_args` method. + """ + assert list(self.cls().get_args()) == [] + args = ('login', 'stuff') + o = self.get_instance(args=args) + assert tuple(o.get_args()) == args + + def test_get_options(self): + """ + Test the `ipalib.frontend.Command.get_options` method. + """ + options = list(self.cls().get_options()) + assert len(options) == 1 + assert options[0].name == 'version' + options = ('verbose', 'debug') + o = self.get_instance(options=options) + assert len(tuple(o.get_options())) == 3 + assert 'verbose' in tuple(o.get_options()) + assert 'debug' in tuple(o.get_options()) + + def test_args(self): + """ + Test the ``ipalib.frontend.Command.args`` instance attribute. + """ + assert self.cls().args is None + o = self.cls() + o.finalize() + assert type(o.args) is plugable.NameSpace + assert len(o.args) == 0 + args = ('destination', 'source?') + ns = self.get_instance(args=args).args + assert type(ns) is plugable.NameSpace + assert len(ns) == len(args) + assert list(ns) == ['destination', 'source'] + assert type(ns.destination) is parameters.Str + assert type(ns.source) is parameters.Str + assert ns.destination.required is True + assert ns.destination.multivalue is False + assert ns.source.required is False + assert ns.source.multivalue is False + + # Test TypeError: + e = raises(TypeError, self.get_instance, args=(u'whatever',)) + assert str(e) == TYPE_ERROR % ( + 'spec', (str, parameters.Param), u'whatever', unicode) + + # Test ValueError, required after optional: + e = raises(ValueError, self.get_instance, args=('arg1?', 'arg2')) + assert str(e) == 'arg2: required argument after optional' + + # Test ValueError, scalar after multivalue: + e = raises(ValueError, self.get_instance, args=('arg1+', 'arg2')) + assert str(e) == 'arg2: only final argument can be multivalue' + + def test_max_args(self): + """ + Test the ``ipalib.frontend.Command.max_args`` instance attribute. + """ + o = self.get_instance() + assert o.max_args == 0 + o = self.get_instance(args=('one?',)) + assert o.max_args == 1 + o = self.get_instance(args=('one', 'two?')) + assert o.max_args == 2 + o = self.get_instance(args=('one', 'multi+',)) + assert o.max_args is None + o = self.get_instance(args=('one', 'multi*',)) + assert o.max_args is None + + def test_options(self): + """ + Test the ``ipalib.frontend.Command.options`` instance attribute. + """ + assert self.cls().options is None + o = self.cls() + o.finalize() + assert type(o.options) is plugable.NameSpace + assert len(o.options) == 1 + options = ('target', 'files*') + ns = self.get_instance(options=options).options + assert type(ns) is plugable.NameSpace + assert len(ns) == len(options) + 1 + assert list(ns) == ['target', 'files', 'version'] + assert type(ns.target) is parameters.Str + assert type(ns.files) is parameters.Str + assert ns.target.required is True + assert ns.target.multivalue is False + assert ns.files.required is False + assert ns.files.multivalue is True + + def test_output(self): + """ + Test the ``ipalib.frontend.Command.output`` instance attribute. + """ + inst = self.cls() + assert inst.output is None + inst.finalize() + assert type(inst.output) is plugable.NameSpace + assert list(inst.output) == ['result'] + assert type(inst.output.result) is output.Output + + def test_iter_output(self): + """ + Test the ``ipalib.frontend.Command._iter_output`` instance attribute. + """ + class Example(self.cls): + pass + inst = Example() + + inst.has_output = tuple() + assert list(inst._iter_output()) == [] + + wrong = ['hello', 'world'] + inst.has_output = wrong + e = raises(TypeError, list, inst._iter_output()) + assert str(e) == 'Example.has_output: need a %r; got a %r: %r' % ( + tuple, list, wrong + ) + + wrong = ('hello', 17) + inst.has_output = wrong + e = raises(TypeError, list, inst._iter_output()) + assert str(e) == 'Example.has_output[1]: need a %r; got a %r: %r' % ( + (str, output.Output), int, 17 + ) + + okay = ('foo', output.Output('bar'), 'baz') + inst.has_output = okay + items = list(inst._iter_output()) + assert len(items) == 3 + assert list(o.name for o in items) == ['foo', 'bar', 'baz'] + for o in items: + assert type(o) is output.Output + + def test_soft_validate(self): + """ + Test the `ipalib.frontend.Command.soft_validate` method. + """ + class user_add(frontend.Command): + takes_args = parameters.Str('uid', + normalizer=lambda value: value.lower(), + default_from=lambda givenname, sn: givenname[0] + sn, + ) + + takes_options = ('givenname', 'sn') + + cmd = user_add() + cmd.env = config.Env(context='cli') + cmd.finalize() + assert list(cmd.params) == ['givenname', 'sn', 'uid', 'version'] + ret = cmd.soft_validate({}) + assert sorted(ret['values']) == ['version'] + assert sorted(ret['errors']) == ['givenname', 'sn', 'uid'] + assert cmd.soft_validate(dict(givenname=u'First', sn=u'Last')) == dict( + values=dict(givenname=u'First', sn=u'Last', uid=u'flast', + version=None), + errors=dict(), + ) + + def test_convert(self): + """ + Test the `ipalib.frontend.Command.convert` method. + """ + kw = dict( + option0=u'1.5', + option1=u'7', + ) + o = self.subcls() + o.finalize() + for (key, value) in o.convert(**kw).iteritems(): + assert_equal(unicode(kw[key]), value) + + def test_normalize(self): + """ + Test the `ipalib.frontend.Command.normalize` method. + """ + kw = dict( + option0=u'OPTION0', + option1=u'OPTION1', + ) + norm = dict((k, v.lower()) for (k, v) in kw.items()) + sub = self.subcls() + sub.finalize() + assert sub.normalize(**kw) == norm + + def test_get_default(self): + """ + Test the `ipalib.frontend.Command.get_default` method. + """ + # FIXME: Add an updated unit tests for get_default() + + def test_default_from_chaining(self): + """ + Test chaining of parameters through default_from. + """ + class my_cmd(self.cls): + takes_options = ( + Str('option0'), + Str('option1', default_from=lambda option0: option0), + Str('option2', default_from=lambda option1: option1), + ) + + def run(self, *args, **options): + return dict(result=options) + + kw = dict(option0=u'some value') + + (api, home) = create_test_api() + api.finalize() + o = my_cmd() + o.set_api(api) + o.finalize() + e = o(**kw) + assert type(e) is dict + assert 'result' in e + assert 'option2' in e['result'] + assert e['result']['option2'] == u'some value' + + def test_validate(self): + """ + Test the `ipalib.frontend.Command.validate` method. + """ + + sub = self.subcls() + sub.env = config.Env(context='cli') + sub.finalize() + + # Check with valid values + okay = dict( + option0=u'option0', + option1=u'option1', + another_option='some value', + version=API_VERSION, + ) + sub.validate(**okay) + + # Check with an invalid value + fail = dict(okay) + fail['option0'] = u'whatever' + e = raises(errors.ValidationError, sub.validate, **fail) + assert_equal(e.name, 'option0') + assert_equal(e.value, u'whatever') + assert_equal(e.error, u"must equal 'option0'") + assert e.rule.__class__.__name__ == 'Rule' + assert e.index is None + + # Check with a missing required arg + fail = dict(okay) + fail.pop('option1') + e = raises(errors.RequirementError, sub.validate, **fail) + assert e.name == 'option1' + + def test_execute(self): + """ + Test the `ipalib.frontend.Command.execute` method. + """ + o = self.cls() + e = raises(NotImplementedError, o.execute) + assert str(e) == 'Command.execute()' + + def test_args_options_2_params(self): + """ + Test the `ipalib.frontend.Command.args_options_2_params` method. + """ + + # Test that ZeroArgumentError is raised: + o = self.get_instance() + e = raises(errors.ZeroArgumentError, o.args_options_2_params, 1) + assert e.name == 'example' + + # Test that MaxArgumentError is raised (count=1) + o = self.get_instance(args=('one?',)) + e = raises(errors.MaxArgumentError, o.args_options_2_params, 1, 2) + assert e.name == 'example' + assert e.count == 1 + assert str(e) == "command 'example' takes at most 1 argument" + + # Test that MaxArgumentError is raised (count=2) + o = self.get_instance(args=('one', 'two?')) + e = raises(errors.MaxArgumentError, o.args_options_2_params, 1, 2, 3) + assert e.name == 'example' + assert e.count == 2 + assert str(e) == "command 'example' takes at most 2 arguments" + + # Test that OptionError is raised when an extra option is given: + o = self.get_instance() + e = raises(errors.OptionError, o.args_options_2_params, bad_option=True) + assert e.option == 'bad_option' + + # Test that OverlapError is raised: + o = self.get_instance(args=('one', 'two'), options=('three', 'four')) + e = raises(errors.OverlapError, o.args_options_2_params, + 1, 2, three=3, two=2, four=4, one=1) + assert e.names == ['one', 'two'] + + # Test the permutations: + o = self.get_instance(args=('one', 'two*'), options=('three', 'four')) + mthd = o.args_options_2_params + assert mthd() == dict() + assert mthd(1) == dict(one=1) + assert mthd(1, 2) == dict(one=1, two=(2,)) + assert mthd(1, 21, 22, 23) == dict(one=1, two=(21, 22, 23)) + assert mthd(1, (21, 22, 23)) == dict(one=1, two=(21, 22, 23)) + assert mthd(three=3, four=4) == dict(three=3, four=4) + assert mthd(three=3, four=4, one=1, two=2) == \ + dict(one=1, two=2, three=3, four=4) + assert mthd(1, 21, 22, 23, three=3, four=4) == \ + dict(one=1, two=(21, 22, 23), three=3, four=4) + assert mthd(1, (21, 22, 23), three=3, four=4) == \ + dict(one=1, two=(21, 22, 23), three=3, four=4) + + def test_args_options_2_entry(self): + """ + Test `ipalib.frontend.Command.args_options_2_entry` method. + """ + class my_cmd(self.cls): + takes_args = ( + parameters.Str('one', attribute=True), + parameters.Str('two', attribute=False), + ) + takes_options = ( + parameters.Str('three', attribute=True, multivalue=True), + parameters.Str('four', attribute=True, multivalue=False), + ) + + def run(self, *args, **kw): + return self.args_options_2_entry(*args, **kw) + + args = ('one', 'two') + kw = dict(three=('three1', 'three2'), four='four') + + (api, home) = create_test_api() + api.finalize() + o = my_cmd() + o.set_api(api) + o.finalize() + e = o.run(*args, **kw) + assert type(e) is dict + assert 'one' in e + assert 'two' not in e + assert 'three' in e + assert 'four' in e + assert e['one'] == 'one' + assert e['three'] == ['three1', 'three2'] + assert e['four'] == 'four' + + def test_params_2_args_options(self): + """ + Test the `ipalib.frontend.Command.params_2_args_options` method. + """ + o = self.get_instance(args='one', options='two') + assert o.params_2_args_options() == ((None,), {}) + assert o.params_2_args_options(one=1) == ((1,), {}) + assert o.params_2_args_options(two=2) == ((None,), dict(two=2)) + assert o.params_2_args_options(two=2, one=1) == ((1,), dict(two=2)) + + def test_run(self): + """ + Test the `ipalib.frontend.Command.run` method. + """ + class my_cmd(self.cls): + def execute(self, *args, **kw): + return ('execute', args, kw) + + def forward(self, *args, **kw): + return ('forward', args, kw) + + args = ('Hello,', 'world,') + kw = dict(how_are='you', on_this='fine day?', version=API_VERSION) + + # Test in server context: + (api, home) = create_test_api(in_server=True) + api.finalize() + o = my_cmd() + o.set_api(api) + assert o.run.im_func is self.cls.run.im_func + out = o.run(*args, **kw) + assert ('execute', args, kw) == out + + # Test in non-server context + (api, home) = create_test_api(in_server=False) + api.finalize() + o = my_cmd() + o.set_api(api) + assert o.run.im_func is self.cls.run.im_func + assert ('forward', args, kw) == o.run(*args, **kw) + + def test_messages(self): + """ + Test correct handling of messages + """ + class TestMessage(messages.PublicMessage): + type = 'info' + format = 'This is a message.' + errno = 1234 + + class my_cmd(self.cls): + def execute(self, *args, **kw): + result = {'name': 'execute'} + messages.add_message(kw['version'], result, TestMessage()) + return result + + def forward(self, *args, **kw): + result = {'name': 'forward'} + messages.add_message(kw['version'], result, TestMessage()) + return result + + args = ('Hello,', 'world,') + kw = dict(how_are='you', on_this='fine day?', version=API_VERSION) + + expected = [TestMessage().to_dict()] + + # Test in server context: + (api, home) = create_test_api(in_server=True) + api.finalize() + o = my_cmd() + o.set_api(api) + assert o.run.im_func is self.cls.run.im_func + assert {'name': 'execute', 'messages': expected} == o.run(*args, **kw) + + # Test in non-server context + (api, home) = create_test_api(in_server=False) + api.finalize() + o = my_cmd() + o.set_api(api) + assert o.run.im_func is self.cls.run.im_func + assert {'name': 'forward', 'messages': expected} == o.run(*args, **kw) + + def test_validate_output_basic(self): + """ + Test the `ipalib.frontend.Command.validate_output` method. + """ + class Example(self.cls): + has_output = ('foo', 'bar', 'baz') + + inst = Example() + inst.finalize() + + # Test with wrong type: + wrong = ('foo', 'bar', 'baz') + e = raises(TypeError, inst.validate_output, wrong) + assert str(e) == '%s.validate_output(): need a %r; got a %r: %r' % ( + 'Example', dict, tuple, wrong + ) + + # Test with a missing keys: + wrong = dict(bar='hello') + e = raises(ValueError, inst.validate_output, wrong) + assert str(e) == '%s.validate_output(): missing keys %r in %r' % ( + 'Example', ['baz', 'foo'], wrong + ) + + # Test with extra keys: + wrong = dict(foo=1, bar=2, baz=3, fee=4, azz=5) + e = raises(ValueError, inst.validate_output, wrong) + assert str(e) == '%s.validate_output(): unexpected keys %r in %r' % ( + 'Example', ['azz', 'fee'], wrong + ) + + # Test with different keys: + wrong = dict(baz=1, xyzzy=2, quux=3) + e = raises(ValueError, inst.validate_output, wrong) + assert str(e) == '%s.validate_output(): missing keys %r in %r' % ( + 'Example', ['bar', 'foo'], wrong + ), str(e) + + def test_validate_output_per_type(self): + """ + Test `ipalib.frontend.Command.validate_output` per-type validation. + """ + + class Complex(self.cls): + has_output = ( + output.Output('foo', int), + output.Output('bar', list), + ) + inst = Complex() + inst.finalize() + + wrong = dict(foo=17.9, bar=[18]) + e = raises(TypeError, inst.validate_output, wrong) + assert str(e) == '%s:\n output[%r]: need %r; got %r: %r' % ( + 'Complex.validate_output()', 'foo', int, float, 17.9 + ) + + wrong = dict(foo=18, bar=17) + e = raises(TypeError, inst.validate_output, wrong) + assert str(e) == '%s:\n output[%r]: need %r; got %r: %r' % ( + 'Complex.validate_output()', 'bar', list, int, 17 + ) + + def test_validate_output_nested(self): + """ + Test `ipalib.frontend.Command.validate_output` nested validation. + """ + + class Subclass(output.ListOfEntries): + pass + + # Test nested validation: + class nested(self.cls): + has_output = ( + output.Output('hello', int), + Subclass('world'), + ) + inst = nested() + inst.finalize() + okay = dict(foo='bar') + nope = ('aye', 'bee') + + wrong = dict(hello=18, world=[okay, nope, okay]) + e = raises(TypeError, inst.validate_output, wrong) + assert str(e) == output.emsg % ( + 'nested', 'Subclass', 'world', 1, dict, tuple, nope + ) + + wrong = dict(hello=18, world=[okay, okay, okay, okay, nope]) + e = raises(TypeError, inst.validate_output, wrong) + assert str(e) == output.emsg % ( + 'nested', 'Subclass', 'world', 4, dict, tuple, nope + ) + + def test_get_output_params(self): + """ + Test the `ipalib.frontend.Command.get_output_params` method. + """ + class example(self.cls): + has_output_params = ( + 'one', + 'two', + 'three', + ) + takes_args = ( + 'foo', + ) + takes_options = ( + Str('bar', flags='no_output'), + 'baz', + ) + + inst = example() + assert list(inst.get_output_params()) == ['one', 'two', 'three'] + inst.finalize() + assert list(inst.get_output_params()) == [ + 'one', 'two', 'three', inst.params.foo, inst.params.baz + ] + assert list(inst.output_params) == ['one', 'two', 'three', 'foo', 'baz'] + + +class test_LocalOrRemote(ClassChecker): + """ + Test the `ipalib.frontend.LocalOrRemote` class. + """ + _cls = frontend.LocalOrRemote + + def test_init(self): + """ + Test the `ipalib.frontend.LocalOrRemote.__init__` method. + """ + o = self.cls() + o.finalize() + assert list(o.args) == [] + assert list(o.options) == ['server', 'version'] + op = o.options.server + assert op.required is False + assert op.default is False + + def test_run(self): + """ + Test the `ipalib.frontend.LocalOrRemote.run` method. + """ + class example(self.cls): + takes_args = 'key?' + + def forward(self, *args, **options): + return dict(result=('forward', args, options)) + + def execute(self, *args, **options): + return dict(result=('execute', args, options)) + + # Test when in_server=False: + (api, home) = create_test_api(in_server=False) + api.register(example) + api.finalize() + cmd = api.Command.example + assert cmd(version=u'2.47') == dict( + result=('execute', (None,), dict(version=u'2.47', server=False)) + ) + assert cmd(u'var', version=u'2.47') == dict( + result=('execute', (u'var',), dict(version=u'2.47', server=False)) + ) + assert cmd(server=True, version=u'2.47') == dict( + result=('forward', (None,), dict(version=u'2.47', server=True)) + ) + assert cmd(u'var', server=True, version=u'2.47') == dict( + result=('forward', (u'var',), dict(version=u'2.47', server=True)) + ) + + # Test when in_server=True (should always call execute): + (api, home) = create_test_api(in_server=True) + api.register(example) + api.finalize() + cmd = api.Command.example + assert cmd(version=u'2.47') == dict( + result=('execute', (None,), dict(version=u'2.47', server=False)) + ) + assert cmd(u'var', version=u'2.47') == dict( + result=('execute', (u'var',), dict(version=u'2.47', server=False)) + ) + assert cmd(server=True, version=u'2.47') == dict( + result=('execute', (None,), dict(version=u'2.47', server=True)) + ) + assert cmd(u'var', server=True, version=u'2.47') == dict( + result=('execute', (u'var',), dict(version=u'2.47', server=True)) + ) + + +class test_Object(ClassChecker): + """ + Test the `ipalib.frontend.Object` class. + """ + _cls = frontend.Object + + def test_class(self): + """ + Test the `ipalib.frontend.Object` class. + """ + assert self.cls.backend is None + assert self.cls.methods is None + assert self.cls.properties is None + assert self.cls.params is None + assert self.cls.params_minus_pk is None + assert self.cls.takes_params == tuple() + + def test_init(self): + """ + Test the `ipalib.frontend.Object.__init__` method. + """ + o = self.cls() + assert o.backend is None + assert o.methods is None + assert o.properties is None + assert o.params is None + assert o.params_minus_pk is None + assert o.properties is None + + def test_set_api(self): + """ + Test the `ipalib.frontend.Object.set_api` method. + """ + # Setup for test: + class DummyAttribute(object): + def __init__(self, obj_name, attr_name, name=None): + self.obj_name = obj_name + self.attr_name = attr_name + if name is None: + self.name = '%s_%s' % (obj_name, attr_name) + else: + self.name = name + self.param = frontend.create_param(attr_name) + + def __clone__(self, attr_name): + return self.__class__( + self.obj_name, + self.attr_name, + getattr(self, attr_name) + ) + + def get_attributes(cnt, format): + for name in ['other', 'user', 'another']: + for i in xrange(cnt): + yield DummyAttribute(name, format % i) + + cnt = 10 + formats = dict( + methods='method_%d', + properties='property_%d', + ) + + + _d = dict( + Method=plugable.NameSpace( + get_attributes(cnt, formats['methods']) + ), + Property=plugable.NameSpace( + get_attributes(cnt, formats['properties']) + ), + ) + api = plugable.MagicDict(_d) + assert len(api.Method) == cnt * 3 + assert len(api.Property) == cnt * 3 + + class user(self.cls): + pass + + # Actually perform test: + o = user() + o.set_api(api) + assert read_only(o, 'api') is api + for name in ['methods', 'properties']: + namespace = getattr(o, name) + assert isinstance(namespace, plugable.NameSpace) + assert len(namespace) == cnt + f = formats[name] + for i in xrange(cnt): + attr_name = f % i + attr = namespace[attr_name] + assert isinstance(attr, DummyAttribute) + assert attr is getattr(namespace, attr_name) + assert attr.obj_name == 'user' + assert attr.attr_name == attr_name + assert attr.name == '%s_%s' % ('user', attr_name) + + # Test params instance attribute + o = self.cls() + o.set_api(api) + ns = o.params + assert type(ns) is plugable.NameSpace + assert len(ns) == 0 + class example(self.cls): + takes_params = ('banana', 'apple') + o = example() + o.set_api(api) + ns = o.params + assert type(ns) is plugable.NameSpace + assert len(ns) == 2, repr(ns) + assert list(ns) == ['banana', 'apple'] + for p in ns(): + assert type(p) is parameters.Str + assert p.required is True + assert p.multivalue is False + + def test_primary_key(self): + """ + Test the `ipalib.frontend.Object.primary_key` attribute. + """ + (api, home) = create_test_api() + api.finalize() + + # Test with no primary keys: + class example1(self.cls): + takes_params = ( + 'one', + 'two', + ) + o = example1() + o.set_api(api) + assert o.primary_key is None + + # Test with 1 primary key: + class example2(self.cls): + takes_params = ( + 'one', + 'two', + parameters.Str('three', primary_key=True), + 'four', + ) + o = example2() + o.set_api(api) + pk = o.primary_key + assert type(pk) is parameters.Str + assert pk.name == 'three' + assert pk.primary_key is True + assert o.params[2] is o.primary_key + assert isinstance(o.params_minus_pk, plugable.NameSpace) + assert list(o.params_minus_pk) == ['one', 'two', 'four'] + + # Test with multiple primary_key: + class example3(self.cls): + takes_params = ( + parameters.Str('one', primary_key=True), + parameters.Str('two', primary_key=True), + 'three', + parameters.Str('four', primary_key=True), + ) + o = example3() + o.set_api(api) + e = raises(ValueError, o.finalize) + assert str(e) == \ + 'example3 (Object) has multiple primary keys: one, two, four' + + def test_backend(self): + """ + Test the `ipalib.frontend.Object.backend` attribute. + """ + (api, home) = create_test_api() + class ldap(backend.Backend): + whatever = 'It worked!' + api.register(ldap) + class user(frontend.Object): + backend_name = 'ldap' + api.register(user) + api.finalize() + b = api.Object.user.backend + assert isinstance(b, ldap) + assert b.whatever == 'It worked!' + + def test_get_dn(self): + """ + Test the `ipalib.frontend.Object.get_dn` method. + """ + o = self.cls() + e = raises(NotImplementedError, o.get_dn, 'primary key') + assert str(e) == 'Object.get_dn()' + class user(self.cls): + pass + o = user() + e = raises(NotImplementedError, o.get_dn, 'primary key') + assert str(e) == 'user.get_dn()' + + def test_params_minus(self): + """ + Test the `ipalib.frontend.Object.params_minus` method. + """ + class example(self.cls): + takes_params = ('one', 'two', 'three', 'four') + o = example() + (api, home) = create_test_api() + o.set_api(api) + p = o.params + assert tuple(o.params_minus()) == tuple(p()) + assert tuple(o.params_minus([])) == tuple(p()) + assert tuple(o.params_minus('two', 'three')) == (p.one, p.four) + assert tuple(o.params_minus(['two', 'three'])) == (p.one, p.four) + assert tuple(o.params_minus(p.two, p.three)) == (p.one, p.four) + assert tuple(o.params_minus([p.two, p.three])) == (p.one, p.four) + ns = NameSpace([p.two, p.three]) + assert tuple(o.params_minus(ns)) == (p.one, p.four) + + +class test_Attribute(ClassChecker): + """ + Test the `ipalib.frontend.Attribute` class. + """ + _cls = frontend.Attribute + + def test_class(self): + """ + Test the `ipalib.frontend.Attribute` class. + """ + assert self.cls.__bases__ == (plugable.Plugin,) + assert type(self.cls.obj) is property + assert type(self.cls.obj_name) is property + assert type(self.cls.attr_name) is property + + def test_init(self): + """ + Test the `ipalib.frontend.Attribute.__init__` method. + """ + class user_add(self.cls): + pass + o = user_add() + assert read_only(o, 'obj') is None + assert read_only(o, 'obj_name') == 'user' + assert read_only(o, 'attr_name') == 'add' + + def test_set_api(self): + """ + Test the `ipalib.frontend.Attribute.set_api` method. + """ + user_obj = 'The user frontend.Object instance' + class api(object): + Object = dict(user=user_obj) + class user_add(self.cls): + pass + o = user_add() + assert read_only(o, 'api') is None + assert read_only(o, 'obj') is None + o.set_api(api) + assert read_only(o, 'api') is api + assert read_only(o, 'obj') is user_obj + + +class test_Method(ClassChecker): + """ + Test the `ipalib.frontend.Method` class. + """ + _cls = frontend.Method + + def get_api(self, args=tuple(), options=tuple()): + """ + Return a finalized `ipalib.plugable.API` instance. + """ + (api, home) = create_test_api() + class user(frontend.Object): + takes_params = ( + 'givenname', + 'sn', + frontend.Param('uid', primary_key=True), + 'initials', + ) + class user_verb(self.cls): + takes_args = args + takes_options = options + api.register(user) + api.register(user_verb) + api.finalize() + return api + + def test_class(self): + """ + Test the `ipalib.frontend.Method` class. + """ + assert self.cls.__bases__ == (frontend.Attribute, frontend.Command) + + def test_init(self): + """ + Test the `ipalib.frontend.Method.__init__` method. + """ + class user_add(self.cls): + pass + o = user_add() + assert o.name == 'user_add' + assert o.obj_name == 'user' + assert o.attr_name == 'add' + + +class test_Property(ClassChecker): + """ + Test the `ipalib.frontend.Property` class. + """ + _cls = frontend.Property + + def get_subcls(self): + """ + Return a standard subclass of `ipalib.frontend.Property`. + """ + class user_givenname(self.cls): + 'User first name' + + @frontend.rule + def rule0_lowercase(self, value): + if not value.islower(): + return 'Must be lowercase' + return user_givenname + + def test_class(self): + """ + Test the `ipalib.frontend.Property` class. + """ + assert self.cls.__bases__ == (frontend.Attribute,) + assert self.cls.klass is parameters.Str + + def test_init(self): + """ + Test the `ipalib.frontend.Property.__init__` method. + """ + o = self.subcls() + assert len(o.rules) == 1 + assert o.rules[0].__name__ == 'rule0_lowercase' + param = o.param + assert isinstance(param, parameters.Str) + assert param.name == 'givenname' + assert unicode(param.doc) == u'User first name' diff --git a/ipatests/test_ipalib/test_messages.py b/ipatests/test_ipalib/test_messages.py new file mode 100644 index 00000000..686bf8dd --- /dev/null +++ b/ipatests/test_ipalib/test_messages.py @@ -0,0 +1,89 @@ +# Authors: +# Petr Viktorin +# +# Copyright (C) 1012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.messages` module. +""" + +from ipalib import messages +from ipalib.capabilities import capabilities +from ipatests.test_ipalib import test_errors + + +class HelloMessage(messages.PublicMessage): + type = 'info' + format = '%(greeting)s, %(object)s!' + errno = 1234 + + +class test_PublicMessage(test_errors.test_PublicError): + """Test public messages""" + # The messages are a lot like public errors; defer testing to that. + klass = messages.PublicMessage + required_classes = (UserWarning, messages.PublicMessage) + + +class test_PublicMessages(test_errors.BaseMessagesTest): + message_list = messages.public_messages + errno_range = xrange(10000, 19999) + required_classes = (UserWarning, messages.PublicMessage) + texts = messages._texts + + def extratest(self, cls): + if cls is not messages.PublicMessage: + assert cls.type in ('debug', 'info', 'warning', 'error') + + +def test_to_dict(): + expected = dict( + name='HelloMessage', + type='info', + message='Hello, world!', + code=1234, + ) + + assert HelloMessage(greeting='Hello', object='world').to_dict() == expected + + +def test_add_message(): + result = {} + + assert capabilities['messages'] == u'2.52' + + messages.add_message(u'2.52', result, + HelloMessage(greeting='Hello', object='world')) + messages.add_message(u'2.1', result, + HelloMessage(greeting="'Lo", object='version')) + messages.add_message(u'2.60', result, + HelloMessage(greeting='Hi', object='version')) + + assert result == {'messages': [ + dict( + name='HelloMessage', + type='info', + message='Hello, world!', + code=1234, + ), + dict( + name='HelloMessage', + type='info', + message='Hi, version!', + code=1234, + ) + ]} diff --git a/ipatests/test_ipalib/test_output.py b/ipatests/test_ipalib/test_output.py new file mode 100644 index 00000000..15ef11e1 --- /dev/null +++ b/ipatests/test_ipalib/test_output.py @@ -0,0 +1,89 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.output` module. +""" + +from ipatests.util import raises, ClassChecker +from ipalib import output +from ipalib.frontend import Command +from ipalib import _ + +class test_Output(ClassChecker): + """ + Test the `ipalib.output.Output` class. + """ + + _cls = output.Output + + def test_init(self): + """ + Test the `ipalib.output.Output.__init__` method. + """ + o = self.cls('result') + assert o.name == 'result' + assert o.type is None + assert o.doc is None + + def test_repr(self): + """ + Test the `ipalib.output.Output.__repr__` method. + """ + o = self.cls('aye') + assert repr(o) == "Output('aye', None, None)" + o = self.cls('aye', type=int, doc='An A, aye?') + assert repr(o) == "Output('aye', %r, 'An A, aye?')" % int + + class Entry(self.cls): + pass + o = Entry('aye') + assert repr(o) == "Entry('aye', None, None)" + o = Entry('aye', type=int, doc='An A, aye?') + assert repr(o) == "Entry('aye', %r, 'An A, aye?')" % int + + +class test_ListOfEntries(ClassChecker): + """ + Test the `ipalib.output.ListOfEntries` class. + """ + + _cls = output.ListOfEntries + + def test_validate(self): + """ + Test the `ipalib.output.ListOfEntries.validate` method. + """ + class example(Command): + pass + cmd = example() + inst = self.cls('stuff') + + okay = dict(foo='bar') + nope = ('aye', 'bee') + + e = raises(TypeError, inst.validate, cmd, [okay, okay, nope]) + assert str(e) == output.emsg % ( + 'example', 'ListOfEntries', 'stuff', 2, dict, tuple, nope + ) + + e = raises(TypeError, inst.validate, cmd, [nope, okay, nope]) + assert str(e) == output.emsg % ( + 'example', 'ListOfEntries', 'stuff', 0, dict, tuple, nope + ) diff --git a/ipatests/test_ipalib/test_parameters.py b/ipatests/test_ipalib/test_parameters.py new file mode 100644 index 00000000..71acfce7 --- /dev/null +++ b/ipatests/test_ipalib/test_parameters.py @@ -0,0 +1,1533 @@ +# -*- coding: utf-8 -*- +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.parameters` module. +""" + +import re +import sys +from types import NoneType +from decimal import Decimal +from inspect import isclass +from ipatests.util import raises, ClassChecker, read_only +from ipatests.util import dummy_ugettext, assert_equal +from ipatests.data import binary_bytes, utf8_bytes, unicode_str +from ipalib import parameters, text, errors, config +from ipalib.constants import TYPE_ERROR, CALLABLE_ERROR, NULLS +from ipalib.errors import ValidationError, ConversionError +from ipalib import _ +from xmlrpclib import MAXINT, MININT + +class test_DefaultFrom(ClassChecker): + """ + Test the `ipalib.parameters.DefaultFrom` class. + """ + _cls = parameters.DefaultFrom + + def test_init(self): + """ + Test the `ipalib.parameters.DefaultFrom.__init__` method. + """ + def callback(*args): + return args + keys = ('givenname', 'sn') + o = self.cls(callback, *keys) + assert read_only(o, 'callback') is callback + assert read_only(o, 'keys') == keys + lam = lambda first, last: first[0] + last + o = self.cls(lam) + assert read_only(o, 'keys') == ('first', 'last') + + # Test that TypeError is raised when callback isn't callable: + e = raises(TypeError, self.cls, 'whatever') + assert str(e) == CALLABLE_ERROR % ('callback', 'whatever', str) + + # Test that TypeError is raised when a key isn't an str: + e = raises(TypeError, self.cls, callback, 'givenname', 17) + assert str(e) == TYPE_ERROR % ('keys', str, 17, int) + + # Test that ValueError is raised when inferring keys from a callback + # which has *args: + e = raises(ValueError, self.cls, lambda foo, *args: None) + assert str(e) == "callback: variable-length argument list not allowed" + + # Test that ValueError is raised when inferring keys from a callback + # which has **kwargs: + e = raises(ValueError, self.cls, lambda foo, **kwargs: None) + assert str(e) == "callback: variable-length argument list not allowed" + + def test_repr(self): + """ + Test the `ipalib.parameters.DefaultFrom.__repr__` method. + """ + def stuff(one, two): + pass + + o = self.cls(stuff) + assert repr(o) == "DefaultFrom(stuff, 'one', 'two')" + + o = self.cls(stuff, 'aye', 'bee', 'see') + assert repr(o) == "DefaultFrom(stuff, 'aye', 'bee', 'see')" + + cb = lambda first, last: first[0] + last + + o = self.cls(cb) + assert repr(o) == "DefaultFrom(, 'first', 'last')" + + o = self.cls(cb, 'aye', 'bee', 'see') + assert repr(o) == "DefaultFrom(, 'aye', 'bee', 'see')" + + def test_call(self): + """ + Test the `ipalib.parameters.DefaultFrom.__call__` method. + """ + def callback(givenname, sn): + return givenname[0] + sn[0] + keys = ('givenname', 'sn') + o = self.cls(callback, *keys) + kw = dict( + givenname='John', + sn='Public', + hello='world', + ) + assert o(**kw) == 'JP' + assert o() is None + for key in ('givenname', 'sn'): + kw_copy = dict(kw) + del kw_copy[key] + assert o(**kw_copy) is None + + # Test using implied keys: + o = self.cls(lambda first, last: first[0] + last) + assert o(first='john', last='doe') == 'jdoe' + assert o(first='', last='doe') is None + assert o(one='john', two='doe') is None + + # Test that co_varnames slice is used: + def callback2(first, last): + letter = first[0] + return letter + last + o = self.cls(callback2) + assert o.keys == ('first', 'last') + assert o(first='john', last='doe') == 'jdoe' + + +def test_parse_param_spec(): + """ + Test the `ipalib.parameters.parse_param_spec` function. + """ + f = parameters.parse_param_spec + assert f('name') == ('name', dict(required=True, multivalue=False)) + assert f('name?') == ('name', dict(required=False, multivalue=False)) + assert f('name*') == ('name', dict(required=False, multivalue=True)) + assert f('name+') == ('name', dict(required=True, multivalue=True)) + + # Make sure other "funny" endings are *not* treated special: + assert f('name^') == ('name^', dict(required=True, multivalue=False)) + + # Test that TypeError is raised if spec isn't an str: + e = raises(TypeError, f, u'name?') + assert str(e) == TYPE_ERROR % ('spec', str, u'name?', unicode) + + +class DummyRule(object): + def __init__(self, error=None): + assert error is None or type(error) is unicode + self.error = error + self.reset() + + def __call__(self, *args): + self.calls.append(args) + return self.error + + def reset(self): + self.calls = [] + + +class test_Param(ClassChecker): + """ + Test the `ipalib.parameters.Param` class. + """ + _cls = parameters.Param + + def test_init(self): + """ + Test the `ipalib.parameters.Param.__init__` method. + """ + name = 'my_param' + o = self.cls(name) + assert o.param_spec is name + assert o.name is name + assert o.nice == "Param('my_param')" + assert o.password is False + assert o.__islocked__() is True + + # Test default rules: + assert o.rules == tuple() + assert o.class_rules == tuple() + assert o.all_rules == tuple() + + # Test default kwarg values: + assert o.cli_name is name + assert o.label.msg == 'my_param' + assert o.doc.msg == 'my_param' + assert o.required is True + assert o.multivalue is False + assert o.primary_key is False + assert o.normalizer is None + assert o.default is None + assert o.default_from is None + assert o.autofill is False + assert o.query is False + assert o.attribute is False + assert o.include is None + assert o.exclude is None + assert o.flags == frozenset() + assert o.sortorder == 2 + assert o.csv is False + + # Test that doc defaults from label: + o = self.cls('my_param', doc=_('Hello world')) + assert o.label.msg == 'my_param' + assert o.doc.msg == 'Hello world' + + o = self.cls('my_param', label='My Param') + assert o.label == 'My Param' + assert o.doc == 'My Param' + + + # Test that ValueError is raised when a kwarg from a subclass + # conflicts with an attribute: + class Subclass(self.cls): + kwargs = self.cls.kwargs + ( + ('convert', callable, None), + ) + e = raises(ValueError, Subclass, name) + assert str(e) == "kwarg 'convert' conflicts with attribute on Subclass" + + # Test type validation of keyword arguments: + class Subclass(self.cls): + kwargs = self.cls.kwargs + ( + ('extra1', bool, True), + ('extra2', str, 'Hello'), + ('extra3', (int, float), 42), + ('extra4', callable, lambda whatever: whatever + 7), + ) + o = Subclass('my_param') # Test with no **kw: + for (key, kind, default) in o.kwargs: + # Test with a type invalid for all: + value = object() + kw = {key: value} + e = raises(TypeError, Subclass, 'my_param', **kw) + if kind is callable: + assert str(e) == CALLABLE_ERROR % (key, value, type(value)) + else: + assert str(e) == TYPE_ERROR % (key, kind, value, type(value)) + # Test with None: + kw = {key: None} + Subclass('my_param', **kw) + + # Test when using unknown kwargs: + e = raises(TypeError, self.cls, 'my_param', + flags=['hello', 'world'], + whatever=u'Hooray!', + ) + assert str(e) == \ + "Param('my_param'): takes no such kwargs: 'whatever'" + e = raises(TypeError, self.cls, 'my_param', great='Yes', ape='he is!') + assert str(e) == \ + "Param('my_param'): takes no such kwargs: 'ape', 'great'" + + # Test that ValueError is raised if you provide both include and + # exclude: + e = raises(ValueError, self.cls, 'my_param', + include=['server', 'foo'], + exclude=['client', 'bar'], + ) + assert str(e) == '%s: cannot have both %s=%r and %s=%r' % ( + "Param('my_param')", + 'include', frozenset(['server', 'foo']), + 'exclude', frozenset(['client', 'bar']), + ) + + # Test that ValueError is raised if csv is set and multivalue is not set: + e = raises(ValueError, self.cls, 'my_param', csv=True) + assert str(e) == '%s: cannot have csv without multivalue' % "Param('my_param')" + + # Test that default_from gets set: + call = lambda first, last: first[0] + last + o = self.cls('my_param', default_from=call) + assert type(o.default_from) is parameters.DefaultFrom + assert o.default_from.callback is call + + def test_repr(self): + """ + Test the `ipalib.parameters.Param.__repr__` method. + """ + for name in ['name', 'name?', 'name*', 'name+']: + o = self.cls(name) + assert repr(o) == 'Param(%r)' % name + o = self.cls('name', required=False) + assert repr(o) == "Param('name', required=False)" + o = self.cls('name', multivalue=True) + assert repr(o) == "Param('name', multivalue=True)" + + def test_use_in_context(self): + """ + Test the `ipalib.parameters.Param.use_in_context` method. + """ + set1 = ('one', 'two', 'three') + set2 = ('four', 'five', 'six') + param1 = self.cls('param1') + param2 = self.cls('param2', include=set1) + param3 = self.cls('param3', exclude=set2) + for context in set1: + env = config.Env() + env.context = context + assert param1.use_in_context(env) is True, context + assert param2.use_in_context(env) is True, context + assert param3.use_in_context(env) is True, context + for context in set2: + env = config.Env() + env.context = context + assert param1.use_in_context(env) is True, context + assert param2.use_in_context(env) is False, context + assert param3.use_in_context(env) is False, context + + def test_safe_value(self): + """ + Test the `ipalib.parameters.Param.safe_value` method. + """ + values = (unicode_str, binary_bytes, utf8_bytes) + o = self.cls('my_param') + for value in values: + assert o.safe_value(value) is value + assert o.safe_value(None) is None + p = parameters.Password('my_passwd') + for value in values: + assert_equal(p.safe_value(value), u'********') + assert p.safe_value(None) is None + + def test_clone(self): + """ + Test the `ipalib.parameters.Param.clone` method. + """ + # Test with the defaults + orig = self.cls('my_param') + clone = orig.clone() + assert clone is not orig + assert type(clone) is self.cls + assert clone.name is orig.name + for (key, kind, default) in self.cls.kwargs: + assert getattr(clone, key) is getattr(orig, key) + + # Test with a param spec: + orig = self.cls('my_param*') + assert orig.param_spec == 'my_param*' + clone = orig.clone() + assert clone.param_spec == 'my_param' + assert clone is not orig + assert type(clone) is self.cls + for (key, kind, default) in self.cls.kwargs: + assert getattr(clone, key) is getattr(orig, key) + + # Test with overrides: + orig = self.cls('my_param*') + assert orig.required is False + assert orig.multivalue is True + clone = orig.clone(required=True) + assert clone is not orig + assert type(clone) is self.cls + assert clone.required is True + assert clone.multivalue is True + assert clone.param_spec == 'my_param' + assert clone.name == 'my_param' + + def test_clone_rename(self): + """ + Test the `ipalib.parameters.Param.clone` method. + """ + new_name = 'my_new_param' + + # Test with the defaults + orig = self.cls('my_param') + clone = orig.clone_rename(new_name) + assert clone is not orig + assert type(clone) is self.cls + assert clone.name == new_name + for (key, kind, default) in self.cls.kwargs: + assert getattr(clone, key) is getattr(orig, key) + + # Test with overrides: + orig = self.cls('my_param*') + assert orig.required is False + assert orig.multivalue is True + clone = orig.clone_rename(new_name, required=True) + assert clone is not orig + assert type(clone) is self.cls + assert clone.required is True + assert clone.multivalue is True + assert clone.param_spec == new_name + assert clone.name == new_name + + + def test_convert(self): + """ + Test the `ipalib.parameters.Param.convert` method. + """ + okay = ('Hello', u'Hello', 0, 4.2, True, False, unicode_str) + class Subclass(self.cls): + def _convert_scalar(self, value, index=None): + return value + + # Test when multivalue=False: + o = Subclass('my_param') + for value in NULLS: + assert o.convert(value) is None + assert o.convert(None) is None + for value in okay: + assert o.convert(value) is value + + # Test when multivalue=True: + o = Subclass('my_param', multivalue=True) + for value in NULLS: + assert o.convert(value) is None + assert o.convert(okay) == okay + assert o.convert(NULLS) is None + assert o.convert(okay + NULLS) == okay + assert o.convert(NULLS + okay) == okay + for value in okay: + assert o.convert(value) == (value,) + assert o.convert([None, value]) == (value,) + assert o.convert([value, None]) == (value,) + + def test_convert_scalar(self): + """ + Test the `ipalib.parameters.Param._convert_scalar` method. + """ + dummy = dummy_ugettext() + + # Test with correct type: + o = self.cls('my_param') + assert o._convert_scalar(None) is None + assert dummy.called() is False + # Test with incorrect type + e = raises(errors.ConversionError, o._convert_scalar, 'hello', index=17) + + def test_validate(self): + """ + Test the `ipalib.parameters.Param.validate` method. + """ + + # Test in default state (with no rules, no kwarg): + o = self.cls('my_param') + e = raises(errors.RequirementError, o.validate, None, 'cli') + assert e.name == 'my_param' + + # Test in default state that cli_name gets returned in the exception + # when context == 'cli' + o = self.cls('my_param', cli_name='short') + e = raises(errors.RequirementError, o.validate, None, 'cli') + assert e.name == 'short' + + # Test with required=False + o = self.cls('my_param', required=False) + assert o.required is False + assert o.validate(None, 'cli') is None + + # Test with query=True: + o = self.cls('my_param', query=True) + assert o.query is True + e = raises(errors.RequirementError, o.validate, None, 'cli') + assert_equal(e.name, 'my_param') + + # Test with multivalue=True: + o = self.cls('my_param', multivalue=True) + e = raises(TypeError, o.validate, [], 'cli') + assert str(e) == TYPE_ERROR % ('value', tuple, [], list) + e = raises(ValueError, o.validate, tuple(), 'cli') + assert str(e) == 'value: empty tuple must be converted to None' + + # Test with wrong (scalar) type: + e = raises(TypeError, o.validate, (None, None, 42, None), 'cli') + assert str(e) == TYPE_ERROR % ('my_param', NoneType, 42, int) + o = self.cls('my_param') + e = raises(TypeError, o.validate, 'Hello', 'cli') + assert str(e) == TYPE_ERROR % ('my_param', NoneType, 'Hello', str) + + class Example(self.cls): + type = int + + # Test with some rules and multivalue=False + pass1 = DummyRule() + pass2 = DummyRule() + fail = DummyRule(u'no good') + o = Example('example', pass1, pass2) + assert o.multivalue is False + assert o.validate(11, 'cli') is None + assert pass1.calls == [(text.ugettext, 11)] + assert pass2.calls == [(text.ugettext, 11)] + pass1.reset() + pass2.reset() + o = Example('example', pass1, pass2, fail) + e = raises(errors.ValidationError, o.validate, 42, 'cli') + assert e.name == 'example' + assert e.error == u'no good' + assert e.index is None + assert pass1.calls == [(text.ugettext, 42)] + assert pass2.calls == [(text.ugettext, 42)] + assert fail.calls == [(text.ugettext, 42)] + + # Test with some rules and multivalue=True + pass1 = DummyRule() + pass2 = DummyRule() + fail = DummyRule(u'this one is not good') + o = Example('example', pass1, pass2, multivalue=True) + assert o.multivalue is True + assert o.validate((3, 9), 'cli') is None + assert pass1.calls == [ + (text.ugettext, 3), + (text.ugettext, 9), + ] + assert pass2.calls == [ + (text.ugettext, 3), + (text.ugettext, 9), + ] + pass1.reset() + pass2.reset() + o = Example('multi_example', pass1, pass2, fail, multivalue=True) + assert o.multivalue is True + e = raises(errors.ValidationError, o.validate, (3, 9), 'cli') + assert e.name == 'multi_example' + assert e.error == u'this one is not good' + assert e.index == 0 + assert pass1.calls == [(text.ugettext, 3)] + assert pass2.calls == [(text.ugettext, 3)] + assert fail.calls == [(text.ugettext, 3)] + + def test_validate_scalar(self): + """ + Test the `ipalib.parameters.Param._validate_scalar` method. + """ + class MyParam(self.cls): + type = bool + okay = DummyRule() + o = MyParam('my_param', okay) + + # Test that TypeError is appropriately raised: + e = raises(TypeError, o._validate_scalar, 0) + assert str(e) == TYPE_ERROR % ('my_param', bool, 0, int) + e = raises(TypeError, o._validate_scalar, 'Hi', index=4) + assert str(e) == TYPE_ERROR % ('my_param', bool, 'Hi', str) + e = raises(TypeError, o._validate_scalar, True, index=3.0) + assert str(e) == TYPE_ERROR % ('index', int, 3.0, float) + + # Test with passing rule: + assert o._validate_scalar(True, index=None) is None + assert o._validate_scalar(False, index=None) is None + assert okay.calls == [ + (text.ugettext, True), + (text.ugettext, False), + ] + + # Test with a failing rule: + okay = DummyRule() + fail = DummyRule(u'this describes the error') + o = MyParam('my_param', okay, fail) + e = raises(errors.ValidationError, o._validate_scalar, True) + assert e.name == 'my_param' + assert e.error == u'this describes the error' + assert e.index is None + e = raises(errors.ValidationError, o._validate_scalar, False, index=2) + assert e.name == 'my_param' + assert e.error == u'this describes the error' + assert e.index == 2 + assert okay.calls == [ + (text.ugettext, True), + (text.ugettext, False), + ] + assert fail.calls == [ + (text.ugettext, True), + (text.ugettext, False), + ] + + def test_get_default(self): + """ + Test the `ipalib.parameters.Param.get_default` method. + """ + class PassThrough(object): + value = None + + def __call__(self, value): + assert self.value is None + assert value is not None + self.value = value + return value + + def reset(self): + assert self.value is not None + self.value = None + + class Str(self.cls): + type = unicode + + def __init__(self, name, **kw): + self._convert_scalar = PassThrough() + super(Str, self).__init__(name, **kw) + + # Test with only a static default: + o = Str('my_str', + normalizer=PassThrough(), + default=u'Static Default', + ) + assert_equal(o.get_default(), u'Static Default') + assert o._convert_scalar.value is None + assert o.normalizer.value is None + + # Test with default_from: + o = Str('my_str', + normalizer=PassThrough(), + default=u'Static Default', + default_from=lambda first, last: first[0] + last, + ) + assert_equal(o.get_default(), u'Static Default') + assert o._convert_scalar.value is None + assert o.normalizer.value is None + default = o.get_default(first=u'john', last='doe') + assert_equal(default, u'jdoe') + assert o._convert_scalar.value is default + assert o.normalizer.value is default + + +class test_Flag(ClassChecker): + """ + Test the `ipalib.parameters.Flag` class. + """ + _cls = parameters.Flag + + def test_init(self): + """ + Test the `ipalib.parameters.Flag.__init__` method. + """ + # Test with no kwargs: + o = self.cls('my_flag') + assert o.type is bool + assert isinstance(o, parameters.Bool) + assert o.autofill is True + assert o.default is False + + # Test that TypeError is raise if default is not a bool: + e = raises(TypeError, self.cls, 'my_flag', default=None) + assert str(e) == TYPE_ERROR % ('default', bool, None, NoneType) + + # Test with autofill=False, default=True + o = self.cls('my_flag', autofill=False, default=True) + assert o.autofill is True + assert o.default is True + + # Test when cloning: + orig = self.cls('my_flag') + for clone in [orig.clone(), orig.clone(autofill=False)]: + assert clone.autofill is True + assert clone.default is False + assert clone is not orig + assert type(clone) is self.cls + + # Test when cloning with default=True/False + orig = self.cls('my_flag') + assert orig.clone().default is False + assert orig.clone(default=True).default is True + orig = self.cls('my_flag', default=True) + assert orig.clone().default is True + assert orig.clone(default=False).default is False + + +class test_Data(ClassChecker): + """ + Test the `ipalib.parameters.Data` class. + """ + _cls = parameters.Data + + def test_init(self): + """ + Test the `ipalib.parameters.Data.__init__` method. + """ + o = self.cls('my_data') + assert o.type is NoneType + assert o.password is False + assert o.rules == tuple() + assert o.class_rules == tuple() + assert o.all_rules == tuple() + assert o.minlength is None + assert o.maxlength is None + assert o.length is None + assert o.pattern is None + + # Test mixing length with minlength or maxlength: + o = self.cls('my_data', length=5) + assert o.length == 5 + permutations = [ + dict(minlength=3), + dict(maxlength=7), + dict(minlength=3, maxlength=7), + ] + for kw in permutations: + o = self.cls('my_data', **kw) + for (key, value) in kw.iteritems(): + assert getattr(o, key) == value + e = raises(ValueError, self.cls, 'my_data', length=5, **kw) + assert str(e) == \ + "Data('my_data'): cannot mix length with minlength or maxlength" + + # Test when minlength or maxlength are less than 1: + e = raises(ValueError, self.cls, 'my_data', minlength=0) + assert str(e) == "Data('my_data'): minlength must be >= 1; got 0" + e = raises(ValueError, self.cls, 'my_data', maxlength=0) + assert str(e) == "Data('my_data'): maxlength must be >= 1; got 0" + + # Test when minlength > maxlength: + e = raises(ValueError, self.cls, 'my_data', minlength=22, maxlength=15) + assert str(e) == \ + "Data('my_data'): minlength > maxlength (minlength=22, maxlength=15)" + + # Test when minlength == maxlength + e = raises(ValueError, self.cls, 'my_data', minlength=7, maxlength=7) + assert str(e) == \ + "Data('my_data'): minlength == maxlength; use length=7 instead" + + +class test_Bytes(ClassChecker): + """ + Test the `ipalib.parameters.Bytes` class. + """ + _cls = parameters.Bytes + + def test_init(self): + """ + Test the `ipalib.parameters.Bytes.__init__` method. + """ + o = self.cls('my_bytes') + assert o.type is str + assert o.password is False + assert o.rules == tuple() + assert o.class_rules == tuple() + assert o.all_rules == tuple() + assert o.minlength is None + assert o.maxlength is None + assert o.length is None + assert o.pattern is None + assert o.re is None + + # Test mixing length with minlength or maxlength: + o = self.cls('my_bytes', length=5) + assert o.length == 5 + assert len(o.class_rules) == 1 + assert len(o.rules) == 0 + assert len(o.all_rules) == 1 + permutations = [ + dict(minlength=3), + dict(maxlength=7), + dict(minlength=3, maxlength=7), + ] + for kw in permutations: + o = self.cls('my_bytes', **kw) + assert len(o.class_rules) == len(kw) + assert len(o.rules) == 0 + assert len(o.all_rules) == len(kw) + for (key, value) in kw.iteritems(): + assert getattr(o, key) == value + e = raises(ValueError, self.cls, 'my_bytes', length=5, **kw) + assert str(e) == \ + "Bytes('my_bytes'): cannot mix length with minlength or maxlength" + + # Test when minlength or maxlength are less than 1: + e = raises(ValueError, self.cls, 'my_bytes', minlength=0) + assert str(e) == "Bytes('my_bytes'): minlength must be >= 1; got 0" + e = raises(ValueError, self.cls, 'my_bytes', maxlength=0) + assert str(e) == "Bytes('my_bytes'): maxlength must be >= 1; got 0" + + # Test when minlength > maxlength: + e = raises(ValueError, self.cls, 'my_bytes', minlength=22, maxlength=15) + assert str(e) == \ + "Bytes('my_bytes'): minlength > maxlength (minlength=22, maxlength=15)" + + # Test when minlength == maxlength + e = raises(ValueError, self.cls, 'my_bytes', minlength=7, maxlength=7) + assert str(e) == \ + "Bytes('my_bytes'): minlength == maxlength; use length=7 instead" + + def test_rule_minlength(self): + """ + Test the `ipalib.parameters.Bytes._rule_minlength` method. + """ + o = self.cls('my_bytes', minlength=3) + assert o.minlength == 3 + rule = o._rule_minlength + translation = u'minlength=%(minlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in ('abc', 'four', '12345'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in ('', 'a', '12'): + assert_equal( + rule(dummy, value), + translation % dict(minlength=3) + ) + assert dummy.message == 'must be at least %(minlength)d bytes' + assert dummy.called() is True + dummy.reset() + + def test_rule_maxlength(self): + """ + Test the `ipalib.parameters.Bytes._rule_maxlength` method. + """ + o = self.cls('my_bytes', maxlength=4) + assert o.maxlength == 4 + rule = o._rule_maxlength + translation = u'maxlength=%(maxlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in ('ab', '123', 'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in ('12345', 'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(maxlength=4) + ) + assert dummy.message == 'can be at most %(maxlength)d bytes' + assert dummy.called() is True + dummy.reset() + + def test_rule_length(self): + """ + Test the `ipalib.parameters.Bytes._rule_length` method. + """ + o = self.cls('my_bytes', length=4) + assert o.length == 4 + rule = o._rule_length + translation = u'length=%(length)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in ('1234', 'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in ('ab', '123', '12345', 'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(length=4), + ) + assert dummy.message == 'must be exactly %(length)d bytes' + assert dummy.called() is True + dummy.reset() + + def test_rule_pattern(self): + """ + Test the `ipalib.parameters.Bytes._rule_pattern` method. + """ + # Test our assumptions about Python re module and Unicode: + pat = '\w+$' + r = re.compile(pat) + assert r.match('Hello_World') is not None + assert r.match(utf8_bytes) is None + assert r.match(binary_bytes) is None + + # Create instance: + o = self.cls('my_bytes', pattern=pat) + assert o.pattern is pat + rule = o._rule_pattern + translation = u'pattern=%(pattern)r' + dummy = dummy_ugettext(translation) + + # Test with passing values: + for value in ('HELLO', 'hello', 'Hello_World'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in ('Hello!', 'Hello World', utf8_bytes, binary_bytes): + assert_equal( + rule(dummy, value), + translation % dict(pattern=pat), + ) + assert_equal(dummy.message, 'must match pattern "%(pattern)s"') + assert dummy.called() is True + dummy.reset() + + +class test_Str(ClassChecker): + """ + Test the `ipalib.parameters.Str` class. + """ + _cls = parameters.Str + + def test_init(self): + """ + Test the `ipalib.parameters.Str.__init__` method. + """ + o = self.cls('my_str') + assert o.type is unicode + assert o.password is False + assert o.minlength is None + assert o.maxlength is None + assert o.length is None + assert o.pattern is None + + def test_convert_scalar(self): + """ + Test the `ipalib.parameters.Str._convert_scalar` method. + """ + o = self.cls('my_str') + mthd = o._convert_scalar + for value in (u'Hello', 42, 1.2, unicode_str): + assert mthd(value) == unicode(value) + bad = [True, 'Hello', dict(one=1), utf8_bytes] + for value in bad: + e = raises(errors.ConversionError, mthd, value) + assert e.name == 'my_str' + assert e.index is None + assert_equal(unicode(e.error), u'must be Unicode text') + e = raises(errors.ConversionError, mthd, value, index=18) + assert e.name == 'my_str' + assert e.index == 18 + assert_equal(unicode(e.error), u'must be Unicode text') + bad = [(u'Hello',), [42.3]] + for value in bad: + e = raises(errors.ConversionError, mthd, value) + assert e.name == 'my_str' + assert e.index is None + assert_equal(unicode(e.error), u'Only one value is allowed') + assert o.convert(None) is None + + def test_rule_minlength(self): + """ + Test the `ipalib.parameters.Str._rule_minlength` method. + """ + o = self.cls('my_str', minlength=3) + assert o.minlength == 3 + rule = o._rule_minlength + translation = u'minlength=%(minlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (u'abc', u'four', u'12345'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (u'', u'a', u'12'): + assert_equal( + rule(dummy, value), + translation % dict(minlength=3) + ) + assert dummy.message == 'must be at least %(minlength)d characters' + assert dummy.called() is True + dummy.reset() + + def test_rule_maxlength(self): + """ + Test the `ipalib.parameters.Str._rule_maxlength` method. + """ + o = self.cls('my_str', maxlength=4) + assert o.maxlength == 4 + rule = o._rule_maxlength + translation = u'maxlength=%(maxlength)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (u'ab', u'123', u'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (u'12345', u'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(maxlength=4) + ) + assert dummy.message == 'can be at most %(maxlength)d characters' + assert dummy.called() is True + dummy.reset() + + def test_rule_length(self): + """ + Test the `ipalib.parameters.Str._rule_length` method. + """ + o = self.cls('my_str', length=4) + assert o.length == 4 + rule = o._rule_length + translation = u'length=%(length)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (u'1234', u'four'): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (u'ab', u'123', u'12345', u'sixsix'): + assert_equal( + rule(dummy, value), + translation % dict(length=4), + ) + assert dummy.message == 'must be exactly %(length)d characters' + assert dummy.called() is True + dummy.reset() + + def test_rule_pattern(self): + """ + Test the `ipalib.parameters.Str._rule_pattern` method. + """ + # Test our assumptions about Python re module and Unicode: + pat = '\w{5}$' + r1 = re.compile(pat) + r2 = re.compile(pat, re.UNICODE) + assert r1.match(unicode_str) is None + assert r2.match(unicode_str) is not None + + # Create instance: + o = self.cls('my_str', pattern=pat) + assert o.pattern is pat + rule = o._rule_pattern + translation = u'pattern=%(pattern)r' + dummy = dummy_ugettext(translation) + + # Test with passing values: + for value in (u'HELLO', u'hello', unicode_str): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (u'H LLO', u'***lo', unicode_str + unicode_str): + assert_equal( + rule(dummy, value), + translation % dict(pattern=pat), + ) + assert_equal(dummy.message, 'must match pattern "%(pattern)s"') + assert dummy.called() is True + dummy.reset() + + +class test_Password(ClassChecker): + """ + Test the `ipalib.parameters.Password` class. + """ + _cls = parameters.Password + + def test_init(self): + """ + Test the `ipalib.parameters.Password.__init__` method. + """ + o = self.cls('my_password') + assert o.type is unicode + assert o.minlength is None + assert o.maxlength is None + assert o.length is None + assert o.pattern is None + assert o.password is True + + def test_convert_scalar(self): + """ + Test the `ipalib.parameters.Password._convert_scalar` method. + """ + o = self.cls('my_password') + e = raises(errors.PasswordMismatch, o._convert_scalar, [u'one', u'two']) + assert e.name == 'my_password' + assert e.index is None + assert o._convert_scalar([u'one', u'one']) == u'one' + assert o._convert_scalar(u'one') == u'one' + + +class test_StrEnum(ClassChecker): + """ + Test the `ipalib.parameters.StrEnum` class. + """ + _cls = parameters.StrEnum + + def test_init(self): + """ + Test the `ipalib.parameters.StrEnum.__init__` method. + """ + values = (u'Hello', u'naughty', u'nurse!') + o = self.cls('my_strenum', values=values) + assert o.type is unicode + assert o.values is values + assert o.class_rules == (o._rule_values,) + assert o.rules == tuple() + assert o.all_rules == (o._rule_values,) + + badvalues = (u'Hello', 'naughty', u'nurse!') + e = raises(TypeError, self.cls, 'my_enum', values=badvalues) + assert str(e) == TYPE_ERROR % ( + "StrEnum('my_enum') values[1]", unicode, 'naughty', str + ) + + # Test that ValueError is raised when list of values is empty + badvalues = tuple() + e = raises(ValueError, self.cls, 'empty_enum', values=badvalues) + assert_equal(str(e), "StrEnum('empty_enum'): list of values must not " + "be empty") + + def test_rules_values(self): + """ + Test the `ipalib.parameters.StrEnum._rule_values` method. + """ + values = (u'Hello', u'naughty', u'nurse!') + o = self.cls('my_enum', values=values) + rule = o._rule_values + translation = u"values='Hello', 'naughty', 'nurse!'" + dummy = dummy_ugettext(translation) + + # Test with passing values: + for v in values: + assert rule(dummy, v) is None + assert dummy.called() is False + + # Test with failing values: + for val in (u'Howdy', u'quiet', u'library!'): + assert_equal( + rule(dummy, val), + translation % dict(values=values), + ) + assert_equal(dummy.message, "must be one of %(values)s") + dummy.reset() + + # test a special case when we have just one allowed value + values = (u'Hello', ) + o = self.cls('my_enum', values=values) + rule = o._rule_values + translation = u"value='Hello'" + dummy = dummy_ugettext(translation) + + for val in (u'Howdy', u'quiet', u'library!'): + assert_equal( + rule(dummy, val), + translation % dict(values=values), + ) + assert_equal(dummy.message, "must be '%(value)s'") + dummy.reset() + + +class test_Number(ClassChecker): + """ + Test the `ipalib.parameters.Number` class. + """ + _cls = parameters.Number + + def test_init(self): + """ + Test the `ipalib.parameters.Number.__init__` method. + """ + o = self.cls('my_number') + assert o.type is NoneType + assert o.password is False + assert o.rules == tuple() + assert o.class_rules == tuple() + assert o.all_rules == tuple() + + + +class test_Int(ClassChecker): + """ + Test the `ipalib.parameters.Int` class. + """ + _cls = parameters.Int + + def test_init(self): + """ + Test the `ipalib.parameters.Int.__init__` method. + """ + # Test with no kwargs: + o = self.cls('my_number') + assert o.type is int + assert isinstance(o, parameters.Int) + assert o.minvalue == int(MININT) + assert o.maxvalue == int(MAXINT) + + # Test when min > max: + e = raises(ValueError, self.cls, 'my_number', minvalue=22, maxvalue=15) + assert str(e) == \ + "Int('my_number'): minvalue > maxvalue (minvalue=22, maxvalue=15)" + + def test_rule_minvalue(self): + """ + Test the `ipalib.parameters.Int._rule_minvalue` method. + """ + o = self.cls('my_number', minvalue=3) + assert o.minvalue == 3 + rule = o._rule_minvalue + translation = u'minvalue=%(minvalue)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (4, 99, 1001): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (-1, 0, 2): + assert_equal( + rule(dummy, value), + translation % dict(minvalue=3) + ) + assert dummy.message == 'must be at least %(minvalue)d' + assert dummy.called() is True + dummy.reset() + + def test_rule_maxvalue(self): + """ + Test the `ipalib.parameters.Int._rule_maxvalue` method. + """ + o = self.cls('my_number', maxvalue=4) + assert o.maxvalue == 4 + rule = o._rule_maxvalue + translation = u'maxvalue=%(maxvalue)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (-1, 0, 4): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (5, 99, 1009): + assert_equal( + rule(dummy, value), + translation % dict(maxvalue=4) + ) + assert dummy.message == 'can be at most %(maxvalue)d' + assert dummy.called() is True + dummy.reset() + + def test_convert_scalar(self): + """ + Test the `ipalib.parameters.Int._convert_scalar` method. + Assure radix prefixes work, str objects fail, + floats (native & string) are truncated, + large magnitude values are promoted to long, + empty strings & invalid numerical representations fail + """ + o = self.cls('my_number') + # Assure invalid inputs raise error + for bad in ['hello', u'hello', True, None, u'', u'.']: + e = raises(errors.ConversionError, o._convert_scalar, bad) + assert e.name == 'my_number' + assert e.index is None + # Assure large magnatude values are handled correctly + assert type(o._convert_scalar(sys.maxint*2)) == long + assert o._convert_scalar(sys.maxint*2) == sys.maxint*2 + assert o._convert_scalar(unicode(sys.maxint*2)) == sys.maxint*2 + assert o._convert_scalar(long(16)) == 16 + # Assure normal conversions produce expected result + assert o._convert_scalar(u'16.99') == 16 + assert o._convert_scalar(16.99) == 16 + assert o._convert_scalar(u'16') == 16 + assert o._convert_scalar(u'0x10') == 16 + assert o._convert_scalar(u'020') == 16 + +class test_Decimal(ClassChecker): + """ + Test the `ipalib.parameters.Decimal` class. + """ + _cls = parameters.Decimal + + def test_init(self): + """ + Test the `ipalib.parameters.Decimal.__init__` method. + """ + # Test with no kwargs: + o = self.cls('my_number') + assert o.type is Decimal + assert isinstance(o, parameters.Decimal) + assert o.minvalue is None + assert o.maxvalue is None + + # Test when min > max: + e = raises(ValueError, self.cls, 'my_number', minvalue=Decimal('22.5'), maxvalue=Decimal('15.1')) + assert str(e) == \ + "Decimal('my_number'): minvalue > maxvalue (minvalue=22.5, maxvalue=15.1)" + + def test_rule_minvalue(self): + """ + Test the `ipalib.parameters.Decimal._rule_minvalue` method. + """ + o = self.cls('my_number', minvalue='3.1') + assert o.minvalue == Decimal('3.1') + rule = o._rule_minvalue + translation = u'minvalue=%(minvalue)s' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (Decimal('3.2'), Decimal('99.0')): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (Decimal('-1.2'), Decimal('0.0'), Decimal('3.0')): + assert_equal( + rule(dummy, value), + translation % dict(minvalue=Decimal('3.1')) + ) + assert dummy.message == 'must be at least %(minvalue)s' + assert dummy.called() is True + dummy.reset() + + def test_rule_maxvalue(self): + """ + Test the `ipalib.parameters.Decimal._rule_maxvalue` method. + """ + o = self.cls('my_number', maxvalue='4.7') + assert o.maxvalue == Decimal('4.7') + rule = o._rule_maxvalue + translation = u'maxvalue=%(maxvalue)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + + # Test with passing values: + for value in (Decimal('-1.0'), Decimal('0.1'), Decimal('4.2')): + assert rule(dummy, value) is None + assert dummy.called() is False + + # Test with failing values: + for value in (Decimal('5.3'), Decimal('99.9')): + assert_equal( + rule(dummy, value), + translation % dict(maxvalue=Decimal('4.7')) + ) + assert dummy.message == 'can be at most %(maxvalue)s' + assert dummy.called() is True + dummy.reset() + + def test_precision(self): + """ + Test the `ipalib.parameters.Decimal` precision attribute + """ + # precission is None + param = self.cls('my_number') + + for value in (Decimal('0'), Decimal('4.4'), Decimal('4.67')): + assert_equal( + param(value), + value) + + # precision is 0 + param = self.cls('my_number', precision=0) + for original,expected in ((Decimal('0'), '0'), + (Decimal('1.1'), '1'), + (Decimal('4.67'), '5')): + assert_equal( + str(param(original)), + expected) + + # precision is 1 + param = self.cls('my_number', precision=1) + for original,expected in ((Decimal('0'), '0.0'), + (Decimal('1.1'), '1.1'), + (Decimal('4.67'), '4.7')): + assert_equal( + str(param(original)), + expected) + + # value has too many digits + param = self.cls('my_number', precision=1) + e = raises(ConversionError, param, '123456789012345678901234567890') + + assert str(e) == \ + "invalid 'my_number': quantize result has too many digits for current context" + + def test_exponential(self): + """ + Test the `ipalib.parameters.Decimal` exponential attribute + """ + param = self.cls('my_number', exponential=True) + for original,expected in ((Decimal('0'), '0'), + (Decimal('1E3'), '1E+3'), + (Decimal('3.4E2'), '3.4E+2')): + assert_equal( + str(param(original)), + expected) + + + param = self.cls('my_number', exponential=False) + for original,expected in ((Decimal('0'), '0'), + (Decimal('1E3'), '1000'), + (Decimal('3.4E2'), '340')): + assert_equal( + str(param(original)), + expected) + + def test_numberclass(self): + """ + Test the `ipalib.parameters.Decimal` numberclass attribute + """ + # test default value: '-Normal', '+Zero', '+Normal' + param = self.cls('my_number') + for value,raises_verror in ((Decimal('0'), False), + (Decimal('-0'), True), + (Decimal('1E8'), False), + (Decimal('-1.1'), False), + (Decimal('-Infinity'), True), + (Decimal('+Infinity'), True), + (Decimal('NaN'), True)): + if raises_verror: + raises(ValidationError, param, value) + else: + param(value) + + + param = self.cls('my_number', exponential=True, + numberclass=('-Normal', '+Zero', '+Infinity')) + for value,raises_verror in ((Decimal('0'), False), + (Decimal('-0'), True), + (Decimal('1E8'), True), + (Decimal('-1.1'), False), + (Decimal('-Infinity'), True), + (Decimal('+Infinity'), False), + (Decimal('NaN'), True)): + if raises_verror: + raises(ValidationError, param, value) + else: + param(value) + +class test_AccessTime(ClassChecker): + """ + Test the `ipalib.parameters.AccessTime` class. + """ + _cls = parameters.AccessTime + + def test_init(self): + """ + Test the `ipalib.parameters.AccessTime.__init__` method. + """ + # Test with no kwargs: + o = self.cls('my_time') + assert o.type is unicode + assert isinstance(o, parameters.AccessTime) + assert o.multivalue is False + translation = u'length=%(length)r' + dummy = dummy_ugettext(translation) + assert dummy.translation is translation + rule = o._rule_required + + # Check some good rules + for value in (u'absolute 201012161032 ~ 201012161033', + u'periodic monthly week 2 day Sat,Sun 0900-1300', + u'periodic yearly month 4 day 1-31 0800-1400', + u'periodic weekly day 7 0800-1400', + u'periodic daily 0800-1400', + ): + assert rule(dummy, value) is None + assert dummy.called() is False + + # And some bad ones + for value in (u'absolute 201012161032 - 201012161033', + u'absolute 201012161032 ~', + u'periodic monthly day Sat,Sun 0900-1300', + u'periodical yearly month 4 day 1-31 0800-1400', + u'periodic weekly day 8 0800-1400', + ): + e = raises(ValidationError, o._rule_required, None, value) + +def test_create_param(): + """ + Test the `ipalib.parameters.create_param` function. + """ + f = parameters.create_param + + # Test that Param instances are returned unchanged: + params = ( + parameters.Param('one?'), + parameters.Int('two+'), + parameters.Str('three*'), + parameters.Bytes('four'), + ) + for p in params: + assert f(p) is p + + # Test that the spec creates an Str instance: + for spec in ('one?', 'two+', 'three*', 'four'): + (name, kw) = parameters.parse_param_spec(spec) + p = f(spec) + assert p.param_spec is spec + assert p.name == name + assert p.required is kw['required'] + assert p.multivalue is kw['multivalue'] + + # Test that TypeError is raised when spec is neither a Param nor a str: + for spec in (u'one', 42, parameters.Param, parameters.Str): + e = raises(TypeError, f, spec) + assert str(e) == \ + TYPE_ERROR % ('spec', (str, parameters.Param), spec, type(spec)) + + +def test_messages(): + """ + Test module level message in `ipalib.parameters`. + """ + for name in dir(parameters): + if name.startswith('_'): + continue + attr = getattr(parameters, name) + if not (isclass(attr) and issubclass(attr, parameters.Param)): + continue + assert type(attr.type_error) is str + assert attr.type_error in parameters.__messages + + +class test_IA5Str(ClassChecker): + """ + Test the `ipalib.parameters.IA5Str` class. + """ + _cls = parameters.IA5Str + + def test_convert_scalar(self): + """ + Test the `ipalib.parameters.IA5Str._convert_scalar` method. + """ + o = self.cls('my_str') + mthd = o._convert_scalar + for value in (u'Hello', 42, 1.2): + assert mthd(value) == unicode(value) + bad = ['HelloĆ”'] + for value in bad: + e = raises(errors.ConversionError, mthd, value) + assert e.name == 'my_str' + assert e.index is None + assert_equal(e.error, "The character '\\xc3' is not allowed.") diff --git a/ipatests/test_ipalib/test_plugable.py b/ipatests/test_ipalib/test_plugable.py new file mode 100644 index 00000000..c495e74d --- /dev/null +++ b/ipatests/test_ipalib/test_plugable.py @@ -0,0 +1,516 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.plugable` module. +""" + +import inspect +from ipatests.util import raises, no_set, no_del, read_only +from ipatests.util import getitem, setitem, delitem +from ipatests.util import ClassChecker, create_test_api +from ipalib import plugable, errors, text + + +class test_SetProxy(ClassChecker): + """ + Test the `ipalib.plugable.SetProxy` class. + """ + _cls = plugable.SetProxy + + def test_class(self): + """ + Test the `ipalib.plugable.SetProxy` class. + """ + assert self.cls.__bases__ == (plugable.ReadOnly,) + + def test_init(self): + """ + Test the `ipalib.plugable.SetProxy.__init__` method. + """ + okay = (set, frozenset, dict) + fail = (list, tuple) + for t in okay: + self.cls(t()) + raises(TypeError, self.cls, t) + for t in fail: + raises(TypeError, self.cls, t()) + raises(TypeError, self.cls, t) + + def test_SetProxy(self): + """ + Test container emulation of `ipalib.plugable.SetProxy` class. + """ + def get_key(i): + return 'key_%d' % i + + cnt = 10 + target = set() + proxy = self.cls(target) + for i in xrange(cnt): + key = get_key(i) + + # Check initial state + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert key not in proxy + assert key not in target + + # Add and test again + target.add(key) + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert key in proxy + assert key in target + + +class test_DictProxy(ClassChecker): + """ + Test the `ipalib.plugable.DictProxy` class. + """ + _cls = plugable.DictProxy + + def test_class(self): + """ + Test the `ipalib.plugable.DictProxy` class. + """ + assert self.cls.__bases__ == (plugable.SetProxy,) + + def test_init(self): + """ + Test the `ipalib.plugable.DictProxy.__init__` method. + """ + self.cls(dict()) + raises(TypeError, self.cls, dict) + fail = (set, frozenset, list, tuple) + for t in fail: + raises(TypeError, self.cls, t()) + raises(TypeError, self.cls, t) + + def test_DictProxy(self): + """ + Test container emulation of `ipalib.plugable.DictProxy` class. + """ + def get_kv(i): + return ( + 'key_%d' % i, + 'val_%d' % i, + ) + cnt = 10 + target = dict() + proxy = self.cls(target) + for i in xrange(cnt): + (key, val) = get_kv(i) + + # Check initial state + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert list(proxy()) == [target[k] for k in sorted(target)] + assert key not in proxy + raises(KeyError, getitem, proxy, key) + + # Add and test again + target[key] = val + assert len(proxy) == len(target) + assert list(proxy) == sorted(target) + assert list(proxy()) == [target[k] for k in sorted(target)] + + # Verify TypeError is raised trying to set/del via proxy + raises(TypeError, setitem, proxy, key, val) + raises(TypeError, delitem, proxy, key) + + +class test_MagicDict(ClassChecker): + """ + Test the `ipalib.plugable.MagicDict` class. + """ + _cls = plugable.MagicDict + + def test_class(self): + """ + Test the `ipalib.plugable.MagicDict` class. + """ + assert self.cls.__bases__ == (plugable.DictProxy,) + for non_dict in ('hello', 69, object): + raises(TypeError, self.cls, non_dict) + + def test_MagicDict(self): + """ + Test container emulation of `ipalib.plugable.MagicDict` class. + """ + cnt = 10 + keys = [] + d = dict() + dictproxy = self.cls(d) + for i in xrange(cnt): + key = 'key_%d' % i + val = 'val_%d' % i + keys.append(key) + + # Test thet key does not yet exist + assert len(dictproxy) == i + assert key not in dictproxy + assert not hasattr(dictproxy, key) + raises(KeyError, getitem, dictproxy, key) + raises(AttributeError, getattr, dictproxy, key) + + # Test that items/attributes cannot be set on dictproxy: + raises(TypeError, setitem, dictproxy, key, val) + raises(AttributeError, setattr, dictproxy, key, val) + + # Test that additions in d are reflected in dictproxy: + d[key] = val + assert len(dictproxy) == i + 1 + assert key in dictproxy + assert hasattr(dictproxy, key) + assert dictproxy[key] is val + assert read_only(dictproxy, key) is val + + # Test __iter__ + assert list(dictproxy) == keys + + for key in keys: + # Test that items cannot be deleted through dictproxy: + raises(TypeError, delitem, dictproxy, key) + raises(AttributeError, delattr, dictproxy, key) + + # Test that deletions in d are reflected in dictproxy + del d[key] + assert len(dictproxy) == len(d) + assert key not in dictproxy + raises(KeyError, getitem, dictproxy, key) + raises(AttributeError, getattr, dictproxy, key) + + +class test_Plugin(ClassChecker): + """ + Test the `ipalib.plugable.Plugin` class. + """ + _cls = plugable.Plugin + + def test_class(self): + """ + Test the `ipalib.plugable.Plugin` class. + """ + assert self.cls.__bases__ == (plugable.ReadOnly,) + assert type(self.cls.api) is property + + def test_init(self): + """ + Test the `ipalib.plugable.Plugin.__init__` method. + """ + o = self.cls() + assert o.name == 'Plugin' + assert o.module == 'ipalib.plugable' + assert o.fullname == 'ipalib.plugable.Plugin' + assert isinstance(o.doc, text.Gettext) + class some_subclass(self.cls): + """ + Do sub-classy things. + + Although it doesn't know how to comport itself and is not for mixed + company, this class *is* useful as we all need a little sub-class + now and then. + + One more paragraph. + """ + o = some_subclass() + assert o.name == 'some_subclass' + assert o.module == __name__ + assert o.fullname == '%s.some_subclass' % __name__ + assert o.summary == 'Do sub-classy things.' + assert isinstance(o.doc, text.Gettext) + class another_subclass(self.cls): + pass + o = another_subclass() + assert o.summary == '<%s>' % o.fullname + + # Test that Plugin makes sure the subclass hasn't defined attributes + # whose names conflict with the logger methods set in Plugin.__init__(): + class check(self.cls): + info = 'whatever' + e = raises(StandardError, check) + assert str(e) == \ + "info is already bound to ipatests.test_ipalib.test_plugable.check()" + + def test_set_api(self): + """ + Test the `ipalib.plugable.Plugin.set_api` method. + """ + api = 'the api instance' + o = self.cls() + assert o.api is None + e = raises(AssertionError, o.set_api, None) + assert str(e) == 'set_api() argument cannot be None' + o.set_api(api) + assert o.api is api + e = raises(AssertionError, o.set_api, api) + assert str(e) == 'set_api() can only be called once' + + def test_finalize(self): + """ + Test the `ipalib.plugable.Plugin.finalize` method. + """ + o = self.cls() + assert not o.__islocked__() + o.finalize() + assert o.__islocked__() + + def test_call(self): + """ + Test the `ipalib.plugable.Plugin.call` method. + """ + o = self.cls() + o.call('/bin/true') is None + e = raises(errors.SubprocessError, o.call, '/bin/false') + assert e.returncode == 1 + assert e.argv == ('/bin/false',) + + +def test_Registrar(): + """ + Test the `ipalib.plugable.Registrar` class + """ + class Base1(object): + pass + class Base2(object): + pass + class Base3(object): + pass + class plugin1(Base1): + pass + class plugin2(Base2): + pass + class plugin3(Base3): + pass + + # Test creation of Registrar: + r = plugable.Registrar(Base1, Base2) + + # Test __iter__: + assert list(r) == ['Base1', 'Base2'] + + # Test __hasitem__, __getitem__: + for base in [Base1, Base2]: + name = base.__name__ + assert name in r + assert r[name] is base + magic = getattr(r, name) + assert type(magic) is plugable.MagicDict + assert len(magic) == 0 + + # Check that TypeError is raised trying to register something that isn't + # a class: + p = plugin1() + e = raises(TypeError, r, p) + assert str(e) == 'plugin must be a class; got %r' % p + + # Check that SubclassError is raised trying to register a class that is + # not a subclass of an allowed base: + e = raises(errors.PluginSubclassError, r, plugin3) + assert e.plugin is plugin3 + + # Check that registration works + r(plugin1) + assert len(r.Base1) == 1 + assert r.Base1['plugin1'] is plugin1 + assert r.Base1.plugin1 is plugin1 + + # Check that DuplicateError is raised trying to register exact class + # again: + e = raises(errors.PluginDuplicateError, r, plugin1) + assert e.plugin is plugin1 + + # Check that OverrideError is raised trying to register class with same + # name and same base: + orig1 = plugin1 + class base1_extended(Base1): + pass + class plugin1(base1_extended): + pass + e = raises(errors.PluginOverrideError, r, plugin1) + assert e.base == 'Base1' + assert e.name == 'plugin1' + assert e.plugin is plugin1 + + # Check that overriding works + r(plugin1, override=True) + assert len(r.Base1) == 1 + assert r.Base1.plugin1 is plugin1 + assert r.Base1.plugin1 is not orig1 + + # Check that MissingOverrideError is raised trying to override a name + # not yet registerd: + e = raises(errors.PluginMissingOverrideError, r, plugin2, override=True) + assert e.base == 'Base2' + assert e.name == 'plugin2' + assert e.plugin is plugin2 + + # Test that another plugin can be registered: + assert len(r.Base2) == 0 + r(plugin2) + assert len(r.Base2) == 1 + assert r.Base2.plugin2 is plugin2 + + # Setup to test more registration: + class plugin1a(Base1): + pass + r(plugin1a) + + class plugin1b(Base1): + pass + r(plugin1b) + + class plugin2a(Base2): + pass + r(plugin2a) + + class plugin2b(Base2): + pass + r(plugin2b) + + # Again test __hasitem__, __getitem__: + for base in [Base1, Base2]: + name = base.__name__ + assert name in r + assert r[name] is base + magic = getattr(r, name) + assert len(magic) == 3 + for key in magic: + klass = magic[key] + assert getattr(magic, key) is klass + assert issubclass(klass, base) + + +class test_API(ClassChecker): + """ + Test the `ipalib.plugable.API` class. + """ + + _cls = plugable.API + + def test_API(self): + """ + Test the `ipalib.plugable.API` class. + """ + assert issubclass(plugable.API, plugable.ReadOnly) + + # Setup the test bases, create the API: + class base0(plugable.Plugin): + def method(self, n): + return n + + class base1(plugable.Plugin): + def method(self, n): + return n + 1 + + api = plugable.API(base0, base1) + api.env.mode = 'unit_test' + api.env.in_tree = True + r = api.register + assert isinstance(r, plugable.Registrar) + assert read_only(api, 'register') is r + + class base0_plugin0(base0): + pass + r(base0_plugin0) + + class base0_plugin1(base0): + pass + r(base0_plugin1) + + class base0_plugin2(base0): + pass + r(base0_plugin2) + + class base1_plugin0(base1): + pass + r(base1_plugin0) + + class base1_plugin1(base1): + pass + r(base1_plugin1) + + class base1_plugin2(base1): + pass + r(base1_plugin2) + + # Test API instance: + assert api.isdone('bootstrap') is False + assert api.isdone('finalize') is False + api.finalize() + assert api.isdone('bootstrap') is True + assert api.isdone('finalize') is True + + def get_base_name(b): + return 'base%d' % b + + + def get_plugin_name(b, p): + return 'base%d_plugin%d' % (b, p) + + for b in xrange(2): + base_name = get_base_name(b) + base = locals()[base_name] + ns = getattr(api, base_name) + assert isinstance(ns, plugable.NameSpace) + assert read_only(api, base_name) is ns + assert len(ns) == 3 + for p in xrange(3): + plugin_name = get_plugin_name(b, p) + plugin = locals()[plugin_name] + inst = ns[plugin_name] + assert isinstance(inst, base) + assert isinstance(inst, plugin) + assert inst.name == plugin_name + assert read_only(ns, plugin_name) is inst + assert inst.method(7) == 7 + b + + # Test that calling finilize again raises AssertionError: + e = raises(StandardError, api.finalize) + assert str(e) == 'API.finalize() already called', str(e) + + def test_bootstrap(self): + """ + Test the `ipalib.plugable.API.bootstrap` method. + """ + (o, home) = create_test_api() + assert o.env._isdone('_bootstrap') is False + assert o.env._isdone('_finalize_core') is False + assert o.isdone('bootstrap') is False + o.bootstrap(my_test_override='Hello, world!') + assert o.isdone('bootstrap') is True + assert o.env._isdone('_bootstrap') is True + assert o.env._isdone('_finalize_core') is True + assert o.env.my_test_override == 'Hello, world!' + e = raises(StandardError, o.bootstrap) + assert str(e) == 'API.bootstrap() already called' + + def test_load_plugins(self): + """ + Test the `ipalib.plugable.API.load_plugins` method. + """ + (o, home) = create_test_api() + assert o.isdone('bootstrap') is False + assert o.isdone('load_plugins') is False + o.load_plugins() + assert o.isdone('bootstrap') is True + assert o.isdone('load_plugins') is True + e = raises(StandardError, o.load_plugins) + assert str(e) == 'API.load_plugins() already called' diff --git a/ipatests/test_ipalib/test_rpc.py b/ipatests/test_ipalib/test_rpc.py new file mode 100644 index 00000000..56b8184c --- /dev/null +++ b/ipatests/test_ipalib/test_rpc.py @@ -0,0 +1,244 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.rpc` module. +""" + +import threading +from xmlrpclib import Binary, Fault, dumps, loads, ServerProxy +from ipatests.util import raises, assert_equal, PluginTester, DummyClass +from ipatests.data import binary_bytes, utf8_bytes, unicode_str +from ipalib.frontend import Command +from ipalib.request import context, Connection +from ipalib import rpc, errors + + +std_compound = (binary_bytes, utf8_bytes, unicode_str) + + +def dump_n_load(value): + (param, method) = loads( + dumps((value,), allow_none=True) + ) + return param[0] + + +def round_trip(value): + return rpc.xml_unwrap( + dump_n_load(rpc.xml_wrap(value)) + ) + + +def test_round_trip(): + """ + Test `ipalib.rpc.xml_wrap` and `ipalib.rpc.xml_unwrap`. + + This tests the two functions together with ``xmlrpclib.dumps()`` and + ``xmlrpclib.loads()`` in a full wrap/dumps/loads/unwrap round trip. + """ + # We first test that our assumptions about xmlrpclib module in the Python + # standard library are correct: + assert_equal(dump_n_load(utf8_bytes), unicode_str) + assert_equal(dump_n_load(unicode_str), unicode_str) + assert_equal(dump_n_load(Binary(binary_bytes)).data, binary_bytes) + assert isinstance(dump_n_load(Binary(binary_bytes)), Binary) + assert type(dump_n_load('hello')) is str + assert type(dump_n_load(u'hello')) is str + assert_equal(dump_n_load(''), '') + assert_equal(dump_n_load(u''), '') + assert dump_n_load(None) is None + + # Now we test our wrap and unwrap methods in combination with dumps, loads: + # All str should come back str (because they get wrapped in + # xmlrpclib.Binary(). All unicode should come back unicode because str + # explicity get decoded by rpc.xml_unwrap() if they weren't already + # decoded by xmlrpclib.loads(). + assert_equal(round_trip(utf8_bytes), utf8_bytes) + assert_equal(round_trip(unicode_str), unicode_str) + assert_equal(round_trip(binary_bytes), binary_bytes) + assert type(round_trip('hello')) is str + assert type(round_trip(u'hello')) is unicode + assert_equal(round_trip(''), '') + assert_equal(round_trip(u''), u'') + assert round_trip(None) is None + compound = [utf8_bytes, None, binary_bytes, (None, unicode_str), + dict(utf8=utf8_bytes, chars=unicode_str, data=binary_bytes) + ] + assert round_trip(compound) == tuple(compound) + + +def test_xml_wrap(): + """ + Test the `ipalib.rpc.xml_wrap` function. + """ + f = rpc.xml_wrap + assert f([]) == tuple() + assert f({}) == dict() + b = f('hello') + assert isinstance(b, Binary) + assert b.data == 'hello' + u = f(u'hello') + assert type(u) is unicode + assert u == u'hello' + value = f([dict(one=False, two=u'hello'), None, 'hello']) + + +def test_xml_unwrap(): + """ + Test the `ipalib.rpc.xml_unwrap` function. + """ + f = rpc.xml_unwrap + assert f([]) == tuple() + assert f({}) == dict() + value = f(Binary(utf8_bytes)) + assert type(value) is str + assert value == utf8_bytes + assert f(utf8_bytes) == unicode_str + assert f(unicode_str) == unicode_str + value = f([True, Binary('hello'), dict(one=1, two=utf8_bytes, three=None)]) + assert value == (True, 'hello', dict(one=1, two=unicode_str, three=None)) + assert type(value[1]) is str + assert type(value[2]['two']) is unicode + + +def test_xml_dumps(): + """ + Test the `ipalib.rpc.xml_dumps` function. + """ + f = rpc.xml_dumps + params = (binary_bytes, utf8_bytes, unicode_str, None) + + # Test serializing an RPC request: + data = f(params, 'the_method') + (p, m) = loads(data) + assert_equal(m, u'the_method') + assert type(p) is tuple + assert rpc.xml_unwrap(p) == params + + # Test serializing an RPC response: + data = f((params,), methodresponse=True) + (tup, m) = loads(data) + assert m is None + assert len(tup) == 1 + assert type(tup) is tuple + assert rpc.xml_unwrap(tup[0]) == params + + # Test serializing an RPC response containing a Fault: + fault = Fault(69, unicode_str) + data = f(fault, methodresponse=True) + e = raises(Fault, loads, data) + assert e.faultCode == 69 + assert_equal(e.faultString, unicode_str) + + +def test_xml_loads(): + """ + Test the `ipalib.rpc.xml_loads` function. + """ + f = rpc.xml_loads + params = (binary_bytes, utf8_bytes, unicode_str, None) + wrapped = rpc.xml_wrap(params) + + # Test un-serializing an RPC request: + data = dumps(wrapped, 'the_method', allow_none=True) + (p, m) = f(data) + assert_equal(m, u'the_method') + assert_equal(p, params) + + # Test un-serializing an RPC response: + data = dumps((wrapped,), methodresponse=True, allow_none=True) + (tup, m) = f(data) + assert m is None + assert len(tup) == 1 + assert type(tup) is tuple + assert_equal(tup[0], params) + + # Test un-serializing an RPC response containing a Fault: + for error in (unicode_str, u'hello'): + fault = Fault(69, error) + data = dumps(fault, methodresponse=True, allow_none=True, encoding='UTF-8') + e = raises(Fault, f, data) + assert e.faultCode == 69 + assert_equal(e.faultString, error) + assert type(e.faultString) is unicode + + +class test_xmlclient(PluginTester): + """ + Test the `ipalib.rpc.xmlclient` plugin. + """ + _plugin = rpc.xmlclient + + def test_forward(self): + """ + Test the `ipalib.rpc.xmlclient.forward` method. + """ + class user_add(Command): + pass + + # Test that ValueError is raised when forwarding a command that is not + # in api.Command: + (o, api, home) = self.instance('Backend', in_server=False) + e = raises(ValueError, o.forward, 'user_add') + assert str(e) == '%s.forward(): %r not in api.Command' % ( + 'xmlclient', 'user_add' + ) + + (o, api, home) = self.instance('Backend', user_add, in_server=False) + args = (binary_bytes, utf8_bytes, unicode_str) + kw = dict(one=binary_bytes, two=utf8_bytes, three=unicode_str) + params = [args, kw] + result = (unicode_str, binary_bytes, utf8_bytes) + conn = DummyClass( + ( + 'user_add', + rpc.xml_wrap(params), + {}, + rpc.xml_wrap(result), + ), + ( + 'user_add', + rpc.xml_wrap(params), + {}, + Fault(3007, u"'four' is required"), # RequirementError + ), + ( + 'user_add', + rpc.xml_wrap(params), + {}, + Fault(700, u'no such error'), # There is no error 700 + ), + + ) + context.xmlclient = Connection(conn, lambda: None) + + # Test with a successful return value: + assert o.forward('user_add', *args, **kw) == result + + # Test with an errno the client knows: + e = raises(errors.RequirementError, o.forward, 'user_add', *args, **kw) + assert_equal(e.args[0], u"'four' is required") + + # Test with an errno the client doesn't know + e = raises(errors.UnknownError, o.forward, 'user_add', *args, **kw) + assert_equal(e.code, 700) + assert_equal(e.error, u'no such error') + + assert context.xmlclient.conn._calledall() is True diff --git a/ipatests/test_ipalib/test_text.py b/ipatests/test_ipalib/test_text.py new file mode 100644 index 00000000..2a5ff7a3 --- /dev/null +++ b/ipatests/test_ipalib/test_text.py @@ -0,0 +1,334 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty contextrmation +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.text` module. +""" + +import os +import shutil +import tempfile +import re +import nose +import locale +from ipatests.util import raises, assert_equal +from ipatests.i18n import create_po, po_file_iterate +from ipalib.request import context +from ipalib import request +from ipalib import text +from ipapython.ipautil import file_exists + +singular = '%(count)d goose makes a %(dish)s' +plural = '%(count)d geese make a %(dish)s' + + +def test_create_translation(): + f = text.create_translation + key = ('foo', None) + t = f(key) + assert context.__dict__[key] is t + + +class test_TestLang(object): + def setUp(self): + self.tmp_dir = None + self.saved_lang = None + + self.lang = 'xh_ZA' + self.domain = 'ipa' + + self.ipa_i18n_dir = os.path.join(os.path.dirname(__file__), '../../install/po') + + self.pot_basename = '%s.pot' % self.domain + self.po_basename = '%s.po' % self.lang + self.mo_basename = '%s.mo' % self.domain + + self.tmp_dir = tempfile.mkdtemp() + self.saved_lang = os.environ['LANG'] + + self.locale_dir = os.path.join(self.tmp_dir, 'test_locale') + self.msg_dir = os.path.join(self.locale_dir, self.lang, 'LC_MESSAGES') + + if not os.path.exists(self.msg_dir): + os.makedirs(self.msg_dir) + + self.pot_file = os.path.join(self.ipa_i18n_dir, self.pot_basename) + self.mo_file = os.path.join(self.msg_dir, self.mo_basename) + self.po_file = os.path.join(self.tmp_dir, self.po_basename) + + result = create_po(self.pot_file, self.po_file, self.mo_file) + if result: + raise nose.SkipTest('Unable to create po file "%s" & mo file "%s" from pot file "%s"' % + (self.po_file, self.mo_file, self.pot_file)) + + if not file_exists(self.po_file): + raise nose.SkipTest('Test po file unavailable, run "make test" in install/po') + + if not file_exists(self.mo_file): + raise nose.SkipTest('Test mo file unavailable, run "make test" in install/po') + + self.po_file_iterate = po_file_iterate + + def tearDown(self): + if self.saved_lang is not None: + os.environ['LANG'] = self.saved_lang + + if self.tmp_dir is not None: + shutil.rmtree(self.tmp_dir) + + def test_test_lang(self): + print "test_test_lang" + # The test installs the test message catalog under the xh_ZA + # (e.g. Zambia Xhosa) language by default. It would be nice to + # use a dummy language not associated with any real language, + # but the setlocale function demands the locale be a valid + # known locale, Zambia Xhosa is a reasonable choice :) + + os.environ['LANG'] = self.lang + + # Create a gettext translation object specifying our domain as + # 'ipa' and the locale_dir as 'test_locale' (i.e. where to + # look for the message catalog). Then use that translation + # object to obtain the translation functions. + + def get_msgstr(msg): + gt = text.GettextFactory(localedir=self.locale_dir)(msg) + return unicode(gt) + + def get_msgstr_plural(singular, plural, count): + ng = text.NGettextFactory(localedir=self.locale_dir)(singular, plural, count) + return ng(count) + + result = self.po_file_iterate(self.po_file, get_msgstr, get_msgstr_plural) + assert result == 0 + +class test_LazyText(object): + + klass = text.LazyText + + def test_init(self): + inst = self.klass('foo', 'bar') + assert inst.domain == 'foo' + assert inst.localedir == 'bar' + assert inst.key == ('foo', 'bar') + + +class test_FixMe(object): + klass = text.FixMe + + def test_init(self): + inst = self.klass('user.label') + assert inst.msg == 'user.label' + assert inst.domain is None + assert inst.localedir is None + + def test_repr(self): + inst = self.klass('user.label') + assert repr(inst) == "FixMe('user.label')" + + def test_unicode(self): + inst = self.klass('user.label') + assert unicode(inst) == u'' + assert type(unicode(inst)) is unicode + + +class test_Gettext(object): + + klass = text.Gettext + + def test_init(self): + inst = self.klass('what up?', 'foo', 'bar') + assert inst.domain == 'foo' + assert inst.localedir == 'bar' + assert inst.msg is 'what up?' + assert inst.args == ('what up?', 'foo', 'bar') + + def test_repr(self): + inst = self.klass('foo', 'bar', 'baz') + assert repr(inst) == "Gettext('foo', domain='bar', localedir='baz')" + + def test_unicode(self): + inst = self.klass('what up?', 'foo', 'bar') + assert unicode(inst) == u'what up?' + + def test_mod(self): + inst = self.klass('hello %(adj)s nurse', 'foo', 'bar') + assert inst % dict(adj='naughty', stuff='junk') == 'hello naughty nurse' + + def test_eq(self): + inst1 = self.klass('what up?', 'foo', 'bar') + inst2 = self.klass('what up?', 'foo', 'bar') + inst3 = self.klass('Hello world', 'foo', 'bar') + inst4 = self.klass('what up?', 'foo', 'baz') + + assert (inst1 == inst1) is True + assert (inst1 == inst2) is True + assert (inst1 == inst3) is False + assert (inst1 == inst4) is False + + # Test with args flipped + assert (inst2 == inst1) is True + assert (inst3 == inst1) is False + assert (inst4 == inst1) is False + + def test_ne(self): + inst1 = self.klass('what up?', 'foo', 'bar') + inst2 = self.klass('what up?', 'foo', 'bar') + inst3 = self.klass('Hello world', 'foo', 'bar') + inst4 = self.klass('what up?', 'foo', 'baz') + + assert (inst1 != inst2) is False + assert (inst1 != inst2) is False + assert (inst1 != inst3) is True + assert (inst1 != inst4) is True + + # Test with args flipped + assert (inst2 != inst1) is False + assert (inst3 != inst1) is True + assert (inst4 != inst1) is True + + +class test_NGettext(object): + + klass = text.NGettext + + def test_init(self): + inst = self.klass(singular, plural, 'foo', 'bar') + assert inst.singular is singular + assert inst.plural is plural + assert inst.domain == 'foo' + assert inst.localedir == 'bar' + assert inst.args == (singular, plural, 'foo', 'bar') + + def test_repr(self): + inst = self.klass('sig', 'plu', 'foo', 'bar') + assert repr(inst) == \ + "NGettext('sig', 'plu', domain='foo', localedir='bar')" + + def test_call(self): + inst = self.klass(singular, plural, 'foo', 'bar') + assert inst(0) == plural + assert inst(1) == singular + assert inst(2) == plural + assert inst(3) == plural + + def test_mod(self): + inst = self.klass(singular, plural, 'foo', 'bar') + assert inst % dict(count=0, dish='frown') == '0 geese make a frown' + assert inst % dict(count=1, dish='stew') == '1 goose makes a stew' + assert inst % dict(count=2, dish='pie') == '2 geese make a pie' + + def test_eq(self): + inst1 = self.klass(singular, plural, 'foo', 'bar') + inst2 = self.klass(singular, plural, 'foo', 'bar') + inst3 = self.klass(singular, '%(count)d thingies', 'foo', 'bar') + inst4 = self.klass(singular, plural, 'foo', 'baz') + + assert (inst1 == inst1) is True + assert (inst1 == inst2) is True + assert (inst1 == inst3) is False + assert (inst1 == inst4) is False + + # Test with args flipped + assert (inst2 == inst1) is True + assert (inst3 == inst1) is False + assert (inst4 == inst1) is False + + def test_ne(self): + inst1 = self.klass(singular, plural, 'foo', 'bar') + inst2 = self.klass(singular, plural, 'foo', 'bar') + inst3 = self.klass(singular, '%(count)d thingies', 'foo', 'bar') + inst4 = self.klass(singular, plural, 'foo', 'baz') + + assert (inst1 != inst2) is False + assert (inst1 != inst2) is False + assert (inst1 != inst3) is True + assert (inst1 != inst4) is True + + # Test with args flipped + assert (inst2 != inst1) is False + assert (inst3 != inst1) is True + assert (inst4 != inst1) is True + + +class test_GettextFactory(object): + + klass = text.GettextFactory + + def test_init(self): + # Test with defaults: + inst = self.klass() + assert inst.domain == 'ipa' + assert inst.localedir is None + + # Test with overrides: + inst = self.klass('foo', 'bar') + assert inst.domain == 'foo' + assert inst.localedir == 'bar' + + def test_repr(self): + # Test with defaults: + inst = self.klass() + assert repr(inst) == "GettextFactory(domain='ipa', localedir=None)" + + # Test with overrides: + inst = self.klass('foo', 'bar') + assert repr(inst) == "GettextFactory(domain='foo', localedir='bar')" + + def test_call(self): + inst = self.klass('foo', 'bar') + g = inst('what up?') + assert type(g) is text.Gettext + assert g.msg is 'what up?' + assert g.domain == 'foo' + assert g.localedir == 'bar' + + +class test_NGettextFactory(object): + + klass = text.NGettextFactory + + def test_init(self): + # Test with defaults: + inst = self.klass() + assert inst.domain == 'ipa' + assert inst.localedir is None + + # Test with overrides: + inst = self.klass('foo', 'bar') + assert inst.domain == 'foo' + assert inst.localedir == 'bar' + + def test_repr(self): + # Test with defaults: + inst = self.klass() + assert repr(inst) == "NGettextFactory(domain='ipa', localedir=None)" + + # Test with overrides: + inst = self.klass('foo', 'bar') + assert repr(inst) == "NGettextFactory(domain='foo', localedir='bar')" + + def test_call(self): + inst = self.klass('foo', 'bar') + ng = inst(singular, plural, 7) + assert type(ng) is text.NGettext + assert ng.singular is singular + assert ng.plural is plural + assert ng.domain == 'foo' + assert ng.localedir == 'bar' diff --git a/ipatests/test_ipalib/test_util.py b/ipatests/test_ipalib/test_util.py new file mode 100644 index 00000000..9d19dfb2 --- /dev/null +++ b/ipatests/test_ipalib/test_util.py @@ -0,0 +1,26 @@ +# Authors: +# Jason Gerard DeRose +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.util` module. +""" + +from ipalib import util + + diff --git a/ipatests/test_ipalib/test_x509.py b/ipatests/test_ipalib/test_x509.py new file mode 100644 index 00000000..c7fafbbd --- /dev/null +++ b/ipatests/test_ipalib/test_x509.py @@ -0,0 +1,139 @@ +# Authors: +# Rob Crittenden +# +# Copyright (C) 2010 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib.x509` module. +""" + +import os +from os import path +import sys +from ipatests.util import raises, setitem, delitem, ClassChecker +from ipatests.util import getitem, setitem, delitem +from ipatests.util import TempDir, TempHome +from ipalib.constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR +from ipalib.constants import NAME_REGEX, NAME_ERROR +import base64 +from ipalib import x509 +from nss.error import NSPRError +from ipapython.dn import DN + +# certutil - + +# certificate for CN=ipa.example.com,O=IPA +goodcert = 'MIICAjCCAWugAwIBAgICBEUwDQYJKoZIhvcNAQEFBQAwKTEnMCUGA1UEAxMeSVBBIFRlc3QgQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4XDTEwMDYyNTEzMDA0MloXDTE1MDYyNTEzMDA0MlowKDEMMAoGA1UEChMDSVBBMRgwFgYDVQQDEw9pcGEuZXhhbXBsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAJcZ+H6+cQaN/BlzR8OYkVeJgaU5tCaV9FF1m7Ws/ftPtTJUaSL1ncp6603rjA4tH1aa/B8i8xdC46+ZbY2au8b9ryGcOsx2uaRpNLEQ2Fy//q1kQC8oM+iD8Nd6osF0a2wnugsgnJHPuJzhViaWxYgzk5DRdP81debokF3f3FX/AgMBAAGjOjA4MBEGCWCGSAGG+EIBAQQEAwIGQDATBgNVHSUEDDAKBggrBgEFBQcDATAOBgNVHQ8BAf8EBAMCBPAwDQYJKoZIhvcNAQEFBQADgYEALD6X9V9w381AzzQPcHsjIjiX3B/AF9RCGocKZUDXkdDhsD9NZ3PLPEf1AMjkraKG963HPB8scyiBbbSuSh6m7TCp0eDgRpo77zNuvd3U4Qpm0Qk+KEjtHQDjNNG6N4ZnCQPmjFPScElvc/GgW7XMbywJy2euF+3/Uip8cnPgSH4=' + +# The base64-encoded string 'bad cert' +badcert = 'YmFkIGNlcnQ=' + +class test_x509(object): + """ + Test `ipalib.x509` + + I created the contents of this certificate with a self-signed CA with: + % certutil -R -s "CN=ipa.example.com,O=IPA" -d . -a -o example.csr + % ./ipa host-add ipa.example.com + % ./ipa cert-request --add --principal=test/ipa.example.com example.csr + """ + + def test_1_load_base64_cert(self): + """ + Test loading a base64-encoded certificate. + """ + + # Load a good cert + cert = x509.load_certificate(goodcert) + + # Load a good cert with headers + newcert = '-----BEGIN CERTIFICATE-----' + goodcert + '-----END CERTIFICATE-----' + cert = x509.load_certificate(newcert) + + # Load a good cert with bad headers + newcert = '-----BEGIN CERTIFICATE-----' + goodcert + try: + cert = x509.load_certificate(newcert) + except TypeError: + pass + + # Load a bad cert + try: + cert = x509.load_certificate(badcert) + except NSPRError: + pass + + def test_1_load_der_cert(self): + """ + Test loading a DER certificate. + """ + + der = base64.b64decode(goodcert) + + # Load a good cert + cert = x509.load_certificate(der, x509.DER) + + def test_2_get_subject(self): + """ + Test retrieving the subject + """ + subject = x509.get_subject(goodcert) + assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA')) + + der = base64.b64decode(goodcert) + subject = x509.get_subject(der, x509.DER) + assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA')) + + # We should be able to pass in a tuple/list of certs too + subject = x509.get_subject((goodcert)) + assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA')) + + subject = x509.get_subject([goodcert]) + assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA')) + + def test_2_get_serial_number(self): + """ + Test retrieving the serial number + """ + serial = x509.get_serial_number(goodcert) + assert serial == 1093 + + der = base64.b64decode(goodcert) + serial = x509.get_serial_number(der, x509.DER) + assert serial == 1093 + + # We should be able to pass in a tuple/list of certs too + serial = x509.get_serial_number((goodcert)) + assert serial == 1093 + + serial = x509.get_serial_number([goodcert]) + assert serial == 1093 + + def test_3_cert_contents(self): + """ + Test the contents of a certificate + """ + # Verify certificate contents. This exercises python-nss more than + # anything but confirms our usage of it. + + cert = x509.load_certificate(goodcert) + + assert DN(str(cert.subject)) == DN(('CN','ipa.example.com'),('O','IPA')) + assert DN(str(cert.issuer)) == DN(('CN','IPA Test Certificate Authority')) + assert cert.serial_number == 1093 + assert cert.valid_not_before_str == 'Fri Jun 25 13:00:42 2010 UTC' + assert cert.valid_not_after_str == 'Thu Jun 25 13:00:42 2015 UTC' -- cgit