diff options
Diffstat (limited to 'python/tests')
-rw-r--r-- | python/tests/IdentityProvider.py | 108 | ||||
-rw-r--r-- | python/tests/LibertyEnabledClient.py | 32 | ||||
-rw-r--r-- | python/tests/Provider.py | 9 | ||||
-rw-r--r-- | python/tests/ServiceProvider.py | 116 | ||||
-rw-r--r-- | python/tests/errorchecking_tests.py | 10 | ||||
-rw-r--r-- | python/tests/login_tests.py | 81 | ||||
-rwxr-xr-x | python/tests/tests.py | 14 | ||||
-rw-r--r-- | python/tests/websimulator.py | 161 |
8 files changed, 247 insertions, 284 deletions
diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py index 2715c42a..27ca0265 100644 --- a/python/tests/IdentityProvider.py +++ b/python/tests/IdentityProvider.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -32,16 +31,16 @@ from websimulator import * class IdentityProvider(Provider): soapResponseMsgs = None - def __init__(self, test, internet, url): - Provider.__init__(self, test, internet, url) + def __init__(self, internet, url): + Provider.__init__(self, internet, url) self.soapResponseMsgs = {} - def singleSignOn(self, httpRequest): + def singleSignOn(self, handler): server = self.getServer() - if httpRequest.method == 'GET': + if handler.httpRequest.method == 'GET': # Single sign-on using HTTP redirect. login = lasso.Login.new(server) - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) webUser = None if webSession is not None: if webSession.sessionDump is not None: @@ -49,29 +48,28 @@ class IdentityProvider(Provider): webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: login.set_identity_from_dump(webUser.identityDump) - login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect) + login.init_from_authn_request_msg(handler.httpRequest.query, lasso.httpMethodRedirect) if not login.must_authenticate(): userAuthenticated = webUser is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME return self.singleSignOn_part2( - httpRequest, login, webSession, webUser, userAuthenticated, - authenticationMethod) + handler, login, webSession, webUser, userAuthenticated, authenticationMethod) if webSession is None: - webSession = self.createWebSession(httpRequest.client) + webSession = self.createWebSession(handler.httpRequest.client) webSession.loginDump = login.dump() # A real identity provider using a HTML form to ask user's login & password would store # idpLoginDump in a session variable and display the HTML login form. - webUserId = httpRequest.client.keyring.get(self.url, None) + webUserId = handler.httpRequest.client.keyring.get(self.url, None) userAuthenticated = webUserId in self.webUsers if userAuthenticated: webSession.webUserId = webUserId authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME server = self.getServer() - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) loginDump = webSession.loginDump del webSession.loginDump login = lasso.Login.new_from_dump(server, loginDump) @@ -83,12 +81,12 @@ class IdentityProvider(Provider): login.set_identity_from_dump(webUser.identityDump) return self.singleSignOn_part2( - httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod) - elif httpRequest.method == 'POST' \ - and httpRequest.headers.get('Content-Type', None) == 'text/xml': + handler, login, webSession, webUser, userAuthenticated, authenticationMethod) + elif handler.httpRequest.method == 'POST' \ + and handler.httpRequest.headers.get('Content-Type', None) == 'text/xml': # SOAP request => LECP single sign-on. lecp = lasso.Lecp.new(server) - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) webUser = None if webSession is not None: if webSession.sessionDump is not None: @@ -96,25 +94,24 @@ class IdentityProvider(Provider): webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: lecp.set_identity_from_dump(webUser.identityDump) - lecp.init_from_authn_request_msg(httpRequest.body, lasso.httpMethodSoap) + lecp.init_from_authn_request_msg(handler.httpRequest.body, lasso.httpMethodSoap) # FIXME: lecp.must_authenticate() should always return False. # The other solution is that we are able to not call lecp.must_authenticate() - # self.failIf(lecp.must_authenticate()) + # failIf(lecp.must_authenticate()) userAuthenticated = webUser is not None authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME # FIXME: It is very very strange that we don't provide userAuthenticated, # authenticationMethod, reauthenticateOnOrAfter' to lecp before or when build response. lecp.build_authn_response_envelope_msg() soapResponseMsg = lecp.msg_body - self.failUnless(soapResponseMsg) + failUnless(soapResponseMsg) # FIXME: Lasso should set a lecp.msg_content_type to # "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with # other profiles. # contentType = lecp.msg_content_type - # self.failUnlessEqual(contentType, 'application/vnd.liberty-response+xml') + # failUnlessEqual(contentType, 'application/vnd.liberty-response+xml') contentType = 'application/vnd.liberty-response+xml' - return self.newHttpResponse( - 200, + return handler.respond( headers = { 'Content-Type': contentType, 'Cache-Control': 'no-cache', @@ -122,60 +119,61 @@ class IdentityProvider(Provider): }, body = soapResponseMsg) else: - return self.newHttpResponse( - 400, 'Bad Request: Method %s not handled by singleSignOn' % httpRequest.method) + return handler.respond( + 400, + 'Bad Request: Method %s not handled by singleSignOn' % handler.httpRequest.method) - def singleSignOn_part2(self, httpRequest, login, webSession, webUser, userAuthenticated, + def singleSignOn_part2(self, handler, login, webSession, webUser, userAuthenticated, authenticationMethod): - self.failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME + failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME login.build_artifact_msg( userAuthenticated, authenticationMethod, 'FIXME: reauthenticateOnOrAfter', lasso.httpMethodRedirect) if userAuthenticated: if login.is_identity_dirty(): identityDump = login.get_identity().dump() - self.failUnless(identityDump) + failUnless(identityDump) webUser.identityDump = identityDump - self.failUnless(login.is_session_dirty()) + failUnless(login.is_session_dirty()) sessionDump = login.get_session().dump() - self.failUnless(sessionDump) + failUnless(sessionDump) webSession.sessionDump = sessionDump nameIdentifier = login.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId else: - self.failIf(login.is_identity_dirty()) - self.failIf(login.is_session_dirty()) + failIf(login.is_identity_dirty()) + failIf(login.is_session_dirty()) artifact = login.assertionArtifact - self.failUnless(artifact) + failUnless(artifact) soapResponseMsg = login.response_dump - self.failUnless(soapResponseMsg) + failUnless(soapResponseMsg) self.soapResponseMsgs[artifact] = soapResponseMsg responseUrl = login.msg_url - self.failUnless(responseUrl) - return httpRequest.client.redirect(responseUrl) + failUnless(responseUrl) + return handler.respondRedirectTemporarily(responseUrl) - def soapEndpoint(self, httpRequest): - soapRequestMsg = httpRequest.body + def soapEndpoint(self, handler): + soapRequestMsg = handler.httpRequest.body requestType = lasso.get_request_type_from_soap_msg(soapRequestMsg) if requestType == lasso.requestTypeLogin: server = self.getServer() login = lasso.Login.new(server) login.process_request_msg(soapRequestMsg) artifact = login.assertionArtifact - self.failUnless(artifact) + failUnless(artifact) soapResponseMsg = self.soapResponseMsgs.get(artifact, None) if soapResponseMsg is None: raise Exception('FIXME: Handle the case when artifact is wrong') - return self.newHttpResponse( - 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) + return handler.respond( + headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) elif requestType == lasso.requestTypeLogout: server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeIdp) logout.process_request_msg(soapRequestMsg, lasso.httpMethodSoap) nameIdentifier = logout.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) # Retrieve session dump and identity dump using name identifier. webSession = self.getWebSessionFromNameIdentifier(nameIdentifier) @@ -196,12 +194,12 @@ class IdentityProvider(Provider): logout.set_identity_from_dump(identityDump) logout.validate_request() - self.failIf(logout.is_identity_dirty()) + failIf(logout.is_identity_dirty()) identity = logout.get_identity() - self.failUnless(identity) + failUnless(identity) identityDump = identity.dump() - self.failUnless(identityDump) - self.failUnless(logout.is_session_dirty()) + failUnless(identityDump) + failUnless(logout.is_session_dirty()) # Log the user out. # It is done before logout from other service providers, since we don't want to @@ -212,7 +210,7 @@ class IdentityProvider(Provider): # anyway. del self.webSessions[webSession.uniqueId] nameIdentifier = logout.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) del self.webSessionIdsByNameIdentifier[nameIdentifier] # Tell each other service provider to logout the user. @@ -222,21 +220,21 @@ class IdentityProvider(Provider): logout.build_request_msg() soapEndpoint = logout.msg_url - self.failUnless(soapEndpoint) + failUnless(soapEndpoint) soapRequestMsg = logout.msg_body - self.failUnless(soapRequestMsg) + failUnless(soapRequestMsg) httpResponse = sendHttpRequest( 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) otherProviderId = logout.get_next_providerID() logout.build_response_msg() soapResponseMsg = logout.msg_body - self.failUnless(soapResponseMsg) - return self.newHttpResponse( - 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) + failUnless(soapResponseMsg) + return handler.respond( + headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) else: raise Exception('Unknown request type: %s' % requestType) diff --git a/python/tests/LibertyEnabledClient.py b/python/tests/LibertyEnabledClient.py index 397999bf..9ac5e1ee 100644 --- a/python/tests/LibertyEnabledClient.py +++ b/python/tests/LibertyEnabledClient.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -28,7 +27,7 @@ import lasso from websimulator import * -class LibertyEnabledClient(WebClient, Simulation): +class LibertyEnabledClient(WebClient): # A service provider MAY provide a list of identity providers it recognizes by including the # <lib:IDPList> element in the <lib:AuthnRequestEnvelope>. The format and processing rules for # the identity provider list MUST be as defined in [LibertyProtSchema]. @@ -59,28 +58,27 @@ class LibertyEnabledClient(WebClient, Simulation): # <lib:AuthnResponse> MUST be encoded by applying a base64 transformation (refer to # [RFC2045]) to the <lib:AuthnResponse> and all its elements. - # FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without - # metadata, instead of using 'singleSignOnServiceUrl'. - idpSingleSignOnServiceUrl = None - requestHeaders = WebClient.requestHeaders.copy() - requestHeaders.update({ + httpRequestHeaders = WebClient.httpRequestHeaders.copy() + httpRequestHeaders.update({ # FIXME: Is this the correct syntax for several URLs in LIBV? 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1', 'Liberty-Agent': 'LassoSimulator/0.0.0', # FIXME: As an alternative to 'Liberty-Enabled' header, a user agent may use: # 'User-Agent': ' '.join(( - # requestHeaders['User-Agent'], + # httpRequestHeaders['User-Agent'], # 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1')) - 'Accept': ','.join((requestHeaders['Accept'], 'application/vnd.liberty-request+xml')) + 'Accept': ','.join((httpRequestHeaders['Accept'], 'application/vnd.liberty-request+xml')) }) + # FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without + # metadata, instead of using 'singleSignOnServiceUrl'. + idpSingleSignOnServiceUrl = None - def __init__(self, test, internet): - Simulation.__init__(self, test) + def __init__(self, internet): WebClient.__init__(self, internet) def login(self, principal, site, path): httpResponse = self.sendHttpRequestToSite(site, 'GET', path) - self.failUnlessEqual( + failUnlessEqual( httpResponse.headers['Content-Type'], 'application/vnd.liberty-request+xml') lecp = lasso.Lecp.new(None) authnRequestEnvelope = httpResponse.body @@ -95,12 +93,12 @@ class LibertyEnabledClient(WebClient, Simulation): httpResponse = self.sendHttpRequest( 'POST', self.idpSingleSignOnServiceUrl, headers = {'Content-Type': 'text/xml'}, body = lecp.msg_body) - self.failUnlessEqual( + failUnlessEqual( httpResponse.headers.get('Content-Type', None), 'application/vnd.liberty-response+xml') lecp.process_authn_response_envelope_msg(httpResponse.body) lecp.build_authn_response_msg() - self.failUnless(lecp.msg_url) - self.failUnless(lecp.msg_body) + failUnless(lecp.msg_url) + failUnless(lecp.msg_body) # FIXME: Should we use 'multipart/form-data' for forms? return self.sendHttpRequest( 'POST', lecp.msg_url, headers = {'Content-Type': 'multipart/form-data'}, diff --git a/python/tests/Provider.py b/python/tests/Provider.py index 88406814..53cb10c2 100644 --- a/python/tests/Provider.py +++ b/python/tests/Provider.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -29,8 +28,8 @@ from websimulator import * class Provider(WebSite): - responseHeaders = WebSite.responseHeaders.copy() - responseHeaders.update({ + httpResponseHeaders = WebSite.httpResponseHeaders.copy() + httpResponseHeaders.update({ 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1', }) serverDump = None diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py index c548655b..2fd518a3 100644 --- a/python/tests/ServiceProvider.py +++ b/python/tests/ServiceProvider.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -32,57 +31,57 @@ from websimulator import * class ServiceProvider(Provider): idpSite = None # The identity provider, this service provider will use to authenticate users. - def assertionConsumer(self, httpRequest): + def assertionConsumer(self, handler): server = self.getServer() login = lasso.Login.new(server) - if httpRequest.method == 'GET': - login.init_request(httpRequest.query, lasso.httpMethodRedirect) + if handler.httpRequest.method == 'GET': + login.init_request(handler.httpRequest.query, lasso.httpMethodRedirect) login.build_request_msg() soapEndpoint = login.msg_url - self.failUnless(soapEndpoint) + failUnless(soapEndpoint) soapRequestMsg = login.msg_body - self.failUnless(soapRequestMsg) + failUnless(soapRequestMsg) httpResponse = self.sendHttpRequest( 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) try: login.process_response_msg(httpResponse.body) except lasso.Error, error: if error.code == -7: # FIXME: This will change, he said. - return self.newHttpResponse( + return handler.respond( 401, 'Access Unauthorized: User authentication failed on identity provider.') else: raise - elif httpRequest.method == 'POST': - authnResponseMsg = httpRequest.getFormField('LARES', None) - self.failUnless(authnResponseMsg) + elif handler.httpRequest.method == 'POST': + authnResponseMsg = handler.httpRequest.getFormField('LARES', None) + failUnless(authnResponseMsg) # FIXME: Should we do an init before process_authn_response_msg? try: login.process_authn_response_msg(authnResponseMsg) except lasso.Error, error: if error.code == -7: # FIXME: This will change, he said. - return self.newHttpResponse( + return handler.respond( 401, 'Access Unauthorized: User authentication failed on identity provider.') else: raise else: - return self.newHttpResponse( + return handler.respond( 400, - 'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method) + 'Bad Request: Method %s not handled by assertionConsumer' % handler.httpRequest.method) nameIdentifier = login.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) # Retrieve session dump, using name identifier or else try to use the client web session. # If session dump exists, give it to Lasso, so that it updates it. webSession = self.getWebSessionFromNameIdentifier(nameIdentifier) if webSession is None: - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) if webSession is not None: sessionDump = webSession.sessionDump if sessionDump is not None: @@ -99,29 +98,29 @@ class ServiceProvider(Provider): login.accept_sso() if webUser is not None and identityDump is None: - self.failUnless(login.is_identity_dirty()) + failUnless(login.is_identity_dirty()) identity = login.get_identity() - self.failUnless(identity) + failUnless(identity) identityDump = identity.dump() - self.failUnless(identityDump) - self.failUnless(login.is_session_dirty()) + failUnless(identityDump) + failUnless(login.is_session_dirty()) session = login.get_session() - self.failUnless(session) + failUnless(session) sessionDump = session.dump() - self.failUnless(sessionDump) + failUnless(sessionDump) # User is now authenticated. # If there was no web session yet, create it. Idem for the web user account. if webSession is None: - webSession = self.createWebSession(httpRequest.client) + webSession = self.createWebSession(handler.httpRequest.client) if webUser is None: # A real service provider would ask user to login locally to create federation. Or it # would ask user informations to create a local account. - webUserId = httpRequest.client.keyring.get(self.url, None) + webUserId = handler.httpRequest.client.keyring.get(self.url, None) userAuthenticated = webUserId in self.webUsers if not userAuthenticated: - return self.newHttpResponse(401, 'Access Unauthorized: User has no account.') + return handler.respond(401, 'Access Unauthorized: User has no account.') webUser = self.webUsers[webUserId] webSession.webUserId = webUser.uniqueId @@ -134,32 +133,32 @@ class ServiceProvider(Provider): self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId - return self.newHttpResponse(200) + return handler.respond() - def login(self, httpRequest): - libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None) - userAgent = httpRequest.headers.get('User-Agent', None) + def login(self, handler): + libertyEnabled = handler.httpRequest.headers.get('Liberty-Enabled', None) + userAgent = handler.httpRequest.headers.get('User-Agent', None) # FIXME: Lasso should have a function to compute useLecp. # Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent) useLecp = False if libertyEnabled: useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled if not useLecp: - return self.newHttpResponse(501, 'Unsupported Liberty Version.') + return handler.respond(501, 'Unsupported Liberty Version.') elif userAgent: useLecp = 'urn:liberty:iff:2003-08' in userAgent if not useLecp and "LIBV=" in userAgent: - return self.newHttpResponse(501, 'Unsupported Liberty Version.') + return handler.respond(501, 'Unsupported Liberty Version.') else: useLecp = False - forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False) - isPassive = httpRequest.getQueryBoolean('isPassive', False) + forceAuthn = handler.httpRequest.getQueryBoolean('forceAuthn', False) + isPassive = handler.httpRequest.getQueryBoolean('isPassive', False) server = self.getServer() if useLecp: lecp = lasso.Lecp.new(server) lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None. - self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest) + failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest) # FIXME: This protocol profile should be set by default by Lasso. lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost) @@ -180,15 +179,14 @@ class ServiceProvider(Provider): # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded. import base64 authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg) - self.failUnless(authnRequestEnvelopeMsg) + failUnless(authnRequestEnvelopeMsg) # FIXME: Lasso should set a lecp.msg_content_type to # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with # other profiles. # contentType = lecp.msg_content_type - # self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml') + # failUnlessEqual(contentType, 'application/vnd.liberty-request+xml') contentType = 'application/vnd.liberty-request+xml' - return self.newHttpResponse( - 200, + return handler.respond( headers = { 'Content-Type': contentType, 'Cache-Control': 'no-cache', @@ -198,7 +196,7 @@ class ServiceProvider(Provider): else: login = lasso.Login.new(server) login.init_authn_request(self.idpSite.providerId) - self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) + failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) if forceAuthn: login.request.set_forceAuthn(forceAuthn) if not isPassive: @@ -209,43 +207,43 @@ class ServiceProvider(Provider): login.request.set_relayState(relayState) login.build_authn_request_msg() authnRequestUrl = login.msg_url - self.failUnless(authnRequestUrl) - return httpRequest.client.redirect(authnRequestUrl) + failUnless(authnRequestUrl) + return handler.respondRedirectTemporarily(authnRequestUrl) - def logoutUsingSoap(self, httpRequest): - webSession = self.getWebSession(httpRequest.client) + def logoutUsingSoap(self, handler): + webSession = self.getWebSession(handler.httpRequest.client) if webSession is None: - return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.') + return handler.respond(401, 'Access Unauthorized: User has no session opened.') webUser = self.getWebUserFromWebSession(webSession) if webUser is None: - return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.') + return handler.respond(401, 'Access Unauthorized: User is not logged in.') server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeSp) - identityDump = self.getIdentityDump(httpRequest.client) + identityDump = self.getIdentityDump(handler.httpRequest.client) if identityDump is not None: logout.set_identity_from_dump(identityDump) - sessionDump = self.getSessionDump(httpRequest.client) + sessionDump = self.getSessionDump(handler.httpRequest.client) if sessionDump is not None: logout.set_session_from_dump(sessionDump) logout.init_request() logout.build_request_msg() soapEndpoint = logout.msg_url - self.failUnless(soapEndpoint) + failUnless(soapEndpoint) soapRequestMsg = logout.msg_body - self.failUnless(soapRequestMsg) + failUnless(soapRequestMsg) httpResponse = self.sendHttpRequest( 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) - self.failIf(logout.is_identity_dirty()) + failIf(logout.is_identity_dirty()) identity = logout.get_identity() - self.failUnless(identity) + failUnless(identity) identityDump = identity.dump() - self.failUnless(identityDump) - self.failUnless(logout.is_session_dirty()) + failUnless(identityDump) + failUnless(logout.is_session_dirty()) session = logout.get_session() if session is None: # The user is no more authenticated on any identity provider. Log him out. @@ -257,10 +255,10 @@ class ServiceProvider(Provider): else: # The user is still logged in on some other identity providers. sessionDump = session.dump() - self.failUnless(sessionDump) + failUnless(sessionDump) webSession.sessionDump = sessionDump nameIdentifier = logout.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) del self.webSessionIdsByNameIdentifier[nameIdentifier] - return self.newHttpResponse(200) + return handler.respond() diff --git a/python/tests/errorchecking_tests.py b/python/tests/errorchecking_tests.py index 3faa65db..87cb2b1b 100644 --- a/python/tests/errorchecking_tests.py +++ b/python/tests/errorchecking_tests.py @@ -3,11 +3,11 @@ # Python unit tests for Lasso library +# By: Frederic Peters <fpeters@entrouvert.com> +# Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Authors: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -27,8 +27,10 @@ import unittest import sys -sys.path.insert(0, '..') -sys.path.insert(0, '../.libs') +if not '..' in sys.path: + sys.path.insert(0, '..') +if not '../.libs' in sys.path: + sys.path.insert(0, '../.libs') import lasso diff --git a/python/tests/login_tests.py b/python/tests/login_tests.py index c3a1c46b..47378f48 100644 --- a/python/tests/login_tests.py +++ b/python/tests/login_tests.py @@ -3,11 +3,11 @@ # Python unit tests for Lasso library +# By: Frederic Peters <fpeters@entrouvert.com> +# Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Authors: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -27,11 +27,14 @@ import unittest import sys -sys.path.insert(0, '..') -sys.path.insert(0, '../.libs') +if not '..' in sys.path: + sys.path.insert(0, '..') +if not '../.libs' in sys.path: + sys.path.insert(0, '../.libs') import lasso +import builtins from IdentityProvider import IdentityProvider from LibertyEnabledClient import LibertyEnabledClient from ServiceProvider import ServiceProvider @@ -40,7 +43,7 @@ from websimulator import * class LoginTestCase(unittest.TestCase): def generateIdpSite(self, internet): - site = IdentityProvider(self, internet, 'https://identity-provider/') + site = IdentityProvider(internet, 'https://identity-provider/') site.providerId = 'https://identity-provider/metadata' server = lasso.Server.new( @@ -54,7 +57,7 @@ class LoginTestCase(unittest.TestCase): '../../examples/data/sp-public-key.pem', '../../examples/data/ca-crt.pem') site.serverDump = server.dump() - self.failUnless(site.serverDump) + failUnless(site.serverDump) server.destroy() site.addWebUser('Chantereau') @@ -65,7 +68,7 @@ class LoginTestCase(unittest.TestCase): return site def generateLibertyEnabledClient(self, internet): - client = LibertyEnabledClient(self, internet) + client = LibertyEnabledClient(internet) # FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without # metadata, instead of using 'singleSignOnServiceUrl'. ## server = lasso.Server.new( @@ -83,7 +86,7 @@ class LoginTestCase(unittest.TestCase): return client def generateSpSite(self, internet): - site = ServiceProvider(self, internet, 'https://service-provider/') + site = ServiceProvider(internet, 'https://service-provider/') site.providerId = 'https://service-provider/metadata' server = lasso.Server.new( @@ -97,7 +100,7 @@ class LoginTestCase(unittest.TestCase): '../../examples/data/idp-public-key.pem', '../../examples/data/ca-crt.pem') site.serverDump = server.dump() - self.failUnless(site.serverDump) + failUnless(site.serverDump) server.destroy() site.addWebUser('Nicolas') @@ -107,11 +110,15 @@ class LoginTestCase(unittest.TestCase): site.addWebUser('Frederic') return site -## def setUp(self): -## pass + def setUp(self): + for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless', + 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'): + builtins.set(name, getattr(self, name)) -## def tearDown(self): -## pass + def tearDown(self): + for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless', + 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'): + builtins.delete(name) def test00(self): """LECP login.""" @@ -138,11 +145,11 @@ class LoginTestCase(unittest.TestCase): principal.keyring[spSite.url] = 'Romain' httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 200) - self.failIf(spSite.webSessions) - self.failIf(idpSite.webSessions) + failUnlessEqual(httpResponse.statusCode, 200) + failIf(spSite.webSessions) + failIf(idpSite.webSessions) def test02(self): """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP. Done three times.""" @@ -156,28 +163,28 @@ class LoginTestCase(unittest.TestCase): principal.keyring[spSite.url] = 'Romain' httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) # Once again. Now the principal already has a federation between spSite and idpSite. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) # Once again. Do a new passive login between normal login and logout. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) del principal.keyring[idpSite.url] # Ensure identity provider will be really passive. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) # Once again, with isPassive and the user having no web session. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) def test03(self): """Service provider initiated login using HTTP redirect, but user fail to authenticate himself on identity provider. Then logout, with same problem.""" @@ -191,9 +198,9 @@ class LoginTestCase(unittest.TestCase): principal.keyring[spSite.url] = 'Frederic' httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) def test04(self): """Service provider initiated login using HTTP redirect, but user has no account on service @@ -208,9 +215,9 @@ class LoginTestCase(unittest.TestCase): # Christophe Nowicki has no account on service provider. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) def test05(self): """Service provider initiated login using HTTP redirect with isPassive for a user without federation yet.""" @@ -224,7 +231,7 @@ class LoginTestCase(unittest.TestCase): principal.keyring[spSite.url] = 'Romain' httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) def test06(self): """Testing forceAuthn flag.""" @@ -238,24 +245,24 @@ class LoginTestCase(unittest.TestCase): principal.keyring[spSite.url] = 'Romain' httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) # Ask user to reauthenticate while he is already logged. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) del principal.keyring[idpSite.url] # Ensure user can't authenticate. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) # Force authentication, but user won't authenticate. httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') - self.failUnlessEqual(httpResponse.statusCode, 401) + failUnlessEqual(httpResponse.statusCode, 401) ## def test06(self): ## """Service provider LECP login.""" diff --git a/python/tests/tests.py b/python/tests/tests.py index d7f2b99e..a8e902b4 100755 --- a/python/tests/tests.py +++ b/python/tests/tests.py @@ -2,14 +2,12 @@ # -*- coding: UTF-8 -*- -# PyLasso -- Python bindings for Lasso library +# Python unit tests for Lasso library +# By: Frederic Peters <fpeters@entrouvert.com> +# Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Authors: Nicolas Clapies <nclapies@entrouvert.com> -# Valery Febvre <vfebvre@easter-eggs.com> -# Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -33,8 +31,10 @@ import unittest from XmlTestRunner import XmlTestRunner -sys.path.insert(0, '..') -sys.path.insert(0, '../.libs') +if not '..' in sys.path: + sys.path.insert(0, '..') +if not '../.libs' in sys.path: + sys.path.insert(0, '../.libs') testSuites = ( diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py index 65064a0a..0c923d05 100644 --- a/python/tests/websimulator.py +++ b/python/tests/websimulator.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -27,7 +26,10 @@ # FIXME: Rename webUser to userAccount. -class HttpRequest(object): +import abstractweb + + +class HttpRequest(abstractweb.HttpRequestMixin, object): client = None # Principal or web site sending the request. body = None form = None @@ -46,10 +48,6 @@ class HttpRequest(object): if form: self.form = form - def send(self): - webSite = self.client.internet.getWebSite(self.url) - return webSite.doHttpRequest(self) - def getFormField(self, name, default = 'none'): if self.form and name in self.form: return self.form[name] @@ -57,49 +55,51 @@ class HttpRequest(object): raise KeyError(name) return default - def getQueryBoolean(self, name, default = 'none'): - try: - fieldValue = self.getQueryField(name) - except KeyError: - if default == 'none': - raise - return default - return fieldValue.lower not in ('', '0', 'false') + def getPath(self): + return self.pathAndQuery.split('?', 1)[0] + + def getPathAndQuery(self): + urlWithoutScheme = self.url[self.url.find('://') + 3:] + if '/' in urlWithoutScheme: + pathAndQuery = urlWithoutScheme[urlWithoutScheme.find('/'):] + else: + pathAndQuery = '' + return pathAndQuery def getQuery(self): - splitedUrl = self.url.split('?', 1) + splitedUrl = self.pathAndQuery.split('?', 1) if len(splitedUrl) > 1: return splitedUrl[1] else: return '' - def getQueryField(self, name, default = 'none'): - if self.query: - for field in self.query.split('&'): - fieldName, fieldValue = field.split('=') - if name == fieldName: - return fieldValue - if default == 'none': - raise KeyError(name) - return default + def getScheme(self): + return self.url.split(':', 1)[0].lower() + def send(self): + webSite = self.client.internet.getWebSite(self.url) + return webSite.handleHttpRequest(self) + + path = property(getPath) + pathAndQuery = property(getPathAndQuery) query = property(getQuery) + scheme = property(getScheme) -class HttpResponse(object): - body = None - headers = None - statusCode = None # 200 or... - statusMessage = None +class HttpResponse(abstractweb.HttpResponseMixin, object): + def send(self, httpRequestHandler): + return self - def __init__(self, statusCode, statusMessage = None, headers = None, body = None): - self.statusCode = statusCode - if statusMessage: - self.statusMessage = statusMessage - if headers: - self.headers = headers - if body: - self.body = body + +class HttpRequestHandler(abstractweb.HttpRequestHandlerMixin, object): + HttpResponse = HttpResponse # Class + + def __init__(self, webServer, httpRequest): + self.webServer = webServer + self.httpRequest = httpRequest + + def respondRedirectTemporarily(self, url): + return self.httpRequest.client.redirect(url) class Internet(object): @@ -118,41 +118,10 @@ class Internet(object): raise Exception('Unknown web site: %s' % url) -class Simulation(object): - test = None # The testing instance - - def __init__(self, test): - self.test = test - - def fail(self, msg = None): - return self.test.fail(msg) - - def failIf(self, expr, msg = None): - return self.test.failIf(expr, msg) - - def failIfAlmostEqual(self, first, second, places = 7, msg = None): - return self.test.failIfAlmostEqual(first, second, places, msg) - - def failIfEqual(self, first, second, msg = None): - return self.test.failIfEqual(first, second, msg) - - def failUnless(self, expr, msg = None): - return self.test.failUnless(expr, msg) - - def failUnlessAlmostEqual(self, first, second, places = 7, msg = None): - return self.test.failUnlessAlmostEqual(first, second, places, msg) - - def failUnlessRaises(self, excClass, callableObj, *args, **kwargs): - return self.test.failUnlessRaises(self, excClass, callableObj, *args, **kwargs) - - def failUnlessEqual(self, first, second, msg = None): - return self.test.failUnlessEqual(first, second, msg) - - class WebClient(object): internet = None keyring = None - requestHeaders = { + httpRequestHeaders = { 'User-Agent': 'LassoSimulator/0.0.0', 'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html', } @@ -168,15 +137,14 @@ class WebClient(object): return self.sendHttpRequest('GET', url) def sendHttpRequest(self, method, url, headers = None, body = None, form = None): - requestHeaders = self.requestHeaders if headers: - requestHeaders = self.requestHeaders.copy() + httpRequestHeaders = self.httpRequestHeaders.copy() for name, value in headers.iteritems(): - requestHeaders[name] = value + httpRequestHeaders[name] = value else: - requestHeaders = self.requestHeaders + httpRequestHeaders = self.httpRequestHeaders return HttpRequest( - self, method, url, headers = requestHeaders, body = body, form = form).send() + self, method, url, headers = httpRequestHeaders, body = body, form = form).send() def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None, form = None): @@ -225,20 +193,19 @@ class WebUser(object): self.uniqueId = uniqueId -class WebSite(WebClient, Simulation): +class WebSite(WebClient): """Simulation of a web site""" - lastWebSessionId = 0 - providerId = None # The Liberty providerID of this web site - responseHeaders = { + httpResponseHeaders = { 'Server': 'Lasso Simulator Web Server', } + lastWebSessionId = 0 + providerId = None # The Liberty providerID of this web site url = None # The main URL of web site webUsers = None webSessions = None - def __init__(self, test, internet, url): - Simulation.__init__(self, test) + def __init__(self, internet, url): WebClient.__init__(self, internet) self.url = url self.webUserIdsByNameIdentifier = {} @@ -257,14 +224,6 @@ class WebSite(WebClient, Simulation): client.webSessionIds[self.url] = self.lastWebSessionId return webSession - def doHttpRequest(self, httpRequest): - url = httpRequest.url - if url.startswith(self.url): - url = url[len(self.url):] - methodName = url.split('?', 1)[0].replace('/', '') - method = getattr(self, methodName) - return method(httpRequest) - def getIdentityDump(self, principal): webSession = self.getWebSession(principal) webUser = self.getWebUserFromWebSession(webSession) @@ -309,13 +268,15 @@ class WebSite(WebClient, Simulation): return None return self.webUsers.get(webUserId, None) - def newHttpResponse(self, statusCode, statusMessage = None, headers = None, body = None): - responseHeaders = self.responseHeaders - if headers: - responseHeaders = self.responseHeaders.copy() - for name, value in headers.iteritems(): - responseHeaders[name] = value - else: - responseHeaders = self.responseHeaders - return HttpResponse( - statusCode, statusMessage = statusMessage, headers = headers, body = body) + def handleHttpRequest(self, httpRequest): + httpRequestHandler = HttpRequestHandler(self, httpRequest) + return self.handleHttpRequestHandler(httpRequestHandler) + + def handleHttpRequestHandler(self, httpRequestHandler): + methodName = httpRequestHandler.httpRequest.path.replace('/', '') + try: + method = getattr(self, methodName) + except AttributeError: + return httpRequestHandler.respond( + 404, 'Path "%s" Not Found.' % httpRequestHandler.httpRequest.path) + return method(httpRequestHandler) |