-rwxr-xr-x  bin/nova-manage                                                    |  11
-rw-r--r--  nova/db/api.py                                                     |  22
-rw-r--r--  nova/db/sqlalchemy/api.py                                          |  99
-rw-r--r--  nova/db/sqlalchemy/migrate_repo/versions/154_add_shadow_tables.py  |  77
-rw-r--r--  nova/tests/test_db_api.py                                          | 161
-rw-r--r--  nova/tests/test_migrations.py                                      |  46
6 files changed, 416 insertions(+), 0 deletions(-)
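
This change adds a db archive_deleted_rows command to nova-manage, backed by new
db.archive_deleted_rows() and db.archive_deleted_rows_for_table() APIs that move
soft-deleted rows into shadow_* tables (created by migration 154). A minimal
sketch of the call path the command exercises, assuming a configured Nova
deployment (the batch size here is illustrative):

    # Roughly equivalent to: nova-manage db archive_deleted_rows --max_rows 1000
    from nova import context
    from nova import db

    admin_context = context.get_admin_context()
    # Returns the number of rows moved to the shadow tables (up to max_rows).
    archived = db.archive_deleted_rows(admin_context, max_rows=1000)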
diff --git a/bin/nova-manage b/bin/nova-manage
index c39c3e4eb..22549a50d 100755
--- a/bin/nova-manage
+++ b/bin/nova-manage
@@ -770,6 +770,17 @@ class DbCommands(object):
         """Print the current database version."""
         print migration.db_version()
 
+    @args('--max_rows', dest='max_rows', metavar='<number>',
+          help='Maximum number of deleted rows to archive')
+    def archive_deleted_rows(self, max_rows=None):
+        """Move up to max_rows deleted rows from production tables to shadow
+        tables.
+        """
+        if max_rows is not None:
+            max_rows = int(max_rows)
+        admin_context = context.get_admin_context()
+        db.archive_deleted_rows(admin_context, max_rows)
+
 
 class InstanceTypeCommands(object):
     """Class for managing instance types / flavors."""
diff --git a/nova/db/api.py b/nova/db/api.py
index b07cd6b8b..6ec0b3a95 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -1715,3 +1715,25 @@ def task_log_get(context, task_name, period_beginning,
                  period_ending, host, state=None):
     return IMPL.task_log_get(context, task_name, period_beginning,
                              period_ending, host, state)
+
+
+####################
+
+
+def archive_deleted_rows(context, max_rows=None):
+    """Move up to max_rows rows from production tables to corresponding shadow
+    tables.
+
+    :returns: number of rows archived.
+    """
+    return IMPL.archive_deleted_rows(context, max_rows=max_rows)
+
+
+def archive_deleted_rows_for_table(context, tablename, max_rows=None):
+    """Move up to max_rows rows from tablename to corresponding shadow
+    table.
+
+    :returns: number of rows archived.
+    """
+    return IMPL.archive_deleted_rows_for_table(context, tablename,
+                                               max_rows=max_rows)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 96e7c6255..eb9181fce 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -26,13 +26,20 @@ import functools
 import uuid
 
 from sqlalchemy import and_
+from sqlalchemy import Boolean
 from sqlalchemy.exc import IntegrityError
+from sqlalchemy.exc import NoSuchTableError
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
 from sqlalchemy import or_
 from sqlalchemy.orm import joinedload
 from sqlalchemy.orm import joinedload_all
+from sqlalchemy.schema import Table
 from sqlalchemy.sql.expression import asc
 from sqlalchemy.sql.expression import desc
+from sqlalchemy.sql.expression import select
 from sqlalchemy.sql import func
+from sqlalchemy import String
 
 from nova import block_device
 from nova.compute import task_states
@@ -63,6 +70,7 @@ CONF.import_opt('sql_connection',
 
 LOG = logging.getLogger(__name__)
 
+get_engine = db_session.get_engine
 get_session = db_session.get_session
 
 
@@ -4786,3 +4794,94 @@ def task_log_end_task(context, task_name, period_beginning, period_ending,
     if rows == 0:
         #It's not running!
         raise exception.TaskNotRunning(task_name=task_name, host=host)
+
+
+def _get_default_deleted_value(table):
+    # TODO(dripton): It would be better to introspect the actual default value
+    # from the column, but I don't see a way to do that in the low-level APIs
+    # of SQLAlchemy 0.7.  0.8 has better introspection APIs, which we should
+    # use when Nova is ready to require 0.8.
+    deleted_column_type = table.c.deleted.type
+    if isinstance(deleted_column_type, Integer):
+        return 0
+    elif isinstance(deleted_column_type, Boolean):
+        return False
+    elif isinstance(deleted_column_type, String):
+        return ""
+    else:
+        return None
+
+
+@require_admin_context
+def archive_deleted_rows_for_table(context, tablename, max_rows=None):
+    """Move up to max_rows rows from one table to the corresponding
+    shadow table.
+
+    :returns: number of rows archived
+    """
+    # The context argument is only used for the decorator.
+    if max_rows is None:
+        max_rows = 5000
+    engine = get_engine()
+    conn = engine.connect()
+    metadata = MetaData()
+    metadata.bind = engine
+    table = Table(tablename, metadata, autoload=True)
+    default_deleted_value = _get_default_deleted_value(table)
+    shadow_tablename = "shadow_" + tablename
+    rows_archived = 0
+    try:
+        shadow_table = Table(shadow_tablename, metadata, autoload=True)
+    except NoSuchTableError:
+        # No corresponding shadow table; skip it.
+        return rows_archived
+    # Group the insert and delete in a transaction.
+    with conn.begin():
+        # TODO(dripton): It would be more efficient to insert(select) and then
+        # delete(same select) without ever returning the selected rows back to
+        # Python.  sqlalchemy does not support that directly, but we have
+        # nova.db.sqlalchemy.utils.InsertFromSelect for the insert side.  We
+        # need a corresponding function for the delete side.
+        try:
+            column = table.c.id
+            column_name = "id"
+        except AttributeError:
+            # We have one table (dns_domains) where the key is called
+            # "domain" rather than "id"
+            column = table.c.domain
+            column_name = "domain"
+        query = select([table],
+                       table.c.deleted != default_deleted_value).\
+                       order_by(column).limit(max_rows)
+        rows = conn.execute(query).fetchall()
+        if rows:
+            insert_statement = shadow_table.insert()
+            conn.execute(insert_statement, rows)
+            keys = [getattr(row, column_name) for row in rows]
+            delete_statement = table.delete(column.in_(keys))
+            result = conn.execute(delete_statement)
+            rows_archived = result.rowcount
+    return rows_archived
+
+
+@require_admin_context
+def archive_deleted_rows(context, max_rows=None):
+    """Move up to max_rows rows from production tables to the corresponding
+    shadow tables.
+
+    :returns: Number of rows archived.
+    """
+    # The context argument is only used for the decorator.
+    if max_rows is None:
+        max_rows = 5000
+    tablenames = []
+    for model_class in models.__dict__.itervalues():
+        if hasattr(model_class, "__tablename__"):
+            tablenames.append(model_class.__tablename__)
+    rows_archived = 0
+    for tablename in tablenames:
+        rows_archived += archive_deleted_rows_for_table(context, tablename,
+                                       max_rows=max_rows - rows_archived)
+        if rows_archived >= max_rows:
+            break
+    return rows_archived
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/154_add_shadow_tables.py b/nova/db/sqlalchemy/migrate_repo/versions/154_add_shadow_tables.py
new file mode 100644
index 000000000..7c9f69c2b
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/154_add_shadow_tables.py
@@ -0,0 +1,77 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.  You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import BigInteger, Column, MetaData, Table
+from sqlalchemy.types import NullType
+
+from nova.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def upgrade(migrate_engine):
+    meta = MetaData(migrate_engine)
+    meta.reflect(migrate_engine)
+    table_names = meta.tables.keys()
+
+    meta.bind = migrate_engine
+
+    for table_name in table_names:
+        if table_name.startswith('shadow'):
+            continue
+        table = Table(table_name, meta, autoload=True)
+
+        columns = []
+        for column in table.columns:
+            column_copy = None
+            # NOTE(boris-42): BigInteger is not supported by sqlite, so
+            #                 after copy it will have NullType, other
+            #                 types that are used in Nova are supported by
+            #                 sqlite.
+            if isinstance(column.type, NullType):
+                column_copy = Column(column.name, BigInteger(), default=0)
+            else:
+                column_copy = column.copy()
+            columns.append(column_copy)
+
+        shadow_table_name = 'shadow_' + table_name
+        shadow_table = Table(shadow_table_name, meta, *columns,
+                             mysql_engine='InnoDB')
+        try:
+            shadow_table.create()
+        except Exception:
+            LOG.info(repr(shadow_table))
+            LOG.exception(_('Exception while creating table.'))
+            raise
+
+
+def downgrade(migrate_engine):
+    meta = MetaData(migrate_engine)
+    meta.reflect(migrate_engine)
+    table_names = meta.tables.keys()
+
+    meta.bind = migrate_engine
+
+    for table_name in table_names:
+        if table_name.startswith('shadow'):
+            continue
+        shadow_table_name = 'shadow_' + table_name
+        shadow_table = Table(shadow_table_name, meta, autoload=True)
+        try:
+            shadow_table.drop()
+        except Exception:
+            LOG.error(_("table '%s' not dropped") % shadow_table_name)
diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py
index 22de9346f..835527219 100644
--- a/nova/tests/test_db_api.py
+++ b/nova/tests/test_db_api.py
@@ -22,10 +22,15 @@ import datetime
 import uuid as stdlib_uuid
 
+from sqlalchemy import MetaData
+from sqlalchemy.schema import Table
+from sqlalchemy.sql.expression import select
+
 from nova import context
 from nova import db
 from nova import exception
 from nova.openstack.common import cfg
+from nova.openstack.common.db.sqlalchemy import session as db_session
 from nova.openstack.common import timeutils
 from nova import test
 from nova.tests import matchers
 
@@ -36,6 +41,9 @@ CONF = cfg.CONF
 CONF.import_opt('reserved_host_memory_mb', 'nova.compute.resource_tracker')
 CONF.import_opt('reserved_host_disk_mb', 'nova.compute.resource_tracker')
 
+get_engine = db_session.get_engine
+get_session = db_session.get_session
+
 
 class DbApiTestCase(test.TestCase):
     def setUp(self):
@@ -1791,3 +1799,156 @@ class TaskLogTestCase(test.TestCase):
         result = db.task_log_get(self.context, self.task_name, self.begin,
                                  self.end, self.host)
         self.assertEqual(result['errors'], 1)
+
+
+class ArchiveTestCase(test.TestCase):
+
+    def setUp(self):
+        super(ArchiveTestCase, self).setUp()
+        self.context = context.get_admin_context()
+        engine = get_engine()
+        self.conn = engine.connect()
+        self.metadata = MetaData()
+        self.metadata.bind = engine
+        self.table1 = Table("instance_id_mappings",
+                            self.metadata,
+                            autoload=True)
+        self.shadow_table1 = Table("shadow_instance_id_mappings",
+                                   self.metadata,
+                                   autoload=True)
+        self.table2 = Table("dns_domains",
+                            self.metadata,
+                            autoload=True)
+        self.shadow_table2 = Table("shadow_dns_domains",
+                                   self.metadata,
+                                   autoload=True)
+        self.uuidstrs = []
+        for unused in xrange(6):
+            self.uuidstrs.append(stdlib_uuid.uuid4().hex)
+
+    def tearDown(self):
+        super(ArchiveTestCase, self).tearDown()
+        delete_statement1 = self.table1.delete(
+            self.table1.c.uuid.in_(self.uuidstrs))
+        self.conn.execute(delete_statement1)
+        delete_statement2 = self.shadow_table1.delete(
+            self.shadow_table1.c.uuid.in_(self.uuidstrs))
+        self.conn.execute(delete_statement2)
+        delete_statement3 = self.table2.delete(self.table2.c.domain.in_(
+            self.uuidstrs))
+        self.conn.execute(delete_statement3)
+        delete_statement4 = self.shadow_table2.delete(
+            self.shadow_table2.c.domain.in_(self.uuidstrs))
+        self.conn.execute(delete_statement4)
+
+    def test_archive_deleted_rows(self):
+        # Add 6 rows to table
+        for uuidstr in self.uuidstrs:
+            insert_statement = self.table1.insert().values(uuid=uuidstr)
+            self.conn.execute(insert_statement)
+        # Set 4 to deleted
+        update_statement = self.table1.update().\
+                where(self.table1.c.uuid.in_(self.uuidstrs[:4]))\
+                .values(deleted=True)
+        self.conn.execute(update_statement)
+        query1 = select([self.table1]).where(self.table1.c.uuid.in_(
+                self.uuidstrs))
+        rows1 = self.conn.execute(query1).fetchall()
+        # Verify we have 6 in main
+        self.assertEqual(len(rows1), 6)
+        query2 = select([self.shadow_table1]).\
+                where(self.shadow_table1.c.uuid.in_(self.uuidstrs))
+        rows2 = self.conn.execute(query2).fetchall()
+        # Verify we have 0 in shadow
+        self.assertEqual(len(rows2), 0)
+        # Archive 2 rows
+        db.archive_deleted_rows(self.context, max_rows=2)
+        rows3 = self.conn.execute(query1).fetchall()
+        # Verify we have 4 left in main
+        self.assertEqual(len(rows3), 4)
+        rows4 = self.conn.execute(query2).fetchall()
+        # Verify we have 2 in shadow
+        self.assertEqual(len(rows4), 2)
+        # Archive 2 more rows
+        db.archive_deleted_rows(self.context, max_rows=2)
+        rows5 = self.conn.execute(query1).fetchall()
+        # Verify we have 2 left in main
+        self.assertEqual(len(rows5), 2)
+        rows6 = self.conn.execute(query2).fetchall()
+        # Verify we have 4 in shadow
+        self.assertEqual(len(rows6), 4)
+        # Try to archive more, but there are no deleted rows left.
+        db.archive_deleted_rows(self.context, max_rows=2)
+        rows7 = self.conn.execute(query1).fetchall()
+        # Verify we still have 2 left in main
+        self.assertEqual(len(rows7), 2)
+        rows8 = self.conn.execute(query2).fetchall()
+        # Verify we still have 4 in shadow
+        self.assertEqual(len(rows8), 4)
+
+    def test_archive_deleted_rows_for_table(self):
+        tablename = "instance_id_mappings"
+        # Add 6 rows to table
+        for uuidstr in self.uuidstrs:
+            insert_statement = self.table1.insert().values(uuid=uuidstr)
+            self.conn.execute(insert_statement)
+        # Set 4 to deleted
+        update_statement = self.table1.update().\
+                where(self.table1.c.uuid.in_(self.uuidstrs[:4]))\
+                .values(deleted=True)
+        self.conn.execute(update_statement)
+        query1 = select([self.table1]).where(self.table1.c.uuid.in_(
+                self.uuidstrs))
+        rows1 = self.conn.execute(query1).fetchall()
+        # Verify we have 6 in main
+        self.assertEqual(len(rows1), 6)
+        query2 = select([self.shadow_table1]).\
+                where(self.shadow_table1.c.uuid.in_(self.uuidstrs))
+        rows2 = self.conn.execute(query2).fetchall()
+        # Verify we have 0 in shadow
+        self.assertEqual(len(rows2), 0)
+        # Archive 2 rows
+        db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
+        rows3 = self.conn.execute(query1).fetchall()
+        # Verify we have 4 left in main
+        self.assertEqual(len(rows3), 4)
+        rows4 = self.conn.execute(query2).fetchall()
+        # Verify we have 2 in shadow
+        self.assertEqual(len(rows4), 2)
+        # Archive 2 more rows
+        db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
+        rows5 = self.conn.execute(query1).fetchall()
+        # Verify we have 2 left in main
+        self.assertEqual(len(rows5), 2)
+        rows6 = self.conn.execute(query2).fetchall()
+        # Verify we have 4 in shadow
+        self.assertEqual(len(rows6), 4)
+        # Try to archive more, but there are no deleted rows left.
+        db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
+        rows7 = self.conn.execute(query1).fetchall()
+        # Verify we still have 2 left in main
+        self.assertEqual(len(rows7), 2)
+        rows8 = self.conn.execute(query2).fetchall()
+        # Verify we still have 4 in shadow
+        self.assertEqual(len(rows8), 4)
+
+    def test_archive_deleted_rows_no_id_column(self):
+        uuidstr0 = self.uuidstrs[0]
+        insert_statement = self.table2.insert().values(domain=uuidstr0)
+        self.conn.execute(insert_statement)
+        update_statement = self.table2.update().\
+                where(self.table2.c.domain == uuidstr0).\
+                values(deleted=True)
+        self.conn.execute(update_statement)
+        query1 = select([self.table2], self.table2.c.domain == uuidstr0)
+        rows1 = self.conn.execute(query1).fetchall()
+        self.assertEqual(len(rows1), 1)
+        query2 = select([self.shadow_table2],
+                        self.shadow_table2.c.domain == uuidstr0)
+        rows2 = self.conn.execute(query2).fetchall()
+        self.assertEqual(len(rows2), 0)
+        db.archive_deleted_rows(self.context, max_rows=1)
+        rows3 = self.conn.execute(query1).fetchall()
+        self.assertEqual(len(rows3), 0)
+        rows4 = self.conn.execute(query2).fetchall()
+        self.assertEqual(len(rows4), 1)
diff --git a/nova/tests/test_migrations.py b/nova/tests/test_migrations.py
index a0c5db9c4..bb1086a9d 100644
--- a/nova/tests/test_migrations.py
+++ b/nova/tests/test_migrations.py
@@ -631,3 +631,49 @@ class TestMigrations(BaseMigrationTestCase):
             self.assertIn(prop_name, inst_sys_meta)
             self.assertEqual(str(inst_sys_meta[prop_name]),
                              str(inst_type[prop]))
+
+    # migration 154, add shadow tables for deleted data
+    # There are 53 shadow tables, but we only test one.
+    # There are additional tests in test_db_api.py
+    def _prerun_154(self, engine):
+        meta = sqlalchemy.schema.MetaData()
+        meta.reflect(engine)
+        table_names = meta.tables.keys()
+        for table_name in table_names:
+            self.assertFalse(table_name.startswith("shadow_"))
+
+    def _check_154(self, engine, data):
+        meta = sqlalchemy.schema.MetaData()
+        meta.reflect(engine)
+        table_names = set(meta.tables.keys())
+        for table_name in table_names:
+            print table_name
+            if table_name.startswith("shadow_"):
+                shadow_name = table_name
+                base_name = table_name.replace("shadow_", "")
+                self.assertIn(base_name, table_names)
+            else:
+                base_name = table_name
+                shadow_name = "shadow_" + table_name
+                self.assertIn(shadow_name, table_names)
+            shadow_table = get_table(engine, shadow_name)
+            base_table = get_table(engine, base_name)
+            base_columns = []
+            shadow_columns = []
+            for column in base_table.columns:
+                base_columns.append(column)
+            for column in shadow_table.columns:
+                shadow_columns.append(column)
+            for ii, base_column in enumerate(base_columns):
+                shadow_column = shadow_columns[ii]
+                self.assertEqual(base_column.name, shadow_column.name)
+                # NullType needs a special case.  We end up with NullType on
+                # sqlite where bigint is not defined.
+                if isinstance(base_column.type, sqlalchemy.types.NullType):
+                    self.assertTrue(isinstance(shadow_column.type,
+                                               sqlalchemy.types.NullType))
+                else:
+                    # Identical types do not test equal because sqlalchemy
+                    # does not override __eq__, but if we stringify them then
+                    # they do.
+                    self.assertEqual(str(base_column.type),
+                                     str(shadow_column.type))
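
For clarity, the archiving step in nova/db/sqlalchemy/api.py above reduces to a
select/insert/delete cycle wrapped in one transaction. Here is a condensed,
standalone sketch of that pattern under stated assumptions (an example engine
URL, a fixed table name, an Integer deleted column, and a hard-coded batch
size; the real code autoloads every model's table, derives the not-deleted
sentinel from the column type, and special-cases the dns_domains key):

    from sqlalchemy import MetaData, Table, create_engine
    from sqlalchemy.sql.expression import select

    engine = create_engine("sqlite:///nova.sqlite")  # assumption: any engine works
    conn = engine.connect()
    metadata = MetaData(bind=engine)
    table = Table("instance_id_mappings", metadata, autoload=True)
    shadow = Table("shadow_instance_id_mappings", metadata, autoload=True)

    # Insert into the shadow table and delete from the production table in
    # the same transaction, so a failure leaves both tables unchanged.
    with conn.begin():
        rows = conn.execute(
            select([table], table.c.deleted != 0).limit(100)).fetchall()
        if rows:
            conn.execute(shadow.insert(), rows)
            keys = [row.id for row in rows]
            conn.execute(table.delete(table.c.id.in_(keys)))

Capping each pass with limit() keeps transactions short, which is what lets
archive_deleted_rows() be run repeatedly against a live database without
holding long row locks.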