diff options
| author | Damien Laniel <dlaniel@entrouvert.com> | 2008-05-20 09:27:25 +0000 |
|---|---|---|
| committer | Damien Laniel <dlaniel@entrouvert.com> | 2008-05-20 09:27:25 +0000 |
| commit | 5711105dfb5bcaa9b09c19102c96811437bc6c08 (patch) | |
| tree | de612b82d5c487f88df6b3168aaf850e2a0e95f8 /bindings/python | |
| parent | 175af5a87be57317fa16c0fcef4340493b2cc68d (diff) | |
| download | lasso-5711105dfb5bcaa9b09c19102c96811437bc6c08.tar.gz lasso-5711105dfb5bcaa9b09c19102c96811437bc6c08.tar.xz lasso-5711105dfb5bcaa9b09c19102c96811437bc6c08.zip | |
added some tests for id-wsf 1.1 with new python bindings
Diffstat (limited to 'bindings/python')
| -rwxr-xr-x | bindings/python/tests/idwsf1_tests.py | 341 |
1 files changed, 341 insertions, 0 deletions
diff --git a/bindings/python/tests/idwsf1_tests.py b/bindings/python/tests/idwsf1_tests.py new file mode 100755 index 00000000..2162a523 --- /dev/null +++ b/bindings/python/tests/idwsf1_tests.py @@ -0,0 +1,341 @@ +#! /usr/bin/env python +# -*- coding: UTF-8 -*- +# +# Python unit tests for Lasso library +# +# Copyright (C) 2004-2007 Entr'ouvert +# http://lasso.entrouvert.org +# +# Authors: See AUTHORS file in top-level directory. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import os +import unittest +import sys + +if not '..' in sys.path: + sys.path.insert(0, '..') +if not '../.libs' in sys.path: + sys.path.insert(0, '../.libs') + +import lasso + +try: + dataDir +except NameError: + dataDir = '../../../tests/data' + +wsp_metadata = os.path.join(dataDir, 'sp1-la/metadata.xml') +wsp_private_key = os.path.join(dataDir, 'sp1-la/private-key-raw.pem') +wsp_public_key = os.path.join(dataDir, 'sp1-la/public-key.pem') +wsc_metadata = os.path.join(dataDir, 'sp2-la/metadata.xml') +wsc_private_key = os.path.join(dataDir, 'sp2-la/private-key-raw.pem') +wsc_public_key = os.path.join(dataDir, 'sp2-la/public-key.pem') +idp_metadata = os.path.join(dataDir, 'idp1-la/metadata.xml') +idp_private_key = os.path.join(dataDir, 'idp1-la/private-key-raw.pem') +idp_public_key = os.path.join(dataDir, 'idp1-la/public-key.pem') + +abstract_description = "Personal Profile Resource" +resource_id = "http://idp/user/resources/1" + +class IdWsf1TestCase(unittest.TestCase): + def get_wsp_server(self): + server = lasso.Server(wsp_metadata, wsp_private_key, None, None) + server.addProvider(lasso.PROVIDER_ROLE_IDP, idp_metadata, idp_public_key, None) + return server + + def get_wsc_server(self): + server = lasso.Server(wsc_metadata, wsc_private_key, None, None) + server.addProvider(lasso.PROVIDER_ROLE_IDP, idp_metadata, idp_public_key, None) + return server + + def get_idp_server(self): + server = lasso.Server(idp_metadata, idp_private_key, None, None) + server.addProvider(lasso.PROVIDER_ROLE_SP, wsp_metadata, wsp_public_key, None) + server.addProvider(lasso.PROVIDER_ROLE_SP, wsc_metadata, wsc_public_key, None) + return server + + def add_services(self, idp): + # Add Discovery service + disco_description = lasso.DiscoDescription.newWithBriefSoapHttpDescription( + lasso.SECURITY_MECH_NULL, + "http://idp/discovery/soapEndpoint", + "Discovery SOAP Endpoint description"); + disco_service_instance = lasso.DiscoServiceInstance( + lasso.DISCO_HREF, + "http://idp/providerId", + disco_description); + idp.addService(disco_service_instance); + + # Add Personal Profile service + pp_description = lasso.DiscoDescription.newWithBriefSoapHttpDescription( + lasso.SECURITY_MECH_NULL, + "http://idp/pp/soapEndpoint", + "Discovery SOAP Endpoint description"); + pp_service_instance = lasso.DiscoServiceInstance( + lasso.PP_HREF, + "http://idp/providerId", + pp_description); + idp.addService(pp_service_instance); + return idp + + def login(self, sp, idp): + sp_login = lasso.Login(sp) + sp_login.initAuthnRequest(sp.providerIds[0], lasso.HTTP_METHOD_POST) + sp_login.request.nameIdPolicy = lasso.LIB_NAMEID_POLICY_TYPE_FEDERATED + sp_login.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_POST + sp_login.buildAuthnRequestMsg() + + idp_login = lasso.Login(idp) + idp_login.processAuthnRequestMsg(sp_login.msgBody) + idp_login.validateRequestMsg(True, True) + + # Set a resource offering in the assertion + discovery_resource_id = "http://idp/discovery/resources/1" + idp_login.setResourceId(discovery_resource_id) + idp_login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None) + idp_login.buildAuthnResponseMsg() + + sp_login = lasso.Login(sp) + sp_login.processAuthnResponseMsg(idp_login.msgBody) + sp_login.acceptSso() + + return sp_login.identity.dump(), sp_login.session.dump(), idp_login.identity.dump(), idp_login.session.dump() + + def get_resource_offering(self, soap_end_point='http://idp/pp/soapEndpoint'): + service_instance = lasso.DiscoServiceInstance( + lasso.PP_HREF, + self.idp.providerId, + lasso.DiscoDescription_newWithBriefSoapHttpDescription( + lasso.SECURITY_MECH_NULL, + soap_end_point)) + resource_offering = lasso.DiscoResourceOffering(service_instance) + resource_offering.resourceId = lasso.DiscoResourceID(resource_id) + resource_offering.abstract = abstract_description + return resource_offering + + def get_pp_service(self): + self.wsc = self.get_wsc_server() + self.idp = self.get_idp_server() + self.idp = self.add_services(self.idp) + + # Login from WSC + sp_identity_dump, sp_session_dump, idp_identity_dump, idp_session_dump = self.login(self.wsc, self.idp) + + # Init discovery query + wsc_disco = lasso.Discovery(self.wsc) + wsc_disco.setSessionFromDump(sp_session_dump) + wsc_disco.initQuery() + wsc_disco.addRequestedServiceType(lasso.PP_HREF) + wsc_disco.buildRequestMsg() + + # Process query + idp_disco = lasso.Discovery(self.idp) + idp_disco.processQueryMsg(wsc_disco.msgBody) + idp_disco.setIdentityFromDump(idp_identity_dump) + idp_disco.identity.addResourceOffering(self.get_resource_offering()) + idp_disco.buildResponseMsg() + + # Process response + wsc_disco.processQueryResponseMsg(idp_disco.msgBody); + return wsc_disco.getService() + +class DiscoveryQueryTestCase(IdWsf1TestCase): + def test01(self): + '''Test a discovery query''' + service = self.get_pp_service() + # Check service attributes + self.failUnless(service.resourceId is not None) + self.failUnless(service.resourceId.content == resource_id) + self.failUnless(service.providerId == self.wsc.providerIds[0]) + self.failUnless(service.abstractDescription == abstract_description) + +class DiscoveryModifyTestCase(IdWsf1TestCase): + def test01(self): + '''Test a discovery modify''' + self.wsp = self.get_wsp_server() + self.idp = self.get_idp_server() + self.idp = self.add_services(self.idp) + + # Login from WSP + sp_identity_dump, sp_session_dump, idp_identity_dump, idp_session_dump = self.login(self.wsp, self.idp) + + # Init discovery modify + wsp_disco = lasso.Discovery(self.wsp) + wsp_disco.setIdentityFromDump(sp_identity_dump) + wsp_disco.setSessionFromDump(sp_session_dump) + wsp_disco.initInsert(self.get_resource_offering()) + wsp_disco.buildRequestMsg() + + # Process Modify + request_type = lasso.getRequestTypeFromSoapMsg(wsp_disco.msgBody) + self.failUnless(request_type == lasso.REQUEST_TYPE_DISCO_MODIFY) + idp_disco = lasso.Discovery(self.idp) + idp_disco.processModifyMsg(wsp_disco.msgBody) + idp_disco.setIdentityFromDump(idp_identity_dump) + idp_disco.identity.addResourceOffering(self.get_resource_offering()) + idp_disco.buildModifyResponseMsg() + self.failUnless('<disco:Status code="OK"/>' in idp_disco.msgBody) + self.failUnless('<disco:ModifyResponse newEntryIDs="' in idp_disco.msgBody) + self.failUnless('<disco:ServiceType>urn:liberty:id-sis-pp:2003-08</disco:ServiceType>' in + idp_disco.identity.dump()) + + # Process Response + wsp_disco.processModifyResponseMsg(idp_disco.msgBody) + +class DataServiceQueryTestCase(IdWsf1TestCase): + def test01(self): + '''Test a data service query''' + wsc_service = self.get_pp_service() + wsc_service.initQuery('/pp:PP/pp:InformalName', 'name') + wsc_service.buildRequestMsg() + self.failUnless(lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody) + == lasso.REQUEST_TYPE_DST_QUERY) + + self.wsp = self.get_wsp_server() + wsp_service = lasso.DataService(self.wsp) + wsp_service.processQueryMsg(wsc_service.msgBody) + wsp_service.resourceData = ''' + <PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <InformalName>Damien</InformalName> + </PP>''' + wsp_service.buildResponseMsg() + + wsc_service.processQueryResponseMsg(wsp_service.msgBody) + self.failUnless(wsc_service.getAnswer('/pp:PP/pp:InformalName') == + '<InformalName xmlns="urn:liberty:id-sis-pp:2003-08">Damien</InformalName>') + +class DataServiceModifyTestCase(IdWsf1TestCase): + def test01(self): + '''Test a data service modify''' + + xpath = '/pp:PP/pp:InformalName' + old_data = ''' + <PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <InformalName>Damien</InformalName> + </PP>''' + new_data = '<InformalName>Alain</InformalName>' + + new_full_data = '''<PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <pp:InformalName xmlns:pp="urn:liberty:id-sis-pp:2003-08">Alain</pp:InformalName> + </PP>''' + + wsc_service = self.get_pp_service() + wsc_service.initModify(xpath, new_data) + wsc_service.buildRequestMsg() + + request_type = lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody) + self.failUnless(request_type == lasso.REQUEST_TYPE_DST_MODIFY) + + self.wsp = self.get_wsp_server() + wsp_service = lasso.DataService(self.wsp) + wsp_service.processModifyMsg(wsc_service.msgBody) + + item = wsp_service.request.modification[0] + self.failUnless(item.newData.any[0] == + '<pp:InformalName xmlns:pp="urn:liberty:id-sis-pp:2003-08">Alain</pp:InformalName>') + self.failUnless(item.select == '/pp:PP/pp:InformalName') + + wsp_service.resourceData = old_data + wsp_service.buildModifyResponseMsg() + # Save the new wsp_service.resourceData here + + self.failUnless(wsp_service.resourceData == new_full_data) + + wsc_service.processModifyResponseMsg(wsp_service.msgBody) + + def test02(self): + '''Test a data service modify - root element''' + + xpath = '/pp:PP' + old_data = ''' + <PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <InformalName>Damien</InformalName> + </PP>''' + new_data = ''' + <PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <InformalName>Alain</InformalName> + </PP>''' + + new_full_data = '''<PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <InformalName>Alain</InformalName> + </PP>''' + + wsc_service = self.get_pp_service() + wsc_service.initModify(xpath, new_data) + wsc_service.buildRequestMsg() + + request_type = lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody) + self.failUnless(request_type == lasso.REQUEST_TYPE_DST_MODIFY) + + self.wsp = self.get_wsp_server() + wsp_service = lasso.DataService(self.wsp) + wsp_service.processModifyMsg(wsc_service.msgBody) + wsp_service.resourceData = old_data + wsp_service.buildModifyResponseMsg() + # Save the new wsp_service.resourceData here + + self.failUnless(wsp_service.resourceData == new_full_data) + + wsc_service.processModifyResponseMsg(wsp_service.msgBody) + + def test03(self): + '''Test a data service modify with redirect for consent''' + + xpath = '/pp:PP/pp:InformalName' + old_data = '''<PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <InformalName>Damien</InformalName> + </PP>''' + new_data = '<InformalName>Alain</InformalName>' + + new_full_data = '''<PP xmlns="urn:liberty:id-sis-pp:2003-08"> + <pp:InformalName xmlns:pp="urn:liberty:id-sis-pp:2003-08">Alain</pp:InformalName> + </PP>''' + redir_url = 'http://site/redirect_for_consent' + + wsc_service = self.get_pp_service() + wsc_service.initModify(xpath, new_data) + wsc_service.buildRequestMsg() + + request_type = lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody) + self.failUnless(request_type == lasso.REQUEST_TYPE_DST_MODIFY) + + self.wsp = self.get_wsp_server() + wsp_service = lasso.DataService(self.wsp) + wsp_service.processModifyMsg(wsc_service.msgBody) + wsp_service.resourceData = old_data + + wsp_service.needRedirectUser(redir_url) + wsp_service.buildModifyResponseMsg() + # Save the new wsp_service.resourceData here + + # Data mustn't have been modified here + self.failUnless(wsp_service.resourceData == old_data) + + wsc_service.processModifyResponseMsg(wsp_service.msgBody) + self.failUnless(wsc_service.getRedirectRequestUrl() == redir_url) + + +discoveryQuerySuite = unittest.makeSuite(DiscoveryQueryTestCase, 'test') +discoveryModifySuite = unittest.makeSuite(DiscoveryModifyTestCase, 'test') +dataServiceQuerySuite = unittest.makeSuite(DataServiceQueryTestCase, 'test') +dataServiceModifySuite = unittest.makeSuite(DataServiceModifyTestCase, 'test') + +allTests = unittest.TestSuite((discoveryQuerySuite, discoveryModifySuite, dataServiceQuerySuite, dataServiceModifySuite)) + +if __name__ == '__main__': + sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful()) + |
