author     Victor Sergeyev <vsergeyev@mirantis.com>   2013-06-03 17:24:07 +0300
committer  Roman Podolyaka <rpodolyaka@mirantis.com>  2013-07-12 11:23:39 +0300
commit     3972c3f494ca7c12e4b9c250a9c2f916165bb6fa (patch)
tree       996e9b4717e83328f29e067378ac1eb12b36708b
parent     ca0ca29e3f04ced20f62c33ad8b5bf7492775824 (diff)
Migrate sqlalchemy utils from Nova

These utils are useful for database schema migrations and can be
reused in other projects.

Blueprint: oslo-sqlalchemy-utils
Change-Id: Ie54fcd5d75de05b48cca8b86c19325d7327f39cd
-rw-r--r--  openstack/common/db/sqlalchemy/utils.py       | 322
-rw-r--r--  tests/unit/db/sqlalchemy/test_migrations.conf |   7
-rw-r--r--  tests/unit/db/sqlalchemy/test_migrations.py   | 198
-rw-r--r--  tests/unit/db/sqlalchemy/test_utils.py        | 259
4 files changed, 784 insertions(+), 2 deletions(-)
diff --git a/openstack/common/db/sqlalchemy/utils.py b/openstack/common/db/sqlalchemy/utils.py
index 07030c5..8b855ff 100644
--- a/openstack/common/db/sqlalchemy/utils.py
+++ b/openstack/common/db/sqlalchemy/utils.py
@@ -18,12 +18,28 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Implementation of paginate query."""
-
import sqlalchemy
+from sqlalchemy import Boolean
+from sqlalchemy import CheckConstraint
+from sqlalchemy import Column
+from sqlalchemy.engine import reflection
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy import func
+from sqlalchemy import Index
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy.sql.expression import literal_column
+from sqlalchemy.sql.expression import UpdateBase
+from sqlalchemy.sql import select
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy.types import NullType
from openstack.common.gettextutils import _ # noqa
+
+from openstack.common import exception
from openstack.common import log as logging
+from openstack.common import timeutils
LOG = logging.getLogger(__name__)
@@ -130,3 +146,305 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
query = query.limit(limit)
return query
+
+
+def get_table(engine, name):
+    """Returns a sqlalchemy table dynamically from db.
+
+ Needed because the models don't work for us in migrations
+ as models will be far out of sync with the current data.
+ """
+ metadata = MetaData()
+ metadata.bind = engine
+ return Table(name, metadata, autoload=True)
+
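+# Usage sketch ('instances' is a hypothetical table name; `engine` is
+# assumed to be a connected sqlalchemy engine):
+#
+#     instances = get_table(engine, 'instances')
+#     print(instances.c.keys())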
+
+class InsertFromSelect(UpdateBase):
+    """Form the base for an `INSERT INTO table (SELECT ...)` statement."""
+ def __init__(self, table, select):
+ self.table = table
+ self.select = select
+
+
+@compiles(InsertFromSelect)
+def visit_insert_from_select(element, compiler, **kw):
+    """Form an `INSERT INTO table (SELECT ...)` statement."""
+ return "INSERT INTO %s %s" % (
+ compiler.process(element.table, asfrom=True),
+ compiler.process(element.select))
+
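+# Usage sketch (`src_table` and `dst_table` are assumed to be Table
+# objects with compatible columns):
+#
+#     ins = InsertFromSelect(dst_table, src_table.select())
+#     migrate_engine.execute(ins)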
+
+def _get_not_supported_column(col_name_col_instance, column_name):
+ try:
+ column = col_name_col_instance[column_name]
+ except KeyError:
+        msg = _("Please specify column %s in col_name_col_instance "
+                "param. It is required because the column has a type "
+                "unsupported by sqlite.")
+ raise exception.OpenstackException(message=msg % column_name)
+
+ if not isinstance(column, Column):
+        msg = _("col_name_col_instance param has wrong type of "
+                "column instance for column %s. It should be an "
+                "instance of sqlalchemy.Column.")
+ raise exception.OpenstackException(message=msg % column_name)
+ return column
+
+
+def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
+ use_soft_delete, *uc_column_names):
+    """Drop all old rows having the same values in the uc_column_names columns.
+
+    This method drops (or marks as `deleted` if use_soft_delete is True)
+    old duplicate rows from the table with name `table_name`.
+
+    :param migrate_engine: Sqlalchemy engine
+    :param table_name: Table with duplicates
+    :param use_soft_delete: If True - rows will be marked as `deleted`,
+                            if False - rows will be removed from the table
+    :param uc_column_names: Unique constraint columns
+    """
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ table = Table(table_name, meta, autoload=True)
+ columns_for_group_by = [table.c[name] for name in uc_column_names]
+
+ columns_for_select = [func.max(table.c.id)]
+ columns_for_select.extend(columns_for_group_by)
+
+ duplicated_rows_select = select(columns_for_select,
+ group_by=columns_for_group_by,
+ having=func.count(table.c.id) > 1)
+
+ for row in migrate_engine.execute(duplicated_rows_select):
+ # NOTE(boris-42): Do not remove row that has the biggest ID.
+ delete_condition = table.c.id != row[0]
+ is_none = None # workaround for pyflakes
+ delete_condition &= table.c.deleted_at == is_none
+ for name in uc_column_names:
+ delete_condition &= table.c[name] == row[name]
+
+ rows_to_delete_select = select([table.c.id]).where(delete_condition)
+ for row in migrate_engine.execute(rows_to_delete_select).fetchall():
+ LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
+ "%(table)s") % dict(id=row[0], table=table_name))
+
+ if use_soft_delete:
+ delete_statement = table.update().\
+ where(delete_condition).\
+ values({
+ 'deleted': literal_column('id'),
+ 'updated_at': literal_column('updated_at'),
+ 'deleted_at': timeutils.utcnow()
+ })
+ else:
+ delete_statement = table.delete().where(delete_condition)
+ migrate_engine.execute(delete_statement)
+
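+# Usage sketch (hard-deletes all but the newest duplicate for each
+# ('b', 'c') pair in a hypothetical 'volumes' table):
+#
+#     drop_old_duplicate_entries_from_table(migrate_engine, 'volumes',
+#                                           False, 'b', 'c')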
+
+def _get_default_deleted_value(table):
+ if isinstance(table.c.id.type, Integer):
+ return 0
+ if isinstance(table.c.id.type, String):
+ return ""
+ raise exception.OpenstackException(
+        message=_("Unsupported id column type"))
+
+
+def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
+ table = get_table(migrate_engine, table_name)
+
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ real_indexes = insp.get_indexes(table_name)
+ existing_index_names = dict(
+ [(index['name'], index['column_names']) for index in real_indexes])
+
+ # NOTE(boris-42): Restore indexes on `deleted` column
+ for index in indexes:
+ if 'deleted' not in index['column_names']:
+ continue
+ name = index['name']
+ if name in existing_index_names:
+ column_names = [table.c[c] for c in existing_index_names[name]]
+ old_index = Index(name, *column_names, unique=index["unique"])
+ old_index.drop(migrate_engine)
+
+ column_names = [table.c[c] for c in index['column_names']]
+ new_index = Index(index["name"], *column_names, unique=index["unique"])
+ new_index.create(migrate_engine)
+
+
+def change_deleted_column_type_to_boolean(migrate_engine, table_name,
+ **col_name_col_instance):
+ if migrate_engine.name == "sqlite":
+ return _change_deleted_column_type_to_boolean_sqlite(
+ migrate_engine, table_name, **col_name_col_instance)
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ indexes = insp.get_indexes(table_name)
+
+ table = get_table(migrate_engine, table_name)
+
+ old_deleted = Column('old_deleted', Boolean, default=False)
+ old_deleted.create(table, populate_default=False)
+
+ table.update().\
+ where(table.c.deleted == table.c.id).\
+ values(old_deleted=True).\
+ execute()
+
+ table.c.deleted.drop()
+ table.c.old_deleted.alter(name="deleted")
+
+ _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
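+# Usage sketch (assumes a hypothetical 'instances' table whose `deleted`
+# column currently has the same type as its `id` column):
+#
+#     change_deleted_column_type_to_boolean(migrate_engine, 'instances')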
+
+def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
+ **col_name_col_instance):
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ table = get_table(migrate_engine, table_name)
+
+ columns = []
+ for column in table.columns:
+ column_copy = None
+ if column.name != "deleted":
+ if isinstance(column.type, NullType):
+ column_copy = _get_not_supported_column(col_name_col_instance,
+ column.name)
+ else:
+ column_copy = column.copy()
+ else:
+ column_copy = Column('deleted', Boolean, default=0)
+ columns.append(column_copy)
+
+ constraints = [constraint.copy() for constraint in table.constraints]
+
+ meta = MetaData(bind=migrate_engine)
+ new_table = Table(table_name + "__tmp__", meta,
+ *(columns + constraints))
+ new_table.create()
+
+ indexes = []
+ for index in insp.get_indexes(table_name):
+ column_names = [new_table.c[c] for c in index['column_names']]
+ indexes.append(Index(index["name"], *column_names,
+ unique=index["unique"]))
+
+ c_select = []
+ for c in table.c:
+ if c.name != "deleted":
+ c_select.append(c)
+ else:
+ c_select.append(table.c.deleted == table.c.id)
+
+ ins = InsertFromSelect(new_table, select(c_select))
+ migrate_engine.execute(ins)
+
+ table.drop()
+    for index in indexes:
+        index.create(migrate_engine)
+
+ new_table.rename(table_name)
+ new_table.update().\
+ where(new_table.c.deleted == new_table.c.id).\
+ values(deleted=True).\
+ execute()
+
+
+def change_deleted_column_type_to_id_type(migrate_engine, table_name,
+ **col_name_col_instance):
+ if migrate_engine.name == "sqlite":
+ return _change_deleted_column_type_to_id_type_sqlite(
+ migrate_engine, table_name, **col_name_col_instance)
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ indexes = insp.get_indexes(table_name)
+
+ table = get_table(migrate_engine, table_name)
+
+ new_deleted = Column('new_deleted', table.c.id.type,
+ default=_get_default_deleted_value(table))
+ new_deleted.create(table, populate_default=True)
+
+ deleted = True # workaround for pyflakes
+ table.update().\
+ where(table.c.deleted == deleted).\
+ values(new_deleted=table.c.id).\
+ execute()
+ table.c.deleted.drop()
+ table.c.new_deleted.alter(name="deleted")
+
+ _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
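+# Usage sketch (`foo_col` supplies an explicit type for a hypothetical
+# column that sqlite reflects as NullType):
+#
+#     foo_col = Column('foo', String(255))
+#     change_deleted_column_type_to_id_type(migrate_engine, 'instances',
+#                                           foo=foo_col)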
+
+def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
+ **col_name_col_instance):
+    # NOTE(boris-42): sqlalchemy-migrate can't drop a column with check
+    #                 constraints in a sqlite DB, and our `deleted` column
+    #                 has 2 check constraints. So there is only one way to
+    #                 remove these constraints:
+    #                 1) Create a new table with the same columns,
+    #                    constraints and indexes (except the deleted
+    #                    column).
+    #                 2) Copy all data from the old table to the new one.
+    #                 3) Drop the old table.
+    #                 4) Rename the new table to the old table's name.
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ meta = MetaData(bind=migrate_engine)
+ table = Table(table_name, meta, autoload=True)
+ default_deleted_value = _get_default_deleted_value(table)
+
+ columns = []
+ for column in table.columns:
+ column_copy = None
+ if column.name != "deleted":
+ if isinstance(column.type, NullType):
+ column_copy = _get_not_supported_column(col_name_col_instance,
+ column.name)
+ else:
+ column_copy = column.copy()
+ else:
+ column_copy = Column('deleted', table.c.id.type,
+ default=default_deleted_value)
+ columns.append(column_copy)
+
+ def is_deleted_column_constraint(constraint):
+        # NOTE(boris-42): There is no other way to check whether a
+        #                 CheckConstraint is associated with the deleted
+        #                 column.
+ if not isinstance(constraint, CheckConstraint):
+ return False
+ sqltext = str(constraint.sqltext)
+ return (sqltext.endswith("deleted in (0, 1)") or
+ sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
+
+ constraints = []
+ for constraint in table.constraints:
+ if not is_deleted_column_constraint(constraint):
+ constraints.append(constraint.copy())
+
+ new_table = Table(table_name + "__tmp__", meta,
+ *(columns + constraints))
+ new_table.create()
+
+ indexes = []
+ for index in insp.get_indexes(table_name):
+ column_names = [new_table.c[c] for c in index['column_names']]
+ indexes.append(Index(index["name"], *column_names,
+ unique=index["unique"]))
+
+ ins = InsertFromSelect(new_table, table.select())
+ migrate_engine.execute(ins)
+
+ table.drop()
+    for index in indexes:
+        index.create(migrate_engine)
+
+ new_table.rename(table_name)
+ deleted = True # workaround for pyflakes
+ new_table.update().\
+ where(new_table.c.deleted == deleted).\
+ values(deleted=new_table.c.id).\
+ execute()
+
+ # NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
+ deleted = False # workaround for pyflakes
+ new_table.update().\
+ where(new_table.c.deleted == deleted).\
+ values(deleted=default_deleted_value).\
+ execute()
diff --git a/tests/unit/db/sqlalchemy/test_migrations.conf b/tests/unit/db/sqlalchemy/test_migrations.conf
new file mode 100644
index 0000000..e5e60f3
--- /dev/null
+++ b/tests/unit/db/sqlalchemy/test_migrations.conf
@@ -0,0 +1,7 @@
+[DEFAULT]
+# Set up any number of migration data stores you want, one per line.
+# The "name" used in the test is the config variable key.
+#sqlite=sqlite:///test_migrations.db
+sqlite=sqlite://
+#mysql=mysql://root:@localhost/test_migrations
+#postgresql=postgresql://user:pass@localhost/test_migrations
diff --git a/tests/unit/db/sqlalchemy/test_migrations.py b/tests/unit/db/sqlalchemy/test_migrations.py
new file mode 100644
index 0000000..e541e9e
--- /dev/null
+++ b/tests/unit/db/sqlalchemy/test_migrations.py
@@ -0,0 +1,198 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010-2011 OpenStack Foundation
+# Copyright 2012-2013 IBM Corp.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import commands
+import ConfigParser
+import os
+import urlparse
+
+import sqlalchemy
+import sqlalchemy.exc
+
+from openstack.common import lockutils
+from openstack.common import log as logging
+
+from tests import utils as test_utils
+
+LOG = logging.getLogger(__name__)
+
+
+def _get_connect_string(backend, user, passwd, database):
+    """Get a database connection string.
+
+    Try to get a connection with a very specific set of values. If we
+    can get these, we'll run the tests; otherwise they are skipped.
+    """
+ if backend == "postgres":
+ backend = "postgresql+psycopg2"
+ elif backend == "mysql":
+ backend = "mysql+mysqldb"
+ else:
+ raise Exception("Unrecognized backend: '%s'" % backend)
+
+ return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
+ % locals())
+
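+# Usage sketch (credentials are hypothetical):
+#
+#     _get_connect_string('mysql', 'openstack', 'secret', 'test')
+#     # -> 'mysql+mysqldb://openstack:secret@localhost/test'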
+
+def _is_backend_avail(backend, user, passwd, database):
+ try:
+ connect_uri = _get_connect_string(backend, user, passwd, database)
+ engine = sqlalchemy.create_engine(connect_uri)
+ connection = engine.connect()
+ except Exception:
+ # intentionally catch all to handle exceptions even if we don't
+ # have any backend code loaded.
+ return False
+ else:
+ connection.close()
+ engine.dispose()
+ return True
+
+
+def _have_mysql(user, passwd, database):
+ present = os.environ.get('TEST_MYSQL_PRESENT')
+ if present is None:
+ return _is_backend_avail('mysql', user, passwd, database)
+ return present.lower() in ('', 'true')
+
+
+def _have_postgresql(user, passwd, database):
+ present = os.environ.get('TEST_POSTGRESQL_PRESENT')
+ if present is None:
+ return _is_backend_avail('postgres', user, passwd, database)
+ return present.lower() in ('', 'true')
+
+
+def get_db_connection_info(conn_pieces):
+ database = conn_pieces.path.strip('/')
+ loc_pieces = conn_pieces.netloc.split('@')
+ host = loc_pieces[1]
+
+ auth_pieces = loc_pieces[0].split(':')
+ user = auth_pieces[0]
+ password = ""
+ if len(auth_pieces) > 1:
+ password = auth_pieces[1].strip()
+
+ return (user, password, database, host)
+
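+# Usage sketch (the connection string is hypothetical):
+#
+#     pieces = urlparse.urlparse('mysql://root:secret@localhost/testdb')
+#     user, password, database, host = get_db_connection_info(pieces)
+#     # -> ('root', 'secret', 'testdb', 'localhost')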
+
+class BaseMigrationTestCase(test_utils.BaseTestCase):
+    """Base class for testing migration utils."""
+
+ def __init__(self, *args, **kwargs):
+ super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
+
+ self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
+ 'test_migrations.conf')
+ # Test machines can set the TEST_MIGRATIONS_CONF variable
+ # to override the location of the config file for migration testing
+ self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
+ self.DEFAULT_CONFIG_FILE)
+ self.test_databases = {}
+ self.migration_api = None
+
+ def setUp(self):
+ super(BaseMigrationTestCase, self).setUp()
+
+ # Load test databases from the config file. Only do this
+ # once. No need to re-run this on each test...
+ LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
+ if os.path.exists(self.CONFIG_FILE_PATH):
+ cp = ConfigParser.RawConfigParser()
+ try:
+ cp.read(self.CONFIG_FILE_PATH)
+ defaults = cp.defaults()
+ for key, value in defaults.items():
+ self.test_databases[key] = value
+ except ConfigParser.ParsingError, e:
+ self.fail("Failed to read test_migrations.conf config "
+ "file. Got error: %s" % e)
+ else:
+ self.fail("Failed to find test_migrations.conf config "
+ "file.")
+
+ self.engines = {}
+ for key, value in self.test_databases.items():
+ self.engines[key] = sqlalchemy.create_engine(value)
+
+ # We start each test case with a completely blank slate.
+ self._reset_databases()
+
+ def tearDown(self):
+ # We destroy the test data store between each test case,
+ # and recreate it, which ensures that we have no side-effects
+ # from the tests
+ self._reset_databases()
+ super(BaseMigrationTestCase, self).tearDown()
+
+ def execute_cmd(self, cmd=None):
+ status, output = commands.getstatusoutput(cmd)
+ LOG.debug(output)
+ self.assertEqual(0, status,
+ "Failed to run: %s\n%s" % (cmd, output))
+
+ @lockutils.synchronized('pgadmin', 'tests-', external=True)
+ def _reset_pg(self, conn_pieces):
+ (user, password, database, host) = get_db_connection_info(conn_pieces)
+ os.environ['PGPASSWORD'] = password
+ os.environ['PGUSER'] = user
+        # note(boris-42): We must create and drop the database; we can't
+        #                 drop the database we are connected to, so for
+        #                 such operations there is a special database,
+        #                 template1.
+ sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
+ " '%(sql)s' -d template1")
+
+ sql = ("drop database if exists %(database)s;") % locals()
+ droptable = sqlcmd % locals()
+ self.execute_cmd(droptable)
+
+ sql = ("create database %(database)s;") % locals()
+ createtable = sqlcmd % locals()
+ self.execute_cmd(createtable)
+
+ os.unsetenv('PGPASSWORD')
+ os.unsetenv('PGUSER')
+
+ def _reset_databases(self):
+ for key, engine in self.engines.items():
+ conn_string = self.test_databases[key]
+ conn_pieces = urlparse.urlparse(conn_string)
+ engine.dispose()
+ if conn_string.startswith('sqlite'):
+ # We can just delete the SQLite database, which is
+ # the easiest and cleanest solution
+ db_path = conn_pieces.path.strip('/')
+ if os.path.exists(db_path):
+ os.unlink(db_path)
+ # No need to recreate the SQLite DB. SQLite will
+ # create it for us if it's not there...
+ elif conn_string.startswith('mysql'):
+ # We can execute the MySQL client to destroy and re-create
+ # the MYSQL database, which is easier and less error-prone
+ # than using SQLAlchemy to do this via MetaData...trust me.
+ (user, password, database, host) = \
+ get_db_connection_info(conn_pieces)
+ sql = ("drop database if exists %(database)s; "
+ "create database %(database)s;") % locals()
+ cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
+ "-e \"%(sql)s\"") % locals()
+ self.execute_cmd(cmd)
+ elif conn_string.startswith('postgresql'):
+ self._reset_pg(conn_pieces)
diff --git a/tests/unit/db/sqlalchemy/test_utils.py b/tests/unit/db/sqlalchemy/test_utils.py
new file mode 100644
index 0000000..5636ae8
--- /dev/null
+++ b/tests/unit/db/sqlalchemy/test_utils.py
@@ -0,0 +1,259 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 Boris Pavlovic (boris@pavlovic.me).
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy.dialects import mysql
+from sqlalchemy import Boolean, Index, Integer, DateTime, String
+from sqlalchemy import MetaData, Table, Column
+from sqlalchemy.engine import reflection
+from sqlalchemy.sql import select
+from sqlalchemy.types import UserDefinedType, NullType
+
+from openstack.common.db.sqlalchemy import utils
+from openstack.common import exception
+from tests.unit.db.sqlalchemy import test_migrations
+
+
+class CustomType(UserDefinedType):
+ """Dummy column type for testing unsupported types."""
+ def get_col_spec(self):
+ return "CustomType"
+
+
+class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
+ """Class for testing utils that are used in db migrations."""
+
+ def _populate_db_for_drop_duplicate_entries(self, engine, meta,
+ table_name):
+ values = [
+ {'id': 11, 'a': 3, 'b': 10, 'c': 'abcdef'},
+ {'id': 12, 'a': 5, 'b': 10, 'c': 'abcdef'},
+ {'id': 13, 'a': 6, 'b': 10, 'c': 'abcdef'},
+ {'id': 14, 'a': 7, 'b': 10, 'c': 'abcdef'},
+ {'id': 21, 'a': 1, 'b': 20, 'c': 'aa'},
+ {'id': 31, 'a': 1, 'b': 20, 'c': 'bb'},
+ {'id': 41, 'a': 1, 'b': 30, 'c': 'aef'},
+ {'id': 42, 'a': 2, 'b': 30, 'c': 'aef'},
+ {'id': 43, 'a': 3, 'b': 30, 'c': 'aef'}
+ ]
+
+ test_table = Table(table_name, meta,
+ Column('id', Integer, primary_key=True,
+ nullable=False),
+ Column('a', Integer),
+ Column('b', Integer),
+ Column('c', String(255)),
+ Column('deleted', Integer, default=0),
+ Column('deleted_at', DateTime),
+ Column('updated_at', DateTime))
+
+ test_table.create()
+ engine.execute(test_table.insert(), values)
+ return test_table, values
+
+ def test_drop_old_duplicate_entries_from_table(self):
+ table_name = "__test_tmp_table__"
+
+ for key, engine in self.engines.items():
+ meta = MetaData()
+ meta.bind = engine
+ test_table, values = self._populate_db_for_drop_duplicate_entries(
+ engine, meta, table_name)
+ utils.drop_old_duplicate_entries_from_table(
+ engine, table_name, False, 'b', 'c')
+
+ uniq_values = set()
+ expected_ids = []
+ for value in sorted(values, key=lambda x: x['id'], reverse=True):
+ uniq_value = (('b', value['b']), ('c', value['c']))
+ if uniq_value in uniq_values:
+ continue
+ uniq_values.add(uniq_value)
+ expected_ids.append(value['id'])
+
+ real_ids = [row[0] for row in
+ engine.execute(select([test_table.c.id])).fetchall()]
+
+ self.assertEqual(len(real_ids), len(expected_ids))
+ for id_ in expected_ids:
+ self.assertTrue(id_ in real_ids)
+
+ def test_drop_old_duplicate_entries_from_table_soft_delete(self):
+ table_name = "__test_tmp_table__"
+
+ for key, engine in self.engines.items():
+ meta = MetaData()
+ meta.bind = engine
+ table, values = self._populate_db_for_drop_duplicate_entries(
+ engine, meta, table_name)
+ utils.drop_old_duplicate_entries_from_table(engine, table_name,
+ True, 'b', 'c')
+ uniq_values = set()
+ expected_values = []
+ soft_deleted_values = []
+
+ for value in sorted(values, key=lambda x: x['id'], reverse=True):
+ uniq_value = (('b', value['b']), ('c', value['c']))
+ if uniq_value in uniq_values:
+ soft_deleted_values.append(value)
+ continue
+ uniq_values.add(uniq_value)
+ expected_values.append(value)
+
+ base_select = table.select()
+
+ rows_select = base_select.where(table.c.deleted != table.c.id)
+ row_ids = [row['id'] for row in
+ engine.execute(rows_select).fetchall()]
+ self.assertEqual(len(row_ids), len(expected_values))
+ for value in expected_values:
+ self.assertTrue(value['id'] in row_ids)
+
+ deleted_rows_select = base_select.where(
+ table.c.deleted == table.c.id)
+ deleted_rows_ids = [row['id'] for row in
+ engine.execute(deleted_rows_select).fetchall()]
+ self.assertEqual(len(deleted_rows_ids),
+ len(values) - len(row_ids))
+ for value in soft_deleted_values:
+ self.assertTrue(value['id'] in deleted_rows_ids)
+
+ def test_change_deleted_column_type_doesnt_drop_index(self):
+ table_name = 'abc'
+ for key, engine in self.engines.items():
+ meta = MetaData(bind=engine)
+
+ indexes = {
+ 'idx_a_deleted': ['a', 'deleted'],
+ 'idx_b_deleted': ['b', 'deleted'],
+ 'idx_a': ['a']
+ }
+
+ index_instances = [Index(name, *columns)
+ for name, columns in indexes.iteritems()]
+
+ table = Table(table_name, meta,
+ Column('id', Integer, primary_key=True),
+ Column('a', String(255)),
+ Column('b', String(255)),
+ Column('deleted', Boolean),
+ *index_instances)
+ table.create()
+ utils.change_deleted_column_type_to_id_type(engine, table_name)
+ utils.change_deleted_column_type_to_boolean(engine, table_name)
+
+ insp = reflection.Inspector.from_engine(engine)
+ real_indexes = insp.get_indexes(table_name)
+ self.assertEqual(len(real_indexes), 3)
+ for index in real_indexes:
+ name = index['name']
+ self.assertIn(name, indexes)
+ self.assertEqual(set(index['column_names']),
+ set(indexes[name]))
+
+ def test_change_deleted_column_type_to_id_type_integer(self):
+ table_name = 'abc'
+ for key, engine in self.engines.items():
+ meta = MetaData()
+ meta.bind = engine
+ table = Table(table_name, meta,
+ Column('id', Integer, primary_key=True),
+ Column('deleted', Boolean))
+ table.create()
+ utils.change_deleted_column_type_to_id_type(engine, table_name)
+
+ table = utils.get_table(engine, table_name)
+ self.assertTrue(isinstance(table.c.deleted.type, Integer))
+
+ def test_change_deleted_column_type_to_id_type_string(self):
+ table_name = 'abc'
+ for key, engine in self.engines.items():
+ meta = MetaData()
+ meta.bind = engine
+ table = Table(table_name, meta,
+ Column('id', String(255), primary_key=True),
+ Column('deleted', Boolean))
+ table.create()
+ utils.change_deleted_column_type_to_id_type(engine, table_name)
+
+ table = utils.get_table(engine, table_name)
+ self.assertTrue(isinstance(table.c.deleted.type, String))
+
+ def test_change_deleted_column_type_to_id_type_custom(self):
+ table_name = 'abc'
+ engine = self.engines['sqlite']
+ meta = MetaData()
+ meta.bind = engine
+ table = Table(table_name, meta,
+ Column('id', Integer, primary_key=True),
+ Column('foo', CustomType),
+ Column('deleted', Boolean))
+ table.create()
+
+ self.assertRaises(exception.OpenstackException,
+ utils.change_deleted_column_type_to_id_type,
+ engine, table_name)
+
+ fooColumn = Column('foo', CustomType())
+ utils.change_deleted_column_type_to_id_type(engine, table_name,
+ foo=fooColumn)
+
+ table = utils.get_table(engine, table_name)
+        # NOTE(boris-42): There is no way to check whether foo has type
+        #                 CustomType, as sqlalchemy will set it to NullType.
+ self.assertTrue(isinstance(table.c.foo.type, NullType))
+ self.assertTrue(isinstance(table.c.deleted.type, Integer))
+
+ def test_change_deleted_column_type_to_boolean(self):
+ table_name = 'abc'
+ for key, engine in self.engines.items():
+ meta = MetaData()
+ meta.bind = engine
+ table = Table(table_name, meta,
+ Column('id', Integer, primary_key=True),
+ Column('deleted', Integer))
+ table.create()
+
+ utils.change_deleted_column_type_to_boolean(engine, table_name)
+
+ table = utils.get_table(engine, table_name)
+ expected_type = Boolean if key != "mysql" else mysql.TINYINT
+ self.assertTrue(isinstance(table.c.deleted.type, expected_type))
+
+ def test_change_deleted_column_type_to_boolean_type_custom(self):
+ table_name = 'abc'
+ engine = self.engines['sqlite']
+ meta = MetaData()
+ meta.bind = engine
+ table = Table(table_name, meta,
+ Column('id', Integer, primary_key=True),
+ Column('foo', CustomType),
+ Column('deleted', Integer))
+ table.create()
+
+ self.assertRaises(exception.OpenstackException,
+ utils.change_deleted_column_type_to_boolean,
+ engine, table_name)
+
+ fooColumn = Column('foo', CustomType())
+ utils.change_deleted_column_type_to_boolean(engine, table_name,
+ foo=fooColumn)
+
+ table = utils.get_table(engine, table_name)
+        # NOTE(boris-42): There is no way to check whether foo has type
+        #                 CustomType, as sqlalchemy will set it to NullType.
+ self.assertTrue(isinstance(table.c.foo.type, NullType))
+ self.assertTrue(isinstance(table.c.deleted.type, Boolean))