diff options
Diffstat (limited to 'python/tests/ServiceProvider.py')
-rw-r--r-- | python/tests/ServiceProvider.py | 116 |
1 files changed, 57 insertions, 59 deletions
diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py index c548655b..2fd518a3 100644 --- a/python/tests/ServiceProvider.py +++ b/python/tests/ServiceProvider.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -32,57 +31,57 @@ from websimulator import * class ServiceProvider(Provider): idpSite = None # The identity provider, this service provider will use to authenticate users. - def assertionConsumer(self, httpRequest): + def assertionConsumer(self, handler): server = self.getServer() login = lasso.Login.new(server) - if httpRequest.method == 'GET': - login.init_request(httpRequest.query, lasso.httpMethodRedirect) + if handler.httpRequest.method == 'GET': + login.init_request(handler.httpRequest.query, lasso.httpMethodRedirect) login.build_request_msg() soapEndpoint = login.msg_url - self.failUnless(soapEndpoint) + failUnless(soapEndpoint) soapRequestMsg = login.msg_body - self.failUnless(soapRequestMsg) + failUnless(soapRequestMsg) httpResponse = self.sendHttpRequest( 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) try: login.process_response_msg(httpResponse.body) except lasso.Error, error: if error.code == -7: # FIXME: This will change, he said. - return self.newHttpResponse( + return handler.respond( 401, 'Access Unauthorized: User authentication failed on identity provider.') else: raise - elif httpRequest.method == 'POST': - authnResponseMsg = httpRequest.getFormField('LARES', None) - self.failUnless(authnResponseMsg) + elif handler.httpRequest.method == 'POST': + authnResponseMsg = handler.httpRequest.getFormField('LARES', None) + failUnless(authnResponseMsg) # FIXME: Should we do an init before process_authn_response_msg? try: login.process_authn_response_msg(authnResponseMsg) except lasso.Error, error: if error.code == -7: # FIXME: This will change, he said. - return self.newHttpResponse( + return handler.respond( 401, 'Access Unauthorized: User authentication failed on identity provider.') else: raise else: - return self.newHttpResponse( + return handler.respond( 400, - 'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method) + 'Bad Request: Method %s not handled by assertionConsumer' % handler.httpRequest.method) nameIdentifier = login.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) # Retrieve session dump, using name identifier or else try to use the client web session. # If session dump exists, give it to Lasso, so that it updates it. webSession = self.getWebSessionFromNameIdentifier(nameIdentifier) if webSession is None: - webSession = self.getWebSession(httpRequest.client) + webSession = self.getWebSession(handler.httpRequest.client) if webSession is not None: sessionDump = webSession.sessionDump if sessionDump is not None: @@ -99,29 +98,29 @@ class ServiceProvider(Provider): login.accept_sso() if webUser is not None and identityDump is None: - self.failUnless(login.is_identity_dirty()) + failUnless(login.is_identity_dirty()) identity = login.get_identity() - self.failUnless(identity) + failUnless(identity) identityDump = identity.dump() - self.failUnless(identityDump) - self.failUnless(login.is_session_dirty()) + failUnless(identityDump) + failUnless(login.is_session_dirty()) session = login.get_session() - self.failUnless(session) + failUnless(session) sessionDump = session.dump() - self.failUnless(sessionDump) + failUnless(sessionDump) # User is now authenticated. # If there was no web session yet, create it. Idem for the web user account. if webSession is None: - webSession = self.createWebSession(httpRequest.client) + webSession = self.createWebSession(handler.httpRequest.client) if webUser is None: # A real service provider would ask user to login locally to create federation. Or it # would ask user informations to create a local account. - webUserId = httpRequest.client.keyring.get(self.url, None) + webUserId = handler.httpRequest.client.keyring.get(self.url, None) userAuthenticated = webUserId in self.webUsers if not userAuthenticated: - return self.newHttpResponse(401, 'Access Unauthorized: User has no account.') + return handler.respond(401, 'Access Unauthorized: User has no account.') webUser = self.webUsers[webUserId] webSession.webUserId = webUser.uniqueId @@ -134,32 +133,32 @@ class ServiceProvider(Provider): self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId - return self.newHttpResponse(200) + return handler.respond() - def login(self, httpRequest): - libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None) - userAgent = httpRequest.headers.get('User-Agent', None) + def login(self, handler): + libertyEnabled = handler.httpRequest.headers.get('Liberty-Enabled', None) + userAgent = handler.httpRequest.headers.get('User-Agent', None) # FIXME: Lasso should have a function to compute useLecp. # Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent) useLecp = False if libertyEnabled: useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled if not useLecp: - return self.newHttpResponse(501, 'Unsupported Liberty Version.') + return handler.respond(501, 'Unsupported Liberty Version.') elif userAgent: useLecp = 'urn:liberty:iff:2003-08' in userAgent if not useLecp and "LIBV=" in userAgent: - return self.newHttpResponse(501, 'Unsupported Liberty Version.') + return handler.respond(501, 'Unsupported Liberty Version.') else: useLecp = False - forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False) - isPassive = httpRequest.getQueryBoolean('isPassive', False) + forceAuthn = handler.httpRequest.getQueryBoolean('forceAuthn', False) + isPassive = handler.httpRequest.getQueryBoolean('isPassive', False) server = self.getServer() if useLecp: lecp = lasso.Lecp.new(server) lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None. - self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest) + failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest) # FIXME: This protocol profile should be set by default by Lasso. lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost) @@ -180,15 +179,14 @@ class ServiceProvider(Provider): # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded. import base64 authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg) - self.failUnless(authnRequestEnvelopeMsg) + failUnless(authnRequestEnvelopeMsg) # FIXME: Lasso should set a lecp.msg_content_type to # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with # other profiles. # contentType = lecp.msg_content_type - # self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml') + # failUnlessEqual(contentType, 'application/vnd.liberty-request+xml') contentType = 'application/vnd.liberty-request+xml' - return self.newHttpResponse( - 200, + return handler.respond( headers = { 'Content-Type': contentType, 'Cache-Control': 'no-cache', @@ -198,7 +196,7 @@ class ServiceProvider(Provider): else: login = lasso.Login.new(server) login.init_authn_request(self.idpSite.providerId) - self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) + failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) if forceAuthn: login.request.set_forceAuthn(forceAuthn) if not isPassive: @@ -209,43 +207,43 @@ class ServiceProvider(Provider): login.request.set_relayState(relayState) login.build_authn_request_msg() authnRequestUrl = login.msg_url - self.failUnless(authnRequestUrl) - return httpRequest.client.redirect(authnRequestUrl) + failUnless(authnRequestUrl) + return handler.respondRedirectTemporarily(authnRequestUrl) - def logoutUsingSoap(self, httpRequest): - webSession = self.getWebSession(httpRequest.client) + def logoutUsingSoap(self, handler): + webSession = self.getWebSession(handler.httpRequest.client) if webSession is None: - return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.') + return handler.respond(401, 'Access Unauthorized: User has no session opened.') webUser = self.getWebUserFromWebSession(webSession) if webUser is None: - return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.') + return handler.respond(401, 'Access Unauthorized: User is not logged in.') server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeSp) - identityDump = self.getIdentityDump(httpRequest.client) + identityDump = self.getIdentityDump(handler.httpRequest.client) if identityDump is not None: logout.set_identity_from_dump(identityDump) - sessionDump = self.getSessionDump(httpRequest.client) + sessionDump = self.getSessionDump(handler.httpRequest.client) if sessionDump is not None: logout.set_session_from_dump(sessionDump) logout.init_request() logout.build_request_msg() soapEndpoint = logout.msg_url - self.failUnless(soapEndpoint) + failUnless(soapEndpoint) soapRequestMsg = logout.msg_body - self.failUnless(soapRequestMsg) + failUnless(soapRequestMsg) httpResponse = self.sendHttpRequest( 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) - self.failUnlessEqual(httpResponse.statusCode, 200) + failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) - self.failIf(logout.is_identity_dirty()) + failIf(logout.is_identity_dirty()) identity = logout.get_identity() - self.failUnless(identity) + failUnless(identity) identityDump = identity.dump() - self.failUnless(identityDump) - self.failUnless(logout.is_session_dirty()) + failUnless(identityDump) + failUnless(logout.is_session_dirty()) session = logout.get_session() if session is None: # The user is no more authenticated on any identity provider. Log him out. @@ -257,10 +255,10 @@ class ServiceProvider(Provider): else: # The user is still logged in on some other identity providers. sessionDump = session.dump() - self.failUnless(sessionDump) + failUnless(sessionDump) webSession.sessionDump = sessionDump nameIdentifier = logout.nameIdentifier - self.failUnless(nameIdentifier) + failUnless(nameIdentifier) del self.webSessionIdsByNameIdentifier[nameIdentifier] - return self.newHttpResponse(200) + return handler.respond() |