summaryrefslogtreecommitdiffstats
path: root/ipatests/test_ipalib
diff options
context:
space:
mode:
authorPetr Viktorin <pviktori@redhat.com>2013-05-21 13:40:27 +0200
committerMartin Kosek <mkosek@redhat.com>2013-06-17 19:22:50 +0200
commitc60142efda817f030a7495cd6fe4a19953e55afa (patch)
tree31a840ceddd4381311bbc879f9851bb71a8e2ffa /ipatests/test_ipalib
parent6d66e826c1c248dffc80056b20c1e4b74b04d46f (diff)
downloadfreeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.gz
freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.xz
freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.zip
Make an ipa-tests package
Rename the 'tests' directory to 'ipa-tests', and create an ipa-tests RPM containing the test suite Part of the work for: https://fedorahosted.org/freeipa/ticket/3654
Diffstat (limited to 'ipatests/test_ipalib')
-rw-r--r--ipatests/test_ipalib/__init__.py22
-rw-r--r--ipatests/test_ipalib/test_backend.py272
-rw-r--r--ipatests/test_ipalib/test_base.py352
-rw-r--r--ipatests/test_ipalib/test_capabilities.py33
-rw-r--r--ipatests/test_ipalib/test_cli.py116
-rw-r--r--ipatests/test_ipalib/test_config.py609
-rw-r--r--ipatests/test_ipalib/test_crud.py240
-rw-r--r--ipatests/test_ipalib/test_errors.py374
-rw-r--r--ipatests/test_ipalib/test_frontend.py1188
-rw-r--r--ipatests/test_ipalib/test_messages.py89
-rw-r--r--ipatests/test_ipalib/test_output.py89
-rw-r--r--ipatests/test_ipalib/test_parameters.py1533
-rw-r--r--ipatests/test_ipalib/test_plugable.py516
-rw-r--r--ipatests/test_ipalib/test_rpc.py244
-rw-r--r--ipatests/test_ipalib/test_text.py334
-rw-r--r--ipatests/test_ipalib/test_util.py26
-rw-r--r--ipatests/test_ipalib/test_x509.py139
17 files changed, 6176 insertions, 0 deletions
diff --git a/ipatests/test_ipalib/__init__.py b/ipatests/test_ipalib/__init__.py
new file mode 100644
index 000000000..4e4c605cd
--- /dev/null
+++ b/ipatests/test_ipalib/__init__.py
@@ -0,0 +1,22 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Sub-package containing unit tests for `ipalib` package.
+"""
diff --git a/ipatests/test_ipalib/test_backend.py b/ipatests/test_ipalib/test_backend.py
new file mode 100644
index 000000000..3ebed4bba
--- /dev/null
+++ b/ipatests/test_ipalib/test_backend.py
@@ -0,0 +1,272 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.backend` module.
+"""
+
+import threading
+from ipatests.util import ClassChecker, raises, create_test_api
+from ipatests.data import unicode_str
+from ipalib.request import context, Connection
+from ipalib.frontend import Command
+from ipalib import backend, plugable, errors, base
+from ipapython.version import API_VERSION
+
+
+
+class test_Backend(ClassChecker):
+ """
+ Test the `ipalib.backend.Backend` class.
+ """
+
+ _cls = backend.Backend
+
+ def test_class(self):
+ assert self.cls.__bases__ == (plugable.Plugin,)
+
+
+class Disconnect(object):
+ called = False
+
+ def __init__(self, id=None):
+ self.id = id
+
+ def __call__(self):
+ assert self.called is False
+ self.called = True
+ if self.id is not None:
+ delattr(context, self.id)
+
+
+class test_Connectible(ClassChecker):
+ """
+ Test the `ipalib.backend.Connectible` class.
+ """
+
+ _cls = backend.Connectible
+
+ def test_connect(self):
+ """
+ Test the `ipalib.backend.Connectible.connect` method.
+ """
+ # Test that connection is created:
+ class example(self.cls):
+ def create_connection(self, *args, **kw):
+ object.__setattr__(self, 'args', args)
+ object.__setattr__(self, 'kw', kw)
+ return 'The connection.'
+ o = example()
+ args = ('Arg1', 'Arg2', 'Arg3')
+ kw = dict(key1='Val1', key2='Val2', key3='Val3')
+ assert not hasattr(context, 'example')
+ assert o.connect(*args, **kw) is None
+ conn = context.example
+ assert type(conn) is Connection
+ assert o.args == args
+ assert o.kw == kw
+ assert conn.conn == 'The connection.'
+ assert conn.disconnect == o.disconnect
+
+ # Test that StandardError is raised if already connected:
+ m = "connect: 'context.%s' already exists in thread %r"
+ e = raises(StandardError, o.connect, *args, **kw)
+ assert str(e) == m % ('example', threading.currentThread().getName())
+
+ # Double check that it works after deleting context.example:
+ del context.example
+ assert o.connect(*args, **kw) is None
+
+ def test_create_connection(self):
+ """
+ Test the `ipalib.backend.Connectible.create_connection` method.
+ """
+ class example(self.cls):
+ pass
+ for klass in (self.cls, example):
+ o = klass()
+ e = raises(NotImplementedError, o.create_connection)
+ assert str(e) == '%s.create_connection()' % klass.__name__
+
+ def test_disconnect(self):
+ """
+ Test the `ipalib.backend.Connectible.disconnect` method.
+ """
+ class example(self.cls):
+ destroy_connection = Disconnect()
+ o = example()
+
+ m = "disconnect: 'context.%s' does not exist in thread %r"
+ e = raises(StandardError, o.disconnect)
+ assert str(e) == m % ('example', threading.currentThread().getName())
+
+ context.example = 'The connection.'
+ assert o.disconnect() is None
+ assert example.destroy_connection.called is True
+
+ def test_destroy_connection(self):
+ """
+ Test the `ipalib.backend.Connectible.destroy_connection` method.
+ """
+ class example(self.cls):
+ pass
+ for klass in (self.cls, example):
+ o = klass()
+ e = raises(NotImplementedError, o.destroy_connection)
+ assert str(e) == '%s.destroy_connection()' % klass.__name__
+
+ def test_isconnected(self):
+ """
+ Test the `ipalib.backend.Connectible.isconnected` method.
+ """
+ class example(self.cls):
+ pass
+ for klass in (self.cls, example):
+ o = klass()
+ assert o.isconnected() is False
+ conn = 'whatever'
+ setattr(context, klass.__name__, conn)
+ assert o.isconnected() is True
+ delattr(context, klass.__name__)
+
+ def test_conn(self):
+ """
+ Test the `ipalib.backend.Connectible.conn` property.
+ """
+ msg = 'no context.%s in thread %r'
+ class example(self.cls):
+ pass
+ for klass in (self.cls, example):
+ o = klass()
+ e = raises(AttributeError, getattr, o, 'conn')
+ assert str(e) == msg % (
+ klass.__name__, threading.currentThread().getName()
+ )
+ conn = Connection('The connection.', Disconnect())
+ setattr(context, klass.__name__, conn)
+ assert o.conn is conn.conn
+ delattr(context, klass.__name__)
+
+
+class test_Executioner(ClassChecker):
+ """
+ Test the `ipalib.backend.Executioner` class.
+ """
+ _cls = backend.Executioner
+
+ def test_execute(self):
+ """
+ Test the `ipalib.backend.Executioner.execute` method.
+ """
+ (api, home) = create_test_api(in_server=True)
+
+ class echo(Command):
+ takes_args = ('arg1', 'arg2+')
+ takes_options = ('option1?', 'option2?')
+ def execute(self, *args, **options):
+ assert type(args[1]) is tuple
+ return dict(result=args + (options,))
+ api.register(echo)
+
+ class good(Command):
+ def execute(self, **options):
+ raise errors.ValidationError(
+ name='nurse',
+ error=u'Not naughty!',
+ )
+ api.register(good)
+
+ class bad(Command):
+ def execute(self, **options):
+ raise ValueError('This is private.')
+ api.register(bad)
+
+ class with_name(Command):
+ """
+ Test that a kwarg named 'name' can be used.
+ """
+ takes_options = 'name'
+ def execute(self, **options):
+ return dict(result=options['name'].upper())
+ api.register(with_name)
+
+ api.finalize()
+ o = self.cls()
+ o.set_api(api)
+ o.finalize()
+
+ # Test that CommandError is raised:
+ conn = Connection('The connection.', Disconnect('someconn'))
+ context.someconn = conn
+ print str(context.__dict__.keys())
+ e = raises(errors.CommandError, o.execute, 'nope')
+ assert e.name == 'nope'
+ assert conn.disconnect.called is True # Make sure destroy_context() was called
+ print str(context.__dict__.keys())
+ assert context.__dict__.keys() == []
+
+ # Test with echo command:
+ arg1 = unicode_str
+ arg2 = (u'Hello', unicode_str, u'world!')
+ args = (arg1,) + arg2
+ options = dict(option1=u'How are you?', option2=unicode_str,
+ version=API_VERSION)
+
+ conn = Connection('The connection.', Disconnect('someconn'))
+ context.someconn = conn
+ print o.execute('echo', arg1, arg2, **options)
+ print dict(
+ result=(arg1, arg2, options)
+ )
+ assert o.execute('echo', arg1, arg2, **options) == dict(
+ result=(arg1, arg2, options)
+ )
+ assert conn.disconnect.called is True # Make sure destroy_context() was called
+ assert context.__dict__.keys() == []
+
+ conn = Connection('The connection.', Disconnect('someconn'))
+ context.someconn = conn
+ assert o.execute('echo', *args, **options) == dict(
+ result=(arg1, arg2, options)
+ )
+ assert conn.disconnect.called is True # Make sure destroy_context() was called
+ assert context.__dict__.keys() == []
+
+ # Test with good command:
+ conn = Connection('The connection.', Disconnect('someconn'))
+ context.someconn = conn
+ e = raises(errors.ValidationError, o.execute, 'good')
+ assert e.name == 'nurse'
+ assert e.error == u'Not naughty!'
+ assert conn.disconnect.called is True # Make sure destroy_context() was called
+ assert context.__dict__.keys() == []
+
+ # Test with bad command:
+ conn = Connection('The connection.', Disconnect('someconn'))
+ context.someconn = conn
+ e = raises(errors.InternalError, o.execute, 'bad')
+ assert conn.disconnect.called is True # Make sure destroy_context() was called
+ assert context.__dict__.keys() == []
+
+ # Test with option 'name':
+ conn = Connection('The connection.', Disconnect('someconn'))
+ context.someconn = conn
+ expected = dict(result=u'TEST')
+ assert expected == o.execute('with_name', name=u'test',
+ version=API_VERSION)
diff --git a/ipatests/test_ipalib/test_base.py b/ipatests/test_ipalib/test_base.py
new file mode 100644
index 000000000..ef6c180c7
--- /dev/null
+++ b/ipatests/test_ipalib/test_base.py
@@ -0,0 +1,352 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.base` module.
+"""
+
+from ipatests.util import ClassChecker, raises
+from ipalib.constants import NAME_REGEX, NAME_ERROR
+from ipalib.constants import TYPE_ERROR, SET_ERROR, DEL_ERROR, OVERRIDE_ERROR
+from ipalib import base
+
+
+class test_ReadOnly(ClassChecker):
+ """
+ Test the `ipalib.base.ReadOnly` class
+ """
+ _cls = base.ReadOnly
+
+ def test_lock(self):
+ """
+ Test the `ipalib.base.ReadOnly.__lock__` method.
+ """
+ o = self.cls()
+ assert o._ReadOnly__locked is False
+ o.__lock__()
+ assert o._ReadOnly__locked is True
+ e = raises(AssertionError, o.__lock__) # Can only be locked once
+ assert str(e) == '__lock__() can only be called once'
+ assert o._ReadOnly__locked is True # This should still be True
+
+ def test_islocked(self):
+ """
+ Test the `ipalib.base.ReadOnly.__islocked__` method.
+ """
+ o = self.cls()
+ assert o.__islocked__() is False
+ o.__lock__()
+ assert o.__islocked__() is True
+
+ def test_setattr(self):
+ """
+ Test the `ipalib.base.ReadOnly.__setattr__` method.
+ """
+ o = self.cls()
+ o.attr1 = 'Hello, world!'
+ assert o.attr1 == 'Hello, world!'
+ o.__lock__()
+ for name in ('attr1', 'attr2'):
+ e = raises(AttributeError, setattr, o, name, 'whatever')
+ assert str(e) == SET_ERROR % ('ReadOnly', name, 'whatever')
+ assert o.attr1 == 'Hello, world!'
+
+ def test_delattr(self):
+ """
+ Test the `ipalib.base.ReadOnly.__delattr__` method.
+ """
+ o = self.cls()
+ o.attr1 = 'Hello, world!'
+ o.attr2 = 'How are you?'
+ assert o.attr1 == 'Hello, world!'
+ assert o.attr2 == 'How are you?'
+ del o.attr1
+ assert not hasattr(o, 'attr1')
+ o.__lock__()
+ e = raises(AttributeError, delattr, o, 'attr2')
+ assert str(e) == DEL_ERROR % ('ReadOnly', 'attr2')
+ assert o.attr2 == 'How are you?'
+
+
+def test_lock():
+ """
+ Test the `ipalib.base.lock` function
+ """
+ f = base.lock
+
+ # Test with ReadOnly instance:
+ o = base.ReadOnly()
+ assert o.__islocked__() is False
+ assert f(o) is o
+ assert o.__islocked__() is True
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'already locked: %r' % o
+
+ # Test with another class implemented locking protocol:
+ class Lockable(object):
+ __locked = False
+ def __lock__(self):
+ self.__locked = True
+ def __islocked__(self):
+ return self.__locked
+ o = Lockable()
+ assert o.__islocked__() is False
+ assert f(o) is o
+ assert o.__islocked__() is True
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'already locked: %r' % o
+
+ # Test with a class incorrectly implementing the locking protocol:
+ class Broken(object):
+ def __lock__(self):
+ pass
+ def __islocked__(self):
+ return False
+ o = Broken()
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'failed to lock: %r' % o
+
+
+def test_islocked():
+ """
+ Test the `ipalib.base.islocked` function.
+ """
+ f = base.islocked
+
+ # Test with ReadOnly instance:
+ o = base.ReadOnly()
+ assert f(o) is False
+ o.__lock__()
+ assert f(o) is True
+
+ # Test with another class implemented locking protocol:
+ class Lockable(object):
+ __locked = False
+ def __lock__(self):
+ self.__locked = True
+ def __islocked__(self):
+ return self.__locked
+ o = Lockable()
+ assert f(o) is False
+ o.__lock__()
+ assert f(o) is True
+
+ # Test with a class incorrectly implementing the locking protocol:
+ class Broken(object):
+ __lock__ = False
+ def __islocked__(self):
+ return False
+ o = Broken()
+ e = raises(AssertionError, f, o)
+ assert str(e) == 'no __lock__() method: %r' % o
+
+
+def test_check_name():
+ """
+ Test the `ipalib.base.check_name` function.
+ """
+ f = base.check_name
+ okay = [
+ 'user_add',
+ 'stuff2junk',
+ 'sixty9',
+ ]
+ nope = [
+ '_user_add',
+ '__user_add',
+ 'user_add_',
+ 'user_add__',
+ '_user_add_',
+ '__user_add__',
+ '60nine',
+ ]
+ for name in okay:
+ assert name is f(name)
+ e = raises(TypeError, f, unicode(name))
+ assert str(e) == TYPE_ERROR % ('name', str, unicode(name), unicode)
+ for name in nope:
+ e = raises(ValueError, f, name)
+ assert str(e) == NAME_ERROR % (NAME_REGEX, name)
+ for name in okay:
+ e = raises(ValueError, f, name.upper())
+ assert str(e) == NAME_ERROR % (NAME_REGEX, name.upper())
+
+
+def membername(i):
+ return 'member%03d' % i
+
+
+class DummyMember(object):
+ def __init__(self, i):
+ self.i = i
+ self.name = membername(i)
+
+
+def gen_members(*indexes):
+ return tuple(DummyMember(i) for i in indexes)
+
+
+class test_NameSpace(ClassChecker):
+ """
+ Test the `ipalib.base.NameSpace` class.
+ """
+ _cls = base.NameSpace
+
+ def new(self, count, sort=True):
+ members = tuple(DummyMember(i) for i in xrange(count, 0, -1))
+ assert len(members) == count
+ o = self.cls(members, sort=sort)
+ return (o, members)
+
+ def test_init(self):
+ """
+ Test the `ipalib.base.NameSpace.__init__` method.
+ """
+ o = self.cls([])
+ assert len(o) == 0
+ assert list(o) == []
+ assert list(o()) == []
+
+ # Test members as attribute and item:
+ for cnt in (3, 42):
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ assert len(members) == cnt
+ for m in members:
+ assert getattr(o, m.name) is m
+ assert o[m.name] is m
+
+ # Test that TypeError is raised if sort is not a bool:
+ e = raises(TypeError, self.cls, [], sort=None)
+ assert str(e) == TYPE_ERROR % ('sort', bool, None, type(None))
+
+ # Test that AttributeError is raised with duplicate member name:
+ members = gen_members(0, 1, 2, 1, 3)
+ e = raises(AttributeError, self.cls, members)
+ assert str(e) == OVERRIDE_ERROR % (
+ 'NameSpace', membername(1), members[1], members[3]
+ )
+
+ def test_len(self):
+ """
+ Test the `ipalib.base.NameSpace.__len__` method.
+ """
+ for count in (5, 18, 127):
+ (o, members) = self.new(count)
+ assert len(o) == count
+ (o, members) = self.new(count, sort=False)
+ assert len(o) == count
+
+ def test_iter(self):
+ """
+ Test the `ipalib.base.NameSpace.__iter__` method.
+ """
+ (o, members) = self.new(25)
+ assert list(o) == sorted(m.name for m in members)
+ (o, members) = self.new(25, sort=False)
+ assert list(o) == list(m.name for m in members)
+
+ def test_call(self):
+ """
+ Test the `ipalib.base.NameSpace.__call__` method.
+ """
+ (o, members) = self.new(25)
+ assert list(o()) == sorted(members, key=lambda m: m.name)
+ (o, members) = self.new(25, sort=False)
+ assert tuple(o()) == members
+
+ def test_contains(self):
+ """
+ Test the `ipalib.base.NameSpace.__contains__` method.
+ """
+ yes = (99, 3, 777)
+ no = (9, 333, 77)
+ for sort in (True, False):
+ members = gen_members(*yes)
+ o = self.cls(members, sort=sort)
+ for i in yes:
+ assert membername(i) in o
+ assert membername(i).upper() not in o
+ for i in no:
+ assert membername(i) not in o
+
+ def test_getitem(self):
+ """
+ Test the `ipalib.base.NameSpace.__getitem__` method.
+ """
+ cnt = 17
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ assert len(members) == cnt
+ if sort is True:
+ members = tuple(sorted(members, key=lambda m: m.name))
+
+ # Test str keys:
+ for m in members:
+ assert o[m.name] is m
+ e = raises(KeyError, o.__getitem__, 'nope')
+
+ # Test int indexes:
+ for i in xrange(cnt):
+ assert o[i] is members[i]
+ e = raises(IndexError, o.__getitem__, cnt)
+
+ # Test negative int indexes:
+ for i in xrange(1, cnt + 1):
+ assert o[-i] is members[-i]
+ e = raises(IndexError, o.__getitem__, -(cnt + 1))
+
+ # Test slicing:
+ assert o[3:] == members[3:]
+ assert o[:10] == members[:10]
+ assert o[3:10] == members[3:10]
+ assert o[-9:] == members[-9:]
+ assert o[:-4] == members[:-4]
+ assert o[-9:-4] == members[-9:-4]
+
+ # Test that TypeError is raised with wrong type
+ e = raises(TypeError, o.__getitem__, 3.0)
+ assert str(e) == TYPE_ERROR % ('key', (str, int, slice), 3.0, float)
+
+ def test_repr(self):
+ """
+ Test the `ipalib.base.NameSpace.__repr__` method.
+ """
+ for cnt in (0, 1, 2):
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ if cnt == 1:
+ assert repr(o) == \
+ 'NameSpace(<%d member>, sort=%r)' % (cnt, sort)
+ else:
+ assert repr(o) == \
+ 'NameSpace(<%d members>, sort=%r)' % (cnt, sort)
+
+ def test_todict(self):
+ """
+ Test the `ipalib.base.NameSpace.__todict__` method.
+ """
+ for cnt in (3, 101):
+ for sort in (True, False):
+ (o, members) = self.new(cnt, sort=sort)
+ d = o.__todict__()
+ assert d == dict((m.name, m) for m in members)
+
+ # Test that a copy is returned:
+ assert o.__todict__() is not d
diff --git a/ipatests/test_ipalib/test_capabilities.py b/ipatests/test_ipalib/test_capabilities.py
new file mode 100644
index 000000000..21e53c2dc
--- /dev/null
+++ b/ipatests/test_ipalib/test_capabilities.py
@@ -0,0 +1,33 @@
+# Authors:
+# Petr Viktorin <pviktori@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.errors` module.
+"""
+
+from ipalib.capabilities import capabilities, client_has_capability
+
+
+def test_client_has_capability():
+ assert capabilities['messages'] == u'2.52'
+ assert client_has_capability(u'2.52', 'messages')
+ assert client_has_capability(u'2.60', 'messages')
+ assert client_has_capability(u'3.0', 'messages')
+ assert not client_has_capability(u'2.11', 'messages')
+ assert not client_has_capability(u'0.1', 'messages')
diff --git a/ipatests/test_ipalib/test_cli.py b/ipatests/test_ipalib/test_cli.py
new file mode 100644
index 000000000..07935c5ba
--- /dev/null
+++ b/ipatests/test_ipalib/test_cli.py
@@ -0,0 +1,116 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.cli` module.
+"""
+
+from ipatests.util import raises, get_api, ClassChecker
+from ipalib import cli, plugable, frontend, backend
+
+
+class test_textui(ClassChecker):
+ _cls = cli.textui
+
+ def test_max_col_width(self):
+ """
+ Test the `ipalib.cli.textui.max_col_width` method.
+ """
+ o = self.cls()
+ e = raises(TypeError, o.max_col_width, 'hello')
+ assert str(e) == 'rows: need %r or %r; got %r' % (list, tuple, 'hello')
+ rows = [
+ 'hello',
+ 'naughty',
+ 'nurse',
+ ]
+ assert o.max_col_width(rows) == len('naughty')
+ rows = (
+ ( 'a', 'bbb', 'ccccc'),
+ ('aa', 'bbbb', 'cccccc'),
+ )
+ assert o.max_col_width(rows, col=0) == 2
+ assert o.max_col_width(rows, col=1) == 4
+ assert o.max_col_width(rows, col=2) == 6
+
+
+def test_to_cli():
+ """
+ Test the `ipalib.cli.to_cli` function.
+ """
+ f = cli.to_cli
+ assert f('initialize') == 'initialize'
+ assert f('user_add') == 'user-add'
+
+
+def test_from_cli():
+ """
+ Test the `ipalib.cli.from_cli` function.
+ """
+ f = cli.from_cli
+ assert f('initialize') == 'initialize'
+ assert f('user-add') == 'user_add'
+
+
+def get_cmd_name(i):
+ return 'cmd_%d' % i
+
+
+class DummyCommand(object):
+ def __init__(self, name):
+ self.__name = name
+
+ def __get_name(self):
+ return self.__name
+ name = property(__get_name)
+
+
+class DummyAPI(object):
+ def __init__(self, cnt):
+ self.__cmd = plugable.NameSpace(self.__cmd_iter(cnt))
+
+ def __get_cmd(self):
+ return self.__cmd
+ Command = property(__get_cmd)
+
+ def __cmd_iter(self, cnt):
+ for i in xrange(cnt):
+ yield DummyCommand(get_cmd_name(i))
+
+ def finalize(self):
+ pass
+
+ def register(self, *args, **kw):
+ pass
+
+
+config_cli = """
+[global]
+
+from_cli_conf = set in cli.conf
+"""
+
+config_default = """
+[global]
+
+from_default_conf = set in default.conf
+
+# Make sure cli.conf is loaded first:
+from_cli_conf = overridden in default.conf
+"""
diff --git a/ipatests/test_ipalib/test_config.py b/ipatests/test_ipalib/test_config.py
new file mode 100644
index 000000000..f896b8936
--- /dev/null
+++ b/ipatests/test_ipalib/test_config.py
@@ -0,0 +1,609 @@
+# Authors:
+# Martin Nagy <mnagy@redhat.com>
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.config` module.
+"""
+
+import os
+from os import path
+import sys
+import socket
+from ipatests.util import raises, setitem, delitem, ClassChecker
+from ipatests.util import getitem, setitem, delitem
+from ipatests.util import TempDir, TempHome
+from ipalib.constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR
+from ipalib.constants import NAME_REGEX, NAME_ERROR
+from ipalib import config, constants, base
+
+
+# Valid environment variables in (key, raw, value) tuples:
+# key: the name of the environment variable
+# raw: the value being set (possibly a string repr)
+# value: the expected value after the lightweight conversion
+good_vars = (
+ ('a_string', u'Hello world!', u'Hello world!'),
+ ('trailing_whitespace', u' value ', u'value'),
+ ('an_int', 42, 42),
+ ('int_repr', ' 42 ', 42),
+ ('a_float', 3.14, 3.14),
+ ('float_repr', ' 3.14 ', 3.14),
+ ('true', True, True),
+ ('true_repr', ' True ', True),
+ ('false', False, False),
+ ('false_repr', ' False ', False),
+ ('none', None, None),
+ ('none_repr', ' None ', None),
+ ('empty', '', None),
+
+ # These verify that the implied conversion is case-sensitive:
+ ('not_true', u' true ', u'true'),
+ ('not_false', u' false ', u'false'),
+ ('not_none', u' none ', u'none'),
+)
+
+
+bad_names = (
+ ('CamelCase', u'value'),
+ ('_leading_underscore', u'value'),
+ ('trailing_underscore_', u'value'),
+)
+
+
+# Random base64-encoded data to simulate a misbehaving config file.
+config_bad = """
+/9j/4AAQSkZJRgABAQEAlgCWAAD//gAIT2xpdmVy/9sAQwAQCwwODAoQDg0OEhEQExgoGhgWFhgx
+IyUdKDozPTw5Mzg3QEhcTkBEV0U3OFBtUVdfYmdoZz5NcXlwZHhcZWdj/8AACwgAlgB0AQERAP/E
+ABsAAAEFAQEAAAAAAAAAAAAAAAQAAQIDBQYH/8QAMhAAAgICAAUDBAIABAcAAAAAAQIAAwQRBRIh
+MUEGE1EiMmFxFIEVI0LBFjNSYnKRof/aAAgBAQAAPwDCtzmNRr1o/MEP1D6f7kdkRakgBsAtoQhk
+xls/y3Z113I11mhiUc1ewCf1Oq4anJgINdhLhQoextfedmYrenfcvdzaFQnYAE08XhONTWEK8+js
+Fpo1oqAKoAA8CWjoJJTHM8kJ5jsiOiszAKD1+IV/hmW76rosbfnlh1Pp3Mah2srCnXQE9YXiel/c
+p5r7uVj2CwxPTuFjjmdLbteNwmrLwsYe3TjsD8cmjKV43ycy+3o76D4llFuXmuCoZEPczXVOSsLv
+f5lgGpNZLxJL2jnvMar0/wAOp6jHDH/uO4RViY9f/KpRdfC6k3R9fRyj+pRZVkWKqF10e+hCKaFq
+XlH/ALlmhK7Met/uUGZ5ow8XL57lU8/Yt4lx4jUOJphLobTe/wDaHeZLxHXtJEya9o5lFzCqpmPY
+CUYoPtDfc9TLj0G5jZvHaMFirAs++oEHq9U4rbNiMp8a6wO/1Zbzn2alC+Nx8P1JfdeBboA+AILx
+rin8pfbA1ynvKuFUXZOXXkLbzOp2R56andL2G45MmO0RPWWLEe8GzaffoKb/ADI44Pt9ZXxAuuFa
+axtgp0BOSPCcviNX8n3Aw8KTNHB4FiY9StkobLWHVSeghq8M4bkAhKKyV6Hl8RV8MwMZG1Uuz3Jn
+IcUQJlMFGlJ6D4hfpymy7iChHKqvVtefxO7Ai1txLBIn7pcojN3jGVhQO0ZgCNfM5ZHycTLycSkr
+yhtqD4Bmrfw5cuqsm6xHXyp1seRLcHCp4dQy1bOzslj1MzeJ5dVFnuMVdgOiHxOWzrmyMg2Nrbde
+k3vR2OTddcd6A5R8GdZqOo67k4wXrLAQPMRKnzImMZEzm+P1nFz6cxQeVujagWR6jsYiqivlH/Ux
+1M+7jWY30i7QHx1gF11tjGyxiSfmVc+503pPidVROHYNNY21b/adVZZySo3uOo1qIZQYd9RCzfYm
+TUk/qW71LjGkTA+IYiZmM1T9N9j8Gee5+McXJem0/Wp8GUK6KOi7b5MgzFjsxpJHZGDKSCOxE3cD
+OvsxbbLc9lsT7Vc73KX4ln3q1ZyVrPx2J/uAjLyan37z7B+Zp4vqPJqKi0K4EvzvUt1qBMdfb+T5
+gycfzkXXuc35InfE6nO8Y9SjFc1Yqh2Hdj2mH/xFxR26XgD/AMRJf45mWMqW5bBD3KqAZlZtb++7
+kEqTsHe//sG1CcTBvy7OWpD+Sewhz8CyKCTYAQPiGV0LVWPdxqQNADQ6zL4nWq2gopU6+ofmA8x3
+1MlvfeIGbnBeCHitRt94IFbRGus2U9H08v13sT+BNHjeX/D4bY4OmP0rPPbHLMWJ2Yy2EDQjVsos
+BdeYDx8wo5L5KpSdLWPAE1+G8NrFtBKgOAXPTf6mzViql5ZBoE87eJZkKbOQ8m+Yjf5EBzcO621y
+GCqD0H41Obzq7U6vzM577HTXgzPPeOIvM1eB59nD8xXVj7bHTr8iej1MtlauvUMNgzi/V2ctliYy
+HYTq37nMExpZXRZYpZVJUdzNjg+FXYwZgdhv6nVVUJU/uH7iNf1CARrtF0IB113M7jTNVjFl2xJA
+5ROey88OrVOugOy67TDs+89NRKdSYILdRC8ZQVJ+PHyJs4fqe3EoFPLzBexPxOdusa2xndiWY7JM
+qMUNrzOTAfHC9XO9/E3vT9blVJB0o2Zu3MAoYrsL13Ii0Muw3XvJG9KkDOeqjf6gWcw5A33XN9nX
+tOeyMRFWy3Jch+bX7mXmCsW/5RBXUoHaOIRi2asAJ0IRbjqzll3o/EAaRiltDojgv2E1aePmhEWq
+rsNHZ7wir1K/8Y1vUCSCAd+IXiZ9b1gLYvN07trXTUD4rxN2TkUgEts8p2NDtD0t5MVGchr2Xe99
+hMPNvD1LX5J2TuZhGyYwBijjfiHU5bJXrnYfqBRtRtSbIBWG3+xI6HiLUWz8xA9RuaVNrMAPfB5x
+r6v9MLr4S1il7LaxyjY69Jl5eG+Kyhiv1jYIMGYMO8etGscKoJJ8Cbp4bVg4ivaq22t3G/tmRYo5
+zyjQ+JRFFET01GB0Yid9YiYh1l9KgEHqT8Tco/hewA/NzgdQdwTNGNTY3uU2crL9HN00ZlovNzfV
+oCanBrBRk1rpCHPUkQjjYoW4GtwAw30MDpuxvbAvpJceR5mXFFEY0W4o4mpg0XNXutQxPUHxLb8q
+7mRDyszLr6esz8u++9wL2LcvQb8RXCkhBV3A6mR5rEVSrdFPT8SBLMdsdmWe6P8AUAx+TB4oooxi
+i1Jmt0+5dfuOLbANB2H6MjzNzc2zv5ji1g2+5/MYnbb+Yh+T0kubUY940UUbUWtRpJN8w1CfebkK
+WfUu+/mDOAGOjsRo0UkIo+pPl6Rckl7ehuR1INGAj9u0kW2nXvK45YlQp1odukaICSAjgSQWf//Z
+"""
+
+
+# A config file that tries to override some standard vars:
+config_override = """
+[global]
+
+key0 = var0
+home = /home/sweet/home
+key1 = var1
+site_packages = planet
+key2 = var2
+key3 = var3
+"""
+
+
+# A config file that tests the automatic type conversion
+config_good = """
+[global]
+
+string = Hello world!
+null = None
+yes = True
+no = False
+number = 42
+floating = 3.14
+"""
+
+
+# A default config file to make sure it does not overwrite the explicit one
+config_default = """
+[global]
+
+yes = Hello
+not_in_other = foo_bar
+"""
+
+
+class test_Env(ClassChecker):
+ """
+ Test the `ipalib.config.Env` class.
+ """
+
+ _cls = config.Env
+
+ def test_init(self):
+ """
+ Test the `ipalib.config.Env.__init__` method.
+ """
+ o = self.cls()
+ assert list(o) == []
+ assert len(o) == 0
+ assert o.__islocked__() is False
+
+ def test_lock(self):
+ """
+ Test the `ipalib.config.Env.__lock__` method.
+ """
+ o = self.cls()
+ assert o.__islocked__() is False
+ o.__lock__()
+ assert o.__islocked__() is True
+ e = raises(StandardError, o.__lock__)
+ assert str(e) == 'Env.__lock__() already called'
+
+ # Also test with base.lock() function:
+ o = self.cls()
+ assert o.__islocked__() is False
+ assert base.lock(o) is o
+ assert o.__islocked__() is True
+ e = raises(AssertionError, base.lock, o)
+ assert str(e) == 'already locked: %r' % o
+
+ def test_islocked(self):
+ """
+ Test the `ipalib.config.Env.__islocked__` method.
+ """
+ o = self.cls()
+ assert o.__islocked__() is False
+ assert base.islocked(o) is False
+ o.__lock__()
+ assert o.__islocked__() is True
+ assert base.islocked(o) is True
+
+ def test_setattr(self):
+ """
+ Test the `ipalib.config.Env.__setattr__` method.
+ """
+ o = self.cls()
+ for (name, raw, value) in good_vars:
+ # Test setting the value:
+ setattr(o, name, raw)
+ result = getattr(o, name)
+ assert type(result) is type(value)
+ assert result == value
+ assert result is o[name]
+
+ # Test that value cannot be overridden once set:
+ e = raises(AttributeError, setattr, o, name, raw)
+ assert str(e) == OVERRIDE_ERROR % ('Env', name, value, raw)
+
+ # Test that values cannot be set once locked:
+ o = self.cls()
+ o.__lock__()
+ for (name, raw, value) in good_vars:
+ e = raises(AttributeError, setattr, o, name, raw)
+ assert str(e) == SET_ERROR % ('Env', name, raw)
+
+ # Test that name is tested with check_name():
+ o = self.cls()
+ for (name, value) in bad_names:
+ e = raises(ValueError, setattr, o, name, value)
+ assert str(e) == NAME_ERROR % (NAME_REGEX, name)
+
+ def test_setitem(self):
+ """
+ Test the `ipalib.config.Env.__setitem__` method.
+ """
+ o = self.cls()
+ for (key, raw, value) in good_vars:
+ # Test setting the value:
+ o[key] = raw
+ result = o[key]
+ assert type(result) is type(value)
+ assert result == value
+ assert result is getattr(o, key)
+
+ # Test that value cannot be overridden once set:
+ e = raises(AttributeError, o.__setitem__, key, raw)
+ assert str(e) == OVERRIDE_ERROR % ('Env', key, value, raw)
+
+ # Test that values cannot be set once locked:
+ o = self.cls()
+ o.__lock__()
+ for (key, raw, value) in good_vars:
+ e = raises(AttributeError, o.__setitem__, key, raw)
+ assert str(e) == SET_ERROR % ('Env', key, raw)
+
+ # Test that name is tested with check_name():
+ o = self.cls()
+ for (key, value) in bad_names:
+ e = raises(ValueError, o.__setitem__, key, value)
+ assert str(e) == NAME_ERROR % (NAME_REGEX, key)
+
+ def test_getitem(self):
+ """
+ Test the `ipalib.config.Env.__getitem__` method.
+ """
+ o = self.cls()
+ value = u'some value'
+ o.key = value
+ assert o.key is value
+ assert o['key'] is value
+ for name in ('one', 'two'):
+ e = raises(KeyError, getitem, o, name)
+ assert str(e) == repr(name)
+
+ def test_delattr(self):
+ """
+ Test the `ipalib.config.Env.__delattr__` method.
+
+ This also tests that ``__delitem__`` is not implemented.
+ """
+ o = self.cls()
+ o.one = 1
+ assert o.one == 1
+ for key in ('one', 'two'):
+ e = raises(AttributeError, delattr, o, key)
+ assert str(e) == DEL_ERROR % ('Env', key)
+ e = raises(AttributeError, delitem, o, key)
+ assert str(e) == '__delitem__'
+
+ def test_contains(self):
+ """
+ Test the `ipalib.config.Env.__contains__` method.
+ """
+ o = self.cls()
+ items = [
+ ('one', 1),
+ ('two', 2),
+ ('three', 3),
+ ('four', 4),
+ ]
+ for (key, value) in items:
+ assert key not in o
+ o[key] = value
+ assert key in o
+
+ def test_len(self):
+ """
+ Test the `ipalib.config.Env.__len__` method.
+ """
+ o = self.cls()
+ assert len(o) == 0
+ for i in xrange(1, 11):
+ key = 'key%d' % i
+ value = u'value %d' % i
+ o[key] = value
+ assert o[key] is value
+ assert len(o) == i
+
+ def test_iter(self):
+ """
+ Test the `ipalib.config.Env.__iter__` method.
+ """
+ o = self.cls()
+ default_keys = tuple(o)
+ keys = ('one', 'two', 'three', 'four', 'five')
+ for key in keys:
+ o[key] = 'the value'
+ assert list(o) == sorted(keys + default_keys)
+
+ def test_merge(self):
+ """
+ Test the `ipalib.config.Env._merge` method.
+ """
+ group1 = (
+ ('key1', u'value 1'),
+ ('key2', u'value 2'),
+ ('key3', u'value 3'),
+ ('key4', u'value 4'),
+ )
+ group2 = (
+ ('key0', u'Value 0'),
+ ('key2', u'Value 2'),
+ ('key4', u'Value 4'),
+ ('key5', u'Value 5'),
+ )
+ o = self.cls()
+ assert o._merge(**dict(group1)) == (4, 4)
+ assert len(o) == 4
+ assert list(o) == list(key for (key, value) in group1)
+ for (key, value) in group1:
+ assert getattr(o, key) is value
+ assert o[key] is value
+ assert o._merge(**dict(group2)) == (2, 4)
+ assert len(o) == 6
+ expected = dict(group2)
+ expected.update(dict(group1))
+ assert list(o) == sorted(expected)
+ assert expected['key2'] == 'value 2' # And not 'Value 2'
+ for (key, value) in expected.iteritems():
+ assert getattr(o, key) is value
+ assert o[key] is value
+ assert o._merge(**expected) == (0, 6)
+ assert len(o) == 6
+ assert list(o) == sorted(expected)
+
+ def test_merge_from_file(self):
+ """
+ Test the `ipalib.config.Env._merge_from_file` method.
+ """
+ tmp = TempDir()
+ assert callable(tmp.join)
+
+ # Test a config file that doesn't exist
+ no_exist = tmp.join('no_exist.conf')
+ assert not path.exists(no_exist)
+ o = self.cls()
+ o._bootstrap()
+ keys = tuple(o)
+ orig = dict((k, o[k]) for k in o)
+ assert o._merge_from_file(no_exist) is None
+ assert tuple(o) == keys
+
+ # Test an empty config file
+ empty = tmp.touch('empty.conf')
+ assert path.isfile(empty)
+ assert o._merge_from_file(empty) == (0, 0)
+ assert tuple(o) == keys
+
+ # Test a mal-formed config file:
+ bad = tmp.join('bad.conf')
+ open(bad, 'w').write(config_bad)
+ assert path.isfile(bad)
+ assert o._merge_from_file(bad) is None
+ assert tuple(o) == keys
+
+ # Test a valid config file that tries to override
+ override = tmp.join('override.conf')
+ open(override, 'w').write(config_override)
+ assert path.isfile(override)
+ assert o._merge_from_file(override) == (4, 6)
+ for (k, v) in orig.items():
+ assert o[k] is v
+ assert list(o) == sorted(keys + ('key0', 'key1', 'key2', 'key3', 'config_loaded'))
+ for i in xrange(4):
+ assert o['key%d' % i] == ('var%d' % i)
+ keys = tuple(o)
+
+ # Test a valid config file with type conversion
+ good = tmp.join('good.conf')
+ open(good, 'w').write(config_good)
+ assert path.isfile(good)
+ assert o._merge_from_file(good) == (6, 6)
+ added = ('string', 'null', 'yes', 'no', 'number', 'floating')
+ assert list(o) == sorted(keys + added)
+ assert o.string == 'Hello world!'
+ assert o.null is None
+ assert o.yes is True
+ assert o.no is False
+ assert o.number == 42
+ assert o.floating == 3.14
+
+ def new(self, in_tree=False):
+ """
+ Set os.environ['HOME'] to a tempdir.
+
+ Returns tuple with new Env instance and the TempHome instance. This
+ helper method is used in testing the bootstrap related methods below.
+ """
+ home = TempHome()
+ o = self.cls()
+ if in_tree:
+ o.in_tree = True
+ return (o, home)
+
+ def bootstrap(self, **overrides):
+ """
+ Helper method used in testing bootstrap related methods below.
+ """
+ (o, home) = self.new()
+ assert o._isdone('_bootstrap') is False
+ o._bootstrap(**overrides)
+ assert o._isdone('_bootstrap') is True
+ e = raises(StandardError, o._bootstrap)
+ assert str(e) == 'Env._bootstrap() already called'
+ return (o, home)
+
+ def test_bootstrap(self):
+ """
+ Test the `ipalib.config.Env._bootstrap` method.
+ """
+ # Test defaults created by _bootstrap():
+ (o, home) = self.new()
+ o._bootstrap()
+ ipalib = path.dirname(path.abspath(config.__file__))
+ assert o.ipalib == ipalib
+ assert o.site_packages == path.dirname(ipalib)
+ assert o.script == path.abspath(sys.argv[0])
+ assert o.bin == path.dirname(path.abspath(sys.argv[0]))
+ assert o.home == home.path
+ assert o.dot_ipa == home.join('.ipa')
+ assert o.in_tree is False
+ assert o.context == 'default'
+ assert o.confdir == '/etc/ipa'
+ assert o.conf == '/etc/ipa/default.conf'
+ assert o.conf_default == o.conf
+
+ # Test overriding values created by _bootstrap()
+ (o, home) = self.bootstrap(in_tree='True', context='server')
+ assert o.in_tree is True
+ assert o.context == 'server'
+ assert o.conf == home.join('.ipa', 'server.conf')
+ (o, home) = self.bootstrap(conf='/my/wacky/whatever.conf')
+ assert o.in_tree is False
+ assert o.context == 'default'
+ assert o.conf == '/my/wacky/whatever.conf'
+ assert o.conf_default == '/etc/ipa/default.conf'
+ (o, home) = self.bootstrap(conf_default='/my/wacky/default.conf')
+ assert o.in_tree is False
+ assert o.context == 'default'
+ assert o.conf == '/etc/ipa/default.conf'
+ assert o.conf_default == '/my/wacky/default.conf'
+
+ # Test various overrides and types conversion
+ kw = dict(
+ yes=True,
+ no=False,
+ num=42,
+ msg='Hello, world!',
+ )
+ override = dict(
+ (k, u' %s ' % v) for (k, v) in kw.items()
+ )
+ (o, home) = self.new()
+ for key in kw:
+ assert key not in o
+ o._bootstrap(**override)
+ for (key, value) in kw.items():
+ assert getattr(o, key) == value
+ assert o[key] == value
+
+ def finalize_core(self, ctx, **defaults):
+ """
+ Helper method used in testing `Env._finalize_core`.
+ """
+ # We must force in_tree=True so we don't load possible config files in
+ # /etc/ipa/, whose contents could break this test:
+ (o, home) = self.new(in_tree=True)
+ if ctx:
+ o.context = ctx
+
+ # Check that calls cascade down the chain:
+ set_here = ('in_server', 'logdir', 'log')
+ assert o._isdone('_bootstrap') is False
+ assert o._isdone('_finalize_core') is False
+ assert o._isdone('_finalize') is False
+ for key in set_here:
+ assert key not in o
+ o._finalize_core(**defaults)
+ assert o._isdone('_bootstrap') is True
+ assert o._isdone('_finalize_core') is True
+ assert o._isdone('_finalize') is False # Should not cascade
+ for key in set_here:
+ assert key in o
+
+ # Check that it can't be called twice:
+ e = raises(StandardError, o._finalize_core)
+ assert str(e) == 'Env._finalize_core() already called'
+
+ return (o, home)
+
+ def test_finalize_core(self):
+ """
+ Test the `ipalib.config.Env._finalize_core` method.
+ """
+ # Test that correct defaults are generated:
+ (o, home) = self.finalize_core(None)
+ assert o.in_server is False
+ assert o.logdir == home.join('.ipa', 'log')
+ assert o.log == home.join('.ipa', 'log', 'default.log')
+
+ # Test with context='server'
+ (o, home) = self.finalize_core('server')
+ assert o.in_server is True
+ assert o.logdir == home.join('.ipa', 'log')
+ assert o.log == home.join('.ipa', 'log', 'server.log')
+
+ # Test that **defaults can't set in_server, logdir, nor log:
+ (o, home) = self.finalize_core(None,
+ in_server='IN_SERVER',
+ logdir='LOGDIR',
+ log='LOG',
+ )
+ assert o.in_server is False
+ assert o.logdir == home.join('.ipa', 'log')
+ assert o.log == home.join('.ipa', 'log', 'default.log')
+
+ # Test loading config file, plus test some in-tree stuff
+ (o, home) = self.bootstrap(in_tree=True, context='server')
+ for key in ('yes', 'no', 'number'):
+ assert key not in o
+ home.write(config_good, '.ipa', 'server.conf')
+ home.write(config_default, '.ipa', 'default.conf')
+ o._finalize_core()
+ assert o.in_tree is True
+ assert o.context == 'server'
+ assert o.in_server is True
+ assert o.logdir == home.join('.ipa', 'log')
+ assert o.log == home.join('.ipa', 'log', 'server.log')
+ assert o.yes is True
+ assert o.no is False
+ assert o.number == 42
+ assert o.not_in_other == 'foo_bar'
+
+ # Test using DEFAULT_CONFIG:
+ defaults = dict(constants.DEFAULT_CONFIG)
+ (o, home) = self.finalize_core(None, **defaults)
+ assert list(o) == sorted(defaults)
+ for (key, value) in defaults.items():
+ if value is object:
+ continue
+ if key == 'mode':
+ continue
+ assert o[key] == value, '%r is %r; should be %r' % (key, o[key], value)
+
+ def test_finalize(self):
+ """
+ Test the `ipalib.config.Env._finalize` method.
+ """
+ # Check that calls cascade up the chain:
+ (o, home) = self.new(in_tree=True)
+ assert o._isdone('_bootstrap') is False
+ assert o._isdone('_finalize_core') is False
+ assert o._isdone('_finalize') is False
+ o._finalize()
+ assert o._isdone('_bootstrap') is True
+ assert o._isdone('_finalize_core') is True
+ assert o._isdone('_finalize') is True
+
+ # Check that it can't be called twice:
+ e = raises(StandardError, o._finalize)
+ assert str(e) == 'Env._finalize() already called'
+
+ # Check that _finalize() calls __lock__()
+ (o, home) = self.new(in_tree=True)
+ assert o.__islocked__() is False
+ o._finalize()
+ assert o.__islocked__() is True
+ e = raises(StandardError, o.__lock__)
+ assert str(e) == 'Env.__lock__() already called'
+
+ # Check that **lastchance works
+ (o, home) = self.finalize_core(None)
+ key = 'just_one_more_key'
+ value = u'with one more value'
+ lastchance = {key: value}
+ assert key not in o
+ assert o._isdone('_finalize') is False
+ o._finalize(**lastchance)
+ assert key in o
+ assert o[key] is value
diff --git a/ipatests/test_ipalib/test_crud.py b/ipatests/test_ipalib/test_crud.py
new file mode 100644
index 000000000..602f99f24
--- /dev/null
+++ b/ipatests/test_ipalib/test_crud.py
@@ -0,0 +1,240 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.crud` module.
+"""
+
+from ipatests.util import read_only, raises, get_api, ClassChecker
+from ipalib import crud, frontend, plugable, config
+from ipalib.parameters import Str
+
+
+class CrudChecker(ClassChecker):
+ """
+ Class for testing base classes in `ipalib.crud`.
+ """
+
+ def get_api(self, args=tuple(), options=tuple()):
+ """
+ Return a finalized `ipalib.plugable.API` instance.
+ """
+ (api, home) = get_api()
+ class user(frontend.Object):
+ takes_params = (
+ 'givenname',
+ Str('sn', flags='no_update'),
+ Str('uid', primary_key=True),
+ 'initials',
+ Str('uidnumber', flags=['no_create', 'no_search'])
+ )
+ class user_verb(self.cls):
+ takes_args = args
+ takes_options = options
+ api.register(user)
+ api.register(user_verb)
+ api.finalize()
+ return api
+
+
+class test_Create(CrudChecker):
+ """
+ Test the `ipalib.crud.Create` class.
+ """
+
+ _cls = crud.Create
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Create.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Create.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials', 'all', 'raw', 'version']
+ for param in api.Method.user_verb.options():
+ if param.name != 'version':
+ assert param.required is True
+ api = self.get_api(options=('extra?',))
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials', 'extra', 'all', 'raw', 'version']
+ assert api.Method.user_verb.options.extra.required is False
+
+
+class test_Update(CrudChecker):
+ """
+ Test the `ipalib.crud.Update` class.
+ """
+
+ _cls = crud.Update
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Update.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Update.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'initials', 'uidnumber', 'all', 'raw', 'version']
+ for param in api.Method.user_verb.options():
+ if param.name in ['all', 'raw']:
+ assert param.required is True
+ else:
+ assert param.required is False
+
+
+class test_Retrieve(CrudChecker):
+ """
+ Test the `ipalib.crud.Retrieve` class.
+ """
+
+ _cls = crud.Retrieve
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Retrieve.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Retrieve.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == ['all', 'raw', 'version']
+
+
+class test_Delete(CrudChecker):
+ """
+ Test the `ipalib.crud.Delete` class.
+ """
+
+ _cls = crud.Delete
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Delete.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Delete.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == ['version']
+ assert len(api.Method.user_verb.options) == 1
+
+
+class test_Search(CrudChecker):
+ """
+ Test the `ipalib.crud.Search` class.
+ """
+
+ _cls = crud.Search
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Search.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['criteria']
+ assert api.Method.user_verb.args.criteria.required is False
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Search.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'uid', 'initials', 'all', 'raw', 'version']
+ for param in api.Method.user_verb.options():
+ if param.name in ['all', 'raw']:
+ assert param.required is True
+ else:
+ assert param.required is False
+
+
+class test_CrudBackend(ClassChecker):
+ """
+ Test the `ipalib.crud.CrudBackend` class.
+ """
+
+ _cls = crud.CrudBackend
+
+ def get_subcls(self):
+ class ldap(self.cls):
+ pass
+ return ldap
+
+ def check_method(self, name, *args):
+ o = self.cls()
+ e = raises(NotImplementedError, getattr(o, name), *args)
+ assert str(e) == 'CrudBackend.%s()' % name
+ sub = self.subcls()
+ e = raises(NotImplementedError, getattr(sub, name), *args)
+ assert str(e) == 'ldap.%s()' % name
+
+ def test_create(self):
+ """
+ Test the `ipalib.crud.CrudBackend.create` method.
+ """
+ self.check_method('create')
+
+ def test_retrieve(self):
+ """
+ Test the `ipalib.crud.CrudBackend.retrieve` method.
+ """
+ self.check_method('retrieve', 'primary key', 'attribute')
+
+ def test_update(self):
+ """
+ Test the `ipalib.crud.CrudBackend.update` method.
+ """
+ self.check_method('update', 'primary key')
+
+ def test_delete(self):
+ """
+ Test the `ipalib.crud.CrudBackend.delete` method.
+ """
+ self.check_method('delete', 'primary key')
+
+ def test_search(self):
+ """
+ Test the `ipalib.crud.CrudBackend.search` method.
+ """
+ self.check_method('search')
diff --git a/ipatests/test_ipalib/test_errors.py b/ipatests/test_ipalib/test_errors.py
new file mode 100644
index 000000000..258af3b3f
--- /dev/null
+++ b/ipatests/test_ipalib/test_errors.py
@@ -0,0 +1,374 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.errors` module.
+"""
+
+import re
+import inspect
+
+from ipatests.util import assert_equal, raises
+from ipalib import errors, text
+from ipalib.constants import TYPE_ERROR
+
+
+class PrivateExceptionTester(object):
+ _klass = None
+ __klass = None
+
+ def __get_klass(self):
+ if self.__klass is None:
+ self.__klass = self._klass
+ assert issubclass(self.__klass, StandardError)
+ assert issubclass(self.__klass, errors.PrivateError)
+ assert not issubclass(self.__klass, errors.PublicError)
+ return self.__klass
+ klass = property(__get_klass)
+
+ def new(self, **kw):
+ for (key, value) in kw.iteritems():
+ assert not hasattr(self.klass, key), key
+ inst = self.klass(**kw)
+ assert isinstance(inst, StandardError)
+ assert isinstance(inst, errors.PrivateError)
+ assert isinstance(inst, self.klass)
+ assert not isinstance(inst, errors.PublicError)
+ for (key, value) in kw.iteritems():
+ assert getattr(inst, key) is value
+ assert str(inst) == self.klass.format % kw
+ assert inst.message == str(inst)
+ return inst
+
+
+class test_PrivateError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors.PrivateError` exception.
+ """
+ _klass = errors.PrivateError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.PrivateError.__init__` method.
+ """
+ inst = self.klass(key1='Value 1', key2='Value 2')
+ assert inst.key1 == 'Value 1'
+ assert inst.key2 == 'Value 2'
+ assert str(inst) == ''
+
+ # Test subclass and use of format:
+ class subclass(self.klass):
+ format = '%(true)r %(text)r %(number)r'
+
+ kw = dict(true=True, text='Hello!', number=18)
+ inst = subclass(**kw)
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+ assert str(inst) == subclass.format % kw
+
+ # Test via PrivateExceptionTester.new()
+ inst = self.new(**kw)
+ assert isinstance(inst, self.klass)
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+
+
+class test_SubprocessError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors.SubprocessError` exception.
+ """
+
+ _klass = errors.SubprocessError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.SubprocessError.__init__` method.
+ """
+ inst = self.new(returncode=1, argv=('/bin/false',))
+ assert inst.returncode == 1
+ assert inst.argv == ('/bin/false',)
+ assert str(inst) == "return code 1 from ('/bin/false',)"
+ assert inst.message == str(inst)
+
+
+class test_PluginSubclassError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors.PluginSubclassError` exception.
+ """
+
+ _klass = errors.PluginSubclassError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.PluginSubclassError.__init__` method.
+ """
+ inst = self.new(plugin='bad', bases=('base1', 'base2'))
+ assert inst.plugin == 'bad'
+ assert inst.bases == ('base1', 'base2')
+ assert str(inst) == \
+ "'bad' not subclass of any base in ('base1', 'base2')"
+ assert inst.message == str(inst)
+
+
+class test_PluginDuplicateError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors.PluginDuplicateError` exception.
+ """
+
+ _klass = errors.PluginDuplicateError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.PluginDuplicateError.__init__` method.
+ """
+ inst = self.new(plugin='my_plugin')
+ assert inst.plugin == 'my_plugin'
+ assert str(inst) == "'my_plugin' was already registered"
+ assert inst.message == str(inst)
+
+
+class test_PluginOverrideError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors.PluginOverrideError` exception.
+ """
+
+ _klass = errors.PluginOverrideError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.PluginOverrideError.__init__` method.
+ """
+ inst = self.new(base='Base', name='cmd', plugin='my_cmd')
+ assert inst.base == 'Base'
+ assert inst.name == 'cmd'
+ assert inst.plugin == 'my_cmd'
+ assert str(inst) == "unexpected override of Base.cmd with 'my_cmd'"
+ assert inst.message == str(inst)
+
+
+class test_PluginMissingOverrideError(PrivateExceptionTester):
+ """
+ Test the `ipalib.errors.PluginMissingOverrideError` exception.
+ """
+
+ _klass = errors.PluginMissingOverrideError
+
+ def test_init(self):
+ """
+ Test the `ipalib.errors.PluginMissingOverrideError.__init__` method.
+ """
+ inst = self.new(base='Base', name='cmd', plugin='my_cmd')
+ assert inst.base == 'Base'
+ assert inst.name == 'cmd'
+ assert inst.plugin == 'my_cmd'
+ assert str(inst) == "Base.cmd not registered, cannot override with 'my_cmd'"
+ assert inst.message == str(inst)
+
+
+
+##############################################################################
+# Unit tests for public errors:
+
+class PublicExceptionTester(object):
+ _klass = None
+ __klass = None
+
+ def __get_klass(self):
+ if self.__klass is None:
+ self.__klass = self._klass
+ assert issubclass(self.__klass, StandardError)
+ assert issubclass(self.__klass, errors.PublicError)
+ assert not issubclass(self.__klass, errors.PrivateError)
+ assert type(self.__klass.errno) is int
+ assert 900 <= self.__klass.errno <= 5999
+ return self.__klass
+ klass = property(__get_klass)
+
+ def new(self, format=None, message=None, **kw):
+ # Test that TypeError is raised if message isn't unicode:
+ e = raises(TypeError, self.klass, message='The message')
+ assert str(e) == TYPE_ERROR % ('message', unicode, 'The message', str)
+
+ # Test the instance:
+ for (key, value) in kw.iteritems():
+ assert not hasattr(self.klass, key), key
+ inst = self.klass(format=format, message=message, **kw)
+ for required_class in self.required_classes:
+ assert isinstance(inst, required_class)
+ assert isinstance(inst, self.klass)
+ assert not isinstance(inst, errors.PrivateError)
+ for (key, value) in kw.iteritems():
+ assert getattr(inst, key) is value
+ return inst
+
+
+class test_PublicError(PublicExceptionTester):
+ """
+ Test the `ipalib.errors.PublicError` exception.
+ """
+ _klass = errors.PublicError
+ required_classes = StandardError, errors.PublicError
+
+ def test_init(self):
+ message = u'The translated, interpolated message'
+ format = 'key=%(key1)r and key2=%(key2)r'
+ uformat = u'Translated key=%(key1)r and key2=%(key2)r'
+ val1 = 'Value 1'
+ val2 = 'Value 2'
+ kw = dict(key1=val1, key2=val2)
+
+ # Test with format=str, message=None
+ inst = self.klass(format, **kw)
+ assert inst.format is format
+ assert_equal(inst.message, format % kw)
+ assert inst.forwarded is False
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+ # Test with format=None, message=unicode
+ inst = self.klass(message=message, **kw)
+ assert inst.format is None
+ assert inst.message is message
+ assert inst.strerror is message
+ assert inst.forwarded is True
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+ # Test with format=None, message=str
+ e = raises(TypeError, self.klass, message='the message', **kw)
+ assert str(e) == TYPE_ERROR % ('message', unicode, 'the message', str)
+
+ # Test with format=None, message=None
+ e = raises(ValueError, self.klass, **kw)
+ assert (str(e) == '%s.format is None yet format=None, message=None' %
+ self.klass.__name__)
+
+
+ ######################################
+ # Test via PublicExceptionTester.new()
+
+ # Test with format=str, message=None
+ inst = self.new(format, **kw)
+ assert isinstance(inst, self.klass)
+ assert inst.format is format
+ assert_equal(inst.message, format % kw)
+ assert inst.forwarded is False
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+ # Test with format=None, message=unicode
+ inst = self.new(message=message, **kw)
+ assert isinstance(inst, self.klass)
+ assert inst.format is None
+ assert inst.message is message
+ assert inst.strerror is message
+ assert inst.forwarded is True
+ assert inst.key1 is val1
+ assert inst.key2 is val2
+
+
+ ##################
+ # Test a subclass:
+ class subclass(self.klass):
+ format = '%(true)r %(text)r %(number)r'
+
+ uformat = u'Translated %(true)r %(text)r %(number)r'
+ kw = dict(true=True, text='Hello!', number=18)
+
+ # Test with format=str, message=None
+ e = raises(ValueError, subclass, format, **kw)
+ assert str(e) == 'non-generic %r needs format=None; got format=%r' % (
+ 'subclass', format)
+
+ # Test with format=None, message=None:
+ inst = subclass(**kw)
+ assert inst.format is subclass.format
+ assert_equal(inst.message, subclass.format % kw)
+ assert inst.forwarded is False
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+
+ # Test with format=None, message=unicode:
+ inst = subclass(message=message, **kw)
+ assert inst.format is subclass.format
+ assert inst.message is message
+ assert inst.strerror is message
+ assert inst.forwarded is True
+ assert inst.true is True
+ assert inst.text is kw['text']
+ assert inst.number is kw['number']
+
+ # Test with instructions:
+ # first build up "instructions", then get error and search for
+ # lines of instructions appended to the end of the strerror
+ # despite the parameter 'instructions' not existing in the format
+ instructions = u"The quick brown fox jumps over the lazy dog".split()
+ # this expression checks if each word of instructions
+ # exists in a string as a separate line, with right order
+ regexp = re.compile('(?ims).*' +
+ ''.join(map(lambda x: '(%s).*' % (x),
+ instructions)) +
+ '$')
+ inst = subclass(instructions=instructions, **kw)
+ assert inst.format is subclass.format
+ assert_equal(inst.instructions, instructions)
+ inst_match = regexp.match(inst.strerror).groups()
+ assert_equal(list(inst_match),list(instructions))
+
+
+class BaseMessagesTest(object):
+ """Generic test for all of a module's errors or messages
+ """
+ def test_public_messages(self):
+ i = 0
+ for klass in self.message_list:
+ for required_class in self.required_classes:
+ assert issubclass(klass, required_class)
+ assert type(klass.errno) is int
+ assert klass.errno in self.errno_range
+ doc = inspect.getdoc(klass)
+ assert doc is not None, 'need class docstring for %s' % klass.__name__
+ m = re.match(r'^\*{2}(\d+)\*{2} ', doc)
+ assert m is not None, "need '**ERRNO**' in %s docstring" % klass.__name__
+ errno = int(m.group(1))
+ assert errno == klass.errno, (
+ 'docstring=%r but errno=%r in %s' % (errno, klass.errno, klass.__name__)
+ )
+ self.extratest(klass)
+
+ # Test format
+ if klass.format is not None:
+ assert klass.format is self.texts[i]
+ i += 1
+
+ def extratest(self, cls):
+ pass
+
+
+class test_PublicErrors(object):
+ message_list = errors.public_errors
+ errno_range = xrange(900, 5999)
+ required_classes = (StandardError, errors.PublicError)
+ texts = errors._texts
+
+ def extratest(self, cls):
+ assert not issubclass(cls, errors.PrivateError)
diff --git a/ipatests/test_ipalib/test_frontend.py b/ipatests/test_ipalib/test_frontend.py
new file mode 100644
index 000000000..310d7a53d
--- /dev/null
+++ b/ipatests/test_ipalib/test_frontend.py
@@ -0,0 +1,1188 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.frontend` module.
+"""
+
+from ipatests.util import raises, getitem, no_set, no_del, read_only
+from ipatests.util import check_TypeError, ClassChecker, create_test_api
+from ipatests.util import assert_equal
+from ipalib.constants import TYPE_ERROR
+from ipalib.base import NameSpace
+from ipalib import frontend, backend, plugable, errors, parameters, config
+from ipalib import output, messages
+from ipalib.parameters import Str
+from ipapython.version import API_VERSION
+
+def test_RULE_FLAG():
+ assert frontend.RULE_FLAG == 'validation_rule'
+
+
+def test_rule():
+ """
+ Test the `ipalib.frontend.rule` function.
+ """
+ flag = frontend.RULE_FLAG
+ rule = frontend.rule
+ def my_func():
+ pass
+ assert not hasattr(my_func, flag)
+ rule(my_func)
+ assert getattr(my_func, flag) is True
+ @rule
+ def my_func2():
+ pass
+ assert getattr(my_func2, flag) is True
+
+
+def test_is_rule():
+ """
+ Test the `ipalib.frontend.is_rule` function.
+ """
+ is_rule = frontend.is_rule
+ flag = frontend.RULE_FLAG
+
+ class no_call(object):
+ def __init__(self, value):
+ if value is not None:
+ assert value in (True, False)
+ setattr(self, flag, value)
+
+ class call(no_call):
+ def __call__(self):
+ pass
+
+ assert is_rule(call(True))
+ assert not is_rule(no_call(True))
+ assert not is_rule(call(False))
+ assert not is_rule(call(None))
+
+
+class test_HasParam(ClassChecker):
+ """
+ Test the `ipalib.frontend.Command` class.
+ """
+
+ _cls = frontend.HasParam
+
+ def test_get_param_iterable(self):
+ """
+ Test the `ipalib.frontend.HasParam._get_param_iterable` method.
+ """
+ class WithTuple(self.cls):
+ takes_stuff = ('one', 'two')
+ o = WithTuple()
+ assert o._get_param_iterable('stuff') is WithTuple.takes_stuff
+
+ junk = ('three', 'four')
+ class WithCallable(self.cls):
+ def takes_stuff(self):
+ return junk
+ o = WithCallable()
+ assert o._get_param_iterable('stuff') is junk
+
+ class WithParam(self.cls):
+ takes_stuff = parameters.Str('five')
+ o = WithParam()
+ assert o._get_param_iterable('stuff') == (WithParam.takes_stuff,)
+
+ class WithStr(self.cls):
+ takes_stuff = 'six'
+ o = WithStr()
+ assert o._get_param_iterable('stuff') == ('six',)
+
+ class Wrong(self.cls):
+ takes_stuff = ['seven', 'eight']
+ o = Wrong()
+ e = raises(TypeError, o._get_param_iterable, 'stuff')
+ assert str(e) == '%s.%s must be a tuple, callable, or spec; got %r' % (
+ 'Wrong', 'takes_stuff', Wrong.takes_stuff
+ )
+
+ def test_filter_param_by_context(self):
+ """
+ Test the `ipalib.frontend.HasParam._filter_param_by_context` method.
+ """
+ class Example(self.cls):
+ def get_stuff(self):
+ return (
+ 'one', # Make sure create_param() is called for each spec
+ 'two',
+ parameters.Str('three', include='cli'),
+ parameters.Str('four', exclude='server'),
+ parameters.Str('five', exclude=['whatever', 'cli']),
+ )
+ o = Example()
+
+ # Test when env is None:
+ params = list(o._filter_param_by_context('stuff'))
+ assert list(p.name for p in params) == [
+ 'one', 'two', 'three', 'four', 'five'
+ ]
+ for p in params:
+ assert type(p) is parameters.Str
+
+ # Test when env.context == 'cli':
+ cli = config.Env(context='cli')
+ assert cli.context == 'cli'
+ params = list(o._filter_param_by_context('stuff', cli))
+ assert list(p.name for p in params) == ['one', 'two', 'three', 'four']
+ for p in params:
+ assert type(p) is parameters.Str
+
+ # Test when env.context == 'server'
+ server = config.Env(context='server')
+ assert server.context == 'server'
+ params = list(o._filter_param_by_context('stuff', server))
+ assert list(p.name for p in params) == ['one', 'two', 'five']
+ for p in params:
+ assert type(p) is parameters.Str
+
+ # Test with no get_stuff:
+ class Missing(self.cls):
+ pass
+ o = Missing()
+ gen = o._filter_param_by_context('stuff')
+ e = raises(NotImplementedError, list, gen)
+ assert str(e) == 'Missing.get_stuff()'
+
+ # Test when get_stuff is not callable:
+ class NotCallable(self.cls):
+ get_stuff = ('one', 'two')
+ o = NotCallable()
+ gen = o._filter_param_by_context('stuff')
+ e = raises(TypeError, list, gen)
+ assert str(e) == '%s.%s must be a callable; got %r' % (
+ 'NotCallable', 'get_stuff', NotCallable.get_stuff
+ )
+
+
+class test_Command(ClassChecker):
+ """
+ Test the `ipalib.frontend.Command` class.
+ """
+
+ _cls = frontend.Command
+
+ def get_subcls(self):
+ """
+ Return a standard subclass of `ipalib.frontend.Command`.
+ """
+ class Rule(object):
+ def __init__(self, name):
+ self.name = name
+
+ def __call__(self, _, value):
+ if value != self.name:
+ return _('must equal %r') % self.name
+
+ default_from = parameters.DefaultFrom(
+ lambda arg: arg,
+ 'default_from'
+ )
+ normalizer = lambda value: value.lower()
+
+ class example(self.cls):
+ takes_options = (
+ parameters.Str('option0', Rule('option0'),
+ normalizer=normalizer,
+ default_from=default_from,
+ ),
+ parameters.Str('option1', Rule('option1'),
+ normalizer=normalizer,
+ default_from=default_from,
+ ),
+ )
+ return example
+
+ def get_instance(self, args=tuple(), options=tuple()):
+ """
+ Helper method used to test args and options.
+ """
+ class example(self.cls):
+ takes_args = args
+ takes_options = options
+ o = example()
+ o.finalize()
+ return o
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Command` class.
+ """
+ assert self.cls.takes_options == tuple()
+ assert self.cls.takes_args == tuple()
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.frontend.Command.get_args` method.
+ """
+ assert list(self.cls().get_args()) == []
+ args = ('login', 'stuff')
+ o = self.get_instance(args=args)
+ assert tuple(o.get_args()) == args
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.frontend.Command.get_options` method.
+ """
+ options = list(self.cls().get_options())
+ assert len(options) == 1
+ assert options[0].name == 'version'
+ options = ('verbose', 'debug')
+ o = self.get_instance(options=options)
+ assert len(tuple(o.get_options())) == 3
+ assert 'verbose' in tuple(o.get_options())
+ assert 'debug' in tuple(o.get_options())
+
+ def test_args(self):
+ """
+ Test the ``ipalib.frontend.Command.args`` instance attribute.
+ """
+ assert self.cls().args is None
+ o = self.cls()
+ o.finalize()
+ assert type(o.args) is plugable.NameSpace
+ assert len(o.args) == 0
+ args = ('destination', 'source?')
+ ns = self.get_instance(args=args).args
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == len(args)
+ assert list(ns) == ['destination', 'source']
+ assert type(ns.destination) is parameters.Str
+ assert type(ns.source) is parameters.Str
+ assert ns.destination.required is True
+ assert ns.destination.multivalue is False
+ assert ns.source.required is False
+ assert ns.source.multivalue is False
+
+ # Test TypeError:
+ e = raises(TypeError, self.get_instance, args=(u'whatever',))
+ assert str(e) == TYPE_ERROR % (
+ 'spec', (str, parameters.Param), u'whatever', unicode)
+
+ # Test ValueError, required after optional:
+ e = raises(ValueError, self.get_instance, args=('arg1?', 'arg2'))
+ assert str(e) == 'arg2: required argument after optional'
+
+ # Test ValueError, scalar after multivalue:
+ e = raises(ValueError, self.get_instance, args=('arg1+', 'arg2'))
+ assert str(e) == 'arg2: only final argument can be multivalue'
+
+ def test_max_args(self):
+ """
+ Test the ``ipalib.frontend.Command.max_args`` instance attribute.
+ """
+ o = self.get_instance()
+ assert o.max_args == 0
+ o = self.get_instance(args=('one?',))
+ assert o.max_args == 1
+ o = self.get_instance(args=('one', 'two?'))
+ assert o.max_args == 2
+ o = self.get_instance(args=('one', 'multi+',))
+ assert o.max_args is None
+ o = self.get_instance(args=('one', 'multi*',))
+ assert o.max_args is None
+
+ def test_options(self):
+ """
+ Test the ``ipalib.frontend.Command.options`` instance attribute.
+ """
+ assert self.cls().options is None
+ o = self.cls()
+ o.finalize()
+ assert type(o.options) is plugable.NameSpace
+ assert len(o.options) == 1
+ options = ('target', 'files*')
+ ns = self.get_instance(options=options).options
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == len(options) + 1
+ assert list(ns) == ['target', 'files', 'version']
+ assert type(ns.target) is parameters.Str
+ assert type(ns.files) is parameters.Str
+ assert ns.target.required is True
+ assert ns.target.multivalue is False
+ assert ns.files.required is False
+ assert ns.files.multivalue is True
+
+ def test_output(self):
+ """
+ Test the ``ipalib.frontend.Command.output`` instance attribute.
+ """
+ inst = self.cls()
+ assert inst.output is None
+ inst.finalize()
+ assert type(inst.output) is plugable.NameSpace
+ assert list(inst.output) == ['result']
+ assert type(inst.output.result) is output.Output
+
+ def test_iter_output(self):
+ """
+ Test the ``ipalib.frontend.Command._iter_output`` instance attribute.
+ """
+ class Example(self.cls):
+ pass
+ inst = Example()
+
+ inst.has_output = tuple()
+ assert list(inst._iter_output()) == []
+
+ wrong = ['hello', 'world']
+ inst.has_output = wrong
+ e = raises(TypeError, list, inst._iter_output())
+ assert str(e) == 'Example.has_output: need a %r; got a %r: %r' % (
+ tuple, list, wrong
+ )
+
+ wrong = ('hello', 17)
+ inst.has_output = wrong
+ e = raises(TypeError, list, inst._iter_output())
+ assert str(e) == 'Example.has_output[1]: need a %r; got a %r: %r' % (
+ (str, output.Output), int, 17
+ )
+
+ okay = ('foo', output.Output('bar'), 'baz')
+ inst.has_output = okay
+ items = list(inst._iter_output())
+ assert len(items) == 3
+ assert list(o.name for o in items) == ['foo', 'bar', 'baz']
+ for o in items:
+ assert type(o) is output.Output
+
+ def test_soft_validate(self):
+ """
+ Test the `ipalib.frontend.Command.soft_validate` method.
+ """
+ class user_add(frontend.Command):
+ takes_args = parameters.Str('uid',
+ normalizer=lambda value: value.lower(),
+ default_from=lambda givenname, sn: givenname[0] + sn,
+ )
+
+ takes_options = ('givenname', 'sn')
+
+ cmd = user_add()
+ cmd.env = config.Env(context='cli')
+ cmd.finalize()
+ assert list(cmd.params) == ['givenname', 'sn', 'uid', 'version']
+ ret = cmd.soft_validate({})
+ assert sorted(ret['values']) == ['version']
+ assert sorted(ret['errors']) == ['givenname', 'sn', 'uid']
+ assert cmd.soft_validate(dict(givenname=u'First', sn=u'Last')) == dict(
+ values=dict(givenname=u'First', sn=u'Last', uid=u'flast',
+ version=None),
+ errors=dict(),
+ )
+
+ def test_convert(self):
+ """
+ Test the `ipalib.frontend.Command.convert` method.
+ """
+ kw = dict(
+ option0=u'1.5',
+ option1=u'7',
+ )
+ o = self.subcls()
+ o.finalize()
+ for (key, value) in o.convert(**kw).iteritems():
+ assert_equal(unicode(kw[key]), value)
+
+ def test_normalize(self):
+ """
+ Test the `ipalib.frontend.Command.normalize` method.
+ """
+ kw = dict(
+ option0=u'OPTION0',
+ option1=u'OPTION1',
+ )
+ norm = dict((k, v.lower()) for (k, v) in kw.items())
+ sub = self.subcls()
+ sub.finalize()
+ assert sub.normalize(**kw) == norm
+
+ def test_get_default(self):
+ """
+ Test the `ipalib.frontend.Command.get_default` method.
+ """
+ # FIXME: Add an updated unit tests for get_default()
+
+ def test_default_from_chaining(self):
+ """
+ Test chaining of parameters through default_from.
+ """
+ class my_cmd(self.cls):
+ takes_options = (
+ Str('option0'),
+ Str('option1', default_from=lambda option0: option0),
+ Str('option2', default_from=lambda option1: option1),
+ )
+
+ def run(self, *args, **options):
+ return dict(result=options)
+
+ kw = dict(option0=u'some value')
+
+ (api, home) = create_test_api()
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ o.finalize()
+ e = o(**kw)
+ assert type(e) is dict
+ assert 'result' in e
+ assert 'option2' in e['result']
+ assert e['result']['option2'] == u'some value'
+
+ def test_validate(self):
+ """
+ Test the `ipalib.frontend.Command.validate` method.
+ """
+
+ sub = self.subcls()
+ sub.env = config.Env(context='cli')
+ sub.finalize()
+
+ # Check with valid values
+ okay = dict(
+ option0=u'option0',
+ option1=u'option1',
+ another_option='some value',
+ version=API_VERSION,
+ )
+ sub.validate(**okay)
+
+ # Check with an invalid value
+ fail = dict(okay)
+ fail['option0'] = u'whatever'
+ e = raises(errors.ValidationError, sub.validate, **fail)
+ assert_equal(e.name, 'option0')
+ assert_equal(e.value, u'whatever')
+ assert_equal(e.error, u"must equal 'option0'")
+ assert e.rule.__class__.__name__ == 'Rule'
+ assert e.index is None
+
+ # Check with a missing required arg
+ fail = dict(okay)
+ fail.pop('option1')
+ e = raises(errors.RequirementError, sub.validate, **fail)
+ assert e.name == 'option1'
+
+ def test_execute(self):
+ """
+ Test the `ipalib.frontend.Command.execute` method.
+ """
+ o = self.cls()
+ e = raises(NotImplementedError, o.execute)
+ assert str(e) == 'Command.execute()'
+
+ def test_args_options_2_params(self):
+ """
+ Test the `ipalib.frontend.Command.args_options_2_params` method.
+ """
+
+ # Test that ZeroArgumentError is raised:
+ o = self.get_instance()
+ e = raises(errors.ZeroArgumentError, o.args_options_2_params, 1)
+ assert e.name == 'example'
+
+ # Test that MaxArgumentError is raised (count=1)
+ o = self.get_instance(args=('one?',))
+ e = raises(errors.MaxArgumentError, o.args_options_2_params, 1, 2)
+ assert e.name == 'example'
+ assert e.count == 1
+ assert str(e) == "command 'example' takes at most 1 argument"
+
+ # Test that MaxArgumentError is raised (count=2)
+ o = self.get_instance(args=('one', 'two?'))
+ e = raises(errors.MaxArgumentError, o.args_options_2_params, 1, 2, 3)
+ assert e.name == 'example'
+ assert e.count == 2
+ assert str(e) == "command 'example' takes at most 2 arguments"
+
+ # Test that OptionError is raised when an extra option is given:
+ o = self.get_instance()
+ e = raises(errors.OptionError, o.args_options_2_params, bad_option=True)
+ assert e.option == 'bad_option'
+
+ # Test that OverlapError is raised:
+ o = self.get_instance(args=('one', 'two'), options=('three', 'four'))
+ e = raises(errors.OverlapError, o.args_options_2_params,
+ 1, 2, three=3, two=2, four=4, one=1)
+ assert e.names == ['one', 'two']
+
+ # Test the permutations:
+ o = self.get_instance(args=('one', 'two*'), options=('three', 'four'))
+ mthd = o.args_options_2_params
+ assert mthd() == dict()
+ assert mthd(1) == dict(one=1)
+ assert mthd(1, 2) == dict(one=1, two=(2,))
+ assert mthd(1, 21, 22, 23) == dict(one=1, two=(21, 22, 23))
+ assert mthd(1, (21, 22, 23)) == dict(one=1, two=(21, 22, 23))
+ assert mthd(three=3, four=4) == dict(three=3, four=4)
+ assert mthd(three=3, four=4, one=1, two=2) == \
+ dict(one=1, two=2, three=3, four=4)
+ assert mthd(1, 21, 22, 23, three=3, four=4) == \
+ dict(one=1, two=(21, 22, 23), three=3, four=4)
+ assert mthd(1, (21, 22, 23), three=3, four=4) == \
+ dict(one=1, two=(21, 22, 23), three=3, four=4)
+
+ def test_args_options_2_entry(self):
+ """
+ Test `ipalib.frontend.Command.args_options_2_entry` method.
+ """
+ class my_cmd(self.cls):
+ takes_args = (
+ parameters.Str('one', attribute=True),
+ parameters.Str('two', attribute=False),
+ )
+ takes_options = (
+ parameters.Str('three', attribute=True, multivalue=True),
+ parameters.Str('four', attribute=True, multivalue=False),
+ )
+
+ def run(self, *args, **kw):
+ return self.args_options_2_entry(*args, **kw)
+
+ args = ('one', 'two')
+ kw = dict(three=('three1', 'three2'), four='four')
+
+ (api, home) = create_test_api()
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ o.finalize()
+ e = o.run(*args, **kw)
+ assert type(e) is dict
+ assert 'one' in e
+ assert 'two' not in e
+ assert 'three' in e
+ assert 'four' in e
+ assert e['one'] == 'one'
+ assert e['three'] == ['three1', 'three2']
+ assert e['four'] == 'four'
+
+ def test_params_2_args_options(self):
+ """
+ Test the `ipalib.frontend.Command.params_2_args_options` method.
+ """
+ o = self.get_instance(args='one', options='two')
+ assert o.params_2_args_options() == ((None,), {})
+ assert o.params_2_args_options(one=1) == ((1,), {})
+ assert o.params_2_args_options(two=2) == ((None,), dict(two=2))
+ assert o.params_2_args_options(two=2, one=1) == ((1,), dict(two=2))
+
+ def test_run(self):
+ """
+ Test the `ipalib.frontend.Command.run` method.
+ """
+ class my_cmd(self.cls):
+ def execute(self, *args, **kw):
+ return ('execute', args, kw)
+
+ def forward(self, *args, **kw):
+ return ('forward', args, kw)
+
+ args = ('Hello,', 'world,')
+ kw = dict(how_are='you', on_this='fine day?', version=API_VERSION)
+
+ # Test in server context:
+ (api, home) = create_test_api(in_server=True)
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ assert o.run.im_func is self.cls.run.im_func
+ out = o.run(*args, **kw)
+ assert ('execute', args, kw) == out
+
+ # Test in non-server context
+ (api, home) = create_test_api(in_server=False)
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ assert o.run.im_func is self.cls.run.im_func
+ assert ('forward', args, kw) == o.run(*args, **kw)
+
+ def test_messages(self):
+ """
+ Test correct handling of messages
+ """
+ class TestMessage(messages.PublicMessage):
+ type = 'info'
+ format = 'This is a message.'
+ errno = 1234
+
+ class my_cmd(self.cls):
+ def execute(self, *args, **kw):
+ result = {'name': 'execute'}
+ messages.add_message(kw['version'], result, TestMessage())
+ return result
+
+ def forward(self, *args, **kw):
+ result = {'name': 'forward'}
+ messages.add_message(kw['version'], result, TestMessage())
+ return result
+
+ args = ('Hello,', 'world,')
+ kw = dict(how_are='you', on_this='fine day?', version=API_VERSION)
+
+ expected = [TestMessage().to_dict()]
+
+ # Test in server context:
+ (api, home) = create_test_api(in_server=True)
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ assert o.run.im_func is self.cls.run.im_func
+ assert {'name': 'execute', 'messages': expected} == o.run(*args, **kw)
+
+ # Test in non-server context
+ (api, home) = create_test_api(in_server=False)
+ api.finalize()
+ o = my_cmd()
+ o.set_api(api)
+ assert o.run.im_func is self.cls.run.im_func
+ assert {'name': 'forward', 'messages': expected} == o.run(*args, **kw)
+
+ def test_validate_output_basic(self):
+ """
+ Test the `ipalib.frontend.Command.validate_output` method.
+ """
+ class Example(self.cls):
+ has_output = ('foo', 'bar', 'baz')
+
+ inst = Example()
+ inst.finalize()
+
+ # Test with wrong type:
+ wrong = ('foo', 'bar', 'baz')
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == '%s.validate_output(): need a %r; got a %r: %r' % (
+ 'Example', dict, tuple, wrong
+ )
+
+ # Test with a missing keys:
+ wrong = dict(bar='hello')
+ e = raises(ValueError, inst.validate_output, wrong)
+ assert str(e) == '%s.validate_output(): missing keys %r in %r' % (
+ 'Example', ['baz', 'foo'], wrong
+ )
+
+ # Test with extra keys:
+ wrong = dict(foo=1, bar=2, baz=3, fee=4, azz=5)
+ e = raises(ValueError, inst.validate_output, wrong)
+ assert str(e) == '%s.validate_output(): unexpected keys %r in %r' % (
+ 'Example', ['azz', 'fee'], wrong
+ )
+
+ # Test with different keys:
+ wrong = dict(baz=1, xyzzy=2, quux=3)
+ e = raises(ValueError, inst.validate_output, wrong)
+ assert str(e) == '%s.validate_output(): missing keys %r in %r' % (
+ 'Example', ['bar', 'foo'], wrong
+ ), str(e)
+
+ def test_validate_output_per_type(self):
+ """
+ Test `ipalib.frontend.Command.validate_output` per-type validation.
+ """
+
+ class Complex(self.cls):
+ has_output = (
+ output.Output('foo', int),
+ output.Output('bar', list),
+ )
+ inst = Complex()
+ inst.finalize()
+
+ wrong = dict(foo=17.9, bar=[18])
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == '%s:\n output[%r]: need %r; got %r: %r' % (
+ 'Complex.validate_output()', 'foo', int, float, 17.9
+ )
+
+ wrong = dict(foo=18, bar=17)
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == '%s:\n output[%r]: need %r; got %r: %r' % (
+ 'Complex.validate_output()', 'bar', list, int, 17
+ )
+
+ def test_validate_output_nested(self):
+ """
+ Test `ipalib.frontend.Command.validate_output` nested validation.
+ """
+
+ class Subclass(output.ListOfEntries):
+ pass
+
+ # Test nested validation:
+ class nested(self.cls):
+ has_output = (
+ output.Output('hello', int),
+ Subclass('world'),
+ )
+ inst = nested()
+ inst.finalize()
+ okay = dict(foo='bar')
+ nope = ('aye', 'bee')
+
+ wrong = dict(hello=18, world=[okay, nope, okay])
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == output.emsg % (
+ 'nested', 'Subclass', 'world', 1, dict, tuple, nope
+ )
+
+ wrong = dict(hello=18, world=[okay, okay, okay, okay, nope])
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == output.emsg % (
+ 'nested', 'Subclass', 'world', 4, dict, tuple, nope
+ )
+
+ def test_get_output_params(self):
+ """
+ Test the `ipalib.frontend.Command.get_output_params` method.
+ """
+ class example(self.cls):
+ has_output_params = (
+ 'one',
+ 'two',
+ 'three',
+ )
+ takes_args = (
+ 'foo',
+ )
+ takes_options = (
+ Str('bar', flags='no_output'),
+ 'baz',
+ )
+
+ inst = example()
+ assert list(inst.get_output_params()) == ['one', 'two', 'three']
+ inst.finalize()
+ assert list(inst.get_output_params()) == [
+ 'one', 'two', 'three', inst.params.foo, inst.params.baz
+ ]
+ assert list(inst.output_params) == ['one', 'two', 'three', 'foo', 'baz']
+
+
+class test_LocalOrRemote(ClassChecker):
+ """
+ Test the `ipalib.frontend.LocalOrRemote` class.
+ """
+ _cls = frontend.LocalOrRemote
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.LocalOrRemote.__init__` method.
+ """
+ o = self.cls()
+ o.finalize()
+ assert list(o.args) == []
+ assert list(o.options) == ['server', 'version']
+ op = o.options.server
+ assert op.required is False
+ assert op.default is False
+
+ def test_run(self):
+ """
+ Test the `ipalib.frontend.LocalOrRemote.run` method.
+ """
+ class example(self.cls):
+ takes_args = 'key?'
+
+ def forward(self, *args, **options):
+ return dict(result=('forward', args, options))
+
+ def execute(self, *args, **options):
+ return dict(result=('execute', args, options))
+
+ # Test when in_server=False:
+ (api, home) = create_test_api(in_server=False)
+ api.register(example)
+ api.finalize()
+ cmd = api.Command.example
+ assert cmd(version=u'2.47') == dict(
+ result=('execute', (None,), dict(version=u'2.47', server=False))
+ )
+ assert cmd(u'var', version=u'2.47') == dict(
+ result=('execute', (u'var',), dict(version=u'2.47', server=False))
+ )
+ assert cmd(server=True, version=u'2.47') == dict(
+ result=('forward', (None,), dict(version=u'2.47', server=True))
+ )
+ assert cmd(u'var', server=True, version=u'2.47') == dict(
+ result=('forward', (u'var',), dict(version=u'2.47', server=True))
+ )
+
+ # Test when in_server=True (should always call execute):
+ (api, home) = create_test_api(in_server=True)
+ api.register(example)
+ api.finalize()
+ cmd = api.Command.example
+ assert cmd(version=u'2.47') == dict(
+ result=('execute', (None,), dict(version=u'2.47', server=False))
+ )
+ assert cmd(u'var', version=u'2.47') == dict(
+ result=('execute', (u'var',), dict(version=u'2.47', server=False))
+ )
+ assert cmd(server=True, version=u'2.47') == dict(
+ result=('execute', (None,), dict(version=u'2.47', server=True))
+ )
+ assert cmd(u'var', server=True, version=u'2.47') == dict(
+ result=('execute', (u'var',), dict(version=u'2.47', server=True))
+ )
+
+
+class test_Object(ClassChecker):
+ """
+ Test the `ipalib.frontend.Object` class.
+ """
+ _cls = frontend.Object
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Object` class.
+ """
+ assert self.cls.backend is None
+ assert self.cls.methods is None
+ assert self.cls.properties is None
+ assert self.cls.params is None
+ assert self.cls.params_minus_pk is None
+ assert self.cls.takes_params == tuple()
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Object.__init__` method.
+ """
+ o = self.cls()
+ assert o.backend is None
+ assert o.methods is None
+ assert o.properties is None
+ assert o.params is None
+ assert o.params_minus_pk is None
+ assert o.properties is None
+
+ def test_set_api(self):
+ """
+ Test the `ipalib.frontend.Object.set_api` method.
+ """
+ # Setup for test:
+ class DummyAttribute(object):
+ def __init__(self, obj_name, attr_name, name=None):
+ self.obj_name = obj_name
+ self.attr_name = attr_name
+ if name is None:
+ self.name = '%s_%s' % (obj_name, attr_name)
+ else:
+ self.name = name
+ self.param = frontend.create_param(attr_name)
+
+ def __clone__(self, attr_name):
+ return self.__class__(
+ self.obj_name,
+ self.attr_name,
+ getattr(self, attr_name)
+ )
+
+ def get_attributes(cnt, format):
+ for name in ['other', 'user', 'another']:
+ for i in xrange(cnt):
+ yield DummyAttribute(name, format % i)
+
+ cnt = 10
+ formats = dict(
+ methods='method_%d',
+ properties='property_%d',
+ )
+
+
+ _d = dict(
+ Method=plugable.NameSpace(
+ get_attributes(cnt, formats['methods'])
+ ),
+ Property=plugable.NameSpace(
+ get_attributes(cnt, formats['properties'])
+ ),
+ )
+ api = plugable.MagicDict(_d)
+ assert len(api.Method) == cnt * 3
+ assert len(api.Property) == cnt * 3
+
+ class user(self.cls):
+ pass
+
+ # Actually perform test:
+ o = user()
+ o.set_api(api)
+ assert read_only(o, 'api') is api
+ for name in ['methods', 'properties']:
+ namespace = getattr(o, name)
+ assert isinstance(namespace, plugable.NameSpace)
+ assert len(namespace) == cnt
+ f = formats[name]
+ for i in xrange(cnt):
+ attr_name = f % i
+ attr = namespace[attr_name]
+ assert isinstance(attr, DummyAttribute)
+ assert attr is getattr(namespace, attr_name)
+ assert attr.obj_name == 'user'
+ assert attr.attr_name == attr_name
+ assert attr.name == '%s_%s' % ('user', attr_name)
+
+ # Test params instance attribute
+ o = self.cls()
+ o.set_api(api)
+ ns = o.params
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == 0
+ class example(self.cls):
+ takes_params = ('banana', 'apple')
+ o = example()
+ o.set_api(api)
+ ns = o.params
+ assert type(ns) is plugable.NameSpace
+ assert len(ns) == 2, repr(ns)
+ assert list(ns) == ['banana', 'apple']
+ for p in ns():
+ assert type(p) is parameters.Str
+ assert p.required is True
+ assert p.multivalue is False
+
+ def test_primary_key(self):
+ """
+ Test the `ipalib.frontend.Object.primary_key` attribute.
+ """
+ (api, home) = create_test_api()
+ api.finalize()
+
+ # Test with no primary keys:
+ class example1(self.cls):
+ takes_params = (
+ 'one',
+ 'two',
+ )
+ o = example1()
+ o.set_api(api)
+ assert o.primary_key is None
+
+ # Test with 1 primary key:
+ class example2(self.cls):
+ takes_params = (
+ 'one',
+ 'two',
+ parameters.Str('three', primary_key=True),
+ 'four',
+ )
+ o = example2()
+ o.set_api(api)
+ pk = o.primary_key
+ assert type(pk) is parameters.Str
+ assert pk.name == 'three'
+ assert pk.primary_key is True
+ assert o.params[2] is o.primary_key
+ assert isinstance(o.params_minus_pk, plugable.NameSpace)
+ assert list(o.params_minus_pk) == ['one', 'two', 'four']
+
+ # Test with multiple primary_key:
+ class example3(self.cls):
+ takes_params = (
+ parameters.Str('one', primary_key=True),
+ parameters.Str('two', primary_key=True),
+ 'three',
+ parameters.Str('four', primary_key=True),
+ )
+ o = example3()
+ o.set_api(api)
+ e = raises(ValueError, o.finalize)
+ assert str(e) == \
+ 'example3 (Object) has multiple primary keys: one, two, four'
+
+ def test_backend(self):
+ """
+ Test the `ipalib.frontend.Object.backend` attribute.
+ """
+ (api, home) = create_test_api()
+ class ldap(backend.Backend):
+ whatever = 'It worked!'
+ api.register(ldap)
+ class user(frontend.Object):
+ backend_name = 'ldap'
+ api.register(user)
+ api.finalize()
+ b = api.Object.user.backend
+ assert isinstance(b, ldap)
+ assert b.whatever == 'It worked!'
+
+ def test_get_dn(self):
+ """
+ Test the `ipalib.frontend.Object.get_dn` method.
+ """
+ o = self.cls()
+ e = raises(NotImplementedError, o.get_dn, 'primary key')
+ assert str(e) == 'Object.get_dn()'
+ class user(self.cls):
+ pass
+ o = user()
+ e = raises(NotImplementedError, o.get_dn, 'primary key')
+ assert str(e) == 'user.get_dn()'
+
+ def test_params_minus(self):
+ """
+ Test the `ipalib.frontend.Object.params_minus` method.
+ """
+ class example(self.cls):
+ takes_params = ('one', 'two', 'three', 'four')
+ o = example()
+ (api, home) = create_test_api()
+ o.set_api(api)
+ p = o.params
+ assert tuple(o.params_minus()) == tuple(p())
+ assert tuple(o.params_minus([])) == tuple(p())
+ assert tuple(o.params_minus('two', 'three')) == (p.one, p.four)
+ assert tuple(o.params_minus(['two', 'three'])) == (p.one, p.four)
+ assert tuple(o.params_minus(p.two, p.three)) == (p.one, p.four)
+ assert tuple(o.params_minus([p.two, p.three])) == (p.one, p.four)
+ ns = NameSpace([p.two, p.three])
+ assert tuple(o.params_minus(ns)) == (p.one, p.four)
+
+
+class test_Attribute(ClassChecker):
+ """
+ Test the `ipalib.frontend.Attribute` class.
+ """
+ _cls = frontend.Attribute
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Attribute` class.
+ """
+ assert self.cls.__bases__ == (plugable.Plugin,)
+ assert type(self.cls.obj) is property
+ assert type(self.cls.obj_name) is property
+ assert type(self.cls.attr_name) is property
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Attribute.__init__` method.
+ """
+ class user_add(self.cls):
+ pass
+ o = user_add()
+ assert read_only(o, 'obj') is None
+ assert read_only(o, 'obj_name') == 'user'
+ assert read_only(o, 'attr_name') == 'add'
+
+ def test_set_api(self):
+ """
+ Test the `ipalib.frontend.Attribute.set_api` method.
+ """
+ user_obj = 'The user frontend.Object instance'
+ class api(object):
+ Object = dict(user=user_obj)
+ class user_add(self.cls):
+ pass
+ o = user_add()
+ assert read_only(o, 'api') is None
+ assert read_only(o, 'obj') is None
+ o.set_api(api)
+ assert read_only(o, 'api') is api
+ assert read_only(o, 'obj') is user_obj
+
+
+class test_Method(ClassChecker):
+ """
+ Test the `ipalib.frontend.Method` class.
+ """
+ _cls = frontend.Method
+
+ def get_api(self, args=tuple(), options=tuple()):
+ """
+ Return a finalized `ipalib.plugable.API` instance.
+ """
+ (api, home) = create_test_api()
+ class user(frontend.Object):
+ takes_params = (
+ 'givenname',
+ 'sn',
+ frontend.Param('uid', primary_key=True),
+ 'initials',
+ )
+ class user_verb(self.cls):
+ takes_args = args
+ takes_options = options
+ api.register(user)
+ api.register(user_verb)
+ api.finalize()
+ return api
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Method` class.
+ """
+ assert self.cls.__bases__ == (frontend.Attribute, frontend.Command)
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Method.__init__` method.
+ """
+ class user_add(self.cls):
+ pass
+ o = user_add()
+ assert o.name == 'user_add'
+ assert o.obj_name == 'user'
+ assert o.attr_name == 'add'
+
+
+class test_Property(ClassChecker):
+ """
+ Test the `ipalib.frontend.Property` class.
+ """
+ _cls = frontend.Property
+
+ def get_subcls(self):
+ """
+ Return a standard subclass of `ipalib.frontend.Property`.
+ """
+ class user_givenname(self.cls):
+ 'User first name'
+
+ @frontend.rule
+ def rule0_lowercase(self, value):
+ if not value.islower():
+ return 'Must be lowercase'
+ return user_givenname
+
+ def test_class(self):
+ """
+ Test the `ipalib.frontend.Property` class.
+ """
+ assert self.cls.__bases__ == (frontend.Attribute,)
+ assert self.cls.klass is parameters.Str
+
+ def test_init(self):
+ """
+ Test the `ipalib.frontend.Property.__init__` method.
+ """
+ o = self.subcls()
+ assert len(o.rules) == 1
+ assert o.rules[0].__name__ == 'rule0_lowercase'
+ param = o.param
+ assert isinstance(param, parameters.Str)
+ assert param.name == 'givenname'
+ assert unicode(param.doc) == u'User first name'
diff --git a/ipatests/test_ipalib/test_messages.py b/ipatests/test_ipalib/test_messages.py
new file mode 100644
index 000000000..686bf8dd5
--- /dev/null
+++ b/ipatests/test_ipalib/test_messages.py
@@ -0,0 +1,89 @@
+# Authors:
+# Petr Viktorin <pviktori@redhat.com>
+#
+# Copyright (C) 1012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.messages` module.
+"""
+
+from ipalib import messages
+from ipalib.capabilities import capabilities
+from ipatests.test_ipalib import test_errors
+
+
+class HelloMessage(messages.PublicMessage):
+ type = 'info'
+ format = '%(greeting)s, %(object)s!'
+ errno = 1234
+
+
+class test_PublicMessage(test_errors.test_PublicError):
+ """Test public messages"""
+ # The messages are a lot like public errors; defer testing to that.
+ klass = messages.PublicMessage
+ required_classes = (UserWarning, messages.PublicMessage)
+
+
+class test_PublicMessages(test_errors.BaseMessagesTest):
+ message_list = messages.public_messages
+ errno_range = xrange(10000, 19999)
+ required_classes = (UserWarning, messages.PublicMessage)
+ texts = messages._texts
+
+ def extratest(self, cls):
+ if cls is not messages.PublicMessage:
+ assert cls.type in ('debug', 'info', 'warning', 'error')
+
+
+def test_to_dict():
+ expected = dict(
+ name='HelloMessage',
+ type='info',
+ message='Hello, world!',
+ code=1234,
+ )
+
+ assert HelloMessage(greeting='Hello', object='world').to_dict() == expected
+
+
+def test_add_message():
+ result = {}
+
+ assert capabilities['messages'] == u'2.52'
+
+ messages.add_message(u'2.52', result,
+ HelloMessage(greeting='Hello', object='world'))
+ messages.add_message(u'2.1', result,
+ HelloMessage(greeting="'Lo", object='version'))
+ messages.add_message(u'2.60', result,
+ HelloMessage(greeting='Hi', object='version'))
+
+ assert result == {'messages': [
+ dict(
+ name='HelloMessage',
+ type='info',
+ message='Hello, world!',
+ code=1234,
+ ),
+ dict(
+ name='HelloMessage',
+ type='info',
+ message='Hi, version!',
+ code=1234,
+ )
+ ]}
diff --git a/ipatests/test_ipalib/test_output.py b/ipatests/test_ipalib/test_output.py
new file mode 100644
index 000000000..15ef11e10
--- /dev/null
+++ b/ipatests/test_ipalib/test_output.py
@@ -0,0 +1,89 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.output` module.
+"""
+
+from ipatests.util import raises, ClassChecker
+from ipalib import output
+from ipalib.frontend import Command
+from ipalib import _
+
+class test_Output(ClassChecker):
+ """
+ Test the `ipalib.output.Output` class.
+ """
+
+ _cls = output.Output
+
+ def test_init(self):
+ """
+ Test the `ipalib.output.Output.__init__` method.
+ """
+ o = self.cls('result')
+ assert o.name == 'result'
+ assert o.type is None
+ assert o.doc is None
+
+ def test_repr(self):
+ """
+ Test the `ipalib.output.Output.__repr__` method.
+ """
+ o = self.cls('aye')
+ assert repr(o) == "Output('aye', None, None)"
+ o = self.cls('aye', type=int, doc='An A, aye?')
+ assert repr(o) == "Output('aye', %r, 'An A, aye?')" % int
+
+ class Entry(self.cls):
+ pass
+ o = Entry('aye')
+ assert repr(o) == "Entry('aye', None, None)"
+ o = Entry('aye', type=int, doc='An A, aye?')
+ assert repr(o) == "Entry('aye', %r, 'An A, aye?')" % int
+
+
+class test_ListOfEntries(ClassChecker):
+ """
+ Test the `ipalib.output.ListOfEntries` class.
+ """
+
+ _cls = output.ListOfEntries
+
+ def test_validate(self):
+ """
+ Test the `ipalib.output.ListOfEntries.validate` method.
+ """
+ class example(Command):
+ pass
+ cmd = example()
+ inst = self.cls('stuff')
+
+ okay = dict(foo='bar')
+ nope = ('aye', 'bee')
+
+ e = raises(TypeError, inst.validate, cmd, [okay, okay, nope])
+ assert str(e) == output.emsg % (
+ 'example', 'ListOfEntries', 'stuff', 2, dict, tuple, nope
+ )
+
+ e = raises(TypeError, inst.validate, cmd, [nope, okay, nope])
+ assert str(e) == output.emsg % (
+ 'example', 'ListOfEntries', 'stuff', 0, dict, tuple, nope
+ )
diff --git a/ipatests/test_ipalib/test_parameters.py b/ipatests/test_ipalib/test_parameters.py
new file mode 100644
index 000000000..71acfce71
--- /dev/null
+++ b/ipatests/test_ipalib/test_parameters.py
@@ -0,0 +1,1533 @@
+# -*- coding: utf-8 -*-
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.parameters` module.
+"""
+
+import re
+import sys
+from types import NoneType
+from decimal import Decimal
+from inspect import isclass
+from ipatests.util import raises, ClassChecker, read_only
+from ipatests.util import dummy_ugettext, assert_equal
+from ipatests.data import binary_bytes, utf8_bytes, unicode_str
+from ipalib import parameters, text, errors, config
+from ipalib.constants import TYPE_ERROR, CALLABLE_ERROR, NULLS
+from ipalib.errors import ValidationError, ConversionError
+from ipalib import _
+from xmlrpclib import MAXINT, MININT
+
+class test_DefaultFrom(ClassChecker):
+ """
+ Test the `ipalib.parameters.DefaultFrom` class.
+ """
+ _cls = parameters.DefaultFrom
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.DefaultFrom.__init__` method.
+ """
+ def callback(*args):
+ return args
+ keys = ('givenname', 'sn')
+ o = self.cls(callback, *keys)
+ assert read_only(o, 'callback') is callback
+ assert read_only(o, 'keys') == keys
+ lam = lambda first, last: first[0] + last
+ o = self.cls(lam)
+ assert read_only(o, 'keys') == ('first', 'last')
+
+ # Test that TypeError is raised when callback isn't callable:
+ e = raises(TypeError, self.cls, 'whatever')
+ assert str(e) == CALLABLE_ERROR % ('callback', 'whatever', str)
+
+ # Test that TypeError is raised when a key isn't an str:
+ e = raises(TypeError, self.cls, callback, 'givenname', 17)
+ assert str(e) == TYPE_ERROR % ('keys', str, 17, int)
+
+ # Test that ValueError is raised when inferring keys from a callback
+ # which has *args:
+ e = raises(ValueError, self.cls, lambda foo, *args: None)
+ assert str(e) == "callback: variable-length argument list not allowed"
+
+ # Test that ValueError is raised when inferring keys from a callback
+ # which has **kwargs:
+ e = raises(ValueError, self.cls, lambda foo, **kwargs: None)
+ assert str(e) == "callback: variable-length argument list not allowed"
+
+ def test_repr(self):
+ """
+ Test the `ipalib.parameters.DefaultFrom.__repr__` method.
+ """
+ def stuff(one, two):
+ pass
+
+ o = self.cls(stuff)
+ assert repr(o) == "DefaultFrom(stuff, 'one', 'two')"
+
+ o = self.cls(stuff, 'aye', 'bee', 'see')
+ assert repr(o) == "DefaultFrom(stuff, 'aye', 'bee', 'see')"
+
+ cb = lambda first, last: first[0] + last
+
+ o = self.cls(cb)
+ assert repr(o) == "DefaultFrom(<lambda>, 'first', 'last')"
+
+ o = self.cls(cb, 'aye', 'bee', 'see')
+ assert repr(o) == "DefaultFrom(<lambda>, 'aye', 'bee', 'see')"
+
+ def test_call(self):
+ """
+ Test the `ipalib.parameters.DefaultFrom.__call__` method.
+ """
+ def callback(givenname, sn):
+ return givenname[0] + sn[0]
+ keys = ('givenname', 'sn')
+ o = self.cls(callback, *keys)
+ kw = dict(
+ givenname='John',
+ sn='Public',
+ hello='world',
+ )
+ assert o(**kw) == 'JP'
+ assert o() is None
+ for key in ('givenname', 'sn'):
+ kw_copy = dict(kw)
+ del kw_copy[key]
+ assert o(**kw_copy) is None
+
+ # Test using implied keys:
+ o = self.cls(lambda first, last: first[0] + last)
+ assert o(first='john', last='doe') == 'jdoe'
+ assert o(first='', last='doe') is None
+ assert o(one='john', two='doe') is None
+
+ # Test that co_varnames slice is used:
+ def callback2(first, last):
+ letter = first[0]
+ return letter + last
+ o = self.cls(callback2)
+ assert o.keys == ('first', 'last')
+ assert o(first='john', last='doe') == 'jdoe'
+
+
+def test_parse_param_spec():
+ """
+ Test the `ipalib.parameters.parse_param_spec` function.
+ """
+ f = parameters.parse_param_spec
+ assert f('name') == ('name', dict(required=True, multivalue=False))
+ assert f('name?') == ('name', dict(required=False, multivalue=False))
+ assert f('name*') == ('name', dict(required=False, multivalue=True))
+ assert f('name+') == ('name', dict(required=True, multivalue=True))
+
+ # Make sure other "funny" endings are *not* treated special:
+ assert f('name^') == ('name^', dict(required=True, multivalue=False))
+
+ # Test that TypeError is raised if spec isn't an str:
+ e = raises(TypeError, f, u'name?')
+ assert str(e) == TYPE_ERROR % ('spec', str, u'name?', unicode)
+
+
+class DummyRule(object):
+ def __init__(self, error=None):
+ assert error is None or type(error) is unicode
+ self.error = error
+ self.reset()
+
+ def __call__(self, *args):
+ self.calls.append(args)
+ return self.error
+
+ def reset(self):
+ self.calls = []
+
+
+class test_Param(ClassChecker):
+ """
+ Test the `ipalib.parameters.Param` class.
+ """
+ _cls = parameters.Param
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Param.__init__` method.
+ """
+ name = 'my_param'
+ o = self.cls(name)
+ assert o.param_spec is name
+ assert o.name is name
+ assert o.nice == "Param('my_param')"
+ assert o.password is False
+ assert o.__islocked__() is True
+
+ # Test default rules:
+ assert o.rules == tuple()
+ assert o.class_rules == tuple()
+ assert o.all_rules == tuple()
+
+ # Test default kwarg values:
+ assert o.cli_name is name
+ assert o.label.msg == 'my_param'
+ assert o.doc.msg == 'my_param'
+ assert o.required is True
+ assert o.multivalue is False
+ assert o.primary_key is False
+ assert o.normalizer is None
+ assert o.default is None
+ assert o.default_from is None
+ assert o.autofill is False
+ assert o.query is False
+ assert o.attribute is False
+ assert o.include is None
+ assert o.exclude is None
+ assert o.flags == frozenset()
+ assert o.sortorder == 2
+ assert o.csv is False
+
+ # Test that doc defaults from label:
+ o = self.cls('my_param', doc=_('Hello world'))
+ assert o.label.msg == 'my_param'
+ assert o.doc.msg == 'Hello world'
+
+ o = self.cls('my_param', label='My Param')
+ assert o.label == 'My Param'
+ assert o.doc == 'My Param'
+
+
+ # Test that ValueError is raised when a kwarg from a subclass
+ # conflicts with an attribute:
+ class Subclass(self.cls):
+ kwargs = self.cls.kwargs + (
+ ('convert', callable, None),
+ )
+ e = raises(ValueError, Subclass, name)
+ assert str(e) == "kwarg 'convert' conflicts with attribute on Subclass"
+
+ # Test type validation of keyword arguments:
+ class Subclass(self.cls):
+ kwargs = self.cls.kwargs + (
+ ('extra1', bool, True),
+ ('extra2', str, 'Hello'),
+ ('extra3', (int, float), 42),
+ ('extra4', callable, lambda whatever: whatever + 7),
+ )
+ o = Subclass('my_param') # Test with no **kw:
+ for (key, kind, default) in o.kwargs:
+ # Test with a type invalid for all:
+ value = object()
+ kw = {key: value}
+ e = raises(TypeError, Subclass, 'my_param', **kw)
+ if kind is callable:
+ assert str(e) == CALLABLE_ERROR % (key, value, type(value))
+ else:
+ assert str(e) == TYPE_ERROR % (key, kind, value, type(value))
+ # Test with None:
+ kw = {key: None}
+ Subclass('my_param', **kw)
+
+ # Test when using unknown kwargs:
+ e = raises(TypeError, self.cls, 'my_param',
+ flags=['hello', 'world'],
+ whatever=u'Hooray!',
+ )
+ assert str(e) == \
+ "Param('my_param'): takes no such kwargs: 'whatever'"
+ e = raises(TypeError, self.cls, 'my_param', great='Yes', ape='he is!')
+ assert str(e) == \
+ "Param('my_param'): takes no such kwargs: 'ape', 'great'"
+
+ # Test that ValueError is raised if you provide both include and
+ # exclude:
+ e = raises(ValueError, self.cls, 'my_param',
+ include=['server', 'foo'],
+ exclude=['client', 'bar'],
+ )
+ assert str(e) == '%s: cannot have both %s=%r and %s=%r' % (
+ "Param('my_param')",
+ 'include', frozenset(['server', 'foo']),
+ 'exclude', frozenset(['client', 'bar']),
+ )
+
+ # Test that ValueError is raised if csv is set and multivalue is not set:
+ e = raises(ValueError, self.cls, 'my_param', csv=True)
+ assert str(e) == '%s: cannot have csv without multivalue' % "Param('my_param')"
+
+ # Test that default_from gets set:
+ call = lambda first, last: first[0] + last
+ o = self.cls('my_param', default_from=call)
+ assert type(o.default_from) is parameters.DefaultFrom
+ assert o.default_from.callback is call
+
+ def test_repr(self):
+ """
+ Test the `ipalib.parameters.Param.__repr__` method.
+ """
+ for name in ['name', 'name?', 'name*', 'name+']:
+ o = self.cls(name)
+ assert repr(o) == 'Param(%r)' % name
+ o = self.cls('name', required=False)
+ assert repr(o) == "Param('name', required=False)"
+ o = self.cls('name', multivalue=True)
+ assert repr(o) == "Param('name', multivalue=True)"
+
+ def test_use_in_context(self):
+ """
+ Test the `ipalib.parameters.Param.use_in_context` method.
+ """
+ set1 = ('one', 'two', 'three')
+ set2 = ('four', 'five', 'six')
+ param1 = self.cls('param1')
+ param2 = self.cls('param2', include=set1)
+ param3 = self.cls('param3', exclude=set2)
+ for context in set1:
+ env = config.Env()
+ env.context = context
+ assert param1.use_in_context(env) is True, context
+ assert param2.use_in_context(env) is True, context
+ assert param3.use_in_context(env) is True, context
+ for context in set2:
+ env = config.Env()
+ env.context = context
+ assert param1.use_in_context(env) is True, context
+ assert param2.use_in_context(env) is False, context
+ assert param3.use_in_context(env) is False, context
+
+ def test_safe_value(self):
+ """
+ Test the `ipalib.parameters.Param.safe_value` method.
+ """
+ values = (unicode_str, binary_bytes, utf8_bytes)
+ o = self.cls('my_param')
+ for value in values:
+ assert o.safe_value(value) is value
+ assert o.safe_value(None) is None
+ p = parameters.Password('my_passwd')
+ for value in values:
+ assert_equal(p.safe_value(value), u'********')
+ assert p.safe_value(None) is None
+
+ def test_clone(self):
+ """
+ Test the `ipalib.parameters.Param.clone` method.
+ """
+ # Test with the defaults
+ orig = self.cls('my_param')
+ clone = orig.clone()
+ assert clone is not orig
+ assert type(clone) is self.cls
+ assert clone.name is orig.name
+ for (key, kind, default) in self.cls.kwargs:
+ assert getattr(clone, key) is getattr(orig, key)
+
+ # Test with a param spec:
+ orig = self.cls('my_param*')
+ assert orig.param_spec == 'my_param*'
+ clone = orig.clone()
+ assert clone.param_spec == 'my_param'
+ assert clone is not orig
+ assert type(clone) is self.cls
+ for (key, kind, default) in self.cls.kwargs:
+ assert getattr(clone, key) is getattr(orig, key)
+
+ # Test with overrides:
+ orig = self.cls('my_param*')
+ assert orig.required is False
+ assert orig.multivalue is True
+ clone = orig.clone(required=True)
+ assert clone is not orig
+ assert type(clone) is self.cls
+ assert clone.required is True
+ assert clone.multivalue is True
+ assert clone.param_spec == 'my_param'
+ assert clone.name == 'my_param'
+
+ def test_clone_rename(self):
+ """
+ Test the `ipalib.parameters.Param.clone` method.
+ """
+ new_name = 'my_new_param'
+
+ # Test with the defaults
+ orig = self.cls('my_param')
+ clone = orig.clone_rename(new_name)
+ assert clone is not orig
+ assert type(clone) is self.cls
+ assert clone.name == new_name
+ for (key, kind, default) in self.cls.kwargs:
+ assert getattr(clone, key) is getattr(orig, key)
+
+ # Test with overrides:
+ orig = self.cls('my_param*')
+ assert orig.required is False
+ assert orig.multivalue is True
+ clone = orig.clone_rename(new_name, required=True)
+ assert clone is not orig
+ assert type(clone) is self.cls
+ assert clone.required is True
+ assert clone.multivalue is True
+ assert clone.param_spec == new_name
+ assert clone.name == new_name
+
+
+ def test_convert(self):
+ """
+ Test the `ipalib.parameters.Param.convert` method.
+ """
+ okay = ('Hello', u'Hello', 0, 4.2, True, False, unicode_str)
+ class Subclass(self.cls):
+ def _convert_scalar(self, value, index=None):
+ return value
+
+ # Test when multivalue=False:
+ o = Subclass('my_param')
+ for value in NULLS:
+ assert o.convert(value) is None
+ assert o.convert(None) is None
+ for value in okay:
+ assert o.convert(value) is value
+
+ # Test when multivalue=True:
+ o = Subclass('my_param', multivalue=True)
+ for value in NULLS:
+ assert o.convert(value) is None
+ assert o.convert(okay) == okay
+ assert o.convert(NULLS) is None
+ assert o.convert(okay + NULLS) == okay
+ assert o.convert(NULLS + okay) == okay
+ for value in okay:
+ assert o.convert(value) == (value,)
+ assert o.convert([None, value]) == (value,)
+ assert o.convert([value, None]) == (value,)
+
+ def test_convert_scalar(self):
+ """
+ Test the `ipalib.parameters.Param._convert_scalar` method.
+ """
+ dummy = dummy_ugettext()
+
+ # Test with correct type:
+ o = self.cls('my_param')
+ assert o._convert_scalar(None) is None
+ assert dummy.called() is False
+ # Test with incorrect type
+ e = raises(errors.ConversionError, o._convert_scalar, 'hello', index=17)
+
+ def test_validate(self):
+ """
+ Test the `ipalib.parameters.Param.validate` method.
+ """
+
+ # Test in default state (with no rules, no kwarg):
+ o = self.cls('my_param')
+ e = raises(errors.RequirementError, o.validate, None, 'cli')
+ assert e.name == 'my_param'
+
+ # Test in default state that cli_name gets returned in the exception
+ # when context == 'cli'
+ o = self.cls('my_param', cli_name='short')
+ e = raises(errors.RequirementError, o.validate, None, 'cli')
+ assert e.name == 'short'
+
+ # Test with required=False
+ o = self.cls('my_param', required=False)
+ assert o.required is False
+ assert o.validate(None, 'cli') is None
+
+ # Test with query=True:
+ o = self.cls('my_param', query=True)
+ assert o.query is True
+ e = raises(errors.RequirementError, o.validate, None, 'cli')
+ assert_equal(e.name, 'my_param')
+
+ # Test with multivalue=True:
+ o = self.cls('my_param', multivalue=True)
+ e = raises(TypeError, o.validate, [], 'cli')
+ assert str(e) == TYPE_ERROR % ('value', tuple, [], list)
+ e = raises(ValueError, o.validate, tuple(), 'cli')
+ assert str(e) == 'value: empty tuple must be converted to None'
+
+ # Test with wrong (scalar) type:
+ e = raises(TypeError, o.validate, (None, None, 42, None), 'cli')
+ assert str(e) == TYPE_ERROR % ('my_param', NoneType, 42, int)
+ o = self.cls('my_param')
+ e = raises(TypeError, o.validate, 'Hello', 'cli')
+ assert str(e) == TYPE_ERROR % ('my_param', NoneType, 'Hello', str)
+
+ class Example(self.cls):
+ type = int
+
+ # Test with some rules and multivalue=False
+ pass1 = DummyRule()
+ pass2 = DummyRule()
+ fail = DummyRule(u'no good')
+ o = Example('example', pass1, pass2)
+ assert o.multivalue is False
+ assert o.validate(11, 'cli') is None
+ assert pass1.calls == [(text.ugettext, 11)]
+ assert pass2.calls == [(text.ugettext, 11)]
+ pass1.reset()
+ pass2.reset()
+ o = Example('example', pass1, pass2, fail)
+ e = raises(errors.ValidationError, o.validate, 42, 'cli')
+ assert e.name == 'example'
+ assert e.error == u'no good'
+ assert e.index is None
+ assert pass1.calls == [(text.ugettext, 42)]
+ assert pass2.calls == [(text.ugettext, 42)]
+ assert fail.calls == [(text.ugettext, 42)]
+
+ # Test with some rules and multivalue=True
+ pass1 = DummyRule()
+ pass2 = DummyRule()
+ fail = DummyRule(u'this one is not good')
+ o = Example('example', pass1, pass2, multivalue=True)
+ assert o.multivalue is True
+ assert o.validate((3, 9), 'cli') is None
+ assert pass1.calls == [
+ (text.ugettext, 3),
+ (text.ugettext, 9),
+ ]
+ assert pass2.calls == [
+ (text.ugettext, 3),
+ (text.ugettext, 9),
+ ]
+ pass1.reset()
+ pass2.reset()
+ o = Example('multi_example', pass1, pass2, fail, multivalue=True)
+ assert o.multivalue is True
+ e = raises(errors.ValidationError, o.validate, (3, 9), 'cli')
+ assert e.name == 'multi_example'
+ assert e.error == u'this one is not good'
+ assert e.index == 0
+ assert pass1.calls == [(text.ugettext, 3)]
+ assert pass2.calls == [(text.ugettext, 3)]
+ assert fail.calls == [(text.ugettext, 3)]
+
+ def test_validate_scalar(self):
+ """
+ Test the `ipalib.parameters.Param._validate_scalar` method.
+ """
+ class MyParam(self.cls):
+ type = bool
+ okay = DummyRule()
+ o = MyParam('my_param', okay)
+
+ # Test that TypeError is appropriately raised:
+ e = raises(TypeError, o._validate_scalar, 0)
+ assert str(e) == TYPE_ERROR % ('my_param', bool, 0, int)
+ e = raises(TypeError, o._validate_scalar, 'Hi', index=4)
+ assert str(e) == TYPE_ERROR % ('my_param', bool, 'Hi', str)
+ e = raises(TypeError, o._validate_scalar, True, index=3.0)
+ assert str(e) == TYPE_ERROR % ('index', int, 3.0, float)
+
+ # Test with passing rule:
+ assert o._validate_scalar(True, index=None) is None
+ assert o._validate_scalar(False, index=None) is None
+ assert okay.calls == [
+ (text.ugettext, True),
+ (text.ugettext, False),
+ ]
+
+ # Test with a failing rule:
+ okay = DummyRule()
+ fail = DummyRule(u'this describes the error')
+ o = MyParam('my_param', okay, fail)
+ e = raises(errors.ValidationError, o._validate_scalar, True)
+ assert e.name == 'my_param'
+ assert e.error == u'this describes the error'
+ assert e.index is None
+ e = raises(errors.ValidationError, o._validate_scalar, False, index=2)
+ assert e.name == 'my_param'
+ assert e.error == u'this describes the error'
+ assert e.index == 2
+ assert okay.calls == [
+ (text.ugettext, True),
+ (text.ugettext, False),
+ ]
+ assert fail.calls == [
+ (text.ugettext, True),
+ (text.ugettext, False),
+ ]
+
+ def test_get_default(self):
+ """
+ Test the `ipalib.parameters.Param.get_default` method.
+ """
+ class PassThrough(object):
+ value = None
+
+ def __call__(self, value):
+ assert self.value is None
+ assert value is not None
+ self.value = value
+ return value
+
+ def reset(self):
+ assert self.value is not None
+ self.value = None
+
+ class Str(self.cls):
+ type = unicode
+
+ def __init__(self, name, **kw):
+ self._convert_scalar = PassThrough()
+ super(Str, self).__init__(name, **kw)
+
+ # Test with only a static default:
+ o = Str('my_str',
+ normalizer=PassThrough(),
+ default=u'Static Default',
+ )
+ assert_equal(o.get_default(), u'Static Default')
+ assert o._convert_scalar.value is None
+ assert o.normalizer.value is None
+
+ # Test with default_from:
+ o = Str('my_str',
+ normalizer=PassThrough(),
+ default=u'Static Default',
+ default_from=lambda first, last: first[0] + last,
+ )
+ assert_equal(o.get_default(), u'Static Default')
+ assert o._convert_scalar.value is None
+ assert o.normalizer.value is None
+ default = o.get_default(first=u'john', last='doe')
+ assert_equal(default, u'jdoe')
+ assert o._convert_scalar.value is default
+ assert o.normalizer.value is default
+
+
+class test_Flag(ClassChecker):
+ """
+ Test the `ipalib.parameters.Flag` class.
+ """
+ _cls = parameters.Flag
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Flag.__init__` method.
+ """
+ # Test with no kwargs:
+ o = self.cls('my_flag')
+ assert o.type is bool
+ assert isinstance(o, parameters.Bool)
+ assert o.autofill is True
+ assert o.default is False
+
+ # Test that TypeError is raise if default is not a bool:
+ e = raises(TypeError, self.cls, 'my_flag', default=None)
+ assert str(e) == TYPE_ERROR % ('default', bool, None, NoneType)
+
+ # Test with autofill=False, default=True
+ o = self.cls('my_flag', autofill=False, default=True)
+ assert o.autofill is True
+ assert o.default is True
+
+ # Test when cloning:
+ orig = self.cls('my_flag')
+ for clone in [orig.clone(), orig.clone(autofill=False)]:
+ assert clone.autofill is True
+ assert clone.default is False
+ assert clone is not orig
+ assert type(clone) is self.cls
+
+ # Test when cloning with default=True/False
+ orig = self.cls('my_flag')
+ assert orig.clone().default is False
+ assert orig.clone(default=True).default is True
+ orig = self.cls('my_flag', default=True)
+ assert orig.clone().default is True
+ assert orig.clone(default=False).default is False
+
+
+class test_Data(ClassChecker):
+ """
+ Test the `ipalib.parameters.Data` class.
+ """
+ _cls = parameters.Data
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Data.__init__` method.
+ """
+ o = self.cls('my_data')
+ assert o.type is NoneType
+ assert o.password is False
+ assert o.rules == tuple()
+ assert o.class_rules == tuple()
+ assert o.all_rules == tuple()
+ assert o.minlength is None
+ assert o.maxlength is None
+ assert o.length is None
+ assert o.pattern is None
+
+ # Test mixing length with minlength or maxlength:
+ o = self.cls('my_data', length=5)
+ assert o.length == 5
+ permutations = [
+ dict(minlength=3),
+ dict(maxlength=7),
+ dict(minlength=3, maxlength=7),
+ ]
+ for kw in permutations:
+ o = self.cls('my_data', **kw)
+ for (key, value) in kw.iteritems():
+ assert getattr(o, key) == value
+ e = raises(ValueError, self.cls, 'my_data', length=5, **kw)
+ assert str(e) == \
+ "Data('my_data'): cannot mix length with minlength or maxlength"
+
+ # Test when minlength or maxlength are less than 1:
+ e = raises(ValueError, self.cls, 'my_data', minlength=0)
+ assert str(e) == "Data('my_data'): minlength must be >= 1; got 0"
+ e = raises(ValueError, self.cls, 'my_data', maxlength=0)
+ assert str(e) == "Data('my_data'): maxlength must be >= 1; got 0"
+
+ # Test when minlength > maxlength:
+ e = raises(ValueError, self.cls, 'my_data', minlength=22, maxlength=15)
+ assert str(e) == \
+ "Data('my_data'): minlength > maxlength (minlength=22, maxlength=15)"
+
+ # Test when minlength == maxlength
+ e = raises(ValueError, self.cls, 'my_data', minlength=7, maxlength=7)
+ assert str(e) == \
+ "Data('my_data'): minlength == maxlength; use length=7 instead"
+
+
+class test_Bytes(ClassChecker):
+ """
+ Test the `ipalib.parameters.Bytes` class.
+ """
+ _cls = parameters.Bytes
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Bytes.__init__` method.
+ """
+ o = self.cls('my_bytes')
+ assert o.type is str
+ assert o.password is False
+ assert o.rules == tuple()
+ assert o.class_rules == tuple()
+ assert o.all_rules == tuple()
+ assert o.minlength is None
+ assert o.maxlength is None
+ assert o.length is None
+ assert o.pattern is None
+ assert o.re is None
+
+ # Test mixing length with minlength or maxlength:
+ o = self.cls('my_bytes', length=5)
+ assert o.length == 5
+ assert len(o.class_rules) == 1
+ assert len(o.rules) == 0
+ assert len(o.all_rules) == 1
+ permutations = [
+ dict(minlength=3),
+ dict(maxlength=7),
+ dict(minlength=3, maxlength=7),
+ ]
+ for kw in permutations:
+ o = self.cls('my_bytes', **kw)
+ assert len(o.class_rules) == len(kw)
+ assert len(o.rules) == 0
+ assert len(o.all_rules) == len(kw)
+ for (key, value) in kw.iteritems():
+ assert getattr(o, key) == value
+ e = raises(ValueError, self.cls, 'my_bytes', length=5, **kw)
+ assert str(e) == \
+ "Bytes('my_bytes'): cannot mix length with minlength or maxlength"
+
+ # Test when minlength or maxlength are less than 1:
+ e = raises(ValueError, self.cls, 'my_bytes', minlength=0)
+ assert str(e) == "Bytes('my_bytes'): minlength must be >= 1; got 0"
+ e = raises(ValueError, self.cls, 'my_bytes', maxlength=0)
+ assert str(e) == "Bytes('my_bytes'): maxlength must be >= 1; got 0"
+
+ # Test when minlength > maxlength:
+ e = raises(ValueError, self.cls, 'my_bytes', minlength=22, maxlength=15)
+ assert str(e) == \
+ "Bytes('my_bytes'): minlength > maxlength (minlength=22, maxlength=15)"
+
+ # Test when minlength == maxlength
+ e = raises(ValueError, self.cls, 'my_bytes', minlength=7, maxlength=7)
+ assert str(e) == \
+ "Bytes('my_bytes'): minlength == maxlength; use length=7 instead"
+
+ def test_rule_minlength(self):
+ """
+ Test the `ipalib.parameters.Bytes._rule_minlength` method.
+ """
+ o = self.cls('my_bytes', minlength=3)
+ assert o.minlength == 3
+ rule = o._rule_minlength
+ translation = u'minlength=%(minlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in ('abc', 'four', '12345'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in ('', 'a', '12'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(minlength=3)
+ )
+ assert dummy.message == 'must be at least %(minlength)d bytes'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_maxlength(self):
+ """
+ Test the `ipalib.parameters.Bytes._rule_maxlength` method.
+ """
+ o = self.cls('my_bytes', maxlength=4)
+ assert o.maxlength == 4
+ rule = o._rule_maxlength
+ translation = u'maxlength=%(maxlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in ('ab', '123', 'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in ('12345', 'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(maxlength=4)
+ )
+ assert dummy.message == 'can be at most %(maxlength)d bytes'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_length(self):
+ """
+ Test the `ipalib.parameters.Bytes._rule_length` method.
+ """
+ o = self.cls('my_bytes', length=4)
+ assert o.length == 4
+ rule = o._rule_length
+ translation = u'length=%(length)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in ('1234', 'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in ('ab', '123', '12345', 'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(length=4),
+ )
+ assert dummy.message == 'must be exactly %(length)d bytes'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_pattern(self):
+ """
+ Test the `ipalib.parameters.Bytes._rule_pattern` method.
+ """
+ # Test our assumptions about Python re module and Unicode:
+ pat = '\w+$'
+ r = re.compile(pat)
+ assert r.match('Hello_World') is not None
+ assert r.match(utf8_bytes) is None
+ assert r.match(binary_bytes) is None
+
+ # Create instance:
+ o = self.cls('my_bytes', pattern=pat)
+ assert o.pattern is pat
+ rule = o._rule_pattern
+ translation = u'pattern=%(pattern)r'
+ dummy = dummy_ugettext(translation)
+
+ # Test with passing values:
+ for value in ('HELLO', 'hello', 'Hello_World'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in ('Hello!', 'Hello World', utf8_bytes, binary_bytes):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(pattern=pat),
+ )
+ assert_equal(dummy.message, 'must match pattern "%(pattern)s"')
+ assert dummy.called() is True
+ dummy.reset()
+
+
+class test_Str(ClassChecker):
+ """
+ Test the `ipalib.parameters.Str` class.
+ """
+ _cls = parameters.Str
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Str.__init__` method.
+ """
+ o = self.cls('my_str')
+ assert o.type is unicode
+ assert o.password is False
+ assert o.minlength is None
+ assert o.maxlength is None
+ assert o.length is None
+ assert o.pattern is None
+
+ def test_convert_scalar(self):
+ """
+ Test the `ipalib.parameters.Str._convert_scalar` method.
+ """
+ o = self.cls('my_str')
+ mthd = o._convert_scalar
+ for value in (u'Hello', 42, 1.2, unicode_str):
+ assert mthd(value) == unicode(value)
+ bad = [True, 'Hello', dict(one=1), utf8_bytes]
+ for value in bad:
+ e = raises(errors.ConversionError, mthd, value)
+ assert e.name == 'my_str'
+ assert e.index is None
+ assert_equal(unicode(e.error), u'must be Unicode text')
+ e = raises(errors.ConversionError, mthd, value, index=18)
+ assert e.name == 'my_str'
+ assert e.index == 18
+ assert_equal(unicode(e.error), u'must be Unicode text')
+ bad = [(u'Hello',), [42.3]]
+ for value in bad:
+ e = raises(errors.ConversionError, mthd, value)
+ assert e.name == 'my_str'
+ assert e.index is None
+ assert_equal(unicode(e.error), u'Only one value is allowed')
+ assert o.convert(None) is None
+
+ def test_rule_minlength(self):
+ """
+ Test the `ipalib.parameters.Str._rule_minlength` method.
+ """
+ o = self.cls('my_str', minlength=3)
+ assert o.minlength == 3
+ rule = o._rule_minlength
+ translation = u'minlength=%(minlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (u'abc', u'four', u'12345'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (u'', u'a', u'12'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(minlength=3)
+ )
+ assert dummy.message == 'must be at least %(minlength)d characters'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_maxlength(self):
+ """
+ Test the `ipalib.parameters.Str._rule_maxlength` method.
+ """
+ o = self.cls('my_str', maxlength=4)
+ assert o.maxlength == 4
+ rule = o._rule_maxlength
+ translation = u'maxlength=%(maxlength)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (u'ab', u'123', u'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (u'12345', u'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(maxlength=4)
+ )
+ assert dummy.message == 'can be at most %(maxlength)d characters'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_length(self):
+ """
+ Test the `ipalib.parameters.Str._rule_length` method.
+ """
+ o = self.cls('my_str', length=4)
+ assert o.length == 4
+ rule = o._rule_length
+ translation = u'length=%(length)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (u'1234', u'four'):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (u'ab', u'123', u'12345', u'sixsix'):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(length=4),
+ )
+ assert dummy.message == 'must be exactly %(length)d characters'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_pattern(self):
+ """
+ Test the `ipalib.parameters.Str._rule_pattern` method.
+ """
+ # Test our assumptions about Python re module and Unicode:
+ pat = '\w{5}$'
+ r1 = re.compile(pat)
+ r2 = re.compile(pat, re.UNICODE)
+ assert r1.match(unicode_str) is None
+ assert r2.match(unicode_str) is not None
+
+ # Create instance:
+ o = self.cls('my_str', pattern=pat)
+ assert o.pattern is pat
+ rule = o._rule_pattern
+ translation = u'pattern=%(pattern)r'
+ dummy = dummy_ugettext(translation)
+
+ # Test with passing values:
+ for value in (u'HELLO', u'hello', unicode_str):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (u'H LLO', u'***lo', unicode_str + unicode_str):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(pattern=pat),
+ )
+ assert_equal(dummy.message, 'must match pattern "%(pattern)s"')
+ assert dummy.called() is True
+ dummy.reset()
+
+
+class test_Password(ClassChecker):
+ """
+ Test the `ipalib.parameters.Password` class.
+ """
+ _cls = parameters.Password
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Password.__init__` method.
+ """
+ o = self.cls('my_password')
+ assert o.type is unicode
+ assert o.minlength is None
+ assert o.maxlength is None
+ assert o.length is None
+ assert o.pattern is None
+ assert o.password is True
+
+ def test_convert_scalar(self):
+ """
+ Test the `ipalib.parameters.Password._convert_scalar` method.
+ """
+ o = self.cls('my_password')
+ e = raises(errors.PasswordMismatch, o._convert_scalar, [u'one', u'two'])
+ assert e.name == 'my_password'
+ assert e.index is None
+ assert o._convert_scalar([u'one', u'one']) == u'one'
+ assert o._convert_scalar(u'one') == u'one'
+
+
+class test_StrEnum(ClassChecker):
+ """
+ Test the `ipalib.parameters.StrEnum` class.
+ """
+ _cls = parameters.StrEnum
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.StrEnum.__init__` method.
+ """
+ values = (u'Hello', u'naughty', u'nurse!')
+ o = self.cls('my_strenum', values=values)
+ assert o.type is unicode
+ assert o.values is values
+ assert o.class_rules == (o._rule_values,)
+ assert o.rules == tuple()
+ assert o.all_rules == (o._rule_values,)
+
+ badvalues = (u'Hello', 'naughty', u'nurse!')
+ e = raises(TypeError, self.cls, 'my_enum', values=badvalues)
+ assert str(e) == TYPE_ERROR % (
+ "StrEnum('my_enum') values[1]", unicode, 'naughty', str
+ )
+
+ # Test that ValueError is raised when list of values is empty
+ badvalues = tuple()
+ e = raises(ValueError, self.cls, 'empty_enum', values=badvalues)
+ assert_equal(str(e), "StrEnum('empty_enum'): list of values must not "
+ "be empty")
+
+ def test_rules_values(self):
+ """
+ Test the `ipalib.parameters.StrEnum._rule_values` method.
+ """
+ values = (u'Hello', u'naughty', u'nurse!')
+ o = self.cls('my_enum', values=values)
+ rule = o._rule_values
+ translation = u"values='Hello', 'naughty', 'nurse!'"
+ dummy = dummy_ugettext(translation)
+
+ # Test with passing values:
+ for v in values:
+ assert rule(dummy, v) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for val in (u'Howdy', u'quiet', u'library!'):
+ assert_equal(
+ rule(dummy, val),
+ translation % dict(values=values),
+ )
+ assert_equal(dummy.message, "must be one of %(values)s")
+ dummy.reset()
+
+ # test a special case when we have just one allowed value
+ values = (u'Hello', )
+ o = self.cls('my_enum', values=values)
+ rule = o._rule_values
+ translation = u"value='Hello'"
+ dummy = dummy_ugettext(translation)
+
+ for val in (u'Howdy', u'quiet', u'library!'):
+ assert_equal(
+ rule(dummy, val),
+ translation % dict(values=values),
+ )
+ assert_equal(dummy.message, "must be '%(value)s'")
+ dummy.reset()
+
+
+class test_Number(ClassChecker):
+ """
+ Test the `ipalib.parameters.Number` class.
+ """
+ _cls = parameters.Number
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Number.__init__` method.
+ """
+ o = self.cls('my_number')
+ assert o.type is NoneType
+ assert o.password is False
+ assert o.rules == tuple()
+ assert o.class_rules == tuple()
+ assert o.all_rules == tuple()
+
+
+
+class test_Int(ClassChecker):
+ """
+ Test the `ipalib.parameters.Int` class.
+ """
+ _cls = parameters.Int
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Int.__init__` method.
+ """
+ # Test with no kwargs:
+ o = self.cls('my_number')
+ assert o.type is int
+ assert isinstance(o, parameters.Int)
+ assert o.minvalue == int(MININT)
+ assert o.maxvalue == int(MAXINT)
+
+ # Test when min > max:
+ e = raises(ValueError, self.cls, 'my_number', minvalue=22, maxvalue=15)
+ assert str(e) == \
+ "Int('my_number'): minvalue > maxvalue (minvalue=22, maxvalue=15)"
+
+ def test_rule_minvalue(self):
+ """
+ Test the `ipalib.parameters.Int._rule_minvalue` method.
+ """
+ o = self.cls('my_number', minvalue=3)
+ assert o.minvalue == 3
+ rule = o._rule_minvalue
+ translation = u'minvalue=%(minvalue)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (4, 99, 1001):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (-1, 0, 2):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(minvalue=3)
+ )
+ assert dummy.message == 'must be at least %(minvalue)d'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_maxvalue(self):
+ """
+ Test the `ipalib.parameters.Int._rule_maxvalue` method.
+ """
+ o = self.cls('my_number', maxvalue=4)
+ assert o.maxvalue == 4
+ rule = o._rule_maxvalue
+ translation = u'maxvalue=%(maxvalue)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (-1, 0, 4):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (5, 99, 1009):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(maxvalue=4)
+ )
+ assert dummy.message == 'can be at most %(maxvalue)d'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_convert_scalar(self):
+ """
+ Test the `ipalib.parameters.Int._convert_scalar` method.
+ Assure radix prefixes work, str objects fail,
+ floats (native & string) are truncated,
+ large magnitude values are promoted to long,
+ empty strings & invalid numerical representations fail
+ """
+ o = self.cls('my_number')
+ # Assure invalid inputs raise error
+ for bad in ['hello', u'hello', True, None, u'', u'.']:
+ e = raises(errors.ConversionError, o._convert_scalar, bad)
+ assert e.name == 'my_number'
+ assert e.index is None
+ # Assure large magnatude values are handled correctly
+ assert type(o._convert_scalar(sys.maxint*2)) == long
+ assert o._convert_scalar(sys.maxint*2) == sys.maxint*2
+ assert o._convert_scalar(unicode(sys.maxint*2)) == sys.maxint*2
+ assert o._convert_scalar(long(16)) == 16
+ # Assure normal conversions produce expected result
+ assert o._convert_scalar(u'16.99') == 16
+ assert o._convert_scalar(16.99) == 16
+ assert o._convert_scalar(u'16') == 16
+ assert o._convert_scalar(u'0x10') == 16
+ assert o._convert_scalar(u'020') == 16
+
+class test_Decimal(ClassChecker):
+ """
+ Test the `ipalib.parameters.Decimal` class.
+ """
+ _cls = parameters.Decimal
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.Decimal.__init__` method.
+ """
+ # Test with no kwargs:
+ o = self.cls('my_number')
+ assert o.type is Decimal
+ assert isinstance(o, parameters.Decimal)
+ assert o.minvalue is None
+ assert o.maxvalue is None
+
+ # Test when min > max:
+ e = raises(ValueError, self.cls, 'my_number', minvalue=Decimal('22.5'), maxvalue=Decimal('15.1'))
+ assert str(e) == \
+ "Decimal('my_number'): minvalue > maxvalue (minvalue=22.5, maxvalue=15.1)"
+
+ def test_rule_minvalue(self):
+ """
+ Test the `ipalib.parameters.Decimal._rule_minvalue` method.
+ """
+ o = self.cls('my_number', minvalue='3.1')
+ assert o.minvalue == Decimal('3.1')
+ rule = o._rule_minvalue
+ translation = u'minvalue=%(minvalue)s'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (Decimal('3.2'), Decimal('99.0')):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (Decimal('-1.2'), Decimal('0.0'), Decimal('3.0')):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(minvalue=Decimal('3.1'))
+ )
+ assert dummy.message == 'must be at least %(minvalue)s'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_rule_maxvalue(self):
+ """
+ Test the `ipalib.parameters.Decimal._rule_maxvalue` method.
+ """
+ o = self.cls('my_number', maxvalue='4.7')
+ assert o.maxvalue == Decimal('4.7')
+ rule = o._rule_maxvalue
+ translation = u'maxvalue=%(maxvalue)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+
+ # Test with passing values:
+ for value in (Decimal('-1.0'), Decimal('0.1'), Decimal('4.2')):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # Test with failing values:
+ for value in (Decimal('5.3'), Decimal('99.9')):
+ assert_equal(
+ rule(dummy, value),
+ translation % dict(maxvalue=Decimal('4.7'))
+ )
+ assert dummy.message == 'can be at most %(maxvalue)s'
+ assert dummy.called() is True
+ dummy.reset()
+
+ def test_precision(self):
+ """
+ Test the `ipalib.parameters.Decimal` precision attribute
+ """
+ # precission is None
+ param = self.cls('my_number')
+
+ for value in (Decimal('0'), Decimal('4.4'), Decimal('4.67')):
+ assert_equal(
+ param(value),
+ value)
+
+ # precision is 0
+ param = self.cls('my_number', precision=0)
+ for original,expected in ((Decimal('0'), '0'),
+ (Decimal('1.1'), '1'),
+ (Decimal('4.67'), '5')):
+ assert_equal(
+ str(param(original)),
+ expected)
+
+ # precision is 1
+ param = self.cls('my_number', precision=1)
+ for original,expected in ((Decimal('0'), '0.0'),
+ (Decimal('1.1'), '1.1'),
+ (Decimal('4.67'), '4.7')):
+ assert_equal(
+ str(param(original)),
+ expected)
+
+ # value has too many digits
+ param = self.cls('my_number', precision=1)
+ e = raises(ConversionError, param, '123456789012345678901234567890')
+
+ assert str(e) == \
+ "invalid 'my_number': quantize result has too many digits for current context"
+
+ def test_exponential(self):
+ """
+ Test the `ipalib.parameters.Decimal` exponential attribute
+ """
+ param = self.cls('my_number', exponential=True)
+ for original,expected in ((Decimal('0'), '0'),
+ (Decimal('1E3'), '1E+3'),
+ (Decimal('3.4E2'), '3.4E+2')):
+ assert_equal(
+ str(param(original)),
+ expected)
+
+
+ param = self.cls('my_number', exponential=False)
+ for original,expected in ((Decimal('0'), '0'),
+ (Decimal('1E3'), '1000'),
+ (Decimal('3.4E2'), '340')):
+ assert_equal(
+ str(param(original)),
+ expected)
+
+ def test_numberclass(self):
+ """
+ Test the `ipalib.parameters.Decimal` numberclass attribute
+ """
+ # test default value: '-Normal', '+Zero', '+Normal'
+ param = self.cls('my_number')
+ for value,raises_verror in ((Decimal('0'), False),
+ (Decimal('-0'), True),
+ (Decimal('1E8'), False),
+ (Decimal('-1.1'), False),
+ (Decimal('-Infinity'), True),
+ (Decimal('+Infinity'), True),
+ (Decimal('NaN'), True)):
+ if raises_verror:
+ raises(ValidationError, param, value)
+ else:
+ param(value)
+
+
+ param = self.cls('my_number', exponential=True,
+ numberclass=('-Normal', '+Zero', '+Infinity'))
+ for value,raises_verror in ((Decimal('0'), False),
+ (Decimal('-0'), True),
+ (Decimal('1E8'), True),
+ (Decimal('-1.1'), False),
+ (Decimal('-Infinity'), True),
+ (Decimal('+Infinity'), False),
+ (Decimal('NaN'), True)):
+ if raises_verror:
+ raises(ValidationError, param, value)
+ else:
+ param(value)
+
+class test_AccessTime(ClassChecker):
+ """
+ Test the `ipalib.parameters.AccessTime` class.
+ """
+ _cls = parameters.AccessTime
+
+ def test_init(self):
+ """
+ Test the `ipalib.parameters.AccessTime.__init__` method.
+ """
+ # Test with no kwargs:
+ o = self.cls('my_time')
+ assert o.type is unicode
+ assert isinstance(o, parameters.AccessTime)
+ assert o.multivalue is False
+ translation = u'length=%(length)r'
+ dummy = dummy_ugettext(translation)
+ assert dummy.translation is translation
+ rule = o._rule_required
+
+ # Check some good rules
+ for value in (u'absolute 201012161032 ~ 201012161033',
+ u'periodic monthly week 2 day Sat,Sun 0900-1300',
+ u'periodic yearly month 4 day 1-31 0800-1400',
+ u'periodic weekly day 7 0800-1400',
+ u'periodic daily 0800-1400',
+ ):
+ assert rule(dummy, value) is None
+ assert dummy.called() is False
+
+ # And some bad ones
+ for value in (u'absolute 201012161032 - 201012161033',
+ u'absolute 201012161032 ~',
+ u'periodic monthly day Sat,Sun 0900-1300',
+ u'periodical yearly month 4 day 1-31 0800-1400',
+ u'periodic weekly day 8 0800-1400',
+ ):
+ e = raises(ValidationError, o._rule_required, None, value)
+
+def test_create_param():
+ """
+ Test the `ipalib.parameters.create_param` function.
+ """
+ f = parameters.create_param
+
+ # Test that Param instances are returned unchanged:
+ params = (
+ parameters.Param('one?'),
+ parameters.Int('two+'),
+ parameters.Str('three*'),
+ parameters.Bytes('four'),
+ )
+ for p in params:
+ assert f(p) is p
+
+ # Test that the spec creates an Str instance:
+ for spec in ('one?', 'two+', 'three*', 'four'):
+ (name, kw) = parameters.parse_param_spec(spec)
+ p = f(spec)
+ assert p.param_spec is spec
+ assert p.name == name
+ assert p.required is kw['required']
+ assert p.multivalue is kw['multivalue']
+
+ # Test that TypeError is raised when spec is neither a Param nor a str:
+ for spec in (u'one', 42, parameters.Param, parameters.Str):
+ e = raises(TypeError, f, spec)
+ assert str(e) == \
+ TYPE_ERROR % ('spec', (str, parameters.Param), spec, type(spec))
+
+
+def test_messages():
+ """
+ Test module level message in `ipalib.parameters`.
+ """
+ for name in dir(parameters):
+ if name.startswith('_'):
+ continue
+ attr = getattr(parameters, name)
+ if not (isclass(attr) and issubclass(attr, parameters.Param)):
+ continue
+ assert type(attr.type_error) is str
+ assert attr.type_error in parameters.__messages
+
+
+class test_IA5Str(ClassChecker):
+ """
+ Test the `ipalib.parameters.IA5Str` class.
+ """
+ _cls = parameters.IA5Str
+
+ def test_convert_scalar(self):
+ """
+ Test the `ipalib.parameters.IA5Str._convert_scalar` method.
+ """
+ o = self.cls('my_str')
+ mthd = o._convert_scalar
+ for value in (u'Hello', 42, 1.2):
+ assert mthd(value) == unicode(value)
+ bad = ['Helloá']
+ for value in bad:
+ e = raises(errors.ConversionError, mthd, value)
+ assert e.name == 'my_str'
+ assert e.index is None
+ assert_equal(e.error, "The character '\\xc3' is not allowed.")
diff --git a/ipatests/test_ipalib/test_plugable.py b/ipatests/test_ipalib/test_plugable.py
new file mode 100644
index 000000000..c495e74dc
--- /dev/null
+++ b/ipatests/test_ipalib/test_plugable.py
@@ -0,0 +1,516 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.plugable` module.
+"""
+
+import inspect
+from ipatests.util import raises, no_set, no_del, read_only
+from ipatests.util import getitem, setitem, delitem
+from ipatests.util import ClassChecker, create_test_api
+from ipalib import plugable, errors, text
+
+
+class test_SetProxy(ClassChecker):
+ """
+ Test the `ipalib.plugable.SetProxy` class.
+ """
+ _cls = plugable.SetProxy
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.SetProxy` class.
+ """
+ assert self.cls.__bases__ == (plugable.ReadOnly,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.SetProxy.__init__` method.
+ """
+ okay = (set, frozenset, dict)
+ fail = (list, tuple)
+ for t in okay:
+ self.cls(t())
+ raises(TypeError, self.cls, t)
+ for t in fail:
+ raises(TypeError, self.cls, t())
+ raises(TypeError, self.cls, t)
+
+ def test_SetProxy(self):
+ """
+ Test container emulation of `ipalib.plugable.SetProxy` class.
+ """
+ def get_key(i):
+ return 'key_%d' % i
+
+ cnt = 10
+ target = set()
+ proxy = self.cls(target)
+ for i in xrange(cnt):
+ key = get_key(i)
+
+ # Check initial state
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert key not in proxy
+ assert key not in target
+
+ # Add and test again
+ target.add(key)
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert key in proxy
+ assert key in target
+
+
+class test_DictProxy(ClassChecker):
+ """
+ Test the `ipalib.plugable.DictProxy` class.
+ """
+ _cls = plugable.DictProxy
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.DictProxy` class.
+ """
+ assert self.cls.__bases__ == (plugable.SetProxy,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.DictProxy.__init__` method.
+ """
+ self.cls(dict())
+ raises(TypeError, self.cls, dict)
+ fail = (set, frozenset, list, tuple)
+ for t in fail:
+ raises(TypeError, self.cls, t())
+ raises(TypeError, self.cls, t)
+
+ def test_DictProxy(self):
+ """
+ Test container emulation of `ipalib.plugable.DictProxy` class.
+ """
+ def get_kv(i):
+ return (
+ 'key_%d' % i,
+ 'val_%d' % i,
+ )
+ cnt = 10
+ target = dict()
+ proxy = self.cls(target)
+ for i in xrange(cnt):
+ (key, val) = get_kv(i)
+
+ # Check initial state
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert list(proxy()) == [target[k] for k in sorted(target)]
+ assert key not in proxy
+ raises(KeyError, getitem, proxy, key)
+
+ # Add and test again
+ target[key] = val
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert list(proxy()) == [target[k] for k in sorted(target)]
+
+ # Verify TypeError is raised trying to set/del via proxy
+ raises(TypeError, setitem, proxy, key, val)
+ raises(TypeError, delitem, proxy, key)
+
+
+class test_MagicDict(ClassChecker):
+ """
+ Test the `ipalib.plugable.MagicDict` class.
+ """
+ _cls = plugable.MagicDict
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.MagicDict` class.
+ """
+ assert self.cls.__bases__ == (plugable.DictProxy,)
+ for non_dict in ('hello', 69, object):
+ raises(TypeError, self.cls, non_dict)
+
+ def test_MagicDict(self):
+ """
+ Test container emulation of `ipalib.plugable.MagicDict` class.
+ """
+ cnt = 10
+ keys = []
+ d = dict()
+ dictproxy = self.cls(d)
+ for i in xrange(cnt):
+ key = 'key_%d' % i
+ val = 'val_%d' % i
+ keys.append(key)
+
+ # Test thet key does not yet exist
+ assert len(dictproxy) == i
+ assert key not in dictproxy
+ assert not hasattr(dictproxy, key)
+ raises(KeyError, getitem, dictproxy, key)
+ raises(AttributeError, getattr, dictproxy, key)
+
+ # Test that items/attributes cannot be set on dictproxy:
+ raises(TypeError, setitem, dictproxy, key, val)
+ raises(AttributeError, setattr, dictproxy, key, val)
+
+ # Test that additions in d are reflected in dictproxy:
+ d[key] = val
+ assert len(dictproxy) == i + 1
+ assert key in dictproxy
+ assert hasattr(dictproxy, key)
+ assert dictproxy[key] is val
+ assert read_only(dictproxy, key) is val
+
+ # Test __iter__
+ assert list(dictproxy) == keys
+
+ for key in keys:
+ # Test that items cannot be deleted through dictproxy:
+ raises(TypeError, delitem, dictproxy, key)
+ raises(AttributeError, delattr, dictproxy, key)
+
+ # Test that deletions in d are reflected in dictproxy
+ del d[key]
+ assert len(dictproxy) == len(d)
+ assert key not in dictproxy
+ raises(KeyError, getitem, dictproxy, key)
+ raises(AttributeError, getattr, dictproxy, key)
+
+
+class test_Plugin(ClassChecker):
+ """
+ Test the `ipalib.plugable.Plugin` class.
+ """
+ _cls = plugable.Plugin
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.Plugin` class.
+ """
+ assert self.cls.__bases__ == (plugable.ReadOnly,)
+ assert type(self.cls.api) is property
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.Plugin.__init__` method.
+ """
+ o = self.cls()
+ assert o.name == 'Plugin'
+ assert o.module == 'ipalib.plugable'
+ assert o.fullname == 'ipalib.plugable.Plugin'
+ assert isinstance(o.doc, text.Gettext)
+ class some_subclass(self.cls):
+ """
+ Do sub-classy things.
+
+ Although it doesn't know how to comport itself and is not for mixed
+ company, this class *is* useful as we all need a little sub-class
+ now and then.
+
+ One more paragraph.
+ """
+ o = some_subclass()
+ assert o.name == 'some_subclass'
+ assert o.module == __name__
+ assert o.fullname == '%s.some_subclass' % __name__
+ assert o.summary == 'Do sub-classy things.'
+ assert isinstance(o.doc, text.Gettext)
+ class another_subclass(self.cls):
+ pass
+ o = another_subclass()
+ assert o.summary == '<%s>' % o.fullname
+
+ # Test that Plugin makes sure the subclass hasn't defined attributes
+ # whose names conflict with the logger methods set in Plugin.__init__():
+ class check(self.cls):
+ info = 'whatever'
+ e = raises(StandardError, check)
+ assert str(e) == \
+ "info is already bound to ipatests.test_ipalib.test_plugable.check()"
+
+ def test_set_api(self):
+ """
+ Test the `ipalib.plugable.Plugin.set_api` method.
+ """
+ api = 'the api instance'
+ o = self.cls()
+ assert o.api is None
+ e = raises(AssertionError, o.set_api, None)
+ assert str(e) == 'set_api() argument cannot be None'
+ o.set_api(api)
+ assert o.api is api
+ e = raises(AssertionError, o.set_api, api)
+ assert str(e) == 'set_api() can only be called once'
+
+ def test_finalize(self):
+ """
+ Test the `ipalib.plugable.Plugin.finalize` method.
+ """
+ o = self.cls()
+ assert not o.__islocked__()
+ o.finalize()
+ assert o.__islocked__()
+
+ def test_call(self):
+ """
+ Test the `ipalib.plugable.Plugin.call` method.
+ """
+ o = self.cls()
+ o.call('/bin/true') is None
+ e = raises(errors.SubprocessError, o.call, '/bin/false')
+ assert e.returncode == 1
+ assert e.argv == ('/bin/false',)
+
+
+def test_Registrar():
+ """
+ Test the `ipalib.plugable.Registrar` class
+ """
+ class Base1(object):
+ pass
+ class Base2(object):
+ pass
+ class Base3(object):
+ pass
+ class plugin1(Base1):
+ pass
+ class plugin2(Base2):
+ pass
+ class plugin3(Base3):
+ pass
+
+ # Test creation of Registrar:
+ r = plugable.Registrar(Base1, Base2)
+
+ # Test __iter__:
+ assert list(r) == ['Base1', 'Base2']
+
+ # Test __hasitem__, __getitem__:
+ for base in [Base1, Base2]:
+ name = base.__name__
+ assert name in r
+ assert r[name] is base
+ magic = getattr(r, name)
+ assert type(magic) is plugable.MagicDict
+ assert len(magic) == 0
+
+ # Check that TypeError is raised trying to register something that isn't
+ # a class:
+ p = plugin1()
+ e = raises(TypeError, r, p)
+ assert str(e) == 'plugin must be a class; got %r' % p
+
+ # Check that SubclassError is raised trying to register a class that is
+ # not a subclass of an allowed base:
+ e = raises(errors.PluginSubclassError, r, plugin3)
+ assert e.plugin is plugin3
+
+ # Check that registration works
+ r(plugin1)
+ assert len(r.Base1) == 1
+ assert r.Base1['plugin1'] is plugin1
+ assert r.Base1.plugin1 is plugin1
+
+ # Check that DuplicateError is raised trying to register exact class
+ # again:
+ e = raises(errors.PluginDuplicateError, r, plugin1)
+ assert e.plugin is plugin1
+
+ # Check that OverrideError is raised trying to register class with same
+ # name and same base:
+ orig1 = plugin1
+ class base1_extended(Base1):
+ pass
+ class plugin1(base1_extended):
+ pass
+ e = raises(errors.PluginOverrideError, r, plugin1)
+ assert e.base == 'Base1'
+ assert e.name == 'plugin1'
+ assert e.plugin is plugin1
+
+ # Check that overriding works
+ r(plugin1, override=True)
+ assert len(r.Base1) == 1
+ assert r.Base1.plugin1 is plugin1
+ assert r.Base1.plugin1 is not orig1
+
+ # Check that MissingOverrideError is raised trying to override a name
+ # not yet registerd:
+ e = raises(errors.PluginMissingOverrideError, r, plugin2, override=True)
+ assert e.base == 'Base2'
+ assert e.name == 'plugin2'
+ assert e.plugin is plugin2
+
+ # Test that another plugin can be registered:
+ assert len(r.Base2) == 0
+ r(plugin2)
+ assert len(r.Base2) == 1
+ assert r.Base2.plugin2 is plugin2
+
+ # Setup to test more registration:
+ class plugin1a(Base1):
+ pass
+ r(plugin1a)
+
+ class plugin1b(Base1):
+ pass
+ r(plugin1b)
+
+ class plugin2a(Base2):
+ pass
+ r(plugin2a)
+
+ class plugin2b(Base2):
+ pass
+ r(plugin2b)
+
+ # Again test __hasitem__, __getitem__:
+ for base in [Base1, Base2]:
+ name = base.__name__
+ assert name in r
+ assert r[name] is base
+ magic = getattr(r, name)
+ assert len(magic) == 3
+ for key in magic:
+ klass = magic[key]
+ assert getattr(magic, key) is klass
+ assert issubclass(klass, base)
+
+
+class test_API(ClassChecker):
+ """
+ Test the `ipalib.plugable.API` class.
+ """
+
+ _cls = plugable.API
+
+ def test_API(self):
+ """
+ Test the `ipalib.plugable.API` class.
+ """
+ assert issubclass(plugable.API, plugable.ReadOnly)
+
+ # Setup the test bases, create the API:
+ class base0(plugable.Plugin):
+ def method(self, n):
+ return n
+
+ class base1(plugable.Plugin):
+ def method(self, n):
+ return n + 1
+
+ api = plugable.API(base0, base1)
+ api.env.mode = 'unit_test'
+ api.env.in_tree = True
+ r = api.register
+ assert isinstance(r, plugable.Registrar)
+ assert read_only(api, 'register') is r
+
+ class base0_plugin0(base0):
+ pass
+ r(base0_plugin0)
+
+ class base0_plugin1(base0):
+ pass
+ r(base0_plugin1)
+
+ class base0_plugin2(base0):
+ pass
+ r(base0_plugin2)
+
+ class base1_plugin0(base1):
+ pass
+ r(base1_plugin0)
+
+ class base1_plugin1(base1):
+ pass
+ r(base1_plugin1)
+
+ class base1_plugin2(base1):
+ pass
+ r(base1_plugin2)
+
+ # Test API instance:
+ assert api.isdone('bootstrap') is False
+ assert api.isdone('finalize') is False
+ api.finalize()
+ assert api.isdone('bootstrap') is True
+ assert api.isdone('finalize') is True
+
+ def get_base_name(b):
+ return 'base%d' % b
+
+
+ def get_plugin_name(b, p):
+ return 'base%d_plugin%d' % (b, p)
+
+ for b in xrange(2):
+ base_name = get_base_name(b)
+ base = locals()[base_name]
+ ns = getattr(api, base_name)
+ assert isinstance(ns, plugable.NameSpace)
+ assert read_only(api, base_name) is ns
+ assert len(ns) == 3
+ for p in xrange(3):
+ plugin_name = get_plugin_name(b, p)
+ plugin = locals()[plugin_name]
+ inst = ns[plugin_name]
+ assert isinstance(inst, base)
+ assert isinstance(inst, plugin)
+ assert inst.name == plugin_name
+ assert read_only(ns, plugin_name) is inst
+ assert inst.method(7) == 7 + b
+
+ # Test that calling finilize again raises AssertionError:
+ e = raises(StandardError, api.finalize)
+ assert str(e) == 'API.finalize() already called', str(e)
+
+ def test_bootstrap(self):
+ """
+ Test the `ipalib.plugable.API.bootstrap` method.
+ """
+ (o, home) = create_test_api()
+ assert o.env._isdone('_bootstrap') is False
+ assert o.env._isdone('_finalize_core') is False
+ assert o.isdone('bootstrap') is False
+ o.bootstrap(my_test_override='Hello, world!')
+ assert o.isdone('bootstrap') is True
+ assert o.env._isdone('_bootstrap') is True
+ assert o.env._isdone('_finalize_core') is True
+ assert o.env.my_test_override == 'Hello, world!'
+ e = raises(StandardError, o.bootstrap)
+ assert str(e) == 'API.bootstrap() already called'
+
+ def test_load_plugins(self):
+ """
+ Test the `ipalib.plugable.API.load_plugins` method.
+ """
+ (o, home) = create_test_api()
+ assert o.isdone('bootstrap') is False
+ assert o.isdone('load_plugins') is False
+ o.load_plugins()
+ assert o.isdone('bootstrap') is True
+ assert o.isdone('load_plugins') is True
+ e = raises(StandardError, o.load_plugins)
+ assert str(e) == 'API.load_plugins() already called'
diff --git a/ipatests/test_ipalib/test_rpc.py b/ipatests/test_ipalib/test_rpc.py
new file mode 100644
index 000000000..56b8184cf
--- /dev/null
+++ b/ipatests/test_ipalib/test_rpc.py
@@ -0,0 +1,244 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.rpc` module.
+"""
+
+import threading
+from xmlrpclib import Binary, Fault, dumps, loads, ServerProxy
+from ipatests.util import raises, assert_equal, PluginTester, DummyClass
+from ipatests.data import binary_bytes, utf8_bytes, unicode_str
+from ipalib.frontend import Command
+from ipalib.request import context, Connection
+from ipalib import rpc, errors
+
+
+std_compound = (binary_bytes, utf8_bytes, unicode_str)
+
+
+def dump_n_load(value):
+ (param, method) = loads(
+ dumps((value,), allow_none=True)
+ )
+ return param[0]
+
+
+def round_trip(value):
+ return rpc.xml_unwrap(
+ dump_n_load(rpc.xml_wrap(value))
+ )
+
+
+def test_round_trip():
+ """
+ Test `ipalib.rpc.xml_wrap` and `ipalib.rpc.xml_unwrap`.
+
+ This tests the two functions together with ``xmlrpclib.dumps()`` and
+ ``xmlrpclib.loads()`` in a full wrap/dumps/loads/unwrap round trip.
+ """
+ # We first test that our assumptions about xmlrpclib module in the Python
+ # standard library are correct:
+ assert_equal(dump_n_load(utf8_bytes), unicode_str)
+ assert_equal(dump_n_load(unicode_str), unicode_str)
+ assert_equal(dump_n_load(Binary(binary_bytes)).data, binary_bytes)
+ assert isinstance(dump_n_load(Binary(binary_bytes)), Binary)
+ assert type(dump_n_load('hello')) is str
+ assert type(dump_n_load(u'hello')) is str
+ assert_equal(dump_n_load(''), '')
+ assert_equal(dump_n_load(u''), '')
+ assert dump_n_load(None) is None
+
+ # Now we test our wrap and unwrap methods in combination with dumps, loads:
+ # All str should come back str (because they get wrapped in
+ # xmlrpclib.Binary(). All unicode should come back unicode because str
+ # explicity get decoded by rpc.xml_unwrap() if they weren't already
+ # decoded by xmlrpclib.loads().
+ assert_equal(round_trip(utf8_bytes), utf8_bytes)
+ assert_equal(round_trip(unicode_str), unicode_str)
+ assert_equal(round_trip(binary_bytes), binary_bytes)
+ assert type(round_trip('hello')) is str
+ assert type(round_trip(u'hello')) is unicode
+ assert_equal(round_trip(''), '')
+ assert_equal(round_trip(u''), u'')
+ assert round_trip(None) is None
+ compound = [utf8_bytes, None, binary_bytes, (None, unicode_str),
+ dict(utf8=utf8_bytes, chars=unicode_str, data=binary_bytes)
+ ]
+ assert round_trip(compound) == tuple(compound)
+
+
+def test_xml_wrap():
+ """
+ Test the `ipalib.rpc.xml_wrap` function.
+ """
+ f = rpc.xml_wrap
+ assert f([]) == tuple()
+ assert f({}) == dict()
+ b = f('hello')
+ assert isinstance(b, Binary)
+ assert b.data == 'hello'
+ u = f(u'hello')
+ assert type(u) is unicode
+ assert u == u'hello'
+ value = f([dict(one=False, two=u'hello'), None, 'hello'])
+
+
+def test_xml_unwrap():
+ """
+ Test the `ipalib.rpc.xml_unwrap` function.
+ """
+ f = rpc.xml_unwrap
+ assert f([]) == tuple()
+ assert f({}) == dict()
+ value = f(Binary(utf8_bytes))
+ assert type(value) is str
+ assert value == utf8_bytes
+ assert f(utf8_bytes) == unicode_str
+ assert f(unicode_str) == unicode_str
+ value = f([True, Binary('hello'), dict(one=1, two=utf8_bytes, three=None)])
+ assert value == (True, 'hello', dict(one=1, two=unicode_str, three=None))
+ assert type(value[1]) is str
+ assert type(value[2]['two']) is unicode
+
+
+def test_xml_dumps():
+ """
+ Test the `ipalib.rpc.xml_dumps` function.
+ """
+ f = rpc.xml_dumps
+ params = (binary_bytes, utf8_bytes, unicode_str, None)
+
+ # Test serializing an RPC request:
+ data = f(params, 'the_method')
+ (p, m) = loads(data)
+ assert_equal(m, u'the_method')
+ assert type(p) is tuple
+ assert rpc.xml_unwrap(p) == params
+
+ # Test serializing an RPC response:
+ data = f((params,), methodresponse=True)
+ (tup, m) = loads(data)
+ assert m is None
+ assert len(tup) == 1
+ assert type(tup) is tuple
+ assert rpc.xml_unwrap(tup[0]) == params
+
+ # Test serializing an RPC response containing a Fault:
+ fault = Fault(69, unicode_str)
+ data = f(fault, methodresponse=True)
+ e = raises(Fault, loads, data)
+ assert e.faultCode == 69
+ assert_equal(e.faultString, unicode_str)
+
+
+def test_xml_loads():
+ """
+ Test the `ipalib.rpc.xml_loads` function.
+ """
+ f = rpc.xml_loads
+ params = (binary_bytes, utf8_bytes, unicode_str, None)
+ wrapped = rpc.xml_wrap(params)
+
+ # Test un-serializing an RPC request:
+ data = dumps(wrapped, 'the_method', allow_none=True)
+ (p, m) = f(data)
+ assert_equal(m, u'the_method')
+ assert_equal(p, params)
+
+ # Test un-serializing an RPC response:
+ data = dumps((wrapped,), methodresponse=True, allow_none=True)
+ (tup, m) = f(data)
+ assert m is None
+ assert len(tup) == 1
+ assert type(tup) is tuple
+ assert_equal(tup[0], params)
+
+ # Test un-serializing an RPC response containing a Fault:
+ for error in (unicode_str, u'hello'):
+ fault = Fault(69, error)
+ data = dumps(fault, methodresponse=True, allow_none=True, encoding='UTF-8')
+ e = raises(Fault, f, data)
+ assert e.faultCode == 69
+ assert_equal(e.faultString, error)
+ assert type(e.faultString) is unicode
+
+
+class test_xmlclient(PluginTester):
+ """
+ Test the `ipalib.rpc.xmlclient` plugin.
+ """
+ _plugin = rpc.xmlclient
+
+ def test_forward(self):
+ """
+ Test the `ipalib.rpc.xmlclient.forward` method.
+ """
+ class user_add(Command):
+ pass
+
+ # Test that ValueError is raised when forwarding a command that is not
+ # in api.Command:
+ (o, api, home) = self.instance('Backend', in_server=False)
+ e = raises(ValueError, o.forward, 'user_add')
+ assert str(e) == '%s.forward(): %r not in api.Command' % (
+ 'xmlclient', 'user_add'
+ )
+
+ (o, api, home) = self.instance('Backend', user_add, in_server=False)
+ args = (binary_bytes, utf8_bytes, unicode_str)
+ kw = dict(one=binary_bytes, two=utf8_bytes, three=unicode_str)
+ params = [args, kw]
+ result = (unicode_str, binary_bytes, utf8_bytes)
+ conn = DummyClass(
+ (
+ 'user_add',
+ rpc.xml_wrap(params),
+ {},
+ rpc.xml_wrap(result),
+ ),
+ (
+ 'user_add',
+ rpc.xml_wrap(params),
+ {},
+ Fault(3007, u"'four' is required"), # RequirementError
+ ),
+ (
+ 'user_add',
+ rpc.xml_wrap(params),
+ {},
+ Fault(700, u'no such error'), # There is no error 700
+ ),
+
+ )
+ context.xmlclient = Connection(conn, lambda: None)
+
+ # Test with a successful return value:
+ assert o.forward('user_add', *args, **kw) == result
+
+ # Test with an errno the client knows:
+ e = raises(errors.RequirementError, o.forward, 'user_add', *args, **kw)
+ assert_equal(e.args[0], u"'four' is required")
+
+ # Test with an errno the client doesn't know
+ e = raises(errors.UnknownError, o.forward, 'user_add', *args, **kw)
+ assert_equal(e.code, 700)
+ assert_equal(e.error, u'no such error')
+
+ assert context.xmlclient.conn._calledall() is True
diff --git a/ipatests/test_ipalib/test_text.py b/ipatests/test_ipalib/test_text.py
new file mode 100644
index 000000000..2a5ff7a36
--- /dev/null
+++ b/ipatests/test_ipalib/test_text.py
@@ -0,0 +1,334 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty contextrmation
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.text` module.
+"""
+
+import os
+import shutil
+import tempfile
+import re
+import nose
+import locale
+from ipatests.util import raises, assert_equal
+from ipatests.i18n import create_po, po_file_iterate
+from ipalib.request import context
+from ipalib import request
+from ipalib import text
+from ipapython.ipautil import file_exists
+
+singular = '%(count)d goose makes a %(dish)s'
+plural = '%(count)d geese make a %(dish)s'
+
+
+def test_create_translation():
+ f = text.create_translation
+ key = ('foo', None)
+ t = f(key)
+ assert context.__dict__[key] is t
+
+
+class test_TestLang(object):
+ def setUp(self):
+ self.tmp_dir = None
+ self.saved_lang = None
+
+ self.lang = 'xh_ZA'
+ self.domain = 'ipa'
+
+ self.ipa_i18n_dir = os.path.join(os.path.dirname(__file__), '../../install/po')
+
+ self.pot_basename = '%s.pot' % self.domain
+ self.po_basename = '%s.po' % self.lang
+ self.mo_basename = '%s.mo' % self.domain
+
+ self.tmp_dir = tempfile.mkdtemp()
+ self.saved_lang = os.environ['LANG']
+
+ self.locale_dir = os.path.join(self.tmp_dir, 'test_locale')
+ self.msg_dir = os.path.join(self.locale_dir, self.lang, 'LC_MESSAGES')
+
+ if not os.path.exists(self.msg_dir):
+ os.makedirs(self.msg_dir)
+
+ self.pot_file = os.path.join(self.ipa_i18n_dir, self.pot_basename)
+ self.mo_file = os.path.join(self.msg_dir, self.mo_basename)
+ self.po_file = os.path.join(self.tmp_dir, self.po_basename)
+
+ result = create_po(self.pot_file, self.po_file, self.mo_file)
+ if result:
+ raise nose.SkipTest('Unable to create po file "%s" & mo file "%s" from pot file "%s"' %
+ (self.po_file, self.mo_file, self.pot_file))
+
+ if not file_exists(self.po_file):
+ raise nose.SkipTest('Test po file unavailable, run "make test" in install/po')
+
+ if not file_exists(self.mo_file):
+ raise nose.SkipTest('Test mo file unavailable, run "make test" in install/po')
+
+ self.po_file_iterate = po_file_iterate
+
+ def tearDown(self):
+ if self.saved_lang is not None:
+ os.environ['LANG'] = self.saved_lang
+
+ if self.tmp_dir is not None:
+ shutil.rmtree(self.tmp_dir)
+
+ def test_test_lang(self):
+ print "test_test_lang"
+ # The test installs the test message catalog under the xh_ZA
+ # (e.g. Zambia Xhosa) language by default. It would be nice to
+ # use a dummy language not associated with any real language,
+ # but the setlocale function demands the locale be a valid
+ # known locale, Zambia Xhosa is a reasonable choice :)
+
+ os.environ['LANG'] = self.lang
+
+ # Create a gettext translation object specifying our domain as
+ # 'ipa' and the locale_dir as 'test_locale' (i.e. where to
+ # look for the message catalog). Then use that translation
+ # object to obtain the translation functions.
+
+ def get_msgstr(msg):
+ gt = text.GettextFactory(localedir=self.locale_dir)(msg)
+ return unicode(gt)
+
+ def get_msgstr_plural(singular, plural, count):
+ ng = text.NGettextFactory(localedir=self.locale_dir)(singular, plural, count)
+ return ng(count)
+
+ result = self.po_file_iterate(self.po_file, get_msgstr, get_msgstr_plural)
+ assert result == 0
+
+class test_LazyText(object):
+
+ klass = text.LazyText
+
+ def test_init(self):
+ inst = self.klass('foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+ assert inst.key == ('foo', 'bar')
+
+
+class test_FixMe(object):
+ klass = text.FixMe
+
+ def test_init(self):
+ inst = self.klass('user.label')
+ assert inst.msg == 'user.label'
+ assert inst.domain is None
+ assert inst.localedir is None
+
+ def test_repr(self):
+ inst = self.klass('user.label')
+ assert repr(inst) == "FixMe('user.label')"
+
+ def test_unicode(self):
+ inst = self.klass('user.label')
+ assert unicode(inst) == u'<user.label>'
+ assert type(unicode(inst)) is unicode
+
+
+class test_Gettext(object):
+
+ klass = text.Gettext
+
+ def test_init(self):
+ inst = self.klass('what up?', 'foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+ assert inst.msg is 'what up?'
+ assert inst.args == ('what up?', 'foo', 'bar')
+
+ def test_repr(self):
+ inst = self.klass('foo', 'bar', 'baz')
+ assert repr(inst) == "Gettext('foo', domain='bar', localedir='baz')"
+
+ def test_unicode(self):
+ inst = self.klass('what up?', 'foo', 'bar')
+ assert unicode(inst) == u'what up?'
+
+ def test_mod(self):
+ inst = self.klass('hello %(adj)s nurse', 'foo', 'bar')
+ assert inst % dict(adj='naughty', stuff='junk') == 'hello naughty nurse'
+
+ def test_eq(self):
+ inst1 = self.klass('what up?', 'foo', 'bar')
+ inst2 = self.klass('what up?', 'foo', 'bar')
+ inst3 = self.klass('Hello world', 'foo', 'bar')
+ inst4 = self.klass('what up?', 'foo', 'baz')
+
+ assert (inst1 == inst1) is True
+ assert (inst1 == inst2) is True
+ assert (inst1 == inst3) is False
+ assert (inst1 == inst4) is False
+
+ # Test with args flipped
+ assert (inst2 == inst1) is True
+ assert (inst3 == inst1) is False
+ assert (inst4 == inst1) is False
+
+ def test_ne(self):
+ inst1 = self.klass('what up?', 'foo', 'bar')
+ inst2 = self.klass('what up?', 'foo', 'bar')
+ inst3 = self.klass('Hello world', 'foo', 'bar')
+ inst4 = self.klass('what up?', 'foo', 'baz')
+
+ assert (inst1 != inst2) is False
+ assert (inst1 != inst2) is False
+ assert (inst1 != inst3) is True
+ assert (inst1 != inst4) is True
+
+ # Test with args flipped
+ assert (inst2 != inst1) is False
+ assert (inst3 != inst1) is True
+ assert (inst4 != inst1) is True
+
+
+class test_NGettext(object):
+
+ klass = text.NGettext
+
+ def test_init(self):
+ inst = self.klass(singular, plural, 'foo', 'bar')
+ assert inst.singular is singular
+ assert inst.plural is plural
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+ assert inst.args == (singular, plural, 'foo', 'bar')
+
+ def test_repr(self):
+ inst = self.klass('sig', 'plu', 'foo', 'bar')
+ assert repr(inst) == \
+ "NGettext('sig', 'plu', domain='foo', localedir='bar')"
+
+ def test_call(self):
+ inst = self.klass(singular, plural, 'foo', 'bar')
+ assert inst(0) == plural
+ assert inst(1) == singular
+ assert inst(2) == plural
+ assert inst(3) == plural
+
+ def test_mod(self):
+ inst = self.klass(singular, plural, 'foo', 'bar')
+ assert inst % dict(count=0, dish='frown') == '0 geese make a frown'
+ assert inst % dict(count=1, dish='stew') == '1 goose makes a stew'
+ assert inst % dict(count=2, dish='pie') == '2 geese make a pie'
+
+ def test_eq(self):
+ inst1 = self.klass(singular, plural, 'foo', 'bar')
+ inst2 = self.klass(singular, plural, 'foo', 'bar')
+ inst3 = self.klass(singular, '%(count)d thingies', 'foo', 'bar')
+ inst4 = self.klass(singular, plural, 'foo', 'baz')
+
+ assert (inst1 == inst1) is True
+ assert (inst1 == inst2) is True
+ assert (inst1 == inst3) is False
+ assert (inst1 == inst4) is False
+
+ # Test with args flipped
+ assert (inst2 == inst1) is True
+ assert (inst3 == inst1) is False
+ assert (inst4 == inst1) is False
+
+ def test_ne(self):
+ inst1 = self.klass(singular, plural, 'foo', 'bar')
+ inst2 = self.klass(singular, plural, 'foo', 'bar')
+ inst3 = self.klass(singular, '%(count)d thingies', 'foo', 'bar')
+ inst4 = self.klass(singular, plural, 'foo', 'baz')
+
+ assert (inst1 != inst2) is False
+ assert (inst1 != inst2) is False
+ assert (inst1 != inst3) is True
+ assert (inst1 != inst4) is True
+
+ # Test with args flipped
+ assert (inst2 != inst1) is False
+ assert (inst3 != inst1) is True
+ assert (inst4 != inst1) is True
+
+
+class test_GettextFactory(object):
+
+ klass = text.GettextFactory
+
+ def test_init(self):
+ # Test with defaults:
+ inst = self.klass()
+ assert inst.domain == 'ipa'
+ assert inst.localedir is None
+
+ # Test with overrides:
+ inst = self.klass('foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+
+ def test_repr(self):
+ # Test with defaults:
+ inst = self.klass()
+ assert repr(inst) == "GettextFactory(domain='ipa', localedir=None)"
+
+ # Test with overrides:
+ inst = self.klass('foo', 'bar')
+ assert repr(inst) == "GettextFactory(domain='foo', localedir='bar')"
+
+ def test_call(self):
+ inst = self.klass('foo', 'bar')
+ g = inst('what up?')
+ assert type(g) is text.Gettext
+ assert g.msg is 'what up?'
+ assert g.domain == 'foo'
+ assert g.localedir == 'bar'
+
+
+class test_NGettextFactory(object):
+
+ klass = text.NGettextFactory
+
+ def test_init(self):
+ # Test with defaults:
+ inst = self.klass()
+ assert inst.domain == 'ipa'
+ assert inst.localedir is None
+
+ # Test with overrides:
+ inst = self.klass('foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+
+ def test_repr(self):
+ # Test with defaults:
+ inst = self.klass()
+ assert repr(inst) == "NGettextFactory(domain='ipa', localedir=None)"
+
+ # Test with overrides:
+ inst = self.klass('foo', 'bar')
+ assert repr(inst) == "NGettextFactory(domain='foo', localedir='bar')"
+
+ def test_call(self):
+ inst = self.klass('foo', 'bar')
+ ng = inst(singular, plural, 7)
+ assert type(ng) is text.NGettext
+ assert ng.singular is singular
+ assert ng.plural is plural
+ assert ng.domain == 'foo'
+ assert ng.localedir == 'bar'
diff --git a/ipatests/test_ipalib/test_util.py b/ipatests/test_ipalib/test_util.py
new file mode 100644
index 000000000..9d19dfb2c
--- /dev/null
+++ b/ipatests/test_ipalib/test_util.py
@@ -0,0 +1,26 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.util` module.
+"""
+
+from ipalib import util
+
+
diff --git a/ipatests/test_ipalib/test_x509.py b/ipatests/test_ipalib/test_x509.py
new file mode 100644
index 000000000..c7fafbbd9
--- /dev/null
+++ b/ipatests/test_ipalib/test_x509.py
@@ -0,0 +1,139 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2010 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.x509` module.
+"""
+
+import os
+from os import path
+import sys
+from ipatests.util import raises, setitem, delitem, ClassChecker
+from ipatests.util import getitem, setitem, delitem
+from ipatests.util import TempDir, TempHome
+from ipalib.constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR
+from ipalib.constants import NAME_REGEX, NAME_ERROR
+import base64
+from ipalib import x509
+from nss.error import NSPRError
+from ipapython.dn import DN
+
+# certutil -
+
+# certificate for CN=ipa.example.com,O=IPA
+goodcert = 'MIICAjCCAWugAwIBAgICBEUwDQYJKoZIhvcNAQEFBQAwKTEnMCUGA1UEAxMeSVBBIFRlc3QgQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4XDTEwMDYyNTEzMDA0MloXDTE1MDYyNTEzMDA0MlowKDEMMAoGA1UEChMDSVBBMRgwFgYDVQQDEw9pcGEuZXhhbXBsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAJcZ+H6+cQaN/BlzR8OYkVeJgaU5tCaV9FF1m7Ws/ftPtTJUaSL1ncp6603rjA4tH1aa/B8i8xdC46+ZbY2au8b9ryGcOsx2uaRpNLEQ2Fy//q1kQC8oM+iD8Nd6osF0a2wnugsgnJHPuJzhViaWxYgzk5DRdP81debokF3f3FX/AgMBAAGjOjA4MBEGCWCGSAGG+EIBAQQEAwIGQDATBgNVHSUEDDAKBggrBgEFBQcDATAOBgNVHQ8BAf8EBAMCBPAwDQYJKoZIhvcNAQEFBQADgYEALD6X9V9w381AzzQPcHsjIjiX3B/AF9RCGocKZUDXkdDhsD9NZ3PLPEf1AMjkraKG963HPB8scyiBbbSuSh6m7TCp0eDgRpo77zNuvd3U4Qpm0Qk+KEjtHQDjNNG6N4ZnCQPmjFPScElvc/GgW7XMbywJy2euF+3/Uip8cnPgSH4='
+
+# The base64-encoded string 'bad cert'
+badcert = 'YmFkIGNlcnQ='
+
+class test_x509(object):
+ """
+ Test `ipalib.x509`
+
+ I created the contents of this certificate with a self-signed CA with:
+ % certutil -R -s "CN=ipa.example.com,O=IPA" -d . -a -o example.csr
+ % ./ipa host-add ipa.example.com
+ % ./ipa cert-request --add --principal=test/ipa.example.com example.csr
+ """
+
+ def test_1_load_base64_cert(self):
+ """
+ Test loading a base64-encoded certificate.
+ """
+
+ # Load a good cert
+ cert = x509.load_certificate(goodcert)
+
+ # Load a good cert with headers
+ newcert = '-----BEGIN CERTIFICATE-----' + goodcert + '-----END CERTIFICATE-----'
+ cert = x509.load_certificate(newcert)
+
+ # Load a good cert with bad headers
+ newcert = '-----BEGIN CERTIFICATE-----' + goodcert
+ try:
+ cert = x509.load_certificate(newcert)
+ except TypeError:
+ pass
+
+ # Load a bad cert
+ try:
+ cert = x509.load_certificate(badcert)
+ except NSPRError:
+ pass
+
+ def test_1_load_der_cert(self):
+ """
+ Test loading a DER certificate.
+ """
+
+ der = base64.b64decode(goodcert)
+
+ # Load a good cert
+ cert = x509.load_certificate(der, x509.DER)
+
+ def test_2_get_subject(self):
+ """
+ Test retrieving the subject
+ """
+ subject = x509.get_subject(goodcert)
+ assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA'))
+
+ der = base64.b64decode(goodcert)
+ subject = x509.get_subject(der, x509.DER)
+ assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA'))
+
+ # We should be able to pass in a tuple/list of certs too
+ subject = x509.get_subject((goodcert))
+ assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA'))
+
+ subject = x509.get_subject([goodcert])
+ assert DN(str(subject)) == DN(('CN','ipa.example.com'),('O','IPA'))
+
+ def test_2_get_serial_number(self):
+ """
+ Test retrieving the serial number
+ """
+ serial = x509.get_serial_number(goodcert)
+ assert serial == 1093
+
+ der = base64.b64decode(goodcert)
+ serial = x509.get_serial_number(der, x509.DER)
+ assert serial == 1093
+
+ # We should be able to pass in a tuple/list of certs too
+ serial = x509.get_serial_number((goodcert))
+ assert serial == 1093
+
+ serial = x509.get_serial_number([goodcert])
+ assert serial == 1093
+
+ def test_3_cert_contents(self):
+ """
+ Test the contents of a certificate
+ """
+ # Verify certificate contents. This exercises python-nss more than
+ # anything but confirms our usage of it.
+
+ cert = x509.load_certificate(goodcert)
+
+ assert DN(str(cert.subject)) == DN(('CN','ipa.example.com'),('O','IPA'))
+ assert DN(str(cert.issuer)) == DN(('CN','IPA Test Certificate Authority'))
+ assert cert.serial_number == 1093
+ assert cert.valid_not_before_str == 'Fri Jun 25 13:00:42 2010 UTC'
+ assert cert.valid_not_after_str == 'Thu Jun 25 13:00:42 2015 UTC'