diff options
author | Petr Viktorin <pviktori@redhat.com> | 2013-05-21 13:40:27 +0200 |
---|---|---|
committer | Martin Kosek <mkosek@redhat.com> | 2013-06-17 19:22:50 +0200 |
commit | c60142efda817f030a7495cd6fe4a19953e55afa (patch) | |
tree | 31a840ceddd4381311bbc879f9851bb71a8e2ffa /ipatests/test_cmdline | |
parent | 6d66e826c1c248dffc80056b20c1e4b74b04d46f (diff) | |
download | freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.gz freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.xz freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.zip |
Make an ipa-tests package
Rename the 'tests' directory to 'ipa-tests', and create an ipa-tests RPM
containing the test suite
Part of the work for: https://fedorahosted.org/freeipa/ticket/3654
Diffstat (limited to 'ipatests/test_cmdline')
-rw-r--r-- | ipatests/test_cmdline/cmdline.py | 70 | ||||
-rw-r--r-- | ipatests/test_cmdline/test_cli.py | 327 | ||||
-rw-r--r-- | ipatests/test_cmdline/test_help.py | 141 | ||||
-rw-r--r-- | ipatests/test_cmdline/test_ipagetkeytab.py | 152 |
4 files changed, 690 insertions, 0 deletions
diff --git a/ipatests/test_cmdline/cmdline.py b/ipatests/test_cmdline/cmdline.py new file mode 100644 index 000000000..6f3541d27 --- /dev/null +++ b/ipatests/test_cmdline/cmdline.py @@ -0,0 +1,70 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2010 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Base class for all cmdline tests +""" + +import nose +import krbV + +from ipalib import api +from ipalib import errors +from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test +from ipaserver.plugins.ldap2 import ldap2 +from ipapython import ipautil + +# See if our LDAP server is up and we can talk to it over GSSAPI +ccache = krbV.default_context().default_ccache() + +try: + conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn) + conn.connect(ccache=ccache) + conn.disconnect() + server_available = True +except errors.DatabaseError: + server_available = False +except Exception, e: + server_available = False + +class cmdline_test(XMLRPC_test): + """ + Base class for all command-line tests + """ + # some reasonable default command + command = '/bin/ls' + + def setUp(self): + # raise an error if the command is missing even if the remote + # server is not available. + if not ipautil.file_exists(self.command): + raise AssertionError( + 'Command %r not available' % self.command + ) + super(cmdline_test, self).setUp() + if not server_available: + raise nose.SkipTest( + 'Server not available: %r' % api.env.xmlrpc_uri + ) + + def tearDown(self): + """ + nose tear-down fixture. + """ + super(cmdline_test, self).tearDown() diff --git a/ipatests/test_cmdline/test_cli.py b/ipatests/test_cmdline/test_cli.py new file mode 100644 index 000000000..fe411b703 --- /dev/null +++ b/ipatests/test_cmdline/test_cli.py @@ -0,0 +1,327 @@ +import shlex +import sys +import contextlib +import StringIO + +import nose + +from ipatests import util +from ipalib import api, errors +from ipapython.version import API_VERSION + + +class TestCLIParsing(object): + """Tests that commandlines are correctly parsed to Command keyword args + """ + def check_command(self, commandline, expected_command_name, **kw_expected): + argv = shlex.split(commandline) + executioner = api.Backend.cli + + cmd = executioner.get_command(argv) + kw_got = executioner.parse(cmd, argv[1:]) + kw_got = executioner.process_keyword_arguments(cmd, kw_got) + util.assert_deepequal(expected_command_name, cmd.name, 'Command name') + util.assert_deepequal(kw_expected, kw_got) + + def run_command(self, command_name, **kw): + """Run a command on the server""" + if not api.Backend.xmlclient.isconnected(): + api.Backend.xmlclient.connect(fallback=False) + try: + api.Command[command_name](**kw) + except errors.NetworkError: + raise nose.SkipTest('%r: Server not available: %r' % + (self.__module__, api.env.xmlrpc_uri)) + + @contextlib.contextmanager + def fake_stdin(self, string_in): + """Context manager that temporarily replaces stdin to read a string""" + old_stdin = sys.stdin + sys.stdin = StringIO.StringIO(string_in) + yield + sys.stdin = old_stdin + + def test_ping(self): + self.check_command('ping', 'ping', + version=API_VERSION) + + def test_user_show(self): + self.check_command('user-show admin', 'user_show', + uid=u'admin', + rights=False, + raw=False, + all=False, + version=API_VERSION) + + def test_user_show_underscore(self): + self.check_command('user_show admin', 'user_show', + uid=u'admin', + rights=False, + raw=False, + all=False, + version=API_VERSION) + + def test_group_add(self): + self.check_command('group-add tgroup1 --desc="Test group"', + 'group_add', + cn=u'tgroup1', + description=u'Test group', + nonposix=False, + external=False, + raw=False, + all=False, + version=API_VERSION) + + def test_sudocmdgroup_add_member(self): + # Test CSV splitting is not done + self.check_command( + # The following is as it would appear on the command line: + r'sudocmdgroup-add-member tcmdgroup1 --sudocmds=ab,c --sudocmds=d', + 'sudocmdgroup_add_member', + cn=u'tcmdgroup1', + sudocmd=[u'ab,c', u'd'], + raw=False, + all=False, + version=API_VERSION) + + def test_group_add_nonposix(self): + self.check_command('group-add tgroup1 --desc="Test group" --nonposix', + 'group_add', + cn=u'tgroup1', + description=u'Test group', + nonposix=True, + external=False, + raw=False, + all=False, + version=API_VERSION) + + def test_group_add_gid(self): + self.check_command('group-add tgroup1 --desc="Test group" --gid=1234', + 'group_add', + cn=u'tgroup1', + description=u'Test group', + gidnumber=u'1234', + nonposix=False, + external=False, + raw=False, + all=False, + version=API_VERSION) + + def test_group_add_interactive(self): + with self.fake_stdin('Test group\n'): + self.check_command('group-add tgroup1', 'group_add', + cn=u'tgroup1', + description=u'Test group', + nonposix=False, + external=False, + raw=False, + all=False, + version=API_VERSION) + + def test_dnsrecord_add(self): + self.check_command('dnsrecord-add test-example.com ns --a-rec=1.2.3.4', + 'dnsrecord_add', + dnszoneidnsname=u'test-example.com', + idnsname=u'ns', + arecord=u'1.2.3.4', + structured=False, + force=False, + raw=False, + all=False, + version=API_VERSION) + + def test_dnsrecord_del_all(self): + try: + self.run_command('dnszone_add', idnsname=u'test-example.com', + idnssoamname=u'ns.test-example.com', force=True) + except errors.NotFound: + raise nose.SkipTest('DNS is not configured') + try: + self.run_command('dnsrecord_add', + dnszoneidnsname=u'test-example.com', + idnsname=u'ns', arecord=u'1.2.3.4') + with self.fake_stdin('yes\n'): + self.check_command('dnsrecord_del test-example.com ns', + 'dnsrecord_del', + dnszoneidnsname=u'test-example.com', + idnsname=u'ns', + del_all=True, + structured=False, + version=API_VERSION) + with self.fake_stdin('YeS\n'): + self.check_command('dnsrecord_del test-example.com ns', + 'dnsrecord_del', + dnszoneidnsname=u'test-example.com', + idnsname=u'ns', + del_all=True, + structured=False, + version=API_VERSION) + finally: + self.run_command('dnszone_del', idnsname=u'test-example.com') + + def test_dnsrecord_del_one_by_one(self): + try: + self.run_command('dnszone_add', idnsname=u'test-example.com', + idnssoamname=u'ns.test-example.com', force=True) + except errors.NotFound: + raise nose.SkipTest('DNS is not configured') + try: + records = (u'1 1 E3B72BA346B90570EED94BE9334E34AA795CED23', + u'2 1 FD2693C1EFFC11A8D2BE57229212A04B45663791') + for record in records: + self.run_command('dnsrecord_add', + dnszoneidnsname=u'test-example.com', idnsname=u'ns', + sshfprecord=record) + with self.fake_stdin('no\nyes\nyes\n'): + self.check_command('dnsrecord_del test-example.com ns', + 'dnsrecord_del', + dnszoneidnsname=u'test-example.com', + idnsname=u'ns', + del_all=False, + sshfprecord=records, + structured=False, + version=API_VERSION) + finally: + self.run_command('dnszone_del', idnsname=u'test-example.com') + + def test_dnsrecord_add_ask_for_missing_fields(self): + sshfp_parts = (1, 1, u'E3B72BA346B90570EED94BE9334E34AA795CED23') + + with self.fake_stdin('SSHFP\n%d\n%d\n%s' % sshfp_parts): + self.check_command('dnsrecord-add test-example.com sshfp', + 'dnsrecord_add', + dnszoneidnsname=u'test-example.com', + idnsname=u'sshfp', + sshfp_part_fp_type=sshfp_parts[0], + sshfp_part_algorithm=sshfp_parts[1], + sshfp_part_fingerprint=sshfp_parts[2], + structured=False, + raw=False, + all=False, + force=False, + version=API_VERSION) + + # NOTE: when a DNS record part is passed via command line, it is not + # converted to its base type when transfered via wire + with self.fake_stdin('%d\n%s' % (sshfp_parts[1], sshfp_parts[2])): + self.check_command('dnsrecord-add test-example.com sshfp ' \ + '--sshfp-algorithm=%d' % sshfp_parts[0], + 'dnsrecord_add', + dnszoneidnsname=u'test-example.com', + idnsname=u'sshfp', + sshfp_part_fp_type=sshfp_parts[0], + sshfp_part_algorithm=unicode(sshfp_parts[1]), # passed via cmdline + sshfp_part_fingerprint=sshfp_parts[2], + structured=False, + raw=False, + all=False, + force=False, + version=API_VERSION) + + with self.fake_stdin(sshfp_parts[2]): + self.check_command('dnsrecord-add test-example.com sshfp ' \ + '--sshfp-algorithm=%d --sshfp-fp-type=%d' % (sshfp_parts[0], sshfp_parts[1]), + 'dnsrecord_add', + dnszoneidnsname=u'test-example.com', + idnsname=u'sshfp', + sshfp_part_fp_type=unicode(sshfp_parts[0]), # passed via cmdline + sshfp_part_algorithm=unicode(sshfp_parts[1]), # passed via cmdline + sshfp_part_fingerprint=sshfp_parts[2], + structured=False, + raw=False, + all=False, + force=False, + version=API_VERSION) + + def test_dnsrecord_del_comma(self): + try: + self.run_command( + 'dnszone_add', idnsname=u'test-example.com', + idnssoamname=u'ns.test-example.com', force=True) + except errors.NotFound: + raise nose.SkipTest('DNS is not configured') + try: + self.run_command( + 'dnsrecord_add', + dnszoneidnsname=u'test-example.com', + idnsname=u'test', + txtrecord=u'"A pretty little problem," said Holmes.') + with self.fake_stdin('no\nyes\n'): + self.check_command( + 'dnsrecord_del test-example.com test', + 'dnsrecord_del', + dnszoneidnsname=u'test-example.com', + idnsname=u'test', + del_all=False, + txtrecord=[u'"A pretty little problem," said Holmes.'], + structured=False, + version=API_VERSION) + finally: + self.run_command('dnszone_del', idnsname=u'test-example.com') + + def test_dnszone_add(self): + """ + Test dnszone-add with nameserver IP passed interatively + """ + # Pass IP of nameserver interactively for nameserver in zone + # (absolute name) + with self.fake_stdin('1.1.1.1\n'): + self.check_command( + 'dnszone_add example.com --name-server=ns.example.com. ' + '--admin-email=admin@example.com', + 'dnszone_add', + idnsname=u'example.com', + idnssoamname=u'ns.example.com.', + idnssoarname=u'admin@example.com', + ip_address=u'1.1.1.1', + idnssoaexpire=util.Fuzzy(type=int), + idnssoaserial=util.Fuzzy(type=int), + idnssoaretry=util.Fuzzy(type=int), + idnssoaminimum=util.Fuzzy(type=int), + idnssoarefresh=util.Fuzzy(type=int), + all=False, + raw=False, + force=False, + version=API_VERSION + ) + + # Pass IP of nameserver interactively for nameserver in zone + # (relative name) + with self.fake_stdin('1.1.1.1\n'): + self.check_command( + 'dnszone_add example.com --name-server=ns ' + '--admin-email=admin@example.com', + 'dnszone_add', + idnsname=u'example.com', + idnssoamname=u'ns', + idnssoarname=u'admin@example.com', + ip_address=u'1.1.1.1', + idnssoaexpire=util.Fuzzy(type=int), + idnssoaserial=util.Fuzzy(type=int), + idnssoaretry=util.Fuzzy(type=int), + idnssoaminimum=util.Fuzzy(type=int), + idnssoarefresh=util.Fuzzy(type=int), + all=False, + raw=False, + force=False, + version=API_VERSION + ) + + # Nameserver is outside the zone - no need to pass the IP + self.check_command( + 'dnszone_add example.com --name-server=ns.example.net. ' + '--admin-email=admin@example.com', + 'dnszone_add', + idnsname=u'example.com', + idnssoamname=u'ns.example.net.', + idnssoarname=u'admin@example.com', + idnssoaexpire=util.Fuzzy(type=int), + idnssoaserial=util.Fuzzy(type=int), + idnssoaretry=util.Fuzzy(type=int), + idnssoaminimum=util.Fuzzy(type=int), + idnssoarefresh=util.Fuzzy(type=int), + all=False, + raw=False, + force=False, + version=API_VERSION + ) diff --git a/ipatests/test_cmdline/test_help.py b/ipatests/test_cmdline/test_help.py new file mode 100644 index 000000000..4cf633683 --- /dev/null +++ b/ipatests/test_cmdline/test_help.py @@ -0,0 +1,141 @@ +# Authors: Petr Viktorin <pviktori@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import sys +import contextlib +import StringIO + +from nose.tools import assert_raises # pylint: disable=E0611 + +from ipalib import api, errors +from ipalib.plugins.user import user_add + + +class CLITestContext(object): + """Context manager that replaces stdout & stderr, and catches SystemExit + + Whatever was printed to the streams is available in ``stdout`` and + ``stderr`` attrributes once the with statement finishes. + + When exception is given, asserts that exception is raised. The exception + will be available in the ``exception`` attribute. + """ + def __init__(self, exception=None): + self.exception = exception + + def __enter__(self): + self.old_streams = sys.stdout, sys.stderr + self.stdout_fileobj = sys.stdout = StringIO.StringIO() + self.stderr_fileobj = sys.stderr = StringIO.StringIO() + return self + + def __exit__(self, exc_type, exc_value, traceback): + sys.stdout, sys.stderr = self.old_streams + self.stdout = self.stdout_fileobj.getvalue() + self.stderr = self.stderr_fileobj.getvalue() + self.stdout_fileobj.close() + self.stderr_fileobj.close() + if self.exception: + assert isinstance(exc_value, self.exception), exc_value + self.exception = exc_value + return True + + +def test_ipa_help(): + """Test that `ipa help` only writes to stdout""" + with CLITestContext() as ctx: + return_value = api.Backend.cli.run(['help']) + assert return_value == 0 + assert ctx.stderr == '' + + +def test_ipa_without_arguments(): + """Test that `ipa` errors out, and prints the help to stderr""" + with CLITestContext(exception=SystemExit) as ctx: + api.Backend.cli.run([]) + assert ctx.exception.code == 2 + assert ctx.stdout == '' + assert 'Error: Command not specified' in ctx.stderr + + with CLITestContext() as help_ctx: + api.Backend.cli.run(['help']) + assert help_ctx.stdout in ctx.stderr + + +def test_bare_topic(): + """Test that `ipa user` errors out, and prints the help to stderr + + This is because `user` is a topic, not a command, so `ipa user` doesn't + match our usage string. The help should be accessed using `ipa help user`. + """ + with CLITestContext(exception=errors.CommandError) as ctx: + api.Backend.cli.run(['user']) + assert ctx.exception.name == 'user' + assert ctx.stdout == '' + + with CLITestContext() as help_ctx: + return_value = api.Backend.cli.run(['help', 'user']) + assert return_value == 0 + assert help_ctx.stdout in ctx.stderr + + +def test_command_help(): + """Test that `help user-add` & `user-add -h` are equivalent and contain doc + """ + with CLITestContext() as help_ctx: + return_value = api.Backend.cli.run(['help', 'user-add']) + assert return_value == 0 + assert help_ctx.stderr == '' + + with CLITestContext(exception=SystemExit) as h_ctx: + api.Backend.cli.run(['user-add', '-h']) + assert h_ctx.exception.code == 0 + assert h_ctx.stderr == '' + + assert h_ctx.stdout == help_ctx.stdout + assert unicode(user_add.__doc__) in help_ctx.stdout + + +def test_ambiguous_command_or_topic(): + """Test that `help ping` & `ping -h` are NOT equivalent + + One is a topic, the other is a command + """ + with CLITestContext() as help_ctx: + return_value = api.Backend.cli.run(['help', 'ping']) + assert return_value == 0 + assert help_ctx.stderr == '' + + with CLITestContext(exception=SystemExit) as h_ctx: + api.Backend.cli.run(['ping', '-h']) + assert h_ctx.exception.code == 0 + assert h_ctx.stderr == '' + + assert h_ctx.stdout != help_ctx.stdout + +def test_multiline_description(): + """Test that all of a multi-line command description appears in output + """ + # This assumes trust_add has multiline doc. Ensure it is so. + assert '\n\n' in unicode(api.Command.trust_add.doc).strip() + + with CLITestContext(exception=SystemExit) as help_ctx: + return_value = api.Backend.cli.run(['trust-add', '-h']) + + assert unicode(api.Command.trust_add.doc).strip() in help_ctx.stdout diff --git a/ipatests/test_cmdline/test_ipagetkeytab.py b/ipatests/test_cmdline/test_ipagetkeytab.py new file mode 100644 index 000000000..cb46fd23b --- /dev/null +++ b/ipatests/test_cmdline/test_ipagetkeytab.py @@ -0,0 +1,152 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2010 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test `ipa-getkeytab` +""" + +import os +import shutil +from cmdline import cmdline_test +from ipalib import api +from ipalib import errors +import tempfile +from ipapython import ipautil +import nose +import tempfile +import krbV +from ipaserver.plugins.ldap2 import ldap2 +from ipapython.dn import DN + +def use_keytab(principal, keytab): + try: + tmpdir = tempfile.mkdtemp(prefix = "tmp-") + ccache_file = 'FILE:%s/ccache' % tmpdir + krbcontext = krbV.default_context() + principal = str(principal) + keytab = krbV.Keytab(name=keytab, context=krbcontext) + principal = krbV.Principal(name=principal, context=krbcontext) + os.environ['KRB5CCNAME'] = ccache_file + ccache = krbV.CCache(name=ccache_file, context=krbcontext, primary_principal=principal) + ccache.init(principal) + ccache.init_creds_keytab(keytab=keytab, principal=principal) + conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn) + conn.connect(ccache=ccache) + conn.disconnect() + except krbV.Krb5Error, e: + raise StandardError('Unable to bind to LDAP. Error initializing principal %s in %s: %s' % (principal.name, keytab, str(e))) + finally: + del os.environ['KRB5CCNAME'] + if tmpdir: + shutil.rmtree(tmpdir) + +class test_ipagetkeytab(cmdline_test): + """ + Test `ipa-getkeytab`. + """ + command = "ipa-client/ipa-getkeytab" + host_fqdn = u'ipatest.%s' % api.env.domain + service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm) + [keytabfd, keytabname] = tempfile.mkstemp() + os.close(keytabfd) + + def test_0_setup(self): + """ + Create a host to test against. + """ + # Create the service + try: + api.Command['host_add'](self.host_fqdn, force=True) + except errors.DuplicateEntry: + # it already exists, no problem + pass + + def test_1_run(self): + """ + Create a keytab with `ipa-getkeytab` for a non-existent service. + """ + new_args = [self.command, + "-s", api.env.host, + "-p", "test/notfound.example.com", + "-k", self.keytabname, + ] + (out, err, rc) = ipautil.run(new_args, stdin=None, raiseonerr=False) + assert err == 'Operation failed! PrincipalName not found.\n\n' + + def test_2_run(self): + """ + Create a keytab with `ipa-getkeytab` for an existing service. + """ + # Create the service + try: + api.Command['service_add'](self.service_princ, force=True) + except errors.DuplicateEntry: + # it already exists, no problem + pass + + os.unlink(self.keytabname) + new_args = [self.command, + "-s", api.env.host, + "-p", self.service_princ, + "-k", self.keytabname, + ] + try: + (out, err, rc) = ipautil.run(new_args, None) + expected = 'Keytab successfully retrieved and stored in: %s\n' % ( + self.keytabname) + assert expected in err, 'Success message not in output:\n%s' % err + except ipautil.CalledProcessError, e: + assert (False) + + def test_3_use(self): + """ + Try to use the service keytab. + """ + use_keytab(self.service_princ, self.keytabname) + + def test_4_disable(self): + """ + Disable a kerberos principal + """ + # Verify that it has a principal key + entry = api.Command['service_show'](self.service_princ)['result'] + assert(entry['has_keytab'] == True) + + # Disable it + api.Command['service_disable'](self.service_princ) + + # Verify that it looks disabled + entry = api.Command['service_show'](self.service_princ)['result'] + assert(entry['has_keytab'] == False) + + def test_5_use_disabled(self): + """ + Try to use the disabled keytab + """ + try: + use_keytab(self.service_princ, self.keytabname) + except StandardError, errmsg: + assert('Unable to bind to LDAP. Error initializing principal' in str(errmsg)) + + def test_9_cleanup(self): + """ + Clean up test data + """ + # First create the host that will use this policy + os.unlink(self.keytabname) + api.Command['host_del'](self.host_fqdn) |