summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Heimes <cheimes@redhat.com>2017-03-20 11:40:17 +0100
committerTomas Krizek <tkrizek@redhat.com>2017-03-22 13:42:04 +0100
commit8aadd55c93a627e88e007d2df864a5fb72fba0a2 (patch)
treec8bcb18d1d7e2ad4cd71687e63bee0c0aab9ebf3
parent8867412adc0ffd0cacf555a5c3693e04074fed5b (diff)
downloadfreeipa-8aadd55c93a627e88e007d2df864a5fb72fba0a2.tar.gz
freeipa-8aadd55c93a627e88e007d2df864a5fb72fba0a2.tar.xz
freeipa-8aadd55c93a627e88e007d2df864a5fb72fba0a2.zip
Move function run_repeatedly to tasks module
https://pagure.io/freeipa/issue/6798 Signed-off-by: Christian Heimes <cheimes@redhat.com> Reviewed-By: Milan Kubik <mkubik@redhat.com>
-rw-r--r--ipatests/pytest_plugins/integration/tasks.py55
-rw-r--r--ipatests/test_integration/test_trust.py4
-rw-r--r--ipatests/test_integration/util.py40
3 files changed, 49 insertions, 50 deletions
diff --git a/ipatests/pytest_plugins/integration/tasks.py b/ipatests/pytest_plugins/integration/tasks.py
index d95354924..3e06c85b6 100644
--- a/ipatests/pytest_plugins/integration/tasks.py
+++ b/ipatests/pytest_plugins/integration/tasks.py
@@ -36,7 +36,6 @@ from ipapython import ipautil
from ipaplatform.paths import paths
from ipapython.dn import DN
from ipapython.ipa_log_manager import log_mgr
-from ipatests.test_integration import util
from ipatests.pytest_plugins.integration.env_config import env_to_script
from ipatests.pytest_plugins.integration.host import Host
from ipalib import errors
@@ -450,7 +449,7 @@ def install_adtrust(host):
dig_output = '0 100 389 %s.' % host.hostname
dig_test = lambda x: re.search(re.escape(dig_output), x)
- util.run_repeatedly(host, dig_command, test=dig_test)
+ run_repeatedly(host, dig_command, test=dig_test)
def configure_dns_for_trust(master, ad):
@@ -481,12 +480,12 @@ def establish_trust_with_ad(master, ad_domain, extra_args=()):
master.run_command(['klist'])
master.run_command(['smbcontrol', 'all', 'debug', '100'])
- util.run_repeatedly(master,
- ['ipa', 'trust-add',
- '--type', 'ad', ad_domain,
- '--admin', 'Administrator',
- '--password'] + list(extra_args),
- stdin_text=master.config.ad_admin_password)
+ run_repeatedly(master,
+ ['ipa', 'trust-add',
+ '--type', 'ad', ad_domain,
+ '--admin', 'Administrator',
+ '--password'] + list(extra_args),
+ stdin_text=master.config.ad_admin_password)
master.run_command(['smbcontrol', 'all', 'debug', '1'])
clear_sssd_cache(master)
master.run_command(['systemctl', 'restart', 'krb5kdc.service'])
@@ -1246,3 +1245,43 @@ def restart_named(*args):
for host in args:
host.run_command(["systemctl", "restart", "named-pkcs11.service"])
time.sleep(20) # give a time to named to be ready (zone loading)
+
+
+def run_repeatedly(host, command, assert_zero_rc=True, test=None,
+ timeout=30, **kwargs):
+ """
+ Runs command on host repeatedly until it's finished successfully (returns
+ 0 exit code and its stdout passes the test function).
+
+ Returns True if the command was executed succesfully, False otherwise.
+
+ This method accepts additional kwargs and passes these arguments
+ to the actual run_command method.
+ """
+
+ time_waited = 0
+ time_step = 2
+
+ # Check that the test is a function
+ if test:
+ assert callable(test)
+
+ while(time_waited <= timeout):
+ result = host.run_command(command, raiseonerr=False, **kwargs)
+
+ return_code_ok = not assert_zero_rc or (result.returncode == 0)
+ test_ok = not test or test(result.stdout_text)
+
+ if return_code_ok and test_ok:
+ # Command successful
+ return True
+ else:
+ # Command not successful
+ time.sleep(time_step)
+ time_waited += time_step
+
+ raise AssertionError("Command: {cmd} repeatedly failed {times} times, "
+ "exceeding the timeout of {timeout} seconds."
+ .format(cmd=' '.join(command),
+ times=timeout // time_step,
+ timeout=timeout))
diff --git a/ipatests/test_integration/test_trust.py b/ipatests/test_integration/test_trust.py
index 94fde4d94..6a5dda8aa 100644
--- a/ipatests/test_integration/test_trust.py
+++ b/ipatests/test_integration/test_trust.py
@@ -80,8 +80,8 @@ class ADTrustBase(IntegrationTest):
% dict(idauth=_sid_identifier_authority)
stdout_re = re.escape(' ipaNTSecurityIdentifier: ') + sid_regex
- util.run_repeatedly(cls.master, command,
- test=lambda x: re.search(stdout_re, x))
+ tasks.run_repeatedly(cls.master, command,
+ test=lambda x: re.search(stdout_re, x))
@classmethod
def configure_dns_and_time(cls):
diff --git a/ipatests/test_integration/util.py b/ipatests/test_integration/util.py
index 179f6727e..b42111ec1 100644
--- a/ipatests/test_integration/util.py
+++ b/ipatests/test_integration/util.py
@@ -17,51 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import time
import re
from ipaplatform.paths import paths
from ipalib.constants import DEFAULT_CONFIG
-def run_repeatedly(host, command, assert_zero_rc=True, test=None,
- timeout=30, **kwargs):
- """
- Runs command on host repeatedly until it's finished successfully (returns
- 0 exit code and its stdout passes the test function).
-
- Returns True if the command was executed succesfully, False otherwise.
-
- This method accepts additional kwargs and passes these arguments
- to the actual run_command method.
- """
-
- time_waited = 0
- time_step = 2
-
- # Check that the test is a function
- if test:
- assert callable(test)
-
- while(time_waited <= timeout):
- result = host.run_command(command, raiseonerr=False, **kwargs)
-
- return_code_ok = not assert_zero_rc or (result.returncode == 0)
- test_ok = not test or test(result.stdout_text)
-
- if return_code_ok and test_ok:
- # Command successful
- return True
- else:
- # Command not successful
- time.sleep(time_step)
- time_waited += time_step
-
- raise AssertionError("Command: {cmd} repeatedly failed {times} times, "
- "exceeding the timeout of {timeout} seconds."
- .format(cmd=' '.join(command),
- times=timeout // time_step,
- timeout=timeout))
-
def get_host_ip_with_hostmask(host):
"""