summaryrefslogtreecommitdiffstats
path: root/ipatests/test_ipalib/test_plugable.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipatests/test_ipalib/test_plugable.py')
-rw-r--r--ipatests/test_ipalib/test_plugable.py516
1 files changed, 516 insertions, 0 deletions
diff --git a/ipatests/test_ipalib/test_plugable.py b/ipatests/test_ipalib/test_plugable.py
new file mode 100644
index 000000000..c495e74dc
--- /dev/null
+++ b/ipatests/test_ipalib/test_plugable.py
@@ -0,0 +1,516 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib.plugable` module.
+"""
+
+import inspect
+from ipatests.util import raises, no_set, no_del, read_only
+from ipatests.util import getitem, setitem, delitem
+from ipatests.util import ClassChecker, create_test_api
+from ipalib import plugable, errors, text
+
+
+class test_SetProxy(ClassChecker):
+ """
+ Test the `ipalib.plugable.SetProxy` class.
+ """
+ _cls = plugable.SetProxy
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.SetProxy` class.
+ """
+ assert self.cls.__bases__ == (plugable.ReadOnly,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.SetProxy.__init__` method.
+ """
+ okay = (set, frozenset, dict)
+ fail = (list, tuple)
+ for t in okay:
+ self.cls(t())
+ raises(TypeError, self.cls, t)
+ for t in fail:
+ raises(TypeError, self.cls, t())
+ raises(TypeError, self.cls, t)
+
+ def test_SetProxy(self):
+ """
+ Test container emulation of `ipalib.plugable.SetProxy` class.
+ """
+ def get_key(i):
+ return 'key_%d' % i
+
+ cnt = 10
+ target = set()
+ proxy = self.cls(target)
+ for i in xrange(cnt):
+ key = get_key(i)
+
+ # Check initial state
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert key not in proxy
+ assert key not in target
+
+ # Add and test again
+ target.add(key)
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert key in proxy
+ assert key in target
+
+
+class test_DictProxy(ClassChecker):
+ """
+ Test the `ipalib.plugable.DictProxy` class.
+ """
+ _cls = plugable.DictProxy
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.DictProxy` class.
+ """
+ assert self.cls.__bases__ == (plugable.SetProxy,)
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.DictProxy.__init__` method.
+ """
+ self.cls(dict())
+ raises(TypeError, self.cls, dict)
+ fail = (set, frozenset, list, tuple)
+ for t in fail:
+ raises(TypeError, self.cls, t())
+ raises(TypeError, self.cls, t)
+
+ def test_DictProxy(self):
+ """
+ Test container emulation of `ipalib.plugable.DictProxy` class.
+ """
+ def get_kv(i):
+ return (
+ 'key_%d' % i,
+ 'val_%d' % i,
+ )
+ cnt = 10
+ target = dict()
+ proxy = self.cls(target)
+ for i in xrange(cnt):
+ (key, val) = get_kv(i)
+
+ # Check initial state
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert list(proxy()) == [target[k] for k in sorted(target)]
+ assert key not in proxy
+ raises(KeyError, getitem, proxy, key)
+
+ # Add and test again
+ target[key] = val
+ assert len(proxy) == len(target)
+ assert list(proxy) == sorted(target)
+ assert list(proxy()) == [target[k] for k in sorted(target)]
+
+ # Verify TypeError is raised trying to set/del via proxy
+ raises(TypeError, setitem, proxy, key, val)
+ raises(TypeError, delitem, proxy, key)
+
+
+class test_MagicDict(ClassChecker):
+ """
+ Test the `ipalib.plugable.MagicDict` class.
+ """
+ _cls = plugable.MagicDict
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.MagicDict` class.
+ """
+ assert self.cls.__bases__ == (plugable.DictProxy,)
+ for non_dict in ('hello', 69, object):
+ raises(TypeError, self.cls, non_dict)
+
+ def test_MagicDict(self):
+ """
+ Test container emulation of `ipalib.plugable.MagicDict` class.
+ """
+ cnt = 10
+ keys = []
+ d = dict()
+ dictproxy = self.cls(d)
+ for i in xrange(cnt):
+ key = 'key_%d' % i
+ val = 'val_%d' % i
+ keys.append(key)
+
+ # Test thet key does not yet exist
+ assert len(dictproxy) == i
+ assert key not in dictproxy
+ assert not hasattr(dictproxy, key)
+ raises(KeyError, getitem, dictproxy, key)
+ raises(AttributeError, getattr, dictproxy, key)
+
+ # Test that items/attributes cannot be set on dictproxy:
+ raises(TypeError, setitem, dictproxy, key, val)
+ raises(AttributeError, setattr, dictproxy, key, val)
+
+ # Test that additions in d are reflected in dictproxy:
+ d[key] = val
+ assert len(dictproxy) == i + 1
+ assert key in dictproxy
+ assert hasattr(dictproxy, key)
+ assert dictproxy[key] is val
+ assert read_only(dictproxy, key) is val
+
+ # Test __iter__
+ assert list(dictproxy) == keys
+
+ for key in keys:
+ # Test that items cannot be deleted through dictproxy:
+ raises(TypeError, delitem, dictproxy, key)
+ raises(AttributeError, delattr, dictproxy, key)
+
+ # Test that deletions in d are reflected in dictproxy
+ del d[key]
+ assert len(dictproxy) == len(d)
+ assert key not in dictproxy
+ raises(KeyError, getitem, dictproxy, key)
+ raises(AttributeError, getattr, dictproxy, key)
+
+
+class test_Plugin(ClassChecker):
+ """
+ Test the `ipalib.plugable.Plugin` class.
+ """
+ _cls = plugable.Plugin
+
+ def test_class(self):
+ """
+ Test the `ipalib.plugable.Plugin` class.
+ """
+ assert self.cls.__bases__ == (plugable.ReadOnly,)
+ assert type(self.cls.api) is property
+
+ def test_init(self):
+ """
+ Test the `ipalib.plugable.Plugin.__init__` method.
+ """
+ o = self.cls()
+ assert o.name == 'Plugin'
+ assert o.module == 'ipalib.plugable'
+ assert o.fullname == 'ipalib.plugable.Plugin'
+ assert isinstance(o.doc, text.Gettext)
+ class some_subclass(self.cls):
+ """
+ Do sub-classy things.
+
+ Although it doesn't know how to comport itself and is not for mixed
+ company, this class *is* useful as we all need a little sub-class
+ now and then.
+
+ One more paragraph.
+ """
+ o = some_subclass()
+ assert o.name == 'some_subclass'
+ assert o.module == __name__
+ assert o.fullname == '%s.some_subclass' % __name__
+ assert o.summary == 'Do sub-classy things.'
+ assert isinstance(o.doc, text.Gettext)
+ class another_subclass(self.cls):
+ pass
+ o = another_subclass()
+ assert o.summary == '<%s>' % o.fullname
+
+ # Test that Plugin makes sure the subclass hasn't defined attributes
+ # whose names conflict with the logger methods set in Plugin.__init__():
+ class check(self.cls):
+ info = 'whatever'
+ e = raises(StandardError, check)
+ assert str(e) == \
+ "info is already bound to ipatests.test_ipalib.test_plugable.check()"
+
+ def test_set_api(self):
+ """
+ Test the `ipalib.plugable.Plugin.set_api` method.
+ """
+ api = 'the api instance'
+ o = self.cls()
+ assert o.api is None
+ e = raises(AssertionError, o.set_api, None)
+ assert str(e) == 'set_api() argument cannot be None'
+ o.set_api(api)
+ assert o.api is api
+ e = raises(AssertionError, o.set_api, api)
+ assert str(e) == 'set_api() can only be called once'
+
+ def test_finalize(self):
+ """
+ Test the `ipalib.plugable.Plugin.finalize` method.
+ """
+ o = self.cls()
+ assert not o.__islocked__()
+ o.finalize()
+ assert o.__islocked__()
+
+ def test_call(self):
+ """
+ Test the `ipalib.plugable.Plugin.call` method.
+ """
+ o = self.cls()
+ o.call('/bin/true') is None
+ e = raises(errors.SubprocessError, o.call, '/bin/false')
+ assert e.returncode == 1
+ assert e.argv == ('/bin/false',)
+
+
+def test_Registrar():
+ """
+ Test the `ipalib.plugable.Registrar` class
+ """
+ class Base1(object):
+ pass
+ class Base2(object):
+ pass
+ class Base3(object):
+ pass
+ class plugin1(Base1):
+ pass
+ class plugin2(Base2):
+ pass
+ class plugin3(Base3):
+ pass
+
+ # Test creation of Registrar:
+ r = plugable.Registrar(Base1, Base2)
+
+ # Test __iter__:
+ assert list(r) == ['Base1', 'Base2']
+
+ # Test __hasitem__, __getitem__:
+ for base in [Base1, Base2]:
+ name = base.__name__
+ assert name in r
+ assert r[name] is base
+ magic = getattr(r, name)
+ assert type(magic) is plugable.MagicDict
+ assert len(magic) == 0
+
+ # Check that TypeError is raised trying to register something that isn't
+ # a class:
+ p = plugin1()
+ e = raises(TypeError, r, p)
+ assert str(e) == 'plugin must be a class; got %r' % p
+
+ # Check that SubclassError is raised trying to register a class that is
+ # not a subclass of an allowed base:
+ e = raises(errors.PluginSubclassError, r, plugin3)
+ assert e.plugin is plugin3
+
+ # Check that registration works
+ r(plugin1)
+ assert len(r.Base1) == 1
+ assert r.Base1['plugin1'] is plugin1
+ assert r.Base1.plugin1 is plugin1
+
+ # Check that DuplicateError is raised trying to register exact class
+ # again:
+ e = raises(errors.PluginDuplicateError, r, plugin1)
+ assert e.plugin is plugin1
+
+ # Check that OverrideError is raised trying to register class with same
+ # name and same base:
+ orig1 = plugin1
+ class base1_extended(Base1):
+ pass
+ class plugin1(base1_extended):
+ pass
+ e = raises(errors.PluginOverrideError, r, plugin1)
+ assert e.base == 'Base1'
+ assert e.name == 'plugin1'
+ assert e.plugin is plugin1
+
+ # Check that overriding works
+ r(plugin1, override=True)
+ assert len(r.Base1) == 1
+ assert r.Base1.plugin1 is plugin1
+ assert r.Base1.plugin1 is not orig1
+
+ # Check that MissingOverrideError is raised trying to override a name
+ # not yet registerd:
+ e = raises(errors.PluginMissingOverrideError, r, plugin2, override=True)
+ assert e.base == 'Base2'
+ assert e.name == 'plugin2'
+ assert e.plugin is plugin2
+
+ # Test that another plugin can be registered:
+ assert len(r.Base2) == 0
+ r(plugin2)
+ assert len(r.Base2) == 1
+ assert r.Base2.plugin2 is plugin2
+
+ # Setup to test more registration:
+ class plugin1a(Base1):
+ pass
+ r(plugin1a)
+
+ class plugin1b(Base1):
+ pass
+ r(plugin1b)
+
+ class plugin2a(Base2):
+ pass
+ r(plugin2a)
+
+ class plugin2b(Base2):
+ pass
+ r(plugin2b)
+
+ # Again test __hasitem__, __getitem__:
+ for base in [Base1, Base2]:
+ name = base.__name__
+ assert name in r
+ assert r[name] is base
+ magic = getattr(r, name)
+ assert len(magic) == 3
+ for key in magic:
+ klass = magic[key]
+ assert getattr(magic, key) is klass
+ assert issubclass(klass, base)
+
+
+class test_API(ClassChecker):
+ """
+ Test the `ipalib.plugable.API` class.
+ """
+
+ _cls = plugable.API
+
+ def test_API(self):
+ """
+ Test the `ipalib.plugable.API` class.
+ """
+ assert issubclass(plugable.API, plugable.ReadOnly)
+
+ # Setup the test bases, create the API:
+ class base0(plugable.Plugin):
+ def method(self, n):
+ return n
+
+ class base1(plugable.Plugin):
+ def method(self, n):
+ return n + 1
+
+ api = plugable.API(base0, base1)
+ api.env.mode = 'unit_test'
+ api.env.in_tree = True
+ r = api.register
+ assert isinstance(r, plugable.Registrar)
+ assert read_only(api, 'register') is r
+
+ class base0_plugin0(base0):
+ pass
+ r(base0_plugin0)
+
+ class base0_plugin1(base0):
+ pass
+ r(base0_plugin1)
+
+ class base0_plugin2(base0):
+ pass
+ r(base0_plugin2)
+
+ class base1_plugin0(base1):
+ pass
+ r(base1_plugin0)
+
+ class base1_plugin1(base1):
+ pass
+ r(base1_plugin1)
+
+ class base1_plugin2(base1):
+ pass
+ r(base1_plugin2)
+
+ # Test API instance:
+ assert api.isdone('bootstrap') is False
+ assert api.isdone('finalize') is False
+ api.finalize()
+ assert api.isdone('bootstrap') is True
+ assert api.isdone('finalize') is True
+
+ def get_base_name(b):
+ return 'base%d' % b
+
+
+ def get_plugin_name(b, p):
+ return 'base%d_plugin%d' % (b, p)
+
+ for b in xrange(2):
+ base_name = get_base_name(b)
+ base = locals()[base_name]
+ ns = getattr(api, base_name)
+ assert isinstance(ns, plugable.NameSpace)
+ assert read_only(api, base_name) is ns
+ assert len(ns) == 3
+ for p in xrange(3):
+ plugin_name = get_plugin_name(b, p)
+ plugin = locals()[plugin_name]
+ inst = ns[plugin_name]
+ assert isinstance(inst, base)
+ assert isinstance(inst, plugin)
+ assert inst.name == plugin_name
+ assert read_only(ns, plugin_name) is inst
+ assert inst.method(7) == 7 + b
+
+ # Test that calling finilize again raises AssertionError:
+ e = raises(StandardError, api.finalize)
+ assert str(e) == 'API.finalize() already called', str(e)
+
+ def test_bootstrap(self):
+ """
+ Test the `ipalib.plugable.API.bootstrap` method.
+ """
+ (o, home) = create_test_api()
+ assert o.env._isdone('_bootstrap') is False
+ assert o.env._isdone('_finalize_core') is False
+ assert o.isdone('bootstrap') is False
+ o.bootstrap(my_test_override='Hello, world!')
+ assert o.isdone('bootstrap') is True
+ assert o.env._isdone('_bootstrap') is True
+ assert o.env._isdone('_finalize_core') is True
+ assert o.env.my_test_override == 'Hello, world!'
+ e = raises(StandardError, o.bootstrap)
+ assert str(e) == 'API.bootstrap() already called'
+
+ def test_load_plugins(self):
+ """
+ Test the `ipalib.plugable.API.load_plugins` method.
+ """
+ (o, home) = create_test_api()
+ assert o.isdone('bootstrap') is False
+ assert o.isdone('load_plugins') is False
+ o.load_plugins()
+ assert o.isdone('bootstrap') is True
+ assert o.isdone('load_plugins') is True
+ e = raises(StandardError, o.load_plugins)
+ assert str(e) == 'API.load_plugins() already called'