summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJason Gerard DeRose <jderose@redhat.com>2009-12-09 09:09:53 -0700
committerJason Gerard DeRose <jderose@redhat.com>2009-12-10 08:29:15 -0700
commitb6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7 (patch)
tree7e5329a51af169ce34a7d275a1bbd63c1e31c026
parentd08b8858ddc3bf6265f6ea8acae6661b9fff5112 (diff)
downloadfreeipa-b6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7.tar.gz
freeipa-b6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7.tar.xz
freeipa-b6e4972e7f6aa08e0392a2cf441b60ab0e7d88b7.zip
Take 2: Extensible return values and validation; steps toward a single output_for_cli(); enable more webUI stuff
-rw-r--r--ipalib/__init__.py60
-rw-r--r--ipalib/cli.py58
-rw-r--r--ipalib/crud.py15
-rw-r--r--ipalib/frontend.py145
-rw-r--r--ipalib/output.py104
-rw-r--r--ipalib/parameters.py22
-rw-r--r--ipalib/plugins/baseldap.py89
-rw-r--r--ipalib/plugins/group.py22
-rw-r--r--ipalib/plugins/hbac.py14
-rw-r--r--ipalib/plugins/host.py29
-rw-r--r--ipalib/plugins/misc.py81
-rw-r--r--ipalib/plugins/passwd.py3
-rw-r--r--ipalib/plugins/pwpolicy.py67
-rw-r--r--ipalib/plugins/service.py3
-rw-r--r--ipalib/plugins/user.py79
-rw-r--r--ipalib/text.py79
-rw-r--r--ipaserver/plugins/join.py3
-rw-r--r--ipaserver/rpcserver.py5
-rw-r--r--ipawebui/__init__.py2
-rw-r--r--ipawebui/engine.py56
-rw-r--r--ipawebui/widgets.py186
-rwxr-xr-xmake-test96
-rw-r--r--tests/test_ipalib/test_backend.py14
-rw-r--r--tests/test_ipalib/test_crud.py131
-rw-r--r--tests/test_ipalib/test_frontend.py206
-rw-r--r--tests/test_ipalib/test_output.py88
-rw-r--r--tests/test_ipalib/test_parameters.py58
-rw-r--r--tests/test_ipalib/test_text.py120
-rw-r--r--tests/test_util.py131
-rw-r--r--tests/test_xmlrpc/test_automount_plugin.py63
-rw-r--r--tests/test_xmlrpc/test_cert.py3
-rw-r--r--tests/test_xmlrpc/test_group_plugin.py542
-rw-r--r--tests/test_xmlrpc/test_hbac_plugin.py173
-rw-r--r--tests/test_xmlrpc/test_host_plugin.py32
-rw-r--r--tests/test_xmlrpc/test_hostgroup_plugin.py61
-rw-r--r--tests/test_xmlrpc/test_netgroup_plugin.py179
-rw-r--r--tests/test_xmlrpc/test_passwd_plugin.py20
-rw-r--r--tests/test_xmlrpc/test_pwpolicy.py89
-rw-r--r--tests/test_xmlrpc/test_rolegroup_plugin.py65
-rw-r--r--tests/test_xmlrpc/test_service_plugin.py35
-rw-r--r--tests/test_xmlrpc/test_taskgroup_plugin.py88
-rw-r--r--tests/test_xmlrpc/test_user_plugin.py386
-rw-r--r--tests/test_xmlrpc/xmlrpc_test.py213
-rw-r--r--tests/util.py82
44 files changed, 2962 insertions, 1035 deletions
diff --git a/ipalib/__init__.py b/ipalib/__init__.py
index c1ad760b6..83956e16d 100644
--- a/ipalib/__init__.py
+++ b/ipalib/__init__.py
@@ -135,13 +135,13 @@ implement a ``run()`` method, like this:
... """My example plugin with run()."""
...
... def run(self):
-... return 'My run() method was called!'
+... return dict(result='My run() method was called!')
...
>>> api = create_api()
>>> api.register(my_command)
>>> api.finalize()
>>> api.Command.my_command() # Call your command
-'My run() method was called!'
+{'result': 'My run() method was called!'}
When `frontend.Command.__call__()` is called, it first validates any arguments
and options your command plugin takes (if any) and then calls its ``run()``
@@ -175,10 +175,14 @@ For example, say you have a command plugin like this:
... """Forwarding vs. execution."""
...
... def forward(self):
-... return 'in_server=%r; forward() was called.' % self.env.in_server
+... return dict(
+... result='forward(): in_server=%r' % self.env.in_server
+... )
...
... def execute(self):
-... return 'in_server=%r; execute() was called.' % self.env.in_server
+... return dict(
+... result='execute(): in_server=%r' % self.env.in_server
+... )
...
If ``my_command`` is loaded in a *client* context, ``forward()`` will be
@@ -189,7 +193,7 @@ called:
>>> api.register(my_command)
>>> api.finalize()
>>> api.Command.my_command() # Call your command plugin
-'in_server=False; forward() was called.'
+{'result': 'forward(): in_server=False'}
On the other hand, if ``my_command`` is loaded in a *server* context,
``execute()`` will be called:
@@ -199,7 +203,7 @@ On the other hand, if ``my_command`` is loaded in a *server* context,
>>> api.register(my_command)
>>> api.finalize()
>>> api.Command.my_command() # Call your command plugin
-'in_server=True; execute() was called.'
+{'result': 'execute(): in_server=True'}
Normally there should be no reason to override `frontend.Command.forward()`,
but, as above, it can be done for demonstration purposes. In contrast, there
@@ -312,7 +316,7 @@ Second, we have our frontend plugin, the command:
...
... def execute(self):
... """Implemented against Backend.my_backend"""
-... return self.Backend.my_backend.do_stuff()
+... return dict(result=self.Backend.my_backend.do_stuff())
...
>>> api.register(my_command)
@@ -321,7 +325,7 @@ Lastly, we call ``api.finalize()`` and see what happens when we call
>>> api.finalize()
>>> api.Command.my_command()
-'my_backend.do_stuff() indeed did do stuff!'
+{'result': 'my_backend.do_stuff() indeed did do stuff!'}
When not in a server context, ``my_command.execute()`` never gets called, so
it never tries to access the non-existent backend plugin at
@@ -335,10 +339,10 @@ example:
...
... def execute(self):
... """Same as above."""
-... return self.Backend.my_backend.do_stuff()
+... return dict(result=self.Backend.my_backend.do_stuff())
...
... def forward(self):
-... return 'Just my_command.forward() getting called here.'
+... return dict(result='Just my_command.forward() getting called here.')
...
>>> api.register(my_command)
>>> api.finalize()
@@ -351,7 +355,7 @@ False
And yet we can call ``my_command()``:
>>> api.Command.my_command()
-'Just my_command.forward() getting called here.'
+{'result': 'Just my_command.forward() getting called here.'}
----------------------------------------
@@ -369,24 +373,25 @@ several other commands in a single operation. For example:
...
... def execute(self):
... """Calls command_1(), command_2()"""
-... return '%s; %s.' % (
-... self.Command.command_1(),
-... self.Command.command_2()
+... msg = '%s; %s.' % (
+... self.Command.command_1()['result'],
+... self.Command.command_2()['result'],
... )
+... return dict(result=msg)
>>> class command_1(Command):
... def execute(self):
-... return 'command_1.execute() called'
+... return dict(result='command_1.execute() called')
...
>>> class command_2(Command):
... def execute(self):
-... return 'command_2.execute() called'
+... return dict(result='command_2.execute() called')
...
>>> api.register(meta_command)
>>> api.register(command_1)
>>> api.register(command_2)
>>> api.finalize()
>>> api.Command.meta_command()
-'command_1.execute() called; command_2.execute() called.'
+{'result': 'command_1.execute() called; command_2.execute() called.'}
Because this is quite useful, we are going to revise our golden rule somewhat:
@@ -412,16 +417,18 @@ For example:
... takes_options = (Str('stuff', default=u'documentation'))
...
... def execute(self, programmer, **kw):
-... return '%s, go write more %s!' % (programmer, kw['stuff'])
+... return dict(
+... result='%s, go write more %s!' % (programmer, kw['stuff'])
+... )
...
>>> api = create_api()
>>> api.env.in_server = True
>>> api.register(nudge)
>>> api.finalize()
>>> api.Command.nudge(u'Jason')
-u'Jason, go write more documentation!'
+{'result': u'Jason, go write more documentation!'}
>>> api.Command.nudge(u'Jason', stuff=u'unit tests')
-u'Jason, go write more unit tests!'
+{'result': u'Jason, go write more unit tests!'}
The ``args`` and ``options`` attributes are `plugable.NameSpace` instances
containing a command's arguments and options, respectively, as you can see:
@@ -450,7 +457,7 @@ When calling a command, its positional arguments can also be provided as
keyword arguments, and in any order. For example:
>>> api.Command.nudge(stuff=u'lines of code', programmer=u'Jason')
-u'Jason, go write more lines of code!'
+{'result': u'Jason, go write more lines of code!'}
When a command plugin is called, the values supplied for its parameters are
put through a sophisticated processing pipeline that includes steps for
@@ -582,12 +589,14 @@ For example, say we setup a command like this:
... city='Berlin',
... )
... if key in items:
-... return items[key]
-... return [
+... return dict(result=items[key])
+... items = [
... (k, items[k]) for k in sorted(items, reverse=options['reverse'])
... ]
+... return dict(result=items)
...
... def output_for_cli(self, textui, result, key, **options):
+... result = result['result']
... if key is not None:
... textui.print_plain('%s = %r' % (key, result))
... else:
@@ -738,14 +747,14 @@ For example:
... """Print message of the day."""
...
... def execute(self):
-... return self.env.message
+... return dict(result=self.env.message)
...
>>> api = create_api()
>>> api.bootstrap(in_server=True, message='Hello, world!')
>>> api.register(motd)
>>> api.finalize()
>>> api.Command.motd()
-'Hello, world!'
+{'result': 'Hello, world!'}
Also see the `plugable.API.bootstrap_with_global_options()` method.
@@ -872,6 +881,7 @@ from crud import Create, Retrieve, Update, Delete, Search
from parameters import DefaultFrom, Bool, Flag, Int, Float, Bytes, Str, Password,List
from parameters import BytesEnum, StrEnum, AccessTime, File
from errors import SkipPluginModule
+from text import _, gettext, ngettext
# We can't import the python uuid since it includes ctypes which makes
# httpd throw up when run in in mod_python due to SELinux issues
diff --git a/ipalib/cli.py b/ipalib/cli.py
index 64ace0359..d0feaaea3 100644
--- a/ipalib/cli.py
+++ b/ipalib/cli.py
@@ -312,6 +312,37 @@ class textui(backend.Backend):
for attr in sorted(entry):
print_attr(attr)
+ def print_entries(self, entries, params=None, format='%s: %s', indent=1):
+ assert isinstance(entries, (list, tuple))
+ first = True
+ for entry in entries:
+ if not first:
+ print ''
+ first = False
+ self.print_entry(entry, params, format, indent)
+
+ def print_entry(self, entry, params=None, format='%s: %s', indent=1):
+ """
+ """
+ if isinstance(entry, (list, tuple)):
+ entry = dict(entry)
+ assert isinstance(entry, dict)
+ if params:
+ order = list(params)
+ labels = dict((p.name, p.label) for p in params())
+ else:
+ order = sorted(entry)
+ labels = dict((k, k) for k in order)
+ for key in order:
+ if key not in entry:
+ continue
+ label = labels[key]
+ value = entry[key]
+ if isinstance(value, (list, tuple)):
+ value = ', '.join(value)
+ self.print_indented(format % (label, value), indent)
+
+
def print_dashed(self, string, above=True, below=True, indent=0, dash='-'):
"""
Print a string with a dashed line above and/or below.
@@ -383,6 +414,23 @@ class textui(backend.Backend):
"""
self.print_dashed('%s:' % to_cli(name))
+ def print_header(self, msg, output):
+ self.print_dashed(msg % output)
+
+ def print_summary(self, msg):
+ """
+ Print a summary at the end of a comand's output.
+
+ For example:
+
+ >>> ui = textui()
+ >>> ui.print_summary('Added user "jdoe"')
+ -----------------
+ Added user "jdoe"
+ -----------------
+ """
+ self.print_dashed(msg)
+
def print_count(self, count, singular, plural=None):
"""
Print a summary count.
@@ -408,7 +456,7 @@ class textui(backend.Backend):
``len(count)`` is used as the count.
"""
if type(count) is not int:
- assert type(count) in (list, tuple)
+ assert type(count) in (list, tuple, dict)
count = len(count)
self.print_dashed(
self.choose_number(count, singular, plural)
@@ -515,6 +563,8 @@ class help(frontend.Command):
takes_args = (Bytes('command?'),)
+ has_output = tuple()
+
_PLUGIN_BASE_MODULE = 'ipalib.plugins'
def _get_command_module(self, module):
@@ -614,6 +664,8 @@ class help(frontend.Command):
class console(frontend.Command):
"""Start the IPA interactive Python console."""
+ has_output = tuple()
+
def run(self):
code.interact(
'(Custom IPA interactive Python console)',
@@ -814,8 +866,8 @@ class cli(backend.Executioner):
error = None
while True:
if error is not None:
- print '>>> %s: %s' % (param.cli_name, error)
- raw = self.Backend.textui.prompt(param.cli_name, default)
+ print '>>> %s: %s' % (param.label, error)
+ raw = self.Backend.textui.prompt(param.label, default)
try:
value = param(raw, **kw)
if value is not None:
diff --git a/ipalib/crud.py b/ipalib/crud.py
index 6d1a87f32..173fefc72 100644
--- a/ipalib/crud.py
+++ b/ipalib/crud.py
@@ -20,7 +20,7 @@
Base classes for standard CRUD operations.
"""
-import backend, frontend, parameters
+import backend, frontend, parameters, output
class Create(frontend.Method):
@@ -28,6 +28,8 @@ class Create(frontend.Method):
Create a new entry.
"""
+ has_output = output.standard_entry
+
def get_args(self):
if self.obj.primary_key:
yield self.obj.primary_key.clone(attribute=True)
@@ -53,18 +55,21 @@ class PKQuery(frontend.Method):
yield self.obj.primary_key.clone(attribute=True, query=True)
-
class Retrieve(PKQuery):
"""
Retrieve an entry by its primary key.
"""
+ has_output = output.standard_entry
+
class Update(PKQuery):
"""
Update one or more attributes on an entry.
"""
+ has_output = output.standard_entry
+
def get_options(self):
if self.extra_options_first:
for option in super(Update, self).get_options():
@@ -75,17 +80,22 @@ class Update(PKQuery):
for option in super(Update, self).get_options():
yield option
+
class Delete(PKQuery):
"""
Delete one or more entries.
"""
+ has_output = output.standard_delete
+
class Search(frontend.Method):
"""
Retrieve all entries that match a given search criteria.
"""
+ has_output = output.standard_list_of_entries
+
def get_args(self):
yield parameters.Str('criteria?')
@@ -176,4 +186,3 @@ class CrudBackend(backend.Connectible):
this method should return an empty iterable.
"""
raise NotImplementedError('%s.search()' % self.name)
-
diff --git a/ipalib/frontend.py b/ipalib/frontend.py
index e257a0a25..2c1168a5b 100644
--- a/ipalib/frontend.py
+++ b/ipalib/frontend.py
@@ -27,8 +27,11 @@ from base import lock, check_name, NameSpace
from plugable import Plugin
from parameters import create_param, parse_param_spec, Param, Str, Flag, Password
from util import make_repr
+from output import Output
+from text import _, ngettext
from errors import ZeroArgumentError, MaxArgumentError, OverlapError, RequiresRoot
+from errors import InvocationError
from constants import TYPE_ERROR
@@ -43,6 +46,7 @@ def is_rule(obj):
return callable(obj) and getattr(obj, RULE_FLAG, False) is True
+
class HasParam(Plugin):
"""
Base class for plugins that have `Param` `NameSpace` attributes.
@@ -198,7 +202,7 @@ class HasParam(Plugin):
that consider arbitrary ``api.env`` values.
"""
- def _get_param_iterable(self, name):
+ def _get_param_iterable(self, name, verb='takes'):
"""
Return an iterable of params defined by the attribute named ``name``.
@@ -257,19 +261,19 @@ class HasParam(Plugin):
Also see `HasParam._filter_param_by_context()`.
"""
- takes_name = 'takes_' + name
- takes = getattr(self, takes_name, None)
- if type(takes) is tuple:
- return takes
- if isinstance(takes, (Param, str)):
- return (takes,)
- if callable(takes):
- return takes()
- if takes is None:
+ src_name = verb + '_' + name
+ src = getattr(self, src_name, None)
+ if type(src) is tuple:
+ return src
+ if isinstance(src, (Param, str)):
+ return (src,)
+ if callable(src):
+ return src()
+ if src is None:
return tuple()
raise TypeError(
'%s.%s must be a tuple, callable, or spec; got %r' % (
- self.name, takes_name, takes
+ self.name, src_name, src
)
)
@@ -377,6 +381,15 @@ class Command(HasParam):
output_for_cli = None
obj = None
+ use_output_validation = True
+ output = None
+ has_output = ('result',)
+ output_params = None
+ has_output_params = tuple()
+
+ msg_summary = None
+ msg_truncated = _('Results are truncated, try a more specific search')
+
def __call__(self, *args, **options):
"""
Perform validation and then execute the command.
@@ -396,9 +409,32 @@ class Command(HasParam):
)
self.validate(**params)
(args, options) = self.params_2_args_options(**params)
- result = self.run(*args, **options)
- self.debug('result from %s(): %r', self.name, result)
- return result
+ ret = self.run(*args, **options)
+ if (
+ isinstance(ret, dict)
+ and 'summary' in self.output
+ and 'summary' not in ret
+ ):
+ if self.msg_summary:
+ ret['summary'] = self.msg_summary % ret
+ else:
+ ret['summary'] = None
+ if self.use_output_validation and (self.output or ret is not None):
+ self.validate_output(ret)
+ return ret
+
+ def soft_validate(self, values):
+ errors = dict()
+ for p in self.params():
+ try:
+ value = values.get(p.name)
+ values[p.name] = p(value, **values)
+ except InvocationError, e:
+ errors[p.name] = str(e)
+ return dict(
+ values=values,
+ errors=errors,
+ )
def _repr_iter(self, **params):
"""
@@ -511,10 +547,10 @@ class Command(HasParam):
yield (name, kw[name])
adddict = {}
- if 'setattr' in kw:
+ if kw.get('setattr'):
adddict = self.__convert_2_dict(kw['setattr'], append=False)
- if 'addattr' in kw:
+ if kw.get('addattr'):
adddict.update(self.__convert_2_dict(kw['addattr']))
for name in adddict:
@@ -691,8 +727,24 @@ class Command(HasParam):
sorted(tuple(self.args()) + tuple(self.options()), key=get_key),
sort=False
)
+ self.output = NameSpace(self._iter_output(), sort=False)
+ self._create_param_namespace('output_params')
super(Command, self).finalize()
+ def _iter_output(self):
+ if type(self.has_output) is not tuple:
+ raise TypeError('%s.has_output: need a %r; got a %r: %r' % (
+ self.name, tuple, type(self.has_output), self.has_output)
+ )
+ for (i, o) in enumerate(self.has_output):
+ if isinstance(o, str):
+ o = Output(o)
+ if not isinstance(o, Output):
+ raise TypeError('%s.has_output[%d]: need a %r; got a %r: %r' % (
+ self.name, i, (str, Output), type(o), o)
+ )
+ yield o
+
def get_args(self):
"""
Iterate through parameters for ``Command.args`` namespace.
@@ -741,6 +793,55 @@ class Command(HasParam):
for option in self._get_param_iterable('options'):
yield option
+ def validate_output(self, output):
+ """
+ Validate the return value to make sure it meets the interface contract.
+ """
+ nice = '%s.validate_output()' % self.name
+ if not isinstance(output, dict):
+ raise TypeError('%s: need a %r; got a %r: %r' % (
+ nice, dict, type(output), output)
+ )
+ if len(output) < len(self.output):
+ missing = sorted(set(self.output).difference(output))
+ raise ValueError('%s: missing keys %r in %r' % (
+ nice, missing, output)
+ )
+ if len(output) > len(self.output):
+ extra = sorted(set(output).difference(self.output))
+ raise ValueError('%s: unexpected keys %r in %r' % (
+ nice, extra, output)
+ )
+ for o in self.output():
+ value = output[o.name]
+ if not (o.type is None or isinstance(value, o.type)):
+ raise TypeError('%s:\n output[%r]: need %r; got %r: %r' % (
+ nice, o.name, o.type, type(value), value)
+ )
+ if callable(o.validate):
+ o.validate(self, value)
+
+ def get_output_params(self):
+ for param in self._get_param_iterable('output_params', verb='has'):
+ yield param
+
+ def output_for_cli(self, textui, output, *args, **options):
+ if not isinstance(output, dict):
+ return
+ result = output.get('result')
+ summary = output.get('summary')
+
+ if (summary and isinstance(result, (list, tuple, dict)) and result):
+ textui.print_name(self.name)
+
+ if isinstance(result, (tuple, list)):
+ textui.print_entries(result, self.output_params)
+ elif isinstance(result, dict):
+ textui.print_entry(result, self.output_params)
+
+ if isinstance(summary, unicode):
+ textui.print_summary(summary)
+
class LocalOrRemote(Command):
"""
@@ -972,7 +1073,7 @@ class Method(Attribute, Command):
>>> api = create_api()
>>> class user_add(Method):
... def run(self):
- ... return 'Added the user!'
+ ... return dict(result='Added the user!')
...
>>> class user(Object):
... pass
@@ -987,7 +1088,7 @@ class Method(Attribute, Command):
>>> list(api.Method)
['user_add']
>>> api.Method.user_add() # Will call user_add.run()
- 'Added the user!'
+ {'result': 'Added the user!'}
Second, because `Method` is a subclass of `Command`, the ``user_add``
plugin can also be accessed through the ``api.Command`` namespace:
@@ -995,7 +1096,7 @@ class Method(Attribute, Command):
>>> list(api.Command)
['user_add']
>>> api.Command.user_add() # Will call user_add.run()
- 'Added the user!'
+ {'result': 'Added the user!'}
And third, ``user_add`` can be accessed as an attribute on the ``user``
`Object`:
@@ -1005,7 +1106,7 @@ class Method(Attribute, Command):
>>> list(api.Object.user.methods)
['add']
>>> api.Object.user.methods.add() # Will call user_add.run()
- 'Added the user!'
+ {'result': 'Added the user!'}
The `Attribute` base class implements the naming convention for the
attribute-to-object association. Also see the `Object` and the
@@ -1018,6 +1119,10 @@ class Method(Attribute, Command):
def __init__(self):
super(Method, self).__init__()
+ def get_output_params(self):
+ for param in self.obj.params():
+ yield param
+
class Property(Attribute):
__public__ = frozenset((
diff --git a/ipalib/output.py b/ipalib/output.py
new file mode 100644
index 000000000..425ff977c
--- /dev/null
+++ b/ipalib/output.py
@@ -0,0 +1,104 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Simple description of return values.
+"""
+
+from inspect import getdoc
+from types import NoneType
+from plugable import ReadOnly, lock
+
+
+class Output(ReadOnly):
+ """
+ Simple description of a member in the return value ``dict``.
+ """
+
+ type = None
+ validate = None
+ doc = None
+
+ def __init__(self, name, type=None, doc=None):
+ self.name = name
+ if type is not None:
+ self.type = type
+ if doc is not None:
+ self.doc = doc
+ lock(self)
+
+ def __repr__(self):
+ return '%s(%r, %r, %r)' % (
+ self.__class__.__name__, self.name, self.type, self.doc,
+ )
+
+
+class Entry(Output):
+ type = dict
+ doc = 'A dictionary representing an LDAP entry'
+
+
+emsg = """%s.validate_output() => %s.validate():
+ output[%r][%d]: need a %r; got a %r: %r"""
+
+class ListOfEntries(Output):
+ type = (list, tuple)
+ doc = 'A list of LDAP entries'
+
+ def validate(self, cmd, entries):
+ assert isinstance(entries, self.type)
+ for (i, entry) in enumerate(entries):
+ if not isinstance(entry, dict):
+ raise TypeError(emsg % (cmd.name, self.__class__.__name__,
+ self.name, i, dict, type(entry), entry)
+ )
+
+
+result = Output('result', doc='All commands should at least have a result')
+
+summary = Output('summary', (unicode, NoneType),
+ 'User-friendly description of action performed'
+)
+
+value = Output('value', unicode,
+ "The primary_key value of the entry, e.g. 'jdoe' for a user"
+)
+
+standard = (result, summary)
+
+standard_entry = (
+ Entry('result'),
+ value,
+ summary,
+)
+
+standard_list_of_entries = (
+ ListOfEntries('result'),
+ Output('count', int, 'Number of entries returned'),
+ Output('truncated', bool, 'True if not all results were returned'),
+ summary,
+)
+
+standard_delete = (
+ Output('result', bool, 'True means the operation was successful'),
+ value,
+ summary,
+)
+
+standard_value = standard_delete
diff --git a/ipalib/parameters.py b/ipalib/parameters.py
index 00880bd2b..b6133f1b1 100644
--- a/ipalib/parameters.py
+++ b/ipalib/parameters.py
@@ -233,8 +233,8 @@ class Param(ReadOnly):
kwargs = (
('cli_name', str, None),
('cli_short_name', str, None),
- ('label', callable, None),
- ('doc', str, ''),
+ ('label', str, None),
+ ('doc', str, None),
('required', bool, True),
('multivalue', bool, False),
('primary_key', bool, False),
@@ -285,10 +285,16 @@ class Param(ReadOnly):
)
)
- # Merge in default for 'cli_name' if not given:
- if kw.get('cli_name', None) is None:
+ # Merge in default for 'cli_name', label, doc if not given:
+ if kw.get('cli_name') is None:
kw['cli_name'] = self.name
+ if kw.get('label') is None:
+ kw['label'] = '<%s>' % self.name
+
+ if kw.get('doc') is None:
+ kw['doc'] = kw['label']
+
# Wrap 'default_from' in a DefaultFrom if not already:
df = kw.get('default_from', None)
if callable(df) and not isinstance(df, DefaultFrom):
@@ -505,14 +511,6 @@ class Param(ReadOnly):
kw.update(overrides)
return self.__class__(self.name, **kw)
- def get_label(self):
- """
- Return translated label using `request.ugettext`.
- """
- if self.label is None:
- return self.cli_name.decode('UTF-8')
- return self.label(ugettext)
-
def normalize(self, value):
"""
Normalize ``value`` using normalizer callback.
diff --git a/ipalib/plugins/baseldap.py b/ipalib/plugins/baseldap.py
index b0ea57380..bd6ce3010 100644
--- a/ipalib/plugins/baseldap.py
+++ b/ipalib/plugins/baseldap.py
@@ -26,6 +26,7 @@ from ipalib import Command, Method, Object
from ipalib import Flag, List, Str
from ipalib.base import NameSpace
from ipalib.cli import to_cli, from_cli
+from ipalib import output
def validate_add_attribute(ugettext, attr):
@@ -178,9 +179,12 @@ class LDAPCreate(crud.Create):
dn = self.post_callback(ldap, dn, entry_attrs, *keys, **options)
self.obj.convert_attribute_members(entry_attrs, *keys, **options)
- return (dn, entry_attrs)
+ return dict(
+ result=entry_attrs,
+ value=keys[0],
+ )
- def output_for_cli(self, textui, entry, *keys, **options):
+ def dont_output_for_cli(self, textui, entry, *keys, **options):
textui.print_name(self.name)
self.obj.print_entry(textui, entry, *keys, **options)
if len(keys) > 1:
@@ -220,6 +224,7 @@ class LDAPRetrieve(LDAPQuery):
"""
Retrieve an LDAP entry.
"""
+
takes_options = (
Flag('raw',
cli_name='raw',
@@ -231,6 +236,8 @@ class LDAPRetrieve(LDAPQuery):
),
)
+ has_output = output.standard_entry
+
def execute(self, *keys, **options):
ldap = self.obj.backend
@@ -248,9 +255,14 @@ class LDAPRetrieve(LDAPQuery):
dn = self.post_callback(ldap, dn, entry_attrs, *keys, **options)
self.obj.convert_attribute_members(entry_attrs, *keys, **options)
- return (dn, entry_attrs)
+ entry_attrs['dn'] = dn
+ return dict(
+ result=entry_attrs,
+ value=keys[0],
+ )
- def output_for_cli(self, textui, entry, *keys, **options):
+
+ def dont_output_for_cli(self, textui, entry, *keys, **options):
textui.print_name(self.name)
self.obj.print_entry(textui, entry, *keys, **options)
@@ -328,9 +340,12 @@ class LDAPUpdate(LDAPQuery, crud.Update):
dn = self.post_callback(ldap, dn, entry_attrs, *keys, **options)
self.obj.convert_attribute_members(entry_attrs, *keys, **options)
- return (dn, entry_attrs)
+ return dict(
+ result=entry_attrs,
+ value=keys[0],
+ )
- def output_for_cli(self, textui, entry, *keys, **options):
+ def dont_output_for_cli(self, textui, entry, *keys, **options):
textui.print_name(self.name)
self.obj.print_entry(textui, entry, *keys, **options)
if len(keys) > 1:
@@ -359,6 +374,8 @@ class LDAPDelete(LDAPQuery):
"""
Delete an LDAP entry and all of its direct subentries.
"""
+ has_output = output.standard_delete
+
def execute(self, *keys, **options):
ldap = self.obj.backend
@@ -384,9 +401,13 @@ class LDAPDelete(LDAPQuery):
result = self.post_callback(ldap, dn, *keys, **options)
- return result
+ return dict(
+ result=result,
+ value=keys[0],
+ )
+
- def output_for_cli(self, textui, result, *keys, **options):
+ def dont_output_for_cli(self, textui, result, *keys, **options):
textui.print_name(self.name)
if len(keys) > 1:
textui.print_dashed(
@@ -454,7 +475,7 @@ class LDAPModMember(LDAPQuery):
failed[attr][ldap_obj_name].append(name)
return (dns, failed)
- def output_for_cli(self, textui, result, *keys, **options):
+ def dont_output_for_cli(self, textui, result, *keys, **options):
(completed, failed, entry) = result
for (attr, objs) in failed.iteritems():
@@ -479,6 +500,19 @@ class LDAPAddMember(LDAPModMember):
member_param_doc = 'comma-separated list of %s to add'
member_count_out = ('%i member added.', '%i members added.')
+ has_output = (
+ output.Entry('result'),
+ output.Output('completed',
+ type=int,
+ doc='Number of members added',
+ ),
+ output.Output('failed',
+ type=dict,
+ doc='Members that could not be added',
+ ),
+ )
+
+
def execute(self, *keys, **options):
ldap = self.obj.backend
@@ -511,7 +545,11 @@ class LDAPAddMember(LDAPModMember):
)
self.obj.convert_attribute_members(entry_attrs, *keys, **options)
- return (completed, failed, (dn, entry_attrs))
+ return dict(
+ completed=completed,
+ failed=failed,
+ result=entry_attrs,
+ )
def pre_callback(self, ldap, dn, found, not_found, *keys, **options):
return dn
@@ -527,6 +565,18 @@ class LDAPRemoveMember(LDAPModMember):
member_param_doc = 'comma-separated list of %s to remove'
member_count_out = ('%i member removed.', '%i members removed.')
+ has_output = (
+ output.Entry('result'),
+ output.Output('completed',
+ type=int,
+ doc='Number of members removed',
+ ),
+ output.Output('failed',
+ type=dict,
+ doc='Members that could not be removed',
+ ),
+ )
+
def execute(self, *keys, **options):
ldap = self.obj.backend
@@ -559,7 +609,11 @@ class LDAPRemoveMember(LDAPModMember):
)
self.obj.convert_attribute_members(entry_attrs, *keys, **options)
- return (completed, failed, (dn, entry_attrs))
+ return dict(
+ completed=completed,
+ failed=failed,
+ result=entry_attrs,
+ )
def pre_callback(self, ldap, dn, found, not_found, *keys, **options):
return dn
@@ -647,9 +701,18 @@ class LDAPSearch(crud.Search):
entries[i][1], *args, **options
)
entries[i] = (dn, entries[i][1])
- return (entries, truncated)
- def output_for_cli(self, textui, result, *args, **options):
+ entries = tuple(e for (dn, e) in entries)
+
+ return dict(
+ result=entries,
+ count=len(entries),
+ truncated=truncated,
+ )
+
+
+
+ def dont_output_for_cli(self, textui, result, *args, **options):
(entries, truncated) = result
textui.print_name(self.name)
diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py
index b5cc7ca5a..322f3fa8b 100644
--- a/ipalib/plugins/group.py
+++ b/ipalib/plugins/group.py
@@ -24,6 +24,7 @@ Groups of users
from ipalib import api
from ipalib import Int, Str
from ipalib.plugins.baseldap import *
+from ipalib import _, ngettext
class group(LDAPObject):
@@ -57,16 +58,18 @@ class group(LDAPObject):
takes_params = (
Str('cn',
cli_name='name',
- doc='group name',
+ label='Group name',
primary_key=True,
normalizer=lambda value: value.lower(),
),
Str('description',
cli_name='desc',
- doc='group description',
+ label='Description',
+ doc='Group description',
),
Int('gidnumber?',
cli_name='gid',
+ label='GID',
doc='GID (use this option to set it manually)',
),
)
@@ -78,6 +81,9 @@ class group_add(LDAPCreate):
"""
Create new group.
"""
+
+ msg_summary = _('Added group "%(value)s"')
+
takes_options = LDAPCreate.takes_options + (
Flag('posix',
cli_name='posix',
@@ -90,6 +96,7 @@ class group_add(LDAPCreate):
entry_attrs['objectclass'].append('posixgroup')
return dn
+
api.register(group_add)
@@ -97,6 +104,9 @@ class group_del(LDAPDelete):
"""
Delete group.
"""
+
+ msg_summary = _('Deleted group "%(value)s"')
+
def pre_callback(self, ldap, dn, *keys, **options):
config = ldap.get_ipa_config()[1]
def_primary_group = config.get('ipadefaultprimarygroup', '')
@@ -112,6 +122,9 @@ class group_mod(LDAPUpdate):
"""
Modify group.
"""
+
+ msg_summary = _('Modified group "%(value)s"')
+
takes_options = LDAPUpdate.takes_options + (
Flag('posix',
cli_name='posix',
@@ -138,6 +151,10 @@ class group_find(LDAPSearch):
Search for groups.
"""
+ msg_summary = ngettext(
+ '%(count)d group matched', '%(count)d groups matched', 0
+ )
+
api.register(group_find)
@@ -163,4 +180,3 @@ class group_remove_member(LDAPRemoveMember):
"""
api.register(group_remove_member)
-
diff --git a/ipalib/plugins/hbac.py b/ipalib/plugins/hbac.py
index 4a7cc9407..6dc13f6d5 100644
--- a/ipalib/plugins/hbac.py
+++ b/ipalib/plugins/hbac.py
@@ -35,7 +35,7 @@ class hbac(LDAPObject):
default_attributes = [
'cn', 'accessruletype', 'ipaenabledflag', 'servicename',
'accesstime', 'description',
-
+
]
uuid_attribute = 'ipauniqueid'
attribute_names = {
@@ -128,7 +128,7 @@ class hbac_add(LDAPCreate):
if not dn.startswith('cn='):
msg = 'HBAC rule with name "%s" already exists' % keys[-1]
raise errors.DuplicateEntry(message=msg)
- # HBAC rules are enabled by default
+ # HBAC rules are enabled by default
entry_attrs['ipaenabledflag'] = 'TRUE'
return ldap.make_dn(
entry_attrs, self.obj.uuid_attribute, self.obj.container_dn
@@ -184,7 +184,7 @@ class hbac_enable(LDAPQuery):
except errors.EmptyModlist:
pass
- return True
+ return dict(result=True)
def output_for_cli(self, textui, result, cn):
textui.print_name(self.name)
@@ -208,7 +208,7 @@ class hbac_disable(LDAPQuery):
except errors.EmptyModlist:
pass
- return True
+ return dict(result=True)
def output_for_cli(self, textui, result, cn):
textui.print_name(self.name)
@@ -242,7 +242,7 @@ class hbac_add_accesstime(LDAPQuery):
except errors.EmptyModlist:
pass
- return True
+ return dict(result=True)
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
@@ -280,7 +280,7 @@ class hbac_remove_accesstime(LDAPQuery):
except (ValueError, errors.EmptyModlist):
pass
- return True
+ return dict(result=True)
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
@@ -351,5 +351,3 @@ class hbac_remove_sourcehost(LDAPRemoveMember):
member_count_out = ('%i object removed.', '%i objects removed.')
api.register(hbac_remove_sourcehost)
-
-
diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py
index 8859b5873..dd19362bd 100644
--- a/ipalib/plugins/host.py
+++ b/ipalib/plugins/host.py
@@ -29,6 +29,7 @@ from ipalib import api, errors, util
from ipalib import Str, Flag
from ipalib.plugins.baseldap import *
from ipalib.plugins.service import split_principal
+from ipalib import _, ngettext
def validate_host(ugettext, fqdn):
@@ -72,32 +73,38 @@ class host(LDAPObject):
takes_params = (
Str('fqdn', validate_host,
cli_name='hostname',
- doc='Hostname',
+ label='Hostname',
primary_key=True,
normalizer=lambda value: value.lower(),
),
Str('description?',
cli_name='desc',
+ label='Description',
doc='Description of the host',
),
Str('localityname?',
cli_name='locality',
+ label='Locality',
doc='Locality of the host (Baltimore, MD)',
),
Str('nshostlocation?',
cli_name='location',
+ label='Location',
doc='Location of the host (e.g. Lab 2)',
),
Str('nshardwareplatform?',
cli_name='platform',
+ label='Platform',
doc='Hardware platform of the host (e.g. Lenovo T61)',
),
Str('nsosversion?',
cli_name='os',
+ label='Operating system',
doc='Operating System and version of the host (e.g. Fedora 9)',
),
Str('userpassword?',
cli_name='password',
+ label='User password',
doc='Password used in bulk enrollment',
),
)
@@ -125,6 +132,9 @@ class host_add(LDAPCreate):
"""
Create new host.
"""
+
+ msg_summary = _('Added host "%(value)s"')
+
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
entry_attrs['cn'] = keys[-1]
entry_attrs['serverhostname'] = keys[-1].split('.', 1)[0]
@@ -147,16 +157,21 @@ class host_del(LDAPDelete):
"""
Delete host.
"""
+
+ msg_summary = _('Deleted host "%(value)s"')
+
def pre_callback(self, ldap, dn, *keys, **options):
# Remove all service records for this host
truncated = True
while truncated:
try:
- (services, truncated) = api.Command['service_find'](keys[-1])
+ ret = api.Command['service_find'](keys[-1])
+ truncated = ret['truncated']
+ services = ret['result']
except errors.NotFound:
break
else:
- for (dn_, entry_attrs) in services:
+ for entry_attrs in services:
principal = entry_attrs['krbprincipalname'][0]
(service, hostname, realm) = split_principal(principal)
if hostname.lower() == keys[-1]:
@@ -170,6 +185,9 @@ class host_mod(LDAPUpdate):
"""
Modify host.
"""
+
+ msg_summary = _('Modified host "%(value)s"')
+
takes_options = LDAPUpdate.takes_options + (
Str('krbprincipalname?',
cli_name='principalname',
@@ -201,6 +219,10 @@ class host_find(LDAPSearch):
Search for hosts.
"""
+ msg_summary = ngettext(
+ '%(count)d host matched', '%(count)d hosts matched'
+ )
+
api.register(host_find)
@@ -210,4 +232,3 @@ class host_show(LDAPRetrieve):
"""
api.register(host_show)
-
diff --git a/ipalib/plugins/misc.py b/ipalib/plugins/misc.py
index 8bf9d81fd..0584654f7 100644
--- a/ipalib/plugins/misc.py
+++ b/ipalib/plugins/misc.py
@@ -22,18 +22,39 @@ Misc plugins
"""
import re
-from ipalib import api, LocalOrRemote
-
-
+from ipalib import api, LocalOrRemote, _, ngettext
+from ipalib.output import Output, summary
# FIXME: We should not let env return anything in_server
# when mode == 'production'. This would allow an attacker to see the
# configuration of the server, potentially revealing compromising
# information. However, it's damn handy for testing/debugging.
+
+
class env(LocalOrRemote):
"""Show environment variables"""
- takes_args = ('variables*',)
+ msg_summary = _('%(count)d variables')
+
+ takes_args = (
+ 'variables*',
+ )
+
+ has_output = (
+ Output('result',
+ type=dict,
+ doc='Dictionary mapping variable name to value',
+ ),
+ Output('total',
+ type=int,
+ doc='Total number of variables env (>= count)',
+ ),
+ Output('count',
+ type=int,
+ doc='Number of variables returned (<= total)',
+ ),
+ summary,
+ )
def __find_keys(self, variables):
keys = set()
@@ -52,20 +73,18 @@ class env(LocalOrRemote):
keys = self.env
else:
keys = self.__find_keys(variables)
- return dict(
- (key, self.env[key]) for key in keys
+ ret = dict(
+ result=dict(
+ (key, self.env[key]) for key in keys
+ ),
+ count=len(keys),
+ total=len(self.env),
)
-
- def output_for_cli(self, textui, result, variables, **options):
- if len(result) == 0:
- return
- result = tuple((k, result[k]) for k in sorted(result))
- if len(result) == 1:
- textui.print_keyval(result)
- return
- textui.print_name(self.name)
- textui.print_keyval(result)
- textui.print_count(result, '%d variables')
+ if len(keys) > 1:
+ ret['summary'] = self.msg_summary % ret
+ else:
+ ret['summary'] = None
+ return ret
api.register(env)
@@ -73,18 +92,26 @@ api.register(env)
class plugins(LocalOrRemote):
"""Show all loaded plugins"""
+ msg_summary = ngettext(
+ '%(count)d plugin loaded', '%(count)d plugins loaded'
+ )
+
+ has_output = (
+ Output('result', dict, 'Dictionary mapping plugin names to bases'),
+ Output('count',
+ type=int,
+ doc='Number of plugins loaded',
+ ),
+ summary,
+ )
+
def execute(self, **options):
plugins = sorted(self.api.plugins, key=lambda o: o.plugin)
- return tuple(
- (p.plugin, p.bases) for p in plugins
+ return dict(
+ result=dict(
+ (p.plugin, p.bases) for p in plugins
+ ),
+ count=len(plugins),
)
- def output_for_cli(self, textui, result, **options):
- textui.print_name(self.name)
- for (plugin, bases) in result:
- textui.print_indented(
- '%s: %s' % (plugin, ', '.join(bases))
- )
- textui.print_count(result, '%d plugin loaded', '%s plugins loaded')
-
api.register(plugins)
diff --git a/ipalib/plugins/passwd.py b/ipalib/plugins/passwd.py
index fbc12263c..6c509c2c7 100644
--- a/ipalib/plugins/passwd.py
+++ b/ipalib/plugins/passwd.py
@@ -67,7 +67,7 @@ class passwd(Command):
ldap.modify_password(dn, password)
- return True
+ return dict(result=True)
def output_for_cli(self, textui, result, principal, password):
assert password is None
@@ -75,4 +75,3 @@ class passwd(Command):
textui.print_dashed('Changed password for "%s."' % principal)
api.register(passwd)
-
diff --git a/ipalib/plugins/pwpolicy.py b/ipalib/plugins/pwpolicy.py
index 5a07c880a..faf036418 100644
--- a/ipalib/plugins/pwpolicy.py
+++ b/ipalib/plugins/pwpolicy.py
@@ -25,6 +25,7 @@ Password policy
from ipalib import api, crud, errors
from ipalib import Command, Object
from ipalib import Int, Str
+from ipalib import output
from ldap.functions import explode_dn
_fields = {
@@ -54,6 +55,7 @@ def _convert_time_on_input(entry_attrs):
if 'krbminpwdlife' in entry_attrs:
entry_attrs['krbminpwdlife'] = entry_attrs['krbminpwdlife'] * 3600
+
def make_cos_entry(group, cospriority=None):
"""
Make the CoS dn and entry for this group.
@@ -64,9 +66,10 @@ def make_cos_entry(group, cospriority=None):
"""
try:
- (groupdn, group_attrs) = api.Command['group_show'](group)
+ entry = api.Command['group_show'](group)['result']
except errors.NotFound:
raise errors.NotFound(reason="group '%s' does not exist" % group)
+ groupdn = entry['dn']
cos_entry = {}
if cospriority:
@@ -76,6 +79,7 @@ def make_cos_entry(group, cospriority=None):
return (cos_dn, cos_entry)
+
def make_policy_entry(group_cn, policy_entry):
"""
Make the krbpwdpolicy dn and entry for this group.
@@ -98,6 +102,7 @@ def make_policy_entry(group_cn, policy_entry):
return (policy_dn, policy_entry)
+
class pwpolicy(Object):
"""
Password Policy object.
@@ -138,6 +143,7 @@ class pwpolicy(Object):
api.register(pwpolicy)
+
class pwpolicy_add(crud.Create):
"""
Create a new password policy associated with a group.
@@ -150,6 +156,7 @@ class pwpolicy_add(crud.Create):
),
Int('cospriority',
cli_name='priority',
+ label='Priority',
doc='Priority of the policy. Higher number equals higher priority',
minvalue=0,
attribute=True,
@@ -182,22 +189,12 @@ class pwpolicy_add(crud.Create):
_convert_time_for_output(entry_attrs)
- return (dn, entry_attrs)
-
- def output_for_cli(self, textui, result, *args, **options):
-# textui.print_name(self.name)
-# textui.print_dashed("Added policy for '%s'." % options['group'])
- (dn, entry_attrs) = result
-
- textui.print_name(self.name)
- textui.print_plain('Password policy:')
- for (k, v) in _fields.iteritems():
- if k in entry_attrs:
- textui.print_attribute(v, entry_attrs[k])
- textui.print_dashed('Modified password policy.')
+ entry_attrs['dn'] = dn
+ return dict(result=entry_attrs, value=group_cn)
api.register(pwpolicy_add)
+
class pwpolicy_mod(crud.Update):
"""
Modify password policy.
@@ -215,6 +212,10 @@ class pwpolicy_mod(crud.Update):
),
)
+ has_output = (
+ output.Entry('result'),
+ )
+
def execute(self, *args, **options):
assert 'dn' not in options
ldap = self.api.Backend.ldap2
@@ -235,24 +236,16 @@ class pwpolicy_mod(crud.Update):
_convert_time_for_output(entry_attrs)
- return (dn, entry_attrs)
-
- def output_for_cli(self, textui, result, *args, **options):
- (dn, entry_attrs) = result
-
- textui.print_name(self.name)
- textui.print_plain('Password policy:')
- for (k, v) in _fields.iteritems():
- if k in entry_attrs:
- textui.print_attribute(v, entry_attrs[k])
- textui.print_dashed('Modified password policy.')
+ return dict(result=entry_attrs)
api.register(pwpolicy_mod)
+
class pwpolicy_del(crud.Delete):
"""
Delete a group password policy.
"""
+
takes_options = (
Str('group',
doc='Group to remove policy from',
@@ -278,12 +271,10 @@ class pwpolicy_del(crud.Delete):
ldap.delete_entry(policy_dn, normalize=False)
ldap.delete_entry(cos_dn, normalize=False)
-
- return True
-
- def output_for_cli(self, textui, result, *args, **options):
- textui.print_name(self.name)
- textui.print_dashed('Deleted policy "%s".' % options['group'])
+ return dict(
+ result=True,
+ value=group_cn,
+ )
api.register(pwpolicy_del)
@@ -292,6 +283,7 @@ class pwpolicy_show(Command):
"""
Display password policy.
"""
+
takes_options = (
Str('group?',
doc='Group to display policy',
@@ -300,6 +292,7 @@ class pwpolicy_show(Command):
doc='Display policy applied to a given user',
),
)
+
def execute(self, *args, **options):
ldap = self.api.Backend.ldap2
@@ -333,16 +326,6 @@ class pwpolicy_show(Command):
entry_attrs['group'] = 'global'
_convert_time_for_output(entry_attrs)
- return (dn, entry_attrs)
-
- def output_for_cli(self, textui, result, *args, **options):
- (dn, entry_attrs) = result
-
- textui.print_name(self.name)
- textui.print_plain('Password policy:')
- for (k, v) in _fields.iteritems():
- if k in entry_attrs:
- textui.print_attribute(v, entry_attrs[k])
+ return dict(result=entry_attrs)
api.register(pwpolicy_show)
-
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 5b0119151..f65ab3ebd 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -149,7 +149,7 @@ class service_add(LDAPCreate):
raise errors.HostService()
try:
- (hostdn, hostentry) = api.Command['host_show'](hostname, **{})
+ api.Command['host_show'](hostname)
except errors.NotFound:
raise errors.NotFound(reason="The host '%s' does not exist to add a service to." % hostname)
@@ -267,4 +267,3 @@ class service_remove_host(LDAPRemoveMember):
member_attributes = ['managedby']
api.register(service_remove_host)
-
diff --git a/ipalib/plugins/user.py b/ipalib/plugins/user.py
index 643531305..44b0f7d5e 100644
--- a/ipalib/plugins/user.py
+++ b/ipalib/plugins/user.py
@@ -24,6 +24,7 @@ Users (Identity)
from ipalib import api, errors
from ipalib import Flag, Int, Password, Str
from ipalib.plugins.baseldap import *
+from ipalib import _, ngettext
class user(LDAPObject):
@@ -68,57 +69,59 @@ class user(LDAPObject):
}
takes_params = (
+ Str('uid',
+ cli_name='login',
+ label='User login',
+ primary_key=True,
+ default_from=lambda givenname, sn: givenname[0] + sn,
+ normalizer=lambda value: value.lower(),
+ ),
Str('givenname',
cli_name='first',
- doc='First name',
+ label='First name',
),
Str('sn',
cli_name='last',
- doc='Last name',
+ label='Last name',
),
- Str('uid',
- cli_name='user',
- doc='Login name',
- primary_key=True,
- default_from=lambda givenname, sn: givenname[0] + sn,
- normalizer=lambda value: value.lower(),
+ Str('homedirectory?',
+ cli_name='homedir',
+ label='Home directory',
+ default_from=lambda uid: '/home/%s' % uid,
),
Str('gecos?',
- doc='GECOS field',
+ label='GECOS field',
default_from=lambda uid: uid,
autofill=True,
),
- Str('homedirectory?',
- cli_name='homedir',
- doc='Home directory',
- default_from=lambda uid: '/home/%s' % uid,
- ),
Str('loginshell?',
cli_name='shell',
+ label='Login shell',
default=u'/bin/sh',
- doc='login shell',
),
Str('krbprincipalname?',
cli_name='principal',
- doc='Kerberos principal name',
+ label='Kerberos principal',
default_from=lambda uid: '%s@%s' % (uid, api.env.realm),
autofill=True,
),
Str('mail?',
cli_name='email',
- doc='e-mail address',
+ label='Email address',
),
Password('userpassword?',
cli_name='password',
- doc='password',
+ label='Password',
+ doc='Set the user password',
),
Int('uidnumber?',
cli_name='uid',
+ label='UID',
doc='UID (use this option to set it manually)',
),
Str('street?',
cli_name='street',
- doc='street address',
+ label='Street address',
),
)
@@ -129,6 +132,9 @@ class user_add(LDAPCreate):
"""
Create new user.
"""
+
+ msg_summary = _('Added user "%(value)s"')
+
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
config = ldap.get_ipa_config()[1]
entry_attrs.setdefault('loginshell', config.get('ipadefaultloginshell'))
@@ -171,6 +177,9 @@ class user_del(LDAPDelete):
"""
Delete user.
"""
+
+ msg_summary = _('Deleted user "%(value)s"')
+
def pre_callback(self, ldap, dn, *keys, **options):
if keys[-1] == 'admin':
raise errors.ExecutionError('Cannot delete user "admin".')
@@ -188,6 +197,8 @@ class user_mod(LDAPUpdate):
Modify user.
"""
+ msg_summary = _('Modified user "%(value)s"')
+
api.register(user_mod)
@@ -196,6 +207,10 @@ class user_find(LDAPSearch):
Search for users.
"""
+ msg_summary = ngettext(
+ '%(count)d user matched', '%(count)d users matched', 0
+ )
+
api.register(user_find)
@@ -211,6 +226,10 @@ class user_lock(LDAPQuery):
"""
Lock user account.
"""
+
+ has_output = output.standard_value
+ msg_summary = _('Locked user "%(value)s"')
+
def execute(self, *keys, **options):
ldap = self.obj.backend
@@ -221,11 +240,10 @@ class user_lock(LDAPQuery):
except errors.AlreadyInactive:
pass
- return True
-
- def output_for_cli(self, textui, result, *keys, **options):
- textui.print_name(self.name)
- textui.print_dashed('Locked user "%s".' % keys[-1])
+ return dict(
+ result=True,
+ value=keys[0],
+ )
api.register(user_lock)
@@ -234,6 +252,10 @@ class user_unlock(LDAPQuery):
"""
Unlock user account.
"""
+
+ has_output = output.standard_value
+ msg_summary = _('Unlocked user "%(value)s"')
+
def execute(self, *keys, **options):
ldap = self.obj.backend
@@ -244,10 +266,9 @@ class user_unlock(LDAPQuery):
except errors.AlreadyActive:
pass
- return True
-
- def output_for_cli(self, textui, result, *keys, **options):
- textui.print_name(self.name)
- textui.print_dashed('Unlocked user "%s".' % keys[-1])
+ return dict(
+ result=True,
+ value=keys[0],
+ )
api.register(user_unlock)
diff --git a/ipalib/text.py b/ipalib/text.py
new file mode 100644
index 000000000..0c8684025
--- /dev/null
+++ b/ipalib/text.py
@@ -0,0 +1,79 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty contextrmation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Thread-local lazy gettext service.
+
+TODO: This aren't hooked up into gettext yet, they currently just provide
+placeholders for the rest of the code.
+"""
+
+
+class LazyText(object):
+ def __init__(self, domain, localedir):
+ self.domain = domain
+ self.localedir = localedir
+
+
+class Gettext(LazyText):
+ def __init__(self, msg, domain, localedir):
+ self.msg = msg
+ super(Gettext, self).__init__(domain, localedir)
+
+ def __unicode__(self):
+ return self.msg.decode('utf-8')
+
+ def __mod__(self, value):
+ return self.__unicode__() % value
+
+
+class NGettext(LazyText):
+ def __init__(self, singular, plural, domain, localedir):
+ self.singular = singular
+ self.plural = plural
+ super(NGettext, self).__init__(domain, localedir)
+
+ def __mod__(self, kw):
+ count = kw['count']
+ return self(count) % kw
+
+ def __call__(self, count):
+ if count == 1:
+ return self.singular.decode('utf-8')
+ return self.plural.decode('utf-8')
+
+
+class gettext_factory(object):
+ def __init__(self, domain='ipa', localedir=None):
+ self.domain = domain
+ self.localedir = localedir
+
+ def __call__(self, msg):
+ return Gettext(msg, self.domain, self.localedir)
+
+
+class ngettext_factory(gettext_factory):
+ def __call__(self, singular, plural, count=0):
+ return NGettext(singular, plural, self.domain, self.localedir)
+
+
+# Process wide factories:
+gettext = gettext_factory()
+_ = gettext
+ngettext = ngettext_factory()
diff --git a/ipaserver/plugins/join.py b/ipaserver/plugins/join.py
index 08b1596ed..34f4c58cb 100644
--- a/ipaserver/plugins/join.py
+++ b/ipaserver/plugins/join.py
@@ -72,6 +72,9 @@ class join(Command):
),
)
+ has_output = tuple()
+ use_output_validation = False
+
def execute(self, hostname, **kw):
"""
Execute the machine join operation.
diff --git a/ipaserver/rpcserver.py b/ipaserver/rpcserver.py
index 24213dd67..9400ac367 100644
--- a/ipaserver/rpcserver.py
+++ b/ipaserver/rpcserver.py
@@ -77,7 +77,7 @@ def extract_query(environ):
qstr = environ['QUERY_STRING']
if qstr:
query = dict(nicify_query(
- parse_qs(qstr, keep_blank_values=True)
+ parse_qs(qstr)#, keep_blank_values=True)
))
else:
query = {}
@@ -125,6 +125,9 @@ class WSGIExecutioner(Executioner):
error = InternalError()
finally:
destroy_context()
+ self.debug('Returning:\n%s',
+ json.dumps(result, sort_keys=True, indent=4)
+ )
return self.marshal(result, error, _id)
def simple_unmarshal(self, environ):
diff --git a/ipawebui/__init__.py b/ipawebui/__init__.py
index 9f86da344..c7ebaa870 100644
--- a/ipawebui/__init__.py
+++ b/ipawebui/__init__.py
@@ -1,6 +1,6 @@
# Authors: Jason Gerard DeRose <jderose@redhat.com>
#
-# Copyright (C) 2008 Red Hat
+# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
diff --git a/ipawebui/engine.py b/ipawebui/engine.py
index ea0905d49..a90a450d0 100644
--- a/ipawebui/engine.py
+++ b/ipawebui/engine.py
@@ -1,6 +1,6 @@
# Authors: Jason Gerard DeRose <jderose@redhat.com>
#
-# Copyright (C) 2008 Red Hat
+# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@@ -21,6 +21,7 @@ Engine to map ipalib plugins to wehjit widgets.
"""
from controllers import Command
+from ipalib import crud
class ParamMapper(object):
def __init__(self, api, app):
@@ -28,7 +29,7 @@ class ParamMapper(object):
self._app = app
self.__methods = dict()
for name in dir(self):
- if name.startswith('_') or name.endswith('_'):
+ if name.startswith('_'):
continue
attr = getattr(self, name)
if not callable(attr):
@@ -40,13 +41,12 @@ class ParamMapper(object):
if key in self.__methods:
method = self.__methods[key]
else:
- #raise Warning('No ParamMapper for %r' % key)
method = self.Str
return method(param, cmd)
def Str(self, param, cmd):
return self._app.new('TextRow',
- label=param.cli_name,
+ label=param.label,
name=param.name,
required=param.required,
value=param.default,
@@ -61,7 +61,7 @@ class ParamMapper(object):
def Flag(self, param, cmd):
return self._app.new('SelectRow',
name=param.name,
- label=param.cli_name,
+ label=param.label,
)
@@ -128,23 +128,43 @@ class Engine(object):
return page
def build_page(self, cmd):
- page = self.app.new('PageApp',
+ page = self.app.new('PageCmd',
+ cmd=cmd,
id=cmd.name,
title=cmd.summary.rstrip('.'),
)
- #page.form.action = self.app.url + '__json__'
- page.actions.add(
- self.app.new('Submit')
+ page.form.action = page.url
+ page.form.method = 'GET'
+ page.form.add(
+ self.app.new('Hidden', name='__mode__', value='output')
)
- table = self.app.new('FieldTable')
- page.view.add(table)
- for param in cmd.params():
- if param.exclude and 'webui' in param.exclude:
- continue
- field = self.param_mapper(param, cmd)
- table.add(field)
+ page.notification = self.app.new('Notification')
+ page.view.add(page.notification)
+ page.prompt = self.make_prompt(cmd)
+ page.show = self.make_show(cmd)
+ self.conditional('input', page.actions, self.app.new('Submit'))
+ self.conditional('input', page.view, page.prompt)
+ self.conditional('output', page.view, page.show)
+ return page
+
+ def conditional(self, mode, parent, *children):
+ conditional = self.app.new('Conditional', mode=mode)
+ conditional.add(*children)
+ parent.add(conditional)
- page.form.action = '/'.join([self.jsonurl, cmd.name])
+ def make_prompt(self, cmd):
+ table = self.app.new('FieldTable')
+ for param in self._iter_params(cmd.params):
+ table.add(
+ self.param_mapper(param, cmd)
+ )
+ return table
+ def make_show(self, cmd):
+ return self.app.new('Output')
- return page
+ def _iter_params(self, namespace):
+ for param in namespace():
+ if param.exclude and 'webui' in param.exclude:
+ continue
+ yield param
diff --git a/ipawebui/widgets.py b/ipawebui/widgets.py
index 74b9d7e0c..1cf1adb97 100644
--- a/ipawebui/widgets.py
+++ b/ipawebui/widgets.py
@@ -1,6 +1,6 @@
# Authors: Jason Gerard DeRose <jderose@redhat.com>
#
-# Copyright (C) 2008 Red Hat
+# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@@ -21,9 +21,10 @@ Custom IPA widgets.
"""
from textwrap import dedent
-from wehjit import Collection, base, freeze
+from wehjit import Collection, base, freeze, builtins
from wehjit.util import Alternator
from wehjit import Static, Dynamic, StaticProp, DynamicProp
+from ipaserver.rpcserver import extract_query
class IPAPlugins(base.Container):
@@ -189,6 +190,14 @@ class Command(base.Widget):
<td py:content="repr(option)" />
</tr>
+ <tr py:if="plugin.output" class="${row.next()}">
+ <th colspan="2" py:content="'output (%d)' % len(plugin.output)" />
+ </tr>
+ <tr py:for="param in plugin.output()" class="${row.next()}">
+ <td py:content="param.name"/>
+ <td py:content="repr(param)" />
+ </tr>
+
</table>
"""
@@ -211,7 +220,7 @@ class Object(base.Widget):
<th colspan="2" py:content="'params (%d)' % len(plugin.params)" />
</tr>
<tr py:for="param in plugin.params()" class="${row.next()}">
- <td py:content="param.name"/>
+ <td>${"param.name"}:</td>
<td py:content="repr(param)" />
</tr>
@@ -219,6 +228,171 @@ class Object(base.Widget):
"""
+
+class Conditional(base.Container):
+
+ mode = Static('mode', default='input')
+
+ @DynamicProp
+ def page_mode(self):
+ if self.page is None:
+ return
+ return self.page.mode
+
+ xml = """
+ <div
+ xmlns:py="http://genshi.edgewall.org/"
+ py:if="mode == page_mode"
+ py:strip="True"
+ >
+ <child py:for="child in children" py:replace="child.generate()" />
+ </div>
+ """
+
+
+class Output(base.Widget):
+ """
+ Shows attributes form an LDAP entry.
+ """
+
+ order = Dynamic('order')
+ labels = Dynamic('labels')
+ result = Dynamic('result')
+
+ xml = """
+ <div
+ xmlns:py="http://genshi.edgewall.org/"
+ class="${klass}"
+ id="${id}"
+ >
+ <table py:if="isinstance(result, dict)">
+ <tr py:for="key in order" py:if="key in result">
+ <th py:content="labels[key]" />
+ <td py:content="result[key]" />
+ </tr>
+ </table>
+
+ <table
+ py:if="isinstance(result, (list, tuple)) and len(result) > 0"
+ >
+ <tr>
+ <th
+ py:for="key in order"
+ py:if="key in result[0]"
+ py:content="labels[key]"
+ />
+ </tr>
+ <tr py:for="entry in result">
+ <td
+ py:for="key in order"
+ py:if="key in result[0]"
+ py:content="entry[key]"
+ />
+ </tr>
+ </table>
+ </div>
+ """
+
+ style = (
+ ('table', (
+ ('empty-cells', 'show'),
+ ('border-collapse', 'collapse'),
+ )),
+
+ ('th', (
+ ('text-align', 'right'),
+ ('padding', '.25em 0.5em'),
+ ('line-height', '%(height_bar)s'),
+ ('vertical-align', 'top'),
+ )),
+
+ ('td', (
+ ('padding', '.25em'),
+ ('vertical-align', 'top'),
+ ('text-align', 'left'),
+ ('line-height', '%(height_bar)s'),
+ )),
+ )
+
+
+class Hidden(base.Field):
+ xml = """
+ <input
+ xmlns:py="http://genshi.edgewall.org/"
+ type="hidden"
+ name="${name}"
+ />
+ """
+
+
+class Notification(base.Widget):
+ message = Dynamic('message')
+ error = Dynamic('error', default=False)
+
+ @property
+ def extra_css_classes(self):
+ if self.error:
+ yield 'error'
+ else:
+ yield 'okay'
+
+ xml = """
+ <p
+ xmlns:py="http://genshi.edgewall.org/"
+ class="${klass}"
+ id="${id}"
+ py:if="message"
+ py:content="message"
+ />
+ """
+
+ style = (
+ ('', (
+ ('font-weight', 'bold'),
+ ('-moz-border-radius', '100%%'),
+ ('background-color', '#eee'),
+ ('border', '2px solid #966'),
+ ('padding', '0.5em'),
+ ('text-align', 'center'),
+ )),
+ )
+
+
+class PageCmd(builtins.PageApp):
+ cmd = Static('cmd')
+ mode = Dynamic('mode', default='input')
+
+ def controller(self, environ):
+ query = extract_query(environ)
+ self.mode = query.pop('__mode__', 'input')
+ if self.mode == 'input':
+ return
+ soft = self.cmd.soft_validate(query)
+ errors = soft['errors']
+ values = soft['values']
+ if errors:
+ self.mode = 'input'
+ for key in self.form:
+ if key in errors:
+ self.form[key].error = errors[key]
+ if key in values:
+ self.form[key].value = values[key]
+ return
+ output = self.cmd(**query)
+ if isinstance(output, dict) and 'summary' in output:
+ self.notification.message = output['summary']
+ params = self.cmd.output_params
+ if params:
+ order = list(params)
+ labels = dict((p.name, p.label) for p in params())
+ else:
+ order = sorted(entry)
+ labels = dict((k, k) for k in order)
+ self.show.order = order
+ self.show.labels = labels
+ self.show.result = output.get('result')
+
+
def create_widgets():
widgets = Collection('freeIPA')
widgets.register_builtins()
@@ -227,6 +401,12 @@ def create_widgets():
widgets.register(IPAPlugins)
widgets.register(Command)
widgets.register(Object)
+ widgets.register(Conditional)
+ widgets.register(Output)
+ widgets.register(Hidden)
+ widgets.register(Notification)
+
+ widgets.register(PageCmd)
freeze(widgets)
diff --git a/make-test b/make-test
index f7bf432bd..2d9449155 100755
--- a/make-test
+++ b/make-test
@@ -1,33 +1,63 @@
-#!/bin/bash
-
-# Script to run nosetests under multiple versions of Python
-
-export IPA_UNIT_TEST_MODE="cli_test"
-versions="python2.4 python2.5 python2.6"
-
-for name in $versions
-do
- executable="/usr/bin/$name"
- if [[ -f $executable ]]; then
- echo "[ $name: Starting tests... ]"
- ((runs += 1))
- if $executable /usr/bin/nosetests --debug-log=/dev/null -v --with-doctest --exclude="plugins"
- then
- echo "[ $name: Tests OK ]"
- else
- echo "[ $name: Tests FAILED ]"
- ((failures += 1))
- fi
- else
- echo "[ $name: Not found ]"
- fi
- echo ""
-done
-
-if [ $failures ]; then
- echo "[ Ran under $runs version(s); FAILED under $failures version(s) ]"
- echo "FAIL!"
- exit $failures
-else
- echo "[ Ran under $runs version(s); all OK ]"
-fi
+#!/usr/bin/env python
+
+"""
+Run IPA unit tests under multiple versions of Python (if present).
+"""
+
+import sys
+import optparse
+import os
+from os import path
+from subprocess import call
+
+versions = ('2.4', '2.5', '2.6', '2.7')
+python = '/usr/bin/python'
+nose = '/usr/bin/nosetests'
+ran = []
+fail = []
+
+parser = optparse.OptionParser(
+ usage='usage: %prog [MODULE...]',
+)
+parser.add_option('--stop',
+ action='store_true',
+ default=False,
+ help='Stop running tests after the first error or failure',
+)
+(options, args) = parser.parse_args()
+
+cmd = [nose] + args + [
+ '-v',
+ '--with-doctest',
+ '--exclude=plugins',
+]
+if options.stop:
+ cmd.append('--stop')
+
+
+# This must be set so ipalib.api gets initialized property for tests:
+os.environ['IPA_UNIT_TEST_MODE'] = 'cli_test'
+
+if not path.isfile(nose):
+ print 'ERROR: need %r' % nose
+ sys.exit(100)
+for v in versions:
+ pver = python + v
+ if not path.isfile(pver):
+ continue
+ if 0 != call([pver] + cmd):
+ fail.append(pver)
+ ran.append(pver)
+
+
+print '=' * 70
+for pver in ran:
+ if pver in fail:
+ print 'FAILED under %r' % pver
+ else:
+ print 'passed under %r' % pver
+print ''
+if fail:
+ print '** FAIL **'
+else:
+ print '** pass **'
diff --git a/tests/test_ipalib/test_backend.py b/tests/test_ipalib/test_backend.py
index 9ddbbf252..3c29ee35f 100644
--- a/tests/test_ipalib/test_backend.py
+++ b/tests/test_ipalib/test_backend.py
@@ -176,7 +176,7 @@ class test_Executioner(ClassChecker):
takes_options = ('option1?', 'option2?')
def execute(self, *args, **options):
assert type(args[1]) is tuple
- return args + (options,)
+ return dict(result=args + (options,))
api.register(echo)
class good(Command):
@@ -198,7 +198,7 @@ class test_Executioner(ClassChecker):
"""
takes_options = 'name'
def execute(self, **options):
- return options['name'].upper()
+ return dict(result=options['name'].upper())
api.register(with_name)
api.finalize()
@@ -222,13 +222,17 @@ class test_Executioner(ClassChecker):
conn = Connection('The connection.', Disconnect())
context.someconn = conn
- assert o.execute('echo', arg1, arg2, **options) == (arg1, arg2, options)
+ assert o.execute('echo', arg1, arg2, **options) == dict(
+ result=(arg1, arg2, options)
+ )
assert conn.disconnect.called is True # Make sure destroy_context() was called
assert context.__dict__.keys() == []
conn = Connection('The connection.', Disconnect())
context.someconn = conn
- assert o.execute('echo', *args, **options) == (arg1, arg2, options)
+ assert o.execute('echo', *args, **options) == dict(
+ result=(arg1, arg2, options)
+ )
assert conn.disconnect.called is True # Make sure destroy_context() was called
assert context.__dict__.keys() == []
@@ -251,4 +255,4 @@ class test_Executioner(ClassChecker):
# Test with option 'name':
conn = Connection('The connection.', Disconnect())
context.someconn = conn
- assert o.execute('with_name', name=u'test') == u'TEST'
+ assert o.execute('with_name', name=u'test') == dict(result=u'TEST')
diff --git a/tests/test_ipalib/test_crud.py b/tests/test_ipalib/test_crud.py
index a7f49f878..47d51c5dc 100644
--- a/tests/test_ipalib/test_crud.py
+++ b/tests/test_ipalib/test_crud.py
@@ -34,7 +34,6 @@ class CrudChecker(ClassChecker):
"""
Return a finalized `ipalib.plugable.API` instance.
"""
- assert self.cls.__bases__ == (frontend.Method,)
(api, home) = get_api()
class user(frontend.Object):
takes_params = (
@@ -52,6 +51,136 @@ class CrudChecker(ClassChecker):
return api
+class test_Create(CrudChecker):
+ """
+ Test the `ipalib.crud.Create` class.
+ """
+
+ _cls = crud.Create
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Create.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Create.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials']
+ for param in api.Method.user_verb.options():
+ assert param.required is True
+ api = self.get_api(options=('extra?',))
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials', 'extra']
+ assert api.Method.user_verb.options.extra.required is False
+
+
+class test_Update(CrudChecker):
+ """
+ Test the `ipalib.crud.Update` class.
+ """
+
+ _cls = crud.Update
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Update.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Update.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'initials']
+ for param in api.Method.user_verb.options():
+ assert param.required is False
+
+
+class test_Retrieve(CrudChecker):
+ """
+ Test the `ipalib.crud.Retrieve` class.
+ """
+
+ _cls = crud.Retrieve
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Retrieve.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Retrieve.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == []
+ assert len(api.Method.user_verb.options) == 0
+
+
+class test_Delete(CrudChecker):
+ """
+ Test the `ipalib.crud.Delete` class.
+ """
+
+ _cls = crud.Delete
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Delete.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['uid']
+ assert api.Method.user_verb.args.uid.required is True
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Delete.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == []
+ assert len(api.Method.user_verb.options) == 0
+
+
+class test_Search(CrudChecker):
+ """
+ Test the `ipalib.crud.Search` class.
+ """
+
+ _cls = crud.Search
+
+ def test_get_args(self):
+ """
+ Test the `ipalib.crud.Search.get_args` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.args) == ['criteria']
+ assert api.Method.user_verb.args.criteria.required is False
+
+ def test_get_options(self):
+ """
+ Test the `ipalib.crud.Search.get_options` method.
+ """
+ api = self.get_api()
+ assert list(api.Method.user_verb.options) == \
+ ['givenname', 'sn', 'uid', 'initials']
+ for param in api.Method.user_verb.options():
+ assert param.required is False
+
+
class test_CrudBackend(ClassChecker):
"""
Test the `ipalib.crud.CrudBackend` class.
diff --git a/tests/test_ipalib/test_frontend.py b/tests/test_ipalib/test_frontend.py
index 71d902403..4cdb8774e 100644
--- a/tests/test_ipalib/test_frontend.py
+++ b/tests/test_ipalib/test_frontend.py
@@ -27,6 +27,7 @@ from tests.util import assert_equal
from ipalib.constants import TYPE_ERROR
from ipalib.base import NameSpace
from ipalib import frontend, backend, plugable, errors, parameters, config
+from ipalib import output
def test_RULE_FLAG():
assert frontend.RULE_FLAG == 'validation_rule'
@@ -317,6 +318,73 @@ class test_Command(ClassChecker):
assert ns.files.required is False
assert ns.files.multivalue is True
+ def test_output(self):
+ """
+ Test the ``ipalib.frontend.Command.output`` instance attribute.
+ """
+ inst = self.cls()
+ assert inst.output is None
+ inst.finalize()
+ assert type(inst.output) is plugable.NameSpace
+ assert list(inst.output) == ['result']
+ assert type(inst.output.result) is output.Output
+
+ def test_iter_output(self):
+ """
+ Test the ``ipalib.frontend.Command._iter_output`` instance attribute.
+ """
+ class Example(self.cls):
+ pass
+ inst = Example()
+
+ inst.has_output = tuple()
+ assert list(inst._iter_output()) == []
+
+ wrong = ['hello', 'world']
+ inst.has_output = wrong
+ e = raises(TypeError, list, inst._iter_output())
+ assert str(e) == 'Example.has_output: need a %r; got a %r: %r' % (
+ tuple, list, wrong
+ )
+
+ wrong = ('hello', 17)
+ inst.has_output = wrong
+ e = raises(TypeError, list, inst._iter_output())
+ assert str(e) == 'Example.has_output[1]: need a %r; got a %r: %r' % (
+ (str, output.Output), int, 17
+ )
+
+ okay = ('foo', output.Output('bar'), 'baz')
+ inst.has_output = okay
+ items = list(inst._iter_output())
+ assert len(items) == 3
+ assert list(o.name for o in items) == ['foo', 'bar', 'baz']
+ for o in items:
+ assert type(o) is output.Output
+
+ def test_soft_validate(self):
+ """
+ Test the `ipalib.frontend.Command.soft_validate` method.
+ """
+ class user_add(frontend.Command):
+ takes_args = parameters.Str('uid',
+ normalizer=lambda value: value.lower(),
+ default_from=lambda givenname, sn: givenname[0] + sn,
+ )
+
+ takes_options = ('givenname', 'sn')
+
+ cmd = user_add()
+ cmd.finalize()
+ assert list(cmd.params) == ['givenname', 'sn', 'uid']
+ ret = cmd.soft_validate({})
+ assert len(ret['values']) == 0
+ assert len(ret['errors']) == 3
+ assert cmd.soft_validate(dict(givenname=u'First', sn=u'Last')) == dict(
+ values=dict(givenname=u'First', sn=u'Last', uid=u'flast'),
+ errors=dict(),
+ )
+
def test_convert(self):
"""
Test the `ipalib.frontend.Command.convert` method.
@@ -517,6 +585,84 @@ class test_Command(ClassChecker):
assert o.run.im_func is self.cls.run.im_func
assert ('forward', args, kw) == o.run(*args, **kw)
+ def test_validate_output(self):
+ """
+ Test the `ipalib.frontend.Command.validate_output` method.
+ """
+ class Example(self.cls):
+ has_output = ('foo', 'bar', 'baz')
+
+ inst = Example()
+ inst.finalize()
+
+ # Test with wrong type:
+ wrong = ('foo', 'bar', 'baz')
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == '%s.validate_output(): need a %r; got a %r: %r' % (
+ 'Example', dict, tuple, wrong
+ )
+
+ # Test with a missing keys:
+ wrong = dict(bar='hello')
+ e = raises(ValueError, inst.validate_output, wrong)
+ assert str(e) == '%s.validate_output(): missing keys %r in %r' % (
+ 'Example', ['baz', 'foo'], wrong
+ )
+
+ # Test with extra keys:
+ wrong = dict(foo=1, bar=2, baz=3, fee=4, azz=5)
+ e = raises(ValueError, inst.validate_output, wrong)
+ assert str(e) == '%s.validate_output(): unexpected keys %r in %r' % (
+ 'Example', ['azz', 'fee'], wrong
+ )
+
+ # Test with per item type validation:
+ class Complex(self.cls):
+ has_output = (
+ output.Output('foo', int),
+ output.Output('bar', list),
+ )
+ inst = Complex()
+ inst.finalize()
+
+ wrong = dict(foo=17.9, bar=[18])
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == '%s:\n output[%r]: need %r; got %r: %r' % (
+ 'Complex.validate_output()', 'foo', int, float, 17.9
+ )
+
+ wrong = dict(foo=18, bar=17)
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == '%s:\n output[%r]: need %r; got %r: %r' % (
+ 'Complex.validate_output()', 'bar', list, int, 17
+ )
+
+ class Subclass(output.ListOfEntries):
+ pass
+
+ # Test nested validation:
+ class nested(self.cls):
+ has_output = (
+ output.Output('hello', int),
+ Subclass('world'),
+ )
+ inst = nested()
+ inst.finalize()
+ okay = dict(foo='bar')
+ nope = ('aye', 'bee')
+
+ wrong = dict(hello=18, world=[okay, nope, okay])
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == output.emsg % (
+ 'nested', 'Subclass', 'world', 1, dict, tuple, nope
+ )
+
+ wrong = dict(hello=18, world=[okay, okay, okay, okay, nope])
+ e = raises(TypeError, inst.validate_output, wrong)
+ assert str(e) == output.emsg % (
+ 'nested', 'Subclass', 'world', 4, dict, tuple, nope
+ )
+
class test_LocalOrRemote(ClassChecker):
"""
@@ -544,32 +690,46 @@ class test_LocalOrRemote(ClassChecker):
takes_args = 'key?'
def forward(self, *args, **options):
- return ('forward', args, options)
+ return dict(result=('forward', args, options))
def execute(self, *args, **options):
- return ('execute', args, options)
+ return dict(result=('execute', args, options))
# Test when in_server=False:
(api, home) = create_test_api(in_server=False)
api.register(example)
api.finalize()
cmd = api.Command.example
- assert cmd() == ('execute', (None,), dict(server=False))
- assert cmd(u'var') == ('execute', (u'var',), dict(server=False))
- assert cmd(server=True) == ('forward', (None,), dict(server=True))
- assert cmd(u'var', server=True) == \
- ('forward', (u'var',), dict(server=True))
+ assert cmd() == dict(
+ result=('execute', (None,), dict(server=False))
+ )
+ assert cmd(u'var') == dict(
+ result=('execute', (u'var',), dict(server=False))
+ )
+ assert cmd(server=True) == dict(
+ result=('forward', (None,), dict(server=True))
+ )
+ assert cmd(u'var', server=True) == dict(
+ result=('forward', (u'var',), dict(server=True))
+ )
# Test when in_server=True (should always call execute):
(api, home) = create_test_api(in_server=True)
api.register(example)
api.finalize()
cmd = api.Command.example
- assert cmd() == ('execute', (None,), dict(server=False))
- assert cmd(u'var') == ('execute', (u'var',), dict(server=False))
- assert cmd(server=True) == ('execute', (None,), dict(server=True))
- assert cmd(u'var', server=True) == \
- ('execute', (u'var',), dict(server=True))
+ assert cmd() == dict(
+ result=('execute', (None,), dict(server=False))
+ )
+ assert cmd(u'var') == dict(
+ result=('execute', (u'var',), dict(server=False))
+ )
+ assert cmd(server=True) == dict(
+ result=('execute', (None,), dict(server=True))
+ )
+ assert cmd(u'var', server=True) == dict(
+ result=('execute', (u'var',), dict(server=True))
+ )
class test_Object(ClassChecker):
@@ -834,6 +994,26 @@ class test_Method(ClassChecker):
"""
_cls = frontend.Method
+ def get_api(self, args=tuple(), options=tuple()):
+ """
+ Return a finalized `ipalib.plugable.API` instance.
+ """
+ (api, home) = create_test_api()
+ class user(frontend.Object):
+ takes_params = (
+ 'givenname',
+ 'sn',
+ frontend.Param('uid', primary_key=True),
+ 'initials',
+ )
+ class user_verb(self.cls):
+ takes_args = args
+ takes_options = options
+ api.register(user)
+ api.register(user_verb)
+ api.finalize()
+ return api
+
def test_class(self):
"""
Test the `ipalib.frontend.Method` class.
@@ -856,6 +1036,8 @@ class test_Method(ClassChecker):
assert frontend.Attribute.implemented_by(o)
+
+
class test_Property(ClassChecker):
"""
Test the `ipalib.frontend.Property` class.
diff --git a/tests/test_ipalib/test_output.py b/tests/test_ipalib/test_output.py
new file mode 100644
index 000000000..ceb825c44
--- /dev/null
+++ b/tests/test_ipalib/test_output.py
@@ -0,0 +1,88 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.output` module.
+"""
+
+from tests.util import raises, ClassChecker
+from ipalib import output
+from ipalib.frontend import Command
+
+class test_Output(ClassChecker):
+ """
+ Test the `ipalib.output.Output` class.
+ """
+
+ _cls = output.Output
+
+ def test_init(self):
+ """
+ Test the `ipalib.output.Output.__init__` method.
+ """
+ o = self.cls('result')
+ assert o.name == 'result'
+ assert o.type is None
+ assert o.doc is None
+
+ def test_repr(self):
+ """
+ Test the `ipalib.output.Output.__repr__` method.
+ """
+ o = self.cls('aye')
+ assert repr(o) == "Output('aye', None, None)"
+ o = self.cls('aye', type=int, doc='An A, aye?')
+ assert repr(o) == "Output('aye', %r, 'An A, aye?')" % int
+
+ class Entry(self.cls):
+ pass
+ o = Entry('aye')
+ assert repr(o) == "Entry('aye', None, None)"
+ o = Entry('aye', type=int, doc='An A, aye?')
+ assert repr(o) == "Entry('aye', %r, 'An A, aye?')" % int
+
+
+class test_ListOfEntries(ClassChecker):
+ """
+ Test the `ipalib.output.ListOfEntries` class.
+ """
+
+ _cls = output.ListOfEntries
+
+ def test_validate(self):
+ """
+ Test the `ipalib.output.ListOfEntries.validate` method.
+ """
+ class example(Command):
+ pass
+ cmd = example()
+ inst = self.cls('stuff')
+
+ okay = dict(foo='bar')
+ nope = ('aye', 'bee')
+
+ e = raises(TypeError, inst.validate, cmd, [okay, okay, nope])
+ assert str(e) == output.emsg % (
+ 'example', 'ListOfEntries', 'stuff', 2, dict, tuple, nope
+ )
+
+ e = raises(TypeError, inst.validate, cmd, [nope, okay, nope])
+ assert str(e) == output.emsg % (
+ 'example', 'ListOfEntries', 'stuff', 0, dict, tuple, nope
+ )
diff --git a/tests/test_ipalib/test_parameters.py b/tests/test_ipalib/test_parameters.py
index d1c5f7f92..1f0a7aec8 100644
--- a/tests/test_ipalib/test_parameters.py
+++ b/tests/test_ipalib/test_parameters.py
@@ -178,8 +178,8 @@ class test_Param(ClassChecker):
# Test default kwarg values:
assert o.cli_name is name
- assert o.label is None
- assert o.doc == ''
+ assert o.label == '<my_param>'
+ assert o.doc == '<my_param>'
assert o.required is True
assert o.multivalue is False
assert o.primary_key is False
@@ -195,6 +195,16 @@ class test_Param(ClassChecker):
assert o.exclude is None
assert o.flags == frozenset()
+ # Test that doc defaults from label:
+ o = self.cls('my_param', doc='Hello world')
+ assert o.label == '<my_param>'
+ assert o.doc == 'Hello world'
+
+ o = self.cls('my_param', label='My Param')
+ assert o.label == 'My Param'
+ assert o.doc == 'My Param'
+
+
# Test that ValueError is raised when a kwarg from a subclass
# conflicts with an attribute:
class Subclass(self.cls):
@@ -352,50 +362,6 @@ class test_Param(ClassChecker):
assert clone.param_spec == 'my_param'
assert clone.name == 'my_param'
- def test_get_label(self):
- """
- Test the `ipalib.parameters.get_label` method.
- """
- context = request.context
- cli_name = 'the_cli_name'
- message = 'The Label'
- label = lambda _: _(message)
- o = self.cls('name', cli_name=cli_name, label=label)
- assert o.label is label
-
- ## Scenario 1: label=callable (a lambda form)
-
- # Test with no context.ugettext:
- assert not hasattr(context, 'ugettext')
- assert_equal(o.get_label(), u'The Label')
-
- # Test with dummy context.ugettext:
- assert not hasattr(context, 'ugettext')
- dummy = dummy_ugettext()
- context.ugettext = dummy
- assert o.get_label() is dummy.translation
- assert dummy.message is message
- del context.ugettext
-
- ## Scenario 2: label=None
- o = self.cls('name', cli_name=cli_name)
- assert o.label is None
-
- # Test with no context.ugettext:
- assert not hasattr(context, 'ugettext')
- assert_equal(o.get_label(), u'the_cli_name')
-
- # Test with dummy context.ugettext:
- assert not hasattr(context, 'ugettext')
- dummy = dummy_ugettext()
- context.ugettext = dummy
- assert_equal(o.get_label(), u'the_cli_name')
- assert not hasattr(dummy, 'message')
-
- # Cleanup
- del context.ugettext
- assert not hasattr(context, 'ugettext')
-
def test_convert(self):
"""
Test the `ipalib.parameters.Param.convert` method.
diff --git a/tests/test_ipalib/test_text.py b/tests/test_ipalib/test_text.py
new file mode 100644
index 000000000..924534a03
--- /dev/null
+++ b/tests/test_ipalib/test_text.py
@@ -0,0 +1,120 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty contextrmation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Test the `ipalib.text` module.
+"""
+
+from tests.util import raises, assert_equal
+from tests.data import utf8_bytes, unicode_str
+from ipalib import text
+
+singular = '%(count)d goose makes a %(dish)s'
+plural = '%(count)d geese make a %(dish)s'
+
+
+class test_LazyText(object):
+
+ klass = text.LazyText
+
+ def test_init(self):
+ inst = self.klass('foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+
+
+class test_Gettext(object):
+
+ klass = text.Gettext
+
+ def test_init(self):
+ inst = self.klass(utf8_bytes, 'foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+ assert inst.msg is utf8_bytes
+
+ def test_unicode(self):
+ inst = self.klass(utf8_bytes, 'foo', 'bar')
+ assert unicode(inst) == unicode_str
+
+ def test_mod(self):
+ inst = self.klass('hello %(adj)s nurse', 'foo', 'bar')
+ assert inst % dict(adj='naughty', stuff='junk') == 'hello naughty nurse'
+
+
+class test_NGettext(object):
+
+ klass = text.NGettext
+
+ def test_init(self):
+ inst = self.klass(singular, plural, 'foo', 'bar')
+ assert inst.singular is singular
+ assert inst.plural is plural
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+
+ def test_call(self):
+ inst = self.klass(singular, plural, 'foo', 'bar')
+ assert inst(0) == plural
+ assert inst(1) == singular
+ assert inst(2) == plural
+ assert inst(3) == plural
+
+ def test_mod(self):
+ inst = self.klass(singular, plural, 'foo', 'bar')
+ assert inst % dict(count=0, dish='frown') == '0 geese make a frown'
+ assert inst % dict(count=1, dish='stew') == '1 goose makes a stew'
+ assert inst % dict(count=2, dish='pie') == '2 geese make a pie'
+
+
+class test_gettext_factory(object):
+
+ klass = text.gettext_factory
+
+ def test_init(self):
+ inst = self.klass('foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+
+ def test_call(self):
+ inst = self.klass('foo', 'bar')
+ g = inst(utf8_bytes)
+ assert type(g) is text.Gettext
+ assert g.msg is utf8_bytes
+ assert g.domain == 'foo'
+ assert g.localedir == 'bar'
+
+
+class test_ngettext_factory(object):
+
+ klass = text.ngettext_factory
+
+ def test_init(self):
+ inst = self.klass('foo', 'bar')
+ assert inst.domain == 'foo'
+ assert inst.localedir == 'bar'
+
+ def test_call(self):
+ inst = self.klass('foo', 'bar')
+ ng = inst(singular, plural, 7)
+ assert type(ng) is text.NGettext
+ assert ng.singular is singular
+ assert ng.plural is plural
+ assert ng.domain == 'foo'
+ assert ng.localedir == 'bar'
diff --git a/tests/test_util.py b/tests/test_util.py
index 7d7038c1a..b2a53101a 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -22,6 +22,7 @@ Test the `tests.util` module.
"""
import util
+from util import raises, TYPE, VALUE, LEN, KEYS
class Prop(object):
@@ -47,6 +48,136 @@ class Prop(object):
prop = property(__get_prop, __set_prop, __del_prop)
+def test_assert_deepequal():
+ f = util.assert_deepequal
+
+ # Test with good scalar values:
+ f(u'hello', u'hello', 'foo')
+ f(18, 18, 'foo')
+
+ # Test with bad scalar values:
+ e = raises(AssertionError, f, u'hello', u'world', 'foo')
+ assert str(e) == VALUE % (
+ 'foo', u'hello', u'world', tuple()
+ )
+
+ e = raises(AssertionError, f, 'hello', u'hello', 'foo')
+ assert str(e) == TYPE % (
+ 'foo', str, unicode, 'hello', u'hello', tuple()
+ )
+
+ e = raises(AssertionError, f, 18, 18.0, 'foo')
+ assert str(e) == TYPE % (
+ 'foo', int, float, 18, 18.0, tuple()
+ )
+
+ # Test with good compound values:
+ a = [
+ u'hello',
+ dict(naughty=u'nurse'),
+ 18,
+ ]
+ b = [
+ u'hello',
+ dict(naughty=u'nurse'),
+ 18,
+ ]
+ f(a, b)
+
+ # Test with bad compound values:
+ b = [
+ 'hello',
+ dict(naughty=u'nurse'),
+ 18,
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == TYPE % (
+ 'foo', unicode, str, u'hello', 'hello', (0,)
+ )
+
+ b = [
+ u'hello',
+ dict(naughty='nurse'),
+ 18,
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == TYPE % (
+ 'foo', unicode, str, u'nurse', 'nurse', (1, 'naughty')
+ )
+
+ b = [
+ u'hello',
+ dict(naughty=u'nurse'),
+ 18.0,
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == TYPE % (
+ 'foo', int, float, 18, 18.0, (2,)
+ )
+
+ # List length mismatch
+ b = [
+ u'hello',
+ dict(naughty=u'nurse'),
+ 18,
+ 19
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == LEN % (
+ 'foo', 3, 4, a, b, tuple()
+ )
+
+ b = [
+ dict(naughty=u'nurse'),
+ 18,
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == LEN % (
+ 'foo', 3, 2, a, b, tuple()
+ )
+
+ # Dict keys mismatch:
+
+ # Missing
+ b = [
+ u'hello',
+ dict(),
+ 18,
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == KEYS % ('foo',
+ ['naughty'], [],
+ dict(naughty=u'nurse'), dict(),
+ (1,)
+ )
+
+ # Extra
+ b = [
+ u'hello',
+ dict(naughty=u'nurse', barely=u'legal'),
+ 18,
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == KEYS % ('foo',
+ [], ['barely'],
+ dict(naughty=u'nurse'), dict(naughty=u'nurse', barely=u'legal'),
+ (1,)
+ )
+
+ # Missing + Extra
+ b = [
+ u'hello',
+ dict(barely=u'legal'),
+ 18,
+ ]
+ e = raises(AssertionError, f, a, b, 'foo')
+ assert str(e) == KEYS % ('foo',
+ ['naughty'], ['barely'],
+ dict(naughty=u'nurse'), dict(barely=u'legal'),
+ (1,)
+ )
+
+
def test_yes_raised():
f = util.raises
diff --git a/tests/test_xmlrpc/test_automount_plugin.py b/tests/test_xmlrpc/test_automount_plugin.py
index 875430b4f..355f9f827 100644
--- a/tests/test_xmlrpc/test_automount_plugin.py
+++ b/tests/test_xmlrpc/test_automount_plugin.py
@@ -46,15 +46,17 @@ class test_automount(XMLRPC_test):
"""
Test adding a location `xmlrpc.automountlocation_add` method.
"""
- (dn, res) = api.Command['automountlocation_add'](**self.loc_kw)
- assert res
- assert_attr_equal(res, 'cn', self.locname)
+ ret = self.failsafe_add(
+ api.Object.automountlocation, self.locname
+ )
+ entry = ret['result']
+ assert_attr_equal(entry, 'cn', self.locname)
def test_1_automountmap_add(self):
"""
Test adding a map `xmlrpc.automountmap_add` method.
"""
- (dn, res) = api.Command['automountmap_add'](**self.map_kw)
+ res = api.Command['automountmap_add'](**self.map_kw)['result']
assert res
assert_attr_equal(res, 'automountmapname', self.mapname)
@@ -62,7 +64,7 @@ class test_automount(XMLRPC_test):
"""
Test adding a key using `xmlrpc.automountkey_add` method.
"""
- (dn, res) = api.Command['automountkey_add'](**self.key_kw2)
+ res = api.Command['automountkey_add'](**self.key_kw2)['result']
assert res
assert_attr_equal(res, 'automountkey', self.keyname2)
@@ -70,7 +72,7 @@ class test_automount(XMLRPC_test):
"""
Test adding a key using `xmlrpc.automountkey_add` method.
"""
- (dn, res) = api.Command['automountkey_add'](**self.key_kw)
+ res = api.Command['automountkey_add'](**self.key_kw)['result']
assert res
assert_attr_equal(res, 'automountkey', self.keyname)
@@ -89,7 +91,7 @@ class test_automount(XMLRPC_test):
"""
Test the `xmlrpc.automountmap_show` method.
"""
- (dn, res) = api.Command['automountmap_show'](self.locname, self.mapname, raw=True)
+ res = api.Command['automountmap_show'](self.locname, self.mapname, raw=True)['result']
assert res
assert_attr_equal(res, 'automountmapname', self.mapname)
@@ -97,16 +99,15 @@ class test_automount(XMLRPC_test):
"""
Test the `xmlrpc.automountmap_find` method.
"""
- (res, truncated) = api.Command['automountmap_find'](self.locname, self.mapname, raw=True)
- assert res
- assert_attr_equal(res[0][1], 'automountmapname', self.mapname)
+ res = api.Command['automountmap_find'](self.locname, self.mapname, raw=True)['result']
+ assert_attr_equal(res[0], 'automountmapname', self.mapname)
def test_7_automountkey_show(self):
"""
Test the `xmlrpc.automountkey_show` method.
"""
showkey_kw={'cn': self.locname, 'automountmapname': self.mapname, 'automountkey': self.keyname, 'raw': True}
- (dn, res) = api.Command['automountkey_show'](**showkey_kw)
+ res = api.Command['automountkey_show'](**showkey_kw)['result']
assert res
assert_attr_equal(res, 'automountkey', self.keyname)
assert_attr_equal(res, 'automountinformation', self.info)
@@ -115,11 +116,11 @@ class test_automount(XMLRPC_test):
"""
Test the `xmlrpc.automountkey_find` method.
"""
- (res, truncated) = api.Command['automountkey_find'](self.locname, self.mapname, raw=True)
+ res = api.Command['automountkey_find'](self.locname, self.mapname, raw=True)['result']
assert res
assert len(res) == 2
- assert_attr_equal(res[1][1], 'automountkey', self.keyname)
- assert_attr_equal(res[1][1], 'automountinformation', self.info)
+ assert_attr_equal(res[1], 'automountkey', self.keyname)
+ assert_attr_equal(res[1], 'automountinformation', self.info)
def test_9_automountkey_mod(self):
"""
@@ -127,7 +128,7 @@ class test_automount(XMLRPC_test):
"""
self.key_kw['automountinformation'] = u'rw'
self.key_kw['description'] = u'new description'
- (dn, res) = api.Command['automountkey_mod'](**self.key_kw)
+ res = api.Command['automountkey_mod'](**self.key_kw)['result']
assert res
assert_attr_equal(res, 'automountinformation', 'rw')
assert_attr_equal(res, 'description', 'new description')
@@ -137,7 +138,7 @@ class test_automount(XMLRPC_test):
Test the `xmlrpc.automountmap_mod` method.
"""
self.map_kw['description'] = u'new description'
- (dn, res) = api.Command['automountmap_mod'](**self.map_kw)
+ res = api.Command['automountmap_mod'](**self.map_kw)['result']
assert res
assert_attr_equal(res, 'description', 'new description')
@@ -146,7 +147,7 @@ class test_automount(XMLRPC_test):
Test the `xmlrpc.automountkey_del` method.
"""
delkey_kw={'cn': self.locname, 'automountmapname': self.mapname, 'automountkey': self.keyname, 'raw': True}
- res = api.Command['automountkey_del'](**delkey_kw)
+ res = api.Command['automountkey_del'](**delkey_kw)['result']
assert res == True
# Verify that it is gone
@@ -161,7 +162,7 @@ class test_automount(XMLRPC_test):
"""
Test the `xmlrpc.automountlocation_del` method.
"""
- res = api.Command['automountlocation_del'](self.locname)
+ res = api.Command['automountlocation_del'](self.locname)['result']
assert res == True
# Verify that it is gone
@@ -201,7 +202,7 @@ class test_automount_indirect(XMLRPC_test):
"""
Test adding a location.
"""
- (dn, res) = api.Command['automountlocation_add'](self.locname, raw=True)
+ res = api.Command['automountlocation_add'](self.locname, raw=True)['result']
assert res
assert_attr_equal(res, 'cn', self.locname)
@@ -209,7 +210,7 @@ class test_automount_indirect(XMLRPC_test):
"""
Test adding an indirect map.
"""
- (dn, res) = api.Command['automountmap_add_indirect'](self.locname, self.mapname, **self.map_kw)
+ res = api.Command['automountmap_add_indirect'](self.locname, self.mapname, **self.map_kw)['result']
assert res
assert_attr_equal(res, 'automountmapname', self.mapname)
@@ -217,7 +218,7 @@ class test_automount_indirect(XMLRPC_test):
"""
Test the `xmlrpc.automountmap_show` method.
"""
- (dn, res) = api.Command['automountkey_show'](self.locname, self.parentmap, self.keyname, raw=True)
+ res = api.Command['automountkey_show'](self.locname, self.parentmap, self.keyname, raw=True)['result']
assert res
assert_attr_equal(res, 'automountkey', self.keyname)
@@ -226,7 +227,7 @@ class test_automount_indirect(XMLRPC_test):
Remove the indirect key /home.
"""
delkey_kw = {'cn': self.locname, 'automountmapname': self.parentmap, 'automountkey': self.keyname}
- res = api.Command['automountkey_del'](**delkey_kw)
+ res = api.Command['automountkey_del'](**delkey_kw)['result']
assert res == True
# Verify that it is gone
@@ -241,7 +242,7 @@ class test_automount_indirect(XMLRPC_test):
"""
Remove the indirect map for auto.home.
"""
- res = api.Command['automountmap_del'](self.locname, self.mapname)
+ res = api.Command['automountmap_del'](self.locname, self.mapname)['result']
assert res == True
# Verify that it is gone
@@ -256,7 +257,7 @@ class test_automount_indirect(XMLRPC_test):
"""
Remove the location.
"""
- res = api.Command['automountlocation_del'](self.locname)
+ res = api.Command['automountlocation_del'](self.locname)['result']
assert res == True
# Verity that it is gone
@@ -283,16 +284,15 @@ class test_automount_indirect_no_parent(XMLRPC_test):
"""
Test adding a location.
"""
- (dn, res) = api.Command['automountlocation_add'](self.locname, raw=True)
+ res = api.Command['automountlocation_add'](self.locname, raw=True)['result']
assert res
assert_attr_equal(res, 'cn', self.locname)
-
def test_1_automountmap_add_indirect(self):
"""
Test adding an indirect map with default parent.
"""
- (dn, res) = api.Command['automountmap_add_indirect'](self.locname, self.mapname, **self.map_kw)
+ res = api.Command['automountmap_add_indirect'](self.locname, self.mapname, **self.map_kw)['result']
assert res
assert_attr_equal(res, 'automountmapname', self.mapname)
@@ -301,7 +301,7 @@ class test_automount_indirect_no_parent(XMLRPC_test):
Test the `xmlrpc.automountkey_show` method with default parent.
"""
showkey_kw = {'cn': self.locname, 'automountmapname': self.parentmap, 'automountkey': self.keyname, 'raw': True}
- (dn, res) = api.Command['automountkey_show'](**showkey_kw)
+ res = api.Command['automountkey_show'](**showkey_kw)['result']
assert res
assert_attr_equal(res, 'automountkey', self.keyname)
@@ -310,7 +310,7 @@ class test_automount_indirect_no_parent(XMLRPC_test):
Remove the indirect key /home.
"""
delkey_kw={'cn': self.locname, 'automountmapname': self.parentmap, 'automountkey': self.keyname}
- res = api.Command['automountkey_del'](**delkey_kw)
+ res = api.Command['automountkey_del'](**delkey_kw)['result']
assert res == True
# Verify that it is gone
@@ -325,7 +325,7 @@ class test_automount_indirect_no_parent(XMLRPC_test):
"""
Remove the indirect map for auto.home.
"""
- res = api.Command['automountmap_del'](self.locname, self.mapname)
+ res = api.Command['automountmap_del'](self.locname, self.mapname)['result']
assert res == True
# Verify that it is gone
@@ -340,7 +340,7 @@ class test_automount_indirect_no_parent(XMLRPC_test):
"""
Remove the location.
"""
- res = api.Command['automountlocation_del'](self.locname)
+ res = api.Command['automountlocation_del'](self.locname)['result']
assert res == True
# Verity that it is gone
@@ -350,4 +350,3 @@ class test_automount_indirect_no_parent(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_cert.py b/tests/test_xmlrpc/test_cert.py
index 37c993644..6d23c69ad 100644
--- a/tests/test_xmlrpc/test_cert.py
+++ b/tests/test_xmlrpc/test_cert.py
@@ -27,6 +27,7 @@ from ipalib import api
from ipalib import errors
import tempfile
from ipapython import ipautil
+import nose
class test_cert(XMLRPC_test):
@@ -37,6 +38,8 @@ class test_cert(XMLRPC_test):
return ipautil.run(new_args, stdin)
def setUp(self):
+ if 'cert_request' not in api.Command:
+ raise nose.SkipTest('cert_request not registered')
super(test_cert, self).setUp()
self.reqdir = tempfile.mkdtemp(prefix = "tmp-")
self.reqfile = self.reqdir + "/test.csr"
diff --git a/tests/test_xmlrpc/test_group_plugin.py b/tests/test_xmlrpc/test_group_plugin.py
index 20061cfdd..89947d22d 100644
--- a/tests/test_xmlrpc/test_group_plugin.py
+++ b/tests/test_xmlrpc/test_group_plugin.py
@@ -23,174 +23,376 @@ Test the `ipalib/plugins/group.py` module.
import sys
from xmlrpc_test import XMLRPC_test, assert_attr_equal
-from ipalib import api
-from ipalib import errors
-
-
-class test_group(XMLRPC_test):
- """
- Test the `group` plugin.
- """
- cn = u'testgroup'
- cn2 = u'testgroup2'
- cnposix = u'posixgroup'
- description = u'This is a test'
- kw = {'description': description, 'cn': cn, 'raw': True}
-
- def test_1_group_add(self):
- """
- Test the `xmlrpc.group_add` method: testgroup.
- """
- (dn, res) = api.Command['group_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
-
- def test_2_group_add(self):
- """
- Test the `xmlrpc.group_add` method duplicate detection.
- """
- try:
- api.Command['group_add'](**self.kw)
- except errors.DuplicateEntry:
- pass
-
- def test_3_group_add(self):
- """
- Test the `xmlrpc.group_add` method: testgroup2.
- """
- self.kw['cn'] = self.cn2
- (dn, res) = api.Command['group_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn2)
-
- def test_3_group_add_member(self):
- """
- Test the `xmlrpc.group_add_member` method.
- """
- kw = {'raw': True}
- kw['group'] = self.cn2
- (total, failed, res) = api.Command['group_add_member'](self.cn, **kw)
- assert total == 1, '%r %r %r' % (total, failed, res)
-
- def test_4_group_add_member(self):
- """
- Test the `xmlrpc.group_add_member` with a non-existent member
- """
- kw = {'raw': True}
- kw['group'] = u'notfound'
- (total, failed, res) = api.Command['group_add_member'](self.cn, **kw)
- assert total == 0
- assert 'member' in failed
- assert 'group' in failed['member']
- assert 'notfound' in failed['member']['group']
-
- def test_5_group_show(self):
- """
- Test the `xmlrpc.group_show` method.
- """
- (dn, res) = api.Command['group_show'](self.cn, raw=True)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
-
- def test_6_group_find(self):
- """
- Test the `xmlrpc.group_find` method.
- """
- (res, truncated) = api.Command['group_find'](cn=self.cn, raw=True)
- assert res
- assert_attr_equal(res[0][1], 'description', self.description)
- assert_attr_equal(res[0][1], 'cn', self.cn)
-
- def test_7_group_mod(self):
- """
- Test the `xmlrpc.group_mod` method.
- """
- modkw = self.kw
- modkw['cn'] = self.cn
- modkw['description'] = u'New description'
- (dn, res) = api.Command['group_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'description', 'New description')
- # Ok, double-check that it was changed
- (dn, res) = api.Command['group_show'](self.cn)
- assert res
- assert_attr_equal(res, 'description', 'New description')
- assert_attr_equal(res, 'cn', self.cn)
-
- def test_8_group_mod(self):
- """
- Test the `xmlrpc.group_mod` method, promote a posix group
- """
- modkw = self.kw
- modkw['cn'] = self.cn
- modkw['posix'] = True
- modkw['all'] = True
- modkw['raw'] = True
- (dn, res) = api.Command['group_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'description', 'New description')
- assert_attr_equal(res, 'cn', self.cn)
- # Ok, double-check that it was changed
- (dn, res) = api.Command['group_show'](self.cn, all=True, raw=True)
- assert res
- assert_attr_equal(res, 'description', 'New description')
- assert_attr_equal(res, 'cn', self.cn)
- assert res.get('gidnumber', '')
-
- def test_9_group_remove_member(self):
- """
- Test the `xmlrpc.group_remove_member` method.
- """
- kw = {'raw': True}
- kw['group'] = self.cn2
- (total, failed, res) = api.Command['group_remove_member'](self.cn, **kw)
- assert res
- assert total == 1
-
- def test_a_group_remove_member(self):
- """
- Test the `xmlrpc.group_remove_member` method with non-member
- """
- kw = {'raw': True}
- kw['group'] = u'notfound'
- # an error isn't thrown, the list of failed members is returned
- (total, failed, res) = api.Command['group_remove_member'](self.cn, **kw)
- assert total == 0
- assert 'member' in failed
- assert 'group' in failed['member']
- assert 'notfound' in failed['member']['group']
-
- def test_b_group_del(self):
- """
- Test the `xmlrpc.group_del` method: testgroup.
- """
- res = api.Command['group_del'](self.cn)
- assert res == True
-
- # Verify that it is gone
- try:
- api.Command['group_show'](self.cn)
- except errors.NotFound:
- pass
- else:
- assert False
-
- def test_c_group_del(self):
- """
- Test the `xmlrpc.group_del` method: testgroup2.
- """
- res = api.Command['group_del'](self.cn2)
- assert res == True
-
- # Verify that it is gone
- try:
- api.Command['group_show'](self.cn2)
- except errors.NotFound:
- pass
- else:
- assert False
+from ipalib import api, errors
+from xmlrpc_test import Declarative
+
+group_objectclass = (
+ u'top',
+ u'groupofnames',
+ u'nestedgroup',
+ u'ipausergroup',
+ u'ipaobject',
+)
+
+
+class test_group(Declarative):
+ cleanup_commands = [
+ ('group_del', [u'testgroup1'], {}),
+ ('group_del', [u'testgroup2'], {}),
+ ]
+
+ tests = [
+ # testgroup1:
+ dict(
+ desc='Try to retrieve a non-existant testgroup1',
+ command=('group_show', [u'testgroup2'], {}),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+
+ dict(
+ desc='Create testgroup1',
+ command=(
+ 'group_add', [u'testgroup1'], dict(description=u'Test desc 1')
+ ),
+ expected=dict(
+ value=u'testgroup1',
+ result=dict(
+ cn=(u'testgroup1',),
+ description=(u'Test desc 1',),
+ objectclass=group_objectclass,
+ ),
+ summary=u'Added group "testgroup1"',
+ ),
+ ignore_values=['ipauniqueid'],
+ ),
+
+ dict(
+ desc='Try to create testgroup1 again',
+ command=(
+ 'group_add', [u'testgroup1'], dict(description=u'Test desc 1')
+ ),
+ expected=errors.DuplicateEntry(),
+ ),
+
+ dict(
+ desc='Retrieve testgroup1',
+ command=('group_show', [u'testgroup1'], {}),
+ expected=dict(
+ value=u'testgroup1',
+ result=dict(
+ cn=(u'testgroup1',),
+ description=(u'Test desc 1',),
+ ),
+ summary=None,
+ ),
+ ignore_values=['dn'],
+ ),
+
+ dict(
+ desc='Updated testgroup1',
+ command=(
+ 'group_mod', [u'testgroup1'], dict(description=u'New desc 1')
+ ),
+ expected=dict(
+ result=dict(
+ description=(u'New desc 1',),
+ ),
+ summary=u'Modified group "testgroup1"',
+ value=u'testgroup1',
+ ),
+ ),
+
+ dict(
+ desc='Retrieve testgroup1 to check update',
+ command=('group_show', [u'testgroup1'], {}),
+ expected=dict(
+ value=u'testgroup1',
+ result=dict(
+ cn=(u'testgroup1',),
+ description=(u'New desc 1',),
+ ),
+ summary=None,
+ ),
+ ignore_values=['dn'],
+ ),
+
+ # FIXME: The return value is totally different here than from the above
+ # group_mod() test. I think that for all *_mod() commands we should
+ # just return the entry exactly as *_show() does.
+ dict(
+ desc='Updated testgroup1 to promote it to posix group',
+ command=('group_mod', [u'testgroup1'], dict(posix=True)),
+ expected=dict(
+ result=dict(
+ cn=(u'testgroup1',),
+ description=(u'New desc 1',),
+ objectclass=group_objectclass + (u'posixgroup',),
+ ),
+ value=u'testgroup1',
+ summary=u'Modified group "testgroup1"',
+ ),
+ ignore_values=['gidnumber', 'ipauniqueid'],
+ ),
+
+ dict(
+ desc="Retrieve testgroup1 to check it's a posix group",
+ command=('group_show', [u'testgroup1'], {}),
+ expected=dict(
+ value=u'testgroup1',
+ result=dict(
+ cn=(u'testgroup1',),
+ description=(u'New desc 1',),
+ ),
+ summary=None,
+ ),
+ ignore_values=['dn', 'gidnumber'],
+ ),
+
+ dict(
+ desc='Search for testgroup1',
+ command=('group_find', [], dict(cn=u'testgroup1')),
+ expected=dict(
+ count=1,
+ truncated=False,
+ result=(
+ dict(
+ cn=(u'testgroup1',),
+ description=(u'New desc 1',),
+ ),
+ ),
+ summary=u'1 group matched',
+ ),
+ ignore_values=['gidnumber'],
+ ),
+
+
+ # testgroup2:
+ dict(
+ desc='Try to retrieve a non-existant testgroup2',
+ command=('group_show', [u'testgroup2'], {}),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+
+ dict(
+ desc='Create testgroup2',
+ command=(
+ 'group_add', [u'testgroup2'], dict(description=u'Test desc 2')
+ ),
+ expected=dict(
+ value=u'testgroup2',
+ result=dict(
+ cn=(u'testgroup2',),
+ description=(u'Test desc 2',),
+ objectclass=group_objectclass,
+ ),
+ summary=u'Added group "testgroup2"',
+ ),
+ ignore_values=['ipauniqueid'],
+ ),
+
+ dict(
+ desc='Try to create testgroup2 again',
+ command=(
+ 'group_add', [u'testgroup2'], dict(description=u'Test desc 2')
+ ),
+ expected=errors.DuplicateEntry(),
+ ),
+
+ dict(
+ desc='Retrieve testgroup2',
+ command=('group_show', [u'testgroup2'], {}),
+ expected=dict(
+ value=u'testgroup2',
+ result=dict(
+ cn=(u'testgroup2',),
+ description=(u'Test desc 2',),
+ ),
+ summary=None,
+ ),
+ ignore_values=['dn'],
+ ),
+
+ dict(
+ desc='Search for testgroup2',
+ command=('group_find', [], dict(cn=u'testgroup2')),
+ expected=dict(
+ count=1,
+ truncated=False,
+ result=(
+ dict(
+ cn=(u'testgroup2',),
+ description=(u'Test desc 2',),
+ ),
+ ),
+ summary=u'1 group matched',
+ ),
+ ),
+
+ dict(
+ desc='Updated testgroup2',
+ command=(
+ 'group_mod', [u'testgroup2'], dict(description=u'New desc 2')
+ ),
+ expected=dict(
+ result=dict(
+ description=(u'New desc 2',),
+ ),
+ value=u'testgroup2',
+ summary=u'Modified group "testgroup2"',
+ ),
+ ),
+
+ dict(
+ desc='Retrieve testgroup2 to check update',
+ command=('group_show', [u'testgroup2'], {}),
+ expected=dict(
+ value=u'testgroup2',
+ result=dict(
+ cn=(u'testgroup2',),
+ description=(u'New desc 2',),
+ ),
+ summary=None,
+ ),
+ ignore_values=['dn'],
+ ),
+
+
+ # member stuff:
+ dict(
+ desc='Make testgroup2 member of testgroup1',
+ command=(
+ 'group_add_member', [u'testgroup1'], dict(group=u'testgroup2')
+ ),
+ expected=dict(
+ completed=1,
+ failed=dict(
+ member=dict(
+ group=tuple(),
+ user=tuple(),
+ ),
+ ),
+ result={'member group': (u'testgroup2',)},
+ ),
+ ),
+
+ dict(
+ # FIXME: Shouldn't this raise a NotFound instead?
+ desc='Try to add a non-existent member to testgroup1',
+ command=(
+ 'group_add_member', [u'testgroup1'], dict(group=u'notfound')
+ ),
+ expected=dict(
+ completed=0,
+ failed=dict(
+ member=dict(
+ group=(u'notfound',),
+ user=tuple(),
+ ),
+ ),
+ result={'member group': (u'testgroup2',)},
+ ),
+ ),
+
+ dict(
+ desc='Remove member testgroup2 from testgroup1',
+ command=('group_remove_member',
+ [u'testgroup1'], dict(group=u'testgroup2')
+ ),
+ expected=dict(
+ completed=1,
+ result=dict(),
+ failed=dict(
+ member=dict(
+ group=tuple(),
+ user=tuple(),
+ ),
+ ),
+ ),
+ ),
+
+ dict(
+ # FIXME: Shouldn't this raise a NotFound instead?
+ desc='Try to remove a non-existent member from testgroup1',
+ command=('group_remove_member',
+ [u'testgroup1'], dict(group=u'notfound')
+ ),
+ expected=dict(
+ completed=0,
+ result=dict(),
+ failed=dict(
+ member=dict(
+ group=(u'notfound',),
+ user=tuple(),
+ ),
+ ),
+ ),
+ ),
+
+
+ # Delete:
+ dict(
+ desc='Delete testgroup1',
+ command=('group_del', [u'testgroup1'], {}),
+ expected=dict(
+ result=True,
+ value=u'testgroup1',
+ summary=u'Deleted group "testgroup1"',
+ ),
+ ),
+
+ dict(
+ desc='Delete testgroup2',
+ command=('group_del', [u'testgroup2'], {}),
+ expected=dict(
+ result=True,
+ value=u'testgroup2',
+ summary=u'Deleted group "testgroup2"',
+ ),
+ ),
+
+
+ ##############
+ # Non-existent
+ ##############
+
+ # testgroup1:
+ dict(
+ desc='Try to retrieve non-existent testgroup1',
+ command=('group_show', [u'testgroup1'], {}),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+ dict(
+ desc='Try to update non-existent testgroup1',
+ command=(
+ 'group_mod', [u'testgroup1'], dict(description=u'New desc 1')
+ ),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+ dict(
+ desc='Try to delete non-existent testgroup1',
+ command=('group_del', [u'testgroup1'], {}),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+
+ # testgroup2:
+ dict(
+ desc='Try to retrieve non-existent testgroup2',
+ command=('group_show', [u'testgroup2'], {}),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+ dict(
+ desc='Try to update non-existent testgroup2',
+ command=(
+ 'group_mod', [u'testgroup2'], dict(description=u'New desc 2')
+ ),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+ dict(
+ desc='Try to delete non-existent testgroup2',
+ command=('group_del', [u'testgroup2'], {}),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+
+
+ ]
diff --git a/tests/test_xmlrpc/test_hbac_plugin.py b/tests/test_xmlrpc/test_hbac_plugin.py
index 0393d68d2..aa7bb78a4 100644
--- a/tests/test_xmlrpc/test_hbac_plugin.py
+++ b/tests/test_xmlrpc/test_hbac_plugin.py
@@ -51,25 +51,27 @@ class test_hbac(XMLRPC_test):
"""
Test adding a new HBAC rule using `xmlrpc.hbac_add`.
"""
- (dn, res) = api.Command['hbac_add'](
- self.rule_name, accessruletype=self.rule_type,
- servicename=self.rule_service, accesstime=self.rule_time,
- description=self.rule_desc
+ ret = self.failsafe_add(api.Object.hbac,
+ self.rule_name,
+ accessruletype=self.rule_type,
+ servicename=self.rule_service,
+ accesstime=self.rule_time,
+ description=self.rule_desc,
)
- assert res
- assert_attr_equal(res, 'cn', self.rule_name)
- assert_attr_equal(res, 'accessruletype', self.rule_type)
- assert_attr_equal(res, 'servicename', self.rule_service)
- assert_attr_equal(res, 'accesstime', self.rule_time)
- assert_attr_equal(res, 'ipaenabledflag', 'TRUE')
- assert_attr_equal(res, 'description', self.rule_desc)
+ entry = ret['result']
+ assert_attr_equal(entry, 'cn', self.rule_name)
+ assert_attr_equal(entry, 'accessruletype', self.rule_type)
+ assert_attr_equal(entry, 'servicename', self.rule_service)
+ assert_attr_equal(entry, 'accesstime', self.rule_time)
+ assert_attr_equal(entry, 'ipaenabledflag', 'TRUE')
+ assert_attr_equal(entry, 'description', self.rule_desc)
def test_1_hbac_add(self):
"""
Test adding an existing HBAC rule using `xmlrpc.hbac_add'.
"""
try:
- (dn, res) = api.Command['hbac_add'](
+ api.Command['hbac_add'](
self.rule_name, accessruletype=self.rule_type
)
except errors.DuplicateEntry:
@@ -81,35 +83,35 @@ class test_hbac(XMLRPC_test):
"""
Test displaying a HBAC rule using `xmlrpc.hbac_show`.
"""
- (dn, res) = api.Command['hbac_show'](self.rule_name)
- assert res
- assert_attr_equal(res, 'cn', self.rule_name)
- assert_attr_equal(res, 'accessruletype', self.rule_type)
- assert_attr_equal(res, 'servicename', self.rule_service)
- assert_attr_equal(res, 'accesstime', self.rule_time)
- assert_attr_equal(res, 'ipaenabledflag', 'TRUE')
- assert_attr_equal(res, 'description', self.rule_desc)
+ entry = api.Command['hbac_show'](self.rule_name)['result']
+ assert_attr_equal(entry, 'cn', self.rule_name)
+ assert_attr_equal(entry, 'accessruletype', self.rule_type)
+ assert_attr_equal(entry, 'servicename', self.rule_service)
+ assert_attr_equal(entry, 'accesstime', self.rule_time)
+ assert_attr_equal(entry, 'ipaenabledflag', 'TRUE')
+ assert_attr_equal(entry, 'description', self.rule_desc)
def test_3_hbac_mod(self):
"""
Test modifying a HBAC rule using `xmlrpc.hbac_mod`.
"""
- (dn, res) = api.Command['hbac_mod'](
+ ret = api.Command['hbac_mod'](
self.rule_name, description=self.rule_desc_mod
)
- assert res
- assert_attr_equal(res, 'description', self.rule_desc_mod)
+ entry = ret['result']
+ assert_attr_equal(entry, 'description', self.rule_desc_mod)
def test_4_hbac_add_accesstime(self):
"""
Test adding access time to HBAC rule using `xmlrpc.hbac_add_accesstime`.
"""
- (dn, res) = api.Command['hbac_add_accesstime'](
+ return
+ ret = api.Command['hbac_add_accesstime'](
self.rule_name, accesstime=self.rule_time2
)
- assert res
- assert_attr_equal(res, 'accesstime', self.rule_time);
- assert_attr_equal(res, 'accesstime', self.rule_time2);
+ entry = ret['result']
+ assert_attr_equal(entry, 'accesstime', self.rule_time);
+ assert_attr_equal(entry, 'accesstime', self.rule_time2);
def test_5_hbac_add_accesstime(self):
"""
@@ -128,28 +130,36 @@ class test_hbac(XMLRPC_test):
"""
Test searching for HBAC rules using `xmlrpc.hbac_find`.
"""
- (res, truncated) = api.Command['hbac_find'](
+ ret = api.Command['hbac_find'](
name=self.rule_name, accessruletype=self.rule_type,
description=self.rule_desc_mod
)
- assert res
- assert res[0]
- assert_attr_equal(res[0][1], 'cn', self.rule_name)
- assert_attr_equal(res[0][1], 'accessruletype', self.rule_type)
- assert_attr_equal(res[0][1], 'description', self.rule_desc_mod)
+ assert ret['truncated'] is False
+ entries = ret['result']
+ assert_attr_equal(entries[0], 'cn', self.rule_name)
+ assert_attr_equal(entries[0], 'accessruletype', self.rule_type)
+ assert_attr_equal(entries[0], 'description', self.rule_desc_mod)
def test_7_hbac_init_testing_data(self):
"""
Initialize data for more HBAC plugin testing.
"""
- api.Command['user_add'](self.test_user, givenname=u'first', sn=u'last')
- api.Command['group_add'](self.test_group, description=u'description')
- api.Command['host_add'](self.test_host)
- api.Command['hostgroup_add'](
+ self.failsafe_add(api.Object.user,
+ self.test_user, givenname=u'first', sn=u'last'
+ )
+ self.failsafe_add(api.Object.group,
+ self.test_group, description=u'description'
+ )
+ self.failsafe_add(api.Object.host,
+ self.test_host
+ )
+ self.failsafe_add(api.Object.hostgroup,
self.test_hostgroup, description=u'description'
)
- api.Command['host_add'](self.test_sourcehost)
- api.Command['hostgroup_add'](
+ self.failsafe_add(api.Object.host,
+ self.test_sourcehost
+ )
+ self.failsafe_add(api.Object.hostgroup,
self.test_sourcehostgroup, description=u'desc'
)
@@ -157,67 +167,71 @@ class test_hbac(XMLRPC_test):
"""
Test adding user and group to HBAC rule using `xmlrpc.hbac_add_user`.
"""
- (completed, failed, res) = api.Command['hbac_add_user'](
+ ret = api.Command['hbac_add_user'](
self.rule_name, user=self.test_user, group=self.test_group
)
- assert completed == 2
+ assert ret['completed'] == 2
+ failed = ret['failed']
assert 'memberuser' in failed
assert 'user' in failed['memberuser']
assert not failed['memberuser']['user']
assert 'group' in failed['memberuser']
assert not failed['memberuser']['group']
- assert res
- assert_attr_equal(res[1], 'memberuser user', self.test_user)
- assert_attr_equal(res[1], 'memberuser group', self.test_group)
+ entry = ret['result']
+ assert_attr_equal(entry, 'memberuser user', self.test_user)
+ assert_attr_equal(entry, 'memberuser group', self.test_group)
def test_9_hbac_remove_user(self):
"""
Test removing user and group from HBAC rule using `xmlrpc.hbac_remove_user'.
"""
- (completed, failed, res) = api.Command['hbac_remove_user'](
+ ret = api.Command['hbac_remove_user'](
self.rule_name, user=self.test_user, group=self.test_group
)
- assert completed == 2
+ assert ret['completed'] == 2
+ failed = ret['failed']
assert 'memberuser' in failed
assert 'user' in failed['memberuser']
assert not failed['memberuser']['user']
assert 'group' in failed['memberuser']
assert not failed['memberuser']['group']
- assert res
- assert 'memberuser user' not in res[1]
- assert 'memberuser group' not in res[1]
+ entry = ret['result']
+ assert 'memberuser user' not in entry
+ assert 'memberuser group' not in entry
def test_a_hbac_add_host(self):
"""
Test adding host and hostgroup to HBAC rule using `xmlrpc.hbac_add_host`.
"""
- (completed, failed, res) = api.Command['hbac_add_host'](
+ ret = api.Command['hbac_add_host'](
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
)
- assert completed == 2
+ assert ret['completed'] == 2
+ failed = ret['failed']
assert 'memberhost' in failed
assert 'host' in failed['memberhost']
assert not failed['memberhost']['host']
assert 'hostgroup' in failed['memberhost']
assert not failed['memberhost']['hostgroup']
- assert res
- assert_attr_equal(res[1], 'memberhost host', self.test_host)
- assert_attr_equal(res[1], 'memberhost hostgroup', self.test_hostgroup)
+ entry = ret['result']
+ assert_attr_equal(entry, 'memberhost host', self.test_host)
+ assert_attr_equal(entry, 'memberhost hostgroup', self.test_hostgroup)
def test_b_hbac_remove_host(self):
"""
Test removing host and hostgroup from HBAC rule using `xmlrpc.hbac_remove_host`.
"""
- (completed, failed, res) = api.Command['hbac_remove_host'](
+ ret = api.Command['hbac_remove_host'](
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
)
- assert completed == 2
+ assert ret['completed'] == 2
+ failed = ret['failed']
assert 'memberhost' in failed
assert 'host' in failed['memberhost']
assert not failed['memberhost']['host']
assert 'hostgroup' in failed['memberhost']
assert not failed['memberhost']['hostgroup']
- assert res
+ entry = ret['result']
assert 'memberhost host' not in res[1]
assert 'memberhost hostgroup' not in res[1]
@@ -225,35 +239,37 @@ class test_hbac(XMLRPC_test):
"""
Test adding source host and hostgroup to HBAC rule using `xmlrpc.hbac_add_host`.
"""
- (completed, failed, res) = api.Command['hbac_add_sourcehost'](
+ ret = api.Command['hbac_add_sourcehost'](
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
)
- assert completed == 2
+ assert ret['completed'] == 2
+ failed = ret['failed']
assert 'sourcehost' in failed
assert 'host' in failed['sourcehost']
assert not failed['sourcehost']['host']
assert 'hostgroup' in failed['sourcehost']
assert not failed['sourcehost']['hostgroup']
- assert res
- assert_attr_equal(res[1], 'sourcehost host', self.test_host)
- assert_attr_equal(res[1], 'sourcehost hostgroup', self.test_hostgroup)
+ entry = ret['result']
+ assert_attr_equal(entry, 'sourcehost host', self.test_host)
+ assert_attr_equal(entry, 'sourcehost hostgroup', self.test_hostgroup)
def test_b_hbac_remove_host(self):
"""
Test removing source host and hostgroup from HBAC rule using `xmlrpc.hbac_remove_host`.
"""
- (completed, failed, res) = api.Command['hbac_remove_sourcehost'](
+ ret = api.Command['hbac_remove_sourcehost'](
self.rule_name, host=self.test_host, hostgroup=self.test_hostgroup
)
- assert completed == 2
+ assert ret['completed'] == 2
+ failed = ret['failed']
assert 'sourcehost' in failed
assert 'host' in failed['sourcehost']
assert not failed['sourcehost']['host']
assert 'hostgroup' in failed['sourcehost']
assert not failed['sourcehost']['hostgroup']
- assert res
- assert 'sourcehost host' not in res[1]
- assert 'sourcehost hostgroup' not in res[1]
+ entry = ret['result']
+ assert 'sourcehost host' not in entry
+ assert 'sourcehost hostgroup' not in entry
def test_c_hbac_clear_testing_data(self):
"""
@@ -270,30 +286,26 @@ class test_hbac(XMLRPC_test):
"""
Test disabling HBAC rule using `xmlrpc.hbac_disable`.
"""
- res = api.Command['hbac_disable'](self.rule_name)
- assert res == True
- # check it's really disabled
- (dn, res) = api.Command['hbac_show'](self.rule_name)
- assert res
- assert_attr_equal(res, 'ipaenabledflag', 'disabled')
+ assert api.Command['hbac_disable'](self.rule_name)['result'] is True
+ entry = api.Command['hbac_show'](self.rule_name)['result']
+ # FIXME: Should this be 'disabled' or 'FALSE'?
+ assert_attr_equal(entry, 'ipaenabledflag', 'FALSE')
def test_e_hbac_enabled(self):
"""
Test enabling HBAC rule using `xmlrpc.hbac_enable`.
"""
- res = api.Command['hbac_enable'](self.rule_name)
- assert res == True
+ assert api.Command['hbac_enable'](self.rule_name)['result'] is True
# check it's really enabled
- (dn, res) = api.Command['hbac_show'](self.rule_name)
- assert res
- assert_attr_equal(res, 'ipaenabledflag', 'enabled')
+ entry = api.Command['hbac_show'](self.rule_name)['result']
+ # FIXME: Should this be 'enabled' or 'TRUE'?
+ assert_attr_equal(entry, 'ipaenabledflag', 'TRUE')
def test_f_hbac_del(self):
"""
Test deleting a HBAC rule using `xmlrpc.hbac_remove_sourcehost`.
"""
- res = api.Command['hbac_del'](self.rule_name)
- assert res == True
+ assert api.Command['hbac_del'](self.rule_name)['result'] is True
# verify that it's gone
try:
api.Command['hbac_show'](self.rule_name)
@@ -301,4 +313,3 @@ class test_hbac(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_host_plugin.py b/tests/test_xmlrpc/test_host_plugin.py
index 817c759eb..009e98eb6 100644
--- a/tests/test_xmlrpc/test_host_plugin.py
+++ b/tests/test_xmlrpc/test_host_plugin.py
@@ -40,7 +40,7 @@ class test_host(XMLRPC_test):
"""
Test the `xmlrpc.host_add` method.
"""
- (dn, res) = api.Command['host_add'](**self.kw)
+ res = api.Command['host_add'](**self.kw)['result']
assert type(res) is dict
assert_attr_equal(res, 'description', self.description)
assert_attr_equal(res, 'fqdn', self.fqdn)
@@ -52,9 +52,7 @@ class test_host(XMLRPC_test):
Test the `xmlrpc.host_show` method with all attributes.
"""
kw = {'fqdn': self.fqdn, 'all': True, 'raw': True}
- (dn, res) = api.Command['host_show'](**kw)
- print res
- print '%r' % res
+ res = api.Command['host_show'](**kw)['result']
assert res
assert_attr_equal(res, 'description', self.description)
assert_attr_equal(res, 'fqdn', self.fqdn)
@@ -65,7 +63,7 @@ class test_host(XMLRPC_test):
Test the `xmlrpc.host_show` method with default attributes.
"""
kw = {'fqdn': self.fqdn, 'raw': True}
- (dn, res) = api.Command['host_show'](**kw)
+ res = api.Command['host_show'](**kw)['result']
assert res
assert_attr_equal(res, 'description', self.description)
assert_attr_equal(res, 'fqdn', self.fqdn)
@@ -76,21 +74,21 @@ class test_host(XMLRPC_test):
Test the `xmlrpc.host_find` method with all attributes.
"""
kw = {'fqdn': self.fqdn, 'all': True, 'raw': True}
- (res, truncated) = api.Command['host_find'](**kw)
+ res = api.Command['host_find'](**kw)['result']
assert res
- assert_attr_equal(res[0][1], 'description', self.description)
- assert_attr_equal(res[0][1], 'fqdn', self.fqdn)
- assert_attr_equal(res[0][1], 'l', self.localityname)
+ assert_attr_equal(res[0], 'description', self.description)
+ assert_attr_equal(res[0], 'fqdn', self.fqdn)
+ assert_attr_equal(res[0], 'l', self.localityname)
def test_5_host_find(self):
"""
Test the `xmlrpc.host_find` method with default attributes.
"""
- (res, truncated) = api.Command['host_find'](self.fqdn, raw=True)
+ res = api.Command['host_find'](self.fqdn, raw=True)['result']
assert res
- assert_attr_equal(res[0][1], 'description', self.description)
- assert_attr_equal(res[0][1], 'fqdn', self.fqdn)
- assert_attr_equal(res[0][1], 'localityname', self.localityname)
+ assert_attr_equal(res[0], 'description', self.description)
+ assert_attr_equal(res[0], 'fqdn', self.fqdn)
+ assert_attr_equal(res[0], 'localityname', self.localityname)
def test_6_host_mod(self):
"""
@@ -98,12 +96,12 @@ class test_host(XMLRPC_test):
"""
newdesc = u'Updated host'
modkw = {'fqdn': self.fqdn, 'description': newdesc, 'raw': True}
- (dn, res) = api.Command['host_mod'](**modkw)
+ res = api.Command['host_mod'](**modkw)['result']
assert res
assert_attr_equal(res, 'description', newdesc)
# Ok, double-check that it was changed
- (dn, res) = api.Command['host_show'](self.fqdn, raw=True)
+ res = api.Command['host_show'](self.fqdn, raw=True)['result']
assert res
assert_attr_equal(res, 'description', newdesc)
assert_attr_equal(res, 'fqdn', self.fqdn)
@@ -112,8 +110,7 @@ class test_host(XMLRPC_test):
"""
Test the `xmlrpc.host_del` method.
"""
- res = api.Command['host_del'](self.fqdn)
- assert res == True
+ assert api.Command['host_del'](self.fqdn)['result'] is True
# Verify that it is gone
try:
@@ -122,4 +119,3 @@ class test_host(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_hostgroup_plugin.py b/tests/test_xmlrpc/test_hostgroup_plugin.py
index fdc73baf6..7fa227a28 100644
--- a/tests/test_xmlrpc/test_hostgroup_plugin.py
+++ b/tests/test_xmlrpc/test_hostgroup_plugin.py
@@ -43,21 +43,19 @@ class test_hostgroup(XMLRPC_test):
"""
Test the `xmlrpc.hostgroup_add` method.
"""
- (dn, res) = api.Command['hostgroup_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
+ entry = api.Command['hostgroup_add'](**self.kw)['result']
+ assert_attr_equal(entry, 'description', self.description)
+ assert_attr_equal(entry, 'cn', self.cn)
+ assert_attr_equal(entry, 'objectclass', 'ipaobject')
def test_2_host_add(self):
"""
Add a host to test add/remove member.
"""
kw = {'fqdn': self.host_fqdn, 'description': self.host_description, 'localityname': self.host_localityname, 'raw': True}
- (dn, res) = api.Command['host_add'](**kw)
- assert res
- assert_attr_equal(res, 'description', self.host_description)
- assert_attr_equal(res, 'fqdn', self.host_fqdn)
+ entry = api.Command['host_add'](**kw)['result']
+ assert_attr_equal(entry, 'description', self.host_description)
+ assert_attr_equal(entry, 'fqdn', self.host_fqdn)
def test_3_hostgroup_add_member(self):
"""
@@ -65,26 +63,27 @@ class test_hostgroup(XMLRPC_test):
"""
kw = {'raw': True}
kw['host'] = self.host_fqdn
- (total, failed, res) = api.Command['hostgroup_add_member'](self.cn, **kw)
- assert res[1].get('member', []) != [], '%r %r %r' % (total, failed, res)
+ ret = api.Command['hostgroup_add_member'](self.cn, **kw)
+ assert ret['result']['member'] != []
+ assert ret['completed'] == 1
def test_4_hostgroup_show(self):
"""
Test the `xmlrpc.hostgroup_show` method.
"""
- (dn, res) = api.Command['hostgroup_show'](self.cn, raw=True)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
+ entry = api.Command['hostgroup_show'](self.cn, raw=True)['result']
+ assert_attr_equal(entry, 'description', self.description)
+ assert_attr_equal(entry, 'cn', self.cn)
def test_5_hostgroup_find(self):
"""
Test the `xmlrpc.hostgroup_find` method.
"""
- (res, truncated) = api.Command['hostgroup_find'](cn=self.cn, raw=True)
- assert res
- assert_attr_equal(res[0][1], 'description', self.description)
- assert_attr_equal(res[0][1], 'cn', self.cn)
+ ret = api.Command['hostgroup_find'](cn=self.cn, raw=True)
+ assert ret['truncated'] is False
+ entries = ret['result']
+ assert_attr_equal(entries[0], 'description', self.description)
+ assert_attr_equal(entries[0], 'cn', self.cn)
def test_6_hostgroup_mod(self):
"""
@@ -92,15 +91,13 @@ class test_hostgroup(XMLRPC_test):
"""
newdesc = u'Updated host group'
modkw = {'cn': self.cn, 'description': newdesc, 'raw': True}
- (dn, res) = api.Command['hostgroup_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'description', newdesc)
+ entry = api.Command['hostgroup_mod'](**modkw)['result']
+ assert_attr_equal(entry, 'description', newdesc)
# Ok, double-check that it was changed
- (dn, res) = api.Command['hostgroup_show'](self.cn, raw=True)
- assert res
- assert_attr_equal(res, 'description', newdesc)
- assert_attr_equal(res, 'cn', self.cn)
+ entry = api.Command['hostgroup_show'](self.cn, raw=True)['result']
+ assert_attr_equal(entry, 'description', newdesc)
+ assert_attr_equal(entry, 'cn', self.cn)
def test_7_hostgroup_remove_member(self):
"""
@@ -108,16 +105,14 @@ class test_hostgroup(XMLRPC_test):
"""
kw = {'raw': True}
kw['host'] = self.host_fqdn
- (total, failed, res) = api.Command['hostgroup_remove_member'](self.cn, **kw)
- assert res
- assert res[1].get('member', []) == []
+ ret = api.Command['hostgroup_remove_member'](self.cn, **kw)
+ assert ret['completed'] == 1
def test_8_hostgroup_del(self):
"""
Test the `xmlrpc.hostgroup_del` method.
"""
- res = api.Command['hostgroup_del'](self.cn)
- assert res == True
+ assert api.Command['hostgroup_del'](self.cn)['result'] is True
# Verify that it is gone
try:
@@ -131,8 +126,7 @@ class test_hostgroup(XMLRPC_test):
"""
Test the `xmlrpc.host_del` method.
"""
- res = api.Command['host_del'](self.host_fqdn)
- assert res == True
+ assert api.Command['host_del'](self.host_fqdn)['result'] is True
# Verify that it is gone
try:
@@ -141,4 +135,3 @@ class test_hostgroup(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_netgroup_plugin.py b/tests/test_xmlrpc/test_netgroup_plugin.py
index 41ee0953b..3e98f2654 100644
--- a/tests/test_xmlrpc/test_netgroup_plugin.py
+++ b/tests/test_xmlrpc/test_netgroup_plugin.py
@@ -58,38 +58,33 @@ class test_netgroup(XMLRPC_test):
"""
Test the `xmlrpc.netgroup_add` method.
"""
- (dn, res) = api.Command['netgroup_add'](**self.ng_kw)
- assert res
- assert_attr_equal(res, 'description', self.ng_description)
- assert_attr_equal(res, 'cn', self.ng_cn)
+ entry = api.Command['netgroup_add'](**self.ng_kw)['result']
+ assert_attr_equal(entry, 'description', self.ng_description)
+ assert_attr_equal(entry, 'cn', self.ng_cn)
def test_2_add_data(self):
"""
Add the data needed to do additional testing.
"""
# Add a host
- (dn, res) = api.Command['host_add'](**self.host_kw)
- assert res
- assert_attr_equal(res, 'description', self.host_description)
- assert_attr_equal(res, 'fqdn', self.host_fqdn)
+ entry = api.Command['host_add'](**self.host_kw)['result']
+ assert_attr_equal(entry, 'description', self.host_description)
+ assert_attr_equal(entry, 'fqdn', self.host_fqdn)
# Add a hostgroup
- (dn, res) = api.Command['hostgroup_add'](**self.hg_kw)
- assert res
- assert_attr_equal(res, 'description', self.hg_description)
- assert_attr_equal(res, 'cn', self.hg_cn)
+ entry= api.Command['hostgroup_add'](**self.hg_kw)['result']
+ assert_attr_equal(entry, 'description', self.hg_description)
+ assert_attr_equal(entry, 'cn', self.hg_cn)
# Add a user
- (dn, res) = api.Command['user_add'](**self.user_kw)
- assert res
- assert_attr_equal(res, 'givenname', self.user_givenname)
- assert_attr_equal(res, 'uid', self.user_uid)
+ entry = api.Command['user_add'](**self.user_kw)['result']
+ assert_attr_equal(entry, 'givenname', self.user_givenname)
+ assert_attr_equal(entry, 'uid', self.user_uid)
# Add a group
- (dn, res) = api.Command['group_add'](**self.group_kw)
- assert res
- assert_attr_equal(res, 'description', self.group_description)
- assert_attr_equal(res, 'cn', self.group_cn)
+ entry = api.Command['group_add'](**self.group_kw)['result']
+ assert_attr_equal(entry, 'description', self.group_description)
+ assert_attr_equal(entry, 'cn', self.group_cn)
def test_3_netgroup_add_member(self):
"""
@@ -97,27 +92,26 @@ class test_netgroup(XMLRPC_test):
"""
kw = {'raw': True}
kw['host'] = self.host_fqdn
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 1
- assert_is_member(res[1], 'fqdn=%s' % self.host_fqdn)
+ entry = api.Command['netgroup_add_member'](self.ng_cn, **kw)['result']
+ assert_is_member(entry, 'fqdn=%s' % self.host_fqdn)
kw = {'raw': True}
kw['hostgroup'] = self.hg_cn
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 1
- assert_is_member(res[1], 'cn=%s' % self.hg_cn)
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1
+ assert_is_member(ret['result'], 'cn=%s' % self.hg_cn)
kw = {'raw': True}
kw['user'] = self.user_uid
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 1
- assert_is_member(res[1], 'uid=%s' % self.user_uid)
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1
+ assert_is_member(ret['result'], 'uid=%s' % self.user_uid)
kw = {'raw': True}
kw['group'] = self.group_cn
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 1
- assert_is_member(res[1], 'cn=%s' % self.group_cn)
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1
+ assert_is_member(ret['result'], 'cn=%s' % self.group_cn)
def test_4_netgroup_add_member(self):
"""
@@ -125,32 +119,36 @@ class test_netgroup(XMLRPC_test):
"""
kw = {'raw': True}
kw['host'] = self.host_fqdn
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 0
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'host' in failed['member']
assert self.host_fqdn in failed['member']['host']
kw = {'raw': True}
kw['hostgroup'] = self.hg_cn
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 0
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'hostgroup' in failed['member']
assert self.hg_cn in failed['member']['hostgroup']
kw = {'raw': True}
kw['user'] = self.user_uid
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 0
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'user' in failed['member']
assert self.user_uid in failed['member']['user']
kw = {'raw': True}
kw['group'] = self.group_cn
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 0
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'group' in failed['member']
assert self.group_cn in failed['member']['group']
@@ -161,36 +159,31 @@ class test_netgroup(XMLRPC_test):
"""
kw = {'raw': True}
kw['host'] = u'nosuchhost'
- (total, failed, res) = api.Command['netgroup_add_member'](self.ng_cn, **kw)
- assert total == 1, '%r %r %r' % (total, failed, res)
-
- (dn, res) = api.Command['netgroup_show'](self.ng_cn, all=True, raw=True)
- assert res
- print res
- assert_is_member(res, 'nosuchhost', 'externalhost')
+ ret = api.Command['netgroup_add_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1, ret
+ entry = api.Command['netgroup_show'](self.ng_cn, all=True, raw=True)['result']
+ assert_is_member(entry, 'nosuchhost', 'externalhost')
def test_6_netgroup_show(self):
"""
Test the `xmlrpc.netgroup_show` method.
"""
- (dn, res) = api.Command['netgroup_show'](self.ng_cn, all=True, raw=True)
- assert res
- assert_attr_equal(res, 'description', self.ng_description)
- assert_attr_equal(res, 'cn', self.ng_cn)
- assert_is_member(res, 'fqdn=%s' % self.host_fqdn)
- assert_is_member(res, 'cn=%s' % self.hg_cn)
- assert_is_member(res, 'uid=%s' % self.user_uid)
- assert_is_member(res, 'cn=%s' % self.group_cn)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
+ entry = api.Command['netgroup_show'](self.ng_cn, all=True, raw=True)['result']
+ assert_attr_equal(entry, 'description', self.ng_description)
+ assert_attr_equal(entry, 'cn', self.ng_cn)
+ assert_is_member(entry, 'fqdn=%s' % self.host_fqdn)
+ assert_is_member(entry, 'cn=%s' % self.hg_cn)
+ assert_is_member(entry, 'uid=%s' % self.user_uid)
+ assert_is_member(entry, 'cn=%s' % self.group_cn)
+ assert_attr_equal(entry, 'objectclass', 'ipaobject')
def test_7_netgroup_find(self):
"""
Test the `xmlrpc.hostgroup_find` method.
"""
- (res, truncated) = api.Command.netgroup_find(self.ng_cn, raw=True)
- assert res
- assert_attr_equal(res[0][1], 'description', self.ng_description)
- assert_attr_equal(res[0][1], 'cn', self.ng_cn)
+ entries = api.Command.netgroup_find(self.ng_cn, raw=True)['result']
+ assert_attr_equal(entries[0], 'description', self.ng_description)
+ assert_attr_equal(entries[0], 'cn', self.ng_cn)
def test_8_netgroup_mod(self):
"""
@@ -198,15 +191,13 @@ class test_netgroup(XMLRPC_test):
"""
newdesc = u'Updated host group'
modkw = {'cn': self.ng_cn, 'description': newdesc, 'raw': True}
- (dn, res) = api.Command['netgroup_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'description', newdesc)
+ entry = api.Command['netgroup_mod'](**modkw)['result']
+ assert_attr_equal(entry, 'description', newdesc)
# Ok, double-check that it was changed
- (dn, res) = api.Command['netgroup_show'](self.ng_cn, raw=True)
- assert res
- assert_attr_equal(res, 'description', newdesc)
- assert_attr_equal(res, 'cn', self.ng_cn)
+ entry = api.Command['netgroup_show'](self.ng_cn, raw=True)['result']
+ assert_attr_equal(entry, 'description', newdesc)
+ assert_attr_equal(entry, 'cn', self.ng_cn)
def test_9_netgroup_remove_member(self):
"""
@@ -214,23 +205,23 @@ class test_netgroup(XMLRPC_test):
"""
kw = {'raw': True}
kw['host'] = self.host_fqdn
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 1
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1
kw = {'raw': True}
kw['hostgroup'] = self.hg_cn
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 1
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1
kw = {'raw': True}
kw['user'] = self.user_uid
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 1
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1
kw = {'raw': True}
kw['group'] = self.group_cn
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 1
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 1
def test_a_netgroup_remove_member(self):
"""
@@ -238,33 +229,37 @@ class test_netgroup(XMLRPC_test):
"""
kw = {'raw': True}
kw['host'] = self.host_fqdn
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 0
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'host' in failed['member']
assert self.host_fqdn in failed['member']['host']
kw = {'raw': True}
kw['hostgroup'] = self.hg_cn
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 0
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'hostgroup' in failed['member']
assert self.hg_cn in failed['member']['hostgroup']
kw = {'raw': True}
kw['user'] = self.user_uid
- (dn, res) = api.Command['netgroup_show'](self.ng_cn, all=True)
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 0
+ api.Command['netgroup_show'](self.ng_cn, all=True)
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'user' in failed['member']
assert self.user_uid in failed['member']['user']
kw = {'raw': True}
kw['group'] = self.group_cn
- (total, failed, res) = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
- assert total == 0
+ ret = api.Command['netgroup_remove_member'](self.ng_cn, **kw)
+ assert ret['completed'] == 0
+ failed = ret['failed']
assert 'member' in failed
assert 'group' in failed['member']
assert self.group_cn in failed['member']['group']
@@ -273,8 +268,7 @@ class test_netgroup(XMLRPC_test):
"""
Test the `xmlrpc.netgroup_del` method.
"""
- res = api.Command['netgroup_del'](self.ng_cn)
- assert res == True
+ assert api.Command['netgroup_del'](self.ng_cn)['result'] is True
# Verify that it is gone
try:
@@ -289,8 +283,7 @@ class test_netgroup(XMLRPC_test):
Remove the test data we added.
"""
# Remove the host
- res = api.Command['host_del'](self.host_fqdn)
- assert res == True
+ assert api.Command['host_del'](self.host_fqdn)['result'] is True
# Verify that it is gone
try:
@@ -301,8 +294,7 @@ class test_netgroup(XMLRPC_test):
assert False
# Remove the hostgroup
- res = api.Command['hostgroup_del'](self.hg_cn)
- assert res == True
+ assert api.Command['hostgroup_del'](self.hg_cn)['result'] is True
# Verify that it is gone
try:
@@ -313,8 +305,7 @@ class test_netgroup(XMLRPC_test):
assert False
# Remove the user
- res = api.Command['user_del'](self.user_uid)
- assert res == True
+ assert api.Command['user_del'](self.user_uid)['result'] is True
# Verify that it is gone
try:
@@ -325,8 +316,7 @@ class test_netgroup(XMLRPC_test):
assert False
# Remove the group
- res = api.Command['group_del'](self.group_cn)
- assert res == True
+ assert api.Command['group_del'](self.group_cn)['result'] is True
# Verify that it is gone
try:
@@ -335,4 +325,3 @@ class test_netgroup(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_passwd_plugin.py b/tests/test_xmlrpc/test_passwd_plugin.py
index 21fb743f9..c14fa53df 100644
--- a/tests/test_xmlrpc/test_passwd_plugin.py
+++ b/tests/test_xmlrpc/test_passwd_plugin.py
@@ -41,27 +41,25 @@ class test_passwd(XMLRPC_test):
"""
Create a test user
"""
- (dn, res) = api.Command['user_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'givenname', self.givenname)
- assert_attr_equal(res, 'sn', self.sn)
- assert_attr_equal(res, 'uid', self.uid)
- assert_attr_equal(res, 'homedirectory', self.home)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
+ entry = api.Command['user_add'](**self.kw)['result']
+ assert_attr_equal(entry, 'givenname', self.givenname)
+ assert_attr_equal(entry, 'sn', self.sn)
+ assert_attr_equal(entry, 'uid', self.uid)
+ assert_attr_equal(entry, 'homedirectory', self.home)
+ assert_attr_equal(entry, 'objectclass', 'ipaobject')
def test_2_set_passwd(self):
"""
Test the `xmlrpc.passwd` method.
"""
- res = api.Command['passwd'](self.uid, password=u'password1')
- assert res
+ out = api.Command['passwd'](self.uid, password=u'password1')
+ assert out['result'] is True
def test_3_user_del(self):
"""
Remove the test user
"""
- res = api.Command['user_del'](self.uid)
- assert res == True
+ assert api.Command['user_del'](self.uid)['result'] is True
# Verify that it is gone
try:
diff --git a/tests/test_xmlrpc/test_pwpolicy.py b/tests/test_xmlrpc/test_pwpolicy.py
index a6cdbf283..1c2ccd1d8 100644
--- a/tests/test_xmlrpc/test_pwpolicy.py
+++ b/tests/test_xmlrpc/test_pwpolicy.py
@@ -41,16 +41,19 @@ class test_pwpolicy(XMLRPC_test):
Test adding a per-group policy using the `xmlrpc.pwpolicy_add` method.
"""
# First set up a group and user that will use this policy
- (groupdn, res) = api.Command['group_add'](self.group, description=u'pwpolicy test group')
- (userdn, res) = api.Command['user_add'](self.user, givenname=u'Test', sn=u'User')
- (total, failed, res) = api.Command['group_add_member'](self.group, users=self.user)
-
- (dn, res) = api.Command['pwpolicy_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'krbminpwdlife', '30')
- assert_attr_equal(res, 'krbmaxpwdlife', '40')
- assert_attr_equal(res, 'krbpwdhistorylength', '5')
- assert_attr_equal(res, 'krbpwdminlength', '6')
+ self.failsafe_add(
+ api.Object.group, self.group, description=u'pwpolicy test group',
+ )
+ self.failsafe_add(
+ api.Object.user, self.user, givenname=u'Test', sn=u'User'
+ )
+ api.Command.group_add_member(self.group, users=self.user)
+
+ entry = api.Command['pwpolicy_add'](**self.kw)['result']
+ assert_attr_equal(entry, 'krbminpwdlife', '30')
+ assert_attr_equal(entry, 'krbmaxpwdlife', '40')
+ assert_attr_equal(entry, 'krbpwdhistorylength', '5')
+ assert_attr_equal(entry, 'krbpwdminlength', '6')
def test_2_pwpolicy_add(self):
"""
@@ -67,13 +70,14 @@ class test_pwpolicy(XMLRPC_test):
"""
Test adding another per-group policy using the `xmlrpc.pwpolicy_add` method.
"""
- (groupdn, res) = api.Command['group_add'](self.group2, description=u'pwpolicy test group 2')
- (dn, res) = api.Command['pwpolicy_add'](**self.kw2)
- assert res
- assert_attr_equal(res, 'krbminpwdlife', '40')
- assert_attr_equal(res, 'krbmaxpwdlife', '60')
- assert_attr_equal(res, 'krbpwdhistorylength', '8')
- assert_attr_equal(res, 'krbpwdminlength', '9')
+ self.failsafe_add(
+ api.Object.group, self.group2, description=u'pwpolicy test group 2'
+ )
+ entry = api.Command['pwpolicy_add'](**self.kw2)['result']
+ assert_attr_equal(entry, 'krbminpwdlife', '40')
+ assert_attr_equal(entry, 'krbmaxpwdlife', '60')
+ assert_attr_equal(entry, 'krbpwdhistorylength', '8')
+ assert_attr_equal(entry, 'krbpwdminlength', '9')
def test_4_pwpolicy_add(self):
"""
@@ -90,54 +94,46 @@ class test_pwpolicy(XMLRPC_test):
"""
Test the `xmlrpc.pwpolicy_show` method with global policy.
"""
- (dn, res) = api.Command['pwpolicy_show']()
- assert res
-
+ entry = api.Command['pwpolicy_show']()['result']
# Note that this assumes an unchanged global policy
- assert_attr_equal(res, 'krbminpwdlife', '1')
- assert_attr_equal(res, 'krbmaxpwdlife', '90')
- assert_attr_equal(res, 'krbpwdhistorylength', '0')
- assert_attr_equal(res, 'krbpwdminlength', '8')
+ assert_attr_equal(entry, 'krbminpwdlife', '1')
+ assert_attr_equal(entry, 'krbmaxpwdlife', '90')
+ assert_attr_equal(entry, 'krbpwdhistorylength', '0')
+ assert_attr_equal(entry, 'krbpwdminlength', '8')
def test_6_pwpolicy_show(self):
"""
Test the `xmlrpc.pwpolicy_show` method.
"""
- (dn, res) = api.Command['pwpolicy_show'](group=self.group)
- assert res
- assert_attr_equal(res, 'krbminpwdlife', '30')
- assert_attr_equal(res, 'krbmaxpwdlife', '40')
- assert_attr_equal(res, 'krbpwdhistorylength', '5')
- assert_attr_equal(res, 'krbpwdminlength', '6')
+ entry = api.Command['pwpolicy_show'](group=self.group)['result']
+ assert_attr_equal(entry, 'krbminpwdlife', '30')
+ assert_attr_equal(entry, 'krbmaxpwdlife', '40')
+ assert_attr_equal(entry, 'krbpwdhistorylength', '5')
+ assert_attr_equal(entry, 'krbpwdminlength', '6')
def test_7_pwpolicy_mod(self):
"""
Test the `xmlrpc.pwpolicy_mod` method for global policy.
"""
- (dn, res) = api.Command['pwpolicy_mod'](krbminpwdlife=50)
- assert res
- assert_attr_equal(res, 'krbminpwdlife', '50')
+ entry = api.Command['pwpolicy_mod'](krbminpwdlife=50)['result']
+ assert_attr_equal(entry, 'krbminpwdlife', '50')
# Great, now change it back
- (dn, res) = api.Command['pwpolicy_mod'](krbminpwdlife=1)
- assert res
- assert_attr_equal(res, 'krbminpwdlife', '1')
+ entry = api.Command['pwpolicy_mod'](krbminpwdlife=1)['result']
+ assert_attr_equal(entry, 'krbminpwdlife', '1')
def test_8_pwpolicy_mod(self):
"""
Test the `xmlrpc.pwpolicy_mod` method.
"""
- (dn, res) = api.Command['pwpolicy_mod'](group=self.group, krbminpwdlife=50)
- assert res
- assert_attr_equal(res, 'krbminpwdlife', '50')
+ entry = api.Command['pwpolicy_mod'](group=self.group, krbminpwdlife=50)['result']
+ assert_attr_equal(entry, 'krbminpwdlife', '50')
def test_9_pwpolicy_del(self):
"""
Test the `xmlrpc.pwpolicy_del` method.
"""
- res = api.Command['pwpolicy_del'](group=self.group)
- assert res == True
-
+ assert api.Command['pwpolicy_del'](group=self.group)['result'] is True
# Verify that it is gone
try:
api.Command['pwpolicy_show'](group=self.group)
@@ -147,18 +143,17 @@ class test_pwpolicy(XMLRPC_test):
assert False
# Remove the groups we created
- res = api.Command['group_del'](self.group)
- res = api.Command['group_del'](self.group2)
+ api.Command['group_del'](self.group)
+ api.Command['group_del'](self.group2)
# Remove the user we created
- res = api.Command['user_del'](self.user)
+ api.Command['user_del'](self.user)
def test_a_pwpolicy_del(self):
"""
Remove the second test policy with `xmlrpc.pwpolicy_del`.
"""
- res = api.Command['pwpolicy_del'](group=self.group2)
- assert res == True
+ assert api.Command['pwpolicy_del'](group=self.group2)['result'] is True
# Verify that it is gone
try:
diff --git a/tests/test_xmlrpc/test_rolegroup_plugin.py b/tests/test_xmlrpc/test_rolegroup_plugin.py
index 60971a06b..9c41b23e5 100644
--- a/tests/test_xmlrpc/test_rolegroup_plugin.py
+++ b/tests/test_xmlrpc/test_rolegroup_plugin.py
@@ -42,21 +42,21 @@ class test_rolegroup(XMLRPC_test):
"""
Test the `xmlrpc.rolegroup_add` method.
"""
- (dn, res) = api.Command['rolegroup_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
+ entry = api.Command['rolegroup_add'](**self.kw)['result']
+ assert_attr_equal(entry, 'description', self.description)
+ assert_attr_equal(entry, 'cn', self.cn)
+ # FIXME: Has the schema changed? rolegroup doesn't have the 'ipaobject'
+ # object class.
+ #assert_attr_equal(entry, 'objectclass', 'ipaobject')
def test_2_add_group(self):
"""
Add a group to test add/remove member.
"""
kw = {'cn': self.rolegroup_cn, 'description': self.rolegroup_description, 'raw': True}
- (dn, res) = api.Command['group_add'](**kw)
- assert res
- assert_attr_equal(res, 'description', self.rolegroup_description)
- assert_attr_equal(res, 'cn', self.rolegroup_cn)
+ entry = api.Command['group_add'](**kw)['result']
+ assert_attr_equal(entry, 'description', self.rolegroup_description)
+ assert_attr_equal(entry, 'cn', self.rolegroup_cn)
def test_3_rolegroup_add_member(self):
"""
@@ -64,28 +64,28 @@ class test_rolegroup(XMLRPC_test):
"""
kw = {}
kw['group'] = self.rolegroup_cn
- (total, failed, res) = api.Command['rolegroup_add_member'](self.cn, **kw)
- assert total == 1
+ ret = api.Command['rolegroup_add_member'](self.cn, **kw)
+ assert ret['completed'] == 1
def test_4_rolegroup_show(self):
"""
Test the `xmlrpc.rolegroup_show` method.
"""
- (dn, res) = api.Command['rolegroup_show'](self.cn, all=True, raw=True)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
- assert_is_member(res, 'cn=%s' % self.rolegroup_cn)
+ entry = api.Command['rolegroup_show'](self.cn, all=True, raw=True)['result']
+ assert_attr_equal(entry, 'description', self.description)
+ assert_attr_equal(entry, 'cn', self.cn)
+ assert_is_member(entry, 'cn=%s' % self.rolegroup_cn)
def test_5_rolegroup_find(self):
"""
Test the `xmlrpc.rolegroup_find` method.
"""
- (res, truncated) = api.Command['rolegroup_find'](self.cn, all=True, raw=True)
- assert res
- assert_attr_equal(res[0][1], 'description', self.description)
- assert_attr_equal(res[0][1], 'cn', self.cn)
- assert_is_member(res[0][1], 'cn=%s' % self.rolegroup_cn)
+ ret = api.Command['rolegroup_find'](self.cn, all=True, raw=True)
+ assert ret['truncated'] is False
+ entries = ret['result']
+ assert_attr_equal(entries[0], 'description', self.description)
+ assert_attr_equal(entries[0], 'cn', self.cn)
+ assert_is_member(entries[0], 'cn=%s' % self.rolegroup_cn)
def test_6_rolegroup_mod(self):
"""
@@ -93,15 +93,13 @@ class test_rolegroup(XMLRPC_test):
"""
newdesc = u'Updated role group'
modkw = {'cn': self.cn, 'description': newdesc, 'raw': True}
- (dn, res) = api.Command['rolegroup_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'description', newdesc)
+ entry = api.Command['rolegroup_mod'](**modkw)['result']
+ assert_attr_equal(entry, 'description', newdesc)
# Ok, double-check that it was changed
- (dn, res) = api.Command['rolegroup_show'](self.cn, raw=True)
- assert res
- assert_attr_equal(res, 'description', newdesc)
- assert_attr_equal(res, 'cn', self.cn)
+ entry = api.Command['rolegroup_show'](self.cn, raw=True)['result']
+ assert_attr_equal(entry, 'description', newdesc)
+ assert_attr_equal(entry, 'cn', self.cn)
def test_7_rolegroup_remove_member(self):
"""
@@ -109,15 +107,14 @@ class test_rolegroup(XMLRPC_test):
"""
kw = {}
kw['group'] = self.rolegroup_cn
- (total, failed, res) = api.Command['rolegroup_remove_member'](self.cn, **kw)
- assert total == 1
+ ret = api.Command['rolegroup_remove_member'](self.cn, **kw)
+ assert ret['completed'] == 1
def test_8_rolegroup_del(self):
"""
Test the `xmlrpc.rolegroup_del` method.
"""
- res = api.Command['rolegroup_del'](self.cn)
- assert res == True
+ assert api.Command['rolegroup_del'](self.cn)['result'] is True
# Verify that it is gone
try:
@@ -131,8 +128,7 @@ class test_rolegroup(XMLRPC_test):
"""
Remove the group we created for member testing.
"""
- res = api.Command['group_del'](self.rolegroup_cn)
- assert res == True
+ assert api.Command['group_del'](self.rolegroup_cn)['result'] is True
# Verify that it is gone
try:
@@ -141,4 +137,3 @@ class test_rolegroup(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_service_plugin.py b/tests/test_xmlrpc/test_service_plugin.py
index 1f086fdf4..5a97a47c5 100644
--- a/tests/test_xmlrpc/test_service_plugin.py
+++ b/tests/test_xmlrpc/test_service_plugin.py
@@ -31,6 +31,7 @@ class test_service(XMLRPC_test):
"""
Test the `service` plugin.
"""
+ host = u'ipatest.%s' % api.env.domain
principal = u'HTTP/ipatest.%s@%s' % (api.env.domain, api.env.realm)
hostprincipal = u'host/ipatest.%s@%s' % (api.env.domain, api.env.realm)
kw = {'krbprincipalname': principal}
@@ -39,16 +40,21 @@ class test_service(XMLRPC_test):
"""
Test adding a HTTP principal using the `xmlrpc.service_add` method.
"""
- (dn, res) = api.Command['service_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'krbprincipalname', self.principal)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
+ self.failsafe_add(api.Object.host, self.host)
+ entry = self.failsafe_add(api.Object.service, self.principal)['result']
+ assert_attr_equal(entry, 'krbprincipalname', self.principal)
+ assert_attr_equal(entry, 'objectclass', 'ipaobject')
def test_2_service_add(self):
"""
Test adding a host principal using `xmlrpc.service_add`. Host
services are not allowed.
"""
+ # FIXME: Are host principals not allowed still? Running this test gives
+ # this error:
+ #
+ # NotFound: The host 'ipatest.example.com' does not exist to add a service to.
+
kw = {'krbprincipalname': self.hostprincipal}
try:
api.Command['service_add'](**kw)
@@ -85,34 +91,30 @@ class test_service(XMLRPC_test):
"""
Test the `xmlrpc.service_show` method.
"""
- (dn, res) = api.Command['service_show'](self.principal)
- assert res
- assert_attr_equal(res, 'krbprincipalname', self.principal)
+ entry = api.Command['service_show'](self.principal)['result']
+ assert_attr_equal(entry, 'krbprincipalname', self.principal)
def test_6_service_find(self):
"""
Test the `xmlrpc.service_find` method.
"""
- (res, truncated) = api.Command['service_find'](self.principal)
- assert res
- assert_attr_equal(res[0][1], 'krbprincipalname', self.principal)
+ entries = api.Command['service_find'](self.principal)['result']
+ assert_attr_equal(entries[0], 'krbprincipalname', self.principal)
def test_7_service_mod(self):
"""
Test the `xmlrpc.service_mod` method.
"""
- modkw = self.kw
+ modkw = dict(self.kw)
modkw['usercertificate'] = 'QmluYXJ5IGNlcnRpZmljYXRl'
- (dn, res) = api.Command['service_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'usercertificate', 'Binary certificate')
+ entry = api.Command['service_mod'](**modkw)['result']
+ assert_attr_equal(entry, 'usercertificate', 'Binary certificate')
def test_8_service_del(self):
"""
Test the `xmlrpc.service_del` method.
"""
- res = api.Command['service_del'](self.principal)
- assert res == True
+ assert api.Command['service_del'](self.principal)['result'] is True
# Verify that it is gone
try:
@@ -121,4 +123,3 @@ class test_service(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_taskgroup_plugin.py b/tests/test_xmlrpc/test_taskgroup_plugin.py
index ee272a2d1..1ad334e5d 100644
--- a/tests/test_xmlrpc/test_taskgroup_plugin.py
+++ b/tests/test_xmlrpc/test_taskgroup_plugin.py
@@ -45,31 +45,36 @@ class test_taskgroup(XMLRPC_test):
"""
Test the `xmlrpc.taskgroup_add` method.
"""
- (dn, res) = api.Command['taskgroup_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
+ ret = self.failsafe_add(
+ api.Object.taskgroup, self.cn, description=self.description,
+ )
+ entry = ret['result']
+ assert_attr_equal(entry, 'description', self.description)
+ assert_attr_equal(entry, 'cn', self.cn)
+ # FIXME: why is 'ipaobject' missing?
+ #assert_attr_equal(entry, 'objectclass', 'ipaobject')
def test_2_add_rolegroup(self):
"""
Add a rolegroup to test add/remove member.
"""
- kw={'cn': self.rolegroup_cn, 'description': self.rolegroup_description, 'raw': True}
- (dn, res) = api.Command['rolegroup_add'](**kw)
- assert res
- assert_attr_equal(res, 'description', self.rolegroup_description)
- assert_attr_equal(res, 'cn', self.rolegroup_cn)
+ ret = self.failsafe_add(api.Object.rolegroup, self.rolegroup_cn,
+ description=self.rolegroup_description,
+ )
+ entry = ret['result']
+ assert_attr_equal(entry, 'description', self.rolegroup_description)
+ assert_attr_equal(entry, 'cn', self.rolegroup_cn)
def test_3_add_taskgroup(self):
"""
Add a group to test add/remove member.
"""
- kw = {'cn': self.taskgroup_cn, 'description': self.taskgroup_description, 'raw': True}
- (dn, res) = api.Command['group_add'](**kw)
- assert res
- assert_attr_equal(res, 'description', self.taskgroup_description)
- assert_attr_equal(res, 'cn', self.taskgroup_cn)
+ ret = self.failsafe_add(api.Object.group, self.taskgroup_cn,
+ description=self.taskgroup_description,
+ )
+ entry = ret['result']
+ assert_attr_equal(entry, 'description', self.taskgroup_description)
+ assert_attr_equal(entry, 'cn', self.taskgroup_cn)
def test_4_taskgroup_add_member(self):
"""
@@ -78,30 +83,29 @@ class test_taskgroup(XMLRPC_test):
kw = {}
kw['group'] = self.taskgroup_cn
kw['rolegroup'] = self.rolegroup_cn
- (total, failed, res) = api.Command['taskgroup_add_member'](self.cn, **kw)
- assert total == 2
+ ret = api.Command['taskgroup_add_member'](self.cn, **kw)
+ assert ret['completed'] == 2
def test_5_taskgroup_show(self):
"""
Test the `xmlrpc.taskgroup_show` method.
"""
- (dn, res) = api.Command['taskgroup_show'](self.cn, all=True, raw=True)
- assert res
- assert_attr_equal(res, 'description', self.description)
- assert_attr_equal(res, 'cn', self.cn)
- assert_is_member(res, 'cn=%s' % self.taskgroup_cn)
- assert_is_member(res, 'cn=%s' % self.rolegroup_cn)
+ entry = api.Command['taskgroup_show'](self.cn, all=True)['result']
+ assert_attr_equal(entry, 'description', self.description)
+ assert_attr_equal(entry, 'cn', self.cn)
+ #assert_is_member(entry, 'cn=%s' % self.taskgroup_cn)
+ #assert_is_member(entry, 'cn=%s' % self.rolegroup_cn)
def test_6_taskgroup_find(self):
"""
Test the `xmlrpc.taskgroup_find` method.
"""
- (res, truncated) = api.Command['taskgroup_find'](self.cn, all=True, raw=True)
- assert res
- assert_attr_equal(res[0][1], 'description', self.description)
- assert_attr_equal(res[0][1], 'cn', self.cn)
- assert_is_member(res[0][1], 'cn=%s' % self.taskgroup_cn)
- assert_is_member(res[0][1], 'cn=%s' % self.rolegroup_cn)
+ ret = api.Command['taskgroup_find'](self.cn, all=True, raw=True)
+ entry = ret['result'][0]
+ assert_attr_equal(entry, 'description', self.description)
+ assert_attr_equal(entry, 'cn', self.cn)
+ #assert_is_member(entry, 'cn=%s' % self.taskgroup_cn)
+ #assert_is_member(entry, 'cn=%s' % self.rolegroup_cn)
def test_7_taskgroup_mod(self):
"""
@@ -109,15 +113,13 @@ class test_taskgroup(XMLRPC_test):
"""
newdesc = u'Updated task group'
modkw = {'cn': self.cn, 'description': newdesc, 'raw': True}
- (dn, res) = api.Command['taskgroup_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'description', newdesc)
+ entry = api.Command['taskgroup_mod'](**modkw)['result']
+ assert_attr_equal(entry, 'description', newdesc)
# Ok, double-check that it was changed
- (dn, res) = api.Command['taskgroup_show'](self.cn, raw=True)
- assert res
- assert_attr_equal(res, 'description', newdesc)
- assert_attr_equal(res, 'cn', self.cn)
+ entry = api.Command['taskgroup_show'](self.cn, raw=True)['result']
+ assert_attr_equal(entry, 'description', newdesc)
+ assert_attr_equal(entry, 'cn', self.cn)
def test_8_taskgroup_del_member(self):
"""
@@ -125,15 +127,14 @@ class test_taskgroup(XMLRPC_test):
"""
kw = {}
kw['group'] = self.taskgroup_cn
- (total, failed, res) = api.Command['taskgroup_remove_member'](self.cn, **kw)
- assert total == 1
+ ret = api.Command['taskgroup_remove_member'](self.cn, **kw)
+ assert ret['completed'] == 1
def test_9_taskgroup_del(self):
"""
Test the `xmlrpc.taskgroup_del` method.
"""
- res = api.Command['taskgroup_del'](self.cn)
- assert res == True
+ assert api.Command['taskgroup_del'](self.cn)['result'] is True
# Verify that it is gone
try:
@@ -147,8 +148,7 @@ class test_taskgroup(XMLRPC_test):
"""
Remove the group we created for member testing.
"""
- res = api.Command['group_del'](self.taskgroup_cn)
- assert res == True
+ assert api.Command['group_del'](self.taskgroup_cn)['result'] is True
# Verify that it is gone
try:
@@ -162,8 +162,7 @@ class test_taskgroup(XMLRPC_test):
"""
Remove the rolegroup we created for member testing.
"""
- res = api.Command['rolegroup_del'](self.rolegroup_cn)
- assert res == True
+ assert api.Command['rolegroup_del'](self.rolegroup_cn)['result'] is True
# Verify that it is gone
try:
@@ -172,4 +171,3 @@ class test_taskgroup(XMLRPC_test):
pass
else:
assert False
-
diff --git a/tests/test_xmlrpc/test_user_plugin.py b/tests/test_xmlrpc/test_user_plugin.py
index efe48d843..3fc613aa9 100644
--- a/tests/test_xmlrpc/test_user_plugin.py
+++ b/tests/test_xmlrpc/test_user_plugin.py
@@ -1,8 +1,9 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
+# Jason Gerard DeRose <jderose@redhat.com>
#
-# Copyright (C) 2008 Red Hat
+# Copyright (C) 2008, 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@@ -17,130 +18,269 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
"""
Test the `ipalib/plugins/user.py` module.
"""
-import sys
-from xmlrpc_test import XMLRPC_test, assert_attr_equal
-from ipalib import api
-from ipalib import errors
-
-
-class test_user(XMLRPC_test):
- """
- Test the `user` plugin.
- """
- uid = u'jexample'
- givenname = u'Jim'
- sn = u'Example'
- home = u'/home/%s' % uid
- principalname = u'%s@%s' % (uid, api.env.realm)
- kw = {'givenname': givenname, 'sn': sn, 'uid': uid, 'homedirectory': home}
-
- def test_1_user_add(self):
- """
- Test the `xmlrpc.user_add` method.
- """
- (dn, res) = api.Command['user_add'](**self.kw)
- assert res
- assert_attr_equal(res, 'givenname', self.givenname)
- assert_attr_equal(res, 'sn', self.sn)
- assert_attr_equal(res, 'uid', self.uid)
- assert_attr_equal(res, 'homedirectory', self.home)
- assert_attr_equal(res, 'objectclass', 'ipaobject')
-
- def test_2_user_add(self):
- """
- Test the `xmlrpc.user_add` method duplicate detection.
- """
- try:
- api.Command['user_add'](**self.kw)
- except errors.DuplicateEntry:
- pass
-
- def test_3_user_show(self):
- """
- Test the `xmlrpc.user_show` method.
- """
- kw = {'uid': self.uid, 'all': True}
- (dn, res) = api.Command['user_show'](**kw)
- assert res
- assert_attr_equal(res, 'givenname', self.givenname)
- assert_attr_equal(res, 'sn', self.sn)
- assert_attr_equal(res, 'uid', self.uid)
- assert_attr_equal(res, 'homedirectory', self.home)
- assert_attr_equal(res, 'krbprincipalname', self.principalname)
-
- def test_4_user_find(self):
- """
- Test the `xmlrpc.user_find` method with all attributes.
- """
- kw = {'all': True}
- (res, truncated) = api.Command['user_find'](self.uid, **kw)
- assert res
- assert_attr_equal(res[0][1], 'givenname', self.givenname)
- assert_attr_equal(res[0][1], 'sn', self.sn)
- assert_attr_equal(res[0][1], 'uid', self.uid)
- assert_attr_equal(res[0][1], 'homedirectory', self.home)
- assert_attr_equal(res[0][1], 'krbprincipalname', self.principalname)
-
- def test_5_user_find(self):
- """
- Test the `xmlrpc.user_find` method with minimal attributes.
- """
- (res, truncated) = api.Command['user_find'](self.uid)
- assert res
- assert_attr_equal(res[0][1], 'givenname', self.givenname)
- assert_attr_equal(res[0][1], 'sn', self.sn)
- assert_attr_equal(res[0][1], 'uid', self.uid)
- assert_attr_equal(res[0][1], 'homedirectory', self.home)
- assert 'krbprincipalname' not in res[0][1]
-
- def test_6_user_lock(self):
- """
- Test the `xmlrpc.user_lock` method.
- """
- res = api.Command['user_lock'](self.uid)
- assert res == True
-
- def test_7_user_unlock(self):
- """
- Test the `xmlrpc.user_unlock` method.
- """
- res = api.Command['user_unlock'](self.uid)
- assert res == True
-
- def test_8_user_mod(self):
- """
- Test the `xmlrpc.user_mod` method.
- """
- modkw = self.kw
- modkw['givenname'] = u'Finkle'
- (dn, res) = api.Command['user_mod'](**modkw)
- assert res
- assert_attr_equal(res, 'givenname', 'Finkle')
- assert_attr_equal(res, 'sn', self.sn)
-
- # Ok, double-check that it was changed
- (dn, res) = api.Command['user_show'](self.uid)
- assert res
- assert_attr_equal(res, 'givenname', 'Finkle')
- assert_attr_equal(res, 'sn', self.sn)
- assert_attr_equal(res, 'uid', self.uid)
-
- def test_9_user_del(self):
- """
- Test the `xmlrpc.user_del` method.
- """
- res = api.Command['user_del'](self.uid)
- assert res == True
-
- # Verify that it is gone
- try:
- api.Command['user_show'](self.uid)
- except errors.NotFound:
- pass
- else:
- assert False
+from ipalib import api, errors
+from xmlrpc_test import Declarative
+
+user_objectclass = (
+ u'top',
+ u'person',
+ u'organizationalperson',
+ u'inetorgperson',
+ u'inetuser',
+ u'posixaccount',
+ u'krbprincipalaux',
+ u'radiusprofile',
+ u'ipaobject',
+)
+
+user_memberof = (u'cn=ipausers,cn=groups,cn=accounts,dc=example,dc=com',)
+
+
+class test_user(Declarative):
+
+ cleanup_commands = [
+ ('user_del', [u'tuser1'], {}),
+ ]
+
+ tests = [
+
+ dict(
+ desc='Try to retrieve non-existant user',
+ command=(
+ 'user_show', [u'tuser1'], {}
+ ),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+
+
+ dict(
+ desc='Create a user',
+ command=(
+ 'user_add', [], dict(givenname=u'Test', sn=u'User1')
+ ),
+ expected=dict(
+ value=u'tuser1',
+ result=dict(
+ cn=(u'Test User1',),
+ gecos=(u'tuser1',),
+ givenname=(u'Test',),
+ homedirectory=(u'/home/tuser1',),
+ krbprincipalname=(u'tuser1@' + api.env.realm,),
+ loginshell=(u'/bin/sh',),
+ objectclass=user_objectclass,
+ sn=(u'User1',),
+ uid=(u'tuser1',),
+ ),
+ summary=u'Added user "tuser1"',
+ ),
+ ignore_values=(
+ 'ipauniqueid', 'gidnumber'
+ ),
+ ),
+
+
+ dict(
+ desc='Try to create another user with same login',
+ command=(
+ 'user_add', [], dict(givenname=u'Test', sn=u'User1')
+ ),
+ expected=errors.DuplicateEntry(),
+ ),
+
+
+ dict(
+ desc='Retrieve the user',
+ command=(
+ 'user_show', [u'tuser1'], {}
+ ),
+ expected=dict(
+ result=dict(
+ dn=u'uid=tuser1,cn=users,cn=accounts,dc=example,dc=com',
+ givenname=(u'Test',),
+ homedirectory=(u'/home/tuser1',),
+ loginshell=(u'/bin/sh',),
+ sn=(u'User1',),
+ uid=(u'tuser1',),
+ ),
+ value=u'tuser1',
+ summary=None,
+ ),
+ ),
+
+
+ dict(
+ desc='Search for this user with all=True',
+ command=(
+ 'user_find', [u'tuser1'], {'all': True}
+ ),
+ expected=dict(
+ result=(
+ {
+ 'cn': (u'Test User1',),
+ 'gecos': (u'tuser1',),
+ 'givenname': (u'Test',),
+ 'homedirectory': (u'/home/tuser1',),
+ 'krbprincipalname': (u'tuser1@' + api.env.realm,),
+ 'loginshell': (u'/bin/sh',),
+ 'memberof group': (u'ipausers',),
+ 'objectclass': user_objectclass,
+ 'sn': (u'User1',),
+ 'uid': (u'tuser1',),
+ },
+ ),
+ summary=u'1 user matched',
+ count=1,
+ truncated=False,
+ ),
+ ignore_values=['uidnumber', 'gidnumber', 'ipauniqueid'],
+ ),
+
+
+ dict(
+ desc='Search for this user with minimal attributes',
+ command=(
+ 'user_find', [u'tuser1'], {}
+ ),
+ expected=dict(
+ result=(
+ dict(
+ givenname=(u'Test',),
+ homedirectory=(u'/home/tuser1',),
+ loginshell=(u'/bin/sh',),
+ sn=(u'User1',),
+ uid=(u'tuser1',),
+ ),
+ ),
+ summary=u'1 user matched',
+ count=1,
+ truncated=False,
+ ),
+ ),
+
+
+ dict(
+ desc='Search for all users',
+ command=(
+ 'user_find', [], {}
+ ),
+ expected=dict(
+ result=(
+ dict(
+ homedirectory=(u'/home/admin',),
+ loginshell=(u'/bin/bash',),
+ sn=(u'Administrator',),
+ uid=(u'admin',),
+ ),
+ dict(
+ givenname=(u'Test',),
+ homedirectory=(u'/home/tuser1',),
+ loginshell=(u'/bin/sh',),
+ sn=(u'User1',),
+ uid=(u'tuser1',),
+ ),
+ ),
+ summary=u'2 users matched',
+ count=2,
+ truncated=False,
+ ),
+ ),
+
+
+ dict(
+ desc='Lock user',
+ command=(
+ 'user_lock', [u'tuser1'], {}
+ ),
+ expected=dict(
+ result=True,
+ value=u'tuser1',
+ summary=u'Locked user "tuser1"',
+ ),
+ ),
+
+
+ dict(
+ desc='Unlock user',
+ command=(
+ 'user_unlock', [u'tuser1'], {}
+ ),
+ expected=dict(
+ result=True,
+ value=u'tuser1',
+ summary=u'Unlocked user "tuser1"',
+ ),
+ ),
+
+
+ dict(
+ desc='Update user',
+ command=(
+ 'user_mod', [u'tuser1'], dict(givenname=u'Finkle')
+ ),
+ expected=dict(
+ result=dict(
+ givenname=(u'Finkle',),
+ ),
+ summary=u'Modified user "tuser1"',
+ value=u'tuser1',
+ ),
+ ),
+
+
+ dict(
+ desc='Retrieve user to verify update',
+ command=(
+ 'user_show', [u'tuser1'], {}
+ ),
+ expected=dict(
+ result=dict(
+ dn=u'uid=tuser1,cn=users,cn=accounts,dc=example,dc=com',
+ givenname=(u'Finkle',),
+ homedirectory=(u'/home/tuser1',),
+ loginshell=(u'/bin/sh',),
+ sn=(u'User1',),
+ uid=(u'tuser1',),
+ ),
+ summary=None,
+ value=u'tuser1',
+ ),
+
+ ),
+
+
+ dict(
+ desc='Delete user',
+ command=(
+ 'user_del', [u'tuser1'], {}
+ ),
+ expected=dict(
+ result=True,
+ summary=u'Deleted user "tuser1"',
+ value=u'tuser1',
+ ),
+ ),
+
+
+ dict(
+ desc='Do double delete',
+ command=(
+ 'user_del', [u'tuser1'], {}
+ ),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+
+
+ dict(
+ desc='Verify user is gone',
+ command=(
+ 'user_show', [u'tuser1'], {}
+ ),
+ expected=errors.NotFound(reason='no such entry'),
+ ),
+ ]
diff --git a/tests/test_xmlrpc/xmlrpc_test.py b/tests/test_xmlrpc/xmlrpc_test.py
index 9c41d053f..a51a82bb3 100644
--- a/tests/test_xmlrpc/xmlrpc_test.py
+++ b/tests/test_xmlrpc/xmlrpc_test.py
@@ -24,18 +24,54 @@ Base class for all XML-RPC tests
import sys
import socket
import nose
+from tests.util import assert_deepequal
from ipalib import api, request
from ipalib import errors
-def assert_attr_equal(entry_attrs, attr, value):
- assert value in entry_attrs.get(attr, [])
+try:
+ if not api.Backend.xmlclient.isconnected():
+ api.Backend.xmlclient.connect()
+ res = api.Command['user_show'](u'notfound')
+except errors.NetworkError:
+ server_available = False
+except errors.NotFound:
+ server_available = True
-def assert_is_member(entry_attrs, value, member_attr='member'):
- for m in entry_attrs[member_attr]:
- if m.startswith(value):
+
+
+def assert_attr_equal(entry, key, value):
+ if type(entry) is not dict:
+ raise AssertionError(
+ 'assert_attr_equal: entry must be a %r; got a %r: %r' % (
+ dict, type(entry), entry)
+ )
+ if key not in entry:
+ raise AssertionError(
+ 'assert_attr_equal: entry has no key %r: %r' % (key, entry)
+ )
+ if value not in entry[key]:
+ raise AssertionError(
+ 'assert_attr_equal: %r: %r not in %r' % (key, value, entry[key])
+ )
+
+
+def assert_is_member(entry, value, key='member'):
+ if type(entry) is not dict:
+ raise AssertionError(
+ 'assert_is_member: entry must be a %r; got a %r: %r' % (
+ dict, type(entry), entry)
+ )
+ if key not in entry:
+ raise AssertionError(
+ 'assert_is_member: entry has no key %r: %r' % (key, entry)
+ )
+ for member in entry[key]:
+ if member.startswith(value):
return
- assert False
+ raise AssertionError(
+ 'assert_is_member: %r: %r not in %r' % (key, value, entry[key])
+ )
# Initialize the API. We do this here so that one can run the tests
@@ -49,14 +85,12 @@ class XMLRPC_test(object):
"""
def setUp(self):
- try:
- if not api.Backend.xmlclient.isconnected():
- api.Backend.xmlclient.connect()
- res = api.Command['user_show'](u'notfound')
- except errors.NetworkError:
- raise nose.SkipTest()
- except errors.NotFound:
- pass
+ if not server_available:
+ raise nose.SkipTest(
+ 'Server not available: %r' % api.env.xmlrpc_uri
+ )
+ if not api.Backend.xmlclient.isconnected():
+ api.Backend.xmlclient.connect()
def tearDown(self):
"""
@@ -64,3 +98,154 @@ class XMLRPC_test(object):
"""
request.destroy_context()
+ def failsafe_add(self, obj, pk, **options):
+ """
+ Delete possible leftover entry first, then add.
+
+ This helps speed us up when a partial test failure has left LDAP in a
+ dirty state.
+
+ :param obj: An Object like api.Object.user
+ :param pk: The primary key of the entry to be created
+ :param options: Kwargs to be passed to obj.add()
+ """
+ try:
+ obj.methods['del'](pk)
+ except errors.NotFound:
+ pass
+ return obj.methods['add'](pk, **options)
+
+
+IGNORE = """Command %r is missing attribute %r in output entry.
+ args = %r
+ options = %r
+ entry = %r"""
+
+
+EXPECTED = """Expected %r to raise %s.
+ args = %r
+ options = %r
+ output = %r"""
+
+
+UNEXPECTED = """Expected %r to raise %s, but caught different.
+ args = %r
+ options = %r
+ %s: %s"""
+
+
+KWARGS = """Command %r raised %s with wrong kwargs.
+ args = %r
+ options = %r
+ kw_expected = %r
+ kw_got = %r"""
+
+
+class Declarative(XMLRPC_test):
+ cleanup_commands = tuple()
+ tests = tuple()
+
+ def cleanup_generate(self, stage):
+ for command in self.cleanup_commands:
+ func = lambda: self.cleanup(command)
+ func.description = '%s %s-cleanup: %r' % (
+ self.__class__.__name__, stage, command
+ )
+ yield (func,)
+
+ def cleanup(self, command):
+ (cmd, args, options) = command
+ if cmd not in api.Command:
+ raise nose.SkipTest(
+ 'cleanup command %r not in api.Command' % cmd
+ )
+ try:
+ api.Command[cmd](*args, **options)
+ except errors.NotFound:
+ pass
+
+ def test_generator(self):
+ """
+ Iterate through tests.
+
+ nose reports each one as a seperate test.
+ """
+
+ # Iterate through pre-cleanup:
+ for tup in self.cleanup_generate('pre'):
+ yield tup
+
+ # Iterate through the tests:
+ name = self.__class__.__name__
+ for (i, test) in enumerate(self.tests):
+ nice = '%s[%d]: %s: %s' % (
+ name, i, test['command'][0], test.get('desc', '')
+ )
+ func = lambda: self.check(nice, test)
+ func.description = nice
+ yield (func,)
+
+ # Iterate through post-cleanup:
+ for tup in self.cleanup_generate('post'):
+ yield tup
+
+ def check(self, nice, test):
+ (cmd, args, options) = test['command']
+ if cmd not in api.Command:
+ raise nose.SkipTest('%r not in api.Command' % cmd)
+ expected = test['expected']
+ ignore_values = test.get('ignore_values')
+ if isinstance(expected, errors.PublicError):
+ self.check_exception(nice, cmd, args, options, expected)
+ else:
+ self.check_output(nice, cmd, args, options, expected, ignore_values)
+
+ def check_exception(self, nice, cmd, args, options, expected):
+ klass = expected.__class__
+ name = klass.__name__
+ try:
+ output = api.Command[cmd](*args, **options)
+ except StandardError, e:
+ pass
+ else:
+ raise AssertionError(
+ EXPECTED % (cmd, name, args, options, output)
+ )
+ if not isinstance(e, klass):
+ raise AssertionError(
+ UNEXPECTED % (cmd, name, args, options, e.__class__.__name__, e)
+ )
+ # FIXME: the XML-RPC transport doesn't allow us to return structured
+ # information through the exception, so we can't test the kw on the
+ # client side. However, if we switch to using JSON-RPC for the default
+ # transport, the exception is a free-form data structure (dict).
+# if e.kw != expected.kw:
+# raise AssertionError(
+# KWARGS % (cmd, name, args, options, expected.kw, e.kw)
+# )
+
+ def check_output(self, nice, cmd, args, options, expected, ignore_values):
+ got = api.Command[cmd](*args, **options)
+ result = got['result']
+ if ignore_values:
+ if isinstance(result, dict):
+ self.clean_entry(
+ nice, cmd, args, options, result, ignore_values
+ )
+ elif isinstance(result, (list, tuple)):
+ for entry in result:
+ self.clean_entry(
+ nice, cmd, args, options, entry, ignore_values
+ )
+ assert_deepequal(expected, got, nice)
+
+ def clean_entry(self, nice, cmd, args, options, entry, ignore_values):
+ """
+ Remove attributes like 'ipauniqueid' whose value is unpredictable.
+ """
+ for key in ignore_values:
+ if key not in entry:
+ raise AssertionError(
+ IGNORE % (cmd, key, args, options, entry)
+ )
+ entry.pop(key)
diff --git a/tests/util.py b/tests/util.py
index cd7400ba1..30cbbb5b0 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -108,6 +108,88 @@ def assert_equal(val1, val2):
assert val1 == val2, '%r != %r' % (val1, val2)
+VALUE = """assert_deepequal: expected != got.
+ %s
+ expected = %r
+ got = %r
+ path = %r"""
+
+TYPE = """assert_deepequal: type(expected) is not type(got).
+ %s
+ type(expected) = %r
+ type(got) = %r
+ expected = %r
+ got = %r
+ path = %r"""
+
+LEN = """assert_deepequal: list length mismatch.
+ %s
+ len(expected) = %r
+ len(got) = %r
+ expected = %r
+ got = %r
+ path = %r"""
+
+KEYS = """assert_deepequal: dict keys mismatch.
+ %s
+ missing keys = %r
+ extra keys = %r
+ expected = %r
+ got = %r
+ path = %r"""
+
+
+def assert_deepequal(expected, got, src='', stack=tuple()):
+ """
+ Recursively check for type and equality.
+
+ If the tests fails, it will raise an ``AssertionError`` with detailed
+ information, including the path to the offending value. For example:
+
+ >>> expected = [u'hello', dict(naughty=u'nurse')]
+ >>> got = [u'hello', dict(naughty='nurse')]
+ >>> expected == got
+ True
+ >>> assert_deepequal(expected, got)
+ Traceback (most recent call last):
+ ...
+ AssertionError: assert_deepequal: type(expected) is not type(got):
+ type(expected) = <type 'unicode'>
+ type(got) = <type 'str'>
+ expected = u'nurse'
+ got = 'nurse'
+ path = (1, 'naughty')
+ """
+ if type(expected) is not type(got):
+ raise AssertionError(
+ TYPE % (src, type(expected), type(got), expected, got, stack)
+ )
+ if isinstance(expected, (list, tuple)):
+ if len(expected) != len(got):
+ raise AssertionError(
+ LEN % (src, len(expected), len(got), expected, got, stack)
+ )
+ for (i, e_sub) in enumerate(expected):
+ g_sub = got[i]
+ assert_deepequal(e_sub, g_sub, src, stack + (i,))
+ elif isinstance(expected, dict):
+ missing = set(expected).difference(got)
+ extra = set(got).difference(expected)
+ if missing or extra:
+ raise AssertionError(KEYS % (
+ src, sorted(missing), sorted(extra), expected, got, stack
+ )
+ )
+ for key in sorted(expected):
+ e_sub = expected[key]
+ g_sub = got[key]
+ assert_deepequal(e_sub, g_sub, src, stack + (key,))
+ if expected != got:
+ raise AssertionError(
+ VALUE % (src, expected, got, stack)
+ )
+
+
def raises(exception, callback, *args, **kw):
"""
Tests that the expected exception is raised; raises ExceptionNotRaised