summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/backend_sql.conf3
-rw-r--r--tests/default_fixtures.py7
-rw-r--r--tests/test_auth.py244
-rw-r--r--tests/test_backend.py88
-rw-r--r--tests/test_backend_kvs.py10
-rw-r--r--tests/test_backend_sql.py8
-rw-r--r--tests/test_content_types.py2
-rw-r--r--tests/test_overrides.conf3
-rw-r--r--tests/test_sql_upgrade.py12
-rw-r--r--tests/test_v3.py8
-rw-r--r--tests/test_v3_trust.py286
11 files changed, 667 insertions, 4 deletions
diff --git a/tests/backend_sql.conf b/tests/backend_sql.conf
index e1027cc8..0baf610c 100644
--- a/tests/backend_sql.conf
+++ b/tests/backend_sql.conf
@@ -22,3 +22,6 @@ driver = keystone.catalog.backends.sql.Catalog
[policy]
driver = keystone.policy.backends.sql.Policy
+
+[trust]
+driver = keystone.trust.backends.sql.Trust
diff --git a/tests/default_fixtures.py b/tests/default_fixtures.py
index 4499be17..1141ddfd 100644
--- a/tests/default_fixtures.py
+++ b/tests/default_fixtures.py
@@ -109,5 +109,12 @@ ROLES = [
}, {
'id': 'other',
'name': 'Other',
+ }, {
+ 'id': 'browser',
+ 'name': 'Browser',
+ }, {
+ 'id': 'writer',
+ 'name': 'Writer',
}
+
]
diff --git a/tests/test_auth.py b/tests/test_auth.py
index ae9bdac3..8c8feba0 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -12,23 +12,29 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+import datetime
import time
import uuid
+from keystone import auth
from keystone import config
from keystone import exception
from keystone.openstack.common import timeutils
from keystone import test
from keystone import token
+from keystone import trust
import default_fixtures
CONF = config.CONF
+TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
def _build_user_auth(token=None, user_id=None, username=None,
- password=None, tenant_id=None, tenant_name=None):
+ password=None, tenant_id=None, tenant_name=None,
+ trust_id=None):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
@@ -49,6 +55,8 @@ def _build_user_auth(token=None, user_id=None, username=None,
auth_json['tenantName'] = tenant_name
if tenant_id is not None:
auth_json['tenantId'] = tenant_id
+ if trust_id is not None:
+ auth_json['trust_id'] = trust_id
return auth_json
@@ -452,6 +460,240 @@ class AuthWithRemoteUser(AuthTest):
body_dict)
+class AuthWithTrust(AuthTest):
+ def setUp(self):
+ super(AuthWithTrust, self).setUp()
+
+ trust.Manager()
+ self.trust_controller = trust.controllers.TrustV3()
+ self.auth_v3_controller = auth.controllers.Auth()
+ self.trustor = self.user_foo
+ self.trustee = self.user_two
+ self.assigned_roles = [self.role_member['id'],
+ self.role_browser['id']]
+ for assigned_role in self.assigned_roles:
+ self.identity_api.add_role_to_user_and_project(
+ self.trustor['id'], self.tenant_bar['id'], assigned_role)
+
+ self.sample_data = {'trustor_user_id': self.trustor['id'],
+ 'trustee_user_id': self.trustee['id'],
+ 'project_id': self.tenant_bar['id'],
+ 'impersonation': 'True',
+ 'roles': [{'id': self.role_browser['id']},
+ {'name': self.role_member['name']}]}
+ expires_at = timeutils.strtime(timeutils.utcnow() +
+ datetime.timedelta(minutes=10),
+ fmt=TIME_FORMAT)
+ self.create_trust(expires_at=expires_at)
+
+ def create_trust(self, expires_at=None, impersonation='True'):
+ username = self.trustor['name'],
+ password = 'foo2'
+ body_dict = _build_user_auth(username=username, password=password)
+ self.unscoped_token = self.controller.authenticate({}, body_dict)
+ context = {'token_id': self.unscoped_token['access']['token']['id']}
+ trust_data = copy.deepcopy(self.sample_data)
+ trust_data['expires_at'] = expires_at
+ trust_data['impersonation'] = impersonation
+
+ self.new_trust = (self.trust_controller.create_trust
+ (context, trust=trust_data)['trust'])
+
+ def build_v2_token_request(self, username, password):
+ body_dict = _build_user_auth(username=username, password=password)
+ self.unscoped_token = self.controller.authenticate({}, body_dict)
+ unscoped_token_id = self.unscoped_token['access']['token']['id']
+ request_body = _build_user_auth(token={'id': unscoped_token_id},
+ trust_id=self.new_trust['id'],
+ tenant_id=self.tenant_bar['id'])
+ return request_body
+
+ def test_create_trust_bad_data_fails(self):
+ context = {'token_id': self.unscoped_token['access']['token']['id']}
+ bad_sample_data = {'trustor_user_id': self.trustor['id']}
+
+ self.assertRaises(exception.ValidationError,
+ self.trust_controller.create_trust,
+ context, trust=bad_sample_data)
+
+ def test_create_trust_no_roles(self):
+ self.new_trust = None
+ self.sample_data['roles'] = []
+ self.create_trust()
+ self.assertEquals(self.new_trust['roles'], [])
+
+ def test_create_trust(self):
+ self.assertEquals(self.new_trust['trustor_user_id'],
+ self.trustor['id'])
+ self.assertEquals(self.new_trust['trustee_user_id'],
+ self.trustee['id'])
+ role_ids = [self.role_browser['id'], self.role_member['id']]
+ self.assertTrue(timeutils.parse_strtime(self.new_trust['expires_at'],
+ fmt=TIME_FORMAT))
+
+ for role in self.new_trust['roles']:
+ self.assertIn(role['id'], role_ids)
+
+ def test_get_trust(self):
+ context = {'token_id': self.unscoped_token['access']['token']['id']}
+ trust = self.trust_controller.get_trust(context,
+ self.new_trust['id'])['trust']
+ self.assertEquals(trust['trustor_user_id'],
+ self.trustor['id'])
+ self.assertEquals(trust['trustee_user_id'],
+ self.trustee['id'])
+ role_ids = [self.role_browser['id'], self.role_member['id']]
+ for role in self.new_trust['roles']:
+ self.assertIn(role['id'], role_ids)
+
+ def test_create_trust_no_impersonation(self):
+ self.create_trust(expires_at=None, impersonation='False')
+ self.assertEquals(self.new_trust['trustor_user_id'],
+ self.trustor['id'])
+ self.assertEquals(self.new_trust['trustee_user_id'],
+ self.trustee['id'])
+ self.assertEquals(self.new_trust['impersonation'],
+ 'False')
+ auth_response = self.fetch_v2_token_from_trust()
+ token_user = auth_response['access']['user']
+ self.assertEquals(token_user['id'],
+ self.new_trust['trustee_user_id'])
+
+ #TODO Endpoints
+
+ def test_token_from_trust_wrong_user_fails(self):
+ new_trust = self.create_trust()
+ request_body = self.build_v2_token_request('FOO', 'foo2')
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def fetch_v2_token_from_trust(self):
+ request_body = self.build_v2_token_request('TWO', 'two2')
+ auth_response = self.controller.authenticate({}, request_body)
+ return auth_response
+
+ def fetch_v3_token_from_trust(self):
+ self.identity_api.create_domain("default",
+ {"name": "default",
+ "id": "default"})
+ v3_password_data = {
+ 'identity': {
+ "methods": ["password"],
+ "password": {
+ "user": {
+ "id": self.trustee["id"],
+ "password": self.trustee["password"]}}
+ },
+ 'scope': {
+ 'project': {
+ 'id': self.tenant_baz['id']}}}
+ auth_response = (self.auth_v3_controller.authenticate_for_token
+ ({}, v3_password_data))
+ token = auth_response.headers['X-Subject-Token']
+
+ v3_req_with_trust = {
+ "identity": {
+ "methods": ["token"],
+ "token": {"id": token}},
+ "scope": {
+ "trust": {"id": self.new_trust['id']}}}
+ token_auth_response = (self.auth_v3_controller.authenticate_for_token
+ ({}, v3_req_with_trust))
+ return token_auth_response
+
+ def test_create_v3_token_from_trust(self):
+ auth_response = self.fetch_v3_token_from_trust()
+
+ trust_token_user = auth_response.json['token']['user']
+ self.assertEquals(trust_token_user['id'], self.trustor['id'])
+
+ trust_token_trust = auth_response.json['token']['trust']
+ self.assertEquals(trust_token_trust['id'], self.new_trust['id'])
+ self.assertEquals(trust_token_trust['trustor_user']['id'],
+ self.trustor['id'])
+ self.assertEquals(trust_token_trust['trustee_user']['id'],
+ self.trustee['id'])
+
+ trust_token_roles = auth_response.json['token']['roles']
+ self.assertEquals(len(trust_token_roles), 2)
+
+ def test_v3_trust_token_get_token_fails(self):
+ auth_response = self.fetch_v3_token_from_trust()
+ trust_token = auth_response.headers['X-Subject-Token']
+ v3_token_data = {
+ "methods": ["token"],
+ "token": {"id": trust_token}
+ }
+ self.assertRaises(
+ exception.Unauthorized,
+ self.auth_v3_controller.authenticate_for_token,
+ {}, v3_token_data)
+
+ def test_token_from_trust(self):
+ auth_response = self.fetch_v2_token_from_trust()
+
+ self.assertIsNotNone(auth_response)
+ self.assertEquals(len(auth_response['access']['metadata']['roles']),
+ 2,
+ "user_foo has three roles, but the token should"
+ " only get the two roles specified in the trust.")
+
+ def test_token_from_trust_cant_get_another_token(self):
+ auth_response = self.fetch_v2_token_from_trust()
+ trust_token_id = auth_response['access']['token']['id']
+ request_body = _build_user_auth(token={'id': trust_token_id},
+ tenant_id=self.tenant_bar['id'])
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_delete_trust_revokes_token(self):
+ context = {'token_id': self.unscoped_token['access']['token']['id']}
+ auth_response = self.fetch_v2_token_from_trust()
+ trust_id = self.new_trust['id']
+ trust_token_id = auth_response['access']['token']['id']
+ tokens = self.token_api.list_tokens(self.trustor['id'],
+ trust_id=trust_id)
+ self.assertEquals(len(tokens), 1)
+ self.trust_controller.delete_trust(context, trust_id=trust_id)
+ tokens = self.token_api.list_tokens(self.trustor['id'],
+ trust_id=trust_id)
+ self.assertEquals(len(tokens), 0)
+
+ def test_token_from_trust_with_no_role_fails(self):
+ for assigned_role in self.assigned_roles:
+ self.identity_api.remove_role_from_user_and_project(
+ self.trustor['id'], self.tenant_bar['id'], assigned_role)
+ request_body = self.build_v2_token_request('TWO', 'two2')
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_expired_trust_get_token_fails(self):
+ expiry = "1999-02-18T10:10:00Z"
+ self.create_trust(expiry)
+ request_body = self.build_v2_token_request('TWO', 'two2')
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_token_from_trust_with_wrong_role_fails(self):
+ self.identity_api.add_role_to_user_and_project(
+ self.trustor['id'],
+ self.tenant_bar['id'],
+ self.role_other['id'])
+ for assigned_role in self.assigned_roles:
+ self.identity_api.remove_role_from_user_and_project(
+ self.trustor['id'], self.tenant_bar['id'], assigned_role)
+
+ request_body = self.build_v2_token_request('TWO', 'two2')
+
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+
class TokenExpirationTest(AuthTest):
def _maintain_token_expiration(self):
"""Token expiration should be maintained after re-auth & validation."""
diff --git a/tests/test_backend.py b/tests/test_backend.py
index 029901eb..1af0822c 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -29,6 +29,7 @@ from keystone import test
CONF = config.CONF
DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
+TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
class IdentityTests(object):
@@ -1888,12 +1889,14 @@ class TokenTests(object):
self.assertRaises(exception.TokenNotFound,
self.token_api.delete_token, token_id)
- def create_token_sample_data(self, tenant_id=None):
+ def create_token_sample_data(self, tenant_id=None, trust_id=None):
token_id = uuid.uuid4().hex
data = {'id': token_id, 'a': 'b',
'user': {'id': 'testuserid'}}
if tenant_id is not None:
data['tenant'] = {'id': tenant_id, 'name': tenant_id}
+ if trust_id is not None:
+ data['trust_id'] = trust_id
self.token_api.create_token(token_id, data)
return token_id
@@ -1936,6 +1939,13 @@ class TokenTests(object):
self.assertNotIn(token_id3, tokens)
self.assertIn(token_id4, tokens)
+ def test_token_list_trust(self):
+ trust_id = uuid.uuid4().hex
+ token_id5 = self.create_token_sample_data(trust_id=trust_id)
+ tokens = self.token_api.list_tokens('testuserid', trust_id=trust_id)
+ self.assertEquals(len(tokens), 1)
+ self.assertIn(token_id5, tokens)
+
def test_get_token_404(self):
self.assertRaises(exception.TokenNotFound,
self.token_api.get_token,
@@ -1965,7 +1975,7 @@ class TokenTests(object):
data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None,
'user': {'id': 'testuserid'}}
data_ref = self.token_api.create_token(token_id, data)
- self.assertDictEqual(data_ref, data)
+ self.assertIsNotNone(data_ref['expires'])
new_data_ref = self.token_api.get_token(token_id)
self.assertEqual(data_ref, new_data_ref)
@@ -2002,6 +2012,80 @@ class TokenTests(object):
for x in xrange(2)])
+class TrustTests(object):
+ def create_sample_trust(self, new_id):
+ self.trustor = self.user_foo
+ self.trustee = self.user_two
+ trust_data = (self.trust_api.create_trust
+ (new_id,
+ {'trustor_user_id': self.trustor['id'],
+ 'trustee_user_id': self.user_two['id'],
+ 'project_id': self.tenant_bar['id'],
+ 'expires_at': timeutils.
+ parse_isotime('2031-02-18T18:10:00Z'),
+ 'impersonation': True},
+ roles=[{"id": "member"},
+ {"id": "other"},
+ {"id": "browser"}]))
+ return trust_data
+
+ def test_delete_trust(self):
+ new_id = uuid.uuid4().hex
+ trust_data = self.create_sample_trust(new_id)
+ trust_id = trust_data['id']
+ self.assertIsNotNone(trust_data)
+ trust_data = self.trust_api.get_trust(trust_id)
+ self.assertEquals(new_id, trust_data['id'])
+ self.trust_api.delete_trust(trust_id)
+ self.assertIsNone(self.trust_api.get_trust(trust_id))
+
+ def test_get_trust(self):
+ new_id = uuid.uuid4().hex
+ trust_data = self.create_sample_trust(new_id)
+ trust_id = trust_data['id']
+ self.assertIsNotNone(trust_data)
+ trust_data = self.trust_api.get_trust(trust_id)
+ self.assertEquals(new_id, trust_data['id'])
+
+ def test_create_trust(self):
+ new_id = uuid.uuid4().hex
+ trust_data = self.create_sample_trust(new_id)
+
+ self.assertEquals(new_id, trust_data['id'])
+ self.assertEquals(self.trustee['id'], trust_data['trustee_user_id'])
+ self.assertEquals(self.trustor['id'], trust_data['trustor_user_id'])
+ self.assertTrue(timeutils.normalize_time(trust_data['expires_at']) >
+ timeutils.utcnow())
+
+ self.assertEquals([{'id':'member'},
+ {'id': 'other'},
+ {'id': 'browser'}], trust_data['roles'])
+
+ def test_list_trust_by_trustee(self):
+ for i in range(0, 3):
+ trust_data = self.create_sample_trust(uuid.uuid4().hex)
+ trusts = self.trust_api.list_trusts_for_trustee(self.trustee)
+ self.assertEqual(len(trusts), 3)
+ self.assertEqual(trusts[0]["trustee_user_id"], self.trustee['id'])
+ trusts = self.trust_api.list_trusts_for_trustee(self.trustor)
+ self.assertEqual(len(trusts), 0)
+
+ def test_list_trust_by_trustee(self):
+ for i in range(0, 3):
+ trust_data = self.create_sample_trust(uuid.uuid4().hex)
+ trusts = self.trust_api.list_trusts_for_trustor(self.trustor['id'])
+ self.assertEqual(len(trusts), 3)
+ self.assertEqual(trusts[0]["trustor_user_id"], self.trustor['id'])
+ trusts = self.trust_api.list_trusts_for_trustor(self.trustee['id'])
+ self.assertEqual(len(trusts), 0)
+
+ def test_list_trusts(self):
+ for i in range(0, 3):
+ trust_data = self.create_sample_trust(uuid.uuid4().hex)
+ trusts = self.trust_api.list_trusts()
+ self.assertEqual(len(trusts), 3)
+
+
class CommonHelperTests(test.TestCase):
def test_format_helper_raises_malformed_on_missing_key(self):
with self.assertRaises(exception.MalformedEndpoint):
diff --git a/tests/test_backend_kvs.py b/tests/test_backend_kvs.py
index e1d99d47..74b5e4eb 100644
--- a/tests/test_backend_kvs.py
+++ b/tests/test_backend_kvs.py
@@ -22,6 +22,7 @@ from keystone import exception
from keystone.identity.backends import kvs as identity_kvs
from keystone import test
from keystone.token.backends import kvs as token_kvs
+from keystone.trust.backends import kvs as trust_kvs
import default_fixtures
import test_backend
@@ -71,6 +72,15 @@ class KvsToken(test.TestCase, test_backend.TokenTests):
self.token_api = token_kvs.Token(db={})
+class KvsTrust(test.TestCase, test_backend.TrustTests):
+ def setUp(self):
+ super(KvsTrust, self).setUp()
+ self.trust_api = trust_kvs.Trust(db={})
+ self.identity_api = identity_kvs.Identity(db={})
+ self.catalog_api = catalog_kvs.Catalog(db={})
+ self.load_fixtures(default_fixtures)
+
+
class KvsCatalog(test.TestCase, test_backend.CatalogTests):
def setUp(self):
super(KvsCatalog, self).setUp()
diff --git a/tests/test_backend_sql.py b/tests/test_backend_sql.py
index a4be85e3..04310307 100644
--- a/tests/test_backend_sql.py
+++ b/tests/test_backend_sql.py
@@ -24,6 +24,8 @@ from keystone import identity
from keystone import policy
from keystone import test
from keystone import token
+from keystone import trust
+
import default_fixtures
import test_backend
@@ -43,6 +45,7 @@ class SqlTests(test.TestCase):
self.catalog_man = catalog.Manager()
self.identity_man = identity.Manager()
self.token_man = token.Manager()
+ self.trust_man = trust.Manager()
self.policy_man = policy.Manager()
# create shortcut references to each driver
@@ -50,6 +53,7 @@ class SqlTests(test.TestCase):
self.identity_api = self.identity_man.driver
self.token_api = self.token_man.driver
self.policy_api = self.policy_man.driver
+ self.trust_api = self.trust_man.driver
# populate the engine with tables & fixtures
self.load_fixtures(default_fixtures)
@@ -221,6 +225,10 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key])
+class SqlTrust(SqlTests, test_backend.TrustTests):
+ pass
+
+
class SqlToken(SqlTests, test_backend.TokenTests):
pass
diff --git a/tests/test_content_types.py b/tests/test_content_types.py
index 91b61661..e0b42db9 100644
--- a/tests/test_content_types.py
+++ b/tests/test_content_types.py
@@ -90,7 +90,7 @@ class RestfulTestCase(test.TestCase):
# Initialize headers dictionary
headers = {} if not headers else headers
- connection = httplib.HTTPConnection(host, port, timeout=10)
+ connection = httplib.HTTPConnection(host, port, timeout=100000)
# Perform the request
connection.request(method, path, body, headers)
diff --git a/tests/test_overrides.conf b/tests/test_overrides.conf
index 48f5dd7f..0e41fd32 100644
--- a/tests/test_overrides.conf
+++ b/tests/test_overrides.conf
@@ -8,6 +8,9 @@ driver = keystone.identity.backends.kvs.Identity
driver = keystone.catalog.backends.templated.TemplatedCatalog
template_file = default_catalog.templates
+[trust]
+driver = keystone.trust.backends.kvs.Trust
+
[signing]
certfile = ../examples/pki/certs/signing_cert.pem
keyfile = ../examples/pki/private/signing_key.pem
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py
index 85ea7580..003fb4ec 100644
--- a/tests/test_sql_upgrade.py
+++ b/tests/test_sql_upgrade.py
@@ -526,6 +526,18 @@ class SqlUpgradeTests(test.TestCase):
cmd = this_table.delete(id=project['id'])
self.engine.execute(cmd)
+ def test_upgrade_trusts(self):
+ self.assertEqual(self.schema.version, 0, "DB is at version 0")
+ self.upgrade(18)
+ self.assertTableColumns("trust",
+ ["id", "trustor_user_id",
+ "trustee_user_id",
+ "project_id", "impersonation",
+ "deleted_at",
+ "expires_at", "extra"])
+ self.assertTableColumns("trust_role",
+ ["trust_id", "role_id"])
+
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user
diff --git a/tests/test_v3.py b/tests/test_v3.py
index a0252af0..2aa2c2b5 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -7,6 +7,8 @@ from keystone.common.sql import util as sql_util
from keystone import auth
from keystone import test
from keystone import config
+from keystone.policy.backends import rules
+
import test_content_types
@@ -16,11 +18,14 @@ CONF = config.CONF
class RestfulTestCase(test_content_types.RestfulTestCase):
def setUp(self):
+ rules.reset()
+
self.config([
test.etcdir('keystone.conf.sample'),
test.testsdir('test_overrides.conf'),
test.testsdir('backend_sql.conf'),
test.testsdir('backend_sql_disk.conf')])
+
sql_util.setup_test_database()
self.load_backends()
@@ -62,6 +67,9 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
sql_util.teardown_test_database()
# need to reset the plug-ins
auth.controllers.AUTH_METHODS = {}
+ #drop the policy rules
+ CONF.reset()
+ rules.reset()
def new_ref(self):
"""Populates a ref with attributes common to all API entities."""
diff --git a/tests/test_v3_trust.py b/tests/test_v3_trust.py
new file mode 100644
index 00000000..19d645d8
--- /dev/null
+++ b/tests/test_v3_trust.py
@@ -0,0 +1,286 @@
+import copy
+import uuid
+import test_v3
+import json
+
+from keystone import config
+from keystone.common.sql import util as sql_util
+from keystone import test
+
+import test_content_types
+
+
+CONF = config.CONF
+DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
+
+
+class TrustTestCase(test_v3.RestfulTestCase):
+ def setUp(self):
+ super(TrustTestCase, self).setUp()
+ self.domain = None
+ self.password = 'freeipa4all'
+ self.auth_url = '/v2.0'
+ self.admin_url = '/v2.0'
+ self.admin_url_v3 = '/v3'
+ self.url_template = "%(auth_url)s/%(resource)s"
+ self.headers = {'Content-type': 'application/json'}
+ self.trustor = self.create_user()
+ self.trustee = self.create_user()
+ self.role_1 = self.create_role()
+ self.role_2 = self.create_role()
+ self.grant_role_to_user(self.trustor['id'],
+ self.role_1['id'],
+ self.get_project()['id'])
+ self.grant_role_to_user(self.trustor['id'],
+ self.role_2['id'],
+ self.get_project()['id'])
+
+ def v3_request(self, path, data):
+ r = self.request(method='POST',
+ path=path,
+ body=data,
+ headers=self.headers)
+ return r
+
+ def get_unscoped_token_response(self, username, password):
+ url = self.url_template % {'auth_url': self.admin_url,
+ 'resource': "tokens"}
+ data = self.get_unscoped_auth(username=username, password=password)
+ r = self.restful_request(method='POST',
+ port=self._public_port(),
+ path=url,
+ body=data,
+ headers=self.headers)
+ if 'access' in r.body:
+ return r.body['access']
+ raise Exception(r)
+
+ def get_scoped_token_response(self, username, password, project_name):
+ url = self.url_template % {'auth_url': self.admin_url,
+ 'resource': "tokens"}
+ data = self.get_scoped_auth(username, password, project_name)
+ r = self.restful_request(method='POST',
+ port=self._public_port(),
+ path=url,
+ body=data,
+ headers=self.headers)
+ if 'access' in r.body:
+ return r.body['access']
+ raise Exception(r)
+
+ def get_admin_token_data(self):
+ if not hasattr(self, 'admin_token_response'):
+ self.admin_token_response = self.get_scoped_token_response(
+ 'admin', 'freeipa4all', 'demo')
+ return self.admin_token_response
+
+ def get_admin_token_id(self):
+ return 'ADMIN'
+
+ def make_admin_post_request(self, resource, data):
+ return self.make_post_request(resource,
+ data,
+ self.get_admin_token_id())
+
+ def make_post_request(self, resource, data, token_id):
+ headers = copy.copy(self.headers)
+ headers["x-auth-token"] = token_id
+ url = self.url_template % {'auth_url': self.admin_url_v3,
+ 'resource': resource}
+ r = self.restful_request(method='POST',
+ path=url,
+ port=self._admin_port(),
+ body=data,
+ headers=headers)
+ return r
+
+ def make_v2_post_request(self, resource, data, token_id):
+ headers = copy.copy(self.headers)
+ headers["x-auth-token"] = token_id
+ url = self.url_template % {'auth_url': self.admin_url,
+ 'resource': resource}
+ r = self.restful_request(method='POST',
+ path=url,
+ port=self._admin_port(),
+ body=data,
+ headers=headers)
+ return r
+
+ def make_put_request(self, resource, data, token_id):
+ headers = copy.copy(self.headers)
+ headers["x-auth-token"] = self.get_admin_token_id()
+ url = self.url_template % {'auth_url': self.admin_url_v3,
+ 'resource': resource}
+ r = self.request(method='PUT',
+ path=url,
+ port=self._admin_port(),
+ body=json.dumps(data),
+ headers=headers)
+ return r
+
+ def create_domain(self):
+ domain = self.new_domain_ref()
+ resource = 'domains'
+ data = {'domain': domain}
+ r = self.make_admin_post_request(resource, data)
+ dom = r.body['domain']
+ self.domain = dom
+
+ def create_project(self):
+ project = self.new_project_ref(
+ domain_id=self.get_domain()['id'])
+ data = {'project': project}
+ r = self.make_admin_post_request('projects', data)
+ self.project = r.body['project']
+
+ def get_domain(self):
+ if not self.domain:
+ #once authenticate supports domains, use the following function
+# self.create_domain()
+ self.domain = {'id': DEFAULT_DOMAIN_ID}
+ return self.domain
+
+ def get_project(self):
+ if not hasattr(self, 'project'):
+ self.create_project()
+ return self.project
+
+ def create_user(self):
+ user_id = uuid.uuid4().hex
+ user = {'user': {'name': uuid.uuid4().hex,
+ 'password': self.password,
+ 'enabled': True,
+ 'domain_id': self.get_domain()['id'],
+ 'project_id': self.get_project()['id']}}
+ r = self.make_admin_post_request('users', user)
+ return r.body['user']
+
+ def create_role(self):
+ ref = self.new_role_ref()
+ body = {'role': ref}
+ r = self.make_admin_post_request('roles', body)
+ return r.body['role']
+
+ def grant_role_to_user(self, user_id, role_id, project_id):
+ """PUT /projects/{project_id}/users/{user_id}/roles/{role_id}"""
+ url_template = 'projects/%(project_id)s/users'\
+ '/%(user_id)s/roles/%(role_id)s'
+ url = url_template % {'project_id': project_id,
+ 'user_id': user_id,
+ 'role_id': role_id}
+ r = self.make_put_request(url, '', self.get_admin_token_id())
+ return r
+
+ def get_scoped_auth(self, username, password, project_name):
+ return {"auth":
+ {"passwordCredentials": {"username": username,
+ "password": password},
+ "projectName": project_name}}
+
+ def get_unscoped_auth(self, username, password):
+ return {"auth":
+ {"passwordCredentials": {"username": username,
+ "password": password}}}
+
+ def create_trust(self, impersonation=True):
+ trustor_token = self.get_scoped_token_response(
+ self.trustor['name'],
+ self.password,
+ self.get_project()['name'])
+ trustee_token = self.get_unscoped_token_response(self.trustee['name'],
+ self.password)
+ trust_request = {'trust':
+ {'trustor_user_id': self.trustor['id'],
+ 'trustee_user_id': self.trustee['id'],
+ 'project_id': self.get_project()['id'],
+ 'impersonation': impersonation,
+ 'description': 'described',
+ 'roles': []}}
+ trust_response = self.make_post_request('trusts', trust_request,
+ trustor_token['token']['id'])
+ return trust_response, trustee_token
+
+ def test_create_trust(self):
+ trust_response, trustee_token = self.create_trust()
+ trust_id = trust_response.body['trust']['id']
+ self.assertEquals(trust_response.body['trust']['description'],
+ 'described')
+ auth_data = {"auth": {"token": {'id': trustee_token['token']['id']},
+ "trust_id": trust_id}}
+ r = self.make_v2_post_request("tokens",
+ auth_data,
+ trustee_token['token']['id'])
+ trust_token = r.body
+ self.assertIsNotNone(trust_token['access']['token']['id'])
+ self.assertEquals(trust_token['access']['trust']['trustee_user_id'],
+ self.trustee['id'])
+ self.assertEquals(trust_token['access']['trust']['id'], trust_id)
+
+ def test_delete_trust(self):
+ trust_response, trustee_token = self.create_trust()
+ url = self.url_template % {'auth_url': self.admin_url_v3,
+ 'resource': "trusts/"}
+ url += trust_response.body['trust']['id']
+ trustor_token = self.get_scoped_token_response(
+ self.trustor['name'],
+ self.password,
+ self.get_project()['name'])
+
+ headers = copy.copy(self.headers)
+ headers["x-auth-token"] = trustor_token['token']['id']
+ response = self.request(method='DELETE',
+ path=url,
+ port=self._public_port(),
+ body="",
+ headers=headers)
+ self.assertIsNotNone(response)
+
+ def test_list_trusts(self):
+ trustor_token = self.get_scoped_token_response(
+ self.trustor['name'],
+ self.password,
+ self.get_project()['name'])
+
+ for i in range(0, 3):
+ trust_response, trustee_token = self.create_trust()
+ url = self.url_template % {'auth_url': self.admin_url_v3,
+ 'resource': "trusts"}
+ headers = copy.copy(self.headers)
+ headers["x-auth-token"] = self.get_admin_token_id()
+ trust_lists_response = self.restful_request(method='GET',
+ path=url,
+ port=self._public_port(),
+ body="",
+ headers=headers)
+ trusts = trust_lists_response.body['trusts']
+ self.assertEqual(len(trusts), 3)
+
+ trustee_url = url + "?trustee_user_id=" + self.trustee['id']
+ headers["x-auth-token"] = trustee_token['token']['id']
+ trust_lists_response = self.restful_request(
+ method='GET', path=trustee_url, port=self._public_port(),
+ body="", headers=headers)
+ trusts = trust_lists_response.body['trusts']
+ self.assertEqual(len(trusts), 3)
+
+ headers["x-auth-token"] = trustor_token['token']['id']
+
+ trust_lists_response = self.restful_request(
+ method='GET', path=trustee_url, port=self._public_port(),
+ body="", headers=headers, expected_status=403)
+
+ trustor_url = url + "?trustor_user_id=" + self.trustor['id']
+ headers["x-auth-token"] = trustor_token['token']['id']
+ trust_lists_response = self.restful_request(
+ method='GET',
+ path=trustor_url,
+ port=self._public_port(),
+ body="",
+ headers=headers)
+ trusts = trust_lists_response.body['trusts']
+ self.assertEqual(len(trusts), 3)
+
+ headers["x-auth-token"] = trustee_token['token']['id']
+ trust_lists_response = self.restful_request(
+ method='GET', path=trustor_url, port=self._public_port(),
+ body="", headers=headers, expected_status=403)