summaryrefslogtreecommitdiffstats
path: root/nova/utils.py
diff options
context:
space:
mode:
authorJustin Santa Barbara <justin@fathomdb.com>2011-03-24 10:59:25 -0700
committerJustin Santa Barbara <justin@fathomdb.com>2011-03-24 10:59:25 -0700
commit038a629cb7cb61a58838c3fc91a204ca2892dbed (patch)
tree33e2a01e91599bb2ce98040e93139c709b826066 /nova/utils.py
parenta1e2959312b51757653447de3e8c9e92029da6fd (diff)
parent5b1abbb34c0a35d7d6d142ae9afd2cde74b1782e (diff)
downloadnova-038a629cb7cb61a58838c3fc91a204ca2892dbed.tar.gz
nova-038a629cb7cb61a58838c3fc91a204ca2892dbed.tar.xz
nova-038a629cb7cb61a58838c3fc91a204ca2892dbed.zip
Merged with trunk
Diffstat (limited to 'nova/utils.py')
-rw-r--r--nova/utils.py116
1 files changed, 99 insertions, 17 deletions
diff --git a/nova/utils.py b/nova/utils.py
index 24b8da9ea..e4d8a70eb 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -41,6 +41,7 @@ from xml.sax import saxutils
from eventlet import event
from eventlet import greenthread
+from eventlet import semaphore
from eventlet.green import subprocess
None
from nova import exception
@@ -133,13 +134,14 @@ def fetchfile(url, target):
def execute(*cmd, **kwargs):
- process_input = kwargs.get('process_input', None)
- addl_env = kwargs.get('addl_env', None)
- check_exit_code = kwargs.get('check_exit_code', 0)
- stdin = kwargs.get('stdin', subprocess.PIPE)
- stdout = kwargs.get('stdout', subprocess.PIPE)
- stderr = kwargs.get('stderr', subprocess.PIPE)
- attempts = kwargs.get('attempts', 1)
+ process_input = kwargs.pop('process_input', None)
+ addl_env = kwargs.pop('addl_env', None)
+ check_exit_code = kwargs.pop('check_exit_code', 0)
+ delay_on_retry = kwargs.pop('delay_on_retry', True)
+ attempts = kwargs.pop('attempts', 1)
+ if len(kwargs):
+ raise exception.Error(_('Got unknown keyword args '
+ 'to utils.execute: %r') % kwargs)
cmd = map(str, cmd)
while attempts > 0:
@@ -149,8 +151,11 @@ def execute(*cmd, **kwargs):
env = os.environ.copy()
if addl_env:
env.update(addl_env)
- obj = subprocess.Popen(cmd, stdin=stdin,
- stdout=stdout, stderr=stderr, env=env)
+ obj = subprocess.Popen(cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env)
result = None
if process_input != None:
result = obj.communicate(process_input)
@@ -176,7 +181,8 @@ def execute(*cmd, **kwargs):
raise
else:
LOG.debug(_("%r failed. Retrying."), cmd)
- greenthread.sleep(random.randint(20, 200) / 100.0)
+ if delay_on_retry:
+ greenthread.sleep(random.randint(20, 200) / 100.0)
def ssh_execute(ssh, cmd, process_input=None,
@@ -329,6 +335,14 @@ def utcnow():
utcnow.override_time = None
+def is_older_than(before, seconds):
+ """Return True if before is older than 'seconds'"""
+ if utcnow() - before > datetime.timedelta(seconds=seconds):
+ return True
+ else:
+ return False
+
+
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return time.mktime(utcnow().timetuple())
@@ -526,17 +540,76 @@ def loads(s):
return json.loads(s)
-def synchronized(name):
+_semaphores = {}
+
+
+class _NoopContextManager(object):
+ def __enter__(self):
+ pass
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+def synchronized(name, external=False):
+ """Synchronization decorator
+
+ Decorating a method like so:
+ @synchronized('mylock')
+ def foo(self, *args):
+ ...
+
+ ensures that only one thread will execute the bar method at a time.
+
+ Different methods can share the same lock:
+ @synchronized('mylock')
+ def foo(self, *args):
+ ...
+
+ @synchronized('mylock')
+ def bar(self, *args):
+ ...
+
+ This way only one of either foo or bar can be executing at a time.
+
+ The external keyword argument denotes whether this lock should work across
+ multiple processes. This means that if two different workers both run a
+ a method decorated with @synchronized('mylock', external=True), only one
+ of them will execute at a time.
+ """
+
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
- LOG.debug(_("Attempting to grab %(lock)s for method "
- "%(method)s..." % {"lock": name,
+ # NOTE(soren): If we ever go natively threaded, this will be racy.
+ # See http://stackoverflow.com/questions/5390569/dyn\
+ # amically-allocating-and-destroying-mutexes
+ if name not in _semaphores:
+ _semaphores[name] = semaphore.Semaphore()
+ sem = _semaphores[name]
+ LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method '
+ '"%(method)s"...' % {"lock": name,
"method": f.__name__}))
- lock = lockfile.FileLock(os.path.join(FLAGS.lock_path,
- 'nova-%s.lock' % name))
- with lock:
- return f(*args, **kwargs)
+ with sem:
+ if external:
+ LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
+ 'method "%(method)s"...' %
+ {"lock": name, "method": f.__name__}))
+ lock_file_path = os.path.join(FLAGS.lock_path,
+ 'nova-%s.lock' % name)
+ lock = lockfile.FileLock(lock_file_path)
+ else:
+ lock = _NoopContextManager()
+
+ with lock:
+ retval = f(*args, **kwargs)
+
+ # If no-one else is waiting for it, delete it.
+ # See note about possible raciness above.
+ if not sem.balance < 1:
+ del _semaphores[name]
+
+ return retval
return inner
return wrap
@@ -588,3 +661,12 @@ def get_from_path(items, path):
return results
else:
return get_from_path(results, remainder)
+
+
+def check_isinstance(obj, cls):
+ """Checks that obj is of type cls, and lets PyLint infer types"""
+ if isinstance(obj, cls):
+ return obj
+ raise Exception(_("Expected object of type: %s") % (str(cls)))
+ # TODO(justinsb): Can we make this better??
+ return cls() # Ugly PyLint hack