diff options
| author | Victor Sergeyev <vsergeyev@mirantis.com> | 2013-06-03 17:24:07 +0300 |
|---|---|---|
| committer | Roman Podolyaka <rpodolyaka@mirantis.com> | 2013-07-12 11:23:39 +0300 |
| commit | 3972c3f494ca7c12e4b9c250a9c2f916165bb6fa (patch) | |
| tree | 996e9b4717e83328f29e067378ac1eb12b36708b | |
| parent | ca0ca29e3f04ced20f62c33ad8b5bf7492775824 (diff) | |
| download | oslo-3972c3f494ca7c12e4b9c250a9c2f916165bb6fa.tar.gz oslo-3972c3f494ca7c12e4b9c250a9c2f916165bb6fa.tar.xz oslo-3972c3f494ca7c12e4b9c250a9c2f916165bb6fa.zip | |
Migrate sqlalchemy utils from Nova
These utils are useful for database schema migrations
and can be reused in other projects.
Blueprint: oslo-sqlalchemy-utils
Change-Id: Ie54fcd5d75de05b48cca8b86c19325d7327f39cd
| -rw-r--r-- | openstack/common/db/sqlalchemy/utils.py | 322 | ||||
| -rw-r--r-- | tests/unit/db/sqlalchemy/test_migrations.conf | 7 | ||||
| -rw-r--r-- | tests/unit/db/sqlalchemy/test_migrations.py | 198 | ||||
| -rw-r--r-- | tests/unit/db/sqlalchemy/test_utils.py | 259 |
4 files changed, 784 insertions, 2 deletions
diff --git a/openstack/common/db/sqlalchemy/utils.py b/openstack/common/db/sqlalchemy/utils.py index 07030c5..8b855ff 100644 --- a/openstack/common/db/sqlalchemy/utils.py +++ b/openstack/common/db/sqlalchemy/utils.py @@ -18,12 +18,28 @@ # License for the specific language governing permissions and limitations # under the License. -"""Implementation of paginate query.""" - import sqlalchemy +from sqlalchemy import Boolean +from sqlalchemy import CheckConstraint +from sqlalchemy import Column +from sqlalchemy.engine import reflection +from sqlalchemy.ext.compiler import compiles +from sqlalchemy import func +from sqlalchemy import Index +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy.sql.expression import literal_column +from sqlalchemy.sql.expression import UpdateBase +from sqlalchemy.sql import select +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.types import NullType from openstack.common.gettextutils import _ # noqa + +from openstack.common import exception from openstack.common import log as logging +from openstack.common import timeutils LOG = logging.getLogger(__name__) @@ -130,3 +146,305 @@ def paginate_query(query, model, limit, sort_keys, marker=None, query = query.limit(limit) return query + + +def get_table(engine, name): + """Returns an sqlalchemy table dynamically from db. + + Needed because the models don't work for us in migrations + as models will be far out of sync with the current data. + """ + metadata = MetaData() + metadata.bind = engine + return Table(name, metadata, autoload=True) + + +class InsertFromSelect(UpdateBase): + """Form the base for `INSERT INTO table (SELECT ... )` statement.""" + def __init__(self, table, select): + self.table = table + self.select = select + + +@compiles(InsertFromSelect) +def visit_insert_from_select(element, compiler, **kw): + """Form the `INSERT INTO table (SELECT ... )` statement.""" + return "INSERT INTO %s %s" % ( + compiler.process(element.table, asfrom=True), + compiler.process(element.select)) + + +def _get_not_supported_column(col_name_col_instance, column_name): + try: + column = col_name_col_instance[column_name] + except KeyError: + msg = _("Please specify column %s in col_name_col_instance " + "param. It is required because column has unsupported " + "type by sqlite).") + raise exception.OpenstackException(message=msg % column_name) + + if not isinstance(column, Column): + msg = _("col_name_col_instance param has wrong type of " + "column instance for column %s It should be instance " + "of sqlalchemy.Column.") + raise exception.OpenstackException(message=msg % column_name) + return column + + +def drop_old_duplicate_entries_from_table(migrate_engine, table_name, + use_soft_delete, *uc_column_names): + """Drop all old rows having the same values for columns in uc_columns. + + This method drop (or mark ad `deleted` if use_soft_delete is True) old + duplicate rows form table with name `table_name`. + + :param migrate_engine: Sqlalchemy engine + :param table_name: Table with duplicates + :param use_soft_delete: If True - values will be marked as `deleted`, + if False - values will be removed from table + :param uc_column_names: Unique constraint columns + """ + meta = MetaData() + meta.bind = migrate_engine + + table = Table(table_name, meta, autoload=True) + columns_for_group_by = [table.c[name] for name in uc_column_names] + + columns_for_select = [func.max(table.c.id)] + columns_for_select.extend(columns_for_group_by) + + duplicated_rows_select = select(columns_for_select, + group_by=columns_for_group_by, + having=func.count(table.c.id) > 1) + + for row in migrate_engine.execute(duplicated_rows_select): + # NOTE(boris-42): Do not remove row that has the biggest ID. + delete_condition = table.c.id != row[0] + is_none = None # workaround for pyflakes + delete_condition &= table.c.deleted_at == is_none + for name in uc_column_names: + delete_condition &= table.c[name] == row[name] + + rows_to_delete_select = select([table.c.id]).where(delete_condition) + for row in migrate_engine.execute(rows_to_delete_select).fetchall(): + LOG.info(_("Deleting duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) + + if use_soft_delete: + delete_statement = table.update().\ + where(delete_condition).\ + values({ + 'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow() + }) + else: + delete_statement = table.delete().where(delete_condition) + migrate_engine.execute(delete_statement) + + +def _get_default_deleted_value(table): + if isinstance(table.c.id.type, Integer): + return 0 + if isinstance(table.c.id.type, String): + return "" + raise exception.OpenstackException( + message=_("Unsupported id columns type")) + + +def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes): + table = get_table(migrate_engine, table_name) + + insp = reflection.Inspector.from_engine(migrate_engine) + real_indexes = insp.get_indexes(table_name) + existing_index_names = dict( + [(index['name'], index['column_names']) for index in real_indexes]) + + # NOTE(boris-42): Restore indexes on `deleted` column + for index in indexes: + if 'deleted' not in index['column_names']: + continue + name = index['name'] + if name in existing_index_names: + column_names = [table.c[c] for c in existing_index_names[name]] + old_index = Index(name, *column_names, unique=index["unique"]) + old_index.drop(migrate_engine) + + column_names = [table.c[c] for c in index['column_names']] + new_index = Index(index["name"], *column_names, unique=index["unique"]) + new_index.create(migrate_engine) + + +def change_deleted_column_type_to_boolean(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_boolean_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + old_deleted = Column('old_deleted', Boolean, default=False) + old_deleted.create(table, populate_default=False) + + table.update().\ + where(table.c.deleted == table.c.id).\ + values(old_deleted=True).\ + execute() + + table.c.deleted.drop() + table.c.old_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name, + **col_name_col_instance): + insp = reflection.Inspector.from_engine(migrate_engine) + table = get_table(migrate_engine, table_name) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', Boolean, default=0) + columns.append(column_copy) + + constraints = [constraint.copy() for constraint in table.constraints] + + meta = MetaData(bind=migrate_engine) + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + c_select = [] + for c in table.c: + if c.name != "deleted": + c_select.append(c) + else: + c_select.append(table.c.deleted == table.c.id) + + ins = InsertFromSelect(new_table, select(c_select)) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + new_table.update().\ + where(new_table.c.deleted == new_table.c.id).\ + values(deleted=True).\ + execute() + + +def change_deleted_column_type_to_id_type(migrate_engine, table_name, + **col_name_col_instance): + if migrate_engine.name == "sqlite": + return _change_deleted_column_type_to_id_type_sqlite( + migrate_engine, table_name, **col_name_col_instance) + insp = reflection.Inspector.from_engine(migrate_engine) + indexes = insp.get_indexes(table_name) + + table = get_table(migrate_engine, table_name) + + new_deleted = Column('new_deleted', table.c.id.type, + default=_get_default_deleted_value(table)) + new_deleted.create(table, populate_default=True) + + deleted = True # workaround for pyflakes + table.update().\ + where(table.c.deleted == deleted).\ + values(new_deleted=table.c.id).\ + execute() + table.c.deleted.drop() + table.c.new_deleted.alter(name="deleted") + + _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) + + +def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name, + **col_name_col_instance): + # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check + # constraints in sqlite DB and our `deleted` column has + # 2 check constraints. So there is only one way to remove + # these constraints: + # 1) Create new table with the same columns, constraints + # and indexes. (except deleted column). + # 2) Copy all data from old to new table. + # 3) Drop old table. + # 4) Rename new table to old table name. + insp = reflection.Inspector.from_engine(migrate_engine) + meta = MetaData(bind=migrate_engine) + table = Table(table_name, meta, autoload=True) + default_deleted_value = _get_default_deleted_value(table) + + columns = [] + for column in table.columns: + column_copy = None + if column.name != "deleted": + if isinstance(column.type, NullType): + column_copy = _get_not_supported_column(col_name_col_instance, + column.name) + else: + column_copy = column.copy() + else: + column_copy = Column('deleted', table.c.id.type, + default=default_deleted_value) + columns.append(column_copy) + + def is_deleted_column_constraint(constraint): + # NOTE(boris-42): There is no other way to check is CheckConstraint + # associated with deleted column. + if not isinstance(constraint, CheckConstraint): + return False + sqltext = str(constraint.sqltext) + return (sqltext.endswith("deleted in (0, 1)") or + sqltext.endswith("deleted IN (:deleted_1, :deleted_2)")) + + constraints = [] + for constraint in table.constraints: + if not is_deleted_column_constraint(constraint): + constraints.append(constraint.copy()) + + new_table = Table(table_name + "__tmp__", meta, + *(columns + constraints)) + new_table.create() + + indexes = [] + for index in insp.get_indexes(table_name): + column_names = [new_table.c[c] for c in index['column_names']] + indexes.append(Index(index["name"], *column_names, + unique=index["unique"])) + + ins = InsertFromSelect(new_table, table.select()) + migrate_engine.execute(ins) + + table.drop() + [index.create(migrate_engine) for index in indexes] + + new_table.rename(table_name) + deleted = True # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=new_table.c.id).\ + execute() + + # NOTE(boris-42): Fix value of deleted column: False -> "" or 0. + deleted = False # workaround for pyflakes + new_table.update().\ + where(new_table.c.deleted == deleted).\ + values(deleted=default_deleted_value).\ + execute() diff --git a/tests/unit/db/sqlalchemy/test_migrations.conf b/tests/unit/db/sqlalchemy/test_migrations.conf new file mode 100644 index 0000000..e5e60f3 --- /dev/null +++ b/tests/unit/db/sqlalchemy/test_migrations.conf @@ -0,0 +1,7 @@ +[DEFAULT] +# Set up any number of migration data stores you want, one +# The "name" used in the test is the config variable key. +#sqlite=sqlite:///test_migrations.db +sqlite=sqlite:// +#mysql=mysql://root:@localhost/test_migrations +#postgresql=postgresql://user:pass@localhost/test_migrations diff --git a/tests/unit/db/sqlalchemy/test_migrations.py b/tests/unit/db/sqlalchemy/test_migrations.py new file mode 100644 index 0000000..e541e9e --- /dev/null +++ b/tests/unit/db/sqlalchemy/test_migrations.py @@ -0,0 +1,198 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010-2011 OpenStack Foundation +# Copyright 2012-2013 IBM Corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import commands +import ConfigParser +import os +import urlparse + +import sqlalchemy +import sqlalchemy.exc + +from openstack.common import lockutils +from openstack.common import log as logging + +from tests import utils as test_utils + +LOG = logging.getLogger(__name__) + + +def _get_connect_string(backend, user, passwd, database): + """Get database connection + + Try to get a connection with a very specific set of values, if we get + these then we'll run the tests, otherwise they are skipped + """ + if backend == "postgres": + backend = "postgresql+psycopg2" + elif backend == "mysql": + backend = "mysql+mysqldb" + else: + raise Exception("Unrecognized backend: '%s'" % backend) + + return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" + % locals()) + + +def _is_backend_avail(backend, user, passwd, database): + try: + connect_uri = _get_connect_string(backend, user, passwd, database) + engine = sqlalchemy.create_engine(connect_uri) + connection = engine.connect() + except Exception: + # intentionally catch all to handle exceptions even if we don't + # have any backend code loaded. + return False + else: + connection.close() + engine.dispose() + return True + + +def _have_mysql(user, passwd, database): + present = os.environ.get('TEST_MYSQL_PRESENT') + if present is None: + return _is_backend_avail('mysql', user, passwd, database) + return present.lower() in ('', 'true') + + +def _have_postgresql(user, passwd, database): + present = os.environ.get('TEST_POSTGRESQL_PRESENT') + if present is None: + return _is_backend_avail('postgres', user, passwd, database) + return present.lower() in ('', 'true') + + +def get_db_connection_info(conn_pieces): + database = conn_pieces.path.strip('/') + loc_pieces = conn_pieces.netloc.split('@') + host = loc_pieces[1] + + auth_pieces = loc_pieces[0].split(':') + user = auth_pieces[0] + password = "" + if len(auth_pieces) > 1: + password = auth_pieces[1].strip() + + return (user, password, database, host) + + +class BaseMigrationTestCase(test_utils.BaseTestCase): + """Base class fort testing of migration utils.""" + + def __init__(self, *args, **kwargs): + super(BaseMigrationTestCase, self).__init__(*args, **kwargs) + + self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), + 'test_migrations.conf') + # Test machines can set the TEST_MIGRATIONS_CONF variable + # to override the location of the config file for migration testing + self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF', + self.DEFAULT_CONFIG_FILE) + self.test_databases = {} + self.migration_api = None + + def setUp(self): + super(BaseMigrationTestCase, self).setUp() + + # Load test databases from the config file. Only do this + # once. No need to re-run this on each test... + LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH) + if os.path.exists(self.CONFIG_FILE_PATH): + cp = ConfigParser.RawConfigParser() + try: + cp.read(self.CONFIG_FILE_PATH) + defaults = cp.defaults() + for key, value in defaults.items(): + self.test_databases[key] = value + except ConfigParser.ParsingError, e: + self.fail("Failed to read test_migrations.conf config " + "file. Got error: %s" % e) + else: + self.fail("Failed to find test_migrations.conf config " + "file.") + + self.engines = {} + for key, value in self.test_databases.items(): + self.engines[key] = sqlalchemy.create_engine(value) + + # We start each test case with a completely blank slate. + self._reset_databases() + + def tearDown(self): + # We destroy the test data store between each test case, + # and recreate it, which ensures that we have no side-effects + # from the tests + self._reset_databases() + super(BaseMigrationTestCase, self).tearDown() + + def execute_cmd(self, cmd=None): + status, output = commands.getstatusoutput(cmd) + LOG.debug(output) + self.assertEqual(0, status, + "Failed to run: %s\n%s" % (cmd, output)) + + @lockutils.synchronized('pgadmin', 'tests-', external=True) + def _reset_pg(self, conn_pieces): + (user, password, database, host) = get_db_connection_info(conn_pieces) + os.environ['PGPASSWORD'] = password + os.environ['PGUSER'] = user + # note(boris-42): We must create and drop database, we can't + # drop database which we have connected to, so for such + # operations there is a special database template1. + sqlcmd = ("psql -w -U %(user)s -h %(host)s -c" + " '%(sql)s' -d template1") + + sql = ("drop database if exists %(database)s;") % locals() + droptable = sqlcmd % locals() + self.execute_cmd(droptable) + + sql = ("create database %(database)s;") % locals() + createtable = sqlcmd % locals() + self.execute_cmd(createtable) + + os.unsetenv('PGPASSWORD') + os.unsetenv('PGUSER') + + def _reset_databases(self): + for key, engine in self.engines.items(): + conn_string = self.test_databases[key] + conn_pieces = urlparse.urlparse(conn_string) + engine.dispose() + if conn_string.startswith('sqlite'): + # We can just delete the SQLite database, which is + # the easiest and cleanest solution + db_path = conn_pieces.path.strip('/') + if os.path.exists(db_path): + os.unlink(db_path) + # No need to recreate the SQLite DB. SQLite will + # create it for us if it's not there... + elif conn_string.startswith('mysql'): + # We can execute the MySQL client to destroy and re-create + # the MYSQL database, which is easier and less error-prone + # than using SQLAlchemy to do this via MetaData...trust me. + (user, password, database, host) = \ + get_db_connection_info(conn_pieces) + sql = ("drop database if exists %(database)s; " + "create database %(database)s;") % locals() + cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s " + "-e \"%(sql)s\"") % locals() + self.execute_cmd(cmd) + elif conn_string.startswith('postgresql'): + self._reset_pg(conn_pieces) diff --git a/tests/unit/db/sqlalchemy/test_utils.py b/tests/unit/db/sqlalchemy/test_utils.py new file mode 100644 index 0000000..5636ae8 --- /dev/null +++ b/tests/unit/db/sqlalchemy/test_utils.py @@ -0,0 +1,259 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 Boris Pavlovic (boris@pavlovic.me). +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy.dialects import mysql +from sqlalchemy import Boolean, Index, Integer, DateTime, String +from sqlalchemy import MetaData, Table, Column +from sqlalchemy.engine import reflection +from sqlalchemy.sql import select +from sqlalchemy.types import UserDefinedType, NullType + +from openstack.common.db.sqlalchemy import utils +from openstack.common import exception +from tests.unit.db.sqlalchemy import test_migrations + + +class CustomType(UserDefinedType): + """Dummy column type for testing unsupported types.""" + def get_col_spec(self): + return "CustomType" + + +class TestMigrationUtils(test_migrations.BaseMigrationTestCase): + """Class for testing utils that are used in db migrations.""" + + def _populate_db_for_drop_duplicate_entries(self, engine, meta, + table_name): + values = [ + {'id': 11, 'a': 3, 'b': 10, 'c': 'abcdef'}, + {'id': 12, 'a': 5, 'b': 10, 'c': 'abcdef'}, + {'id': 13, 'a': 6, 'b': 10, 'c': 'abcdef'}, + {'id': 14, 'a': 7, 'b': 10, 'c': 'abcdef'}, + {'id': 21, 'a': 1, 'b': 20, 'c': 'aa'}, + {'id': 31, 'a': 1, 'b': 20, 'c': 'bb'}, + {'id': 41, 'a': 1, 'b': 30, 'c': 'aef'}, + {'id': 42, 'a': 2, 'b': 30, 'c': 'aef'}, + {'id': 43, 'a': 3, 'b': 30, 'c': 'aef'} + ] + + test_table = Table(table_name, meta, + Column('id', Integer, primary_key=True, + nullable=False), + Column('a', Integer), + Column('b', Integer), + Column('c', String(255)), + Column('deleted', Integer, default=0), + Column('deleted_at', DateTime), + Column('updated_at', DateTime)) + + test_table.create() + engine.execute(test_table.insert(), values) + return test_table, values + + def test_drop_old_duplicate_entries_from_table(self): + table_name = "__test_tmp_table__" + + for key, engine in self.engines.items(): + meta = MetaData() + meta.bind = engine + test_table, values = self._populate_db_for_drop_duplicate_entries( + engine, meta, table_name) + utils.drop_old_duplicate_entries_from_table( + engine, table_name, False, 'b', 'c') + + uniq_values = set() + expected_ids = [] + for value in sorted(values, key=lambda x: x['id'], reverse=True): + uniq_value = (('b', value['b']), ('c', value['c'])) + if uniq_value in uniq_values: + continue + uniq_values.add(uniq_value) + expected_ids.append(value['id']) + + real_ids = [row[0] for row in + engine.execute(select([test_table.c.id])).fetchall()] + + self.assertEqual(len(real_ids), len(expected_ids)) + for id_ in expected_ids: + self.assertTrue(id_ in real_ids) + + def test_drop_old_duplicate_entries_from_table_soft_delete(self): + table_name = "__test_tmp_table__" + + for key, engine in self.engines.items(): + meta = MetaData() + meta.bind = engine + table, values = self._populate_db_for_drop_duplicate_entries( + engine, meta, table_name) + utils.drop_old_duplicate_entries_from_table(engine, table_name, + True, 'b', 'c') + uniq_values = set() + expected_values = [] + soft_deleted_values = [] + + for value in sorted(values, key=lambda x: x['id'], reverse=True): + uniq_value = (('b', value['b']), ('c', value['c'])) + if uniq_value in uniq_values: + soft_deleted_values.append(value) + continue + uniq_values.add(uniq_value) + expected_values.append(value) + + base_select = table.select() + + rows_select = base_select.where(table.c.deleted != table.c.id) + row_ids = [row['id'] for row in + engine.execute(rows_select).fetchall()] + self.assertEqual(len(row_ids), len(expected_values)) + for value in expected_values: + self.assertTrue(value['id'] in row_ids) + + deleted_rows_select = base_select.where( + table.c.deleted == table.c.id) + deleted_rows_ids = [row['id'] for row in + engine.execute(deleted_rows_select).fetchall()] + self.assertEqual(len(deleted_rows_ids), + len(values) - len(row_ids)) + for value in soft_deleted_values: + self.assertTrue(value['id'] in deleted_rows_ids) + + def test_change_deleted_column_type_doesnt_drop_index(self): + table_name = 'abc' + for key, engine in self.engines.items(): + meta = MetaData(bind=engine) + + indexes = { + 'idx_a_deleted': ['a', 'deleted'], + 'idx_b_deleted': ['b', 'deleted'], + 'idx_a': ['a'] + } + + index_instances = [Index(name, *columns) + for name, columns in indexes.iteritems()] + + table = Table(table_name, meta, + Column('id', Integer, primary_key=True), + Column('a', String(255)), + Column('b', String(255)), + Column('deleted', Boolean), + *index_instances) + table.create() + utils.change_deleted_column_type_to_id_type(engine, table_name) + utils.change_deleted_column_type_to_boolean(engine, table_name) + + insp = reflection.Inspector.from_engine(engine) + real_indexes = insp.get_indexes(table_name) + self.assertEqual(len(real_indexes), 3) + for index in real_indexes: + name = index['name'] + self.assertIn(name, indexes) + self.assertEqual(set(index['column_names']), + set(indexes[name])) + + def test_change_deleted_column_type_to_id_type_integer(self): + table_name = 'abc' + for key, engine in self.engines.items(): + meta = MetaData() + meta.bind = engine + table = Table(table_name, meta, + Column('id', Integer, primary_key=True), + Column('deleted', Boolean)) + table.create() + utils.change_deleted_column_type_to_id_type(engine, table_name) + + table = utils.get_table(engine, table_name) + self.assertTrue(isinstance(table.c.deleted.type, Integer)) + + def test_change_deleted_column_type_to_id_type_string(self): + table_name = 'abc' + for key, engine in self.engines.items(): + meta = MetaData() + meta.bind = engine + table = Table(table_name, meta, + Column('id', String(255), primary_key=True), + Column('deleted', Boolean)) + table.create() + utils.change_deleted_column_type_to_id_type(engine, table_name) + + table = utils.get_table(engine, table_name) + self.assertTrue(isinstance(table.c.deleted.type, String)) + + def test_change_deleted_column_type_to_id_type_custom(self): + table_name = 'abc' + engine = self.engines['sqlite'] + meta = MetaData() + meta.bind = engine + table = Table(table_name, meta, + Column('id', Integer, primary_key=True), + Column('foo', CustomType), + Column('deleted', Boolean)) + table.create() + + self.assertRaises(exception.OpenstackException, + utils.change_deleted_column_type_to_id_type, + engine, table_name) + + fooColumn = Column('foo', CustomType()) + utils.change_deleted_column_type_to_id_type(engine, table_name, + foo=fooColumn) + + table = utils.get_table(engine, table_name) + # NOTE(boris-42): There is no way to check has foo type CustomType. + # but sqlalchemy will set it to NullType. + self.assertTrue(isinstance(table.c.foo.type, NullType)) + self.assertTrue(isinstance(table.c.deleted.type, Integer)) + + def test_change_deleted_column_type_to_boolean(self): + table_name = 'abc' + for key, engine in self.engines.items(): + meta = MetaData() + meta.bind = engine + table = Table(table_name, meta, + Column('id', Integer, primary_key=True), + Column('deleted', Integer)) + table.create() + + utils.change_deleted_column_type_to_boolean(engine, table_name) + + table = utils.get_table(engine, table_name) + expected_type = Boolean if key != "mysql" else mysql.TINYINT + self.assertTrue(isinstance(table.c.deleted.type, expected_type)) + + def test_change_deleted_column_type_to_boolean_type_custom(self): + table_name = 'abc' + engine = self.engines['sqlite'] + meta = MetaData() + meta.bind = engine + table = Table(table_name, meta, + Column('id', Integer, primary_key=True), + Column('foo', CustomType), + Column('deleted', Integer)) + table.create() + + self.assertRaises(exception.OpenstackException, + utils.change_deleted_column_type_to_boolean, + engine, table_name) + + fooColumn = Column('foo', CustomType()) + utils.change_deleted_column_type_to_boolean(engine, table_name, + foo=fooColumn) + + table = utils.get_table(engine, table_name) + # NOTE(boris-42): There is no way to check has foo type CustomType. + # but sqlalchemy will set it to NullType. + self.assertTrue(isinstance(table.c.foo.type, NullType)) + self.assertTrue(isinstance(table.c.deleted.type, Boolean)) |
