diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/backend_sql.conf | 3 | ||||
-rw-r--r-- | tests/default_fixtures.py | 7 | ||||
-rw-r--r-- | tests/test_auth.py | 244 | ||||
-rw-r--r-- | tests/test_backend.py | 88 | ||||
-rw-r--r-- | tests/test_backend_kvs.py | 10 | ||||
-rw-r--r-- | tests/test_backend_sql.py | 8 | ||||
-rw-r--r-- | tests/test_content_types.py | 2 | ||||
-rw-r--r-- | tests/test_overrides.conf | 3 | ||||
-rw-r--r-- | tests/test_sql_upgrade.py | 12 | ||||
-rw-r--r-- | tests/test_v3.py | 8 | ||||
-rw-r--r-- | tests/test_v3_trust.py | 286 |
11 files changed, 667 insertions, 4 deletions
diff --git a/tests/backend_sql.conf b/tests/backend_sql.conf index e1027cc8..0baf610c 100644 --- a/tests/backend_sql.conf +++ b/tests/backend_sql.conf @@ -22,3 +22,6 @@ driver = keystone.catalog.backends.sql.Catalog [policy] driver = keystone.policy.backends.sql.Policy + +[trust] +driver = keystone.trust.backends.sql.Trust diff --git a/tests/default_fixtures.py b/tests/default_fixtures.py index 4499be17..1141ddfd 100644 --- a/tests/default_fixtures.py +++ b/tests/default_fixtures.py @@ -109,5 +109,12 @@ ROLES = [ }, { 'id': 'other', 'name': 'Other', + }, { + 'id': 'browser', + 'name': 'Browser', + }, { + 'id': 'writer', + 'name': 'Writer', } + ] diff --git a/tests/test_auth.py b/tests/test_auth.py index ae9bdac3..8c8feba0 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -12,23 +12,29 @@ # License for the specific language governing permissions and limitations # under the License. +import copy +import datetime import time import uuid +from keystone import auth from keystone import config from keystone import exception from keystone.openstack.common import timeutils from keystone import test from keystone import token +from keystone import trust import default_fixtures CONF = config.CONF +TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' def _build_user_auth(token=None, user_id=None, username=None, - password=None, tenant_id=None, tenant_name=None): + password=None, tenant_id=None, tenant_name=None, + trust_id=None): """Build auth dictionary. It will create an auth dictionary based on all the arguments @@ -49,6 +55,8 @@ def _build_user_auth(token=None, user_id=None, username=None, auth_json['tenantName'] = tenant_name if tenant_id is not None: auth_json['tenantId'] = tenant_id + if trust_id is not None: + auth_json['trust_id'] = trust_id return auth_json @@ -452,6 +460,240 @@ class AuthWithRemoteUser(AuthTest): body_dict) +class AuthWithTrust(AuthTest): + def setUp(self): + super(AuthWithTrust, self).setUp() + + trust.Manager() + self.trust_controller = trust.controllers.TrustV3() + self.auth_v3_controller = auth.controllers.Auth() + self.trustor = self.user_foo + self.trustee = self.user_two + self.assigned_roles = [self.role_member['id'], + self.role_browser['id']] + for assigned_role in self.assigned_roles: + self.identity_api.add_role_to_user_and_project( + self.trustor['id'], self.tenant_bar['id'], assigned_role) + + self.sample_data = {'trustor_user_id': self.trustor['id'], + 'trustee_user_id': self.trustee['id'], + 'project_id': self.tenant_bar['id'], + 'impersonation': 'True', + 'roles': [{'id': self.role_browser['id']}, + {'name': self.role_member['name']}]} + expires_at = timeutils.strtime(timeutils.utcnow() + + datetime.timedelta(minutes=10), + fmt=TIME_FORMAT) + self.create_trust(expires_at=expires_at) + + def create_trust(self, expires_at=None, impersonation='True'): + username = self.trustor['name'], + password = 'foo2' + body_dict = _build_user_auth(username=username, password=password) + self.unscoped_token = self.controller.authenticate({}, body_dict) + context = {'token_id': self.unscoped_token['access']['token']['id']} + trust_data = copy.deepcopy(self.sample_data) + trust_data['expires_at'] = expires_at + trust_data['impersonation'] = impersonation + + self.new_trust = (self.trust_controller.create_trust + (context, trust=trust_data)['trust']) + + def build_v2_token_request(self, username, password): + body_dict = _build_user_auth(username=username, password=password) + self.unscoped_token = self.controller.authenticate({}, body_dict) + unscoped_token_id = self.unscoped_token['access']['token']['id'] + request_body = _build_user_auth(token={'id': unscoped_token_id}, + trust_id=self.new_trust['id'], + tenant_id=self.tenant_bar['id']) + return request_body + + def test_create_trust_bad_data_fails(self): + context = {'token_id': self.unscoped_token['access']['token']['id']} + bad_sample_data = {'trustor_user_id': self.trustor['id']} + + self.assertRaises(exception.ValidationError, + self.trust_controller.create_trust, + context, trust=bad_sample_data) + + def test_create_trust_no_roles(self): + self.new_trust = None + self.sample_data['roles'] = [] + self.create_trust() + self.assertEquals(self.new_trust['roles'], []) + + def test_create_trust(self): + self.assertEquals(self.new_trust['trustor_user_id'], + self.trustor['id']) + self.assertEquals(self.new_trust['trustee_user_id'], + self.trustee['id']) + role_ids = [self.role_browser['id'], self.role_member['id']] + self.assertTrue(timeutils.parse_strtime(self.new_trust['expires_at'], + fmt=TIME_FORMAT)) + + for role in self.new_trust['roles']: + self.assertIn(role['id'], role_ids) + + def test_get_trust(self): + context = {'token_id': self.unscoped_token['access']['token']['id']} + trust = self.trust_controller.get_trust(context, + self.new_trust['id'])['trust'] + self.assertEquals(trust['trustor_user_id'], + self.trustor['id']) + self.assertEquals(trust['trustee_user_id'], + self.trustee['id']) + role_ids = [self.role_browser['id'], self.role_member['id']] + for role in self.new_trust['roles']: + self.assertIn(role['id'], role_ids) + + def test_create_trust_no_impersonation(self): + self.create_trust(expires_at=None, impersonation='False') + self.assertEquals(self.new_trust['trustor_user_id'], + self.trustor['id']) + self.assertEquals(self.new_trust['trustee_user_id'], + self.trustee['id']) + self.assertEquals(self.new_trust['impersonation'], + 'False') + auth_response = self.fetch_v2_token_from_trust() + token_user = auth_response['access']['user'] + self.assertEquals(token_user['id'], + self.new_trust['trustee_user_id']) + + #TODO Endpoints + + def test_token_from_trust_wrong_user_fails(self): + new_trust = self.create_trust() + request_body = self.build_v2_token_request('FOO', 'foo2') + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def fetch_v2_token_from_trust(self): + request_body = self.build_v2_token_request('TWO', 'two2') + auth_response = self.controller.authenticate({}, request_body) + return auth_response + + def fetch_v3_token_from_trust(self): + self.identity_api.create_domain("default", + {"name": "default", + "id": "default"}) + v3_password_data = { + 'identity': { + "methods": ["password"], + "password": { + "user": { + "id": self.trustee["id"], + "password": self.trustee["password"]}} + }, + 'scope': { + 'project': { + 'id': self.tenant_baz['id']}}} + auth_response = (self.auth_v3_controller.authenticate_for_token + ({}, v3_password_data)) + token = auth_response.headers['X-Subject-Token'] + + v3_req_with_trust = { + "identity": { + "methods": ["token"], + "token": {"id": token}}, + "scope": { + "trust": {"id": self.new_trust['id']}}} + token_auth_response = (self.auth_v3_controller.authenticate_for_token + ({}, v3_req_with_trust)) + return token_auth_response + + def test_create_v3_token_from_trust(self): + auth_response = self.fetch_v3_token_from_trust() + + trust_token_user = auth_response.json['token']['user'] + self.assertEquals(trust_token_user['id'], self.trustor['id']) + + trust_token_trust = auth_response.json['token']['trust'] + self.assertEquals(trust_token_trust['id'], self.new_trust['id']) + self.assertEquals(trust_token_trust['trustor_user']['id'], + self.trustor['id']) + self.assertEquals(trust_token_trust['trustee_user']['id'], + self.trustee['id']) + + trust_token_roles = auth_response.json['token']['roles'] + self.assertEquals(len(trust_token_roles), 2) + + def test_v3_trust_token_get_token_fails(self): + auth_response = self.fetch_v3_token_from_trust() + trust_token = auth_response.headers['X-Subject-Token'] + v3_token_data = { + "methods": ["token"], + "token": {"id": trust_token} + } + self.assertRaises( + exception.Unauthorized, + self.auth_v3_controller.authenticate_for_token, + {}, v3_token_data) + + def test_token_from_trust(self): + auth_response = self.fetch_v2_token_from_trust() + + self.assertIsNotNone(auth_response) + self.assertEquals(len(auth_response['access']['metadata']['roles']), + 2, + "user_foo has three roles, but the token should" + " only get the two roles specified in the trust.") + + def test_token_from_trust_cant_get_another_token(self): + auth_response = self.fetch_v2_token_from_trust() + trust_token_id = auth_response['access']['token']['id'] + request_body = _build_user_auth(token={'id': trust_token_id}, + tenant_id=self.tenant_bar['id']) + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_delete_trust_revokes_token(self): + context = {'token_id': self.unscoped_token['access']['token']['id']} + auth_response = self.fetch_v2_token_from_trust() + trust_id = self.new_trust['id'] + trust_token_id = auth_response['access']['token']['id'] + tokens = self.token_api.list_tokens(self.trustor['id'], + trust_id=trust_id) + self.assertEquals(len(tokens), 1) + self.trust_controller.delete_trust(context, trust_id=trust_id) + tokens = self.token_api.list_tokens(self.trustor['id'], + trust_id=trust_id) + self.assertEquals(len(tokens), 0) + + def test_token_from_trust_with_no_role_fails(self): + for assigned_role in self.assigned_roles: + self.identity_api.remove_role_from_user_and_project( + self.trustor['id'], self.tenant_bar['id'], assigned_role) + request_body = self.build_v2_token_request('TWO', 'two2') + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_expired_trust_get_token_fails(self): + expiry = "1999-02-18T10:10:00Z" + self.create_trust(expiry) + request_body = self.build_v2_token_request('TWO', 'two2') + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_token_from_trust_with_wrong_role_fails(self): + self.identity_api.add_role_to_user_and_project( + self.trustor['id'], + self.tenant_bar['id'], + self.role_other['id']) + for assigned_role in self.assigned_roles: + self.identity_api.remove_role_from_user_and_project( + self.trustor['id'], self.tenant_bar['id'], assigned_role) + + request_body = self.build_v2_token_request('TWO', 'two2') + + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + class TokenExpirationTest(AuthTest): def _maintain_token_expiration(self): """Token expiration should be maintained after re-auth & validation.""" diff --git a/tests/test_backend.py b/tests/test_backend.py index 029901eb..1af0822c 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -29,6 +29,7 @@ from keystone import test CONF = config.CONF DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id +TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' class IdentityTests(object): @@ -1888,12 +1889,14 @@ class TokenTests(object): self.assertRaises(exception.TokenNotFound, self.token_api.delete_token, token_id) - def create_token_sample_data(self, tenant_id=None): + def create_token_sample_data(self, tenant_id=None, trust_id=None): token_id = uuid.uuid4().hex data = {'id': token_id, 'a': 'b', 'user': {'id': 'testuserid'}} if tenant_id is not None: data['tenant'] = {'id': tenant_id, 'name': tenant_id} + if trust_id is not None: + data['trust_id'] = trust_id self.token_api.create_token(token_id, data) return token_id @@ -1936,6 +1939,13 @@ class TokenTests(object): self.assertNotIn(token_id3, tokens) self.assertIn(token_id4, tokens) + def test_token_list_trust(self): + trust_id = uuid.uuid4().hex + token_id5 = self.create_token_sample_data(trust_id=trust_id) + tokens = self.token_api.list_tokens('testuserid', trust_id=trust_id) + self.assertEquals(len(tokens), 1) + self.assertIn(token_id5, tokens) + def test_get_token_404(self): self.assertRaises(exception.TokenNotFound, self.token_api.get_token, @@ -1965,7 +1975,7 @@ class TokenTests(object): data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None, 'user': {'id': 'testuserid'}} data_ref = self.token_api.create_token(token_id, data) - self.assertDictEqual(data_ref, data) + self.assertIsNotNone(data_ref['expires']) new_data_ref = self.token_api.get_token(token_id) self.assertEqual(data_ref, new_data_ref) @@ -2002,6 +2012,80 @@ class TokenTests(object): for x in xrange(2)]) +class TrustTests(object): + def create_sample_trust(self, new_id): + self.trustor = self.user_foo + self.trustee = self.user_two + trust_data = (self.trust_api.create_trust + (new_id, + {'trustor_user_id': self.trustor['id'], + 'trustee_user_id': self.user_two['id'], + 'project_id': self.tenant_bar['id'], + 'expires_at': timeutils. + parse_isotime('2031-02-18T18:10:00Z'), + 'impersonation': True}, + roles=[{"id": "member"}, + {"id": "other"}, + {"id": "browser"}])) + return trust_data + + def test_delete_trust(self): + new_id = uuid.uuid4().hex + trust_data = self.create_sample_trust(new_id) + trust_id = trust_data['id'] + self.assertIsNotNone(trust_data) + trust_data = self.trust_api.get_trust(trust_id) + self.assertEquals(new_id, trust_data['id']) + self.trust_api.delete_trust(trust_id) + self.assertIsNone(self.trust_api.get_trust(trust_id)) + + def test_get_trust(self): + new_id = uuid.uuid4().hex + trust_data = self.create_sample_trust(new_id) + trust_id = trust_data['id'] + self.assertIsNotNone(trust_data) + trust_data = self.trust_api.get_trust(trust_id) + self.assertEquals(new_id, trust_data['id']) + + def test_create_trust(self): + new_id = uuid.uuid4().hex + trust_data = self.create_sample_trust(new_id) + + self.assertEquals(new_id, trust_data['id']) + self.assertEquals(self.trustee['id'], trust_data['trustee_user_id']) + self.assertEquals(self.trustor['id'], trust_data['trustor_user_id']) + self.assertTrue(timeutils.normalize_time(trust_data['expires_at']) > + timeutils.utcnow()) + + self.assertEquals([{'id':'member'}, + {'id': 'other'}, + {'id': 'browser'}], trust_data['roles']) + + def test_list_trust_by_trustee(self): + for i in range(0, 3): + trust_data = self.create_sample_trust(uuid.uuid4().hex) + trusts = self.trust_api.list_trusts_for_trustee(self.trustee) + self.assertEqual(len(trusts), 3) + self.assertEqual(trusts[0]["trustee_user_id"], self.trustee['id']) + trusts = self.trust_api.list_trusts_for_trustee(self.trustor) + self.assertEqual(len(trusts), 0) + + def test_list_trust_by_trustee(self): + for i in range(0, 3): + trust_data = self.create_sample_trust(uuid.uuid4().hex) + trusts = self.trust_api.list_trusts_for_trustor(self.trustor['id']) + self.assertEqual(len(trusts), 3) + self.assertEqual(trusts[0]["trustor_user_id"], self.trustor['id']) + trusts = self.trust_api.list_trusts_for_trustor(self.trustee['id']) + self.assertEqual(len(trusts), 0) + + def test_list_trusts(self): + for i in range(0, 3): + trust_data = self.create_sample_trust(uuid.uuid4().hex) + trusts = self.trust_api.list_trusts() + self.assertEqual(len(trusts), 3) + + class CommonHelperTests(test.TestCase): def test_format_helper_raises_malformed_on_missing_key(self): with self.assertRaises(exception.MalformedEndpoint): diff --git a/tests/test_backend_kvs.py b/tests/test_backend_kvs.py index e1d99d47..74b5e4eb 100644 --- a/tests/test_backend_kvs.py +++ b/tests/test_backend_kvs.py @@ -22,6 +22,7 @@ from keystone import exception from keystone.identity.backends import kvs as identity_kvs from keystone import test from keystone.token.backends import kvs as token_kvs +from keystone.trust.backends import kvs as trust_kvs import default_fixtures import test_backend @@ -71,6 +72,15 @@ class KvsToken(test.TestCase, test_backend.TokenTests): self.token_api = token_kvs.Token(db={}) +class KvsTrust(test.TestCase, test_backend.TrustTests): + def setUp(self): + super(KvsTrust, self).setUp() + self.trust_api = trust_kvs.Trust(db={}) + self.identity_api = identity_kvs.Identity(db={}) + self.catalog_api = catalog_kvs.Catalog(db={}) + self.load_fixtures(default_fixtures) + + class KvsCatalog(test.TestCase, test_backend.CatalogTests): def setUp(self): super(KvsCatalog, self).setUp() diff --git a/tests/test_backend_sql.py b/tests/test_backend_sql.py index a4be85e3..04310307 100644 --- a/tests/test_backend_sql.py +++ b/tests/test_backend_sql.py @@ -24,6 +24,8 @@ from keystone import identity from keystone import policy from keystone import test from keystone import token +from keystone import trust + import default_fixtures import test_backend @@ -43,6 +45,7 @@ class SqlTests(test.TestCase): self.catalog_man = catalog.Manager() self.identity_man = identity.Manager() self.token_man = token.Manager() + self.trust_man = trust.Manager() self.policy_man = policy.Manager() # create shortcut references to each driver @@ -50,6 +53,7 @@ class SqlTests(test.TestCase): self.identity_api = self.identity_man.driver self.token_api = self.token_man.driver self.policy_api = self.policy_man.driver + self.trust_api = self.trust_man.driver # populate the engine with tables & fixtures self.load_fixtures(default_fixtures) @@ -221,6 +225,10 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key]) +class SqlTrust(SqlTests, test_backend.TrustTests): + pass + + class SqlToken(SqlTests, test_backend.TokenTests): pass diff --git a/tests/test_content_types.py b/tests/test_content_types.py index 91b61661..e0b42db9 100644 --- a/tests/test_content_types.py +++ b/tests/test_content_types.py @@ -90,7 +90,7 @@ class RestfulTestCase(test.TestCase): # Initialize headers dictionary headers = {} if not headers else headers - connection = httplib.HTTPConnection(host, port, timeout=10) + connection = httplib.HTTPConnection(host, port, timeout=100000) # Perform the request connection.request(method, path, body, headers) diff --git a/tests/test_overrides.conf b/tests/test_overrides.conf index 48f5dd7f..0e41fd32 100644 --- a/tests/test_overrides.conf +++ b/tests/test_overrides.conf @@ -8,6 +8,9 @@ driver = keystone.identity.backends.kvs.Identity driver = keystone.catalog.backends.templated.TemplatedCatalog template_file = default_catalog.templates +[trust] +driver = keystone.trust.backends.kvs.Trust + [signing] certfile = ../examples/pki/certs/signing_cert.pem keyfile = ../examples/pki/private/signing_key.pem diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py index 85ea7580..003fb4ec 100644 --- a/tests/test_sql_upgrade.py +++ b/tests/test_sql_upgrade.py @@ -526,6 +526,18 @@ class SqlUpgradeTests(test.TestCase): cmd = this_table.delete(id=project['id']) self.engine.execute(cmd) + def test_upgrade_trusts(self): + self.assertEqual(self.schema.version, 0, "DB is at version 0") + self.upgrade(18) + self.assertTableColumns("trust", + ["id", "trustor_user_id", + "trustee_user_id", + "project_id", "impersonation", + "deleted_at", + "expires_at", "extra"]) + self.assertTableColumns("trust_role", + ["trust_id", "role_id"]) + def populate_user_table(self, with_pass_enab=False, with_pass_enab_domain=False): # Populate the appropriate fields in the user diff --git a/tests/test_v3.py b/tests/test_v3.py index a0252af0..2aa2c2b5 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -7,6 +7,8 @@ from keystone.common.sql import util as sql_util from keystone import auth from keystone import test from keystone import config +from keystone.policy.backends import rules + import test_content_types @@ -16,11 +18,14 @@ CONF = config.CONF class RestfulTestCase(test_content_types.RestfulTestCase): def setUp(self): + rules.reset() + self.config([ test.etcdir('keystone.conf.sample'), test.testsdir('test_overrides.conf'), test.testsdir('backend_sql.conf'), test.testsdir('backend_sql_disk.conf')]) + sql_util.setup_test_database() self.load_backends() @@ -62,6 +67,9 @@ class RestfulTestCase(test_content_types.RestfulTestCase): sql_util.teardown_test_database() # need to reset the plug-ins auth.controllers.AUTH_METHODS = {} + #drop the policy rules + CONF.reset() + rules.reset() def new_ref(self): """Populates a ref with attributes common to all API entities.""" diff --git a/tests/test_v3_trust.py b/tests/test_v3_trust.py new file mode 100644 index 00000000..19d645d8 --- /dev/null +++ b/tests/test_v3_trust.py @@ -0,0 +1,286 @@ +import copy +import uuid +import test_v3 +import json + +from keystone import config +from keystone.common.sql import util as sql_util +from keystone import test + +import test_content_types + + +CONF = config.CONF +DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id + + +class TrustTestCase(test_v3.RestfulTestCase): + def setUp(self): + super(TrustTestCase, self).setUp() + self.domain = None + self.password = 'freeipa4all' + self.auth_url = '/v2.0' + self.admin_url = '/v2.0' + self.admin_url_v3 = '/v3' + self.url_template = "%(auth_url)s/%(resource)s" + self.headers = {'Content-type': 'application/json'} + self.trustor = self.create_user() + self.trustee = self.create_user() + self.role_1 = self.create_role() + self.role_2 = self.create_role() + self.grant_role_to_user(self.trustor['id'], + self.role_1['id'], + self.get_project()['id']) + self.grant_role_to_user(self.trustor['id'], + self.role_2['id'], + self.get_project()['id']) + + def v3_request(self, path, data): + r = self.request(method='POST', + path=path, + body=data, + headers=self.headers) + return r + + def get_unscoped_token_response(self, username, password): + url = self.url_template % {'auth_url': self.admin_url, + 'resource': "tokens"} + data = self.get_unscoped_auth(username=username, password=password) + r = self.restful_request(method='POST', + port=self._public_port(), + path=url, + body=data, + headers=self.headers) + if 'access' in r.body: + return r.body['access'] + raise Exception(r) + + def get_scoped_token_response(self, username, password, project_name): + url = self.url_template % {'auth_url': self.admin_url, + 'resource': "tokens"} + data = self.get_scoped_auth(username, password, project_name) + r = self.restful_request(method='POST', + port=self._public_port(), + path=url, + body=data, + headers=self.headers) + if 'access' in r.body: + return r.body['access'] + raise Exception(r) + + def get_admin_token_data(self): + if not hasattr(self, 'admin_token_response'): + self.admin_token_response = self.get_scoped_token_response( + 'admin', 'freeipa4all', 'demo') + return self.admin_token_response + + def get_admin_token_id(self): + return 'ADMIN' + + def make_admin_post_request(self, resource, data): + return self.make_post_request(resource, + data, + self.get_admin_token_id()) + + def make_post_request(self, resource, data, token_id): + headers = copy.copy(self.headers) + headers["x-auth-token"] = token_id + url = self.url_template % {'auth_url': self.admin_url_v3, + 'resource': resource} + r = self.restful_request(method='POST', + path=url, + port=self._admin_port(), + body=data, + headers=headers) + return r + + def make_v2_post_request(self, resource, data, token_id): + headers = copy.copy(self.headers) + headers["x-auth-token"] = token_id + url = self.url_template % {'auth_url': self.admin_url, + 'resource': resource} + r = self.restful_request(method='POST', + path=url, + port=self._admin_port(), + body=data, + headers=headers) + return r + + def make_put_request(self, resource, data, token_id): + headers = copy.copy(self.headers) + headers["x-auth-token"] = self.get_admin_token_id() + url = self.url_template % {'auth_url': self.admin_url_v3, + 'resource': resource} + r = self.request(method='PUT', + path=url, + port=self._admin_port(), + body=json.dumps(data), + headers=headers) + return r + + def create_domain(self): + domain = self.new_domain_ref() + resource = 'domains' + data = {'domain': domain} + r = self.make_admin_post_request(resource, data) + dom = r.body['domain'] + self.domain = dom + + def create_project(self): + project = self.new_project_ref( + domain_id=self.get_domain()['id']) + data = {'project': project} + r = self.make_admin_post_request('projects', data) + self.project = r.body['project'] + + def get_domain(self): + if not self.domain: + #once authenticate supports domains, use the following function +# self.create_domain() + self.domain = {'id': DEFAULT_DOMAIN_ID} + return self.domain + + def get_project(self): + if not hasattr(self, 'project'): + self.create_project() + return self.project + + def create_user(self): + user_id = uuid.uuid4().hex + user = {'user': {'name': uuid.uuid4().hex, + 'password': self.password, + 'enabled': True, + 'domain_id': self.get_domain()['id'], + 'project_id': self.get_project()['id']}} + r = self.make_admin_post_request('users', user) + return r.body['user'] + + def create_role(self): + ref = self.new_role_ref() + body = {'role': ref} + r = self.make_admin_post_request('roles', body) + return r.body['role'] + + def grant_role_to_user(self, user_id, role_id, project_id): + """PUT /projects/{project_id}/users/{user_id}/roles/{role_id}""" + url_template = 'projects/%(project_id)s/users'\ + '/%(user_id)s/roles/%(role_id)s' + url = url_template % {'project_id': project_id, + 'user_id': user_id, + 'role_id': role_id} + r = self.make_put_request(url, '', self.get_admin_token_id()) + return r + + def get_scoped_auth(self, username, password, project_name): + return {"auth": + {"passwordCredentials": {"username": username, + "password": password}, + "projectName": project_name}} + + def get_unscoped_auth(self, username, password): + return {"auth": + {"passwordCredentials": {"username": username, + "password": password}}} + + def create_trust(self, impersonation=True): + trustor_token = self.get_scoped_token_response( + self.trustor['name'], + self.password, + self.get_project()['name']) + trustee_token = self.get_unscoped_token_response(self.trustee['name'], + self.password) + trust_request = {'trust': + {'trustor_user_id': self.trustor['id'], + 'trustee_user_id': self.trustee['id'], + 'project_id': self.get_project()['id'], + 'impersonation': impersonation, + 'description': 'described', + 'roles': []}} + trust_response = self.make_post_request('trusts', trust_request, + trustor_token['token']['id']) + return trust_response, trustee_token + + def test_create_trust(self): + trust_response, trustee_token = self.create_trust() + trust_id = trust_response.body['trust']['id'] + self.assertEquals(trust_response.body['trust']['description'], + 'described') + auth_data = {"auth": {"token": {'id': trustee_token['token']['id']}, + "trust_id": trust_id}} + r = self.make_v2_post_request("tokens", + auth_data, + trustee_token['token']['id']) + trust_token = r.body + self.assertIsNotNone(trust_token['access']['token']['id']) + self.assertEquals(trust_token['access']['trust']['trustee_user_id'], + self.trustee['id']) + self.assertEquals(trust_token['access']['trust']['id'], trust_id) + + def test_delete_trust(self): + trust_response, trustee_token = self.create_trust() + url = self.url_template % {'auth_url': self.admin_url_v3, + 'resource': "trusts/"} + url += trust_response.body['trust']['id'] + trustor_token = self.get_scoped_token_response( + self.trustor['name'], + self.password, + self.get_project()['name']) + + headers = copy.copy(self.headers) + headers["x-auth-token"] = trustor_token['token']['id'] + response = self.request(method='DELETE', + path=url, + port=self._public_port(), + body="", + headers=headers) + self.assertIsNotNone(response) + + def test_list_trusts(self): + trustor_token = self.get_scoped_token_response( + self.trustor['name'], + self.password, + self.get_project()['name']) + + for i in range(0, 3): + trust_response, trustee_token = self.create_trust() + url = self.url_template % {'auth_url': self.admin_url_v3, + 'resource': "trusts"} + headers = copy.copy(self.headers) + headers["x-auth-token"] = self.get_admin_token_id() + trust_lists_response = self.restful_request(method='GET', + path=url, + port=self._public_port(), + body="", + headers=headers) + trusts = trust_lists_response.body['trusts'] + self.assertEqual(len(trusts), 3) + + trustee_url = url + "?trustee_user_id=" + self.trustee['id'] + headers["x-auth-token"] = trustee_token['token']['id'] + trust_lists_response = self.restful_request( + method='GET', path=trustee_url, port=self._public_port(), + body="", headers=headers) + trusts = trust_lists_response.body['trusts'] + self.assertEqual(len(trusts), 3) + + headers["x-auth-token"] = trustor_token['token']['id'] + + trust_lists_response = self.restful_request( + method='GET', path=trustee_url, port=self._public_port(), + body="", headers=headers, expected_status=403) + + trustor_url = url + "?trustor_user_id=" + self.trustor['id'] + headers["x-auth-token"] = trustor_token['token']['id'] + trust_lists_response = self.restful_request( + method='GET', + path=trustor_url, + port=self._public_port(), + body="", + headers=headers) + trusts = trust_lists_response.body['trusts'] + self.assertEqual(len(trusts), 3) + + headers["x-auth-token"] = trustee_token['token']['id'] + trust_lists_response = self.restful_request( + method='GET', path=trustor_url, port=self._public_port(), + body="", headers=headers, expected_status=403) |