diff options
author | Henry Nash <henryn@linux.vnet.ibm.com> | 2013-01-16 16:10:24 +0000 |
---|---|---|
committer | Henry Nash <henryn@linux.vnet.ibm.com> | 2013-02-12 16:09:15 +0000 |
commit | 8a89464d62e9c81a1ba15c0a3aa695456fc6fd33 (patch) | |
tree | 46c61fe10c2859fdeddd80379d15bc839d37cd3d /tests/test_sql_upgrade.py | |
parent | f1defe8f624e006a7562bc07cd471bdd176e303e (diff) | |
download | keystone-8a89464d62e9c81a1ba15c0a3aa695456fc6fd33.tar.gz keystone-8a89464d62e9c81a1ba15c0a3aa695456fc6fd33.tar.xz keystone-8a89464d62e9c81a1ba15c0a3aa695456fc6fd33.zip |
Keystone backend preparation for domain-scoping
These changes lay the ground work for the implmentation of
domain-scoping, but are benign in that they don't change the token.
They include making domain_id a first-class attribute in the user
and project entity (i.e. move it out of the 'extra' attribute),
filling in domain grant and project support for the kvs backend and
fixing a series of issues in the mirgation to make it work for both
MySQL, Postgresql and sqlite.
A further, separate, commit will actually provide the code to
update the actual tokens once the v3 token support has been added.
blueprint domain-scoping
blueprint default-domain
Change-Id: I55ab7947a6a1efbab003bd234856bd3805bb4a63
Diffstat (limited to 'tests/test_sql_upgrade.py')
-rw-r--r-- | tests/test_sql_upgrade.py | 217 |
1 files changed, 189 insertions, 28 deletions
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py index f0b5fa5d..d204492c 100644 --- a/tests/test_sql_upgrade.py +++ b/tests/test_sql_upgrade.py @@ -35,12 +35,14 @@ import sqlalchemy from keystone.common import sql from keystone.common.sql import migration from keystone import config +from keystone import exception from keystone import test import default_fixtures CONF = config.CONF +DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id class SqlUpgradeTests(test.TestCase): @@ -129,8 +131,8 @@ class SqlUpgradeTests(test.TestCase): self.populate_tenant_table() self.upgrade(10) self.assertTableColumns("user", - ["id", "name", "extra", "password", - "enabled"]) + ["id", "name", "extra", + "password", "enabled"]) self.assertTableColumns("tenant", ["id", "name", "extra", "description", "enabled"]) @@ -152,17 +154,33 @@ class SqlUpgradeTests(test.TestCase): a_tenant = session.query(tenant_table).filter("id='baz'").one() self.assertEqual(a_tenant.description, 'description') session.commit() + session.close() def test_downgrade_10_to_8(self): - self.upgrade(8) - self.populate_user_table() - self.populate_tenant_table() self.upgrade(10) + self.populate_user_table(with_pass_enab=True) + self.populate_tenant_table(with_desc_enab=True) self.downgrade(8) + self.assertTableColumns('user', + ['id', 'name', 'extra']) + self.assertTableColumns('tenant', + ['id', 'name', 'extra']) + session = self.Session() + user_table = sqlalchemy.Table("user", + self.metadata, + autoload=True) + a_user = session.query(user_table).filter("id='badguy'").one() + self.assertEqual(a_user.name, default_fixtures.USERS[2]['name']) + tenant_table = sqlalchemy.Table("tenant", + self.metadata, + autoload=True) + a_tenant = session.query(tenant_table).filter("id='baz'").one() + self.assertEqual(a_tenant.name, default_fixtures.TENANTS[1]['name']) + session.commit() + session.close() def test_upgrade_10_to_13(self): self.upgrade(10) - service_extra = { 'name': uuid.uuid4().hex, } @@ -187,9 +205,9 @@ class SqlUpgradeTests(test.TestCase): self.insert_dict(session, 'service', service) self.insert_dict(session, 'endpoint', endpoint) session.commit() + session.close() self.upgrade(13) - self.assertTableColumns( 'service', ['id', 'type', 'extra']) @@ -215,6 +233,8 @@ class SqlUpgradeTests(test.TestCase): self.assertEqual(ref.service_id, endpoint['service_id']) self.assertEqual(ref.url, endpoint_extra['%surl' % interface]) self.assertEqual(ref.extra, '{}') + session.commit() + session.close() def assertTenantTables(self): self.assertTableExists('tenant') @@ -235,6 +255,12 @@ class SqlUpgradeTests(test.TestCase): self.assertProjectTables() def test_downgrade_project_to_tenant(self): + # TODO(henry-nash): Debug why we need to re-load the tenant + # or user_tenant_membership ahead of upgrading to project + # in order for the assertProjectTables to work on sqlite + # (MySQL is fine without it) + self.upgrade(14) + self.assertTenantTables() self.upgrade(15) self.assertProjectTables() self.downgrade(14) @@ -248,6 +274,59 @@ class SqlUpgradeTests(test.TestCase): self.assertTableExists('group_domain_metadata') self.assertTableExists('user_group_membership') + def test_upgrade_14_to_16(self): + self.upgrade(14) + self.populate_user_table(with_pass_enab=True) + self.populate_tenant_table(with_desc_enab=True) + self.upgrade(16) + self.assertTableColumns("user", + ["id", "name", "extra", + "password", "enabled", "domain_id"]) + session = self.Session() + user_table = sqlalchemy.Table("user", + self.metadata, + autoload=True) + a_user = session.query(user_table).filter("id='foo'").one() + self.assertTrue(a_user.enabled) + self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID) + a_user = session.query(user_table).filter("id='badguy'").one() + self.assertEqual(a_user.name, default_fixtures.USERS[2]['name']) + self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID) + project_table = sqlalchemy.Table("project", + self.metadata, + autoload=True) + a_project = session.query(project_table).filter("id='baz'").one() + self.assertEqual(a_project.description, + default_fixtures.TENANTS[1]['description']) + self.assertEqual(a_project.domain_id, DEFAULT_DOMAIN_ID) + session.commit() + session.close() + + def test_downgrade_16_to_14(self): + self.upgrade(16) + self.populate_user_table(with_pass_enab_domain=True) + self.populate_tenant_table(with_desc_enab_domain=True) + self.downgrade(14) + self.assertTableColumns("user", + ["id", "name", "extra", + "password", "enabled"]) + session = self.Session() + user_table = sqlalchemy.Table("user", + self.metadata, + autoload=True) + a_user = session.query(user_table).filter("id='foo'").one() + self.assertTrue(a_user.enabled) + a_user = session.query(user_table).filter("id='badguy'").one() + self.assertEqual(a_user.name, default_fixtures.USERS[2]['name']) + tenant_table = sqlalchemy.Table("tenant", + self.metadata, + autoload=True) + a_tenant = session.query(tenant_table).filter("id='baz'").one() + self.assertEqual(a_tenant.description, + default_fixtures.TENANTS[1]['description']) + session.commit() + session.close() + def test_downgrade_14_to_13(self): self.upgrade(14) self.downgrade(13) @@ -298,6 +377,7 @@ class SqlUpgradeTests(test.TestCase): endpoint.update(common_endpoint_attrs) self.insert_dict(session, 'endpoint', endpoint) session.commit() + session.close() self.downgrade(9) @@ -323,14 +403,15 @@ class SqlUpgradeTests(test.TestCase): for interface in ['public', 'internal', 'admin']: expected_url = endpoints[interface]['url'] self.assertEqual(extra['%surl' % interface], expected_url) + session.commit() + session.close() def insert_dict(self, session, table_name, d): """Naively inserts key-value pairs into a table, given a dictionary.""" - session.execute( - 'INSERT INTO `%s` (%s) VALUES (%s)' % ( - table_name, - ', '.join('%s' % k for k in d.keys()), - ', '.join("'%s'" % v for v in d.values()))) + this_table = sqlalchemy.Table(table_name, self.metadata, autoload=True) + insert = this_table.insert() + insert.execute(d) + session.commit() def test_downgrade_to_0(self): self.upgrade(self.max_version) @@ -355,28 +436,103 @@ class SqlUpgradeTests(test.TestCase): self.assertTableColumns('user_domain_metadata', ['user_id', 'domain_id', 'data']) - def populate_user_table(self): - user_table = sqlalchemy.Table('user', + def populate_user_table(self, with_pass_enab=False, + with_pass_enab_domain=False): + # Populate the appropriate fields in the user + # table, depending on the parameters: + # + # Default: id, name, extra + # pass_enab: Add password, enabled as well + # pass_enab_domain: Add password, enabled and domain as well + # + this_table = sqlalchemy.Table("user", self.metadata, autoload=True) - session = self.Session() - insert = user_table.insert() for user in default_fixtures.USERS: extra = copy.deepcopy(user) extra.pop('id') extra.pop('name') - user['extra'] = json.dumps(extra) - insert.execute(user) - def populate_tenant_table(self): + if with_pass_enab: + password = extra.pop('password', None) + enabled = extra.pop('enabled', True) + ins = this_table.insert().values( + {'id': user['id'], + 'name': user['name'], + 'password': password, + 'enabled': bool(enabled), + 'extra': json.dumps(extra)}) + else: + if with_pass_enab_domain: + password = extra.pop('password', None) + enabled = extra.pop('enabled', True) + extra.pop('domain_id') + ins = this_table.insert().values( + {'id': user['id'], + 'name': user['name'], + 'domain_id': user['domain_id'], + 'password': password, + 'enabled': bool(enabled), + 'extra': json.dumps(extra)}) + else: + ins = this_table.insert().values( + {'id': user['id'], + 'name': user['name'], + 'extra': json.dumps(extra)}) + self.engine.execute(ins) + + def populate_tenant_table(self, with_desc_enab=False, + with_desc_enab_domain=False): + # Populate the appropriate fields in the tenant or + # project table, depending on the parameters + # + # Default: id, name, extra + # desc_enab: Add description, enabled as well + # desc_enab_domain: Add description, enabled and domain as well, + # plus use project instead of tenant + # + if with_desc_enab_domain: + # By this time tenants are now projects + this_table = sqlalchemy.Table("project", + self.metadata, + autoload=True) + else: + this_table = sqlalchemy.Table("tenant", + self.metadata, + autoload=True) + for tenant in default_fixtures.TENANTS: extra = copy.deepcopy(tenant) extra.pop('id') extra.pop('name') - self.engine.execute("insert into tenant values ('%s', '%s', '%s')" - % (tenant['id'], - tenant['name'], - json.dumps(extra))) + + if with_desc_enab: + desc = extra.pop('description', None) + enabled = extra.pop('enabled', True) + ins = this_table.insert().values( + {'id': tenant['id'], + 'name': tenant['name'], + 'description': desc, + 'enabled': bool(enabled), + 'extra': json.dumps(extra)}) + else: + if with_desc_enab_domain: + desc = extra.pop('description', None) + enabled = extra.pop('enabled', True) + extra.pop('domain_id') + ins = this_table.insert().values( + {'id': tenant['id'], + 'name': tenant['name'], + 'domain_id': tenant['domain_id'], + 'description': desc, + 'enabled': bool(enabled), + 'extra': json.dumps(extra)}) + else: + ins = this_table.insert().values( + {'id': tenant['id'], + 'name': tenant['name'], + 'extra': json.dumps(extra)}) + self.engine.execute(ins) def select_table(self, name): table = sqlalchemy.Table(name, @@ -387,16 +543,21 @@ class SqlUpgradeTests(test.TestCase): def assertTableExists(self, table_name): try: - #TODO ayoung: make quoting work for postgres - self.engine.execute("select count(*) from '%s'" % table_name) - except: + self.select_table(table_name) + except sqlalchemy.exc.NoSuchTableError: raise AssertionError('Table "%s" does not exist' % table_name) def assertTableDoesNotExist(self, table_name): """Asserts that a given table exists cannot be selected by name.""" + # Switch to a different metadata otherwise you might still + # detect renamed or dropped tables try: - self.assertTableExists(table_name) - except AssertionError: + temp_metadata = sqlalchemy.MetaData() + temp_metadata.bind = self.engine + table = sqlalchemy.Table(table_name, + temp_metadata, + autoload=True) + except sqlalchemy.exc.NoSuchTableError: pass else: raise AssertionError('Table "%s" already exists' % table_name) |