From 1235dfa7bf4b249eb6da8eab8d8a2c7b0eef98db Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 17 Apr 2012 12:42:35 -0400 Subject: Fail on unknown Command options When unknown keyword arguments are passed to a Command, raise an error instead of ignoring them. Options used when IPA calls its commands internally are listed in a new Command attribute called internal_options, and allowed. Previous patches (0b01751c, c45174d6, c5689e7f) made IPA not use unknown keyword arguments in its own commands and tests, but since that some violations were reintroduced in permission_find and tests. Fix those. Tests included; both a frontend unittest and a XML-RPC test via the ping plugin (which was untested previously). https://fedorahosted.org/freeipa/ticket/2509 --- ipalib/frontend.py | 13 ++++++-- ipalib/plugins/aci.py | 2 ++ ipalib/plugins/automount.py | 6 +++- ipalib/plugins/permission.py | 47 ++++++++++++++++---------- tests/test_cmdline/test_cli.py | 6 ++-- tests/test_ipalib/test_frontend.py | 5 +++ tests/test_xmlrpc/test_host_plugin.py | 2 +- tests/test_xmlrpc/test_netgroup_plugin.py | 6 ++-- tests/test_xmlrpc/test_permission_plugin.py | 12 +++++++ tests/test_xmlrpc/test_ping_plugin.py | 52 +++++++++++++++++++++++++++++ 10 files changed, 123 insertions(+), 28 deletions(-) create mode 100644 tests/test_xmlrpc/test_ping_plugin.py diff --git a/ipalib/frontend.py b/ipalib/frontend.py index 10087ba24..c28fa54ae 100644 --- a/ipalib/frontend.py +++ b/ipalib/frontend.py @@ -29,7 +29,8 @@ from parameters import create_param, parse_param_spec, Param, Str, Flag, Passwor from output import Output, Entry, ListOfEntries from text import _, ngettext -from errors import ZeroArgumentError, MaxArgumentError, OverlapError, RequiresRoot, VersionError, RequirementError +from errors import (ZeroArgumentError, MaxArgumentError, OverlapError, + RequiresRoot, VersionError, RequirementError, OptionError) from errors import InvocationError from constants import TYPE_ERROR from ipapython.version import API_VERSION @@ -404,6 +405,8 @@ class Command(HasParam): output_params = Plugin.finalize_attr('output_params') has_output_params = tuple() + internal_options = tuple() + msg_summary = None msg_truncated = _('Results are truncated, try a more specific search') @@ -520,7 +523,13 @@ class Command(HasParam): def __options_2_params(self, options): for name in self.params: if name in options: - yield (name, options[name]) + yield (name, options.pop(name)) + # If any options remain, they are either internal or unknown + unused_keys = set(options).difference(self.internal_options) + unused_keys.discard('version') + if unused_keys: + raise OptionError(_('Unknown option: %(option)s'), + option=unused_keys.pop()) def args_options_2_entry(self, *args, **options): """ diff --git a/ipalib/plugins/aci.py b/ipalib/plugins/aci.py index f7c6039a9..7a27ce116 100644 --- a/ipalib/plugins/aci.py +++ b/ipalib/plugins/aci.py @@ -610,6 +610,8 @@ class aci_mod(crud.Update): takes_options = (_prefix_option,) + internal_options = ['rename'] + msg_summary = _('Modified ACI "%(value)s"') def execute(self, aciname, **kw): diff --git a/ipalib/plugins/automount.py b/ipalib/plugins/automount.py index dda14cc78..5c9f42b4c 100644 --- a/ipalib/plugins/automount.py +++ b/ipalib/plugins/automount.py @@ -787,6 +787,8 @@ class automountkey_add(LDAPCreate): msg_summary = _('Added automount key "%(value)s"') + internal_options = ['description', 'add_operation'] + def pre_callback(self, ldap, dn, entry_attrs, *keys, **options): options.pop('add_operation', None) options.pop('description', None) @@ -846,7 +848,7 @@ class automountmap_add_indirect(LDAPCreate): self.api.Command['automountmap_show'](location, parentmap) # Add a submount key self.api.Command['automountkey_add']( - location, parentmap, automountkey=key, key=key, + location, parentmap, automountkey=key, automountinformation='-fstype=autofs ldap:%s' % map) else: # adding to auto.master # Ensure auto.master exists @@ -910,6 +912,8 @@ class automountkey_mod(LDAPUpdate): msg_summary = _('Modified automount key "%(value)s"') + internal_options = ['newautomountkey'] + takes_options = LDAPUpdate.takes_options + ( IA5Str('newautomountinformation?', cli_name='newinfo', diff --git a/ipalib/plugins/permission.py b/ipalib/plugins/permission.py index d6fe385b1..05d19ad8d 100644 --- a/ipalib/plugins/permission.py +++ b/ipalib/plugins/permission.py @@ -90,6 +90,16 @@ output_params = ( ), ) +def filter_options(options, keys): + """Return a dict that includes entries from `options` that are in `keys` + + example: + >>> filtered = filter_options({'a': 1, 'b': 2, 'c': 3}, ['a', 'c']) + >>> filtered == {'a': 1, 'c': 3} + True + """ + return dict((k, options[k]) for k in keys if k in options) + class permission(LDAPObject): """ Permission object. @@ -331,7 +341,7 @@ class permission_mod(LDAPUpdate): cn = options['rename'] # rename finished - common_options = dict((k, options[k]) for k in ('all', 'raw') if k in options) + common_options = filter_options(options, ['all', 'raw']) result = self.api.Command.permission_show(cn, **common_options)['result'] for r in result: if not r.startswith('member_'): @@ -350,12 +360,19 @@ class permission_find(LDAPSearch): has_output_params = LDAPSearch.has_output_params + output_params def post_callback(self, ldap, entries, truncated, *args, **options): + + # There is an option/param overlap: "cn" must be passed as "aciname" + # to aci-find. Besides that we don't need cn anymore so pop it + aciname = options.pop('cn', None) + pkey_only = options.pop('pkey_only', False) if not pkey_only: for entry in entries: (dn, attrs) = entry try: - aci = self.api.Command.aci_show(attrs['cn'][0], aciprefix=ACI_PREFIX, **options)['result'] + common_options = filter_options(options, ['all', 'raw']) + aci = self.api.Command.aci_show(attrs['cn'][0], + aciprefix=ACI_PREFIX, **common_options)['result'] # copy information from respective ACI to permission entry for attr in self.obj.aci_attributes: @@ -377,23 +394,16 @@ class permission_find(LDAPSearch): # aren't already in the list along with their permission info. opts = copy.copy(options) + if aciname: + opts['aciname'] = aciname opts['aciprefix'] = ACI_PREFIX - try: - # permission ACI attribute is needed - del opts['raw'] - except: - pass - if 'cn' in options: - # the attribute for name is difference in acis - opts['aciname'] = options['cn'] + # permission ACI attribute is needed + opts.pop('raw', None) + opts.pop('sizelimit', None) aciresults = self.api.Command.aci_find(*args, **opts) truncated = truncated or aciresults['truncated'] results = aciresults['result'] - if 'cn' in options: - # there is an option/param overlap if --name is in the - # search list, we don't need cn anymore so drop it. - options.pop('cn') for aci in results: found = False if 'permission' in aci: @@ -403,7 +413,9 @@ class permission_find(LDAPSearch): found = True break if not found: - permission = self.api.Command.permission_show(aci['permission'], **options)['result'] + common_options = filter_options(options, ['all', 'raw']) + permission = self.api.Command.permission_show( + aci['permission'], **common_options)['result'] dn = permission['dn'] del permission['dn'] if pkey_only: @@ -429,8 +441,9 @@ class permission_show(LDAPRetrieve): has_output_params = LDAPRetrieve.has_output_params + output_params def post_callback(self, ldap, dn, entry_attrs, *keys, **options): try: - common_options = dict((k, options[k]) for k in ('all', 'raw') if k in options) - aci = self.api.Command.aci_show(keys[-1], aciprefix=ACI_PREFIX, **common_options)['result'] + common_options = filter_options(options, ['all', 'raw']) + aci = self.api.Command.aci_show(keys[-1], aciprefix=ACI_PREFIX, + **common_options)['result'] for attr in self.obj.aci_attributes: if attr in aci: entry_attrs[attr] = aci[attr] diff --git a/tests/test_cmdline/test_cli.py b/tests/test_cmdline/test_cli.py index 095577a3b..d961f8725 100644 --- a/tests/test_cmdline/test_cli.py +++ b/tests/test_cmdline/test_cli.py @@ -128,8 +128,7 @@ class TestCLIParsing(object): def test_dnsrecord_del_all(self): try: self.run_command('dnszone_add', idnsname=u'test-example.com', - idnssoamname=u'ns.test-example.com', - admin_email=u'devnull@test-example.com', force=True) + idnssoamname=u'ns.test-example.com', force=True) except errors.NotFound: raise nose.SkipTest('DNS is not configured') try: @@ -162,8 +161,7 @@ class TestCLIParsing(object): def test_dnsrecord_del_one_by_one(self): try: self.run_command('dnszone_add', idnsname=u'test-example.com', - idnssoamname=u'ns.test-example.com', - admin_email=u'devnull@test-example.com', force=True) + idnssoamname=u'ns.test-example.com', force=True) except errors.NotFound: raise nose.SkipTest('DNS is not configured') try: diff --git a/tests/test_ipalib/test_frontend.py b/tests/test_ipalib/test_frontend.py index b717a43ad..5f7ce65fb 100644 --- a/tests/test_ipalib/test_frontend.py +++ b/tests/test_ipalib/test_frontend.py @@ -511,6 +511,11 @@ class test_Command(ClassChecker): assert e.count == 2 assert str(e) == "command 'example' takes at most 2 arguments" + # Test that OptionError is raised when an extra option is given: + o = self.get_instance() + e = raises(errors.OptionError, o.args_options_2_params, bad_option=True) + assert e.option == 'bad_option' + # Test that OverlapError is raised: o = self.get_instance(args=('one', 'two'), options=('three', 'four')) e = raises(errors.OverlapError, o.args_options_2_params, diff --git a/tests/test_xmlrpc/test_host_plugin.py b/tests/test_xmlrpc/test_host_plugin.py index 8798168af..69ef82e20 100644 --- a/tests/test_xmlrpc/test_host_plugin.py +++ b/tests/test_xmlrpc/test_host_plugin.py @@ -126,7 +126,7 @@ class test_host(Declarative): command=('host_add', [fqdn1], dict( description=u'Test host 1', - locality=u'Undisclosed location 1', + l=u'Undisclosed location 1', force=True, ), ), diff --git a/tests/test_xmlrpc/test_netgroup_plugin.py b/tests/test_xmlrpc/test_netgroup_plugin.py index d51287bcd..951bc77a3 100644 --- a/tests/test_xmlrpc/test_netgroup_plugin.py +++ b/tests/test_xmlrpc/test_netgroup_plugin.py @@ -726,7 +726,7 @@ class test_netgroup(Declarative): dict( - desc='Add duplicatehost %r to netgroup %r' % (host1, netgroup1), + desc='Add duplicate host %r to netgroup %r' % (host1, netgroup1), command=( 'netgroup_add_member', [netgroup1], dict(host=host1) ), @@ -960,8 +960,8 @@ class test_netgroup(Declarative): ), dict( - desc='Search for all netgroups using empty memberuser', - command=('netgroup_find', [], dict(memberuser=None)), + desc='Search for all netgroups using empty member user', + command=('netgroup_find', [], dict(user=None)), expected=dict( count=2, truncated=False, diff --git a/tests/test_xmlrpc/test_permission_plugin.py b/tests/test_xmlrpc/test_permission_plugin.py index 6613c9bba..2e20b4f97 100644 --- a/tests/test_xmlrpc/test_permission_plugin.py +++ b/tests/test_xmlrpc/test_permission_plugin.py @@ -643,6 +643,18 @@ class test_permission(Declarative): ), + dict( + desc='Search using nonexistent --subtree', + command=('permission_find', [], {'subtree': u'foo'}), + expected=dict( + count=0, + truncated=False, + summary=u'0 permissions matched', + result=[], + ), + ), + + dict( desc='Delete %r' % permission1_renamed_ucase, command=('permission_del', [permission1_renamed_ucase], {}), diff --git a/tests/test_xmlrpc/test_ping_plugin.py b/tests/test_xmlrpc/test_ping_plugin.py new file mode 100644 index 000000000..284aed54f --- /dev/null +++ b/tests/test_xmlrpc/test_ping_plugin.py @@ -0,0 +1,52 @@ +# Authors: +# Petr Viktorin +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test the `ipalib/plugins/ping.py` module, and XML-RPC in general. +""" + +from ipalib import api, errors, _ +from tests.util import assert_equal, Fuzzy +from xmlrpc_test import Declarative + + +class test_ping(Declarative): + + tests = [ + dict( + desc='Ping the server', + command=('ping', [], {}), + expected=dict( + summary=Fuzzy('IPA server version .*. API version .*')), + ), + + dict( + desc='Try to ping with an argument', + command=('ping', ['bad_arg'], {}), + expected=errors.ZeroArgumentError(name='ping'), + ), + + dict( + desc='Try to ping with an option', + command=('ping', [], dict(bad_arg=True)), + expected=errors.OptionError(_('Unknown option: %(option)s'), + option='bad_arg'), + ), + + ] -- cgit