diff options
| author | Naveed Massjouni <naveedm9@gmail.com> | 2011-03-24 01:39:57 -0400 |
|---|---|---|
| committer | Naveed Massjouni <naveedm9@gmail.com> | 2011-03-24 01:39:57 -0400 |
| commit | 61c930606d71ed7cd5e08c238c7f297294a42f82 (patch) | |
| tree | 319d689c859daada2b221ae9c2dbde81283bc4f7 | |
| parent | dcb5ddd117c713d60f06ce335d5484ef0242c02d (diff) | |
| parent | 6eef7ff7a809fa1c5856614d54b580082fe5806c (diff) | |
Merge from trunk
| -rw-r--r-- | nova/compute/manager.py | 17 | ||||
| -rw-r--r-- | nova/crypto.py | 3 | ||||
| -rw-r--r-- | nova/db/api.py | 2 | ||||
| -rw-r--r-- | nova/network/linux_net.py | 43 | ||||
| -rw-r--r-- | nova/tests/test_misc.py | 43 | ||||
| -rw-r--r-- | nova/tests/test_virt.py | 2 | ||||
| -rw-r--r-- | nova/utils.py | 82 | ||||
| -rw-r--r-- | nova/virt/hyperv.py | 3 | ||||
| -rw-r--r-- | nova/virt/libvirt_conn.py | 29 | ||||
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 91 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 4 | ||||
| -rw-r--r-- | smoketests/base.py | 4 |
12 files changed, 242 insertions, 81 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py index ac63f68ea..09af03564 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -65,8 +65,11 @@ flags.DEFINE_string('console_host', socket.gethostname(), 'Console proxy host to use to connect to instances on' 'this host.') flags.DEFINE_integer('live_migration_retry_count', 30, - ("Retry count needed in live_migration." - " sleep 1 sec for each count")) + "Retry count needed in live_migration." + " sleep 1 sec for each count") +flags.DEFINE_integer("rescue_timeout", 0, + "Automatically unrescue an instance after N seconds." + " Set to 0 to disable.") LOG = logging.getLogger('nova.compute.manager') @@ -118,8 +121,8 @@ class ComputeManager(manager.Manager): try: self.driver = utils.import_object(compute_driver) - except ImportError: - LOG.error("Unable to load the virtualization driver.") + except ImportError as e: + LOG.error(_("Unable to load the virtualization driver: %s") % (e)) sys.exit(1) self.network_manager = utils.import_object(FLAGS.network_manager) @@ -132,6 +135,12 @@ class ComputeManager(manager.Manager): """ self.driver.init_host(host=self.host) + def periodic_tasks(self, context=None): + """Tasks to be run at a periodic interval.""" + super(ComputeManager, self).periodic_tasks(context) + if FLAGS.rescue_timeout > 0: + self.driver.poll_rescued_instances(FLAGS.rescue_timeout) + def _update_state(self, context, instance_id): """Update the state of an instance from the driver info.""" # FIXME(ja): include other fields from state? diff --git a/nova/crypto.py b/nova/crypto.py index 2a8d4abca..b112e5b92 100644 --- a/nova/crypto.py +++ b/nova/crypto.py @@ -26,6 +26,7 @@ import gettext import hashlib import os import shutil +import string import struct import tempfile import time @@ -267,7 +268,7 @@ def _sign_csr(csr_text, ca_folder): './openssl.cnf', '-infiles', inbound) out, _err = utils.execute('openssl', 'x509', '-in', outbound, '-serial', '-noout') - serial = out.rpartition("=")[2] + serial = string.strip(out.rpartition("=")[2]) os.chdir(start) with open(outbound, "r") as crtfile: return (serial, crtfile.read()) diff --git a/nova/db/api.py b/nova/db/api.py index add5bd83e..afc1bff2f 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -214,7 +214,7 @@ def certificate_update(context, certificate_id, values): Raises NotFound if service does not exist. """ - return IMPL.service_update(context, certificate_id, values) + return IMPL.certificate_update(context, certificate_id, values) ################### diff --git a/nova/network/linux_net.py b/nova/network/linux_net.py index 0a273588f..796d6ba31 100644 --- a/nova/network/linux_net.py +++ b/nova/network/linux_net.py @@ -21,8 +21,6 @@ import inspect import os import calendar -from eventlet import semaphore - from nova import db from nova import exception from nova import flags @@ -272,37 +270,30 @@ class IptablesManager(object): self.ipv4['nat'].add_chain('floating-snat') self.ipv4['nat'].add_rule('snat', '-j $floating-snat') - self.semaphore = semaphore.Semaphore() - - @utils.synchronized('iptables') + @utils.synchronized('iptables', external=True) def apply(self): """Apply the current in-memory set of iptables rules This will blow away any rules left over from previous runs of the same component of Nova, and replace them with our current set of rules. This happens atomically, thanks to iptables-restore. - - We wrap the call in a semaphore lock, so that we don't race with - ourselves. In the event of a race with another component running - an iptables-* command at the same time, we retry up to 5 times. """ - with self.semaphore: - s = [('iptables', self.ipv4)] - if FLAGS.use_ipv6: - s += [('ip6tables', self.ipv6)] - - for cmd, tables in s: - for table in tables: - current_table, _ = self.execute('sudo', - '%s-save' % (cmd,), - '-t', '%s' % (table,), - attempts=5) - current_lines = current_table.split('\n') - new_filter = self._modify_rules(current_lines, - tables[table]) - self.execute('sudo', '%s-restore' % (cmd,), - process_input='\n'.join(new_filter), - attempts=5) + s = [('iptables', self.ipv4)] + if FLAGS.use_ipv6: + s += [('ip6tables', self.ipv6)] + + for cmd, tables in s: + for table in tables: + current_table, _ = self.execute('sudo', + '%s-save' % (cmd,), + '-t', '%s' % (table,), + attempts=5) + current_lines = current_table.split('\n') + new_filter = self._modify_rules(current_lines, + tables[table]) + self.execute('sudo', '%s-restore' % (cmd,), + process_input='\n'.join(new_filter), + attempts=5) def _modify_rules(self, current_lines, table, binary=None): unwrapped_chains = table.unwrapped_chains diff --git a/nova/tests/test_misc.py b/nova/tests/test_misc.py index 1fbaf304f..4e17e1ce0 100644 --- a/nova/tests/test_misc.py +++ b/nova/tests/test_misc.py @@ -18,8 +18,12 @@ import errno import os import select +from eventlet import greenpool +from eventlet import greenthread + from nova import test -from nova.utils import parse_mailmap, str_dict_replace, synchronized +from nova import utils +from nova.utils import parse_mailmap, str_dict_replace class ProjectTestCase(test.TestCase): @@ -63,7 +67,7 @@ class ProjectTestCase(test.TestCase): class LockTestCase(test.TestCase): def test_synchronized_wrapped_function_metadata(self): - @synchronized('whatever') + @utils.synchronized('whatever') def foo(): """Bar""" pass @@ -72,11 +76,42 @@ class LockTestCase(test.TestCase): self.assertEquals(foo.__name__, 'foo', "Wrapped function's name " "got mangled") - def test_synchronized(self): + def test_synchronized_internally(self): + """We can lock across multiple green threads""" + saved_sem_num = len(utils._semaphores) + seen_threads = list() + + @utils.synchronized('testlock2', external=False) + def f(id): + for x in range(10): + seen_threads.append(id) + greenthread.sleep(0) + + threads = [] + pool = greenpool.GreenPool(10) + for i in range(10): + threads.append(pool.spawn(f, i)) + + for thread in threads: + thread.wait() + + self.assertEquals(len(seen_threads), 100) + # Looking at the seen threads, split it into chunks of 10, and verify + # that the last 9 match the first in each chunk. + for i in range(10): + for j in range(9): + self.assertEquals(seen_threads[i * 10], + seen_threads[i * 10 + 1 + j]) + + self.assertEqual(saved_sem_num, len(utils._semaphores), + "Semaphore leak detected") + + def test_synchronized_externally(self): + """We can lock across multiple processes""" rpipe1, wpipe1 = os.pipe() rpipe2, wpipe2 = os.pipe() - @synchronized('testlock') + @utils.synchronized('testlock1', external=True) def f(rpipe, wpipe): try: os.write(wpipe, "foo") diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py index b214f5ce7..6bafac39f 100644 --- a/nova/tests/test_virt.py +++ b/nova/tests/test_virt.py @@ -77,13 +77,11 @@ class CacheConcurrencyTestCase(test.TestCase): eventlet.sleep(0) try: self.assertFalse(done2.ready()) - self.assertTrue('fname' in conn._image_sems) finally: wait1.send() done1.wait() eventlet.sleep(0) self.assertTrue(done2.ready()) - self.assertFalse('fname' in conn._image_sems) def test_different_fname_concurrency(self): """Ensures that two different fname caches are concurrent""" diff --git a/nova/utils.py b/nova/utils.py index 499af2039..03a6e8095 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -41,6 +41,7 @@ from xml.sax import saxutils from eventlet import event from eventlet import greenthread +from eventlet import semaphore from eventlet.green import subprocess None from nova import exception @@ -334,6 +335,14 @@ def utcnow(): utcnow.override_time = None +def is_older_than(before, seconds): + """Return True if before is older than 'seconds'""" + if utcnow() - before > datetime.timedelta(seconds=seconds): + return True + else: + return False + + def utcnow_ts(): """Timestamp version of our utcnow function.""" return time.mktime(utcnow().timetuple()) @@ -531,17 +540,76 @@ def loads(s): return json.loads(s) -def synchronized(name): +_semaphores = {} + + +class _NoopContextManager(object): + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +def synchronized(name, external=False): + """Synchronization decorator + + Decorating a method like so: + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one thread will execute the bar method at a time. + + Different methods can share the same lock: + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + The external keyword argument denotes whether this lock should work across + multiple processes. This means that if two different workers both run a + a method decorated with @synchronized('mylock', external=True), only one + of them will execute at a time. + """ + def wrap(f): @functools.wraps(f) def inner(*args, **kwargs): - LOG.debug(_("Attempting to grab %(lock)s for method " - "%(method)s..." % {"lock": name, + # NOTE(soren): If we ever go natively threaded, this will be racy. + # See http://stackoverflow.com/questions/5390569/dyn\ + # amically-allocating-and-destroying-mutexes + if name not in _semaphores: + _semaphores[name] = semaphore.Semaphore() + sem = _semaphores[name] + LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method ' + '"%(method)s"...' % {"lock": name, "method": f.__name__})) - lock = lockfile.FileLock(os.path.join(FLAGS.lock_path, - 'nova-%s.lock' % name)) - with lock: - return f(*args, **kwargs) + with sem: + if external: + LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' + 'method "%(method)s"...' % + {"lock": name, "method": f.__name__})) + lock_file_path = os.path.join(FLAGS.lock_path, + 'nova-%s.lock' % name) + lock = lockfile.FileLock(lock_file_path) + else: + lock = _NoopContextManager() + + with lock: + retval = f(*args, **kwargs) + + # If no-one else is waiting for it, delete it. + # See note about possible raciness above. + if not sem.balance < 1: + del _semaphores[name] + + return retval return inner return wrap diff --git a/nova/virt/hyperv.py b/nova/virt/hyperv.py index 29d18dac5..75fed6d4f 100644 --- a/nova/virt/hyperv.py +++ b/nova/virt/hyperv.py @@ -467,3 +467,6 @@ class HyperVConnection(object): if vm is None: raise exception.NotFound('Cannot detach volume from missing %s ' % instance_name) + + def poll_rescued_instances(self, timeout): + pass diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index f264cf619..67094320e 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -48,7 +48,7 @@ from xml.dom import minidom from eventlet import greenthread from eventlet import tpool -from eventlet import semaphore + import IPy from nova import context @@ -417,6 +417,10 @@ class LibvirtConnection(object): self.reboot(instance) @exception.wrap_exception + def poll_rescued_instances(self, timeout): + pass + + @exception.wrap_exception def spawn(self, instance): xml = self.to_xml(instance) db.instance_set_state(context.get_admin_context(), @@ -556,13 +560,12 @@ class LibvirtConnection(object): os.mkdir(base_dir) base = os.path.join(base_dir, fname) - if fname not in LibvirtConnection._image_sems: - LibvirtConnection._image_sems[fname] = semaphore.Semaphore() - with LibvirtConnection._image_sems[fname]: + @utils.synchronized(fname) + def call_if_not_exists(base, fn, *args, **kwargs): if not os.path.exists(base): fn(target=base, *args, **kwargs) - if not LibvirtConnection._image_sems[fname].locked(): - del LibvirtConnection._image_sems[fname] + + call_if_not_exists(base, fn, *args, **kwargs) if cow: utils.execute('qemu-img', 'create', '-f', 'qcow2', '-o', @@ -1780,15 +1783,15 @@ class IptablesFirewallDriver(FirewallDriver): pass def refresh_security_group_rules(self, security_group): - # We use the semaphore to make sure noone applies the rule set - # after we've yanked the existing rules but before we've put in - # the new ones. - with self.iptables.semaphore: - for instance in self.instances.values(): - self.remove_filters_for_instance(instance) - self.add_filters_for_instance(instance) + self.do_refresh_security_group_rules(security_group) self.iptables.apply() + @utils.synchronized('iptables', external=True) + def do_refresh_security_group_rules(self, security_group): + for instance in self.instances.values(): + self.remove_filters_for_instance(instance) + self.add_filters_for_instance(instance) + def _security_group_chain_name(self, security_group_id): return 'nova-sg-%s' % (security_group_id,) diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 872e67f01..cfc3a1c22 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -51,6 +51,7 @@ class VMOps(object): def __init__(self, session): self.XenAPI = session.get_imported_xenapi() self._session = session + self.poll_rescue_last_ran = None VMHelper.XenAPI = self.XenAPI @@ -488,6 +489,10 @@ class VMOps(object): except self.XenAPI.Failure, exc: LOG.exception(exc) + def _shutdown_rescue(self, rescue_vm_ref): + """Shutdown a rescue instance""" + self._session.call_xenapi("Async.VM.hard_shutdown", rescue_vm_ref) + def _destroy_vdis(self, instance, vm_ref): """Destroys all VDIs associated with a VM""" instance_id = instance.id @@ -505,6 +510,24 @@ class VMOps(object): except self.XenAPI.Failure, exc: LOG.exception(exc) + def _destroy_rescue_vdis(self, rescue_vm_ref): + """Destroys all VDIs associated with a rescued VM""" + vdi_refs = VMHelper.lookup_vm_vdis(self._session, rescue_vm_ref) + for vdi_ref in vdi_refs: + try: + self._session.call_xenapi("Async.VDI.destroy", vdi_ref) + except self.XenAPI.Failure: + continue + + def _destroy_rescue_vbds(self, rescue_vm_ref): + """Destroys all VBDs tied to a rescue VM""" + vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref) + for vbd_ref in vbd_refs: + vbd_rec = self._session.get_xenapi().VBD.get_record(vbd_ref) + if vbd_rec["userdevice"] == "1": # primary VBD is always 1 + VMHelper.unplug_vbd(self._session, vbd_ref) + VMHelper.destroy_vbd(self._session, vbd_ref) + def _destroy_kernel_ramdisk(self, instance, vm_ref): """ Three situations can occur: @@ -555,6 +578,14 @@ class VMOps(object): LOG.debug(_("Instance %(instance_id)s VM destroyed") % locals()) + def _destroy_rescue_instance(self, rescue_vm_ref): + """Destroy a rescue instance""" + self._destroy_rescue_vbds(rescue_vm_ref) + self._shutdown_rescue(rescue_vm_ref) + self._destroy_rescue_vdis(rescue_vm_ref) + + self._session.call_xenapi("Async.VM.destroy", rescue_vm_ref) + def destroy(self, instance): """ Destroy VM instance @@ -658,40 +689,56 @@ class VMOps(object): """ rescue_vm_ref = VMHelper.lookup(self._session, - instance.name + "-rescue") + instance.name + "-rescue") if not rescue_vm_ref: raise exception.NotFound(_( "Instance is not in Rescue Mode: %s" % instance.name)) original_vm_ref = self._get_vm_opaque_ref(instance) - vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref) - instance._rescue = False - for vbd_ref in vbd_refs: - _vbd_ref = self._session.get_xenapi().VBD.get_record(vbd_ref) - if _vbd_ref["userdevice"] == "1": - VMHelper.unplug_vbd(self._session, vbd_ref) - VMHelper.destroy_vbd(self._session, vbd_ref) + self._destroy_rescue_instance(rescue_vm_ref) + self._release_bootlock(original_vm_ref) + self._start(instance, original_vm_ref) - task1 = self._session.call_xenapi("Async.VM.hard_shutdown", - rescue_vm_ref) - self._session.wait_for_task(task1, instance.id) + def poll_rescued_instances(self, timeout): + """Look for expirable rescued instances + - forcibly exit rescue mode for any instances that have been + in rescue mode for >= the provided timeout + """ + last_ran = self.poll_rescue_last_ran + if last_ran: + if not utils.is_older_than(last_ran, timeout): + # Do not run. Let's bail. + return + else: + # Update the time tracker and proceed. + self.poll_rescue_last_ran = utils.utcnow() + else: + # We need a base time to start tracking. + self.poll_rescue_last_ran = utils.utcnow() + return - vdi_refs = VMHelper.lookup_vm_vdis(self._session, rescue_vm_ref) - for vdi_ref in vdi_refs: - try: - task = self._session.call_xenapi('Async.VDI.destroy', vdi_ref) - self._session.wait_for_task(task, instance.id) - except self.XenAPI.Failure: - continue + rescue_vms = [] + for instance in self.list_instances(): + if instance.endswith("-rescue"): + rescue_vms.append(dict(name=instance, + vm_ref=VMHelper.lookup(self._session, + instance))) - task2 = self._session.call_xenapi('Async.VM.destroy', rescue_vm_ref) - self._session.wait_for_task(task2, instance.id) + for vm in rescue_vms: + rescue_name = vm["name"] + rescue_vm_ref = vm["vm_ref"] - self._release_bootlock(original_vm_ref) - self._start(instance, original_vm_ref) + self._destroy_rescue_instance(rescue_vm_ref) + + original_name = vm["name"].split("-rescue", 1)[0] + original_vm_ref = VMHelper.lookup(self._session, original_name) + + self._release_bootlock(original_vm_ref) + self._session.call_xenapi("VM.start", original_vm_ref, False, + False) def get_info(self, instance): """Return data about VM instance""" diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index da2fb51f1..2884687fb 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -223,6 +223,10 @@ class XenAPIConnection(object): """Unrescue the specified instance""" self._vmops.unrescue(instance, callback) + def poll_rescued_instances(self, timeout): + """Poll for rescued instances""" + self._vmops.poll_rescued_instances(timeout) + def reset_network(self, instance): """reset networking for specified instance""" self._vmops.reset_network(instance) diff --git a/smoketests/base.py b/smoketests/base.py index 3e2446c9a..31d82b20b 100644 --- a/smoketests/base.py +++ b/smoketests/base.py @@ -32,7 +32,6 @@ SUITE_NAMES = '[image, instance, volume]' FLAGS = flags.FLAGS flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES) flags.DEFINE_integer('ssh_tries', 3, 'Numer of times to try ssh') -boto_v6 = None class SmokeTestCase(unittest.TestCase): @@ -183,6 +182,9 @@ class SmokeTestCase(unittest.TestCase): TEST_DATA = {} +if FLAGS.use_ipv6: + global boto_v6 + boto_v6 = __import__('boto_v6') class UserSmokeTestCase(SmokeTestCase): |
