#! /usr/bin/env python
# -*- coding: UTF-8 -*-
#
# $Id: profiles_tests.py 3254 2007-06-05 21:23:57Z fpeters $
#
# Python unit tests for Lasso library
#
# Copyright (C) 2004-2007 Entr'ouvert
# http://lasso.entrouvert.org
#
# Authors: See AUTHORS file in top-level directory.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see .
import os
import unittest
import sys
if not '..' in sys.path:
sys.path.insert(0, '..')
if not '../.libs' in sys.path:
sys.path.insert(0, '../.libs')
import lasso
import logging
logging.basicConfig()
try:
dataDir
except NameError:
srcdir = os.environ.get('TOP_SRCDIR', '.')
dataDir = '%s/tests/data' % srcdir
def server(local_name, remote_role, remote_name):
pwd = os.path.join(dataDir, local_name, 'password')
password = None
if os.path.exists(pwd):
password = file(pwd).read()
s = lasso.Server(os.path.join(dataDir, local_name, 'metadata.xml'),
os.path.join(dataDir, local_name, 'private-key.pem'),
password)
s.addProvider(remote_role, os.path.join(dataDir, remote_name, 'metadata.xml'))
return s
class ServerTestCase(unittest.TestCase):
def test01(self):
"""Server construction, dump & newFromDump."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'sp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/public-key.pem'),
os.path.join(dataDir, 'idp1-la/certificate.pem'))
dump = lassoServer.dump()
lassoServer2 = lassoServer.newFromDump(dump)
dump2 = lassoServer2.dump()
self.failUnlessEqual(dump, dump2)
def test02(self):
"""Server construction without argument, dump & newFromDump."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'sp1-la/metadata.xml'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/public-key.pem'))
dump = lassoServer.dump()
lassoServer2 = lassoServer.newFromDump(dump)
dump2 = lassoServer2.dump()
self.failUnlessEqual(dump, dump2)
class LoginTestCase(unittest.TestCase):
def test01(self):
"""SP login; testing access to authentication request."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'sp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/public-key.pem'),
os.path.join(dataDir, 'idp1-la/certificate.pem'))
login = lasso.Login(lassoServer)
login.initAuthnRequest()
login.request
login.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
self.failUnlessEqual(login.request.protocolProfile, lasso.LIB_PROTOCOL_PROFILE_BRWS_ART)
def test02(self):
"""SP login; testing processing of an empty Response."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'sp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/public-key.pem'),
os.path.join(dataDir, 'idp1-la/certificate.pem'))
login = lasso.Login(lassoServer)
try:
login.processResponseMsg('')
except lasso.Error, error:
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
raise
def test03(self):
"""Conversion of a lib:AuthnRequest with an AuthnContext into a query and back."""
sp = lasso.Server(
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'sp1-la/certificate.pem'))
sp.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/public-key.pem'),
os.path.join(dataDir, 'idp1-la/certificate.pem'))
spLogin = lasso.Login(sp)
spLogin.initAuthnRequest()
requestAuthnContext = lasso.LibRequestAuthnContext()
authnContextClassRefsList = []
authnContextClassRefsList.append(
lasso.LIB_AUTHN_CONTEXT_CLASS_REF_PASSWORD)
requestAuthnContext.authnContextClassRef = tuple(authnContextClassRefsList)
spLogin.request.requestAuthnContext = requestAuthnContext
spLogin.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
spLogin.buildAuthnRequestMsg()
authnRequestUrl = spLogin.msgUrl
authnRequestQuery = spLogin.msgUrl[spLogin.msgUrl.index('?') + 1:]
idp = lasso.Server(
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'idp1-la/certificate.pem'))
idp.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/public-key.pem'),
os.path.join(dataDir, 'sp1-la/certificate.pem'))
idpLogin = lasso.Login(idp)
idpLogin.processAuthnRequestMsg(authnRequestQuery)
self.failUnless(idpLogin.request.requestAuthnContext)
authnContextClassRefsList = idpLogin.request.requestAuthnContext.authnContextClassRef
self.failUnlessEqual(len(authnContextClassRefsList), 1)
self.failUnlessEqual(authnContextClassRefsList[0],
lasso.LIB_AUTHN_CONTEXT_CLASS_REF_PASSWORD)
def test04(self):
"""Conversion of a lib:AuthnRequest with extensions into a query and back."""
sp = lasso.Server(
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'sp1-la/certificate.pem'))
sp.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/public-key.pem'),
os.path.join(dataDir, 'idp1-la/certificate.pem'))
spLogin = lasso.Login(sp)
spLogin.initAuthnRequest()
requestAuthnContext = lasso.LibRequestAuthnContext()
extensionList = []
for extension in (
'do',
'do action 2do action 3'):
extensionList.append(
'%s'
% extension)
spLogin.request.extension = tuple(extensionList)
spLogin.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
spLogin.buildAuthnRequestMsg()
authnRequestUrl = spLogin.msgUrl
authnRequestQuery = spLogin.msgUrl[spLogin.msgUrl.index('?') + 1:]
idp = lasso.Server(
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'idp1-la/certificate.pem'))
idp.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/public-key.pem'),
os.path.join(dataDir, 'sp1-la/certificate.pem'))
idpLogin = lasso.Login(idp)
idpLogin.processAuthnRequestMsg(authnRequestQuery)
self.failUnless(idpLogin.request.extension)
extensionsList = idpLogin.request.extension
self.failUnlessEqual(len(extensionsList), 1)
self.failUnless('do' in extensionsList[0])
self.failUnless('do action 2' in extensionsList[0])
self.failUnless('do action 3' in extensionsList[0])
def test05(self):
'''SAMLv2 Authn request emitted and received using Artifact binding'''
sp = lasso.Server(
os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
assert sp
sp.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp5-saml2/metadata.xml'))
sp_login = lasso.Login(sp)
assert sp_login
sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_ARTIFACT_GET)
sp_login.buildAuthnRequestMsg()
sp_login_dump = sp_login.dump()
idp = lasso.Server(
os.path.join(dataDir, 'idp5-saml2/metadata.xml'),
os.path.join(dataDir, 'idp5-saml2/private-key.pem'))
idp.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
idp_login = lasso.Login(idp)
idp_login.initRequest(sp_login.msgUrl.split('?')[1], lasso.HTTP_METHOD_ARTIFACT_GET)
idp_login.buildRequestMsg()
sp_login2 = lasso.Login.newFromDump(sp, sp_login_dump)
assert isinstance(sp_login2, lasso.Login)
assert idp_login.msgBody
sp_login2.processRequestMsg(idp_login.msgBody)
sp_login2.buildResponseMsg()
assert sp_login2.msgBody
try:
idp_login.processResponseMsg(sp_login2.msgBody)
except:
raise
assert isinstance(idp_login.request, lasso.Samlp2AuthnRequest)
def test_06(self):
'''Login test between SP and IdP with encrypted private keys'''
sp_server = server('sp7-saml2', lasso.PROVIDER_ROLE_IDP, 'idp7-saml2')
idp_server = server('idp7-saml2', lasso.PROVIDER_ROLE_SP, 'sp7-saml2')
sp_login = lasso.Login(sp_server)
sp_login.initAuthnRequest()
sp_login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST;
sp_login.buildAuthnRequestMsg()
idp_login = lasso.Login(idp_server)
idp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
idp_login.validateRequestMsg(True, True)
idp_login.buildAssertion("None", "None", "None", "None", "None")
idp_login.buildAuthnResponseMsg()
sp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
sp_login.processAuthnResponseMsg(idp_login.msgBody)
sp_login.acceptSso()
def test07(self):
'''SAMLv2 SSO with DSA key for the IdP'''
sp = lasso.Server(
os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
assert sp
sp.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp12-dsa-saml2/metadata.xml'))
sp_login = lasso.Login(sp)
assert sp_login
sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_REDIRECT)
sp_login.buildAuthnRequestMsg()
idp = lasso.Server(
os.path.join(dataDir, 'idp12-dsa-saml2/metadata.xml'),
os.path.join(dataDir, 'idp12-dsa-saml2/private-key.pem'))
idp.signatureMethod = lasso.SIGNATURE_METHOD_DSA_SHA1
idp.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
idp_login = lasso.Login(idp)
print sp_login.msgUrl
idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
idp_login.protocolProfile = lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST;
idp_login.validateRequestMsg(True, True)
idp_login.buildAssertion("None", "None", "None", "None", "None")
idp_login.buildAuthnResponseMsg()
class LogoutTestCase(unittest.TestCase):
def test01(self):
"""SP logout without session and identity; testing initRequest."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'sp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_IDP,
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/public-key.pem'),
os.path.join(dataDir, 'idp1-la/certificate.pem'))
logout = lasso.Logout(lassoServer)
try:
logout.initRequest()
except lasso.Error, error:
if error[0] != lasso.PROFILE_ERROR_SESSION_NOT_FOUND:
raise
else:
self.fail('logout.initRequest without having set identity before should fail')
def test02(self):
"""IDP logout without session and identity; testing logout.getNextProviderId."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'idp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/public-key.pem'),
os.path.join(dataDir, 'sp1-la/certificate.pem'))
logout = lasso.Logout(lassoServer)
self.failIf(logout.getNextProviderId())
def test03(self):
"""IDP logout; testing processRequestMsg with non Liberty query."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'idp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/public-key.pem'),
os.path.join(dataDir, 'sp1-la/certificate.pem'))
logout = lasso.Logout(lassoServer)
# The processRequestMsg should fail but not abort.
try:
logout.processRequestMsg('passport=0&lasso=1')
except lasso.Error, error:
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
raise
else:
self.fail('Logout processRequestMsg should have failed.')
def test04(self):
"""IDP logout; testing processResponseMsg with non Liberty query."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'idp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/public-key.pem'),
os.path.join(dataDir, 'sp1-la/certificate.pem'))
logout = lasso.Logout(lassoServer)
# The processResponseMsg should fail but not abort.
try:
logout.processResponseMsg('liberty=&alliance')
except lasso.Error, error:
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
raise
else:
self.fail('Logout processResponseMsg should have failed.')
class DefederationTestCase(unittest.TestCase):
def test01(self):
"""IDP initiated defederation; testing processNotificationMsg with non Liberty query."""
lassoServer = lasso.Server(
os.path.join(dataDir, 'idp1-la/metadata.xml'),
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
None,
os.path.join(dataDir, 'idp1-la/certificate.pem'))
lassoServer.addProvider(
lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp1-la/metadata.xml'),
os.path.join(dataDir, 'sp1-la/public-key.pem'),
os.path.join(dataDir, 'sp1-la/certificate.pem'))
defederation = lasso.Defederation(lassoServer)
# The processNotificationMsg should fail but not abort.
try:
defederation.processNotificationMsg('nonLibertyQuery=1')
except lasso.Error, error:
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
raise
else:
self.fail('Defederation processNotificationMsg should have failed.')
class IdentityTestCase(unittest.TestCase):
def test01(self):
"""Identity newFromDump & dump."""
return
# test disabled since dump format changed
identityDump = """_CD739B41C602EAEA93626EBD1751CB46_11EA77A4FED32C41824AC5DE87298E65"""
identity = lasso.Identity.newFromDump(identityDump)
newIdentityDump = identity.dump()
self.failUnlessEqual(identityDump, newIdentityDump)
class AttributeAuthorityTestCase(unittest.TestCase):
def test01(self):
'''Attribute request and response test between sp5 and idp6'''
s = lasso.Server(
os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
s.addProvider(lasso.PROVIDER_ROLE_ATTRIBUTE_AUTHORITY,
os.path.join(dataDir, 'idp6-saml2/metadata.xml'))
s2 = lasso.Server(
os.path.join(dataDir, 'idp6-saml2/metadata.xml'),
os.path.join(dataDir, 'idp6-saml2/private-key.pem'))
s2.addProvider(lasso.PROVIDER_ROLE_SP,
os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
aq = lasso.AssertionQuery(s)
rpid = s.providers.keys()[0]
aq.initRequest(rpid,
lasso.HTTP_METHOD_SOAP,
lasso.ASSERTION_QUERY_REQUEST_TYPE_ATTRIBUTE)
assert aq.request
assert aq.remoteProviderId == rpid
nid = lasso.Saml2NameID.newWithPersistentFormat(
lasso.buildUniqueId(32),
s.providerId, s2.providerId)
aq.nameIdentifier = nid
aq.addAttributeRequest(
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
'testAttribute')
aq.buildRequestMsg()
assert aq.msgUrl
assert aq.msgBody
aq2 = lasso.AssertionQuery(s2)
aq2.processRequestMsg(aq.msgBody)
assert aq.request
aq2.validateRequest()
assert aq2.response
assertion = lasso.Saml2Assertion()
aq2.response.assertion = (assertion, )
for attribute in aq2.request.attribute:
content = lasso.MiscTextNode.newWithString("xxx")
content.textChild = True
assertion.addAttributeWithNode(attribute.name, attribute.nameFormat,
content)
assertion.addAttributeWithNode(attribute.name, attribute.nameFormat,
content)
assertion.subject = aq.request.subject
s2.saml2AssertionSetupSignature(assertion)
aq2.buildResponseMsg()
aq.processResponseMsg(aq2.msgBody)
assert aq.response
assert aq.response.assertion[0]
assert aq.response.assertion[0].attributeStatement[0]
assert aq.response.assertion[0].attributeStatement[0].attribute[0]
assert aq.response.assertion[0].attributeStatement[0].attribute[0].attributeValue[0]
class LogoutTestCase(unittest.TestCase):
def test01(self):
'''Test parsing of a logout request with more than one session index'''
content = '''
me
coin
id1
id2
id3
'''
node = lasso.Samlp2LogoutRequest.newFromXmlNode(content)
assert isinstance(node, lasso.Samlp2LogoutRequest)
assert node.sessionIndex == 'id1'
assert node.sessionIndexes == ('id2', 'id3', 'id1')
serverSuite = unittest.makeSuite(ServerTestCase, 'test')
loginSuite = unittest.makeSuite(LoginTestCase, 'test')
logoutSuite = unittest.makeSuite(LogoutTestCase, 'test')
defederationSuite = unittest.makeSuite(DefederationTestCase, 'test')
identitySuite = unittest.makeSuite(IdentityTestCase, 'test')
attributeSuite = unittest.makeSuite(AttributeAuthorityTestCase, 'test')
logoutSuite = unittest.makeSuite(LogoutTestCase, 'test')
allTests = unittest.TestSuite((serverSuite, loginSuite, logoutSuite, defederationSuite,
identitySuite, attributeSuite, logoutSuite))
if __name__ == '__main__':
sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful())