diff options
author | Petr Viktorin <pviktori@redhat.com> | 2014-11-13 16:23:56 +0100 |
---|---|---|
committer | Tomas Babej <tbabej@redhat.com> | 2014-12-11 07:04:58 +0100 |
commit | 74f7d67fd5484137b77e54ab50d7869daa6a7db5 (patch) | |
tree | 976d9c22b7f0587135acdbb4418fec0155160c04 /ipatests/test_integration/host.py | |
parent | 8822be36d342c2bc499937c3f144e11ae98d8e58 (diff) | |
download | freeipa-74f7d67fd5484137b77e54ab50d7869daa6a7db5.tar.gz freeipa-74f7d67fd5484137b77e54ab50d7869daa6a7db5.tar.xz freeipa-74f7d67fd5484137b77e54ab50d7869daa6a7db5.zip |
test_integration: Use python-pytest-multihost
The core integration testing functionality was split into a separate
project. Use this project, and configure it for FreeIPA.
The "mh" (multihost) fixture is made available for integration tests.
Configuration based on environment variables is moved into a separate
module, to ease eventual deprecation.
Reviewed-By: Tomas Babej <tbabej@redhat.com>
Diffstat (limited to 'ipatests/test_integration/host.py')
-rw-r--r-- | ipatests/test_integration/host.py | 238 |
1 files changed, 10 insertions, 228 deletions
diff --git a/ipatests/test_integration/host.py b/ipatests/test_integration/host.py index 7a3a6ac77..399884fdb 100644 --- a/ipatests/test_integration/host.py +++ b/ipatests/test_integration/host.py @@ -19,93 +19,13 @@ """Host class for integration testing""" -import os -import socket +import pytest_multihost.host from ipapython.ipaldap import IPAdmin -from ipapython import ipautil -from ipapython.ipa_log_manager import log_mgr -from ipatests.test_integration import transport -from ipatests.test_integration.util import check_config_dict_empty -from ipatests.test_integration.util import TESTHOST_PREFIX -class BaseHost(object): +class Host(pytest_multihost.host.Host): """Representation of a remote IPA host""" - transport_class = None - - def __init__(self, domain, hostname, role, ip=None, - external_hostname=None): - self.domain = domain - self.role = str(role) - - shortname, dot, ext_domain = hostname.partition('.') - self.shortname = shortname - - self.hostname = (hostname[:-1] - if hostname.endswith('.') - else shortname + '.' + self.domain.name) - - self.external_hostname = str(external_hostname or hostname) - - self.netbios = self.domain.name.split('.')[0].upper() - - self.logger_name = '%s.%s.%s' % ( - self.__module__, type(self).__name__, shortname) - self.log = log_mgr.get_logger(self.logger_name) - - if ip: - self.ip = str(ip) - else: - if self.config.ipv6: - # $(dig +short $M $rrtype|tail -1) - stdout, stderr, returncode = ipautil.run( - ['dig', '+short', self.external_hostname, 'AAAA']) - self.ip = stdout.splitlines()[-1].strip() - else: - try: - self.ip = socket.gethostbyname(self.external_hostname) - except socket.gaierror: - self.ip = None - - if not self.ip: - raise RuntimeError('Could not determine IP address of %s' % - self.external_hostname) - - self.root_password = self.config.root_password - self.root_ssh_key_filename = self.config.root_ssh_key_filename - self.host_key = None - self.ssh_port = 22 - - self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh') - - self.log_collectors = [] - - def __str__(self): - template = ('<{s.__class__.__name__} {s.hostname} ({s.role})>') - return template.format(s=self) - - def __repr__(self): - template = ('<{s.__module__}.{s.__class__.__name__} ' - '{s.hostname} ({s.role})>') - return template.format(s=self) - - def add_log_collector(self, collector): - """Register a log collector for this host""" - self.log_collectors.append(collector) - - def remove_log_collector(self, collector): - """Unregister a log collector""" - self.log_collectors.remove(collector) - - @classmethod - def from_env(cls, env, domain, hostname, role, index, domain_index): - ip = env.get('BEAKER%s%s_IP_env%s' % - (role.upper(), index, domain_index), None) - external_hostname = env.get( - 'BEAKER%s%s_env%s' % (role.upper(), index, domain_index), None) - - return cls._make_host(domain, hostname, role, ip, external_hostname) @staticmethod def _make_host(domain, hostname, role, ip, external_hostname): @@ -120,84 +40,6 @@ class BaseHost(object): return cls(domain, hostname, role, ip, external_hostname) - @classmethod - def from_dict(cls, dct, domain): - if isinstance(dct, basestring): - dct = {'name': dct} - try: - role = dct.pop('role').lower() - except KeyError: - role = domain.static_roles[0] - - hostname = dct.pop('name') - if '.' not in hostname: - hostname = '.'.join((hostname, domain.name)) - - ip = dct.pop('ip', None) - external_hostname = dct.pop('external_hostname', None) - - check_config_dict_empty(dct, 'host %s' % hostname) - - return cls._make_host(domain, hostname, role, ip, external_hostname) - - def to_dict(self): - return { - 'name': str(self.hostname), - 'ip': self.ip, - 'role': self.role, - 'external_hostname': self.external_hostname, - } - - @property - def config(self): - return self.domain.config - - def to_env(self, **kwargs): - """Return environment variables specific to this host""" - env = self.domain.to_env(**kwargs) - - index = self.domain.hosts.index(self) + 1 - domain_index = self.config.domains.index(self.domain) + 1 - - role = self.role.upper() - if self.role != 'master': - role += str(index) - - env['MYHOSTNAME'] = self.hostname - env['MYBEAKERHOSTNAME'] = self.external_hostname - env['MYIP'] = self.ip - - prefix = ('' if self.role in self.domain.static_roles - else TESTHOST_PREFIX) - env_suffix = '_env%s' % domain_index - env['MYROLE'] = '%s%s%s' % (prefix, role, env_suffix) - env['MYENV'] = str(domain_index) - - return env - - @property - def transport(self): - try: - return self._transport - except AttributeError: - cls = self.transport_class - if cls: - # transport_class is None in the base class and must be - # set in subclasses. - # Pylint reports that calling None will fail - self._transport = cls(self) # pylint: disable=E1102 - else: - raise NotImplementedError('transport class not available') - return self._transport - - def get_file_contents(self, filename): - """Shortcut for transport.get_file_contents""" - return self.transport.get_file_contents(filename) - - def put_file_contents(self, filename, contents): - """Shortcut for transport.put_file_contents""" - self.transport.put_file_contents(filename, contents) - def ldap_connect(self): """Return an LDAPClient authenticated to this host as directory manager """ @@ -208,80 +50,20 @@ class BaseHost(object): ldap.do_simple_bind(binddn, self.config.dirman_password) return ldap - def collect_log(self, filename): - for collector in self.log_collectors: - collector(self, filename) - - def run_command(self, argv, set_env=True, stdin_text=None, - log_stdout=True, raiseonerr=True, - cwd=None): - """Run the given command on this host - - Returns a Shell instance. The command will have already run in the - shell when this method returns, so its stdout_text, stderr_text, and - returncode attributes will be available. - - :param argv: Command to run, as either a Popen-style list, or a string - containing a shell script - :param set_env: If true, env.sh exporting configuration variables will - be sourced before running the command. - :param stdin_text: If given, will be written to the command's stdin - :param log_stdout: If false, standard output will not be logged - (but will still be available as cmd.stdout_text) - :param raiseonerr: If true, an exception will be raised if the command - does not exit with return code 0 - :param cwd: The working directory for the command - """ - raise NotImplementedError() - - -class Host(BaseHost): - """A Unix host""" - transport_class = transport.SSHTransport - - def run_command(self, argv, set_env=True, stdin_text=None, - log_stdout=True, raiseonerr=True, - cwd=None): - # This will give us a Bash shell - command = self.transport.start_shell(argv, log_stdout=log_stdout) - - # Set working directory - if cwd is None: - cwd = self.config.test_dir - command.stdin.write('cd %s\n' % ipautil.shell_quote(cwd)) - - # Set the environment - if set_env: - command.stdin.write('. %s\n' % - ipautil.shell_quote(self.env_sh_path)) - command.stdin.write('set -e\n') - - if isinstance(argv, basestring): - # Run a shell command given as a string - command.stdin.write('(') - command.stdin.write(argv) - command.stdin.write(')') - else: - # Run a command given as a popen-style list (no shell expansion) - for arg in argv: - command.stdin.write(ipautil.shell_quote(arg)) - command.stdin.write(' ') - - command.stdin.write(';exit\n') - if stdin_text: - command.stdin.write(stdin_text) - command.stdin.flush() + @classmethod + def from_env(cls, env, domain, hostname, role, index, domain_index): + from ipatests.test_integration.env_config import host_from_env + return host_from_env(env, domain, hostname, role, index, domain_index) - command.wait(raiseonerr=raiseonerr) - return command + def to_env(self, **kwargs): + from ipatests.test_integration.env_config import host_to_env + return host_to_env(self, **kwargs) -class WinHost(BaseHost): +class WinHost(pytest_multihost.host.WinHost): """ Representation of a remote Windows host. This serves as a sketch class once we move from manual preparation of Active Directory to the automated setup. """ - - pass |