summaryrefslogtreecommitdiffstats
path: root/python/tests/ServiceProvider.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests/ServiceProvider.py')
-rw-r--r--python/tests/ServiceProvider.py116
1 files changed, 57 insertions, 59 deletions
diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py
index c548655b..2fd518a3 100644
--- a/python/tests/ServiceProvider.py
+++ b/python/tests/ServiceProvider.py
@@ -1,12 +1,11 @@
# -*- coding: UTF-8 -*-
-# Python Lasso Simulator
+# Lasso Simulator
+# By: Emmanuel Raviart <eraviart@entrouvert.com>
#
# Copyright (C) 2004 Entr'ouvert
# http://lasso.entrouvert.org
-#
-# Author: Emmanuel Raviart <eraviart@entrouvert.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -32,57 +31,57 @@ from websimulator import *
class ServiceProvider(Provider):
idpSite = None # The identity provider, this service provider will use to authenticate users.
- def assertionConsumer(self, httpRequest):
+ def assertionConsumer(self, handler):
server = self.getServer()
login = lasso.Login.new(server)
- if httpRequest.method == 'GET':
- login.init_request(httpRequest.query, lasso.httpMethodRedirect)
+ if handler.httpRequest.method == 'GET':
+ login.init_request(handler.httpRequest.query, lasso.httpMethodRedirect)
login.build_request_msg()
soapEndpoint = login.msg_url
- self.failUnless(soapEndpoint)
+ failUnless(soapEndpoint)
soapRequestMsg = login.msg_body
- self.failUnless(soapRequestMsg)
+ failUnless(soapRequestMsg)
httpResponse = self.sendHttpRequest(
'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
body = soapRequestMsg)
- self.failUnlessEqual(httpResponse.statusCode, 200)
+ failUnlessEqual(httpResponse.statusCode, 200)
try:
login.process_response_msg(httpResponse.body)
except lasso.Error, error:
if error.code == -7: # FIXME: This will change, he said.
- return self.newHttpResponse(
+ return handler.respond(
401,
'Access Unauthorized: User authentication failed on identity provider.')
else:
raise
- elif httpRequest.method == 'POST':
- authnResponseMsg = httpRequest.getFormField('LARES', None)
- self.failUnless(authnResponseMsg)
+ elif handler.httpRequest.method == 'POST':
+ authnResponseMsg = handler.httpRequest.getFormField('LARES', None)
+ failUnless(authnResponseMsg)
# FIXME: Should we do an init before process_authn_response_msg?
try:
login.process_authn_response_msg(authnResponseMsg)
except lasso.Error, error:
if error.code == -7: # FIXME: This will change, he said.
- return self.newHttpResponse(
+ return handler.respond(
401,
'Access Unauthorized: User authentication failed on identity provider.')
else:
raise
else:
- return self.newHttpResponse(
+ return handler.respond(
400,
- 'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method)
+ 'Bad Request: Method %s not handled by assertionConsumer' % handler.httpRequest.method)
nameIdentifier = login.nameIdentifier
- self.failUnless(nameIdentifier)
+ failUnless(nameIdentifier)
# Retrieve session dump, using name identifier or else try to use the client web session.
# If session dump exists, give it to Lasso, so that it updates it.
webSession = self.getWebSessionFromNameIdentifier(nameIdentifier)
if webSession is None:
- webSession = self.getWebSession(httpRequest.client)
+ webSession = self.getWebSession(handler.httpRequest.client)
if webSession is not None:
sessionDump = webSession.sessionDump
if sessionDump is not None:
@@ -99,29 +98,29 @@ class ServiceProvider(Provider):
login.accept_sso()
if webUser is not None and identityDump is None:
- self.failUnless(login.is_identity_dirty())
+ failUnless(login.is_identity_dirty())
identity = login.get_identity()
- self.failUnless(identity)
+ failUnless(identity)
identityDump = identity.dump()
- self.failUnless(identityDump)
- self.failUnless(login.is_session_dirty())
+ failUnless(identityDump)
+ failUnless(login.is_session_dirty())
session = login.get_session()
- self.failUnless(session)
+ failUnless(session)
sessionDump = session.dump()
- self.failUnless(sessionDump)
+ failUnless(sessionDump)
# User is now authenticated.
# If there was no web session yet, create it. Idem for the web user account.
if webSession is None:
- webSession = self.createWebSession(httpRequest.client)
+ webSession = self.createWebSession(handler.httpRequest.client)
if webUser is None:
# A real service provider would ask user to login locally to create federation. Or it
# would ask user informations to create a local account.
- webUserId = httpRequest.client.keyring.get(self.url, None)
+ webUserId = handler.httpRequest.client.keyring.get(self.url, None)
userAuthenticated = webUserId in self.webUsers
if not userAuthenticated:
- return self.newHttpResponse(401, 'Access Unauthorized: User has no account.')
+ return handler.respond(401, 'Access Unauthorized: User has no account.')
webUser = self.webUsers[webUserId]
webSession.webUserId = webUser.uniqueId
@@ -134,32 +133,32 @@ class ServiceProvider(Provider):
self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId
self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId
- return self.newHttpResponse(200)
+ return handler.respond()
- def login(self, httpRequest):
- libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None)
- userAgent = httpRequest.headers.get('User-Agent', None)
+ def login(self, handler):
+ libertyEnabled = handler.httpRequest.headers.get('Liberty-Enabled', None)
+ userAgent = handler.httpRequest.headers.get('User-Agent', None)
# FIXME: Lasso should have a function to compute useLecp.
# Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent)
useLecp = False
if libertyEnabled:
useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled
if not useLecp:
- return self.newHttpResponse(501, 'Unsupported Liberty Version.')
+ return handler.respond(501, 'Unsupported Liberty Version.')
elif userAgent:
useLecp = 'urn:liberty:iff:2003-08' in userAgent
if not useLecp and "LIBV=" in userAgent:
- return self.newHttpResponse(501, 'Unsupported Liberty Version.')
+ return handler.respond(501, 'Unsupported Liberty Version.')
else:
useLecp = False
- forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False)
- isPassive = httpRequest.getQueryBoolean('isPassive', False)
+ forceAuthn = handler.httpRequest.getQueryBoolean('forceAuthn', False)
+ isPassive = handler.httpRequest.getQueryBoolean('isPassive', False)
server = self.getServer()
if useLecp:
lecp = lasso.Lecp.new(server)
lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None.
- self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest)
+ failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest)
# FIXME: This protocol profile should be set by default by Lasso.
lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost)
@@ -180,15 +179,14 @@ class ServiceProvider(Provider):
# FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded.
import base64
authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg)
- self.failUnless(authnRequestEnvelopeMsg)
+ failUnless(authnRequestEnvelopeMsg)
# FIXME: Lasso should set a lecp.msg_content_type to
# "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with
# other profiles.
# contentType = lecp.msg_content_type
- # self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml')
+ # failUnlessEqual(contentType, 'application/vnd.liberty-request+xml')
contentType = 'application/vnd.liberty-request+xml'
- return self.newHttpResponse(
- 200,
+ return handler.respond(
headers = {
'Content-Type': contentType,
'Cache-Control': 'no-cache',
@@ -198,7 +196,7 @@ class ServiceProvider(Provider):
else:
login = lasso.Login.new(server)
login.init_authn_request(self.idpSite.providerId)
- self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
+ failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
if forceAuthn:
login.request.set_forceAuthn(forceAuthn)
if not isPassive:
@@ -209,43 +207,43 @@ class ServiceProvider(Provider):
login.request.set_relayState(relayState)
login.build_authn_request_msg()
authnRequestUrl = login.msg_url
- self.failUnless(authnRequestUrl)
- return httpRequest.client.redirect(authnRequestUrl)
+ failUnless(authnRequestUrl)
+ return handler.respondRedirectTemporarily(authnRequestUrl)
- def logoutUsingSoap(self, httpRequest):
- webSession = self.getWebSession(httpRequest.client)
+ def logoutUsingSoap(self, handler):
+ webSession = self.getWebSession(handler.httpRequest.client)
if webSession is None:
- return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.')
+ return handler.respond(401, 'Access Unauthorized: User has no session opened.')
webUser = self.getWebUserFromWebSession(webSession)
if webUser is None:
- return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.')
+ return handler.respond(401, 'Access Unauthorized: User is not logged in.')
server = self.getServer()
logout = lasso.Logout.new(server, lasso.providerTypeSp)
- identityDump = self.getIdentityDump(httpRequest.client)
+ identityDump = self.getIdentityDump(handler.httpRequest.client)
if identityDump is not None:
logout.set_identity_from_dump(identityDump)
- sessionDump = self.getSessionDump(httpRequest.client)
+ sessionDump = self.getSessionDump(handler.httpRequest.client)
if sessionDump is not None:
logout.set_session_from_dump(sessionDump)
logout.init_request()
logout.build_request_msg()
soapEndpoint = logout.msg_url
- self.failUnless(soapEndpoint)
+ failUnless(soapEndpoint)
soapRequestMsg = logout.msg_body
- self.failUnless(soapRequestMsg)
+ failUnless(soapRequestMsg)
httpResponse = self.sendHttpRequest(
'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg)
- self.failUnlessEqual(httpResponse.statusCode, 200)
+ failUnlessEqual(httpResponse.statusCode, 200)
logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
- self.failIf(logout.is_identity_dirty())
+ failIf(logout.is_identity_dirty())
identity = logout.get_identity()
- self.failUnless(identity)
+ failUnless(identity)
identityDump = identity.dump()
- self.failUnless(identityDump)
- self.failUnless(logout.is_session_dirty())
+ failUnless(identityDump)
+ failUnless(logout.is_session_dirty())
session = logout.get_session()
if session is None:
# The user is no more authenticated on any identity provider. Log him out.
@@ -257,10 +255,10 @@ class ServiceProvider(Provider):
else:
# The user is still logged in on some other identity providers.
sessionDump = session.dump()
- self.failUnless(sessionDump)
+ failUnless(sessionDump)
webSession.sessionDump = sessionDump
nameIdentifier = logout.nameIdentifier
- self.failUnless(nameIdentifier)
+ failUnless(nameIdentifier)
del self.webSessionIdsByNameIdentifier[nameIdentifier]
- return self.newHttpResponse(200)
+ return handler.respond()