diff options
Diffstat (limited to 'python/tests/idwsf2_tests.py')
-rwxr-xr-x | python/tests/idwsf2_tests.py | 288 |
1 files changed, 272 insertions, 16 deletions
diff --git a/python/tests/idwsf2_tests.py b/python/tests/idwsf2_tests.py index b08f1e27..b86cc7fe 100755 --- a/python/tests/idwsf2_tests.py +++ b/python/tests/idwsf2_tests.py @@ -51,6 +51,7 @@ class IdWsf2TestCase(unittest.TestCase): server = lasso.Server(wsp_metadata, wsp_private_key, None, None) server.addProvider(lasso.PROVIDER_ROLE_IDP, idp_metadata, None, None) + server.setEncryptionPrivateKey(wsp_private_key); return server; @@ -75,6 +76,7 @@ class IdWsf2TestCase(unittest.TestCase): server = lasso.Server(idp_metadata, idp_private_key, None, None) server.addProvider(lasso.PROVIDER_ROLE_SP, wsp_metadata, None, None) + server.getProvider(server.providerIds[0]).setEncryptionMode(lasso.ENCRYPTION_MODE_NAMEID); server.addProvider(lasso.PROVIDER_ROLE_SP, wsc_metadata, None, None) self.idp_server_dump = server.dump() @@ -116,9 +118,9 @@ class IdWsf2TestCase(unittest.TestCase): idp_login = lasso.Login(idp) query = sp_login.msgUrl.split('?')[1] if idp_identity_dump is not None: - login.setIdentityFromDump(idp_identity_dump) + idp_login.setIdentityFromDump(idp_identity_dump) if idp_session_dump is not None: - login.setSessionFromDump(idp_session_dump) + idp_login.setSessionFromDump(idp_session_dump) idp_login.processAuthnRequestMsg(query) idp_login.validateRequestMsg(True, True) idp_login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None) @@ -147,7 +149,7 @@ class IdWsf2TestCase(unittest.TestCase): sp_identity_dump = sp_login.identity.dump() if sp_login.isSessionDirty: sp_session_dump = sp_login.session.dump() - + return sp_identity_dump, sp_session_dump, idp_identity_dump, idp_session_dump @@ -599,7 +601,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -689,7 +692,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -734,7 +738,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -782,7 +787,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -829,7 +835,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -847,6 +854,57 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): self.fail(e) def test07(self): + """Process discovery query and check name identifier""" + idp = self.getIdpServer() + idp = self.idpRegisterSelf(idp) + wsp = self.getWspServer() + idp, svcMDID = self.metadataRegister(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) + + wsp_disco = lasso.IdWsf2Discovery(wsp) + if wsp_identity_dump is not None: + wsp_disco.setIdentityFromDump(wsp_identity_dump) + if wsp_session_dump is not None: + wsp_disco.setSessionFromDump(wsp_session_dump) + wsp_disco.initMetadataAssociationAdd(svcMDID) + wsp_disco.buildRequestMsg() + + idp_disco = lasso.IdWsf2Discovery(idp) + idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) + if idp_identity_dump is not None: + idp_disco.setIdentityFromDump(idp_identity_dump) + if idp_session_dump is not None: + idp_disco.setSessionFromDump(idp_session_dump) + + idp_disco.registerMetadata() + if idp_disco.isIdentityDirty: + idp_identity_dump = idp_disco.identity.dump() + if idp_disco.isSessionDirty: + idp_session_dump = idp_disco.session.dump() + idp_disco.buildResponseMsg() + + wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) + + wsc = self.getWscServer() + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) + + wsc_disco = lasso.IdWsf2Discovery(wsc) + if wsc_identity_dump is not None: + wsc_disco.setIdentityFromDump(wsc_identity_dump) + if wsc_session_dump is not None: + wsc_disco.setSessionFromDump(wsc_session_dump) + + wsc_disco.initQuery() + wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') + wsc_disco.buildRequestMsg() + + idp_disco.processQueryMsg(wsc_disco.msgBody) + + self.failUnless(idp_disco.nameIdentifier and idp_disco.nameIdentifier.content, + 'missing name identifier') + + def test08(self): """Build discovery query response EPRs""" idp = self.getIdpServer() idp = self.idpRegisterSelf(idp) @@ -879,7 +937,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -902,7 +961,7 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): except lasso.Error, e: self.fail(e) - def test08(self): + def test09(self): """Build discovery query response""" idp = self.getIdpServer() idp = self.idpRegisterSelf(idp) @@ -935,7 +994,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -957,7 +1017,7 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): self.failUnless(idp_disco.msgBody, 'missing msgBody') - def test09(self): + def test10(self): """Process discovery query response""" idp = self.getIdpServer() idp = self.idpRegisterSelf(idp) @@ -990,7 +1050,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -1015,7 +1076,7 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): except lasso.Error, e: self.fail(e) - def test10(self): + def test11(self): """Check discovery query result""" idp = self.getIdpServer() idp = self.idpRegisterSelf(idp) @@ -1048,7 +1109,8 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = self.login(wsc, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) if wsc_identity_dump is not None: @@ -1073,13 +1135,207 @@ class DiscoveryQueryTestCase(IdWsf2TestCase): self.failUnless(wsc_disco.getService(), 'missing service after discovery query') +class DataServiceQueryTestCase(IdWsf2TestCase): + def getProfileService(self): + """Check discovery query result""" + idp = self.getIdpServer() + idp = self.idpRegisterSelf(idp) + wsp = self.getWspServer() + idp, svcMDID = self.metadataRegister(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) + + wsp_disco = lasso.IdWsf2Discovery(wsp) + if wsp_identity_dump is not None: + wsp_disco.setIdentityFromDump(wsp_identity_dump) + if wsp_session_dump is not None: + wsp_disco.setSessionFromDump(wsp_session_dump) + wsp_disco.initMetadataAssociationAdd(svcMDID) + wsp_disco.buildRequestMsg() + + idp_disco = lasso.IdWsf2Discovery(idp) + idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) + if idp_identity_dump is not None: + idp_disco.setIdentityFromDump(idp_identity_dump) + if idp_session_dump is not None: + idp_disco.setSessionFromDump(idp_session_dump) + + idp_disco.registerMetadata() + if idp_disco.isIdentityDirty: + idp_identity_dump = idp_disco.identity.dump() + if idp_disco.isSessionDirty: + idp_session_dump = idp_disco.session.dump() + idp_disco.buildResponseMsg() + + wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) + + wsc = self.getWscServer() + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ + self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) + + wsc_disco = lasso.IdWsf2Discovery(wsc) + if wsc_identity_dump is not None: + wsc_disco.setIdentityFromDump(wsc_identity_dump) + if wsc_session_dump is not None: + wsc_disco.setSessionFromDump(wsc_session_dump) + + wsc_disco.initQuery() + wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') + wsc_disco.buildRequestMsg() + + idp_disco.processQueryMsg(wsc_disco.msgBody) + if idp_identity_dump is not None: + idp_disco.setIdentityFromDump(idp_identity_dump) + if idp_session_dump is not None: + idp_disco.setSessionFromDump(idp_session_dump) + idp_disco.buildQueryResponseEprs() + idp_disco.buildResponseMsg() + + wsc_disco.processQueryResponseMsg(idp_disco.msgBody) + + return wsc_disco.getService(), wsp + + def test01(self): + """Data service init query""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + + try: + service.initQuery() + except lasso.Error, e: + self.fail(e) + + def test02(self): + """Data service init query - msgUrl construction""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + + self.failUnless(service.msgUrl, 'missing msgUrl') + + def test03(self): + """Data service add query item""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + + try: + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + except lasso.Error, e: + self.fail(e) + + def test04(self): + """Data service build query""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + service.buildRequestMsg() + + self.failUnless(service.msgBody, 'missing msgBody') + + def test05(self): + """Data service build query with multiple items""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + service.addQueryItem('not existing attribute', 'not existing attribute') + service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') + service.buildRequestMsg() + + self.failUnless(service.msgBody, 'missing msgBody') + + def test06(self): + """Data service process query""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + service.addQueryItem('not existing attribute', 'not existing attribute') + service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') + service.buildRequestMsg() + + wsp_service = lasso.IdWsf2DataService(wsp) + + try: + wsp_service.processQueryMsg(service.msgBody) + except lasso.Error, e: + self.fail(e) + + def test07(self): + """Data service check service type""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + service.addQueryItem('not existing attribute', 'not existing attribute') + service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') + service.buildRequestMsg() + + wsp_service = lasso.IdWsf2DataService(wsp) + wsp_service.processQueryMsg(service.msgBody) + + self.failUnless(wsp_service.type, 'service type is not set') + self.failUnless(wsp_service.type == 'urn:liberty:id-sis-pp:2005-05', 'wrong service type') + + def test08(self): + """Data service get query items""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + service.addQueryItem('not existing attribute', 'not existing attribute') + service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') + service.buildRequestMsg() + + wsp_service = lasso.IdWsf2DataService(wsp) + wsp_service.processQueryMsg(service.msgBody) + + self.failUnless(wsp_service.queryItems, 'queryItems list is None or empty') + + def test09(self): + """Data service check query items""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + service.addQueryItem('not existing attribute', 'not existing attribute') + service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') + service.buildRequestMsg() + + wsp_service = lasso.IdWsf2DataService(wsp) + wsp_service.processQueryMsg(service.msgBody) + + items = [ '/pp2:PP/pp2:InformalName', 'not existing attribute', '/pp2:PP/pp2:MsgContact' ] + for i in range(3): + self.failUnless(wsp_service.queryItems[i] == items[i], + "query items don't match : %s != %s" % (wsp_service.queryItems[i], items[i])) + + def test09(self): + """Data service check query items""" + service, wsp = self.getProfileService() + lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') + service.initQuery() + service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') + service.addQueryItem('not existing attribute', 'not existing attribute') + service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') + service.buildRequestMsg() + + wsp_service = lasso.IdWsf2DataService(wsp) + wsp_service.processQueryMsg(service.msgBody) + + self.failUnless(wsp_service.nameIdentifier and wsp_service.nameIdentifier.content, + 'missing name identifier') + + idpSelfRegistrationSuite = unittest.makeSuite(IdpSelfRegistrationTestCase, 'test') metadataRegisterSuite = unittest.makeSuite(MetadataRegisterTestCase, 'test') metadataAssociationAddSuite = unittest.makeSuite(MetadataAssociationAddTestCase, 'test') discoveryQuerySuite = unittest.makeSuite(DiscoveryQueryTestCase, 'test') +dataServiceQuerySuite = unittest.makeSuite(DataServiceQueryTestCase, 'test') allTests = unittest.TestSuite((idpSelfRegistrationSuite, metadataRegisterSuite, - metadataAssociationAddSuite, discoveryQuerySuite)) + metadataAssociationAddSuite, discoveryQuerySuite, dataServiceQuerySuite)) if __name__ == '__main__': sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful()) |