author    Michael Still <mikal@stillhq.com>  2012-10-10 17:28:11 +1100
committer Michael Still <mikal@stillhq.com>  2012-10-18 09:09:17 -0700
commit    9a8c1d7e1a537e6580be2be1630baefc910de1de (patch)
tree      fe6c40b44c3b9d5b1dcaefc4945bc98e01956698
parent    7695f967a3c750642504aa60f748111f64880a07 (diff)
Move nova's util.synchronized decorator to openstack common.
In the end I needed to port utils.ensure_tree as well.

Resolves bug 1063230.

Change-Id: I6e6fa8201de2cac3f17e6c60d7b16f7df7c64116
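A minimal usage sketch of the decorator being moved (the function name and the
'example-' lock file prefix are illustrative only)::

    from openstack.common import lockutils


    @lockutils.synchronized('mylock', 'example-', external=True)
    def do_something():
        # Green threads in one process serialize on an internal semaphore
        # keyed by 'mylock'; with external=True a lock file under
        # CONF.lock_path (or the lock_path argument) also serializes
        # separate processes.
        pass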
-rw-r--r--  openstack/common/fileutils.py  |  35
-rw-r--r--  openstack/common/lockutils.py  | 232
-rw-r--r--  tests/unit/test_fileutils.py   |  37
-rw-r--r--  tests/unit/test_lockutils.py   | 165
4 files changed, 469 insertions(+), 0 deletions(-)
diff --git a/openstack/common/fileutils.py b/openstack/common/fileutils.py
new file mode 100644
index 0000000..4746ad4
--- /dev/null
+++ b/openstack/common/fileutils.py
@@ -0,0 +1,35 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import errno
+import os
+
+
+def ensure_tree(path):
+    """Create a directory (and any ancestor directories required)
+
+    :param path: Directory to create
+    """
+    try:
+        os.makedirs(path)
+    except OSError as exc:
+        if exc.errno == errno.EEXIST:
+            if not os.path.isdir(path):
+                raise
+        else:
+            raise
diff --git a/openstack/common/lockutils.py b/openstack/common/lockutils.py
new file mode 100644
index 0000000..3a35b89
--- /dev/null
+++ b/openstack/common/lockutils.py
@@ -0,0 +1,232 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import errno
+import functools
+import os
+import shutil
+import tempfile
+import time
+import weakref
+
+from eventlet import greenthread
+from eventlet import semaphore
+
+from openstack.common import cfg
+from openstack.common import fileutils
+from openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+util_opts = [
+    cfg.BoolOpt('disable_process_locking', default=False,
+                help='Whether to disable inter-process locks'),
+    cfg.StrOpt('lock_path',
+               default=os.path.abspath(os.path.join(os.path.dirname(__file__),
+                                                    '../')),
+               help='Directory to use for lock files')
+]
+
+
+CONF = cfg.CONF
+CONF.register_opts(util_opts)
+
+
+class _InterProcessLock(object):
+    """Lock implementation which allows multiple locks, working around
+    issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
+    not require any cleanup. Since the lock is always held on a file
+    descriptor rather than outside of the process, the lock gets dropped
+    automatically if the process crashes, even if __exit__ is not executed.
+
+    There are no guarantees regarding usage by multiple green threads in a
+    single process here. This lock works only between processes. Exclusive
+    access between local threads should be achieved using the semaphores
+    in the @synchronized decorator.
+
+    Note these locks are released when the descriptor is closed, so it's not
+    safe to close the file descriptor while another green thread holds the
+    lock. Just opening and closing the lock file can break synchronisation,
+    so lock files must be accessed only using this abstraction.
+    """
+
+    def __init__(self, name):
+        self.lockfile = None
+        self.fname = name
+
+    def __enter__(self):
+        self.lockfile = open(self.fname, 'w')
+
+        while True:
+            try:
+                # Using non-blocking locks since green threads are not
+                # patched to deal with blocking locking calls.
+                # Also upon reading the MSDN docs for locking(), it seems
+                # to have a laughable 10 attempts "blocking" mechanism.
+                self.trylock()
+                return self
+            except IOError, e:
+                if e.errno in (errno.EACCES, errno.EAGAIN):
+                    # external locks synchronise things like iptables
+                    # updates - give it some time to prevent busy spinning
+                    time.sleep(0.01)
+                else:
+                    raise
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        try:
+            self.unlock()
+            self.lockfile.close()
+        except IOError:
+            LOG.exception(_("Could not release the acquired lock `%s`"),
+                          self.fname)
+
+    def trylock(self):
+        raise NotImplementedError()
+
+    def unlock(self):
+        raise NotImplementedError()
+
+
+class _WindowsLock(_InterProcessLock):
+    def trylock(self):
+        msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
+
+    def unlock(self):
+        msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
+
+
+class _PosixLock(_InterProcessLock):
+    def trylock(self):
+        fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+    def unlock(self):
+        fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
+
+
+if os.name == 'nt':
+    import msvcrt
+    InterProcessLock = _WindowsLock
+else:
+    import fcntl
+    InterProcessLock = _PosixLock
+
+_semaphores = weakref.WeakValueDictionary()
+
+
+def synchronized(name, lock_file_prefix, external=False, lock_path=None):
+    """Synchronization decorator.
+
+    Decorating a method like so::
+
+        @synchronized('mylock')
+        def foo(self, *args):
+           ...
+
+    ensures that only one thread will execute the foo method at a time.
+
+    Different methods can share the same lock::
+
+        @synchronized('mylock')
+        def foo(self, *args):
+           ...
+
+        @synchronized('mylock')
+        def bar(self, *args):
+           ...
+
+    This way only one of either foo or bar can be executing at a time.
+
+    The lock_file_prefix argument is used to provide lock files on disk with a
+    meaningful prefix. The prefix should end with a hyphen ('-') if specified.
+
+    The external keyword argument denotes whether this lock should work across
+    multiple processes. This means that if two different workers both run a
+    method decorated with @synchronized('mylock', external=True), only one
+    of them will execute at a time.
+
+    The lock_path keyword argument is used to specify a special location for
+    external lock files to live. If nothing is set, then CONF.lock_path is
+    used as a default.
+    """
+
+    def wrap(f):
+        @functools.wraps(f)
+        def inner(*args, **kwargs):
+            # NOTE(soren): If we ever go natively threaded, this will be racy.
+            #              See http://stackoverflow.com/questions/5390569/dyn
+            #              amically-allocating-and-destroying-mutexes
+            sem = _semaphores.get(name, semaphore.Semaphore())
+            if name not in _semaphores:
+                # this check is not racy - we're already holding ref locally
+                # so GC won't remove the item and there was no IO switch
+                # (only valid in greenthreads)
+                _semaphores[name] = sem
+
+            with sem:
+                LOG.debug(_('Got semaphore "%(lock)s" for method '
+                            '"%(method)s"...'), {'lock': name,
+                                                 'method': f.__name__})
+                if external and not CONF.disable_process_locking:
+                    LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
+                                'method "%(method)s"...'),
+                              {'lock': name, 'method': f.__name__})
+                    cleanup_dir = False
+
+                    # We need a copy of lock_path because it is non-local
+                    local_lock_path = lock_path
+                    if not local_lock_path:
+                        local_lock_path = CONF.lock_path
+
+                    if not local_lock_path:
+                        cleanup_dir = True
+                        local_lock_path = tempfile.mkdtemp()
+
+                    if not os.path.exists(local_lock_path):
+                        cleanup_dir = True
+                        fileutils.ensure_tree(local_lock_path)
+
+                    # NOTE(mikal): the lock name cannot contain directory
+                    # separators
+                    safe_name = name.replace(os.sep, '_')
+                    lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
+                    lock_file_path = os.path.join(local_lock_path,
+                                                  lock_file_name)
+
+                    try:
+                        lock = InterProcessLock(lock_file_path)
+                        with lock:
+                            LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
+                                        'for method "%(method)s"...'),
+                                      {'lock': name,
+                                       'path': lock_file_path,
+                                       'method': f.__name__})
+                            retval = f(*args, **kwargs)
+                    finally:
+                        # NOTE(vish): This removes the tempdir if we needed
+                        #             to create one. This is used to cleanup
+                        #             the locks left behind by unit tests.
+                        if cleanup_dir:
+                            shutil.rmtree(local_lock_path)
+                else:
+                    retval = f(*args, **kwargs)
+
+            return retval
+        return inner
+    return wrap
diff --git a/tests/unit/test_fileutils.py b/tests/unit/test_fileutils.py
new file mode 100644
index 0000000..de5719e
--- /dev/null
+++ b/tests/unit/test_fileutils.py
@@ -0,0 +1,37 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import os
+import shutil
+import tempfile
+
+from openstack.common import fileutils
+
+
+class EnsureTree(unittest.TestCase):
+    def test_ensure_tree(self):
+        tmpdir = tempfile.mkdtemp()
+        try:
+            testdir = '%s/foo/bar/baz' % (tmpdir,)
+            fileutils.ensure_tree(testdir)
+            self.assertTrue(os.path.isdir(testdir))
+
+        finally:
+            if os.path.exists(tmpdir):
+                shutil.rmtree(tmpdir)
diff --git a/tests/unit/test_lockutils.py b/tests/unit/test_lockutils.py
new file mode 100644
index 0000000..5f50217
--- /dev/null
+++ b/tests/unit/test_lockutils.py
@@ -0,0 +1,165 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import errno
+import os
+import select
+import shutil
+import tempfile
+import time
+import unittest
+
+import eventlet
+from eventlet import greenthread
+from eventlet import greenpool
+
+from openstack.common import lockutils
+from tests import utils as test_utils
+
+
+class TestFileLocks(test_utils.BaseTestCase):
+    def test_concurrent_green_lock_succeeds(self):
+        """Verify spawn_n greenthreads with two locks run concurrently."""
+        tmpdir = tempfile.mkdtemp()
+        try:
+            self.completed = False
+
+            def locka(wait):
+                a = lockutils.InterProcessLock(os.path.join(tmpdir, 'a'))
+                with a:
+                    wait.wait()
+                self.completed = True
+
+            def lockb(wait):
+                b = lockutils.InterProcessLock(os.path.join(tmpdir, 'b'))
+                with b:
+                    wait.wait()
+
+            wait1 = eventlet.event.Event()
+            wait2 = eventlet.event.Event()
+            pool = greenpool.GreenPool()
+            pool.spawn_n(locka, wait1)
+            pool.spawn_n(lockb, wait2)
+            wait2.send()
+            eventlet.sleep(0)
+            wait1.send()
+            pool.waitall()
+
+            self.assertTrue(self.completed)
+
+        finally:
+            if os.path.exists(tmpdir):
+                shutil.rmtree(tmpdir)
+
+
+class LockTestCase(test_utils.BaseTestCase):
+    def test_synchronized_wrapped_function_metadata(self):
+        @lockutils.synchronized('whatever', 'test-')
+        def foo():
+            """Bar"""
+            pass
+
+        self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring "
+                                              "got lost")
+        self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
+                                               "got mangled")
+
+    def test_synchronized_internally(self):
+        """We can lock across multiple green threads"""
+        saved_sem_num = len(lockutils._semaphores)
+        seen_threads = list()
+
+        @lockutils.synchronized('testlock2', 'test-', external=False)
+        def f(id):
+            for x in range(10):
+                seen_threads.append(id)
+                greenthread.sleep(0)
+
+        threads = []
+        pool = greenpool.GreenPool(10)
+        for i in range(10):
+            threads.append(pool.spawn(f, i))
+
+        for thread in threads:
+            thread.wait()
+
+        self.assertEquals(len(seen_threads), 100)
+        # Looking at the seen threads, split it into chunks of 10, and verify
+        # that the last 9 match the first in each chunk.
+        for i in range(10):
+            for j in range(9):
+                self.assertEquals(seen_threads[i * 10],
+                                  seen_threads[i * 10 + 1 + j])
+
+        self.assertEqual(saved_sem_num, len(lockutils._semaphores),
+                         "Semaphore leak detected")
+
+    def test_nested_external_works(self):
+        """We can nest external syncs"""
+        tempdir = tempfile.mkdtemp()
+        try:
+            self.config(lock_path=tempdir)
+            sentinel = object()
+
+            @lockutils.synchronized('testlock1', 'test-', external=True)
+            def outer_lock():
+
+                @lockutils.synchronized('testlock2', 'test-', external=True)
+                def inner_lock():
+                    return sentinel
+                return inner_lock()
+
+            self.assertEqual(sentinel, outer_lock())
+
+        finally:
+            if os.path.exists(tempdir):
+                shutil.rmtree(tempdir)
+
+    def test_synchronized_externally(self):
+        """We can lock across multiple processes"""
+        tempdir = tempfile.mkdtemp()
+        self.config(lock_path=tempdir)
+        rpipe1, wpipe1 = os.pipe()
+        rpipe2, wpipe2 = os.pipe()
+
+        @lockutils.synchronized('testlock1', 'test-', external=True)
+        def f(rpipe, wpipe):
+            try:
+                os.write(wpipe, "foo")
+            except OSError, e:
+                self.assertEquals(e.errno, errno.EPIPE)
+                return
+
+            rfds, _wfds, _efds = select.select([rpipe], [], [], 1)
+            self.assertEquals(len(rfds), 0, "The other process, which was "
+                                            "supposed to be locked, "
+                                            "wrote on its end of the "
+                                            "pipe")
+            os.close(rpipe)
+
+        pid = os.fork()
+        if pid > 0:
+            os.close(wpipe1)
+            os.close(rpipe2)
+
+            f(rpipe1, wpipe2)
+        else:
+            os.close(rpipe1)
+            os.close(wpipe2)
+
+            time.sleep(0.1)
+            f(rpipe2, wpipe1)
+            os._exit(0)
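The helpers can also be used directly, much as the new tests do; a minimal
sketch, with an illustrative lock directory under the system temp dir::

    import os
    import tempfile

    from openstack.common import fileutils
    from openstack.common import lockutils

    # ensure_tree() behaves like 'mkdir -p': missing ancestors are created
    # and an already-existing directory is not an error.
    lock_dir = os.path.join(tempfile.gettempdir(), 'example-locks')
    fileutils.ensure_tree(lock_dir)

    # InterProcessLock is a context manager; the fcntl (or msvcrt) lock is
    # released when the block exits, or automatically if the process dies.
    with lockutils.InterProcessLock(os.path.join(lock_dir, 'example-lock')):
        pass  # critical section, exclusive across processes on this host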