diff options
| author | Michael Still <mikal@stillhq.com> | 2012-10-10 17:28:11 +1100 |
|---|---|---|
| committer | Michael Still <mikal@stillhq.com> | 2012-10-18 09:09:17 -0700 |
| commit | 9a8c1d7e1a537e6580be2be1630baefc910de1de (patch) | |
| tree | fe6c40b44c3b9d5b1dcaefc4945bc98e01956698 | |
| parent | 7695f967a3c750642504aa60f748111f64880a07 (diff) | |
| download | oslo-9a8c1d7e1a537e6580be2be1630baefc910de1de.tar.gz oslo-9a8c1d7e1a537e6580be2be1630baefc910de1de.tar.xz oslo-9a8c1d7e1a537e6580be2be1630baefc910de1de.zip | |
Move nova's util.synchronized decorator to openstack common.
In the end I needed to port utils.ensure_tree as well. Resolves
bug 1063230.
Change-Id: I6e6fa8201de2cac3f17e6c60d7b16f7df7c64116
| -rw-r--r-- | openstack/common/fileutils.py | 35 | ||||
| -rw-r--r-- | openstack/common/lockutils.py | 232 | ||||
| -rw-r--r-- | tests/unit/test_fileutils.py | 37 | ||||
| -rw-r--r-- | tests/unit/test_lockutils.py | 165 |
4 files changed, 469 insertions, 0 deletions
diff --git a/openstack/common/fileutils.py b/openstack/common/fileutils.py new file mode 100644 index 0000000..4746ad4 --- /dev/null +++ b/openstack/common/fileutils.py @@ -0,0 +1,35 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import errno +import os + + +def ensure_tree(path): + """Create a directory (and any ancestor directories required) + + :param path: Directory to create + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + raise diff --git a/openstack/common/lockutils.py b/openstack/common/lockutils.py new file mode 100644 index 0000000..3a35b89 --- /dev/null +++ b/openstack/common/lockutils.py @@ -0,0 +1,232 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import errno +import functools +import os +import shutil +import time +import weakref + +from eventlet import greenthread +from eventlet import semaphore + +from openstack.common import cfg +from openstack.common import fileutils +from openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +util_opts = [ + cfg.BoolOpt('disable_process_locking', default=False, + help='Whether to disable inter-process locks'), + cfg.StrOpt('lock_path', + default=os.path.abspath(os.path.join(os.path.dirname(__file__), + '../')), + help='Directory to use for lock files') +] + + +CONF = cfg.CONF +CONF.register_opts(util_opts) + + +class _InterProcessLock(object): + """Lock implementation which allows multiple locks, working around + issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does + not require any cleanup. Since the lock is always held on a file + descriptor rather than outside of the process, the lock gets dropped + automatically if the process crashes, even if __exit__ is not executed. + + There are no guarantees regarding usage by multiple green threads in a + single process here. This lock works only between processes. Exclusive + access between local threads should be achieved using the semaphores + in the @synchronized decorator. + + Note these locks are released when the descriptor is closed, so it's not + safe to close the file descriptor while another green thread holds the + lock. Just opening and closing the lock file can break synchronisation, + so lock files must be accessed only using this abstraction. + """ + + def __init__(self, name): + self.lockfile = None + self.fname = name + + def __enter__(self): + self.lockfile = open(self.fname, 'w') + + while True: + try: + # Using non-blocking locks since green threads are not + # patched to deal with blocking locking calls. + # Also upon reading the MSDN docs for locking(), it seems + # to have a laughable 10 attempts "blocking" mechanism. + self.trylock() + return self + except IOError, e: + if e.errno in (errno.EACCES, errno.EAGAIN): + # external locks synchronise things like iptables + # updates - give it some time to prevent busy spinning + time.sleep(0.01) + else: + raise + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + self.unlock() + self.lockfile.close() + except IOError: + LOG.exception(_("Could not release the acquired lock `%s`"), + self.fname) + + def trylock(self): + raise NotImplementedError() + + def unlock(self): + raise NotImplementedError() + + +class _WindowsLock(_InterProcessLock): + def trylock(self): + msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1) + + def unlock(self): + msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1) + + +class _PosixLock(_InterProcessLock): + def trylock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) + + def unlock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_UN) + + +if os.name == 'nt': + import msvcrt + InterProcessLock = _WindowsLock +else: + import fcntl + InterProcessLock = _PosixLock + +_semaphores = weakref.WeakValueDictionary() + + +def synchronized(name, lock_file_prefix, external=False, lock_path=None): + """Synchronization decorator. + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one thread will execute the bar method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + The lock_file_prefix argument is used to provide lock files on disk with a + meaningful prefix. The prefix should end with a hyphen ('-') if specified. + + The external keyword argument denotes whether this lock should work across + multiple processes. This means that if two different workers both run a + a method decorated with @synchronized('mylock', external=True), only one + of them will execute at a time. + + The lock_path keyword argument is used to specify a special location for + external lock files to live. If nothing is set, then CONF.lock_path is + used as a default. + """ + + def wrap(f): + @functools.wraps(f) + def inner(*args, **kwargs): + # NOTE(soren): If we ever go natively threaded, this will be racy. + # See http://stackoverflow.com/questions/5390569/dyn + # amically-allocating-and-destroying-mutexes + sem = _semaphores.get(name, semaphore.Semaphore()) + if name not in _semaphores: + # this check is not racy - we're already holding ref locally + # so GC won't remove the item and there was no IO switch + # (only valid in greenthreads) + _semaphores[name] = sem + + with sem: + LOG.debug(_('Got semaphore "%(lock)s" for method ' + '"%(method)s"...'), {'lock': name, + 'method': f.__name__}) + if external and not CONF.disable_process_locking: + LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' + 'method "%(method)s"...'), + {'lock': name, 'method': f.__name__}) + cleanup_dir = False + + # We need a copy of lock_path because it is non-local + local_lock_path = lock_path + if not local_lock_path: + local_lock_path = CONF.lock_path + + if not local_lock_path: + cleanup_dir = True + local_lock_path = tempfile.mkdtemp() + + if not os.path.exists(local_lock_path): + cleanup_dir = True + fileutils.ensure_tree(local_lock_path) + + # NOTE(mikal): the lock name cannot contain directory + # separators + safe_name = name.replace(os.sep, '_') + lock_file_name = '%s%s' % (lock_file_prefix, safe_name) + lock_file_path = os.path.join(local_lock_path, + lock_file_name) + + try: + lock = InterProcessLock(lock_file_path) + with lock: + LOG.debug(_('Got file lock "%(lock)s" at %(path)s ' + 'for method "%(method)s"...'), + {'lock': name, + 'path': lock_file_path, + 'method': f.__name__}) + retval = f(*args, **kwargs) + finally: + # NOTE(vish): This removes the tempdir if we needed + # to create one. This is used to cleanup + # the locks left behind by unit tests. + if cleanup_dir: + shutil.rmtree(local_lock_path) + else: + retval = f(*args, **kwargs) + + return retval + return inner + return wrap diff --git a/tests/unit/test_fileutils.py b/tests/unit/test_fileutils.py new file mode 100644 index 0000000..de5719e --- /dev/null +++ b/tests/unit/test_fileutils.py @@ -0,0 +1,37 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +import os +import shutil +import tempfile + +from openstack.common import fileutils + + +class EnsureTree(unittest.TestCase): + def test_ensure_tree(self): + tmpdir = tempfile.mkdtemp() + try: + testdir = '%s/foo/bar/baz' % (tmpdir,) + fileutils.ensure_tree(testdir) + self.assertTrue(os.path.isdir(testdir)) + + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) diff --git a/tests/unit/test_lockutils.py b/tests/unit/test_lockutils.py new file mode 100644 index 0000000..5f50217 --- /dev/null +++ b/tests/unit/test_lockutils.py @@ -0,0 +1,165 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Justin Santa Barbara +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import errno +import os +import select +import shutil +import tempfile +import time +import unittest + +import eventlet +from eventlet import greenthread +from eventlet import greenpool + +from openstack.common import lockutils +from tests import utils as test_utils + + +class TestFileLocks(test_utils.BaseTestCase): + def test_concurrent_green_lock_succeeds(self): + """Verify spawn_n greenthreads with two locks run concurrently.""" + tmpdir = tempfile.mkdtemp() + try: + self.completed = False + + def locka(wait): + a = lockutils.InterProcessLock(os.path.join(tmpdir, 'a')) + with a: + wait.wait() + self.completed = True + + def lockb(wait): + b = lockutils.InterProcessLock(os.path.join(tmpdir, 'b')) + with b: + wait.wait() + + wait1 = eventlet.event.Event() + wait2 = eventlet.event.Event() + pool = greenpool.GreenPool() + pool.spawn_n(locka, wait1) + pool.spawn_n(lockb, wait2) + wait2.send() + eventlet.sleep(0) + wait1.send() + pool.waitall() + + self.assertTrue(self.completed) + + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) + + +class LockTestCase(test_utils.BaseTestCase): + def test_synchronized_wrapped_function_metadata(self): + @lockutils.synchronized('whatever', 'test-') + def foo(): + """Bar""" + pass + + self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring " + "got lost") + self.assertEquals(foo.__name__, 'foo', "Wrapped function's name " + "got mangled") + + def test_synchronized_internally(self): + """We can lock across multiple green threads""" + saved_sem_num = len(lockutils._semaphores) + seen_threads = list() + + @lockutils.synchronized('testlock2', 'test-', external=False) + def f(id): + for x in range(10): + seen_threads.append(id) + greenthread.sleep(0) + + threads = [] + pool = greenpool.GreenPool(10) + for i in range(10): + threads.append(pool.spawn(f, i)) + + for thread in threads: + thread.wait() + + self.assertEquals(len(seen_threads), 100) + # Looking at the seen threads, split it into chunks of 10, and verify + # that the last 9 match the first in each chunk. + for i in range(10): + for j in range(9): + self.assertEquals(seen_threads[i * 10], + seen_threads[i * 10 + 1 + j]) + + self.assertEqual(saved_sem_num, len(lockutils._semaphores), + "Semaphore leak detected") + + def test_nested_external_works(self): + """We can nest external syncs""" + tempdir = tempfile.mkdtemp() + try: + self.config(lock_path=tempdir) + sentinel = object() + + @lockutils.synchronized('testlock1', 'test-', external=True) + def outer_lock(): + + @lockutils.synchronized('testlock2', 'test-', external=True) + def inner_lock(): + return sentinel + return inner_lock() + + self.assertEqual(sentinel, outer_lock()) + + finally: + if os.path.exists(tempdir): + shutil.rmtree(tempdir) + + def test_synchronized_externally(self): + """We can lock across multiple processes""" + tempdir = tempfile.mkdtemp() + self.config(lock_path=tempdir) + rpipe1, wpipe1 = os.pipe() + rpipe2, wpipe2 = os.pipe() + + @lockutils.synchronized('testlock1', 'test-', external=True) + def f(rpipe, wpipe): + try: + os.write(wpipe, "foo") + except OSError, e: + self.assertEquals(e.errno, errno.EPIPE) + return + + rfds, _wfds, _efds = select.select([rpipe], [], [], 1) + self.assertEquals(len(rfds), 0, "The other process, which was " + "supposed to be locked, " + "wrote on its end of the " + "pipe") + os.close(rpipe) + + pid = os.fork() + if pid > 0: + os.close(wpipe1) + os.close(rpipe2) + + f(rpipe1, wpipe2) + else: + os.close(rpipe1) + os.close(wpipe2) + + time.sleep(0.1) + f(rpipe2, wpipe1) + os._exit(0) |
