diff options
author | Justin Santa Barbara <justin@fathomdb.com> | 2011-03-24 10:59:25 -0700 |
---|---|---|
committer | Justin Santa Barbara <justin@fathomdb.com> | 2011-03-24 10:59:25 -0700 |
commit | 038a629cb7cb61a58838c3fc91a204ca2892dbed (patch) | |
tree | 33e2a01e91599bb2ce98040e93139c709b826066 /nova/utils.py | |
parent | a1e2959312b51757653447de3e8c9e92029da6fd (diff) | |
parent | 5b1abbb34c0a35d7d6d142ae9afd2cde74b1782e (diff) | |
download | nova-038a629cb7cb61a58838c3fc91a204ca2892dbed.tar.gz nova-038a629cb7cb61a58838c3fc91a204ca2892dbed.tar.xz nova-038a629cb7cb61a58838c3fc91a204ca2892dbed.zip |
Merged with trunk
Diffstat (limited to 'nova/utils.py')
-rw-r--r-- | nova/utils.py | 116 |
1 files changed, 99 insertions, 17 deletions
diff --git a/nova/utils.py b/nova/utils.py index 24b8da9ea..e4d8a70eb 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -41,6 +41,7 @@ from xml.sax import saxutils from eventlet import event from eventlet import greenthread +from eventlet import semaphore from eventlet.green import subprocess None from nova import exception @@ -133,13 +134,14 @@ def fetchfile(url, target): def execute(*cmd, **kwargs): - process_input = kwargs.get('process_input', None) - addl_env = kwargs.get('addl_env', None) - check_exit_code = kwargs.get('check_exit_code', 0) - stdin = kwargs.get('stdin', subprocess.PIPE) - stdout = kwargs.get('stdout', subprocess.PIPE) - stderr = kwargs.get('stderr', subprocess.PIPE) - attempts = kwargs.get('attempts', 1) + process_input = kwargs.pop('process_input', None) + addl_env = kwargs.pop('addl_env', None) + check_exit_code = kwargs.pop('check_exit_code', 0) + delay_on_retry = kwargs.pop('delay_on_retry', True) + attempts = kwargs.pop('attempts', 1) + if len(kwargs): + raise exception.Error(_('Got unknown keyword args ' + 'to utils.execute: %r') % kwargs) cmd = map(str, cmd) while attempts > 0: @@ -149,8 +151,11 @@ def execute(*cmd, **kwargs): env = os.environ.copy() if addl_env: env.update(addl_env) - obj = subprocess.Popen(cmd, stdin=stdin, - stdout=stdout, stderr=stderr, env=env) + obj = subprocess.Popen(cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env) result = None if process_input != None: result = obj.communicate(process_input) @@ -176,7 +181,8 @@ def execute(*cmd, **kwargs): raise else: LOG.debug(_("%r failed. Retrying."), cmd) - greenthread.sleep(random.randint(20, 200) / 100.0) + if delay_on_retry: + greenthread.sleep(random.randint(20, 200) / 100.0) def ssh_execute(ssh, cmd, process_input=None, @@ -329,6 +335,14 @@ def utcnow(): utcnow.override_time = None +def is_older_than(before, seconds): + """Return True if before is older than 'seconds'""" + if utcnow() - before > datetime.timedelta(seconds=seconds): + return True + else: + return False + + def utcnow_ts(): """Timestamp version of our utcnow function.""" return time.mktime(utcnow().timetuple()) @@ -526,17 +540,76 @@ def loads(s): return json.loads(s) -def synchronized(name): +_semaphores = {} + + +class _NoopContextManager(object): + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +def synchronized(name, external=False): + """Synchronization decorator + + Decorating a method like so: + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one thread will execute the bar method at a time. + + Different methods can share the same lock: + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + The external keyword argument denotes whether this lock should work across + multiple processes. This means that if two different workers both run a + a method decorated with @synchronized('mylock', external=True), only one + of them will execute at a time. + """ + def wrap(f): @functools.wraps(f) def inner(*args, **kwargs): - LOG.debug(_("Attempting to grab %(lock)s for method " - "%(method)s..." % {"lock": name, + # NOTE(soren): If we ever go natively threaded, this will be racy. + # See http://stackoverflow.com/questions/5390569/dyn\ + # amically-allocating-and-destroying-mutexes + if name not in _semaphores: + _semaphores[name] = semaphore.Semaphore() + sem = _semaphores[name] + LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method ' + '"%(method)s"...' % {"lock": name, "method": f.__name__})) - lock = lockfile.FileLock(os.path.join(FLAGS.lock_path, - 'nova-%s.lock' % name)) - with lock: - return f(*args, **kwargs) + with sem: + if external: + LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' + 'method "%(method)s"...' % + {"lock": name, "method": f.__name__})) + lock_file_path = os.path.join(FLAGS.lock_path, + 'nova-%s.lock' % name) + lock = lockfile.FileLock(lock_file_path) + else: + lock = _NoopContextManager() + + with lock: + retval = f(*args, **kwargs) + + # If no-one else is waiting for it, delete it. + # See note about possible raciness above. + if not sem.balance < 1: + del _semaphores[name] + + return retval return inner return wrap @@ -588,3 +661,12 @@ def get_from_path(items, path): return results else: return get_from_path(results, remainder) + + +def check_isinstance(obj, cls): + """Checks that obj is of type cls, and lets PyLint infer types""" + if isinstance(obj, cls): + return obj + raise Exception(_("Expected object of type: %s") % (str(cls))) + # TODO(justinsb): Can we make this better?? + return cls() # Ugly PyLint hack |