diff options
| author | Emmanuel Raviart <eraviart@entrouvert.com> | 2004-08-10 09:37:37 +0000 |
|---|---|---|
| committer | Emmanuel Raviart <eraviart@entrouvert.com> | 2004-08-10 09:37:37 +0000 |
| commit | c753e696c458cbb8e4e52d5413853fc0a72047c2 (patch) | |
| tree | 7e5b7682a0368fe74faa6b43f561e07f254bb45f /python | |
| parent | 18352ddb396946ca7583f8a770cd59ecfca8abdb (diff) | |
| download | lasso-c753e696c458cbb8e4e52d5413853fc0a72047c2.tar.gz lasso-c753e696c458cbb8e4e52d5413853fc0a72047c2.tar.xz lasso-c753e696c458cbb8e4e52d5413853fc0a72047c2.zip | |
Improved Python unit tests.
Diffstat (limited to 'python')
| -rw-r--r-- | python/tests/IdentityProvider.py | 108 | ||||
| -rw-r--r-- | python/tests/Provider.py | 21 | ||||
| -rw-r--r-- | python/tests/ServiceProvider.py | 30 | ||||
| -rw-r--r-- | python/tests/abstractweb.py | 87 | ||||
| -rw-r--r-- | python/tests/http.py | 55 | ||||
| -rw-r--r-- | python/tests/login_tests.py | 20 | ||||
| -rw-r--r-- | python/tests/web.py | 66 | ||||
| -rw-r--r-- | python/tests/websimulator.py | 143 |
8 files changed, 326 insertions, 204 deletions
diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py index 76cd3f0a..c48ddb5d 100644 --- a/python/tests/IdentityProvider.py +++ b/python/tests/IdentityProvider.py @@ -41,69 +41,41 @@ class IdentityProvider(Provider): # Single sign-on using HTTP redirect. login = lasso.Login.new(lassoServer) session = handler.session - user = None - if session is not None: - if session.lassoSessionDump is not None: - login.set_session_from_dump(session.lassoSessionDump) - user = self.getUserFromSession(session) - if user is not None and user.lassoIdentityDump is not None: - login.set_identity_from_dump(user.lassoIdentityDump) + if session is not None and session.lassoSessionDump is not None: + login.set_session_from_dump(session.lassoSessionDump) + user = handler.user + if user is not None and user.lassoIdentityDump is not None: + login.set_identity_from_dump(user.lassoIdentityDump) login.init_from_authn_request_msg(handler.httpRequest.query, lasso.httpMethodRedirect) if not login.must_authenticate(): userAuthenticated = user is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME return self.singleSignOn_part2( - handler, login, session, user, userAuthenticated, authenticationMethod) - - if session is None: - session = self.createSession(handler.httpRequest.client) - session.loginDump = login.dump() - - # A real identity provider using a HTML form to ask user's login & password would store - # idpLoginDump in a session variable and display the HTML login form. - userId = handler.httpRequest.client.keyring.get(self.url, None) - userAuthenticated = userId in self.users - if userAuthenticated: - session.userId = userId - authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME + handler, login, userAuthenticated, authenticationMethod) - lassoServer = self.getLassoServer() - session = handler.session - loginDump = session.loginDump - del session.loginDump - login = lasso.Login.new_from_dump(lassoServer, loginDump) - # Set identity & session in login, because loginDump doesn't contain them. - if session.lassoSessionDump is not None: - login.set_session_from_dump(session.lassoSessionDump) - user = self.getUserFromSession(session) - if user is not None and user.lassoIdentityDump is not None: - login.set_identity_from_dump(user.lassoIdentityDump) + return self.singleSignOn_authenticate(handler, login) - return self.singleSignOn_part2( - handler, login, session, user, userAuthenticated, authenticationMethod) elif handler.httpRequest.method == 'POST' \ and handler.httpRequest.headers.get('Content-Type', None) == 'text/xml': # SOAP request => LECP single sign-on. lecp = lasso.Lecp.new(lassoServer) session = handler.session - user = None - if session is not None: - if session.lassoSessionDump is not None: - lecp.set_session_from_dump(session.lassoSessionDump) - user = self.getUserFromSession(session) - if user is not None and user.lassoIdentityDump is not None: - lecp.set_identity_from_dump(user.lassoIdentityDump) + if session is not None and session.lassoSessionDump is not None: + lecp.set_session_from_dump(session.lassoSessionDump) + user = handler.user + if user is not None and user.lassoIdentityDump is not None: + lecp.set_identity_from_dump(user.lassoIdentityDump) lecp.init_from_authn_request_msg(handler.httpRequest.body, lasso.httpMethodSoap) # FIXME: lecp.must_authenticate() should always return False. # The other solution is that we are able to not call lecp.must_authenticate() # failIf(lecp.must_authenticate()) userAuthenticated = user is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME - # FIXME: It is very very strange that we don't provide userAuthenticated, - # authenticationMethod, reauthenticateOnOrAfter' to lecp before or when build response. lecp.build_authn_response_envelope_msg( - userAuthenticated, authenticationMethod, 'FIXME: reauthenticateOnOrAfter') + userAuthenticated, authenticationMethod, + "2005-05-03T16:12:00Z", # FIXME: reauthenticateOnOrAfter + ) soapResponseMsg = lecp.msg_body failUnless(soapResponseMsg) # FIXME: Lasso should set a lecp.msg_content_type to @@ -124,13 +96,47 @@ class IdentityProvider(Provider): 400, 'Bad Request: Method %s not handled by singleSignOn' % handler.httpRequest.method) - def singleSignOn_part2(self, handler, login, session, user, userAuthenticated, - authenticationMethod): + def singleSignOn_authenticate(self, handler, login): + if not self.instantAuthentication: + # The authentication needs to change page (needed for a HTML form, for example). + # Save Lasso login as a dump in session. + session = handler.session + if session is None: + session = handler.createSession() + session.lassoLoginDump = login.dump() + login = None + return self.authenticate(handler, self.singleSignOn_authenticate_part2, login) + + def singleSignOn_authenticate_part2(self, handler, userAuthenticated, authenticationMethod, + login = None): + if login is None: + # The authentication needed to change page (needed for a HTML form, for example). + # Reconstruct Lasso login from dump. + lassoServer = self.getLassoServer() + session = handler.session + failUnless(session) + failUnless(session.lassoLoginDump) + login = lasso.Login.new_from_dump(lassoServer, session.lassoLoginDump) + del session.lassoLoginDump + # Set identity & session in login, because session.lassoLoginDump doesn't contain them. + if session.lassoSessionDump is not None: + login.set_session_from_dump(session.lassoSessionDump) + user = handler.user + if user is not None and user.lassoIdentityDump is not None: + login.set_identity_from_dump(user.lassoIdentityDump) + return self.singleSignOn_part2(handler, login, userAuthenticated, authenticationMethod) + + def singleSignOn_part2(self, handler, login, userAuthenticated, authenticationMethod): failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME login.build_artifact_msg( - userAuthenticated, authenticationMethod, 'FIXME: reauthenticateOnOrAfter', + userAuthenticated, authenticationMethod, + "2005-05-03T16:12:00Z", # FIXME: reauthenticateOnOrAfter lasso.httpMethodRedirect) if userAuthenticated: + session = handler.session + failUnless(session) + user = handler.user + failUnless(user) if login.is_identity_dirty(): lassoIdentityDump = login.get_identity().dump() failUnless(lassoIdentityDump) @@ -180,19 +186,17 @@ class IdentityProvider(Provider): session = self.getSessionFromNameIdentifier(nameIdentifier) if session is None: raise Exception('FIXME: Handle the case when there is no web session') - lassoSessionDump = session.lassoSessionDump - if lassoSessionDump is None: + if session.lassoSessionDump is None: raise Exception( 'FIXME: Handle the case when there is no session dump in web session') - logout.set_session_from_dump(lassoSessionDump) + logout.set_session_from_dump(session.lassoSessionDump) user = self.getUserFromNameIdentifier(nameIdentifier) if user is None: raise Exception('FIXME: Handle the case when there is no web user') - lassoIdentityDump = user.lassoIdentityDump - if lassoIdentityDump is None: + if user.lassoIdentityDump is None: raise Exception( 'FIXME: Handle the case when there is no identity dump in web user') - logout.set_identity_from_dump(lassoIdentityDump) + logout.set_identity_from_dump(user.lassoIdentityDump) logout.validate_request() failIf(logout.is_identity_dirty()) diff --git a/python/tests/Provider.py b/python/tests/Provider.py index 498c81fb..a0e1a08f 100644 --- a/python/tests/Provider.py +++ b/python/tests/Provider.py @@ -33,8 +33,29 @@ class Provider(WebSite): 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1', }) lassoServerDump = None + providerId = None # The Liberty providerID of this web site sessionTokensByNameIdentifier = None userIdsByNameIdentifier = None + def __init__(self, internet, url): + WebSite.__init__(self, internet, url) + self.userIdsByNameIdentifier = {} + self.sessionTokensByNameIdentifier = {} + def getLassoServer(self): return lasso.Server.new_from_dump(self.lassoServerDump) + + def getSessionFromNameIdentifier(self, nameIdentifier): + sessionToken = self.sessionTokensByNameIdentifier.get(nameIdentifier, None) + if sessionToken is None: + # The user has no federation on this site or has no authentication assertion for this + # federation. + return None + return self.sessions.get(sessionToken, None) + + def getUserFromNameIdentifier(self, nameIdentifier): + userId = self.userIdsByNameIdentifier.get(nameIdentifier, None) + if userId is None: + # The user has no federation on this site. + return None + return self.users.get(userId, None) diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py index e23a9cf2..e7b2eeb9 100644 --- a/python/tests/ServiceProvider.py +++ b/python/tests/ServiceProvider.py @@ -83,22 +83,18 @@ class ServiceProvider(Provider): session = self.getSessionFromNameIdentifier(nameIdentifier) if session is None: session = handler.session - if session is not None: - lassoSessionDump = session.lassoSessionDump - if lassoSessionDump is not None: - login.set_session_from_dump(lassoSessionDump) + if session is not None and session.lassoSessionDump is not None: + login.set_session_from_dump(session.lassoSessionDump) # Retrieve identity dump, using name identifier or else try to retrieve him from web # session. If identity dump exists, give it to Lasso, so that it updates it. user = self.getUserFromNameIdentifier(nameIdentifier) if user is None: - user = self.getUserFromSession(session) - if user is not None: - lassoIdentityDump = user.lassoIdentityDump - if lassoIdentityDump is not None: - login.set_identity_from_dump(lassoIdentityDump) + user = handler.user + if user is not None and user.lassoIdentityDump is not None: + login.set_identity_from_dump(user.lassoIdentityDump) login.accept_sso() - if user is not None and lassoIdentityDump is None: + if user is not None and user.lassoIdentityDump is None: failUnless(login.is_identity_dirty()) lassoIdentity = login.get_identity() failUnless(lassoIdentity) @@ -114,7 +110,7 @@ class ServiceProvider(Provider): # If there was no web session yet, create it. Idem for the web user account. if session is None: - session = self.createSession(handler.httpRequest.client) + session = handler.createSession() if user is None: # A real service provider would ask user to login locally to create federation. Or it # would ask user informations to create a local account. @@ -211,18 +207,16 @@ class ServiceProvider(Provider): session = handler.session if session is None: return handler.respond(401, 'Access Unauthorized: User has no session opened.') - user = self.getUserFromSession(session) + user = handler.user if user is None: return handler.respond(401, 'Access Unauthorized: User is not logged in.') lassoServer = self.getLassoServer() logout = lasso.Logout.new(lassoServer, lasso.providerTypeSp) - lassoIdentityDump = self.getIdentityDump(handler.httpRequest.client) - if lassoIdentityDump is not None: - logout.set_identity_from_dump(lassoIdentityDump) - lassoSessionDump = self.getLassoSessionDump(handler.httpRequest.client) - if lassoSessionDump is not None: - logout.set_session_from_dump(lassoSessionDump) + if user.lassoIdentityDump is not None: + logout.set_identity_from_dump(user.lassoIdentityDump) + if session.lassoSessionDump is not None: + logout.set_session_from_dump(session.lassoSessionDump) logout.init_request() logout.build_request_msg() diff --git a/python/tests/abstractweb.py b/python/tests/abstractweb.py index cdfc1445..671c0a73 100644 --- a/python/tests/abstractweb.py +++ b/python/tests/abstractweb.py @@ -142,6 +142,16 @@ class HttpRequestHandlerMixin: user = None site = None # The virtual host + def createSession(self): + session = self.site.newSession() + self.session = session + return session + + def createUser(self): + user = self.site.newUser() + self.user = user + return user + def respond(self, statusCode = 200, statusMessage = None, headers = None, body = None): self.httpResponse = self.HttpResponse( self, statusCode, statusMessage = statusMessage, headers = headers, body = body) @@ -161,11 +171,44 @@ class HttpRequestHandlerMixin: class WebSessionMixin: + isDirty = True publishToken = False token = None + userId = None # ID of logged user + + def __init__(self, token): + self.token = token + + def getSimpleLabel(self): + return self.token + + def save(self): + pass + + simpleLabel = property(getSimpleLabel) class WebSiteMixin: + httpResponseHeaders = { + 'Server': 'Lasso Simulator Web Server', + } + instantAuthentication = False + lastSessionToken = 0 + lastUserId = 0 + users = None + sessions = None + WebSession = None # Class + WebUser = None # Class + + def __init__(self): + self.users = {} + self.sessions = {} + + def authenticate(self, handler, callback, *arguments, **keywordArguments): + # The arguments & keywordArguments should be given back to callback only for + # instant authentication. + raise NotImplementedError + def authenticateX509User(self, clientCertificate): # We should check certificate (for example clientCertificate.get_serial_number() # and return the user if one matches, or None otherwise. @@ -175,6 +218,50 @@ class WebSiteMixin: # We should check login & password and return the user if one matches or None otherwise. return None + def handleHttpRequestHandler(self, httpRequestHandler): + methodName = httpRequestHandler.httpRequest.path.replace('/', '') + try: + method = getattr(self, methodName) + except AttributeError: + return httpRequestHandler.respond( + 404, 'Path "%s" Not Found.' % httpRequestHandler.httpRequest.path) + return method(httpRequestHandler) + + def newSession(self): + self.lastSessionToken += 1 + session = self.WebSession(self.lastSessionToken) + self.sessions[self.lastSessionToken] = session + return session + + def newUser(self, name = None): + if name is None: + self.lastUserId += 1 + userId = self.lastUserId + else: + userId = name + user = self.WebUser(userId, name = name) + self.users[userId] = user + return user + class WebUserMixin: + isDirty = True + name = None sessionToken = None + uniqueId = None + + def __init__(self, uniqueId, name = None): + self.uniqueId = uniqueId + if name: + self.name = name + + def getSimpleLabel(self): + if self.name: + return self.name + else: + return 'Anonymous User #%s' % self.uniqueId + + def save(self): + pass + + simpleLabel = property(getSimpleLabel) diff --git a/python/tests/http.py b/python/tests/http.py index 46347fa3..750c4e93 100644 --- a/python/tests/http.py +++ b/python/tests/http.py @@ -274,7 +274,14 @@ class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): # requestline = None # Strange # request_version = None # Strange server_version = 'HttpRequestHandlerMixin/1.0' - site = None # Class variable. + site = None # Class variable + testCookieSupport = False + + def createSession(self): + session = abstractweb.HttpRequestHandlerMixin.createSession(self) + if self.canUseCookie: + self.testCookieSupport = True + return session def handle(self): """Handle multiple requests if necessary.""" @@ -314,7 +321,7 @@ class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): logger.info(self.raw_requestline.strip()) logger.debug(str(self.headers)) -############### + # Retrieve the session and user, if possible. session = None sessionToken = None @@ -330,8 +337,8 @@ class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): % clientCertificate.get_serial_number()) else: sessionToken = user.sessionToken - if sessionToken: - session = self.site.getSessionFromToken(sessionToken) + if sessionToken is not None: + session = self.site.sessions.get(sessionToken) if session is None: sessionToken = None del user.sessionToken @@ -379,8 +386,8 @@ class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): return self.outputErrorUnauthorized(httpPath) else: sessionToken = user.sessionToken - if sessionToken: - session = self.site.getSessionFromToken(sessionToken) + if sessionToken is not None: + session = self.site.sessions.get(sessionToken) if session is None: sessionToken = None del user.sessionToken @@ -394,9 +401,10 @@ class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): elif login: # No password was given. Assume login contains a session token. # TODO: sanity chek on login - session = self.site.getSessionFromToken(sessionToken) - if session is not None: - user = session.user + sessionToken = login + session = self.site.sessions.get(sessionToken) + if session is not None and session.userId is not None: + user = self.site.users.get(session.userId) if user is not None and user.sessionToken != session.token: # Sanity check. user.sessionToken = session.token @@ -434,16 +442,17 @@ class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): sessionTokenInCookie = True canUseCookie = True if session is None and sessionToken is not None: - session = self.site.getSessionFromToken(sessionToken) + session = self.site.sessions.get(sessionToken) if session is None: sessionToken = None sessionTokenInCookie = False else: if user is None: - user = session.user - if user is not None and user.sessionToken != sessionToken: - # Sanity check. - user.sessionToken = sessionToken + if session.userId is not None: + user = self.site.users.get(session.userId) + if user is not None and user.sessionToken != sessionToken: + # Sanity check. + user.sessionToken = sessionToken else: # The user has been authenticated (using HTTP or X.509 authentication), but the # associated session didn't exist (or was too old, or...). So, update @@ -453,28 +462,30 @@ class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): # token (it is better not to store it in a cookie or in URLs). if session.publishToken: del session.publishToken + self.canUseCookie = canUseCookie if session is None and user is not None: # The user has been authenticated (using HTTP or X.509 authentication), but the session # doesn't exist yet (or was too old, or...). Create a new session. - session = sessions.getOrCreateSession() + session = self.createSession() # For security reasons, we want to minimize the publication of session # token (it is better not to store it in a cookie or in URLs). # session.publishToken = False # False is the default value. - session.setAccountAbsolutePath(account.getAbsolutePath()) + session.userId = user.uniqueId user.sessionToken = session.token - self.user = user - if user is not None: - logger.debug('User: %s' % user.simpleLabel) - self.session = session + else: + self.session = session if session is not None: if not sessionTokenInCookie: # The sessionToken is valid but is not stored in the cookie. So, don't try to # use cookie. canUseCookie = False logger.debug('Session: %s' % session.simpleLabel) - self.canUseCookie = canUseCookie + self.user = user + if user is not None: + logger.debug('User: %s' % user.simpleLabel) -############### + # Now, the HTTP request handler has done everything it could done. Transfer the processing + # to the site. try: self.site.handleHttpRequestHandler(self) diff --git a/python/tests/login_tests.py b/python/tests/login_tests.py index 2979f1ba..5dd09b76 100644 --- a/python/tests/login_tests.py +++ b/python/tests/login_tests.py @@ -60,10 +60,10 @@ class LoginTestCase(unittest.TestCase): failUnless(site.lassoServerDump) lassoServer.destroy() - site.addUser('Chantereau') - site.addUser('Clapies') - site.addUser('Febvre') - site.addUser('Nowicki') + site.newUser('Chantereau') + site.newUser('Clapies') + site.newUser('Febvre') + site.newUser('Nowicki') # Frederic Peters has no account on identity provider. return site @@ -97,11 +97,11 @@ class LoginTestCase(unittest.TestCase): failUnless(site.lassoServerDump) lassoServer.destroy() - site.addUser('Nicolas') - site.addUser('Romain') - site.addUser('Valery') + site.newUser('Nicolas') + site.newUser('Romain') + site.newUser('Valery') # Christophe Nowicki has no account on service provider. - site.addUser('Frederic') + site.newUser('Frederic') return site def setUp(self): @@ -247,6 +247,7 @@ class LoginTestCase(unittest.TestCase): def test07(self): """LECP login.""" + internet = Internet() idpSite = self.generateIdpSite(internet) spSite = self.generateSpSite(internet) @@ -262,10 +263,9 @@ class LoginTestCase(unittest.TestCase): httpResponse = lecp.login(principal, spSite, '/login') failUnlessEqual(httpResponse.statusCode, 401) - # Now we authenticate principal, before testing LECP. So, LECP must succeed. + # Now authenticate principal, before testing LECP. So, LECP must succeed. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') failUnlessEqual(httpResponse.statusCode, 200) - idpSite.createSession(lecp) httpResponse = lecp.login(principal, spSite, '/login') failUnlessEqual(httpResponse.statusCode, 200) diff --git a/python/tests/web.py b/python/tests/web.py new file mode 100644 index 00000000..90d74a73 --- /dev/null +++ b/python/tests/web.py @@ -0,0 +1,66 @@ +# -*- coding: UTF-8 -*- + + +# HTTP Client and Server Enhanced Classes +# By: Frederic Peters <fpeters@entrouvert.com> +# Emmanuel Raviart <eraviart@entrouvert.com> +# +# Copyright (C) 2004 Entr'ouvert +# http://www.entrouvert.org +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + + +"""HTTP client and server enhanced classes + +Features: +- HTTPS using OpenSSL; +- web sessions (with or without cookie); +- user authentication (support of basic HTTP-authentication, X.509v3 certificate authentication, + HTML based authentication, etc). +""" + + +import abstractweb + + +class WebSite(abstractweb.WebSiteMixin, WebClient): + instantAuthentication = True # Authentication doesn't use a HTML form. + url = None # The main URL of web site + WebSession = WebSession + WebUser = WebUser + + def __init__(self, internet, url): + WebClient.__init__(self, internet) + abstractweb.WebSiteMixin.__init__(self) + self.url = url + self.internet.addWebSite(self) + + def authenticate(self, handler, callback, *arguments, **keywordArguments): + FIXME: TODO. + + import lasso + authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME + if userAuthenticated: + session = handler.session + if session is None: + session = handler.createSession() + user = handler.user + if user is None: + user = handler.createUser() + session.userId = user.uniqueId + user.sessionToken = session.token + return callback(handler, userAuthenticated, authenticationMethod, *arguments, + **keywordArguments) diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py index 360fcd06..f5fb843f 100644 --- a/python/tests/websimulator.py +++ b/python/tests/websimulator.py @@ -101,14 +101,14 @@ class HttpRequestHandler(abstractweb.HttpRequestHandlerMixin, object): self.site = site self.httpRequest = httpRequest - def getSession(self): - return self.site.getSessionFromPrincipal(self.httpRequest.client) - + def createSession(self): + session = abstractweb.HttpRequestHandlerMixin.createSession(self) + self.httpRequest.client.sessionTokens[self.site.url] = session.token + return session + def respondRedirectTemporarily(self, url): return self.httpRequest.client.redirect(url) - session = property(getSession) - class Internet(object): webSites = None @@ -181,120 +181,59 @@ class WebSession(abstractweb.WebSessionMixin, object): """Simulation of session of a web site""" expirationTime = None # A sample session variable - isDirty = True + lassoLoginDump = None # Used only by some identity providers lassoSessionDump = None - loginDump = None # Used only by some identity providers - userId = None # ID of logged user. - def __init__(self, token): - self.token = token - def save(self): - pass +class WebUser(abstractweb.WebUserMixin, object): + """Simulation of user of a web site""" + + lassoIdentityDump = None + language = 'fr' # A sample user variable class WebSite(abstractweb.WebSiteMixin, WebClient): """Simulation of a web site""" - httpResponseHeaders = { - 'Server': 'Lasso Simulator Web Server', - } - lastSessionToken = 0 - providerId = None # The Liberty providerID of this web site + instantAuthentication = True # Authentication doesn't use a HTML form. url = None # The main URL of web site - users = None - sessions = None + WebSession = WebSession + WebUser = WebUser def __init__(self, internet, url): WebClient.__init__(self, internet) + abstractweb.WebSiteMixin.__init__(self) self.url = url - self.userIdsByNameIdentifier = {} - self.users = {} - self.sessionTokensByNameIdentifier = {} - self.sessions = {} self.internet.addWebSite(self) - def addUser(self, name): - self.users[name] = WebUser(name) - - def createSession(self, client): - self.lastSessionToken += 1 - session = WebSession(self.lastSessionToken) - self.sessions[self.lastSessionToken] = session - client.sessionTokens[self.url] = self.lastSessionToken - return session - - def getIdentityDump(self, principal): - session = self.getSessionFromPrincipal(principal) - user = self.getUserFromSession(session) - if user is None: - return None - return user.lassoIdentityDump - - def getLassoSessionDump(self, principal): - session = self.getSessionFromPrincipal(principal) - if session is None: - return None - return session.lassoSessionDump - - def getSessionFromNameIdentifier(self, nameIdentifier): - sessionToken = self.sessionTokensByNameIdentifier.get(nameIdentifier, None) - if sessionToken is None: - # The user has no federation on this site or has no authentication assertion for this - # federation. - return None - return self.sessions.get(sessionToken, None) - - def getSessionFromPrincipal(self, principal): - sessionToken = principal.sessionTokens.get(self.url, None) - return self.getSessionFromToken(sessionToken) - - def getSessionFromToken(self, sessionToken): - if sessionToken is None: - # The user has no web session opened on this site. - return None - return self.sessions.get(sessionToken, None) - - def getUserFromNameIdentifier(self, nameIdentifier): - userId = self.userIdsByNameIdentifier.get(nameIdentifier, None) - if userId is None: - # The user has no federation on this site. - return None - return self.users.get(userId, None) - - def getUserFromSession(self, session): - if session is None: - return None - userId = session.userId - if userId is None: - # The user has no account on this site. - return None - return self.users.get(userId, None) + def authenticate(self, handler, callback, *arguments, **keywordArguments): + userId = handler.httpRequest.client.keyring.get(self.url, None) + userAuthenticated = userId in self.users + + import lasso + authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME + if userAuthenticated: + session = handler.session + if session is None: + session = handler.createSession() + user = handler.user + if user is None: + user = handler.createUser() + session.userId = user.uniqueId + user.sessionToken = session.token + return callback(handler, userAuthenticated, authenticationMethod, *arguments, + **keywordArguments) def handleHttpRequest(self, httpRequest): httpRequestHandler = HttpRequestHandler(self, httpRequest) - return self.handleHttpRequestHandler(httpRequestHandler) - - def handleHttpRequestHandler(self, httpRequestHandler): - methodName = httpRequestHandler.httpRequest.path.replace('/', '') - try: - method = getattr(self, methodName) - except AttributeError: - return httpRequestHandler.respond( - 404, 'Path "%s" Not Found.' % httpRequestHandler.httpRequest.path) - return method(httpRequestHandler) + # Retrieve session and user. + sessionToken = httpRequest.client.sessionTokens.get(self.url, None) + if sessionToken is not None: + session = self.sessions.get(sessionToken) + if session is not None: + httpRequestHandler.session = session + if session.userId is not None: + httpRequestHandler.user = self.users.get(session.userId, None) -class WebUser(abstractweb.WebUserMixin, object): - """Simulation of user of a web site""" - - isDirty = True - lassoIdentityDump = None - language = 'fr' # A sample user variable - uniqueId = None # The user name is used as an ID in this simulation. - - def __init__(self, uniqueId): - self.uniqueId = uniqueId - - def save(self): - pass + return self.handleHttpRequestHandler(httpRequestHandler) |
