summaryrefslogtreecommitdiffstats
path: root/bindings/python
diff options
context:
space:
mode:
authorBenjamin Dauvergne <bdauvergne@entrouvert.com>2010-03-02 11:57:48 +0000
committerBenjamin Dauvergne <bdauvergne@entrouvert.com>2010-03-02 11:57:48 +0000
commit79271d3032e7f7fddd45187333bc0b77991f6d44 (patch)
tree497a03dc9e2f7f70db4d1b7b4a9b46d1f385ac01 /bindings/python
parent412e3e9606e062013edcfa2a90ebca2df3470287 (diff)
downloadlasso-79271d3032e7f7fddd45187333bc0b77991f6d44.tar.gz
lasso-79271d3032e7f7fddd45187333bc0b77991f6d44.tar.xz
lasso-79271d3032e7f7fddd45187333bc0b77991f6d44.zip
ID-WSF 2.0 python tests: finish tests for new ID-WSF 2.0 API
* bindings/python/tests/idwsf2_tests.py: all Discovery service request types are tested, and Data Service query is tested as well. Data Service testing and API should more tested, especially failure cases.
Diffstat (limited to 'bindings/python')
-rwxr-xr-xbindings/python/tests/idwsf2_tests.py183
1 files changed, 157 insertions, 26 deletions
diff --git a/bindings/python/tests/idwsf2_tests.py b/bindings/python/tests/idwsf2_tests.py
index 8e8a53a6..d7a4cb24 100755
--- a/bindings/python/tests/idwsf2_tests.py
+++ b/bindings/python/tests/idwsf2_tests.py
@@ -28,7 +28,7 @@
import os
import unittest
import sys
-import pprint
+from StringIO import StringIO
if not '..' in sys.path:
sys.path.insert(0, '..')
@@ -37,6 +37,13 @@ if not '../.libs' in sys.path:
import lasso
+try:
+ import lxml.etree as ET
+except ImportError:
+ try:
+ import elementtree.ElementTree as ET
+ except ImportError:
+ import xml.etree.ElementTree as ET
try:
dataDir
@@ -86,6 +93,30 @@ class IdWsf2TestCase(unittest.TestCase):
return server
+ def query(self, wsc, idp, idp_identity_dump, wsc_session_dump, uid, federations, services_map, service_associations, provider_ids = None, service_types = None, options = None, actions = None):
+ session = lasso.Session.newFromDump(wsc_session_dump)
+ assertion = session.getAssertion(idp.providerId)
+ self.failUnless(assertion is not None)
+ epr = assertion.idwsf2GetDiscoveryBootstrapEpr()
+ self.failUnless(epr is not None)
+ wsc_disco = lasso.IdWsf2Discovery(wsc)
+ wsc_disco.setEpr(epr)
+ wsc_disco.initQuery()
+ wsc_disco.addRequestedService(service_types = service_types, provider_ids = provider_ids, options = options, actions = actions)
+ wsc_disco.buildRequestMsg()
+ idp_disco = lasso.IdWsf2Discovery(idp)
+ idp_disco.setIdentityFromDump(idp_identity_dump)
+ idp_disco.processRequestMsg(wsc_disco.msgBody)
+ f = self.nid2tuple(idp_disco.getNameIdentifier())
+ uid = federations[f]
+ for id in service_associations[uid]:
+ idp_disco.addServiceMetadata(services_map[id])
+ idp_disco.validateRequest()
+ idp_disco.buildResponseMsg()
+ wsc_disco.processResponseMsg(idp_disco.msgBody)
+ return wsc_disco.endpointReferences
+
+
def metadataRegister(self, wsp, idp, session_dump, abstract = None, address = None, provider_id = None, service_types = None, services_map = None):
session = lasso.Session.newFromDump(session_dump)
@@ -129,7 +160,10 @@ class IdWsf2TestCase(unittest.TestCase):
self.failUnlessEqual(wsp_disco.metadatas[0].svcMDID, wsp_disco.response.svcMDID[0])
return wsp_disco.metadatas[0].svcMDID
- def addAssociation(self, wsp, idp, session_dump, svcmdid, service_maps, service_associations):
+ def nid2tuple(self, nid):
+ return (nid.nameQualifier, nid.format, nid.sPNameQualifier, nid.content)
+
+ def addAssociation(self, wsp, idp, session_dump, svcmdid, service_maps, federations, service_associations):
self.failUnless(isinstance(service_associations, dict))
self.failUnless(isinstance(service_maps, dict))
# Get the bootstrap
@@ -150,20 +184,21 @@ class IdWsf2TestCase(unittest.TestCase):
self.failUnlessEqual(idp_disco.svcmdids, (svcmdid,))
sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId()
name_identifier = idp_disco.getNameIdentifier()
- k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content)
- l = service_associations.get(k, [])
+ f = self.nid2tuple(name_identifier)
+ uid = federations[f]
+ l = service_associations.get(uid, [])
for id in idp_disco.svcmdids:
# check it exists
self.failUnless(service_maps.get(id) is not None)
# create association
if id not in l:
l.append(id)
- service_associations[k] = l
+ service_associations[uid] = l
idp_disco.validateRequest()
idp_disco.buildResponseMsg()
wsp_disco.processResponseMsg(idp_disco.msgBody)
- def login(self, sp, idp, sp_identity_dump=None, sp_session_dump=None,
+ def login(self, sp, idp, user_id, federations, sp_identity_dump=None, sp_session_dump=None,
idp_identity_dump=None, idp_session_dump=None):
sp_login = lasso.Login(sp)
idp_provider_id = 'http://idp5/metadata'
@@ -181,6 +216,14 @@ class IdWsf2TestCase(unittest.TestCase):
idp_login.processAuthnRequestMsg(query)
idp_login.validateRequestMsg(True, True)
idp_login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None)
+ if idp_login.assertion.subject.encryptedId:
+ f = self.nid2tuple(idp_login.assertion.subject.encryptedId.originalData)
+ else:
+ f = self.nid2tuple(idp_login.assertion.subject.nameId)
+ federations[f] = user_id
+ l = federations.get(user_id, [])
+ l.append(f)
+ federations[user_id] = l
idp_login.idwsf2AddDiscoveryBootstrapEpr(url = idpSoapEndpoint, abstract = 'Discovery Service', security_mechanisms = (lasso.SECURITY_MECH_BEARER,))
idp_login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET)
artifact_message = idp_login.artifactMessage
@@ -216,7 +259,7 @@ class MetadataTestCase(IdWsf2TestCase):
"""Test metadata registration on the IdP"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {})
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
@@ -302,7 +345,7 @@ class MetadataTestCase(IdWsf2TestCase):
"Test failure by IdP for register request"
idp = self.getIdpServer()
wsp = self.getWspServer()
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {})
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
@@ -373,7 +416,7 @@ class MetadataTestCase(IdWsf2TestCase):
"""Test metadata register with redirection"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {})
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
@@ -458,6 +501,7 @@ class MetadataTestCase(IdWsf2TestCase):
# Here keep information about the request associated to ID: response_envelope.getMessageId().content
wsp_disco_dump = wsp_disco.dump()
wsp_disco = lasso.Node.newFromDump(wsp_disco_dump)
+ wsp_disco.server = wsp
request_envelope = wsp_disco.getSoapEnvelopeRequest()
self.failUnless(request_envelope is not None)
relates_to = request_envelope.getRelatesTo(True)
@@ -518,7 +562,8 @@ class MetadataTestCase(IdWsf2TestCase):
"""Test metadata query"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ federations = {}
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, federations)
service_map = {}
self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
(lasso.PP11_HREF,), address = spSoapEndpoint,
@@ -526,8 +571,7 @@ class MetadataTestCase(IdWsf2TestCase):
self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
(lasso.PP11_HREF,), address = spSoapEndpoint+'2',
abstract = 'My second PP service', services_map = service_map)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
-
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, federations)
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
wsp_disco.initMetadataQuery()
@@ -549,7 +593,7 @@ class MetadataTestCase(IdWsf2TestCase):
"""Test metadata delete"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {})
service_map = {}
self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
(lasso.PP11_HREF,), address = spSoapEndpoint,
@@ -557,7 +601,7 @@ class MetadataTestCase(IdWsf2TestCase):
self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
(lasso.PP11_HREF,), address = spSoapEndpoint+'2',
abstract = 'My second PP service', services_map = service_map)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {})
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
@@ -581,7 +625,7 @@ class MetadataAssociationTestCase(IdWsf2TestCase):
"""Metadata association add"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp, 1, {})
service_map = {}
svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
(lasso.PP11_HREF,), address = spSoapEndpoint,
@@ -608,7 +652,41 @@ class MetadataAssociationTestCase(IdWsf2TestCase):
def test02(self):
"""Metadata association query"""
- pass
+ idp = self.getIdpServer()
+ wsp = self.getWspServer()
+ wsc = self.getWscServer()
+ # Register the service, add an association
+ federations = {}
+ wsp_identity_dump, wsp_session_dump, \
+ idp_identity_dump, idp_session_dump, \
+ wsp_dst_epr = self.login(wsp, idp, 1, federations)
+ service_maps = {}
+ svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump,
+ service_types = (lasso.PP11_HREF,), address = spSoapEndpoint,
+ abstract = 'My first PP service', services_map = service_maps)
+ service_associations = {}
+ self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps,
+ federations, service_associations)
+ # Start a query
+ wsp_disco = lasso.IdWsf2Discovery(wsp)
+ wsp_disco.setEpr(wsp_dst_epr)
+ wsp_disco.initMetadataAssociationQuery()
+ wsp_disco.buildRequestMsg()
+ #
+ idp_disco = lasso.IdWsf2Discovery(idp)
+ idp_disco.processRequestMsg(wsp_disco.msgBody)
+ idp_disco.checkSecurityMechanism()
+ self.failUnlessEqual(idp_disco.svcmdids, None)
+ f = self.nid2tuple(idp_disco.getNameIdentifier())
+ uid = federations[f]
+ result = []
+ for svcmdid in service_associations[uid]:
+ result.append(svcmdid)
+ idp_disco.svcmdids = tuple(result)
+ idp_disco.validateRequest()
+ idp_disco.buildResponseMsg()
+ wsp_disco.processResponseMsg(idp_disco.msgBody)
+ self.failUnlessEqual(wsp_disco.svcmdids, (svcMDID,))
def test03(self):
"""Metadata association delete"""
@@ -616,20 +694,21 @@ class MetadataAssociationTestCase(IdWsf2TestCase):
class QueryTestCase(IdWsf2TestCase):
def test01(self):
- """Simple DST query"""
+ """Discovery Service Query"""
idp = self.getIdpServer()
wsp = self.getWspServer()
wsc = self.getWscServer()
+ federations = {}
# Register the service, add an association
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, wsp_dst_epr = self.login(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, wsp_dst_epr = self.login(wsp, idp, 1, federations)
service_maps = {}
svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
(lasso.PP11_HREF,), address = spSoapEndpoint,
abstract = 'My first PP service', services_map = service_maps)
service_associations = {}
- self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps, service_associations)
+ self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps, federations, service_associations)
# Try to find the service
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump, wsc_dst_epr = self.login(wsp, idp)
+ wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump, wsc_dst_epr = self.login(wsc, idp, 1, federations, idp_identity_dump = idp_identity_dump, idp_session_dump = idp_session_dump)
wsc_disco = lasso.IdWsf2Discovery(wsc)
wsc_disco.setEpr(wsc_dst_epr)
wsc_disco.initQuery()
@@ -638,21 +717,73 @@ class QueryTestCase(IdWsf2TestCase):
idp_disco = lasso.IdWsf2Discovery(idp)
idp_disco.setIdentityFromDump(idp_identity_dump)
idp_disco.processRequestMsg(wsc_disco.msgBody)
- name_identifier = idp_disco.getNameIdentifier()
- k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content)
- for x in service_associations:
- for id in service_associations[x]:
- idp_disco.addServiceMetadata(service_maps[id])
+ f = self.nid2tuple(idp_disco.getNameIdentifier())
+ uid = federations[f]
+ for id in service_associations[uid]:
+ idp_disco.addServiceMetadata(service_maps[id])
idp_disco.validateRequest()
idp_disco.buildResponseMsg()
+ wsc_disco.processResponseMsg(idp_disco.msgBody)
+ self.failUnlessEqual(len(wsc_disco.endpointReferences), 1)
+
+class DstTestCase(IdWsf2TestCase):
+ def test01(self):
+ """Data Service Template Query"""
+ content = '<pp:PP xmlns:pp="%s">Coin</pp:PP>' % lasso.PP11_HREF
+ idp = self.getIdpServer()
+ wsp = self.getWspServer()
+ wsc = self.getWscServer()
+ federations = {}
+ # Register the service, add an association
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, \
+ idp_session_dump, wsp_dst_epr = self.login(wsp, idp, 1,
+ federations)
+ service_maps = {}
+ svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump,
+ service_types = (lasso.PP11_HREF,), address =
+ spSoapEndpoint, abstract = 'My first PP service',
+ services_map = service_maps)
+ service_associations = {}
+ self.addAssociation(wsp, idp, wsp_session_dump, svcMDID,
+ service_maps, federations, service_associations)
+ wsc_identity_dump, wsc_session_dump, idp_identity_dump, \
+ idp_session_dump, wsc_dst_epr = self.login(wsc, idp, 1, federations,
+ idp_identity_dump = idp_identity_dump, idp_session_dump =
+ idp_session_dump)
+ eprs = self.query(wsc, idp, idp_identity_dump, wsc_session_dump, 1,
+ federations, service_maps, service_associations,
+ service_types = (lasso.PP11_HREF,))
+ self.failUnless(len(eprs), 1)
+ lasso.registerIdwsf2DstService(lasso.PP11_PREFIX, lasso.PP11_HREF)
+ wsc_dst = lasso.IdWsf2DataService(wsc)
+ wsc_dst.setEpr(eprs[0])
+ wsc_dst.initQuery()
+ wsc_dst.setServiceType(lasso.PP11_PREFIX, lasso.PP11_HREF)
+ wsc_dst.addQueryItem('/%s:PP' % lasso.PP11_PREFIX, 'xxx')
+ wsc_dst.buildRequestMsg()
+ wsp_dst = lasso.IdWsf2DataService(wsp)
+ wsp_dst.processRequestMsg(wsc_dst.msgBody)
+ self.failUnlessEqual(wsp_dst.requestType, lasso.IDWSF2_DATA_SERVICE_REQUEST_TYPE_QUERY)
+ wsp_dst.checkSecurityMechanism()
+ data = ET.parse(StringIO(content))
+ for item in wsp_dst.items:
+ result = data.xpath(item.select, namespaces = { lasso.PP11_PREFIX: lasso.PP11_HREF })
+ for found in result:
+ wsp_dst.setQueryItemResult(item.itemId, ET.tostring(found), True)
+ wsp_dst.setServiceType(lasso.PP11_PREFIX, lasso.PP11_HREF)
+ wsp_dst.initResponse()
+ wsp_dst.buildResponseMsg()
+ wsc_dst.processResponseMsg(wsp_dst.msgBody)
+
metadataSuite = unittest.makeSuite(MetadataTestCase, 'test')
metadataAssociationSuite = unittest.makeSuite(MetadataAssociationTestCase, 'test')
querySuite = unittest.makeSuite(QueryTestCase, 'test')
+dstSuite = unittest.makeSuite(DstTestCase, 'test')
allTests = unittest.TestSuite((metadataSuite,
- metadataAssociationSuite,querySuite))
+ metadataAssociationSuite,querySuite,dstSuite))
if __name__ == '__main__':
sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful())