summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPetr Viktorin <pviktori@redhat.com>2013-05-30 14:25:01 +0200
committerPetr Viktorin <pviktori@redhat.com>2013-07-15 15:49:07 +0200
commit00f133458b72239000a39786d9a36ea2df7f2d8e (patch)
tree1979202a891de1c9ab8a2f8dca1c30992558d9ef
parent353f3c62c3dc95db471a2b23fcd90d6071542362 (diff)
downloadfreeipa-00f133458b72239000a39786d9a36ea2df7f2d8e.zip
freeipa-00f133458b72239000a39786d9a36ea2df7f2d8e.tar.gz
freeipa-00f133458b72239000a39786d9a36ea2df7f2d8e.tar.xz
Introduce a class for remote commands
Introduce a class inspired by subprocess.Popen that handles running a command on a remote machine and handling its output. To separate stdout & stderr streams of a remote command, they need to be read in parallel, so that one of them doesn't stall the runner when its buffer fills up. Accomplish this by using a thread for each stream. Part of the work for: https://fedorahosted.org/freeipa/ticket/3621
-rw-r--r--ipatests/test_integration/host.py174
-rw-r--r--ipatests/test_integration/test_simple_replication.py4
2 files changed, 132 insertions, 46 deletions
diff --git a/ipatests/test_integration/host.py b/ipatests/test_integration/host.py
index a663a99..da5546d 100644
--- a/ipatests/test_integration/host.py
+++ b/ipatests/test_integration/host.py
@@ -20,21 +20,103 @@
"""Host class for integration testing"""
import os
-import collections
import socket
+import threading
+import subprocess
import paramiko
from ipapython import ipautil
from ipapython.ipa_log_manager import log_mgr
-RunResult = collections.namedtuple('RunResult', 'output exit_code')
+
+class RemoteCommand(object):
+ """A Popen-style object representing a remote command
+
+ Unlike subprocess.Popen, this does not run the given command; instead
+ it only starts a shell. The command must be written to stdin manually.
+
+ The standard error and output are handled by this class. They're not
+ available for file-like reading. They are logged by default.
+ To make sure reading doesn't stall after one buffer fills up, they are read
+ in parallel using threads.
+
+ After calling wait(), stdout_text and stderr_text attributes will be
+ strings containing the output, and returncode will contain the
+ exit code.
+
+ :param host: The Host on which the command is run
+ :param argv: The command that will be run (for logging only)
+ :param index: An identification number added to the logs
+ :param log_stdout: If false, stdout will not be logged
+ """
+ def __init__(self, host, argv, index, log_stdout=True):
+ self.returncode = None
+ self.host = host
+ self.argv = argv
+ self._stdout_lines = []
+ self._stderr_lines = []
+ self.running_threads = set()
+
+ self.logger_name = '%s.cmd%s' % (self.host.logger_name, index)
+ self.log = log_mgr.get_logger(self.logger_name)
+
+ self.log.info('RUN %s', argv)
+
+ self._ssh = host.transport.open_channel('session')
+
+ self._ssh.invoke_shell()
+ stdin = self.stdin = self._ssh.makefile('wb')
+ stdout = self._ssh.makefile('rb')
+ stderr = self._ssh.makefile_stderr('rb')
+
+ self._start_pipe_thread(self._stdout_lines, stdout, 'out', log_stdout)
+ self._start_pipe_thread(self._stderr_lines, stderr, 'err', True)
+
+ self._done = False
+
+ def wait(self, raiseonerr=True):
+ """Wait for the remote process to exit
+
+ Raises an excption if the exit code is not 0.
+ """
+ if self._done:
+ return self.returncode
+
+ self._ssh.shutdown_write()
+ while self.running_threads:
+ self.running_threads.pop().join()
+
+ self.stdout_text = ''.join(self._stdout_lines)
+ self.stderr_text = ''.join(self._stderr_lines)
+ self.returncode = self._ssh.recv_exit_status()
+ self._ssh.close()
+
+ self._done = True
+
+ self.log.info('Exit code: %s', self.returncode)
+ if raiseonerr and self.returncode:
+ raise subprocess.CalledProcessError(self.returncode, self.argv)
+ return self.returncode
+
+ def _start_pipe_thread(self, result_list, stream, name, do_log=True):
+ log = log_mgr.get_logger('%s.%s' % (self.logger_name, name))
+
+ def read_stream():
+ for line in stream:
+ if do_log:
+ log.info(line.rstrip('\n'))
+ result_list.append(line)
+
+ thread = threading.Thread(target=read_stream)
+ self.running_threads.add(thread)
+ thread.start()
+ return thread
class Host(object):
- """Configuration for an IPA host"""
+ """Representation of a remote IPA host"""
def __init__(self, domain, hostname, role, index):
- self.log = log_mgr.get_logger(self)
self.domain = domain
self.role = role
self.index = index
@@ -43,6 +125,10 @@ class Host(object):
self.hostname = shortname + '.' + self.domain.name
self.external_hostname = hostname
+ self.logger_name = '%s.%s.%s' % (
+ self.__module__, type(self).__name__, shortname)
+ self.log = log_mgr.get_logger(self.logger_name)
+
if self.config.ipv6:
# $(dig +short $M $rrtype|tail -1)
stdout, stderr, returncode = ipautil.run(
@@ -64,8 +150,7 @@ class Host(object):
self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh')
- self.log = log_mgr.get_logger('%s.%s.%s' % (
- self.__module__, type(self).__name__, self.hostname))
+ self._command_index = 0
def __repr__(self):
template = ('<{s.__module__}.{s.__class__.__name__} '
@@ -99,47 +184,46 @@ class Host(object):
return env
def run_command(self, argv, set_env=True, stdin_text=None,
- ignore_stdout=False):
- assert argv
- self.log.info('RUN %s', argv)
- ssh = self.transport.open_channel('session')
- try:
- ssh.invoke_shell()
- ssh.set_combine_stderr(True)
- stdin = ssh.makefile('wb')
- stdout = ssh.makefile('rb')
+ log_stdout=True, raiseonerr=True):
+ """Run the given command on this host
+
+ Returns a RemoteCommand instance. The command will have already run
+ when this method returns, so its stdout_text, stderr_text, and
+ returncode attributes will be available.
- if set_env:
- stdin.write('. %s\n' % self.env_sh_path)
- stdin.write('set -ex\n')
+ :param argv: Command to run, as either a Popen-style list, or a string
+ containing a shell script
+ :param set_env: If true, env.sh exporting configuration variables will
+ be sourced before running the command.
+ :param stdin_text: If given, will be written to the command's stdin
+ :param log_stdout: If false, standard output will not be logged
+ (but will still be available as cmd.stdout_text)
+ :param raiseonerr: If true, an exception will be raised if the command
+ does not exit with return code 0
+ """
+ command = RemoteCommand(self, argv, index=self._command_index,
+ log_stdout=log_stdout)
+ self._command_index += 1
+ if set_env:
+ command.stdin.write('. %s\n' % self.env_sh_path)
+ command.stdin.write('set -e\n')
+
+ if isinstance(argv, basestring):
+ command.stdin.write('(')
+ command.stdin.write(argv)
+ command.stdin.write(')')
+ else:
for arg in argv:
- stdin.write(ipautil.shell_quote(arg))
- stdin.write(' ')
- if stdin_text:
- stdin_filename = os.path.join(self.config.test_dir, 'stdin')
- with self.sftp.open(stdin_filename, 'w') as f:
- f.write(stdin_text)
- stdin.write('<')
- stdin.write(stdin_filename)
- else:
- stdin.write('< /dev/null')
- if ignore_stdout:
- stdin.write('> /dev/null')
- stdin.write('\n')
- ssh.shutdown_write()
- output = []
- for line in stdout:
- output.append(line)
- self.log.info(' %s', line.strip('\n'))
- exit_status = ssh.recv_exit_status()
- self.log.info(' -> Exit code %s', exit_status)
- if exit_status:
- raise RuntimeError('Command %s exited with error code %s' % (
- argv[0], exit_status))
- return RunResult(''.join(output), exit_status)
- finally:
- ssh.close()
+ command.stdin.write(ipautil.shell_quote(arg))
+ command.stdin.write(' ')
+ command.stdin.write(';exit\n')
+ if stdin_text:
+ command.stdin.write(stdin_text)
+ command.stdin.flush()
+
+ command.wait(raiseonerr=raiseonerr)
+ return command
@property
def transport(self):
@@ -173,11 +257,13 @@ class Host(object):
self.sftp.chdir(path)
def get_file_contents(self, filename):
+ """Read the named remote file and return the contents as a string"""
self.log.info('READ %s', filename)
with self.sftp.open(filename) as f:
return f.read()
def put_file_contents(self, filename, contents):
+ """Write the given string to the named remote file"""
self.log.info('WRITE %s', filename)
with self.sftp.open(filename, 'w') as f:
return f.write(contents)
diff --git a/ipatests/test_integration/test_simple_replication.py b/ipatests/test_integration/test_simple_replication.py
index 80c0c42..a5d1bb2 100644
--- a/ipatests/test_integration/test_simple_replication.py
+++ b/ipatests/test_integration/test_simple_replication.py
@@ -36,7 +36,7 @@ class TestSimpleReplication(IntegrationTest):
time.sleep(5)
result = self.replicas[0].run_command(['ipa', 'user-show', login])
- assert 'User login: %s' % login in result.output
+ assert 'User login: %s' % login in result.stdout_text
def test_user_replication_to_master(self):
login = 'testuser2'
@@ -48,4 +48,4 @@ class TestSimpleReplication(IntegrationTest):
time.sleep(5)
result = self.master.run_command(['ipa', 'user-show', login])
- assert 'User login: %s' % login in result.output
+ assert 'User login: %s' % login in result.stdout_text