From cf6e7ac1e4a9fdab81476741849f8100c10b9770 Mon Sep 17 00:00:00 2001 From: Boris Pavlovic Date: Fri, 15 Feb 2013 15:16:27 +0400 Subject: Add generic dropper for duplicate rows Add generic function that help us to drop duplicated rows from table, before creating unique constarints. It is very usefull in migrations, because it will allow DBA to make migrations without IntegrityError exceptions. It deletes all duplicates of rows except row with max value in id column. blueprint db-unique-keys Change-Id: I7e48d3eb78cf9a5d30752541b143b0c9fa1f838a --- nova/db/sqlalchemy/utils.py | 53 ++++++++++++++++++- nova/tests/test_migration_utils.py | 103 ++++++++++++++++++++++++++++++++++++- 2 files changed, 154 insertions(+), 2 deletions(-) diff --git a/nova/db/sqlalchemy/utils.py b/nova/db/sqlalchemy/utils.py index c656cbc9c..c67d8888d 100644 --- a/nova/db/sqlalchemy/utils.py +++ b/nova/db/sqlalchemy/utils.py @@ -18,11 +18,19 @@ from migrate.changeset import UniqueConstraint from sqlalchemy.engine import reflection from sqlalchemy.ext.compiler import compiles +from sqlalchemy import func from sqlalchemy import MetaData, Table, Column, Index -from sqlalchemy.sql.expression import UpdateBase +from sqlalchemy.sql.expression import UpdateBase, literal_column +from sqlalchemy.sql import select from sqlalchemy.types import NullType + from nova import exception +from nova.openstack.common import log as logging +from nova.openstack.common import timeutils + + +LOG = logging.getLogger(__name__) class InsertFromSelect(UpdateBase): @@ -115,3 +123,46 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns, else: _drop_unique_constraint_in_sqlite(migrate_engine, table_name, uc_name, **col_name_col_instance) + + +def drop_old_duplicate_entries_from_table(migrate_engine, table_name, + use_soft_delete, *uc_column_names): + """ + This method is used to drop all old rowss that have the same values for + columns in uc_columns. + """ + meta = MetaData() + meta.bind = migrate_engine + + table = Table(table_name, meta, autoload=True) + columns_for_group_by = [table.c[name] for name in uc_column_names] + + columns_for_select = [func.max(table.c.id)] + columns_for_select.extend(list(columns_for_group_by)) + + duplicated_rows_select = select(columns_for_select, + group_by=columns_for_group_by, + having=func.count(table.c.id) > 1) + + for row in migrate_engine.execute(duplicated_rows_select): + # NOTE(boris-42): Do not remove row that has the biggest ID. + delete_condition = table.c.id != row[0] + for name in uc_column_names: + delete_condition &= table.c[name] == row[name] + + rows_to_delete_select = select([table.c.id]).where(delete_condition) + for row in migrate_engine.execute(rows_to_delete_select).fetchall(): + LOG.info(_("Deleted duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) + + if use_soft_delete: + delete_statement = table.update().\ + where(delete_condition).\ + values({ + 'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow() + }) + else: + delete_statement = table.delete().where(delete_condition) + migrate_engine.execute(delete_statement) diff --git a/nova/tests/test_migration_utils.py b/nova/tests/test_migration_utils.py index 45b6d86d4..ddaaa2552 100644 --- a/nova/tests/test_migration_utils.py +++ b/nova/tests/test_migration_utils.py @@ -16,7 +16,9 @@ # under the License. from migrate.changeset import UniqueConstraint -from sqlalchemy import MetaData, Table, Column, Integer, BigInteger +from sqlalchemy import Integer, BigInteger, DateTime, String +from sqlalchemy import MetaData, Table, Column +from sqlalchemy.sql import select from nova.db.sqlalchemy import utils from nova import exception @@ -124,3 +126,102 @@ class TestMigrationUtils(test_migrations.BaseMigrationTestCase): self.assertEqual(len(constraints), 0) self.assertEqual(len(test_table.constraints), 1) test_table.drop() + + def _populate_db_for_drop_duplicate_entries(self, engine, meta, + table_name): + values = [ + {'id': 11, 'a': 3, 'b': 10, 'c': 'abcdef'}, + {'id': 12, 'a': 5, 'b': 10, 'c': 'abcdef'}, + {'id': 13, 'a': 6, 'b': 10, 'c': 'abcdef'}, + {'id': 14, 'a': 7, 'b': 10, 'c': 'abcdef'}, + {'id': 21, 'a': 1, 'b': 20, 'c': 'aa'}, + {'id': 31, 'a': 1, 'b': 20, 'c': 'bb'}, + {'id': 41, 'a': 1, 'b': 30, 'c': 'aef'}, + {'id': 42, 'a': 2, 'b': 30, 'c': 'aef'}, + {'id': 43, 'a': 3, 'b': 30, 'c': 'aef'} + ] + + test_table = Table(table_name, meta, + Column('id', Integer, primary_key=True, + nullable=False), + Column('a', Integer), + Column('b', Integer), + Column('c', String), + Column('deleted', Integer, default=0), + Column('deleted_at', DateTime), + Column('updated_at', DateTime)) + + test_table.create() + engine.execute(test_table.insert(), values) + return test_table, values + + def test_drop_old_duplicate_entries_from_table(self): + table_name = "__test_tmp_table__" + + for key, engine in self.engines.items(): + meta = MetaData() + meta.bind = engine + test_table, values = self.\ + _populate_db_for_drop_duplicate_entries(engine, meta, + table_name) + + utils.drop_old_duplicate_entries_from_table(engine, table_name, + False, 'b', 'c') + + uniq_values = set() + expected_ids = [] + for value in sorted(values, key=lambda x: x['id'], reverse=True): + uniq_value = (('b', value['b']), ('c', value['c'])) + if uniq_value in uniq_values: + continue + uniq_values.add(uniq_value) + expected_ids.append(value['id']) + + real_ids = [row[0] for row in + engine.execute(select([test_table.c.id])).fetchall()] + + self.assertEqual(len(real_ids), len(expected_ids)) + for id_ in expected_ids: + self.assertTrue(id_ in real_ids) + + def test_drop_old_duplicate_entries_from_table_soft_delete(self): + table_name = "__test_tmp_table__" + + for key, engine in self.engines.items(): + meta = MetaData() + meta.bind = engine + table, values = self.\ + _populate_db_for_drop_duplicate_entries(engine, meta, + table_name) + utils.drop_old_duplicate_entries_from_table(engine, table_name, + True, 'b', 'c') + uniq_values = set() + expected_values = [] + soft_deleted_values = [] + + for value in sorted(values, key=lambda x: x['id'], reverse=True): + uniq_value = (('b', value['b']), ('c', value['c'])) + if uniq_value in uniq_values: + soft_deleted_values.append(value) + continue + uniq_values.add(uniq_value) + expected_values.append(value) + + base_select = table.select() + + rows_select = base_select.\ + where(table.c.deleted != table.c.id) + row_ids = [row['id'] for row in + engine.execute(rows_select).fetchall()] + self.assertEqual(len(row_ids), len(expected_values)) + for value in expected_values: + self.assertTrue(value['id'] in row_ids) + + deleted_rows_select = base_select.\ + where(table.c.deleted == table.c.id) + deleted_rows_ids = [row['id'] for row in + engine.execute(deleted_rows_select).fetchall()] + self.assertEqual(len(deleted_rows_ids), + len(values) - len(row_ids)) + for value in soft_deleted_values: + self.assertTrue(value['id'] in deleted_rows_ids) -- cgit