diff options
Diffstat (limited to 'source/stf/comfychair.py')
-rw-r--r-- | source/stf/comfychair.py | 445 |
1 files changed, 445 insertions, 0 deletions
diff --git a/source/stf/comfychair.py b/source/stf/comfychair.py new file mode 100644 index 00000000000..522f9bedeba --- /dev/null +++ b/source/stf/comfychair.py @@ -0,0 +1,445 @@ +#! /usr/bin/env python + +# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org> +# Copyright (C) 2003 by Tim Potter <tpot@samba.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +"""comfychair: a Python-based instrument of software torture. + +Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org> +Copyright (C) 2003 by Tim Potter <tpot@samba.org> + +This is a test framework designed for testing programs written in +Python, or (through a fork/exec interface) any other language. + +For more information, see the file README.comfychair. + +To run a test suite based on ComfyChair, just run it as a program. +""" + +import sys, re + + +class TestCase: + """A base class for tests. This class defines required functions which + can optionally be overridden by subclasses. It also provides some + utility functions for""" + + def __init__(self): + self.test_log = "" + self.background_pids = [] + self._cleanups = [] + self._enter_rundir() + self._save_environment() + self.add_cleanup(self.teardown) + + + # -------------------------------------------------- + # Save and restore directory + def _enter_rundir(self): + import os + self.basedir = os.getcwd() + self.add_cleanup(self._restore_directory) + self.rundir = os.path.join(self.basedir, + 'testtmp', + self.__class__.__name__) + self.tmpdir = os.path.join(self.rundir, 'tmp') + os.system("rm -fr %s" % self.rundir) + os.makedirs(self.tmpdir) + os.system("mkdir -p %s" % self.rundir) + os.chdir(self.rundir) + + def _restore_directory(self): + import os + os.chdir(self.basedir) + + # -------------------------------------------------- + # Save and restore environment + def _save_environment(self): + import os + self._saved_environ = os.environ.copy() + self.add_cleanup(self._restore_environment) + + def _restore_environment(self): + import os + os.environ.clear() + os.environ.update(self._saved_environ) + + + def setup(self): + """Set up test fixture.""" + pass + + def teardown(self): + """Tear down test fixture.""" + pass + + def runtest(self): + """Run the test.""" + pass + + + def add_cleanup(self, c): + """Queue a cleanup to be run when the test is complete.""" + self._cleanups.append(c) + + + def fail(self, reason = ""): + """Say the test failed.""" + raise AssertionError(reason) + + + ############################################################# + # Requisition methods + + def require(self, predicate, message): + """Check a predicate for running this test. + +If the predicate value is not true, the test is skipped with a message explaining +why.""" + if not predicate: + raise NotRunError, message + + def require_root(self): + """Skip this test unless run by root.""" + import os + self.require(os.getuid() == 0, + "must be root to run this test") + + ############################################################# + # Assertion methods + + def assert_(self, expr, reason = ""): + if not expr: + raise AssertionError(reason) + + def assert_equal(self, a, b): + if not a == b: + raise AssertionError("assertEquals failed: %s" % `(a, b)`) + + def assert_notequal(self, a, b): + if a == b: + raise AssertionError("assertNotEqual failed: %s" % `(a, b)`) + + def assert_re_match(self, pattern, s): + """Assert that a string matches a particular pattern + + Inputs: + pattern string: regular expression + s string: to be matched + + Raises: + AssertionError if not matched + """ + if not re.match(pattern, s): + raise AssertionError("string does not match regexp\n" + " string: %s\n" + " re: %s" % (`s`, `pattern`)) + + def assert_re_search(self, pattern, s): + """Assert that a string *contains* a particular pattern + + Inputs: + pattern string: regular expression + s string: to be searched + + Raises: + AssertionError if not matched + """ + if not re.search(pattern, s): + raise AssertionError("string does not contain regexp\n" + " string: %s\n" + " re: %s" % (`s`, `pattern`)) + + + def assert_no_file(self, filename): + import os.path + assert not os.path.exists(filename), ("file exists but should not: %s" % filename) + + + ############################################################# + # Methods for running programs + + def runcmd_background(self, cmd): + import os + self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n" + pid = os.fork() + if pid == 0: + # child + try: + os.execvp("/bin/sh", ["/bin/sh", "-c", cmd]) + finally: + os._exit(127) + self.test_log = self.test_log + "pid: %d\n" % pid + return pid + + + def runcmd(self, cmd, expectedResult = 0): + """Run a command, fail if the command returns an unexpected exit + code. Return the output produced.""" + rc, output, stderr = self.runcmd_unchecked(cmd) + if rc != expectedResult: + raise AssertionError("""command returned %d; expected %s: \"%s\" +stdout: +%s +stderr: +%s""" % (rc, expectedResult, cmd, output, stderr)) + + return output, stderr + + + def run_captured(self, cmd): + """Run a command, capturing stdout and stderr. + + Based in part on popen2.py + + Returns (waitstatus, stdout, stderr).""" + import os, types + pid = os.fork() + if pid == 0: + # child + try: + pid = os.getpid() + openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC + + outfd = os.open('%d.out' % pid, openmode, 0666) + os.dup2(outfd, 1) + os.close(outfd) + + errfd = os.open('%d.err' % pid, openmode, 0666) + os.dup2(errfd, 2) + os.close(errfd) + + if isinstance(cmd, types.StringType): + cmd = ['/bin/sh', '-c', cmd] + + os.execvp(cmd[0], cmd) + finally: + os._exit(127) + else: + # parent + exited_pid, waitstatus = os.waitpid(pid, 0) + stdout = open('%d.out' % pid).read() + stderr = open('%d.err' % pid).read() + return waitstatus, stdout, stderr + + + def runcmd_unchecked(self, cmd, skip_on_noexec = 0): + """Invoke a command; return (exitcode, stdout, stderr)""" + import os + waitstatus, stdout, stderr = self.run_captured(cmd) + assert not os.WIFSIGNALED(waitstatus), \ + ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus))) + rc = os.WEXITSTATUS(waitstatus) + self.test_log = self.test_log + ("""Run command: %s +Wait status: %#x (exit code %d, signal %d) +stdout: +%s +stderr: +%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus), + stdout, stderr)) + if skip_on_noexec and rc == 127: + # Either we could not execute the command or the command + # returned exit code 127. According to system(3) we can't + # tell the difference. + raise NotRunError, "could not execute %s" % `cmd` + return rc, stdout, stderr + + + def explain_failure(self, exc_info = None): + print "test_log:" + print self.test_log + + + def log(self, msg): + """Log a message to the test log. This message is displayed if + the test fails, or when the runtests function is invoked with + the verbose option.""" + self.test_log = self.test_log + msg + "\n" + + +class NotRunError(Exception): + """Raised if a test must be skipped because of missing resources""" + def __init__(self, value = None): + self.value = value + + +def _report_error(case, debugger): + """Ask the test case to explain failure, and optionally run a debugger + + Input: + case TestCase instance + debugger if true, a debugger function to be applied to the traceback +""" + import sys + ex = sys.exc_info() + print "-----------------------------------------------------------------" + if ex: + import traceback + traceback.print_exc(file=sys.stdout) + case.explain_failure() + print "-----------------------------------------------------------------" + + if debugger: + tb = ex[2] + debugger(tb) + + +def runtests(test_list, verbose = 0, debugger = None): + """Run a series of tests. + + Inputs: + test_list sequence of TestCase classes + verbose print more information as testing proceeds + debugger debugger object to be applied to errors + + Returns: + unix return code: 0 for success, 1 for failures, 2 for test failure + """ + import traceback + ret = 0 + for test_class in test_list: + print "%-30s" % _test_name(test_class), + # flush now so that long running tests are easier to follow + sys.stdout.flush() + + obj = None + try: + try: # run test and show result + obj = test_class() + obj.setup() + obj.runtest() + print "OK" + except KeyboardInterrupt: + print "INTERRUPT" + _report_error(obj, debugger) + ret = 2 + break + except NotRunError, msg: + print "NOTRUN, %s" % msg.value + except: + print "FAIL" + _report_error(obj, debugger) + ret = 1 + finally: + while obj and obj._cleanups: + try: + apply(obj._cleanups.pop()) + except KeyboardInterrupt: + print "interrupted during teardown" + _report_error(obj, debugger) + ret = 2 + break + except: + print "error during teardown" + _report_error(obj, debugger) + ret = 1 + # Display log file if we're verbose + if ret == 0 and verbose: + obj.explain_failure() + + return ret + + +def _test_name(test_class): + """Return a human-readable name for a test class. + """ + try: + return test_class.__name__ + except: + return `test_class` + + +def print_help(): + """Help for people running tests""" + import sys + print """%s: software test suite based on ComfyChair + +usage: + To run all tests, just run this program. To run particular tests, + list them on the command line. + +options: + --help show usage message + --list list available tests + --verbose, -v show more information while running tests + --post-mortem, -p enter Python debugger on error +""" % sys.argv[0] + + +def print_list(test_list): + """Show list of available tests""" + for test_class in test_list: + print " %s" % _test_name(test_class) + + +def main(tests, extra_tests=[]): + """Main entry point for test suites based on ComfyChair. + + inputs: + tests Sequence of TestCase subclasses to be run by default. + extra_tests Sequence of TestCase subclasses that are available but + not run by default. + +Test suites should contain this boilerplate: + + if __name__ == '__main__': + comfychair.main(tests) + +This function handles standard options such as --help and --list, and +by default runs all tests in the suggested order. + +Calls sys.exit() on completion. +""" + from sys import argv + import getopt, sys + + opt_verbose = 0 + debugger = None + + opts, args = getopt.getopt(argv[1:], 'pv', + ['help', 'list', 'verbose', 'post-mortem']) + for opt, opt_arg in opts: + if opt == '--help': + print_help() + return + elif opt == '--list': + print_list(tests + extra_tests) + return + elif opt == '--verbose' or opt == '-v': + opt_verbose = 1 + elif opt == '--post-mortem' or opt == '-p': + import pdb + debugger = pdb.post_mortem + + if args: + all_tests = tests + extra_tests + by_name = {} + for t in all_tests: + by_name[_test_name(t)] = t + which_tests = [] + for name in args: + which_tests.append(by_name[name]) + else: + which_tests = tests + + sys.exit(runtests(which_tests, verbose=opt_verbose, + debugger=debugger)) + + +if __name__ == '__main__': + print __doc__ |