diff options
| author | Chris Behrens <cbehrens@codestud.com> | 2011-05-13 16:55:18 +0000 |
|---|---|---|
| committer | Tarmac <> | 2011-05-13 16:55:18 +0000 |
| commit | 715d7ae9a3dc3804b0bcea0830ebd0f1322e16fe (patch) | |
| tree | 7780f3b1f490b679ab4e497340bfbe62d7654f18 | |
| parent | 0805521c79f934ba54f839be64a2c43ed177612d (diff) | |
| parent | 3f247a628c954d5d4d97def6e6a2f889ab7ec7e3 (diff) | |
XenAPI was not implemented to allow for multiple simultaneous XenAPI requests. A single XenAPIConnection (and thus XenAPISession) is used for all queries. XenAPISession's wait_for_task method would set a self.loop = for looping calls to _poll_task until task completion. Subsequent (parallel) calls to wait_for_task for another query would overwrite this. XenAPISession._poll_task was pulled into the XenAPISession.wait_for_task method to avoid having to store self.loop.
| -rw-r--r-- | nova/tests/test_xenapi.py | 23 | ||||
| -rw-r--r-- | nova/tests/xenapi/stubs.py | 39 | ||||
| -rw-r--r-- | nova/utils.py | 2 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 86 |
4 files changed, 76 insertions, 74 deletions
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index 6072f5455..be1e35697 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -16,6 +16,7 @@ """Test suite for XenAPI.""" +import eventlet import functools import json import os @@ -198,6 +199,28 @@ class XenAPIVMTestCase(test.TestCase): self.context = context.RequestContext('fake', 'fake', False) self.conn = xenapi_conn.get_connection(False) + def test_parallel_builds(self): + stubs.stubout_loopingcall_delay(self.stubs) + + def _do_build(id, proj, user, *args): + values = { + 'id': id, + 'project_id': proj, + 'user_id': user, + 'image_id': 1, + 'kernel_id': 2, + 'ramdisk_id': 3, + 'instance_type_id': '3', # m1.large + 'mac_address': 'aa:bb:cc:dd:ee:ff', + 'os_type': 'linux'} + instance = db.instance_create(self.context, values) + self.conn.spawn(instance) + + gt1 = eventlet.spawn(_do_build, 1, self.project.id, self.user.id) + gt2 = eventlet.spawn(_do_build, 2, self.project.id, self.user.id) + gt1.wait() + gt2.wait() + def test_list_instances_0(self): instances = self.conn.list_instances() self.assertEquals(instances, []) diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 205f6c902..4833ccb07 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -16,6 +16,7 @@ """Stubouts, mocks and fixtures for the test suite""" +import eventlet from nova.virt import xenapi_conn from nova.virt.xenapi import fake from nova.virt.xenapi import volume_utils @@ -28,29 +29,6 @@ def stubout_instance_snapshot(stubs): @classmethod def fake_fetch_image(cls, session, instance_id, image, user, project, type): - # Stubout wait_for_task - def fake_wait_for_task(self, task, id): - class FakeEvent: - - def send(self, value): - self.rv = value - - def wait(self): - return self.rv - - done = FakeEvent() - self._poll_task(id, task, done) - rv = done.wait() - return rv - - def fake_loop(self): - pass - - stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', - fake_wait_for_task) - - stubs.Set(xenapi_conn.XenAPISession, '_stop_loop', fake_loop) - from nova.virt.xenapi.fake import create_vdi name_label = "instance-%s" % instance_id #TODO: create fake SR record @@ -63,11 +41,6 @@ def stubout_instance_snapshot(stubs): stubs.Set(vm_utils.VMHelper, 'fetch_image', fake_fetch_image) - def fake_parse_xmlrpc_value(val): - return val - - stubs.Set(xenapi_conn, '_parse_xmlrpc_value', fake_parse_xmlrpc_value) - def fake_wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref, original_parent_uuid): from nova.virt.xenapi.fake import create_vdi @@ -144,6 +117,16 @@ def stubout_loopingcall_start(stubs): stubs.Set(utils.LoopingCall, 'start', fake_start) +def stubout_loopingcall_delay(stubs): + def fake_start(self, interval, now=True): + self._running = True + eventlet.sleep(1) + self.f(*self.args, **self.kw) + # This would fail before parallel xenapi calls were fixed + assert self._running == False + stubs.Set(utils.LoopingCall, 'start', fake_start) + + class FakeSessionForVMTests(fake.SessionBase): """ Stubs out a XenAPISession for VM tests """ def __init__(self, uri): diff --git a/nova/utils.py b/nova/utils.py index b55e83e5a..0c469b1de 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -462,6 +462,8 @@ class LoopingCall(object): try: while self._running: self.f(*self.args, **self.kw) + if not self._running: + break greenthread.sleep(interval) except LoopingCallDone, e: self.stop() diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index 8e9085277..eb572f295 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -347,7 +347,6 @@ class XenAPISession(object): "(is the Dom0 disk full?)")) with timeout.Timeout(FLAGS.xenapi_login_timeout, exception): self._session.login_with_password(user, pw) - self.loop = None def get_imported_xenapi(self): """Stubout point. This can be replaced with a mock xenapi module.""" @@ -384,57 +383,52 @@ class XenAPISession(object): def wait_for_task(self, task, id=None): """Return the result of the given task. The task is polled - until it completes. Not re-entrant.""" + until it completes.""" done = event.Event() - self.loop = utils.LoopingCall(self._poll_task, id, task, done) - self.loop.start(FLAGS.xenapi_task_poll_interval, now=True) - rv = done.wait() - self.loop.stop() - return rv - - def _stop_loop(self): - """Stop polling for task to finish.""" - #NOTE(sandy-walsh) Had to break this call out to support unit tests. - if self.loop: - self.loop.stop() + loop = utils.LoopingCall(f=None) + + def _poll_task(): + """Poll the given XenAPI task, and return the result if the + action was completed successfully or not. + """ + try: + name = self._session.xenapi.task.get_name_label(task) + status = self._session.xenapi.task.get_status(task) + if id: + action = dict( + instance_id=int(id), + action=name[0:255], # Ensure action is never > 255 + error=None) + if status == "pending": + return + elif status == "success": + result = self._session.xenapi.task.get_result(task) + LOG.info(_("Task [%(name)s] %(task)s status:" + " success %(result)s") % locals()) + done.send(_parse_xmlrpc_value(result)) + else: + error_info = self._session.xenapi.task.get_error_info(task) + action["error"] = str(error_info) + LOG.warn(_("Task [%(name)s] %(task)s status:" + " %(status)s %(error_info)s") % locals()) + done.send_exception(self.XenAPI.Failure(error_info)) + + if id: + db.instance_action_create(context.get_admin_context(), + action) + except self.XenAPI.Failure, exc: + LOG.warn(exc) + done.send_exception(*sys.exc_info()) + loop.stop() + + loop.f = _poll_task + loop.start(FLAGS.xenapi_task_poll_interval, now=True) + return done.wait() def _create_session(self, url): """Stubout point. This can be replaced with a mock session.""" return self.XenAPI.Session(url) - def _poll_task(self, id, task, done): - """Poll the given XenAPI task, and fire the given action if we - get a result. - """ - try: - name = self._session.xenapi.task.get_name_label(task) - status = self._session.xenapi.task.get_status(task) - if id: - action = dict( - instance_id=int(id), - action=name[0:255], # Ensure action is never > 255 - error=None) - if status == "pending": - return - elif status == "success": - result = self._session.xenapi.task.get_result(task) - LOG.info(_("Task [%(name)s] %(task)s status:" - " success %(result)s") % locals()) - done.send(_parse_xmlrpc_value(result)) - else: - error_info = self._session.xenapi.task.get_error_info(task) - action["error"] = str(error_info) - LOG.warn(_("Task [%(name)s] %(task)s status:" - " %(status)s %(error_info)s") % locals()) - done.send_exception(self.XenAPI.Failure(error_info)) - - if id: - db.instance_action_create(context.get_admin_context(), action) - except self.XenAPI.Failure, exc: - LOG.warn(exc) - done.send_exception(*sys.exc_info()) - self._stop_loop() - def _unwrap_plugin_exceptions(self, func, *args, **kwargs): """Parse exception details""" try: |
