summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorEmmanuel Raviart <eraviart@entrouvert.com>2004-08-25 10:12:13 +0000
committerEmmanuel Raviart <eraviart@entrouvert.com>2004-08-25 10:12:13 +0000
commit9d460cf67c3999a0ec7723c377a60df07268b5c8 (patch)
treec385668c848d9a322a6ec45433af18d00d0f50c7 /python
parent738257f33a816017ba2edf01288f14f61ca2f21f (diff)
downloadlasso-9d460cf67c3999a0ec7723c377a60df07268b5c8.tar.gz
lasso-9d460cf67c3999a0ec7723c377a60df07268b5c8.tar.xz
lasso-9d460cf67c3999a0ec7723c377a60df07268b5c8.zip
Removed obsolete Python test framework.
Diffstat (limited to 'python')
-rw-r--r--python/tests/IdentityProvider.py257
-rw-r--r--python/tests/LibertyEnabledClientProxy.py131
-rw-r--r--python/tests/LibertyEnabledProxy.py65
-rw-r--r--python/tests/Provider.py60
-rw-r--r--python/tests/ServiceProvider.py321
-rw-r--r--python/tests/abstractweb.py329
-rw-r--r--python/tests/assertions.py126
-rw-r--r--python/tests/builtins.py45
-rw-r--r--python/tests/http.py935
-rw-r--r--python/tests/liberty.py60
-rw-r--r--python/tests/libertysimulator.py60
-rw-r--r--python/tests/login_tests.py257
-rwxr-xr-xpython/tests/sample-idp.py150
-rwxr-xr-xpython/tests/sample-lep.py152
-rwxr-xr-xpython/tests/sample-sp-lep.py147
-rwxr-xr-xpython/tests/sample-sp.py147
-rw-r--r--python/tests/submissions.py292
-rw-r--r--python/tests/web.py159
-rw-r--r--python/tests/websimulator.py244
19 files changed, 27 insertions, 3910 deletions
diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py
deleted file mode 100644
index 90673d61..00000000
--- a/python/tests/IdentityProvider.py
+++ /dev/null
@@ -1,257 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import lasso
-
-import Provider
-
-
-class IdentityProviderMixin(Provider.ProviderMixin):
- soapResponseMsgs = None
-
- def __init__(self):
- Provider.ProviderMixin.__init__(self)
- self.soapResponseMsgs = {}
-
- def login_done(self, handler, userAuthenticated, authenticationMethod):
- # Reconstruct Lasso login from dump.
- lassoServer = self.getLassoServer()
- session = handler.session
- failUnless(session)
- failUnless(session.lassoLoginDump)
- login = lasso.Login.new_from_dump(lassoServer, session.lassoLoginDump)
- del session.lassoLoginDump
- # Set identity & session in login, because session.lassoLoginDump doesn't contain them.
- if session.lassoSessionDump is not None:
- login.set_session_from_dump(session.lassoSessionDump)
- user = handler.user
- if user is not None and user.lassoIdentityDump is not None:
- login.set_identity_from_dump(user.lassoIdentityDump)
-
- return self.singleSignOn_done(handler, login, userAuthenticated, authenticationMethod)
-
- def singleSignOn(self, handler):
- lassoServer = self.getLassoServer()
- if handler.httpRequest.method == 'GET':
- # Single sign-on using HTTP redirect.
- login = lasso.Login(lassoServer)
- session = handler.session
- if session is not None and session.lassoSessionDump is not None:
- login.set_session_from_dump(session.lassoSessionDump)
- user = handler.user
- if user is not None and user.lassoIdentityDump is not None:
- login.set_identity_from_dump(user.lassoIdentityDump)
- login.init_from_authn_request_msg(handler.httpRequest.query, lasso.httpMethodRedirect)
-
- if not login.must_authenticate():
- userAuthenticated = user is not None
- authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
- return self.singleSignOn_done(
- handler, login, userAuthenticated, authenticationMethod)
-
- # The authentication may need to change page (needed for a HTML form, for example).
- # => Save Lasso login as a dump in session, so that we retrieve it once the user is
- # authenticated.
- if session is None:
- session = handler.createSession()
- session.publishToken = True
- session.lassoLoginDump = login.dump()
- return self.callHttpFunction(self.login, handler)
-
- elif handler.httpRequest.method == 'POST' \
- and handler.httpRequest.headers.get('Content-Type', None) == 'text/xml':
- # SOAP request => LECP single sign-on.
- lecp = lasso.Lecp(lassoServer)
- session = handler.session
- if session is not None and session.lassoSessionDump is not None:
- lecp.set_session_from_dump(session.lassoSessionDump)
- user = handler.user
- if user is not None and user.lassoIdentityDump is not None:
- lecp.set_identity_from_dump(user.lassoIdentityDump)
- lecp.init_from_authn_request_msg(handler.httpRequest.body, lasso.httpMethodSoap)
- # FIXME: lecp.must_authenticate() should always return False. Because we are in SOAP.
- # And we can't do a HTTP redirect in SOAP.
- # The other solution is that we shall not call lecp.must_authenticate().
- # failIf(lecp.must_authenticate())
- userAuthenticated = user is not None
- authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
- lecp.build_authn_response_envelope_msg(
- userAuthenticated, authenticationMethod,
- "2005-05-03T16:12:00Z", # FIXME: reauthenticateOnOrAfter
- )
- soapResponseMsg = lecp.msg_body
- failUnless(soapResponseMsg)
- # FIXME: Lasso should set a lecp.msg_content_type to
- # "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with
- # other profiles.
- # contentType = lecp.msg_content_type
- # failUnlessEqual(contentType, 'application/vnd.liberty-response+xml')
- contentType = 'application/vnd.liberty-response+xml'
- headers = {
- 'Content-Type': contentType,
- 'Cache-Control': 'no-cache',
- 'Pragma': 'no-cache',
- }
- headers.update(self.libertyEnabledHeaders)
- return handler.respond(headers = headers, body = soapResponseMsg)
- else:
- return handler.respond(
- 400,
- 'Bad Request: Method %s not handled by singleSignOn' % handler.httpRequest.method)
-
- def singleSignOn_done(self, handler, login, userAuthenticated, authenticationMethod):
- failUnlessEqual(login.protocolProfile, lasso.loginProtocolProfileBrwsArt) # FIXME
- login.build_artifact_msg(
- userAuthenticated, authenticationMethod,
- "2005-05-03T16:12:00Z", # FIXME: reauthenticateOnOrAfter
- lasso.httpMethodRedirect)
- if userAuthenticated:
- session = handler.session
- failUnless(session)
- user = handler.user
- failUnless(user)
- if login.is_identity_dirty():
- lassoIdentityDump = login.get_identity().dump()
- failUnless(lassoIdentityDump)
- user.lassoIdentityDump = lassoIdentityDump
- failUnless(login.is_session_dirty())
- lassoSessionDump = login.get_session().dump()
- failUnless(lassoSessionDump)
- session.lassoSessionDump = lassoSessionDump
- nameIdentifier = login.nameIdentifier
- failUnless(nameIdentifier)
- self.userIdsByNameIdentifier[nameIdentifier] = user.uniqueId
- self.sessionTokensByNameIdentifier[nameIdentifier] = session.token
- else:
- failIf(login.is_identity_dirty())
- failIf(login.is_session_dirty())
- artifact = login.assertionArtifact
- failUnless(artifact)
- soapResponseMsg = login.response_dump
- failUnless(soapResponseMsg)
- self.soapResponseMsgs[artifact] = soapResponseMsg
- responseUrl = login.msg_url
- failUnless(responseUrl)
- return handler.respondRedirectTemporarily(responseUrl)
-
- def soapEndpoint(self, handler):
- soapRequestMsg = handler.httpRequest.body
- requestType = lasso.get_request_type_from_soap_msg(soapRequestMsg)
- if requestType == lasso.requestTypeLogin:
- lassoServer = self.getLassoServer()
- login = lasso.Login(lassoServer)
- # FIXME: What should we return when there is an error in process_request_msg?
- # FIXME: Create a new Lasso function build_response_msg, with either None or
- # soapResponseMessage as argument. It is called after process_request_message and
- # should either create a new response or keep the one in soapResponseMsg (if it already
- # contained an error or if there is no error).
- login.process_request_msg(soapRequestMsg)
- artifact = login.assertionArtifact
- failUnless(artifact)
- soapResponseMsg = self.soapResponseMsgs.get(artifact, None)
- if soapResponseMsg is None:
- raise Exception('FIXME: Handle the case when artifact is wrong')
- del self.soapResponseMsgs[artifact]
- return handler.respond(
- headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg)
- elif requestType == lasso.requestTypeLogout:
- lassoServer = self.getLassoServer()
- logout = lasso.Logout(lassoServer, lasso.providerTypeIdp)
- logout.process_request_msg(soapRequestMsg, lasso.httpMethodSoap)
- nameIdentifier = logout.nameIdentifier
- failUnless(nameIdentifier)
-
- # Retrieve session and user using name identifier.
- session = self.getSessionFromNameIdentifier(nameIdentifier)
- if session is None:
- raise Exception('FIXME: Handle the case when there is no web session')
- user = self.getUserFromNameIdentifier(nameIdentifier)
- if user is None:
- raise Exception('FIXME: Handle the case when there is no web user')
-
- # The identity provider may want to do some things, before logging out.
- self.soapEndpoint_logout_prepare(handler, session, user)
-
- if session.lassoSessionDump is None:
- raise Exception(
- 'FIXME: Handle the case when there is no session dump in web session')
- logout.set_session_from_dump(session.lassoSessionDump)
- if user.lassoIdentityDump is None:
- raise Exception(
- 'FIXME: Handle the case when there is no identity dump in web user')
- logout.set_identity_from_dump(user.lassoIdentityDump)
-
- logout.validate_request()
- failIf(logout.is_identity_dirty())
- lassoIdentity = logout.get_identity()
- failUnless(lassoIdentity)
- lassoIdentityDump = lassoIdentity.dump()
- failUnless(lassoIdentityDump)
- failUnless(logout.is_session_dirty())
-
- # Log the user out.
- # It is done before logout from other service providers, since we don't want to
- # accept passive login connections inbetween.
- del session.lassoSessionDump
- del session.userId
- del user.sessionToken
- # We also delete the session, but it is not mandantory, since the user is logged out
- # anyway.
- del self.sessions[session.token]
- nameIdentifier = logout.nameIdentifier
- failUnless(nameIdentifier)
- del self.sessionTokensByNameIdentifier[nameIdentifier]
-
- # Tell each other service provider to logout the user.
- otherProviderId = logout.get_next_providerID()
- while otherProviderId is not None:
- logout.init_request(otherProviderId)
- logout.build_request_msg()
-
- soapEndpoint = logout.msg_url
- failUnless(soapEndpoint)
- soapRequestMsg = logout.msg_body
- failUnless(soapRequestMsg)
- httpResponse = self.sendHttpRequest(
- 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
- body = soapRequestMsg)
- failUnlessEqual(httpResponse.statusCode, 200)
- logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
-
- otherProviderId = logout.get_next_providerID()
-
- logout.build_response_msg()
- soapResponseMsg = logout.msg_body
- failUnless(soapResponseMsg)
- return handler.respond(
- headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg)
- else:
- raise Exception('Unknown request type: %s' % requestType)
-
- def soapEndpoint_logout_prepare(self, handler, session, user):
- """Prepare logout.
-
- Override this method to do some processing before identity provider logout proceeds.
- """
- pass
diff --git a/python/tests/LibertyEnabledClientProxy.py b/python/tests/LibertyEnabledClientProxy.py
deleted file mode 100644
index b925d8fc..00000000
--- a/python/tests/LibertyEnabledClientProxy.py
+++ /dev/null
@@ -1,131 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import lasso
-
-import abstractweb
-
-
-class LibertyEnabledClientProxyMixin(abstractweb.WebClientMixin):
- # A service provider MAY provide a list of identity providers it recognizes by including the
- # <lib:IDPList> element in the <lib:AuthnRequestEnvelope>. The format and processing rules for
- # the identity provider list MUST be as defined in [LibertyProtSchema].
-
- # The identity provider list can be used by the LECP to create a user identifier to be
- # presented to the Principal. For example, the LECP could compare the list of the Principal's
- # known identities (and the identities of the identity provider that provides those identities)
- # against the list provided by the service provider and then only display the intersection.
-
- # If the service provider does not support the LECP-advertised Liberty version, the service
- # provider MUST return to the LECP an HTTP 501 response with the reason phrase "Unsupported
- # Liberty Version."
- #
- # The responses in step 3 and step 6 SHOULD NOT be cached. To this end service providers and
- # identity providers SHOULD place both "Cache-Control: no-cache" and "Pragma: no-cache" on
- # their responses to ensure that the LECP and any intervening proxies will not cache the
- # response.
-
- # If the LECP discovers a syntax error due to the service provider or cannot proceed any
- # further for other reasons (for example, cannot resolve identity provider, cannot reach the
- # identity provider, etc.), the LECP MUST return to the service provider a <lib:AuthnResponse>
- # with a <samlp:Status> indicating the desired error element as defined in
- # [LibertyProtSchema]. The <lib:AuthnResponse> containing the error status MUST be sent using
- # a POST to the service provider's assertion consumer service URL obtained from the
- # <lib:AssertionConsumerServiceURL> element of the <lib:AuthnRequestEnvelope>. The POST MUST
- # be a form that contains the field LARES with the value being the <lib:AuthnResponse>
- # protocol message as defined in [LibertyProtSchema], containing the <samlp:Status>. The
- # <lib:AuthnResponse> MUST be encoded by applying a base64 transformation (refer to
- # [RFC2045]) to the <lib:AuthnResponse> and all its elements.
-
- httpRequestHeaders = abstractweb.WebClientMixin.httpRequestHeaders.copy()
- httpRequestHeaders.update({
- # FIXME: Is this the correct syntax for several URLs in LIBV?
- 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1',
- 'Liberty-Agent': 'LassoSimulator/0.0.0',
- # FIXME: As an alternative to 'Liberty-Enabled' header, a user agent may use:
- # 'User-Agent': ' '.join((
- # httpRequestHeaders['User-Agent'],
- # 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1'))
- 'Accept': ','.join((httpRequestHeaders['Accept'], 'application/vnd.liberty-request+xml'))
- })
- idpSite = None # The identity provider, this LECP will use to authenticate users.
- lassoServerDump = None
- principal = None
-
- def getLassoServer(self):
- return lasso.Server.new_from_dump(self.lassoServerDump)
-
- def getSessionTokens(self):
- # LECP is a proxy, not au principal, so it doesn't have its own sessionTokens.
- if self.principal is None:
- return {}
- return self.principal.sessionTokens
-
- def login(self, principal, site, path):
- self.principal = principal
-
- httpResponse = self.sendHttpRequestToSite(site, 'GET', path)
- failUnlessEqual(
- httpResponse.headers['Content-Type'], 'application/vnd.liberty-request+xml')
- libertyEnabledHeader = httpResponse.headers.get('Liberty-Enabled')
- failUnless(libertyEnabledHeader)
- failUnless('LIBV=urn:liberty:iff:2003-08' in libertyEnabledHeader)
- lassoServer = self.getLassoServer()
- lecp = lasso.Lecp(lassoServer)
- authnRequestEnvelope = httpResponse.body
- lecp.process_authn_request_envelope_msg(authnRequestEnvelope)
- # FIXME: The service provider could return an IDPList in authnRequestEnvelope, so that
- # we verify that self.idpSingleSignOnServiceUrl belongs to one of them
- lecp.build_authn_request_msg(self.idpSite.providerId)
- failUnless(lecp.msg_url)
- failUnless(lecp.msg_body)
- httpResponse = self.sendHttpRequest(
- 'POST', lecp.msg_url, headers = {'Content-Type': 'text/xml'},
- body = lecp.msg_body)
- failUnlessEqual(
- httpResponse.headers.get('Content-Type', None), 'application/vnd.liberty-response+xml')
- libertyEnabledHeader = httpResponse.headers.get('Liberty-Enabled')
- failUnless(libertyEnabledHeader)
- failUnless('LIBV=urn:liberty:iff:2003-08' in libertyEnabledHeader)
- lecp.process_authn_response_envelope_msg(httpResponse.body)
- lecp.build_authn_response_msg()
- failUnless(lecp.msg_url)
- failUnless(lecp.msg_body)
-
- del self.principal
-
- # FIXME: Should we use 'multipart/form-data' for forms?
- return self.sendHttpRequest(
- 'POST', lecp.msg_url, headers = {'Content-Type': 'multipart/form-data'},
- form = {'LARES': lecp.msg_body})
-
- def setKeyring(self, keyring):
- # LECP is a proxy, not au principal, so it doesn't have its own keyring.
- pass
-
- def setSessionTokens(self, sessionTokens):
- # LECP is a proxy, not au principal, so it doesn't have its own sessionTokens.
- pass
-
- sessionTokens = property(getSessionTokens, setSessionTokens)
diff --git a/python/tests/LibertyEnabledProxy.py b/python/tests/LibertyEnabledProxy.py
deleted file mode 100644
index 3b33e810..00000000
--- a/python/tests/LibertyEnabledProxy.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import lasso
-
-from IdentityProvider import IdentityProviderMixin
-from ServiceProvider import ServiceProviderMixin
-
-
-class LibertyEnabledProxyMixin(IdentityProviderMixin, ServiceProviderMixin):
- def __init__(self):
- ServiceProviderMixin.__init__(self)
- IdentityProviderMixin.__init__(self)
-
- def assertionConsumer_done(self, handler):
- # Before, this proxy was considered as a service provider. Now it acts again as an identity
- # provider.
- # FIXME: We should retrieve authentication method from session.lassoSessionDump.
- # FIXME: Handle Liberty ProxyCount.
- return self.login_done(handler, True, lasso.samlAuthenticationMethodPassword)
-
- def login(self, handler):
- # Before, this proxy was considered as an identity provider. Now it is a service provider.
- # FIXME: Handle Liberty ProxyCount.
- return ServiceProviderMixin.login(self, handler)
-
- def login_failed(self, handler):
- # Before, this proxy was considered as a service provider. Now it acts again as an identity
- # provider.
- # FIXME: Handle Liberty ProxyCount.
- return self.login_done(handler, False, None)
-
- def logout_done(self, handler, nameIdentifier):
- # Before, this proxy was considered as a service provider. Now it acts again as an identity
- # provider.
- # FIXME: Handle Liberty ProxyCount.
-
- # Don't do logout_done actions, because they will be done in soapEndpoint.
- return None
-
- def soapEndpoint_logout_prepare(self, handler, session, user):
- # Before, this proxy was considered as an identity provider. Now it is a service provider.
- # FIXME: Handle Liberty ProxyCount.
- return self.logout_do(handler, session, user)
diff --git a/python/tests/Provider.py b/python/tests/Provider.py
deleted file mode 100644
index 5f40dad9..00000000
--- a/python/tests/Provider.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import lasso
-
-import abstractweb
-
-
-class ProviderMixin(abstractweb.WebSiteMixin):
- lassoServerDump = None
- libertyEnabledHeaders = {
- 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1',
- }
- providerId = None # The Liberty providerID of this web site
- sessionTokensByNameIdentifier = None
- userIdsByNameIdentifier = None
-
- def __init__(self):
- abstractweb.WebSiteMixin.__init__(self)
- self.userIdsByNameIdentifier = {}
- self.sessionTokensByNameIdentifier = {}
-
- def getLassoServer(self):
- return lasso.Server.new_from_dump(self.lassoServerDump)
-
- def getSessionFromNameIdentifier(self, nameIdentifier):
- sessionToken = self.sessionTokensByNameIdentifier.get(nameIdentifier, None)
- if sessionToken is None:
- # The user has no federation on this site or has no authentication assertion for this
- # federation.
- return None
- return self.sessions.get(sessionToken, None)
-
- def getUserFromNameIdentifier(self, nameIdentifier):
- userId = self.userIdsByNameIdentifier.get(nameIdentifier, None)
- if userId is None:
- # The user has no federation on this site.
- return None
- return self.users.get(userId, None)
diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py
deleted file mode 100644
index b906dec9..00000000
--- a/python/tests/ServiceProvider.py
+++ /dev/null
@@ -1,321 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import lasso
-
-import Provider
-
-
-class ServiceProviderMixin(Provider.ProviderMixin):
- createNewAccountWhenNewFederationForUnknownUser = False
- idpSite = None # The identity provider, this service provider will use to authenticate users.
-
- def assertionConsumer(self, handler):
- lassoServer = self.getLassoServer()
- login = lasso.Login(lassoServer)
-
- if handler.httpRequest.method == 'GET':
- relayState = handler.httpRequest.getQueryField('RelayState', None)
- login.init_request(handler.httpRequest.query, lasso.httpMethodRedirect)
- login.build_request_msg()
-
- soapEndpoint = login.msg_url
- failUnless(soapEndpoint)
- soapRequestMsg = login.msg_body
- failUnless(soapRequestMsg)
- httpResponse = self.sendHttpRequest(
- 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
- body = soapRequestMsg)
- failUnlessEqual(httpResponse.statusCode, 200)
- try:
- login.process_response_msg(httpResponse.body)
- except lasso.Error, error:
- if error.code == -7: # FIXME: This will change, he said.
- return handler.respond(
- 401,
- 'Access Unauthorized: User authentication failed on identity provider.')
- else:
- raise
- elif handler.httpRequest.method == 'POST':
- relayState = handler.httpRequest.getFormField('RelayState', None)
- authnResponseMsg = handler.httpRequest.getFormField('LARES', None)
- failUnless(authnResponseMsg)
- # FIXME: Should we do an init before process_authn_response_msg?
- try:
- login.process_authn_response_msg(authnResponseMsg)
- except lasso.Error, error:
- if error.code == -7: # FIXME: This will change, he said.
- return handler.respond(
- 401,
- 'Access Unauthorized: User authentication failed on identity provider.')
- else:
- raise
- else:
- return handler.respond(
- 400,
- 'Bad Request: Method %s not handled by assertionConsumer'
- % handler.httpRequest.method)
-
- nameIdentifier = login.nameIdentifier
- failUnless(nameIdentifier)
-
- # Retrieve session dump, using name identifier or else try to use the client web session.
- # If session dump exists, give it to Lasso, so that it updates it.
- session = self.getSessionFromNameIdentifier(nameIdentifier)
- if session is None:
- session = handler.session
- if session is not None and session.lassoSessionDump is not None:
- login.set_session_from_dump(session.lassoSessionDump)
- # Retrieve identity dump, using name identifier or else try to retrieve him from web
- # session. If identity dump exists, give it to Lasso, so that it updates it.
- user = self.getUserFromNameIdentifier(nameIdentifier)
- if user is None:
- user = handler.user
- if user is not None and user.lassoIdentityDump is not None:
- login.set_identity_from_dump(user.lassoIdentityDump)
-
- login.accept_sso()
- if user is not None and user.lassoIdentityDump is None:
- failUnless(login.is_identity_dirty())
- lassoIdentity = login.get_identity()
- failUnless(lassoIdentity)
- lassoIdentityDump = lassoIdentity.dump()
- failUnless(lassoIdentityDump)
- failUnless(login.is_session_dirty())
- lassoSession = login.get_session()
- failUnless(lassoSession)
- lassoSessionDump = lassoSession.dump()
- failUnless(lassoSessionDump)
-
- # User is now authenticated.
-
- # If there was no web session yet, create it. Idem for the web user account.
- if session is None:
- session = handler.createSession()
- session.publishToken = True
- if user is None:
- # The user has been successfully authenticated on identity provider, but he has no
- # account on this service provider or his account is not federated yet and he is not
- # logged.
- # A real service provider would ask user to login locally to create a federation. Or it
- # would ask user informations to create a local account. Or it would automatically
- # create a new account...
- if self.createNewAccountWhenNewFederationForUnknownUser:
- user = handler.createUser()
- else:
- # Save some informations in session for a short time (until user is logged).
- # These informations can't be stored as fields in URL query, because they are too
- # large.
- session.lassoIdentityDump = lassoIdentityDump
- session.lassoSessionDump = lassoSessionDump
- session.nameIdentifier = nameIdentifier
- session.relayState = relayState
-
- # We do a redirect now for two reasons:
- # - We don't want the user to be able to reload assertionConsumer page (because the
- # artifact has been removed from identity-provider).
- # - For HTTP authentication, we don't want to emit a 401 Unauthorized that would
- # force the Principal to reload the assertionConsumer page.
- # FIXME: Add the session token to redirect URL.
- return handler.respondRedirectTemporarily('/login_local')
-
- session.userId = user.uniqueId
- user.sessionToken = session.token
-
- # Store the updated identity dump and session dump.
- session.lassoSessionDump = lassoSessionDump
- if login.is_identity_dirty():
- user.lassoIdentityDump = lassoIdentityDump
-
- self.userIdsByNameIdentifier[nameIdentifier] = user.uniqueId
- self.sessionTokensByNameIdentifier[nameIdentifier] = session.token
-
- # We do a redirect now because we don't want the user to be able to reload
- # assertionConsumer page (because the artifact has been removed from identity-provider).
- # FIXME: Add the session token to redirect URL.
- redirectUrl = '/assertionConsumer_done'
- if relayState:
- redirectUrl = '%s?RelayState=%s' % (redirectUrl, relayState)
- return handler.respondRedirectTemporarily(redirectUrl)
-
- def assertionConsumer_done(self, handler):
- # A real service provider could use the string relayState for any purpose.
- relayState = handler.httpRequest.getQueryField('RelayState', None)
- return handler.respond(
- 200, headers = {'Content-Type': 'text/plain'},
- body = 'Liberty authentication succeeded\nRelayState = %s' % relayState)
-
- def login(self, handler):
- libertyEnabled = handler.httpRequest.headers.get('Liberty-Enabled', None)
- userAgent = handler.httpRequest.headers.get('User-Agent', None)
- # FIXME: Lasso should have a function to compute useLecp.
- # Or this should be done in lasso.Login(lassoServer, libertyEnabled, userAgent)
- useLecp = False
- if libertyEnabled:
- useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled
- if not useLecp:
- return handler.respond(501, 'Unsupported Liberty Version.')
- elif userAgent:
- useLecp = 'urn:liberty:iff:2003-08' in userAgent
- if not useLecp and "LIBV=" in userAgent:
- return handler.respond(501, 'Unsupported Liberty Version.')
- else:
- useLecp = False
-
- forceAuthn = handler.httpRequest.getQueryBoolean('forceAuthn', False)
- isPassive = handler.httpRequest.getQueryBoolean('isPassive', False)
- relayState = handler.httpRequest.getQueryField('RelayState', None)
- lassoServer = self.getLassoServer()
- if useLecp:
- lecp = lasso.Lecp(lassoServer)
- lecp.init_authn_request()
- failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest)
-
- # FIXME: This protocol profile should be set by default by Lasso.
- lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost)
-
- # Same treatement as for non LECP login.
- if forceAuthn:
- lecp.request.set_forceAuthn(forceAuthn)
- if not isPassive:
- lecp.request.set_isPassive(isPassive)
- lecp.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
- lecp.request.set_consent(lasso.libConsentObtained)
- if relayState:
- lecp.request.set_relayState(relayState)
-
- lecp.build_authn_request_envelope_msg()
- authnRequestEnvelopeMsg = lecp.msg_body
- failUnless(authnRequestEnvelopeMsg)
- # FIXME: Lasso should set a lecp.msg_content_type to
- # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with
- # other profiles.
- # contentType = lecp.msg_content_type
- # failUnlessEqual(contentType, 'application/vnd.liberty-request+xml')
- contentType = 'application/vnd.liberty-request+xml'
- headers = {
- 'Content-Type': contentType,
- 'Cache-Control': 'no-cache',
- 'Pragma': 'no-cache',
- }
- headers.update(self.libertyEnabledHeaders)
- return handler.respond(headers = headers, body = authnRequestEnvelopeMsg)
- else:
- login = lasso.Login(lassoServer)
- login.init_authn_request(lasso.httpMethodRedirect)
- #login.init_authn_request()
- failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
- if forceAuthn:
- login.request.set_forceAuthn(forceAuthn)
- if not isPassive:
- login.request.set_isPassive(isPassive)
- login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
- login.request.set_consent(lasso.libConsentObtained)
- if relayState:
- login.request.set_relayState(relayState)
- login.build_authn_request_msg(self.idpSite.providerId)
- authnRequestUrl = login.msg_url
- failUnless(authnRequestUrl)
- return handler.respondRedirectTemporarily(authnRequestUrl)
-
- def login_done(self, handler, userAuthenticated, authenticationMethod):
- # Remove informations that are no more needed in session.
- session = handler.session
- lassoIdentityDump = session.lassoIdentityDump
- del session.lassoIdentityDump
- nameIdentifier = session.nameIdentifier
- del session.nameIdentifier
- relayState = session.relayState
- del session.relayState
-
- if not userAuthenticated:
- return self.login_failed(handler)
-
- # User has been authenticated => Create federation.
- user = handler.user
- user.lassoIdentityDump = lassoIdentityDump
- self.userIdsByNameIdentifier[nameIdentifier] = user.uniqueId
- self.sessionTokensByNameIdentifier[nameIdentifier] = session.token
- # Note: The uppercase for RelayState below is not a bug.
- return self.callHttpFunction(self.assertionConsumer_done, handler, RelayState = relayState)
-
- def logout(self, handler):
- session = handler.session
- if session is None:
- return handler.respond(401, 'Access Unauthorized: User has no session opened.')
- user = handler.user
- if user is None:
- return handler.respond(401, 'Access Unauthorized: User is not logged in.')
- return self.logout_do(handler, session, user)
-
- def logout_do(self, handler, session, user):
- lassoServer = self.getLassoServer()
- logout = lasso.Logout(lassoServer, lasso.providerTypeSp)
- if user.lassoIdentityDump is not None:
- logout.set_identity_from_dump(user.lassoIdentityDump)
- if session.lassoSessionDump is not None:
- logout.set_session_from_dump(session.lassoSessionDump)
- logout.init_request()
- logout.build_request_msg()
-
- soapEndpoint = logout.msg_url
- failUnless(soapEndpoint)
- soapRequestMsg = logout.msg_body
- failUnless(soapRequestMsg)
- httpResponse = self.sendHttpRequest(
- 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg)
- failUnlessEqual(httpResponse.statusCode, 200)
-
- logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
- failIf(logout.is_identity_dirty())
- identity = logout.get_identity()
- failUnless(identity)
- lassoIdentityDump = identity.dump()
- failUnless(lassoIdentityDump)
- failUnless(logout.is_session_dirty())
- lassoSession = logout.get_session()
- if lassoSession is None:
- # The user is no more authenticated on any identity provider. Log him out.
- del session.lassoSessionDump
- del session.userId
- del user.sessionToken
- del handler.user
- # We also delete the session, but it is not mandantory, since the user is logged out
- # anyway.
- del handler.session
- del self.sessions[session.token]
- else:
- # The user is still logged in on some other identity providers.
- lassoSessionDump = lassoSession.dump()
- failUnless(lassoSessionDump)
- session.lassoSessionDump = lassoSessionDump
- nameIdentifier = logout.nameIdentifier
- return self.logout_done(handler, nameIdentifier)
-
- def logout_done(self, handler, nameIdentifier):
- failUnless(nameIdentifier)
- del self.sessionTokensByNameIdentifier[nameIdentifier]
-
- return handler.respond(200, headers = {'Content-Type': 'text/plain'},
- body = 'Liberty logout succeeded')
diff --git a/python/tests/abstractweb.py b/python/tests/abstractweb.py
deleted file mode 100644
index 94fb644d..00000000
--- a/python/tests/abstractweb.py
+++ /dev/null
@@ -1,329 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Abstract web classes for HTTP clients and servers or simulators
-# By: Frederic Peters <fpeters@entrouvert.com>
-# Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://www.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-
-
-"""Abstract web classes for HTTP clients and servers or simulators"""
-
-
-class HttpRequestMixin:
- body = None
- headers = None
- method = None # 'GET' or 'POST' or 'PUT' or...
- path = None
- pathAndQuery = None
- query = None
- scheme = None # 'http' or 'https'
- url = None
-
- def getFormField(self, name, default = None):
- raise NotImplementedError
-
- def getQueryBoolean(self, name, default = None):
- fieldValue = self.getQueryField(name, 'none')
- if fieldValue == 'none':
- return default
- else:
- return fieldValue.lower not in ('', '0', 'false')
-
- def getQueryField(self, name, default = None):
- if self.query:
- for field in self.query.split('&'):
- fieldName, fieldValue = field.split('=')
- if name == fieldName:
- return fieldValue
- return default
-
- def hasFormField(self, name):
- raise NotImplementedError
-
- def hasQueryField(self, name):
- if self.query:
- for field in self.query.split('&'):
- fieldName, fieldValue = field.split('=')
- if name == fieldName:
- return True
- return False
-
-
-class FunctionHttpRequest(HttpRequestMixin, object):
- method = 'GET'
- previousHttpRequest = None
- queryFields = None
-
- def __init__(self, previousHttpRequest, **queryFields):
- self.previousHttpRequest = previousHttpRequest
- self.queryFields = queryFields
-
- def getFormField(self, name, default = None):
- return default
-
- def getHeaders(self):
- return self.previousHttpRequest.headers
-
- def getQueryField(self, name, default = None):
- return self.queryFields.get(name, default)
-
- def hasFormField(self, name):
- return False
-
- def hasQueryField(self, name):
- return name in self.queryFields
-
- headers = property(getHeaders)
-
-
-class HttpResponseMixin:
- body = None
- defaultStatusMessages = {
- '100': 'Continue',
- '101': 'Switching Protocols',
- '200': 'OK',
- '201': 'Created',
- '202': 'Accepted',
- '203': 'Non-Authoritative Information',
- '204': 'No Content',
- '205': 'Reset Content',
- '206': 'Partial Content',
- '300': 'Multiple Choices',
- '301': 'Moved Permanently',
- '302': 'Found',
- '303': 'See Other',
- '304': 'Not Modified',
- '305': 'Use Proxy',
- '307': 'Temporary Redirect',
- '400': 'Bad Request',
- '401': 'Unauthorized',
- '402': 'Payment Required',
- '403': 'Forbidden',
- '404': 'Not Found',
- '405': 'Method Not Allowed',
- '406': 'Not Acceptable',
- '407': 'Proxy Authentication Required',
- '408': 'Request Time-out',
- '409': 'Conflict',
- '410': 'Gone',
- '411': 'Length Required',
- '412': 'Precondition Failed',
- '413': 'Request Entity Too Large',
- '414': 'Request-URI Too Large',
- '415': 'Unsupported Media Type',
- '416': 'Requested range not satisfiable',
- '417': 'Expectation Failed',
- '500': 'Internal Server Error',
- '501': 'Not Implemented',
- '502': 'Bad Gateway',
- '503': 'Service Unavailable',
- '504': 'Gateway Time-out',
- '505': 'HTTP Version not supported',
- }
- headers = None
- statusCode = None # 200 or...
- statusMessage = None
-
- def __init__(self, httpRequestHandler, statusCode, statusMessage = None, headers = None,
- body = None):
- self.statusCode = statusCode
- if statusMessage:
- self.statusMessage = statusMessage
- else:
- self.statusMessage = self.defaultStatusMessages.get(statusCode)
- httpResponseHeaders = httpRequestHandler.site.httpResponseHeaders
- if headers:
- httpResponseHeaders = httpResponseHeaders.copy()
- for name, value in headers.iteritems():
- httpResponseHeaders[name] = value
- if httpResponseHeaders:
- self.headers = httpResponseHeaders
- if body:
- self.body = body
-
- def send(self, httpRequestHandler):
- raise NotImplementedError
-
-
-class HttpRequestHandlerMixin:
- httpRequest = None
- HttpResponse = None # Class
- httpResponse = None
- session = None
- user = None
- site = None # The virtual host
-
- def createSession(self):
- session = self.site.newSession()
- self.session = session
- return session
-
- def createUser(self):
- user = self.site.newUser()
- self.user = user
- return user
-
- def respond(self, statusCode = 200, statusMessage = None, headers = None, body = None):
- self.httpResponse = self.HttpResponse(
- self, statusCode, statusMessage = statusMessage, headers = headers, body = body)
-
- # Session and user must be saved before responding. Otherwise, when the server is
- # multitasked or multithreaded, it may receive a new HTTP request before the session is
- # saved.
- if self.session is not None and self.session.isDirty:
- self.session.save()
- if self.user is not None and self.user.isDirty:
- self.user.save()
-
- return self.httpResponse.send(self)
-
- def respondRedirectTemporarily(self, url):
- raise NotImplementedError
-
-
-class WebClientMixin:
- httpRequestHeaders = {
- 'User-Agent': 'LassoSimulator/0.0.0',
- 'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html',
- }
-
- def __init__(self):
- pass
-
-
-class WebSessionMixin(WebClientMixin):
- isDirty = True
- token = None
- userId = None # ID of logged user
-
- def __init__(self, token):
- WebClientMixin.__init__(self)
- self.token = token
-
- def getSimpleLabel(self):
- return self.token
-
- def save(self):
- pass
-
- simpleLabel = property(getSimpleLabel)
-
-
-class WebSiteMixin:
- FunctionHttpRequest = FunctionHttpRequest # Class
- httpResponseHeaders = {
- 'Server': 'Lasso Simulator Web Server',
- }
- lastSessionToken = 0
- lastUserId = 0
- users = None
- sessions = None
- WebSession = None # Class
- WebUser = None # Class
-
- def __init__(self):
- self.users = {}
- self.sessions = {}
-
- def authenticateX509User(self, clientCertificate):
- # We should check certificate (for example clientCertificate.get_serial_number()
- # and return the user if one matches, or None otherwise.
- return None
-
- def authenticateLoginPasswordUser(self, login, password):
- # We should check login & password and return the user if one matches or None otherwise.
- return None
-
- def callHttpFunction(self, function, httpRequestHandler, **queryFields):
- httpRequestHandler.httpRequest = self.FunctionHttpRequest(
- httpRequestHandler.httpRequest, **queryFields)
- try:
- result = function(httpRequestHandler)
- finally:
- httpRequestHandler.httpRequest = httpRequestHandler.httpRequest.previousHttpRequest
- return result
-
- def handleHttpRequestHandler(self, httpRequestHandler):
- methodName = httpRequestHandler.httpRequest.path.replace('/', '')
- try:
- method = getattr(self, methodName)
- except AttributeError:
- return httpRequestHandler.respond(
- 404, 'Path "%s" Not Found.' % httpRequestHandler.httpRequest.path)
- return method(httpRequestHandler)
-
- def login(self, handler):
- # On most site (except Liberty service providers), authentication is done locally.
- return self.callHttpFunction(self.login_local, handler)
-
- def login_done(self, handler, userAuthenticated, authenticationMethod):
- if not userAuthenticated:
- return self.login_failed(handler)
- return handler.respond(
- 200, headers = {'Content-Type': 'text/plain'},
- body = 'Login terminated:\n userAuthenticated = %s\n authenticationMethod = %s' % (
- userAuthenticated, authenticationMethod))
-
- def login_failed(self, handler):
- return handler.respond(401, 'Access Unauthorized: User has no account.')
-
- def login_local(self, handler):
- # Note: Once local login is done, the HTTP function login_done must be called.
- raise NotImplementedError
-
- def newSession(self):
- self.lastSessionToken += 1
- sessionToken = str(self.lastSessionToken)
- session = self.WebSession(sessionToken)
- self.sessions[sessionToken] = session
- return session
-
- def newUser(self, name = None):
- if name is None:
- self.lastUserId += 1
- userId = str(self.lastUserId)
- else:
- userId = name
- user = self.WebUser(userId, name = name)
- self.users[userId] = user
- return user
-
-
-class WebUserMixin:
- isDirty = True
- name = None
- sessionToken = None
- uniqueId = None
-
- def __init__(self, uniqueId, name = None):
- self.uniqueId = uniqueId
- if name:
- self.name = name
-
- def getSimpleLabel(self):
- if self.name:
- return self.name
- else:
- return 'Anonymous User #%s' % self.uniqueId
-
- def save(self):
- pass
-
- simpleLabel = property(getSimpleLabel)
diff --git a/python/tests/assertions.py b/python/tests/assertions.py
deleted file mode 100644
index c142ef1b..00000000
--- a/python/tests/assertions.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Assertion functions.
-# By: Frederic Peters <fpeters@entrouvert.com>
-# Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://www.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-#
-#
-# Original code taken from Python2.3 unittest module:
-#
-# Copyright (c) 1999, 2000, 2001 Steve Purcell
-# This module is free software, and you may redistribute it and/or modify
-# it under the same terms as Python itself, so long as this copyright message
-# and disclaimer are retained in their original form.
-#
-# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
-# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
-# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
-# DAMAGE.
-
-# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
-# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
-# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
-
-
-"""Assertion functions"""
-
-
-import builtins
-
-
-def fail(message = None):
- """Fail immediately, with the given message."""
- raise self.failureException, message
-
-
-def failIf(expression, message = None):
- """Fail the test if the expression is true."""
- if expression:
- raise self.failureException, message
-
-
-def failIfAlmostEqual(first, second, places = 7, message = None):
- """Fail if the two objects are equal as determined by their difference rounded to the given
- number of decimal places (default 7) and comparing to zero.
-
- Note that decimal places (from zero) is usually not the same as significant digits
- (measured from the most signficant digit).
- """
- if round(second - first, places) == 0:
- raise self.failureException, \
- (message or '%s == %s within %s places' % (`first`, `second`, `places`))
-
-
-def failIfEqual(first, second, message = None):
- """Fail if the two objects are equal as determined by the '==' operator."""
- if first == second:
- raise self.failureException, (message or '%s == %s' % (`first`, `second`))
-
-
-def failUnless(expression, message = None):
- """Fail the test unless the expression is true."""
- if not expression:
- raise self.failureException, message
-
-
-def failUnlessAlmostEqual(first, second, places = 7, message = None):
- """Fail if the two objects are unequal as determined by their difference rounded to the given
- number of decimal places (default 7) and comparing to zero.
-
- Note that decimal places (from zero) is usually not the same as significant digits (measured
- from the most signficant digit).
- """
- if round(second - first, places) != 0:
- raise self.failureException, \
- (message or '%s != %s within %s places' % (`first`, `second`, `places` ))
-
-
-def failUnlessEqual(first, second, message = None):
- """Fail if the two objects are unequal as determined by the '==' operator."""
- if not first == second:
- raise self.failureException, \
- (message or '%s != %s' % (`first`, `second`))
-
-
-def failUnlessRaises(exceptionClass, callableObject, *arguments, **keywordArguments):
- """Fail unless an exception of class exceptionClass is thrown by callableObject when invoked
- with arguments arguments and keyword arguments keywordArguments. If a different type of
- exception is thrown, it will not be caught, and the test case will be deemed to have suffered
- an error, exactly as for an unexpected exception.
- """
- try:
- callableObject(*arguments, **keywordArguments)
- except exceptionClass:
- return
- else:
- if hasattr(exceptionClass, '__name__'):
- exceptionName = exceptionClass.__name__
- else:
- exceptionName = str(exceptionClass)
- raise self.failureException, exceptionName
-
-
-allGlobals = globals()
-for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless',
- 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'):
- builtins.set(name, allGlobals[name])
diff --git a/python/tests/builtins.py b/python/tests/builtins.py
deleted file mode 100644
index feb18719..00000000
--- a/python/tests/builtins.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Built-in variables access functions
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://www.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-"""Built-in variables access functions
-
-Allow the declaration of global variables accessible from everywhere
-(in opposition to global variables which are only easily accessible inside
-the module they are declared in).
-"""
-
-
-import __builtin__
-
-
-def delete(name):
- del __builtin__.__dict__[name]
-
-
-def get(name, default = None):
- return __builtin__.__dict__.get(name, default)
-
-
-def set(name, value):
- __builtin__.__dict__[name] = value
diff --git a/python/tests/http.py b/python/tests/http.py
deleted file mode 100644
index 27e8a99a..00000000
--- a/python/tests/http.py
+++ /dev/null
@@ -1,935 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# HTTP Client and Server Enhanced Classes
-# By: Frederic Peters <fpeters@entrouvert.com>
-# Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://www.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-
-
-"""HTTP client and server enhanced classes
-
-Features:
-- HTTPS using OpenSSL;
-- web sessions (with or without cookie);
-- user authentication (support of basic HTTP-authentication, X.509v3 certificate authentication,
- HTML based authentication, etc).
-"""
-
-
-import base64
-import BaseHTTPServer
-import Cookie
-import cStringIO
-import gzip
-import httplib
-import os
-import socket
-import SocketServer
-import sys
-import time
-
-try:
- from OpenSSL import SSL
-except ImportError:
- SSL = None
-
-import abstractweb
-import submissions
-
-
-try:
- logger
-except NameError:
- logger = None
-if logger is None:
- import logging as logger
-
-
-class BaseHTTPSRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
- def setup(self):
- """
- We need to use socket._fileobject Because SSL.Connection
- doesn't have a 'dup'. Not exactly sure WHY this is, but
- this is backed up by comments in socket.py and SSL/connection.c
- """
-
- self.connection = self.request # for doPOST
- self.rfile = socket._fileobject(self.request, 'rb', self.rbufsize)
- self.wfile = socket._fileobject(self.request, 'wb', self.wbufsize)
-
-
-class BaseHTTPSServer(SocketServer.TCPServer):
- def __init__(self, server_address, RequestHandlerClass, privateKeyFilePath,
- certificateFilePath, peerCaCertificateFile = None,
- certificateChainFilePath = None, verifyClient = None):
- SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
- self.verifyClient = verifyClient
-
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- nVerify = SSL.VERIFY_NONE
- ctx.set_options(SSL.OP_NO_SSLv2)
-
- ctx.use_privatekey_file(privateKeyFilePath)
- ctx.use_certificate_file(certificateFilePath)
- if peerCaCertificateFile:
- ctx.load_verify_locations(peerCaCertificateFile)
- if certificateChainFilePath:
- ctx.use_certificate_chain_file(certificateChainFilePath)
- if verifyClient == 'require':
- nVerify |= SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
- elif verifyClient in ('optional', 'optional_on_ca'):
- nVerify |= SSL.VERIFY_PEER
- ctx.set_verify(nVerify, self.verifyCallback)
-
- self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
- self.server_bind()
- self.server_activate()
-
- def verifyCallback(self, connection, x509Object, errorNumber, errorDepth, returnCode):
- logger.info('http.HttpsConnection(%s, %s, %s, %s, %s, %s)' % (
- self, connection, x509Object, errorNumber, errorDepth, returnCode))
- return returnCode
-
- #~ def server_bind(self):
- #~ """Override server_bind to store the server name."""
-
- #~ SocketServer.TCPServer.server_bind(self)
- #~ host, port = self.socket.getsockname()[:2]
- #~ self.server_name = socket.getfqdn(host)
- #~ self.server_port = port
-
-
-class HttpRequest(abstractweb.HttpRequestMixin, object):
- handler = None
- submission = None
-
- def __init__(self, handler):
- self.handler = handler
- self.submission = submissions.readSubmission(self.handler)
-
- def getBody(self):
- return self.submission.readFile()
-
- def getHeaders(self):
- return self.handler.headers
-
- def getMethod(self):
- return self.handler.command
-
- def getPath(self):
- return self.pathAndQuery.split('?', 1)[0]
-
- def getPathAndQuery(self):
- return self.handler.path
-
- def getQuery(self):
- splitedPathAndQuery = self.pathAndQuery.split('?', 1)
- if len(splitedPathAndQuery) > 1:
- return splitedPathAndQuery[1]
- else:
- return ''
-
- def getScheme(self):
- return self.handler.scheme
-
- def getUrl(self):
- return "%s://%s%s" % (self.scheme, self.headers.get('Host'), self.pathAndQuery)
-
- body = property(getBody)
- headers = property(getHeaders)
- method = property(getMethod)
- path = property(getPath)
- pathAndQuery = property(getPathAndQuery)
- query = property(getQuery)
- scheme = property(getScheme)
- url = property(getUrl)
-
-
-class HttpResponse(abstractweb.HttpResponseMixin, object):
- def send(self, httpRequestHandler):
- statusCode = self.statusCode
- if statusCode == 401:
- return self.send401(httpRequestHandler)
- elif statusCode == 404:
- return self.send404(httpRequestHandler)
- assert statusCode in (200, 207)
- if self.headers is None:
- headers = {}
- else:
- headers = self.headers.copy()
- if time.time() > httpRequestHandler.socketCreationTime + 300:
- headers['Connection'] = 'close'
- elif not httpRequestHandler.close_connection:
- headers['Connection'] = 'Keep-Alive'
- # TODO: Could also output Content-MD5.
- lastModified = headers.get('Last-Modified')
- ifModifiedSince = httpRequestHandler.httpRequest.headers.get('If-Modified-Since')
- if lastModified and ifModifiedSince:
- # We don't want to use bandwith if the file was not modified.
- try:
- lastModifiedTime = time.strptime(lastModified[:25], '%a, %d %b %Y %H:%M:%S')
- ifModifiedSinceTime = time.strptime(ifModifiedSince[:25], '%a, %d %b %Y %H:%M:%S')
- if lastModifiedTime[:8] <= ifModifiedSinceTime[:8]:
- httpRequestHandler.send_response(304, 'Not Modified.')
- for key in ('Connection', 'Content-Location'):
- if key in headers:
- httpRequestHandler.send_header(key, headers[key])
- httpRequestHandler.setCookie()
- httpRequestHandler.end_headers()
- return
- except (ValueError, KeyError):
- pass
- data = self.body
- if data is None:
- data = ''
- if isinstance(data, basestring):
- dataFile = None
- dataSize = len(data)
- else:
- dataFile = data
- data = ''
- if hasattr(dataFile, 'fileno'):
- dataSize = os.fstat(dataFile.fileno())[6]
- else:
- # For StringIO and cStringIO classes.
- dataSize = len(dataFile.getvalue())
- if dataFile is not None:
- assert not data
- data = dataFile.read(1048576) # Read first MB chunk
- contentType = headers.get('Content-Type')
- if contentType and contentType.split(';')[0] == 'text/html' and data.startswith('<?xml'):
- # Internet Explorer 6 renders the page differently when they start with <?xml...>, so
- # skip it.
- i = data.find('\n')
- if i > 0:
- data = data[i + 1:]
- else:
- i = data.find('>')
- if i > 0:
- data = data[i + 1:]
- dataSize -= i + 1
- # Compress data if possible and if data is not too big.
- acceptEncoding = httpRequestHandler.httpRequest.headers.get('Accept-Encoding', '')
- if 0 < dataSize < 1048576 and 'gzip' in acceptEncoding \
- and 'gzip;q=0' not in acceptEncoding:
- # Since dataSize < 1 MB, the data is fully contained in string.
- zbuf = cStringIO.StringIO()
- zfile = gzip.GzipFile(mode = 'wb', fileobj = zbuf)
- zfile.write(data)
- zfile.close()
- data = zbuf.getvalue()
- dataSize = len(data)
- headers['Content-Encoding'] = 'gzip'
- headers['Content-Length'] = '%d' % dataSize
- statusMessages = {
- 200: 'OK',
- 207: 'Multi-Status',
- }
- assert statusCode in statusMessages, 'Unknown status code %d.' % statusCode
- if httpRequestHandler.httpAuthenticationLogoutTrick and statusCode == 200:
- statusCode = 401
- statusMessage = 'Access Unauthorized'
- headers['WWW-Authenticate'] = 'Basic realm="%s"' % httpRequestHandler.realm
- else:
- statusMessage = statusMessages[statusCode]
- httpRequestHandler.send_response(statusCode, statusMessage)
- for key, value in headers.items():
- httpRequestHandler.send_header(key, value)
- httpRequestHandler.setCookie()
- httpRequestHandler.end_headers()
- if httpRequestHandler.httpRequest.method != 'HEAD' and dataSize > 0:
- outputFile = httpRequestHandler.wfile
- if data:
- outputFile.write(data)
- if dataFile is not None:
- while True:
- chunk = dataFile.read(1048576) # 1 MB chunk
- if not chunk:
- break
- outputFile.write(chunk)
-
- def send401(self, httpRequestHandler):
- logger.info(self.statusMessage)
- data = '<html><body>%s</body></html>' % self.statusMessage
- headers = {}
- if httpRequestHandler.useHttpAuthentication == 'not this time':
- del httpRequestHandler.useHttpAuthentication
- if httpRequestHandler.useHttpAuthentication != True:
- httpRequestHandler.useHttpAuthentication = True
- elif httpRequestHandler.useHttpAuthentication:
- headers["WWW-Authenticate"] = 'Basic realm="%s"' % httpRequestHandler.realm
- return httpRequestHandler.send_error(
- self.statusCode, self.statusMessage, data, headers, setCookie = True)
-
- def send404(self, httpRequestHandler):
- logger.info(self.statusMessage)
- data = '<html><body>%s</body></html>' % self.statusMessage
- return httpRequestHandler.send_error(
- self.statusCode, self.statusMessage, data, setCookie = True)
-
-
-class HttpRequestHandlerMixin(abstractweb.HttpRequestHandlerMixin):
- canUseCookie = False
- cookie = None
- httpAuthenticationLogoutTrick = False
- HttpResponse = HttpResponse # Class
- socketCreationTime = None
- protocol_version = 'HTTP/1.1'
- realm = 'HttpRequestHandlerMixin Web Site'
- server_version = 'HttpRequestHandlerMixin/1.0'
- site = None # Class variable
- testCookieSupport = False
- useHttpAuthentication = True
-
- def createSession(self):
- session = abstractweb.HttpRequestHandlerMixin.createSession(self)
- if self.canUseCookie:
- self.testCookieSupport = True
- return session
-
- def handle(self):
- """Handle multiple requests if necessary."""
- self.socketCreationTime = time.time()
- try:
- try:
- self.close_connection = True
- self.handle_one_request()
- while not self.close_connection:
- self.handle_one_request()
- except socket.timeout:
- pass
- except KeyboardInterrupt:
- raise
- except SSL.ZeroReturnError:
- pass
- except SSL.Error, exception:
- logger.debug('SSL error in handle. Error = %s, %s' % (exception, exception[0]))
- raise # FIXME
- if exception[0] == ('PEM routines', 'PEM_read_bio', 'no start line'):
- pass
- else:
- self.outputUnknownException()
- except:
- self.outputUnknownException()
- finally:
- del self.socketCreationTime
-
- def handle_one_request(self):
- """Handle a single HTTP request."""
- self.raw_requestline = self.rfile.readline()
- if not self.raw_requestline:
- self.close_connection = True
- return
- if not self.parse_request(): # An error code has been sent, just exit
- return
- logger.info(self.raw_requestline.strip())
- logger.debug(str(self.headers))
-
- # The server isn't forked nor threaded, so we don't want to keep connections open, to avoid
- # dead-locks which occur for example when the connection with the navigator to the identity
- # provider is kept open, while a service provider sends a SOAP request to the identity
- # provider.
- # Remove this line for forked or threaded servers.
- self.close_connection = True
-
- self.httpRequest = HttpRequest(self)
-
- # Retrieve the session and user, if possible.
-
- session = None
- sessionToken = None
- user = None
-
- # Handle X.509 certificate authentication.
- if hasattr(self.connection, 'get_peer_certificate'):
- clientCertificate = self.connection.get_peer_certificate()
- if clientCertificate:
- user = self.site.authenticateX509User(clientCertificate)
- if user is None:
- logger.info('Unknown certificate (serial number = %s)'
- % clientCertificate.get_serial_number())
- else:
- sessionToken = user.sessionToken
- if sessionToken is not None:
- session = self.site.sessions.get(sessionToken)
- if session is None:
- sessionToken = None
- del user.sessionToken
- else:
- # For security reasons, we want to minimize the publication of
- # session token (it is better not to store it in a cookie or in
- # URLs). The client need to send the certificate each time, for the
- # session to continue.
- if session.publishToken:
- del session.publishToken
-
- # Handle HTTP authentication.
- authorization = self.httpRequest.headers.get('authorization')
- if self.httpRequest.hasQueryField('login') and not authorization \
- and self.useHttpAuthentication:
- # Ask for HTTP authentication.
- return self.outputErrorUnauthorized(self.httpRequest.path)
- if self.httpRequest.hasQueryField('logout') and authorization:
- # Since HTTP authentication provides no way to logout, we send a status
- # Unauthorized to force the user to press the cancel button. But instead of
- # sending an error page immediately, we send the real page, so the user will see
- # the page instead of an error message.
- authorization = None
- self.httpAuthenticationLogoutTrick = True
- if authorization:
- try:
- authenticationScheme, credentials = authorization.split(None, 1)
- except ValueError:
- return self.outputErrorUnauthorized(self.httpRequest.path)
- authenticationScheme = authenticationScheme.lower()
- if authenticationScheme == 'basic':
- loginAndPassword = base64.decodestring(credentials)
- try:
- login, password = loginAndPassword.split(':', 1)
- except:
- login = loginAndPassword
- password = ''
- logger.debug('Basic authentication: login = "%s" / password = "%s"' % (
- login, password))
- if password:
- user = self.site.authenticateLoginPasswordUser(login, password)
- if user is None:
- logger.info('Unknown user (login = "%s" / password = "%s")' % (
- login, password))
- return self.outputErrorUnauthorized(self.httpRequest.path)
- else:
- sessionToken = user.sessionToken
- if sessionToken is not None:
- session = self.site.sessions.get(sessionToken)
- if session is None:
- sessionToken = None
- del user.sessionToken
- else:
- # For security reasons, we want to minimize the publication of
- # session token (it is better not to store it in a cookie or in
- # URLs). The client need to send the certificate each time, for the
- # session to continue.
- if session.publishToken:
- del session.publishToken
- elif login:
- # No password was given. Assume login contains a session token.
- # TODO: sanity chek on login
- sessionToken = login
- session = self.site.sessions.get(sessionToken)
- if session is not None and session.userId is not None:
- user = self.site.users.get(session.userId)
- if user is not None and user.sessionToken != session.token:
- # Sanity check.
- user.sessionToken = session.token
- else:
- logger.info('Unknown authentication scheme = %s' % authenticationScheme)
- return self.outputErrorUnauthorized(self.httpRequest.path)
-
- # Handle use of cookies, session and user.
- cookie = None
- cookieContent = {}
- if self.httpRequest.headers.has_key('Cookie'):
- logger.debug('Cookie received:')
- cookie = Cookie.SimpleCookie(
- self.httpRequest.headers['Cookie'])
- for k, v in cookie.items():
- cookieContent[k] = v.value
- logger.debug(' %s = %s' % (k, cookieContent[k]))
- self.cookie = cookie
-
- sessionToken = None
- sessionTokenInCookie = False
- if self.httpRequest.hasQueryField('sessionToken'):
- sessionToken = self.httpRequest.getQueryField('sessionToken')
- if not sessionToken:
- sessionToken = None
- if session is not None and sessionToken != session.token:
- sessionToken = None
- if cookieContent.has_key('sessionToken'):
- cookieSessionToken = cookieContent['sessionToken']
- if cookieSessionToken:
- if session is None or cookieSessionToken == session.token:
- if sessionToken is None:
- sessionToken = cookieSessionToken
- if cookieSessionToken == sessionToken:
- sessionTokenInCookie = True
- canUseCookie = True
- if session is None and sessionToken is not None:
- session = self.site.sessions.get(sessionToken)
- if session is None:
- sessionToken = None
- sessionTokenInCookie = False
- else:
- if user is None:
- if session.userId is not None:
- user = self.site.users.get(session.userId)
- if user is not None and user.sessionToken != sessionToken:
- # Sanity check.
- user.sessionToken = sessionToken
- else:
- # The user has been authenticated (using HTTP or X.509 authentication), but the
- # associated session didn't exist (or was too old, or...). So, update
- # its sessionToken.
- user.sessionToken = sessionToken
- # For security reasons, we want to minimize the publication of session
- # token (it is better not to store it in a cookie or in URLs).
- if session.publishToken:
- del session.publishToken
- if session is None and user is not None:
- # The user has been authenticated (using HTTP or X.509 authentication), but the session
- # doesn't exist yet (or was too old, or...). Create a new session.
- session = self.createSession()
- # For security reasons, we want to minimize the publication of session
- # token (it is better not to store it in a cookie or in URLs).
- # session.publishToken = False # False is the default value.
- session.userId = user.uniqueId
- user.sessionToken = session.token
- else:
- self.session = session
- if session is not None:
- if not sessionTokenInCookie:
- # The sessionToken is valid but is not stored in the cookie. So, don't try to
- # use cookie.
- canUseCookie = False
- logger.debug('Session: %s' % session.simpleLabel)
- self.canUseCookie = canUseCookie
- self.user = user
- if user is not None:
- logger.debug('User: %s' % user.simpleLabel)
-
- # Now, the HTTP request handler has done everything it could done. Transfer the processing
- # to the site.
-
- try:
- self.site.handleHttpRequestHandler(self)
- except IOError:
- logger.exception('An exception occured:')
- path = self.path.split('?')[0]
- return self.outputErrorNotFound(path)
-
- def log_message(self, format, *arguments):
- """Override BaseHTTPServer.HttpRequestHandler method to use logger.
-
- Do not use. Use logger instead.
- """
-
- logger.info('%s - - [%s] %s' % (
- self.address_string(), self.log_date_time_string(), format % arguments))
-
-## def outputAlert(self, data, title = None, url = None):
-## import html
-## if title is None:
-## title = N_('Alert')
-## # FIXME: Handle XSLT template.
-## if url:
-## buttonsBar = html.div(class_ = 'buttons-bar')
-## actionButtonsBar = html.span(class_ = 'action-buttons-bar')
-## buttonsBar.append(actionButtonsBar)
-## actionButtonsBar.append(html.a(_('OK'), class_ = 'button', href = url))
-## else:
-## buttonsBar = None
-## layout = html.html(
-## html.head(html.title(_(title))),
-## html.body(
-## html.p(_(data), class_ = 'alert'),
-## buttonsBar,
-## ),
-## )
-## self.outputData(layout.serialize(), contentLocation = None, mimeType = 'text/html')
-
-## def outputData(self, data, contentLocation = None, headers = None, mimeType = None,
-## modificationTime = None, successCode = 200):
-## # Session and user must be saved before responding. Otherwise, when the server is
-## # multitasked or multithreaded, it may receive a new HTTP request before the session is
-## # saved.
-## if self.session is not None and self.session.isDirty:
-## self.session.save()
-## if self.user is not None and self.user.isDirty:
-## self.user.save()
-
-## if isinstance(data, basestring):
-## dataFile = None
-## dataSize = len(data)
-## else:
-## dataFile = data
-## data = ''
-## if hasattr(dataFile, 'fileno'):
-## dataSize = os.fstat(dataFile.fileno())[6]
-## else:
-## # For StringIO and cStringIO classes.
-## dataSize = len(dataFile.getvalue())
-
-## if headers is None:
-## headers = {}
-## if time.time() > self.socketCreationTime + 300:
-## headers['Connection'] = 'close'
-## elif not self.close_connection:
-## headers['Connection'] = 'Keep-Alive'
-## if contentLocation is not None:
-## headers['Content-Location'] = contentLocation
-## if mimeType:
-## headers['Content-Type'] = '%s; charset=utf-8' % mimeType
-## if modificationTime:
-## headers['Last-Modified'] = time.strftime('%a, %d %b %Y %H:%M:%S GMT', modificationTime)
-## # TODO: Could also output Content-MD5.
-## ifModifiedSince = self.headers.get('If-Modified-Since')
-## if modificationTime and ifModifiedSince:
-## # We don't want to use bandwith if the file was not modified.
-## try:
-## ifModifiedSinceTime = time.strptime(ifModifiedSince[:25], '%a, %d %b %Y %H:%M:%S')
-## if modificationTime[:8] <= ifModifiedSinceTime[:8]:
-## self.send_response(304, 'Not Modified.')
-## for key in ('Connection', 'Content-Location'):
-## if key in headers:
-## self.send_header(key, headers[key])
-## self.setCookie()
-## self.end_headers()
-## return
-## except (ValueError, KeyError):
-## pass
-## if dataFile is not None:
-## assert not data
-## data = dataFile.read(1048576) # Read first MB chunk
-## if mimeType == 'text/html' and data.startswith('<?xml'):
-## # Internet Explorer 6 renders the page differently when they start with <?xml...>, so
-## # skip it.
-## i = data.find('\n')
-## if i > 0:
-## data = data[i + 1:]
-## else:
-## i = data.find('>')
-## if i > 0:
-## data = data[i + 1:]
-## dataSize -= i + 1
-## # Compress data if possible and if data is not too big.
-## acceptEncoding = self.headers.get('Accept-Encoding', '')
-## if 0 < dataSize < 1048576 and 'gzip' in acceptEncoding \
-## and 'gzip;q=0' not in acceptEncoding:
-## # Since dataSize < 1 MB, the data is fully contained in string.
-## zbuf = cStringIO.StringIO()
-## zfile = gzip.GzipFile(mode = 'wb', fileobj = zbuf)
-## zfile.write(data)
-## zfile.close()
-## data = zbuf.getvalue()
-## dataSize = len(data)
-## headers['Content-Encoding'] = 'gzip'
-## headers['Content-Length'] = '%d' % dataSize
-## successMessages = {
-## 200: 'OK',
-## 207: 'Multi-Status',
-## }
-## assert successCode in successMessages, 'Unknown success code %d.' % successCode
-## if self.httpAuthenticationLogoutTrick and successCode == 200:
-## successCode = 401
-## successMessage = 'Access Unauthorized'
-## headers['WWW-Authenticate'] = 'Basic realm="%s"' % self.realm
-## else:
-## successMessage = successMessages[successCode]
-## self.send_response(successCode, successMessage)
-## for key, value in headers.items():
-## self.send_header(key, value)
-## self.setCookie()
-## self.end_headers()
-## if self.httpRequest.method != 'HEAD' and dataSize > 0:
-## outputFile = self.wfile
-## if data:
-## outputFile.write(data)
-## if dataFile is not None:
-## while True:
-## chunk = dataFile.read(1048576) # 1 MB chunk
-## if not chunk:
-## break
-## outputFile.write(chunk)
-## return
-
-## def outputErrorAccessForbidden(self, filePath):
-## if filePath is None:
-## message = 'Access Forbidden'
-## else:
-## message = 'Access to "%s" Forbidden.' % filePath
-## logger.info(message)
-## data = '<html><body>%s</body></html>' % message
-## return self.send_error(403, message, data, setCookie = True)
-
-## def outputErrorBadRequest(self, reason):
-## if reason:
-## message = 'Bad Request: %s' % reason
-## else:
-## message = 'Bad Request'
-## logger.info(message)
-## data = '<html><body>%s</body></html>' % message
-## return self.send_error(400, message, data)
-
- def outputErrorInternalServer(self):
- message = 'Internal Server Error'
- logger.info(message)
- data = '<html><body>%s</body></html>' % message
- return self.send_error(500, message, data)
-
-## def outputErrorMethodNotAllowed(self, reason):
-## if reason:
-## message = 'Method Not Allowed: %s' % reason
-## else:
-## message = 'Method Not Allowed'
-## logger.info(message)
-## data = '<html><body>%s</body></html>' % message
-## # This error doesn't need a pretty interface.
-## # FIXME: Add an 'Allow' header containing a list of valid methods for the requested
-## # resource.
-## return self.send_error(405, message, data)
-
-## def outputErrorNotFound(self, filePath):
-## if filePath is None:
-## message = 'Not Found'
-## else:
-## message = 'Path "%s" Not Found.' % filePath
-## logger.info(message)
-## data = '<html><body>%s</body></html>' % message
-## return self.send_error(404, message, data, setCookie = True)
-
- def outputErrorUnauthorized(self, filePath):
- if filePath is None:
- message = 'Access Unauthorized'
- else:
- message = 'Access to "%s" Unauthorized.' % filePath
- logger.info(message)
- data = '<html><body>%s</body></html>' % message
- headers = {}
- if self.useHttpAuthentication:
- headers["WWW-Authenticate"] = 'Basic realm="%s"' % self.realm
- return self.send_error(401, message, data, headers, setCookie = True)
-
-## def outputInformationContinue(self):
-## message = 'Continue'
-## logger.debug(message)
-## self.send_response(100, message)
-
-## def outputSuccessCreated(self, filePath):
-## if filePath is None:
-## message = 'Created'
-## else:
-## message = 'File "%s" Created.' % filePath
-## logger.debug(message)
-## data = '<html><body>%s</body></html>' % message
-## self.send_response(201, message)
-## if time.time() > self.socketCreationTime + 300:
-## self.send_header('Connection', 'close')
-## elif not self.close_connection:
-## self.send_header('Connection', 'Keep-Alive')
-## self.send_header('Content-Type', 'text/html; charset=utf-8')
-## self.send_header('Content-Length', '%d' % len(data))
-## self.setCookie()
-## self.end_headers()
-## if self.httpRequest.method != 'HEAD':
-## self.wfile.write(data)
-
-## def outputSuccessNoContent(self):
-## message = 'No Content'
-## logger.debug(message)
-## self.send_response(204, message)
-## if time.time() > self.socketCreationTime + 300:
-## self.send_header('Connection', 'close')
-## elif not self.close_connection:
-## self.send_header('Connection', 'Keep-Alive')
-## self.setCookie()
-## self.end_headers()
-
- def outputUnknownException(self):
- import traceback, cStringIO
- f = cStringIO.StringIO()
- traceback.print_exc(file = f)
- exceptionTraceback = f.getvalue()
- exceptionType, exception = sys.exc_info()[:2]
- logger.debug("""\
-An exception "%(exception)s" of class "%(exceptionType)s" occurred.
-
-%(traceback)s
-""" % {
- 'exception': exception,
- 'exceptionType': exceptionType,
- 'traceback': exceptionTraceback,
- })
- return self.outputErrorInternalServer()
-
- def respondRedirectTemporarily(self, url):
- # Session and user must be saved before responding. Otherwise, when the server is
- # multitasked or multithreaded, it may receive a new HTTP request before the session is
- # saved.
- if self.session is not None and self.session.isDirty:
- self.session.save()
- if self.user is not None and self.user.isDirty:
- self.user.save()
-
- message = 'Moved Temporarily to "%s".' % url
- logger.debug(message)
- data = '<html><body>%s</body></html>' % message
- self.send_response(302, message)
- self.send_header('Location', url)
- if time.time() > self.socketCreationTime + 300:
- self.send_header('Connection', 'close')
- elif not self.close_connection:
- self.send_header('Connection', 'Keep-Alive')
- self.send_header('Content-Type', 'text/html; charset=utf-8')
- self.send_header('Content-Length', '%d' % len(data))
- self.setCookie()
- self.end_headers()
- if self.httpRequest.method != 'HEAD':
- self.wfile.write(data)
-
- def send_error(self, code, message = None, data = None, headers = None, setCookie = False):
- # Session and user must be saved before responding. Otherwise, when the server is
- # multitasked or multithreaded, it may receive a new HTTP request before the session is
- # saved.
- if self.session is not None and self.session.isDirty:
- self.session.save()
- if self.user is not None and self.user.isDirty:
- self.user.save()
-
- shortMessage, longMessage = self.responses.get(code, ('???', '???'))
- if message is None:
- message = shortMessage
- if not data:
- explain = longMessage
- data = self.error_message_format % {
- 'code': code,
- 'message': message,
- 'explain': longMessage,
- }
- self.send_response(code, message)
- self.send_header('Content-Type', 'text/html; charset=utf-8')
- self.send_header('Content-Length', '%d' % len(data))
- self.send_header('Connection', 'close')
- if headers is not None:
- for name, value in headers.items():
- self.send_header(name, value)
- if setCookie:
- self.setCookie()
- self.end_headers()
- if self.httpRequest.method != 'HEAD' and code >= 200 and code not in (204, 304):
- self.wfile.write(data)
-
- def setCookie(self):
- if not self.canUseCookie:
- return
- oldCookie = self.cookie
- cookie = Cookie.SimpleCookie()
- cookieContent = {}
- session = self.session
- if session is not None and session.publishToken:
- cookieContent['sessionToken'] = session.token
- for key, value in cookieContent.items():
- cookie[key] = value
- cookie[key]['path'] = '/'
- if not cookieContent:
- if oldCookie:
- for key, morsel in oldCookie.items():
- cookie[key] = ''
- cookie[key]['max-age'] = 0
- cookie[key]['path'] = '/'
- else:
- cookie = None
- if cookie is not None:
- # Is new cookie different from previous one?
- sameCookie = False
- if oldCookie is not None and cookie.keys() == oldCookie.keys():
- for key, morsel in cookie.items():
- oldMorsel = oldCookie[key]
- if morsel.value != oldMorsel.value:
- break
- else:
- sameCookie = True
- if not sameCookie:
- for morsel in cookie.values():
- self.send_header(
- 'Set-Cookie', morsel.output(header = '')[1:])
- self.cookie = cookie
-
-
-class HttpRequestHandler(HttpRequestHandlerMixin, BaseHTTPServer.BaseHTTPRequestHandler):
- scheme = 'http'
-
-
-class HttpsConnection(httplib.HTTPConnection):
- certificateFile = None
- default_port = httplib.HTTPS_PORT
- peerCaCertificateFile = None
- privateKeyFile = None
-
- def __init__(self, host, port = None, privateKeyFile = None, certificateFile = None,
- peerCaCertificateFile = None, strict = None):
- httplib.HTTPConnection.__init__(self, host, port, strict)
- self.privateKeyFile = privateKeyFile
- self.certificateFile = certificateFile
- self.peerCaCertificateFile = peerCaCertificateFile
-
- def connect(self):
- """Connect to a host on a given (SSL) port."""
-
- context = SSL.Context(SSL.SSLv23_METHOD)
- # Demand a certificate.
- context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self.verifyCallback)
- if self.privateKeyFile:
- context.use_privatekey_file(self.privateKeyFile)
- if self.certificateFile:
- context.use_certificate_file(self.certificateFile)
- if self.peerCaCertificateFile:
- context.load_verify_locations(self.peerCaCertificateFile)
-
- # Strange hack, that is derivated from httplib.HTTPSConnection, but that I (Emmanuel) don't
- # really understand...
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sslSocket = SSL.Connection(context, sock)
- sslSocket.connect((self.host, self.port))
- self.sock = httplib.FakeSocket(sslSocket, sslSocket)
-
- def verifyCallback(self, connection, x509Object, errorNumber, errorDepth, returnCode):
- logger.debug('http.HttpsConnection(%s, %s, %s, %s, %s, %s)' % (
- self, connection, x509Object, errorNumber, errorDepth, returnCode))
- # FIXME: What should be done?
- return returnCode
-
-
-class HttpsRequestHandler(HttpRequestHandlerMixin, BaseHTTPSRequestHandler):
- scheme = 'https'
-
-
-## # We use ForkingMixIn instead of ThreadingMixIn because the Python binding for
-## # libxml2 limits the number of registered xpath functions to 10. Even if we use
-## # only one xpathContext, this would limit the number of threads to 10, wich is
-## # not enough for a web server.
-
-
-## class HttpServer(SocketServer.ForkingMixIn, BaseHTTPServer.HTTPServer):
-## pass
-
-
-## class HttpsServer(SocketServer.ForkingMixIn, BaseHTTPSServer):
-## pass
-
-
-# No fork nor thread.
-
-class HttpServer(BaseHTTPServer.HTTPServer):
- pass
-
-
-class HttpsServer(BaseHTTPSServer):
- pass
-
diff --git a/python/tests/liberty.py b/python/tests/liberty.py
deleted file mode 100644
index 02125d39..00000000
--- a/python/tests/liberty.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-from LibertyEnabledClientProxy import LibertyEnabledClientProxyMixin
-from LibertyEnabledProxy import LibertyEnabledProxyMixin
-from IdentityProvider import IdentityProviderMixin
-from ServiceProvider import ServiceProviderMixin
-from Provider import ProviderMixin
-import web
-
-
-class LibertyEnabledClientProxy(LibertyEnabledClientProxyMixin, web.WebClient):
- def __init__(self):
- web.WebClient.__init__(self)
- LibertyEnabledClientProxyMixin.__init__(self)
-
-
-class LibertyEnabledProxy(LibertyEnabledProxyMixin, web.WebSite):
- def __init__(self, url):
- web.WebSite.__init__(self, url)
- LibertyEnabledProxyMixin.__init__(self)
-
-
-class Provider(ProviderMixin, web.WebSite):
- def __init__(self, url):
- web.WebSite.__init__(self, url)
- ProviderMixin.__init__(self)
-
-
-class IdentityProvider(IdentityProviderMixin, Provider):
- def __init__(self, url):
- Provider.__init__(self, url)
- IdentityProviderMixin.__init__(self)
-
-
-class ServiceProvider(ServiceProviderMixin, Provider):
- def __init__(self, url):
- Provider.__init__(self, url)
- ServiceProviderMixin.__init__(self)
diff --git a/python/tests/libertysimulator.py b/python/tests/libertysimulator.py
deleted file mode 100644
index a7751cdf..00000000
--- a/python/tests/libertysimulator.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-from LibertyEnabledClientProxy import LibertyEnabledClientProxyMixin
-from LibertyEnabledProxy import LibertyEnabledProxyMixin
-from IdentityProvider import IdentityProviderMixin
-from ServiceProvider import ServiceProviderMixin
-from Provider import ProviderMixin
-import websimulator
-
-
-class LibertyEnabledClientProxy(LibertyEnabledClientProxyMixin, websimulator.WebClient):
- def __init__(self, internet):
- websimulator.WebClient.__init__(self, internet)
- LibertyEnabledClientProxyMixin.__init__(self)
-
-
-class LibertyEnabledProxy(LibertyEnabledProxyMixin, websimulator.WebSite):
- def __init__(self, internet, url):
- websimulator.WebSite.__init__(self, internet, url)
- LibertyEnabledProxyMixin.__init__(self)
-
-
-class Provider(ProviderMixin, websimulator.WebSite):
- def __init__(self, internet, url):
- websimulator.WebSite.__init__(self, internet, url)
- ProviderMixin.__init__(self)
-
-
-class IdentityProvider(IdentityProviderMixin, Provider):
- def __init__(self, internet, url):
- Provider.__init__(self, internet, url)
- IdentityProviderMixin.__init__(self)
-
-
-class ServiceProvider(ServiceProviderMixin, Provider):
- def __init__(self, internet, url):
- Provider.__init__(self, internet, url)
- ServiceProviderMixin.__init__(self)
diff --git a/python/tests/login_tests.py b/python/tests/login_tests.py
index b16389dc..6c93fb99 100644
--- a/python/tests/login_tests.py
+++ b/python/tests/login_tests.py
@@ -34,50 +34,14 @@ if not '../.libs' in sys.path:
import lasso
-import builtins
-from libertysimulator import *
-from websimulator import *
-
class LoginTestCase(unittest.TestCase):
- def generateIdpSite(self, internet):
- site = IdentityProvider(internet, 'https://idp1')
- site.providerId = 'https://idp1/metadata'
+ pass
- lassoServer = lasso.Server(
- '../../tests/data/idp1-la/metadata.xml',
- None, # '../../tests/data/idp1-la/public-key.pem' is no more used
- '../../tests/data/idp1-la/private-key-raw.pem',
- '../../tests/data/idp1-la/certificate.pem',
- lasso.signatureMethodRsaSha1)
- lassoServer.add_provider(
- '../../tests/data/sp1-la/metadata.xml',
- '../../tests/data/sp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- site.lassoServerDump = lassoServer.dump()
- failUnless(site.lassoServerDump)
-
- site.newUser('Chantereau')
- site.newUser('Clapies')
- site.newUser('Febvre')
- site.newUser('Nowicki')
- # Frederic Peters has no account on identity provider.
- return site
- def generateLibertyEnabledClientProxy(self, internet):
- clientProxy = LibertyEnabledClientProxy(internet)
- lassoServer = lasso.Server()
- lassoServer.add_provider(
- '../../tests/data/idp1-la/metadata.xml',
- '../../tests/data/idp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- clientProxy.lassoServerDump = lassoServer.dump()
- failUnless(clientProxy.lassoServerDump)
- return clientProxy
-
- def generateSpSite(self, internet):
- site = ServiceProvider(internet, 'https://sp1')
- site.providerId = 'https://service-provider/metadata'
+class LogoutTestCase(unittest.TestCase):
+ def test01(self):
+ """SP logout without session and indentity; testing init_request."""
lassoServer = lasso.Server(
'../../tests/data/sp1-la/metadata.xml',
@@ -89,203 +53,36 @@ class LoginTestCase(unittest.TestCase):
'../../tests/data/idp1-la/metadata.xml',
'../../tests/data/idp1-la/public-key.pem',
'../../tests/data/ca1-la/certificate.pem')
- site.lassoServerDump = lassoServer.dump()
- failUnless(site.lassoServerDump)
-
- site.newUser('Nicolas')
- site.newUser('Romain')
- site.newUser('Valery')
- # Christophe Nowicki has no account on service provider.
- site.newUser('Frederic')
- return site
-
- def setUp(self):
- for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless',
- 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'):
- builtins.set(name, getattr(self, name))
-
- def tearDown(self):
- for name in ('fail', 'failIf', 'failIfAlmostEqual', 'failIfEqual', 'failUnless',
- 'failUnlessAlmostEqual', 'failUnlessRaises', 'failUnlessEqual'):
- builtins.delete(name)
-
- def test01(self):
- """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Romain Chantereau')
- principal.keyring[idpSite.url] = 'Chantereau'
- principal.keyring[spSite.url] = 'Romain'
-
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
- failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 200)
- failIf(spSite.sessions)
- failIf(idpSite.sessions)
-
- def test01_withRelayState(self):
- """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP. Checking RelayState."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Romain Chantereau')
- principal.keyring[idpSite.url] = 'Chantereau'
- principal.keyring[spSite.url] = 'Romain'
-
- httpResponse = principal.sendHttpRequestToSite(
- spSite, 'GET', '/login?RelayState=a_sample_relay_state')
- failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 200)
- failIf(spSite.sessions)
- failIf(idpSite.sessions)
+ logout = lasso.Logout(lassoServer, lasso.providerTypeSp)
+ try:
+ logout.init_request()
+ except lasso.Error, error:
+ if error.code != -1:
+ raise
+ else:
+ self.fail('logout.init_request without having set identity before should fail')
def test02(self):
- """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP. Done three times."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Romain Chantereau')
- principal.keyring[idpSite.url] = 'Chantereau'
- principal.keyring[spSite.url] = 'Romain'
-
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
- failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 200)
-
- # Once again. Now the principal already has a federation between spSite and idpSite.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
- failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 200)
+ """IDP logout without session and identity; testing logout.get_next_providerID."""
- # Once again. Do a new passive login between normal login and logout.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
- failUnlessEqual(httpResponse.statusCode, 200)
- del principal.keyring[idpSite.url] # Ensure identity provider will be really passive.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
- failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 200)
-
- # Once again, with isPassive and the user having no web session.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
- failUnlessEqual(httpResponse.statusCode, 401)
-
- def test03(self):
- """Service provider initiated login using HTTP redirect, but user fail to authenticate himself on identity provider. Then logout, with same problem."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Frederic Peters')
- # Frederic Peters has no account on identity provider.
- principal.keyring[spSite.url] = 'Frederic'
-
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
- failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 401)
-
- def test04(self):
- """Service provider initiated login using HTTP redirect, but user has no account on service
- provider and doesn't create one."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Christophe Nowicki')
- principal.keyring[idpSite.url] = 'Nowicki'
- # Christophe Nowicki has no account on service provider.
-
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
- failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 401)
-
- def test05(self):
- """Service provider initiated login using HTTP redirect with isPassive for a user without federation yet."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Romain Chantereau')
- principal.keyring[idpSite.url] = 'Chantereau'
- principal.keyring[spSite.url] = 'Romain'
-
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
- failUnlessEqual(httpResponse.statusCode, 401)
-
- def test06(self):
- """Testing forceAuthn flag."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Romain Chantereau')
- principal.keyring[idpSite.url] = 'Chantereau'
- principal.keyring[spSite.url] = 'Romain'
-
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
- failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 200)
-
- # Ask user to reauthenticate while he is already logged.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
- failUnlessEqual(httpResponse.statusCode, 200)
- del principal.keyring[idpSite.url] # Ensure user can't authenticate.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
- failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 200)
-
- # Force authentication, but user won't authenticate.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
- failUnlessEqual(httpResponse.statusCode, 401)
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logout')
- failUnlessEqual(httpResponse.statusCode, 401)
-
- def test07(self):
- """LECP login."""
-
- internet = Internet()
- idpSite = self.generateIdpSite(internet)
- spSite = self.generateSpSite(internet)
- spSite.idpSite = idpSite
- principal = Principal(internet, 'Romain Chantereau')
- principal.keyring[idpSite.url] = 'Chantereau'
- principal.keyring[spSite.url] = 'Romain'
- lecp = self.generateLibertyEnabledClientProxy(internet)
- lecp.idpSite = idpSite
-
- # Try LECP, but the principal is not authenticated on idp1. So, LECP must fail.
- httpResponse = lecp.login(principal, spSite, '/login')
- failUnlessEqual(httpResponse.statusCode, 401)
-
- # Now authenticate principal, before testing LECP. So, LECP must succeed.
- httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
- failUnlessEqual(httpResponse.statusCode, 200)
- httpResponse = lecp.login(principal, spSite, '/login')
- failUnlessEqual(httpResponse.statusCode, 200)
+ lassoServer = lasso.Server(
+ '../../tests/data/idp1-la/metadata.xml',
+ None, # '../../tests/data/idp1-la/public-key.pem' is no more used
+ '../../tests/data/idp1-la/private-key-raw.pem',
+ '../../tests/data/idp1-la/certificate.pem',
+ lasso.signatureMethodRsaSha1)
+ lassoServer.add_provider(
+ '../../tests/data/sp1-la/metadata.xml',
+ '../../tests/data/sp1-la/public-key.pem',
+ '../../tests/data/ca1-la/certificate.pem')
+ logout = lasso.Logout(lassoServer, lasso.providerTypeIdp)
+ self.failIf(logout.get_next_providerID())
suite1 = unittest.makeSuite(LoginTestCase, 'test')
+suite2 = unittest.makeSuite(LogoutTestCase, 'test')
-allTests = unittest.TestSuite((suite1,))
+allTests = unittest.TestSuite((suite1, suite2))
if __name__ == '__main__':
sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful())
diff --git a/python/tests/sample-idp.py b/python/tests/sample-idp.py
deleted file mode 100755
index 046aa436..00000000
--- a/python/tests/sample-idp.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#! /usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import logging
-from optparse import OptionParser
-import sys
-
-if not '..' in sys.path:
- sys.path.insert(0, '..')
-if not '../.libs' in sys.path:
- sys.path.insert(0, '../.libs')
-
-import lasso
-
-import assertions
-import builtins
-import http
-import liberty
-
-
-applicationCamelCaseName = 'LassoSimulator'
-applicationPublicName = 'Lasso Simulator'
-applicationVersion = '(Unreleased CVS Version)'
-logger = None
-
-
-class HttpRequestHandlerMixin:
- realm = '%s Web Site' % applicationPublicName
- server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion)
-
- def version_string(self):
- return '%s %s' % (applicationPublicName, applicationVersion)
-
-
-class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler):
- pass
-
-
-class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler):
- pass
-
-
-def main():
- # Parse command line options.
- parser = OptionParser(version = '%%prog %s' % applicationVersion)
- parser.add_option(
- '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath',
- help = 'specify an alternate configuration file',
- default = '/etc/lasso-simulator/config.xml')
- parser.add_option(
- '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background',
- action = 'store_true', default = False)
- parser.add_option(
- '-D', '--debug', dest = 'debugMode', help = 'enable program debugging',
- action = 'store_true', default = False)
- parser.add_option(
- '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file',
- default = '/dev/null')
- parser.add_option(
- '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel',
- help = 'specify log level (debug, info, warning, error, critical)', default = 'info')
- (options, args) = parser.parse_args()
- if options.logLevel.upper() not in logging._levelNames:
- raise Exception('Unknown log level: "%s"' % options.logLevel)
-
- # Configure logger.
- logger = logging.getLogger()
- if options.debugMode and not options.daemonMode:
- handler = logging.StreamHandler(sys.stderr)
- else:
- handler = logging.FileHandler(options.logFilePath)
- formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- logger.setLevel(logging._levelNames[options.logLevel.upper()])
- builtins.set('logger', logger)
-
- site = liberty.IdentityProvider('https://idp1:1998/')
- site.providerId = 'https://idp1/metadata'
-
- lassoServer = lasso.Server.new(
- '../../tests/data/idp1-la/metadata.xml',
- None, # '../../tests/data/idp1-la/public-key.pem' is no more used
- '../../tests/data/idp1-la/private-key-raw.pem',
- '../../tests/data/idp1-la/certificate.pem',
- lasso.signatureMethodRsaSha1)
- lassoServer.add_provider(
- '../../tests/data/sp1-la/metadata.xml',
- '../../tests/data/sp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- lassoServer.add_provider(
- '../../tests/data/lecp1-la/metadata.xml',
- '../../tests/data/lecp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- site.lassoServerDump = lassoServer.dump()
- failUnless(site.lassoServerDump)
- lassoServer.destroy()
-
- site.certificateAbsolutePath = '../../tests/data/idp1-ssl/certificate.pem'
- site.privateKeyAbsolutePath = '../../tests/data/idp1-ssl/private-key-raw.pem'
- site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem'
-
- site.newUser('Chantereau')
- site.newUser('Clapies')
- site.newUser('Febvre')
- site.newUser('Nowicki')
- # Frederic Peters has no account on identity provider.
-
- HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host.
-## httpServer = http.HttpServer(('idp1', 1997), HttpRequestHandler)
-## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname())
- httpServer = http.HttpsServer(
- ('idp1', 1998),
- HttpsRequestHandler,
- site.privateKeyAbsolutePath, # Server private key
- site.certificateAbsolutePath, # Server certificate
- site.peerCaCertificateAbsolutePath, # Clients certification authority certificate
- None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852
- None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13
- )
- logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname())
- try:
- httpServer.serve_forever()
- except KeyboardInterrupt:
- pass
-
-if __name__ == '__main__':
- main()
diff --git a/python/tests/sample-lep.py b/python/tests/sample-lep.py
deleted file mode 100755
index 89b8df27..00000000
--- a/python/tests/sample-lep.py
+++ /dev/null
@@ -1,152 +0,0 @@
-#! /usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import logging
-from optparse import OptionParser
-import sys
-
-if not '..' in sys.path:
- sys.path.insert(0, '..')
-if not '../.libs' in sys.path:
- sys.path.insert(0, '../.libs')
-
-import lasso
-
-import assertions
-import builtins
-import http
-import liberty
-
-
-applicationCamelCaseName = 'LassoSimulator'
-applicationPublicName = 'Lasso Simulator'
-applicationVersion = '(Unreleased CVS Version)'
-logger = None
-
-
-class HttpRequestHandlerMixin:
- realm = '%s Web Site' % applicationPublicName
- server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion)
-
- def version_string(self):
- return '%s %s' % (applicationPublicName, applicationVersion)
-
-
-class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler):
- pass
-
-
-class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler):
- pass
-
-
-def main():
- # Parse command line options.
- parser = OptionParser(version = '%%prog %s' % applicationVersion)
- parser.add_option(
- '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath',
- help = 'specify an alternate configuration file',
- default = '/etc/lasso-simulator/config.xml')
- parser.add_option(
- '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background',
- action = 'store_true', default = False)
- parser.add_option(
- '-D', '--debug', dest = 'debugMode', help = 'enable program debugging',
- action = 'store_true', default = False)
- parser.add_option(
- '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file',
- default = '/dev/null')
- parser.add_option(
- '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel',
- help = 'specify log level (debug, info, warning, error, critical)', default = 'info')
- (options, args) = parser.parse_args()
- if options.logLevel.upper() not in logging._levelNames:
- raise Exception('Unknown log level: "%s"' % options.logLevel)
-
- # Configure logger.
- logger = logging.getLogger()
- if options.debugMode and not options.daemonMode:
- handler = logging.StreamHandler(sys.stderr)
- else:
- handler = logging.FileHandler(options.logFilePath)
- formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- logger.setLevel(logging._levelNames[options.logLevel.upper()])
- builtins.set('logger', logger)
-
- site = liberty.LibertyEnabledProxy('https://lecp1:2014/')
- site.providerId = 'https://lecp1/metadata'
- site.idpSite = liberty.IdentityProvider('https://idp1:1998/')
- site.idpSite.providerId = 'https://idp1/metadata'
-
- lassoServer = lasso.Server.new(
- '../../tests/data/lecp1-la/metadata.xml',
- None, # '../../tests/data/lecp1-la/public-key.pem' is no more used
- '../../tests/data/lecp1-la/private-key-raw.pem',
- '../../tests/data/lecp1-la/certificate.pem',
- lasso.signatureMethodRsaSha1)
- lassoServer.add_provider(
- '../../tests/data/idp1-la/metadata.xml',
- '../../tests/data/idp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- lassoServer.add_provider(
- '../../tests/data/sp1-la/metadata.xml',
- '../../tests/data/sp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- site.lassoServerDump = lassoServer.dump()
- failUnless(site.lassoServerDump)
- lassoServer.destroy()
-
- site.certificateAbsolutePath = '../../tests/data/lecp1-ssl/certificate.pem'
- site.privateKeyAbsolutePath = '../../tests/data/lecp1-ssl/private-key-raw.pem'
- site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem'
-
- site.newUser('rc')
- site.newUser('nc')
- # site.newUser('vf') Valery Febvre has no account on liberty-enabled proxy.
- site.newUser('cn')
- site.newUser('fp')
-
- HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host.
-## httpServer = http.HttpServer(('lecp1', 2013), HttpRequestHandler)
-## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname())
- httpServer = http.HttpsServer(
- ('lecp1', 2014),
- HttpsRequestHandler,
- site.privateKeyAbsolutePath, # Server private key
- site.certificateAbsolutePath, # Server certificate
- site.peerCaCertificateAbsolutePath, # Clients certification authority certificate
- None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852
- None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13
- )
- logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname())
- try:
- httpServer.serve_forever()
- except KeyboardInterrupt:
- pass
-
-if __name__ == '__main__':
- main()
diff --git a/python/tests/sample-sp-lep.py b/python/tests/sample-sp-lep.py
deleted file mode 100755
index eabef144..00000000
--- a/python/tests/sample-sp-lep.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#! /usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import logging
-from optparse import OptionParser
-import sys
-
-if not '..' in sys.path:
- sys.path.insert(0, '..')
-if not '../.libs' in sys.path:
- sys.path.insert(0, '../.libs')
-
-import lasso
-
-import assertions
-import builtins
-import http
-import liberty
-
-applicationCamelCaseName = 'LassoSimulator'
-applicationPublicName = 'Lasso Simulator'
-applicationVersion = '(Unreleased CVS Version)'
-logger = None
-
-
-class HttpRequestHandlerMixin:
- realm = '%s Web Site' % applicationPublicName
- server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion)
-
- def version_string(self):
- return '%s %s' % (applicationPublicName, applicationVersion)
-
-
-class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler):
- pass
-
-
-class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler):
- pass
-
-
-def main():
- # Parse command line options.
- parser = OptionParser(version = '%%prog %s' % applicationVersion)
- parser.add_option(
- '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath',
- help = 'specify an alternate configuration file',
- default = '/etc/lasso-simulator/config.xml')
- parser.add_option(
- '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background',
- action = 'store_true', default = False)
- parser.add_option(
- '-D', '--debug', dest = 'debugMode', help = 'enable program debugging',
- action = 'store_true', default = False)
- parser.add_option(
- '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file',
- default = '/dev/null')
- parser.add_option(
- '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel',
- help = 'specify log level (debug, info, warning, error, critical)', default = 'info')
- (options, args) = parser.parse_args()
- if options.logLevel.upper() not in logging._levelNames:
- raise Exception('Unknown log level: "%s"' % options.logLevel)
-
- # Configure logger.
- logger = logging.getLogger()
- if options.debugMode and not options.daemonMode:
- handler = logging.StreamHandler(sys.stderr)
- else:
- handler = logging.FileHandler(options.logFilePath)
- formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- logger.setLevel(logging._levelNames[options.logLevel.upper()])
- builtins.set('logger', logger)
-
- site = liberty.ServiceProvider('https://sp1:2006/')
- site.providerId = 'https://sp1/metadata'
- site.idpSite = liberty.IdentityProvider('https://lecp1:2014/')
- site.idpSite.providerId = 'https://lecp1/metadata'
-
- lassoServer = lasso.Server.new(
- '../../tests/data/sp1-la/metadata.xml',
- None, # '../../tests/data/sp1-la/public-key.pem' is no more used
- '../../tests/data/sp1-la/private-key-raw.pem',
- '../../tests/data/sp1-la/certificate.pem',
- lasso.signatureMethodRsaSha1)
- lassoServer.add_provider(
- '../../tests/data/lecp1-la/metadata.xml',
- '../../tests/data/lecp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- site.lassoServerDump = lassoServer.dump()
- failUnless(site.lassoServerDump)
- lassoServer.destroy()
-
- site.certificateAbsolutePath = '../../tests/data/sp1-ssl/certificate.pem'
- site.privateKeyAbsolutePath = '../../tests/data/sp1-ssl/private-key-raw.pem'
- site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem'
-
- site.newUser('Nicolas')
- site.newUser('Romain')
- site.newUser('Valery')
- # Christophe Nowicki has no account on service provider.
- site.newUser('Frederic')
-
- HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host.
-## httpServer = http.HttpServer(('sp1', 2005), HttpRequestHandler)
-## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname())
- httpServer = http.HttpsServer(
- ('sp1', 2006),
- HttpsRequestHandler,
- site.privateKeyAbsolutePath, # Server private key
- site.certificateAbsolutePath, # Server certificate
- site.peerCaCertificateAbsolutePath, # Clients certification authority certificate
- None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852
- None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13
- )
- logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname())
- try:
- httpServer.serve_forever()
- except KeyboardInterrupt:
- pass
-
-if __name__ == '__main__':
- main()
diff --git a/python/tests/sample-sp.py b/python/tests/sample-sp.py
deleted file mode 100755
index 8e9c0329..00000000
--- a/python/tests/sample-sp.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#! /usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import logging
-from optparse import OptionParser
-import sys
-
-if not '..' in sys.path:
- sys.path.insert(0, '..')
-if not '../.libs' in sys.path:
- sys.path.insert(0, '../.libs')
-
-import lasso
-
-import assertions
-import builtins
-import http
-import liberty
-
-applicationCamelCaseName = 'LassoSimulator'
-applicationPublicName = 'Lasso Simulator'
-applicationVersion = '(Unreleased CVS Version)'
-logger = None
-
-
-class HttpRequestHandlerMixin:
- realm = '%s Web Site' % applicationPublicName
- server_version = '%s/%s' % (applicationCamelCaseName, applicationVersion)
-
- def version_string(self):
- return '%s %s' % (applicationPublicName, applicationVersion)
-
-
-class HttpRequestHandler(HttpRequestHandlerMixin, http.HttpRequestHandler):
- pass
-
-
-class HttpsRequestHandler(HttpRequestHandlerMixin, http.HttpsRequestHandler):
- pass
-
-
-def main():
- # Parse command line options.
- parser = OptionParser(version = '%%prog %s' % applicationVersion)
- parser.add_option(
- '-c', '--config', metavar = 'FILE', dest = 'configurationFilePath',
- help = 'specify an alternate configuration file',
- default = '/etc/lasso-simulator/config.xml')
- parser.add_option(
- '-d', '--daemon', dest = 'daemonMode', help = 'run main process in background',
- action = 'store_true', default = False)
- parser.add_option(
- '-D', '--debug', dest = 'debugMode', help = 'enable program debugging',
- action = 'store_true', default = False)
- parser.add_option(
- '-l', '--log', metavar = 'FILE', dest = 'logFilePath', help = 'specify log file',
- default = '/dev/null')
- parser.add_option(
- '-L', '--log-level', metavar = 'LEVEL', dest = 'logLevel',
- help = 'specify log level (debug, info, warning, error, critical)', default = 'info')
- (options, args) = parser.parse_args()
- if options.logLevel.upper() not in logging._levelNames:
- raise Exception('Unknown log level: "%s"' % options.logLevel)
-
- # Configure logger.
- logger = logging.getLogger()
- if options.debugMode and not options.daemonMode:
- handler = logging.StreamHandler(sys.stderr)
- else:
- handler = logging.FileHandler(options.logFilePath)
- formatter = logging.Formatter('%(asctime)s %(levelname)-9s %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- logger.setLevel(logging._levelNames[options.logLevel.upper()])
- builtins.set('logger', logger)
-
- site = liberty.ServiceProvider('https://sp1:2006/')
- site.providerId = 'https://sp1/metadata'
- site.idpSite = liberty.IdentityProvider('https://idp1:1998/')
- site.idpSite.providerId = 'https://idp1/metadata'
-
- lassoServer = lasso.Server.new(
- '../../tests/data/sp1-la/metadata.xml',
- None, # '../../tests/data/sp1-la/public-key.pem' is no more used
- '../../tests/data/sp1-la/private-key-raw.pem',
- '../../tests/data/sp1-la/certificate.pem',
- lasso.signatureMethodRsaSha1)
- lassoServer.add_provider(
- '../../tests/data/idp1-la/metadata.xml',
- '../../tests/data/idp1-la/public-key.pem',
- '../../tests/data/ca1-la/certificate.pem')
- site.lassoServerDump = lassoServer.dump()
- failUnless(site.lassoServerDump)
- lassoServer.destroy()
-
- site.certificateAbsolutePath = '../../tests/data/sp1-ssl/certificate.pem'
- site.privateKeyAbsolutePath = '../../tests/data/sp1-ssl/private-key-raw.pem'
- site.peerCaCertificateAbsolutePath = '../../tests/data/ca1-ssl/certificate.pem'
-
- site.newUser('Nicolas')
- site.newUser('Romain')
- site.newUser('Valery')
- # Christophe Nowicki has no account on service provider.
- site.newUser('Frederic')
-
- HttpRequestHandlerMixin.site = site # Directly a site, not a server => no virtual host.
-## httpServer = http.HttpServer(('sp1', 2005), HttpRequestHandler)
-## logger.info('Serving HTTP on %s port %s...' % httpServer.socket.getsockname())
- httpServer = http.HttpsServer(
- ('sp1', 2006),
- HttpsRequestHandler,
- site.privateKeyAbsolutePath, # Server private key
- site.certificateAbsolutePath, # Server certificate
- site.peerCaCertificateAbsolutePath, # Clients certification authority certificate
- None, # sslCertificateChainFile see mod_ssl, ssl_engine_init.c, line 852
- None, # sslVerifyClient http://www.modssl.org/docs/2.1/ssl_reference.html#ToC13
- )
- logger.info('Serving HTTPS on %s port %s...' % httpServer.socket.getsockname())
- try:
- httpServer.serve_forever()
- except KeyboardInterrupt:
- pass
-
-if __name__ == '__main__':
- main()
diff --git a/python/tests/submissions.py b/python/tests/submissions.py
deleted file mode 100644
index a9f9504c..00000000
--- a/python/tests/submissions.py
+++ /dev/null
@@ -1,292 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# HTTP Client and Server Enhanced Classes
-# By: Frederic Peters <fpeters@entrouvert.com>
-# Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://www.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-
-
-"""Wrapper for HTML form submissions, simulating Web Forms 2 behaviour
-
-See http://whatwg.org/specs/web-forms/2004-06-27-call-for-comments/#x-www-form-xml
-"""
-
-
-import cgi
-
-
-class AbstractSubmission(object):
- httpRequestHandler = None
- length = None
- mimeType = None
-
- def __init__(self, httpRequestHandler, contentLength):
- assert httpRequestHandler
- self.httpRequestHandler = httpRequestHandler
- assert isinstance(contentLength, int)
- self.length = contentLength
-
- def getField(self, name, index = 0, default = None):
- # Return either a string or a sequence of strings.
- fieldList = self.getFieldList(name, index)
- if not fieldList:
- return default
- elif len(fieldList) == 1:
- return fieldList[0]
- else:
- return fieldList
-
- def getFieldList(self, name, index = 0):
- # Return a sequence of strings.
- raise NotImplementedError
-
- def getFile(self, name, index = 0, default = None):
- # Return either an instance of FileUpload or a sequence of FileUpload instances.
- fileList = self.getFileList(name, index)
- if not fileList:
- return default
- elif len(fileList) == 1:
- return fileList[0]
- else:
- return fileList
-
- def getFileList(self, name, index = 0):
- # Return a sequence of FileUpload instances.
- raise NotImplementedError
-
-## def getRepeat(self, template):
-## raise NotImplementedError
-
- def hasField(self, name, index = 0):
- raise NotImplementedError
-
- def hasFile(self, name, index = 0):
- raise NotImplementedError
-
- def readFile(self):
- raise NotImplementedError
-
-
-class FakeSubmission(AbstractSubmission):
- _fields = None
-
- def __init__(self, fields = None, query = None):
- self._fields = {}
- if fields:
- for name, value in fields.items():
- self._fields[name] = [value]
- if query:
- for name, value in cgi.parse_qsl(query, keep_blank_values = True):
- if name in self._fields:
- self._fields[name].append(value)
- else:
- self._fields[name] = [value]
-
- def getFieldList(self, name, index = 0):
- if index == 0 and name in self._fields:
- return self._fields[name]
- return []
-
- def getFileList(self, name, index = 0):
- return []
-
- def hasField(self, name, index = 0):
- return index == 0 and name in self._fields
-
- def hasFile(self, name, index = 0):
- return False
-
- def readFile(self):
- return None
-
-
-class FieldStorageSubmission(AbstractSubmission):
- """Submission wrapper for all encoding types handled by module 'cgi':
- 'application/x-www-form-urlencoded', 'multipart/form-data'...
-
- This submission method discards the control index and repetition block parts of the form data
- set. So, for these encoding types, control index is always 0.
- """
-
- fieldStorage = None
-
- def __init__(self, httpRequestHandler, contentType, contentLength, contentTypeHeader):
- super(FieldStorageSubmission, self).__init__(httpRequestHandler, contentLength)
- assert contentType
- self.mimeType = contentType
- # The use of environ seems to be required by cgi.FieldStorage.
- # It also needs to add "content-type" in headers.
- fakeHeaders = {}
- for key, value in httpRequestHandler.headers.items():
- fakeHeaders[key] = value
- environ = {
- "CONTENT_TYPE": contentTypeHeader,
- "REQUEST_METHOD": httpRequestHandler.command,
- }
- if not "content-type" in fakeHeaders:
- fakeHeaders["content-type"] = environ["CONTENT_TYPE"]
- if contentLength:
- environ["CONTENT_LENGTH"] = str(contentLength)
- splitedPath = httpRequestHandler.path.split("?")
- if len(splitedPath) >= 2:
- httpQuery = splitedPath[1]
- if httpQuery:
- environ["QUERY_STRING"] = httpQuery
- self.fieldStorage = cgi.FieldStorage(
- environ = environ,
- fp = httpRequestHandler.rfile,
- headers = fakeHeaders,
- keep_blank_values = True)
-
- def getFieldList(self, name, index = 0):
- if index > 0:
- return []
- return [item.value
- for item in self.fieldStorage.list
- if item.name == name and item.filename is None]
-
- def getFileList(self, name, index = 0):
- if index > 0:
- return []
- return [FileUpload(item.filename, item.type, item.file)
- for item in self.fieldStorage.list
- if item.name == name and item.filename is not None]
-
- def hasField(self, name, index = 0):
- if index == 0:
- for item in self.fieldStorage.list:
- if item.name == name and item.filename is None:
- return True
- return False
-
- def hasFile(self, name, index = 0):
- if index == 0:
- for item in self.fieldStorage.list:
- if item.name == name and item.filename is not None:
- return True
- return False
-
- def readFile(self):
- return None
-
-
-class FileUpload(object):
- file = None
- filename = None # Optional
- mimeType = None # Optional: MIME type with optional parameters.
-
- def __init__(self, filename, mimeType, file):
- if filename is not None:
- self.filename = filename
- if mimeType is not None:
- self.mimeType = mimeType
- assert file is not None
- self.file = file
-
-
-class FileUploadSubmission(AbstractSubmission):
- """Submission for exactly one file
-
- If the enctype attribute is not specified in the form (or is set to the empty string), and the
- form consists of exactly one file upload control with exactly one file selected, then the user
- agent use this submission method.
- Also used for HTTP PUT...
-
- Note: FileUploadSubmission contains all the FileUpload interface, so that it can be used as a
- FileUpload.
- """
-
- file = None
- filename = None # Always None
-
- def __init__(self, httpRequestHandler, contentType, contentLength):
- super(FileUploadSubmission, self).__init__(httpRequestHandler, contentLength)
- assert contentType
- self.mimeType = contentType
- self.file = httpRequestHandler.rfile
-
- def getFieldList(self, name, index = 0):
- return []
-
- def getFileList(self, name, index = 0):
- return []
-
- def hasField(self, name, index = 0):
- return False
-
- def hasFile(self, name, index = 0):
- return False
-
- def readFile(self):
- if self.length == 0:
- return None
- return self.file.read(self.length)
-
-
-class XmlFormSubmission(AbstractSubmission):
- """Submission for encoding type 'application/x-www-form+xml'"""
-
- file = None
- mimeType = "application/x-www-form+xml"
-
- def __init__(self, httpRequestHandler, contentType, contentLength):
- super(XmlFormSubmission, self).__init__(httpRequestHandler, contentLength)
- assert contentType == self.mimeType
- self.file = httpRequestHandler.rfile
-
- def getFieldList(self, name, index = 0):
- raise NotImplementedError
-
- def getFileList(self, name, index = 0):
- return NotImplementedError
-
- def hasField(self, name, index = 0):
- raise NotImplementedError
-
- def hasFile(self, name, index = 0):
- raise NotImplementedError
-
- def readFile(self):
- return None
-
-
-def readSubmission(httpRequestHandler):
- # Get query, headers and form variables.
- if httpRequestHandler.headers.typeheader is None:
- if httpRequestHandler.command in ("GET", "HEAD", "POST"):
- contentTypeHeader = "application/x-www-form-urlencoded"
- else:
- contentTypeHeader = httpRequestHandler.headers.type
- else:
- contentTypeHeader = httpRequestHandler.headers.typeheader
- contentType, contentTypeOptions = cgi.parse_header(contentTypeHeader)
- contentLength = httpRequestHandler.headers.get("content-length")
- try:
- contentLength = int(contentLength)
- except (TypeError, ValueError):
- contentLength = 0
- if contentType == "application/x-www-form+xml":
- submission = XmlFormSubmission(httpRequestHandler, contentType, contentLength)
- elif contentType in ("application/x-www-form-urlencoded", "multipart/form-data"):
- submission = FieldStorageSubmission(
- httpRequestHandler, contentType, contentLength, contentTypeHeader)
- else:
- submission = FileUploadSubmission(httpRequestHandler, contentType, contentLength)
- return submission
diff --git a/python/tests/web.py b/python/tests/web.py
deleted file mode 100644
index 55011a28..00000000
--- a/python/tests/web.py
+++ /dev/null
@@ -1,159 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# HTTP Client and Server Enhanced Classes
-# By: Frederic Peters <fpeters@entrouvert.com>
-# Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://www.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-
-
-"""HTTP client and server enhanced classes
-
-Features:
-- HTTPS using OpenSSL;
-- web sessions (with or without cookie);
-- user authentication (support of basic HTTP-authentication, X.509v3 certificate authentication,
- HTML based authentication, etc).
-"""
-
-
-import urlparse
-
-from OpenSSL import SSL
-
-import abstractweb
-import http
-
-
-class ReceivedHttpResponse(object):
- body = None
- headers = None
- statusCode = None # 200 or...
- statusMessage = None
-
- def __init__(self, statusCode = 200, statusMessage = None, headers = None, body = None):
- if statusCode:
- self.statusCode = statusCode
- if statusMessage:
- self.statusMessage = statusMessage
- if headers:
- self.headers = headers
- if body:
- self.body = body
-
-
-class WebClient(abstractweb.WebClientMixin, object):
- certificateAbsolutePath = None
- privateKeyAbsolutePath = None
- peerCaCertificateAbsolutePath = None
-
- def sendHttpRequest(self, method, url, headers = None, body = None):
- parsedUrl = urlparse.urlparse(url)
- addressingScheme, hostName, path = parsedUrl[:3]
- if addressingScheme == 'https':
- connection = http.HttpsConnection(
- hostName, None, self.privateKeyAbsolutePath, self.certificateAbsolutePath,
- self.peerCaCertificateAbsolutePath)
- else:
- connection = httplib.HTTPConnection(hostName)
- if headers:
- httpRequestHeaders = self.httpRequestHeaders.copy()
- for name, value in headers.iteritems():
- httpRequestHeaders[name] = value
- else:
- httpRequestHeaders = self.httpRequestHeaders
- failUnless('Content-Type' in httpRequestHeaders)
- try:
- connection.request('POST', path, body, httpRequestHeaders)
- except SSL.Error, error:
- if error.args and error.args[0] and error.args[0][0] \
- and error.args[0][0][0] == 'SSL routines':
- logger.debug('SSL Error in sendHttpRequest. Error = %s' % repr(error))
- raise
- response = connection.getresponse()
- try:
- body = response.read()
- except SSL.SysCallError, error:
- logger.debug('No SOAP answer in sendHttpRequest. Error = %s' % repr(error))
- raise
- httpResponse = ReceivedHttpResponse(response.status, response.reason, response.msg, body)
- return httpResponse
-
-
-class WebSession(abstractweb.WebSessionMixin, object):
- """Simulation of session of a web site"""
-
- expirationTime = None # A sample session variable
- lassoLoginDump = None # Used only by some identity providers
- lassoSessionDump = None
- publishToken = False
-
-
-class WebUser(abstractweb.WebUserMixin, object):
- """Simulation of user of a web site"""
-
- lassoIdentityDump = None
- language = 'fr' # A sample user variable
- password = None
-
-
-class WebSite(abstractweb.WebSiteMixin, WebClient):
- url = None # The main URL of web site
- WebSession = WebSession
- WebUser = WebUser
-
- def __init__(self, url):
- WebClient.__init__(self)
- abstractweb.WebSiteMixin.__init__(self)
- self.url = url
-
- def authenticateLoginPasswordUser(self, login, password):
- # We should check login & password and return the user if one matches or None otherwise.
- # FIXME: Check password also.
- return self.users.get(login)
-
- def login_local(self, handler):
- user = handler.user
- if user is None:
- failUnless(handler.useHttpAuthentication)
- return handler.outputErrorUnauthorized(handler.httpRequest.path)
- else:
- # The user is already authenticated using HTTP authentication.
- userAuthenticated = True
-
- # authenticationMethod = lasso.samlAuthenticationMethodPassword
- authenticationMethod = 'urn:oasis:names:tc:SAML:1.0:am:password'
- if userAuthenticated:
- session = handler.session
- if session is None:
- session = handler.createSession()
- # No need to publish token, because we are using HTTP authentication.
- if session.publishToken:
- del session.publishToken
- user = handler.user
- if user is None:
- user = handler.createUser()
- session.userId = user.uniqueId
- user.sessionToken = session.token
- return self.login_done(handler, userAuthenticated, authenticationMethod)
-
- def login_failed(self, handler):
- if handler.useHttpAuthentication:
- handler.useHttpAuthentication = 'not this time'
- return handler.respond(401, 'Access Unauthorized: User has no account.')
diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py
deleted file mode 100644
index 2d6738fa..00000000
--- a/python/tests/websimulator.py
+++ /dev/null
@@ -1,244 +0,0 @@
-# -*- coding: UTF-8 -*-
-
-
-# Lasso Simulator
-# By: Emmanuel Raviart <eraviart@entrouvert.com>
-#
-# Copyright (C) 2004 Entr'ouvert
-# http://lasso.entrouvert.org
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-import abstractweb
-
-
-class HttpRequest(abstractweb.HttpRequestMixin, object):
- client = None # Principal or web site sending the request.
- form = None
-
- def __init__(self, client, method, url, headers = None, body = None, form = None):
- self.client = client
- self.method = method
- self.url = url
- if headers:
- self.headers = headers
- if body:
- self.body = body
- if form:
- self.form = form
-
- def getFormField(self, name, default = None):
- if self.form is not None:
- return self.form.get(name, default)
- return default
-
- def getPath(self):
- return self.pathAndQuery.split('?', 1)[0]
-
- def getPathAndQuery(self):
- urlWithoutScheme = self.url[self.url.find('://') + 3:]
- if '/' in urlWithoutScheme:
- pathAndQuery = urlWithoutScheme[urlWithoutScheme.find('/'):]
- else:
- pathAndQuery = ''
- return pathAndQuery
-
- def getQuery(self):
- splitedUrl = self.pathAndQuery.split('?', 1)
- if len(splitedUrl) > 1:
- return splitedUrl[1]
- else:
- return ''
-
- def getScheme(self):
- return self.url.split(':', 1)[0].lower()
-
- def hasFormField(self, name):
- if self.form is None:
- return False
- return name in self.form
-
- def send(self):
- webSite = self.client.internet.getWebSite(self.url)
- return webSite.handleHttpRequest(self)
-
- path = property(getPath)
- pathAndQuery = property(getPathAndQuery)
- query = property(getQuery)
- scheme = property(getScheme)
-
-
-class FunctionHttpRequest(abstractweb.FunctionHttpRequest):
- def getClient(self):
- return self.previousHttpRequest.client
-
- client = property(getClient)
-
-
-class HttpResponse(abstractweb.HttpResponseMixin, object):
- def send(self, httpRequestHandler):
- return self
-
-
-class HttpRequestHandler(abstractweb.HttpRequestHandlerMixin, object):
- HttpResponse = HttpResponse # Class
-
- def __init__(self, site, httpRequest):
- self.site = site
- self.httpRequest = httpRequest
-
- def createSession(self):
- session = abstractweb.HttpRequestHandlerMixin.createSession(self)
- self.httpRequest.client.sessionTokens[self.site.url] = session.token
- return session
-
- def respondRedirectTemporarily(self, url):
- scheme = url.split('://')[0].lower()
- if scheme not in ('http', 'https'):
- # The url doesn't include host name => add it.
- path = url
- url = self.site.url
- if path:
- if path[0] == '/':
- while url[-1] == '/':
- url = url[:-1]
- elif url[-1] != '/':
- url += '/'
- url += path
- return self.httpRequest.client.redirect(url)
-
-
-class Internet(object):
- webSites = None
-
- def __init__(self):
- self.webSites = {}
-
- def addWebSite(self, webSite):
- self.webSites[webSite.url] = webSite
-
- def getWebSite(self, url):
- for webSiteUrl, webSite in self.webSites.iteritems():
- if url.startswith(webSiteUrl):
- return webSite
- raise Exception('Unknown web site: %s' % url)
-
-
-class WebClient(abstractweb.WebClientMixin, object):
- internet = None
- keyring = None
- sessionTokens = None # Simulate the cookies, stored in user's navigator, and containing the
- # IDs of sessions already opened by the user.
-
- def __init__(self, internet):
- self.internet = internet
- self.keyring = {}
- self.sessionTokens = {}
-
- def redirect(self, url):
- return self.sendHttpRequest('GET', url)
-
- def sendHttpRequest(self, method, url, headers = None, body = None, form = None):
- if headers:
- httpRequestHeaders = self.httpRequestHeaders.copy()
- for name, value in headers.iteritems():
- httpRequestHeaders[name] = value
- else:
- httpRequestHeaders = self.httpRequestHeaders
- return HttpRequest(
- self, method, url, headers = httpRequestHeaders, body = body, form = form).send()
-
- def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None,
- form = None):
- url = webSite.url
- if path:
- if path[0] == '/':
- while url[-1] == '/':
- url = url[:-1]
- elif url[-1] != '/':
- url += '/'
- url += path
- return self.sendHttpRequest(method, url, headers = headers, body = body, form = form)
-
-
-class Principal(WebClient):
- """Simulation of a user and its web navigator"""
-
- name = None # The user name
-
- def __init__(self, internet, name):
- WebClient.__init__(self, internet)
- self.name = name
-
-
-class WebSession(abstractweb.WebSessionMixin, object):
- """Simulation of session of a web site"""
-
- expirationTime = None # A sample session variable
- lassoLoginDump = None # Used only by some identity providers
- lassoSessionDump = None
-
-
-class WebUser(abstractweb.WebUserMixin, object):
- """Simulation of user of a web site"""
-
- lassoIdentityDump = None
- language = 'fr' # A sample user variable
-
-
-class WebSite(abstractweb.WebSiteMixin, WebClient):
- """Simulation of a web site"""
-
- FunctionHttpRequest = FunctionHttpRequest # Class
- url = None # The main URL of web site
- WebSession = WebSession
- WebUser = WebUser
-
- def __init__(self, internet, url):
- WebClient.__init__(self, internet)
- abstractweb.WebSiteMixin.__init__(self)
- self.url = url
- self.internet.addWebSite(self)
-
- def handleHttpRequest(self, httpRequest):
- httpRequestHandler = HttpRequestHandler(self, httpRequest)
-
- # Retrieve session and user.
- sessionToken = httpRequest.client.sessionTokens.get(self.url, None)
- if sessionToken is not None:
- session = self.sessions.get(sessionToken)
- if session is not None:
- httpRequestHandler.session = session
- if session.userId is not None:
- httpRequestHandler.user = self.users.get(session.userId, None)
-
- return self.handleHttpRequestHandler(httpRequestHandler)
-
- def login_local(self, handler):
- userId = handler.httpRequest.client.keyring.get(self.url, None)
- userAuthenticated = userId in self.users
- # authenticationMethod = lasso.samlAuthenticationMethodPassword
- authenticationMethod = 'urn:oasis:names:tc:SAML:1.0:am:password'
- if userAuthenticated:
- session = handler.session
- if session is None:
- session = handler.createSession()
- user = handler.user
- if user is None:
- user = handler.createUser()
- session.userId = user.uniqueId
- user.sessionToken = session.token
- return self.login_done(handler, userAuthenticated, authenticationMethod)