diff options
| author | Emmanuel Raviart <eraviart@entrouvert.com> | 2004-08-25 10:12:13 +0000 |
|---|---|---|
| committer | Emmanuel Raviart <eraviart@entrouvert.com> | 2004-08-25 10:12:13 +0000 |
| commit | 9d460cf67c3999a0ec7723c377a60df07268b5c8 (patch) | |
| tree | c385668c848d9a322a6ec45433af18d00d0f50c7 /python/tests | |
| parent | 738257f33a816017ba2edf01288f14f61ca2f21f (diff) | |
| download | lasso-9d460cf67c3999a0ec7723c377a60df07268b5c8.tar.gz lasso-9d460cf67c3999a0ec7723c377a60df07268b5c8.tar.xz lasso-9d460cf67c3999a0ec7723c377a60df07268b5c8.zip | |
Removed obsolete Python test framework.
Diffstat (limited to 'python/tests')
| -rw-r--r-- | python/tests/IdentityProvider.py | 257 | ||||
| -rw-r--r-- | python/tests/LibertyEnabledClientProxy.py | 131 | ||||
| -rw-r--r-- | python/tests/LibertyEnabledProxy.py | 65 | ||||
| -rw-r--r-- | python/tests/Provider.py | 60 | ||||
| -rw-r--r-- | python/tests/ServiceProvider.py | 321 | ||||
| -rw-r--r-- | python/tests/abstractweb.py | 329 | ||||
| -rw-r--r-- | python/tests/assertions.py | 126 | ||||
| -rw-r--r-- | python/tests/builtins.py | 45 | ||||
| -rw-r--r-- | python/tests/http.py | 935 | ||||
| -rw-r--r-- | python/tests/liberty.py | 60 | ||||
| -rw-r--r-- | python/tests/libertysimulator.py | 60 | ||||
| -rw-r--r-- | python/tests/login_tests.py | 257 | ||||
| -rwxr-xr-x | python/tests/sample-idp.py | 150 | ||||
| -rwxr-xr-x | python/tests/sample-lep.py | 152 | ||||
| -rwxr-xr-x | python/tests/sample-sp-lep.py | 147 | ||||
| -rwxr-xr-x | python/tests/sample-sp.py | 147 | ||||
| -rw-r--r-- | python/tests/submissions.py | 292 | ||||
| -rw-r--r-- | python/tests/web.py | 159 | ||||
| -rw-r--r-- | python/tests/websimulator.py | 244 |
19 files changed, 27 insertions, 3910 deletions
diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py deleted file mode 100644 index 90673d61..00000000 --- a/python/tests/IdentityProvider.py +++ /dev/null @@ -1,257 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import lasso - -import Provider - - -class IdentityProviderMixin(Provider.ProviderMixin): - soapResponseMsgs = None - - def __init__(self): - Provider.ProviderMixin.__init__(self) - self.soapResponseMsgs = {} - - def login_done(self, handler, userAuthenticated, authenticationMethod): - # Reconstruct Lasso login from dump. - lassoServer = self.getLassoServer() - session = handler.session - failUnless(session) - failUnless(session.lassoLoginDump) - login = lasso.Login.new_from_dump(lassoServer, session.lassoLoginDump) - del session.lassoLoginDump - # Set identity & session in login, because session.lassoLoginDump doesn't contain them. - if session.lassoSessionDump is not None: - login.set_session_from_dump(session.lassoSessionDump) - user = handler.user - if user is not None and user.lassoIdentityDump is not None: - login.set_identity_from_dump(user.lassoIdentityDump) - - return self.singleSignOn_done(handler, login, userAuthenticated, authenticationMethod) - - def singleSignOn(self, handler): - lassoServer = self.getLassoServer() - if handler.httpRequest.method == 'GET': - # Single sign-on using HTTP redirect. - login = lasso.Login(lassoServer) - session = handler.session - if session is not None and session.lassoSessionDump is not None: - login.set_session_from_dump(session.lassoSessionDump) - user = handler.user - if user is not None and user.lassoIdentityDump is not None: - login.set_identity_from_dump(user.lassoIdentityDump) - login.init_from_authn_request_msg(handler.httpRequest.query, lasso.httpMethodRedirect) - - if not login.must_authenticate(): - userAuthenticated = user is not None - authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME - return self.singleSignOn_done( - handler, login, userAuthenticated, authenticationMethod) - - # The authentication may need to change page (needed for a HTML form, for example). - # => Save Lasso login as a dump in session, so that we retrieve it once the user is - # authenticated. - if session is None: - session = handler.createSession() - session.publishToken = True - session.lassoLoginDump = login.dump() - return self.callHttpFunction(self.login, handler) - - elif handler.httpRequest.method == 'POST' \ - and handler.httpRequest.headers.get('Content-Type', None) == 'text/xml': - # SOAP request => LECP single sign-on. - lecp = lasso.Lecp(lassoServer) - session = handler.session - if session is not None and session.lassoSessionDump is not None: - lecp.set_session_from_dump(session.lassoSessionDump) - user = handler.user - if user is not None and user.lassoIdentityDump is not None: - lecp.set_identity_from_dump(user.lassoIdentityDump) - lecp.init_from_authn_request_msg(handler.httpRequest.body, lasso.httpMethodSoap) - # FIXME: lecp.must_authenticate() should always return False. Because we are in SOAP. - # And we can't do a HTTP redirect in SOAP. - # The other solution is that we shall not call lecp.must_authenticate(). - # failIf(lecp.must_authenticate()) - userAuthenticated = user is not None - authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME - lecp.build_authn_response_envelope_msg( - userAuthenticated, authenticationMethod, - "2005-05-03T16:12:00Z", # FIXME: reauthenticateOnOrAfter - ) - soapResponseMsg = lecp.msg_body - failUnless(soapResponseMsg) - # FIXME: Lasso should set a lecp.msg_content_type to - # "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with - # other profiles. - # contentType = lecp.msg_content_type - # failUnlessEqual(contentType, 'application/vnd.liberty-response+xml') - contentType = 'application/vnd.liberty-response+xml' - headers = { - 'Content-Type': contentType, - 'Cache-Control': 'no-cache', - 'Pragma': 'no-cache', - } - headers.update(self.libertyEnabledHeaders) - return handler.respond(headers = headers, body = soapResponseMsg) - else: - return handler.respond( - 400, - 'Bad Request: Method %s not handled by singleSignOn' % handler.httpRequest.method) - - def singleSignOn_done(self, handler, login, userAuthenticated, authenticationMethod): - failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME - login.build_artifact_msg( - userAuthenticated, authenticationMethod, - "2005-05-03T16:12:00Z", # FIXME: reauthenticateOnOrAfter - lasso.httpMethodRedirect) - if userAuthenticated: - session = handler.session - failUnless(session) - user = handler.user - failUnless(user) - if login.is_identity_dirty(): - lassoIdentityDump = login.get_identity().dump() - failUnless(lassoIdentityDump) - user.lassoIdentityDump = lassoIdentityDump - failUnless(login.is_session_dirty()) - lassoSessionDump = login.get_session().dump() - failUnless(lassoSessionDump) - session.lassoSessionDump = lassoSessionDump - nameIdentifier = login.nameIdentifier - failUnless(nameIdentifier) - self.userIdsByNameIdentifier[nameIdentifier] = user.uniqueId - self.sessionTokensByNameIdentifier[nameIdentifier] = session.token - else: - failIf(login.is_identity_dirty()) - failIf(login.is_session_dirty()) - artifact = login.assertionArtifact - failUnless(artifact) - soapResponseMsg = login.response_dump - failUnless(soapResponseMsg) - self.soapResponseMsgs[artifact] = soapResponseMsg - responseUrl = login.msg_url - failUnless(responseUrl) - return handler.respondRedirectTemporarily(responseUrl) - - def soapEndpoint(self, handler): - soapRequestMsg = handler.httpRequest.body - requestType = lasso.get_request_type_from_soap_msg(soapRequestMsg) - if requestType == lasso.requestTypeLogin: - lassoServer = self.getLassoServer() - login = lasso.Login(lassoServer) - # FIXME: What should we return when there is an error in process_request_msg? - # FIXME: Create a new Lasso function build_response_msg, with either None or - # soapResponseMessage as argument. It is called after process_request_message and - # should either create a new response or keep the one in soapResponseMsg (if it already - # contained an error or if there is no error). - login.process_request_msg(soapRequestMsg) - artifact = login.assertionArtifact - failUnless(artifact) - soapResponseMsg = self.soapResponseMsgs.get(artifact, None) - if soapResponseMsg is None: - raise Exception('FIXME: Handle the case when artifact is wrong') - del self.soapResponseMsgs[artifact] - return handler.respond( - headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) - elif requestType == lasso.requestTypeLogout: - lassoServer = self.getLassoServer() - logout = lasso.Logout(lassoServer, lasso.providerTypeIdp) - logout.process_request_msg(soapRequestMsg, lasso.httpMethodSoap) - nameIdentifier = logout.nameIdentifier - failUnless(nameIdentifier) - - # Retrieve session and user using name identifier. - session = self.getSessionFromNameIdentifier(nameIdentifier) - if session is None: - raise Exception('FIXME: Handle the case when there is no web session') - user = self.getUserFromNameIdentifier(nameIdentifier) - if user is None: - raise Exception('FIXME: Handle the case when there is no web user') - - # The identity provider may want to do some things, before logging out. - self.soapEndpoint_logout_prepare(handler, session, user) - - if session.lassoSessionDump is None: - raise Exception( - 'FIXME: Handle the case when there is no session dump in web session') - logout.set_session_from_dump(session.lassoSessionDump) - if user.lassoIdentityDump is None: - raise Exception( - 'FIXME: Handle the case when there is no identity dump in web user') - logout.set_identity_from_dump(user.lassoIdentityDump) - - logout.validate_request() - failIf(logout.is_identity_dirty()) - lassoIdentity = logout.get_identity() - failUnless(lassoIdentity) - lassoIdentityDump = lassoIdentity.dump() - failUnless(lassoIdentityDump) - failUnless(logout.is_session_dirty()) - - # Log the user out. - # It is done before logout from other service providers, since we don't want to - # accept passive login connections inbetween. - del session.lassoSessionDump - del session.userId - del user.sessionToken - # We also delete the session, but it is not mandantory, since the user is logged out - # anyway. - del self.sessions[session.token] - nameIdentifier = logout.nameIdentifier - failUnless(nameIdentifier) - del self.sessionTokensByNameIdentifier[nameIdentifier] - - # Tell each other service provider to logout the user. - otherProviderId = logout.get_next_providerID() - while otherProviderId is not None: - logout.init_request(otherProviderId) - logout.build_request_msg() - - soapEndpoint = logout.msg_url - failUnless(soapEndpoint) - soapRequestMsg = logout.msg_body - failUnless(soapRequestMsg) - httpResponse = self.sendHttpRequest( - 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, - body = soapRequestMsg) - failUnlessEqual(httpResponse.statusCode, 200) - logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) - - otherProviderId = logout.get_next_providerID() - - logout.build_response_msg() - soapResponseMsg = logout.msg_body - failUnless(soapResponseMsg) - return handler.respond( - headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) - else: - raise Exception('Unknown request type: %s' % requestType) - - def soapEndpoint_logout_prepare(self, handler, session, user): - """Prepare logout. - - Override this method to do some processing before identity provider logout proceeds. - """ - pass diff --git a/python/tests/LibertyEnabledClientProxy.py b/python/tests/LibertyEnabledClientProxy.py deleted file mode 100644 index b925d8fc..00000000 --- a/python/tests/LibertyEnabledClientProxy.py +++ /dev/null @@ -1,131 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import lasso - -import abstractweb - - -class LibertyEnabledClientProxyMixin(abstractweb.WebClientMixin): - # A service provider MAY provide a list of identity providers it recognizes by including the - # <lib:IDPList> element in the <lib:AuthnRequestEnvelope>. The format and processing rules for - # the identity provider list MUST be as defined in [LibertyProtSchema]. - - # The identity provider list can be used by the LECP to create a user identifier to be - # presented to the Principal. For example, the LECP could compare the list of the Principal's - # known identities (and the identities of the identity provider that provides those identities) - # against the list provided by the service provider and then only display the intersection. - - # If the service provider does not support the LECP-advertised Liberty version, the service - # provider MUST return to the LECP an HTTP 501 response with the reason phrase "Unsupported - # Liberty Version." - # - # The responses in step 3 and step 6 SHOULD NOT be cached. To this end service providers and - # identity providers SHOULD place both "Cache-Control: no-cache" and "Pragma: no-cache" on - # their responses to ensure that the LECP and any intervening proxies will not cache the - # response. - - # If the LECP discovers a syntax error due to the service provider or cannot proceed any - # further for other reasons (for example, cannot resolve identity provider, cannot reach the - # identity provider, etc.), the LECP MUST return to the service provider a <lib:AuthnResponse> - # with a <samlp:Status> indicating the desired error element as defined in - # [LibertyProtSchema]. The <lib:AuthnResponse> containing the error status MUST be sent using - # a POST to the service provider's assertion consumer service URL obtained from the - # <lib:AssertionConsumerServiceURL> element of the <lib:AuthnRequestEnvelope>. The POST MUST - # be a form that contains the field LARES with the value being the <lib:AuthnResponse> - # protocol message as defined in [LibertyProtSchema], containing the <samlp:Status>. The - # <lib:AuthnResponse> MUST be encoded by applying a base64 transformation (refer to - # [RFC2045]) to the <lib:AuthnResponse> and all its elements. - - httpRequestHeaders = abstractweb.WebClientMixin.httpRequestHeaders.copy() - httpRequestHeaders.update({ - # FIXME: Is this the correct syntax for several URLs in LIBV? - 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1', - 'Liberty-Agent': 'LassoSimulator/0.0.0', - # FIXME: As an alternative to 'Liberty-Enabled' header, a user agent may use: - # 'User-Agent': ' '.join(( - # httpRequestHeaders['User-Agent'], - # 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1')) - 'Accept': ','.join((httpRequestHeaders['Accept'], 'application/vnd.liberty-request+xml')) - }) - idpSite = None # The identity provider, this LECP will use to authenticate users. - lassoServerDump = None - principal = None - - def getLassoServer(self): - return lasso.Server.new_from_dump(self.lassoServerDump) - - def getSessionTokens(self): - # LECP is a proxy, not au principal, so it doesn't have its own sessionTokens. - if self.principal is None: - return {} - return self.principal.sessionTokens - - def login(self, principal, site, path): - self.principal = principal - - httpResponse = self.sendHttpRequestToSite(site, 'GET', path) - failUnlessEqual( - httpResponse.headers['Content-Type'], 'application/vnd.liberty-request+xml') - libertyEnabledHeader = httpResponse.headers.get('Liberty-Enabled') - failUnless(libertyEnabledHeader) - failUnless('LIBV=urn:liberty:iff:2003-08' in libertyEnabledHeader) - lassoServer = self.getLassoServer() - lecp = lasso.Lecp(lassoServer) - authnRequestEnvelope = httpResponse.body - lecp.process_authn_request_envelope_msg(authnRequestEnvelope) - # FIXME: The service provider could return an IDPList in authnRequestEnvelope, so that - # we verify that self.idpSingleSignOnServiceUrl belongs to one of them - lecp.build_authn_request_msg(self.idpSite.providerId) - failUnless(lecp.msg_url) - failUnless(lecp.msg_body) - httpResponse = self.sendHttpRequest( - 'POST', lecp.msg_url, headers = {'Content-Type': 'text/xml'}, - body = lecp.msg_body) - failUnlessEqual( - httpResponse.headers.get('Content-Type', None), 'application/vnd.liberty-response+xml') - libertyEnabledHeader = httpResponse.headers.get('Liberty-Enabled') - failUnless(libertyEnabledHeader) - failUnless('LIBV=urn:liberty:iff:2003-08' in libertyEnabledHeader) - lecp.process_authn_response_envelope_msg(httpResponse.body) - lecp.build_authn_response_msg() - failUnless(lecp.msg_url) - failUnless(lecp.msg_body) - - del self.principal - - # FIXME: Should we use 'multipart/form-data' for forms? - return self.sendHttpRequest( - 'POST', lecp.msg_url, headers = {'Content-Type': 'multipart/form-data'}, - form = {'LARES': lecp.msg_body}) - - def setKeyring(self, keyring): - # LECP is a proxy, not au principal, so it doesn't have its own keyring. - pass - - def setSessionTokens(self, sessionTokens): - # LECP is a proxy, not au principal, so it doesn't have its own sessionTokens. - pass - - sessionTokens = property(getSessionTokens, setSessionTokens) diff --git a/python/tests/LibertyEnabledProxy.py b/python/tests/LibertyEnabledProxy.py deleted file mode 100644 index 3b33e810..00000000 --- a/python/tests/LibertyEnabledProxy.py +++ /dev/null @@ -1,65 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import lasso - -from IdentityProvider import IdentityProviderMixin -from ServiceProvider import ServiceProviderMixin - - -class LibertyEnabledProxyMixin(IdentityProviderMixin, ServiceProviderMixin): - def __init__(self): - ServiceProviderMixin.__init__(self) - IdentityProviderMixin.__init__(self) - - def assertionConsumer_done(self, handler): - # Before, this proxy was considered as a service provider. Now it acts again as an identity - # provider. - # FIXME: We should retrieve authentication method from session.lassoSessionDump. - # FIXME: Handle Liberty ProxyCount. - return self.login_done(handler, True, lasso.samlAuthenticationMethodPassword) - - def login(self, handler): - # Before, this proxy was considered as an identity provider. Now it is a service provider. - # FIXME: Handle Liberty ProxyCount. - return ServiceProviderMixin.login(self, handler) - - def login_failed(self, handler): - # Before, this proxy was considered as a service provider. Now it acts again as an identity - # provider. - # FIXME: Handle Liberty ProxyCount. - return self.login_done(handler, False, None) - - def logout_done(self, handler, nameIdentifier): - # Before, this proxy was considered as a service provider. Now it acts again as an identity - # provider. - # FIXME: Handle Liberty ProxyCount. - - # Don't do logout_done actions, because they will be done in soapEndpoint. - return None - - def soapEndpoint_logout_prepare(self, handler, session, user): - # Before, this proxy was considered as an identity provider. Now it is a service provider. - # FIXME: Handle Liberty ProxyCount. - return self.logout_do(handler, session, user) diff --git a/python/tests/Provider.py b/python/tests/Provider.py deleted file mode 100644 index 5f40dad9..00000000 --- a/python/tests/Provider.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import lasso - -import abstractweb - - -class ProviderMixin(abstractweb.WebSiteMixin): - lassoServerDump = None - libertyEnabledHeaders = { - 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1', - } - providerId = None # The Liberty providerID of this web site - sessionTokensByNameIdentifier = None - userIdsByNameIdentifier = None - - def __init__(self): - abstractweb.WebSiteMixin.__init__(self) - self.userIdsByNameIdentifier = {} - self.sessionTokensByNameIdentifier = {} - - def getLassoServer(self): - return lasso.Server.new_from_dump(self.lassoServerDump) - - def getSessionFromNameIdentifier(self, nameIdentifier): - sessionToken = self.sessionTokensByNameIdentifier.get(nameIdentifier, None) - if sessionToken is None: - # The user has no federation on this site or has no authentication assertion for this - # federation. - return None - return self.sessions.get(sessionToken, None) - - def getUserFromNameIdentifier(self, nameIdentifier): - userId = self.userIdsByNameIdentifier.get(nameIdentifier, None) - if userId is None: - # The user has no federation on this site. - return None - return self.users.get(userId, None) diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py deleted file mode 100644 index b906dec9..00000000 --- a/python/tests/ServiceProvider.py +++ /dev/null @@ -1,321 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import lasso - -import Provider - - -class ServiceProviderMixin(Provider.ProviderMixin): - createNewAccountWhenNewFederationForUnknownUser = False - idpSite = None # The identity provider, this service provider will use to authenticate users. - - def assertionConsumer(self, handler): - lassoServer = self.getLassoServer() - login = lasso.Login(lassoServer) - - if handler.httpRequest.method == 'GET': - relayState = handler.httpRequest.getQueryField('RelayState', None) - login.init_request(handler.httpRequest.query, lasso.httpMethodRedirect) - login.build_request_msg() - - soapEndpoint = login.msg_url - failUnless(soapEndpoint) - soapRequestMsg = login.msg_body - failUnless(soapRequestMsg) - httpResponse = self.sendHttpRequest( - 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, - body = soapRequestMsg) - failUnlessEqual(httpResponse.statusCode, 200) - try: - login.process_response_msg(httpResponse.body) - except lasso.Error, error: - if error.code == -7: # FIXME: This will change, he said. - return handler.respond( - 401, - 'Access Unauthorized: User authentication failed on identity provider.') - else: - raise - elif handler.httpRequest.method == 'POST': - relayState = handler.httpRequest.getFormField('RelayState', None) - authnResponseMsg = handler.httpRequest.getFormField('LARES', None) - failUnless(authnResponseMsg) - # FIXME: Should we do an init before process_authn_response_msg? - try: - login.process_authn_response_msg(authnResponseMsg) - except lasso.Error, error: - if error.code == -7: # FIXME: This will change, he said. - return handler.respond( - 401, - 'Access Unauthorized: User authentication failed on identity provider.') - else: - raise - else: - return handler.respond( - 400, - 'Bad Request: Method %s not handled by assertionConsumer' - % handler.httpRequest.method) - - nameIdentifier = login.nameIdentifier - failUnless(nameIdentifier) - - # Retrieve session dump, using name identifier or else try to use the client web session. - # If session dump exists, give it to Lasso, so that it updates it. - session = self.getSessionFromNameIdentifier(nameIdentifier) - if session is None: - session = handler.session - if session is not None and session.lassoSessionDump is not None: - login.set_session_from_dump(session.lassoSessionDump) - # Retrieve identity dump, using name identifier or else try to retrieve him from web - # session. If identity dump exists, give it to Lasso, so that it updates it. - user = self.getUserFromNameIdentifier(nameIdentifier) - if user is None: - user = handler.user - if user is not None and user.lassoIdentityDump is not None: - login.set_identity_from_dump(user.lassoIdentityDump) - - login.accept_sso() - if user is not None and user.lassoIdentityDump is None: - failUnless(login.is_identity_dirty()) - lassoIdentity = login.get_identity() - failUnless(lassoIdentity) - lassoIdentityDump = lassoIdentity.dump() - failUnless(lassoIdentityDump) - failUnless(login.is_session_dirty()) - lassoSession = login.get_session() - failUnless(lassoSession) - lassoSessionDump = lassoSession.dump() - failUnless(lassoSessionDump) - - # User is now authenticated. - - # If there was no web session yet, create it. Idem for the web user account. - if session is None: - session = handler.createSession() - session.publishToken = True - if user is None: - # The user has been successfully authenticated on identity provider, but he has no - # account on this service provider or his account is not federated yet and he is not - # logged. - # A real service provider would ask user to login locally to create a federation. Or it - # would ask user informations to create a local account. Or it would automatically - # create a new account... - if self.createNewAccountWhenNewFederationForUnknownUser: - user = handler.createUser() - else: - # Save some informations in session for a short time (until user is logged). - # These informations can't be stored as fields in URL query, because they are too - # large. - session.lassoIdentityDump = lassoIdentityDump - session.lassoSessionDump = lassoSessionDump - session.nameIdentifier = nameIdentifier - session.relayState = relayState - - # We do a redirect now for two reasons: - # - We don't want the user to be able to reload assertionConsumer page (because the - # artifact has been removed from identity-provider). - # - For HTTP authentication, we don't want to emit a 401 Unauthorized that would - # force the Principal to reload the assertionConsumer page. - # FIXME: Add the session token to redirect URL. - return handler.respondRedirectTemporarily('/login_local') - - session.userId = user.uniqueId - user.sessionToken = session.token - - # Store the updated identity dump and session dump. - session.lassoSessionDump = lassoSessionDump - if login.is_identity_dirty(): - user.lassoIdentityDump = lassoIdentityDump - - self.userIdsByNameIdentifier[nameIdentifier] = user.uniqueId - self.sessionTokensByNameIdentifier[nameIdentifier] = session.token - - # We do a redirect now because we don't want the user to be able to reload - # assertionConsumer page (because the artifact has been removed from identity-provider). - # FIXME: Add the session token to redirect URL. - redirectUrl = '/assertionConsumer_done' - if relayState: - redirectUrl = '%s?RelayState=%s' % (redirectUrl, relayState) - return handler.respondRedirectTemporarily(redirectUrl) - - def assertionConsumer_done(self, handler): - # A real service provider could use the string relayState for any purpose. - relayState = handler.httpRequest.getQueryField('RelayState', None) - return handler.respond( - 200, headers = {'Content-Type': 'text/plain'}, - body = 'Liberty authentication succeeded\nRelayState = %s' % relayState) - - def login(self, handler): - libertyEnabled = handler.httpRequest.headers.get('Liberty-Enabled', None) - userAgent = handler.httpRequest.headers.get('User-Agent', None) - # FIXME: Lasso should have a function to compute useLecp. - # Or this should be done in lasso.Login(lassoServer, libertyEnabled, userAgent) - useLecp = False - if libertyEnabled: - useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled - if not useLecp: - return handler.respond(501, 'Unsupported Liberty Version.') - elif userAgent: - useLecp = 'urn:liberty:iff:2003-08' in userAgent - if not useLecp and "LIBV=" in userAgent: - return handler.respond(501, 'Unsupported Liberty Version.') - else: - useLecp = False - - forceAuthn = handler.httpRequest.getQueryBoolean('forceAuthn', False) - isPassive = handler.httpRequest.getQueryBoolean('isPassive', False) - relayState = handler.httpRequest.getQueryField('RelayState', None) - lassoServer = self.getLassoServer() - if useLecp: - lecp = lasso.Lecp(lassoServer) - lecp.init_authn_request() - failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest) - - # FIXME: This protocol profile should be set by default by Lasso. - lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost) - - # Same treatement as for non LECP login. - if forceAuthn: - lecp.request.set_forceAuthn(forceAuthn) - if not isPassive: - lecp.request.set_isPassive(isPassive) - lecp.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) - lecp.request.set_consent(lasso.libConsentObtained) - if relayState: - lecp.request.set_relayState(relayState) - - lecp.build_authn_request_envelope_msg() - authnRequestEnvelopeMsg = lecp.msg_body - failUnless(authnRequestEnvelopeMsg) - # FIXME: Lasso should set a lecp.msg_content_type to - # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with - # other profiles. - # contentType = lecp.msg_content_type - # failUnlessEqual(contentType, 'application/vnd.liberty-request+xml') - contentType = 'application/vnd.liberty-request+xml' - headers = { - 'Content-Type': contentType, - 'Cache-Control': 'no-cache', - 'Pragma': 'no-cache', - } - headers.update(self.libertyEnabledHeaders) - return handler.respond(headers = headers, body = authnRequestEnvelopeMsg) - else: - login = lasso.Login(lassoServer) - login.init_authn_request(lasso.httpMethodRedirect) - #login.init_authn_request() - failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) - if forceAuthn: - login.request.set_forceAuthn(forceAuthn) - if not isPassive: - login.request.set_isPassive(isPassive) - login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) - login.request.set_consent(lasso.libConsentObtained) - if relayState: - login.request.set_relayState(relayState) - login.build_authn_request_msg(self.idpSite.providerId) - authnRequestUrl = login.msg_url - failUnless(authnRequestUrl) - return handler.respondRedirectTemporarily(authnRequestUrl) - - def login_done(self, handler, userAuthenticated, authenticationMethod): - # Remove informations that are no more needed in session. - session = handler.session - lassoIdentityDump = session.lassoIdentityDump - del session.lassoIdentityDump - nameIdentifier = session.nameIdentifier - del session.nameIdentifier - relayState = session.relayState - del session.relayState - - if not userAuthenticated: - return self.login_failed(handler) - - # User has been authenticated => Create federation. - user = handler.user - user.lassoIdentityDump = lassoIdentityDump - self.userIdsByNameIdentifier[nameIdentifier] = user.uniqueId - self.sessionTokensByNameIdentifier[nameIdentifier] = session.token - # Note: The uppercase for RelayState below is not a bug. - return self.callHttpFunction(self.assertionConsumer_done, handler, RelayState = relayState) - - def logout(self, handler): - session = handler.session - if session is None: - return handler.respond(401, 'Access Unauthorized: User has no session opened.') - user = handler.user - if user is None: - return handler.respond(401, 'Access Unauthorized: User is not logged in.') - return self.logout_do(handler, session, user) - - def logout_do(self, handler, session, user): - lassoServer = self.getLassoServer() - logout = lasso.Logout(lassoServer, lasso.providerTypeSp) - if user.lassoIdentityDump is not None: - logout.set_identity_from_dump(user.lassoIdentityDump) - if session.lassoSessionDump is not None: - logout.set_session_from_dump(session.lassoSessionDump) - logout.init_request() - logout.build_request_msg() - - soapEndpoint = logout.msg_url - failUnless(soapEndpoint) - soapRequestMsg = logout.msg_body - failUnless(soapRequestMsg) - httpResponse = self.sendHttpRequest( - 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) - failUnlessEqual(httpResponse.statusCode, 200) - - logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) - failIf(logout.is_identity_dirty()) - identity = logout.get_identity() - failUnless(identity) - lassoIdentityDump = identity.dump() - failUnless(lassoIdentityDump) - failUnless(logout.is_session_dirty()) - lassoSession = logout.get_session() - if lassoSession is None: - # The user is no more authenticated on any identity provider. Log him out. - del session.lassoSessionDump - del session.userId - del user.sessionToken - del handler.user - # We also delete the session, but it is not mandantory, since the user is logged out - # anyway. - del handler.session - del self.sessions[session.token] - else: - # The user is still logged in on some other identity providers. - lassoSessionDump = lassoSession.dump() - failUnless(lassoSessionDump) - session.lassoSessionDump = lassoSessionDump - nameIdentifier = logout.nameIdentifier - return self.logout_done(handler, nameIdentifier) - - def logout_done(self, handler, nameIdentifier): - failUnless(nameIdentifier) - del self.sessionTokensByNameIdentifier[nameIdentifier] - - return handler.respond(200, headers = {'Content-Type': 'text/plain'}, - body = 'Liberty logout succeeded') diff --git a/python/tests/abstractweb.py b/python/tests/abstractweb.py deleted file mode 100644 index 94fb644d..00000000 --- a/python/tests/abstractweb.py +++ /dev/null @@ -1,329 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Abstract web classes for HTTP clients and servers or simulators -# By: Frederic Peters <fpeters@entrouvert.com> -# Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://www.entrouvert.org -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - - -"""Abstract web classes for HTTP clients and servers or simulators""" - - -class HttpRequestMixin: - body = None - headers = None - method = None # 'GET' or 'POST' or 'PUT' or... - path = None - pathAndQuery = None - query = None - scheme = None # 'http' or 'https' - url = None - - def getFormField(self, name, default = None): - raise NotImplementedError - - def getQueryBoolean(self, name, default = None): - fieldValue = self.getQueryField(name, 'none') - if fieldValue == 'none': - return default - else: - return fieldValue.lower not in ('', '0', 'false') - - def getQueryField(self, name, default = None): - if self.query: - for field in self.query.split('&'): - fieldName, fieldValue = field.split('=') - if name == fieldName: - return fieldValue - return default - - def hasFormField(self, name): - raise NotImplementedError - - def hasQueryField(self, name): - if self.query: - for field in self.query.split('&'): - fieldName, fieldValue = field.split('=') - if name == fieldName: - return True - return False - - -class FunctionHttpRequest(HttpRequestMixin, object): - method = 'GET' - previousHttpRequest = None - queryFields = None - - def __init__(self, previousHttpRequest, **queryFields): - self.previousHttpRequest = previousHttpRequest - self.queryFields = queryFields - - def getFormField(self, name, default = None): - return default - - def getHeaders(self): - return self.previousHttpRequest.headers - - def getQueryField(self, name, default = None): - return self.queryFields.get(name, default) - - def hasFormField(self, name): - return False - - def hasQueryField(self, name): - return name in self.queryFields - - headers = property(getHeaders) - - -class HttpResponseMixin: - body = None - defaultStatusMessages = { - '100': 'Continue', - '101': 'Switching Protocols', - '200': 'OK', - '201': 'Created', - '202': 'Accepted', - '203': 'Non-Authoritative Information', - '204': 'No Content', - '205': 'Reset Content', - '206': 'Partial Content', - '300': 'Multiple Choices', - '301': 'Moved Permanently', - '302': 'Found', - '303': 'See Other', - '304': 'Not Modified', - '305': 'Use Proxy', - '307': 'Temporary Redirect', - '400': 'Bad Request', - '401': 'Unauthorized', - '402': 'Payment Required', - '403': 'Forbidden', - '404': 'Not Found', - '405': 'Method Not Allowed', - '406': 'Not Acceptable', - '407': 'Proxy Authentication Required', - '408': 'Request Time-out', - '409': 'Conflict', - '410': 'Gone', - '411': 'Length Required', - '412': 'Precondition Failed', - '413': 'Request Entity Too Large', - '414': 'Request-URI Too Large', - '415': 'Unsupported Media Type', - '416': 'Requested range not satisfiable', - '417': 'Expectation Failed', - '500': 'Internal Server Error', - '501': 'Not Implemented', - '502': 'Bad Gateway', - '503': 'Service Unavailable', - '504': 'Gateway Time-out', - '505': 'HTTP Version not supported', - } - headers = None - statusCode = None # 200 or... - statusMessage = None - - def __init__(self, httpRequestHandler, statusCode, statusMessage = None, headers = None, - body = None): - self.statusCode = statusCode - if statusMessage: - self.statusMessage = statusMessage - else: - self.statusMessage = self.defaultStatusMessages.get(statusCode) - httpResponseHeaders = httpRequestHandler.site.httpResponseHeaders - if headers: - httpResponseHeaders = httpResponseHeaders.copy() - for name, value in headers.iteritems(): - httpResponseHeaders[name] = value - if httpResponseHeaders: - self.headers = httpResponseHeaders - if body: - self.body = body - - def send(self, httpRequestHandler): - raise NotImplementedError - - -class HttpRequestHandlerMixin: - httpRequest = None - HttpResponse = None # Class - httpResponse = None - session = None - user = None - site = None # The virtual host - - def createSession(self): - session = self.site.newSession() - self.session = session - return session - - def createUser(self): - user = self.site.newUser() - self.user = user - return user - - def respond(self, statusCode = 200, statusMessage = None, headers = None, body = None): - self.httpResponse = self.HttpResponse( - self, statusCode, statusMessage = statusMessage, headers = headers, body = body) - - # Session and user must be saved before responding. Otherwise, when the server is - # multitasked or multithreaded, it may receive a new HTTP request before the session is - # saved. - if self.session is not None and self.session.isDirty: - self.session.save() - if self.user is not None and self.user.isDirty: - self.user.save() - - return self.httpResponse.send(self) - - def respondRedirectTemporarily(self, url): - raise NotImplementedError - - -class WebClientMixin: - httpRequestHeaders = { - 'User-Agent': 'LassoSimulator/0.0.0', - 'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html', - } - - def __init__(self): - pass - - -class WebSessionMixin(WebClientMixin): - isDirty = True - token = None - userId = None # ID of logged user - - def __init__(self, token): - WebClientMixin.__init__(self) - self.token = token - - def getSimpleLabel(self): - return self.token - - def save(self): - pass - - simpleLabel = property(getSimpleLabel) - - -class WebSiteMixin: - FunctionHttpRequest = FunctionHttpRequest # Class - httpResponseHeaders = { - 'Server': 'Lasso Simulator Web Server', - } - lastSessionToken = 0 - lastUserId = 0 - users = None - sessions = None - WebSession = None # Class - WebUser = None # Class - - def __init__(self): - self.users = {} - self.sessions = {} - - def authenticateX509User(self, clientCertificate): - # We should check certificate (for example clientCertificate.get_serial_number() - # and return the user if one matches, or None otherwise. - return None - - def authenticateLoginPasswordUser(self, login, password): - # We should check login & password and return the user if one matches or None otherwise. - return None - - def callHttpFunction(self, function, httpRequestHandler, **queryFields): - httpRequestHandler.httpRequest = self.FunctionHttpRequest( - httpRequestHandler.httpRequest, **queryFields) - try: - result = function(httpRequestHandler) - finally: - httpRequestHandler.httpRequest = httpRequestHandler.httpRequest.previousHttpRequest - return result - - def handleHttpRequestHandler(self, httpRequestHandler): - methodName = httpRequestHandler.httpRequest.path.replace('/', '') - try: - method = getattr(self, methodName) - except AttributeError: - return httpRequestHandler.respond( - 404, 'Path "%s" Not Found.' % httpRequestHandler.httpRequest.path) - return method(httpRequestHandler) - - def login(self, handler): - # On most site (except Liberty service providers), authentication is done locally. - return self.callHttpFunction(self.login_local, handler) - - def login_done(self, handler, userAuthenticated, authenticationMethod): - if not userAuthenticated: - return self.login_failed(handler) - return handler.respond( - 200, headers = {'Content-Type': 'text/plain'}, - body = 'Login terminated:\n userAuthenticated = %s\n authenticationMethod = %s' % ( - userAuthenticated, authenticationMethod)) - - def login_failed(self, handler): - return handler.respond(401, 'Access Unauthorized: User has no account.') - - def login_local(self, handler): - # Note: Once local login is done, the HTTP function login_done must be called. - raise NotImplementedError - - def newSession(self): - self.lastSessionToken += 1 - sessionToken = str(self.lastSessionToken) - session = self.WebSession(sessionToken) - self.sessions[sessionToken] = session - return session - - def newUser(self, name = None): - if name is None: - self.lastUserId += 1 - userId = str(self.lastUserId) - else: - userId = name - user = self.WebUser(userId, name = name) - self.users[userId] = user - return user - - -class WebUserMixin: - isDirty = True - name = None - sessionToken = None - uniqueId = None - - def __init__(self, uniqueId, name = None): - self.uniqueId = uniqueId - if name: - self.name = name - - def getSimpleLabel(self): - if self.name: - return self.name - else: - return 'Anonymous User #%s' % self.uniqueId - - def save(self): - pass - - simpleLabel = property(getSimpleLabel) diff --git a/python/tests/assertions.py b/python/tests/assertions.py deleted file mode 100644 index c142ef1b..00000000 --- a/python/tests/assertions.py +++ /dev/null @@ -1,126 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Assertion functions. -# By: Frederic Peters <fpeters@entrouvert.com> -# Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://www.entrouvert.org -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# -# -# Original code taken from Python2.3 unittest module: -# -# Copyright (c) 1999, 2000, 2001 Steve Purcell -# This module is free software, and you may redistribute it and/or modify -# it under the same terms as Python itself, so long as this copyright message -# and disclaimer are retained in their original form. -# -# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, -# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF -# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH -# DAMAGE. - -# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, -# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, -# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. - - -"""Assertion functions""" - - -import builtins - - -def fail(message = None): - """Fail immediately, with the given message.""" - raise self.failureException, message - - -def failIf(expression, message = None): - """Fail the test if the expression is true.""" - if expression: - raise self.failureException, message - - -def failIfAlmostEqual(first, second, places = 7, message = None): - """Fail if the two objects are equal as determined by their difference rounded to the given - number of decimal places (default 7) and comparing to zero. - - Note that decimal places (from zero) is usually not the same as significant digits - (measured from the most signficant digit). - """ - if round(second - first, places) == 0: - raise self.failureException, \ - (message or '%s == %s within %s places' % (`first`, `second`, `places`)) - - -def failIfEqual(first, second, message = None): - """Fail if the two objects are equal as determined by the '==' operator.""" - if first == second: - raise self.failureException, (message or '%s == %s' % (`first`, `second`)) - - -def failUnless(expression, message = None): - """Fail the test unless the expression is true.""" - if not expression: - raise self.failureException, message - - -def failUnlessAlmostEqual(first, second, places = 7, message = None): - """Fail if the two objects are unequal as determined by their difference rounded to the given - number of decimal places (default 7) and comparing to zero. - - Note that decimal places (from zero) is usually not the same as significant digits (measured - from the most signficant digit). - """ - if round(second - first, places) != 0: - raise self.failureException, \ - (message or '%s != %s within %s places' % (`first`, `second`, `places` )) - - -def failUnlessEqual(first, second, message = None): - """Fail if the two objects are unequal as determined by the '==' operator.""" - if not first == second: - raise self.failureException, \ - (message or '%s != %s' % (`first`, `second`)) - - -def failUnlessRaises(exceptionClass, callableObject, *arguments, **keywordArguments): - """Fail unless an exception of class exceptionClass is thrown by callableObject when invoked - with arguments arguments and keyword arguments keywordArguments. If a different type of - exception is thrown, it will not be caught, and the test case will be deemed to have suffered - an error, exactly as for an unexpected exception. - """ - try: - callableObject(*arguments, **keywordArguments) - except exceptionClass: - return - else: - if hasattr(exceptionClass, '__name__'): - exceptionName = exceptionClass.__name__ - else: - exceptionName = str(exceptionClass) - raise self.failureException, exceptionName - - -allGlobals = globals() -for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless', - 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'): - builtins.set(name, allGlobals[name]) diff --git a/python/tests/builtins.py b/python/tests/builtins.py deleted file mode 100644 index feb18719..00000000 --- a/python/tests/builtins.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Built-in variables access functions -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://www.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -"""Built-in variables access functions - -Allow the declaration of global variables accessible from everywhere -(in opposition to global variables which are only easily accessible inside -the module they are declared in). -""" - - -import __builtin__ - - -def delete(name): - del __builtin__.__dict__[name] - - -def get(name, default = None): - return __builtin__.__dict__.get(name, default) - - -def set(name, value): - __builtin__.__dict__[name] = value diff --git a/python/tests/http.py b/python/tests/http.py deleted file mode 100644 index 27e8a99a..00000000 --- a/python/tests/http.py +++ /dev/null @@ -1,935 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# HTTP Client and Server Enhanced Classes -# By: Frederic Peters <fpeters@entrouvert.com> -# Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://www.entrouvert.org -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - - -"""HTTP client and server enhanced classes - -Features: -- HTTPS using OpenSSL; -- web sessions (with or without cookie); -- user authentication (support of basic HTTP-authentication, X.509v3 certificate authentication, - HTML based authentication, etc). -""" - - -import base64 -import BaseHTTPServer -import Cookie -import cStringIO -import gzip -import httplib -import os -import socket -import SocketServer -import sys -import time - -try: - from OpenSSL import SSL -except ImportError: - SSL = None - -import abstractweb -import submissions - - -try: - logger -except NameError: - logger = None -if logger is None: - import logging as logger - - -class BaseHTTPSRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): - def setup(self): - """ - We need to use socket._fileobject Because SSL.Connection - doesn't have a 'dup'. Not exactly sure WHY this is, but - this is backed up by comments in socket.py and SSL/connection.c - """ - - self.connection = self.request # for doPOST - self.rfile = socket._fileobject(self.request, 'rb', self.rbufsize) - self.wfile = socket._fileobject(self.request, 'wb', self.wbufsize) - - -class BaseHTTPSServer(SocketServer.TCPServer): - def __init__(self, server_address, RequestHandlerClass, privateKeyFilePath, - certificateFilePath, peerCaCertificateFile = None, - certificateChainFilePath = None, verifyClient = None): - SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass) - self.verifyClient = verifyClient - - ctx = SSL.Context(SSL.SSLv23_METHOD) - nVerify = SSL.VERIFY_NONE - ctx.set_options(SSL.OP_NO_SSLv2) - - ctx.use_privatekey_file(privateKeyFilePath) - ctx.use_certificate_file(certificateFilePath) - if peerCaCertificateFile: - ctx.load_verify_locations(peerCaCertificateFile) - if certificateChainFilePath: - ctx.use_certificate_chain_file(certificateChainFilePath) - if verifyClient == 'require': - nVerify |= SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT - elif verifyClient in ('optional', 'optional_on_ca'): - nVerify |= SSL.VERIFY_PEER - ctx.set_verify(nVerify, self.verifyCallback) - - self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type)) - self.server_bind() - self.server_activate() - - def verifyCallback(self, connection, x509Object, errorNumber, errorDepth, returnCode): - logger.info('http.HttpsConnection(%s, %s, %s, %s, %s, %s)' % ( - self, connection, x509Object, errorNumber, errorDepth, returnCode)) - return returnCode - - #~ def server_bind(self): - #~ """Override server_bind to store the server name.""" - - #~ SocketServer.TCPServer.server_bind(self) - #~ host, port = self.socket.getsockname()[:2] - #~ self.server_name = socket.getfqdn(host) - #~ self.server_port = port - - -class HttpRequest(abstractweb.HttpRequestMixin, object): - handler = None - submission = None - - def __init__(self, handler): - self.handler = handler - self.submission = submissions.readSubmission(self.handler) - - def getBody(self): - return self.submission.readFile() - - def getHeaders(self): - return self.handler.headers - - def getMethod(self): - return self.handler.command - - def getPath(self): - return self.pathAndQuery.split('?', 1)[0] - - def getPathAndQuery(self): - return self.handler.path - - def getQuery(self): - splitedPathAndQuery = self.pathAndQuery.split('?', 1) - if len(splitedPathAndQuery) > 1: - return splitedPathAndQuery[1] - else: - return '' - - def getScheme(self): - return self.handler.scheme - - def getUrl(self): - return "%s://%s%s" % (self.scheme, self.headers.get('Host'), self.pathAndQuery) - - body = property(getBody) - headers = property(getHeaders) - method = property(getMethod) - path = property(getPath) - pathAndQuery = property(getPathAndQuery) - query = property(getQuery) - scheme = property(getScheme) - url = property(getUrl) - - -class HttpResponse(abstractweb.HttpResponseMixin, object): - def send(self, httpRequestHandler): - statusCode = self.statusCode - if statusCode == 401: - return self.send401(httpRequestHandler) - elif statusCode == 404: - return self.send404(httpRequestHandler) - assert statusCode in (200, 207) - if self.headers is None: - headers = {} - else: - headers = self.headers.copy() - if time.time() > httpRequestHandler.socketCreationTime + 300: - headers['Connection'] = 'close' - elif not httpRequestHandler.close_connection: - headers['Connection'] = 'Keep-Alive' - # TODO: Could also output Content-MD5. - lastModified = headers.get('Last-Modified') - ifModifiedSince = httpRequestHandler.httpRequest.headers.get('If-Modified-Since') - if lastModified and ifModifiedSince: - # We don't want to use bandwith if the file was not modified. - try: - lastModifiedTime = time.strptime(lastModified[:25], '%a, %d %b %Y %H:%M:%S') - ifModifiedSinceTime = time.strptime(ifModifiedSince[:25], '%a, %d %b %Y %H:%M:%S') - if lastModifiedTime[:8] <= ifModifiedSinceTime[:8]: - httpRequestHandler.send_response(304, 'Not Modified.') - for key in ('Connection', 'Content-Location'): - if key in headers: - httpRequestHandler.send_header(key, headers[key]) - httpRequestHandler.setCookie() - httpRequestHandler.end_headers() - return - except (ValueError, KeyError): - pass - data = self.body - if data is None: - data = '' - if isinstance(data, basestring): - dataFile = None - dataSize = len(data) - else: - dataFile = data - data = '' - if hasattr(dataFile, 'fileno'): - dataSize = os.fstat(dataFile.fileno())[6] - else: - # For StringIO and cStringIO classes. - dataSize = len(dataFile.getvalue()) - if dataFile is not None: - assert not data - data = dataFile.read(1048576) # Read first MB chunk - contentType = headers.get('Content-Type') - if contentType and contentType.split(';')[0] == 'text/html' and data.startswith('<?xml'): - # Internet Explorer 6 renders the page differently when they start with <?xml...>, so - # skip it. - i = data.find('\n') - if i > 0: - data = data[i + 1:] - else: - i = data.find('>') - if i > 0: - data = data[i + 1:] - dataSize -= i + 1 - # Compress data if possible and if data is not too big. - acceptEncoding = httpRequestHandler.httpRequest.headers.get('Accept-Encoding', '') - if 0 < dataSize < 1048576 and 'gzip' in acceptEncoding \ - and 'gzip;q=0' not in acceptEncoding: - # Since dataSize < 1 MB, the data is fully contained in string. - zbuf = cStringIO.StringIO() - zfile = gzip.GzipFile(mode = 'wb', fileobj = zbuf) - zfile.write(data) - zfile.close() - data = zbuf.getvalue() - dataSize = len(data) - headers['Content-Encoding'] = 'gzip' - headers['Content-Length'] = '%d' % dataSize - statusMessages = { - 200: 'OK', - 207: 'Multi-Status', - } - assert statusCode in statusMessages, 'Unknown status code %d.' % statusCode - if httpRequestHandler.httpAuthenticationLogoutTrick and statusCode == 200: - statusCode = 401 - statusMessage = 'Access Unauthorized' - headers['WWW-Authenticate'] = 'Basic realm="%s"' % httpRequestHandler.realm - else: - statusMessage = statusMessages[statusCode] - httpRequestHandler.send_response(statusCode, statusMessage) - for key, value in headers.items(): - httpRequestHandler.send_header(key, value) - httpRequestHandler.setCookie() - httpRequestHandler.end_headers() - if httpRequestHandler.httpRequest.method != 'HEAD' and dataSize > 0: - outputFile = httpRequestHandler.wfile - if data: - outputFile.write(data) - if dataFile is not None: - while True: - chunk = dataFile.read(1048576) # 1 MB chunk - if not chunk: - break - outputFile.write(chunk) - - def send401(self, httpRequestHandler): - logger.info(self.statusMessage) - data = '<html><body>%s</body></html>' % self.statusMessage - headers = {} - if httpRequestHandler.useHttpAuthentication == 'not this time': - del httpRequestHandler.useHttpAuthentication - if httpRequestHandler.useHttpAuthentication != True: - httpRequestHandler.useHttpAuthentication = True - elif httpRequestHandler.useHttpAuthentication: - headers["WWW-Authenticate"] = 'Basic realm="%s"' % httpRequestHandler.realm - return httpRequestHandler.send_error( - self.statusCode, self.statusMessage, data, headers, setCookie = True) - - def send404(self, httpRequestHandler): - logger.info(self.statusMessage) - data = '<html><body>%s</body></html>' % self.statusMessage - return httpRequestHandler.send_error( - self.statusCode, self.statusMessage, data, setCookie = True) - - -class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin): - canUseCookie = False - cookie = None - httpAuthenticationLogoutTrick = False - HttpResponse = HttpResponse # Class - socketCreationTime = None - protocol_version = 'HTTP/1.1' - realm = 'HttpRequestHandlerMixin Web Site' - server_version = 'HttpRequestHandlerMixin/1.0' - site = None # Class variable - testCookieSupport = False - useHttpAuthentication = True - - def createSession(self): - session = abstractweb.HttpRequestHandlerMixin.createSession(self) - if self.canUseCookie: - self.testCookieSupport = True - return session - - def handle(self): - """Handle multiple requests if necessary.""" - self.socketCreationTime = time.time() - try: - try: - self.close_connection = True - self.handle_one_request() - while not self.close_connection: - self.handle_one_request() - except socket.timeout: - pass - except KeyboardInterrupt: - raise - except SSL.ZeroReturnError: - pass - except SSL.Error, exception: - logger.debug('SSL error in handle. Error = %s, %s' % (exception, exception[0])) - raise # FIXME - if exception[0] == ('PEM routines', 'PEM_read_bio', 'no start line'): - pass - else: - self.outputUnknownException() - except: - self.outputUnknownException() - finally: - del self.socketCreationTime - - def handle_one_request(self): - """Handle a single HTTP request.""" - self.raw_requestline = self.rfile.readline() - if not self.raw_requestline: - self.close_connection = True - return - if not self.parse_request(): # An error code has been sent, just exit - return - logger.info(self.raw_requestline.strip()) - logger.debug(str(self.headers)) - - # The server isn't forked nor threaded, so we don't want to keep connections open, to avoid - # dead-locks which occur for example when the connection with the navigator to the identity - # provider is kept open, while a service provider sends a SOAP request to the identity - # provider. - # Remove this line for forked or threaded servers. - self.close_connection = True - - self.httpRequest = HttpRequest(self) - - # Retrieve the session and user, if possible. - - session = None - sessionToken = None - user = None - - # Handle X.509 certificate authentication. - if hasattr(self.connection, 'get_peer_certificate'): - clientCertificate = self.connection.get_peer_certificate() - if clientCertificate: - user = self.site.authenticateX509User(clientCertificate) - if user is None: - logger.info('Unknown certificate (serial number = %s)' - % clientCertificate.get_serial_number()) - else: - sessionToken = user.sessionToken - if sessionToken is not None: - session = self.site.sessions.get(sessionToken) - if session is None: - sessionToken = None - del user.sessionToken - else: - # For security reasons, we want to minimize the publication of - # session token (it is better not to store it in a cookie or in - # URLs). The client need to send the certificate each time, for the - # session to continue. - if session.publishToken: - del session.publishToken - - # Handle HTTP authentication. - authorization = self.httpRequest.headers.get('authorization') - if self.httpRequest.hasQueryField('login') and not authorization \ - and self.useHttpAuthentication: - # Ask for HTTP authentication. - return self.outputErrorUnauthorized(self.httpRequest.path) - if self.httpRequest.hasQueryField('logout') and authorization: - # Since HTTP authentication provides no way to logout, we send a status - # Unauthorized to force the user to press the cancel button. But instead of - # sending an error page immediately, we send the real page, so the user will see - # the page instead of an error message. - authorization = None - self.httpAuthenticationLogoutTrick = True - if authorization: - try: - authenticationScheme, credentials = authorization.split(None, 1) - except ValueError: - return self.outputErrorUnauthorized(self.httpRequest.path) - authenticationScheme = authenticationScheme.lower() - if authenticationScheme == 'basic': - loginAndPassword = base64.decodestring(credentials) - try: - login, password = loginAndPassword.split(':', 1) - except: - login = loginAndPassword - password = '' - logger.debug('Basic authentication: login = "%s" / password = "%s"' % ( - login, password)) - if password: - user = self.site.authenticateLoginPasswordUser(login, password) - if user is None: - logger.info('Unknown user (login = "%s" / password = "%s")' % ( - login, password)) - return self.outputErrorUnauthorized(self.httpRequest.path) - else: - sessionToken = user.sessionToken - if sessionToken is not None: - session = self.site.sessions.get(sessionToken) - if session is None: - sessionToken = None - del user.sessionToken - else: - # For security reasons, we want to minimize the publication of - # session token (it is better not to store it in a cookie or in - # URLs). The client need to send the certificate each time, for the - # session to continue. - if session.publishToken: - del session.publishToken - elif login: - # No password was given. Assume login contains a session token. - # TODO: sanity chek on login - sessionToken = login - session = self.site.sessions.get(sessionToken) - if session is not None and session.userId is not None: - user = self.site.users.get(session.userId) - if user is not None and user.sessionToken != session.token: - # Sanity check. - user.sessionToken = session.token - else: - logger.info('Unknown authentication scheme = %s' % authenticationScheme) - return self.outputErrorUnauthorized(self.httpRequest.path) - - # Handle use of cookies, session and user. - cookie = None - cookieContent = {} - if self.httpRequest.headers.has_key('Cookie'): - logger.debug('Cookie received:') - cookie = Cookie.SimpleCookie( - self.httpRequest.headers['Cookie']) - for k, v in cookie.items(): - cookieContent[k] = v.value - logger.debug(' %s = %s' % (k, cookieContent[k])) - self.cookie = cookie - - sessionToken = None - sessionTokenInCookie = False - if self.httpRequest.hasQueryField('sessionToken'): - sessionToken = self.httpRequest.getQueryField('sessionToken') - if not sessionToken: - sessionToken = None - if session is not None and sessionToken != session.token: - sessionToken = None - if cookieContent.has_key('sessionToken'): - cookieSessionToken = cookieContent['sessionToken'] - if cookieSessionToken: - if session is None or cookieSessionToken == session.token: - if sessionToken is None: - sessionToken = cookieSessionToken - if cookieSessionToken == sessionToken: - sessionTokenInCookie = True - canUseCookie = True - if session is None and sessionToken is not None: - session = self.site.sessions.get(sessionToken) - if session is None: - sessionToken = None - sessionTokenInCookie = False - else: - if user is None: - if session.userId is not None: - user = self.site.users.get(session.userId) - if user is not None and user.sessionToken != sessionToken: - # Sanity check. - user.sessionToken = sessionToken - else: - # The user has been authenticated (using HTTP or X.509 authentication), but the - # associated session didn't exist (or was too old, or...). So, update - # its sessionToken. - user.sessionToken = sessionToken - # For security reasons, we want to minimize the publication of session - # token (it is better not to store it in a cookie or in URLs). - if session.publishToken: - del session.publishToken - if session is None and user is not None: - # The user has been authenticated (using HTTP or X.509 authentication), but the session - # doesn't exist yet (or was too old, or...). Create a new session. - session = self.createSession() - # For security reasons, we want to minimize the publication of session - # token (it is better not to store it in a cookie or in URLs). - # session.publishToken = False # False is the default value. - session.userId = user.uniqueId - user.sessionToken = session.token - else: - self.session = session - if session is not None: - if not sessionTokenInCookie: - # The sessionToken is valid but is not stored in the cookie. So, don't try to - # use cookie. - canUseCookie = False - logger.debug('Session: %s' % session.simpleLabel) - self.canUseCookie = canUseCookie - self.user = user - if user is not None: - logger.debug('User: %s' % user.simpleLabel) - - # Now, the HTTP request handler has done everything it could done. Transfer the processing - # to the site. - - try: - self.site.handleHttpRequestHandler(self) - except IOError: - logger.exception('An exception occured:') - path = self.path.split('?')[0] - return self.outputErrorNotFound(path) - - def log_message(self, format, *arguments): - """Override BaseHTTPServer.HttpRequestHandler method to use logger. - - Do not use. Use logger instead. - """ - - logger.info('%s - - [%s] %s' % ( - self.address_string(), self.log_date_time_string(), format % arguments)) - -## def outputAlert(self, data, title = None, url = None): -## import html -## if title is None: -## title = N_('Alert') -## # FIXME: Handle XSLT template. -## if url: -## buttonsBar = html.div(class_ = 'buttons-bar') -## actionButtonsBar = html.span(class_ = 'action-buttons-bar') -## buttonsBar.append(actionButtonsBar) -## actionButtonsBar.append(html.a(_('OK'), class_ = 'button', href = url)) -## else: -## buttonsBar = None -## layout = html.html( -## html.head(html.title(_(title))), -## html.body( -## html.p(_(data), class_ = 'alert'), -## buttonsBar, -## ), -## ) -## self.outputData(layout.serialize(), contentLocation = None, mimeType = 'text/html') - -## def outputData(self, data, contentLocation = None, headers = None, mimeType = None, -## modificationTime = None, successCode = 200): -## # Session and user must be saved before responding. Otherwise, when the server is -## # multitasked or multithreaded, it may receive a new HTTP request before the session is -## # saved. -## if self.session is not None and self.session.isDirty: -## self.session.save() -## if self.user is not None and self.user.isDirty: -## self.user.save() - -## if isinstance(data, basestring): -## dataFile = None -## dataSize = len(data) -## else: -## dataFile = data -## data = '' -## if hasattr(dataFile, 'fileno'): -## dataSize = os.fstat(dataFile.fileno())[6] -## else: -## # For StringIO and cStringIO classes. -## dataSize = len(dataFile.getvalue()) - -## if headers is None: -## headers = {} -## if time.time() > self.socketCreationTime + 300: -## headers['Connection'] = 'close' -## elif not self.close_connection: -## headers['Connection'] = 'Keep-Alive' -## if contentLocation is not None: -## headers['Content-Location'] = contentLocation -## if mimeType: -## headers['Content-Type'] = '%s; charset=utf-8' % mimeType -## if modificationTime: -## headers['Last-Modified'] = time.strftime('%a, %d %b %Y %H:%M:%S GMT', modificationTime) -## # TODO: Could also output Content-MD5. -## ifModifiedSince = self.headers.get('If-Modified-Since') -## if modificationTime and ifModifiedSince: -## # We don't want to use bandwith if the file was not modified. -## try: -## ifModifiedSinceTime = time.strptime(ifModifiedSince[:25], '%a, %d %b %Y %H:%M:%S') -## if modificationTime[:8] <= ifModifiedSinceTime[:8]: -## self.send_response(304, 'Not Modified.') -## for key in ('Connection', 'Content-Location'): -## if key in headers: -## self.send_header(key, headers[key]) -## self.setCookie() -## self.end_headers() -## return -## except (ValueError, KeyError): -## pass -## if dataFile is not None: -## assert not data -## data = dataFile.read(1048576) # Read first MB chunk -## if mimeType == 'text/html' and data.startswith('<?xml'): -## # Internet Explorer 6 renders the page differently when they start with <?xml...>, so -## # skip it. -## i = data.find('\n') -## if i > 0: -## data = data[i + 1:] -## else: -## i = data.find('>') -## if i > 0: -## data = data[i + 1:] -## dataSize -= i + 1 -## # Compress data if possible and if data is not too big. -## acceptEncoding = self.headers.get('Accept-Encoding', '') -## if 0 < dataSize < 1048576 and 'gzip' in acceptEncoding \ -## and 'gzip;q=0' not in acceptEncoding: -## # Since dataSize < 1 MB, the data is fully contained in string. -## zbuf = cStringIO.StringIO() -## zfile = gzip.GzipFile(mode = 'wb', fileobj = zbuf) -## zfile.write(data) -## zfile.close() -## data = zbuf.getvalue() -## dataSize = len(data) -## headers['Content-Encoding'] = 'gzip' -## headers['Content-Length'] = '%d' % dataSize -## successMessages = { -## 200: 'OK', -## 207: 'Multi-Status', -## } -## assert successCode in successMessages, 'Unknown success code %d.' % successCode -## if self.httpAuthenticationLogoutTrick and successCode == 200: -## successCode = 401 -## successMessage = 'Access Unauthorized' -## headers['WWW-Authenticate'] = 'Basic realm="%s"' % self.realm -## else: -## successMessage = successMessages[successCode] -## self.send_response(successCode, successMessage) -## for key, value in headers.items(): -## self.send_header(key, value) -## self.setCookie() -## self.end_headers() -## if self.httpRequest.method != 'HEAD' and dataSize > 0: -## outputFile = self.wfile -## if data: -## outputFile.write(data) -## if dataFile is not None: -## while True: -## chunk = dataFile.read(1048576) # 1 MB chunk -## if not chunk: -## break -## outputFile.write(chunk) -## return - -## def outputErrorAccessForbidden(self, filePath): -## if filePath is None: -## message = 'Access Forbidden' -## else: -## message = 'Access to "%s" Forbidden.' % filePath -## logger.info(message) -## data = '<html><body>%s</body></html>' % message -## return self.send_error(403, message, data, setCookie = True) - -## def outputErrorBadRequest(self, reason): -## if reason: -## message = 'Bad Request: %s' % reason -## else: -## message = 'Bad Request' -## logger.info(message) -## data = '<html><body>%s</body></html>' % message -## return self.send_error(400, message, data) - - def outputErrorInternalServer(self): - message = 'Internal Server Error' - logger.info(message) - data = '<html><body>%s</body></html>' % message - return self.send_error(500, message, data) - -## def outputErrorMethodNotAllowed(self, reason): -## if reason: -## message = 'Method Not Allowed: %s' % reason -## else: -## message = 'Method Not Allowed' -## logger.info(message) -## data = '<html><body>%s</body></html>' % message -## # This error doesn't need a pretty interface. -## # FIXME: Add an 'Allow' header containing a list of valid methods for the requested -## # resource. -## return self.send_error(405, message, data) - -## def outputErrorNotFound(self, filePath): -## if filePath is None: -## message = 'Not Found' -## else: -## message = 'Path "%s" Not Found.' % filePath -## logger.info(message) -## data = '<html><body>%s</body></html>' % message -## return self.send_error(404, message, data, setCookie = True) - - def outputErrorUnauthorized(self, filePath): - if filePath is None: - message = 'Access Unauthorized' - else: - message = 'Access to "%s" Unauthorized.' % filePath - logger.info(message) - data = '<html><body>%s</body></html>' % message - headers = {} - if self.useHttpAuthentication: - headers["WWW-Authenticate"] = 'Basic realm="%s"' % self.realm - return self.send_error(401, message, data, headers, setCookie = True) - -## def outputInformationContinue(self): -## message = 'Continue' -## logger.debug(message) -## self.send_response(100, message) - -## def outputSuccessCreated(self, filePath): -## if filePath is None: -## message = 'Created' -## else: -## message = 'File "%s" Created.' % filePath -## logger.debug(message) -## data = '<html><body>%s</body></html>' % message -## self.send_response(201, message) -## if time.time() > self.socketCreationTime + 300: -## self.send_header('Connection', 'close') -## elif not self.close_connection: -## self.send_header('Connection', 'Keep-Alive') -## self.send_header('Content-Type', 'text/html; charset=utf-8') -## self.send_header('Content-Length', '%d' % len(data)) -## self.setCookie() -## self.end_headers() -## if self.httpRequest.method != 'HEAD': -## self.wfile.write(data) - -## def outputSuccessNoContent(self): -## message = 'No Content' -## logger.debug(message) -## self.send_response(204, message) -## if time.time() > self.socketCreationTime + 300: -## self.send_header('Connection', 'close') -## elif not self.close_connection: -## self.send_header('Connection', 'Keep-Alive') -## self.setCookie() -## self.end_headers() - - def outputUnknownException(self): - import traceback, cStringIO - f = cStringIO.StringIO() - traceback.print_exc(file = f) - exceptionTraceback = f.getvalue() - exceptionType, exception = sys.exc_info()[:2] - logger.debug("""\ -An exception "%(exception)s" of class "%(exceptionType)s" occurred. - -%(traceback)s -""" % { - 'exception': exception, - 'exceptionType': exceptionType, - 'traceback': exceptionTraceback, - }) - return self.outputErrorInternalServer() - - def respondRedirectTemporarily(self, url): - # Session and user must be saved before responding. Otherwise, when the server is - # multitasked or multithreaded, it may receive a new HTTP request before the session is - # saved. - if self.session is not None and self.session.isDirty: - self.session.save() - if self.user is not None and self.user.isDirty: - self.user.save() - - message = 'Moved Temporarily to "%s".' % url - logger.debug(message) - data = '<html><body>%s</body></html>' % message - self.send_response(302, message) - self.send_header('Location', url) - if time.time() > self.socketCreationTime + 300: - self.send_header('Connection', 'close') - elif not self.close_connection: - self.send_header('Connection', 'Keep-Alive') - self.send_header('Content-Type', 'text/html; charset=utf-8') - self.send_header('Content-Length', '%d' % len(data)) - self.setCookie() - self.end_headers() - if self.httpRequest.method != 'HEAD': - self.wfile.write(data) - - def send_error(self, code, message = None, data = None, headers = None, setCookie = False): - # Session and user must be saved before responding. Otherwise, when the server is - # multitasked or multithreaded, it may receive a new HTTP request before the session is - # saved. - if self.session is not None and self.session.isDirty: - self.session.save() - if self.user is not None and self.user.isDirty: - self.user.save() - - shortMessage, longMessage = self.responses.get(code, ('???', '???')) - if message is None: - message = shortMessage - if not data: - explain = longMessage - data = self.error_message_format % { - 'code': code, - 'message': message, - 'explain': longMessage, - } - self.send_response(code, message) - self.send_header('Content-Type', 'text/html; charset=utf-8') - self.send_header('Content-Length', '%d' % len(data)) - self.send_header('Connection', 'close') - if headers is not None: - for name, value in headers.items(): - self.send_header(name, value) - if setCookie: - self.setCookie() - self.end_headers() - if self.httpRequest.method != 'HEAD' and code >= 200 and code not in (204, 304): - self.wfile.write(data) - - def setCookie(self): - if not self.canUseCookie: - return - oldCookie = self.cookie - cookie = Cookie.SimpleCookie() - cookieContent = {} - session = self.session - if session is not None and session.publishToken: - cookieContent['sessionToken'] = session.token - for key, value in cookieContent.items(): - cookie[key] = value - cookie[key]['path'] = '/' - if not cookieContent: - if oldCookie: - for key, morsel in oldCookie.items(): - cookie[key] = '' - cookie[key]['max-age'] = 0 - cookie[key]['path'] = '/' - else: - cookie = None - if cookie is not None: - # Is new cookie different from previous one? - sameCookie = False - if oldCookie is not None and cookie.keys() == oldCookie.keys(): - for key, morsel in cookie.items(): - oldMorsel = oldCookie[key] - if morsel.value != oldMorsel.value: - break - else: - sameCookie = True - if not sameCookie: - for morsel in cookie.values(): - self.send_header( - 'Set-Cookie', morsel.output(header = '')[1:]) - self.cookie = cookie - - -class HttpRequestHandler(HttpRequestHandlerMixin, BaseHTTPServer.BaseHTTPRequestHandler): - scheme = 'http' - - -class HttpsConnection(httplib.HTTPConnection): - certificateFile = None - default_port = httplib.HTTPS_PORT - peerCaCertificateFile = None - privateKeyFile = None - - def __init__(self, host, port = None, privateKeyFile = None, certificateFile = None, - peerCaCertificateFile = None, strict = None): - httplib.HTTPConnection.__init__(self, host, port, strict) - self.privateKeyFile = privateKeyFile - self.certificateFile = certificateFile - self.peerCaCertificateFile = peerCaCertificateFile - - def connect(self): - """Connect to a host on a given (SSL) port.""" - - context = SSL.Context(SSL.SSLv23_METHOD) - # Demand a certificate. - context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self.verifyCallback) - if self.privateKeyFile: - context.use_privatekey_file(self.privateKeyFile) - if self.certificateFile: - context.use_certificate_file(self.certificateFile) - if self.peerCaCertificateFile: - context.load_verify_locations(self.peerCaCertificateFile) - - # Strange hack, that is derivated from httplib.HTTPSConnection, but that I (Emmanuel) don't - # really understand... - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sslSocket = SSL.Connection(context, sock) - sslSocket.connect((self.host, self.port)) - self.sock = httplib.FakeSocket(sslSocket, sslSocket) - - def verifyCallback(self, connection, x509Object, errorNumber, errorDepth, returnCode): - logger.debug('http.HttpsConnection(%s, %s, %s, %s, %s, %s)' % ( - self, connection, x509Object, errorNumber, errorDepth, returnCode)) - # FIXME: What should be done? - return returnCode - - -class HttpsRequestHandler(HttpRequestHandlerMixin, BaseHTTPSRequestHandler): - scheme = 'https' - - -## # We use ForkingMixIn instead of ThreadingMixIn because the Python binding for -## # libxml2 limits the number of registered xpath functions to 10. Even if we use -## # only one xpathContext, this would limit the number of threads to 10, wich is -## # not enough for a web server. - - -## class HttpServer(SocketServer.ForkingMixIn, BaseHTTPServer.HTTPServer): -## pass - - -## class HttpsServer(SocketServer.ForkingMixIn, BaseHTTPSServer): -## pass - - -# No fork nor thread. - -class HttpServer(BaseHTTPServer.HTTPServer): - pass - - -class HttpsServer(BaseHTTPSServer): - pass - diff --git a/python/tests/liberty.py b/python/tests/liberty.py deleted file mode 100644 index 02125d39..00000000 --- a/python/tests/liberty.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -from LibertyEnabledClientProxy import LibertyEnabledClientProxyMixin -from LibertyEnabledProxy import LibertyEnabledProxyMixin -from IdentityProvider import IdentityProviderMixin -from ServiceProvider import ServiceProviderMixin -from Provider import ProviderMixin -import web - - -class LibertyEnabledClientProxy(LibertyEnabledClientProxyMixin, web.WebClient): - def __init__(self): - web.WebClient.__init__(self) - LibertyEnabledClientProxyMixin.__init__(self) - - -class LibertyEnabledProxy(LibertyEnabledProxyMixin, web.WebSite): - def __init__(self, url): - web.WebSite.__init__(self, url) - LibertyEnabledProxyMixin.__init__(self) - - -class Provider(ProviderMixin, web.WebSite): - def __init__(self, url): - web.WebSite.__init__(self, url) - ProviderMixin.__init__(self) - - -class IdentityProvider(IdentityProviderMixin, Provider): - def __init__(self, url): - Provider.__init__(self, url) - IdentityProviderMixin.__init__(self) - - -class ServiceProvider(ServiceProviderMixin, Provider): - def __init__(self, url): - Provider.__init__(self, url) - ServiceProviderMixin.__init__(self) diff --git a/python/tests/libertysimulator.py b/python/tests/libertysimulator.py deleted file mode 100644 index a7751cdf..00000000 --- a/python/tests/libertysimulator.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -from LibertyEnabledClientProxy import LibertyEnabledClientProxyMixin -from LibertyEnabledProxy import LibertyEnabledProxyMixin -from IdentityProvider import IdentityProviderMixin -from ServiceProvider import ServiceProviderMixin -from Provider import ProviderMixin -import websimulator - - -class LibertyEnabledClientProxy(LibertyEnabledClientProxyMixin, websimulator.WebClient): - def __init__(self, internet): - websimulator.WebClient.__init__(self, internet) - LibertyEnabledClientProxyMixin.__init__(self) - - -class LibertyEnabledProxy(LibertyEnabledProxyMixin, websimulator.WebSite): - def __init__(self, internet, url): - websimulator.WebSite.__init__(self, internet, url) - LibertyEnabledProxyMixin.__init__(self) - - -class Provider(ProviderMixin, websimulator.WebSite): - def __init__(self, internet, url): - websimulator.WebSite.__init__(self, internet, url) - ProviderMixin.__init__(self) - - -class IdentityProvider(IdentityProviderMixin, Provider): - def __init__(self, internet, url): - Provider.__init__(self, internet, url) - IdentityProviderMixin.__init__(self) - - -class ServiceProvider(ServiceProviderMixin, Provider): - def __init__(self, internet, url): - Provider.__init__(self, internet, url) - ServiceProviderMixin.__init__(self) diff --git a/python/tests/login_tests.py b/python/tests/login_tests.py index b16389dc..6c93fb99 100644 --- a/python/tests/login_tests.py +++ b/python/tests/login_tests.py @@ -34,50 +34,14 @@ if not '../.libs' in sys.path: import lasso -import builtins -from libertysimulator import * -from websimulator import * - class LoginTestCase(unittest.TestCase): - def generateIdpSite(self, internet): - site = IdentityProvider(internet, 'https://idp1') - site.providerId = 'https://idp1/metadata' + pass - lassoServer = lasso.Server( - '../../tests/data/idp1-la/metadata.xml', - None, # '../../tests/data/idp1-la/public-key.pem' is no more used - '../../tests/data/idp1-la/private-key-raw.pem', - '../../tests/data/idp1-la/certificate.pem', - lasso.signatureMethodRsaSha1) - lassoServer.add_provider( - '../../tests/data/sp1-la/metadata.xml', - '../../tests/data/sp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - site.lassoServerDump = lassoServer.dump() - failUnless(site.lassoServerDump) - - site.newUser('Chantereau') - site.newUser('Clapies') - site.newUser('Febvre') - site.newUser('Nowicki') - # Frederic Peters has no account on identity provider. - return site - def generateLibertyEnabledClientProxy(self, internet): - clientProxy = LibertyEnabledClientProxy(internet) - lassoServer = lasso.Server() - lassoServer.add_provider( - '../../tests/data/idp1-la/metadata.xml', - '../../tests/data/idp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - clientProxy.lassoServerDump = lassoServer.dump() - failUnless(clientProxy.lassoServerDump) - return clientProxy - - def generateSpSite(self, internet): - site = ServiceProvider(internet, 'https://sp1') - site.providerId = 'https://service-provider/metadata' +class LogoutTestCase(unittest.TestCase): + def test01(self): + """SP logout without session and indentity; testing init_request.""" lassoServer = lasso.Server( '../../tests/data/sp1-la/metadata.xml', @@ -89,203 +53,36 @@ class LoginTestCase(unittest.TestCase): '../../tests/data/idp1-la/metadata.xml', '../../tests/data/idp1-la/public-key.pem', '../../tests/data/ca1-la/certificate.pem') - site.lassoServerDump = lassoServer.dump() - failUnless(site.lassoServerDump) - - site.newUser('Nicolas') - site.newUser('Romain') - site.newUser('Valery') - # Christophe Nowicki has no account on service provider. - site.newUser('Frederic') - return site - - def setUp(self): - for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless', - 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'): - builtins.set(name, getattr(self, name)) - - def tearDown(self): - for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless', - 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'): - builtins.delete(name) - - def test01(self): - """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Romain Chantereau') - principal.keyring[idpSite.url] = 'Chantereau' - principal.keyring[spSite.url] = 'Romain' - - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 200) - failIf(spSite.sessions) - failIf(idpSite.sessions) - - def test01_withRelayState(self): - """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP. Checking RelayState.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Romain Chantereau') - principal.keyring[idpSite.url] = 'Chantereau' - principal.keyring[spSite.url] = 'Romain' - - httpResponse = principal.sendHttpRequestToSite( - spSite, 'GET', '/login?RelayState=a_sample_relay_state') - failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 200) - failIf(spSite.sessions) - failIf(idpSite.sessions) + logout = lasso.Logout(lassoServer, lasso.providerTypeSp) + try: + logout.init_request() + except lasso.Error, error: + if error.code != -1: + raise + else: + self.fail('logout.init_request without having set identity before should fail') def test02(self): - """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP. Done three times.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Romain Chantereau') - principal.keyring[idpSite.url] = 'Chantereau' - principal.keyring[spSite.url] = 'Romain' - - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 200) - - # Once again. Now the principal already has a federation between spSite and idpSite. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 200) + """IDP logout without session and identity; testing logout.get_next_providerID.""" - # Once again. Do a new passive login between normal login and logout. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - failUnlessEqual(httpResponse.statusCode, 200) - del principal.keyring[idpSite.url] # Ensure identity provider will be really passive. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') - failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 200) - - # Once again, with isPassive and the user having no web session. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') - failUnlessEqual(httpResponse.statusCode, 401) - - def test03(self): - """Service provider initiated login using HTTP redirect, but user fail to authenticate himself on identity provider. Then logout, with same problem.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Frederic Peters') - # Frederic Peters has no account on identity provider. - principal.keyring[spSite.url] = 'Frederic' - - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 401) - - def test04(self): - """Service provider initiated login using HTTP redirect, but user has no account on service - provider and doesn't create one.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Christophe Nowicki') - principal.keyring[idpSite.url] = 'Nowicki' - # Christophe Nowicki has no account on service provider. - - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 401) - - def test05(self): - """Service provider initiated login using HTTP redirect with isPassive for a user without federation yet.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Romain Chantereau') - principal.keyring[idpSite.url] = 'Chantereau' - principal.keyring[spSite.url] = 'Romain' - - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') - failUnlessEqual(httpResponse.statusCode, 401) - - def test06(self): - """Testing forceAuthn flag.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Romain Chantereau') - principal.keyring[idpSite.url] = 'Chantereau' - principal.keyring[spSite.url] = 'Romain' - - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 200) - - # Ask user to reauthenticate while he is already logged. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - failUnlessEqual(httpResponse.statusCode, 200) - del principal.keyring[idpSite.url] # Ensure user can't authenticate. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 200) - - # Force authentication, but user won't authenticate. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') - failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout') - failUnlessEqual(httpResponse.statusCode, 401) - - def test07(self): - """LECP login.""" - - internet = Internet() - idpSite = self.generateIdpSite(internet) - spSite = self.generateSpSite(internet) - spSite.idpSite = idpSite - principal = Principal(internet, 'Romain Chantereau') - principal.keyring[idpSite.url] = 'Chantereau' - principal.keyring[spSite.url] = 'Romain' - lecp = self.generateLibertyEnabledClientProxy(internet) - lecp.idpSite = idpSite - - # Try LECP, but the principal is not authenticated on idp1. So, LECP must fail. - httpResponse = lecp.login(principal, spSite, '/login') - failUnlessEqual(httpResponse.statusCode, 401) - - # Now authenticate principal, before testing LECP. So, LECP must succeed. - httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') - failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = lecp.login(principal, spSite, '/login') - failUnlessEqual(httpResponse.statusCode, 200) + lassoServer = lasso.Server( + '../../tests/data/idp1-la/metadata.xml', + None, # '../../tests/data/idp1-la/public-key.pem' is no more used + '../../tests/data/idp1-la/private-key-raw.pem', + '../../tests/data/idp1-la/certificate.pem', + lasso.signatureMethodRsaSha1) + lassoServer.add_provider( + '../../tests/data/sp1-la/metadata.xml', + '../../tests/data/sp1-la/public-key.pem', + '../../tests/data/ca1-la/certificate.pem') + logout = lasso.Logout(lassoServer, lasso.providerTypeIdp) + self.failIf(logout.get_next_providerID()) suite1 = unittest.makeSuite(LoginTestCase, 'test') +suite2 = unittest.makeSuite(LogoutTestCase, 'test') -allTests = unittest.TestSuite((suite1,)) +allTests = unittest.TestSuite((suite1, suite2)) if __name__ == '__main__': sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful()) diff --git a/python/tests/sample-idp.py b/python/tests/sample-idp.py deleted file mode 100755 index 046aa436..00000000 --- a/python/tests/sample-idp.py +++ /dev/null @@ -1,150 +0,0 @@ -#! /usr/bin/env python -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import logging -from optparse import OptionParser -import sys - -if not '..' in sys.path: - sys.path.insert(0, '..') -if not '../.libs' in sys.path: - sys.path.insert(0, '../.libs') - -import lasso - -import assertions -import builtins -import http -import liberty - - -applicationCamelCaseName = 'LassoSimulator' -applicationPublicName = 'Lasso Simulator' -applicationVersion = '(Unreleased CVS Version)' -logger = None - - -class HttpRequestHandlerMixin: - realm = '%s Web Site' % applicationPublicName - server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion) - - def version_string(self): - return '%s %s' % (applicationPublicName, applicationVersion) - - -class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler): - pass - - -class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler): - pass - - -def main(): - # Parse command line options. - parser = OptionParser(version = '%%prog %s' % applicationVersion) - parser.add_option( - '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath', - help = 'specify an alternate configuration file', - default = '/etc/lasso-simulator/config.xml') - parser.add_option( - '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background', - action = 'store_true', default = False) - parser.add_option( - '-D', '--debug', dest = 'debugMode', help = 'enable program debugging', - action = 'store_true', default = False) - parser.add_option( - '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file', - default = '/dev/null') - parser.add_option( - '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel', - help = 'specify log level (debug, info, warning, error, critical)', default = 'info') - (options, args) = parser.parse_args() - if options.logLevel.upper() not in logging._levelNames: - raise Exception('Unknown log level: "%s"' % options.logLevel) - - # Configure logger. - logger = logging.getLogger() - if options.debugMode and not options.daemonMode: - handler = logging.StreamHandler(sys.stderr) - else: - handler = logging.FileHandler(options.logFilePath) - formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging._levelNames[options.logLevel.upper()]) - builtins.set('logger', logger) - - site = liberty.IdentityProvider('https://idp1:1998/') - site.providerId = 'https://idp1/metadata' - - lassoServer = lasso.Server.new( - '../../tests/data/idp1-la/metadata.xml', - None, # '../../tests/data/idp1-la/public-key.pem' is no more used - '../../tests/data/idp1-la/private-key-raw.pem', - '../../tests/data/idp1-la/certificate.pem', - lasso.signatureMethodRsaSha1) - lassoServer.add_provider( - '../../tests/data/sp1-la/metadata.xml', - '../../tests/data/sp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - lassoServer.add_provider( - '../../tests/data/lecp1-la/metadata.xml', - '../../tests/data/lecp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - site.lassoServerDump = lassoServer.dump() - failUnless(site.lassoServerDump) - lassoServer.destroy() - - site.certificateAbsolutePath = '../../tests/data/idp1-ssl/certificate.pem' - site.privateKeyAbsolutePath = '../../tests/data/idp1-ssl/private-key-raw.pem' - site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem' - - site.newUser('Chantereau') - site.newUser('Clapies') - site.newUser('Febvre') - site.newUser('Nowicki') - # Frederic Peters has no account on identity provider. - - HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host. -## httpServer = http.HttpServer(('idp1', 1997), HttpRequestHandler) -## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname()) - httpServer = http.HttpsServer( - ('idp1', 1998), - HttpsRequestHandler, - site.privateKeyAbsolutePath, # Server private key - site.certificateAbsolutePath, # Server certificate - site.peerCaCertificateAbsolutePath, # Clients certification authority certificate - None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852 - None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13 - ) - logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname()) - try: - httpServer.serve_forever() - except KeyboardInterrupt: - pass - -if __name__ == '__main__': - main() diff --git a/python/tests/sample-lep.py b/python/tests/sample-lep.py deleted file mode 100755 index 89b8df27..00000000 --- a/python/tests/sample-lep.py +++ /dev/null @@ -1,152 +0,0 @@ -#! /usr/bin/env python -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import logging -from optparse import OptionParser -import sys - -if not '..' in sys.path: - sys.path.insert(0, '..') -if not '../.libs' in sys.path: - sys.path.insert(0, '../.libs') - -import lasso - -import assertions -import builtins -import http -import liberty - - -applicationCamelCaseName = 'LassoSimulator' -applicationPublicName = 'Lasso Simulator' -applicationVersion = '(Unreleased CVS Version)' -logger = None - - -class HttpRequestHandlerMixin: - realm = '%s Web Site' % applicationPublicName - server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion) - - def version_string(self): - return '%s %s' % (applicationPublicName, applicationVersion) - - -class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler): - pass - - -class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler): - pass - - -def main(): - # Parse command line options. - parser = OptionParser(version = '%%prog %s' % applicationVersion) - parser.add_option( - '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath', - help = 'specify an alternate configuration file', - default = '/etc/lasso-simulator/config.xml') - parser.add_option( - '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background', - action = 'store_true', default = False) - parser.add_option( - '-D', '--debug', dest = 'debugMode', help = 'enable program debugging', - action = 'store_true', default = False) - parser.add_option( - '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file', - default = '/dev/null') - parser.add_option( - '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel', - help = 'specify log level (debug, info, warning, error, critical)', default = 'info') - (options, args) = parser.parse_args() - if options.logLevel.upper() not in logging._levelNames: - raise Exception('Unknown log level: "%s"' % options.logLevel) - - # Configure logger. - logger = logging.getLogger() - if options.debugMode and not options.daemonMode: - handler = logging.StreamHandler(sys.stderr) - else: - handler = logging.FileHandler(options.logFilePath) - formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging._levelNames[options.logLevel.upper()]) - builtins.set('logger', logger) - - site = liberty.LibertyEnabledProxy('https://lecp1:2014/') - site.providerId = 'https://lecp1/metadata' - site.idpSite = liberty.IdentityProvider('https://idp1:1998/') - site.idpSite.providerId = 'https://idp1/metadata' - - lassoServer = lasso.Server.new( - '../../tests/data/lecp1-la/metadata.xml', - None, # '../../tests/data/lecp1-la/public-key.pem' is no more used - '../../tests/data/lecp1-la/private-key-raw.pem', - '../../tests/data/lecp1-la/certificate.pem', - lasso.signatureMethodRsaSha1) - lassoServer.add_provider( - '../../tests/data/idp1-la/metadata.xml', - '../../tests/data/idp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - lassoServer.add_provider( - '../../tests/data/sp1-la/metadata.xml', - '../../tests/data/sp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - site.lassoServerDump = lassoServer.dump() - failUnless(site.lassoServerDump) - lassoServer.destroy() - - site.certificateAbsolutePath = '../../tests/data/lecp1-ssl/certificate.pem' - site.privateKeyAbsolutePath = '../../tests/data/lecp1-ssl/private-key-raw.pem' - site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem' - - site.newUser('rc') - site.newUser('nc') - # site.newUser('vf') Valery Febvre has no account on liberty-enabled proxy. - site.newUser('cn') - site.newUser('fp') - - HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host. -## httpServer = http.HttpServer(('lecp1', 2013), HttpRequestHandler) -## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname()) - httpServer = http.HttpsServer( - ('lecp1', 2014), - HttpsRequestHandler, - site.privateKeyAbsolutePath, # Server private key - site.certificateAbsolutePath, # Server certificate - site.peerCaCertificateAbsolutePath, # Clients certification authority certificate - None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852 - None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13 - ) - logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname()) - try: - httpServer.serve_forever() - except KeyboardInterrupt: - pass - -if __name__ == '__main__': - main() diff --git a/python/tests/sample-sp-lep.py b/python/tests/sample-sp-lep.py deleted file mode 100755 index eabef144..00000000 --- a/python/tests/sample-sp-lep.py +++ /dev/null @@ -1,147 +0,0 @@ -#! /usr/bin/env python -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import logging -from optparse import OptionParser -import sys - -if not '..' in sys.path: - sys.path.insert(0, '..') -if not '../.libs' in sys.path: - sys.path.insert(0, '../.libs') - -import lasso - -import assertions -import builtins -import http -import liberty - -applicationCamelCaseName = 'LassoSimulator' -applicationPublicName = 'Lasso Simulator' -applicationVersion = '(Unreleased CVS Version)' -logger = None - - -class HttpRequestHandlerMixin: - realm = '%s Web Site' % applicationPublicName - server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion) - - def version_string(self): - return '%s %s' % (applicationPublicName, applicationVersion) - - -class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler): - pass - - -class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler): - pass - - -def main(): - # Parse command line options. - parser = OptionParser(version = '%%prog %s' % applicationVersion) - parser.add_option( - '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath', - help = 'specify an alternate configuration file', - default = '/etc/lasso-simulator/config.xml') - parser.add_option( - '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background', - action = 'store_true', default = False) - parser.add_option( - '-D', '--debug', dest = 'debugMode', help = 'enable program debugging', - action = 'store_true', default = False) - parser.add_option( - '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file', - default = '/dev/null') - parser.add_option( - '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel', - help = 'specify log level (debug, info, warning, error, critical)', default = 'info') - (options, args) = parser.parse_args() - if options.logLevel.upper() not in logging._levelNames: - raise Exception('Unknown log level: "%s"' % options.logLevel) - - # Configure logger. - logger = logging.getLogger() - if options.debugMode and not options.daemonMode: - handler = logging.StreamHandler(sys.stderr) - else: - handler = logging.FileHandler(options.logFilePath) - formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging._levelNames[options.logLevel.upper()]) - builtins.set('logger', logger) - - site = liberty.ServiceProvider('https://sp1:2006/') - site.providerId = 'https://sp1/metadata' - site.idpSite = liberty.IdentityProvider('https://lecp1:2014/') - site.idpSite.providerId = 'https://lecp1/metadata' - - lassoServer = lasso.Server.new( - '../../tests/data/sp1-la/metadata.xml', - None, # '../../tests/data/sp1-la/public-key.pem' is no more used - '../../tests/data/sp1-la/private-key-raw.pem', - '../../tests/data/sp1-la/certificate.pem', - lasso.signatureMethodRsaSha1) - lassoServer.add_provider( - '../../tests/data/lecp1-la/metadata.xml', - '../../tests/data/lecp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - site.lassoServerDump = lassoServer.dump() - failUnless(site.lassoServerDump) - lassoServer.destroy() - - site.certificateAbsolutePath = '../../tests/data/sp1-ssl/certificate.pem' - site.privateKeyAbsolutePath = '../../tests/data/sp1-ssl/private-key-raw.pem' - site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem' - - site.newUser('Nicolas') - site.newUser('Romain') - site.newUser('Valery') - # Christophe Nowicki has no account on service provider. - site.newUser('Frederic') - - HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host. -## httpServer = http.HttpServer(('sp1', 2005), HttpRequestHandler) -## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname()) - httpServer = http.HttpsServer( - ('sp1', 2006), - HttpsRequestHandler, - site.privateKeyAbsolutePath, # Server private key - site.certificateAbsolutePath, # Server certificate - site.peerCaCertificateAbsolutePath, # Clients certification authority certificate - None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852 - None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13 - ) - logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname()) - try: - httpServer.serve_forever() - except KeyboardInterrupt: - pass - -if __name__ == '__main__': - main() diff --git a/python/tests/sample-sp.py b/python/tests/sample-sp.py deleted file mode 100755 index 8e9c0329..00000000 --- a/python/tests/sample-sp.py +++ /dev/null @@ -1,147 +0,0 @@ -#! /usr/bin/env python -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import logging -from optparse import OptionParser -import sys - -if not '..' in sys.path: - sys.path.insert(0, '..') -if not '../.libs' in sys.path: - sys.path.insert(0, '../.libs') - -import lasso - -import assertions -import builtins -import http -import liberty - -applicationCamelCaseName = 'LassoSimulator' -applicationPublicName = 'Lasso Simulator' -applicationVersion = '(Unreleased CVS Version)' -logger = None - - -class HttpRequestHandlerMixin: - realm = '%s Web Site' % applicationPublicName - server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion) - - def version_string(self): - return '%s %s' % (applicationPublicName, applicationVersion) - - -class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler): - pass - - -class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler): - pass - - -def main(): - # Parse command line options. - parser = OptionParser(version = '%%prog %s' % applicationVersion) - parser.add_option( - '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath', - help = 'specify an alternate configuration file', - default = '/etc/lasso-simulator/config.xml') - parser.add_option( - '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background', - action = 'store_true', default = False) - parser.add_option( - '-D', '--debug', dest = 'debugMode', help = 'enable program debugging', - action = 'store_true', default = False) - parser.add_option( - '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file', - default = '/dev/null') - parser.add_option( - '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel', - help = 'specify log level (debug, info, warning, error, critical)', default = 'info') - (options, args) = parser.parse_args() - if options.logLevel.upper() not in logging._levelNames: - raise Exception('Unknown log level: "%s"' % options.logLevel) - - # Configure logger. - logger = logging.getLogger() - if options.debugMode and not options.daemonMode: - handler = logging.StreamHandler(sys.stderr) - else: - handler = logging.FileHandler(options.logFilePath) - formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging._levelNames[options.logLevel.upper()]) - builtins.set('logger', logger) - - site = liberty.ServiceProvider('https://sp1:2006/') - site.providerId = 'https://sp1/metadata' - site.idpSite = liberty.IdentityProvider('https://idp1:1998/') - site.idpSite.providerId = 'https://idp1/metadata' - - lassoServer = lasso.Server.new( - '../../tests/data/sp1-la/metadata.xml', - None, # '../../tests/data/sp1-la/public-key.pem' is no more used - '../../tests/data/sp1-la/private-key-raw.pem', - '../../tests/data/sp1-la/certificate.pem', - lasso.signatureMethodRsaSha1) - lassoServer.add_provider( - '../../tests/data/idp1-la/metadata.xml', - '../../tests/data/idp1-la/public-key.pem', - '../../tests/data/ca1-la/certificate.pem') - site.lassoServerDump = lassoServer.dump() - failUnless(site.lassoServerDump) - lassoServer.destroy() - - site.certificateAbsolutePath = '../../tests/data/sp1-ssl/certificate.pem' - site.privateKeyAbsolutePath = '../../tests/data/sp1-ssl/private-key-raw.pem' - site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem' - - site.newUser('Nicolas') - site.newUser('Romain') - site.newUser('Valery') - # Christophe Nowicki has no account on service provider. - site.newUser('Frederic') - - HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host. -## httpServer = http.HttpServer(('sp1', 2005), HttpRequestHandler) -## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname()) - httpServer = http.HttpsServer( - ('sp1', 2006), - HttpsRequestHandler, - site.privateKeyAbsolutePath, # Server private key - site.certificateAbsolutePath, # Server certificate - site.peerCaCertificateAbsolutePath, # Clients certification authority certificate - None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852 - None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13 - ) - logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname()) - try: - httpServer.serve_forever() - except KeyboardInterrupt: - pass - -if __name__ == '__main__': - main() diff --git a/python/tests/submissions.py b/python/tests/submissions.py deleted file mode 100644 index a9f9504c..00000000 --- a/python/tests/submissions.py +++ /dev/null @@ -1,292 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# HTTP Client and Server Enhanced Classes -# By: Frederic Peters <fpeters@entrouvert.com> -# Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://www.entrouvert.org -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - - -"""Wrapper for HTML form submissions, simulating Web Forms 2 behaviour - -See http://whatwg.org/specs/web-forms/2004-06-27-call-for-comments/#x-www-form-xml -""" - - -import cgi - - -class AbstractSubmission(object): - httpRequestHandler = None - length = None - mimeType = None - - def __init__(self, httpRequestHandler, contentLength): - assert httpRequestHandler - self.httpRequestHandler = httpRequestHandler - assert isinstance(contentLength, int) - self.length = contentLength - - def getField(self, name, index = 0, default = None): - # Return either a string or a sequence of strings. - fieldList = self.getFieldList(name, index) - if not fieldList: - return default - elif len(fieldList) == 1: - return fieldList[0] - else: - return fieldList - - def getFieldList(self, name, index = 0): - # Return a sequence of strings. - raise NotImplementedError - - def getFile(self, name, index = 0, default = None): - # Return either an instance of FileUpload or a sequence of FileUpload instances. - fileList = self.getFileList(name, index) - if not fileList: - return default - elif len(fileList) == 1: - return fileList[0] - else: - return fileList - - def getFileList(self, name, index = 0): - # Return a sequence of FileUpload instances. - raise NotImplementedError - -## def getRepeat(self, template): -## raise NotImplementedError - - def hasField(self, name, index = 0): - raise NotImplementedError - - def hasFile(self, name, index = 0): - raise NotImplementedError - - def readFile(self): - raise NotImplementedError - - -class FakeSubmission(AbstractSubmission): - _fields = None - - def __init__(self, fields = None, query = None): - self._fields = {} - if fields: - for name, value in fields.items(): - self._fields[name] = [value] - if query: - for name, value in cgi.parse_qsl(query, keep_blank_values = True): - if name in self._fields: - self._fields[name].append(value) - else: - self._fields[name] = [value] - - def getFieldList(self, name, index = 0): - if index == 0 and name in self._fields: - return self._fields[name] - return [] - - def getFileList(self, name, index = 0): - return [] - - def hasField(self, name, index = 0): - return index == 0 and name in self._fields - - def hasFile(self, name, index = 0): - return False - - def readFile(self): - return None - - -class FieldStorageSubmission(AbstractSubmission): - """Submission wrapper for all encoding types handled by module 'cgi': - 'application/x-www-form-urlencoded', 'multipart/form-data'... - - This submission method discards the control index and repetition block parts of the form data - set. So, for these encoding types, control index is always 0. - """ - - fieldStorage = None - - def __init__(self, httpRequestHandler, contentType, contentLength, contentTypeHeader): - super(FieldStorageSubmission, self).__init__(httpRequestHandler, contentLength) - assert contentType - self.mimeType = contentType - # The use of environ seems to be required by cgi.FieldStorage. - # It also needs to add "content-type" in headers. - fakeHeaders = {} - for key, value in httpRequestHandler.headers.items(): - fakeHeaders[key] = value - environ = { - "CONTENT_TYPE": contentTypeHeader, - "REQUEST_METHOD": httpRequestHandler.command, - } - if not "content-type" in fakeHeaders: - fakeHeaders["content-type"] = environ["CONTENT_TYPE"] - if contentLength: - environ["CONTENT_LENGTH"] = str(contentLength) - splitedPath = httpRequestHandler.path.split("?") - if len(splitedPath) >= 2: - httpQuery = splitedPath[1] - if httpQuery: - environ["QUERY_STRING"] = httpQuery - self.fieldStorage = cgi.FieldStorage( - environ = environ, - fp = httpRequestHandler.rfile, - headers = fakeHeaders, - keep_blank_values = True) - - def getFieldList(self, name, index = 0): - if index > 0: - return [] - return [item.value - for item in self.fieldStorage.list - if item.name == name and item.filename is None] - - def getFileList(self, name, index = 0): - if index > 0: - return [] - return [FileUpload(item.filename, item.type, item.file) - for item in self.fieldStorage.list - if item.name == name and item.filename is not None] - - def hasField(self, name, index = 0): - if index == 0: - for item in self.fieldStorage.list: - if item.name == name and item.filename is None: - return True - return False - - def hasFile(self, name, index = 0): - if index == 0: - for item in self.fieldStorage.list: - if item.name == name and item.filename is not None: - return True - return False - - def readFile(self): - return None - - -class FileUpload(object): - file = None - filename = None # Optional - mimeType = None # Optional: MIME type with optional parameters. - - def __init__(self, filename, mimeType, file): - if filename is not None: - self.filename = filename - if mimeType is not None: - self.mimeType = mimeType - assert file is not None - self.file = file - - -class FileUploadSubmission(AbstractSubmission): - """Submission for exactly one file - - If the enctype attribute is not specified in the form (or is set to the empty string), and the - form consists of exactly one file upload control with exactly one file selected, then the user - agent use this submission method. - Also used for HTTP PUT... - - Note: FileUploadSubmission contains all the FileUpload interface, so that it can be used as a - FileUpload. - """ - - file = None - filename = None # Always None - - def __init__(self, httpRequestHandler, contentType, contentLength): - super(FileUploadSubmission, self).__init__(httpRequestHandler, contentLength) - assert contentType - self.mimeType = contentType - self.file = httpRequestHandler.rfile - - def getFieldList(self, name, index = 0): - return [] - - def getFileList(self, name, index = 0): - return [] - - def hasField(self, name, index = 0): - return False - - def hasFile(self, name, index = 0): - return False - - def readFile(self): - if self.length == 0: - return None - return self.file.read(self.length) - - -class XmlFormSubmission(AbstractSubmission): - """Submission for encoding type 'application/x-www-form+xml'""" - - file = None - mimeType = "application/x-www-form+xml" - - def __init__(self, httpRequestHandler, contentType, contentLength): - super(XmlFormSubmission, self).__init__(httpRequestHandler, contentLength) - assert contentType == self.mimeType - self.file = httpRequestHandler.rfile - - def getFieldList(self, name, index = 0): - raise NotImplementedError - - def getFileList(self, name, index = 0): - return NotImplementedError - - def hasField(self, name, index = 0): - raise NotImplementedError - - def hasFile(self, name, index = 0): - raise NotImplementedError - - def readFile(self): - return None - - -def readSubmission(httpRequestHandler): - # Get query, headers and form variables. - if httpRequestHandler.headers.typeheader is None: - if httpRequestHandler.command in ("GET", "HEAD", "POST"): - contentTypeHeader = "application/x-www-form-urlencoded" - else: - contentTypeHeader = httpRequestHandler.headers.type - else: - contentTypeHeader = httpRequestHandler.headers.typeheader - contentType, contentTypeOptions = cgi.parse_header(contentTypeHeader) - contentLength = httpRequestHandler.headers.get("content-length") - try: - contentLength = int(contentLength) - except (TypeError, ValueError): - contentLength = 0 - if contentType == "application/x-www-form+xml": - submission = XmlFormSubmission(httpRequestHandler, contentType, contentLength) - elif contentType in ("application/x-www-form-urlencoded", "multipart/form-data"): - submission = FieldStorageSubmission( - httpRequestHandler, contentType, contentLength, contentTypeHeader) - else: - submission = FileUploadSubmission(httpRequestHandler, contentType, contentLength) - return submission diff --git a/python/tests/web.py b/python/tests/web.py deleted file mode 100644 index 55011a28..00000000 --- a/python/tests/web.py +++ /dev/null @@ -1,159 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# HTTP Client and Server Enhanced Classes -# By: Frederic Peters <fpeters@entrouvert.com> -# Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://www.entrouvert.org -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - - -"""HTTP client and server enhanced classes - -Features: -- HTTPS using OpenSSL; -- web sessions (with or without cookie); -- user authentication (support of basic HTTP-authentication, X.509v3 certificate authentication, - HTML based authentication, etc). -""" - - -import urlparse - -from OpenSSL import SSL - -import abstractweb -import http - - -class ReceivedHttpResponse(object): - body = None - headers = None - statusCode = None # 200 or... - statusMessage = None - - def __init__(self, statusCode = 200, statusMessage = None, headers = None, body = None): - if statusCode: - self.statusCode = statusCode - if statusMessage: - self.statusMessage = statusMessage - if headers: - self.headers = headers - if body: - self.body = body - - -class WebClient(abstractweb.WebClientMixin, object): - certificateAbsolutePath = None - privateKeyAbsolutePath = None - peerCaCertificateAbsolutePath = None - - def sendHttpRequest(self, method, url, headers = None, body = None): - parsedUrl = urlparse.urlparse(url) - addressingScheme, hostName, path = parsedUrl[:3] - if addressingScheme == 'https': - connection = http.HttpsConnection( - hostName, None, self.privateKeyAbsolutePath, self.certificateAbsolutePath, - self.peerCaCertificateAbsolutePath) - else: - connection = httplib.HTTPConnection(hostName) - if headers: - httpRequestHeaders = self.httpRequestHeaders.copy() - for name, value in headers.iteritems(): - httpRequestHeaders[name] = value - else: - httpRequestHeaders = self.httpRequestHeaders - failUnless('Content-Type' in httpRequestHeaders) - try: - connection.request('POST', path, body, httpRequestHeaders) - except SSL.Error, error: - if error.args and error.args[0] and error.args[0][0] \ - and error.args[0][0][0] == 'SSL routines': - logger.debug('SSL Error in sendHttpRequest. Error = %s' % repr(error)) - raise - response = connection.getresponse() - try: - body = response.read() - except SSL.SysCallError, error: - logger.debug('No SOAP answer in sendHttpRequest. Error = %s' % repr(error)) - raise - httpResponse = ReceivedHttpResponse(response.status, response.reason, response.msg, body) - return httpResponse - - -class WebSession(abstractweb.WebSessionMixin, object): - """Simulation of session of a web site""" - - expirationTime = None # A sample session variable - lassoLoginDump = None # Used only by some identity providers - lassoSessionDump = None - publishToken = False - - -class WebUser(abstractweb.WebUserMixin, object): - """Simulation of user of a web site""" - - lassoIdentityDump = None - language = 'fr' # A sample user variable - password = None - - -class WebSite(abstractweb.WebSiteMixin, WebClient): - url = None # The main URL of web site - WebSession = WebSession - WebUser = WebUser - - def __init__(self, url): - WebClient.__init__(self) - abstractweb.WebSiteMixin.__init__(self) - self.url = url - - def authenticateLoginPasswordUser(self, login, password): - # We should check login & password and return the user if one matches or None otherwise. - # FIXME: Check password also. - return self.users.get(login) - - def login_local(self, handler): - user = handler.user - if user is None: - failUnless(handler.useHttpAuthentication) - return handler.outputErrorUnauthorized(handler.httpRequest.path) - else: - # The user is already authenticated using HTTP authentication. - userAuthenticated = True - - # authenticationMethod = lasso.samlAuthenticationMethodPassword - authenticationMethod = 'urn:oasis:names:tc:SAML:1.0:am:password' - if userAuthenticated: - session = handler.session - if session is None: - session = handler.createSession() - # No need to publish token, because we are using HTTP authentication. - if session.publishToken: - del session.publishToken - user = handler.user - if user is None: - user = handler.createUser() - session.userId = user.uniqueId - user.sessionToken = session.token - return self.login_done(handler, userAuthenticated, authenticationMethod) - - def login_failed(self, handler): - if handler.useHttpAuthentication: - handler.useHttpAuthentication = 'not this time' - return handler.respond(401, 'Access Unauthorized: User has no account.') diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py deleted file mode 100644 index 2d6738fa..00000000 --- a/python/tests/websimulator.py +++ /dev/null @@ -1,244 +0,0 @@ -# -*- coding: UTF-8 -*- - - -# Lasso Simulator -# By: Emmanuel Raviart <eraviart@entrouvert.com> -# -# Copyright (C) 2004 Entr'ouvert -# http://lasso.entrouvert.org -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -import abstractweb - - -class HttpRequest(abstractweb.HttpRequestMixin, object): - client = None # Principal or web site sending the request. - form = None - - def __init__(self, client, method, url, headers = None, body = None, form = None): - self.client = client - self.method = method - self.url = url - if headers: - self.headers = headers - if body: - self.body = body - if form: - self.form = form - - def getFormField(self, name, default = None): - if self.form is not None: - return self.form.get(name, default) - return default - - def getPath(self): - return self.pathAndQuery.split('?', 1)[0] - - def getPathAndQuery(self): - urlWithoutScheme = self.url[self.url.find('://') + 3:] - if '/' in urlWithoutScheme: - pathAndQuery = urlWithoutScheme[urlWithoutScheme.find('/'):] - else: - pathAndQuery = '' - return pathAndQuery - - def getQuery(self): - splitedUrl = self.pathAndQuery.split('?', 1) - if len(splitedUrl) > 1: - return splitedUrl[1] - else: - return '' - - def getScheme(self): - return self.url.split(':', 1)[0].lower() - - def hasFormField(self, name): - if self.form is None: - return False - return name in self.form - - def send(self): - webSite = self.client.internet.getWebSite(self.url) - return webSite.handleHttpRequest(self) - - path = property(getPath) - pathAndQuery = property(getPathAndQuery) - query = property(getQuery) - scheme = property(getScheme) - - -class FunctionHttpRequest(abstractweb.FunctionHttpRequest): - def getClient(self): - return self.previousHttpRequest.client - - client = property(getClient) - - -class HttpResponse(abstractweb.HttpResponseMixin, object): - def send(self, httpRequestHandler): - return self - - -class HttpRequestHandler(abstractweb.HttpRequestHandlerMixin, object): - HttpResponse = HttpResponse # Class - - def __init__(self, site, httpRequest): - self.site = site - self.httpRequest = httpRequest - - def createSession(self): - session = abstractweb.HttpRequestHandlerMixin.createSession(self) - self.httpRequest.client.sessionTokens[self.site.url] = session.token - return session - - def respondRedirectTemporarily(self, url): - scheme = url.split('://')[0].lower() - if scheme not in ('http', 'https'): - # The url doesn't include host name => add it. - path = url - url = self.site.url - if path: - if path[0] == '/': - while url[-1] == '/': - url = url[:-1] - elif url[-1] != '/': - url += '/' - url += path - return self.httpRequest.client.redirect(url) - - -class Internet(object): - webSites = None - - def __init__(self): - self.webSites = {} - - def addWebSite(self, webSite): - self.webSites[webSite.url] = webSite - - def getWebSite(self, url): - for webSiteUrl, webSite in self.webSites.iteritems(): - if url.startswith(webSiteUrl): - return webSite - raise Exception('Unknown web site: %s' % url) - - -class WebClient(abstractweb.WebClientMixin, object): - internet = None - keyring = None - sessionTokens = None # Simulate the cookies, stored in user's navigator, and containing the - # IDs of sessions already opened by the user. - - def __init__(self, internet): - self.internet = internet - self.keyring = {} - self.sessionTokens = {} - - def redirect(self, url): - return self.sendHttpRequest('GET', url) - - def sendHttpRequest(self, method, url, headers = None, body = None, form = None): - if headers: - httpRequestHeaders = self.httpRequestHeaders.copy() - for name, value in headers.iteritems(): - httpRequestHeaders[name] = value - else: - httpRequestHeaders = self.httpRequestHeaders - return HttpRequest( - self, method, url, headers = httpRequestHeaders, body = body, form = form).send() - - def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None, - form = None): - url = webSite.url - if path: - if path[0] == '/': - while url[-1] == '/': - url = url[:-1] - elif url[-1] != '/': - url += '/' - url += path - return self.sendHttpRequest(method, url, headers = headers, body = body, form = form) - - -class Principal(WebClient): - """Simulation of a user and its web navigator""" - - name = None # The user name - - def __init__(self, internet, name): - WebClient.__init__(self, internet) - self.name = name - - -class WebSession(abstractweb.WebSessionMixin, object): - """Simulation of session of a web site""" - - expirationTime = None # A sample session variable - lassoLoginDump = None # Used only by some identity providers - lassoSessionDump = None - - -class WebUser(abstractweb.WebUserMixin, object): - """Simulation of user of a web site""" - - lassoIdentityDump = None - language = 'fr' # A sample user variable - - -class WebSite(abstractweb.WebSiteMixin, WebClient): - """Simulation of a web site""" - - FunctionHttpRequest = FunctionHttpRequest # Class - url = None # The main URL of web site - WebSession = WebSession - WebUser = WebUser - - def __init__(self, internet, url): - WebClient.__init__(self, internet) - abstractweb.WebSiteMixin.__init__(self) - self.url = url - self.internet.addWebSite(self) - - def handleHttpRequest(self, httpRequest): - httpRequestHandler = HttpRequestHandler(self, httpRequest) - - # Retrieve session and user. - sessionToken = httpRequest.client.sessionTokens.get(self.url, None) - if sessionToken is not None: - session = self.sessions.get(sessionToken) - if session is not None: - httpRequestHandler.session = session - if session.userId is not None: - httpRequestHandler.user = self.users.get(session.userId, None) - - return self.handleHttpRequestHandler(httpRequestHandler) - - def login_local(self, handler): - userId = handler.httpRequest.client.keyring.get(self.url, None) - userAuthenticated = userId in self.users - # authenticationMethod = lasso.samlAuthenticationMethodPassword - authenticationMethod = 'urn:oasis:names:tc:SAML:1.0:am:password' - if userAuthenticated: - session = handler.session - if session is None: - session = handler.createSession() - user = handler.user - if user is None: - user = handler.createUser() - session.userId = user.uniqueId - user.sessionToken = session.token - return self.login_done(handler, userAuthenticated, authenticationMethod) |
