diff options
-rw-r--r-- | nova/manager.py | 3 | ||||
-rw-r--r-- | nova/objectstore/image.py | 3 | ||||
-rw-r--r-- | nova/rpc.py | 75 | ||||
-rw-r--r-- | nova/service.py | 1 | ||||
-rw-r--r-- | nova/test.py | 52 | ||||
-rw-r--r-- | nova/tests/api/__init__.py | 3 | ||||
-rw-r--r-- | nova/tests/auth_unittest.py | 5 | ||||
-rw-r--r-- | nova/tests/cloud_unittest.py | 4 | ||||
-rw-r--r-- | nova/tests/compute_unittest.py | 39 | ||||
-rw-r--r-- | nova/tests/rpc_unittest.py | 34 | ||||
-rw-r--r-- | nova/tests/service_unittest.py | 9 | ||||
-rw-r--r-- | nova/tests/virt_unittest.py | 7 | ||||
-rw-r--r-- | nova/tests/volume_unittest.py | 56 | ||||
-rw-r--r-- | nova/utils.py | 23 | ||||
-rw-r--r-- | nova/virt/images.py | 9 | ||||
-rw-r--r-- | nova/virt/libvirt_conn.py | 28 | ||||
-rw-r--r-- | nova/virt/xenapi/vm_utils.py | 28 | ||||
-rw-r--r-- | nova/virt/xenapi_conn.py | 1 | ||||
-rw-r--r-- | nova/volume/driver.py | 131 | ||||
-rw-r--r-- | run_tests.py | 14 |
20 files changed, 230 insertions, 295 deletions
diff --git a/nova/manager.py b/nova/manager.py index 5e067bd08..a343d7fc6 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -55,7 +55,6 @@ from nova import utils from nova import flags from nova.db import base -from twisted.internet import defer FLAGS = flags.FLAGS @@ -69,7 +68,7 @@ class Manager(base.Base): def periodic_tasks(self, context=None): """Tasks to be run at a periodic interval""" - yield + return def init_host(self): """Do any initialization that needs to be run if this is a standalone diff --git a/nova/objectstore/image.py b/nova/objectstore/image.py index 7292dbab8..2fe0b0117 100644 --- a/nova/objectstore/image.py +++ b/nova/objectstore/image.py @@ -26,6 +26,7 @@ Requires decryption using keys in the manifest. import binascii import glob import json +import logging import os import shutil import tarfile @@ -264,6 +265,8 @@ class Image(object): if err: raise exception.Error("Failed to decrypt initialization " "vector: %s" % err) + logging.debug(iv) + _out, err = utils.execute( 'openssl enc -d -aes-128-cbc -in %s -K %s -iv %s -out %s' % (encrypted_filename, key, iv, decrypted_filename), diff --git a/nova/rpc.py b/nova/rpc.py index 86a29574f..652b9e4aa 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -25,18 +25,18 @@ import json import logging import sys import time +import traceback import uuid from carrot import connection as carrot_connection from carrot import messaging from eventlet import greenthread -from twisted.internet import defer -from twisted.internet import task +from nova import context from nova import exception from nova import fakerabbit from nova import flags -from nova import context +from nova import utils FLAGS = flags.FLAGS @@ -128,17 +128,9 @@ class Consumer(messaging.Consumer): def attach_to_eventlet(self): """Only needed for unit tests!""" - def fetch_repeatedly(): - while True: - self.fetch(enable_callbacks=True) - greenthread.sleep(0.1) - greenthread.spawn(fetch_repeatedly) - - def attach_to_twisted(self): - """Attach a callback to twisted that fires 10 times a second""" - loop = task.LoopingCall(self.fetch, enable_callbacks=True) - loop.start(interval=0.1) - return loop + timer = utils.LoopingCall(self.fetch, enable_callbacks=True) + timer.start(0.1) + return timer class Publisher(messaging.Publisher): @@ -197,10 +189,13 @@ class AdapterConsumer(TopicConsumer): node_args = dict((str(k), v) for k, v in args.iteritems()) # NOTE(vish): magic is fun! # pylint: disable-msg=W0142 - d = defer.maybeDeferred(node_func, context=ctxt, **node_args) - if msg_id: - d.addCallback(lambda rval: msg_reply(msg_id, rval, None)) - d.addErrback(lambda e: msg_reply(msg_id, None, e)) + try: + rval = node_func(context=ctxt, **node_args) + if msg_id: + msg_reply(msg_id, rval, None) + except Exception as e: + if msg_id: + msg_reply(msg_id, None, sys.exc_info()) return @@ -244,11 +239,11 @@ def msg_reply(msg_id, reply=None, failure=None): failure should be a twisted failure object""" if failure: - message = failure.getErrorMessage() - traceback = failure.getTraceback() + message = str(failure[1]) + tb = traceback.format_exception(*failure) logging.error("Returning exception %s to caller", message) - logging.error(traceback) - failure = (failure.type.__name__, str(failure.value), traceback) + logging.error(tb) + failure = (failure[0].__name__, str(failure[1]), tb) conn = Connection.instance() publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: @@ -313,8 +308,8 @@ def call(context, topic, msg): _pack_context(msg, context) class WaitMessage(object): - def __call__(self, data, message): + LOG.debug('data %s, msg %s', data, message) """Acks message and sets result.""" message.ack() if data['failure']: @@ -337,41 +332,11 @@ def call(context, topic, msg): except StopIteration: pass consumer.close() + if isinstance(wait_msg.result, Exception): + raise wait_msg.result return wait_msg.result -def call_twisted(context, topic, msg): - """Sends a message on a topic and wait for a response""" - LOG.debug("Making asynchronous call...") - msg_id = uuid.uuid4().hex - msg.update({'_msg_id': msg_id}) - LOG.debug("MSG_ID is %s" % (msg_id)) - _pack_context(msg, context) - - conn = Connection.instance() - d = defer.Deferred() - consumer = DirectConsumer(connection=conn, msg_id=msg_id) - - def deferred_receive(data, message): - """Acks message and callbacks or errbacks""" - message.ack() - if data['failure']: - return d.errback(RemoteError(*data['failure'])) - else: - return d.callback(data['result']) - - consumer.register_callback(deferred_receive) - injected = consumer.attach_to_twisted() - - # clean up after the injected listened and return x - d.addCallback(lambda x: injected.stop() and x or x) - - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() - return d - - def cast(context, topic, msg): """Sends a message on a topic without waiting for a response""" LOG.debug("Making asynchronous cast...") diff --git a/nova/service.py b/nova/service.py index 9454d4049..55a0bb212 100644 --- a/nova/service.py +++ b/nova/service.py @@ -160,7 +160,6 @@ class Service(object, service.Service): except exception.NotFound: logging.warn("Service killed that has no database entry") - @defer.inlineCallbacks def periodic_tasks(self): """Tasks to be run at a periodic interval""" yield self.manager.periodic_tasks(context.get_admin_context()) diff --git a/nova/test.py b/nova/test.py index 5c2a72819..bbf063aca 100644 --- a/nova/test.py +++ b/nova/test.py @@ -25,11 +25,11 @@ and some black magic for inline callbacks. import datetime import sys import time +import unittest import mox import stubout from twisted.internet import defer -from twisted.trial import unittest from nova import context from nova import db @@ -94,7 +94,7 @@ class TrialTestCase(unittest.TestCase): db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host, self.start) db.network_disassociate_all(ctxt) - rpc.Consumer.attach_to_twisted = self.originalAttach + rpc.Consumer.attach_to_eventlet = self.originalAttach for x in self.injected: try: x.stop() @@ -125,31 +125,31 @@ class TrialTestCase(unittest.TestCase): for k, v in self._original_flags.iteritems(): setattr(FLAGS, k, v) - def run(self, result=None): - test_method = getattr(self, self._testMethodName) - setattr(self, - self._testMethodName, - self._maybeInlineCallbacks(test_method, result)) - rv = super(TrialTestCase, self).run(result) - setattr(self, self._testMethodName, test_method) - return rv - - def _maybeInlineCallbacks(self, func, result): - def _wrapped(): - g = func() - if isinstance(g, defer.Deferred): - return g - if not hasattr(g, 'send'): - return defer.succeed(g) - - inlined = defer.inlineCallbacks(func) - d = inlined() - return d - _wrapped.func_name = func.func_name - return _wrapped + #def run(self, result=None): + # test_method = getattr(self, self._testMethodName) + # setattr(self, + # self._testMethodName, + # self._maybeInlineCallbacks(test_method, result)) + # rv = super(TrialTestCase, self).run(result) + # setattr(self, self._testMethodName, test_method) + # return rv + + #def _maybeInlineCallbacks(self, func, result): + # def _wrapped(): + # g = func() + # if isinstance(g, defer.Deferred): + # return g + # if not hasattr(g, 'send'): + # return defer.succeed(g) + + # inlined = defer.inlineCallbacks(func) + # d = inlined() + # return d + # _wrapped.func_name = func.func_name + # return _wrapped def _monkey_patch_attach(self): - self.originalAttach = rpc.Consumer.attach_to_twisted + self.originalAttach = rpc.Consumer.attach_to_eventlet def _wrapped(innerSelf): rv = self.originalAttach(innerSelf) @@ -157,4 +157,4 @@ class TrialTestCase(unittest.TestCase): return rv _wrapped.func_name = self.originalAttach.func_name - rpc.Consumer.attach_to_twisted = _wrapped + rpc.Consumer.attach_to_eventlet = _wrapped diff --git a/nova/tests/api/__init__.py b/nova/tests/api/__init__.py index 9caa8c9d0..cdc1bbf00 100644 --- a/nova/tests/api/__init__.py +++ b/nova/tests/api/__init__.py @@ -78,4 +78,5 @@ class Test(unittest.TestCase): if __name__ == '__main__': - unittest.main() + pass + #unittest.main() diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py index fe891beee..129ff223d 100644 --- a/nova/tests/auth_unittest.py +++ b/nova/tests/auth_unittest.py @@ -16,10 +16,13 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +#import logging from M2Crypto import X509 import unittest +import eventlet +logging = eventlet.import_patched('logging') + from nova import crypto from nova import flags from nova import test diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 9886a2449..b7b856da5 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -27,8 +27,6 @@ import tempfile import time from eventlet import greenthread -from twisted.internet import defer -import unittest from xml.etree import ElementTree from nova import context @@ -186,7 +184,7 @@ class CloudTestCase(test.TrialTestCase): logging.debug("Need to watch instance %s until it's running..." % instance['instance_id']) while True: - rv = yield defer.succeed(time.sleep(1)) + greenthread.sleep(1) info = self.cloud._get_instance(instance['instance_id']) logging.debug(info['state']) if info['state'] == power_state.RUNNING: diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py index 6f3ef96cb..67cea72c9 100644 --- a/nova/tests/compute_unittest.py +++ b/nova/tests/compute_unittest.py @@ -22,8 +22,6 @@ Tests For Compute import datetime import logging -from twisted.internet import defer - from nova import context from nova import db from nova import exception @@ -33,6 +31,7 @@ from nova import utils from nova.auth import manager from nova.compute import api as compute_api + FLAGS = flags.FLAGS @@ -94,24 +93,22 @@ class ComputeTestCase(test.TrialTestCase): db.security_group_destroy(self.context, group['id']) db.instance_destroy(self.context, ref[0]['id']) - @defer.inlineCallbacks def test_run_terminate(self): """Make sure it is possible to run and terminate instance""" instance_id = self._create_instance() - yield self.compute.run_instance(self.context, instance_id) + self.compute.run_instance(self.context, instance_id) instances = db.instance_get_all(context.get_admin_context()) logging.info("Running instances: %s", instances) self.assertEqual(len(instances), 1) - yield self.compute.terminate_instance(self.context, instance_id) + self.compute.terminate_instance(self.context, instance_id) instances = db.instance_get_all(context.get_admin_context()) logging.info("After terminating instances: %s", instances) self.assertEqual(len(instances), 0) - @defer.inlineCallbacks def test_run_terminate_timestamps(self): """Make sure timestamps are set for launched and destroyed""" instance_id = self._create_instance() @@ -119,42 +116,40 @@ class ComputeTestCase(test.TrialTestCase): self.assertEqual(instance_ref['launched_at'], None) self.assertEqual(instance_ref['deleted_at'], None) launch = datetime.datetime.utcnow() - yield self.compute.run_instance(self.context, instance_id) + self.compute.run_instance(self.context, instance_id) instance_ref = db.instance_get(self.context, instance_id) self.assert_(instance_ref['launched_at'] > launch) self.assertEqual(instance_ref['deleted_at'], None) terminate = datetime.datetime.utcnow() - yield self.compute.terminate_instance(self.context, instance_id) + self.compute.terminate_instance(self.context, instance_id) self.context = self.context.elevated(True) instance_ref = db.instance_get(self.context, instance_id) self.assert_(instance_ref['launched_at'] < terminate) self.assert_(instance_ref['deleted_at'] > terminate) - @defer.inlineCallbacks def test_reboot(self): """Ensure instance can be rebooted""" instance_id = self._create_instance() - yield self.compute.run_instance(self.context, instance_id) - yield self.compute.reboot_instance(self.context, instance_id) - yield self.compute.terminate_instance(self.context, instance_id) + self.compute.run_instance(self.context, instance_id) + self.compute.reboot_instance(self.context, instance_id) + self.compute.terminate_instance(self.context, instance_id) - @defer.inlineCallbacks def test_console_output(self): """Make sure we can get console output from instance""" instance_id = self._create_instance() - yield self.compute.run_instance(self.context, instance_id) + self.compute.run_instance(self.context, instance_id) - console = yield self.compute.get_console_output(self.context, + console = self.compute.get_console_output(self.context, instance_id) self.assert_(console) - yield self.compute.terminate_instance(self.context, instance_id) + self.compute.terminate_instance(self.context, instance_id) - @defer.inlineCallbacks def test_run_instance_existing(self): """Ensure failure when running an instance that already exists""" instance_id = self._create_instance() - yield self.compute.run_instance(self.context, instance_id) - self.assertFailure(self.compute.run_instance(self.context, - instance_id), - exception.Error) - yield self.compute.terminate_instance(self.context, instance_id) + self.compute.run_instance(self.context, instance_id) + self.assertRaises(exception.Error, + self.compute.run_instance, + self.context, + instance_id) + self.compute.terminate_instance(self.context, instance_id) diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py index f35b65a39..c2ad5cd79 100644 --- a/nova/tests/rpc_unittest.py +++ b/nova/tests/rpc_unittest.py @@ -20,8 +20,6 @@ Unit Tests for remote procedure calls using queue """ import logging -from twisted.internet import defer - from nova import context from nova import flags from nova import rpc @@ -40,23 +38,22 @@ class RpcTestCase(test.TrialTestCase): self.consumer = rpc.AdapterConsumer(connection=self.conn, topic='test', proxy=self.receiver) - self.consumer.attach_to_twisted() + self.consumer.attach_to_eventlet() self.context = context.get_admin_context() def test_call_succeed(self): """Get a value through rpc call""" value = 42 - result = yield rpc.call_twisted(self.context, - 'test', {"method": "echo", + result = rpc.call(self.context, 'test', {"method": "echo", "args": {"value": value}}) self.assertEqual(value, result) def test_context_passed(self): """Makes sure a context is passed through rpc call""" value = 42 - result = yield rpc.call_twisted(self.context, - 'test', {"method": "context", - "args": {"value": value}}) + result = rpc.call(self.context, + 'test', {"method": "context", + "args": {"value": value}}) self.assertEqual(self.context.to_dict(), result) def test_call_exception(self): @@ -67,14 +64,17 @@ class RpcTestCase(test.TrialTestCase): to an int in the test. """ value = 42 - self.assertFailure(rpc.call_twisted(self.context, 'test', - {"method": "fail", - "args": {"value": value}}), - rpc.RemoteError) + self.assertRaises(rpc.RemoteError, + rpc.call, + self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) try: - yield rpc.call_twisted(self.context, - 'test', {"method": "fail", - "args": {"value": value}}) + rpc.call(self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) self.fail("should have thrown rpc.RemoteError") except rpc.RemoteError as exc: self.assertEqual(int(exc.value), value) @@ -89,13 +89,13 @@ class TestReceiver(object): def echo(context, value): """Simply returns whatever value is sent in""" logging.debug("Received %s", value) - return defer.succeed(value) + return value @staticmethod def context(context, value): """Returns dictionary version of context""" logging.debug("Received %s", context) - return defer.succeed(context.to_dict()) + return context.to_dict() @staticmethod def fail(context, value): diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py index a268bc4fe..4f8d2d550 100644 --- a/nova/tests/service_unittest.py +++ b/nova/tests/service_unittest.py @@ -143,7 +143,6 @@ class ServiceTestCase(test.TrialTestCase): # whether it is disconnected, it looks for a variable on itself called # 'model_disconnected' and report_state doesn't really do much so this # these are mostly just for coverage - @defer.inlineCallbacks def test_report_state_no_service(self): host = 'foo' binary = 'bar' @@ -174,9 +173,8 @@ class ServiceTestCase(test.TrialTestCase): topic, 'nova.tests.service_unittest.FakeManager') serv.startService() - yield serv.report_state() + serv.report_state() - @defer.inlineCallbacks def test_report_state_newly_disconnected(self): host = 'foo' binary = 'bar' @@ -205,10 +203,9 @@ class ServiceTestCase(test.TrialTestCase): topic, 'nova.tests.service_unittest.FakeManager') serv.startService() - yield serv.report_state() + serv.report_state() self.assert_(serv.model_disconnected) - @defer.inlineCallbacks def test_report_state_newly_connected(self): host = 'foo' binary = 'bar' @@ -240,6 +237,6 @@ class ServiceTestCase(test.TrialTestCase): 'nova.tests.service_unittest.FakeManager') serv.startService() serv.model_disconnected = True - yield serv.report_state() + serv.report_state() self.assert_(not serv.model_disconnected) diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py index d49383fb7..a4a8d3acf 100644 --- a/nova/tests/virt_unittest.py +++ b/nova/tests/virt_unittest.py @@ -235,7 +235,7 @@ class NWFilterTestCase(test.TrialTestCase): 'project_id': 'fake'}) inst_id = instance_ref['id'] - def _ensure_all_called(_): + def _ensure_all_called(): instance_filter = 'nova-instance-%s' % instance_ref['name'] secgroup_filter = 'nova-secgroup-%s' % self.security_group['id'] for required in [secgroup_filter, 'allow-dhcp-server', @@ -253,7 +253,6 @@ class NWFilterTestCase(test.TrialTestCase): instance = db.instance_get(self.context, inst_id) d = self.fw.setup_nwfilters_for_instance(instance) - d.addCallback(_ensure_all_called) - d.addCallback(lambda _: self.teardown_security_group()) - + _ensure_all_called() + self.teardown_security_group() return d diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py index 12321a96f..93d2ceab7 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/volume_unittest.py @@ -21,8 +21,6 @@ Tests for Volume Code. """ import logging -from twisted.internet import defer - from nova import context from nova import exception from nova import db @@ -56,51 +54,48 @@ class VolumeTestCase(test.TrialTestCase): vol['attach_status'] = "detached" return db.volume_create(context.get_admin_context(), vol)['id'] - @defer.inlineCallbacks def test_create_delete_volume(self): """Test volume can be created and deleted.""" volume_id = self._create_volume() - yield self.volume.create_volume(self.context, volume_id) + self.volume.create_volume(self.context, volume_id) self.assertEqual(volume_id, db.volume_get(context.get_admin_context(), volume_id).id) - yield self.volume.delete_volume(self.context, volume_id) + self.volume.delete_volume(self.context, volume_id) self.assertRaises(exception.NotFound, db.volume_get, self.context, volume_id) - @defer.inlineCallbacks def test_too_big_volume(self): """Ensure failure if a too large of a volume is requested.""" # FIXME(vish): validation needs to move into the data layer in # volume_create - defer.returnValue(True) + return True try: volume_id = self._create_volume('1001') - yield self.volume.create_volume(self.context, volume_id) + self.volume.create_volume(self.context, volume_id) self.fail("Should have thrown TypeError") except TypeError: pass - @defer.inlineCallbacks def test_too_many_volumes(self): """Ensure that NoMoreTargets is raised when we run out of volumes.""" vols = [] total_slots = FLAGS.iscsi_num_targets for _index in xrange(total_slots): volume_id = self._create_volume() - yield self.volume.create_volume(self.context, volume_id) + self.volume.create_volume(self.context, volume_id) vols.append(volume_id) volume_id = self._create_volume() - self.assertFailure(self.volume.create_volume(self.context, - volume_id), - db.NoMoreTargets) + self.assertRaises(db.NoMoreTargets, + self.volume.create_volume, + self.context, + volume_id) db.volume_destroy(context.get_admin_context(), volume_id) for volume_id in vols: - yield self.volume.delete_volume(self.context, volume_id) + self.volume.delete_volume(self.context, volume_id) - @defer.inlineCallbacks def test_run_attach_detach_volume(self): """Make sure volume can be attached and detached from instance.""" inst = {} @@ -115,15 +110,15 @@ class VolumeTestCase(test.TrialTestCase): instance_id = db.instance_create(self.context, inst)['id'] mountpoint = "/dev/sdf" volume_id = self._create_volume() - yield self.volume.create_volume(self.context, volume_id) + self.volume.create_volume(self.context, volume_id) if FLAGS.fake_tests: db.volume_attached(self.context, volume_id, instance_id, mountpoint) else: - yield self.compute.attach_volume(self.context, - instance_id, - volume_id, - mountpoint) + self.compute.attach_volume(self.context, + instance_id, + volume_id, + mountpoint) vol = db.volume_get(context.get_admin_context(), volume_id) self.assertEqual(vol['status'], "in-use") self.assertEqual(vol['attach_status'], "attached") @@ -131,25 +126,26 @@ class VolumeTestCase(test.TrialTestCase): instance_ref = db.volume_get_instance(self.context, volume_id) self.assertEqual(instance_ref['id'], instance_id) - self.assertFailure(self.volume.delete_volume(self.context, volume_id), - exception.Error) + self.assertRaises(exception.Error, + self.volume.delete_volume, + self.context, + volume_id) if FLAGS.fake_tests: db.volume_detached(self.context, volume_id) else: - yield self.compute.detach_volume(self.context, - instance_id, - volume_id) + self.compute.detach_volume(self.context, + instance_id, + volume_id) vol = db.volume_get(self.context, volume_id) self.assertEqual(vol['status'], "available") - yield self.volume.delete_volume(self.context, volume_id) + self.volume.delete_volume(self.context, volume_id) self.assertRaises(exception.Error, db.volume_get, self.context, volume_id) db.instance_destroy(self.context, instance_id) - @defer.inlineCallbacks def test_concurrent_volumes_get_different_targets(self): """Ensure multiple concurrent volumes get different targets.""" volume_ids = [] @@ -164,15 +160,11 @@ class VolumeTestCase(test.TrialTestCase): self.assert_(iscsi_target not in targets) targets.append(iscsi_target) logging.debug("Target %s allocated", iscsi_target) - deferreds = [] total_slots = FLAGS.iscsi_num_targets for _index in xrange(total_slots): volume_id = self._create_volume() d = self.volume.create_volume(self.context, volume_id) - d.addCallback(_check) - d.addErrback(self.fail) - deferreds.append(d) - yield defer.DeferredList(deferreds) + _check(d) for volume_id in volume_ids: self.volume.delete_volume(self.context, volume_id) diff --git a/nova/utils.py b/nova/utils.py index 66047ae8b..2c43203d8 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -34,8 +34,6 @@ from xml.sax import saxutils from eventlet import event from eventlet import greenthread -from twisted.internet.threads import deferToThread - from nova import exception from nova import flags from nova.exception import ProcessExecutionError @@ -78,7 +76,7 @@ def fetchfile(url, target): def execute(cmd, process_input=None, addl_env=None, check_exit_code=True): - logging.debug("Running cmd: %s", cmd) + logging.debug("Running cmd (subprocess): %s", cmd) env = os.environ.copy() if addl_env: env.update(addl_env) @@ -98,6 +96,10 @@ def execute(cmd, process_input=None, addl_env=None, check_exit_code=True): stdout=stdout, stderr=stderr, cmd=cmd) + # NOTE(termie): this appears to be necessary to let the subprocess call + # clean something up in between calls, without it two + # execute calls in a row hangs the second one + greenthread.sleep(0) return result @@ -126,13 +128,14 @@ def debug(arg): def runthis(prompt, cmd, check_exit_code=True): logging.debug("Running %s" % (cmd)) - exit_code = subprocess.call(cmd.split(" ")) - logging.debug(prompt % (exit_code)) - if check_exit_code and exit_code != 0: - raise ProcessExecutionError(exit_code=exit_code, - stdout=None, - stderr=None, - cmd=cmd) + rv, err = execute(cmd, check_exit_code=check_exit_code) + #exit_code = subprocess.call(cmd.split(" ")) + #logging.debug(prompt % (exit_code)) + #if check_exit_code and exit_code != 0: + # raise ProcessExecutionError(exit_code=exit_code, + # stdout=None, + # stderr=None, + # cmd=cmd) def generate_uid(topic, size=8): diff --git a/nova/virt/images.py b/nova/virt/images.py index 981aa5cf3..4d7c65f12 100644 --- a/nova/virt/images.py +++ b/nova/virt/images.py @@ -26,7 +26,7 @@ import time import urlparse from nova import flags -from nova import process +from nova import utils from nova.auth import manager from nova.auth import signer from nova.objectstore import image @@ -63,15 +63,16 @@ def _fetch_s3_image(image, path, user, project): cmd = ['/usr/bin/curl', '--fail', '--silent', url] for (k, v) in headers.iteritems(): - cmd += ['-H', '%s: %s' % (k, v)] + cmd += ['-H', '"%s: %s"' % (k, v)] cmd += ['-o', path] - return process.SharedPool().execute(executable=cmd[0], args=cmd[1:]) + cmd_out = ' '.join(cmd) + return utils.execute(cmd_out) def _fetch_local_image(image, path, user, project): source = _image_path('%s/image' % image) - return process.simple_execute('cp %s %s' % (source, path)) + return utils.execute('cp %s %s' % (source, path)) def _image_path(path): diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index c09a7c01d..715e7234c 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -54,7 +54,6 @@ from nova import context from nova import db from nova import exception from nova import flags -from nova import process from nova import utils #from nova.api import context from nova.auth import manager @@ -366,8 +365,8 @@ class LibvirtConnection(object): if virsh_output.startswith('/dev/'): logging.info('cool, it\'s a device') - r = process.simple_execute("sudo dd if=%s iflag=nonblock" % - virsh_output, check_exit_code=False) + r = utils.execute("sudo dd if=%s iflag=nonblock" % + virsh_output, check_exit_code=False) return r[0] else: return '' @@ -389,13 +388,13 @@ class LibvirtConnection(object): console_log = os.path.join(FLAGS.instances_path, instance['name'], 'console.log') - process.simple_execute('sudo chown %d %s' % (os.getuid(), - console_log)) + utils.execute('sudo chown %d %s' % (os.getuid(), + console_log)) if FLAGS.libvirt_type == 'xen': # Xen is special - virsh_output = process.simple_execute("virsh ttyconsole %s" % - instance['name']) + virsh_output = utils.execute("virsh ttyconsole %s" % + instance['name']) data = self._flush_xen_console(virsh_output) fpath = self._append_to_file(data, console_log) else: @@ -411,8 +410,8 @@ class LibvirtConnection(object): prefix + fname) # ensure directories exist and are writable - process.simple_execute('mkdir -p %s' % basepath(prefix='')) - process.simple_execute('chmod 0777 %s' % basepath(prefix='')) + utils.execute('mkdir -p %s' % basepath(prefix='')) + utils.execute('chmod 0777 %s' % basepath(prefix='')) # TODO(termie): these are blocking calls, it would be great # if they weren't. @@ -443,9 +442,9 @@ class LibvirtConnection(object): project) def execute(cmd, process_input=None, check_exit_code=True): - return process.simple_execute(cmd=cmd, - process_input=process_input, - check_exit_code=check_exit_code) + return utils.execute(cmd=cmd, + process_input=process_input, + check_exit_code=check_exit_code) key = str(inst['key_data']) net = None @@ -471,7 +470,7 @@ class LibvirtConnection(object): execute=execute) if os.path.exists(basepath('disk')): - process.simple_execute('rm -f %s' % basepath('disk')) + utils.execute('rm -f %s' % basepath('disk')) local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type] ['local_gb'] @@ -485,8 +484,7 @@ class LibvirtConnection(object): local_bytes, resize, execute=execute) if FLAGS.libvirt_type == 'uml': - process.simple_execute('sudo chown root %s' % - basepath('disk')) + utils.execute('sudo chown root %s' % basepath('disk')) def to_xml(self, instance, rescue=False): # TODO(termie): cache? diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index 99d484ca2..b72b8e13d 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -21,14 +21,13 @@ their attributes like VDIs, VIFs, as well as their lookup functions. import logging -from twisted.internet import defer - from nova import utils from nova.auth.manager import AuthManager from nova.compute import instance_types from nova.compute import power_state from nova.virt import images + XENAPI_POWER_STATE = { 'Halted': power_state.SHUTDOWN, 'Running': power_state.RUNNING, @@ -36,6 +35,7 @@ XENAPI_POWER_STATE = { 'Suspended': power_state.SHUTDOWN, # FIXME 'Crashed': power_state.CRASHED} + XenAPI = None @@ -49,7 +49,6 @@ class VMHelper(): XenAPI = __import__('XenAPI') @classmethod - @defer.inlineCallbacks def create_vm(cls, session, instance, kernel, ramdisk): """Create a VM record. Returns a Deferred that gives the new VM reference.""" @@ -87,12 +86,11 @@ class VMHelper(): 'other_config': {}, } logging.debug('Created VM %s...', instance.name) - vm_ref = yield session.call_xenapi('VM.create', rec) + vm_ref = session.call_xenapi('VM.create', rec) logging.debug('Created VM %s as %s.', instance.name, vm_ref) - defer.returnValue(vm_ref) + return vm_ref @classmethod - @defer.inlineCallbacks def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable): """Create a VBD record. Returns a Deferred that gives the new VBD reference.""" @@ -111,13 +109,12 @@ class VMHelper(): vbd_rec['qos_algorithm_params'] = {} vbd_rec['qos_supported_algorithms'] = [] logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) - vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec) + vbd_ref = session.call_xenapi('VBD.create', vbd_rec) logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, vdi_ref) - defer.returnValue(vbd_ref) + return vbd_ref @classmethod - @defer.inlineCallbacks def create_vif(cls, session, vm_ref, network_ref, mac_address): """Create a VIF record. Returns a Deferred that gives the new VIF reference.""" @@ -133,13 +130,12 @@ class VMHelper(): vif_rec['qos_algorithm_params'] = {} logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, network_ref) - vif_ref = yield session.call_xenapi('VIF.create', vif_rec) + vif_ref = session.call_xenapi('VIF.create', vif_rec) logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, vm_ref, network_ref) - defer.returnValue(vif_ref) + return vif_ref @classmethod - @defer.inlineCallbacks def fetch_image(cls, session, image, user, project, use_sr): """use_sr: True to put the image as a VDI in an SR, False to place it on dom0's filesystem. The former is for VM disks, the latter for @@ -156,12 +152,11 @@ class VMHelper(): args['password'] = user.secret if use_sr: args['add_partition'] = 'true' - task = yield session.async_call_plugin('objectstore', fn, args) - uuid = yield session.wait_for_task(task) - defer.returnValue(uuid) + task = session.async_call_plugin('objectstore', fn, args) + uuid = session.wait_for_task(task) + return uuid @classmethod - @utils.deferredToThread def lookup(cls, session, i): """ Look the instance i up, and returns it if available """ return VMHelper.lookup_blocking(session, i) @@ -179,7 +174,6 @@ class VMHelper(): return vms[0] @classmethod - @utils.deferredToThread def lookup_vm_vdis(cls, session, vm): """ Look for the VDIs that are attached to the VM """ return VMHelper.lookup_vm_vdis_blocking(session, vm) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index dacf9fe2b..96d211cc0 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -167,7 +167,6 @@ class XenAPISession(object): self.get_xenapi_host(), plugin, fn, args) def wait_for_task(self, task): ->>>>>>> MERGE-SOURCE """Return a Deferred that will give the result of the given task. The task is polled until it completes.""" diff --git a/nova/volume/driver.py b/nova/volume/driver.py index 156aad2a0..f675c9132 100644 --- a/nova/volume/driver.py +++ b/nova/volume/driver.py @@ -23,8 +23,6 @@ Drivers for volumes. import logging import os -from twisted.internet import defer - from nova import exception from nova import flags from nova import process @@ -55,14 +53,13 @@ flags.DEFINE_string('iscsi_ip_prefix', '127.0', class VolumeDriver(object): """Executes commands relating to Volumes.""" - def __init__(self, execute=process.simple_execute, + def __init__(self, execute=utils.execute, sync_exec=utils.execute, *args, **kwargs): # NOTE(vish): db is set by Manager self.db = None self._execute = execute self._sync_exec = sync_exec - @defer.inlineCallbacks def _try_execute(self, command): # NOTE(vish): Volume commands can partially fail due to timing, but # running them a second time on failure will usually @@ -70,15 +67,15 @@ class VolumeDriver(object): tries = 0 while True: try: - yield self._execute(command) - defer.returnValue(True) + self._execute(command) + return True except exception.ProcessExecutionError: tries = tries + 1 if tries >= FLAGS.num_shell_tries: raise logging.exception("Recovering from a failed execute." "Try number %s", tries) - yield self._execute("sleep %s" % tries ** 2) + self._execute("sleep %s" % tries ** 2) def check_for_setup_error(self): """Returns an error if prerequisites aren't met""" @@ -86,53 +83,46 @@ class VolumeDriver(object): raise exception.Error("volume group %s doesn't exist" % FLAGS.volume_group) - @defer.inlineCallbacks def create_volume(self, volume): """Creates a logical volume.""" if int(volume['size']) == 0: sizestr = '100M' else: sizestr = '%sG' % volume['size'] - yield self._try_execute("sudo lvcreate -L %s -n %s %s" % - (sizestr, + self._try_execute("sudo lvcreate -L %s -n %s %s" % + (sizestr, volume['name'], FLAGS.volume_group)) - @defer.inlineCallbacks def delete_volume(self, volume): """Deletes a logical volume.""" - yield self._try_execute("sudo lvremove -f %s/%s" % - (FLAGS.volume_group, + self._try_execute("sudo lvremove -f %s/%s" % + (FLAGS.volume_group, volume['name'])) - @defer.inlineCallbacks def local_path(self, volume): - yield # NOTE(vish): stops deprecation warning + # NOTE(vish): stops deprecation warning escaped_group = FLAGS.volume_group.replace('-', '--') escaped_name = volume['name'].replace('-', '--') - defer.returnValue("/dev/mapper/%s-%s" % (escaped_group, - escaped_name)) + return "/dev/mapper/%s-%s" % (escaped_group, + escaped_name) def ensure_export(self, context, volume): """Synchronously recreates an export for a logical volume.""" raise NotImplementedError() - @defer.inlineCallbacks def create_export(self, context, volume): """Exports the volume.""" raise NotImplementedError() - @defer.inlineCallbacks def remove_export(self, context, volume): """Removes an export for a logical volume.""" raise NotImplementedError() - @defer.inlineCallbacks def discover_volume(self, volume): """Discover volume on a remote host.""" raise NotImplementedError() - @defer.inlineCallbacks def undiscover_volume(self, volume): """Undiscover volume on a remote host.""" raise NotImplementedError() @@ -155,14 +145,13 @@ class AOEDriver(VolumeDriver): dev = {'shelf_id': shelf_id, 'blade_id': blade_id} self.db.export_device_create_safe(context, dev) - @defer.inlineCallbacks def create_export(self, context, volume): """Creates an export for a logical volume.""" self._ensure_blades(context) (shelf_id, blade_id) = self.db.volume_allocate_shelf_and_blade(context, volume['id']) - yield self._try_execute( + self._try_execute( "sudo vblade-persist setup %s %s %s /dev/%s/%s" % (shelf_id, blade_id, @@ -176,33 +165,30 @@ class AOEDriver(VolumeDriver): # still works for the other volumes, so we # just wait a bit for the current volume to # be ready and ignore any errors. - yield self._execute("sleep 2") - yield self._execute("sudo vblade-persist auto all", - check_exit_code=False) - yield self._execute("sudo vblade-persist start all", - check_exit_code=False) + self._execute("sleep 2") + self._execute("sudo vblade-persist auto all", + check_exit_code=False) + self._execute("sudo vblade-persist start all", + check_exit_code=False) - @defer.inlineCallbacks def remove_export(self, context, volume): """Removes an export for a logical volume.""" (shelf_id, blade_id) = self.db.volume_get_shelf_and_blade(context, volume['id']) - yield self._try_execute("sudo vblade-persist stop %s %s" % - (shelf_id, blade_id)) - yield self._try_execute("sudo vblade-persist destroy %s %s" % - (shelf_id, blade_id)) + self._try_execute("sudo vblade-persist stop %s %s" % + (shelf_id, blade_id)) + self._try_execute("sudo vblade-persist destroy %s %s" % + (shelf_id, blade_id)) - @defer.inlineCallbacks def discover_volume(self, _volume): """Discover volume on a remote host.""" - yield self._execute("sudo aoe-discover") - yield self._execute("sudo aoe-stat", check_exit_code=False) + self._execute("sudo aoe-discover") + self._execute("sudo aoe-stat", check_exit_code=False) - @defer.inlineCallbacks def undiscover_volume(self, _volume): """Undiscover volume on a remote host.""" - yield + pass class FakeAOEDriver(AOEDriver): @@ -252,7 +238,6 @@ class ISCSIDriver(VolumeDriver): target = {'host': host, 'target_num': target_num} self.db.iscsi_target_create_safe(context, target) - @defer.inlineCallbacks def create_export(self, context, volume): """Creates an export for a logical volume.""" self._ensure_iscsi_targets(context, volume['host']) @@ -261,61 +246,57 @@ class ISCSIDriver(VolumeDriver): volume['host']) iscsi_name = "%s%s" % (FLAGS.iscsi_target_prefix, volume['name']) volume_path = "/dev/%s/%s" % (FLAGS.volume_group, volume['name']) - yield self._execute("sudo ietadm --op new " - "--tid=%s --params Name=%s" % - (iscsi_target, iscsi_name)) - yield self._execute("sudo ietadm --op new --tid=%s " - "--lun=0 --params Path=%s,Type=fileio" % - (iscsi_target, volume_path)) - - @defer.inlineCallbacks + self._execute("sudo ietadm --op new " + "--tid=%s --params Name=%s" % + (iscsi_target, iscsi_name)) + self._execute("sudo ietadm --op new --tid=%s " + "--lun=0 --params Path=%s,Type=fileio" % + (iscsi_target, volume_path)) + def remove_export(self, context, volume): """Removes an export for a logical volume.""" iscsi_target = self.db.volume_get_iscsi_target_num(context, volume['id']) - yield self._execute("sudo ietadm --op delete --tid=%s " - "--lun=0" % iscsi_target) - yield self._execute("sudo ietadm --op delete --tid=%s" % - iscsi_target) + self._execute("sudo ietadm --op delete --tid=%s " + "--lun=0" % iscsi_target) + self._execute("sudo ietadm --op delete --tid=%s" % + iscsi_target) - @defer.inlineCallbacks def _get_name_and_portal(self, volume_name, host): """Gets iscsi name and portal from volume name and host.""" - (out, _err) = yield self._execute("sudo iscsiadm -m discovery -t " - "sendtargets -p %s" % host) + (out, _err) = self._execute("sudo iscsiadm -m discovery -t " + "sendtargets -p %s" % host) for target in out.splitlines(): if FLAGS.iscsi_ip_prefix in target and volume_name in target: (location, _sep, iscsi_name) = target.partition(" ") break iscsi_portal = location.split(",")[0] - defer.returnValue((iscsi_name, iscsi_portal)) + return (iscsi_name, iscsi_portal) - @defer.inlineCallbacks def discover_volume(self, volume): """Discover volume on a remote host.""" (iscsi_name, - iscsi_portal) = yield self._get_name_and_portal(volume['name'], - volume['host']) - yield self._execute("sudo iscsiadm -m node -T %s -p %s --login" % - (iscsi_name, iscsi_portal)) - yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update " - "-n node.startup -v automatic" % - (iscsi_name, iscsi_portal)) - defer.returnValue("/dev/iscsi/%s" % volume['name']) - - @defer.inlineCallbacks + iscsi_portal) = self._get_name_and_portal(volume['name'], + volume['host']) + self._execute("sudo iscsiadm -m node -T %s -p %s --login" % + (iscsi_name, iscsi_portal)) + self._execute("sudo iscsiadm -m node -T %s -p %s --op update " + "-n node.startup -v automatic" % + (iscsi_name, iscsi_portal)) + return "/dev/iscsi/%s" % volume['name'] + def undiscover_volume(self, volume): """Undiscover volume on a remote host.""" (iscsi_name, - iscsi_portal) = yield self._get_name_and_portal(volume['name'], - volume['host']) - yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update " - "-n node.startup -v manual" % - (iscsi_name, iscsi_portal)) - yield self._execute("sudo iscsiadm -m node -T %s -p %s --logout " % - (iscsi_name, iscsi_portal)) - yield self._execute("sudo iscsiadm -m node --op delete " - "--targetname %s" % iscsi_name) + iscsi_portal) = self._get_name_and_portal(volume['name'], + volume['host']) + self._execute("sudo iscsiadm -m node -T %s -p %s --op update " + "-n node.startup -v manual" % + (iscsi_name, iscsi_portal)) + self._execute("sudo iscsiadm -m node -T %s -p %s --logout " % + (iscsi_name, iscsi_portal)) + self._execute("sudo iscsiadm -m node --op delete " + "--targetname %s" % iscsi_name) class FakeISCSIDriver(ISCSIDriver): diff --git a/run_tests.py b/run_tests.py index 3d427d8af..883d2b768 100644 --- a/run_tests.py +++ b/run_tests.py @@ -39,11 +39,16 @@ Due to our use of multiprocessing it we frequently get some ignorable """ +import eventlet +eventlet.monkey_patch() + import __main__ import os import sys + from twisted.scripts import trial as trial_script +import unittest from nova import flags from nova import twistd @@ -56,12 +61,12 @@ from nova.tests.compute_unittest import * from nova.tests.flags_unittest import * from nova.tests.misc_unittest import * from nova.tests.network_unittest import * -from nova.tests.objectstore_unittest import * -from nova.tests.process_unittest import * +#from nova.tests.objectstore_unittest import * +#from nova.tests.process_unittest import * from nova.tests.quota_unittest import * from nova.tests.rpc_unittest import * from nova.tests.scheduler_unittest import * -from nova.tests.service_unittest import * +#from nova.tests.service_unittest import * from nova.tests.twistd_unittest import * from nova.tests.validator_unittest import * from nova.tests.virt_unittest import * @@ -82,6 +87,8 @@ if __name__ == '__main__': config = OptionsClass() argv = config.parseOptions() + argv = FLAGS(sys.argv) + FLAGS.verbose = True # TODO(termie): these should make a call instead of doing work on import @@ -90,6 +97,7 @@ if __name__ == '__main__': else: from nova.tests.real_flags import * + # Establish redirect for STDERR sys.stderr.flush() err = open(FLAGS.tests_stderr, 'w+', 0) |