summaryrefslogtreecommitdiffstats
path: root/keystone/tests/test_sql_upgrade.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone/tests/test_sql_upgrade.py')
-rw-r--r--keystone/tests/test_sql_upgrade.py1378
1 files changed, 1378 insertions, 0 deletions
diff --git a/keystone/tests/test_sql_upgrade.py b/keystone/tests/test_sql_upgrade.py
new file mode 100644
index 00000000..e904d6a7
--- /dev/null
+++ b/keystone/tests/test_sql_upgrade.py
@@ -0,0 +1,1378 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+To run these tests against a live database:
+1. Modify the file `keystone/tests/backend_sql.conf` to use the connection for
+ your live database
+2. Set up a blank, live database.
+3. run the tests using
+ ./run_tests.sh -N test_sql_upgrade
+ WARNING::
+ Your database will be wiped.
+ Do not do this against a Database with valuable data as
+ all data will be lost.
+"""
+import copy
+import json
+import uuid
+
+from migrate.versioning import api as versioning_api
+import sqlalchemy
+
+from keystone.tests import core as test
+
+from keystone.common import sql
+from keystone.common.sql import migration
+from keystone import config
+
+import default_fixtures
+
+
+CONF = config.CONF
+DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
+
+
+class SqlMigrateBase(test.TestCase):
+ def initialize_sql(self):
+ self.metadata = sqlalchemy.MetaData()
+ self.metadata.bind = self.engine
+
+ _config_file_list = [test.etcdir('keystone.conf.sample'),
+ test.testsdir('test_overrides.conf'),
+ test.testsdir('backend_sql.conf')]
+
+ #override this to specify the complete list of configuration files
+ def config_files(self):
+ return self._config_file_list
+
+ def repo_package(self):
+ return None
+
+ def setUp(self):
+ super(SqlMigrateBase, self).setUp()
+
+ self.config(self.config_files())
+ self.base = sql.Base()
+
+ # create and share a single sqlalchemy engine for testing
+ self.engine = self.base.get_engine(allow_global_engine=False)
+ self.Session = self.base.get_sessionmaker(engine=self.engine,
+ autocommit=False)
+
+ self.initialize_sql()
+ self.repo_path = migration.find_migrate_repo(self.repo_package())
+ self.schema = versioning_api.ControlledSchema.create(
+ self.engine,
+ self.repo_path, 0)
+
+ # auto-detect the highest available schema version in the migrate_repo
+ self.max_version = self.schema.repository.version().version
+
+ def tearDown(self):
+ sqlalchemy.orm.session.Session.close_all()
+ table = sqlalchemy.Table("migrate_version", self.metadata,
+ autoload=True)
+ self.downgrade(0)
+ table.drop(self.engine, checkfirst=True)
+ super(SqlMigrateBase, self).tearDown()
+
+ def select_table(self, name):
+ table = sqlalchemy.Table(name,
+ self.metadata,
+ autoload=True)
+ s = sqlalchemy.select([table])
+ return s
+
+ def assertTableExists(self, table_name):
+ try:
+ self.select_table(table_name)
+ except sqlalchemy.exc.NoSuchTableError:
+ raise AssertionError('Table "%s" does not exist' % table_name)
+
+ def assertTableDoesNotExist(self, table_name):
+ """Asserts that a given table exists cannot be selected by name."""
+ # Switch to a different metadata otherwise you might still
+ # detect renamed or dropped tables
+ try:
+ temp_metadata = sqlalchemy.MetaData()
+ temp_metadata.bind = self.engine
+ sqlalchemy.Table(table_name, temp_metadata, autoload=True)
+ except sqlalchemy.exc.NoSuchTableError:
+ pass
+ else:
+ raise AssertionError('Table "%s" already exists' % table_name)
+
+ def upgrade(self, *args, **kwargs):
+ self._migrate(*args, **kwargs)
+
+ def downgrade(self, *args, **kwargs):
+ self._migrate(*args, downgrade=True, **kwargs)
+
+ def _migrate(self, version, repository=None, downgrade=False,
+ current_schema=None):
+ repository = repository or self.repo_path
+ err = ''
+ version = versioning_api._migrate_version(self.schema,
+ version,
+ not downgrade,
+ err)
+ if not current_schema:
+ current_schema = self.schema
+ changeset = current_schema.changeset(version)
+ for ver, change in changeset:
+ self.schema.runchange(ver, change, changeset.step)
+ self.assertEqual(self.schema.version, version)
+
+ def assertTableColumns(self, table_name, expected_cols):
+ """Asserts that the table contains the expected set of columns."""
+ self.initialize_sql()
+ table = self.select_table(table_name)
+ actual_cols = [col.name for col in table.columns]
+ self.assertEqual(expected_cols, actual_cols, '%s table' % table_name)
+
+
+class SqlUpgradeTests(SqlMigrateBase):
+
+ def test_blank_db_to_start(self):
+ self.assertTableDoesNotExist('user')
+
+ def test_start_version_0(self):
+ version = migration.db_version()
+ self.assertEqual(version, 0, "DB is at version 0")
+
+ def test_two_steps_forward_one_step_back(self):
+ """You should be able to cleanly undo and re-apply all upgrades.
+
+ Upgrades are run in the following order::
+
+ 0 -> 1 -> 0 -> 1 -> 2 -> 1 -> 2 -> 3 -> 2 -> 3 ...
+ ^---------^ ^---------^ ^---------^
+
+ """
+ for x in range(1, self.max_version + 1):
+ self.upgrade(x)
+ self.downgrade(x - 1)
+ self.upgrade(x)
+
+ def test_upgrade_add_initial_tables(self):
+ self.upgrade(1)
+ self.assertTableColumns("user", ["id", "name", "extra"])
+ self.assertTableColumns("tenant", ["id", "name", "extra"])
+ self.assertTableColumns("role", ["id", "name"])
+ self.assertTableColumns("user_tenant_membership",
+ ["user_id", "tenant_id"])
+ self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"])
+ self.populate_user_table()
+
+ def test_upgrade_add_policy(self):
+ self.upgrade(5)
+ self.assertTableDoesNotExist('policy')
+
+ self.upgrade(6)
+ self.assertTableExists('policy')
+ self.assertTableColumns('policy', ['id', 'type', 'blob', 'extra'])
+
+ def test_upgrade_normalize_identity(self):
+ self.upgrade(8)
+ self.populate_user_table()
+ self.populate_tenant_table()
+ self.upgrade(10)
+ self.assertTableColumns("user",
+ ["id", "name", "extra",
+ "password", "enabled"])
+ self.assertTableColumns("tenant",
+ ["id", "name", "extra", "description",
+ "enabled"])
+ self.assertTableColumns("role", ["id", "name", "extra"])
+ self.assertTableColumns("user_tenant_membership",
+ ["user_id", "tenant_id"])
+ self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"])
+ session = self.Session()
+ user_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ a_user = session.query(user_table).filter("id='foo'").one()
+ self.assertTrue(a_user.enabled)
+ a_user = session.query(user_table).filter("id='badguy'").one()
+ self.assertFalse(a_user.enabled)
+ tenant_table = sqlalchemy.Table("tenant",
+ self.metadata,
+ autoload=True)
+ a_tenant = session.query(tenant_table).filter("id='baz'").one()
+ self.assertEqual(a_tenant.description, 'description')
+ session.commit()
+ session.close()
+
+ def test_normalized_enabled_states(self):
+ self.upgrade(8)
+
+ users = {
+ 'bool_enabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({'enabled': True})},
+ 'bool_disabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({'enabled': False})},
+ 'str_enabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({'enabled': 'True'})},
+ 'str_disabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({'enabled': 'False'})},
+ 'int_enabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({'enabled': 1})},
+ 'int_disabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({'enabled': 0})},
+ 'null_enabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({'enabled': None})},
+ 'unset_enabled_user': {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'extra': json.dumps({})}}
+
+ session = self.Session()
+ for user in users.values():
+ self.insert_dict(session, 'user', user)
+ session.commit()
+
+ self.upgrade(10)
+
+ user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
+ q = session.query(user_table, 'enabled')
+
+ user = q.filter_by(id=users['bool_enabled_user']['id']).one()
+ self.assertTrue(user.enabled)
+
+ user = q.filter_by(id=users['bool_disabled_user']['id']).one()
+ self.assertFalse(user.enabled)
+
+ user = q.filter_by(id=users['str_enabled_user']['id']).one()
+ self.assertTrue(user.enabled)
+
+ user = q.filter_by(id=users['str_disabled_user']['id']).one()
+ self.assertFalse(user.enabled)
+
+ user = q.filter_by(id=users['int_enabled_user']['id']).one()
+ self.assertTrue(user.enabled)
+
+ user = q.filter_by(id=users['int_disabled_user']['id']).one()
+ self.assertFalse(user.enabled)
+
+ user = q.filter_by(id=users['null_enabled_user']['id']).one()
+ self.assertTrue(user.enabled)
+
+ user = q.filter_by(id=users['unset_enabled_user']['id']).one()
+ self.assertTrue(user.enabled)
+
+ def test_downgrade_10_to_8(self):
+ self.upgrade(10)
+ self.populate_user_table(with_pass_enab=True)
+ self.populate_tenant_table(with_desc_enab=True)
+ self.downgrade(8)
+ self.assertTableColumns('user',
+ ['id', 'name', 'extra'])
+ self.assertTableColumns('tenant',
+ ['id', 'name', 'extra'])
+ session = self.Session()
+ user_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ a_user = session.query(user_table).filter("id='badguy'").one()
+ self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])
+ tenant_table = sqlalchemy.Table("tenant",
+ self.metadata,
+ autoload=True)
+ a_tenant = session.query(tenant_table).filter("id='baz'").one()
+ self.assertEqual(a_tenant.name, default_fixtures.TENANTS[1]['name'])
+ session.commit()
+ session.close()
+
+ def test_upgrade_endpoints(self):
+ self.upgrade(10)
+ service_extra = {
+ 'name': uuid.uuid4().hex,
+ }
+ service = {
+ 'id': uuid.uuid4().hex,
+ 'type': uuid.uuid4().hex,
+ 'extra': json.dumps(service_extra),
+ }
+ endpoint_extra = {
+ 'publicurl': uuid.uuid4().hex,
+ 'internalurl': uuid.uuid4().hex,
+ 'adminurl': uuid.uuid4().hex,
+ }
+ endpoint = {
+ 'id': uuid.uuid4().hex,
+ 'region': uuid.uuid4().hex,
+ 'service_id': service['id'],
+ 'extra': json.dumps(endpoint_extra),
+ }
+
+ session = self.Session()
+ self.insert_dict(session, 'service', service)
+ self.insert_dict(session, 'endpoint', endpoint)
+ session.commit()
+ session.close()
+
+ self.upgrade(13)
+ self.assertTableColumns(
+ 'service',
+ ['id', 'type', 'extra'])
+ self.assertTableColumns(
+ 'endpoint',
+ ['id', 'legacy_endpoint_id', 'interface', 'region', 'service_id',
+ 'url', 'extra'])
+
+ endpoint_table = sqlalchemy.Table(
+ 'endpoint', self.metadata, autoload=True)
+
+ session = self.Session()
+ self.assertEqual(session.query(endpoint_table).count(), 3)
+ for interface in ['public', 'internal', 'admin']:
+ q = session.query(endpoint_table)
+ q = q.filter_by(legacy_endpoint_id=endpoint['id'])
+ q = q.filter_by(interface=interface)
+ ref = q.one()
+ self.assertNotEqual(ref.id, endpoint['id'])
+ self.assertEqual(ref.legacy_endpoint_id, endpoint['id'])
+ self.assertEqual(ref.interface, interface)
+ self.assertEqual(ref.region, endpoint['region'])
+ self.assertEqual(ref.service_id, endpoint['service_id'])
+ self.assertEqual(ref.url, endpoint_extra['%surl' % interface])
+ self.assertEqual(ref.extra, '{}')
+ session.commit()
+ session.close()
+
+ def assertTenantTables(self):
+ self.assertTableExists('tenant')
+ self.assertTableExists('user_tenant_membership')
+ self.assertTableDoesNotExist('project')
+ self.assertTableDoesNotExist('user_project_membership')
+
+ def assertProjectTables(self):
+ self.assertTableExists('project')
+ self.assertTableExists('user_project_membership')
+ self.assertTableDoesNotExist('tenant')
+ self.assertTableDoesNotExist('user_tenant_membership')
+
+ def test_upgrade_tenant_to_project(self):
+ self.upgrade(14)
+ self.assertTenantTables()
+ self.upgrade(15)
+ self.assertProjectTables()
+
+ def test_downgrade_project_to_tenant(self):
+ # TODO(henry-nash): Debug why we need to re-load the tenant
+ # or user_tenant_membership ahead of upgrading to project
+ # in order for the assertProjectTables to work on sqlite
+ # (MySQL is fine without it)
+ self.upgrade(14)
+ self.assertTenantTables()
+ self.upgrade(15)
+ self.assertProjectTables()
+ self.downgrade(14)
+ self.assertTenantTables()
+
+ def test_upgrade_add_group_tables(self):
+ self.upgrade(13)
+ self.upgrade(14)
+ self.assertTableExists('group')
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('group_domain_metadata')
+ self.assertTableExists('user_group_membership')
+
+ def test_upgrade_14_to_16(self):
+ self.upgrade(14)
+ self.populate_user_table(with_pass_enab=True)
+ self.populate_tenant_table(with_desc_enab=True)
+ self.upgrade(16)
+
+ self.assertTableColumns("user",
+ ["id", "name", "extra",
+ "password", "enabled", "domain_id"])
+ session = self.Session()
+ user_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ a_user = session.query(user_table).filter("id='foo'").one()
+ self.assertTrue(a_user.enabled)
+ self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID)
+ a_user = session.query(user_table).filter("id='badguy'").one()
+ self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])
+ self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID)
+ project_table = sqlalchemy.Table("project",
+ self.metadata,
+ autoload=True)
+ a_project = session.query(project_table).filter("id='baz'").one()
+ self.assertEqual(a_project.description,
+ default_fixtures.TENANTS[1]['description'])
+ self.assertEqual(a_project.domain_id, DEFAULT_DOMAIN_ID)
+
+ session.commit()
+ session.close()
+
+ self.check_uniqueness_constraints()
+
+ def test_downgrade_16_to_14(self):
+ self.upgrade(16)
+ self.populate_user_table(with_pass_enab_domain=True)
+ self.populate_tenant_table(with_desc_enab_domain=True)
+ self.downgrade(14)
+ self.assertTableColumns("user",
+ ["id", "name", "extra",
+ "password", "enabled"])
+ session = self.Session()
+ user_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ a_user = session.query(user_table).filter("id='foo'").one()
+ self.assertTrue(a_user.enabled)
+ a_user = session.query(user_table).filter("id='badguy'").one()
+ self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])
+ tenant_table = sqlalchemy.Table("tenant",
+ self.metadata,
+ autoload=True)
+ a_tenant = session.query(tenant_table).filter("id='baz'").one()
+ self.assertEqual(a_tenant.description,
+ default_fixtures.TENANTS[1]['description'])
+ session.commit()
+ session.close()
+
+ def test_downgrade_remove_group_tables(self):
+ self.upgrade(14)
+ self.downgrade(13)
+ self.assertTableDoesNotExist('group')
+ self.assertTableDoesNotExist('group_project_metadata')
+ self.assertTableDoesNotExist('group_domain_metadata')
+ self.assertTableDoesNotExist('user_group_membership')
+
+ def test_downgrade_endpoints(self):
+ self.upgrade(13)
+
+ service_extra = {
+ 'name': uuid.uuid4().hex,
+ }
+ service = {
+ 'id': uuid.uuid4().hex,
+ 'type': uuid.uuid4().hex,
+ 'extra': json.dumps(service_extra),
+ }
+
+ common_endpoint_attrs = {
+ 'legacy_endpoint_id': uuid.uuid4().hex,
+ 'region': uuid.uuid4().hex,
+ 'service_id': service['id'],
+ 'extra': json.dumps({}),
+ }
+ endpoints = {
+ 'public': {
+ 'id': uuid.uuid4().hex,
+ 'interface': 'public',
+ 'url': uuid.uuid4().hex,
+ },
+ 'internal': {
+ 'id': uuid.uuid4().hex,
+ 'interface': 'internal',
+ 'url': uuid.uuid4().hex,
+ },
+ 'admin': {
+ 'id': uuid.uuid4().hex,
+ 'interface': 'admin',
+ 'url': uuid.uuid4().hex,
+ },
+ }
+
+ session = self.Session()
+ self.insert_dict(session, 'service', service)
+ for endpoint in endpoints.values():
+ endpoint.update(common_endpoint_attrs)
+ self.insert_dict(session, 'endpoint', endpoint)
+ session.commit()
+ session.close()
+
+ self.downgrade(9)
+
+ self.assertTableColumns(
+ 'service',
+ ['id', 'type', 'extra'])
+ self.assertTableColumns(
+ 'endpoint',
+ ['id', 'region', 'service_id', 'extra'])
+
+ endpoint_table = sqlalchemy.Table(
+ 'endpoint', self.metadata, autoload=True)
+
+ session = self.Session()
+ self.assertEqual(session.query(endpoint_table).count(), 1)
+ q = session.query(endpoint_table)
+ q = q.filter_by(id=common_endpoint_attrs['legacy_endpoint_id'])
+ ref = q.one()
+ self.assertEqual(ref.id, common_endpoint_attrs['legacy_endpoint_id'])
+ self.assertEqual(ref.region, endpoint['region'])
+ self.assertEqual(ref.service_id, endpoint['service_id'])
+ extra = json.loads(ref.extra)
+ for interface in ['public', 'internal', 'admin']:
+ expected_url = endpoints[interface]['url']
+ self.assertEqual(extra['%surl' % interface], expected_url)
+ session.commit()
+ session.close()
+
+ def insert_dict(self, session, table_name, d):
+ """Naively inserts key-value pairs into a table, given a dictionary."""
+ this_table = sqlalchemy.Table(table_name, self.metadata, autoload=True)
+ insert = this_table.insert()
+ insert.execute(d)
+ session.commit()
+
+ def test_downgrade_to_0(self):
+ self.upgrade(self.max_version)
+
+ if self.engine.name == 'mysql':
+ self._mysql_check_all_tables_innodb()
+
+ self.downgrade(0)
+ for table_name in ["user", "token", "role", "user_tenant_membership",
+ "metadata"]:
+ self.assertTableDoesNotExist(table_name)
+
+ def test_upgrade_add_domain_tables(self):
+ self.upgrade(6)
+ self.assertTableDoesNotExist('credential')
+ self.assertTableDoesNotExist('domain')
+ self.assertTableDoesNotExist('user_domain_metadata')
+
+ self.upgrade(7)
+ self.assertTableExists('credential')
+ self.assertTableColumns('credential', ['id', 'user_id', 'project_id',
+ 'blob', 'type', 'extra'])
+ self.assertTableExists('domain')
+ self.assertTableColumns('domain', ['id', 'name', 'enabled', 'extra'])
+ self.assertTableExists('user_domain_metadata')
+ self.assertTableColumns('user_domain_metadata',
+ ['user_id', 'domain_id', 'data'])
+
+ def test_metadata_table_migration(self):
+ # Scaffolding
+ session = self.Session()
+
+ self.upgrade(16)
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ project_table = sqlalchemy.Table(
+ 'project', self.metadata, autoload=True)
+ metadata_table = sqlalchemy.Table(
+ 'metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create a Project
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project))
+
+ # Create another Project
+ project2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project2))
+
+ # Create a User
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user))
+
+ # Create a Role
+ role = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+
+ # And another role
+ role2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role2))
+
+ # Grant Role to User
+ role_grant = {'user_id': user['id'],
+ 'tenant_id': project['id'],
+ 'data': json.dumps({"roles": [role['id']]})}
+ session.execute(metadata_table.insert().values(role_grant))
+
+ role_grant = {'user_id': user['id'],
+ 'tenant_id': project2['id'],
+ 'data': json.dumps({"roles": [role2['id']]})}
+ session.execute(metadata_table.insert().values(role_grant))
+
+ # Create another user to test the case where member_role_id is already
+ # assigned.
+ user2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user2))
+
+ # Grant CONF.member_role_id to User2
+ role_grant = {'user_id': user2['id'],
+ 'tenant_id': project['id'],
+ 'data': json.dumps({"roles": [CONF.member_role_id]})}
+ session.execute(metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ self.upgrade(17)
+
+ user_project_metadata_table = sqlalchemy.Table(
+ 'user_project_metadata', self.metadata, autoload=True)
+
+ s = sqlalchemy.select([metadata_table.c.data]).where(
+ (metadata_table.c.user_id == user['id']) &
+ (metadata_table.c.tenant_id == project['id']))
+ r = session.execute(s)
+ test_project1 = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(test_project1['roles']), 1)
+ self.assertIn(role['id'], test_project1['roles'])
+
+ # Test user in project2 has role2
+ s = sqlalchemy.select([metadata_table.c.data]).where(
+ (metadata_table.c.user_id == user['id']) &
+ (metadata_table.c.tenant_id == project2['id']))
+ r = session.execute(s)
+ test_project2 = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(test_project2['roles']), 1)
+ self.assertIn(role2['id'], test_project2['roles'])
+
+ # Test for user in project has role in user_project_metadata
+ # Migration 17 does not properly migrate this data, so this should
+ # be None.
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ self.assertIsNone(r.fetchone())
+
+ # Create a conflicting user-project in user_project_metadata with
+ # a different role
+ data = json.dumps({"roles": [role2['id']]})
+ role_grant = {'user_id': user['id'],
+ 'project_id': project['id'],
+ 'data': data}
+ cmd = user_project_metadata_table.insert().values(role_grant)
+ self.engine.execute(cmd)
+
+ # Create another conflicting user-project for User2
+ data = json.dumps({"roles": [role2['id']]})
+ role_grant = {'user_id': user2['id'],
+ 'project_id': project['id'],
+ 'data': data}
+ cmd = user_project_metadata_table.insert().values(role_grant)
+ self.engine.execute(cmd)
+ # End Scaffolding
+
+ session.commit()
+
+ # Migrate to 20
+ self.upgrade(20)
+
+ # The user-project pairs should have all roles from the previous
+ # metadata table in addition to any roles currently in
+ # user_project_metadata
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ role_ids = json.loads(r.fetchone()['data'])['roles']
+ self.assertEqual(len(role_ids), 3)
+ self.assertIn(CONF.member_role_id, role_ids)
+ self.assertIn(role['id'], role_ids)
+ self.assertIn(role2['id'], role_ids)
+
+ # pairs that only existed in old metadata table should be in
+ # user_project_metadata
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ role_ids = json.loads(r.fetchone()['data'])['roles']
+ self.assertEqual(len(role_ids), 2)
+ self.assertIn(CONF.member_role_id, role_ids)
+ self.assertIn(role2['id'], role_ids)
+
+ self.assertTableDoesNotExist('metadata')
+
+ def test_upgrade_default_roles(self):
+ def count_member_roles():
+ session = self.Session()
+ query_string = ("select count(*) as c from role "
+ "where name='%s'" % config.CONF.member_role_name)
+ role_count = session.execute(query_string).fetchone()['c']
+ session.close()
+ return role_count
+
+ self.upgrade(16)
+ self.assertEquals(0, count_member_roles())
+ self.upgrade(17)
+ self.assertEquals(1, count_member_roles())
+ self.downgrade(16)
+ self.assertEquals(0, count_member_roles())
+
+ def check_uniqueness_constraints(self):
+ # Check uniqueness constraints for User & Project tables are
+ # correct following schema modification. The Group table's
+ # schema is never modified, so we don't bother to check that.
+ domain_table = sqlalchemy.Table('domain',
+ self.metadata,
+ autoload=True)
+ domain1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ domain2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ cmd = domain_table.insert().values(domain1)
+ self.engine.execute(cmd)
+ cmd = domain_table.insert().values(domain2)
+ self.engine.execute(cmd)
+
+ # First, the User table.
+ this_table = sqlalchemy.Table('user',
+ self.metadata,
+ autoload=True)
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ cmd = this_table.insert().values(user)
+ self.engine.execute(cmd)
+ # now insert a user with the same name into a different
+ # domain - which should work.
+ user['id'] = uuid.uuid4().hex
+ user['domain_id'] = domain2['id']
+ cmd = this_table.insert().values(user)
+ self.engine.execute(cmd)
+ # TODO(henry-nash): For now, as part of clean-up we delete one of these
+ # users. Although not part of this test, unless we do so the
+ # downgrade(16->15) that is part of teardown with fail due to having
+ # two uses with clashing name as we try to revert to a single global
+ # name space. This limitation is raised as Bug #1125046 and the delete
+ # could be removed depending on how that bug is resolved.
+ cmd = this_table.delete(id=user['id'])
+ self.engine.execute(cmd)
+
+ # Now, the Project table.
+ this_table = sqlalchemy.Table('project',
+ self.metadata,
+ autoload=True)
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'],
+ 'description': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ cmd = this_table.insert().values(project)
+ self.engine.execute(cmd)
+ # now insert a project with the same name into a different
+ # domain - which should work.
+ project['id'] = uuid.uuid4().hex
+ project['domain_id'] = domain2['id']
+ cmd = this_table.insert().values(project)
+ self.engine.execute(cmd)
+ # TODO(henry-nash): For now, we delete one of the projects for the same
+ # reason as we delete one of the users (Bug #1125046). This delete
+ # could be removed depending on that bug resolution.
+ cmd = this_table.delete(id=project['id'])
+ self.engine.execute(cmd)
+
+ def test_upgrade_trusts(self):
+ self.assertEqual(self.schema.version, 0, "DB is at version 0")
+ self.upgrade(20)
+ self.assertTableColumns("token",
+ ["id", "expires", "extra", "valid"])
+ self.upgrade(21)
+ self.assertTableColumns("trust",
+ ["id", "trustor_user_id",
+ "trustee_user_id",
+ "project_id", "impersonation",
+ "deleted_at",
+ "expires_at", "extra"])
+ self.assertTableColumns("trust_role",
+ ["trust_id", "role_id"])
+ self.assertTableColumns("token",
+ ["id", "expires", "extra", "valid",
+ "trust_id", "user_id"])
+
+ def test_fixup_role(self):
+ session = self.Session()
+ self.assertEqual(self.schema.version, 0, "DB is at version 0")
+ self.upgrade(1)
+ self.insert_dict(session, "role", {"id": "test", "name": "test"})
+ self.upgrade(18)
+ self.insert_dict(session, "role", {"id": "test2",
+ "name": "test2",
+ "extra": None})
+ r = session.execute('select count(*) as c from role '
+ 'where extra is null')
+ self.assertEqual(r.fetchone()['c'], 2)
+ session.commit()
+ self.upgrade(19)
+ r = session.execute('select count(*) as c from role '
+ 'where extra is null')
+ self.assertEqual(r.fetchone()['c'], 0)
+
+ def test_legacy_endpoint_id(self):
+ session = self.Session()
+ self.upgrade(21)
+
+ service = {
+ 'id': uuid.uuid4().hex,
+ 'name': 'keystone',
+ 'type': 'identity'}
+ self.insert_dict(session, 'service', service)
+
+ legacy_endpoint_id = uuid.uuid4().hex
+ endpoint = {
+ 'id': uuid.uuid4().hex,
+ 'service_id': service['id'],
+ 'interface': uuid.uuid4().hex[:8],
+ 'url': uuid.uuid4().hex,
+ 'extra': json.dumps({
+ 'legacy_endpoint_id': legacy_endpoint_id})}
+ self.insert_dict(session, 'endpoint', endpoint)
+
+ session.commit()
+ self.upgrade(22)
+
+ endpoint_table = sqlalchemy.Table(
+ 'endpoint', self.metadata, autoload=True)
+
+ self.assertEqual(session.query(endpoint_table).count(), 1)
+ ref = session.query(endpoint_table).one()
+ self.assertEqual(ref.id, endpoint['id'], ref)
+ self.assertEqual(ref.service_id, endpoint['service_id'])
+ self.assertEqual(ref.interface, endpoint['interface'])
+ self.assertEqual(ref.url, endpoint['url'])
+ self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id)
+ self.assertEqual(ref.extra, '{}')
+
+ def test_group_project_FK_fixup(self):
+ # To create test data we must start before we broke in the
+ # group_project_metadata table in 015.
+ self.upgrade(14)
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create two Tenants
+ tenant = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant))
+
+ tenant1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'extra': "{}"}
+ session.execute(tenant_table.insert().values(tenant1))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(2):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'group_id': group['id'],
+ 'project_id': tenant1['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ # Now upgrade and fix up the FKs
+ self.upgrade(28)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == tenant1['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ self.downgrade(27)
+ self.assertTableExists('group_project_metadata')
+ self.assertTableExists('project')
+ self.assertTableDoesNotExist('tenant')
+
+ def test_assignment_metadata_migration(self):
+ self.upgrade(28)
+ # Scaffolding
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
+ user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
+ group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ project_table = sqlalchemy.Table(
+ 'project', self.metadata, autoload=True)
+ user_project_metadata_table = sqlalchemy.Table(
+ 'user_project_metadata', self.metadata, autoload=True)
+ user_domain_metadata_table = sqlalchemy.Table(
+ 'user_domain_metadata', self.metadata, autoload=True)
+ group_project_metadata_table = sqlalchemy.Table(
+ 'group_project_metadata', self.metadata, autoload=True)
+ group_domain_metadata_table = sqlalchemy.Table(
+ 'group_domain_metadata', self.metadata, autoload=True)
+
+ # Create a Domain
+ domain = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain))
+
+ # Create anther Domain
+ domain2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ session.execute(domain_table.insert().values(domain2))
+
+ # Create a Project
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project))
+
+ # Create another Project
+ project2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': "{}"}
+ session.execute(project_table.insert().values(project2))
+
+ # Create a User
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user))
+
+ # Create a Group
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'extra': json.dumps({})}
+ session.execute(group_table.insert().values(group))
+
+ # Create roles
+ role_list = []
+ for _ in range(7):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ session.execute(role_table.insert().values(role))
+ role_list.append(role)
+
+ # Grant Role to User on Project
+ role_grant = {'user_id': user['id'],
+ 'project_id': project['id'],
+ 'data': json.dumps({'roles': [role_list[0]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ role_grant = {'user_id': user['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[1]['id']]})}
+ session.execute(
+ user_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on different Project
+ role_grant = {'group_id': group['id'],
+ 'project_id': project2['id'],
+ 'data': json.dumps({'roles': [role_list[2]['id']]})}
+ session.execute(
+ group_project_metadata_table.insert().values(role_grant))
+
+ # Grant Role to User on Domain
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps({'roles': [role_list[3]['id']]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ # Grant Role to Group on Domain
+ role_grant = {'group_id': group['id'],
+ 'domain_id': domain['id'],
+ 'data': json.dumps(
+ {'roles': [role_list[4]['id']],
+ 'other': 'somedata'})}
+ session.execute(
+ group_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+
+ self.upgrade(29)
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[0]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[1]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[2]['id']}, data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[3]['id']}, data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn({'id': role_list[4]['id']}, data['roles'])
+ self.assertIn('other', data)
+
+ # Now add an entry that has one regular and one inherited role
+ role_grant = {'user_id': user['id'],
+ 'domain_id': domain2['id'],
+ 'data': json.dumps(
+ {'roles': [{'id': role_list[5]['id']},
+ {'id': role_list[6]['id'],
+ 'inherited_to': 'projects'}]})}
+ session.execute(user_domain_metadata_table.insert().values(role_grant))
+
+ session.commit()
+ self.downgrade(28)
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[0]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
+ (user_project_metadata_table.c.user_id == user['id']) &
+ (user_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[1]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
+ (group_project_metadata_table.c.group_id == group['id']) &
+ (group_project_metadata_table.c.project_id == project2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[2]['id'], data['roles'])
+
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[3]['id'], data['roles'])
+
+ s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
+ (group_domain_metadata_table.c.group_id == group['id']) &
+ (group_domain_metadata_table.c.domain_id == domain['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[4]['id'], data['roles'])
+ self.assertIn('other', data)
+
+ # For user-domain2, where we had one regular and one inherited role,
+ # only the direct role should remain, the inherited role should
+ # have been deleted during the downgrade
+ s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
+ (user_domain_metadata_table.c.user_id == user['id']) &
+ (user_domain_metadata_table.c.domain_id == domain2['id']))
+ r = session.execute(s)
+ data = json.loads(r.fetchone()['data'])
+ self.assertEqual(len(data['roles']), 1)
+ self.assertIn(role_list[5]['id'], data['roles'])
+
+ def test_drop_credential_constraint(self):
+ ec2_credential = {
+ 'id': '100',
+ 'user_id': 'foo',
+ 'project_id': 'bar',
+ 'type': 'ec2',
+ 'blob': json.dumps({
+ "access": "12345",
+ "secret": "12345"
+ })
+ }
+ user = {
+ 'id': 'foo',
+ 'name': 'FOO',
+ 'password': 'foo2',
+ 'enabled': True,
+ 'email': 'foo@bar.com',
+ 'extra': json.dumps({'enabled': True})
+ }
+ tenant = {
+ 'id': 'bar',
+ 'name': 'BAR',
+ 'description': 'description',
+ 'enabled': True,
+ 'extra': json.dumps({'enabled': True})
+ }
+ session = self.Session()
+ self.upgrade(7)
+ self.insert_dict(session, 'user', user)
+ self.insert_dict(session, 'tenant', tenant)
+ self.insert_dict(session, 'credential', ec2_credential)
+ session.commit()
+ self.upgrade(30)
+ cred_table = sqlalchemy.Table('credential',
+ self.metadata,
+ autoload=True)
+ cred = session.query(cred_table).filter("id='100'").one()
+ self.assertEqual(cred.user_id,
+ ec2_credential['user_id'])
+
+ def test_drop_credential_indexes(self):
+ self.upgrade(31)
+ table = sqlalchemy.Table('credential', self.metadata, autoload=True)
+ self.assertEqual(len(table.indexes), 0)
+
+ def test_downgrade_30(self):
+ self.upgrade(31)
+ self.downgrade(30)
+ table = sqlalchemy.Table('credential', self.metadata, autoload=True)
+ index_data = [(idx.name, idx.columns.keys())
+ for idx in table.indexes]
+ if self.engine.name == 'mysql':
+ self.assertIn(('user_id', ['user_id']), index_data)
+ self.assertIn(('credential_project_id_fkey', ['project_id']),
+ index_data)
+ else:
+ self.assertEqual(len(index_data), 0)
+
+ def populate_user_table(self, with_pass_enab=False,
+ with_pass_enab_domain=False):
+ # Populate the appropriate fields in the user
+ # table, depending on the parameters:
+ #
+ # Default: id, name, extra
+ # pass_enab: Add password, enabled as well
+ # pass_enab_domain: Add password, enabled and domain as well
+ #
+ this_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ for user in default_fixtures.USERS:
+ extra = copy.deepcopy(user)
+ extra.pop('id')
+ extra.pop('name')
+
+ if with_pass_enab:
+ password = extra.pop('password', None)
+ enabled = extra.pop('enabled', True)
+ ins = this_table.insert().values(
+ {'id': user['id'],
+ 'name': user['name'],
+ 'password': password,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ if with_pass_enab_domain:
+ password = extra.pop('password', None)
+ enabled = extra.pop('enabled', True)
+ extra.pop('domain_id')
+ ins = this_table.insert().values(
+ {'id': user['id'],
+ 'name': user['name'],
+ 'domain_id': user['domain_id'],
+ 'password': password,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ ins = this_table.insert().values(
+ {'id': user['id'],
+ 'name': user['name'],
+ 'extra': json.dumps(extra)})
+ self.engine.execute(ins)
+
+ def populate_tenant_table(self, with_desc_enab=False,
+ with_desc_enab_domain=False):
+ # Populate the appropriate fields in the tenant or
+ # project table, depending on the parameters
+ #
+ # Default: id, name, extra
+ # desc_enab: Add description, enabled as well
+ # desc_enab_domain: Add description, enabled and domain as well,
+ # plus use project instead of tenant
+ #
+ if with_desc_enab_domain:
+ # By this time tenants are now projects
+ this_table = sqlalchemy.Table("project",
+ self.metadata,
+ autoload=True)
+ else:
+ this_table = sqlalchemy.Table("tenant",
+ self.metadata,
+ autoload=True)
+
+ for tenant in default_fixtures.TENANTS:
+ extra = copy.deepcopy(tenant)
+ extra.pop('id')
+ extra.pop('name')
+
+ if with_desc_enab:
+ desc = extra.pop('description', None)
+ enabled = extra.pop('enabled', True)
+ ins = this_table.insert().values(
+ {'id': tenant['id'],
+ 'name': tenant['name'],
+ 'description': desc,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ if with_desc_enab_domain:
+ desc = extra.pop('description', None)
+ enabled = extra.pop('enabled', True)
+ extra.pop('domain_id')
+ ins = this_table.insert().values(
+ {'id': tenant['id'],
+ 'name': tenant['name'],
+ 'domain_id': tenant['domain_id'],
+ 'description': desc,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ ins = this_table.insert().values(
+ {'id': tenant['id'],
+ 'name': tenant['name'],
+ 'extra': json.dumps(extra)})
+ self.engine.execute(ins)
+
+ def _mysql_check_all_tables_innodb(self):
+ database = self.engine.url.database
+
+ connection = self.engine.connect()
+ # sanity check
+ total = connection.execute("SELECT count(*) "
+ "from information_schema.TABLES "
+ "where TABLE_SCHEMA='%(database)s'" %
+ locals())
+ self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
+
+ noninnodb = connection.execute("SELECT table_name "
+ "from information_schema.TABLES "
+ "where TABLE_SCHEMA='%(database)s' "
+ "and ENGINE!='InnoDB' "
+ "and TABLE_NAME!='migrate_version'" %
+ locals())
+ names = [x[0] for x in noninnodb]
+ self.assertEqual(names, [],
+ "Non-InnoDB tables exist")
+
+ connection.close()