summaryrefslogtreecommitdiffstats
path: root/tests/helpers
diff options
context:
space:
mode:
Diffstat (limited to 'tests/helpers')
-rwxr-xr-xtests/helpers/common.py157
-rwxr-xr-xtests/helpers/http.py54
2 files changed, 197 insertions, 14 deletions
diff --git a/tests/helpers/common.py b/tests/helpers/common.py
index 4cf27f9..8e6c53b 100755
--- a/tests/helpers/common.py
+++ b/tests/helpers/common.py
@@ -29,6 +29,64 @@ from string import Template
import subprocess
+WRAP_HOSTNAME = 'idp.ipsilon.dev'
+TESTREALM = 'IPSILON.DEV'
+TESTDOMAIN = 'ipsilon.dev'
+KDC_DBNAME = 'db.file'
+KDC_STASH = 'stash.file'
+KDC_PASSWORD = 'ipsilon'
+KRB5_CONF_TEMPLATE = '''
+[libdefaults]
+ default_realm = ${TESTREALM}
+ dns_lookup_realm = false
+ dns_lookup_kdc = false
+ rdns = false
+ ticket_lifetime = 24h
+ forwardable = yes
+ default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
+ udp_preference_limit = 0
+
+[realms]
+ ${TESTREALM} = {
+ kdc =${WRAP_HOSTNAME}
+ }
+
+[domain_realm]
+ .${TESTDOMAIN} = ${TESTREALM}
+ ${TESTDOMAIN} = ${TESTREALM}
+
+[dbmodules]
+ ${TESTREALM} = {
+ database_name = ${KDCDIR}/${KDC_DBNAME}
+ }
+'''
+
+KDC_CONF_TEMPLATE = '''
+[kdcdefaults]
+ kdc_ports = 88
+ kdc_tcp_ports = 88
+ restrict_anonymous_to_tgt = true
+
+[realms]
+ ${TESTREALM} = {
+ master_key_type = aes256-cts
+ max_life = 7d
+ max_renewable_life = 14d
+ acl_file = ${KDCDIR}/kadm5.acl
+ dict_file = /usr/share/dict/words
+ default_principal_flags = +preauth
+ admin_keytab = ${TESTREALM}/kadm5.keytab
+ key_stash_file = ${KDCDIR}/${KDC_STASH}
+ }
+[logging]
+ kdc = FILE:${KDCLOG}
+'''
+
+USER_KTNAME = "user.keytab"
+HTTP_KTNAME = "http.keytab"
+KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
+
+
class IpsilonTestBase(object):
def __init__(self, name, execname):
@@ -73,6 +131,7 @@ class IpsilonTestBase(object):
'TESTDIR': self.testdir,
'ROOTDIR': self.rootdir,
'NAMEID': nameid,
+ 'HTTP_KTNAME': HTTP_KTNAME,
'TEST_USER': self.testuser})
filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
@@ -167,6 +226,104 @@ class IpsilonTestBase(object):
env=env, preexec_fn=os.setsid)
self.processes.append(p)
+ def setup_kdc(self, env):
+
+ # setup kerberos environment
+ testlog = os.path.join(self.testdir, 'kerb.log')
+ krb5conf = os.path.join(self.testdir, 'krb5.conf')
+ kdcconf = os.path.join(self.testdir, 'kdc.conf')
+ kdcdir = os.path.join(self.testdir, 'kdc')
+ if os.path.exists(kdcdir):
+ shutil.rmtree(kdcdir)
+ os.makedirs(kdcdir)
+
+ t = Template(KRB5_CONF_TEMPLATE)
+ text = t.substitute({'TESTREALM': TESTREALM,
+ 'TESTDOMAIN': TESTDOMAIN,
+ 'TESTDIR': self.testdir,
+ 'KDCDIR': kdcdir,
+ 'KDC_DBNAME': KDC_DBNAME,
+ 'WRAP_HOSTNAME': WRAP_HOSTNAME})
+ with open(krb5conf, 'w+') as f:
+ f.write(text)
+
+ t = Template(KDC_CONF_TEMPLATE)
+ text = t.substitute({'TESTREALM': TESTREALM,
+ 'KDCDIR': kdcdir,
+ 'KDCLOG': testlog,
+ 'KDC_STASH': KDC_STASH})
+ with open(kdcconf, 'w+') as f:
+ f.write(text)
+
+ kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
+ 'KRB5_CONFIG': krb5conf,
+ 'KRB5_KDC_PROFILE': kdcconf}
+ kdcenv.update(env)
+
+ with (open(testlog, 'a')) as logfile:
+ ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
+ "-r", TESTREALM, "-P", KDC_PASSWORD],
+ stdout=logfile, stderr=logfile,
+ env=kdcenv, preexec_fn=os.setsid)
+ ksetup.wait()
+ if ksetup.returncode != 0:
+ raise ValueError('KDC Setup failed')
+
+ kdcproc = subprocess.Popen(['krb5kdc', '-n'],
+ env=kdcenv, preexec_fn=os.setsid)
+ self.processes.append(kdcproc)
+
+ return kdcenv
+
+ def kadmin_local(self, cmd, env, logfile):
+ ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
+ stdout=logfile, stderr=logfile,
+ env=env, preexec_fn=os.setsid)
+ ksetup.wait()
+ if ksetup.returncode != 0:
+ raise ValueError('Kadmin local [%s] failed' % cmd)
+
+ def setup_keys(self, env):
+
+ testlog = os.path.join(self.testdir, 'kerb.log')
+
+ svc_name = "HTTP/%s" % WRAP_HOSTNAME
+ svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
+ cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+ cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+
+ usr_keytab = os.path.join(self.testdir, USER_KTNAME)
+ cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+ cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+
+ keys_env = {"KRB5_KTNAME": svc_keytab}
+ keys_env.update(env)
+
+ return keys_env
+
+ def kinit_keytab(self, kdcenv):
+ testlog = os.path.join(self.testdir, 'kinit.log')
+ usr_keytab = os.path.join(self.testdir, USER_KTNAME)
+ kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(
+ self.testdir, 'ccaches/user')
+ with (open(testlog, 'a')) as logfile:
+ logfile.write("\n%s\n" % kdcenv)
+ ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
+ self.testuser],
+ stdout=logfile, stderr=logfile,
+ env=kdcenv, preexec_fn=os.setsid)
+ ksetup.wait()
+ if ksetup.returncode != 0:
+ raise ValueError('kinit %s failed' % self.testuser)
+
def wait(self):
for p in self.processes:
os.killpg(p.pid, signal.SIGTERM)
diff --git a/tests/helpers/http.py b/tests/helpers/http.py
index ff696d4..0da7ee2 100755
--- a/tests/helpers/http.py
+++ b/tests/helpers/http.py
@@ -24,6 +24,7 @@ import string
import urlparse
import json
from urllib import urlencode
+from requests_kerberos import HTTPKerberosAuth, OPTIONAL
class WrongPage(Exception):
@@ -89,18 +90,25 @@ class HttpSessions(object):
raise ValueError("Unknown URL: %s" % url)
- def get(self, url, **kwargs):
+ def get(self, url, krb=False, **kwargs):
session = self.get_session(url)
- return session.get(url, allow_redirects=False, **kwargs)
+ allow_redirects = False
+ if krb:
+ # In at least the test instance we don't get back a negotiate
+ # blob to do mutual authentication against.
+ kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
+ kwargs['auth'] = kerberos_auth
+ allow_redirects = True
+ return session.get(url, allow_redirects=allow_redirects, **kwargs)
def post(self, url, **kwargs):
session = self.get_session(url)
return session.post(url, allow_redirects=False, **kwargs)
- def access(self, action, url, **kwargs):
+ def access(self, action, url, krb=False, **kwargs):
action = string.lower(action)
if action == 'get':
- return self.get(url, **kwargs)
+ return self.get(url, krb, **kwargs)
elif action == 'post':
return self.post(url, **kwargs)
else:
@@ -242,19 +250,39 @@ class HttpSessions(object):
return [method, self.new_url(referer, action_url),
{'headers': headers, 'data': payload}]
- def fetch_page(self, idp, target_url, follow_redirect=True):
+ def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
+ """
+ Fetch a page and parse the response code to determine what to do
+ next.
+
+ The login process consists of redirections (302/303) and
+ potentially an unauthorized (401). For the case of unauthorized
+ try the page returned in case of fallback authentication.
+ """
url = target_url
action = 'get'
args = {}
while True:
- r = self.access(action, url, **args) # pylint: disable=star-args
+ # pylint: disable=star-args
+ r = self.access(action, url, krb=krb, **args)
if r.status_code == 303 or r.status_code == 302:
if not follow_redirect:
return PageTree(r)
url = r.headers['location']
action = 'get'
args = {}
+ elif r.status_code == 401:
+ page = PageTree(r)
+ if r.headers.get('WWW-Authenticate', None) is None:
+ return page
+
+ # Fall back, hopefully to testauth authentication.
+ try:
+ (action, url, args) = self.handle_login_form(idp, page)
+ continue
+ except WrongPage:
+ pass
elif r.status_code == 200:
page = PageTree(r)
@@ -288,12 +316,12 @@ class HttpSessions(object):
raise ValueError("Unhandled status (%d) on url %s" % (
r.status_code, url))
- def auth_to_idp(self, idp):
+ def auth_to_idp(self, idp, krb=False):
srv = self.servers[idp]
target_url = '%s/%s/' % (srv['baseuri'], idp)
- r = self.access('get', target_url)
+ r = self.access('get', target_url, krb=krb)
if r.status_code != 200:
raise ValueError("Access to idp failed: %s" % repr(r))
@@ -302,7 +330,8 @@ class HttpSessions(object):
href = page.first_value('//div[@id="content"]/p/a/@href')
url = self.new_url(target_url, href)
- page = self.fetch_page(idp, url)
+ page = self.fetch_page(idp, url, krb=krb)
+
page.expected_value('//div[@id="welcome"]/p/text()',
'Welcome %s!' % srv['user'])
@@ -325,7 +354,6 @@ class HttpSessions(object):
def add_sp_metadata(self, idp, sp, rest=False):
expected_status = 200
- idpsrv = self.servers[idp]
(idpuri, m) = self.get_sp_metadata(idp, sp)
url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
headers = {'referer': url}
@@ -334,13 +362,11 @@ class HttpSessions(object):
payload = {'metadata': m.content}
headers['content-type'] = 'application/x-www-form-urlencoded'
url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
- r = idpsrv['session'].post(url, headers=headers,
- data=urlencode(payload))
+ r = self.post(url, headers=headers, data=urlencode(payload))
else:
metafile = {'metafile': m.content}
payload = {'name': sp}
- r = idpsrv['session'].post(url, headers=headers,
- data=payload, files=metafile)
+ r = self.post(url, headers=headers, data=payload, files=metafile)
if r.status_code != expected_status:
raise ValueError('Failed to post SP data [%s]' % repr(r))