summaryrefslogtreecommitdiffstats
path: root/proxy/tests/runtests.py
diff options
context:
space:
mode:
Diffstat (limited to 'proxy/tests/runtests.py')
-rwxr-xr-xproxy/tests/runtests.py130
1 files changed, 104 insertions, 26 deletions
diff --git a/proxy/tests/runtests.py b/proxy/tests/runtests.py
index 0f1b85b..9c8c206 100755
--- a/proxy/tests/runtests.py
+++ b/proxy/tests/runtests.py
@@ -12,6 +12,44 @@ import subprocess
import sys
import time
+try:
+ from colorama import Fore, Style
+
+ def format_key(status, key):
+ if status == "success":
+ color = Fore.GREEN
+ elif status == "failure":
+ color = Fore.RED
+ else:
+ color = Style.DIM + Fore.YELLOW
+ return "[" + color + key + Style.RESET_ALL + "]"
+
+except ImportError:
+
+ def format_key(status, key):
+ if status == "success":
+ color = " OO "
+ elif status == "failure":
+ color = " XX "
+ else:
+ color = " -- "
+ return "[" + color + key + color + "]"
+
+def print_keyed(status, key, text, io):
+ print("%s %s" % (format_key(status, key), text), file=io)
+
+
+def print_success(key, text, io=sys.stderr):
+ print_keyed("success", key, text, io)
+
+
+def print_failure(key, text, io=sys.stderr):
+ print_keyed("failure", key, text, io)
+
+
+def print_warning(key, text, io=sys.stderr):
+ print_keyed("other", key, text, io)
+
def parse_args():
parser = argparse.ArgumentParser(description='GSS-Proxy Tests Environment')
@@ -167,7 +205,8 @@ USR_KTNAME = "user.gssproxy.keytab"
USR_CCACHE = "krb5ccache_usr"
SVC_KTNAME = "kdc.gssproxy.keytab"
KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
-
+USR2_NAME = "user2"
+USR2_PWD = "usrpwd"
def setup_keys(tesdir, env):
@@ -190,6 +229,10 @@ def setup_keys(tesdir, env):
with (open(testlog, 'a')) as logfile:
kadmin_local(cmd, env, logfile)
+ cmd = "addprinc -pw %s %s" % (USR2_PWD, USR2_NAME)
+ with (open(testlog, 'a')) as logfile:
+ kadmin_local(cmd, env, logfile)
+
keys_env = { "KRB5_KTNAME": svc_keytab}
keys_env.update(env)
@@ -275,7 +318,7 @@ def setup_gssapi_env(testdir, wrapenv):
def run_interposetest(testdir, env):
- testlog = os.path.join(testdir, 'tests.log')
+ testlog = os.path.join(testdir, 'interposetest.log')
ienv = {"KRB5CCNAME": os.path.join(testdir, 'interpose_ccache'),
"KRB5_KTNAME": os.path.join(testdir, SVC_KTNAME)}
@@ -364,21 +407,21 @@ def setup_gssproxy(testdir, logfile, env):
return gproc, socket
-def run_basic_test(testdir, logfile, env, expected_failure=False):
+def run_basic_test(testdir, env, expected_failure=False):
- print("STARTING BASIC init/Accept tests")
+ logfile = open(os.path.join(testdir, 'testinitaccept.log'), 'a')
svc_name = "host@%s" % WRAP_HOSTNAME
svc_keytab = os.path.join(testdir, SVC_KTNAME)
- svcenv = {'KRB5_KTNAME': svc_keytab}
+ svcenv = {'KRB5_KTNAME': svc_keytab,
+ 'KRB5CCNAME': os.path.join(testdir, 't_accept_ccache'),
+ 'KRB5_TRACE': os.path.join(testdir, 't_accept_trace.log')}
svcenv.update(env)
- cli_keytab = os.path.join(testdir, USR_KTNAME)
- cli_ccache = os.path.join(testdir, 't_basic_ccname')
- clienv = {'GSS_USE_PROXY': 'yes',
+ clienv = {'KRB5CCNAME': os.path.join(testdir, 't_init_ccache'),
+ 'KRB5_TRACE': os.path.join(testdir, 't_init_trace.log'),
+ 'GSS_USE_PROXY': 'yes',
'GSSPROXY_BEHAVIOR': 'REMOTE_FIRST'}
- #'KRB5CCNAME': cli_ccache,
- #'KRB5_CLIENT_KTNAME': cli_keytab}
clienv.update(env)
pipe0 = os.pipe()
@@ -392,35 +435,67 @@ def run_basic_test(testdir, logfile, env, expected_failure=False):
stderr=logfile, env=svcenv, preexec_fn=os.setsid)
try:
- p1.wait(1)
+ p1.wait(30)
except subprocess.TimeoutExpired:
# p1.returncode is set to None here
pass
if p1.returncode != 0 and not expected_failure:
- print("FAILED: Init test returned %s" % str(p1.returncode),
- file=sys.stderr)
+ print_failure("SUCCESS" if p1.returncode == 0 else "FAILED",
+ "Init test returned %s" % str(p1.returncode))
try:
os.killpg(p2.pid, signal.SIGTERM)
except OSError:
pass
else:
- print("SUCCESS: Init test returned %s" % str(p1.returncode),
- file=sys.stderr)
+ print_success("SUCCESS" if p1.returncode == 0 else "FAILED",
+ "Init test returned %s" % str(p1.returncode))
try:
- p2.wait(1)
+ p2.wait(30)
except subprocess.TimeoutExpired:
# p2.returncode is set to None here
pass
if p2.returncode != 0 and not expected_failure:
- print("FAILED: Accept test returned %s" % str(p2.returncode),
- file=sys.stderr)
+ print_failure("SUCCESS" if p1.returncode == 0 else "FAILED",
+ "Accept test returned %s" % str(p2.returncode))
try:
os.killpg(p1.pid, signal.SIGTERM)
except OSError:
pass
else:
- print("SUCCESS: Accept test returned %s" % str(p2.returncode),
- file=sys.stderr)
+ print_success("SUCCESS" if p1.returncode == 0 else "FAILED",
+ "Accept test returned %s" % str(p2.returncode))
+
+
+def run_acquire_test(testdir, env, expected_failure=False):
+
+ logfile = open(os.path.join(testdir, 'testacquire.log'), 'a')
+
+ svc_name = "host@%s" % WRAP_HOSTNAME
+ svc_keytab = os.path.join(testdir, SVC_KTNAME)
+ testenv = {'KRB5CCNAME': os.path.join(testdir, 't_acquire_ccache'),
+ 'KRB5_KTNAME': svc_keytab,
+ 'KRB5_TRACE': os.path.join(testdir, 't_acquire_trace.log'),
+ 'GSS_USE_PROXY': 'yes',
+ 'GSSPROXY_BEHAVIOR': 'REMOTE_FIRST'}
+ testenv.update(env)
+
+ cmd = ["./tests/t_acquire", svc_name]
+ print("[COMMAND]\n%s\n[ENVIRONMENT]\n%s\n" % (cmd, env), file=logfile)
+ logfile.flush()
+
+ p1 = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=logfile,
+ env=testenv, preexec_fn=os.setsid)
+ try:
+ p1.wait(30)
+ except subprocess.TimeoutExpired:
+ # p1.returncode is set to None here
+ pass
+ if p1.returncode != 0 and not expected_failure:
+ print_failure("SUCCESS" if p1.returncode == 0 else "FAILED",
+ "Acquire test returned %s" % str(p1.returncode))
+ else:
+ print_success("SUCCESS" if p1.returncode == 0 else "FAILED",
+ "Acquire test returned %s" % str(p1.returncode))
if __name__ == '__main__':
@@ -450,34 +525,37 @@ if __name__ == '__main__':
with (open(testlog, 'a')) as logfile:
gproc, gpsocket = setup_gssproxy(testdir, logfile, keysenv)
- time.sleep(2) #Give time to gssproxy to fully start up
+ time.sleep(5) #Give time to gssproxy to fully start up
processes['GSS-Proxy(%d)' % gproc.pid] = gproc
gssapienv['GSSPROXY_SOCKET'] = gpsocket
- run_basic_test(testdir, logfile, gssapienv)
+ print("Testing basic acquire creds", file=sys.stderr)
+ run_acquire_test(testdir, gssapienv)
+ print("Testing basic init/accept context", file=sys.stderr)
+ run_basic_test(testdir, gssapienv)
print("Testing basic SIGHUP with no change", file=sys.stderr)
os.kill(gproc.pid, signal.SIGHUP)
time.sleep(1) #Let gssproxy reload everything
- run_basic_test(testdir, logfile, gssapienv)
+ run_basic_test(testdir, gssapienv)
print("Testing SIGHUP with dropped service", file=sys.stderr)
update_gssproxy_conf(testdir, keysenv, GSSPROXY_CONF_MINIMAL_TEMPLATE)
os.kill(gproc.pid, signal.SIGHUP)
time.sleep(1) #Let gssproxy reload everything
- run_basic_test(testdir, logfile, gssapienv, True)
+ run_basic_test(testdir, gssapienv, True)
print("Testing SIGHUP with new service", file=sys.stderr)
update_gssproxy_conf(testdir, keysenv, GSSPROXY_CONF_TEMPLATE)
os.kill(gproc.pid, signal.SIGHUP)
time.sleep(1) #Let gssproxy reload everything
- run_basic_test(testdir, logfile, gssapienv)
+ run_basic_test(testdir, gssapienv)
print("Testing SIGHUP with change of socket", file=sys.stderr)
update_gssproxy_conf(testdir, keysenv, GSSPROXY_CONF_SOCKET_TEMPLATE)
gssapienv['GSSPROXY_SOCKET'] += "2"
os.kill(gproc.pid, signal.SIGHUP)
time.sleep(1) #Let gssproxy reload everything
- run_basic_test(testdir, logfile, gssapienv)
+ run_basic_test(testdir, gssapienv)
finally:
for name in processes:
print("Killing %s" % name)