# -*- coding: UTF-8 -*- # Python Lasso Simulator # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org # # Author: Emmanuel Raviart # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import lasso from Provider import Provider from websimulator import * class IdentityProvider(Provider): soapResponseMsgs = None def __init__(self, test, internet, url): Provider.__init__(self, test, internet, url) self.soapResponseMsgs = {} def singleSignOn(self, httpRequest): server = self.getServer() if httpRequest.method == 'GET': # Single sign-on using HTTP redirect. login = lasso.Login.new(server) webSession = self.getWebSession(httpRequest.client) webUser = None if webSession is not None: if webSession.sessionDump is not None: login.set_session_from_dump(webSession.sessionDump) webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: login.set_identity_from_dump(webUser.identityDump) login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect) if not login.must_authenticate(): userAuthenticated = webUser is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME return self.singleSignOn_part2( httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod) if webSession is None: webSession = self.createWebSession(httpRequest.client) webSession.loginDump = login.dump() # A real identity provider using a HTML form to ask user's login & password would store # idpLoginDump in a session variable and display the HTML login form. webUserId = httpRequest.client.keyring.get(self.url, None) userAuthenticated = webUserId in self.webUsers if userAuthenticated: webSession.webUserId = webUserId authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME server = self.getServer() webSession = self.getWebSession(httpRequest.client) loginDump = webSession.loginDump del webSession.loginDump login = lasso.Login.new_from_dump(server, loginDump) # Set identity & session in login, because loginDump doesn't contain them. if webSession.sessionDump is not None: login.set_session_from_dump(webSession.sessionDump) webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: login.set_identity_from_dump(webUser.identityDump) return self.singleSignOn_part2( httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod) elif httpRequest.method == 'POST' \ and httpRequest.headers.get('Content-Type', None) == 'text/xml': # SOAP request => LECP single sign-on. lecp = lasso.Lecp.new(server) webSession = self.getWebSession(httpRequest.client) webUser = None if webSession is not None: if webSession.sessionDump is not None: lecp.set_session_from_dump(webSession.sessionDump) webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: lecp.set_identity_from_dump(webUser.identityDump) lecp.init_from_authn_request_msg(httpRequest.body, lasso.httpMethodSoap) # FIXME: lecp.must_authenticate() should always return False. # The other solution is that we are able to not call lecp.must_authenticate() # self.failIf(lecp.must_authenticate()) userAuthenticated = webUser is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME # FIXME: It is very very strange that we don't provide userAuthenticated, # authenticationMethod, reauthenticateOnOrAfter' to lecp before or when build response. lecp.build_authn_response_envelope_msg() soapResponseMsg = lecp.msg_body self.failUnless(soapResponseMsg) # FIXME: Lasso should set a lecp.msg_content_type to # "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with # other profiles. # contentType = lecp.msg_content_type # self.failUnlessEqual(contentType, 'application/vnd.liberty-response+xml') contentType = 'application/vnd.liberty-response+xml' return self.newHttpResponse( 200, headers = { 'Content-Type': contentType, 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', }, body = soapResponseMsg) else: return self.newHttpResponse( 400, 'Bad Request: Method %s not handled by singleSignOn' % httpRequest.method) def singleSignOn_part2(self, httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod): self.failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME login.build_artifact_msg( userAuthenticated, authenticationMethod, 'FIXME: reauthenticateOnOrAfter', lasso.httpMethodRedirect) if userAuthenticated: if login.is_identity_dirty(): identityDump = login.get_identity().dump() self.failUnless(identityDump) webUser.identityDump = identityDump self.failUnless(login.is_session_dirty()) sessionDump = login.get_session().dump() self.failUnless(sessionDump) webSession.sessionDump = sessionDump nameIdentifier = login.nameIdentifier self.failUnless(nameIdentifier) self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId else: self.failIf(login.is_identity_dirty()) self.failIf(login.is_session_dirty()) artifact = login.assertionArtifact self.failUnless(artifact) soapResponseMsg = login.response_dump self.failUnless(soapResponseMsg) self.soapResponseMsgs[artifact] = soapResponseMsg responseUrl = login.msg_url self.failUnless(responseUrl) return httpRequest.client.redirect(responseUrl) def soapEndpoint(self, httpRequest): soapRequestMsg = httpRequest.body requestType = lasso.get_request_type_from_soap_msg(soapRequestMsg) if requestType == lasso.requestTypeLogin: server = self.getServer() login = lasso.Login.new(server) login.process_request_msg(soapRequestMsg) artifact = login.assertionArtifact self.failUnless(artifact) soapResponseMsg = self.soapResponseMsgs.get(artifact, None) if soapResponseMsg is None: raise Exception('FIXME: Handle the case when artifact is wrong') return self.newHttpResponse( 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) elif requestType == lasso.requestTypeLogout: server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeIdp) logout.process_request_msg(soapRequestMsg, lasso.httpMethodSoap) nameIdentifier = logout.nameIdentifier self.failUnless(nameIdentifier) # Retrieve session dump and identity dump using name identifier. webSession = self.getWebSessionFromNameIdentifier(nameIdentifier) if webSession is None: raise Exception('FIXME: Handle the case when there is no web session') sessionDump = webSession.sessionDump if sessionDump is None: raise Exception( 'FIXME: Handle the case when there is no session dump in web session') logout.set_session_from_dump(sessionDump) webUser = self.getWebUserFromNameIdentifier(nameIdentifier) if webUser is None: raise Exception('FIXME: Handle the case when there is no web user') identityDump = webUser.identityDump if identityDump is None: raise Exception( 'FIXME: Handle the case when there is no identity dump in web user') logout.set_identity_from_dump(identityDump) logout.validate_request() self.failIf(logout.is_identity_dirty()) identity = logout.get_identity() self.failUnless(identity) identityDump = identity.dump() self.failUnless(identityDump) self.failUnless(logout.is_session_dirty()) # Log the user out. # It is done before logout from other service providers, since we don't want to # accept passive login connections inbetween. del webSession.sessionDump del webSession.webUserId # We also delete the session, but it is not mandantory, since the user is logged out # anyway. del self.webSessions[webSession.uniqueId] nameIdentifier = logout.nameIdentifier self.failUnless(nameIdentifier) del self.webSessionIdsByNameIdentifier[nameIdentifier] # Tell each other service provider to logout the user. otherProviderId = logout.get_next_providerID() while otherProviderId is not None: logout.init_request(otherProviderId) logout.build_request_msg() soapEndpoint = logout.msg_url self.failUnless(soapEndpoint) soapRequestMsg = logout.msg_body self.failUnless(soapRequestMsg) httpResponse = sendHttpRequest( 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) self.failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) otherProviderId = logout.get_next_providerID() logout.build_response_msg() soapResponseMsg = logout.msg_body self.failUnless(soapResponseMsg) return self.newHttpResponse( 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) else: raise Exception('Unknown request type: %s' % requestType)