diff options
author | Petr Viktorin <pviktori@redhat.com> | 2014-12-18 10:22:24 +0100 |
---|---|---|
committer | Tomas Babej <tbabej@redhat.com> | 2015-01-14 11:40:28 +0100 |
commit | 07545569ecbdfdb1aeef6aa1878d827a7e5ddb38 (patch) | |
tree | ad799f3645052dc18d73f5ae622bb31f7989fe29 /ipatests/test_xmlrpc | |
parent | 8add23d4015be79699563829ba26a52c7d107ab9 (diff) | |
download | freeipa-07545569ecbdfdb1aeef6aa1878d827a7e5ddb38.tar.gz freeipa-07545569ecbdfdb1aeef6aa1878d827a7e5ddb38.tar.xz freeipa-07545569ecbdfdb1aeef6aa1878d827a7e5ddb38.zip |
test_host_plugin: Use HostTracker fixtures
The racker object "remembers" expected state across several tests,
so only changes (rather than all expected state) need to be specified
in each test. Also, the tracker fixture will make it easy to use hosts
in other test modules.
This change makes the tests independent; any permutation of any subset
of these tests should now pass.
Reviewed-By: Tomas Babej <tbabej@redhat.com>
Diffstat (limited to 'ipatests/test_xmlrpc')
-rw-r--r-- | ipatests/test_xmlrpc/conftest.py | 40 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/test_host_plugin.py | 1956 | ||||
-rw-r--r-- | ipatests/test_xmlrpc/xmlrpc_test.py | 29 |
3 files changed, 803 insertions, 1222 deletions
diff --git a/ipatests/test_xmlrpc/conftest.py b/ipatests/test_xmlrpc/conftest.py deleted file mode 100644 index 6f8552d28..000000000 --- a/ipatests/test_xmlrpc/conftest.py +++ /dev/null @@ -1,40 +0,0 @@ -# Authors: -# Petr Viktorin <pviktori@redhat.com> -# -# Copyright (C) 2008 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -""" -Per-directory test configuration -""" - -import pytest - -from ipapython.version import API_VERSION -from ipalib import api - - -@pytest.fixture -def command(request): - try: - default_version = request.cls.default_version - except AttributeError: - default_version = API_VERSION - - def cmd(cmd, *args, **options): - options.setdefault('version', default_version) - return api.Command[cmd](*args, **options) - return cmd diff --git a/ipatests/test_xmlrpc/test_host_plugin.py b/ipatests/test_xmlrpc/test_host_plugin.py index c0b406806..2b256772e 100644 --- a/ipatests/test_xmlrpc/test_host_plugin.py +++ b/ipatests/test_xmlrpc/test_host_plugin.py @@ -1,6 +1,7 @@ # Authors: # Rob Crittenden <rcritten@redhat.com> # Pavel Zuna <pzuna@redhat.com> +# Petr Viktorin <pviktori@redhat.com> # # Copyright (C) 2008, 2009 Red Hat # see file 'COPYING' for use and warranty information @@ -25,66 +26,35 @@ Test the `ipalib.plugins.host` module. import os import tempfile import base64 +import functools import pytest -from pytest_sourceorder import ordered from ipapython import ipautil from ipalib import api, errors, x509 -from ipalib.util import normalize_zone from ipapython.dn import DN from ipapython.dnsutil import DNSName -from ipatests.test_xmlrpc.xmlrpc_test import (RPCTest, XMLRPC_test, +from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test, fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer, fuzzy_hex, raises_exact) from ipatests.test_xmlrpc.test_user_plugin import get_group_dn from ipatests.test_xmlrpc import objectclasses from ipatests.test_xmlrpc.testcert import get_testcert from ipatests.util import assert_deepequal +from ipapython.version import API_VERSION -self_server_ns = normalize_zone(api.env.host) -self_server_ns_dnsname = DNSName(self_server_ns) - -fqdn1 = u'testhost1.%s' % api.env.domain -short1 = u'testhost1' -dn1 = DN(('fqdn',fqdn1),('cn','computers'),('cn','accounts'), - api.env.basedn) -service1 = u'dns/%s@%s' % (fqdn1, api.env.realm) -service1dn = DN(('krbprincipalname',service1.lower()),('cn','services'), - ('cn','accounts'),api.env.basedn) -fqdn2 = u'shouldnotexist.%s' % api.env.domain -dn2 = DN(('fqdn',fqdn2),('cn','computers'),('cn','accounts'), - api.env.basedn) -fqdn3 = u'testhost2.%s' % api.env.domain -short3 = u'testhost2' -dn3 = DN(('fqdn',fqdn3),('cn','computers'),('cn','accounts'), - api.env.basedn) -fqdn4 = u'testhost2.lab.%s' % api.env.domain -dn4 = DN(('fqdn',fqdn4),('cn','computers'),('cn','accounts'), - api.env.basedn) -invalidfqdn1 = u'foo_bar.lab.%s' % api.env.domain - -# DNS integration tests -dnszone = u'zone-ipv6host.test' +# Constants DNS integration tests +# TODO: Use tracker fixtures for zones/records/users/groups +dnszone = u'test-zone.test' dnszone_absolute = dnszone + '.' -dnszone_dnsname = DNSName(dnszone_absolute) dnszone_dn = DN(('idnsname', dnszone_absolute), api.env.container_dns, api.env.basedn) -dnszone_ns = u'ns1.%s' % dnszone_absolute -dnszone_ns_dnsname = DNSName(dnszone_ns) dnszone_rname = u'root.%s' % dnszone_absolute dnszone_rname_dnsname = DNSName(dnszone_rname) -dnszone_ip = u'172.16.29.1' revzone = u'29.16.172.in-addr.arpa.' -revzone_dnsname = DNSName(revzone) -revzone_ip = u'172.16.29.0' -revzone_ipprefix = u'172.16.29.' revzone_dn = DN(('idnsname', revzone), api.env.container_dns, api.env.basedn) revipv6zone = u'0.0.0.0.1.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.' -revipv6zone_dnsname = DNSName(revipv6zone) -revipv6zone_ip = u'2001:db8:1::' -revipv6zone_ipprefix = u'2001:db8:1:' revipv6zone_dn = DN(('idnsname', revipv6zone), api.env.container_dns, api.env.basedn) arec = u'172.16.29.22' @@ -93,50 +63,22 @@ aaaarec = u'2001:db8:1::beef' arec2 = u'172.16.29.33' aaaarec2 = u'2001:db8:1::dead' -ipv6only = u'testipv6onlyhost' -ipv6only_dnsname = DNSName(ipv6only) -ipv6only_dn = DN(('idnsname', ipv6only), dnszone_dn) -ipv6only_host_fqdn = u'%s.%s' % (ipv6only, dnszone) -ipv6only_host_dn = DN(('fqdn',ipv6only_host_fqdn),('cn','computers'),('cn','accounts'), - api.env.basedn) - -ipv4only = u'testipv4onlyhost' -ipv4only_dnsname = DNSName(ipv4only) -ipv4only_dn = DN(('idnsname', ipv4only), dnszone_dn) -ipv4only_host_fqdn = u'%s.%s' % (ipv4only, dnszone) -ipv4only_host_dn = DN(('fqdn',ipv4only_host_fqdn),('cn','computers'),('cn','accounts'), - api.env.basedn) - -ipv46both = u'testipv4and6host' -ipv46both_dnsname = DNSName(ipv46both) -ipv46both_dn = DN(('idnsname', ipv46both), dnszone_dn) -ipv46both_host_fqdn = u'%s.%s' % (ipv46both, dnszone) -ipv46both_host_dn = DN(('fqdn',ipv46both_host_fqdn),('cn','computers'),('cn','accounts'), - api.env.basedn) - -ipv4_fromip = u'withipv4addr' +ipv4_fromip = u'testipv4fromip' ipv4_fromip_ip = u'172.16.29.40' ipv4_fromip_arec = ipv4_fromip_ip ipv4_fromip_dnsname = DNSName(ipv4_fromip) ipv4_fromip_dn = DN(('idnsname', ipv4_fromip), dnszone_dn) ipv4_fromip_host_fqdn = u'%s.%s' % (ipv4_fromip, dnszone) -ipv4_fromip_host_dn = DN(('fqdn',ipv4_fromip_host_fqdn),('cn','computers'),('cn','accounts'), - api.env.basedn) ipv4_fromip_ptr = u'40' -ipv4_fromip_ptrrec = ipv4_fromip_host_fqdn + '.' ipv4_fromip_ptr_dnsname = DNSName(ipv4_fromip_ptr) ipv4_fromip_ptr_dn = DN(('idnsname', ipv4_fromip_ptr), revzone_dn) -ipv6_fromip = u'withipv6addr' +ipv6_fromip = u'testipv6fromip' ipv6_fromip_ipv6 = u'2001:db8:1::9' ipv6_fromip_aaaarec = ipv6_fromip_ipv6 ipv6_fromip_dnsname = DNSName(ipv6_fromip) ipv6_fromip_dn = DN(('idnsname', ipv6_fromip), dnszone_dn) -ipv6_fromip_host_fqdn = u'%s.%s' % (ipv6_fromip, dnszone) -ipv6_fromip_host_dn = DN(('fqdn',ipv6_fromip_host_fqdn),('cn','computers'),('cn','accounts'), - api.env.basedn) ipv6_fromip_ptr = u'9.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0' -ipv6_fromip_ptrrec = ipv6_fromip_host_fqdn + '.' ipv6_fromip_ptr_dnsname = DNSName(ipv6_fromip_ptr) ipv6_fromip_ptr_dn = DN(('idnsname', ipv6_fromip_ptr), revipv6zone_dn) @@ -154,542 +96,610 @@ hostgroup1_dn = DN(('cn',hostgroup1),('cn','hostgroups'),('cn','accounts'), api.env.basedn) -class TestNonexistentHost(RPCTest): - @classmethod - def clean_up(cls): - cls.clean('host_del', fqdn1) +class HostTracker(object): + """Wraps and tracks modifications to a Host object + + Stores a copy of state of a Host object, and allows checking that + the state in the database is the same as expected. + This allows creating independent tests: the individual tests check + that the relevant changes have been made. At the same time + the Host doesn't heet to be recreated and cleaned up for each test. + + Two attributes are used for tracking: ``exists`` (true if the Host is + supposed to exist) and ``attrs`` (a dict of LDAP attributes that are + expected to be returned from IPA commands). + + For commonly used operations, there is a helper method, e.g. + ``create``, ``update``, or ``find``, that does these steps: + + * ensure the Host exists (or does not exist, for "create") + * store the expected modifications + * get the IPA command to run, and run it + * check that the result matches the expected state + + Tests that require customization of these steps are expected to do them + manually, using lower-level methods. + Especially the first step (ensure the Host exists) is important for + achieving independent tests. + + The HostTracker object also stores information about the host, e.g. + ``fqdn`` and ``dn``. + """ + retrieve_keys = { + 'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host', + 'has_keytab', 'has_password', 'issuer', 'md5_fingerprint', + 'serial_number', 'serial_number_hex', 'sha1_fingerprint', + 'subject', 'usercertificate', 'valid_not_after', 'valid_not_before', + 'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user', + 'ipaallowedtoperform_read_keys_group', + 'ipaallowedtoperform_read_keys_host', + 'ipaallowedtoperform_read_keys_hostgroup', + 'ipaallowedtoperform_write_keys_user', + 'ipaallowedtoperform_write_keys_group', + 'ipaallowedtoperform_write_keys_host', + 'ipaallowedtoperform_write_keys_hostgroup'} + retrieve_all_keys = retrieve_keys | { + u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid', + u'managing_host', u'objectclass', u'serverhostname'} + create_keys = retrieve_keys | {'objectclass', 'ipauniqueid', + 'randompassword'} + update_keys = retrieve_keys - {'dn'} + managedby_keys = retrieve_keys - {'has_keytab', 'has_password'} + allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'} + + def __init__(self, name, fqdn=None, default_version=None): + self.api = api + self.default_version = default_version or API_VERSION + + self.shortname = name + if fqdn: + self.fqdn = fqdn + else: + self.fqdn = u'%s.%s' % (name, self.api.env.domain) + self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts', + self.api.env.basedn) + + self.description = u'Test host <%s>' % name + self.location = u'Undisclosed location <%s>' % name + + self.exists = False + + def filter_attrs(self, keys): + """Return a dict of expected attrs, filtered by the given keys""" + return {k: v for k, v in self.attrs.items() if k in keys} + + def run_command(self, name, *args, **options): + """Run the given IPA command + + Logs the command using print for easier debugging + """ + cmd = self.api.Command[name] - def test_retrieve_nonexistent(self, command): - with raises_exact(errors.NotFound( - reason=u'%s: host not found' % fqdn1)): - result = command('host_show', fqdn1) + options.setdefault('version', self.default_version) - def test_update_nonexistent(self, command): - with raises_exact(errors.NotFound( - reason=u'%s: host not found' % fqdn1)): - result = command('host_mod', fqdn1, description=u'Nope') + args_repr = ', '.join( + [repr(a) for a in args] + + ['%s=%r' % item for item in options.items()]) + try: + result = cmd(*args, **options) + except Exception as e: + print 'Ran command: %s(%s): %s: %s' % (cmd, args_repr, + type(e).__name__, e) + raise + else: + print 'Ran command: %s(%s): OK' % (cmd, args_repr) + return result + + def make_command(self, name, *args, **options): + """Make a functools.partial function to run the given command""" + return functools.partial(self.run_command, name, *args, **options) + + def make_fixture(self, request): + """Make a pytest fixture for this tracker + + The fixture ensures the host does not exist before and after the tests + that use it. + """ + del_command = self.make_delete_command() + try: + del_command() + except errors.NotFound: + pass - def test_delete_nonexistent(self, command): - with raises_exact(errors.NotFound( - reason=u'%s: host not found' % fqdn1)): - result = command('host_del', fqdn1) + def cleanup(): + existed = self.exists + try: + del_command() + except errors.NotFound: + if existed: + raise + self.exists = False + request.addfinalizer(cleanup) -@ordered -class TestHost(RPCTest): - @classmethod - def clean_up(cls): - cls.clean('host_del', fqdn1) + return self - def test_create_host(self, command): - result = command('host_add', fqdn1, - description=u'Test host 1', - l=u'Undisclosed location 1', - force=True) + def ensure_exists(self): + """If the host does not exist (according to tracker state), create it + """ + if not self.exists: + self.create(force=True) + + def ensure_missing(self): + """If the host exists (according to tracker state), delete it + """ + if self.exists: + self.delete() + + def make_create_command(self, force=True): + """Make function that creates this host using host_add""" + return self.make_command('host_add', self.fqdn, + description=self.description, + l=self.location, + force=force) + + def make_delete_command(self): + """Make function that deletes the host using host_del""" + return self.make_command('host_del', self.fqdn) + + def make_retrieve_command(self, all=False, raw=False): + """Make function that retrieves the host using host_show""" + return self.make_command('host_show', self.fqdn, all=all, raw=raw) + + def make_find_command(self, *args, **kwargs): + """Make function that finds hosts using host_find + + Note that the fqdn (or other search terms) needs to be specified + in arguments. + """ + return self.make_command('host_find', *args, **kwargs) + + def make_update_command(self, updates): + """Make function that modifies the host using host_mod""" + return self.make_command('host_mod', self.fqdn, **updates) + + def create(self, force=True): + """Helper function to create a host and check the result""" + self.ensure_missing() + self.track_create() + command = self.make_create_command(force=force) + result = command() + self.check_create(result) + + def track_create(self): + """Update expected state for host creation""" + self.attrs = dict( + dn=self.dn, + fqdn=[self.fqdn], + description=[self.description], + l=[self.location], + krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)], + objectclass=objectclasses.host, + ipauniqueid=[fuzzy_uuid], + managedby_host=[self.fqdn], + has_keytab=False, + has_password=False, + cn=[self.fqdn], + ipakrbokasdelegate=False, + ipakrbrequirespreauth=True, + managing_host=[self.fqdn], + serverhostname=[self.shortname], + ) + self.exists = True + + def check_create(self, result): + """Check `host_add` command result""" assert_deepequal(dict( - value=fqdn1, - summary=u'Added host "%s"' % fqdn1, - result=dict( - dn=dn1, - fqdn=[fqdn1], - description=[u'Test host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[fqdn1], - has_keytab=False, - has_password=False, - ), + value=self.fqdn, + summary=u'Added host "%s"' % self.fqdn, + result=self.filter_attrs(self.create_keys), ), result) - def test_create_duplicate(self, command): - with raises_exact(errors.DuplicateEntry( - message=u'host with name "%s" already exists' % fqdn1)): - result = command('host_add', fqdn1, - description=u'Test host 1', - l=u'Undisclosed location 1', - force=True) - - def test_retrieve(self, command): - result = command('host_show', fqdn1) + def delete(self): + """Helper function to delete a host and check the result""" + self.ensure_exists() + self.track_delete() + command = self.make_delete_command() + result = command() + self.check_delete(result) + + def track_delete(self): + """Update expected state for host deletion""" + self.exists = False + self.attrs = {} + + def check_delete(self, result): + """Check `host_del` command result""" assert_deepequal(dict( - value=fqdn1, - summary=None, - result=dict( - dn=dn1, - fqdn=[fqdn1], - description=[u'Test host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - has_keytab=False, - has_password=False, - managedby_host=[fqdn1], - ), + value=[self.fqdn], + summary=u'Deleted host "%s"' % self.fqdn, + result=dict(failed=[]), ), result) - def test_retrieve_all(self, command): - result = command('host_show', fqdn1, all=True) + def retrieve(self, all=False, raw=False): + """Helper function to retrieve a host and check the result""" + self.ensure_exists() + command = self.make_retrieve_command(all=all, raw=raw) + result = command() + self.check_retrieve(result, all=all, raw=raw) + + def check_retrieve(self, result, all=False, raw=False): + """Check `host_show` command result""" + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) assert_deepequal(dict( - value=fqdn1, + value=self.fqdn, summary=None, - result=dict( - dn=dn1, - cn=[fqdn1], - fqdn=[fqdn1], - description=[u'Test host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - serverhostname=[u'testhost1'], - objectclass=objectclasses.host, - managedby_host=[fqdn1], - managing_host=[fqdn1], - ipauniqueid=[fuzzy_uuid], - has_keytab=False, - has_password=False, - ipakrbokasdelegate=False, - ipakrbrequirespreauth=True, - ), + result=expected, ), result) - def test_search(self, command): - result = command('host_find', fqdn1) + def find(self, all=False, raw=False): + """Helper function to search for this hosts and check the result""" + self.ensure_exists() + command = self.make_find_command(self.fqdn, all=all, raw=raw) + result = command() + self.check_find(result, all=all, raw=raw) + + def check_find(self, result, all=False, raw=False): + """Check `host_find` command result""" + if all: + expected = self.filter_attrs(self.retrieve_all_keys) + else: + expected = self.filter_attrs(self.retrieve_keys) assert_deepequal(dict( count=1, truncated=False, summary=u'1 host matched', - result=[ - dict( - dn=dn1, - fqdn=[fqdn1], - description=[u'Test host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[u'%s' % fqdn1], - has_keytab=False, - has_password=False, - ), - ], + result=[expected], ), result) - def test_search_all(self, command): - result = command('host_find', fqdn1, all=True) - assert_deepequal(dict( - count=1, - truncated=False, - summary=u'1 host matched', - result=[ - dict( - dn=dn1, - cn=[fqdn1], - fqdn=[fqdn1], - description=[u'Test host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - serverhostname=[u'testhost1'], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[u'%s' % fqdn1], - managing_host=[u'%s' % fqdn1], - has_keytab=False, - has_password=False, - ipakrbokasdelegate=False, - ipakrbrequirespreauth=True, - ), - ] - ), result) + def update(self, updates, expected_updates=None): + """Helper function to update this hosts and check the result - def test_update(self, command): - result = command('host_mod', fqdn1, - description=u'Updated host 1', - usercertificate=get_testcert()) + The ``updates`` are used as options to the *_mod command, + and the self.attrs is updated with this dict. + Additionally, self.attrs is updated with ``expected_updates``. + """ + if expected_updates is None: + expected_updates = {} + + self.ensure_exists() + command = self.make_update_command(updates) + result = command() + self.attrs.update(updates) + self.attrs.update(expected_updates) + self.check_update(result, extra_keys=set(updates.keys()) | + set(expected_updates.keys())) + + def check_update(self, result, extra_keys=()): + """Check `host_update` command result""" assert_deepequal(dict( - value=fqdn1, - summary=u'Modified host "%s"' % fqdn1, - result=dict( - description=[u'Updated host 1'], - fqdn=[fqdn1], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[u'%s' % fqdn1], - usercertificate=[base64.b64decode(get_testcert())], - valid_not_before=fuzzy_date, - valid_not_after=fuzzy_date, - subject=DN(('CN', api.env.host), x509.subject_base()), - serial_number=fuzzy_digits, - serial_number_hex=fuzzy_hex, - md5_fingerprint=fuzzy_hash, - sha1_fingerprint=fuzzy_hash, - issuer=fuzzy_issuer, - has_keytab=False, - has_password=False, - ), + value=self.fqdn, + summary=u'Modified host "%s"' % self.fqdn, + result=self.filter_attrs(self.update_keys | set(extra_keys)) ), result) - def test_retrieve_2(self, command): - result = command('host_show', fqdn1) - assert_deepequal(dict( - value=fqdn1, - summary=None, - result=dict( - dn=dn1, - fqdn=[fqdn1], - description=[u'Updated host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - has_keytab=False, - has_password=False, - managedby_host=[u'%s' % fqdn1], - usercertificate=[base64.b64decode(get_testcert())], - valid_not_before=fuzzy_date, - valid_not_after=fuzzy_date, - subject=DN(('CN', api.env.host), x509.subject_base()), - serial_number=fuzzy_digits, - serial_number_hex=fuzzy_hex, - md5_fingerprint=fuzzy_hash, - sha1_fingerprint=fuzzy_hash, - issuer=fuzzy_issuer, - ), - ), result) - def test_try_rename(self, command): +@pytest.fixture(scope='class') +def host(request): + tracker = HostTracker(name=u'testhost1') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def host2(request): + tracker = HostTracker(name=u'testhost2') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def host3(request): + tracker = HostTracker(name=u'testhost3') + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def lab_host(request): + name = u'testhost1' + tracker = HostTracker(name=name, + fqdn=u'%s.lab.%s' % (name, api.env.domain)) + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def this_host(request): + """Fixture for the current master""" + tracker = HostTracker(name=api.env.host.partition('.')[0], + fqdn=api.env.host) + # This host is not created/deleted, so don't call make_fixture + tracker.exists = True + return tracker + + +@pytest.fixture(scope='class') +def invalid_host(request): + tracker = HostTracker(name='foo_bar',) + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def ipv6only_host(request): + name = u'testipv6onlyhost' + tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def ipv4only_host(request): + name = u'testipv4onlyhost' + tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def ipv46both_host(request): + name = u'testipv4and6host' + tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def ipv4_fromip_host(request): + name = u'testipv4fromip' + tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) + return tracker.make_fixture(request) + + +@pytest.fixture(scope='class') +def ipv6_fromip_host(request): + name = u'testipv6fromip' + tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) + return tracker.make_fixture(request) + + +class TestNonexistentHost(XMLRPC_test): + def test_retrieve_nonexistent(self, host): + host.ensure_missing() + command = host.make_retrieve_command() + with raises_exact(errors.NotFound( + reason=u'%s: host not found' % host.fqdn)): + command() + + def test_update_nonexistent(self, host): + host.ensure_missing() + command = host.make_update_command(updates=dict(description=u'Nope')) + with raises_exact(errors.NotFound( + reason=u'%s: host not found' % host.fqdn)): + command() + + def test_delete_nonexistent(self, host): + host.ensure_missing() + command = host.make_delete_command() + with raises_exact(errors.NotFound( + reason=u'%s: host not found' % host.fqdn)): + command() + + +class TestCRUD(XMLRPC_test): + def test_create_duplicate(self, host): + host.ensure_exists() + command = host.make_create_command(force=True) + with raises_exact(errors.DuplicateEntry( + message=u'host with name "%s" already exists' % host.fqdn)): + command() + + def test_retrieve_simple(self, host): + host.retrieve() + + def test_retrieve_all(self, host): + host.retrieve(all=True) + + def test_search_simple(self, host): + host.find() + + def test_search_all(self, host): + host.find(all=True) + + def test_update_simple(self, host): + host.update(dict( + description=u'Updated host 1', + usercertificate=get_testcert()), + expected_updates=dict( + description=[u'Updated host 1'], + usercertificate=[base64.b64decode(get_testcert())], + issuer=fuzzy_issuer, + md5_fingerprint=fuzzy_hash, + serial_number=fuzzy_digits, + serial_number_hex=fuzzy_hex, + sha1_fingerprint=fuzzy_hash, + subject=DN(('CN', api.env.host), x509.subject_base()), + valid_not_before=fuzzy_date, + valid_not_after=fuzzy_date, + )) + host.retrieve() + + def test_try_rename(self, host): + host.ensure_exists() + command = host.make_update_command( + updates=dict(setattr=u'fqdn=changed.example.com')) with raises_exact(errors.NotAllowedOnRDN()): - result = command('host_mod', fqdn1, - setattr=u'fqdn=changed.example.com') + command() - def test_add_mac_address(self, command): - result = command('host_mod', fqdn1, macaddress=u'00:50:56:30:F6:5F') - assert_deepequal(dict( - value=fqdn1, - summary=u'Modified host "%s"' % fqdn1, - result=dict( - description=[u'Updated host 1'], - fqdn=[fqdn1], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[u'%s' % fqdn1], - usercertificate=[base64.b64decode(get_testcert())], - valid_not_before=fuzzy_date, - valid_not_after=fuzzy_date, - subject=DN(('CN', api.env.host), x509.subject_base()), - serial_number=fuzzy_digits, - serial_number_hex=fuzzy_hex, - md5_fingerprint=fuzzy_hash, - sha1_fingerprint=fuzzy_hash, - macaddress=[u'00:50:56:30:F6:5F'], - issuer=fuzzy_issuer, - has_keytab=False, - has_password=False, - ), - ), result) + def test_add_mac_address(self, host): + host.update(dict(macaddress=u'00:50:56:30:F6:5F'), + expected_updates=dict(macaddress=[u'00:50:56:30:F6:5F'])) + host.retrieve() - def test_add_mac_addresses(self, command): - result = command('host_mod', fqdn1, - macaddress=[u'00:50:56:30:F6:5F', - u'00:50:56:2C:8D:82']) - assert_deepequal(dict( - value=fqdn1, - summary=u'Modified host "%s"' % fqdn1, - result=dict( - description=[u'Updated host 1'], - fqdn=[fqdn1], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[u'%s' % fqdn1], - usercertificate=[base64.b64decode(get_testcert())], - valid_not_before=fuzzy_date, - valid_not_after=fuzzy_date, - subject=DN(('CN', api.env.host), x509.subject_base()), - serial_number=fuzzy_digits, - serial_number_hex=fuzzy_hex, - md5_fingerprint=fuzzy_hash, - sha1_fingerprint=fuzzy_hash, - macaddress=[u'00:50:56:30:F6:5F', u'00:50:56:2C:8D:82'], - issuer=fuzzy_issuer, - has_keytab=False, - has_password=False, - ), - ), result) + def test_add_mac_addresses(self, host): + host.update(dict(macaddress=[u'00:50:56:30:F6:5F', + u'00:50:56:2C:8D:82'])) + host.retrieve() - def test_try_illegal_mac(self, command): + def test_try_illegal_mac(self, host): + command = host.make_update_command( + updates=dict(macaddress=[u'xx'])) with raises_exact(errors.ValidationError( name='macaddress', error=u'Must be of the form HH:HH:HH:HH:HH:HH, where ' + u'each H is a hexadecimal character.')): - result = command('host_mod', fqdn1, macaddress=[u'xx']) - - def test_add_ssh_pubkey(self, command): - result = command('host_mod', fqdn1, - ipasshpubkey=[sshpubkey]) - assert_deepequal(dict( - value=fqdn1, - summary=u'Modified host "%s"' % fqdn1, - result=dict( - description=[u'Updated host 1'], - fqdn=[fqdn1], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[u'%s' % fqdn1], - usercertificate=[base64.b64decode(get_testcert())], - valid_not_before=fuzzy_date, - valid_not_after=fuzzy_date, - subject=DN(('CN', api.env.host), x509.subject_base()), - serial_number=fuzzy_digits, - serial_number_hex=fuzzy_hex, - md5_fingerprint=fuzzy_hash, - sha1_fingerprint=fuzzy_hash, - issuer=fuzzy_issuer, - macaddress=[u'00:50:56:30:F6:5F', u'00:50:56:2C:8D:82'], - ipasshpubkey=[sshpubkey], - sshpubkeyfp=[sshpubkeyfp], - has_keytab=False, - has_password=False, - ), - ), result) - - def test_try_illegal_ssh_pubkey(self, command): + command() + + def test_add_ssh_pubkey(self, host): + host.update(dict(ipasshpubkey=[sshpubkey]), + expected_updates=dict( + ipasshpubkey=[sshpubkey], + sshpubkeyfp=[sshpubkeyfp], + )) + host.retrieve() + + def test_try_illegal_ssh_pubkey(self, host): + host.ensure_exists() + command = host.make_update_command( + updates=dict(ipasshpubkey=[u'no-pty %s' % sshpubkey])) with raises_exact(errors.ValidationError( name='sshpubkey', error=u'options are not allowed')): - result = command('host_mod', fqdn1, - ipasshpubkey=[u'no-pty %s' % sshpubkey]) + command() - def test_delete_host(self, command): - result = command('host_del', fqdn1) - assert_deepequal(dict( - value=[fqdn1], - summary=u'Deleted host "%s"' % fqdn1, - result=dict(failed=[]), - ), result) + def test_delete_host(self, host): + host.delete() - def test_retrieve_nonexistent_2(self, command): + def test_retrieve_nonexistent(self, host): + host.ensure_missing() + command = host.make_retrieve_command() with raises_exact(errors.NotFound( - reason=u'%s: host not found' % fqdn1)): - result = command('host_show', fqdn1) + reason=u'%s: host not found' % host.fqdn)): + command() - def test_update_nonexistent_2(self, command): + def test_update_nonexistent(self, host): + host.ensure_missing() + command = host.make_update_command( + updates=dict(description=u'Nope')) with raises_exact(errors.NotFound( - reason=u'%s: host not found' % fqdn1)): - result = command('host_mod', fqdn1, description=u'Nope') + reason=u'%s: host not found' % host.fqdn)): + command() - def test_delete_nonexistent_2(self, command): + def test_delete_nonexistent(self, host): + host.ensure_missing() + command = host.make_delete_command() with raises_exact(errors.NotFound( - reason=u'%s: host not found' % fqdn1)): - result = command('host_del', fqdn1) + reason=u'%s: host not found' % host.fqdn)): + command() + def test_try_add_not_in_dns(self, host): + host.ensure_missing() + command = host.make_create_command(force=False) + with raises_exact(errors.DNSNotARecordError( + reason=u'Host does not have corresponding DNS A/AAAA record')): + command() -@ordered -class TestHostWithService(RPCTest): - @classmethod - def clean_up(cls): - cls.clean('host_del', fqdn1) - cls.clean('service_del', service1) + def test_add_host_with_null_password(self, host): + host.ensure_missing() + command = host.make_create_command() + result = command(userpassword=None) + host.track_create() + host.check_create(result) - # Test deletion using a non-fully-qualified hostname. Services - # associated with this host should also be removed. - def test_recreate_host(self, command): - result = command('host_add', fqdn1, - description=u'Test host 1', - l=u'Undisclosed location 1', - force=True) - assert_deepequal(dict( - value=fqdn1, - summary=u'Added host "%s"' % fqdn1, - result=dict( - dn=dn1, - fqdn=[fqdn1], - description=[u'Test host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[fqdn1], - has_keytab=False, - has_password=False, - ), - ), result) +class TestMultipleMatches(XMLRPC_test): + def test_try_show_multiple_matches_with_shortname(self, host, lab_host): + host.ensure_exists() + lab_host.ensure_exists() + assert host.shortname == lab_host.shortname + command = host.make_command('host_show', host.shortname) + with pytest.raises(errors.SingleMatchExpected): + command() - def test_add_service_to_host(self, command): - result = command('service_add', service1, force=True) - assert_deepequal(dict( - value=service1, - summary=u'Added service "%s"' % service1, - result=dict( - dn=service1dn, - krbprincipalname=[service1], - objectclass=objectclasses.service, - managedby_host=[fqdn1], - ipauniqueid=[fuzzy_uuid], - ), - ), result) - def test_delete_using_hostname(self, command): - result = command('host_del', short1) - assert_deepequal(dict( - value=[short1], - summary=u'Deleted host "%s"' % short1, - result=dict(failed=[]), - ), result) +class TestHostWithService(XMLRPC_test): + """Test deletion using a non-fully-qualified hostname. + Services associated with this host should also be removed. + """ - def test_try_find_services(self, command): - result = command('service_find', fqdn1) - assert_deepequal(dict( - count=0, - truncated=False, - summary=u'0 services matched', - result=[], - ), result) + # TODO: Use a service tracker, when available + def test_host_with_service(self, host): + host.ensure_exists() -@ordered -class TestAddHostNotInDNS(RPCTest): - @classmethod - def clean_up(cls): - cls.clean('host_del', fqdn2) + service1 = u'dns/%s@%s' % (host.fqdn, host.api.env.realm) + service1dn = DN(('krbprincipalname', service1.lower()), + ('cn','services'), ('cn','accounts'), + host.api.env.basedn) - def test_try_add_not_in_dns(self, command): - with raises_exact(errors.DNSNotARecordError( - reason=u'Host does not have corresponding DNS A/AAAA record')): - result = command('host_add', fqdn2) - - def test_add_not_in_dns(self, command): - result = command('host_add', fqdn2, - description=u'Test host 2', - l=u'Undisclosed location 2', - userclass=[u'webserver', u'mailserver'], - force=True) - assert_deepequal(dict( - value=fqdn2, - summary=u'Added host "%s"' % fqdn2, + try: + result = host.run_command('service_add', service1, force=True) + assert_deepequal(dict( + value=service1, + summary=u'Added service "%s"' % service1, result=dict( - dn=dn2, - fqdn=[fqdn2], - description=[u'Test host 2'], - l=[u'Undisclosed location 2'], - krbprincipalname=[u'host/%s@%s' % (fqdn2, api.env.realm)], - objectclass=objectclasses.host, + dn=service1dn, + krbprincipalname=[service1], + objectclass=objectclasses.service, + managedby_host=[host.fqdn], ipauniqueid=[fuzzy_uuid], - managedby_host=[fqdn2], - userclass=[u'webserver', u'mailserver'], - has_keytab=False, - has_password=False, ), - ), result) + ), result) + host.delete() -@ordered -class TestManagedHosts(RPCTest): - @classmethod - def clean_up(cls): - cls.clean('host_del', fqdn1, fqdn2, fqdn3, fqdn4, **{'continue': True}) - cls.clean('service_del', service1) + result = host.run_command('service_find', host.fqdn) + assert_deepequal(dict( + count=0, + truncated=False, + summary=u'0 services matched', + result=[], + ), result) + finally: + try: + host.run_command('service_del', service1) + except errors.NotFound: + pass - def test_create_host(self, command): - result = command('host_add', fqdn1, - description=u'Test host 1', - l=u'Undisclosed location 1', - force=True) - assert_deepequal(dict( - value=fqdn1, - summary=u'Added host "%s"' % fqdn1, - result=dict( - dn=dn1, - fqdn=[fqdn1], - description=[u'Test host 1'], - l=[u'Undisclosed location 1'], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[fqdn1], - has_keytab=False, - has_password=False, - ), - ), result) - @pytest.mark.parametrize(['fqdn', 'dn', 'n'], - [(fqdn3, dn3, 2), (fqdn4, dn4, 4)]) - def test_create_more(self, command, fqdn, dn, n): - result = command('host_add', fqdn, - description=u'Test host %s' % n, - l=u'Undisclosed location %s' % n, - force=True) - assert_deepequal(dict( - value=fqdn, - summary=u'Added host "%s"' % fqdn, - result=dict( - dn=dn, - fqdn=[fqdn], - description=[u'Test host %s' % n], - l=[u'Undisclosed location %s' % n], - krbprincipalname=[u'host/%s@%s' % (fqdn, api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[u'%s' % fqdn], - has_keytab=False, - has_password=False, - ), - ), result) +class TestManagedHosts(XMLRPC_test): + def test_managed_hosts(self, host, host2, host3): + host.ensure_exists() + host2.ensure_exists() + host3.ensure_exists() - def test_add_managed_host(self, command): - result = command('host_add_managedby', fqdn3, host=fqdn1) - assert_deepequal(dict( - completed=1, - failed=dict( - managedby=dict( - host=tuple(), - ), - ), - result=dict( - dn=dn3, - fqdn=[fqdn3], - description=[u'Test host 2'], - l=[u'Undisclosed location 2'], - krbprincipalname=[u'host/%s@%s' % (fqdn3, api.env.realm)], - managedby_host=[u'%s' % fqdn3, u'%s' % fqdn1], - ), - ), result) + self.add_managed_host(host, host2) + host2.retrieve() + + self.search_man_noman_hosts(host2, host) + self.search_man_hosts(host2, host3) + + self.remove_man_hosts(host, host2) + host.retrieve() + host2.retrieve() + + def add_managed_host(self, manager, underling): + command = manager.make_command('host_add_managedby', + underling.fqdn, host=manager.fqdn) + result = command() + underling.attrs['managedby_host'] = [manager.fqdn, underling.fqdn] - def test_show_managed_host(self, command): - result = command('host_show', fqdn3) assert_deepequal(dict( - value=fqdn3, - summary=None, - result=dict( - dn=dn3, - fqdn=[fqdn3], - description=[u'Test host 2'], - l=[u'Undisclosed location 2'], - krbprincipalname=[u'host/%s@%s' % (fqdn3, api.env.realm)], - has_keytab=False, - has_password=False, - managedby_host=[u'%s' % fqdn3, u'%s' % fqdn1], - ), + completed=1, + failed={'managedby': {'host': ()}}, + result=underling.filter_attrs(underling.managedby_keys), ), result) - def test_search_man_noman_hosts(self, command): - result = command('host_find', fqdn3, - man_host=fqdn3, - not_man_host=fqdn1) + def search_man_noman_hosts(self, host, noman_host): + command = host.make_find_command(host.fqdn, + man_host=host.fqdn, + not_man_host=noman_host.fqdn) + result = command() assert_deepequal(dict( count=1, truncated=False, summary=u'1 host matched', - result=[ - dict( - dn=dn3, - fqdn=[fqdn3], - description=[u'Test host 2'], - l=[u'Undisclosed location 2'], - krbprincipalname=[u'host/%s@%s' % (fqdn3, api.env.realm)], - has_keytab=False, - has_password=False, - managedby_host=[u'%s' % fqdn3, u'%s' % fqdn1], - ), - ], + result=[host.filter_attrs(host.retrieve_keys)], ), result) - def test_search_man_hosts(self, command): - result = command('host_find', man_host=[fqdn3, fqdn4]) + def search_man_hosts(self, host1, host2): + command = host1.make_find_command(man_host=[host1.fqdn, host2.fqdn]) + result = command() assert_deepequal(dict( count=0, truncated=False, @@ -697,145 +707,69 @@ class TestManagedHosts(RPCTest): result=[], ), result) - def test_remove_man_hosts(self, command): - result = command('host_remove_managedby', fqdn3, host=u'%s' % fqdn1) - assert_deepequal(dict( - completed=1, - failed=dict( - managedby=dict( - host=tuple(), - ), - ), - result=dict( - dn=dn3, - fqdn=[fqdn3], - description=[u'Test host 2'], - l=[u'Undisclosed location 2'], - krbprincipalname=[u'host/%s@%s' % (fqdn3, api.env.realm)], - managedby_host=[u'%s' % fqdn3], - ), - ), result) + def remove_man_hosts(self, manager, underling): + command = manager.make_command('host_remove_managedby', + underling.fqdn, host=manager.fqdn) + result = command() - def test_try_show_multiple_matches(self, command): - with raises_exact(errors.SingleMatchExpected(found=2)): - result = command('host_show', short3) + underling.attrs['managedby_host'] = [underling.fqdn] - # --- - - def test_add_managed_host_2(self, command): - result = command('host_add_managedby', fqdn4, host=fqdn3) assert_deepequal(dict( completed=1, - failed=dict( - managedby=dict( - host=tuple(), - ), - ), - result=dict( - dn=dn4, - fqdn=[fqdn4], - description=[u'Test host 4'], - l=[u'Undisclosed location 4'], - krbprincipalname=[u'host/%s@%s' % (fqdn4, api.env.realm)], - managedby_host=[fqdn4, fqdn3], - ), - ), result) - - def test_delete_managed_host_2(self, command): - result = command('host_del', fqdn3) - assert_deepequal(dict( - value=[fqdn3], - summary=u'Deleted host "%s"' % fqdn3, - result=dict(failed=[]), - ), result) - - def test_retrieve_managed_host_2(self, command): - result = command('host_show', fqdn4) - assert_deepequal(dict( - value=fqdn4, - summary=None, - result=dict( - dn=dn4, - fqdn=[fqdn4], - description=[u'Test host 4'], - l=[u'Undisclosed location 4'], - krbprincipalname=[u'host/%s@%s' % (fqdn4, api.env.realm)], - has_keytab=False, - has_password=False, - managedby_host=[fqdn4], - ), - ), result) - - -class TestAddWithNullPassword(RPCTest): - @classmethod - def clean_up(cls): - cls.clean('host_del', fqdn3) - - def test_add_host_with_null_password(self, command): - result = command('host_add', fqdn3, - description=u'Test host 3', - force=True, - userpassword=None) - assert_deepequal(dict( - value=fqdn3, - summary=u'Added host "%s"' % fqdn3, - result=dict( - dn=dn3, - fqdn=[fqdn3], - description=[u'Test host 3'], - krbprincipalname=[u'host/%s@%s' % (fqdn3, api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[u'%s' % fqdn3], - has_keytab=False, - has_password=False, - ), + failed={'managedby': {'host': ()}}, + result=underling.filter_attrs(underling.managedby_keys), ), result) -class TestProtectedMaster(RPCTest): - def test_try_delete_master(self, command): +class TestProtectedMaster(XMLRPC_test): + def test_try_delete_master(self, this_host): + command = this_host.make_delete_command() with raises_exact(errors.ValidationError( name='hostname', error=u'An IPA master host cannot be deleted or disabled')): - result = command('host_del', api.env.host) + command() - def test_try_disable_master(self, command): + def test_try_disable_master(self, this_host): + command = this_host.make_command('host_disable', this_host.fqdn) with raises_exact(errors.ValidationError( name='hostname', error=u'An IPA master host cannot be deleted or disabled')): - result = command('host_disable', api.env.host) + command() -class TestValidation(RPCTest): - def test_try_validate_add(self, command): +class TestValidation(XMLRPC_test): + def test_try_validate_create(self, invalid_host): + command = invalid_host.make_create_command() with raises_exact(errors.ValidationError( name='hostname', error=u"invalid domain-name: only letters, numbers, '-' are " + u"allowed. DNS label may not start or end with '-'")): - result = command('host_add', invalidfqdn1) + command() # The assumption on these next 4 tests is that if we don't get a # validation error then the request was processed normally. - def test_try_validate_mod(self, command): + def test_try_validate_update(self, invalid_host): + command = invalid_host.make_update_command({}) with raises_exact(errors.NotFound( - reason=u'%s: host not found' % invalidfqdn1)): - result = command('host_mod', invalidfqdn1) + reason=u'%s: host not found' % invalid_host.fqdn)): + command() - def test_try_validate_del(self, command): + def test_try_validate_delete(self, invalid_host): + command = invalid_host.make_delete_command() with raises_exact(errors.NotFound( - reason=u'%s: host not found' % invalidfqdn1)): - result = command('host_del', invalidfqdn1) + reason=u'%s: host not found' % invalid_host.fqdn)): + command() - def test_try_validate_show(self, command): + def test_try_validate_retrieve(self, invalid_host): + command = invalid_host.make_retrieve_command() with raises_exact(errors.NotFound( - reason=u'%s: host not found' % invalidfqdn1)): - result = command('host_show', invalidfqdn1) + reason=u'%s: host not found' % invalid_host.fqdn)): + command() - def test_try_validate_find(self, command): - result = command('host_find', invalidfqdn1) + def test_try_validate_find(self, invalid_host): + command = invalid_host.make_find_command(invalid_host.fqdn) + result = command() assert_deepequal(dict( count=0, truncated=False, @@ -844,311 +778,143 @@ class TestValidation(RPCTest): ), result) -@ordered -class TestHostFalsePwdChange(XMLRPC_test): - - fqdn1 = u'testhost1.%s' % api.env.domain - short1 = u'testhost1' - new_pass = u'pass_123' - command = "ipa-client/ipa-join" - - @classmethod - def setup_class(cls): - super(TestHostFalsePwdChange, cls).setup_class() - [cls.keytabfd,cls.keytabname] = tempfile.mkstemp() - os.close(cls.keytabfd) +@pytest.yield_fixture +def keytabname(request): + keytabfd, keytabname = tempfile.mkstemp() + try: + os.close(keytabfd) + yield keytabname + finally: + os.unlink(keytabname) - does_command_exist = os.path.isfile(cls.command) - if not does_command_exist: - pytest.skip("Command '%s' not found" % cls.command) - - # auxiliary function for checking whether the join operation has set - # correct attributes - def host_joined(self): - ret = api.Command['host_show'](self.fqdn1, all=True) - assert (ret['result']['has_keytab'] == True) - assert (ret['result']['has_password'] == False) +class TestHostFalsePwdChange(XMLRPC_test): - def test_a_join_host(self): + def test_join_host(self, host, keytabname): """ - Create a test host and join him into IPA. + Create a test host and join it into IPA. """ + join_command = 'ipa-client/ipa-join' + if not os.path.isfile(join_command): + pytest.skip("Command '%s' not found" % join_command) + # create a test host with bulk enrollment password - host = api.Command['host_add'](self.fqdn1, random=True, force=True) - random_pass = host['result']['randompassword'] + host.track_create() + del host.attrs['krbprincipalname'] + host.attrs['has_password'] = True + objclass = list(set( + host.attrs['objectclass']) - {u'krbprincipal', u'krbprincipalaux'}) + host.attrs['objectclass'] = objclass + + command = host.make_create_command(force=True) + result = command(random=True) + random_pass = result['result']['randompassword'] + + host.attrs['randompassword'] = random_pass + host.check_create(result) + + del host.attrs['randompassword'] # joint the host with the bulk password - new_args = [self.command, - "-s", api.env.host, - "-h", self.fqdn1, - "-k", self.keytabname, - "-w", random_pass, - "-q", - ] + new_args = [ + join_command, + "-s", host.api.env.host, + "-h", host.fqdn, + "-k", keytabname, + "-w", random_pass, + "-q", + ] + try: + out, err, rc = ipautil.run(new_args) + except ipautil.CalledProcessError as e: # join operation may fail on 'adding key into keytab', but # the keytab is not necessary for further tests - (out, err, rc) = ipautil.run(new_args, None) - except ipautil.CalledProcessError as e: - pass - finally: - self.host_joined() + print e - def test_b_try_password(self): - """ - Try to change the password of enrolled host with specified password - """ + host.attrs['has_keytab'] = True + host.attrs['has_password'] = False + host.attrs['krbprincipalname'] = [u'host/%s@%s' % (host.fqdn, + host.api.env.realm)] + host.retrieve() + + # Try to change the password of enrolled host with specified password + command = host.make_update_command( + updates=dict(userpassword=u'pass_123')) with pytest.raises(errors.ValidationError): - api.Command['host_mod'](self.fqdn1, userpassword=self.new_pass) + command() - def test_c_try_random(self): - """ - Try to change the password of enrolled host with random password - """ + # Try to change the password of enrolled host with random password + command = host.make_update_command(updates=dict(random=True)) with pytest.raises(errors.ValidationError): - api.Command['host_mod'](self.fqdn1, random=True) + command() - def test_d_cleanup(self): - """ - Clean up test data - """ - os.unlink(self.keytabname) - api.Command['host_del'](self.fqdn1) - # verify that it's gone - with pytest.raises(errors.NotFound): - api.Command['host_show'](self.fqdn1) - - -@ordered -class TestHostDNS(RPCTest): - - @classmethod - def clean_up(cls): - cls.clean('host_del', - ipv6only_host_fqdn, ipv4only_host_fqdn, ipv46both_host_fqdn, - ipv4_fromip_host_fqdn, ipv6_fromip_host_fqdn, - **{'continue': True}) - cls.clean('dnszone_del', dnszone, revzone, revipv6zone, - **{'continue': True}) - - def test_create_zone(self, command): - result = command('dnszone_add', dnszone, idnssoarname=dnszone_rname) - assert_deepequal({ - 'value': dnszone_dnsname, - 'summary': None, - 'result': { - 'dn': dnszone_dn, - 'idnsname': [dnszone_dnsname], - 'idnszoneactive': [u'TRUE'], - 'idnssoamname': [self_server_ns_dnsname], - 'nsrecord': lambda x: True, - 'idnssoarname': [dnszone_rname_dnsname], - 'idnssoaserial': [fuzzy_digits], - 'idnssoarefresh': [fuzzy_digits], - 'idnssoaretry': [fuzzy_digits], - 'idnssoaexpire': [fuzzy_digits], - 'idnssoaminimum': [fuzzy_digits], - 'idnsallowdynupdate': [u'FALSE'], - 'idnsupdatepolicy': [u'grant %(realm)s krb5-self * A; ' - u'grant %(realm)s krb5-self * AAAA; ' - u'grant %(realm)s krb5-self * SSHFP;' % - dict(realm=api.env.realm)], - 'idnsallowtransfer': [u'none;'], - 'idnsallowquery': [u'any;'], - 'objectclass': objectclasses.dnszone, - }, - }, result) - - def test_create_reverse_zone(self, command): - result = command('dnszone_add', revzone, idnssoarname=dnszone_rname) - assert_deepequal({ - 'value': revzone_dnsname, - 'summary': None, - 'result': { - 'dn': revzone_dn, - 'idnsname': [revzone_dnsname], - 'idnszoneactive': [u'TRUE'], - 'idnssoamname': [self_server_ns_dnsname], - 'nsrecord': lambda x: True, - 'idnssoarname': [dnszone_rname_dnsname], - 'idnssoaserial': [fuzzy_digits], - 'idnssoarefresh': [fuzzy_digits], - 'idnssoaretry': [fuzzy_digits], - 'idnssoaexpire': [fuzzy_digits], - 'idnssoaminimum': [fuzzy_digits], - 'idnsallowdynupdate': [u'FALSE'], - 'idnsupdatepolicy': [ - u'grant %(realm)s krb5-subdomain %(zone)s PTR;' % - dict(realm=api.env.realm, zone=revzone)], - 'idnsallowtransfer': [u'none;'], - 'idnsallowquery': [u'any;'], - 'objectclass': objectclasses.dnszone, - }, - }, result) - - def test_create_ipv6_reverse_zone(self, command): - result = command('dnszone_add', revipv6zone, - idnssoarname=dnszone_rname) - assert_deepequal({ - 'value': revipv6zone_dnsname, - 'summary': None, - 'result': { - 'dn': revipv6zone_dn, - 'idnsname': [revipv6zone_dnsname], - 'idnszoneactive': [u'TRUE'], - 'idnssoamname': [self_server_ns_dnsname], - 'nsrecord': lambda x: True, - 'idnssoarname': [dnszone_rname_dnsname], - 'idnssoaserial': [fuzzy_digits], - 'idnssoarefresh': [fuzzy_digits], - 'idnssoaretry': [fuzzy_digits], - 'idnssoaexpire': [fuzzy_digits], - 'idnssoaminimum': [fuzzy_digits], - 'idnsallowdynupdate': [u'FALSE'], - 'idnsupdatepolicy': [ - u'grant %(realm)s krb5-subdomain %(zone)s PTR;' % - dict(realm=api.env.realm, zone=revipv6zone)], - 'idnsallowtransfer': [u'none;'], - 'idnsallowquery': [u'any;'], - 'objectclass': objectclasses.dnszone, - }, - }, result) - - def test_add_ipv6only_a_record(self, command): - result = command('dnsrecord_add', dnszone, ipv6only, - aaaarecord=aaaarec) - assert_deepequal({ - 'value': ipv6only_dnsname, - 'summary': None, - 'result': { - 'dn': ipv6only_dn, - 'idnsname': [ipv6only_dnsname], - 'aaaarecord': [aaaarec], - 'objectclass': objectclasses.dnsrecord, - }, - }, result) - - def test_add_ipv4only_a_record(self, command): - result = command('dnsrecord_add', dnszone, ipv4only, arecord=arec) - assert_deepequal({ - 'value': ipv4only_dnsname, - 'summary': None, - 'result': { - 'dn': ipv4only_dn, - 'idnsname': [ipv4only_dnsname], - 'arecord': [arec], - 'objectclass': objectclasses.dnsrecord, - }, - }, result) - - def test_add_ipv46both_aaaa_records(self, command): - result = command('dnsrecord_add', dnszone, ipv46both, - arecord=arec2, aaaarecord=aaaarec) - assert_deepequal({ - 'value': ipv46both_dnsname, - 'summary': None, - 'result': { - 'dn': ipv46both_dn, - 'idnsname': [ipv46both_dnsname], - 'arecord': [arec2], - 'aaaarecord': [aaaarec], - 'objectclass': objectclasses.dnsrecord, - }, - }, result) - - def test_add_ipv6only_host(self, command): - result = command('host_add', ipv6only_host_fqdn, - description=u'Test host 5', - l=u'Undisclosed location 5') - assert_deepequal(dict( - value=ipv6only_host_fqdn, - summary=u'Added host "%s"' % ipv6only_host_fqdn, - result=dict( - dn=ipv6only_host_dn, - fqdn=[ipv6only_host_fqdn], - description=[u'Test host 5'], - l=[u'Undisclosed location 5'], - krbprincipalname=[u'host/%s@%s' % (ipv6only_host_fqdn, - api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[ipv6only_host_fqdn], - has_keytab=False, - has_password=False, - ), - ), result) - def test_add_ipv4only_host(self, command): - result = command('host_add', ipv4only_host_fqdn, - description=u'Test host 6', - l=u'Undisclosed location 6') - assert_deepequal(dict( - value=ipv4only_host_fqdn, - summary=u'Added host "%s"' % ipv4only_host_fqdn, - result=dict( - dn=ipv4only_host_dn, - fqdn=[ipv4only_host_fqdn], - description=[u'Test host 6'], - l=[u'Undisclosed location 6'], - krbprincipalname=[u'host/%s@%s' % (ipv4only_host_fqdn, - api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[ipv4only_host_fqdn], - has_keytab=False, - has_password=False, - ), - ), result) +@pytest.yield_fixture(scope='class') +def dns_setup(host): + try: + host.run_command('dnszone_del', dnszone, revzone, revipv6zone, + **{'continue': True}) + except (errors.NotFound, errors.EmptyModlist): + pass - def test_add_ipv46both_host(self, command): - result = command('host_add', ipv46both_host_fqdn, - description=u'Test host 7', - l=u'Undisclosed location 7') - assert_deepequal(dict( - value=ipv46both_host_fqdn, - summary=u'Added host "%s"' % ipv46both_host_fqdn, - result=dict( - dn=ipv46both_host_dn, - fqdn=[ipv46both_host_fqdn], - description=[u'Test host 7'], - l=[u'Undisclosed location 7'], - krbprincipalname=[u'host/%s@%s' % (ipv46both_host_fqdn, - api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[ipv46both_host_fqdn], - has_keytab=False, - has_password=False, - ), - ), result) + try: + host.run_command('dnszone_add', dnszone, idnssoarname=dnszone_rname) + host.run_command('dnszone_add', revzone, idnssoarname=dnszone_rname) + host.run_command('dnszone_add', revipv6zone, + idnssoarname=dnszone_rname) + yield + finally: + try: + host.run_command('dnszone_del', dnszone, revzone, revipv6zone, + **{'continue': True}) + except (errors.NotFound, errors.EmptyModlist): + pass - def test_add_ipv4_host_from_ip(self, command): - result = command('host_add', ipv4_fromip_host_fqdn, - description=u'Test host 8', - l=u'Undisclosed location 8', - ip_address=ipv4_fromip_ip) - assert_deepequal(dict( - value=ipv4_fromip_host_fqdn, - summary=u'Added host "%s"' % ipv4_fromip_host_fqdn, - result=dict( - dn=ipv4_fromip_host_dn, - fqdn=[ipv4_fromip_host_fqdn], - description=[u'Test host 8'], - l=[u'Undisclosed location 8'], - krbprincipalname=[u'host/%s@%s' % (ipv4_fromip_host_fqdn, - api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[ipv4_fromip_host_fqdn], - has_keytab=False, - has_password=False, - ), - ), result) - def test_ipv4_a_record_created(self, command): - result = command('dnsrecord_show', dnszone, ipv4_fromip) +class TestHostDNS(XMLRPC_test): + def test_add_ipv6only_host(self, dns_setup, ipv6only_host): + ipv6only_host.run_command('dnsrecord_add', dnszone, + ipv6only_host.shortname, aaaarecord=aaaarec) + try: + ipv6only_host.create(force=False) + finally: + command = ipv6only_host.run_command('dnsrecord_del', dnszone, + ipv6only_host.shortname, + aaaarecord=aaaarec) + + def test_add_ipv4only_host(self, dns_setup, ipv4only_host): + ipv4only_host.run_command('dnsrecord_add', dnszone, + ipv4only_host.shortname, arecord=arec) + try: + ipv4only_host.create(force=False) + finally: + command = ipv4only_host.run_command('dnsrecord_del', dnszone, + ipv4only_host.shortname, + arecord=arec) + + def test_add_ipv46both_host(self, dns_setup, ipv46both_host): + ipv46both_host.run_command('dnsrecord_add', dnszone, + ipv46both_host.shortname, + arecord=arec2, aaaarecord=aaaarec2) + try: + ipv46both_host.create(force=False) + finally: + command = ipv46both_host.run_command('dnsrecord_del', dnszone, + ipv46both_host.shortname, + arecord=arec2, + aaaarecord=aaaarec2) + + def test_add_ipv4_host_from_ip(self, dns_setup, ipv4_fromip_host): + ipv4_fromip_host.ensure_missing() + ipv4_fromip_host.track_create() + command = ipv4_fromip_host.make_create_command(force=False) + result = command(ip_address=ipv4_fromip_ip) + ipv4_fromip_host.check_create(result) + + result = ipv4_fromip_host.run_command('dnsrecord_show', dnszone, + ipv4_fromip_host.shortname) assert_deepequal(dict( value=ipv4_fromip_dnsname, summary=None, @@ -1159,43 +925,27 @@ class TestHostDNS(RPCTest): ), ), result) - def test_ipv4_ptr_record_created(self, command): - result = command('dnsrecord_show', revzone, ipv4_fromip_ptr) + result = ipv4_fromip_host.run_command('dnsrecord_show', revzone, + ipv4_fromip_ptr) assert_deepequal(dict( value=ipv4_fromip_ptr_dnsname, summary=None, result=dict( dn=ipv4_fromip_ptr_dn, idnsname=[ipv4_fromip_ptr_dnsname], - ptrrecord=[ipv4_fromip_ptrrec], + ptrrecord=[ipv4_fromip_host.fqdn + '.'], ), ), result) - def test_add_ipv6_host_from_ip(self, command): - result = command('host_add', ipv6_fromip_host_fqdn, - description=u'Test host 9', - l=u'Undisclosed location 9', - ip_address=ipv6_fromip_ipv6) - assert_deepequal(dict( - value=ipv6_fromip_host_fqdn, - summary=u'Added host "%s"' % ipv6_fromip_host_fqdn, - result=dict( - dn=ipv6_fromip_host_dn, - fqdn=[ipv6_fromip_host_fqdn], - description=[u'Test host 9'], - l=[u'Undisclosed location 9'], - krbprincipalname=[u'host/%s@%s' % (ipv6_fromip_host_fqdn, - api.env.realm)], - objectclass=objectclasses.host, - ipauniqueid=[fuzzy_uuid], - managedby_host=[ipv6_fromip_host_fqdn], - has_keytab=False, - has_password=False, - ), - ), result) + def test_add_ipv6_host_from_ip(self, dns_setup, ipv6_fromip_host): + ipv6_fromip_host.ensure_missing() + ipv6_fromip_host.track_create() + command = ipv6_fromip_host.make_create_command(force=False) + result = command(ip_address=ipv6_fromip_ipv6) + ipv6_fromip_host.check_create(result) - def test_ipv6_aaaa_record_created(self, command): - result = command('dnsrecord_show', dnszone, ipv6_fromip) + result = ipv6_fromip_host.run_command('dnsrecord_show', dnszone, + ipv6_fromip_host.shortname) assert_deepequal(dict( value=ipv6_fromip_dnsname, summary=None, @@ -1206,102 +956,100 @@ class TestHostDNS(RPCTest): ), ), result) - def test_ipv6_ptr_record_added(self, command): - result = command('dnsrecord_show', revipv6zone, ipv6_fromip_ptr) + result = ipv6_fromip_host.run_command('dnsrecord_show', revipv6zone, + ipv6_fromip_ptr) assert_deepequal(dict( value=ipv6_fromip_ptr_dnsname, summary=None, result=dict( dn=ipv6_fromip_ptr_dn, idnsname=[ipv6_fromip_ptr_dnsname], - ptrrecord=[ipv6_fromip_ptrrec], + ptrrecord=[ipv6_fromip_host.fqdn + '.'], ), ), result) -@ordered -class TestHostAllowedTo(RPCTest): - - @classmethod - def clean_up(cls): - cls.clean('user_del', user1, user2, **{'continue': True}) - cls.clean('group_del', group1, group2, **{'continue': True}) - cls.clean('host_del', fqdn1, fqdn3, **{'continue': True}) - cls.clean('hostgroup_del', hostgroup1, **{'continue': True}) - - def test_prepare_entries(self, command): - result = command('user_add', givenname=u'Test', sn=u'User1') - result = command('user_add', givenname=u'Test', sn=u'User2') - result = command('group_add', group1) - result = command('group_add', group2) - result = command('host_add', fqdn1, force=True) - result = command('host_add', fqdn3, force=True) - result = command('hostgroup_add', hostgroup1, - description=u'Test hostgroup 1') - - def test_user_allow_retrieve_keytab(self, command): - result = command('host_allow_retrieve_keytab', fqdn1, user=user1) +@pytest.fixture(scope='class') +def allowedto_context(request, host3): + def cleanup(): + try: + host3.run_command('user_del', user1, user2, **{'continue': True}) + except errors.NotFound: + pass + try: + host3.run_command('group_del', group1, group2, + **{'continue': True}) + except errors.NotFound: + pass + try: + host3.run_command('hostgroup_del', hostgroup1) + except errors.NotFound: + pass + + cleanup() + request.addfinalizer(cleanup) + + host3.ensure_exists() + + host3.run_command('user_add', givenname=u'Test', sn=u'User1') + host3.run_command('user_add', givenname=u'Test', sn=u'User2') + host3.run_command('group_add', group1) + host3.run_command('group_add', group2) + host3.run_command('hostgroup_add', hostgroup1, + description=u'Test hostgroup 1') + + +class TestHostAllowedTo(XMLRPC_test): + + def test_user_allow_retrieve_keytab(self, allowedto_context, host): + host.ensure_exists() + result = host.run_command('host_allow_retrieve_keytab', host.fqdn, + user=user1) + host.attrs['ipaallowedtoperform_read_keys_user'] = [user1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=1, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), + result=host.filter_attrs(host.allowedto_keys), ), result) - def test_duplicate_add_user(self, command): - result = command('host_allow_retrieve_keytab', fqdn1, user=user1) + # Duplicates should not be accepted + result = host.run_command('host_allow_retrieve_keytab', host.fqdn, + user=user1) assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( - group=[], - host=[], - hostgroup=[], + group=[], host=[], hostgroup=[], user=[[user1, u'This entry is already a member']], ), ), completed=0, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), + result=host.filter_attrs(host.allowedto_keys), ), result) - def test_group_allow_retrieve_keytab(self, command): - result = command('host_allow_retrieve_keytab', fqdn1, - group=[group1, group2], host=[fqdn3], - hostgroup=[hostgroup1]) + def test_group_allow_retrieve_keytab(self, allowedto_context, host, host3): + host.ensure_exists() + host3.ensure_exists() + result = host.run_command('host_allow_retrieve_keytab', host.fqdn, + group=[group1, group2], host=[host3.fqdn], + hostgroup=[hostgroup1]) + host.attrs['ipaallowedtoperform_read_keys_group'] = [group1, group2] + host.attrs['ipaallowedtoperform_read_keys_host'] = [host3.fqdn] + host.attrs['ipaallowedtoperform_read_keys_hostgroup'] = [hostgroup1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=4, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1, group2], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), + result=host.filter_attrs(host.allowedto_keys), ), result) - def test_invalid_disallow_retrieve(self, command): - result = command('host_disallow_retrieve_keytab', fqdn1, - user=[user2]) + # Non-members cannot be removed + result = host.run_command('host_disallow_retrieve_keytab', host.fqdn, + user=[user2]) assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( @@ -1312,99 +1060,65 @@ class TestHostAllowedTo(RPCTest): ), ), completed=0, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1, group2], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), + result=host.filter_attrs(host.allowedto_keys), ), result) - def test_disallow_retrieve(self, command): - result = command('host_disallow_retrieve_keytab', fqdn1, - group=[group2]) + # Disallow one of the existing allowed groups + result = host.run_command('host_disallow_retrieve_keytab', host.fqdn, + group=[group2]) + host.attrs['ipaallowedtoperform_read_keys_group'] = [group1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=1, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), + result=host.filter_attrs(host.allowedto_keys), ), result) - def test_allow_create(self, command): - result = command('host_allow_create_keytab', fqdn1, - group=[group1, group2], user=[user1], host=[fqdn3], - hostgroup=[hostgroup1]) + host.retrieve() + + def test_allow_create(self, allowedto_context, host, host3): + host.ensure_exists() + host3.ensure_exists() + result = host.run_command('host_allow_create_keytab', host.fqdn, + group=[group1, group2], user=[user1], + host=[host3.fqdn], + hostgroup=[hostgroup1]) + host.attrs['ipaallowedtoperform_write_keys_user'] = [user1] + host.attrs['ipaallowedtoperform_write_keys_group'] = [group1, group2] + host.attrs['ipaallowedtoperform_write_keys_host'] = [host3.fqdn] + host.attrs['ipaallowedtoperform_write_keys_hostgroup'] = [hostgroup1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=5, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - ipaallowedtoperform_write_keys_user=[user1], - ipaallowedtoperform_write_keys_group=[group1, group2], - ipaallowedtoperform_write_keys_host=[fqdn3], - ipaallowedtoperform_write_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), - ), result) + result=host.filter_attrs(host.allowedto_keys), + ), result) - def test_duplicate_allow_create(self, command): - result = command('host_allow_create_keytab', fqdn1, - group=[group1], user=[user1], host=[fqdn3], - hostgroup=[hostgroup1]) + # Duplicates should not be accepted + result = host.run_command('host_allow_create_keytab', host.fqdn, + group=[group1], user=[user1], + host=[host3.fqdn], hostgroup=[hostgroup1]) assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( group=[[group1, u'This entry is already a member']], - host=[[fqdn3, u'This entry is already a member']], + host=[[host3.fqdn, u'This entry is already a member']], user=[[user1, u'This entry is already a member']], hostgroup=[[hostgroup1, u'This entry is already a member']], ), ), completed=0, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - ipaallowedtoperform_write_keys_user=[user1], - ipaallowedtoperform_write_keys_group=[group1, group2], - ipaallowedtoperform_write_keys_host=[fqdn3], - ipaallowedtoperform_write_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), - ), result) + result=host.filter_attrs(host.allowedto_keys), + ), result) - def test_invalid_disallow_create(self, command): - result = command('host_disallow_create_keytab', fqdn1, - user=[user2]) + # Non-mambers cannot be removed + result = host.run_command('host_disallow_create_keytab', host.fqdn, + user=[user2]) assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( @@ -1415,25 +1129,13 @@ class TestHostAllowedTo(RPCTest): ), ), completed=0, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - ipaallowedtoperform_write_keys_user=[user1], - ipaallowedtoperform_write_keys_group=[group1, group2], - ipaallowedtoperform_write_keys_host=[fqdn3], - ipaallowedtoperform_write_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), - ), result) + result=host.filter_attrs(host.allowedto_keys), + ), result) - def test_disallow_create(self, command): - result = command('host_disallow_create_keytab', fqdn1, - group=[group2]) + # Disallow one of the existing allowed groups + result = host.run_command('host_disallow_create_keytab', host.fqdn, + group=[group2]) + host.attrs['ipaallowedtoperform_write_keys_group'] = [group1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( @@ -1444,64 +1146,12 @@ class TestHostAllowedTo(RPCTest): ), ), completed=1, - result=dict( - dn=dn1, - fqdn=[fqdn1], - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - ipaallowedtoperform_write_keys_user=[user1], - ipaallowedtoperform_write_keys_group=[group1], - ipaallowedtoperform_write_keys_host=[fqdn3], - ipaallowedtoperform_write_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), + result=host.filter_attrs(host.allowedto_keys), ), result) - def test_host_show(self, command): - result = command('host_show', fqdn1) - assert_deepequal(dict( - value=fqdn1, - summary=None, - result=dict( - dn=dn1, - fqdn=[fqdn1], - has_keytab=False, - has_password=False, - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - ipaallowedtoperform_write_keys_user=[user1], - ipaallowedtoperform_write_keys_group=[group1], - ipaallowedtoperform_write_keys_host=[fqdn3], - ipaallowedtoperform_write_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), - ), result) + host.retrieve() - def test_host_mod(self, command): - result = command('host_mod', fqdn1, description=u"desc") - assert_deepequal(dict( - value=fqdn1, - summary=u'Modified host "%s"' % fqdn1, - result=dict( - description=[u"desc"], - fqdn=[fqdn1], - has_keytab=False, - has_password=False, - ipaallowedtoperform_read_keys_user=[user1], - ipaallowedtoperform_read_keys_group=[group1], - ipaallowedtoperform_read_keys_host=[fqdn3], - ipaallowedtoperform_read_keys_hostgroup=[hostgroup1], - ipaallowedtoperform_write_keys_user=[user1], - ipaallowedtoperform_write_keys_group=[group1], - ipaallowedtoperform_write_keys_host=[fqdn3], - ipaallowedtoperform_write_keys_hostgroup=[hostgroup1], - krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], - managedby_host=[fqdn1], - ), - ), result) + def test_host_mod(self, host): + # Done (usually) at the end to ensure the tracking works well + host.update(updates=dict(description=u'desc'), + expected_updates=dict(description=[u'desc'])) diff --git a/ipatests/test_xmlrpc/xmlrpc_test.py b/ipatests/test_xmlrpc/xmlrpc_test.py index acfdf711d..808abae19 100644 --- a/ipatests/test_xmlrpc/xmlrpc_test.py +++ b/ipatests/test_xmlrpc/xmlrpc_test.py @@ -367,32 +367,3 @@ def raises_exact(expected_exception): assert expected_exception.strerror == got_exception.strerror else: raise AssertionError('did not raise!') - - -class RPCTest(XMLRPC_test): - """Base class for RPC tests""" - @classmethod - def setup_class(cls): - super(RPCTest, cls).setup_class() - cls.clean_up() - - @classmethod - def teardown_class(cls): - cls.clean_up() - super(RPCTest, cls).teardown_class() - - @classmethod - def clean_up(self): - """Cleanup run on both setup and teardown - - To be overridden in subclasses. - Usually calls the clean() method. - """ - - @classmethod - def clean(cls, command, *args, **options): - """Run a command, ignoring NotFound/EmptyModlist errors""" - try: - api.Command[command](*args, **options) - except (errors.NotFound, errors.EmptyModlist): - pass |