summaryrefslogtreecommitdiffstats
path: root/ipatests/test_integration/host.py
diff options
context:
space:
mode:
authorPetr Viktorin <pviktori@redhat.com>2014-11-13 16:23:56 +0100
committerTomas Babej <tbabej@redhat.com>2014-12-11 07:04:58 +0100
commit74f7d67fd5484137b77e54ab50d7869daa6a7db5 (patch)
tree976d9c22b7f0587135acdbb4418fec0155160c04 /ipatests/test_integration/host.py
parent8822be36d342c2bc499937c3f144e11ae98d8e58 (diff)
downloadfreeipa-74f7d67fd5484137b77e54ab50d7869daa6a7db5.tar.gz
freeipa-74f7d67fd5484137b77e54ab50d7869daa6a7db5.tar.xz
freeipa-74f7d67fd5484137b77e54ab50d7869daa6a7db5.zip
test_integration: Use python-pytest-multihost
The core integration testing functionality was split into a separate project. Use this project, and configure it for FreeIPA. The "mh" (multihost) fixture is made available for integration tests. Configuration based on environment variables is moved into a separate module, to ease eventual deprecation. Reviewed-By: Tomas Babej <tbabej@redhat.com>
Diffstat (limited to 'ipatests/test_integration/host.py')
-rw-r--r--ipatests/test_integration/host.py238
1 files changed, 10 insertions, 228 deletions
diff --git a/ipatests/test_integration/host.py b/ipatests/test_integration/host.py
index 7a3a6ac77..399884fdb 100644
--- a/ipatests/test_integration/host.py
+++ b/ipatests/test_integration/host.py
@@ -19,93 +19,13 @@
"""Host class for integration testing"""
-import os
-import socket
+import pytest_multihost.host
from ipapython.ipaldap import IPAdmin
-from ipapython import ipautil
-from ipapython.ipa_log_manager import log_mgr
-from ipatests.test_integration import transport
-from ipatests.test_integration.util import check_config_dict_empty
-from ipatests.test_integration.util import TESTHOST_PREFIX
-class BaseHost(object):
+class Host(pytest_multihost.host.Host):
"""Representation of a remote IPA host"""
- transport_class = None
-
- def __init__(self, domain, hostname, role, ip=None,
- external_hostname=None):
- self.domain = domain
- self.role = str(role)
-
- shortname, dot, ext_domain = hostname.partition('.')
- self.shortname = shortname
-
- self.hostname = (hostname[:-1]
- if hostname.endswith('.')
- else shortname + '.' + self.domain.name)
-
- self.external_hostname = str(external_hostname or hostname)
-
- self.netbios = self.domain.name.split('.')[0].upper()
-
- self.logger_name = '%s.%s.%s' % (
- self.__module__, type(self).__name__, shortname)
- self.log = log_mgr.get_logger(self.logger_name)
-
- if ip:
- self.ip = str(ip)
- else:
- if self.config.ipv6:
- # $(dig +short $M $rrtype|tail -1)
- stdout, stderr, returncode = ipautil.run(
- ['dig', '+short', self.external_hostname, 'AAAA'])
- self.ip = stdout.splitlines()[-1].strip()
- else:
- try:
- self.ip = socket.gethostbyname(self.external_hostname)
- except socket.gaierror:
- self.ip = None
-
- if not self.ip:
- raise RuntimeError('Could not determine IP address of %s' %
- self.external_hostname)
-
- self.root_password = self.config.root_password
- self.root_ssh_key_filename = self.config.root_ssh_key_filename
- self.host_key = None
- self.ssh_port = 22
-
- self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh')
-
- self.log_collectors = []
-
- def __str__(self):
- template = ('<{s.__class__.__name__} {s.hostname} ({s.role})>')
- return template.format(s=self)
-
- def __repr__(self):
- template = ('<{s.__module__}.{s.__class__.__name__} '
- '{s.hostname} ({s.role})>')
- return template.format(s=self)
-
- def add_log_collector(self, collector):
- """Register a log collector for this host"""
- self.log_collectors.append(collector)
-
- def remove_log_collector(self, collector):
- """Unregister a log collector"""
- self.log_collectors.remove(collector)
-
- @classmethod
- def from_env(cls, env, domain, hostname, role, index, domain_index):
- ip = env.get('BEAKER%s%s_IP_env%s' %
- (role.upper(), index, domain_index), None)
- external_hostname = env.get(
- 'BEAKER%s%s_env%s' % (role.upper(), index, domain_index), None)
-
- return cls._make_host(domain, hostname, role, ip, external_hostname)
@staticmethod
def _make_host(domain, hostname, role, ip, external_hostname):
@@ -120,84 +40,6 @@ class BaseHost(object):
return cls(domain, hostname, role, ip, external_hostname)
- @classmethod
- def from_dict(cls, dct, domain):
- if isinstance(dct, basestring):
- dct = {'name': dct}
- try:
- role = dct.pop('role').lower()
- except KeyError:
- role = domain.static_roles[0]
-
- hostname = dct.pop('name')
- if '.' not in hostname:
- hostname = '.'.join((hostname, domain.name))
-
- ip = dct.pop('ip', None)
- external_hostname = dct.pop('external_hostname', None)
-
- check_config_dict_empty(dct, 'host %s' % hostname)
-
- return cls._make_host(domain, hostname, role, ip, external_hostname)
-
- def to_dict(self):
- return {
- 'name': str(self.hostname),
- 'ip': self.ip,
- 'role': self.role,
- 'external_hostname': self.external_hostname,
- }
-
- @property
- def config(self):
- return self.domain.config
-
- def to_env(self, **kwargs):
- """Return environment variables specific to this host"""
- env = self.domain.to_env(**kwargs)
-
- index = self.domain.hosts.index(self) + 1
- domain_index = self.config.domains.index(self.domain) + 1
-
- role = self.role.upper()
- if self.role != 'master':
- role += str(index)
-
- env['MYHOSTNAME'] = self.hostname
- env['MYBEAKERHOSTNAME'] = self.external_hostname
- env['MYIP'] = self.ip
-
- prefix = ('' if self.role in self.domain.static_roles
- else TESTHOST_PREFIX)
- env_suffix = '_env%s' % domain_index
- env['MYROLE'] = '%s%s%s' % (prefix, role, env_suffix)
- env['MYENV'] = str(domain_index)
-
- return env
-
- @property
- def transport(self):
- try:
- return self._transport
- except AttributeError:
- cls = self.transport_class
- if cls:
- # transport_class is None in the base class and must be
- # set in subclasses.
- # Pylint reports that calling None will fail
- self._transport = cls(self) # pylint: disable=E1102
- else:
- raise NotImplementedError('transport class not available')
- return self._transport
-
- def get_file_contents(self, filename):
- """Shortcut for transport.get_file_contents"""
- return self.transport.get_file_contents(filename)
-
- def put_file_contents(self, filename, contents):
- """Shortcut for transport.put_file_contents"""
- self.transport.put_file_contents(filename, contents)
-
def ldap_connect(self):
"""Return an LDAPClient authenticated to this host as directory manager
"""
@@ -208,80 +50,20 @@ class BaseHost(object):
ldap.do_simple_bind(binddn, self.config.dirman_password)
return ldap
- def collect_log(self, filename):
- for collector in self.log_collectors:
- collector(self, filename)
-
- def run_command(self, argv, set_env=True, stdin_text=None,
- log_stdout=True, raiseonerr=True,
- cwd=None):
- """Run the given command on this host
-
- Returns a Shell instance. The command will have already run in the
- shell when this method returns, so its stdout_text, stderr_text, and
- returncode attributes will be available.
-
- :param argv: Command to run, as either a Popen-style list, or a string
- containing a shell script
- :param set_env: If true, env.sh exporting configuration variables will
- be sourced before running the command.
- :param stdin_text: If given, will be written to the command's stdin
- :param log_stdout: If false, standard output will not be logged
- (but will still be available as cmd.stdout_text)
- :param raiseonerr: If true, an exception will be raised if the command
- does not exit with return code 0
- :param cwd: The working directory for the command
- """
- raise NotImplementedError()
-
-
-class Host(BaseHost):
- """A Unix host"""
- transport_class = transport.SSHTransport
-
- def run_command(self, argv, set_env=True, stdin_text=None,
- log_stdout=True, raiseonerr=True,
- cwd=None):
- # This will give us a Bash shell
- command = self.transport.start_shell(argv, log_stdout=log_stdout)
-
- # Set working directory
- if cwd is None:
- cwd = self.config.test_dir
- command.stdin.write('cd %s\n' % ipautil.shell_quote(cwd))
-
- # Set the environment
- if set_env:
- command.stdin.write('. %s\n' %
- ipautil.shell_quote(self.env_sh_path))
- command.stdin.write('set -e\n')
-
- if isinstance(argv, basestring):
- # Run a shell command given as a string
- command.stdin.write('(')
- command.stdin.write(argv)
- command.stdin.write(')')
- else:
- # Run a command given as a popen-style list (no shell expansion)
- for arg in argv:
- command.stdin.write(ipautil.shell_quote(arg))
- command.stdin.write(' ')
-
- command.stdin.write(';exit\n')
- if stdin_text:
- command.stdin.write(stdin_text)
- command.stdin.flush()
+ @classmethod
+ def from_env(cls, env, domain, hostname, role, index, domain_index):
+ from ipatests.test_integration.env_config import host_from_env
+ return host_from_env(env, domain, hostname, role, index, domain_index)
- command.wait(raiseonerr=raiseonerr)
- return command
+ def to_env(self, **kwargs):
+ from ipatests.test_integration.env_config import host_to_env
+ return host_to_env(self, **kwargs)
-class WinHost(BaseHost):
+class WinHost(pytest_multihost.host.WinHost):
"""
Representation of a remote Windows host.
This serves as a sketch class once we move from manual preparation of
Active Directory to the automated setup.
"""
-
- pass