summaryrefslogtreecommitdiffstats
path: root/tests/test_sql_upgrade.py
diff options
context:
space:
mode:
authorHenry Nash <henryn@linux.vnet.ibm.com>2013-07-05 06:04:25 +0100
committerHenry Nash <henryn@linux.vnet.ibm.com>2013-07-17 00:37:32 +0100
commit7f4891ddc3da7457df09c0cc8bbfe8a888063feb (patch)
tree89a26f688965566ec35eb35192d34a71ab513da4 /tests/test_sql_upgrade.py
parent3a56c8a68d0f033266f98963261a6d724e506966 (diff)
downloadkeystone-7f4891ddc3da7457df09c0cc8bbfe8a888063feb.tar.gz
keystone-7f4891ddc3da7457df09c0cc8bbfe8a888063feb.tar.xz
keystone-7f4891ddc3da7457df09c0cc8bbfe8a888063feb.zip
Implement role assignment inheritance (OS-INHERIT extension)
This extension allows for project roles to be optionally inherited from the owning domain. The v3 grant APIs are extended to take an inherited_to_projects flag. The GET role_assignments API will also include these roles in its response, either showing them as inherited roles assigned to the domain or, if the 'effective' query parameter is set, will interpret the inheritance and reflect those role assignments on the projects. The inherited_to_projects flag is encoded in the role list in the metadata of the relevant entries in the grant tables. The 'roles' key in the metadata is now a list of dicts, as opposed to a simple list, where each dict is either {'id': role_id} for a regular role, or {'id': role_id, 'inherited_to': 'projects'} for an inherited role Remember that a previous patch had rationalized the way metadata is handled so that its structure is entirely hidden within the driver layer. The extension can be enabled/disabled via a config setting. Limitations: - The extension is not yet discoverable via url, this will be added as a separate patch when the v3/extensions work is complete. A separate issue has been discovered with the fact that the v2 calls of 'get_projects_for_user()' and 'list_user_projects()' should be rationalized and also honor both group (and inherited) role assignments. This is being raised as a separate bug. DocImpact Implements bp inherited-domain-roles Change-Id: I35b57ce0df668f12462e96b3467cef0239594e97
Diffstat (limited to 'tests/test_sql_upgrade.py')
-rw-r--r--tests/test_sql_upgrade.py298
1 files changed, 298 insertions, 0 deletions
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py
index e7e34b4b..64bf53c8 100644
--- a/tests/test_sql_upgrade.py
+++ b/tests/test_sql_upgrade.py
@@ -824,6 +824,304 @@ class SqlUpgradeTests(test.TestCase):
self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id)
self.assertEqual(ref.extra, '{}')
+ def test_group_project_FK_fixup(self):
+ # To create test data we must start before we broke in the
+ # group_project_metadata table in 015.
+ self.upgrade(14)
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create two Tenants
+ tenant = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant))
+
+ tenant1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant1))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(2):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant1['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ # Now upgrade and fix up the FKs
+ self.upgrade(28)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant1['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ self.downgrade(27)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ def test_assignment_metadata_migration(self):
+ self.upgrade(28)
+ # Scaffolding
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ project_table = sqlalchemy.Table(
+ 'project', self.metadata, autoload=True)
+ user_project_metadata_table = sqlalchemy.Table(
+ 'user_project_metadata', self.metadata, autoload=True)
+ user_domain_metadata_table = sqlalchemy.Table(
+ 'user_domain_metadata', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+ group_domain_metadata_table = sqlalchemy.Table(
+ 'group_domain_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create anther Domain
+ domain2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain2))
+
+ # Create a Project
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project))
+
+ # Create another Project
+ project2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project2))
+
+ # Create a User
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(7):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'user_id': user['id'],
+ 'project_id': project['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'user_id': user['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on different Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[2]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to User on Domain
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps({'roles': [role_list[3]['id']]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on Domain
+ role_grant = {'group_id': group['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps(
+ {'roles': [role_list[4]['id']],
+ 'other': 'somedata'})}
+ session.execute(
+ group_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ self.upgrade(29)
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[0]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[1]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[2]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[3]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[4]['id']}, data['roles'])
+ self.assertIn('other', data)
+
+ # Now add an entry that has one regular and one inherited role
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain2['id'],
+ 'data': json.dumps(
+ {'roles': [{'id': role_list[5]['id']},
+ {'id': role_list[6]['id'],
+ 'inherited_to': 'projects'}]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+ self.downgrade(28)
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[2]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[3]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[4]['id'], data['roles'])
+ self.assertIn('other', data)
+
+ # For user-domain2, where we had one regular and one inherited role,
+ # only the direct role should remain, the inherited role should
+ # have been deleted during the downgrade
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[5]['id'], data['roles'])
+
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user