From 00f133458b72239000a39786d9a36ea2df7f2d8e Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 30 May 2013 14:25:01 +0200 Subject: Introduce a class for remote commands Introduce a class inspired by subprocess.Popen that handles running a command on a remote machine and handling its output. To separate stdout & stderr streams of a remote command, they need to be read in parallel, so that one of them doesn't stall the runner when its buffer fills up. Accomplish this by using a thread for each stream. Part of the work for: https://fedorahosted.org/freeipa/ticket/3621 --- ipatests/test_integration/host.py | 174 +++++++++++++++------ .../test_integration/test_simple_replication.py | 4 +- 2 files changed, 132 insertions(+), 46 deletions(-) diff --git a/ipatests/test_integration/host.py b/ipatests/test_integration/host.py index a663a9906..da5546de9 100644 --- a/ipatests/test_integration/host.py +++ b/ipatests/test_integration/host.py @@ -20,21 +20,103 @@ """Host class for integration testing""" import os -import collections import socket +import threading +import subprocess import paramiko from ipapython import ipautil from ipapython.ipa_log_manager import log_mgr -RunResult = collections.namedtuple('RunResult', 'output exit_code') + +class RemoteCommand(object): + """A Popen-style object representing a remote command + + Unlike subprocess.Popen, this does not run the given command; instead + it only starts a shell. The command must be written to stdin manually. + + The standard error and output are handled by this class. They're not + available for file-like reading. They are logged by default. + To make sure reading doesn't stall after one buffer fills up, they are read + in parallel using threads. + + After calling wait(), stdout_text and stderr_text attributes will be + strings containing the output, and returncode will contain the + exit code. + + :param host: The Host on which the command is run + :param argv: The command that will be run (for logging only) + :param index: An identification number added to the logs + :param log_stdout: If false, stdout will not be logged + """ + def __init__(self, host, argv, index, log_stdout=True): + self.returncode = None + self.host = host + self.argv = argv + self._stdout_lines = [] + self._stderr_lines = [] + self.running_threads = set() + + self.logger_name = '%s.cmd%s' % (self.host.logger_name, index) + self.log = log_mgr.get_logger(self.logger_name) + + self.log.info('RUN %s', argv) + + self._ssh = host.transport.open_channel('session') + + self._ssh.invoke_shell() + stdin = self.stdin = self._ssh.makefile('wb') + stdout = self._ssh.makefile('rb') + stderr = self._ssh.makefile_stderr('rb') + + self._start_pipe_thread(self._stdout_lines, stdout, 'out', log_stdout) + self._start_pipe_thread(self._stderr_lines, stderr, 'err', True) + + self._done = False + + def wait(self, raiseonerr=True): + """Wait for the remote process to exit + + Raises an excption if the exit code is not 0. + """ + if self._done: + return self.returncode + + self._ssh.shutdown_write() + while self.running_threads: + self.running_threads.pop().join() + + self.stdout_text = ''.join(self._stdout_lines) + self.stderr_text = ''.join(self._stderr_lines) + self.returncode = self._ssh.recv_exit_status() + self._ssh.close() + + self._done = True + + self.log.info('Exit code: %s', self.returncode) + if raiseonerr and self.returncode: + raise subprocess.CalledProcessError(self.returncode, self.argv) + return self.returncode + + def _start_pipe_thread(self, result_list, stream, name, do_log=True): + log = log_mgr.get_logger('%s.%s' % (self.logger_name, name)) + + def read_stream(): + for line in stream: + if do_log: + log.info(line.rstrip('\n')) + result_list.append(line) + + thread = threading.Thread(target=read_stream) + self.running_threads.add(thread) + thread.start() + return thread class Host(object): - """Configuration for an IPA host""" + """Representation of a remote IPA host""" def __init__(self, domain, hostname, role, index): - self.log = log_mgr.get_logger(self) self.domain = domain self.role = role self.index = index @@ -43,6 +125,10 @@ class Host(object): self.hostname = shortname + '.' + self.domain.name self.external_hostname = hostname + self.logger_name = '%s.%s.%s' % ( + self.__module__, type(self).__name__, shortname) + self.log = log_mgr.get_logger(self.logger_name) + if self.config.ipv6: # $(dig +short $M $rrtype|tail -1) stdout, stderr, returncode = ipautil.run( @@ -64,8 +150,7 @@ class Host(object): self.env_sh_path = os.path.join(domain.config.test_dir, 'env.sh') - self.log = log_mgr.get_logger('%s.%s.%s' % ( - self.__module__, type(self).__name__, self.hostname)) + self._command_index = 0 def __repr__(self): template = ('<{s.__module__}.{s.__class__.__name__} ' @@ -99,47 +184,46 @@ class Host(object): return env def run_command(self, argv, set_env=True, stdin_text=None, - ignore_stdout=False): - assert argv - self.log.info('RUN %s', argv) - ssh = self.transport.open_channel('session') - try: - ssh.invoke_shell() - ssh.set_combine_stderr(True) - stdin = ssh.makefile('wb') - stdout = ssh.makefile('rb') + log_stdout=True, raiseonerr=True): + """Run the given command on this host + + Returns a RemoteCommand instance. The command will have already run + when this method returns, so its stdout_text, stderr_text, and + returncode attributes will be available. - if set_env: - stdin.write('. %s\n' % self.env_sh_path) - stdin.write('set -ex\n') + :param argv: Command to run, as either a Popen-style list, or a string + containing a shell script + :param set_env: If true, env.sh exporting configuration variables will + be sourced before running the command. + :param stdin_text: If given, will be written to the command's stdin + :param log_stdout: If false, standard output will not be logged + (but will still be available as cmd.stdout_text) + :param raiseonerr: If true, an exception will be raised if the command + does not exit with return code 0 + """ + command = RemoteCommand(self, argv, index=self._command_index, + log_stdout=log_stdout) + self._command_index += 1 + if set_env: + command.stdin.write('. %s\n' % self.env_sh_path) + command.stdin.write('set -e\n') + + if isinstance(argv, basestring): + command.stdin.write('(') + command.stdin.write(argv) + command.stdin.write(')') + else: for arg in argv: - stdin.write(ipautil.shell_quote(arg)) - stdin.write(' ') - if stdin_text: - stdin_filename = os.path.join(self.config.test_dir, 'stdin') - with self.sftp.open(stdin_filename, 'w') as f: - f.write(stdin_text) - stdin.write('<') - stdin.write(stdin_filename) - else: - stdin.write('< /dev/null') - if ignore_stdout: - stdin.write('> /dev/null') - stdin.write('\n') - ssh.shutdown_write() - output = [] - for line in stdout: - output.append(line) - self.log.info(' %s', line.strip('\n')) - exit_status = ssh.recv_exit_status() - self.log.info(' -> Exit code %s', exit_status) - if exit_status: - raise RuntimeError('Command %s exited with error code %s' % ( - argv[0], exit_status)) - return RunResult(''.join(output), exit_status) - finally: - ssh.close() + command.stdin.write(ipautil.shell_quote(arg)) + command.stdin.write(' ') + command.stdin.write(';exit\n') + if stdin_text: + command.stdin.write(stdin_text) + command.stdin.flush() + + command.wait(raiseonerr=raiseonerr) + return command @property def transport(self): @@ -173,11 +257,13 @@ class Host(object): self.sftp.chdir(path) def get_file_contents(self, filename): + """Read the named remote file and return the contents as a string""" self.log.info('READ %s', filename) with self.sftp.open(filename) as f: return f.read() def put_file_contents(self, filename, contents): + """Write the given string to the named remote file""" self.log.info('WRITE %s', filename) with self.sftp.open(filename, 'w') as f: return f.write(contents) diff --git a/ipatests/test_integration/test_simple_replication.py b/ipatests/test_integration/test_simple_replication.py index 80c0c4227..a5d1bb208 100644 --- a/ipatests/test_integration/test_simple_replication.py +++ b/ipatests/test_integration/test_simple_replication.py @@ -36,7 +36,7 @@ class TestSimpleReplication(IntegrationTest): time.sleep(5) result = self.replicas[0].run_command(['ipa', 'user-show', login]) - assert 'User login: %s' % login in result.output + assert 'User login: %s' % login in result.stdout_text def test_user_replication_to_master(self): login = 'testuser2' @@ -48,4 +48,4 @@ class TestSimpleReplication(IntegrationTest): time.sleep(5) result = self.master.run_command(['ipa', 'user-show', login]) - assert 'User login: %s' % login in result.output + assert 'User login: %s' % login in result.stdout_text -- cgit