diff options
Diffstat (limited to 'python/tests/IdentityProvider.py')
| -rw-r--r-- | python/tests/IdentityProvider.py | 108 |
1 files changed, 53 insertions, 55 deletions
diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py index 2715c42a..27ca0265 100644 --- a/python/tests/IdentityProvider.py +++ b/python/tests/IdentityProvider.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -32,16 +31,16 @@ from websimulator import * class IdentityProvider(Provider): soapResponseMsgs = None - def __init__(self, test, internet, url): - Provider.__init__(self, test, internet, url) + def __init__(self, internet, url): + Provider.__init__(self, internet, url) self.soapResponseMsgs = {} - def singleSignOn(self, httpRequest): + def singleSignOn(self, handler): server = self.getServer() - if httpRequest.method == 'GET': + if handler.httpRequest.method == 'GET': # Single sign-on using HTTP redirect. login = lasso.Login.new(server) - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) webUser = None if webSession is not None: if webSession.sessionDump is not None: @@ -49,29 +48,28 @@ class IdentityProvider(Provider): webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: login.set_identity_from_dump(webUser.identityDump) - login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect) + login.init_from_authn_request_msg(handler.httpRequest.query, lasso.httpMethodRedirect) if not login.must_authenticate(): userAuthenticated = webUser is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME return self.singleSignOn_part2( - httpRequest, login, webSession, webUser, userAuthenticated, - authenticationMethod) + handler, login, webSession, webUser, userAuthenticated, authenticationMethod) if webSession is None: - webSession = self.createWebSession(httpRequest.client) + webSession = self.createWebSession(handler.httpRequest.client) webSession.loginDump = login.dump() # A real identity provider using a HTML form to ask user's login & password would store # idpLoginDump in a session variable and display the HTML login form. - webUserId = httpRequest.client.keyring.get(self.url, None) + webUserId = handler.httpRequest.client.keyring.get(self.url, None) userAuthenticated = webUserId in self.webUsers if userAuthenticated: webSession.webUserId = webUserId authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME server = self.getServer() - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) loginDump = webSession.loginDump del webSession.loginDump login = lasso.Login.new_from_dump(server, loginDump) @@ -83,12 +81,12 @@ class IdentityProvider(Provider): login.set_identity_from_dump(webUser.identityDump) return self.singleSignOn_part2( - httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod) - elif httpRequest.method == 'POST' \ - and httpRequest.headers.get('Content-Type', None) == 'text/xml': + handler, login, webSession, webUser, userAuthenticated, authenticationMethod) + elif handler.httpRequest.method == 'POST' \ + and handler.httpRequest.headers.get('Content-Type', None) == 'text/xml': # SOAP request => LECP single sign-on. lecp = lasso.Lecp.new(server) - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) webUser = None if webSession is not None: if webSession.sessionDump is not None: @@ -96,25 +94,24 @@ class IdentityProvider(Provider): webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: lecp.set_identity_from_dump(webUser.identityDump) - lecp.init_from_authn_request_msg(httpRequest.body, lasso.httpMethodSoap) + lecp.init_from_authn_request_msg(handler.httpRequest.body, lasso.httpMethodSoap) # FIXME: lecp.must_authenticate() should always return False. # The other solution is that we are able to not call lecp.must_authenticate() - # self.failIf(lecp.must_authenticate()) + # failIf(lecp.must_authenticate()) userAuthenticated = webUser is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME # FIXME: It is very very strange that we don't provide userAuthenticated, # authenticationMethod, reauthenticateOnOrAfter' to lecp before or when build response. lecp.build_authn_response_envelope_msg() soapResponseMsg = lecp.msg_body - self.failUnless(soapResponseMsg) + failUnless(soapResponseMsg) # FIXME: Lasso should set a lecp.msg_content_type to # "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with # other profiles. # contentType = lecp.msg_content_type - # self.failUnlessEqual(contentType, 'application/vnd.liberty-response+xml') + # failUnlessEqual(contentType, 'application/vnd.liberty-response+xml') contentType = 'application/vnd.liberty-response+xml' - return self.newHttpResponse( - 200, + return handler.respond( headers = { 'Content-Type': contentType, 'Cache-Control': 'no-cache', @@ -122,60 +119,61 @@ class IdentityProvider(Provider): }, body = soapResponseMsg) else: - return self.newHttpResponse( - 400, 'Bad Request: Method %s not handled by singleSignOn' % httpRequest.method) + return handler.respond( + 400, + 'Bad Request: Method %s not handled by singleSignOn' % handler.httpRequest.method) - def singleSignOn_part2(self, httpRequest, login, webSession, webUser, userAuthenticated, + def singleSignOn_part2(self, handler, login, webSession, webUser, userAuthenticated, authenticationMethod): - self.failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME + failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME login.build_artifact_msg( userAuthenticated, authenticationMethod, 'FIXME: reauthenticateOnOrAfter', lasso.httpMethodRedirect) if userAuthenticated: if login.is_identity_dirty(): identityDump = login.get_identity().dump() - self.failUnless(identityDump) + failUnless(identityDump) webUser.identityDump = identityDump - self.failUnless(login.is_session_dirty()) + failUnless(login.is_session_dirty()) sessionDump = login.get_session().dump() - self.failUnless(sessionDump) + failUnless(sessionDump) webSession.sessionDump = sessionDump nameIdentifier = login.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId else: - self.failIf(login.is_identity_dirty()) - self.failIf(login.is_session_dirty()) + failIf(login.is_identity_dirty()) + failIf(login.is_session_dirty()) artifact = login.assertionArtifact - self.failUnless(artifact) + failUnless(artifact) soapResponseMsg = login.response_dump - self.failUnless(soapResponseMsg) + failUnless(soapResponseMsg) self.soapResponseMsgs[artifact] = soapResponseMsg responseUrl = login.msg_url - self.failUnless(responseUrl) - return httpRequest.client.redirect(responseUrl) + failUnless(responseUrl) + return handler.respondRedirectTemporarily(responseUrl) - def soapEndpoint(self, httpRequest): - soapRequestMsg = httpRequest.body + def soapEndpoint(self, handler): + soapRequestMsg = handler.httpRequest.body requestType = lasso.get_request_type_from_soap_msg(soapRequestMsg) if requestType == lasso.requestTypeLogin: server = self.getServer() login = lasso.Login.new(server) login.process_request_msg(soapRequestMsg) artifact = login.assertionArtifact - self.failUnless(artifact) + failUnless(artifact) soapResponseMsg = self.soapResponseMsgs.get(artifact, None) if soapResponseMsg is None: raise Exception('FIXME: Handle the case when artifact is wrong') - return self.newHttpResponse( - 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) + return handler.respond( + headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) elif requestType == lasso.requestTypeLogout: server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeIdp) logout.process_request_msg(soapRequestMsg, lasso.httpMethodSoap) nameIdentifier = logout.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) # Retrieve session dump and identity dump using name identifier. webSession = self.getWebSessionFromNameIdentifier(nameIdentifier) @@ -196,12 +194,12 @@ class IdentityProvider(Provider): logout.set_identity_from_dump(identityDump) logout.validate_request() - self.failIf(logout.is_identity_dirty()) + failIf(logout.is_identity_dirty()) identity = logout.get_identity() - self.failUnless(identity) + failUnless(identity) identityDump = identity.dump() - self.failUnless(identityDump) - self.failUnless(logout.is_session_dirty()) + failUnless(identityDump) + failUnless(logout.is_session_dirty()) # Log the user out. # It is done before logout from other service providers, since we don't want to @@ -212,7 +210,7 @@ class IdentityProvider(Provider): # anyway. del self.webSessions[webSession.uniqueId] nameIdentifier = logout.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) del self.webSessionIdsByNameIdentifier[nameIdentifier] # Tell each other service provider to logout the user. @@ -222,21 +220,21 @@ class IdentityProvider(Provider): logout.build_request_msg() soapEndpoint = logout.msg_url - self.failUnless(soapEndpoint) + failUnless(soapEndpoint) soapRequestMsg = logout.msg_body - self.failUnless(soapRequestMsg) + failUnless(soapRequestMsg) httpResponse = sendHttpRequest( 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) otherProviderId = logout.get_next_providerID() logout.build_response_msg() soapResponseMsg = logout.msg_body - self.failUnless(soapResponseMsg) - return self.newHttpResponse( - 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) + failUnless(soapResponseMsg) + return handler.respond( + headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) else: raise Exception('Unknown request type: %s' % requestType) |
