summaryrefslogtreecommitdiffstats
path: root/tests/test_sql_upgrade.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_sql_upgrade.py')
-rw-r--r--tests/test_sql_upgrade.py424
1 files changed, 409 insertions, 15 deletions
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py
index ac4e5637..cf82b814 100644
--- a/tests/test_sql_upgrade.py
+++ b/tests/test_sql_upgrade.py
@@ -506,6 +506,10 @@ class SqlUpgradeTests(test.TestCase):
def test_downgrade_to_0(self):
self.upgrade(self.max_version)
+
+ if self.engine.name == 'mysql':
+ self._mysql_check_all_tables_innodb()
+
self.downgrade(0)
for table_name in ["user", "token", "role", "user_tenant_membership",
"metadata"]:
@@ -590,6 +594,22 @@ class SqlUpgradeTests(test.TestCase):
'data': json.dumps({"roles": [role2['id']]})}
session.execute(metadata_table.insert().values(role_grant))
+ # Create another user to test the case where member_role_id is already
+ # assigned.
+ user2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user2))
+
+ # Grant CONF.member_role_id to User2
+ role_grant = {'user_id': user2['id'],
+ 'tenant_id': project['id'],
+ 'data': json.dumps({"roles": [CONF.member_role_id]})}
+ session.execute(metadata_table.insert().values(role_grant))
+
session.commit()
self.upgrade(17)
@@ -597,17 +617,19 @@ class SqlUpgradeTests(test.TestCase):
user_project_metadata_table = sqlalchemy.Table(
'user_project_metadata', self.metadata, autoload=True)
- r = session.execute('select data from metadata where '
- 'user_id=:user and tenant_id=:tenant',
- {'user': user['id'], 'tenant': project['id']})
+ s = sqlalchemy.select([metadata_table.c.data]).where(
+ (metadata_table.c.user_id == user['id']) &
+ (metadata_table.c.tenant_id == project['id']))
+ r = session.execute(s)
test_project1 = json.loads(r.fetchone()['data'])
self.assertEqual(len(test_project1['roles']), 1)
self.assertIn(role['id'], test_project1['roles'])
# Test user in project2 has role2
- r = session.execute('select data from metadata where '
- 'user_id=:user and tenant_id=:tenant',
- {'user': user['id'], 'tenant': project2['id']})
+ s = sqlalchemy.select([metadata_table.c.data]).where(
+ (metadata_table.c.user_id == user['id']) &
+ (metadata_table.c.tenant_id == project2['id']))
+ r = session.execute(s)
test_project2 = json.loads(r.fetchone()['data'])
self.assertEqual(len(test_project2['roles']), 1)
self.assertIn(role2['id'], test_project2['roles'])
@@ -615,9 +637,10 @@ class SqlUpgradeTests(test.TestCase):
# Test for user in project has role in user_project_metadata
# Migration 17 does not properly migrate this data, so this should
# be None.
- r = session.execute('select data from user_project_metadata where '
- 'user_id=:user and project_id=:project',
- {'user': user['id'], 'project': project['id']})
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
self.assertIsNone(r.fetchone())
# Create a conflicting user-project in user_project_metadata with
@@ -628,6 +651,14 @@ class SqlUpgradeTests(test.TestCase):
'data': data}
cmd = user_project_metadata_table.insert().values(role_grant)
self.engine.execute(cmd)
+
+ # Create another conflicting user-project for User2
+ data = json.dumps({"roles": [role2['id']]})
+ role_grant = {'user_id': user2['id'],
+ 'project_id': project['id'],
+ 'data': data}
+ cmd = user_project_metadata_table.insert().values(role_grant)
+ self.engine.execute(cmd)
# End Scaffolding
session.commit()
@@ -638,9 +669,10 @@ class SqlUpgradeTests(test.TestCase):
# The user-project pairs should have all roles from the previous
# metadata table in addition to any roles currently in
# user_project_metadata
- r = session.execute('select data from user_project_metadata where '
- 'user_id=:user and project_id=:project',
- {'user': user['id'], 'project': project['id']})
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
role_ids = json.loads(r.fetchone()['data'])['roles']
self.assertEqual(len(role_ids), 3)
self.assertIn(CONF.member_role_id, role_ids)
@@ -649,9 +681,10 @@ class SqlUpgradeTests(test.TestCase):
# pairs that only existed in old metadata table should be in
# user_project_metadata
- r = session.execute('select data from user_project_metadata where '
- 'user_id=:user and project_id=:project',
- {'user': user['id'], 'project': project2['id']})
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
role_ids = json.loads(r.fetchone()['data'])['roles']
self.assertEqual(len(role_ids), 2)
self.assertIn(CONF.member_role_id, role_ids)
@@ -815,6 +848,344 @@ class SqlUpgradeTests(test.TestCase):
self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id)
self.assertEqual(ref.extra, '{}')
+ def test_group_project_FK_fixup(self):
+ # To create test data we must start before we broke in the
+ # group_project_metadata table in 015.
+ self.upgrade(14)
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create two Tenants
+ tenant = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant))
+
+ tenant1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant1))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(2):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant1['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ # Now upgrade and fix up the FKs
+ self.upgrade(28)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant1['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ self.downgrade(27)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ def test_assignment_metadata_migration(self):
+ self.upgrade(28)
+ # Scaffolding
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ project_table = sqlalchemy.Table(
+ 'project', self.metadata, autoload=True)
+ user_project_metadata_table = sqlalchemy.Table(
+ 'user_project_metadata', self.metadata, autoload=True)
+ user_domain_metadata_table = sqlalchemy.Table(
+ 'user_domain_metadata', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+ group_domain_metadata_table = sqlalchemy.Table(
+ 'group_domain_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create anther Domain
+ domain2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain2))
+
+ # Create a Project
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project))
+
+ # Create another Project
+ project2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project2))
+
+ # Create a User
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(7):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'user_id': user['id'],
+ 'project_id': project['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'user_id': user['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on different Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[2]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to User on Domain
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps({'roles': [role_list[3]['id']]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on Domain
+ role_grant = {'group_id': group['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps(
+ {'roles': [role_list[4]['id']],
+ 'other': 'somedata'})}
+ session.execute(
+ group_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ self.upgrade(29)
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[0]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[1]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[2]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[3]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[4]['id']}, data['roles'])
+ self.assertIn('other', data)
+
+ # Now add an entry that has one regular and one inherited role
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain2['id'],
+ 'data': json.dumps(
+ {'roles': [{'id': role_list[5]['id']},
+ {'id': role_list[6]['id'],
+ 'inherited_to': 'projects'}]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+ self.downgrade(28)
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[2]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[3]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[4]['id'], data['roles'])
+ self.assertIn('other', data)
+
+ # For user-domain2, where we had one regular and one inherited role,
+ # only the direct role should remain, the inherited role should
+ # have been deleted during the downgrade
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[5]['id'], data['roles'])
+
+ def test_drop_credential_constraint(self):
+ ec2_credential = {
+ 'id': '100',
+ 'user_id': 'foo',
+ 'project_id': 'bar',
+ 'type': 'ec2',
+ 'blob': json.dumps({
+ "access": "12345",
+ "secret": "12345"
+ })
+ }
+ user = {
+ 'id': 'foo',
+ 'name': 'FOO',
+ 'password': 'foo2',
+ 'enabled': True,
+ 'email': 'foo@bar.com',
+ 'extra': json.dumps({'enabled': True})
+ }
+ tenant = {
+ 'id': 'bar',
+ 'name': 'BAR',
+ 'description': 'description',
+ 'enabled': True,
+ 'extra': json.dumps({'enabled': True})
+ }
+ session = self.Session()
+ self.upgrade(7)
+ self.insert_dict(session, 'user', user)
+ self.insert_dict(session, 'tenant', tenant)
+ self.insert_dict(session, 'credential', ec2_credential)
+ session.commit()
+ self.upgrade(30)
+ cred_table = sqlalchemy.Table('credential',
+ self.metadata,
+ autoload=True)
+ cred = session.query(cred_table).filter("id='100'").one()
+ self.assertEqual(cred.user_id,
+ ec2_credential['user_id'])
+
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user
@@ -956,3 +1327,26 @@ class SqlUpgradeTests(test.TestCase):
for ver, change in changeset:
self.schema.runchange(ver, change, changeset.step)
self.assertEqual(self.schema.version, version)
+
+ def _mysql_check_all_tables_innodb(self):
+ database = self.engine.url.database
+
+ connection = self.engine.connect()
+ # sanity check
+ total = connection.execute("SELECT count(*) "
+ "from information_schema.TABLES "
+ "where TABLE_SCHEMA='%(database)s'" %
+ locals())
+ self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
+
+ noninnodb = connection.execute("SELECT table_name "
+ "from information_schema.TABLES "
+ "where TABLE_SCHEMA='%(database)s' "
+ "and ENGINE!='InnoDB' "
+ "and TABLE_NAME!='migrate_version'" %
+ locals())
+ names = [x[0] for x in noninnodb]
+ self.assertEqual(names, [],
+ "Non-InnoDB tables exist")
+
+ connection.close()