diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-02-06 14:46:50 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-02-06 14:46:50 +0000 |
| commit | 51bee3c7b53acca108f191a152a3783600d0f51a (patch) | |
| tree | 47ba93156e93f51fd9ee587a0bcd268acf700136 /openstack | |
| parent | e7c973ed8d708cf950acb25612b646260bccaca0 (diff) | |
| parent | 51efba78bdcee821937c28c1973ec80e8c2d59ae (diff) | |
| download | oslo-51bee3c7b53acca108f191a152a3783600d0f51a.tar.gz oslo-51bee3c7b53acca108f191a152a3783600d0f51a.tar.xz oslo-51bee3c7b53acca108f191a152a3783600d0f51a.zip | |
Merge "Emit a warning if RPC calls made with lock."
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/local.py | 11 | ||||
| -rw-r--r-- | openstack/common/lockutils.py | 105 | ||||
| -rw-r--r-- | openstack/common/rpc/__init__.py | 60 |
3 files changed, 117 insertions, 59 deletions
diff --git a/openstack/common/local.py b/openstack/common/local.py index 19d9627..8bdc837 100644 --- a/openstack/common/local.py +++ b/openstack/common/local.py @@ -26,6 +26,9 @@ class WeakLocal(corolocal.local): def __getattribute__(self, attr): rval = corolocal.local.__getattribute__(self, attr) if rval: + # NOTE(mikal): this bit is confusing. What is stored is a weak + # reference, not the value itself. We therefore need to lookup + # the weak reference and return the inner value here. rval = rval() return rval @@ -34,4 +37,12 @@ class WeakLocal(corolocal.local): return corolocal.local.__setattr__(self, attr, value) +# NOTE(mikal): the name "store" should be deprecated in the future store = WeakLocal() + +# A "weak" store uses weak references and allows an object to fall out of scope +# when it falls out of scope in the code that uses the thread local storage. A +# "strong" store will hold a reference to the object so that it never falls out +# of scope. +weak_store = WeakLocal() +strong_store = corolocal.local diff --git a/openstack/common/lockutils.py b/openstack/common/lockutils.py index 21115fd..6513fd8 100644 --- a/openstack/common/lockutils.py +++ b/openstack/common/lockutils.py @@ -29,6 +29,7 @@ from eventlet import semaphore from openstack.common import cfg from openstack.common import fileutils from openstack.common.gettextutils import _ +from openstack.common import local from openstack.common import log as logging @@ -139,7 +140,7 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None): def foo(self, *args): ... - ensures that only one thread will execute the bar method at a time. + ensures that only one thread will execute the foo method at a time. Different methods can share the same lock:: @@ -183,54 +184,66 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None): LOG.debug(_('Got semaphore "%(lock)s" for method ' '"%(method)s"...'), {'lock': name, 'method': f.__name__}) - if external and not CONF.disable_process_locking: - LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' - 'method "%(method)s"...'), - {'lock': name, 'method': f.__name__}) - cleanup_dir = False - - # We need a copy of lock_path because it is non-local - local_lock_path = lock_path - if not local_lock_path: - local_lock_path = CONF.lock_path - - if not local_lock_path: - cleanup_dir = True - local_lock_path = tempfile.mkdtemp() - - if not os.path.exists(local_lock_path): - cleanup_dir = True - fileutils.ensure_tree(local_lock_path) - - # NOTE(mikal): the lock name cannot contain directory - # separators - safe_name = name.replace(os.sep, '_') - lock_file_name = '%s%s' % (lock_file_prefix, safe_name) - lock_file_path = os.path.join(local_lock_path, - lock_file_name) - - try: - lock = InterProcessLock(lock_file_path) - with lock: - LOG.debug(_('Got file lock "%(lock)s" at %(path)s ' - 'for method "%(method)s"...'), + + # NOTE(mikal): I know this looks odd + if not hasattr(local.strong_store, 'locks_held'): + local.strong_store.locks_held = [] + local.strong_store.locks_held.append(name) + + try: + if external and not CONF.disable_process_locking: + LOG.debug(_('Attempting to grab file lock "%(lock)s" ' + 'for method "%(method)s"...'), + {'lock': name, 'method': f.__name__}) + cleanup_dir = False + + # We need a copy of lock_path because it is non-local + local_lock_path = lock_path + if not local_lock_path: + local_lock_path = CONF.lock_path + + if not local_lock_path: + cleanup_dir = True + local_lock_path = tempfile.mkdtemp() + + if not os.path.exists(local_lock_path): + cleanup_dir = True + fileutils.ensure_tree(local_lock_path) + + # NOTE(mikal): the lock name cannot contain directory + # separators + safe_name = name.replace(os.sep, '_') + lock_file_name = '%s%s' % (lock_file_prefix, safe_name) + lock_file_path = os.path.join(local_lock_path, + lock_file_name) + + try: + lock = InterProcessLock(lock_file_path) + with lock: + LOG.debug(_('Got file lock "%(lock)s" at ' + '%(path)s for method ' + '"%(method)s"...'), + {'lock': name, + 'path': lock_file_path, + 'method': f.__name__}) + retval = f(*args, **kwargs) + finally: + LOG.debug(_('Released file lock "%(lock)s" at ' + '%(path)s for method "%(method)s"...'), {'lock': name, 'path': lock_file_path, 'method': f.__name__}) - retval = f(*args, **kwargs) - finally: - LOG.debug(_('Released file lock "%(lock)s" at %(path)s' - ' for method "%(method)s"...'), - {'lock': name, - 'path': lock_file_path, - 'method': f.__name__}) - # NOTE(vish): This removes the tempdir if we needed - # to create one. This is used to cleanup - # the locks left behind by unit tests. - if cleanup_dir: - shutil.rmtree(local_lock_path) - else: - retval = f(*args, **kwargs) + # NOTE(vish): This removes the tempdir if we needed + # to create one. This is used to + # cleanup the locks left behind by unit + # tests. + if cleanup_dir: + shutil.rmtree(local_lock_path) + else: + retval = f(*args, **kwargs) + + finally: + local.strong_store.locks_held.remove(name) return retval return inner diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 9303505..a786ae0 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see: rpc.proxy """ +import inspect +import logging + from openstack.common import cfg +from openstack.common.gettextutils import _ from openstack.common import importutils +from openstack.common import local + + +LOG = logging.getLogger(__name__) rpc_opts = [ @@ -62,7 +70,8 @@ rpc_opts = [ help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] -cfg.CONF.register_opts(rpc_opts) +CONF = cfg.CONF +CONF.register_opts(rpc_opts) def set_defaults(control_exchange): @@ -83,10 +92,27 @@ def create_connection(new=True): :returns: An instance of openstack.common.rpc.common.Connection """ - return _get_impl().create_connection(cfg.CONF, new=new) + return _get_impl().create_connection(CONF, new=new) + + +def _check_for_lock(): + if not CONF.debug: + return None + + if ((hasattr(local.strong_store, 'locks_held') + and local.strong_store.locks_held)): + stack = ' :: '.join([frame[3] for frame in inspect.stack()]) + LOG.warn(_('A RPC is being made while holding a lock. The locks ' + 'currently held are %(locks)s. This is probably a bug. ' + 'Please report it. Include the following: [%(stack)s].'), + {'locks': local.strong_store.locks_held, + 'stack': stack}) + return True + + return False -def call(context, topic, msg, timeout=None): +def call(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method that returns something. :param context: Information that identifies the user that has made this @@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: A dict from the remote method. :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().call(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().call(CONF, context, topic, msg, timeout) def cast(context, topic, msg): @@ -124,7 +154,7 @@ def cast(context, topic, msg): :returns: None """ - return _get_impl().cast(cfg.CONF, context, topic, msg) + return _get_impl().cast(CONF, context, topic, msg) def fanout_cast(context, topic, msg): @@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg): :returns: None """ - return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) + return _get_impl().fanout_cast(CONF, context, topic, msg) -def multicall(context, topic, msg, timeout=None): +def multicall(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method and get back an iterator. In this case, the remote method will be returning multiple values in @@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: An iterator. The iterator will yield a tuple (N, X) where N is an index that starts at 0 and increases by one for each value @@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None): :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().multicall(CONF, context, topic, msg, timeout) def notify(context, topic, msg, envelope=False): @@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, + return _get_impl().cast_to_server(CONF, context, server_params, topic, msg) @@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, + return _get_impl().fanout_cast_to_server(CONF, context, server_params, topic, msg) @@ -263,10 +297,10 @@ def _get_impl(): global _RPCIMPL if _RPCIMPL is None: try: - _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + _RPCIMPL = importutils.import_module(CONF.rpc_backend) except ImportError: # For backwards compatibility with older nova config. - impl = cfg.CONF.rpc_backend.replace('nova.rpc', - 'nova.openstack.common.rpc') + impl = CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') _RPCIMPL = importutils.import_module(impl) return _RPCIMPL |
