summaryrefslogtreecommitdiffstats
path: root/python/tests/ServiceProvider.py
diff options
context:
space:
mode:
authorEmmanuel Raviart <eraviart@entrouvert.com>2004-08-07 20:42:02 +0000
committerEmmanuel Raviart <eraviart@entrouvert.com>2004-08-07 20:42:02 +0000
commitb46a6f80382d309a4e0c4ebdca346c296b66a789 (patch)
treeae84b88cb68be02c4768bf2340e33e8f719f7da5 /python/tests/ServiceProvider.py
parent8d90adf21cc3023d92f8d264a510e9705c32ad81 (diff)
downloadlasso-b46a6f80382d309a4e0c4ebdca346c296b66a789.tar.gz
lasso-b46a6f80382d309a4e0c4ebdca346c296b66a789.tar.xz
lasso-b46a6f80382d309a4e0c4ebdca346c296b66a789.zip
Added LECP support in Python simulator and unit tests. I think I have found
several bugs in Lasso LECP implementation. My biggest problem is that I didn't find a way for IDP to set userAuthenticated, authenticationMethod, reauthenticateOnOrAfter to lecp before (or when) building response envelope with lecp.build_authn_response_envelope_msg(). Did I overlook something?
Diffstat (limited to 'python/tests/ServiceProvider.py')
-rw-r--r--python/tests/ServiceProvider.py160
1 files changed, 119 insertions, 41 deletions
diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py
index 4ecff2df..c548655b 100644
--- a/python/tests/ServiceProvider.py
+++ b/python/tests/ServiceProvider.py
@@ -21,8 +21,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-# FIXME: Replace principal with client in most methods.
-# FIXME: Rename webUser to userAccount.
import lasso
@@ -37,23 +35,46 @@ class ServiceProvider(Provider):
def assertionConsumer(self, httpRequest):
server = self.getServer()
login = lasso.Login.new(server)
- login.init_request(httpRequest.query, lasso.httpMethodRedirect)
- login.build_request_msg()
- soapEndpoint = login.msg_url
- self.failUnless(soapEndpoint)
- soapRequestMsg = login.msg_body
- self.failUnless(soapRequestMsg)
- httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
- self.failUnlessEqual(httpResponse.statusCode, 200)
- try:
- login.process_response_msg(httpResponse.body)
- except lasso.Error, error:
- if error.code == -7: # FIXME: This will change, he said.
- return HttpResponse(
- 401, 'Access Unauthorized: User authentication failed on identity provider.')
- else:
- raise
+ if httpRequest.method == 'GET':
+ login.init_request(httpRequest.query, lasso.httpMethodRedirect)
+ login.build_request_msg()
+
+ soapEndpoint = login.msg_url
+ self.failUnless(soapEndpoint)
+ soapRequestMsg = login.msg_body
+ self.failUnless(soapRequestMsg)
+ httpResponse = self.sendHttpRequest(
+ 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
+ body = soapRequestMsg)
+ self.failUnlessEqual(httpResponse.statusCode, 200)
+ try:
+ login.process_response_msg(httpResponse.body)
+ except lasso.Error, error:
+ if error.code == -7: # FIXME: This will change, he said.
+ return self.newHttpResponse(
+ 401,
+ 'Access Unauthorized: User authentication failed on identity provider.')
+ else:
+ raise
+ elif httpRequest.method == 'POST':
+ authnResponseMsg = httpRequest.getFormField('LARES', None)
+ self.failUnless(authnResponseMsg)
+ # FIXME: Should we do an init before process_authn_response_msg?
+ try:
+ login.process_authn_response_msg(authnResponseMsg)
+ except lasso.Error, error:
+ if error.code == -7: # FIXME: This will change, he said.
+ return self.newHttpResponse(
+ 401,
+ 'Access Unauthorized: User authentication failed on identity provider.')
+ else:
+ raise
+ else:
+ return self.newHttpResponse(
+ 400,
+ 'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method)
+
nameIdentifier = login.nameIdentifier
self.failUnless(nameIdentifier)
@@ -100,7 +121,7 @@ class ServiceProvider(Provider):
webUserId = httpRequest.client.keyring.get(self.url, None)
userAuthenticated = webUserId in self.webUsers
if not userAuthenticated:
- return HttpResponse(401, 'Access Unauthorized: User has no account.')
+ return self.newHttpResponse(401, 'Access Unauthorized: User has no account.')
webUser = self.webUsers[webUserId]
webSession.webUserId = webUser.uniqueId
@@ -113,35 +134,91 @@ class ServiceProvider(Provider):
self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId
self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId
- return HttpResponse(200)
+ return self.newHttpResponse(200)
+
+ def login(self, httpRequest):
+ libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None)
+ userAgent = httpRequest.headers.get('User-Agent', None)
+ # FIXME: Lasso should have a function to compute useLecp.
+ # Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent)
+ useLecp = False
+ if libertyEnabled:
+ useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled
+ if not useLecp:
+ return self.newHttpResponse(501, 'Unsupported Liberty Version.')
+ elif userAgent:
+ useLecp = 'urn:liberty:iff:2003-08' in userAgent
+ if not useLecp and "LIBV=" in userAgent:
+ return self.newHttpResponse(501, 'Unsupported Liberty Version.')
+ else:
+ useLecp = False
- def loginUsingRedirect(self, httpRequest):
- server = self.getServer()
- login = lasso.Login.new(server)
- login.init_authn_request(self.idpSite.providerId)
- self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False)
- if forceAuthn:
- login.request.set_forceAuthn(forceAuthn)
isPassive = httpRequest.getQueryBoolean('isPassive', False)
- if not isPassive:
- login.request.set_isPassive(isPassive)
- login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
- login.request.set_consent(lasso.libConsentObtained)
- relayState = 'fake'
- login.request.set_relayState(relayState)
- login.build_authn_request_msg()
- authnRequestUrl = login.msg_url
- self.failUnless(authnRequestUrl)
- return httpRequest.client.redirect(authnRequestUrl)
+ server = self.getServer()
+ if useLecp:
+ lecp = lasso.Lecp.new(server)
+ lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None.
+ self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest)
+
+ # FIXME: This protocol profile should be set by default by Lasso.
+ lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost)
+
+ # Same treatement as for non LECP login.
+ if forceAuthn:
+ lecp.request.set_forceAuthn(forceAuthn)
+ if not isPassive:
+ lecp.request.set_isPassive(isPassive)
+ lecp.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
+ lecp.request.set_consent(lasso.libConsentObtained)
+ relayState = 'fake'
+ lecp.request.set_relayState(relayState)
+
+ # FIXME: In my opinion, this method should be the renamed to build_authn_request_msg.
+ lecp.build_authn_request_envelope_msg()
+ authnRequestEnvelopeMsg = lecp.msg_body
+ # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded.
+ import base64
+ authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg)
+ self.failUnless(authnRequestEnvelopeMsg)
+ # FIXME: Lasso should set a lecp.msg_content_type to
+ # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with
+ # other profiles.
+ # contentType = lecp.msg_content_type
+ # self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml')
+ contentType = 'application/vnd.liberty-request+xml'
+ return self.newHttpResponse(
+ 200,
+ headers = {
+ 'Content-Type': contentType,
+ 'Cache-Control': 'no-cache',
+ 'Pragma': 'no-cache',
+ },
+ body = authnRequestEnvelopeMsg)
+ else:
+ login = lasso.Login.new(server)
+ login.init_authn_request(self.idpSite.providerId)
+ self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
+ if forceAuthn:
+ login.request.set_forceAuthn(forceAuthn)
+ if not isPassive:
+ login.request.set_isPassive(isPassive)
+ login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
+ login.request.set_consent(lasso.libConsentObtained)
+ relayState = 'fake'
+ login.request.set_relayState(relayState)
+ login.build_authn_request_msg()
+ authnRequestUrl = login.msg_url
+ self.failUnless(authnRequestUrl)
+ return httpRequest.client.redirect(authnRequestUrl)
def logoutUsingSoap(self, httpRequest):
webSession = self.getWebSession(httpRequest.client)
if webSession is None:
- return HttpResponse(401, 'Access Unauthorized: User has no session opened.')
+ return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.')
webUser = self.getWebUserFromWebSession(webSession)
if webUser is None:
- return HttpResponse(401, 'Access Unauthorized: User is not logged in.')
+ return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.')
server = self.getServer()
logout = lasso.Logout.new(server, lasso.providerTypeSp)
@@ -158,7 +235,8 @@ class ServiceProvider(Provider):
self.failUnless(soapEndpoint)
soapRequestMsg = logout.msg_body
self.failUnless(soapRequestMsg)
- httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
+ httpResponse = self.sendHttpRequest(
+ 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg)
self.failUnlessEqual(httpResponse.statusCode, 200)
logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
@@ -185,4 +263,4 @@ class ServiceProvider(Provider):
self.failUnless(nameIdentifier)
del self.webSessionIdsByNameIdentifier[nameIdentifier]
- return HttpResponse(200)
+ return self.newHttpResponse(200)