summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrick Uiterwijk <puiterwijk@redhat.com>2015-04-28 19:11:12 +0200
committerPatrick Uiterwijk <puiterwijk@redhat.com>2015-04-28 20:53:06 +0200
commit86f5401c7cb620046b6dd7730844998dec595f43 (patch)
tree60e8eb4602d570341ac7a5cd575b0f55be51e27e
parentba45934659346510966ca6c58a01dbba3eca7d2f (diff)
downloadipsilon.git-86f5401c7cb620046b6dd7730844998dec595f43.tar.gz
ipsilon.git-86f5401c7cb620046b6dd7730844998dec595f43.tar.xz
ipsilon.git-86f5401c7cb620046b6dd7730844998dec595f43.zip
Add OpenID test suite
This tests core OpenID and the Attribute Exchange, Simple Registration and Teams extensions. Using a small wsgi tool because mod_auth_openid does not support all extensions. Signed-off-by: Patrick Uiterwijk <puiterwijk@redhat.com> Reviewed-by: Rob Crittenden <rcritten@redhat.com>
-rw-r--r--Makefile1
-rw-r--r--ipsilon/login/authtest.py3
-rw-r--r--ipsilon/providers/openidp.py7
-rw-r--r--tests/blobs/openid_app.py114
-rwxr-xr-xtests/helpers/http.py84
-rwxr-xr-xtests/openid.py132
-rwxr-xr-xtests/testmapping.py4
7 files changed, 336 insertions, 9 deletions
diff --git a/Makefile b/Makefile
index dce214a..2434898 100644
--- a/Makefile
+++ b/Makefile
@@ -98,6 +98,7 @@ tests: wrappers
PYTHONPATH=./ ./tests/tests.py --test=pgdb
PYTHONPATH=./ ./tests/tests.py --test=fconf
PYTHONPATH=./ ./tests/tests.py --test=ldap
+ PYTHONPATH=./ ./tests/tests.py --test=openid
test: lp-test unittests tests
diff --git a/ipsilon/login/authtest.py b/ipsilon/login/authtest.py
index 7fc4160..5f0ff6e 100644
--- a/ipsilon/login/authtest.py
+++ b/ipsilon/login/authtest.py
@@ -36,7 +36,8 @@ class TestAuth(LoginFormBase):
'givenname': 'Test User',
'surname': username,
'fullname': 'Test User %s' % username,
- 'email': '%s@example.com' % username
+ 'email': '%s@example.com' % username,
+ '_groups': [username]
}
return self.lm.auth_successful(self.trans,
username, 'password', testdata)
diff --git a/ipsilon/providers/openidp.py b/ipsilon/providers/openidp.py
index 4e47d3e..032c406 100644
--- a/ipsilon/providers/openidp.py
+++ b/ipsilon/providers/openidp.py
@@ -143,6 +143,8 @@ class Installer(ProviderInstaller):
help='Configure OpenID Provider')
group.add_argument('--openid-dburi',
help='OpenID database URI')
+ group.add_argument('--openid-extensions', default='',
+ help='List of OpenID Extensions to enable')
def configure(self, opts):
if opts['openid'] != 'yes':
@@ -160,10 +162,11 @@ class Installer(ProviderInstaller):
po.wipe_data()
po.wipe_config_values()
config = {'endpoint url': url,
- 'identity_url_template': '%sid/%%(username)s' % url,
+ 'identity url template': '%sid/%%(username)s' % url,
'database url': opts['openid_dburi'] or
opts['database_url'] % {
- 'datadir': opts['data_dir'], 'dbname': 'openid'}}
+ 'datadir': opts['data_dir'], 'dbname': 'openid'},
+ 'enabled extensions': opts['openid_extensions']}
po.save_plugin_config(config)
# Update global config to add login plugin
diff --git a/tests/blobs/openid_app.py b/tests/blobs/openid_app.py
new file mode 100644
index 0000000..db80bbd
--- /dev/null
+++ b/tests/blobs/openid_app.py
@@ -0,0 +1,114 @@
+# Copyright (C) 2015 Ipsilon project Contributors, for licensee see COPYING
+import sys
+sys.stdout = sys.stderr
+
+import cherrypy
+import os
+import pwd
+
+from openid.consumer import consumer
+from openid.extensions import sreg, ax
+from openid_teams import teams
+
+
+class OpenIDApp(object):
+ def index(self, extensions):
+ self.extensions = extensions == 'YES'
+ oidconsumer = consumer.Consumer(dict(), None)
+ try:
+ request = oidconsumer.begin('http://127.0.0.10:45080/idp1/')
+ except Exception as ex:
+ return 'ERROR: %s' % ex
+
+ if request is None:
+ return 'ERROR: No request'
+
+ # Attach extensions here
+ if self.extensions:
+ request.addExtension(sreg.SRegRequest(
+ required=['nickname', 'email', 'timezone']))
+ ax_req = ax.FetchRequest()
+ ax_req_name = ax.AttrInfo('http://schema.openid.net/namePerson')
+ ax_req.add(ax_req_name)
+ request.addExtension(ax_req)
+ username = pwd.getpwuid(os.getuid())[0]
+ request.addExtension(teams.TeamsRequest(requested=[username]))
+
+ # Build and send final request
+ trust_root = cherrypy.url()
+ return_to = trust_root + 'finish'
+ if request.shouldSendRedirect():
+ redirect_url = request.redirectURL(
+ trust_root, return_to)
+ raise cherrypy.HTTPRedirect(redirect_url)
+ else:
+ return request.htmlMarkup(
+ trust_root, return_to)
+ index.exposed = True
+
+ def finish(self, **args):
+ oidconsumer = consumer.Consumer(dict(), None)
+ info = oidconsumer.complete(cherrypy.request.params, cherrypy.url())
+ display_identifier = info.getDisplayIdentifier()
+
+ if info.status == consumer.FAILURE and display_identifier:
+ return 'ERROR:Verification of %s failed: %s' % (
+ display_identifier, info.message)
+ elif info.status == consumer.CANCEL:
+ return 'ERROR: Cancelled'
+ elif info.status == consumer.SUCCESS:
+ username = pwd.getpwuid(os.getuid())[0]
+ expected_identifier = 'http://127.0.0.10:45080/idp1/openid/id/%s/'\
+ % username
+ if expected_identifier != display_identifier:
+ return 'ERROR: Wrong id returned: %s != %s' % (
+ expected_identifier,
+ display_identifier)
+
+ if self.extensions:
+ sreg_resp = sreg.SRegResponse.fromSuccessResponse(info)
+ teams_resp = teams.TeamsResponse.fromSuccessResponse(info)
+ ax_resp = ax.FetchResponse.fromSuccessResponse(info)
+
+ if sreg_resp is None:
+ return 'ERROR: No sreg!'
+ elif teams_resp is None:
+ return 'ERROR: No teams!'
+ elif ax_resp is None:
+ return 'ERROR: No AX!'
+
+ # Check values
+ expected_name = 'Test User %s' % username
+ expected_email = '%s@example.com' % username
+
+ ax_name = ax_resp.data[
+ 'http://schema.openid.net/namePerson'][0]
+ sreg_email = sreg_resp.data['email']
+
+ if ax_name != expected_name:
+ return 'ERROR: Wrong name returned: %s != %s' % (
+ expected_name,
+ ax_name)
+
+ if sreg_email != expected_email:
+ return 'ERROR: Wrong email returned: %s != %s' % (
+ expected_email,
+ sreg_email)
+
+ if username not in teams_resp.teams:
+ return 'ERROR: User not in self-named group (%s not in %s)' %\
+ (username, teams_resp.teams)
+
+ if self.extensions:
+ return 'SUCCESS, WITH EXTENSIONS'
+ else:
+ return 'SUCCESS, WITHOUT EXTENSIONS'
+ else:
+ return 'ERROR: Strange error: %s' % info.message
+ finish.exposed = True
+
+
+cherrypy.config['environment'] = 'embedded'
+
+application = cherrypy.Application(OpenIDApp(),
+ script_name=None, config=None)
diff --git a/tests/helpers/http.py b/tests/helpers/http.py
index 2478e2a..ff696d4 100755
--- a/tests/helpers/http.py
+++ b/tests/helpers/http.py
@@ -113,14 +113,17 @@ class HttpSessions(object):
return action
def get_form_data(self, page, form_id, input_fields):
+ form_selector = '//form'
+ if form_id:
+ form_selector += '[@id="%s"]' % form_id
values = []
- action = page.first_value('//form[@id="%s"]/@action' % form_id)
+ action = page.first_value('%s/@action' % form_selector)
values.append(action)
- method = page.first_value('//form[@id="%s"]/@method' % form_id)
+ method = page.first_value('%s/@method' % form_selector)
values.append(method)
for field in input_fields:
- value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id,
- field))
+ value = page.all_values('%s/input/@%s' % (form_selector,
+ field))
values.append(value)
return values
@@ -180,6 +183,65 @@ class HttpSessions(object):
return [method, self.new_url(referer, action_url),
{'headers': headers, 'data': payload}]
+ def handle_openid_form(self, page):
+ if type(page) != PageTree:
+ raise TypeError("Expected PageTree object")
+
+ if not page.first_value('//title/text()') == \
+ 'OpenID transaction in progress':
+ raise WrongPage('Not OpenID autosubmit form')
+
+ try:
+ results = self.get_form_data(page, None,
+ ["name", "value"])
+ action_url = results[0]
+ if action_url is None:
+ raise Exception
+ method = results[1]
+ names = results[2]
+ values = results[3]
+ except Exception: # pylint: disable=broad-except
+ raise WrongPage("Not OpenID autosubmit form")
+
+ referer = page.make_referer()
+ headers = {'referer': referer}
+
+ payload = {}
+ for i in range(0, len(names)):
+ payload[names[i]] = values[i]
+
+ return [method, self.new_url(referer, action_url),
+ {'headers': headers, 'data': payload}]
+
+ def handle_openid_consent_form(self, page):
+ if type(page) != PageTree:
+ raise TypeError("Expected PageTree object")
+
+ try:
+ results = self.get_form_data(page, "consent_form",
+ ['name', 'value'])
+ action_url = results[0]
+ if action_url is None:
+ raise Exception
+ method = results[1]
+ names = results[2]
+ values = results[3]
+ except Exception: # pylint: disable=broad-except
+ raise WrongPage("Not an OpenID Consent Form Page")
+
+ referer = page.make_referer()
+ headers = {'referer': referer}
+
+ payload = {}
+ for i in range(0, len(names)):
+ payload[names[i]] = values[i]
+
+ # Replace known values
+ payload['decided_allow'] = 'Allow'
+
+ return [method, self.new_url(referer, action_url),
+ {'headers': headers, 'data': payload}]
+
def fetch_page(self, idp, target_url, follow_redirect=True):
url = target_url
action = 'get'
@@ -187,7 +249,7 @@ class HttpSessions(object):
while True:
r = self.access(action, url, **args) # pylint: disable=star-args
- if r.status_code == 303:
+ if r.status_code == 303 or r.status_code == 302:
if not follow_redirect:
return PageTree(r)
url = r.headers['location']
@@ -208,6 +270,18 @@ class HttpSessions(object):
except WrongPage:
pass
+ try:
+ (action, url, args) = self.handle_openid_consent_form(page)
+ continue
+ except WrongPage:
+ pass
+
+ try:
+ (action, url, args) = self.handle_openid_form(page)
+ continue
+ except WrongPage:
+ pass
+
# Either we got what we wanted, or we have to stop anyway
return page
else:
diff --git a/tests/openid.py b/tests/openid.py
new file mode 100755
index 0000000..ebc92ba
--- /dev/null
+++ b/tests/openid.py
@@ -0,0 +1,132 @@
+#!/usr/bin/python
+#
+# Copyright (C) 2014 Simo Sorce <simo@redhat.com>
+#
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+from helpers.common import IpsilonTestBase # pylint: disable=relative-import
+from helpers.http import HttpSessions # pylint: disable=relative-import
+import os
+import pwd
+import sys
+import inspect
+from string import Template
+
+idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
+ 'CONFDIR': '${TESTDIR}/etc',
+ 'DATADIR': '${TESTDIR}/lib',
+ 'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
+ 'STATICDIR': '${ROOTDIR}',
+ 'BINDIR': '${ROOTDIR}/ipsilon',
+ 'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'}
+
+
+idp_a = {'hostname': '${ADDRESS}:${PORT}',
+ 'admin_user': '${TEST_USER}',
+ 'system_user': '${TEST_USER}',
+ 'instance': '${NAME}',
+ 'secure': 'no',
+ 'testauth': 'yes',
+ 'openid': 'yes',
+ 'openid_extensions': 'Attribute Exchange,Simple Registration,Teams',
+ 'pam': 'no',
+ 'krb': 'no',
+ 'ipa': 'no',
+ 'server_debugging': 'True'}
+
+
+def fixup_sp_httpd(httpdir, testdir):
+ client_wsgi = """
+
+WSGIScriptAlias / ${TESTDIR}/blobs/openid_app.py
+
+<Directory ${TESTDIR}/blobs>
+ Require all granted
+</Directory>
+"""
+ t = Template(client_wsgi)
+ text = t.substitute({'TESTDIR': testdir})
+ with open(httpdir + '/conf.d/ipsilon-openid-client.conf', 'a') as f:
+ f.write(text)
+
+
+class IpsilonTest(IpsilonTestBase):
+
+ def __init__(self):
+ super(IpsilonTest, self).__init__('openid', __file__)
+
+ def setup_servers(self, env=None):
+ print "Installing IDP server"
+ name = 'idp1'
+ addr = '127.0.0.10'
+ port = '45080'
+ idp = self.generate_profile(idp_g, idp_a, name, addr, port)
+ conf = self.setup_idp_server(idp, name, addr, port, env)
+
+ print "Starting IDP's httpd server"
+ self.start_http_server(conf, env)
+
+ print "Installing first SP server"
+ name = 'sp1'
+ addr = '127.0.0.11'
+ port = '45081'
+ conf = self.setup_http(name, addr, port)
+ testdir = os.path.dirname(os.path.abspath(inspect.getfile(
+ inspect.currentframe())))
+ fixup_sp_httpd(os.path.dirname(conf), testdir)
+
+ print "Starting SP's httpd server"
+ self.start_http_server(conf, env)
+
+
+if __name__ == '__main__':
+
+ idpname = 'idp1'
+ sp1name = 'sp1'
+ user = pwd.getpwuid(os.getuid())[0]
+
+ sess = HttpSessions()
+ sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon')
+ sess.add_server(sp1name, 'http://127.0.0.11:45081')
+
+ print "openid: Authenticate to IDP ...",
+ try:
+ sess.auth_to_idp(idpname)
+ except Exception as e: # pylint: disable=broad-except
+ print >> sys.stderr, " ERROR: %s" % repr(e)
+ sys.exit(1)
+ print " SUCCESS"
+
+ print "openid: Run OpenID Protocol ...",
+ try:
+ page = sess.fetch_page(idpname,
+ 'http://127.0.0.11:45081/?extensions=NO')
+ page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS')
+ except ValueError as e:
+ print >> sys.stderr, " ERROR: %s" % repr(e)
+ sys.exit(1)
+ print " SUCCESS"
+
+ print "openid: Run OpenID Protocol with extensions ...",
+ try:
+ page = sess.fetch_page(idpname,
+ 'http://127.0.0.11:45081/?extensions=YES')
+ page.expected_value('text()', 'SUCCESS, WITH EXTENSIONS')
+ except ValueError as e:
+ print >> sys.stderr, " ERROR: %s" % repr(e)
+ sys.exit(1)
+ print " SUCCESS"
diff --git a/tests/testmapping.py b/tests/testmapping.py
index 1bc69ec..d5e5dd0 100755
--- a/tests/testmapping.py
+++ b/tests/testmapping.py
@@ -65,7 +65,7 @@ def check_info_plugin(s, idp_name, urlbase, expected):
"""
Logout, login, fetch SP page to get the info variables and
compare the MELLON_ ones to what we expect. IDP and NAMEID are
- ignored. The authtest plugin returns no groups.
+ ignored.
"""
# Log out
@@ -195,6 +195,7 @@ if __name__ == '__main__':
'surname': user,
'givenname': 'Test User',
'email': '%s@example.com' % user,
+ 'groups': user,
}
check_info_plugin(sess, idpname, spurl, expect)
except Exception, e: # pylint: disable=broad-except
@@ -221,6 +222,7 @@ if __name__ == '__main__':
'surname': user,
'givenname': 'Test User',
'email': '%s@example.com' % user,
+ 'groups': user
}
check_info_plugin(sess, idpname, spurl, expect)
except Exception, e: # pylint: disable=broad-except