From cd0c765ced2823c05b16126d6ba6a823bb66fe7d Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Thu, 17 Jan 2013 15:38:02 -0500 Subject: Use oslo database code Bring in the new database code from oslo. Uses get_session() from oslo as well as changing NovaBase to derive from a common class. Remove test_sqlalchemy.py now that this code is test in oslo. Implements blueprint db-common. Change-Id: I090754981c871250dd981cbbe1a08e7181440120 --- bin/nova-manage | 9 +- nova/common/sqlalchemyutils.py | 128 ----- nova/compute/instance_types.py | 3 +- nova/config.py | 6 + nova/db/sqlalchemy/api.py | 15 +- nova/db/sqlalchemy/migration.py | 4 +- nova/db/sqlalchemy/models.py | 72 +-- nova/db/sqlalchemy/session.py | 620 ---------------------- nova/exception.py | 14 - nova/openstack/common/db/__init__.py | 16 + nova/openstack/common/db/sqlalchemy/__init__.py | 16 + nova/openstack/common/db/sqlalchemy/models.py | 103 ++++ nova/openstack/common/db/sqlalchemy/session.py | 667 ++++++++++++++++++++++++ nova/openstack/common/db/sqlalchemy/utils.py | 132 +++++ nova/test.py | 7 +- nova/tests/baremetal/db/test_bm_interface.py | 3 +- nova/tests/baremetal/db/test_bm_pxe_ip.py | 5 +- nova/tests/baremetal/test_pxe.py | 3 +- nova/tests/network/test_manager.py | 3 +- nova/tests/test_instance_types.py | 2 +- nova/tests/test_sqlalchemy.py | 129 ----- nova/virt/baremetal/db/sqlalchemy/api.py | 2 +- nova/virt/baremetal/db/sqlalchemy/session.py | 6 +- nova/virt/baremetal/driver.py | 3 +- nova/virt/baremetal/pxe.py | 3 +- openstack-common.conf | 2 +- 26 files changed, 986 insertions(+), 987 deletions(-) delete mode 100644 nova/common/sqlalchemyutils.py delete mode 100644 nova/db/sqlalchemy/session.py create mode 100644 nova/openstack/common/db/__init__.py create mode 100644 nova/openstack/common/db/sqlalchemy/__init__.py create mode 100644 nova/openstack/common/db/sqlalchemy/models.py create mode 100644 nova/openstack/common/db/sqlalchemy/session.py create mode 100644 
nova/openstack/common/db/sqlalchemy/utils.py delete mode 100644 nova/tests/test_sqlalchemy.py diff --git a/bin/nova-manage b/bin/nova-manage index 82edf7389..c793fed16 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -80,6 +80,7 @@ from nova.db import migration from nova import exception from nova.openstack.common import cfg from nova.openstack.common import cliutils +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.openstack.common import importutils from nova.openstack.common import log as logging from nova.openstack.common import rpc @@ -831,7 +832,7 @@ class InstanceTypeCommands(object): except exception.InstanceTypeNotFound: print _("Valid instance type name is required") sys.exit(1) - except exception.DBError, e: + except db_session.DBError, e: print _("DB Error: %s") % e sys.exit(2) except Exception: @@ -848,7 +849,7 @@ class InstanceTypeCommands(object): inst_types = instance_types.get_all_types() else: inst_types = instance_types.get_instance_type_by_name(name) - except exception.DBError, e: + except db_session.DBError, e: _db_error(e) if isinstance(inst_types.values()[0], dict): for k, v in inst_types.iteritems(): @@ -879,7 +880,7 @@ class InstanceTypeCommands(object): ext_spec) print _("Key %(key)s set to %(value)s on instance" " type %(name)s") % locals() - except exception.DBError, e: + except db_session.DBError, e: _db_error(e) @args('--name', dest='name', metavar='', @@ -902,7 +903,7 @@ class InstanceTypeCommands(object): key) print _("Key %(key)s on instance type %(name)s unset") % locals() - except exception.DBError, e: + except db_session.DBError, e: _db_error(e) diff --git a/nova/common/sqlalchemyutils.py b/nova/common/sqlalchemyutils.py deleted file mode 100644 index a186948ac..000000000 --- a/nova/common/sqlalchemyutils.py +++ /dev/null @@ -1,128 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics 
and Space Administration. -# Copyright 2010-2011 OpenStack LLC. -# Copyright 2012 Justin Santa Barbara -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Implementation of paginate query.""" - -import sqlalchemy - -from nova import exception -from nova.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - - -# copy from glance/db/sqlalchemy/api.py -def paginate_query(query, model, limit, sort_keys, marker=None, - sort_dir=None, sort_dirs=None): - """Returns a query with sorting / pagination criteria added. - - Pagination works by requiring a unique sort_key, specified by sort_keys. - (If sort_keys is not unique, then we risk looping through values.) - We use the last row in the previous page as the 'marker' for pagination. - So we must return values that follow the passed marker in the order. - With a single-valued sort_key, this would be easy: sort_key > X. - With a compound-values sort_key, (k1, k2, k3) we must do this to repeat - the lexicographical ordering: - (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3) - - We also have to cope with different sort_directions. - - Typically, the id of the last row is used as the client-facing pagination - marker, then the actual marker object must be fetched from the db and - passed in to us as marker. 
- - :param query: the query object to which we should add paging/sorting - :param model: the ORM model class - :param limit: maximum number of items to return - :param sort_keys: array of attributes by which results should be sorted - :param marker: the last item of the previous page; we returns the next - results after this value. - :param sort_dir: direction in which results should be sorted (asc, desc) - :param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys - - :rtype: sqlalchemy.orm.query.Query - :return: The query with sorting/pagination added. - """ - - if 'id' not in sort_keys: - # TODO(justinsb): If this ever gives a false-positive, check - # the actual primary key, rather than assuming its id - LOG.warn(_('Id not in sort_keys; is sort_keys unique?')) - - assert(not (sort_dir and sort_dirs)) - - # Default the sort direction to ascending - if sort_dirs is None and sort_dir is None: - sort_dir = 'asc' - - # Ensure a per-column sort direction - if sort_dirs is None: - sort_dirs = [sort_dir for _sort_key in sort_keys] - - assert(len(sort_dirs) == len(sort_keys)) - - # Add sorting - for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): - sort_dir_func = { - 'asc': sqlalchemy.asc, - 'desc': sqlalchemy.desc, - }[current_sort_dir] - - try: - sort_key_attr = getattr(model, current_sort_key) - except AttributeError: - raise exception.InvalidSortKey() - query = query.order_by(sort_dir_func(sort_key_attr)) - - # Add pagination - if marker is not None: - marker_values = [] - for sort_key in sort_keys: - v = getattr(marker, sort_key) - marker_values.append(v) - - # Build up an array of sort criteria as in the docstring - criteria_list = [] - for i in xrange(0, len(sort_keys)): - crit_attrs = [] - for j in xrange(0, i): - model_attr = getattr(model, sort_keys[j]) - crit_attrs.append((model_attr == marker_values[j])) - - model_attr = getattr(model, sort_keys[i]) - if sort_dirs[i] == 'desc': - crit_attrs.append((model_attr < 
marker_values[i])) - elif sort_dirs[i] == 'asc': - crit_attrs.append((model_attr > marker_values[i])) - else: - raise ValueError(_("Unknown sort direction, " - "must be 'desc' or 'asc'")) - - criteria = sqlalchemy.sql.and_(*crit_attrs) - criteria_list.append(criteria) - - f = sqlalchemy.sql.or_(*criteria_list) - query = query.filter(f) - - if limit is not None: - query = query.limit(limit) - - return query diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py index 78129ee6b..30766fd9e 100644 --- a/nova/compute/instance_types.py +++ b/nova/compute/instance_types.py @@ -27,6 +27,7 @@ from nova import context from nova import db from nova import exception from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.openstack.common import log as logging from nova import utils @@ -110,7 +111,7 @@ def create(name, memory, vcpus, root_gb, ephemeral_gb=None, flavorid=None, try: return db.instance_type_create(context.get_admin_context(), kwargs) - except exception.DBError, e: + except db_session.DBError, e: LOG.exception(_('DB error: %s') % e) raise exception.InstanceTypeCreateFailed() diff --git a/nova/config.py b/nova/config.py index 4095dba75..18147bdbb 100644 --- a/nova/config.py +++ b/nova/config.py @@ -18,10 +18,16 @@ # under the License. 
from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.openstack.common import rpc +from nova import paths + +_DEFAULT_SQL_CONNECTION = 'sqlite:///' + paths.state_path_def('$sqlite_db') def parse_args(argv, default_config_files=None): + db_session.set_defaults(sql_connection=_DEFAULT_SQL_CONNECTION, + sqlite_db='nova.sqlite') rpc.set_defaults(control_exchange='nova') cfg.CONF(argv[1:], project='nova', diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 0154805ac..6a973e59f 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -35,14 +35,14 @@ from sqlalchemy.sql.expression import desc from sqlalchemy.sql import func from nova import block_device -from nova.common import sqlalchemyutils from nova.compute import task_states from nova.compute import vm_states from nova import db from nova.db.sqlalchemy import models -from nova.db.sqlalchemy.session import get_session from nova import exception from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as db_session +from nova.openstack.common.db.sqlalchemy import utils as sqlalchemyutils from nova.openstack.common import log as logging from nova.openstack.common import timeutils from nova.openstack.common import uuidutils @@ -58,10 +58,13 @@ db_opts = [ CONF = cfg.CONF CONF.register_opts(db_opts) CONF.import_opt('compute_topic', 'nova.compute.rpcapi') -CONF.import_opt('sql_connection', 'nova.db.sqlalchemy.session') +CONF.import_opt('sql_connection', + 'nova.openstack.common.db.sqlalchemy.session') LOG = logging.getLogger(__name__) +get_session = db_session.get_session + def is_user_context(context): """Indicates if the request context is a normal user.""" @@ -1251,7 +1254,7 @@ def virtual_interface_create(context, values): vif_ref = models.VirtualInterface() vif_ref.update(values) vif_ref.save() - except exception.DBError: + except db_session.DBError: raise 
exception.VirtualInterfaceCreateException() return vif_ref @@ -3535,7 +3538,7 @@ def instance_type_create(context, values): instance_type_ref.update(values) instance_type_ref.save(session=session) except Exception, e: - raise exception.DBError(e) + raise db_session.DBError(e) return _dict_with_extra_specs(instance_type_ref) @@ -4238,7 +4241,7 @@ def s3_image_create(context, image_uuid): s3_image_ref.update({'uuid': image_uuid}) s3_image_ref.save() except Exception, e: - raise exception.DBError(e) + raise db_session.DBError(e) return s3_image_ref diff --git a/nova/db/sqlalchemy/migration.py b/nova/db/sqlalchemy/migration.py index dbc1ed432..421167bec 100644 --- a/nova/db/sqlalchemy/migration.py +++ b/nova/db/sqlalchemy/migration.py @@ -20,8 +20,8 @@ import distutils.version as dist_version import os from nova.db import migration -from nova.db.sqlalchemy.session import get_engine from nova import exception +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.openstack.common import log as logging @@ -62,6 +62,8 @@ from migrate.versioning.repository import Repository _REPOSITORY = None +get_engine = db_session.get_engine + def db_sync(version=None): if version is not None: diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index b4c680ac0..78629e6c9 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -26,9 +26,9 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import ForeignKey, DateTime, Boolean, Text, Float from sqlalchemy.orm import relationship, backref, object_mapper -from nova.db.sqlalchemy.session import get_session from nova.db.sqlalchemy import types from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import models from nova.openstack.common import timeutils @@ -36,74 +36,8 @@ CONF = cfg.CONF BASE = declarative_base() -class NovaBase(object): - """Base class for Nova Models.""" - __table_initialized__ = False - created_at = 
Column(DateTime, default=timeutils.utcnow) - updated_at = Column(DateTime, onupdate=timeutils.utcnow) - deleted_at = Column(DateTime) - deleted = Column(Integer, default=0) - metadata = None - - def save(self, session=None): - """Save this object.""" - if not session: - session = get_session() - # NOTE(boris-42): This part of code should be look like: - # sesssion.add(self) - # session.flush() - # But there is a bug in sqlalchemy and eventlet that - # raises NoneType exception if there is no running - # transaction and rollback is called. As long as - # sqlalchemy has this bug we have to create transaction - # explicity. - with session.begin(subtransactions=True): - session.add(self) - session.flush() - - def soft_delete(self, session=None): - """Mark this object as deleted.""" - self.deleted = self.id - self.deleted_at = timeutils.utcnow() - self.save(session=session) - - def __setitem__(self, key, value): - setattr(self, key, value) - - def __getitem__(self, key): - return getattr(self, key) - - def get(self, key, default=None): - return getattr(self, key, default) - - def __iter__(self): - columns = dict(object_mapper(self).columns).keys() - # NOTE(russellb): Allow models to specify other keys that can be looked - # up, beyond the actual db columns. An example would be the 'name' - # property for an Instance. - if hasattr(self, '_extra_keys'): - columns.extend(self._extra_keys()) - self._i = iter(columns) - return self - - def next(self): - n = self._i.next() - return n, getattr(self, n) - - def update(self, values): - """Make the model object behave like a dict.""" - for k, v in values.iteritems(): - setattr(self, k, v) - - def iteritems(self): - """Make the model object behave like a dict. 
- - Includes attributes from joins.""" - local = dict(self) - joined = dict([(k, v) for k, v in self.__dict__.iteritems() - if not k[0] == '_']) - local.update(joined) - return local.iteritems() +class NovaBase(models.SoftDeleteMixin, models.ModelBase): + pass class Service(BASE, NovaBase): diff --git a/nova/db/sqlalchemy/session.py b/nova/db/sqlalchemy/session.py deleted file mode 100644 index 28ec613c5..000000000 --- a/nova/db/sqlalchemy/session.py +++ /dev/null @@ -1,620 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Session Handling for SQLAlchemy backend. - -Recommended ways to use sessions within this framework: - -* Don't use them explicitly; this is like running with AUTOCOMMIT=1. - model_query() will implicitly use a session when called without one - supplied. This is the ideal situation because it will allow queries - to be automatically retried if the database connection is interrupted. - - Note: Automatic retry will be enabled in a future patch. - - It is generally fine to issue several queries in a row like this. Even though - they may be run in separate transactions and/or separate sessions, each one - will see the data from the prior calls. If needed, undo- or rollback-like - functionality should be handled at a logical level. 
For an example, look at - the code around quotas and reservation_rollback(). - - Examples: - - def get_foo(context, foo): - return model_query(context, models.Foo).\ - filter_by(foo=foo).\ - first() - - def update_foo(context, id, newfoo): - model_query(context, models.Foo).\ - filter_by(id=id).\ - update({'foo': newfoo}) - - def create_foo(context, values): - foo_ref = models.Foo() - foo_ref.update(values) - foo_ref.save() - return foo_ref - - -* Within the scope of a single method, keeping all the reads and writes within - the context managed by a single session. In this way, the session's __exit__ - handler will take care of calling flush() and commit() for you. - If using this approach, you should not explicitly call flush() or commit(). - Any error within the context of the session will cause the session to emit - a ROLLBACK. If the connection is dropped before this is possible, the - database will implicitly rollback the transaction. - - Note: statements in the session scope will not be automatically retried. - - If you create models within the session, they need to be added, but you - do not need to call model.save() - - def create_many_foo(context, foos): - session = get_session() - with session.begin(): - for foo in foos: - foo_ref = models.Foo() - foo_ref.update(foo) - session.add(foo_ref) - - def update_bar(context, foo_id, newbar): - session = get_session() - with session.begin(): - foo_ref = model_query(context, models.Foo, session).\ - filter_by(id=foo_id).\ - first() - model_query(context, models.Bar, session).\ - filter_by(id=foo_ref['bar_id']).\ - update({'bar': newbar}) - - Note: update_bar is a trivially simple example of using "with session.begin". - Whereas create_many_foo is a good example of when a transaction is needed, - it is always best to use as few queries as possible. The two queries in - update_bar can be better expressed using a single query which avoids - the need for an explicit transaction. 
It can be expressed like so: - - def update_bar(context, foo_id, newbar): - subq = model_query(context, models.Foo.id).\ - filter_by(id=foo_id).\ - limit(1).\ - subquery() - model_query(context, models.Bar).\ - filter_by(id=subq.as_scalar()).\ - update({'bar': newbar}) - - For reference, this emits approximagely the following SQL statement: - - UPDATE bar SET bar = ${newbar} - WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); - -* Passing an active session between methods. Sessions should only be passed - to private methods. The private method must use a subtransaction; otherwise - SQLAlchemy will throw an error when you call session.begin() on an existing - transaction. Public methods should not accept a session parameter and should - not be involved in sessions within the caller's scope. - - Note that this incurs more overhead in SQLAlchemy than the above means - due to nesting transactions, and it is not possible to implicitly retry - failed database operations when using this approach. - - This also makes code somewhat more difficult to read and debug, because a - single database transaction spans more than one method. Error handling - becomes less clear in this situation. When this is needed for code clarity, - it should be clearly documented. - - def myfunc(foo): - session = get_session() - with session.begin(): - # do some database things - bar = _private_func(foo, session) - return bar - - def _private_func(foo, session=None): - if not session: - session = get_session() - with session.begin(subtransaction=True): - # do some other database things - return bar - - -There are some things which it is best to avoid: - -* Don't keep a transaction open any longer than necessary. - - This means that your "with session.begin()" block should be as short - as possible, while still containing all the related calls for that - transaction. - -* Avoid "with_lockmode('UPDATE')" when possible. - - In MySQL/InnoDB, when a "SELECT ... 
FOR UPDATE" query does not match - any rows, it will take a gap-lock. This is a form of write-lock on the - "gap" where no rows exist, and prevents any other writes to that space. - This can effectively prevent any INSERT into a table by locking the gap - at the end of the index. Similar problems will occur if the SELECT FOR UPDATE - has an overly broad WHERE clause, or doesn't properly use an index. - - One idea proposed at ODS Fall '12 was to use a normal SELECT to test the - number of rows matching a query, and if only one row is returned, - then issue the SELECT FOR UPDATE. - - The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE. - However, this can not be done until the "deleted" columns are removed and - proper UNIQUE constraints are added to the tables. - - -Efficient use of soft deletes: - -* There are two possible ways to mark a record as deleted: - model.soft_delete() and query.soft_delete(). - - model.soft_delete() method works with single already fetched entry. - query.soft_delete() makes only one db request for all entries that correspond - to query. - -* In almost all cases you should use query.soft_delete(). Some examples: - - def soft_delete_bar(): - count = model_query(BarModel).find(some_condition).soft_delete() - if count == 0: - raise Exception("0 entries were soft deleted") - - def complex_soft_delete_with_synchronization_bar(session=None): - if session is None: - session = get_session() - with session.begin(subtransactions=True): - count = model_query(BarModel).\ - find(some_condition).\ - soft_delete(synchronize_session=True) - # Here synchronize_session is required, because we - # don't know what is going on in outer session. - if count == 0: - raise Exception("0 entries were soft deleted") - -* There is only one situation where model.soft_delete() is appropriate: when - you fetch a single record, work with it, and mark it as deleted in the same - transaction. 
- - def soft_delete_bar_model(): - session = get_session() - with session.begin(): - bar_ref = model_query(BarModel).find(some_condition).first() - # Work with bar_ref - bar_ref.soft_delete(session=session) - - However, if you need to work with all entries that correspond to query and - then soft delete them you should use query.soft_delete() method: - - def soft_delete_multi_models(): - session = get_session() - with session.begin(): - query = model_query(BarModel, session=session).\ - find(some_condition) - model_refs = query.all() - # Work with model_refs - query.soft_delete(synchronize_session=False) - # synchronize_session=False should be set if there is no outer - # session and these entries are not used after this. - - When working with many rows, it is very important to use query.soft_delete, - which issues a single query. Using model.soft_delete(), as in the following - example, is very inefficient. - - for bar_ref in bar_refs: - bar_ref.soft_delete(session=session) - # This will produce count(bar_refs) db requests. 
-""" - -import re -import time - -from eventlet import db_pool -from eventlet import greenthread -try: - import MySQLdb - from MySQLdb.constants import CLIENT as mysql_client_constants -except ImportError: - MySQLdb = None - mysql_client_constants = None -from sqlalchemy.exc import DisconnectionError, OperationalError, IntegrityError -import sqlalchemy.interfaces -import sqlalchemy.orm -from sqlalchemy.pool import NullPool, StaticPool -from sqlalchemy.sql.expression import literal_column - -import nova.exception -from nova.openstack.common import cfg -import nova.openstack.common.log as logging -from nova.openstack.common import timeutils -from nova import paths - - -sql_opts = [ - cfg.StrOpt('sql_connection', - default='sqlite:///' + paths.state_path_def('$sqlite_db'), - help='The SQLAlchemy connection string used to connect to the ' - 'database'), - cfg.StrOpt('sqlite_db', - default='nova.sqlite', - help='the filename to use with sqlite'), - cfg.IntOpt('sql_idle_timeout', - default=3600, - help='timeout before idle sql connections are reaped'), - cfg.BoolOpt('sqlite_synchronous', - default=True, - help='If passed, use synchronous mode for sqlite'), - cfg.IntOpt('sql_min_pool_size', - default=1, - help='Minimum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('sql_max_pool_size', - default=5, - help='Maximum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('sql_max_retries', - default=10, - help='maximum db connection retries during startup. ' - '(setting -1 implies an infinite retry count)'), - cfg.IntOpt('sql_retry_interval', - default=10, - help='interval between retries of opening a sql connection'), - cfg.IntOpt('sql_max_overflow', - default=None, - help='If set, use this value for max_overflow with sqlalchemy'), - cfg.IntOpt('sql_connection_debug', - default=0, - help='Verbosity of SQL debugging information. 
0=None, ' - '100=Everything'), - cfg.BoolOpt('sql_connection_trace', - default=False, - help='Add python stack traces to SQL as comment strings'), - cfg.BoolOpt('sql_dbpool_enable', - default=False, - help="enable the use of eventlet's db_pool for MySQL"), -] - -CONF = cfg.CONF -CONF.register_opts(sql_opts) -LOG = logging.getLogger(__name__) - -_ENGINE = None -_MAKER = None - - -def get_session(autocommit=True, expire_on_commit=False): - """Return a SQLAlchemy session.""" - global _MAKER - - if _MAKER is None: - engine = get_engine() - _MAKER = get_maker(engine, autocommit, expire_on_commit) - - session = _MAKER() - return session - - -# note(boris-42): In current versions of DB backends unique constraint -# violation messages follow the structure: -# -# sqlite: -# 1 column - (IntegrityError) column c1 is not unique -# N columns - (IntegrityError) column c1, c2, ..., N are not unique -# -# postgres: -# 1 column - (IntegrityError) duplicate key value violates unique -# constraint "users_c1_key" -# N columns - (IntegrityError) duplicate key value violates unique -# constraint "name_of_our_constraint" -# -# mysql: -# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key -# 'c1'") -# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined -# with -' for key 'name_of_our_constraint'") -_RE_DB = { - "sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), - "postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"), - "mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$") -} - - -def raise_if_duplicate_entry_error(integrity_error, engine_name): - """ - In this function will be raised DBDuplicateEntry exception if integrity - error wrap unique constraint violation. - """ - - def get_columns_from_uniq_cons_or_name(columns): - # note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3" - # means that columns c1, c2, c3 are in UniqueConstraint. 
- uniqbase = "uniq_" - if not columns.startswith(uniqbase): - if engine_name == "postgresql": - return [columns[columns.index("_") + 1:columns.rindex("_")]] - return [columns] - return columns[len(uniqbase):].split("_x_") - - if engine_name not in ["mysql", "sqlite", "postgresql"]: - return - - m = _RE_DB[engine_name].match(integrity_error.message) - if not m: - return - columns = m.group(1) - - if engine_name == "sqlite": - columns = columns.strip().split(", ") - else: - columns = get_columns_from_uniq_cons_or_name(columns) - raise nova.exception.DBDuplicateEntry(columns, integrity_error) - - -def wrap_db_error(f): - def _wrap(*args, **kwargs): - try: - return f(*args, **kwargs) - except UnicodeEncodeError: - raise nova.exception.InvalidUnicodeParameter() - # note(boris-42): We should catch unique constraint violation and - # wrap it by our own DBDuplicateEntry exception. Unique constraint - # violation is wrapped by IntegrityError. - except IntegrityError, e: - # note(boris-42): SqlAlchemy doesn't unify errors from different - # DBs so we must do this. Also in some tables (for example - # instance_types) there are more than one unique constraint. This - # means we should get names of columns, which values violate - # unique constraint, from error message. 
- raise_if_duplicate_entry_error(e, get_engine().name) - raise nova.exception.DBError(e) - except Exception, e: - LOG.exception(_('DB exception wrapped.')) - raise nova.exception.DBError(e) - _wrap.func_name = f.func_name - return _wrap - - -def get_engine(): - """Return a SQLAlchemy engine.""" - global _ENGINE - if _ENGINE is None: - _ENGINE = create_engine(CONF.sql_connection) - return _ENGINE - - -def synchronous_switch_listener(dbapi_conn, connection_rec): - """Switch sqlite connections to non-synchronous mode.""" - dbapi_conn.execute("PRAGMA synchronous = OFF") - - -def add_regexp_listener(dbapi_con, con_record): - """Add REGEXP function to sqlite connections.""" - - def regexp(expr, item): - reg = re.compile(expr) - return reg.search(unicode(item)) is not None - dbapi_con.create_function('regexp', 2, regexp) - - -def greenthread_yield(dbapi_con, con_record): - """ - Ensure other greenthreads get a chance to execute by forcing a context - switch. With common database backends (eg MySQLdb and sqlite), there is - no implicit yield caused by network I/O since they are implemented by - C libraries that eventlet cannot monkey patch. - """ - greenthread.sleep(0) - - -def ping_listener(dbapi_conn, connection_rec, connection_proxy): - """ - Ensures that MySQL connections checked out of the - pool are alive. - - Borrowed from: - http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f - """ - try: - dbapi_conn.cursor().execute('select 1') - except dbapi_conn.OperationalError, ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - LOG.warn(_('Got mysql server has gone away: %s'), ex) - raise DisconnectionError("Database server went away") - else: - raise - - -def is_db_connection_error(args): - """Return True if error in connecting to db.""" - # NOTE(adam_g): This is currently MySQL specific and needs to be extended - # to support Postgres and others. 
- conn_err_codes = ('2002', '2003', '2006') - for err_code in conn_err_codes: - if args.find(err_code) != -1: - return True - return False - - -def create_engine(sql_connection): - """Return a new SQLAlchemy engine.""" - connection_dict = sqlalchemy.engine.url.make_url(sql_connection) - - engine_args = { - "pool_recycle": CONF.sql_idle_timeout, - "echo": False, - 'convert_unicode': True, - } - - # Map our SQL debug level to SQLAlchemy's options - if CONF.sql_connection_debug >= 100: - engine_args['echo'] = 'debug' - elif CONF.sql_connection_debug >= 50: - engine_args['echo'] = True - - if "sqlite" in connection_dict.drivername: - engine_args["poolclass"] = NullPool - - if CONF.sql_connection == "sqlite://": - engine_args["poolclass"] = StaticPool - engine_args["connect_args"] = {'check_same_thread': False} - elif all((CONF.sql_dbpool_enable, MySQLdb, - "mysql" in connection_dict.drivername)): - LOG.info(_("Using mysql/eventlet db_pool.")) - # MySQLdb won't accept 'None' in the password field - password = connection_dict.password or '' - pool_args = { - 'db': connection_dict.database, - 'passwd': password, - 'host': connection_dict.host, - 'user': connection_dict.username, - 'min_size': CONF.sql_min_pool_size, - 'max_size': CONF.sql_max_pool_size, - 'max_idle': CONF.sql_idle_timeout, - 'client_flag': mysql_client_constants.FOUND_ROWS} - - pool = db_pool.ConnectionPool(MySQLdb, **pool_args) - - def creator(): - conn = pool.create() - if isinstance(conn, tuple): - # NOTE(belliott) eventlet >= 0.10 returns a tuple - now, now, conn = conn - - return conn - - engine_args['creator'] = creator - - else: - engine_args['pool_size'] = CONF.sql_max_pool_size - if CONF.sql_max_overflow is not None: - engine_args['max_overflow'] = CONF.sql_max_overflow - - engine = sqlalchemy.create_engine(sql_connection, **engine_args) - - sqlalchemy.event.listen(engine, 'checkin', greenthread_yield) - - if 'mysql' in connection_dict.drivername: - sqlalchemy.event.listen(engine, 'checkout', 
ping_listener) - elif 'sqlite' in connection_dict.drivername: - if not CONF.sqlite_synchronous: - sqlalchemy.event.listen(engine, 'connect', - synchronous_switch_listener) - sqlalchemy.event.listen(engine, 'connect', add_regexp_listener) - - if (CONF.sql_connection_trace and - engine.dialect.dbapi.__name__ == 'MySQLdb'): - patch_mysqldb_with_stacktrace_comments() - - try: - engine.connect() - except OperationalError, e: - if not is_db_connection_error(e.args[0]): - raise - - remaining = CONF.sql_max_retries - if remaining == -1: - remaining = 'infinite' - while True: - msg = _('SQL connection failed. %s attempts left.') - LOG.warn(msg % remaining) - if remaining != 'infinite': - remaining -= 1 - time.sleep(CONF.sql_retry_interval) - try: - engine.connect() - break - except OperationalError, e: - if (remaining != 'infinite' and remaining == 0) or \ - not is_db_connection_error(e.args[0]): - raise - return engine - - -class Query(sqlalchemy.orm.query.Query): - """Subclass of sqlalchemy.query with soft_delete() method.""" - def soft_delete(self, synchronize_session='evaluate'): - return self.update({'deleted': literal_column('id'), - 'updated_at': literal_column('updated_at'), - 'deleted_at': timeutils.utcnow()}, - synchronize_session=synchronize_session) - - -class Session(sqlalchemy.orm.session.Session): - """Custom Session class to avoid SqlAlchemy Session monkey patching.""" - @wrap_db_error - def query(self, *args, **kwargs): - return super(Session, self).query(*args, **kwargs) - - @wrap_db_error - def flush(self, *args, **kwargs): - return super(Session, self).flush(*args, **kwargs) - - @wrap_db_error - def execute(self, *args, **kwargs): - return super(Session, self).execute(*args, **kwargs) - - -def get_maker(engine, autocommit=True, expire_on_commit=False): - """Return a SQLAlchemy sessionmaker using the given engine.""" - return sqlalchemy.orm.sessionmaker(bind=engine, - class_=Session, - autocommit=autocommit, - expire_on_commit=expire_on_commit, - 
query_cls=Query) - - -def patch_mysqldb_with_stacktrace_comments(): - """Adds current stack trace as a comment in queries by patching - MySQLdb.cursors.BaseCursor._do_query. - """ - import MySQLdb.cursors - import traceback - - old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query - - def _do_query(self, q): - stack = '' - for file, line, method, function in traceback.extract_stack(): - # exclude various common things from trace - if file.endswith('session.py') and method == '_do_query': - continue - if file.endswith('api.py') and method == 'wrapper': - continue - if file.endswith('utils.py') and method == '_inner': - continue - if file.endswith('exception.py') and method == '_wrap': - continue - # nova/db/api is just a wrapper around nova/db/sqlalchemy/api - if file.endswith('nova/db/api.py'): - continue - # only trace inside nova - index = file.rfind('nova') - if index == -1: - continue - stack += "File:%s:%s Method:%s() Line:%s | " \ - % (file[index:], line, method, function) - - # strip trailing " | " from stack - if stack: - stack = stack[:-3] - qq = "%s /* %s */" % (q, stack) - else: - qq = q - old_mysql_do_query(self, qq) - - setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) diff --git a/nova/exception.py b/nova/exception.py index 6bb8097c3..3b20b7e78 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -164,20 +164,6 @@ class EC2APIError(NovaException): super(EC2APIError, self).__init__(outstr) -class DBError(NovaException): - """Wraps an implementation specific exception.""" - def __init__(self, inner_exception=None): - self.inner_exception = inner_exception - super(DBError, self).__init__(str(inner_exception)) - - -class DBDuplicateEntry(DBError): - """Wraps an implementation specific exception.""" - def __init__(self, columns=[], inner_exception=None): - self.columns = columns - super(DBDuplicateEntry, self).__init__(inner_exception) - - class EncryptionFailure(NovaException): message = _("Failed to encrypt text: %(reason)s") diff 
--git a/nova/openstack/common/db/__init__.py b/nova/openstack/common/db/__init__.py new file mode 100644 index 000000000..1b9b60dec --- /dev/null +++ b/nova/openstack/common/db/__init__.py @@ -0,0 +1,16 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Cloudscaling Group, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/nova/openstack/common/db/sqlalchemy/__init__.py b/nova/openstack/common/db/sqlalchemy/__init__.py new file mode 100644 index 000000000..1b9b60dec --- /dev/null +++ b/nova/openstack/common/db/sqlalchemy/__init__.py @@ -0,0 +1,16 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Cloudscaling Group, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
diff --git a/nova/openstack/common/db/sqlalchemy/models.py b/nova/openstack/common/db/sqlalchemy/models.py new file mode 100644 index 000000000..87ec7ccc3 --- /dev/null +++ b/nova/openstack/common/db/sqlalchemy/models.py @@ -0,0 +1,103 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 X.commerce, a business unit of eBay Inc. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Piston Cloud Computing, Inc. +# Copyright 2012 Cloudscaling Group, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +SQLAlchemy models. +""" + +from sqlalchemy import Column, Integer +from sqlalchemy import DateTime +from sqlalchemy.orm import object_mapper + +from nova.openstack.common.db.sqlalchemy.session import get_session +from nova.openstack.common import timeutils + + +class ModelBase(object): + """Base class for models.""" + __table_initialized__ = False + created_at = Column(DateTime, default=timeutils.utcnow) + updated_at = Column(DateTime, onupdate=timeutils.utcnow) + metadata = None + + def save(self, session=None): + """Save this object.""" + if not session: + session = get_session() + # NOTE(boris-42): This part of code should be look like: + # sesssion.add(self) + # session.flush() + # But there is a bug in sqlalchemy and eventlet that + # raises NoneType exception if there is no running + # transaction and rollback is called. 
As long as + sqlalchemy has this bug we have to create transaction + explicitly. + with session.begin(subtransactions=True): + session.add(self) + session.flush() + + def __setitem__(self, key, value): + setattr(self, key, value) + + def __getitem__(self, key): + return getattr(self, key) + + def get(self, key, default=None): + return getattr(self, key, default) + + def __iter__(self): + columns = dict(object_mapper(self).columns).keys() + # NOTE(russellb): Allow models to specify other keys that can be looked + # up, beyond the actual db columns. An example would be the 'name' + # property for an Instance. + if hasattr(self, '_extra_keys'): + columns.extend(self._extra_keys()) + self._i = iter(columns) + return self + + def next(self): + n = self._i.next() + return n, getattr(self, n) + + def update(self, values): + """Make the model object behave like a dict.""" + for k, v in values.iteritems(): + setattr(self, k, v) + + def iteritems(self): + """Make the model object behave like a dict. + + Includes attributes from joins.""" + local = dict(self) + joined = dict([(k, v) for k, v in self.__dict__.iteritems() + if not k[0] == '_']) + local.update(joined) + return local.iteritems() + + +class SoftDeleteMixin(object): + deleted_at = Column(DateTime) + deleted = Column(Integer, default=0) + + def soft_delete(self, session=None): + """Mark this object as deleted.""" + self.deleted = self.id + self.deleted_at = timeutils.utcnow() + self.save(session=session) diff --git a/nova/openstack/common/db/sqlalchemy/session.py b/nova/openstack/common/db/sqlalchemy/session.py new file mode 100644 index 000000000..2125b006d --- /dev/null +++ b/nova/openstack/common/db/sqlalchemy/session.py @@ -0,0 +1,667 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved.
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Session Handling for SQLAlchemy backend. + +Initializing: + +* Call set_defaults with the minimal of the following kwargs: + sql_connection, sqlite_db + + Example: + + session.set_defaults(sql_connection="sqlite:///var/lib/nova/sqlite.db", + sqlite_db="/var/lib/nova/sqlite.db") + +Recommended ways to use sessions within this framework: + +* Don't use them explicitly; this is like running with AUTOCOMMIT=1. + model_query() will implicitly use a session when called without one + supplied. This is the ideal situation because it will allow queries + to be automatically retried if the database connection is interrupted. + + Note: Automatic retry will be enabled in a future patch. + + It is generally fine to issue several queries in a row like this. Even though + they may be run in separate transactions and/or separate sessions, each one + will see the data from the prior calls. If needed, undo- or rollback-like + functionality should be handled at a logical level. For an example, look at + the code around quotas and reservation_rollback(). 
+ + Examples: + + def get_foo(context, foo): + return model_query(context, models.Foo).\ + filter_by(foo=foo).\ + first() + + def update_foo(context, id, newfoo): + model_query(context, models.Foo).\ + filter_by(id=id).\ + update({'foo': newfoo}) + + def create_foo(context, values): + foo_ref = models.Foo() + foo_ref.update(values) + foo_ref.save() + return foo_ref + + +* Within the scope of a single method, keeping all the reads and writes within + the context managed by a single session. In this way, the session's __exit__ + handler will take care of calling flush() and commit() for you. + If using this approach, you should not explicitly call flush() or commit(). + Any error within the context of the session will cause the session to emit + a ROLLBACK. If the connection is dropped before this is possible, the + database will implicitly rollback the transaction. + + Note: statements in the session scope will not be automatically retried. + + If you create models within the session, they need to be added, but you + do not need to call model.save() + + def create_many_foo(context, foos): + session = get_session() + with session.begin(): + for foo in foos: + foo_ref = models.Foo() + foo_ref.update(foo) + session.add(foo_ref) + + def update_bar(context, foo_id, newbar): + session = get_session() + with session.begin(): + foo_ref = model_query(context, models.Foo, session).\ + filter_by(id=foo_id).\ + first() + model_query(context, models.Bar, session).\ + filter_by(id=foo_ref['bar_id']).\ + update({'bar': newbar}) + + Note: update_bar is a trivially simple example of using "with session.begin". + Whereas create_many_foo is a good example of when a transaction is needed, + it is always best to use as few queries as possible. The two queries in + update_bar can be better expressed using a single query which avoids + the need for an explicit transaction. 
It can be expressed like so: + + def update_bar(context, foo_id, newbar): + subq = model_query(context, models.Foo.id).\ + filter_by(id=foo_id).\ + limit(1).\ + subquery() + model_query(context, models.Bar).\ + filter_by(id=subq.as_scalar()).\ + update({'bar': newbar}) + + For reference, this emits approximately the following SQL statement: + + UPDATE bar SET bar = ${newbar} + WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); + +* Passing an active session between methods. Sessions should only be passed + to private methods. The private method must use a subtransaction; otherwise + SQLAlchemy will throw an error when you call session.begin() on an existing + transaction. Public methods should not accept a session parameter and should + not be involved in sessions within the caller's scope. + + Note that this incurs more overhead in SQLAlchemy than the above means + due to nesting transactions, and it is not possible to implicitly retry + failed database operations when using this approach. + + This also makes code somewhat more difficult to read and debug, because a + single database transaction spans more than one method. Error handling + becomes less clear in this situation. When this is needed for code clarity, + it should be clearly documented. + + def myfunc(foo): + session = get_session() + with session.begin(): + # do some database things + bar = _private_func(foo, session) + return bar + + def _private_func(foo, session=None): + if not session: + session = get_session() + with session.begin(subtransactions=True): + # do some other database things + return bar + + +There are some things which it is best to avoid: + +* Don't keep a transaction open any longer than necessary. + + This means that your "with session.begin()" block should be as short + as possible, while still containing all the related calls for that + transaction. + +* Avoid "with_lockmode('UPDATE')" when possible. + + In MySQL/InnoDB, when a "SELECT ...
FOR UPDATE" query does not match + any rows, it will take a gap-lock. This is a form of write-lock on the + "gap" where no rows exist, and prevents any other writes to that space. + This can effectively prevent any INSERT into a table by locking the gap + at the end of the index. Similar problems will occur if the SELECT FOR UPDATE + has an overly broad WHERE clause, or doesn't properly use an index. + + One idea proposed at ODS Fall '12 was to use a normal SELECT to test the + number of rows matching a query, and if only one row is returned, + then issue the SELECT FOR UPDATE. + + The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE. + However, this can not be done until the "deleted" columns are removed and + proper UNIQUE constraints are added to the tables. + + +Enabling soft deletes: + +* To use/enable soft-deletes, the SoftDeleteMixin must be added + to your model class. For example: + + class NovaBase(models.SoftDeleteMixin, models.ModelBase): + pass + + +Efficient use of soft deletes: + +* There are two possible ways to mark a record as deleted: + model.soft_delete() and query.soft_delete(). + + model.soft_delete() method works with single already fetched entry. + query.soft_delete() makes only one db request for all entries that correspond + to query. + +* In almost all cases you should use query.soft_delete(). Some examples: + + def soft_delete_bar(): + count = model_query(BarModel).find(some_condition).soft_delete() + if count == 0: + raise Exception("0 entries were soft deleted") + + def complex_soft_delete_with_synchronization_bar(session=None): + if session is None: + session = get_session() + with session.begin(subtransactions=True): + count = model_query(BarModel).\ + find(some_condition).\ + soft_delete(synchronize_session=True) + # Here synchronize_session is required, because we + # don't know what is going on in outer session. 
+ if count == 0: + raise Exception("0 entries were soft deleted") + +* There is only one situation where model.soft_delete() is appropriate: when + you fetch a single record, work with it, and mark it as deleted in the same + transaction. + + def soft_delete_bar_model(): + session = get_session() + with session.begin(): + bar_ref = model_query(BarModel).find(some_condition).first() + # Work with bar_ref + bar_ref.soft_delete(session=session) + + However, if you need to work with all entries that correspond to query and + then soft delete them you should use query.soft_delete() method: + + def soft_delete_multi_models(): + session = get_session() + with session.begin(): + query = model_query(BarModel, session=session).\ + find(some_condition) + model_refs = query.all() + # Work with model_refs + query.soft_delete(synchronize_session=False) + # synchronize_session=False should be set if there is no outer + # session and these entries are not used after this. + + When working with many rows, it is very important to use query.soft_delete, + which issues a single query. Using model.soft_delete(), as in the following + example, is very inefficient. + + for bar_ref in bar_refs: + bar_ref.soft_delete(session=session) + # This will produce count(bar_refs) db requests. 
+""" + +import os.path +import re +import time + +from eventlet import db_pool +from eventlet import greenthread +try: + import MySQLdb + from MySQLdb.constants import CLIENT as mysql_client_constants +except ImportError: + MySQLdb = None + mysql_client_constants = None +from sqlalchemy.exc import DisconnectionError, OperationalError, IntegrityError +import sqlalchemy.interfaces +import sqlalchemy.orm +from sqlalchemy.pool import NullPool, StaticPool +from sqlalchemy.sql.expression import literal_column + +from nova.openstack.common import cfg +from nova.openstack.common import log as logging +from nova.openstack.common.gettextutils import _ +from nova.openstack.common import timeutils + + +sql_opts = [ + cfg.StrOpt('sql_connection', + default='sqlite:///' + + os.path.abspath(os.path.join(os.path.dirname(__file__), + '../', '$sqlite_db')), + help='The SQLAlchemy connection string used to connect to the ' + 'database'), + cfg.StrOpt('sqlite_db', + default='nova.sqlite', + help='the filename to use with sqlite'), + cfg.IntOpt('sql_idle_timeout', + default=3600, + help='timeout before idle sql connections are reaped'), + cfg.BoolOpt('sqlite_synchronous', + default=True, + help='If passed, use synchronous mode for sqlite'), + cfg.IntOpt('sql_min_pool_size', + default=1, + help='Minimum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('sql_max_pool_size', + default=5, + help='Maximum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('sql_max_retries', + default=10, + help='maximum db connection retries during startup. ' + '(setting -1 implies an infinite retry count)'), + cfg.IntOpt('sql_retry_interval', + default=10, + help='interval between retries of opening a sql connection'), + cfg.IntOpt('sql_max_overflow', + default=None, + help='If set, use this value for max_overflow with sqlalchemy'), + cfg.IntOpt('sql_connection_debug', + default=0, + help='Verbosity of SQL debugging information. 
0=None, ' + '100=Everything'), + cfg.BoolOpt('sql_connection_trace', + default=False, + help='Add python stack traces to SQL as comment strings'), + cfg.BoolOpt('sql_dbpool_enable', + default=False, + help="enable the use of eventlet's db_pool for MySQL"), +] + +CONF = cfg.CONF +CONF.register_opts(sql_opts) +LOG = logging.getLogger(__name__) + +_ENGINE = None +_MAKER = None + + +def set_defaults(sql_connection, sqlite_db): + """Set defaults for configuration variables.""" + cfg.set_defaults(sql_opts, + sql_connection=sql_connection, + sqlite_db=sqlite_db) + + +def get_session(autocommit=True, expire_on_commit=False): + """Return a SQLAlchemy session.""" + global _MAKER + + if _MAKER is None: + engine = get_engine() + _MAKER = get_maker(engine, autocommit, expire_on_commit) + + session = _MAKER() + return session + + +class DBError(Exception): + """Wraps an implementation specific exception.""" + def __init__(self, inner_exception=None): + self.inner_exception = inner_exception + super(DBError, self).__init__(str(inner_exception)) + + +class DBDuplicateEntry(DBError): + """Wraps an implementation specific exception.""" + def __init__(self, columns=[], inner_exception=None): + self.columns = columns + super(DBDuplicateEntry, self).__init__(inner_exception) + + +class InvalidUnicodeParameter(Exception): + message = _("Invalid Parameter: " + "Unicode is not supported by the current database.") + + +# note(boris-42): In current versions of DB backends unique constraint +# violation messages follow the structure: +# +# sqlite: +# 1 column - (IntegrityError) column c1 is not unique +# N columns - (IntegrityError) column c1, c2, ..., N are not unique +# +# postgres: +# 1 column - (IntegrityError) duplicate key value violates unique +# constraint "users_c1_key" +# N columns - (IntegrityError) duplicate key value violates unique +# constraint "name_of_our_constraint" +# +# mysql: +# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key +# 'c1'") +# N 
columns - (IntegrityError) (1062, "Duplicate entry 'values joined +# with -' for key 'name_of_our_constraint'") +_RE_DB = { + "sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), + "postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"), + "mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$") +} + + +def raise_if_duplicate_entry_error(integrity_error, engine_name): + """ + In this function will be raised DBDuplicateEntry exception if integrity + error wrap unique constraint violation. + """ + + def get_columns_from_uniq_cons_or_name(columns): + # note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3" + # means that columns c1, c2, c3 are in UniqueConstraint. + uniqbase = "uniq_" + if not columns.startswith(uniqbase): + if engine_name == "postgresql": + return [columns[columns.index("_") + 1:columns.rindex("_")]] + return [columns] + return columns[len(uniqbase):].split("_x_") + + if engine_name not in ["mysql", "sqlite", "postgresql"]: + return + + m = _RE_DB[engine_name].match(integrity_error.message) + if not m: + return + columns = m.group(1) + + if engine_name == "sqlite": + columns = columns.strip().split(", ") + else: + columns = get_columns_from_uniq_cons_or_name(columns) + raise DBDuplicateEntry(columns, integrity_error) + + +def wrap_db_error(f): + def _wrap(*args, **kwargs): + try: + return f(*args, **kwargs) + except UnicodeEncodeError: + raise InvalidUnicodeParameter() + # note(boris-42): We should catch unique constraint violation and + # wrap it by our own DBDuplicateEntry exception. Unique constraint + # violation is wrapped by IntegrityError. + except IntegrityError, e: + # note(boris-42): SqlAlchemy doesn't unify errors from different + # DBs so we must do this. Also in some tables (for example + # instance_types) there are more than one unique constraint. This + # means we should get names of columns, which values violate + # unique constraint, from error message. 
+ raise_if_duplicate_entry_error(e, get_engine().name) + raise DBError(e) + except Exception, e: + LOG.exception(_('DB exception wrapped.')) + raise DBError(e) + _wrap.func_name = f.func_name + return _wrap + + +def get_engine(): + """Return a SQLAlchemy engine.""" + global _ENGINE + if _ENGINE is None: + _ENGINE = create_engine(CONF.sql_connection) + return _ENGINE + + +def synchronous_switch_listener(dbapi_conn, connection_rec): + """Switch sqlite connections to non-synchronous mode.""" + dbapi_conn.execute("PRAGMA synchronous = OFF") + + +def add_regexp_listener(dbapi_con, con_record): + """Add REGEXP function to sqlite connections.""" + + def regexp(expr, item): + reg = re.compile(expr) + return reg.search(unicode(item)) is not None + dbapi_con.create_function('regexp', 2, regexp) + + +def greenthread_yield(dbapi_con, con_record): + """ + Ensure other greenthreads get a chance to execute by forcing a context + switch. With common database backends (eg MySQLdb and sqlite), there is + no implicit yield caused by network I/O since they are implemented by + C libraries that eventlet cannot monkey patch. + """ + greenthread.sleep(0) + + +def ping_listener(dbapi_conn, connection_rec, connection_proxy): + """ + Ensures that MySQL connections checked out of the + pool are alive. + + Borrowed from: + http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f + """ + try: + dbapi_conn.cursor().execute('select 1') + except dbapi_conn.OperationalError, ex: + if ex.args[0] in (2006, 2013, 2014, 2045, 2055): + LOG.warn(_('Got mysql server has gone away: %s'), ex) + raise DisconnectionError("Database server went away") + else: + raise + + +def is_db_connection_error(args): + """Return True if error in connecting to db.""" + # NOTE(adam_g): This is currently MySQL specific and needs to be extended + # to support Postgres and others. 
+ conn_err_codes = ('2002', '2003', '2006') + for err_code in conn_err_codes: + if args.find(err_code) != -1: + return True + return False + + +def create_engine(sql_connection): + """Return a new SQLAlchemy engine.""" + connection_dict = sqlalchemy.engine.url.make_url(sql_connection) + + engine_args = { + "pool_recycle": CONF.sql_idle_timeout, + "echo": False, + 'convert_unicode': True, + } + + # Map our SQL debug level to SQLAlchemy's options + if CONF.sql_connection_debug >= 100: + engine_args['echo'] = 'debug' + elif CONF.sql_connection_debug >= 50: + engine_args['echo'] = True + + if "sqlite" in connection_dict.drivername: + engine_args["poolclass"] = NullPool + + if CONF.sql_connection == "sqlite://": + engine_args["poolclass"] = StaticPool + engine_args["connect_args"] = {'check_same_thread': False} + elif all((CONF.sql_dbpool_enable, MySQLdb, + "mysql" in connection_dict.drivername)): + LOG.info(_("Using mysql/eventlet db_pool.")) + # MySQLdb won't accept 'None' in the password field + password = connection_dict.password or '' + pool_args = { + 'db': connection_dict.database, + 'passwd': password, + 'host': connection_dict.host, + 'user': connection_dict.username, + 'min_size': CONF.sql_min_pool_size, + 'max_size': CONF.sql_max_pool_size, + 'max_idle': CONF.sql_idle_timeout, + 'client_flag': mysql_client_constants.FOUND_ROWS} + + pool = db_pool.ConnectionPool(MySQLdb, **pool_args) + + def creator(): + conn = pool.create() + if isinstance(conn, tuple): + # NOTE(belliott) eventlet >= 0.10 returns a tuple + now, now, conn = conn + + return conn + + engine_args['creator'] = creator + + else: + engine_args['pool_size'] = CONF.sql_max_pool_size + if CONF.sql_max_overflow is not None: + engine_args['max_overflow'] = CONF.sql_max_overflow + + engine = sqlalchemy.create_engine(sql_connection, **engine_args) + + sqlalchemy.event.listen(engine, 'checkin', greenthread_yield) + + if 'mysql' in connection_dict.drivername: + sqlalchemy.event.listen(engine, 'checkout', 
ping_listener) + elif 'sqlite' in connection_dict.drivername: + if not CONF.sqlite_synchronous: + sqlalchemy.event.listen(engine, 'connect', + synchronous_switch_listener) + sqlalchemy.event.listen(engine, 'connect', add_regexp_listener) + + if (CONF.sql_connection_trace and + engine.dialect.dbapi.__name__ == 'MySQLdb'): + patch_mysqldb_with_stacktrace_comments() + + try: + engine.connect() + except OperationalError, e: + if not is_db_connection_error(e.args[0]): + raise + + remaining = CONF.sql_max_retries + if remaining == -1: + remaining = 'infinite' + while True: + msg = _('SQL connection failed. %s attempts left.') + LOG.warn(msg % remaining) + if remaining != 'infinite': + remaining -= 1 + time.sleep(CONF.sql_retry_interval) + try: + engine.connect() + break + except OperationalError, e: + if (remaining != 'infinite' and remaining == 0) or \ + not is_db_connection_error(e.args[0]): + raise + return engine + + +class Query(sqlalchemy.orm.query.Query): + """Subclass of sqlalchemy.query with soft_delete() method.""" + def soft_delete(self, synchronize_session='evaluate'): + return self.update({'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow()}, + synchronize_session=synchronize_session) + + +class Session(sqlalchemy.orm.session.Session): + """Custom Session class to avoid SqlAlchemy Session monkey patching.""" + @wrap_db_error + def query(self, *args, **kwargs): + return super(Session, self).query(*args, **kwargs) + + @wrap_db_error + def flush(self, *args, **kwargs): + return super(Session, self).flush(*args, **kwargs) + + @wrap_db_error + def execute(self, *args, **kwargs): + return super(Session, self).execute(*args, **kwargs) + + +def get_maker(engine, autocommit=True, expire_on_commit=False): + """Return a SQLAlchemy sessionmaker using the given engine.""" + return sqlalchemy.orm.sessionmaker(bind=engine, + class_=Session, + autocommit=autocommit, + expire_on_commit=expire_on_commit, + 
query_cls=Query) + + +def patch_mysqldb_with_stacktrace_comments(): + """Adds current stack trace as a comment in queries by patching + MySQLdb.cursors.BaseCursor._do_query. + """ + import MySQLdb.cursors + import traceback + + old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query + + def _do_query(self, q): + stack = '' + for file, line, method, function in traceback.extract_stack(): + # exclude various common things from trace + if file.endswith('session.py') and method == '_do_query': + continue + if file.endswith('api.py') and method == 'wrapper': + continue + if file.endswith('utils.py') and method == '_inner': + continue + if file.endswith('exception.py') and method == '_wrap': + continue + # db/api is just a wrapper around db/sqlalchemy/api + if file.endswith('db/api.py'): + continue + # only trace inside nova + index = file.rfind('nova') + if index == -1: + continue + stack += "File:%s:%s Method:%s() Line:%s | " \ + % (file[index:], line, method, function) + + # strip trailing " | " from stack + if stack: + stack = stack[:-3] + qq = "%s /* %s */" % (q, stack) + else: + qq = q + old_mysql_do_query(self, qq) + + setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) diff --git a/nova/openstack/common/db/sqlalchemy/utils.py b/nova/openstack/common/db/sqlalchemy/utils.py new file mode 100644 index 000000000..ef8af57ce --- /dev/null +++ b/nova/openstack/common/db/sqlalchemy/utils.py @@ -0,0 +1,132 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2010-2011 OpenStack LLC. +# Copyright 2012 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Implementation of paginate query.""" + +import sqlalchemy + +from nova.openstack.common.gettextutils import _ +from nova.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class InvalidSortKey(Exception): + message = _("Sort key supplied was not valid.") + + +# copy from glance/db/sqlalchemy/api.py +def paginate_query(query, model, limit, sort_keys, marker=None, + sort_dir=None, sort_dirs=None): + """Returns a query with sorting / pagination criteria added. + + Pagination works by requiring a unique sort_key, specified by sort_keys. + (If sort_keys is not unique, then we risk looping through values.) + We use the last row in the previous page as the 'marker' for pagination. + So we must return values that follow the passed marker in the order. + With a single-valued sort_key, this would be easy: sort_key > X. + With a compound-values sort_key, (k1, k2, k3) we must do this to repeat + the lexicographical ordering: + (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3) + + We also have to cope with different sort_directions. + + Typically, the id of the last row is used as the client-facing pagination + marker, then the actual marker object must be fetched from the db and + passed in to us as marker. 
+ + :param query: the query object to which we should add paging/sorting + :param model: the ORM model class + :param limit: maximum number of items to return + :param sort_keys: array of attributes by which results should be sorted + :param marker: the last item of the previous page; we returns the next + results after this value. + :param sort_dir: direction in which results should be sorted (asc, desc) + :param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys + + :rtype: sqlalchemy.orm.query.Query + :return: The query with sorting/pagination added. + """ + + if 'id' not in sort_keys: + # TODO(justinsb): If this ever gives a false-positive, check + # the actual primary key, rather than assuming its id + LOG.warn(_('Id not in sort_keys; is sort_keys unique?')) + + assert(not (sort_dir and sort_dirs)) + + # Default the sort direction to ascending + if sort_dirs is None and sort_dir is None: + sort_dir = 'asc' + + # Ensure a per-column sort direction + if sort_dirs is None: + sort_dirs = [sort_dir for _sort_key in sort_keys] + + assert(len(sort_dirs) == len(sort_keys)) + + # Add sorting + for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): + sort_dir_func = { + 'asc': sqlalchemy.asc, + 'desc': sqlalchemy.desc, + }[current_sort_dir] + + try: + sort_key_attr = getattr(model, current_sort_key) + except AttributeError: + raise InvalidSortKey() + query = query.order_by(sort_dir_func(sort_key_attr)) + + # Add pagination + if marker is not None: + marker_values = [] + for sort_key in sort_keys: + v = getattr(marker, sort_key) + marker_values.append(v) + + # Build up an array of sort criteria as in the docstring + criteria_list = [] + for i in xrange(0, len(sort_keys)): + crit_attrs = [] + for j in xrange(0, i): + model_attr = getattr(model, sort_keys[j]) + crit_attrs.append((model_attr == marker_values[j])) + + model_attr = getattr(model, sort_keys[i]) + if sort_dirs[i] == 'desc': + crit_attrs.append((model_attr < marker_values[i])) + 
elif sort_dirs[i] == 'asc': + crit_attrs.append((model_attr > marker_values[i])) + else: + raise ValueError(_("Unknown sort direction, " + "must be 'desc' or 'asc'")) + + criteria = sqlalchemy.sql.and_(*crit_attrs) + criteria_list.append(criteria) + + f = sqlalchemy.sql.or_(*criteria_list) + query = query.filter(f) + + if limit is not None: + query = query.limit(limit) + + return query diff --git a/nova/test.py b/nova/test.py index b3f851dc4..e5c11081c 100644 --- a/nova/test.py +++ b/nova/test.py @@ -37,9 +37,9 @@ import testtools from nova import context from nova import db from nova.db import migration -from nova.db.sqlalchemy import session from nova.network import manager as network_manager from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session from nova.openstack.common import log as logging from nova.openstack.common import timeutils from nova import paths @@ -56,8 +56,9 @@ test_opts = [ CONF = cfg.CONF CONF.register_opts(test_opts) -CONF.import_opt('sql_connection', 'nova.db.sqlalchemy.session') -CONF.import_opt('sqlite_db', 'nova.db.sqlalchemy.session') +CONF.import_opt('sql_connection', + 'nova.openstack.common.db.sqlalchemy.session') +CONF.import_opt('sqlite_db', 'nova.openstack.common.db.sqlalchemy.session') CONF.set_override('use_stderr', False) logging.setup('nova') diff --git a/nova/tests/baremetal/db/test_bm_interface.py b/nova/tests/baremetal/db/test_bm_interface.py index 9f051ac9b..32beb1ce0 100644 --- a/nova/tests/baremetal/db/test_bm_interface.py +++ b/nova/tests/baremetal/db/test_bm_interface.py @@ -18,6 +18,7 @@ Bare-metal DB testcase for BareMetalInterface """ from nova import exception +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.tests.baremetal.db import base from nova.virt.baremetal import db @@ -27,7 +28,7 @@ class BareMetalInterfaceTestCase(base.BMDBTestCase): def test_unique_address(self): pif1_id = db.bm_interface_create(self.context, 1, '11:11:11:11:11:11', 
'0x1', 1) - self.assertRaises(exception.DBError, + self.assertRaises(db_session.DBError, db.bm_interface_create, self.context, 2, '11:11:11:11:11:11', '0x2', 2) # succeed after delete pif1 diff --git a/nova/tests/baremetal/db/test_bm_pxe_ip.py b/nova/tests/baremetal/db/test_bm_pxe_ip.py index 9a93b46ad..9820f3af0 100644 --- a/nova/tests/baremetal/db/test_bm_pxe_ip.py +++ b/nova/tests/baremetal/db/test_bm_pxe_ip.py @@ -18,6 +18,7 @@ Bare-metal DB testcase for BareMetalPxeIp """ from nova import exception +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.tests.baremetal.db import base from nova.tests.baremetal.db import utils from nova.virt.baremetal import db @@ -50,14 +51,14 @@ class BareMetalPxeIpTestCase(base.BMDBTestCase): # address duplicates i = utils.new_bm_pxe_ip(address='10.1.1.1', server_address='10.1.1.201') - self.assertRaises(exception.DBError, + self.assertRaises(db_session.DBError, db.bm_pxe_ip_create_direct, self.context, i) # server_address duplicates i = utils.new_bm_pxe_ip(address='10.1.1.3', server_address='10.1.1.101') - self.assertRaises(exception.DBError, + self.assertRaises(db_session.DBError, db.bm_pxe_ip_create_direct, self.context, i) diff --git a/nova/tests/baremetal/test_pxe.py b/nova/tests/baremetal/test_pxe.py index 09f1079bf..e50462b0e 100644 --- a/nova/tests/baremetal/test_pxe.py +++ b/nova/tests/baremetal/test_pxe.py @@ -25,6 +25,7 @@ from testtools import matchers from nova import exception from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.tests.baremetal.db import base as bm_db_base from nova.tests.baremetal.db import utils as bm_db_utils from nova.tests.image import fake as fake_image @@ -521,7 +522,7 @@ class PXEPublicMethodsTestCase(BareMetalPXETestCase): AndRaise(exception.NovaException) bm_utils.unlink_without_raise(pxe_path) self.driver._collect_mac_addresses(self.context, self.node).\ - AndRaise(exception.DBError) + 
AndRaise(db_session.DBError) bm_utils.rmtree_without_raise( os.path.join(CONF.baremetal.tftp_root, 'fake-uuid')) self.mox.ReplayAll() diff --git a/nova/tests/network/test_manager.py b/nova/tests/network/test_manager.py index 48183010f..9d467bcb1 100644 --- a/nova/tests/network/test_manager.py +++ b/nova/tests/network/test_manager.py @@ -29,6 +29,7 @@ from nova.network import linux_net from nova.network import manager as network_manager from nova.network import model as net_model from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.openstack.common import importutils from nova.openstack.common import log as logging from nova.openstack.common import rpc @@ -2046,7 +2047,7 @@ class FloatingIPTestCase(test.TestCase): # address column, so fake the collision-avoidance here def fake_vif_save(vif): if vif.address == crash_test_dummy_vif['address']: - raise exception.DBError("If you're smart, you'll retry!") + raise db_session.DBError("If you're smart, you'll retry!") self.stubs.Set(models.VirtualInterface, 'save', fake_vif_save) # Attempt to add another and make sure that both MACs are consumed diff --git a/nova/tests/test_instance_types.py b/nova/tests/test_instance_types.py index b70b96b7f..5abf62c3e 100644 --- a/nova/tests/test_instance_types.py +++ b/nova/tests/test_instance_types.py @@ -21,8 +21,8 @@ from nova.compute import instance_types from nova import context from nova import db from nova.db.sqlalchemy import models -from nova.db.sqlalchemy import session as sql_session from nova import exception +from nova.openstack.common.db.sqlalchemy import session as sql_session from nova.openstack.common import log as logging from nova import test diff --git a/nova/tests/test_sqlalchemy.py b/nova/tests/test_sqlalchemy.py deleted file mode 100644 index 5c7f4450b..000000000 --- a/nova/tests/test_sqlalchemy.py +++ /dev/null @@ -1,129 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 
Rackspace Hosting -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Unit tests for SQLAlchemy specific code.""" - -from eventlet import db_pool -try: - import MySQLdb -except ImportError: - MySQLdb = None - -from sqlalchemy import Column, MetaData, Table, UniqueConstraint -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy import DateTime, Integer - -from nova import context -from nova.db.sqlalchemy import models -from nova.db.sqlalchemy import session -from nova import exception -from nova import test - - -class DbPoolTestCase(test.TestCase): - def setUp(self): - super(DbPoolTestCase, self).setUp() - self.flags(sql_dbpool_enable=True) - self.user_id = 'fake' - self.project_id = 'fake' - self.context = context.RequestContext(self.user_id, self.project_id) - if not MySQLdb: - self.skipTest("Unable to test due to lack of MySQLdb") - - def test_db_pool_option(self): - self.flags(sql_idle_timeout=11, sql_min_pool_size=21, - sql_max_pool_size=42) - - info = {} - - class FakeConnectionPool(db_pool.ConnectionPool): - def __init__(self, mod_name, **kwargs): - info['module'] = mod_name - info['kwargs'] = kwargs - super(FakeConnectionPool, self).__init__(mod_name, - **kwargs) - - def connect(self, *args, **kwargs): - raise test.TestingException() - - self.stubs.Set(db_pool, 'ConnectionPool', - FakeConnectionPool) - - sql_connection = 'mysql://user:pass@127.0.0.1/nova' - self.assertRaises(test.TestingException, 
session.create_engine, - sql_connection) - - self.assertEqual(info['module'], MySQLdb) - self.assertEqual(info['kwargs']['max_idle'], 11) - self.assertEqual(info['kwargs']['min_size'], 21) - self.assertEqual(info['kwargs']['max_size'], 42) - - -BASE = declarative_base() -_TABLE_NAME = '__tmp__test__tmp__' - - -class TmpTable(BASE, models.NovaBase): - __tablename__ = _TABLE_NAME - id = Column(Integer, primary_key=True) - foo = Column(Integer) - - -class SessionErrorWrapperTestCase(test.TestCase): - def setUp(self): - super(SessionErrorWrapperTestCase, self).setUp() - meta = MetaData() - meta.bind = session.get_engine() - test_table = Table(_TABLE_NAME, meta, - Column('id', Integer, primary_key=True, - nullable=False), - Column('deleted', Integer, default=0), - Column('deleted_at', DateTime), - Column('updated_at', DateTime), - Column('created_at', DateTime), - Column('foo', Integer), - UniqueConstraint('foo', name='uniq_foo')) - test_table.create() - - def tearDown(self): - super(SessionErrorWrapperTestCase, self).tearDown() - meta = MetaData() - meta.bind = session.get_engine() - test_table = Table(_TABLE_NAME, meta, autoload=True) - test_table.drop() - - def test_flush_wrapper(self): - tbl = TmpTable() - tbl.update({'foo': 10}) - tbl.save() - - tbl2 = TmpTable() - tbl2.update({'foo': 10}) - self.assertRaises(exception.DBDuplicateEntry, tbl2.save) - - def test_execute_wrapper(self): - _session = session.get_session() - with _session.begin(): - for i in [10, 20]: - tbl = TmpTable() - tbl.update({'foo': i}) - tbl.save(session=_session) - - method = _session.query(TmpTable).\ - filter_by(foo=10).\ - update - self.assertRaises(exception.DBDuplicateEntry, - method, {'foo': 20}) diff --git a/nova/virt/baremetal/db/sqlalchemy/api.py b/nova/virt/baremetal/db/sqlalchemy/api.py index 34bcd1229..198c06256 100644 --- a/nova/virt/baremetal/db/sqlalchemy/api.py +++ b/nova/virt/baremetal/db/sqlalchemy/api.py @@ -351,7 +351,7 @@ def bm_interface_set_vif_uuid(context, if_id, 
vif_uuid): try: session.add(bm_interface) session.flush() - except exception.DBError, e: + except db_session.DBError, e: # TODO(deva): clean up when db layer raises DuplicateKeyError if str(e).find('IntegrityError') != -1: raise exception.NovaException(_("Baremetal interface %s " diff --git a/nova/virt/baremetal/db/sqlalchemy/session.py b/nova/virt/baremetal/db/sqlalchemy/session.py index fcaf210a5..06d777354 100644 --- a/nova/virt/baremetal/db/sqlalchemy/session.py +++ b/nova/virt/baremetal/db/sqlalchemy/session.py @@ -19,8 +19,8 @@ """Session Handling for SQLAlchemy backend.""" -from nova.db.sqlalchemy import session as nova_session from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as nova_session from nova import paths opts = [ @@ -38,11 +38,13 @@ CONF = cfg.CONF CONF.register_group(baremetal_group) CONF.register_opts(opts, baremetal_group) -CONF.import_opt('sqlite_db', 'nova.db.sqlalchemy.session') +CONF.import_opt('sqlite_db', 'nova.openstack.common.db.sqlalchemy.session') _ENGINE = None _MAKER = None +DBError = nova_session.DBError + def get_session(autocommit=True, expire_on_commit=False): """Return a SQLAlchemy session.""" diff --git a/nova/virt/baremetal/driver.py b/nova/virt/baremetal/driver.py index 631a9a8c4..43af951fd 100644 --- a/nova/virt/baremetal/driver.py +++ b/nova/virt/baremetal/driver.py @@ -25,6 +25,7 @@ from nova.compute import power_state from nova import context as nova_context from nova import exception from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.openstack.common import importutils from nova.openstack.common import log as logging from nova import paths @@ -266,7 +267,7 @@ class BareMetalDriver(driver.ComputeDriver): pm.state = baremetal_states.ERROR try: _update_state(context, node, instance, pm.state) - except exception.DBError, e: + except db_session.DBError, e: LOG.warning(_("Failed to update state record for " 
"baremetal node %s") % instance['uuid']) diff --git a/nova/virt/baremetal/pxe.py b/nova/virt/baremetal/pxe.py index 5a6f58655..a169e13e5 100644 --- a/nova/virt/baremetal/pxe.py +++ b/nova/virt/baremetal/pxe.py @@ -25,6 +25,7 @@ import os from nova.compute import instance_types from nova import exception from nova.openstack.common import cfg +from nova.openstack.common.db.sqlalchemy import session as db_session from nova.openstack.common import fileutils from nova.openstack.common import log as logging from nova.virt.baremetal import base @@ -411,7 +412,7 @@ class PXE(base.NodeDriver): bm_utils.unlink_without_raise(get_pxe_config_file_path(instance)) try: macs = self._collect_mac_addresses(context, node) - except exception.DBError: + except db_session.DBError: pass else: for mac in macs: diff --git a/openstack-common.conf b/openstack-common.conf index b0db41d51..29ed9d82f 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,7 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,cliutils,context,excutils,eventlet_backdoor,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,network_utils,notifier,plugin,policy,rootwrap,setup,timeutils,rpc,uuidutils,install_venv_common,flakes +modules=cfg,cliutils,context,db,db.sqlalchemy,excutils,eventlet_backdoor,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,network_utils,notifier,plugin,policy,rootwrap,setup,timeutils,rpc,uuidutils,install_venv_common,flakes # The base module to hold the copy of openstack.common base=nova -- cgit