# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 OpenStack LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """ To run these tests against a live database: 1. Modify the file `tests/backend_sql.conf` to use the connection for your live database 2. Set up a blank, live database. 3. run the tests using ./run_tests.sh -N test_sql_upgrade WARNING:: Your database will be wiped. Do not do this against a Database with valuable data as all data will be lost. """ import copy import json import uuid from migrate.versioning import api as versioning_api import sqlalchemy from keystone import test from keystone.common import sql from keystone.common.sql import migration from keystone import config import default_fixtures CONF = config.CONF DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id class SqlMigrateBase(test.TestCase): def initialize_sql(self): self.metadata = sqlalchemy.MetaData() self.metadata.bind = self.engine _config_file_list = [test.etcdir('keystone.conf.sample'), test.testsdir('test_overrides.conf'), test.testsdir('backend_sql.conf')] #override this to specify the complete list of configuration files def config_files(self): return self._config_file_list def repo_package(self): return None def setUp(self): super(SqlMigrateBase, self).setUp() self.config(self.config_files()) self.base = sql.Base() # create and share a single sqlalchemy engine for testing self.engine = self.base.get_engine(allow_global_engine=False) self.Session = self.base.get_sessionmaker(engine=self.engine, autocommit=False) self.initialize_sql() self.repo_path = migration.find_migrate_repo(self.repo_package()) self.schema = versioning_api.ControlledSchema.create( self.engine, self.repo_path, 0) # auto-detect the highest available schema version in the migrate_repo self.max_version = self.schema.repository.version().version def tearDown(self): sqlalchemy.orm.session.Session.close_all() table = sqlalchemy.Table("migrate_version", self.metadata, autoload=True) self.downgrade(0) table.drop(self.engine, checkfirst=True) super(SqlMigrateBase, self).tearDown() def select_table(self, name): table = sqlalchemy.Table(name, self.metadata, autoload=True) s = sqlalchemy.select([table]) return s def assertTableExists(self, table_name): try: self.select_table(table_name) except sqlalchemy.exc.NoSuchTableError: raise AssertionError('Table "%s" does not exist' % table_name) def assertTableDoesNotExist(self, table_name): """Asserts that a given table exists cannot be selected by name.""" # Switch to a different metadata otherwise you might still # detect renamed or dropped tables try: temp_metadata = sqlalchemy.MetaData() temp_metadata.bind = self.engine sqlalchemy.Table(table_name, temp_metadata, autoload=True) except sqlalchemy.exc.NoSuchTableError: pass else: raise AssertionError('Table "%s" already exists' % table_name) def upgrade(self, *args, **kwargs): self._migrate(*args, **kwargs) def downgrade(self, *args, **kwargs): self._migrate(*args, downgrade=True, **kwargs) def _migrate(self, version, repository=None, downgrade=False, current_schema=None): repository = repository or self.repo_path err = '' version = versioning_api._migrate_version(self.schema, version, not downgrade, err) if not current_schema: current_schema = self.schema changeset = current_schema.changeset(version) for ver, change in changeset: self.schema.runchange(ver, change, changeset.step) self.assertEqual(self.schema.version, version) def assertTableColumns(self, table_name, expected_cols): """Asserts that the table contains the expected set of columns.""" self.initialize_sql() table = self.select_table(table_name) actual_cols = [col.name for col in table.columns] self.assertEqual(expected_cols, actual_cols, '%s table' % table_name) class SqlUpgradeTests(SqlMigrateBase): def test_blank_db_to_start(self): self.assertTableDoesNotExist('user') def test_start_version_0(self): version = migration.db_version() self.assertEqual(version, 0, "DB is at version 0") def test_two_steps_forward_one_step_back(self): """You should be able to cleanly undo and re-apply all upgrades. Upgrades are run in the following order:: 0 -> 1 -> 0 -> 1 -> 2 -> 1 -> 2 -> 3 -> 2 -> 3 ... ^---------^ ^---------^ ^---------^ """ for x in range(1, self.max_version + 1): self.upgrade(x) self.downgrade(x - 1) self.upgrade(x) def test_upgrade_add_initial_tables(self): self.upgrade(1) self.assertTableColumns("user", ["id", "name", "extra"]) self.assertTableColumns("tenant", ["id", "name", "extra"]) self.assertTableColumns("role", ["id", "name"]) self.assertTableColumns("user_tenant_membership", ["user_id", "tenant_id"]) self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"]) self.populate_user_table() def test_upgrade_add_policy(self): self.upgrade(5) self.assertTableDoesNotExist('policy') self.upgrade(6) self.assertTableExists('policy') self.assertTableColumns('policy', ['id', 'type', 'blob', 'extra']) def test_upgrade_normalize_identity(self): self.upgrade(8) self.populate_user_table() self.populate_tenant_table() self.upgrade(10) self.assertTableColumns("user", ["id", "name", "extra", "password", "enabled"]) self.assertTableColumns("tenant", ["id", "name", "extra", "description", "enabled"]) self.assertTableColumns("role", ["id", "name", "extra"]) self.assertTableColumns("user_tenant_membership", ["user_id", "tenant_id"]) self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"]) session = self.Session() user_table = sqlalchemy.Table("user", self.metadata, autoload=True) a_user = session.query(user_table).filter("id='foo'").one() self.assertTrue(a_user.enabled) a_user = session.query(user_table).filter("id='badguy'").one() self.assertFalse(a_user.enabled) tenant_table = sqlalchemy.Table("tenant", self.metadata, autoload=True) a_tenant = session.query(tenant_table).filter("id='baz'").one() self.assertEqual(a_tenant.description, 'description') session.commit() session.close() def test_normalized_enabled_states(self): self.upgrade(8) users = { 'bool_enabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({'enabled': True})}, 'bool_disabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({'enabled': False})}, 'str_enabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({'enabled': 'True'})}, 'str_disabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({'enabled': 'False'})}, 'int_enabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({'enabled': 1})}, 'int_disabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({'enabled': 0})}, 'null_enabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({'enabled': None})}, 'unset_enabled_user': { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'extra': json.dumps({})}} session = self.Session() for user in users.values(): self.insert_dict(session, 'user', user) session.commit() self.upgrade(10) user_table = sqlalchemy.Table('user', self.metadata, autoload=True) q = session.query(user_table, 'enabled') user = q.filter_by(id=users['bool_enabled_user']['id']).one() self.assertTrue(user.enabled) user = q.filter_by(id=users['bool_disabled_user']['id']).one() self.assertFalse(user.enabled) user = q.filter_by(id=users['str_enabled_user']['id']).one() self.assertTrue(user.enabled) user = q.filter_by(id=users['str_disabled_user']['id']).one() self.assertFalse(user.enabled) user = q.filter_by(id=users['int_enabled_user']['id']).one() self.assertTrue(user.enabled) user = q.filter_by(id=users['int_disabled_user']['id']).one() self.assertFalse(user.enabled) user = q.filter_by(id=users['null_enabled_user']['id']).one() self.assertTrue(user.enabled) user = q.filter_by(id=users['unset_enabled_user']['id']).one() self.assertTrue(user.enabled) def test_downgrade_10_to_8(self): self.upgrade(10) self.populate_user_table(with_pass_enab=True) self.populate_tenant_table(with_desc_enab=True) self.downgrade(8) self.assertTableColumns('user', ['id', 'name', 'extra']) self.assertTableColumns('tenant', ['id', 'name', 'extra']) session = self.Session() user_table = sqlalchemy.Table("user", self.metadata, autoload=True) a_user = session.query(user_table).filter("id='badguy'").one() self.assertEqual(a_user.name, default_fixtures.USERS[2]['name']) tenant_table = sqlalchemy.Table("tenant", self.metadata, autoload=True) a_tenant = session.query(tenant_table).filter("id='baz'").one() self.assertEqual(a_tenant.name, default_fixtures.TENANTS[1]['name']) session.commit() session.close() def test_upgrade_endpoints(self): self.upgrade(10) service_extra = { 'name': uuid.uuid4().hex, } service = { 'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex, 'extra': json.dumps(service_extra), } endpoint_extra = { 'publicurl': uuid.uuid4().hex, 'internalurl': uuid.uuid4().hex, 'adminurl': uuid.uuid4().hex, } endpoint = { 'id': uuid.uuid4().hex, 'region': uuid.uuid4().hex, 'service_id': service['id'], 'extra': json.dumps(endpoint_extra), } session = self.Session() self.insert_dict(session, 'service', service) self.insert_dict(session, 'endpoint', endpoint) session.commit() session.close() self.upgrade(13) self.assertTableColumns( 'service', ['id', 'type', 'extra']) self.assertTableColumns( 'endpoint', ['id', 'legacy_endpoint_id', 'interface', 'region', 'service_id', 'url', 'extra']) endpoint_table = sqlalchemy.Table( 'endpoint', self.metadata, autoload=True) session = self.Session() self.assertEqual(session.query(endpoint_table).count(), 3) for interface in ['public', 'internal', 'admin']: q = session.query(endpoint_table) q = q.filter_by(legacy_endpoint_id=endpoint['id']) q = q.filter_by(interface=interface) ref = q.one() self.assertNotEqual(ref.id, endpoint['id']) self.assertEqual(ref.legacy_endpoint_id, endpoint['id']) self.assertEqual(ref.interface, interface) self.assertEqual(ref.region, endpoint['region']) self.assertEqual(ref.service_id, endpoint['service_id']) self.assertEqual(ref.url, endpoint_extra['%surl' % interface]) self.assertEqual(ref.extra, '{}') session.commit() session.close() def assertTenantTables(self): self.assertTableExists('tenant') self.assertTableExists('user_tenant_membership') self.assertTableDoesNotExist('project') self.assertTableDoesNotExist('user_project_membership') def assertProjectTables(self): self.assertTableExists('project') self.assertTableExists('user_project_membership') self.assertTableDoesNotExist('tenant') self.assertTableDoesNotExist('user_tenant_membership') def test_upgrade_tenant_to_project(self): self.upgrade(14) self.assertTenantTables() self.upgrade(15) self.assertProjectTables() def test_downgrade_project_to_tenant(self): # TODO(henry-nash): Debug why we need to re-load the tenant # or user_tenant_membership ahead of upgrading to project # in order for the assertProjectTables to work on sqlite # (MySQL is fine without it) self.upgrade(14) self.assertTenantTables() self.upgrade(15) self.assertProjectTables() self.downgrade(14) self.assertTenantTables() def test_upgrade_add_group_tables(self): self.upgrade(13) self.upgrade(14) self.assertTableExists('group') self.assertTableExists('group_project_metadata') self.assertTableExists('group_domain_metadata') self.assertTableExists('user_group_membership') def test_upgrade_14_to_16(self): self.upgrade(14) self.populate_user_table(with_pass_enab=True) self.populate_tenant_table(with_desc_enab=True) self.upgrade(16) self.assertTableColumns("user", ["id", "name", "extra", "password", "enabled", "domain_id"]) session = self.Session() user_table = sqlalchemy.Table("user", self.metadata, autoload=True) a_user = session.query(user_table).filter("id='foo'").one() self.assertTrue(a_user.enabled) self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID) a_user = session.query(user_table).filter("id='badguy'").one() self.assertEqual(a_user.name, default_fixtures.USERS[2]['name']) self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID) project_table = sqlalchemy.Table("project", self.metadata, autoload=True) a_project = session.query(project_table).filter("id='baz'").one() self.assertEqual(a_project.description, default_fixtures.TENANTS[1]['description']) self.assertEqual(a_project.domain_id, DEFAULT_DOMAIN_ID) session.commit() session.close() self.check_uniqueness_constraints() def test_downgrade_16_to_14(self): self.upgrade(16) self.populate_user_table(with_pass_enab_domain=True) self.populate_tenant_table(with_desc_enab_domain=True) self.downgrade(14) self.assertTableColumns("user", ["id", "name", "extra", "password", "enabled"]) session = self.Session() user_table = sqlalchemy.Table("user", self.metadata, autoload=True) a_user = session.query(user_table).filter("id='foo'").one() self.assertTrue(a_user.enabled) a_user = session.query(user_table).filter("id='badguy'").one() self.assertEqual(a_user.name, default_fixtures.USERS[2]['name']) tenant_table = sqlalchemy.Table("tenant", self.metadata, autoload=True) a_tenant = session.query(tenant_table).filter("id='baz'").one() self.assertEqual(a_tenant.description, default_fixtures.TENANTS[1]['description']) session.commit() session.close() def test_downgrade_remove_group_tables(self): self.upgrade(14) self.downgrade(13) self.assertTableDoesNotExist('group') self.assertTableDoesNotExist('group_project_metadata') self.assertTableDoesNotExist('group_domain_metadata') self.assertTableDoesNotExist('user_group_membership') def test_downgrade_endpoints(self): self.upgrade(13) service_extra = { 'name': uuid.uuid4().hex, } service = { 'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex, 'extra': json.dumps(service_extra), } common_endpoint_attrs = { 'legacy_endpoint_id': uuid.uuid4().hex, 'region': uuid.uuid4().hex, 'service_id': service['id'], 'extra': json.dumps({}), } endpoints = { 'public': { 'id': uuid.uuid4().hex, 'interface': 'public', 'url': uuid.uuid4().hex, }, 'internal': { 'id': uuid.uuid4().hex, 'interface': 'internal', 'url': uuid.uuid4().hex, }, 'admin': { 'id': uuid.uuid4().hex, 'interface': 'admin', 'url': uuid.uuid4().hex, }, } session = self.Session() self.insert_dict(session, 'service', service) for endpoint in endpoints.values(): endpoint.update(common_endpoint_attrs) self.insert_dict(session, 'endpoint', endpoint) session.commit() session.close() self.downgrade(9) self.assertTableColumns( 'service', ['id', 'type', 'extra']) self.assertTableColumns( 'endpoint', ['id', 'region', 'service_id', 'extra']) endpoint_table = sqlalchemy.Table( 'endpoint', self.metadata, autoload=True) session = self.Session() self.assertEqual(session.query(endpoint_table).count(), 1) q = session.query(endpoint_table) q = q.filter_by(id=common_endpoint_attrs['legacy_endpoint_id']) ref = q.one() self.assertEqual(ref.id, common_endpoint_attrs['legacy_endpoint_id']) self.assertEqual(ref.region, endpoint['region']) self.assertEqual(ref.service_id, endpoint['service_id']) extra = json.loads(ref.extra) for interface in ['public', 'internal', 'admin']: expected_url = endpoints[interface]['url'] self.assertEqual(extra['%surl' % interface], expected_url) session.commit() session.close() def insert_dict(self, session, table_name, d): """Naively inserts key-value pairs into a table, given a dictionary.""" this_table = sqlalchemy.Table(table_name, self.metadata, autoload=True) insert = this_table.insert() insert.execute(d) session.commit() def test_downgrade_to_0(self): self.upgrade(self.max_version) if self.engine.name == 'mysql': self._mysql_check_all_tables_innodb() self.downgrade(0) for table_name in ["user", "token", "role", "user_tenant_membership", "metadata"]: self.assertTableDoesNotExist(table_name) def test_upgrade_add_domain_tables(self): self.upgrade(6) self.assertTableDoesNotExist('credential') self.assertTableDoesNotExist('domain') self.assertTableDoesNotExist('user_domain_metadata') self.upgrade(7) self.assertTableExists('credential') self.assertTableColumns('credential', ['id', 'user_id', 'project_id', 'blob', 'type', 'extra']) self.assertTableExists('domain') self.assertTableColumns('domain', ['id', 'name', 'enabled', 'extra']) self.assertTableExists('user_domain_metadata') self.assertTableColumns('user_domain_metadata', ['user_id', 'domain_id', 'data']) def test_metadata_table_migration(self): # Scaffolding session = self.Session() self.upgrade(16) domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) user_table = sqlalchemy.Table('user', self.metadata, autoload=True) role_table = sqlalchemy.Table('role', self.metadata, autoload=True) project_table = sqlalchemy.Table( 'project', self.metadata, autoload=True) metadata_table = sqlalchemy.Table( 'metadata', self.metadata, autoload=True) # Create a Domain domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True} session.execute(domain_table.insert().values(domain)) # Create a Project project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'extra': "{}"} session.execute(project_table.insert().values(project)) # Create another Project project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'extra': "{}"} session.execute(project_table.insert().values(project2)) # Create a User user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'password': uuid.uuid4().hex, 'enabled': True, 'extra': json.dumps({})} session.execute(user_table.insert().values(user)) # Create a Role role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} session.execute(role_table.insert().values(role)) # And another role role2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} session.execute(role_table.insert().values(role2)) # Grant Role to User role_grant = {'user_id': user['id'], 'tenant_id': project['id'], 'data': json.dumps({"roles": [role['id']]})} session.execute(metadata_table.insert().values(role_grant)) role_grant = {'user_id': user['id'], 'tenant_id': project2['id'], 'data': json.dumps({"roles": [role2['id']]})} session.execute(metadata_table.insert().values(role_grant)) # Create another user to test the case where member_role_id is already # assigned. user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'password': uuid.uuid4().hex, 'enabled': True, 'extra': json.dumps({})} session.execute(user_table.insert().values(user2)) # Grant CONF.member_role_id to User2 role_grant = {'user_id': user2['id'], 'tenant_id': project['id'], 'data': json.dumps({"roles": [CONF.member_role_id]})} session.execute(metadata_table.insert().values(role_grant)) session.commit() self.upgrade(17) user_project_metadata_table = sqlalchemy.Table( 'user_project_metadata', self.metadata, autoload=True) s = sqlalchemy.select([metadata_table.c.data]).where( (metadata_table.c.user_id == user['id']) & (metadata_table.c.tenant_id == project['id'])) r = session.execute(s) test_project1 = json.loads(r.fetchone()['data']) self.assertEqual(len(test_project1['roles']), 1) self.assertIn(role['id'], test_project1['roles']) # Test user in project2 has role2 s = sqlalchemy.select([metadata_table.c.data]).where( (metadata_table.c.user_id == user['id']) & (metadata_table.c.tenant_id == project2['id'])) r = session.execute(s) test_project2 = json.loads(r.fetchone()['data']) self.assertEqual(len(test_project2['roles']), 1) self.assertIn(role2['id'], test_project2['roles']) # Test for user in project has role in user_project_metadata # Migration 17 does not properly migrate this data, so this should # be None. s = sqlalchemy.select([user_project_metadata_table.c.data]).where( (user_project_metadata_table.c.user_id == user['id']) & (user_project_metadata_table.c.project_id == project['id'])) r = session.execute(s) self.assertIsNone(r.fetchone()) # Create a conflicting user-project in user_project_metadata with # a different role data = json.dumps({"roles": [role2['id']]}) role_grant = {'user_id': user['id'], 'project_id': project['id'], 'data': data} cmd = user_project_metadata_table.insert().values(role_grant) self.engine.execute(cmd) # Create another conflicting user-project for User2 data = json.dumps({"roles": [role2['id']]}) role_grant = {'user_id': user2['id'], 'project_id': project['id'], 'data': data} cmd = user_project_metadata_table.insert().values(role_grant) self.engine.execute(cmd) # End Scaffolding session.commit() # Migrate to 20 self.upgrade(20) # The user-project pairs should have all roles from the previous # metadata table in addition to any roles currently in # user_project_metadata s = sqlalchemy.select([user_project_metadata_table.c.data]).where( (user_project_metadata_table.c.user_id == user['id']) & (user_project_metadata_table.c.project_id == project['id'])) r = session.execute(s) role_ids = json.loads(r.fetchone()['data'])['roles'] self.assertEqual(len(role_ids), 3) self.assertIn(CONF.member_role_id, role_ids) self.assertIn(role['id'], role_ids) self.assertIn(role2['id'], role_ids) # pairs that only existed in old metadata table should be in # user_project_metadata s = sqlalchemy.select([user_project_metadata_table.c.data]).where( (user_project_metadata_table.c.user_id == user['id']) & (user_project_metadata_table.c.project_id == project2['id'])) r = session.execute(s) role_ids = json.loads(r.fetchone()['data'])['roles'] self.assertEqual(len(role_ids), 2) self.assertIn(CONF.member_role_id, role_ids) self.assertIn(role2['id'], role_ids) self.assertTableDoesNotExist('metadata') def test_upgrade_default_roles(self): def count_member_roles(): session = self.Session() query_string = ("select count(*) as c from role " "where name='%s'" % config.CONF.member_role_name) role_count = session.execute(query_string).fetchone()['c'] session.close() return role_count self.upgrade(16) self.assertEquals(0, count_member_roles()) self.upgrade(17) self.assertEquals(1, count_member_roles()) self.downgrade(16) self.assertEquals(0, count_member_roles()) def check_uniqueness_constraints(self): # Check uniqueness constraints for User & Project tables are # correct following schema modification. The Group table's # schema is never modified, so we don't bother to check that. domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True} domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True} cmd = domain_table.insert().values(domain1) self.engine.execute(cmd) cmd = domain_table.insert().values(domain2) self.engine.execute(cmd) # First, the User table. this_table = sqlalchemy.Table('user', self.metadata, autoload=True) user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, 'enabled': True, 'extra': json.dumps({})} cmd = this_table.insert().values(user) self.engine.execute(cmd) # now insert a user with the same name into a different # domain - which should work. user['id'] = uuid.uuid4().hex user['domain_id'] = domain2['id'] cmd = this_table.insert().values(user) self.engine.execute(cmd) # TODO(henry-nash): For now, as part of clean-up we delete one of these # users. Although not part of this test, unless we do so the # downgrade(16->15) that is part of teardown with fail due to having # two uses with clashing name as we try to revert to a single global # name space. This limitation is raised as Bug #1125046 and the delete # could be removed depending on how that bug is resolved. cmd = this_table.delete(id=user['id']) self.engine.execute(cmd) # Now, the Project table. this_table = sqlalchemy.Table('project', self.metadata, autoload=True) project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'description': uuid.uuid4().hex, 'enabled': True, 'extra': json.dumps({})} cmd = this_table.insert().values(project) self.engine.execute(cmd) # now insert a project with the same name into a different # domain - which should work. project['id'] = uuid.uuid4().hex project['domain_id'] = domain2['id'] cmd = this_table.insert().values(project) self.engine.execute(cmd) # TODO(henry-nash): For now, we delete one of the projects for the same # reason as we delete one of the users (Bug #1125046). This delete # could be removed depending on that bug resolution. cmd = this_table.delete(id=project['id']) self.engine.execute(cmd) def test_upgrade_trusts(self): self.assertEqual(self.schema.version, 0, "DB is at version 0") self.upgrade(20) self.assertTableColumns("token", ["id", "expires", "extra", "valid"]) self.upgrade(21) self.assertTableColumns("trust", ["id", "trustor_user_id", "trustee_user_id", "project_id", "impersonation", "deleted_at", "expires_at", "extra"]) self.assertTableColumns("trust_role", ["trust_id", "role_id"]) self.assertTableColumns("token", ["id", "expires", "extra", "valid", "trust_id", "user_id"]) def test_fixup_role(self): session = self.Session() self.assertEqual(self.schema.version, 0, "DB is at version 0") self.upgrade(1) self.insert_dict(session, "role", {"id": "test", "name": "test"}) self.upgrade(18) self.insert_dict(session, "role", {"id": "test2", "name": "test2", "extra": None}) r = session.execute('select count(*) as c from role ' 'where extra is null') self.assertEqual(r.fetchone()['c'], 2) session.commit() self.upgrade(19) r = session.execute('select count(*) as c from role ' 'where extra is null') self.assertEqual(r.fetchone()['c'], 0) def test_legacy_endpoint_id(self): session = self.Session() self.upgrade(21) service = { 'id': uuid.uuid4().hex, 'name': 'keystone', 'type': 'identity'} self.insert_dict(session, 'service', service) legacy_endpoint_id = uuid.uuid4().hex endpoint = { 'id': uuid.uuid4().hex, 'service_id': service['id'], 'interface': uuid.uuid4().hex[:8], 'url': uuid.uuid4().hex, 'extra': json.dumps({ 'legacy_endpoint_id': legacy_endpoint_id})} self.insert_dict(session, 'endpoint', endpoint) session.commit() self.upgrade(22) endpoint_table = sqlalchemy.Table( 'endpoint', self.metadata, autoload=True) self.assertEqual(session.query(endpoint_table).count(), 1) ref = session.query(endpoint_table).one() self.assertEqual(ref.id, endpoint['id'], ref) self.assertEqual(ref.service_id, endpoint['service_id']) self.assertEqual(ref.interface, endpoint['interface']) self.assertEqual(ref.url, endpoint['url']) self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id) self.assertEqual(ref.extra, '{}') def test_group_project_FK_fixup(self): # To create test data we must start before we broke in the # group_project_metadata table in 015. self.upgrade(14) session = self.Session() domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) group_table = sqlalchemy.Table('group', self.metadata, autoload=True) tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True) role_table = sqlalchemy.Table('role', self.metadata, autoload=True) group_project_metadata_table = sqlalchemy.Table( 'group_project_metadata', self.metadata, autoload=True) # Create a Domain domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True} session.execute(domain_table.insert().values(domain)) # Create two Tenants tenant = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'extra': "{}"} session.execute(tenant_table.insert().values(tenant)) tenant1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'extra': "{}"} session.execute(tenant_table.insert().values(tenant1)) # Create a Group group = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'extra': json.dumps({})} session.execute(group_table.insert().values(group)) # Create roles role_list = [] for _ in range(2): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} session.execute(role_table.insert().values(role)) role_list.append(role) # Grant Role to User on Project role_grant = {'group_id': group['id'], 'project_id': tenant['id'], 'data': json.dumps({'roles': [role_list[0]['id']]})} session.execute( group_project_metadata_table.insert().values(role_grant)) role_grant = {'group_id': group['id'], 'project_id': tenant1['id'], 'data': json.dumps({'roles': [role_list[1]['id']]})} session.execute( group_project_metadata_table.insert().values(role_grant)) session.commit() # Now upgrade and fix up the FKs self.upgrade(28) self.assertTableExists('group_project_metadata') self.assertTableExists('project') self.assertTableDoesNotExist('tenant') s = sqlalchemy.select([group_project_metadata_table.c.data]).where( (group_project_metadata_table.c.group_id == group['id']) & (group_project_metadata_table.c.project_id == tenant['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[0]['id'], data['roles']) s = sqlalchemy.select([group_project_metadata_table.c.data]).where( (group_project_metadata_table.c.group_id == group['id']) & (group_project_metadata_table.c.project_id == tenant1['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[1]['id'], data['roles']) self.downgrade(27) self.assertTableExists('group_project_metadata') self.assertTableExists('project') self.assertTableDoesNotExist('tenant') def test_assignment_metadata_migration(self): self.upgrade(28) # Scaffolding session = self.Session() domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True) user_table = sqlalchemy.Table('user', self.metadata, autoload=True) group_table = sqlalchemy.Table('group', self.metadata, autoload=True) role_table = sqlalchemy.Table('role', self.metadata, autoload=True) project_table = sqlalchemy.Table( 'project', self.metadata, autoload=True) user_project_metadata_table = sqlalchemy.Table( 'user_project_metadata', self.metadata, autoload=True) user_domain_metadata_table = sqlalchemy.Table( 'user_domain_metadata', self.metadata, autoload=True) group_project_metadata_table = sqlalchemy.Table( 'group_project_metadata', self.metadata, autoload=True) group_domain_metadata_table = sqlalchemy.Table( 'group_domain_metadata', self.metadata, autoload=True) # Create a Domain domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True} session.execute(domain_table.insert().values(domain)) # Create anther Domain domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True} session.execute(domain_table.insert().values(domain2)) # Create a Project project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'extra': "{}"} session.execute(project_table.insert().values(project)) # Create another Project project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'extra': "{}"} session.execute(project_table.insert().values(project2)) # Create a User user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'password': uuid.uuid4().hex, 'enabled': True, 'extra': json.dumps({})} session.execute(user_table.insert().values(user)) # Create a Group group = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain['id'], 'extra': json.dumps({})} session.execute(group_table.insert().values(group)) # Create roles role_list = [] for _ in range(7): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} session.execute(role_table.insert().values(role)) role_list.append(role) # Grant Role to User on Project role_grant = {'user_id': user['id'], 'project_id': project['id'], 'data': json.dumps({'roles': [role_list[0]['id']]})} session.execute( user_project_metadata_table.insert().values(role_grant)) role_grant = {'user_id': user['id'], 'project_id': project2['id'], 'data': json.dumps({'roles': [role_list[1]['id']]})} session.execute( user_project_metadata_table.insert().values(role_grant)) # Grant Role to Group on different Project role_grant = {'group_id': group['id'], 'project_id': project2['id'], 'data': json.dumps({'roles': [role_list[2]['id']]})} session.execute( group_project_metadata_table.insert().values(role_grant)) # Grant Role to User on Domain role_grant = {'user_id': user['id'], 'domain_id': domain['id'], 'data': json.dumps({'roles': [role_list[3]['id']]})} session.execute(user_domain_metadata_table.insert().values(role_grant)) # Grant Role to Group on Domain role_grant = {'group_id': group['id'], 'domain_id': domain['id'], 'data': json.dumps( {'roles': [role_list[4]['id']], 'other': 'somedata'})} session.execute( group_domain_metadata_table.insert().values(role_grant)) session.commit() self.upgrade(29) s = sqlalchemy.select([user_project_metadata_table.c.data]).where( (user_project_metadata_table.c.user_id == user['id']) & (user_project_metadata_table.c.project_id == project['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn({'id': role_list[0]['id']}, data['roles']) s = sqlalchemy.select([user_project_metadata_table.c.data]).where( (user_project_metadata_table.c.user_id == user['id']) & (user_project_metadata_table.c.project_id == project2['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn({'id': role_list[1]['id']}, data['roles']) s = sqlalchemy.select([group_project_metadata_table.c.data]).where( (group_project_metadata_table.c.group_id == group['id']) & (group_project_metadata_table.c.project_id == project2['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn({'id': role_list[2]['id']}, data['roles']) s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( (user_domain_metadata_table.c.user_id == user['id']) & (user_domain_metadata_table.c.domain_id == domain['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn({'id': role_list[3]['id']}, data['roles']) s = sqlalchemy.select([group_domain_metadata_table.c.data]).where( (group_domain_metadata_table.c.group_id == group['id']) & (group_domain_metadata_table.c.domain_id == domain['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn({'id': role_list[4]['id']}, data['roles']) self.assertIn('other', data) # Now add an entry that has one regular and one inherited role role_grant = {'user_id': user['id'], 'domain_id': domain2['id'], 'data': json.dumps( {'roles': [{'id': role_list[5]['id']}, {'id': role_list[6]['id'], 'inherited_to': 'projects'}]})} session.execute(user_domain_metadata_table.insert().values(role_grant)) session.commit() self.downgrade(28) s = sqlalchemy.select([user_project_metadata_table.c.data]).where( (user_project_metadata_table.c.user_id == user['id']) & (user_project_metadata_table.c.project_id == project['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[0]['id'], data['roles']) s = sqlalchemy.select([user_project_metadata_table.c.data]).where( (user_project_metadata_table.c.user_id == user['id']) & (user_project_metadata_table.c.project_id == project2['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[1]['id'], data['roles']) s = sqlalchemy.select([group_project_metadata_table.c.data]).where( (group_project_metadata_table.c.group_id == group['id']) & (group_project_metadata_table.c.project_id == project2['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[2]['id'], data['roles']) s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( (user_domain_metadata_table.c.user_id == user['id']) & (user_domain_metadata_table.c.domain_id == domain['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[3]['id'], data['roles']) s = sqlalchemy.select([group_domain_metadata_table.c.data]).where( (group_domain_metadata_table.c.group_id == group['id']) & (group_domain_metadata_table.c.domain_id == domain['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[4]['id'], data['roles']) self.assertIn('other', data) # For user-domain2, where we had one regular and one inherited role, # only the direct role should remain, the inherited role should # have been deleted during the downgrade s = sqlalchemy.select([user_domain_metadata_table.c.data]).where( (user_domain_metadata_table.c.user_id == user['id']) & (user_domain_metadata_table.c.domain_id == domain2['id'])) r = session.execute(s) data = json.loads(r.fetchone()['data']) self.assertEqual(len(data['roles']), 1) self.assertIn(role_list[5]['id'], data['roles']) def test_drop_credential_constraint(self): ec2_credential = { 'id': '100', 'user_id': 'foo', 'project_id': 'bar', 'type': 'ec2', 'blob': json.dumps({ "access": "12345", "secret": "12345" }) } user = { 'id': 'foo', 'name': 'FOO', 'password': 'foo2', 'enabled': True, 'email': 'foo@bar.com', 'extra': json.dumps({'enabled': True}) } tenant = { 'id': 'bar', 'name': 'BAR', 'description': 'description', 'enabled': True, 'extra': json.dumps({'enabled': True}) } session = self.Session() self.upgrade(7) self.insert_dict(session, 'user', user) self.insert_dict(session, 'tenant', tenant) self.insert_dict(session, 'credential', ec2_credential) session.commit() self.upgrade(30) cred_table = sqlalchemy.Table('credential', self.metadata, autoload=True) cred = session.query(cred_table).filter("id='100'").one() self.assertEqual(cred.user_id, ec2_credential['user_id']) def test_drop_credential_indexes(self): self.upgrade(31) table = sqlalchemy.Table('credential', self.metadata, autoload=True) self.assertEqual(len(table.indexes), 0) def test_downgrade_30(self): self.upgrade(31) self.downgrade(30) table = sqlalchemy.Table('credential', self.metadata, autoload=True) index_data = [(idx.name, idx.columns.keys()) for idx in table.indexes] if self.engine.name == 'mysql': self.assertIn(('user_id', ['user_id']), index_data) self.assertIn(('credential_project_id_fkey', ['project_id']), index_data) else: self.assertEqual(len(index_data), 0) def populate_user_table(self, with_pass_enab=False, with_pass_enab_domain=False): # Populate the appropriate fields in the user # table, depending on the parameters: # # Default: id, name, extra # pass_enab: Add password, enabled as well # pass_enab_domain: Add password, enabled and domain as well # this_table = sqlalchemy.Table("user", self.metadata, autoload=True) for user in default_fixtures.USERS: extra = copy.deepcopy(user) extra.pop('id') extra.pop('name') if with_pass_enab: password = extra.pop('password', None) enabled = extra.pop('enabled', True) ins = this_table.insert().values( {'id': user['id'], 'name': user['name'], 'password': password, 'enabled': bool(enabled), 'extra': json.dumps(extra)}) else: if with_pass_enab_domain: password = extra.pop('password', None) enabled = extra.pop('enabled', True) extra.pop('domain_id') ins = this_table.insert().values( {'id': user['id'], 'name': user['name'], 'domain_id': user['domain_id'], 'password': password, 'enabled': bool(enabled), 'extra': json.dumps(extra)}) else: ins = this_table.insert().values( {'id': user['id'], 'name': user['name'], 'extra': json.dumps(extra)}) self.engine.execute(ins) def populate_tenant_table(self, with_desc_enab=False, with_desc_enab_domain=False): # Populate the appropriate fields in the tenant or # project table, depending on the parameters # # Default: id, name, extra # desc_enab: Add description, enabled as well # desc_enab_domain: Add description, enabled and domain as well, # plus use project instead of tenant # if with_desc_enab_domain: # By this time tenants are now projects this_table = sqlalchemy.Table("project", self.metadata, autoload=True) else: this_table = sqlalchemy.Table("tenant", self.metadata, autoload=True) for tenant in default_fixtures.TENANTS: extra = copy.deepcopy(tenant) extra.pop('id') extra.pop('name') if with_desc_enab: desc = extra.pop('description', None) enabled = extra.pop('enabled', True) ins = this_table.insert().values( {'id': tenant['id'], 'name': tenant['name'], 'description': desc, 'enabled': bool(enabled), 'extra': json.dumps(extra)}) else: if with_desc_enab_domain: desc = extra.pop('description', None) enabled = extra.pop('enabled', True) extra.pop('domain_id') ins = this_table.insert().values( {'id': tenant['id'], 'name': tenant['name'], 'domain_id': tenant['domain_id'], 'description': desc, 'enabled': bool(enabled), 'extra': json.dumps(extra)}) else: ins = this_table.insert().values( {'id': tenant['id'], 'name': tenant['name'], 'extra': json.dumps(extra)}) self.engine.execute(ins) def _mysql_check_all_tables_innodb(self): database = self.engine.url.database connection = self.engine.connect() # sanity check total = connection.execute("SELECT count(*) " "from information_schema.TABLES " "where TABLE_SCHEMA='%(database)s'" % locals()) self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?") noninnodb = connection.execute("SELECT table_name " "from information_schema.TABLES " "where TABLE_SCHEMA='%(database)s' " "and ENGINE!='InnoDB' " "and TABLE_NAME!='migrate_version'" % locals()) names = [x[0] for x in noninnodb] self.assertEqual(names, [], "Non-InnoDB tables exist") connection.close()