diff options
Diffstat (limited to 'python/tests/ServiceProvider.py')
-rw-r--r-- | python/tests/ServiceProvider.py | 160 |
1 files changed, 119 insertions, 41 deletions
diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py index 4ecff2df..c548655b 100644 --- a/python/tests/ServiceProvider.py +++ b/python/tests/ServiceProvider.py @@ -21,8 +21,6 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# FIXME: Replace principal with client in most methods. -# FIXME: Rename webUser to userAccount. import lasso @@ -37,23 +35,46 @@ class ServiceProvider(Provider): def assertionConsumer(self, httpRequest): server = self.getServer() login = lasso.Login.new(server) - login.init_request(httpRequest.query, lasso.httpMethodRedirect) - login.build_request_msg() - soapEndpoint = login.msg_url - self.failUnless(soapEndpoint) - soapRequestMsg = login.msg_body - self.failUnless(soapRequestMsg) - httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask() - self.failUnlessEqual(httpResponse.statusCode, 200) - try: - login.process_response_msg(httpResponse.body) - except lasso.Error, error: - if error.code == -7: # FIXME: This will change, he said. - return HttpResponse( - 401, 'Access Unauthorized: User authentication failed on identity provider.') - else: - raise + if httpRequest.method == 'GET': + login.init_request(httpRequest.query, lasso.httpMethodRedirect) + login.build_request_msg() + + soapEndpoint = login.msg_url + self.failUnless(soapEndpoint) + soapRequestMsg = login.msg_body + self.failUnless(soapRequestMsg) + httpResponse = self.sendHttpRequest( + 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, + body = soapRequestMsg) + self.failUnlessEqual(httpResponse.statusCode, 200) + try: + login.process_response_msg(httpResponse.body) + except lasso.Error, error: + if error.code == -7: # FIXME: This will change, he said. + return self.newHttpResponse( + 401, + 'Access Unauthorized: User authentication failed on identity provider.') + else: + raise + elif httpRequest.method == 'POST': + authnResponseMsg = httpRequest.getFormField('LARES', None) + self.failUnless(authnResponseMsg) + # FIXME: Should we do an init before process_authn_response_msg? + try: + login.process_authn_response_msg(authnResponseMsg) + except lasso.Error, error: + if error.code == -7: # FIXME: This will change, he said. + return self.newHttpResponse( + 401, + 'Access Unauthorized: User authentication failed on identity provider.') + else: + raise + else: + return self.newHttpResponse( + 400, + 'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method) + nameIdentifier = login.nameIdentifier self.failUnless(nameIdentifier) @@ -100,7 +121,7 @@ class ServiceProvider(Provider): webUserId = httpRequest.client.keyring.get(self.url, None) userAuthenticated = webUserId in self.webUsers if not userAuthenticated: - return HttpResponse(401, 'Access Unauthorized: User has no account.') + return self.newHttpResponse(401, 'Access Unauthorized: User has no account.') webUser = self.webUsers[webUserId] webSession.webUserId = webUser.uniqueId @@ -113,35 +134,91 @@ class ServiceProvider(Provider): self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId - return HttpResponse(200) + return self.newHttpResponse(200) + + def login(self, httpRequest): + libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None) + userAgent = httpRequest.headers.get('User-Agent', None) + # FIXME: Lasso should have a function to compute useLecp. + # Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent) + useLecp = False + if libertyEnabled: + useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled + if not useLecp: + return self.newHttpResponse(501, 'Unsupported Liberty Version.') + elif userAgent: + useLecp = 'urn:liberty:iff:2003-08' in userAgent + if not useLecp and "LIBV=" in userAgent: + return self.newHttpResponse(501, 'Unsupported Liberty Version.') + else: + useLecp = False - def loginUsingRedirect(self, httpRequest): - server = self.getServer() - login = lasso.Login.new(server) - login.init_authn_request(self.idpSite.providerId) - self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False) - if forceAuthn: - login.request.set_forceAuthn(forceAuthn) isPassive = httpRequest.getQueryBoolean('isPassive', False) - if not isPassive: - login.request.set_isPassive(isPassive) - login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) - login.request.set_consent(lasso.libConsentObtained) - relayState = 'fake' - login.request.set_relayState(relayState) - login.build_authn_request_msg() - authnRequestUrl = login.msg_url - self.failUnless(authnRequestUrl) - return httpRequest.client.redirect(authnRequestUrl) + server = self.getServer() + if useLecp: + lecp = lasso.Lecp.new(server) + lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None. + self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest) + + # FIXME: This protocol profile should be set by default by Lasso. + lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost) + + # Same treatement as for non LECP login. + if forceAuthn: + lecp.request.set_forceAuthn(forceAuthn) + if not isPassive: + lecp.request.set_isPassive(isPassive) + lecp.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) + lecp.request.set_consent(lasso.libConsentObtained) + relayState = 'fake' + lecp.request.set_relayState(relayState) + + # FIXME: In my opinion, this method should be the renamed to build_authn_request_msg. + lecp.build_authn_request_envelope_msg() + authnRequestEnvelopeMsg = lecp.msg_body + # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded. + import base64 + authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg) + self.failUnless(authnRequestEnvelopeMsg) + # FIXME: Lasso should set a lecp.msg_content_type to + # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with + # other profiles. + # contentType = lecp.msg_content_type + # self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml') + contentType = 'application/vnd.liberty-request+xml' + return self.newHttpResponse( + 200, + headers = { + 'Content-Type': contentType, + 'Cache-Control': 'no-cache', + 'Pragma': 'no-cache', + }, + body = authnRequestEnvelopeMsg) + else: + login = lasso.Login.new(server) + login.init_authn_request(self.idpSite.providerId) + self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) + if forceAuthn: + login.request.set_forceAuthn(forceAuthn) + if not isPassive: + login.request.set_isPassive(isPassive) + login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) + login.request.set_consent(lasso.libConsentObtained) + relayState = 'fake' + login.request.set_relayState(relayState) + login.build_authn_request_msg() + authnRequestUrl = login.msg_url + self.failUnless(authnRequestUrl) + return httpRequest.client.redirect(authnRequestUrl) def logoutUsingSoap(self, httpRequest): webSession = self.getWebSession(httpRequest.client) if webSession is None: - return HttpResponse(401, 'Access Unauthorized: User has no session opened.') + return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.') webUser = self.getWebUserFromWebSession(webSession) if webUser is None: - return HttpResponse(401, 'Access Unauthorized: User is not logged in.') + return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.') server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeSp) @@ -158,7 +235,8 @@ class ServiceProvider(Provider): self.failUnless(soapEndpoint) soapRequestMsg = logout.msg_body self.failUnless(soapRequestMsg) - httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask() + httpResponse = self.sendHttpRequest( + 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) self.failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) @@ -185,4 +263,4 @@ class ServiceProvider(Provider): self.failUnless(nameIdentifier) del self.webSessionIdsByNameIdentifier[nameIdentifier] - return HttpResponse(200) + return self.newHttpResponse(200) |