diff options
Diffstat (limited to 'bindings/python/tests/idwsf2_tests.py')
-rwxr-xr-x | bindings/python/tests/idwsf2_tests.py | 1519 |
1 files changed, 160 insertions, 1359 deletions
diff --git a/bindings/python/tests/idwsf2_tests.py b/bindings/python/tests/idwsf2_tests.py index 1f3c7f46..8e8a53a6 100755 --- a/bindings/python/tests/idwsf2_tests.py +++ b/bindings/python/tests/idwsf2_tests.py @@ -28,6 +28,7 @@ import os import unittest import sys +import pprint if not '..' in sys.path: sys.path.insert(0, '..') @@ -86,25 +87,81 @@ class IdWsf2TestCase(unittest.TestCase): return server - def metadataRegister(self, wsp, idp): + def metadataRegister(self, wsp, idp, session_dump, abstract = None, address = None, provider_id = None, service_types = None, services_map = None): + session = lasso.Session.newFromDump(session_dump) + assertion = session.getAssertion(idp.providerId) + self.failUnless(assertion is not None) + epr = assertion.idwsf2GetDiscoveryBootstrapEpr() + self.failUnless(epr is not None) wsp_disco = lasso.IdWsf2Discovery(wsp) + wsp_disco.setEpr(epr) abstract = 'Personal Profile service' + self.failUnless(abstract is not None) + self.failUnless(address is not None) + self.failUnless(service_types is not None) + self.failUnless(isinstance(services_map, dict)) wsp_disco.initMetadataRegister() + if not provider_id: + provider_id = wsp.providerId wsp_disco.addSimpleServiceMetadata( - service_types = ('urn:liberty:id-sis-pp:2005-05',), - abstract = abstract, provider_id = wsp.providerIds[0], - address = idpSoapEndpoint) + service_types = service_types, + abstract = abstract, provider_id = provider_id, + address = address, + security_mechanisms = (lasso.SECURITY_MECH_BEARER,)) wsp_disco.buildRequestMsg() idp_disco = lasso.IdWsf2Discovery(idp) idp_disco.processRequestMsg(wsp_disco.msgBody) + idp_disco.checkSecurityMechanism() idp_disco.validateRequest() self.failUnlessEqual(len(idp_disco.metadatas), 1) + # add metadatas to directory + sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId() + self.failUnless(sender is not None) + metadatas = services_map.get(sender, []) + for metadata in idp_disco.metadatas: + services_map[metadata.svcMDID] = metadata + metadatas.append(metadata.svcMDID) + services_map[sender] = metadatas idp_disco.buildResponseMsg() wsp_disco.processResponseMsg(idp_disco.msgBody) self.failUnlessEqual(len(wsp_disco.metadatas), 1) self.failUnlessEqual(wsp_disco.metadatas[0].svcMDID, wsp_disco.response.svcMDID[0]) - return idp, wsp_disco.metadatas[0] + return wsp_disco.metadatas[0].svcMDID + + def addAssociation(self, wsp, idp, session_dump, svcmdid, service_maps, service_associations): + self.failUnless(isinstance(service_associations, dict)) + self.failUnless(isinstance(service_maps, dict)) + # Get the bootstrap + session = lasso.Session.newFromDump(session_dump) + assertion = session.getAssertion(idp.providerId) + self.failUnless(assertion is not None) + epr = assertion.idwsf2GetDiscoveryBootstrapEpr() + self.failUnless(epr is not None) + wsp_disco = lasso.IdWsf2Discovery(wsp) + wsp_disco.setEpr(epr) + wsp_disco.initMetadataAssociationAdd() + wsp_disco.svcmdids = (svcmdid,) + wsp_disco.buildRequestMsg() + # Handle request + idp_disco = lasso.IdWsf2Discovery(idp) + idp_disco.processRequestMsg(wsp_disco.msgBody) + idp_disco.checkSecurityMechanism() + self.failUnlessEqual(idp_disco.svcmdids, (svcmdid,)) + sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId() + name_identifier = idp_disco.getNameIdentifier() + k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content) + l = service_associations.get(k, []) + for id in idp_disco.svcmdids: + # check it exists + self.failUnless(service_maps.get(id) is not None) + # create association + if id not in l: + l.append(id) + service_associations[k] = l + idp_disco.validateRequest() + idp_disco.buildResponseMsg() + wsp_disco.processResponseMsg(idp_disco.msgBody) def login(self, sp, idp, sp_identity_dump=None, sp_session_dump=None, idp_identity_dump=None, idp_session_dump=None): @@ -124,7 +181,7 @@ class IdWsf2TestCase(unittest.TestCase): idp_login.processAuthnRequestMsg(query) idp_login.validateRequestMsg(True, True) idp_login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None) - idp_login.idwsf2AddDiscoveryBootstrapEpr(url = idpSoapEndpoint, abstract = 'Discovery Service', security_mech_id = lasso.SECURITY_MECH_BEARER) + idp_login.idwsf2AddDiscoveryBootstrapEpr(url = idpSoapEndpoint, abstract = 'Discovery Service', security_mechanisms = (lasso.SECURITY_MECH_BEARER,)) idp_login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET) artifact_message = idp_login.artifactMessage @@ -170,7 +227,7 @@ class MetadataTestCase(IdWsf2TestCase): wsp_disco.addSimpleServiceMetadata(service_types = (lasso.PP11_HREF,), abstract = abstract, provider_id = wsp.providerId, address = spSoapEndpoint, - security_mech_ids = (lasso.SECURITY_MECH_BEARER,)) + security_mechanisms = (lasso.SECURITY_MECH_BEARER,)) self.failUnlessEqual(len(wsp_disco.metadatas), 1) metadata = wsp_disco.metadatas[0] self.failUnlessEqual(metadata.abstract, abstract) @@ -256,7 +313,7 @@ class MetadataTestCase(IdWsf2TestCase): wsp_disco.addSimpleServiceMetadata(service_types = (lasso.PP11_HREF,), abstract = abstract, provider_id = wsp.providerId, address = spSoapEndpoint, - security_mech_ids= (lasso.SECURITY_MECH_BEARER,)) + security_mechanisms= (lasso.SECURITY_MECH_BEARER,)) self.failUnlessEqual(len(wsp_disco.metadatas), 1) metadata = wsp_disco.metadatas[0] self.failUnlessEqual(metadata.abstract, abstract) @@ -320,7 +377,6 @@ class MetadataTestCase(IdWsf2TestCase): wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) - print dst_epr.dump() abstract = 'Personal Profile service' wsp_disco.initMetadataRegister() @@ -333,7 +389,7 @@ class MetadataTestCase(IdWsf2TestCase): wsp_disco.addSimpleServiceMetadata(service_types = (lasso.PP11_HREF,), abstract = abstract, provider_id = wsp.providerId, address = spSoapEndpoint, - security_mech_ids = (lasso.SECURITY_MECH_BEARER,)) + security_mechanisms = (lasso.SECURITY_MECH_BEARER,)) self.failUnlessEqual(len(wsp_disco.metadatas), 1) metadata = wsp_disco.metadatas[0] self.failUnlessEqual(metadata.abstract, abstract) @@ -422,7 +478,6 @@ class MetadataTestCase(IdWsf2TestCase): try: idp_disco.checkSecurityMechanism() except lasso.Error, e: - print wsp_disco.msgBody self.fail(e) try: idp_disco.validateRequest() @@ -459,1399 +514,145 @@ class MetadataTestCase(IdWsf2TestCase): self.failUnless(len(wsp_disco.metadatas) == 1, 'missing svcMDID') self.failUnless(wsp_disco.metadatas[0].svcMDID, 'missing svcMDID') -class MetadataAssociationAddTestCase(IdWsf2TestCase): - def test01(self): - """Init metadata association add request""" + def test04(self): + """Test metadata query""" idp = self.getIdpServer() wsp = self.getWspServer() - idp, svcMD = self.metadataRegister(wsp, idp) wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - try: - wsp_disco.initMetadataAssociationAdd() - except lasso.Error, e: - self.fail(e) - try: - wsp_disco.addServiceMetadata(svcMD) - except lasso.Error, e: - self.fail(e) - - def test03(self): - """Init metadata association add request - msgUrl construction""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMD = self.metadataRegister(wsp, idp) + service_map = {} + self.metadataRegister(wsp, idp, wsp_session_dump, service_types = + (lasso.PP11_HREF,), address = spSoapEndpoint, + abstract = 'My first PP service', services_map = service_map) + self.metadataRegister(wsp, idp, wsp_session_dump, service_types = + (lasso.PP11_HREF,), address = spSoapEndpoint+'2', + abstract = 'My second PP service', services_map = service_map) wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) - wsp_disco.initMetadataAssociationAdd() - wsp_disco.addServiceMetadata(svcMD) + wsp_disco.initMetadataQuery() wsp_disco.buildRequestMsg() - self.failUnless(wsp_disco.msgUrl, 'missing msgUrl') - self.failUnless(wsp_disco.msgBody, 'missing msgBody') + idp_disco = lasso.IdWsf2Discovery(idp) + idp_disco.processRequestMsg(wsp_disco.msgBody) + idp_disco.checkSecurityMechanism() + self.failUnlessEqual(idp_disco.svcmdids, None) + sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId() + for svcMDID in service_map.get(sender, []): + idp_disco.addServiceMetadata(service_map.get(svcMDID)) + idp_disco.validateRequest() + idp_disco.buildResponseMsg() + wsp_disco.processResponseMsg(idp_disco.msgBody) + self.failUnless(len(wsp_disco.metadatas), 2) def test05(self): - """Process metadata association add request""" + """Test metadata delete""" idp = self.getIdpServer() wsp = self.getWspServer() - idp, svcMD = self.metadataRegister(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + service_map = {} + self.metadataRegister(wsp, idp, wsp_session_dump, service_types = + (lasso.PP11_HREF,), address = spSoapEndpoint, + abstract = 'My first PP service', services_map = service_map) + self.metadataRegister(wsp, idp, wsp_session_dump, service_types = + (lasso.PP11_HREF,), address = spSoapEndpoint+'2', + abstract = 'My second PP service', services_map = service_map) wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) - wsp_disco.initMetadataAssociationAdd() - wsp_disco.addServiceMetadata(svcMD) + wsp_disco.initMetadataDelete() + svcmdids = tuple(service_map[wsp.providerId]) + wsp_disco.setSvcmdids(svcmdids) wsp_disco.buildRequestMsg() idp_disco = lasso.IdWsf2Discovery(idp) - - try: - idp_disco.processRequestMsg(wsp_disco.msgBody) - except lasso.Error, e: - self.fail(e) - - try: - idp_disco.checkSecurityMechanism() - except lasso.Error, e: - self.fail(e) - - try: - idp_disco.validateRequest() - except lasso.Error, e: - self.fail(e) - - def test06(self): - """Register metadata association""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - try: - idp_disco.registerMetadata() - except lasso.Error, e: - self.fail(e) - - def test07(self): - """Check metadata association registration""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - idp_disco.registerMetadata() - - self.failUnless(idp_disco.isIdentityDirty, 'identity has not changed, it should contain a svcMDID') - self.failUnless(idp_disco.identity.dump() != idp_identity_dump, - 'identity dump has not changed, it should contain a svcMDID') - - def test08(self): - """Build metadata association add response""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - - idp_disco.buildResponseMsg() - - self.failUnless(idp_disco.msgBody) - - def test09(self): - """Process metadata association add response""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() + idp_disco.processRequestMsg(wsp_disco.msgBody) + idp_disco.checkSecurityMechanism() + self.failUnlessEqual(idp_disco.svcmdids, svcmdids) + sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId() + self.failUnlessEqual(sender, wsp.providerId) + idp_disco.validateRequest() idp_disco.buildResponseMsg() + wsp_disco.processResponseMsg(idp_disco.msgBody) - try: - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - except lasso.Error, e: - self.fail(e) - - -class DiscoveryQueryTestCase(IdWsf2TestCase): +class MetadataAssociationTestCase(IdWsf2TestCase): def test01(self): - """Init discovery query""" + """Metadata association add""" idp = self.getIdpServer() wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + service_map = {} + svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types = + (lasso.PP11_HREF,), address = spSoapEndpoint, + abstract = 'My first PP service', services_map = service_map) + # Make the request wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) + wsp_disco.setEpr(dst_epr) + wsp_disco.initMetadataAssociationAdd() + wsp_disco.svcmdids = (svcMDID,) wsp_disco.buildRequestMsg() - + # Receive it idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() + idp_disco.processRequestMsg(wsp_disco.msgBody) + idp_disco.checkSecurityMechanism() + self.failUnlessEqual(idp_disco.svcmdids, (svcMDID,)) + sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId() + name_identifier = idp_disco.getNameIdentifier() + # Store the association + self.failUnless(sender is not None) + self.failUnless(name_identifier is not None) + idp_disco.validateRequest() idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - try: - wsc_disco.initQuery() - except lasso.Error, e: - self.fail(e) + wsp_disco.processResponseMsg(idp_disco.msgBody) def test02(self): - """Init discovery query without login""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - - wsc_disco = lasso.IdWsf2Discovery(wsc) - - try: - wsc_disco.initQuery() - except lasso.Error, e: - if e[0] != lasso.PROFILE_ERROR_SESSION_NOT_FOUND: - self.fail(e) - else: - self.fail('Should have a "session not found" exception') + """Metadata association query""" + pass def test03(self): - """Init discovery query - check msg url""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - - self.failUnless(wsc_disco.msgUrl, 'missing msgUrl') - - def test04(self): - """Add requested service type to discovery query""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - - try: - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - except lasso.Error, e: - self.fail(e) - - def test05(self): - """Build discovery query""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - wsc_disco.buildRequestMsg() - - self.failUnless(wsc_disco.msgBody) - - def test06(self): - """Process discovery query""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - wsc_disco.buildRequestMsg() - - try: - idp_disco.processQueryMsg(wsc_disco.msgBody) - except lasso.Error, e: - self.fail(e) - - def test07(self): - """Process discovery query and check name identifier""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) + """Metadata association delete""" + pass - wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - wsc_disco.buildRequestMsg() - - idp_disco.processQueryMsg(wsc_disco.msgBody) - - def test08(self): - """Build discovery query response EPRs""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - wsc_disco.buildRequestMsg() - - idp_disco.processQueryMsg(wsc_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - try: - idp_disco.buildQueryResponseEprs() - except lasso.Error, e: - self.fail(e) - - def test09(self): - """Build discovery query response""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - wsc_disco.buildRequestMsg() - - idp_disco.processQueryMsg(wsc_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - idp_disco.buildQueryResponseEprs() - idp_disco.buildResponseMsg() - - self.failUnless(idp_disco.msgBody, 'missing msgBody') - - def test10(self): - """Process discovery query response""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - wsc_disco.buildRequestMsg() - - idp_disco.processQueryMsg(wsc_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - idp_disco.buildQueryResponseEprs() - idp_disco.buildResponseMsg() - - try: - wsc_disco.processQueryResponseMsg(idp_disco.msgBody) - except lasso.Error, e: - self.fail(e) - - def test11(self): - """Check discovery query result""" +class QueryTestCase(IdWsf2TestCase): + def test01(self): + """Simple DST query""" idp = self.getIdpServer() wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - + # Register the service, add an association + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, wsp_dst_epr = self.login(wsp, idp) + service_maps = {} + svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types = + (lasso.PP11_HREF,), address = spSoapEndpoint, + abstract = 'My first PP service', services_map = service_maps) + service_associations = {} + self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps, service_associations) + # Try to find the service + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump, wsc_dst_epr = self.login(wsp, idp) wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - + wsc_disco.setEpr(wsc_dst_epr) wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') + wsc_disco.addRequestedService(service_types = (lasso.PP11_HREF,)) wsc_disco.buildRequestMsg() - - idp_disco.processQueryMsg(wsc_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - idp_disco.buildQueryResponseEprs() - idp_disco.buildResponseMsg() - - wsc_disco.processQueryResponseMsg(idp_disco.msgBody) - - self.failUnless(wsc_disco.getService(), 'missing service after discovery query') - - -class DataServiceQueryTestCase(IdWsf2TestCase): - def getProfileService(self): - """Check discovery query result""" - idp = self.getIdpServer() - wsp = self.getWspServer() - idp, svcMDID = self.metadataRegister(wsp, idp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp) - - wsp_disco = lasso.IdWsf2Discovery(wsp) - if wsp_identity_dump is not None: - wsp_disco.setIdentityFromDump(wsp_identity_dump) - if wsp_session_dump is not None: - wsp_disco.setSessionFromDump(wsp_session_dump) - wsp_disco.initMetadataAssociationAdd(svcMDID) - wsp_disco.buildRequestMsg() - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - - idp_disco.registerMetadata() - if idp_disco.isIdentityDirty: - idp_identity_dump = idp_disco.identity.dump() - if idp_disco.isSessionDirty: - idp_session_dump = idp_disco.session.dump() - idp_disco.buildResponseMsg() - - wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody) - - wsc = self.getWscServer() - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \ - self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump) - - wsc_disco = lasso.IdWsf2Discovery(wsc) - if wsc_identity_dump is not None: - wsc_disco.setIdentityFromDump(wsc_identity_dump) - if wsc_session_dump is not None: - wsc_disco.setSessionFromDump(wsc_session_dump) - - wsc_disco.initQuery() - wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05') - wsc_disco.buildRequestMsg() - - idp_disco.processQueryMsg(wsc_disco.msgBody) - if idp_identity_dump is not None: - idp_disco.setIdentityFromDump(idp_identity_dump) - if idp_session_dump is not None: - idp_disco.setSessionFromDump(idp_session_dump) - idp_disco.buildQueryResponseEprs() + idp_disco.setIdentityFromDump(idp_identity_dump) + idp_disco.processRequestMsg(wsc_disco.msgBody) + name_identifier = idp_disco.getNameIdentifier() + k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content) + for x in service_associations: + for id in service_associations[x]: + idp_disco.addServiceMetadata(service_maps[id]) + idp_disco.validateRequest() idp_disco.buildResponseMsg() - wsc_disco.processQueryResponseMsg(idp_disco.msgBody) - - return wsc_disco.getService(), wsp - - def test01(self): - """Data service init query""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - - try: - service.initQuery() - except lasso.Error, e: - self.fail(e) - - def test02(self): - """Data service init query - msgUrl construction""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - - self.failUnless(service.msgUrl, 'missing msgUrl') - - def test03(self): - """Data service add query item""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - - try: - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - except lasso.Error, e: - self.fail(e) - - def test04(self): - """Data service build query""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.buildRequestMsg() - - self.failUnless(service.msgBody, 'missing msgBody') - - def test05(self): - """Data service build query with multiple items""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - self.failUnless(service.msgBody, 'missing msgBody') - - def test06(self): - """Data service process query""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - - try: - wsp_service.processQueryMsg(service.msgBody) - except lasso.Error, e: - self.fail(e) - - def test07(self): - """Data service check service type""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - self.failUnless(wsp_service.type, 'service type is not set') - self.failUnless(wsp_service.type == 'urn:liberty:id-sis-pp:2005-05', 'wrong service type') - - def test08(self): - """Data service get query items""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - self.failUnless(wsp_service.queryItems, 'queryItems list is None or empty') - - def test09(self): - """Data service check query items""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - items = [ '/pp2:PP/pp2:InformalName', 'not existing attribute', '/pp2:PP/pp2:MsgContact' ] - for i in range(3): - self.failUnless(wsp_service.queryItems[i] == items[i], - "query items don't match : %s != %s" % (wsp_service.queryItems[i], items[i])) - - def test10(self): - """Data service check name identifier""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - def test11(self): - """Data service parse query items - success""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - try: - wsp_service.parseQueryItems() - except lasso.Error, e: - self.fail(e) - - def test12(self): - """Data service parse query items - failure - no item""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - try: - wsp_service.parseQueryItems() - except lasso.DstEmptyRequestError: - pass - except Exception, e: - self.fail(e) - - def test13(self): - """Data service parse query items - failure - wrong item""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('not existing attribute', 'not existing attribute') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - try: - wsp_service.parseQueryItems() - except lasso.Error, e: - if e[0] != lasso.DST_ERROR_QUERY_FAILED: - self.fail(e) - else: - self.fail('query items parsing should have failed because a wrong query item was requested') - - def test14(self): - """Data service parse query items - failure - no data""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - try: - wsp_service.parseQueryItems() - except lasso.Error, e: - if e[0] != lasso.DST_ERROR_MISSING_SERVICE_DATA: - self.fail(e) - else: - self.fail('query items parsing should have failed because no data was provided') - - - def test15(self): - """Data service parse query items - partial failure""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - </PP>""" - - # No exception should be raised here but one will be raised on the WSC - # when parsing response status - try: - wsp_service.parseQueryItems() - except lasso.Error, e: - self.fail(e) - - def test16(self): - """Data service build query response""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - wsp_service.parseQueryItems() - wsp_service.buildResponseMsg() - - self.failUnless(wsp_service.msgBody, 'missing msgBody') - - def test17(self): - """Data service process query response - success""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - wsp_service.parseQueryItems() - wsp_service.buildResponseMsg() - - try: - service.processQueryResponseMsg(wsp_service.msgBody) - except lasso.Error, e: - self.fail(e) - - def test18(self): - """Data service process query response - failure""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('not existing attribute', 'not existing attribute') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - try: - wsp_service.parseQueryItems() - except lasso.Error, e: - pass - wsp_service.buildResponseMsg() - - try: - service.processQueryResponseMsg(wsp_service.msgBody) - except lasso.Error, e: - if e[0] != lasso.DST_ERROR_QUERY_FAILED: - self.fail(e) - else: - self.fail('response should have a "failed" status because a wrong query item was requested') - - def test19(self): - """Data service process query response - partial failure""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - try: - wsp_service.parseQueryItems() - except lasso.DstQueryPartiallyFailedError: - pass - except: - self.fail('parseQueryItems should emit a "partially failed" error because a wrong query item was requested') - wsp_service.buildResponseMsg() - - try: - service.processQueryResponseMsg(wsp_service.msgBody) - except lasso.DstQueryPartiallyFailedError: - pass - except: - self.fail('response should have a "partially failed" status because a wrong query item was requested') - - def test20(self): - """Data service get first attribute""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - wsp_service.parseQueryItems() - wsp_service.buildResponseMsg() - - service.processQueryResponseMsg(wsp_service.msgBody) - informal_name = service.getAttributeNode() - - self.failUnlessEqual(informal_name, """<pp2:InformalName xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">User name</pp2:InformalName>""", 'first attribute node is wrong') - - def test21(self): - """Data service get attribute string""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - wsp_service.parseQueryItems() - wsp_service.buildResponseMsg() - - service.processQueryResponseMsg(wsp_service.msgBody) - informal_name = service.getAttributeString('name') - - self.failUnlessEqual(informal_name, 'User name', 'attribute string is wrong') - - def test22(self): - """Data service get attribute node""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - wsp_service.parseQueryItems() - wsp_service.buildResponseMsg() - - service.processQueryResponseMsg(wsp_service.msgBody) - email = service.getAttributeNode('email') - - expected_result = """<pp2:MsgContact xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">.*?<pp2:MsgAccount>Email account</pp2:MsgAccount>.*?<pp2:MsgProvider>Email server</pp2:MsgProvider>.*?</pp2:MsgContact>""" - - import re - result = re.findall(expected_result, email, re.DOTALL) - - self.failUnless(len(result) == 1, 'attribute node is wrong') - - def test23(self): - """Data service get attribute node - partial failure""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <InformalName>User name</InformalName> - <MsgContact> - <MsgAccount>Email account</MsgAccount> - <MsgProvider>Email server</MsgProvider> - </MsgContact> - </PP>""" - - try: - wsp_service.parseQueryItems() - except lasso.DstQueryPartiallyFailedError: - pass - except: - self.fail('parseQueryItems should emit a "partially failed" error because a wrong query item was requested') - wsp_service.buildResponseMsg() - - try: - service.processQueryResponseMsg(wsp_service.msgBody) - except lasso.Error, e: - if e[0] == lasso.DST_ERROR_QUERY_PARTIALLY_FAILED: - pass - informal_name = service.getAttributeString('name') - email = service.getAttributeNode('email') - - self.failUnlessEqual(informal_name, 'User name', 'attribute string is wrong') - self.failUnlessEqual(email, None, 'attribute node should be None') - - def test24(self): - """Data service redirect request""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - if '/pp2:PP/pp2:MsgContact' in wsp_service.queryItems: - wsp_service.initRedirectUserForConsent('http://sp5/consent'); - wsp_service.buildResponseMsg() - - try: - service.processQueryResponseMsg(wsp_service.msgBody) - except lasso.SoapRedirectRequestFaultError: - pass - except Exception, e: - self.fail(e) - else: - self.fail('a "soap fault redirect request" exception should have been raised') - - def test25(self): - """Data service redirect request - check redirectUrl""" - service, wsp = self.getProfileService() - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:InformalName', 'name') - service.addQueryItem('not existing attribute', 'not existing attribute') - service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - if '/pp2:PP/pp2:MsgContact' in wsp_service.queryItems: - wsp_service.initRedirectUserForConsent('http://sp5/consent'); - wsp_service.buildResponseMsg() - - try: - service.processQueryResponseMsg(wsp_service.msgBody) - except lasso.SoapRedirectRequestFaultError: - pass - except Exception, e: - self.fail(e) - - self.failUnlessEqual(service.redirectUrl, 'http://sp5/consent', 'redirectUrl is not set or wrong') - - def test26(self): - """Data service get multiple attribute nodes with the same name""" - lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05') - service, wsp = self.getProfileService() - service.initQuery() - service.addQueryItem('/pp2:PP/pp2:MsgContact/pp2:MsgAccount', 'email') - service.buildRequestMsg() - - wsp_service = lasso.IdWsf2DataService(wsp) - wsp_service.processQueryMsg(service.msgBody) - - email1 = 'Email account 1' - email2 = 'Email account 2' - wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05"> - <MsgContact> - <MsgAccount>%s</MsgAccount> - </MsgContact> - <MsgContact> - <MsgAccount>%s</MsgAccount> - </MsgContact> - </PP>""" % (email1, email2) - - wsp_service.parseQueryItems() - wsp_service.buildResponseMsg() - - service.processQueryResponseMsg(wsp_service.msgBody) - - email_nodes = service.getAttributeNodes('email') - self.failUnless(email_nodes[0] == - '<pp2:MsgAccount xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">%s</pp2:MsgAccount>' % email1) - self.failUnless(email_nodes[1] == - '<pp2:MsgAccount xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">%s</pp2:MsgAccount>' % email2) - - email_strings = service.getAttributeStrings('email') - self.failUnless(email_strings[0] == email1) - self.failUnless(email_strings[1] == email2) - - metadataSuite = unittest.makeSuite(MetadataTestCase, 'test') -#metadataAssociationAddSuite = unittest.makeSuite(MetadataAssociationAddTestCase, 'test') -#discoveryQuerySuite = unittest.makeSuite(DiscoveryQueryTestCase, 'test') -#dataServiceQuerySuite = unittest.makeSuite(DataServiceQueryTestCase, 'test') +metadataAssociationSuite = unittest.makeSuite(MetadataAssociationTestCase, 'test') +querySuite = unittest.makeSuite(QueryTestCase, 'test') + -allTests = unittest.TestSuite((metadataSuite,)) -# metadataAssociationAddSuite,)) # discoveryQuerySuite, dataServiceQuerySuite)) +allTests = unittest.TestSuite((metadataSuite, + metadataAssociationSuite,querySuite)) if __name__ == '__main__': sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful()) |