diff options
Diffstat (limited to 'proxy/tests/runtests.py')
-rwxr-xr-x | proxy/tests/runtests.py | 130 |
1 files changed, 104 insertions, 26 deletions
diff --git a/proxy/tests/runtests.py b/proxy/tests/runtests.py index 0f1b85b..9c8c206 100755 --- a/proxy/tests/runtests.py +++ b/proxy/tests/runtests.py @@ -12,6 +12,44 @@ import subprocess import sys import time +try: + from colorama import Fore, Style + + def format_key(status, key): + if status == "success": + color = Fore.GREEN + elif status == "failure": + color = Fore.RED + else: + color = Style.DIM + Fore.YELLOW + return "[" + color + key + Style.RESET_ALL + "]" + +except ImportError: + + def format_key(status, key): + if status == "success": + color = " OO " + elif status == "failure": + color = " XX " + else: + color = " -- " + return "[" + color + key + color + "]" + +def print_keyed(status, key, text, io): + print("%s %s" % (format_key(status, key), text), file=io) + + +def print_success(key, text, io=sys.stderr): + print_keyed("success", key, text, io) + + +def print_failure(key, text, io=sys.stderr): + print_keyed("failure", key, text, io) + + +def print_warning(key, text, io=sys.stderr): + print_keyed("other", key, text, io) + def parse_args(): parser = argparse.ArgumentParser(description='GSS-Proxy Tests Environment') @@ -167,7 +205,8 @@ USR_KTNAME = "user.gssproxy.keytab" USR_CCACHE = "krb5ccache_usr" SVC_KTNAME = "kdc.gssproxy.keytab" KEY_TYPE = "aes256-cts-hmac-sha1-96:normal" - +USR2_NAME = "user2" +USR2_PWD = "usrpwd" def setup_keys(tesdir, env): @@ -190,6 +229,10 @@ def setup_keys(tesdir, env): with (open(testlog, 'a')) as logfile: kadmin_local(cmd, env, logfile) + cmd = "addprinc -pw %s %s" % (USR2_PWD, USR2_NAME) + with (open(testlog, 'a')) as logfile: + kadmin_local(cmd, env, logfile) + keys_env = { "KRB5_KTNAME": svc_keytab} keys_env.update(env) @@ -275,7 +318,7 @@ def setup_gssapi_env(testdir, wrapenv): def run_interposetest(testdir, env): - testlog = os.path.join(testdir, 'tests.log') + testlog = os.path.join(testdir, 'interposetest.log') ienv = {"KRB5CCNAME": os.path.join(testdir, 'interpose_ccache'), "KRB5_KTNAME": os.path.join(testdir, SVC_KTNAME)} @@ -364,21 +407,21 @@ def setup_gssproxy(testdir, logfile, env): return gproc, socket -def run_basic_test(testdir, logfile, env, expected_failure=False): +def run_basic_test(testdir, env, expected_failure=False): - print("STARTING BASIC init/Accept tests") + logfile = open(os.path.join(testdir, 'testinitaccept.log'), 'a') svc_name = "host@%s" % WRAP_HOSTNAME svc_keytab = os.path.join(testdir, SVC_KTNAME) - svcenv = {'KRB5_KTNAME': svc_keytab} + svcenv = {'KRB5_KTNAME': svc_keytab, + 'KRB5CCNAME': os.path.join(testdir, 't_accept_ccache'), + 'KRB5_TRACE': os.path.join(testdir, 't_accept_trace.log')} svcenv.update(env) - cli_keytab = os.path.join(testdir, USR_KTNAME) - cli_ccache = os.path.join(testdir, 't_basic_ccname') - clienv = {'GSS_USE_PROXY': 'yes', + clienv = {'KRB5CCNAME': os.path.join(testdir, 't_init_ccache'), + 'KRB5_TRACE': os.path.join(testdir, 't_init_trace.log'), + 'GSS_USE_PROXY': 'yes', 'GSSPROXY_BEHAVIOR': 'REMOTE_FIRST'} - #'KRB5CCNAME': cli_ccache, - #'KRB5_CLIENT_KTNAME': cli_keytab} clienv.update(env) pipe0 = os.pipe() @@ -392,35 +435,67 @@ def run_basic_test(testdir, logfile, env, expected_failure=False): stderr=logfile, env=svcenv, preexec_fn=os.setsid) try: - p1.wait(1) + p1.wait(30) except subprocess.TimeoutExpired: # p1.returncode is set to None here pass if p1.returncode != 0 and not expected_failure: - print("FAILED: Init test returned %s" % str(p1.returncode), - file=sys.stderr) + print_failure("SUCCESS" if p1.returncode == 0 else "FAILED", + "Init test returned %s" % str(p1.returncode)) try: os.killpg(p2.pid, signal.SIGTERM) except OSError: pass else: - print("SUCCESS: Init test returned %s" % str(p1.returncode), - file=sys.stderr) + print_success("SUCCESS" if p1.returncode == 0 else "FAILED", + "Init test returned %s" % str(p1.returncode)) try: - p2.wait(1) + p2.wait(30) except subprocess.TimeoutExpired: # p2.returncode is set to None here pass if p2.returncode != 0 and not expected_failure: - print("FAILED: Accept test returned %s" % str(p2.returncode), - file=sys.stderr) + print_failure("SUCCESS" if p1.returncode == 0 else "FAILED", + "Accept test returned %s" % str(p2.returncode)) try: os.killpg(p1.pid, signal.SIGTERM) except OSError: pass else: - print("SUCCESS: Accept test returned %s" % str(p2.returncode), - file=sys.stderr) + print_success("SUCCESS" if p1.returncode == 0 else "FAILED", + "Accept test returned %s" % str(p2.returncode)) + + +def run_acquire_test(testdir, env, expected_failure=False): + + logfile = open(os.path.join(testdir, 'testacquire.log'), 'a') + + svc_name = "host@%s" % WRAP_HOSTNAME + svc_keytab = os.path.join(testdir, SVC_KTNAME) + testenv = {'KRB5CCNAME': os.path.join(testdir, 't_acquire_ccache'), + 'KRB5_KTNAME': svc_keytab, + 'KRB5_TRACE': os.path.join(testdir, 't_acquire_trace.log'), + 'GSS_USE_PROXY': 'yes', + 'GSSPROXY_BEHAVIOR': 'REMOTE_FIRST'} + testenv.update(env) + + cmd = ["./tests/t_acquire", svc_name] + print("[COMMAND]\n%s\n[ENVIRONMENT]\n%s\n" % (cmd, env), file=logfile) + logfile.flush() + + p1 = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=logfile, + env=testenv, preexec_fn=os.setsid) + try: + p1.wait(30) + except subprocess.TimeoutExpired: + # p1.returncode is set to None here + pass + if p1.returncode != 0 and not expected_failure: + print_failure("SUCCESS" if p1.returncode == 0 else "FAILED", + "Acquire test returned %s" % str(p1.returncode)) + else: + print_success("SUCCESS" if p1.returncode == 0 else "FAILED", + "Acquire test returned %s" % str(p1.returncode)) if __name__ == '__main__': @@ -450,34 +525,37 @@ if __name__ == '__main__': with (open(testlog, 'a')) as logfile: gproc, gpsocket = setup_gssproxy(testdir, logfile, keysenv) - time.sleep(2) #Give time to gssproxy to fully start up + time.sleep(5) #Give time to gssproxy to fully start up processes['GSS-Proxy(%d)' % gproc.pid] = gproc gssapienv['GSSPROXY_SOCKET'] = gpsocket - run_basic_test(testdir, logfile, gssapienv) + print("Testing basic acquire creds", file=sys.stderr) + run_acquire_test(testdir, gssapienv) + print("Testing basic init/accept context", file=sys.stderr) + run_basic_test(testdir, gssapienv) print("Testing basic SIGHUP with no change", file=sys.stderr) os.kill(gproc.pid, signal.SIGHUP) time.sleep(1) #Let gssproxy reload everything - run_basic_test(testdir, logfile, gssapienv) + run_basic_test(testdir, gssapienv) print("Testing SIGHUP with dropped service", file=sys.stderr) update_gssproxy_conf(testdir, keysenv, GSSPROXY_CONF_MINIMAL_TEMPLATE) os.kill(gproc.pid, signal.SIGHUP) time.sleep(1) #Let gssproxy reload everything - run_basic_test(testdir, logfile, gssapienv, True) + run_basic_test(testdir, gssapienv, True) print("Testing SIGHUP with new service", file=sys.stderr) update_gssproxy_conf(testdir, keysenv, GSSPROXY_CONF_TEMPLATE) os.kill(gproc.pid, signal.SIGHUP) time.sleep(1) #Let gssproxy reload everything - run_basic_test(testdir, logfile, gssapienv) + run_basic_test(testdir, gssapienv) print("Testing SIGHUP with change of socket", file=sys.stderr) update_gssproxy_conf(testdir, keysenv, GSSPROXY_CONF_SOCKET_TEMPLATE) gssapienv['GSSPROXY_SOCKET'] += "2" os.kill(gproc.pid, signal.SIGHUP) time.sleep(1) #Let gssproxy reload everything - run_basic_test(testdir, logfile, gssapienv) + run_basic_test(testdir, gssapienv) finally: for name in processes: print("Killing %s" % name) |