summaryrefslogtreecommitdiffstats
path: root/bindings/python/tests/idwsf2_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'bindings/python/tests/idwsf2_tests.py')
-rwxr-xr-xbindings/python/tests/idwsf2_tests.py1519
1 files changed, 160 insertions, 1359 deletions
diff --git a/bindings/python/tests/idwsf2_tests.py b/bindings/python/tests/idwsf2_tests.py
index 1f3c7f46..8e8a53a6 100755
--- a/bindings/python/tests/idwsf2_tests.py
+++ b/bindings/python/tests/idwsf2_tests.py
@@ -28,6 +28,7 @@
import os
import unittest
import sys
+import pprint
if not '..' in sys.path:
sys.path.insert(0, '..')
@@ -86,25 +87,81 @@ class IdWsf2TestCase(unittest.TestCase):
return server
- def metadataRegister(self, wsp, idp):
+ def metadataRegister(self, wsp, idp, session_dump, abstract = None, address = None, provider_id = None, service_types = None, services_map = None):
+ session = lasso.Session.newFromDump(session_dump)
+ assertion = session.getAssertion(idp.providerId)
+ self.failUnless(assertion is not None)
+ epr = assertion.idwsf2GetDiscoveryBootstrapEpr()
+ self.failUnless(epr is not None)
wsp_disco = lasso.IdWsf2Discovery(wsp)
+ wsp_disco.setEpr(epr)
abstract = 'Personal Profile service'
+ self.failUnless(abstract is not None)
+ self.failUnless(address is not None)
+ self.failUnless(service_types is not None)
+ self.failUnless(isinstance(services_map, dict))
wsp_disco.initMetadataRegister()
+ if not provider_id:
+ provider_id = wsp.providerId
wsp_disco.addSimpleServiceMetadata(
- service_types = ('urn:liberty:id-sis-pp:2005-05',),
- abstract = abstract, provider_id = wsp.providerIds[0],
- address = idpSoapEndpoint)
+ service_types = service_types,
+ abstract = abstract, provider_id = provider_id,
+ address = address,
+ security_mechanisms = (lasso.SECURITY_MECH_BEARER,))
wsp_disco.buildRequestMsg()
idp_disco = lasso.IdWsf2Discovery(idp)
idp_disco.processRequestMsg(wsp_disco.msgBody)
+ idp_disco.checkSecurityMechanism()
idp_disco.validateRequest()
self.failUnlessEqual(len(idp_disco.metadatas), 1)
+ # add metadatas to directory
+ sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId()
+ self.failUnless(sender is not None)
+ metadatas = services_map.get(sender, [])
+ for metadata in idp_disco.metadatas:
+ services_map[metadata.svcMDID] = metadata
+ metadatas.append(metadata.svcMDID)
+ services_map[sender] = metadatas
idp_disco.buildResponseMsg()
wsp_disco.processResponseMsg(idp_disco.msgBody)
self.failUnlessEqual(len(wsp_disco.metadatas), 1)
self.failUnlessEqual(wsp_disco.metadatas[0].svcMDID, wsp_disco.response.svcMDID[0])
- return idp, wsp_disco.metadatas[0]
+ return wsp_disco.metadatas[0].svcMDID
+
+ def addAssociation(self, wsp, idp, session_dump, svcmdid, service_maps, service_associations):
+ self.failUnless(isinstance(service_associations, dict))
+ self.failUnless(isinstance(service_maps, dict))
+ # Get the bootstrap
+ session = lasso.Session.newFromDump(session_dump)
+ assertion = session.getAssertion(idp.providerId)
+ self.failUnless(assertion is not None)
+ epr = assertion.idwsf2GetDiscoveryBootstrapEpr()
+ self.failUnless(epr is not None)
+ wsp_disco = lasso.IdWsf2Discovery(wsp)
+ wsp_disco.setEpr(epr)
+ wsp_disco.initMetadataAssociationAdd()
+ wsp_disco.svcmdids = (svcmdid,)
+ wsp_disco.buildRequestMsg()
+ # Handle request
+ idp_disco = lasso.IdWsf2Discovery(idp)
+ idp_disco.processRequestMsg(wsp_disco.msgBody)
+ idp_disco.checkSecurityMechanism()
+ self.failUnlessEqual(idp_disco.svcmdids, (svcmdid,))
+ sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId()
+ name_identifier = idp_disco.getNameIdentifier()
+ k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content)
+ l = service_associations.get(k, [])
+ for id in idp_disco.svcmdids:
+ # check it exists
+ self.failUnless(service_maps.get(id) is not None)
+ # create association
+ if id not in l:
+ l.append(id)
+ service_associations[k] = l
+ idp_disco.validateRequest()
+ idp_disco.buildResponseMsg()
+ wsp_disco.processResponseMsg(idp_disco.msgBody)
def login(self, sp, idp, sp_identity_dump=None, sp_session_dump=None,
idp_identity_dump=None, idp_session_dump=None):
@@ -124,7 +181,7 @@ class IdWsf2TestCase(unittest.TestCase):
idp_login.processAuthnRequestMsg(query)
idp_login.validateRequestMsg(True, True)
idp_login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None)
- idp_login.idwsf2AddDiscoveryBootstrapEpr(url = idpSoapEndpoint, abstract = 'Discovery Service', security_mech_id = lasso.SECURITY_MECH_BEARER)
+ idp_login.idwsf2AddDiscoveryBootstrapEpr(url = idpSoapEndpoint, abstract = 'Discovery Service', security_mechanisms = (lasso.SECURITY_MECH_BEARER,))
idp_login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET)
artifact_message = idp_login.artifactMessage
@@ -170,7 +227,7 @@ class MetadataTestCase(IdWsf2TestCase):
wsp_disco.addSimpleServiceMetadata(service_types =
(lasso.PP11_HREF,), abstract = abstract,
provider_id = wsp.providerId, address = spSoapEndpoint,
- security_mech_ids = (lasso.SECURITY_MECH_BEARER,))
+ security_mechanisms = (lasso.SECURITY_MECH_BEARER,))
self.failUnlessEqual(len(wsp_disco.metadatas), 1)
metadata = wsp_disco.metadatas[0]
self.failUnlessEqual(metadata.abstract, abstract)
@@ -256,7 +313,7 @@ class MetadataTestCase(IdWsf2TestCase):
wsp_disco.addSimpleServiceMetadata(service_types =
(lasso.PP11_HREF,), abstract = abstract,
provider_id = wsp.providerId, address = spSoapEndpoint,
- security_mech_ids= (lasso.SECURITY_MECH_BEARER,))
+ security_mechanisms= (lasso.SECURITY_MECH_BEARER,))
self.failUnlessEqual(len(wsp_disco.metadatas), 1)
metadata = wsp_disco.metadatas[0]
self.failUnlessEqual(metadata.abstract, abstract)
@@ -320,7 +377,6 @@ class MetadataTestCase(IdWsf2TestCase):
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
- print dst_epr.dump()
abstract = 'Personal Profile service'
wsp_disco.initMetadataRegister()
@@ -333,7 +389,7 @@ class MetadataTestCase(IdWsf2TestCase):
wsp_disco.addSimpleServiceMetadata(service_types =
(lasso.PP11_HREF,), abstract = abstract,
provider_id = wsp.providerId, address = spSoapEndpoint,
- security_mech_ids = (lasso.SECURITY_MECH_BEARER,))
+ security_mechanisms = (lasso.SECURITY_MECH_BEARER,))
self.failUnlessEqual(len(wsp_disco.metadatas), 1)
metadata = wsp_disco.metadatas[0]
self.failUnlessEqual(metadata.abstract, abstract)
@@ -422,7 +478,6 @@ class MetadataTestCase(IdWsf2TestCase):
try:
idp_disco.checkSecurityMechanism()
except lasso.Error, e:
- print wsp_disco.msgBody
self.fail(e)
try:
idp_disco.validateRequest()
@@ -459,1399 +514,145 @@ class MetadataTestCase(IdWsf2TestCase):
self.failUnless(len(wsp_disco.metadatas) == 1, 'missing svcMDID')
self.failUnless(wsp_disco.metadatas[0].svcMDID, 'missing svcMDID')
-class MetadataAssociationAddTestCase(IdWsf2TestCase):
- def test01(self):
- """Init metadata association add request"""
+ def test04(self):
+ """Test metadata query"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- idp, svcMD = self.metadataRegister(wsp, idp)
wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- try:
- wsp_disco.initMetadataAssociationAdd()
- except lasso.Error, e:
- self.fail(e)
- try:
- wsp_disco.addServiceMetadata(svcMD)
- except lasso.Error, e:
- self.fail(e)
-
- def test03(self):
- """Init metadata association add request - msgUrl construction"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMD = self.metadataRegister(wsp, idp)
+ service_map = {}
+ self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
+ (lasso.PP11_HREF,), address = spSoapEndpoint,
+ abstract = 'My first PP service', services_map = service_map)
+ self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
+ (lasso.PP11_HREF,), address = spSoapEndpoint+'2',
+ abstract = 'My second PP service', services_map = service_map)
wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
- wsp_disco.initMetadataAssociationAdd()
- wsp_disco.addServiceMetadata(svcMD)
+ wsp_disco.initMetadataQuery()
wsp_disco.buildRequestMsg()
- self.failUnless(wsp_disco.msgUrl, 'missing msgUrl')
- self.failUnless(wsp_disco.msgBody, 'missing msgBody')
+ idp_disco = lasso.IdWsf2Discovery(idp)
+ idp_disco.processRequestMsg(wsp_disco.msgBody)
+ idp_disco.checkSecurityMechanism()
+ self.failUnlessEqual(idp_disco.svcmdids, None)
+ sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId()
+ for svcMDID in service_map.get(sender, []):
+ idp_disco.addServiceMetadata(service_map.get(svcMDID))
+ idp_disco.validateRequest()
+ idp_disco.buildResponseMsg()
+ wsp_disco.processResponseMsg(idp_disco.msgBody)
+ self.failUnless(len(wsp_disco.metadatas), 2)
def test05(self):
- """Process metadata association add request"""
+ """Test metadata delete"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- idp, svcMD = self.metadataRegister(wsp, idp)
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ service_map = {}
+ self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
+ (lasso.PP11_HREF,), address = spSoapEndpoint,
+ abstract = 'My first PP service', services_map = service_map)
+ self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
+ (lasso.PP11_HREF,), address = spSoapEndpoint+'2',
+ abstract = 'My second PP service', services_map = service_map)
wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
wsp_disco = lasso.IdWsf2Discovery(wsp)
wsp_disco.setEpr(dst_epr)
- wsp_disco.initMetadataAssociationAdd()
- wsp_disco.addServiceMetadata(svcMD)
+ wsp_disco.initMetadataDelete()
+ svcmdids = tuple(service_map[wsp.providerId])
+ wsp_disco.setSvcmdids(svcmdids)
wsp_disco.buildRequestMsg()
idp_disco = lasso.IdWsf2Discovery(idp)
-
- try:
- idp_disco.processRequestMsg(wsp_disco.msgBody)
- except lasso.Error, e:
- self.fail(e)
-
- try:
- idp_disco.checkSecurityMechanism()
- except lasso.Error, e:
- self.fail(e)
-
- try:
- idp_disco.validateRequest()
- except lasso.Error, e:
- self.fail(e)
-
- def test06(self):
- """Register metadata association"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- try:
- idp_disco.registerMetadata()
- except lasso.Error, e:
- self.fail(e)
-
- def test07(self):
- """Check metadata association registration"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
- idp_disco.registerMetadata()
-
- self.failUnless(idp_disco.isIdentityDirty, 'identity has not changed, it should contain a svcMDID')
- self.failUnless(idp_disco.identity.dump() != idp_identity_dump,
- 'identity dump has not changed, it should contain a svcMDID')
-
- def test08(self):
- """Build metadata association add response"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
-
- idp_disco.buildResponseMsg()
-
- self.failUnless(idp_disco.msgBody)
-
- def test09(self):
- """Process metadata association add response"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
+ idp_disco.processRequestMsg(wsp_disco.msgBody)
+ idp_disco.checkSecurityMechanism()
+ self.failUnlessEqual(idp_disco.svcmdids, svcmdids)
+ sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId()
+ self.failUnlessEqual(sender, wsp.providerId)
+ idp_disco.validateRequest()
idp_disco.buildResponseMsg()
+ wsp_disco.processResponseMsg(idp_disco.msgBody)
- try:
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
- except lasso.Error, e:
- self.fail(e)
-
-
-class DiscoveryQueryTestCase(IdWsf2TestCase):
+class MetadataAssociationTestCase(IdWsf2TestCase):
def test01(self):
- """Init discovery query"""
+ """Metadata association add"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp)
+ service_map = {}
+ svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
+ (lasso.PP11_HREF,), address = spSoapEndpoint,
+ abstract = 'My first PP service', services_map = service_map)
+ # Make the request
wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
+ wsp_disco.setEpr(dst_epr)
+ wsp_disco.initMetadataAssociationAdd()
+ wsp_disco.svcmdids = (svcMDID,)
wsp_disco.buildRequestMsg()
-
+ # Receive it
idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
+ idp_disco.processRequestMsg(wsp_disco.msgBody)
+ idp_disco.checkSecurityMechanism()
+ self.failUnlessEqual(idp_disco.svcmdids, (svcMDID,))
+ sender = idp_disco.getSoapEnvelopeRequest().sb2GetProviderId()
+ name_identifier = idp_disco.getNameIdentifier()
+ # Store the association
+ self.failUnless(sender is not None)
+ self.failUnless(name_identifier is not None)
+ idp_disco.validateRequest()
idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- try:
- wsc_disco.initQuery()
- except lasso.Error, e:
- self.fail(e)
+ wsp_disco.processResponseMsg(idp_disco.msgBody)
def test02(self):
- """Init discovery query without login"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
-
- try:
- wsc_disco.initQuery()
- except lasso.Error, e:
- if e[0] != lasso.PROFILE_ERROR_SESSION_NOT_FOUND:
- self.fail(e)
- else:
- self.fail('Should have a "session not found" exception')
+ """Metadata association query"""
+ pass
def test03(self):
- """Init discovery query - check msg url"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
-
- self.failUnless(wsc_disco.msgUrl, 'missing msgUrl')
-
- def test04(self):
- """Add requested service type to discovery query"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
-
- try:
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- except lasso.Error, e:
- self.fail(e)
-
- def test05(self):
- """Build discovery query"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- wsc_disco.buildRequestMsg()
-
- self.failUnless(wsc_disco.msgBody)
-
- def test06(self):
- """Process discovery query"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- wsc_disco.buildRequestMsg()
-
- try:
- idp_disco.processQueryMsg(wsc_disco.msgBody)
- except lasso.Error, e:
- self.fail(e)
-
- def test07(self):
- """Process discovery query and check name identifier"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
+ """Metadata association delete"""
+ pass
- wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- wsc_disco.buildRequestMsg()
-
- idp_disco.processQueryMsg(wsc_disco.msgBody)
-
- def test08(self):
- """Build discovery query response EPRs"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- wsc_disco.buildRequestMsg()
-
- idp_disco.processQueryMsg(wsc_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- try:
- idp_disco.buildQueryResponseEprs()
- except lasso.Error, e:
- self.fail(e)
-
- def test09(self):
- """Build discovery query response"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- wsc_disco.buildRequestMsg()
-
- idp_disco.processQueryMsg(wsc_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
- idp_disco.buildQueryResponseEprs()
- idp_disco.buildResponseMsg()
-
- self.failUnless(idp_disco.msgBody, 'missing msgBody')
-
- def test10(self):
- """Process discovery query response"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- wsc_disco.buildRequestMsg()
-
- idp_disco.processQueryMsg(wsc_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
- idp_disco.buildQueryResponseEprs()
- idp_disco.buildResponseMsg()
-
- try:
- wsc_disco.processQueryResponseMsg(idp_disco.msgBody)
- except lasso.Error, e:
- self.fail(e)
-
- def test11(self):
- """Check discovery query result"""
+class QueryTestCase(IdWsf2TestCase):
+ def test01(self):
+ """Simple DST query"""
idp = self.getIdpServer()
wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
- idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
+ # Register the service, add an association
+ wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, wsp_dst_epr = self.login(wsp, idp)
+ service_maps = {}
+ svcMDID = self.metadataRegister(wsp, idp, wsp_session_dump, service_types =
+ (lasso.PP11_HREF,), address = spSoapEndpoint,
+ abstract = 'My first PP service', services_map = service_maps)
+ service_associations = {}
+ self.addAssociation(wsp, idp, wsp_session_dump, svcMDID, service_maps, service_associations)
+ # Try to find the service
+ wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump, wsc_dst_epr = self.login(wsp, idp)
wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
+ wsc_disco.setEpr(wsc_dst_epr)
wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
+ wsc_disco.addRequestedService(service_types = (lasso.PP11_HREF,))
wsc_disco.buildRequestMsg()
-
- idp_disco.processQueryMsg(wsc_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
- idp_disco.buildQueryResponseEprs()
- idp_disco.buildResponseMsg()
-
- wsc_disco.processQueryResponseMsg(idp_disco.msgBody)
-
- self.failUnless(wsc_disco.getService(), 'missing service after discovery query')
-
-
-class DataServiceQueryTestCase(IdWsf2TestCase):
- def getProfileService(self):
- """Check discovery query result"""
- idp = self.getIdpServer()
- wsp = self.getWspServer()
- idp, svcMDID = self.metadataRegister(wsp, idp)
- wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump = self.login(wsp, idp)
-
- wsp_disco = lasso.IdWsf2Discovery(wsp)
- if wsp_identity_dump is not None:
- wsp_disco.setIdentityFromDump(wsp_identity_dump)
- if wsp_session_dump is not None:
- wsp_disco.setSessionFromDump(wsp_session_dump)
- wsp_disco.initMetadataAssociationAdd(svcMDID)
- wsp_disco.buildRequestMsg()
-
idp_disco = lasso.IdWsf2Discovery(idp)
- idp_disco.processMetadataAssociationAddMsg(wsp_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
-
- idp_disco.registerMetadata()
- if idp_disco.isIdentityDirty:
- idp_identity_dump = idp_disco.identity.dump()
- if idp_disco.isSessionDirty:
- idp_session_dump = idp_disco.session.dump()
- idp_disco.buildResponseMsg()
-
- wsp_disco.processMetadataAssociationAddResponseMsg(idp_disco.msgBody)
-
- wsc = self.getWscServer()
- wsc_identity_dump, wsc_session_dump, idp_identity_dump, idp_session_dump = \
- self.login(wsc, idp, None, None, idp_identity_dump, idp_session_dump)
-
- wsc_disco = lasso.IdWsf2Discovery(wsc)
- if wsc_identity_dump is not None:
- wsc_disco.setIdentityFromDump(wsc_identity_dump)
- if wsc_session_dump is not None:
- wsc_disco.setSessionFromDump(wsc_session_dump)
-
- wsc_disco.initQuery()
- wsc_disco.addRequestedServiceType('urn:liberty:id-sis-pp:2005-05')
- wsc_disco.buildRequestMsg()
-
- idp_disco.processQueryMsg(wsc_disco.msgBody)
- if idp_identity_dump is not None:
- idp_disco.setIdentityFromDump(idp_identity_dump)
- if idp_session_dump is not None:
- idp_disco.setSessionFromDump(idp_session_dump)
- idp_disco.buildQueryResponseEprs()
+ idp_disco.setIdentityFromDump(idp_identity_dump)
+ idp_disco.processRequestMsg(wsc_disco.msgBody)
+ name_identifier = idp_disco.getNameIdentifier()
+ k = (name_identifier.nameQualifier, name_identifier.format, name_identifier.sPNameQualifier, name_identifier.content)
+ for x in service_associations:
+ for id in service_associations[x]:
+ idp_disco.addServiceMetadata(service_maps[id])
+ idp_disco.validateRequest()
idp_disco.buildResponseMsg()
- wsc_disco.processQueryResponseMsg(idp_disco.msgBody)
-
- return wsc_disco.getService(), wsp
-
- def test01(self):
- """Data service init query"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
-
- try:
- service.initQuery()
- except lasso.Error, e:
- self.fail(e)
-
- def test02(self):
- """Data service init query - msgUrl construction"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
-
- self.failUnless(service.msgUrl, 'missing msgUrl')
-
- def test03(self):
- """Data service add query item"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
-
- try:
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- except lasso.Error, e:
- self.fail(e)
-
- def test04(self):
- """Data service build query"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.buildRequestMsg()
-
- self.failUnless(service.msgBody, 'missing msgBody')
-
- def test05(self):
- """Data service build query with multiple items"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- self.failUnless(service.msgBody, 'missing msgBody')
-
- def test06(self):
- """Data service process query"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
-
- try:
- wsp_service.processQueryMsg(service.msgBody)
- except lasso.Error, e:
- self.fail(e)
-
- def test07(self):
- """Data service check service type"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- self.failUnless(wsp_service.type, 'service type is not set')
- self.failUnless(wsp_service.type == 'urn:liberty:id-sis-pp:2005-05', 'wrong service type')
-
- def test08(self):
- """Data service get query items"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- self.failUnless(wsp_service.queryItems, 'queryItems list is None or empty')
-
- def test09(self):
- """Data service check query items"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- items = [ '/pp2:PP/pp2:InformalName', 'not existing attribute', '/pp2:PP/pp2:MsgContact' ]
- for i in range(3):
- self.failUnless(wsp_service.queryItems[i] == items[i],
- "query items don't match : %s != %s" % (wsp_service.queryItems[i], items[i]))
-
- def test10(self):
- """Data service check name identifier"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- def test11(self):
- """Data service parse query items - success"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- try:
- wsp_service.parseQueryItems()
- except lasso.Error, e:
- self.fail(e)
-
- def test12(self):
- """Data service parse query items - failure - no item"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- try:
- wsp_service.parseQueryItems()
- except lasso.DstEmptyRequestError:
- pass
- except Exception, e:
- self.fail(e)
-
- def test13(self):
- """Data service parse query items - failure - wrong item"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- try:
- wsp_service.parseQueryItems()
- except lasso.Error, e:
- if e[0] != lasso.DST_ERROR_QUERY_FAILED:
- self.fail(e)
- else:
- self.fail('query items parsing should have failed because a wrong query item was requested')
-
- def test14(self):
- """Data service parse query items - failure - no data"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- try:
- wsp_service.parseQueryItems()
- except lasso.Error, e:
- if e[0] != lasso.DST_ERROR_MISSING_SERVICE_DATA:
- self.fail(e)
- else:
- self.fail('query items parsing should have failed because no data was provided')
-
-
- def test15(self):
- """Data service parse query items - partial failure"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- </PP>"""
-
- # No exception should be raised here but one will be raised on the WSC
- # when parsing response status
- try:
- wsp_service.parseQueryItems()
- except lasso.Error, e:
- self.fail(e)
-
- def test16(self):
- """Data service build query response"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- wsp_service.parseQueryItems()
- wsp_service.buildResponseMsg()
-
- self.failUnless(wsp_service.msgBody, 'missing msgBody')
-
- def test17(self):
- """Data service process query response - success"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- wsp_service.parseQueryItems()
- wsp_service.buildResponseMsg()
-
- try:
- service.processQueryResponseMsg(wsp_service.msgBody)
- except lasso.Error, e:
- self.fail(e)
-
- def test18(self):
- """Data service process query response - failure"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- try:
- wsp_service.parseQueryItems()
- except lasso.Error, e:
- pass
- wsp_service.buildResponseMsg()
-
- try:
- service.processQueryResponseMsg(wsp_service.msgBody)
- except lasso.Error, e:
- if e[0] != lasso.DST_ERROR_QUERY_FAILED:
- self.fail(e)
- else:
- self.fail('response should have a "failed" status because a wrong query item was requested')
-
- def test19(self):
- """Data service process query response - partial failure"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- try:
- wsp_service.parseQueryItems()
- except lasso.DstQueryPartiallyFailedError:
- pass
- except:
- self.fail('parseQueryItems should emit a "partially failed" error because a wrong query item was requested')
- wsp_service.buildResponseMsg()
-
- try:
- service.processQueryResponseMsg(wsp_service.msgBody)
- except lasso.DstQueryPartiallyFailedError:
- pass
- except:
- self.fail('response should have a "partially failed" status because a wrong query item was requested')
-
- def test20(self):
- """Data service get first attribute"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- wsp_service.parseQueryItems()
- wsp_service.buildResponseMsg()
-
- service.processQueryResponseMsg(wsp_service.msgBody)
- informal_name = service.getAttributeNode()
-
- self.failUnlessEqual(informal_name, """<pp2:InformalName xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">User name</pp2:InformalName>""", 'first attribute node is wrong')
-
- def test21(self):
- """Data service get attribute string"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- wsp_service.parseQueryItems()
- wsp_service.buildResponseMsg()
-
- service.processQueryResponseMsg(wsp_service.msgBody)
- informal_name = service.getAttributeString('name')
-
- self.failUnlessEqual(informal_name, 'User name', 'attribute string is wrong')
-
- def test22(self):
- """Data service get attribute node"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- wsp_service.parseQueryItems()
- wsp_service.buildResponseMsg()
-
- service.processQueryResponseMsg(wsp_service.msgBody)
- email = service.getAttributeNode('email')
-
- expected_result = """<pp2:MsgContact xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">.*?<pp2:MsgAccount>Email account</pp2:MsgAccount>.*?<pp2:MsgProvider>Email server</pp2:MsgProvider>.*?</pp2:MsgContact>"""
-
- import re
- result = re.findall(expected_result, email, re.DOTALL)
-
- self.failUnless(len(result) == 1, 'attribute node is wrong')
-
- def test23(self):
- """Data service get attribute node - partial failure"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <InformalName>User name</InformalName>
- <MsgContact>
- <MsgAccount>Email account</MsgAccount>
- <MsgProvider>Email server</MsgProvider>
- </MsgContact>
- </PP>"""
-
- try:
- wsp_service.parseQueryItems()
- except lasso.DstQueryPartiallyFailedError:
- pass
- except:
- self.fail('parseQueryItems should emit a "partially failed" error because a wrong query item was requested')
- wsp_service.buildResponseMsg()
-
- try:
- service.processQueryResponseMsg(wsp_service.msgBody)
- except lasso.Error, e:
- if e[0] == lasso.DST_ERROR_QUERY_PARTIALLY_FAILED:
- pass
- informal_name = service.getAttributeString('name')
- email = service.getAttributeNode('email')
-
- self.failUnlessEqual(informal_name, 'User name', 'attribute string is wrong')
- self.failUnlessEqual(email, None, 'attribute node should be None')
-
- def test24(self):
- """Data service redirect request"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- if '/pp2:PP/pp2:MsgContact' in wsp_service.queryItems:
- wsp_service.initRedirectUserForConsent('http://sp5/consent');
- wsp_service.buildResponseMsg()
-
- try:
- service.processQueryResponseMsg(wsp_service.msgBody)
- except lasso.SoapRedirectRequestFaultError:
- pass
- except Exception, e:
- self.fail(e)
- else:
- self.fail('a "soap fault redirect request" exception should have been raised')
-
- def test25(self):
- """Data service redirect request - check redirectUrl"""
- service, wsp = self.getProfileService()
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:InformalName', 'name')
- service.addQueryItem('not existing attribute', 'not existing attribute')
- service.addQueryItem('/pp2:PP/pp2:MsgContact', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- if '/pp2:PP/pp2:MsgContact' in wsp_service.queryItems:
- wsp_service.initRedirectUserForConsent('http://sp5/consent');
- wsp_service.buildResponseMsg()
-
- try:
- service.processQueryResponseMsg(wsp_service.msgBody)
- except lasso.SoapRedirectRequestFaultError:
- pass
- except Exception, e:
- self.fail(e)
-
- self.failUnlessEqual(service.redirectUrl, 'http://sp5/consent', 'redirectUrl is not set or wrong')
-
- def test26(self):
- """Data service get multiple attribute nodes with the same name"""
- lasso.registerIdWsf2DstService('pp2', 'urn:liberty:id-sis-pp:2005-05')
- service, wsp = self.getProfileService()
- service.initQuery()
- service.addQueryItem('/pp2:PP/pp2:MsgContact/pp2:MsgAccount', 'email')
- service.buildRequestMsg()
-
- wsp_service = lasso.IdWsf2DataService(wsp)
- wsp_service.processQueryMsg(service.msgBody)
-
- email1 = 'Email account 1'
- email2 = 'Email account 2'
- wsp_service.data = """<PP xmlns="urn:liberty:id-sis-pp:2005-05">
- <MsgContact>
- <MsgAccount>%s</MsgAccount>
- </MsgContact>
- <MsgContact>
- <MsgAccount>%s</MsgAccount>
- </MsgContact>
- </PP>""" % (email1, email2)
-
- wsp_service.parseQueryItems()
- wsp_service.buildResponseMsg()
-
- service.processQueryResponseMsg(wsp_service.msgBody)
-
- email_nodes = service.getAttributeNodes('email')
- self.failUnless(email_nodes[0] ==
- '<pp2:MsgAccount xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">%s</pp2:MsgAccount>' % email1)
- self.failUnless(email_nodes[1] ==
- '<pp2:MsgAccount xmlns="urn:liberty:id-sis-pp:2005-05" xmlns:pp2="urn:liberty:id-sis-pp:2005-05">%s</pp2:MsgAccount>' % email2)
-
- email_strings = service.getAttributeStrings('email')
- self.failUnless(email_strings[0] == email1)
- self.failUnless(email_strings[1] == email2)
-
-
metadataSuite = unittest.makeSuite(MetadataTestCase, 'test')
-#metadataAssociationAddSuite = unittest.makeSuite(MetadataAssociationAddTestCase, 'test')
-#discoveryQuerySuite = unittest.makeSuite(DiscoveryQueryTestCase, 'test')
-#dataServiceQuerySuite = unittest.makeSuite(DataServiceQueryTestCase, 'test')
+metadataAssociationSuite = unittest.makeSuite(MetadataAssociationTestCase, 'test')
+querySuite = unittest.makeSuite(QueryTestCase, 'test')
+
-allTests = unittest.TestSuite((metadataSuite,))
-# metadataAssociationAddSuite,)) # discoveryQuerySuite, dataServiceQuerySuite))
+allTests = unittest.TestSuite((metadataSuite,
+ metadataAssociationSuite,querySuite))
if __name__ == '__main__':
sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful())