summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorVictor Sergeyev <vsergeyev@mirantis.com>2013-06-03 17:24:07 +0300
committerRoman Podolyaka <rpodolyaka@mirantis.com>2013-07-12 11:23:39 +0300
commit3972c3f494ca7c12e4b9c250a9c2f916165bb6fa (patch)
tree996e9b4717e83328f29e067378ac1eb12b36708b /openstack
parentca0ca29e3f04ced20f62c33ad8b5bf7492775824 (diff)
downloadoslo-3972c3f494ca7c12e4b9c250a9c2f916165bb6fa.tar.gz
oslo-3972c3f494ca7c12e4b9c250a9c2f916165bb6fa.tar.xz
oslo-3972c3f494ca7c12e4b9c250a9c2f916165bb6fa.zip
Migrate sqlalchemy utils from Nova
These utils are useful for database schema migrations and can be reused in other projects. Blueprint: oslo-sqlalchemy-utils Change-Id: Ie54fcd5d75de05b48cca8b86c19325d7327f39cd
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/db/sqlalchemy/utils.py322
1 files changed, 320 insertions, 2 deletions
diff --git a/openstack/common/db/sqlalchemy/utils.py b/openstack/common/db/sqlalchemy/utils.py
index 07030c5..8b855ff 100644
--- a/openstack/common/db/sqlalchemy/utils.py
+++ b/openstack/common/db/sqlalchemy/utils.py
@@ -18,12 +18,28 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Implementation of paginate query."""
-
import sqlalchemy
+from sqlalchemy import Boolean
+from sqlalchemy import CheckConstraint
+from sqlalchemy import Column
+from sqlalchemy.engine import reflection
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy import func
+from sqlalchemy import Index
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy.sql.expression import literal_column
+from sqlalchemy.sql.expression import UpdateBase
+from sqlalchemy.sql import select
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy.types import NullType
from openstack.common.gettextutils import _ # noqa
+
+from openstack.common import exception
from openstack.common import log as logging
+from openstack.common import timeutils
LOG = logging.getLogger(__name__)
@@ -130,3 +146,305 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
query = query.limit(limit)
return query
+
+
def get_table(engine, name):
    """Reflect a table from the database and return it as a Table object.

    Needed because the models don't work for us in migrations
    as models will be far out of sync with the current data.
    """
    meta = MetaData(bind=engine)
    return Table(name, meta, autoload=True)
+
+
class InsertFromSelect(UpdateBase):
    """Form the base for `INSERT INTO table (SELECT ... )` statement."""
    def __init__(self, table, select):
        # Target table and the SELECT whose rows will be inserted.  The
        # actual SQL text is produced by the @compiles handler registered
        # for this class.
        self.table = table
        self.select = select
+
+
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
    """Render an InsertFromSelect as `INSERT INTO table (SELECT ... )`."""
    target = compiler.process(element.table, asfrom=True)
    source = compiler.process(element.select)
    return "INSERT INTO %s %s" % (target, source)
+
+
def _get_not_supported_column(col_name_col_instance, column_name):
    """Return the caller-supplied Column for a column sqlite can't reflect.

    :param col_name_col_instance: mapping of column name -> sqlalchemy.Column
                                  supplied by the migration caller
    :param column_name: name of the column whose reflected type was NullType
    :returns: the sqlalchemy.Column instance to use instead
    :raises exception.OpenstackException: if the mapping has no entry for
        `column_name`, or the entry is not a sqlalchemy.Column
    """
    try:
        column = col_name_col_instance[column_name]
    except KeyError:
        # Fixed message: removed stray ')' and broken grammar.
        msg = _("Please specify column %s in col_name_col_instance "
                "param. It is required because the column has a type "
                "unsupported by sqlite.")
        raise exception.OpenstackException(message=msg % column_name)

    if not isinstance(column, Column):
        # Fixed message: added missing period and article.
        msg = _("col_name_col_instance param has wrong type of "
                "column instance for column %s. It should be an "
                "instance of sqlalchemy.Column.")
        raise exception.OpenstackException(message=msg % column_name)
    return column
+
+
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
                                          use_soft_delete, *uc_column_names):
    """Drop all old rows having the same values for columns in uc_columns.

    This method drops (or marks as `deleted` if use_soft_delete is True)
    old duplicate rows from the table with name `table_name`.

    :param migrate_engine: Sqlalchemy engine
    :param table_name: Table with duplicates
    :param use_soft_delete: If True - values will be marked as `deleted`,
                            if False - values will be removed from table
    :param uc_column_names: Unique constraint columns
    """
    meta = MetaData()
    meta.bind = migrate_engine

    table = Table(table_name, meta, autoload=True)
    columns_for_group_by = [table.c[name] for name in uc_column_names]

    # For each group of duplicates, select the max id -- that row is the
    # one kept; everything else in the group is dropped below.
    columns_for_select = [func.max(table.c.id)]
    columns_for_select.extend(columns_for_group_by)

    duplicated_rows_select = select(columns_for_select,
                                    group_by=columns_for_group_by,
                                    having=func.count(table.c.id) > 1)

    for row in migrate_engine.execute(duplicated_rows_select):
        # NOTE(boris-42): Do not remove row that has the biggest ID.
        delete_condition = table.c.id != row[0]
        is_none = None  # workaround for pyflakes
        delete_condition &= table.c.deleted_at == is_none
        for name in uc_column_names:
            delete_condition &= table.c[name] == row[name]

        # Log each row about to be removed (or soft-deleted).
        rows_to_delete_select = select([table.c.id]).where(delete_condition)
        for row in migrate_engine.execute(rows_to_delete_select).fetchall():
            LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
                       "%(table)s") % dict(id=row[0], table=table_name))

        if use_soft_delete:
            # Soft delete: copy the row's id into `deleted` and stamp
            # `deleted_at`; `updated_at` is re-assigned to its own current
            # value (effectively unchanged).
            delete_statement = table.update().\
                where(delete_condition).\
                values({
                    'deleted': literal_column('id'),
                    'updated_at': literal_column('updated_at'),
                    'deleted_at': timeutils.utcnow()
                })
        else:
            delete_statement = table.delete().where(delete_condition)
        migrate_engine.execute(delete_statement)
+
+
def _get_default_deleted_value(table):
    """Pick the non-deleted sentinel matching the type of the `id` column.

    Integer ids use 0, string ids use the empty string; any other id type
    is rejected.
    """
    id_type = table.c.id.type
    if isinstance(id_type, Integer):
        return 0
    if isinstance(id_type, String):
        return ""
    raise exception.OpenstackException(
        message=_("Unsupported id columns type"))
+
+
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
    """Recreate the given indexes that involve the `deleted` column.

    For each index in `indexes` touching `deleted`, any same-named index
    still present in the database is dropped first, then the index is
    created over the columns recorded in `indexes`.
    """
    table = get_table(migrate_engine, table_name)

    inspector = reflection.Inspector.from_engine(migrate_engine)
    existing_index_names = dict(
        (idx['name'], idx['column_names'])
        for idx in inspector.get_indexes(table_name))

    # NOTE(boris-42): Restore indexes on `deleted` column
    for index in indexes:
        if 'deleted' not in index['column_names']:
            continue
        name = index['name']
        if name in existing_index_names:
            old_columns = [table.c[c] for c in existing_index_names[name]]
            Index(name, *old_columns,
                  unique=index["unique"]).drop(migrate_engine)

        new_columns = [table.c[c] for c in index['column_names']]
        Index(index["name"], *new_columns,
              unique=index["unique"]).create(migrate_engine)
+
+
def change_deleted_column_type_to_boolean(migrate_engine, table_name,
                                          **col_name_col_instance):
    """Convert the `deleted` column of `table_name` to a Boolean.

    Rows where `deleted` equals the row's `id` become True; all other rows
    get the column default (False).  sqlite cannot alter column types in
    place, so it is handled by a separate table-rebuild path.

    :param migrate_engine: sqlalchemy engine
    :param table_name: name of the table to migrate
    :param col_name_col_instance: replacement Column objects for columns
        sqlite cannot reflect (only used on the sqlite path)
    """
    if migrate_engine.name == "sqlite":
        return _change_deleted_column_type_to_boolean_sqlite(
            migrate_engine, table_name, **col_name_col_instance)
    insp = reflection.Inspector.from_engine(migrate_engine)
    # Snapshot indexes before mutating columns: dropping/renaming `deleted`
    # below can drop indexes on it; they are restored at the end.
    indexes = insp.get_indexes(table_name)

    table = get_table(migrate_engine, table_name)

    # NOTE(review): Column.create()/drop()/alter() are not plain sqlalchemy;
    # they presumably come from sqlalchemy-migrate's monkey-patching --
    # confirm migrate is active wherever this is used.
    old_deleted = Column('old_deleted', Boolean, default=False)
    old_deleted.create(table, populate_default=False)

    table.update().\
        where(table.c.deleted == table.c.id).\
        values(old_deleted=True).\
        execute()

    # Replace the old column with the freshly populated Boolean one.
    table.c.deleted.drop()
    table.c.old_deleted.alter(name="deleted")

    _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
+
def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
                                                  **col_name_col_instance):
    """sqlite path of change_deleted_column_type_to_boolean.

    sqlite cannot alter a column type in place, so the table is rebuilt:
    a `<table_name>__tmp__` table is created with `deleted` as Boolean,
    rows are copied with `deleted` computed as (deleted == id), the old
    table is dropped and the new one is renamed into place.

    :param migrate_engine: sqlalchemy engine (must be sqlite)
    :param table_name: name of the table to migrate
    :param col_name_col_instance: replacement Column objects for columns
        whose type sqlite reflects as NullType
    """
    insp = reflection.Inspector.from_engine(migrate_engine)
    table = get_table(migrate_engine, table_name)

    columns = []
    for column in table.columns:
        if column.name != "deleted":
            if isinstance(column.type, NullType):
                # sqlite could not reflect this column's type; the caller
                # must have supplied an explicit Column for it.
                column_copy = _get_not_supported_column(col_name_col_instance,
                                                        column.name)
            else:
                column_copy = column.copy()
        else:
            column_copy = Column('deleted', Boolean, default=0)
        columns.append(column_copy)

    constraints = [constraint.copy() for constraint in table.constraints]

    meta = MetaData(bind=migrate_engine)
    new_table = Table(table_name + "__tmp__", meta,
                      *(columns + constraints))
    new_table.create()

    # Build the index definitions now, but create them only after the old
    # table (which owns the identically named indexes) is dropped.
    indexes = []
    for index in insp.get_indexes(table_name):
        column_names = [new_table.c[c] for c in index['column_names']]
        indexes.append(Index(index["name"], *column_names,
                             unique=index["unique"]))

    # Copy rows, rewriting `deleted` on the fly as (deleted == id).
    c_select = []
    for c in table.c:
        if c.name != "deleted":
            c_select.append(c)
        else:
            c_select.append(table.c.deleted == table.c.id)

    ins = InsertFromSelect(new_table, select(c_select))
    migrate_engine.execute(ins)

    table.drop()
    # Plain loop instead of the original side-effect-only list
    # comprehension, which built a throwaway list.
    for index in indexes:
        index.create(migrate_engine)

    new_table.rename(table_name)
    new_table.update().\
        where(new_table.c.deleted == new_table.c.id).\
        values(deleted=True).\
        execute()
+
+
def change_deleted_column_type_to_id_type(migrate_engine, table_name,
                                          **col_name_col_instance):
    """Convert the Boolean `deleted` column back to the type of `id`.

    Rows with deleted == True get their `id` value copied into `deleted`;
    all other rows receive the type-specific default (0 for Integer ids,
    "" for String ids).  sqlite cannot alter column types in place, so it
    is handled by a separate table-rebuild path.

    :param migrate_engine: sqlalchemy engine
    :param table_name: name of the table to migrate
    :param col_name_col_instance: replacement Column objects for columns
        sqlite cannot reflect (only used on the sqlite path)
    """
    if migrate_engine.name == "sqlite":
        return _change_deleted_column_type_to_id_type_sqlite(
            migrate_engine, table_name, **col_name_col_instance)
    insp = reflection.Inspector.from_engine(migrate_engine)
    # Snapshot indexes; ones involving `deleted` are restored at the end.
    indexes = insp.get_indexes(table_name)

    table = get_table(migrate_engine, table_name)

    # NOTE(review): Column.create()/alter()/drop() presumably come from
    # sqlalchemy-migrate's monkey-patching -- confirm migrate is active.
    new_deleted = Column('new_deleted', table.c.id.type,
                         default=_get_default_deleted_value(table))
    new_deleted.create(table, populate_default=True)

    deleted = True  # workaround for pyflakes
    table.update().\
        where(table.c.deleted == deleted).\
        values(new_deleted=table.c.id).\
        execute()
    table.c.deleted.drop()
    table.c.new_deleted.alter(name="deleted")

    _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
+
def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
                                                  **col_name_col_instance):
    """sqlite path of change_deleted_column_type_to_id_type.

    :param migrate_engine: sqlalchemy engine (must be sqlite)
    :param table_name: name of the table to migrate
    :param col_name_col_instance: replacement Column objects for columns
        whose type sqlite reflects as NullType
    """
    # NOTE(boris-42): sqlalchemy-migrate can't drop column with check
    #                 constraints in sqlite DB and our `deleted` column has
    #                 2 check constraints. So there is only one way to remove
    #                 these constraints:
    #                 1) Create new table with the same columns, constraints
    #                 and indexes. (except deleted column).
    #                 2) Copy all data from old to new table.
    #                 3) Drop old table.
    #                 4) Rename new table to old table name.
    insp = reflection.Inspector.from_engine(migrate_engine)
    meta = MetaData(bind=migrate_engine)
    table = Table(table_name, meta, autoload=True)
    default_deleted_value = _get_default_deleted_value(table)

    columns = []
    for column in table.columns:
        if column.name != "deleted":
            if isinstance(column.type, NullType):
                # sqlite could not reflect this column's type; the caller
                # must have supplied an explicit Column for it.
                column_copy = _get_not_supported_column(col_name_col_instance,
                                                        column.name)
            else:
                column_copy = column.copy()
        else:
            column_copy = Column('deleted', table.c.id.type,
                                 default=default_deleted_value)
        columns.append(column_copy)

    def is_deleted_column_constraint(constraint):
        # NOTE(boris-42): There is no other way to check whether a
        #                 CheckConstraint is associated with the deleted
        #                 column.
        if not isinstance(constraint, CheckConstraint):
            return False
        sqltext = str(constraint.sqltext)
        # Two spellings of the generated Boolean check are matched --
        # presumably emitted by different sqlalchemy versions.
        return (sqltext.endswith("deleted in (0, 1)") or
                sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))

    # Copy every constraint except the Boolean checks on `deleted`, which
    # would reject the new id-typed values.
    constraints = []
    for constraint in table.constraints:
        if not is_deleted_column_constraint(constraint):
            constraints.append(constraint.copy())

    new_table = Table(table_name + "__tmp__", meta,
                      *(columns + constraints))
    new_table.create()

    # Build index definitions now; create them only after the old table
    # (which owns the identically named indexes) is dropped.
    indexes = []
    for index in insp.get_indexes(table_name):
        column_names = [new_table.c[c] for c in index['column_names']]
        indexes.append(Index(index["name"], *column_names,
                             unique=index["unique"]))

    ins = InsertFromSelect(new_table, table.select())
    migrate_engine.execute(ins)

    table.drop()
    # Plain loop instead of the original side-effect-only list
    # comprehension, which built a throwaway list.
    for index in indexes:
        index.create(migrate_engine)

    new_table.rename(table_name)
    deleted = True  # workaround for pyflakes
    new_table.update().\
        where(new_table.c.deleted == deleted).\
        values(deleted=new_table.c.id).\
        execute()

    # NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
    deleted = False  # workaround for pyflakes
    new_table.update().\
        where(new_table.c.deleted == deleted).\
        values(deleted=default_deleted_value).\
        execute()