summaryrefslogtreecommitdiffstats
path: root/tests/helpers
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2015-04-28 10:52:55 -0400
committerSimo Sorce <simo@redhat.com>2015-05-07 10:44:45 -0400
commitb34a4553cf8c60453fbef245d7d844a30339c734 (patch)
tree8125ac3a49605a61ee9d5e16896af1a96e41820d /tests/helpers
parentaa5dc3b417db962a075a092d0d3528010c1059f7 (diff)
downloadipsilon.git-b34a4553cf8c60453fbef245d7d844a30339c734.tar.gz
ipsilon.git-b34a4553cf8c60453fbef245d7d844a30339c734.tar.xz
ipsilon.git-b34a4553cf8c60453fbef245d7d844a30339c734.zip
Configure a KDC, add test for GSSAPI/Kerberos
Using nss_wrappers so we can control host names we can setup a KDC and test GSSAPI, including fallback to forms-based auth. This also means that fetch_page() needs to handle 401 a bit better, so it can re-try a failed authentication or fall back to forms-based auth. Note for posterity: if gss_localname() fails this is likely due to using the wrong krb5.conf in Apache, so pass in all environment variables. The KDC setup code was based heavily on the tests in the gssproxy project. https://fedorahosted.org/ipsilon/ticket/116 Signed-off-by: Rob Crittenden <rcritten@redhat.com> Reviewed-by: Simo Sorce <simo@redhat.com>
Diffstat (limited to 'tests/helpers')
-rwxr-xr-xtests/helpers/common.py157
-rwxr-xr-xtests/helpers/http.py54
2 files changed, 197 insertions, 14 deletions
diff --git a/tests/helpers/common.py b/tests/helpers/common.py
index 4cf27f9..8e6c53b 100755
--- a/tests/helpers/common.py
+++ b/tests/helpers/common.py
@@ -29,6 +29,64 @@ from string import Template
import subprocess
+WRAP_HOSTNAME = 'idp.ipsilon.dev'
+TESTREALM = 'IPSILON.DEV'
+TESTDOMAIN = 'ipsilon.dev'
+KDC_DBNAME = 'db.file'
+KDC_STASH = 'stash.file'
+KDC_PASSWORD = 'ipsilon'
+KRB5_CONF_TEMPLATE = '''
+[libdefaults]
+ default_realm = ${TESTREALM}
+ dns_lookup_realm = false
+ dns_lookup_kdc = false
+ rdns = false
+ ticket_lifetime = 24h
+ forwardable = yes
+ default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
+ udp_preference_limit = 0
+
+[realms]
+ ${TESTREALM} = {
+ kdc =${WRAP_HOSTNAME}
+ }
+
+[domain_realm]
+ .${TESTDOMAIN} = ${TESTREALM}
+ ${TESTDOMAIN} = ${TESTREALM}
+
+[dbmodules]
+ ${TESTREALM} = {
+ database_name = ${KDCDIR}/${KDC_DBNAME}
+ }
+'''
+
+KDC_CONF_TEMPLATE = '''
+[kdcdefaults]
+ kdc_ports = 88
+ kdc_tcp_ports = 88
+ restrict_anonymous_to_tgt = true
+
+[realms]
+ ${TESTREALM} = {
+ master_key_type = aes256-cts
+ max_life = 7d
+ max_renewable_life = 14d
+ acl_file = ${KDCDIR}/kadm5.acl
+ dict_file = /usr/share/dict/words
+ default_principal_flags = +preauth
+ admin_keytab = ${TESTREALM}/kadm5.keytab
+ key_stash_file = ${KDCDIR}/${KDC_STASH}
+ }
+[logging]
+ kdc = FILE:${KDCLOG}
+'''
+
+USER_KTNAME = "user.keytab"
+HTTP_KTNAME = "http.keytab"
+KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
+
+
class IpsilonTestBase(object):
def __init__(self, name, execname):
@@ -73,6 +131,7 @@ class IpsilonTestBase(object):
'TESTDIR': self.testdir,
'ROOTDIR': self.rootdir,
'NAMEID': nameid,
+ 'HTTP_KTNAME': HTTP_KTNAME,
'TEST_USER': self.testuser})
filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
@@ -167,6 +226,104 @@ class IpsilonTestBase(object):
env=env, preexec_fn=os.setsid)
self.processes.append(p)
+ def setup_kdc(self, env):
+
+ # setup kerberos environment
+ testlog = os.path.join(self.testdir, 'kerb.log')
+ krb5conf = os.path.join(self.testdir, 'krb5.conf')
+ kdcconf = os.path.join(self.testdir, 'kdc.conf')
+ kdcdir = os.path.join(self.testdir, 'kdc')
+ if os.path.exists(kdcdir):
+ shutil.rmtree(kdcdir)
+ os.makedirs(kdcdir)
+
+ t = Template(KRB5_CONF_TEMPLATE)
+ text = t.substitute({'TESTREALM': TESTREALM,
+ 'TESTDOMAIN': TESTDOMAIN,
+ 'TESTDIR': self.testdir,
+ 'KDCDIR': kdcdir,
+ 'KDC_DBNAME': KDC_DBNAME,
+ 'WRAP_HOSTNAME': WRAP_HOSTNAME})
+ with open(krb5conf, 'w+') as f:
+ f.write(text)
+
+ t = Template(KDC_CONF_TEMPLATE)
+ text = t.substitute({'TESTREALM': TESTREALM,
+ 'KDCDIR': kdcdir,
+ 'KDCLOG': testlog,
+ 'KDC_STASH': KDC_STASH})
+ with open(kdcconf, 'w+') as f:
+ f.write(text)
+
+ kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
+ 'KRB5_CONFIG': krb5conf,
+ 'KRB5_KDC_PROFILE': kdcconf}
+ kdcenv.update(env)
+
+ with (open(testlog, 'a')) as logfile:
+ ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
+ "-r", TESTREALM, "-P", KDC_PASSWORD],
+ stdout=logfile, stderr=logfile,
+ env=kdcenv, preexec_fn=os.setsid)
+ ksetup.wait()
+ if ksetup.returncode != 0:
+ raise ValueError('KDC Setup failed')
+
+ kdcproc = subprocess.Popen(['krb5kdc', '-n'],
+ env=kdcenv, preexec_fn=os.setsid)
+ self.processes.append(kdcproc)
+
+ return kdcenv
+
+ def kadmin_local(self, cmd, env, logfile):
+ ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
+ stdout=logfile, stderr=logfile,
+ env=env, preexec_fn=os.setsid)
+ ksetup.wait()
+ if ksetup.returncode != 0:
+ raise ValueError('Kadmin local [%s] failed' % cmd)
+
+ def setup_keys(self, env):
+
+ testlog = os.path.join(self.testdir, 'kerb.log')
+
+ svc_name = "HTTP/%s" % WRAP_HOSTNAME
+ svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
+ cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+ cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+
+ usr_keytab = os.path.join(self.testdir, USER_KTNAME)
+ cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+ cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser)
+ with (open(testlog, 'a')) as logfile:
+ self.kadmin_local(cmd, env, logfile)
+
+ keys_env = {"KRB5_KTNAME": svc_keytab}
+ keys_env.update(env)
+
+ return keys_env
+
+ def kinit_keytab(self, kdcenv):
+ testlog = os.path.join(self.testdir, 'kinit.log')
+ usr_keytab = os.path.join(self.testdir, USER_KTNAME)
+ kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(
+ self.testdir, 'ccaches/user')
+ with (open(testlog, 'a')) as logfile:
+ logfile.write("\n%s\n" % kdcenv)
+ ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
+ self.testuser],
+ stdout=logfile, stderr=logfile,
+ env=kdcenv, preexec_fn=os.setsid)
+ ksetup.wait()
+ if ksetup.returncode != 0:
+ raise ValueError('kinit %s failed' % self.testuser)
+
def wait(self):
for p in self.processes:
os.killpg(p.pid, signal.SIGTERM)
diff --git a/tests/helpers/http.py b/tests/helpers/http.py
index ff696d4..0da7ee2 100755
--- a/tests/helpers/http.py
+++ b/tests/helpers/http.py
@@ -24,6 +24,7 @@ import string
import urlparse
import json
from urllib import urlencode
+from requests_kerberos import HTTPKerberosAuth, OPTIONAL
class WrongPage(Exception):
@@ -89,18 +90,25 @@ class HttpSessions(object):
raise ValueError("Unknown URL: %s" % url)
- def get(self, url, **kwargs):
+ def get(self, url, krb=False, **kwargs):
session = self.get_session(url)
- return session.get(url, allow_redirects=False, **kwargs)
+ allow_redirects = False
+ if krb:
+ # In at least the test instance we don't get back a negotiate
+ # blob to do mutual authentication against.
+ kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
+ kwargs['auth'] = kerberos_auth
+ allow_redirects = True
+ return session.get(url, allow_redirects=allow_redirects, **kwargs)
def post(self, url, **kwargs):
session = self.get_session(url)
return session.post(url, allow_redirects=False, **kwargs)
- def access(self, action, url, **kwargs):
+ def access(self, action, url, krb=False, **kwargs):
action = string.lower(action)
if action == 'get':
- return self.get(url, **kwargs)
+ return self.get(url, krb, **kwargs)
elif action == 'post':
return self.post(url, **kwargs)
else:
@@ -242,19 +250,39 @@ class HttpSessions(object):
return [method, self.new_url(referer, action_url),
{'headers': headers, 'data': payload}]
- def fetch_page(self, idp, target_url, follow_redirect=True):
+ def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
+ """
+ Fetch a page and parse the response code to determine what to do
+ next.
+
+ The login process consists of redirections (302/303) and
+ potentially an unauthorized (401). For the case of unauthorized
+ try the page returned in case of fallback authentication.
+ """
url = target_url
action = 'get'
args = {}
while True:
- r = self.access(action, url, **args) # pylint: disable=star-args
+ # pylint: disable=star-args
+ r = self.access(action, url, krb=krb, **args)
if r.status_code == 303 or r.status_code == 302:
if not follow_redirect:
return PageTree(r)
url = r.headers['location']
action = 'get'
args = {}
+ elif r.status_code == 401:
+ page = PageTree(r)
+ if r.headers.get('WWW-Authenticate', None) is None:
+ return page
+
+ # Fall back, hopefully to testauth authentication.
+ try:
+ (action, url, args) = self.handle_login_form(idp, page)
+ continue
+ except WrongPage:
+ pass
elif r.status_code == 200:
page = PageTree(r)
@@ -288,12 +316,12 @@ class HttpSessions(object):
raise ValueError("Unhandled status (%d) on url %s" % (
r.status_code, url))
- def auth_to_idp(self, idp):
+ def auth_to_idp(self, idp, krb=False):
srv = self.servers[idp]
target_url = '%s/%s/' % (srv['baseuri'], idp)
- r = self.access('get', target_url)
+ r = self.access('get', target_url, krb=krb)
if r.status_code != 200:
raise ValueError("Access to idp failed: %s" % repr(r))
@@ -302,7 +330,8 @@ class HttpSessions(object):
href = page.first_value('//div[@id="content"]/p/a/@href')
url = self.new_url(target_url, href)
- page = self.fetch_page(idp, url)
+ page = self.fetch_page(idp, url, krb=krb)
+
page.expected_value('//div[@id="welcome"]/p/text()',
'Welcome %s!' % srv['user'])
@@ -325,7 +354,6 @@ class HttpSessions(object):
def add_sp_metadata(self, idp, sp, rest=False):
expected_status = 200
- idpsrv = self.servers[idp]
(idpuri, m) = self.get_sp_metadata(idp, sp)
url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
headers = {'referer': url}
@@ -334,13 +362,11 @@ class HttpSessions(object):
payload = {'metadata': m.content}
headers['content-type'] = 'application/x-www-form-urlencoded'
url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
- r = idpsrv['session'].post(url, headers=headers,
- data=urlencode(payload))
+ r = self.post(url, headers=headers, data=urlencode(payload))
else:
metafile = {'metafile': m.content}
payload = {'name': sp}
- r = idpsrv['session'].post(url, headers=headers,
- data=payload, files=metafile)
+ r = self.post(url, headers=headers, data=payload, files=metafile)
if r.status_code != expected_status:
raise ValueError('Failed to post SP data [%s]' % repr(r))