diff options
Diffstat (limited to 'tests/test_sql_upgrade.py')
-rw-r--r-- | tests/test_sql_upgrade.py | 424 |
1 files changed, 409 insertions, 15 deletions
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py index ac4e5637..cf82b814 100644 --- a/tests/test_sql_upgrade.py +++ b/tests/test_sql_upgrade.py @@ -506,6 +506,10 @@ class SqlUpgradeTests(test.TestCase): def test_downgrade_to_0(self): self.upgrade(self.max_version) + + if self.engine.name == 'mysql': + self._mysql_check_all_tables_innodb() + self.downgrade(0) for table_name in ["user", "token", "role", "user_tenant_membership", "metadata"]: @@ -590,6 +594,22 @@ class SqlUpgradeTests(test.TestCase): 'data': json.dumps({"roles": [role2['id']]})} session.execute(metadata_table.insert().values(role_grant)) + # Create another user to test the case where member_role_id is already + # assigned. + user2 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'password': uuid.uuid4().hex, + 'enabled': True, + 'extra': json.dumps({})} + session.execute(user_table.insert().values(user2)) + + # Grant CONF.member_role_id to User2 + role_grant = {'user_id': user2['id'], + 'tenant_id': project['id'], + 'data': json.dumps({"roles": [CONF.member_role_id]})} + session.execute(metadata_table.insert().values(role_grant)) + session.commit() self.upgrade(17) @@ -597,17 +617,19 @@ class SqlUpgradeTests(test.TestCase): user_project_metadata_table = sqlalchemy.Table( 'user_project_metadata', self.metadata, autoload=True) - r = session.execute('select data from metadata where ' - 'user_id=:user and tenant_id=:tenant', - {'user': user['id'], 'tenant': project['id']}) + s = sqlalchemy.select([metadata_table.c.data]).where( + (metadata_table.c.user_id == user['id']) & + (metadata_table.c.tenant_id == project['id'])) + r = session.execute(s) test_project1 = json.loads(r.fetchone()['data']) self.assertEqual(len(test_project1['roles']), 1) self.assertIn(role['id'], test_project1['roles']) # Test user in project2 has role2 - r = session.execute('select data from metadata where ' - 'user_id=:user and tenant_id=:tenant', - {'user': user['id'], 'tenant': project2['id']}) + s = sqlalchemy.select([metadata_table.c.data]).where( + (metadata_table.c.user_id == user['id']) & + (metadata_table.c.tenant_id == project2['id'])) + r = session.execute(s) test_project2 = json.loads(r.fetchone()['data']) self.assertEqual(len(test_project2['roles']), 1) self.assertIn(role2['id'], test_project2['roles']) @@ -615,9 +637,10 @@ class SqlUpgradeTests(test.TestCase): # Test for user in project has role in user_project_metadata # Migration 17 does not properly migrate this data, so this should # be None. - r = session.execute('select data from user_project_metadata where ' - 'user_id=:user and project_id=:project', - {'user': user['id'], 'project': project['id']}) + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project['id'])) + r = session.execute(s) self.assertIsNone(r.fetchone()) # Create a conflicting user-project in user_project_metadata with @@ -628,6 +651,14 @@ class SqlUpgradeTests(test.TestCase): 'data': data} cmd = user_project_metadata_table.insert().values(role_grant) self.engine.execute(cmd) + + # Create another conflicting user-project for User2 + data = json.dumps({"roles": [role2['id']]}) + role_grant = {'user_id': user2['id'], + 'project_id': project['id'], + 'data': data} + cmd = user_project_metadata_table.insert().values(role_grant) + self.engine.execute(cmd) # End Scaffolding session.commit() @@ -638,9 +669,10 @@ class SqlUpgradeTests(test.TestCase): # The user-project pairs should have all roles from the previous # metadata table in addition to any roles currently in # user_project_metadata - r = session.execute('select data from user_project_metadata where ' - 'user_id=:user and project_id=:project', - {'user': user['id'], 'project': project['id']}) + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project['id'])) + r = session.execute(s) role_ids = json.loads(r.fetchone()['data'])['roles'] self.assertEqual(len(role_ids), 3) self.assertIn(CONF.member_role_id, role_ids) @@ -649,9 +681,10 @@ class SqlUpgradeTests(test.TestCase): # pairs that only existed in old metadata table should be in # user_project_metadata - r = session.execute('select data from user_project_metadata where ' - 'user_id=:user and project_id=:project', - {'user': user['id'], 'project': project2['id']}) + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) role_ids = json.loads(r.fetchone()['data'])['roles'] self.assertEqual(len(role_ids), 2) self.assertIn(CONF.member_role_id, role_ids) @@ -815,6 +848,344 @@ class SqlUpgradeTests(test.TestCase): self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id) self.assertEqual(ref.extra, '{}') + def test_group_project_FK_fixup(self): + # To create test data we must start before we broke in the + # group_project_metadata table in 015. + self.upgrade(14) + session = self.Session() + + domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) + group_table = sqlalchemy.Table('group', self.metadata, autoload=True) + tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True) + role_table = sqlalchemy.Table('role', self.metadata, autoload=True) + group_project_metadata_table = sqlalchemy.Table( + 'group_project_metadata', self.metadata, autoload=True) + + # Create a Domain + domain = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + session.execute(domain_table.insert().values(domain)) + + # Create two Tenants + tenant = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'extra': "{}"} + session.execute(tenant_table.insert().values(tenant)) + + tenant1 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'extra': "{}"} + session.execute(tenant_table.insert().values(tenant1)) + + # Create a Group + group = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': json.dumps({})} + session.execute(group_table.insert().values(group)) + + # Create roles + role_list = [] + for _ in range(2): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + session.execute(role_table.insert().values(role)) + role_list.append(role) + + # Grant Role to User on Project + role_grant = {'group_id': group['id'], + 'project_id': tenant['id'], + 'data': json.dumps({'roles': [role_list[0]['id']]})} + session.execute( + group_project_metadata_table.insert().values(role_grant)) + + role_grant = {'group_id': group['id'], + 'project_id': tenant1['id'], + 'data': json.dumps({'roles': [role_list[1]['id']]})} + session.execute( + group_project_metadata_table.insert().values(role_grant)) + + session.commit() + + # Now upgrade and fix up the FKs + self.upgrade(28) + self.assertTableExists('group_project_metadata') + self.assertTableExists('project') + self.assertTableDoesNotExist('tenant') + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == tenant['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[0]['id'], data['roles']) + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == tenant1['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[1]['id'], data['roles']) + + self.downgrade(27) + self.assertTableExists('group_project_metadata') + self.assertTableExists('project') + self.assertTableDoesNotExist('tenant') + + def test_assignment_metadata_migration(self): + self.upgrade(28) + # Scaffolding + session = self.Session() + + domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) + user_table = sqlalchemy.Table('user', self.metadata, autoload=True) + group_table = sqlalchemy.Table('group', self.metadata, autoload=True) + role_table = sqlalchemy.Table('role', self.metadata, autoload=True) + project_table = sqlalchemy.Table( + 'project', self.metadata, autoload=True) + user_project_metadata_table = sqlalchemy.Table( + 'user_project_metadata', self.metadata, autoload=True) + user_domain_metadata_table = sqlalchemy.Table( + 'user_domain_metadata', self.metadata, autoload=True) + group_project_metadata_table = sqlalchemy.Table( + 'group_project_metadata', self.metadata, autoload=True) + group_domain_metadata_table = sqlalchemy.Table( + 'group_domain_metadata', self.metadata, autoload=True) + + # Create a Domain + domain = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + session.execute(domain_table.insert().values(domain)) + + # Create anther Domain + domain2 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + session.execute(domain_table.insert().values(domain2)) + + # Create a Project + project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': "{}"} + session.execute(project_table.insert().values(project)) + + # Create another Project + project2 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': "{}"} + session.execute(project_table.insert().values(project2)) + + # Create a User + user = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'password': uuid.uuid4().hex, + 'enabled': True, + 'extra': json.dumps({})} + session.execute(user_table.insert().values(user)) + + # Create a Group + group = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'extra': json.dumps({})} + session.execute(group_table.insert().values(group)) + + # Create roles + role_list = [] + for _ in range(7): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + session.execute(role_table.insert().values(role)) + role_list.append(role) + + # Grant Role to User on Project + role_grant = {'user_id': user['id'], + 'project_id': project['id'], + 'data': json.dumps({'roles': [role_list[0]['id']]})} + session.execute( + user_project_metadata_table.insert().values(role_grant)) + + role_grant = {'user_id': user['id'], + 'project_id': project2['id'], + 'data': json.dumps({'roles': [role_list[1]['id']]})} + session.execute( + user_project_metadata_table.insert().values(role_grant)) + + # Grant Role to Group on different Project + role_grant = {'group_id': group['id'], + 'project_id': project2['id'], + 'data': json.dumps({'roles': [role_list[2]['id']]})} + session.execute( + group_project_metadata_table.insert().values(role_grant)) + + # Grant Role to User on Domain + role_grant = {'user_id': user['id'], + 'domain_id': domain['id'], + 'data': json.dumps({'roles': [role_list[3]['id']]})} + session.execute(user_domain_metadata_table.insert().values(role_grant)) + + # Grant Role to Group on Domain + role_grant = {'group_id': group['id'], + 'domain_id': domain['id'], + 'data': json.dumps( + {'roles': [role_list[4]['id']], + 'other': 'somedata'})} + session.execute( + group_domain_metadata_table.insert().values(role_grant)) + + session.commit() + + self.upgrade(29) + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[0]['id']}, data['roles']) + + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[1]['id']}, data['roles']) + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[2]['id']}, data['roles']) + + s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( + (user_domain_metadata_table.c.user_id == user['id']) & + (user_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[3]['id']}, data['roles']) + + s = sqlalchemy.select([group_domain_metadata_table.c.data]).where( + (group_domain_metadata_table.c.group_id == group['id']) & + (group_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn({'id': role_list[4]['id']}, data['roles']) + self.assertIn('other', data) + + # Now add an entry that has one regular and one inherited role + role_grant = {'user_id': user['id'], + 'domain_id': domain2['id'], + 'data': json.dumps( + {'roles': [{'id': role_list[5]['id']}, + {'id': role_list[6]['id'], + 'inherited_to': 'projects'}]})} + session.execute(user_domain_metadata_table.insert().values(role_grant)) + + session.commit() + self.downgrade(28) + + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[0]['id'], data['roles']) + + s = sqlalchemy.select([user_project_metadata_table.c.data]).where( + (user_project_metadata_table.c.user_id == user['id']) & + (user_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[1]['id'], data['roles']) + + s = sqlalchemy.select([group_project_metadata_table.c.data]).where( + (group_project_metadata_table.c.group_id == group['id']) & + (group_project_metadata_table.c.project_id == project2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[2]['id'], data['roles']) + + s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( + (user_domain_metadata_table.c.user_id == user['id']) & + (user_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[3]['id'], data['roles']) + + s = sqlalchemy.select([group_domain_metadata_table.c.data]).where( + (group_domain_metadata_table.c.group_id == group['id']) & + (group_domain_metadata_table.c.domain_id == domain['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[4]['id'], data['roles']) + self.assertIn('other', data) + + # For user-domain2, where we had one regular and one inherited role, + # only the direct role should remain, the inherited role should + # have been deleted during the downgrade + s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( + (user_domain_metadata_table.c.user_id == user['id']) & + (user_domain_metadata_table.c.domain_id == domain2['id'])) + r = session.execute(s) + data = json.loads(r.fetchone()['data']) + self.assertEqual(len(data['roles']), 1) + self.assertIn(role_list[5]['id'], data['roles']) + + def test_drop_credential_constraint(self): + ec2_credential = { + 'id': '100', + 'user_id': 'foo', + 'project_id': 'bar', + 'type': 'ec2', + 'blob': json.dumps({ + "access": "12345", + "secret": "12345" + }) + } + user = { + 'id': 'foo', + 'name': 'FOO', + 'password': 'foo2', + 'enabled': True, + 'email': 'foo@bar.com', + 'extra': json.dumps({'enabled': True}) + } + tenant = { + 'id': 'bar', + 'name': 'BAR', + 'description': 'description', + 'enabled': True, + 'extra': json.dumps({'enabled': True}) + } + session = self.Session() + self.upgrade(7) + self.insert_dict(session, 'user', user) + self.insert_dict(session, 'tenant', tenant) + self.insert_dict(session, 'credential', ec2_credential) + session.commit() + self.upgrade(30) + cred_table = sqlalchemy.Table('credential', + self.metadata, + autoload=True) + cred = session.query(cred_table).filter("id='100'").one() + self.assertEqual(cred.user_id, + ec2_credential['user_id']) + def populate_user_table(self, with_pass_enab=False, with_pass_enab_domain=False): # Populate the appropriate fields in the user @@ -956,3 +1327,26 @@ class SqlUpgradeTests(test.TestCase): for ver, change in changeset: self.schema.runchange(ver, change, changeset.step) self.assertEqual(self.schema.version, version) + + def _mysql_check_all_tables_innodb(self): + database = self.engine.url.database + + connection = self.engine.connect() + # sanity check + total = connection.execute("SELECT count(*) " + "from information_schema.TABLES " + "where TABLE_SCHEMA='%(database)s'" % + locals()) + self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?") + + noninnodb = connection.execute("SELECT table_name " + "from information_schema.TABLES " + "where TABLE_SCHEMA='%(database)s' " + "and ENGINE!='InnoDB' " + "and TABLE_NAME!='migrate_version'" % + locals()) + names = [x[0] for x in noninnodb] + self.assertEqual(names, [], + "Non-InnoDB tables exist") + + connection.close() |