summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorEmmanuel Raviart <eraviart@entrouvert.com>2004-08-07 20:42:02 +0000
committerEmmanuel Raviart <eraviart@entrouvert.com>2004-08-07 20:42:02 +0000
commitb46a6f80382d309a4e0c4ebdca346c296b66a789 (patch)
treeae84b88cb68be02c4768bf2340e33e8f719f7da5 /python
parent8d90adf21cc3023d92f8d264a510e9705c32ad81 (diff)
downloadlasso-b46a6f80382d309a4e0c4ebdca346c296b66a789.tar.gz
lasso-b46a6f80382d309a4e0c4ebdca346c296b66a789.tar.xz
lasso-b46a6f80382d309a4e0c4ebdca346c296b66a789.zip
Added LECP support in Python simulator and unit tests. I think I have found
several bugs in Lasso LECP implementation. My biggest problem is that I didn't find a way for IDP to set userAuthenticated, authenticationMethod, reauthenticateOnOrAfter to lecp before (or when) building response envelope with lecp.build_authn_response_envelope_msg(). Did I overlook something?
Diffstat (limited to 'python')
-rw-r--r--python/tests/IdentityProvider.py125
-rw-r--r--python/tests/LibertyEnabledClient.py107
-rw-r--r--python/tests/Provider.py10
-rw-r--r--python/tests/ServiceProvider.py160
-rw-r--r--python/tests/login_tests.py83
-rw-r--r--python/tests/websimulator.py76
6 files changed, 437 insertions, 124 deletions
diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py
index e1cfef5b..2715c42a 100644
--- a/python/tests/IdentityProvider.py
+++ b/python/tests/IdentityProvider.py
@@ -21,8 +21,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-# FIXME: Replace principal with client in most methods.
-# FIXME: Rename webUser to userAccount.
import lasso
@@ -40,49 +38,92 @@ class IdentityProvider(Provider):
def singleSignOn(self, httpRequest):
server = self.getServer()
- login = lasso.Login.new(server)
- webSession = self.getWebSession(httpRequest.client)
- webUser = None
- if webSession is not None:
+ if httpRequest.method == 'GET':
+ # Single sign-on using HTTP redirect.
+ login = lasso.Login.new(server)
+ webSession = self.getWebSession(httpRequest.client)
+ webUser = None
+ if webSession is not None:
+ if webSession.sessionDump is not None:
+ login.set_session_from_dump(webSession.sessionDump)
+ webUser = self.getWebUserFromWebSession(webSession)
+ if webUser is not None and webUser.identityDump is not None:
+ login.set_identity_from_dump(webUser.identityDump)
+ login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect)
+
+ if not login.must_authenticate():
+ userAuthenticated = webUser is not None
+ authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
+ return self.singleSignOn_part2(
+ httpRequest, login, webSession, webUser, userAuthenticated,
+ authenticationMethod)
+
+ if webSession is None:
+ webSession = self.createWebSession(httpRequest.client)
+ webSession.loginDump = login.dump()
+
+ # A real identity provider using a HTML form to ask user's login & password would store
+ # idpLoginDump in a session variable and display the HTML login form.
+ webUserId = httpRequest.client.keyring.get(self.url, None)
+ userAuthenticated = webUserId in self.webUsers
+ if userAuthenticated:
+ webSession.webUserId = webUserId
+ authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
+
+ server = self.getServer()
+ webSession = self.getWebSession(httpRequest.client)
+ loginDump = webSession.loginDump
+ del webSession.loginDump
+ login = lasso.Login.new_from_dump(server, loginDump)
+ # Set identity & session in login, because loginDump doesn't contain them.
if webSession.sessionDump is not None:
login.set_session_from_dump(webSession.sessionDump)
webUser = self.getWebUserFromWebSession(webSession)
if webUser is not None and webUser.identityDump is not None:
login.set_identity_from_dump(webUser.identityDump)
- login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect)
- if not login.must_authenticate():
- userAuthenticated = webUser is not None
- authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
return self.singleSignOn_part2(
httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod)
-
- if webSession is None:
- webSession = self.createWebSession(httpRequest.client)
- webSession.loginDump = login.dump()
-
- # A real identity provider using a HTML form to ask user's login & password would store
- # idpLoginDump in a session variable and display the HTML login form.
- webUserId = httpRequest.client.keyring.get(self.url, None)
- userAuthenticated = webUserId in self.webUsers
- if userAuthenticated:
- webSession.webUserId = webUserId
- authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
-
- server = self.getServer()
- webSession = self.getWebSession(httpRequest.client)
- loginDump = webSession.loginDump
- del webSession.loginDump
- login = lasso.Login.new_from_dump(server, loginDump)
- # Set identity & session in login, because loginDump doesn't contain them.
- if webSession.sessionDump is not None:
- login.set_session_from_dump(webSession.sessionDump)
- webUser = self.getWebUserFromWebSession(webSession)
- if webUser is not None and webUser.identityDump is not None:
- login.set_identity_from_dump(webUser.identityDump)
-
- return self.singleSignOn_part2(
- httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod)
+ elif httpRequest.method == 'POST' \
+ and httpRequest.headers.get('Content-Type', None) == 'text/xml':
+ # SOAP request => LECP single sign-on.
+ lecp = lasso.Lecp.new(server)
+ webSession = self.getWebSession(httpRequest.client)
+ webUser = None
+ if webSession is not None:
+ if webSession.sessionDump is not None:
+ lecp.set_session_from_dump(webSession.sessionDump)
+ webUser = self.getWebUserFromWebSession(webSession)
+ if webUser is not None and webUser.identityDump is not None:
+ lecp.set_identity_from_dump(webUser.identityDump)
+ lecp.init_from_authn_request_msg(httpRequest.body, lasso.httpMethodSoap)
+ # FIXME: lecp.must_authenticate() should always return False.
+ # The other solution is that we are able to not call lecp.must_authenticate()
+ # self.failIf(lecp.must_authenticate())
+ userAuthenticated = webUser is not None
+ authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
+ # FIXME: It is very very strange that we don't provide userAuthenticated,
+ # authenticationMethod, reauthenticateOnOrAfter' to lecp before or when build response.
+ lecp.build_authn_response_envelope_msg()
+ soapResponseMsg = lecp.msg_body
+ self.failUnless(soapResponseMsg)
+ # FIXME: Lasso should set a lecp.msg_content_type to
+ # "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with
+ # other profiles.
+ # contentType = lecp.msg_content_type
+ # self.failUnlessEqual(contentType, 'application/vnd.liberty-response+xml')
+ contentType = 'application/vnd.liberty-response+xml'
+ return self.newHttpResponse(
+ 200,
+ headers = {
+ 'Content-Type': contentType,
+ 'Cache-Control': 'no-cache',
+ 'Pragma': 'no-cache',
+ },
+ body = soapResponseMsg)
+ else:
+ return self.newHttpResponse(
+ 400, 'Bad Request: Method %s not handled by singleSignOn' % httpRequest.method)
def singleSignOn_part2(self, httpRequest, login, webSession, webUser, userAuthenticated,
authenticationMethod):
@@ -127,7 +168,8 @@ class IdentityProvider(Provider):
soapResponseMsg = self.soapResponseMsgs.get(artifact, None)
if soapResponseMsg is None:
raise Exception('FIXME: Handle the case when artifact is wrong')
- return HttpResponse(200, body = soapResponseMsg)
+ return self.newHttpResponse(
+ 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg)
elif requestType == lasso.requestTypeLogout:
server = self.getServer()
logout = lasso.Logout.new(server, lasso.providerTypeIdp)
@@ -183,7 +225,9 @@ class IdentityProvider(Provider):
self.failUnless(soapEndpoint)
soapRequestMsg = logout.msg_body
self.failUnless(soapRequestMsg)
- httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
+ httpResponse = sendHttpRequest(
+ 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
+ body = soapRequestMsg)
self.failUnlessEqual(httpResponse.statusCode, 200)
logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
@@ -192,6 +236,7 @@ class IdentityProvider(Provider):
logout.build_response_msg()
soapResponseMsg = logout.msg_body
self.failUnless(soapResponseMsg)
- return HttpResponse(200, body = soapResponseMsg)
+ return self.newHttpResponse(
+ 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg)
else:
raise Exception('Unknown request type: %s' % requestType)
diff --git a/python/tests/LibertyEnabledClient.py b/python/tests/LibertyEnabledClient.py
new file mode 100644
index 00000000..397999bf
--- /dev/null
+++ b/python/tests/LibertyEnabledClient.py
@@ -0,0 +1,107 @@
+# -*- coding: UTF-8 -*-
+
+
+# Python Lasso Simulator
+#
+# Copyright (C) 2004 Entr'ouvert
+# http://lasso.entrouvert.org
+#
+# Author: Emmanuel Raviart <eraviart@entrouvert.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+
+import lasso
+
+from websimulator import *
+
+
+class LibertyEnabledClient(WebClient, Simulation):
+ # A service provider MAY provide a list of identity providers it recognizes by including the
+ # <lib:IDPList> element in the <lib:AuthnRequestEnvelope>. The format and processing rules for
+ # the identity provider list MUST be as defined in [LibertyProtSchema].
+
+ # The identity provider list can be used by the LECP to create a user identifier to be
+ # presented to the Principal. For example, the LECP could compare the list of the Principal's
+ # known identities (and the identities of the identity provider that provides those identities)
+ # against the list provided by the service provider and then only display the intersection.
+
+ # If the service provider does not support the LECP-advertised Liberty version, the service
+ # provider MUST return to the LECP an HTTP 501 response with the reason phrase "Unsupported
+ # Liberty Version."
+ #
+ # The responses in step 3 and step 6 SHOULD NOT be cached. To this end service providers and
+ # identity providers SHOULD place both "Cache-Control: no-cache" and "Pragma: no-cache" on
+ # their responses to ensure that the LECP and any intervening proxies will not cache the
+ # response.
+
+ # If the LECP discovers a syntax error due to the service provider or cannot proceed any
+ # further for other reasons (for example, cannot resolve identity provider, cannot reach the
+ # identity provider, etc.), the LECP MUST return to the service provider a <lib:AuthnResponse>
+ # with a <samlp:Status> indicating the desired error element as defined in
+ # [LibertyProtSchema]. The <lib:AuthnResponse> containing the error status MUST be sent using
+ # a POST to the service provider's assertion consumer service URL obtained from the
+ # <lib:AssertionConsumerServiceURL> element of the <lib:AuthnRequestEnvelope>. The POST MUST
+ # be a form that contains the field LARES with the value being the <lib:AuthnResponse>
+ # protocol message as defined in [LibertyProtSchema], containing the <samlp:Status>. The
+ # <lib:AuthnResponse> MUST be encoded by applying a base64 transformation (refer to
+ # [RFC2045]) to the <lib:AuthnResponse> and all its elements.
+
+ # FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without
+ # metadata, instead of using 'singleSignOnServiceUrl'.
+ idpSingleSignOnServiceUrl = None
+ requestHeaders = WebClient.requestHeaders.copy()
+ requestHeaders.update({
+ # FIXME: Is this the correct syntax for several URLs in LIBV?
+ 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1',
+ 'Liberty-Agent': 'LassoSimulator/0.0.0',
+ # FIXME: As an alternative to 'Liberty-Enabled' header, a user agent may use:
+ # 'User-Agent': ' '.join((
+ # requestHeaders['User-Agent'],
+ # 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1'))
+ 'Accept': ','.join((requestHeaders['Accept'], 'application/vnd.liberty-request+xml'))
+ })
+
+ def __init__(self, test, internet):
+ Simulation.__init__(self, test)
+ WebClient.__init__(self, internet)
+
+ def login(self, principal, site, path):
+ httpResponse = self.sendHttpRequestToSite(site, 'GET', path)
+ self.failUnlessEqual(
+ httpResponse.headers['Content-Type'], 'application/vnd.liberty-request+xml')
+ lecp = lasso.Lecp.new(None)
+ authnRequestEnvelope = httpResponse.body
+ # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded. I think this
+ # is an error.
+ import base64
+ authnRequestEnvelope = base64.encodestring(authnRequestEnvelope)
+ lecp.process_authn_request_envelope_msg(authnRequestEnvelope)
+ lecp.build_authn_request_msg()
+ # FIXME: The service provider could return an IDPList in authnRequestEnvelope, so that
+ # we verify that self.idpSingleSignOnServiceUrl belongs to one of them
+ httpResponse = self.sendHttpRequest(
+ 'POST', self.idpSingleSignOnServiceUrl, headers = {'Content-Type': 'text/xml'},
+ body = lecp.msg_body)
+ self.failUnlessEqual(
+ httpResponse.headers.get('Content-Type', None), 'application/vnd.liberty-response+xml')
+ lecp.process_authn_response_envelope_msg(httpResponse.body)
+ lecp.build_authn_response_msg()
+ self.failUnless(lecp.msg_url)
+ self.failUnless(lecp.msg_body)
+ # FIXME: Should we use 'multipart/form-data' for forms?
+ return self.sendHttpRequest(
+ 'POST', lecp.msg_url, headers = {'Content-Type': 'multipart/form-data'},
+ form = {'LARES': lecp.msg_body})
diff --git a/python/tests/Provider.py b/python/tests/Provider.py
index 809376d4..88406814 100644
--- a/python/tests/Provider.py
+++ b/python/tests/Provider.py
@@ -21,8 +21,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-# FIXME: Replace principal with client in most methods.
-# FIXME: Rename webUser to userAccount.
import lasso
@@ -31,5 +29,13 @@ from websimulator import *
class Provider(WebSite):
+ responseHeaders = WebSite.responseHeaders.copy()
+ responseHeaders.update({
+ 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1',
+ })
+ serverDump = None
+ webUserIdsByNameIdentifier = None
+ webSessionIdsByNameIdentifier = None
+
def getServer(self):
return lasso.Server.new_from_dump(self.serverDump)
diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py
index 4ecff2df..c548655b 100644
--- a/python/tests/ServiceProvider.py
+++ b/python/tests/ServiceProvider.py
@@ -21,8 +21,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-# FIXME: Replace principal with client in most methods.
-# FIXME: Rename webUser to userAccount.
import lasso
@@ -37,23 +35,46 @@ class ServiceProvider(Provider):
def assertionConsumer(self, httpRequest):
server = self.getServer()
login = lasso.Login.new(server)
- login.init_request(httpRequest.query, lasso.httpMethodRedirect)
- login.build_request_msg()
- soapEndpoint = login.msg_url
- self.failUnless(soapEndpoint)
- soapRequestMsg = login.msg_body
- self.failUnless(soapRequestMsg)
- httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
- self.failUnlessEqual(httpResponse.statusCode, 200)
- try:
- login.process_response_msg(httpResponse.body)
- except lasso.Error, error:
- if error.code == -7: # FIXME: This will change, he said.
- return HttpResponse(
- 401, 'Access Unauthorized: User authentication failed on identity provider.')
- else:
- raise
+ if httpRequest.method == 'GET':
+ login.init_request(httpRequest.query, lasso.httpMethodRedirect)
+ login.build_request_msg()
+
+ soapEndpoint = login.msg_url
+ self.failUnless(soapEndpoint)
+ soapRequestMsg = login.msg_body
+ self.failUnless(soapRequestMsg)
+ httpResponse = self.sendHttpRequest(
+ 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
+ body = soapRequestMsg)
+ self.failUnlessEqual(httpResponse.statusCode, 200)
+ try:
+ login.process_response_msg(httpResponse.body)
+ except lasso.Error, error:
+ if error.code == -7: # FIXME: This will change, he said.
+ return self.newHttpResponse(
+ 401,
+ 'Access Unauthorized: User authentication failed on identity provider.')
+ else:
+ raise
+ elif httpRequest.method == 'POST':
+ authnResponseMsg = httpRequest.getFormField('LARES', None)
+ self.failUnless(authnResponseMsg)
+ # FIXME: Should we do an init before process_authn_response_msg?
+ try:
+ login.process_authn_response_msg(authnResponseMsg)
+ except lasso.Error, error:
+ if error.code == -7: # FIXME: This will change, he said.
+ return self.newHttpResponse(
+ 401,
+ 'Access Unauthorized: User authentication failed on identity provider.')
+ else:
+ raise
+ else:
+ return self.newHttpResponse(
+ 400,
+ 'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method)
+
nameIdentifier = login.nameIdentifier
self.failUnless(nameIdentifier)
@@ -100,7 +121,7 @@ class ServiceProvider(Provider):
webUserId = httpRequest.client.keyring.get(self.url, None)
userAuthenticated = webUserId in self.webUsers
if not userAuthenticated:
- return HttpResponse(401, 'Access Unauthorized: User has no account.')
+ return self.newHttpResponse(401, 'Access Unauthorized: User has no account.')
webUser = self.webUsers[webUserId]
webSession.webUserId = webUser.uniqueId
@@ -113,35 +134,91 @@ class ServiceProvider(Provider):
self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId
self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId
- return HttpResponse(200)
+ return self.newHttpResponse(200)
+
+ def login(self, httpRequest):
+ libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None)
+ userAgent = httpRequest.headers.get('User-Agent', None)
+ # FIXME: Lasso should have a function to compute useLecp.
+ # Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent)
+ useLecp = False
+ if libertyEnabled:
+ useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled
+ if not useLecp:
+ return self.newHttpResponse(501, 'Unsupported Liberty Version.')
+ elif userAgent:
+ useLecp = 'urn:liberty:iff:2003-08' in userAgent
+ if not useLecp and "LIBV=" in userAgent:
+ return self.newHttpResponse(501, 'Unsupported Liberty Version.')
+ else:
+ useLecp = False
- def loginUsingRedirect(self, httpRequest):
- server = self.getServer()
- login = lasso.Login.new(server)
- login.init_authn_request(self.idpSite.providerId)
- self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False)
- if forceAuthn:
- login.request.set_forceAuthn(forceAuthn)
isPassive = httpRequest.getQueryBoolean('isPassive', False)
- if not isPassive:
- login.request.set_isPassive(isPassive)
- login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
- login.request.set_consent(lasso.libConsentObtained)
- relayState = 'fake'
- login.request.set_relayState(relayState)
- login.build_authn_request_msg()
- authnRequestUrl = login.msg_url
- self.failUnless(authnRequestUrl)
- return httpRequest.client.redirect(authnRequestUrl)
+ server = self.getServer()
+ if useLecp:
+ lecp = lasso.Lecp.new(server)
+ lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None.
+ self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest)
+
+ # FIXME: This protocol profile should be set by default by Lasso.
+ lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost)
+
+ # Same treatement as for non LECP login.
+ if forceAuthn:
+ lecp.request.set_forceAuthn(forceAuthn)
+ if not isPassive:
+ lecp.request.set_isPassive(isPassive)
+ lecp.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
+ lecp.request.set_consent(lasso.libConsentObtained)
+ relayState = 'fake'
+ lecp.request.set_relayState(relayState)
+
+ # FIXME: In my opinion, this method should be the renamed to build_authn_request_msg.
+ lecp.build_authn_request_envelope_msg()
+ authnRequestEnvelopeMsg = lecp.msg_body
+ # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded.
+ import base64
+ authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg)
+ self.failUnless(authnRequestEnvelopeMsg)
+ # FIXME: Lasso should set a lecp.msg_content_type to
+ # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with
+ # other profiles.
+ # contentType = lecp.msg_content_type
+ # self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml')
+ contentType = 'application/vnd.liberty-request+xml'
+ return self.newHttpResponse(
+ 200,
+ headers = {
+ 'Content-Type': contentType,
+ 'Cache-Control': 'no-cache',
+ 'Pragma': 'no-cache',
+ },
+ body = authnRequestEnvelopeMsg)
+ else:
+ login = lasso.Login.new(server)
+ login.init_authn_request(self.idpSite.providerId)
+ self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
+ if forceAuthn:
+ login.request.set_forceAuthn(forceAuthn)
+ if not isPassive:
+ login.request.set_isPassive(isPassive)
+ login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
+ login.request.set_consent(lasso.libConsentObtained)
+ relayState = 'fake'
+ login.request.set_relayState(relayState)
+ login.build_authn_request_msg()
+ authnRequestUrl = login.msg_url
+ self.failUnless(authnRequestUrl)
+ return httpRequest.client.redirect(authnRequestUrl)
def logoutUsingSoap(self, httpRequest):
webSession = self.getWebSession(httpRequest.client)
if webSession is None:
- return HttpResponse(401, 'Access Unauthorized: User has no session opened.')
+ return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.')
webUser = self.getWebUserFromWebSession(webSession)
if webUser is None:
- return HttpResponse(401, 'Access Unauthorized: User is not logged in.')
+ return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.')
server = self.getServer()
logout = lasso.Logout.new(server, lasso.providerTypeSp)
@@ -158,7 +235,8 @@ class ServiceProvider(Provider):
self.failUnless(soapEndpoint)
soapRequestMsg = logout.msg_body
self.failUnless(soapRequestMsg)
- httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
+ httpResponse = self.sendHttpRequest(
+ 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg)
self.failUnlessEqual(httpResponse.statusCode, 200)
logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
@@ -185,4 +263,4 @@ class ServiceProvider(Provider):
self.failUnless(nameIdentifier)
del self.webSessionIdsByNameIdentifier[nameIdentifier]
- return HttpResponse(200)
+ return self.newHttpResponse(200)
diff --git a/python/tests/login_tests.py b/python/tests/login_tests.py
index 75b00aad..c3a1c46b 100644
--- a/python/tests/login_tests.py
+++ b/python/tests/login_tests.py
@@ -33,6 +33,7 @@ sys.path.insert(0, '../.libs')
import lasso
from IdentityProvider import IdentityProvider
+from LibertyEnabledClient import LibertyEnabledClient
from ServiceProvider import ServiceProvider
from websimulator import *
@@ -63,6 +64,24 @@ class LoginTestCase(unittest.TestCase):
# Frederic Peters has no account on identity provider.
return site
+ def generateLibertyEnabledClient(self, internet):
+ client = LibertyEnabledClient(self, internet)
+ # FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without
+ # metadata, instead of using 'singleSignOnServiceUrl'.
+## server = lasso.Server.new(
+## None, # A LECP has no metadata.
+## '../../examples/data/client-public-key.pem',
+## '../../examples/data/client-private-key.pem',
+## '../../examples/data/client-crt.pem',
+## lasso.signatureMethodRsaSha1)
+## server.add_provider(
+## '../../examples/data/idp-metadata.xml',
+## '../../examples/data/idp-public-key.pem',
+## '../../examples/data/ca-crt.pem')
+## client.server = server
+ client.idpSingleSignOnServiceUrl = 'https://identity-provider/singleSignOn'
+ return client
+
def generateSpSite(self, internet):
site = ServiceProvider(self, internet, 'https://service-provider/')
site.providerId = 'https://service-provider/metadata'
@@ -94,6 +113,19 @@ class LoginTestCase(unittest.TestCase):
## def tearDown(self):
## pass
+ def test00(self):
+ """LECP login."""
+ internet = Internet()
+ idpSite = self.generateIdpSite(internet)
+ spSite = self.generateSpSite(internet)
+ spSite.idpSite = idpSite
+ lec = self.generateLibertyEnabledClient(internet)
+ principal = Principal(internet, 'Romain Chantereau')
+ principal.keyring[idpSite.url] = 'Chantereau'
+ principal.keyring[spSite.url] = 'Romain'
+ httpResponse = lec.login(principal, spSite, '/login')
+ raise str((httpResponse.headers['Content-Type'], httpResponse.body))
+
def test01(self):
"""Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP."""
@@ -105,9 +137,9 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
self.failIf(spSite.webSessions)
self.failIf(idpSite.webSessions)
@@ -123,30 +155,28 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Once again. Now the principal already has a federation between spSite and idpSite.
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Once again. Do a new passive login between normal login and logout.
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
del principal.keyring[idpSite.url] # Ensure identity provider will be really passive.
- httpResponse = spSite.doHttpRequest(HttpRequest(
- principal, 'GET', '/loginUsingRedirect?isPassive=1'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
self.failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Once again, with isPassive and the user having no web session.
- httpResponse = spSite.doHttpRequest(HttpRequest(
- principal, 'GET', '/loginUsingRedirect?isPassive=1'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test03(self):
@@ -160,9 +190,9 @@ class LoginTestCase(unittest.TestCase):
# Frederic Peters has no account on identity provider.
principal.keyring[spSite.url] = 'Frederic'
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test04(self):
@@ -177,9 +207,9 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Nowicki'
# Christophe Nowicki has no account on service provider.
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test05(self):
@@ -193,8 +223,7 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
- httpResponse = spSite.doHttpRequest(HttpRequest(
- principal, 'GET', '/loginUsingRedirect?isPassive=1'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test06(self):
@@ -208,28 +237,24 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
- httpResponse = spSite.doHttpRequest(HttpRequest(
- principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Ask user to reauthenticate while he is already logged.
- httpResponse = spSite.doHttpRequest(HttpRequest(
- principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 200)
del principal.keyring[idpSite.url] # Ensure user can't authenticate.
- httpResponse = spSite.doHttpRequest(HttpRequest(
- principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Force authentication, but user won't authenticate.
- httpResponse = spSite.doHttpRequest(HttpRequest(
- principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
+ httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 401)
## def test06(self):
diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py
index 806b5dd5..65064a0a 100644
--- a/python/tests/websimulator.py
+++ b/python/tests/websimulator.py
@@ -21,6 +21,8 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+
# FIXME: Replace principal with client in most methods.
# FIXME: Rename webUser to userAccount.
@@ -28,21 +30,33 @@
class HttpRequest(object):
client = None # Principal or web site sending the request.
body = None
- header = None
+ form = None
+ headers = None
method = None # 'GET' or 'POST' or 'PUT' or...
url = None
- def __init__(self, client, method, url, body = None):
+ def __init__(self, client, method, url, headers = None, body = None, form = None):
self.client = client
self.method = method
self.url = url
+ if headers:
+ self.headers = headers
if body:
self.body = body
+ if form:
+ self.form = form
- def ask(self):
+ def send(self):
webSite = self.client.internet.getWebSite(self.url)
return webSite.doHttpRequest(self)
+ def getFormField(self, name, default = 'none'):
+ if self.form and name in self.form:
+ return self.form[name]
+ if default == 'none':
+ raise KeyError(name)
+ return default
+
def getQueryBoolean(self, name, default = 'none'):
try:
fieldValue = self.getQueryField(name)
@@ -60,8 +74,7 @@ class HttpRequest(object):
return ''
def getQueryField(self, name, default = 'none'):
- query = self.query
- if query:
+ if self.query:
for field in self.query.split('&'):
fieldName, fieldValue = field.split('=')
if name == fieldName:
@@ -75,14 +88,16 @@ class HttpRequest(object):
class HttpResponse(object):
body = None
- header = None
+ headers = None
statusCode = None # 200 or...
statusMessage = None
- def __init__(self, statusCode, statusMessage = None, body = None):
+ def __init__(self, statusCode, statusMessage = None, headers = None, body = None):
self.statusCode = statusCode
if statusMessage:
self.statusMessage = statusMessage
+ if headers:
+ self.headers = headers
if body:
self.body = body
@@ -137,6 +152,10 @@ class Simulation(object):
class WebClient(object):
internet = None
keyring = None
+ requestHeaders = {
+ 'User-Agent': 'LassoSimulator/0.0.0',
+ 'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html',
+ }
webSessionIds = None # Simulate the cookies, stored in user's navigator, and containing the
# IDs of sessions already opened by the user.
@@ -146,8 +165,30 @@ class WebClient(object):
self.webSessionIds = {}
def redirect(self, url):
- webSite = self.internet.getWebSite(url)
- return webSite.doHttpRequest(HttpRequest(self, 'GET', url))
+ return self.sendHttpRequest('GET', url)
+
+ def sendHttpRequest(self, method, url, headers = None, body = None, form = None):
+ requestHeaders = self.requestHeaders
+ if headers:
+ requestHeaders = self.requestHeaders.copy()
+ for name, value in headers.iteritems():
+ requestHeaders[name] = value
+ else:
+ requestHeaders = self.requestHeaders
+ return HttpRequest(
+ self, method, url, headers = requestHeaders, body = body, form = form).send()
+
+ def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None,
+ form = None):
+ url = webSite.url
+ if path:
+ if path[0] == '/':
+ while url[-1] == '/':
+ url = url[:-1]
+ elif url[-1] != '/':
+ url += '/'
+ url += path
+ return self.sendHttpRequest(method, url, headers = headers, body = body, form = form)
class Principal(WebClient):
@@ -189,11 +230,11 @@ class WebSite(WebClient, Simulation):
lastWebSessionId = 0
providerId = None # The Liberty providerID of this web site
- serverDump = None
+ responseHeaders = {
+ 'Server': 'Lasso Simulator Web Server',
+ }
url = None # The main URL of web site
- webUserIdsByNameIdentifier = None
webUsers = None
- webSessionIdsByNameIdentifier = None
webSessions = None
def __init__(self, test, internet, url):
@@ -267,3 +308,14 @@ class WebSite(WebClient, Simulation):
# The user has no account on this site.
return None
return self.webUsers.get(webUserId, None)
+
+ def newHttpResponse(self, statusCode, statusMessage = None, headers = None, body = None):
+ responseHeaders = self.responseHeaders
+ if headers:
+ responseHeaders = self.responseHeaders.copy()
+ for name, value in headers.iteritems():
+ responseHeaders[name] = value
+ else:
+ responseHeaders = self.responseHeaders
+ return HttpResponse(
+ statusCode, statusMessage = statusMessage, headers = headers, body = body)