summaryrefslogtreecommitdiffstats
path: root/python/tests/IdentityProvider.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests/IdentityProvider.py')
-rw-r--r--python/tests/IdentityProvider.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py
new file mode 100644
index 00000000..046d6f0d
--- /dev/null
+++ b/python/tests/IdentityProvider.py
@@ -0,0 +1,182 @@
+# -*- coding: UTF-8 -*-
+
+
+# Python Lasso Simulator
+#
+# Copyright (C) 2004 Entr'ouvert
+# http://lasso.entrouvert.org
+#
+# Author: Emmanuel Raviart <eraviart@entrouvert.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+# FIXME: Replace principal with client in most methods.
+# FIXME: Rename webUser to userAccount.
+
+
+import lasso
+
+from Provider import Provider
+from websimulator import *
+
+
+class IdentityProvider(Provider):
+ soapResponseMsgs = None
+
+ def __init__(self, test, internet, url):
+ Provider.__init__(self, test, internet, url)
+ self.soapResponseMsgs = {}
+
+ def singleSignOn(self, httpRequest):
+ server = self.getServer()
+ login = lasso.Login.new(server)
+ identityDump = self.getIdentityDump(httpRequest.client)
+ if identityDump is not None:
+ login.set_identity_from_dump(identityDump)
+ sessionDump = self.getSessionDump(httpRequest.client)
+ if sessionDump is not None:
+ login.set_session_from_dump(sessionDump)
+ authnRequestQuery = self.extractQueryFromUrl(httpRequest.url)
+ login.init_from_authn_request_msg(authnRequestQuery, lasso.httpMethodRedirect)
+
+ self.failUnless(login.must_authenticate()) # FIXME: To improve.
+ webSession = self.getWebSession(httpRequest.client)
+ if webSession is None:
+ webSession = self.createWebSession(httpRequest.client)
+ webSession.loginDump = login.dump()
+
+ # A real identity provider using a HTML form to ask user's login & password would store
+ # idpLoginDump in a session variable and display the HTML login form.
+ webUserId = httpRequest.client.keyring.get(self.url, None)
+ userAuthenticated = webUserId in self.webUsers
+ if userAuthenticated:
+ webSession.webUserId = webUserId
+ authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
+
+ server = self.getServer()
+ webSession = self.getWebSession(httpRequest.client)
+ loginDump = webSession.loginDump
+ del webSession.loginDump
+ login = lasso.Login.new_from_dump(server, loginDump)
+ # Set identity & session in login, because loginDump doesn't contain them.
+ identityDump = self.getIdentityDump(httpRequest.client)
+ if identityDump is not None:
+ login.set_identity_from_dump(identityDump)
+ sessionDump = self.getSessionDump(httpRequest.client)
+ if sessionDump is not None:
+ login.set_session_from_dump(sessionDump)
+ self.failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME
+ login.build_artifact_msg(
+ userAuthenticated, authenticationMethod, "FIXME: reauthenticateOnOrAfter",
+ lasso.httpMethodRedirect)
+ webUser = self.getWebUserFromWebSession(webSession)
+ if login.is_identity_dirty():
+ identityDump = login.get_identity().dump()
+ self.failUnless(identityDump)
+ webUser.identityDump = identityDump
+ self.failUnless(login.is_session_dirty())
+ sessionDump = login.get_session().dump()
+ self.failUnless(sessionDump)
+ webSession.sessionDump = sessionDump
+ nameIdentifier = login.nameIdentifier
+ self.failUnless(nameIdentifier)
+ self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId
+ self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId
+ artifact = login.assertionArtifact
+ self.failUnless(artifact)
+ soapResponseMsg = login.response_dump
+ self.failUnless(soapResponseMsg)
+ self.soapResponseMsgs[artifact] = soapResponseMsg
+ responseUrl = login.msg_url
+ self.failUnless(responseUrl)
+ return httpRequest.client.redirect(responseUrl)
+
+ def soapEndpoint(self, httpRequest):
+ soapRequestMsg = httpRequest.body
+ requestType = lasso.get_request_type_from_soap_msg(soapRequestMsg)
+ if requestType == lasso.requestTypeLogin:
+ server = self.getServer()
+ login = lasso.Login.new(server)
+ login.process_request_msg(soapRequestMsg)
+ artifact = login.assertionArtifact
+ self.failUnless(artifact)
+ soapResponseMsg = self.soapResponseMsgs.get(artifact, None)
+ if soapResponseMsg is None:
+ raise Exception("FIXME: Handle the case when artifact is wrong")
+ return HttpResponse(200, body = soapResponseMsg)
+ elif requestType == lasso.requestTypeLogout:
+ server = self.getServer()
+ logout = lasso.Logout.new(server, lasso.providerTypeIdp)
+ logout.process_request_msg(soapRequestMsg, lasso.httpMethodSoap)
+ nameIdentifier = logout.nameIdentifier
+ self.failUnless(nameIdentifier)
+
+ # Retrieve session dump and identity dump using name identifier.
+ webSession = self.getWebSessionFromNameIdentifier(nameIdentifier)
+ if webSession is None:
+ raise Exception("FIXME: Handle the case when there is no web session")
+ sessionDump = webSession.sessionDump
+ if sessionDump is None:
+ raise Exception(
+ "FIXME: Handle the case when there is no session dump in web session")
+ logout.set_session_from_dump(sessionDump)
+ webUser = self.getWebUserFromNameIdentifier(nameIdentifier)
+ if webUser is None:
+ raise Exception("FIXME: Handle the case when there is no web user")
+ identityDump = webUser.identityDump
+ if identityDump is None:
+ raise Exception(
+ "FIXME: Handle the case when there is no identity dump in web user")
+ logout.set_identity_from_dump(identityDump)
+
+ logout.validate_request()
+ self.failIf(logout.is_identity_dirty())
+ identity = logout.get_identity()
+ self.failUnless(identity)
+ identityDump = identity.dump()
+ self.failUnless(identityDump)
+ self.failUnless(logout.is_session_dirty())
+ session = logout.get_session()
+ if session is None:
+ del webSession.sessionDump
+ else:
+ sessionDump = session.dump()
+ self.failUnless(sessionDump)
+ webSession.sessionDump = sessionDump
+ nameIdentifier = logout.nameIdentifier
+ self.failUnless(nameIdentifier)
+ del self.webSessionIdsByNameIdentifier[nameIdentifier]
+
+ # Tell each other service provider to logout the user.
+ otherProviderId = logout.get_next_providerID()
+ while otherProviderId is not None:
+ logout.init_request(otherProviderId)
+ logout.build_request_msg()
+
+ soapEndpoint = logout.msg_url
+ self.failUnless(soapEndpoint)
+ soapRequestMsg = logout.msg_body
+ self.failUnless(soapRequestMsg)
+ httpResponse = HttpRequest(self, "POST", soapEndpoint, body = soapRequestMsg).ask()
+ self.failUnlessEqual(httpResponse.statusCode, 200)
+ logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
+
+ otherProviderId = logout.get_next_providerID()
+
+ logout.build_response_msg()
+ soapResponseMsg = logout.msg_body
+ self.failUnless(soapResponseMsg)
+ return HttpResponse(200, body = soapResponseMsg)
+ else:
+ raise Exception("Unknown request type: %s" % requestType)