summaryrefslogtreecommitdiffstats
path: root/tests/test_sql_upgrade.py
diff options
context:
space:
mode:
authorHenry Nash <henryn@linux.vnet.ibm.com>2013-01-16 16:10:24 +0000
committerHenry Nash <henryn@linux.vnet.ibm.com>2013-02-12 16:09:15 +0000
commit8a89464d62e9c81a1ba15c0a3aa695456fc6fd33 (patch)
tree46c61fe10c2859fdeddd80379d15bc839d37cd3d /tests/test_sql_upgrade.py
parentf1defe8f624e006a7562bc07cd471bdd176e303e (diff)
downloadkeystone-8a89464d62e9c81a1ba15c0a3aa695456fc6fd33.tar.gz
keystone-8a89464d62e9c81a1ba15c0a3aa695456fc6fd33.tar.xz
keystone-8a89464d62e9c81a1ba15c0a3aa695456fc6fd33.zip
Keystone backend preparation for domain-scoping
These changes lay the ground work for the implmentation of domain-scoping, but are benign in that they don't change the token. They include making domain_id a first-class attribute in the user and project entity (i.e. move it out of the 'extra' attribute), filling in domain grant and project support for the kvs backend and fixing a series of issues in the mirgation to make it work for both MySQL, Postgresql and sqlite. A further, separate, commit will actually provide the code to update the actual tokens once the v3 token support has been added. blueprint domain-scoping blueprint default-domain Change-Id: I55ab7947a6a1efbab003bd234856bd3805bb4a63
Diffstat (limited to 'tests/test_sql_upgrade.py')
-rw-r--r--tests/test_sql_upgrade.py217
1 files changed, 189 insertions, 28 deletions
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py
index f0b5fa5d..d204492c 100644
--- a/tests/test_sql_upgrade.py
+++ b/tests/test_sql_upgrade.py
@@ -35,12 +35,14 @@ import sqlalchemy
from keystone.common import sql
from keystone.common.sql import migration
from keystone import config
+from keystone import exception
from keystone import test
import default_fixtures
CONF = config.CONF
+DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
class SqlUpgradeTests(test.TestCase):
@@ -129,8 +131,8 @@ class SqlUpgradeTests(test.TestCase):
self.populate_tenant_table()
self.upgrade(10)
self.assertTableColumns("user",
- ["id", "name", "extra", "password",
- "enabled"])
+ ["id", "name", "extra",
+ "password", "enabled"])
self.assertTableColumns("tenant",
["id", "name", "extra", "description",
"enabled"])
@@ -152,17 +154,33 @@ class SqlUpgradeTests(test.TestCase):
a_tenant = session.query(tenant_table).filter("id='baz'").one()
self.assertEqual(a_tenant.description, 'description')
session.commit()
+ session.close()
def test_downgrade_10_to_8(self):
- self.upgrade(8)
- self.populate_user_table()
- self.populate_tenant_table()
self.upgrade(10)
+ self.populate_user_table(with_pass_enab=True)
+ self.populate_tenant_table(with_desc_enab=True)
self.downgrade(8)
+ self.assertTableColumns('user',
+ ['id', 'name', 'extra'])
+ self.assertTableColumns('tenant',
+ ['id', 'name', 'extra'])
+ session = self.Session()
+ user_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ a_user = session.query(user_table).filter("id='badguy'").one()
+ self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])
+ tenant_table = sqlalchemy.Table("tenant",
+ self.metadata,
+ autoload=True)
+ a_tenant = session.query(tenant_table).filter("id='baz'").one()
+ self.assertEqual(a_tenant.name, default_fixtures.TENANTS[1]['name'])
+ session.commit()
+ session.close()
def test_upgrade_10_to_13(self):
self.upgrade(10)
-
service_extra = {
'name': uuid.uuid4().hex,
}
@@ -187,9 +205,9 @@ class SqlUpgradeTests(test.TestCase):
self.insert_dict(session, 'service', service)
self.insert_dict(session, 'endpoint', endpoint)
session.commit()
+ session.close()
self.upgrade(13)
-
self.assertTableColumns(
'service',
['id', 'type', 'extra'])
@@ -215,6 +233,8 @@ class SqlUpgradeTests(test.TestCase):
self.assertEqual(ref.service_id, endpoint['service_id'])
self.assertEqual(ref.url, endpoint_extra['%surl' % interface])
self.assertEqual(ref.extra, '{}')
+ session.commit()
+ session.close()
def assertTenantTables(self):
self.assertTableExists('tenant')
@@ -235,6 +255,12 @@ class SqlUpgradeTests(test.TestCase):
self.assertProjectTables()
def test_downgrade_project_to_tenant(self):
+ # TODO(henry-nash): Debug why we need to re-load the tenant
+ # or user_tenant_membership ahead of upgrading to project
+ # in order for the assertProjectTables to work on sqlite
+ # (MySQL is fine without it)
+ self.upgrade(14)
+ self.assertTenantTables()
self.upgrade(15)
self.assertProjectTables()
self.downgrade(14)
@@ -248,6 +274,59 @@ class SqlUpgradeTests(test.TestCase):
self.assertTableExists('group_domain_metadata')
self.assertTableExists('user_group_membership')
+ def test_upgrade_14_to_16(self):
+ self.upgrade(14)
+ self.populate_user_table(with_pass_enab=True)
+ self.populate_tenant_table(with_desc_enab=True)
+ self.upgrade(16)
+ self.assertTableColumns("user",
+ ["id", "name", "extra",
+ "password", "enabled", "domain_id"])
+ session = self.Session()
+ user_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ a_user = session.query(user_table).filter("id='foo'").one()
+ self.assertTrue(a_user.enabled)
+ self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID)
+ a_user = session.query(user_table).filter("id='badguy'").one()
+ self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])
+ self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID)
+ project_table = sqlalchemy.Table("project",
+ self.metadata,
+ autoload=True)
+ a_project = session.query(project_table).filter("id='baz'").one()
+ self.assertEqual(a_project.description,
+ default_fixtures.TENANTS[1]['description'])
+ self.assertEqual(a_project.domain_id, DEFAULT_DOMAIN_ID)
+ session.commit()
+ session.close()
+
+ def test_downgrade_16_to_14(self):
+ self.upgrade(16)
+ self.populate_user_table(with_pass_enab_domain=True)
+ self.populate_tenant_table(with_desc_enab_domain=True)
+ self.downgrade(14)
+ self.assertTableColumns("user",
+ ["id", "name", "extra",
+ "password", "enabled"])
+ session = self.Session()
+ user_table = sqlalchemy.Table("user",
+ self.metadata,
+ autoload=True)
+ a_user = session.query(user_table).filter("id='foo'").one()
+ self.assertTrue(a_user.enabled)
+ a_user = session.query(user_table).filter("id='badguy'").one()
+ self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])
+ tenant_table = sqlalchemy.Table("tenant",
+ self.metadata,
+ autoload=True)
+ a_tenant = session.query(tenant_table).filter("id='baz'").one()
+ self.assertEqual(a_tenant.description,
+ default_fixtures.TENANTS[1]['description'])
+ session.commit()
+ session.close()
+
def test_downgrade_14_to_13(self):
self.upgrade(14)
self.downgrade(13)
@@ -298,6 +377,7 @@ class SqlUpgradeTests(test.TestCase):
endpoint.update(common_endpoint_attrs)
self.insert_dict(session, 'endpoint', endpoint)
session.commit()
+ session.close()
self.downgrade(9)
@@ -323,14 +403,15 @@ class SqlUpgradeTests(test.TestCase):
for interface in ['public', 'internal', 'admin']:
expected_url = endpoints[interface]['url']
self.assertEqual(extra['%surl' % interface], expected_url)
+ session.commit()
+ session.close()
def insert_dict(self, session, table_name, d):
"""Naively inserts key-value pairs into a table, given a dictionary."""
- session.execute(
- 'INSERT INTO `%s` (%s) VALUES (%s)' % (
- table_name,
- ', '.join('%s' % k for k in d.keys()),
- ', '.join("'%s'" % v for v in d.values())))
+ this_table = sqlalchemy.Table(table_name, self.metadata, autoload=True)
+ insert = this_table.insert()
+ insert.execute(d)
+ session.commit()
def test_downgrade_to_0(self):
self.upgrade(self.max_version)
@@ -355,28 +436,103 @@ class SqlUpgradeTests(test.TestCase):
self.assertTableColumns('user_domain_metadata',
['user_id', 'domain_id', 'data'])
- def populate_user_table(self):
- user_table = sqlalchemy.Table('user',
+ def populate_user_table(self, with_pass_enab=False,
+ with_pass_enab_domain=False):
+ # Populate the appropriate fields in the user
+ # table, depending on the parameters:
+ #
+ # Default: id, name, extra
+ # pass_enab: Add password, enabled as well
+ # pass_enab_domain: Add password, enabled and domain as well
+ #
+ this_table = sqlalchemy.Table("user",
self.metadata,
autoload=True)
- session = self.Session()
- insert = user_table.insert()
for user in default_fixtures.USERS:
extra = copy.deepcopy(user)
extra.pop('id')
extra.pop('name')
- user['extra'] = json.dumps(extra)
- insert.execute(user)
- def populate_tenant_table(self):
+ if with_pass_enab:
+ password = extra.pop('password', None)
+ enabled = extra.pop('enabled', True)
+ ins = this_table.insert().values(
+ {'id': user['id'],
+ 'name': user['name'],
+ 'password': password,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ if with_pass_enab_domain:
+ password = extra.pop('password', None)
+ enabled = extra.pop('enabled', True)
+ extra.pop('domain_id')
+ ins = this_table.insert().values(
+ {'id': user['id'],
+ 'name': user['name'],
+ 'domain_id': user['domain_id'],
+ 'password': password,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ ins = this_table.insert().values(
+ {'id': user['id'],
+ 'name': user['name'],
+ 'extra': json.dumps(extra)})
+ self.engine.execute(ins)
+
+ def populate_tenant_table(self, with_desc_enab=False,
+ with_desc_enab_domain=False):
+ # Populate the appropriate fields in the tenant or
+ # project table, depending on the parameters
+ #
+ # Default: id, name, extra
+ # desc_enab: Add description, enabled as well
+ # desc_enab_domain: Add description, enabled and domain as well,
+ # plus use project instead of tenant
+ #
+ if with_desc_enab_domain:
+ # By this time tenants are now projects
+ this_table = sqlalchemy.Table("project",
+ self.metadata,
+ autoload=True)
+ else:
+ this_table = sqlalchemy.Table("tenant",
+ self.metadata,
+ autoload=True)
+
for tenant in default_fixtures.TENANTS:
extra = copy.deepcopy(tenant)
extra.pop('id')
extra.pop('name')
- self.engine.execute("insert into tenant values ('%s', '%s', '%s')"
- % (tenant['id'],
- tenant['name'],
- json.dumps(extra)))
+
+ if with_desc_enab:
+ desc = extra.pop('description', None)
+ enabled = extra.pop('enabled', True)
+ ins = this_table.insert().values(
+ {'id': tenant['id'],
+ 'name': tenant['name'],
+ 'description': desc,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ if with_desc_enab_domain:
+ desc = extra.pop('description', None)
+ enabled = extra.pop('enabled', True)
+ extra.pop('domain_id')
+ ins = this_table.insert().values(
+ {'id': tenant['id'],
+ 'name': tenant['name'],
+ 'domain_id': tenant['domain_id'],
+ 'description': desc,
+ 'enabled': bool(enabled),
+ 'extra': json.dumps(extra)})
+ else:
+ ins = this_table.insert().values(
+ {'id': tenant['id'],
+ 'name': tenant['name'],
+ 'extra': json.dumps(extra)})
+ self.engine.execute(ins)
def select_table(self, name):
table = sqlalchemy.Table(name,
@@ -387,16 +543,21 @@ class SqlUpgradeTests(test.TestCase):
def assertTableExists(self, table_name):
try:
- #TODO ayoung: make quoting work for postgres
- self.engine.execute("select count(*) from '%s'" % table_name)
- except:
+ self.select_table(table_name)
+ except sqlalchemy.exc.NoSuchTableError:
raise AssertionError('Table "%s" does not exist' % table_name)
def assertTableDoesNotExist(self, table_name):
"""Asserts that a given table exists cannot be selected by name."""
+ # Switch to a different metadata otherwise you might still
+ # detect renamed or dropped tables
try:
- self.assertTableExists(table_name)
- except AssertionError:
+ temp_metadata = sqlalchemy.MetaData()
+ temp_metadata.bind = self.engine
+ table = sqlalchemy.Table(table_name,
+ temp_metadata,
+ autoload=True)
+ except sqlalchemy.exc.NoSuchTableError:
pass
else:
raise AssertionError('Table "%s" already exists' % table_name)