diff options
author | Benjamin Dauvergne <bdauvergne@entrouvert.com> | 2010-03-02 11:57:48 +0000 |
---|---|---|
committer | Benjamin Dauvergne <bdauvergne@entrouvert.com> | 2010-03-02 11:57:48 +0000 |
commit | 79271d3032e7f7fddd45187333bc0b77991f6d44 (patch) | |
tree | 497a03dc9e2f7f70db4d1b7b4a9b46d1f385ac01 /bindings/python | |
parent | 412e3e9606e062013edcfa2a90ebca2df3470287 (diff) | |
download | lasso-79271d3032e7f7fddd45187333bc0b77991f6d44.tar.gz lasso-79271d3032e7f7fddd45187333bc0b77991f6d44.tar.xz lasso-79271d3032e7f7fddd45187333bc0b77991f6d44.zip |
ID-WSF 2.0 python tests: finish tests for new ID-WSF 2.0 API
* bindings/python/tests/idwsf2_tests.py:
all Discovery service request types are tested, and Data Service
query is tested as well. Data Service testing and API should more
tested, especially failure cases.
Diffstat (limited to 'bindings/python')
-rwxr-xr-x | bindings/python/tests/idwsf2_tests.py | 183 |
1 files changed, 157 insertions, 26 deletions
diff --git a/bindings/python/tests/idwsf2_tests.py b/bindings/python/tests/idwsf2_tests.py index 8e8a53a6..d7a4cb24 100755 --- a/bindings/python/tests/idwsf2_tests.py +++ b/bindings/python/tests/idwsf2_tests.py @@ -28,7 +28,7 @@ import os import unittest import sys -import pprint +from StringIO import StringIO if not '..' in sys.path: sys.path.insert(0, '..') @@ -37,6 +37,13 @@ if not '../.libs' in sys.path: import lasso +try: + import lxml.etree as ET +except ImportError: + try: + import elementtree.ElementTree as ET + except ImportError: + import xml.etree.ElementTree as ET try: dataDir @@ -86,6 +93,30 @@ class IdWsf2TestCase(unittest.TestCase): return server + def query(self, wsc, idp, idp_identity_dump, wsc_session_dump, uid, federations, services_map, service_associations, provider_ids = None, service_types = None, options = None, actions = None): + session = lasso.Session.newFromDump(wsc_session_dump) + assertion = session.getAssertion(idp.providerId) + self.failUnless(assertion is not None) + epr = assertion.idwsf2GetDiscoveryBootstrapEpr() + self.failUnless(epr is not None) + wsc_disco = lasso.IdWsf2Discovery(wsc) + wsc_disco.setEpr(epr) + wsc_disco.initQuery() + wsc_disco.addRequestedService(service_types = service_types, provider_ids = provider_ids, options = options, actions = actions) + wsc_disco.buildRequestMsg() + idp_disco = lasso.IdWsf2Discovery(idp) + idp_disco.setIdentityFromDump(idp_identity_dump) + idp_disco.processRequestMsg(wsc_disco.msgBody) + f = self.nid2tuple(idp_disco.getNameIdentifier()) + uid = federations[f] + for id in service_associations[uid]: + idp_disco.addServiceMetadata(services_map[id]) + idp_disco.validateRequest() + idp_disco.buildResponseMsg() + wsc_disco.processResponseMsg(idp_disco.msgBody) + return wsc_disco.endpointReferences + + def metadataRegister(self, wsp, idp, session_dump, abstract = None, address = None, provider_id = None, service_types = None, services_map = None): session = lasso.Session.newFromDump(session_dump) @@ -129,7 +160,10 @@ class IdWsf2TestCase(unittest.TestCase): self.failUnlessEqual(wsp_disco.metadatas[0].svcMDID, wsp_disco.response.svcMDID[0]) return wsp_disco.metadatas[0].svcMDID - def addAssociation(self, wsp, idp, session_dump, svcmdid, service_maps, service_associations): + def nid2tuple(self, nid): + return (nid.nameQualifier, nid.format, nid.sPNameQualifier, nid.content) + + def addAssociation(self, wsp, idp, session_dump, svcmdid, service_maps, federations, service_associations): self.failUnless(isinstance(service_associations, dict)) self.failUnless(isinstance(service_maps, dict)) # Get the bootstrap @@ -150,20 +184,21 @@ class IdWsf2TestCase(unittest.TestCase): self.failUnlessEqual(idp_disco.svcmdids, (svcmdid,)) sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId() name_identifier = idp_disco.getNameIdentifier() - k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content) - l = service_associations.get(k, []) + f = self.nid2tuple(name_identifier) + uid = federations[f] + l = service_associations.get(uid, []) for id in idp_disco.svcmdids: # check it exists self.failUnless(service_maps.get(id) is not None) # create association if id not in l: l.append(id) - service_associations[k] = l + service_associations[uid] = l idp_disco.validateRequest() idp_disco.buildResponseMsg() wsp_disco.processResponseMsg(idp_disco.msgBody) - def login(self, sp, idp, sp_identity_dump=None, sp_session_dump=None, + def login(self, sp, idp, user_id, federations, sp_identity_dump=None, sp_session_dump=None, idp_identity_dump=None, idp_session_dump=None): sp_login = lasso.Login(sp) idp_provider_id = 'http://idp5/metadata' @@ -181,6 +216,14 @@ class IdWsf2TestCase(unittest.TestCase): idp_login.processAuthnRequestMsg(query) idp_login.validateRequestMsg(True, True) idp_login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None) + if idp_login.assertion.subject.encryptedId: + f = self.nid2tuple(idp_login.assertion.subject.encryptedId.originalData) + else: + f = self.nid2tuple(idp_login.assertion.subject.nameId) + federations[f] = user_id + l = federations.get(user_id, []) + l.append(f) + federations[user_id] = l idp_login.idwsf2AddDiscoveryBootstrapEpr(url = idpSoapEndpoint, abstract = 'Discovery Service', security_mechanisms = (lasso.SECURITY_MECH_BEARER,)) idp_login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET) artifact_message = idp_login.artifactMessage @@ -216,7 +259,7 @@ class MetadataTestCase(IdWsf2TestCase): """Test metadata registration on the IdP""" idp = self.getIdpServer() wsp = self.getWspServer() - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {}) wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) @@ -302,7 +345,7 @@ class MetadataTestCase(IdWsf2TestCase): "Test failure by IdP for register request" idp = self.getIdpServer() wsp = self.getWspServer() - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {}) wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) @@ -373,7 +416,7 @@ class MetadataTestCase(IdWsf2TestCase): """Test metadata register with redirection""" idp = self.getIdpServer() wsp = self.getWspServer() - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {}) wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) @@ -458,6 +501,7 @@ class MetadataTestCase(IdWsf2TestCase): # Here keep information about the request associated to ID: response_envelope.getMessageId().content wsp_disco_dump = wsp_disco.dump() wsp_disco = lasso.Node.newFromDump(wsp_disco_dump) + wsp_disco.server = wsp request_envelope = wsp_disco.getSoapEnvelopeRequest() self.failUnless(request_envelope is not None) relates_to = request_envelope.getRelatesTo(True) @@ -518,7 +562,8 @@ class MetadataTestCase(IdWsf2TestCase): """Test metadata query""" idp = self.getIdpServer() wsp = self.getWspServer() - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + federations = {} + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, federations) service_map = {} self.metadataRegister(wsp, idp, wsp_session_dump, service_types = (lasso.PP11_HREF,), address = spSoapEndpoint, @@ -526,8 +571,7 @@ class MetadataTestCase(IdWsf2TestCase): self.metadataRegister(wsp, idp, wsp_session_dump, service_types = (lasso.PP11_HREF,), address = spSoapEndpoint+'2', abstract = 'My second PP service', services_map = service_map) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, federations) wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) wsp_disco.initMetadataQuery() @@ -549,7 +593,7 @@ class MetadataTestCase(IdWsf2TestCase): """Test metadata delete""" idp = self.getIdpServer() wsp = self.getWspServer() - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {}) service_map = {} self.metadataRegister(wsp, idp, wsp_session_dump, service_types = (lasso.PP11_HREF,), address = spSoapEndpoint, @@ -557,7 +601,7 @@ class MetadataTestCase(IdWsf2TestCase): self.metadataRegister(wsp, idp, wsp_session_dump, service_types = (lasso.PP11_HREF,), address = spSoapEndpoint+'2', abstract = 'My second PP service', services_map = service_map) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {}) wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) @@ -581,7 +625,7 @@ class MetadataAssociationTestCase(IdWsf2TestCase): """Metadata association add""" idp = self.getIdpServer() wsp = self.getWspServer() - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {}) service_map = {} svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types = (lasso.PP11_HREF,), address = spSoapEndpoint, @@ -608,7 +652,41 @@ class MetadataAssociationTestCase(IdWsf2TestCase): def test02(self): """Metadata association query""" - pass + idp = self.getIdpServer() + wsp = self.getWspServer() + wsc = self.getWscServer() + # Register the service, add an association + federations = {} + wsp_identity_dump, wsp_session_dump, \ + idp_identity_dump, idp_session_dump, \ + wsp_dst_epr = self.login(wsp, idp, 1, federations) + service_maps = {} + svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, + service_types = (lasso.PP11_HREF,), address = spSoapEndpoint, + abstract = 'My first PP service', services_map = service_maps) + service_associations = {} + self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps, + federations, service_associations) + # Start a query + wsp_disco = lasso.IdWsf2Discovery(wsp) + wsp_disco.setEpr(wsp_dst_epr) + wsp_disco.initMetadataAssociationQuery() + wsp_disco.buildRequestMsg() + # + idp_disco = lasso.IdWsf2Discovery(idp) + idp_disco.processRequestMsg(wsp_disco.msgBody) + idp_disco.checkSecurityMechanism() + self.failUnlessEqual(idp_disco.svcmdids, None) + f = self.nid2tuple(idp_disco.getNameIdentifier()) + uid = federations[f] + result = [] + for svcmdid in service_associations[uid]: + result.append(svcmdid) + idp_disco.svcmdids = tuple(result) + idp_disco.validateRequest() + idp_disco.buildResponseMsg() + wsp_disco.processResponseMsg(idp_disco.msgBody) + self.failUnlessEqual(wsp_disco.svcmdids, (svcMDID,)) def test03(self): """Metadata association delete""" @@ -616,20 +694,21 @@ class MetadataAssociationTestCase(IdWsf2TestCase): class QueryTestCase(IdWsf2TestCase): def test01(self): - """Simple DST query""" + """Discovery Service Query""" idp = self.getIdpServer() wsp = self.getWspServer() wsc = self.getWscServer() + federations = {} # Register the service, add an association - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, wsp_dst_epr = self.login(wsp, idp) + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, wsp_dst_epr = self.login(wsp, idp, 1, federations) service_maps = {} svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types = (lasso.PP11_HREF,), address = spSoapEndpoint, abstract = 'My first PP service', services_map = service_maps) service_associations = {} - self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps, service_associations) + self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps, federations, service_associations) # Try to find the service - wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump, wsc_dst_epr = self.login(wsp, idp) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump, wsc_dst_epr = self.login(wsc, idp, 1, federations, idp_identity_dump = idp_identity_dump, idp_session_dump = idp_session_dump) wsc_disco = lasso.IdWsf2Discovery(wsc) wsc_disco.setEpr(wsc_dst_epr) wsc_disco.initQuery() @@ -638,21 +717,73 @@ class QueryTestCase(IdWsf2TestCase): idp_disco = lasso.IdWsf2Discovery(idp) idp_disco.setIdentityFromDump(idp_identity_dump) idp_disco.processRequestMsg(wsc_disco.msgBody) - name_identifier = idp_disco.getNameIdentifier() - k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content) - for x in service_associations: - for id in service_associations[x]: - idp_disco.addServiceMetadata(service_maps[id]) + f = self.nid2tuple(idp_disco.getNameIdentifier()) + uid = federations[f] + for id in service_associations[uid]: + idp_disco.addServiceMetadata(service_maps[id]) idp_disco.validateRequest() idp_disco.buildResponseMsg() + wsc_disco.processResponseMsg(idp_disco.msgBody) + self.failUnlessEqual(len(wsc_disco.endpointReferences), 1) + +class DstTestCase(IdWsf2TestCase): + def test01(self): + """Data Service Template Query""" + content = '<pp:PP xmlns:pp="%s">Coin</pp:PP>' % lasso.PP11_HREF + idp = self.getIdpServer() + wsp = self.getWspServer() + wsc = self.getWscServer() + federations = {} + # Register the service, add an association + wsp_identity_dump, wsp_session_dump, idp_identity_dump, \ + idp_session_dump, wsp_dst_epr = self.login(wsp, idp, 1, + federations) + service_maps = {} + svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, + service_types = (lasso.PP11_HREF,), address = + spSoapEndpoint, abstract = 'My first PP service', + services_map = service_maps) + service_associations = {} + self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, + service_maps, federations, service_associations) + wsc_identity_dump, wsc_session_dump, idp_identity_dump, \ + idp_session_dump, wsc_dst_epr = self.login(wsc, idp, 1, federations, + idp_identity_dump = idp_identity_dump, idp_session_dump = + idp_session_dump) + eprs = self.query(wsc, idp, idp_identity_dump, wsc_session_dump, 1, + federations, service_maps, service_associations, + service_types = (lasso.PP11_HREF,)) + self.failUnless(len(eprs), 1) + lasso.registerIdwsf2DstService(lasso.PP11_PREFIX, lasso.PP11_HREF) + wsc_dst = lasso.IdWsf2DataService(wsc) + wsc_dst.setEpr(eprs[0]) + wsc_dst.initQuery() + wsc_dst.setServiceType(lasso.PP11_PREFIX, lasso.PP11_HREF) + wsc_dst.addQueryItem('/%s:PP' % lasso.PP11_PREFIX, 'xxx') + wsc_dst.buildRequestMsg() + wsp_dst = lasso.IdWsf2DataService(wsp) + wsp_dst.processRequestMsg(wsc_dst.msgBody) + self.failUnlessEqual(wsp_dst.requestType, lasso.IDWSF2_DATA_SERVICE_REQUEST_TYPE_QUERY) + wsp_dst.checkSecurityMechanism() + data = ET.parse(StringIO(content)) + for item in wsp_dst.items: + result = data.xpath(item.select, namespaces = { lasso.PP11_PREFIX: lasso.PP11_HREF }) + for found in result: + wsp_dst.setQueryItemResult(item.itemId, ET.tostring(found), True) + wsp_dst.setServiceType(lasso.PP11_PREFIX, lasso.PP11_HREF) + wsp_dst.initResponse() + wsp_dst.buildResponseMsg() + wsc_dst.processResponseMsg(wsp_dst.msgBody) + metadataSuite = unittest.makeSuite(MetadataTestCase, 'test') metadataAssociationSuite = unittest.makeSuite(MetadataAssociationTestCase, 'test') querySuite = unittest.makeSuite(QueryTestCase, 'test') +dstSuite = unittest.makeSuite(DstTestCase, 'test') allTests = unittest.TestSuite((metadataSuite, - metadataAssociationSuite,querySuite)) + metadataAssociationSuite,querySuite,dstSuite)) if __name__ == '__main__': sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful()) |