| author    | jaypipes@gmail.com | 2010-12-17 11:10:11 -0500 |
|-----------|--------------------|---------------------------|
| committer | jaypipes@gmail.com | 2010-12-17 11:10:11 -0500 |
| commit    | afae367d63c6d38faa5d9be54725df29c2a5b903 | |
| tree      | e2282e9bbc6ef6d4c5f339a4fde38127a619f354 /nova/virt | |
| parent    | d283922defdda6ede5fa2e09656cd8d411a90096 | |
| parent    | cd460a1f661eea7e050891f50a8218fdf24f2c6f | |
Merge eventlet and resolve all conflicts
Diffstat (limited to 'nova/virt')
| -rw-r--r-- | nova/virt/fake.py                 |  10 |
| -rw-r--r-- | nova/virt/images.py               |  11 |
| -rw-r--r-- | nova/virt/libvirt_conn.py         | 177 |
| -rw-r--r-- | nova/virt/xenapi/network_utils.py |  10 |
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py      |  28 |
| -rw-r--r-- | nova/virt/xenapi/vmops.py         |  67 |
| -rw-r--r-- | nova/virt/xenapi_conn.py          |  45 |
7 files changed, 153 insertions, 195 deletions
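The recurring conversion in the diff below replaces Twisted's `defer.Deferred` plus `task.LoopingCall` polling with an eventlet greenthread that signals an `Event`. The sketch that follows is illustrative only and is not code from this commit; the names `wait_for_state`, `get_state`, and `target` are hypothetical, and nova's own `utils.LoopingCall` wrapper is not shown.

```python
# Illustrative sketch of the Deferred -> eventlet conversion pattern applied
# throughout this diff. All names here are hypothetical, not nova's.
import eventlet
from eventlet import event


def wait_for_state(get_state, target, interval=0.5):
    """Poll get_state() until it returns `target`, without blocking the hub.

    Returns an eventlet Event the caller can wait() on, loosely playing the
    role the Twisted Deferred used to play.
    """
    done = event.Event()

    def _poll():
        while not done.ready():
            if get_state() == target:
                done.send(None)  # roughly analogous to deferred.callback(None)
                return
            eventlet.sleep(interval)  # cooperative sleep; yields to other greenthreads

    eventlet.spawn(_poll)
    return done

# A caller blocks only its own greenthread:
#   wait_for_state(lambda: read_power_state(), 'running').wait()
```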
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index 26b01af91..b7bb0d7d7 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -25,8 +25,6 @@ semantics of real hypervisor connections.
 
 """
 
-from twisted.internet import defer
-
 from nova import exception
 from nova.compute import power_state
 
@@ -107,7 +105,6 @@ class FakeConnection(object):
         fake_instance = FakeInstance()
         self.instances[instance.name] = fake_instance
         fake_instance._state = power_state.RUNNING
-        return defer.succeed(None)
 
     def reboot(self, instance):
         """
@@ -119,19 +116,19 @@ class FakeConnection(object):
         The work will be done asynchronously. This function returns a
         Deferred that allows the caller to detect when it is complete.
         """
-        return defer.succeed(None)
+        pass
 
     def rescue(self, instance):
         """
         Rescue the specified instance.
         """
-        return defer.succeed(None)
+        pass
 
     def unrescue(self, instance):
         """
         Unrescue the specified instance.
         """
-        return defer.succeed(None)
+        pass
 
     def destroy(self, instance):
         """
@@ -144,7 +141,6 @@ class FakeConnection(object):
         Deferred that allows the caller to detect when it is complete.
         """
         del self.instances[instance.name]
-        return defer.succeed(None)
 
     def attach_volume(self, instance_name, device_path, mountpoint):
         """Attach the disk at device_path to the instance at mountpoint"""
diff --git a/nova/virt/images.py b/nova/virt/images.py
index 981aa5cf3..1c9b2e093 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -26,7 +26,7 @@ import time
 import urlparse
 
 from nova import flags
-from nova import process
+from nova import utils
 from nova.auth import manager
 from nova.auth import signer
 from nova.objectstore import image
@@ -50,7 +50,7 @@ def _fetch_s3_image(image, path, user, project):
 
     # This should probably move somewhere else, like e.g. a download_as
    # method on User objects and at the same time get rewritten to use
-    # twisted web client.
+    # a web client.
     headers = {}
     headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                     time.gmtime())
@@ -63,15 +63,16 @@
 
     cmd = ['/usr/bin/curl', '--fail', '--silent', url]
     for (k, v) in headers.iteritems():
-        cmd += ['-H', '%s: %s' % (k, v)]
+        cmd += ['-H', '"%s: %s"' % (k, v)]
 
     cmd += ['-o', path]
-    return process.SharedPool().execute(executable=cmd[0], args=cmd[1:])
+    cmd_out = ' '.join(cmd)
+    return utils.execute(cmd_out)
 
 
 def _fetch_local_image(image, path, user, project):
     source = _image_path('%s/image' % image)
-    return process.simple_execute('cp %s %s' % (source, path))
+    return utils.execute('cp %s %s' % (source, path))
 
 
 def _image_path(path):
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index bc435f4b5..f485d078e 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -45,16 +45,15 @@ import logging
 import os
 import shutil
 
+from eventlet import event
+from eventlet import tpool
+
 import IPy
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.internet import threads
 
 from nova import context
 from nova import db
 from nova import exception
 from nova import flags
-from nova import process
 from nova import utils
 #from nova.api import context
 from nova.auth import manager
@@ -184,14 +183,12 @@ class LibvirtConnection(object):
         except Exception as _err:
             pass
             # If the instance is already terminated, we're still happy
-        d = defer.Deferred()
-        if cleanup:
-            d.addCallback(lambda _: self._cleanup(instance))
-        # FIXME: What does this comment mean?
-        # TODO(termie): short-circuit me for tests
-        # WE'LL save this for when we do shutdown,
+
+        done = event.Event()
+
+        # We'll save this for when we do shutdown,
         # instead of destroy - but destroy returns immediately
-        timer = task.LoopingCall(f=None)
+        timer = utils.LoopingCall(f=None)
 
         def _wait_for_shutdown():
             try:
@@ -200,17 +197,26 @@
                                       instance['id'], state)
                 if state == power_state.SHUTDOWN:
                     timer.stop()
-                    d.callback(None)
             except Exception:
                 db.instance_set_state(context.get_admin_context(),
                                       instance['id'],
                                       power_state.SHUTDOWN)
                 timer.stop()
-                d.callback(None)
 
         timer.f = _wait_for_shutdown
-        timer.start(interval=0.5, now=True)
-        return d
+        timer_done = timer.start(interval=0.5, now=True)
+
+        # NOTE(termie): this is strictly superfluous (we could put the
+        #               cleanup code in the timer), but this emulates the
+        #               previous model so I am keeping it around until
+        #               everything has been vetted a bit
+        def _wait_for_timer():
+            timer_done.wait()
+            self._cleanup(instance)
+            done.send()
+
+        greenthread.spawn(_wait_for_timer)
+        return done
 
     def _cleanup(self, instance):
         target = os.path.join(FLAGS.instances_path, instance['name'])
@@ -219,7 +225,6 @@
         if os.path.exists(target):
             shutil.rmtree(target)
 
-    @defer.inlineCallbacks
     @exception.wrap_exception
     def attach_volume(self, instance_name, device_path, mountpoint):
         virt_dom = self._conn.lookupByName(instance_name)
@@ -230,7 +235,6 @@
                         <target dev='%s' bus='virtio'/>
                     </disk>""" % (device_path, mount_device)
         virt_dom.attachDevice(xml)
-        yield
 
     def _get_disk_xml(self, xml, device):
         """Returns the xml for the disk mounted at device"""
@@ -252,7 +256,6 @@
         if doc != None:
             doc.freeDoc()
 
-    @defer.inlineCallbacks
     @exception.wrap_exception
     def detach_volume(self, instance_name, mountpoint):
         virt_dom = self._conn.lookupByName(instance_name)
@@ -261,17 +264,13 @@
         if not xml:
             raise exception.NotFound(_("No disk at %s") % mount_device)
         virt_dom.detachDevice(xml)
-        yield
 
-    @defer.inlineCallbacks
     @exception.wrap_exception
     def reboot(self, instance):
-        yield self.destroy(instance, False)
+        self.destroy(instance, False)
         xml = self.to_xml(instance)
-        yield self._conn.createXML(xml, 0)
-
-        d = defer.Deferred()
-        timer = task.LoopingCall(f=None)
+        self._conn.createXML(xml, 0)
+        timer = utils.LoopingCall(f=None)
 
         def _wait_for_reboot():
             try:
@@ -281,33 +280,28 @@
                 if state == power_state.RUNNING:
                     logging.debug(_('instance %s: rebooted'), instance['name'])
                     timer.stop()
-                    d.callback(None)
             except Exception, exn:
                 logging.error(_('_wait_for_reboot failed: %s'), exn)
                 db.instance_set_state(context.get_admin_context(),
                                       instance['id'],
                                       power_state.SHUTDOWN)
                 timer.stop()
-                d.callback(None)
 
         timer.f = _wait_for_reboot
-        timer.start(interval=0.5, now=True)
-        yield d
+        return timer.start(interval=0.5, now=True)
 
-    @defer.inlineCallbacks
     @exception.wrap_exception
     def rescue(self, instance):
-        yield self.destroy(instance, False)
+        self.destroy(instance, False)
         xml = self.to_xml(instance, rescue=True)
         rescue_images = {'image_id': FLAGS.rescue_image_id,
                          'kernel_id': FLAGS.rescue_kernel_id,
                          'ramdisk_id': FLAGS.rescue_ramdisk_id}
-        yield self._create_image(instance, xml, 'rescue-', rescue_images)
-        yield self._conn.createXML(xml, 0)
+        self._create_image(instance, xml, 'rescue-', rescue_images)
+        self._conn.createXML(xml, 0)
 
-        d = defer.Deferred()
-        timer = task.LoopingCall(f=None)
+        timer = utils.LoopingCall(f=None)
 
         def _wait_for_rescue():
             try:
@@ -316,27 +310,22 @@
                 if state == power_state.RUNNING:
                     logging.debug(_('instance %s: rescued'), instance['name'])
                     timer.stop()
-                    d.callback(None)
             except Exception, exn:
                 logging.error(_('_wait_for_rescue failed: %s'), exn)
                 db.instance_set_state(None,
                                       instance['id'],
                                       power_state.SHUTDOWN)
                 timer.stop()
-                d.callback(None)
 
         timer.f = _wait_for_rescue
-        timer.start(interval=0.5, now=True)
-        yield d
+        return timer.start(interval=0.5, now=True)
 
-    @defer.inlineCallbacks
     @exception.wrap_exception
     def unrescue(self, instance):
         # NOTE(vish): Because reboot destroys and recreates an instance using
         # the normal xml file, we can just call reboot here
-        yield self.reboot(instance)
+        self.reboot(instance)
 
-    @defer.inlineCallbacks
     @exception.wrap_exception
     def spawn(self, instance):
         xml = self.to_xml(instance)
@@ -344,14 +333,12 @@
                               instance['id'],
                               power_state.NOSTATE,
                               'launching')
-        yield NWFilterFirewall(self._conn).\
-              setup_nwfilters_for_instance(instance)
-        yield self._create_image(instance, xml)
-        yield self._conn.createXML(xml, 0)
+        NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
+        self._create_image(instance, xml)
+        self._conn.createXML(xml, 0)
         logging.debug(_("instance %s: is running"), instance['name'])
 
-        local_d = defer.Deferred()
-        timer = task.LoopingCall(f=None)
+        timer = utils.LoopingCall(f=None)
 
         def _wait_for_boot():
             try:
@@ -361,7 +348,6 @@
                 if state == power_state.RUNNING:
                     logging.debug(_('instance %s: booted'), instance['name'])
                     timer.stop()
-                    local_d.callback(None)
             except:
                 logging.exception(_('instance %s: failed to boot'),
                                   instance['name'])
@@ -369,10 +355,9 @@
                                       instance['id'],
                                       power_state.SHUTDOWN)
                 timer.stop()
-                local_d.callback(None)
+
         timer.f = _wait_for_boot
-        timer.start(interval=0.5, now=True)
-        yield local_d
+        return timer.start(interval=0.5, now=True)
 
     def _flush_xen_console(self, virsh_output):
         logging.info('virsh said: %r' % (virsh_output,))
@@ -380,10 +365,9 @@
         if virsh_output.startswith('/dev/'):
             logging.info(_('cool, it\'s a device'))
-            d = process.simple_execute("sudo dd if=%s iflag=nonblock" %
-                                       virsh_output, check_exit_code=False)
-            d.addCallback(lambda r: r[0])
-            return d
+            out, err = utils.execute("sudo dd if=%s iflag=nonblock" %
+                                     virsh_output, check_exit_code=False)
+            return out
         else:
             return ''
 
@@ -403,21 +387,20 @@
     def get_console_output(self, instance):
         console_log = os.path.join(FLAGS.instances_path, instance['name'],
                                    'console.log')
-        d = process.simple_execute('sudo chown %d %s' % (os.getuid(),
-                                                         console_log))
+
+        utils.execute('sudo chown %d %s' % (os.getuid(), console_log))
+
         if FLAGS.libvirt_type == 'xen':
-            # Xen is spethial
-            d.addCallback(lambda _:
-                          process.simple_execute("virsh ttyconsole %s" %
-                                                 instance['name']))
-            d.addCallback(self._flush_xen_console)
-            d.addCallback(self._append_to_file, console_log)
+            # Xen is special
+            virsh_output = utils.execute("virsh ttyconsole %s" %
+                                         instance['name'])
+            data = self._flush_xen_console(virsh_output)
+            fpath = self._append_to_file(data, console_log)
         else:
-            d.addCallback(lambda _: defer.succeed(console_log))
-            d.addCallback(self._dump_file)
-        return d
+            fpath = console_log
+
+        return self._dump_file(fpath)
 
-    @defer.inlineCallbacks
     def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None):
         # syntactic nicety
         basepath = lambda fname = '', prefix = prefix: os.path.join(
@@ -426,8 +409,8 @@
                                                       prefix + fname)
 
         # ensure directories exist and are writable
-        yield process.simple_execute('mkdir -p %s' % basepath(prefix=''))
-        yield process.simple_execute('chmod 0777 %s' % basepath(prefix=''))
+        utils.execute('mkdir -p %s' % basepath(prefix=''))
+        utils.execute('chmod 0777 %s' % basepath(prefix=''))
 
         # TODO(termie): these are blocking calls, it would be great
         # if they weren't.
@@ -448,19 +431,19 @@
                        'kernel_id': inst['kernel_id'],
                        'ramdisk_id': inst['ramdisk_id']}
         if not os.path.exists(basepath('disk')):
-            yield images.fetch(inst.image_id, basepath('disk-raw'), user,
-                               project)
+            images.fetch(inst.image_id, basepath('disk-raw'), user,
+                         project)
         if not os.path.exists(basepath('kernel')):
-            yield images.fetch(inst.kernel_id, basepath('kernel'), user,
-                               project)
+            images.fetch(inst.kernel_id, basepath('kernel'), user,
+                         project)
         if not os.path.exists(basepath('ramdisk')):
-            yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
-                               project)
+            images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
+                         project)
 
-        execute = lambda cmd, process_input = None, check_exit_code = True: \
-                  process.simple_execute(cmd=cmd,
-                                         process_input=process_input,
-                                         check_exit_code=check_exit_code)
+        def execute(cmd, process_input=None, check_exit_code=True):
+            return utils.execute(cmd=cmd,
+                                 process_input=process_input,
+                                 check_exit_code=check_exit_code)
 
         key = str(inst['key_data'])
         net = None
@@ -482,11 +465,11 @@
         if net:
             logging.info(_('instance %s: injecting net into image %s'),
                          inst['name'], inst.image_id)
-            yield disk.inject_data(basepath('disk-raw'), key, net,
-                                   execute=execute)
+            disk.inject_data(basepath('disk-raw'), key, net,
+                             execute=execute)
 
         if os.path.exists(basepath('disk')):
-            yield process.simple_execute('rm -f %s' % basepath('disk'))
+            utils.execute('rm -f %s' % basepath('disk'))
 
         local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type]
                        ['local_gb']
@@ -495,12 +478,11 @@
         resize = True
         if inst['instance_type'] == 'm1.tiny' or prefix == 'rescue-':
             resize = False
-        yield disk.partition(basepath('disk-raw'), basepath('disk'),
-                             local_bytes, resize, execute=execute)
+        disk.partition(basepath('disk-raw'), basepath('disk'),
+                       local_bytes, resize, execute=execute)
 
         if FLAGS.libvirt_type == 'uml':
-            yield process.simple_execute('sudo chown root %s' %
-                                         basepath('disk'))
+            utils.execute('sudo chown root %s' % basepath('disk'))
 
     def to_xml(self, instance, rescue=False):
         # TODO(termie): cache?
@@ -761,15 +743,15 @@
     def _define_filter(self, xml):
         if callable(xml):
             xml = xml()
-        d = threads.deferToThread(self._conn.nwfilterDefineXML, xml)
-        return d
+
+        # execute in a native thread and block current greenthread until done
+        tpool.execute(self._conn.nwfilterDefineXML, xml)
 
     @staticmethod
     def _get_net_and_mask(cidr):
         net = IPy.IP(cidr)
         return str(net.net()), str(net.netmask())
 
-    @defer.inlineCallbacks
     def setup_nwfilters_for_instance(self, instance):
         """
         Creates an NWFilter for the given instance. In the process,
         it makes sure the filters for the security groups as well as
         the base filter are all in place.
""" - yield self._define_filter(self.nova_base_ipv4_filter) - yield self._define_filter(self.nova_base_ipv6_filter) - yield self._define_filter(self.nova_dhcp_filter) - yield self._define_filter(self.nova_base_filter) + self._define_filter(self.nova_base_ipv4_filter) + self._define_filter(self.nova_base_ipv6_filter) + self._define_filter(self.nova_dhcp_filter) + self._define_filter(self.nova_base_filter) nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \ " <filterref filter='nova-base' />\n" % \ @@ -792,20 +774,19 @@ class NWFilterFirewall(object): net, mask = self._get_net_and_mask(network_ref['cidr']) project_filter = self.nova_project_filter(instance['project_id'], net, mask) - yield self._define_filter(project_filter) + self._define_filter(project_filter) nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \ instance['project_id'] for security_group in instance.security_groups: - yield self.ensure_security_group_filter(security_group['id']) + self.ensure_security_group_filter(security_group['id']) nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \ security_group['id'] nwfilter_xml += "</filter>" - yield self._define_filter(nwfilter_xml) - return + self._define_filter(nwfilter_xml) def ensure_security_group_filter(self, security_group_id): return self._define_filter( diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py index 8cb4cce3a..012954394 100644 --- a/nova/virt/xenapi/network_utils.py +++ b/nova/virt/xenapi/network_utils.py @@ -20,8 +20,6 @@ records and their attributes like bridges, PIFs, QoS, as well as their lookup functions. """ -from twisted.internet import defer - class NetworkHelper(): """ @@ -31,14 +29,12 @@ class NetworkHelper(): return @classmethod - @defer.inlineCallbacks def find_network_with_bridge(cls, session, bridge): - """ Return the network on which the bridge is attached, if found """ + """ Return the network on which the bridge is attached, if found.""" expr = 'field "bridge" = "%s"' % bridge - networks = yield session.call_xenapi('network.get_all_records_where', - expr) + networks = session.call_xenapi('network.get_all_records_where', expr) if len(networks) == 1: - defer.returnValue(networks.keys()[0]) + return networks.keys()[0] elif len(networks) > 1: raise Exception('Found non-unique network for bridge %s' % bridge) else: diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index 77edb576e..2f5d78e75 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -21,18 +21,16 @@ their attributes like VDIs, VIFs, as well as their lookup functions. import logging import urllib - -from twisted.internet import defer from xml.dom import minidom from nova import flags from nova import utils - from nova.auth.manager import AuthManager from nova.compute import instance_types from nova.compute import power_state from nova.virt import images + FLAGS = flags.FLAGS XENAPI_POWER_STATE = { @@ -64,7 +62,6 @@ class VMHelper(): XenAPI = __import__('XenAPI') @classmethod - @defer.inlineCallbacks def create_vm(cls, session, instance, kernel, ramdisk): """Create a VM record. 
         Returns a Deferred that gives the new VM reference."""
@@ -102,12 +99,11 @@
             'other_config': {},
             }
 
         logging.debug('Created VM %s...', instance.name)
-        vm_ref = yield session.call_xenapi('VM.create', rec)
+        vm_ref = session.call_xenapi('VM.create', rec)
         logging.debug('Created VM %s as %s.', instance.name, vm_ref)
-        defer.returnValue(vm_ref)
+        return vm_ref
 
     @classmethod
-    @defer.inlineCallbacks
     def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
         """Create a VBD record.
         Returns a Deferred that gives the new VBD reference."""
@@ -126,13 +122,12 @@
         vbd_rec['qos_algorithm_params'] = {}
         vbd_rec['qos_supported_algorithms'] = []
         logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
-        vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec)
+        vbd_ref = session.call_xenapi('VBD.create', vbd_rec)
         logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
                       vdi_ref)
-        defer.returnValue(vbd_ref)
+        return vbd_ref
 
     @classmethod
-    @defer.inlineCallbacks
     def create_vif(cls, session, vm_ref, network_ref, mac_address):
         """Create a VIF record.
         Returns a Deferred that gives the new VIF reference."""
@@ -148,13 +143,12 @@
         vif_rec['qos_algorithm_params'] = {}
         logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
                       network_ref)
-        vif_ref = yield session.call_xenapi('VIF.create', vif_rec)
+        vif_ref = session.call_xenapi('VIF.create', vif_rec)
         logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
                       vm_ref, network_ref)
-        defer.returnValue(vif_ref)
+        return vif_ref
 
     @classmethod
-    @defer.inlineCallbacks
     def fetch_image(cls, session, image, user, project, use_sr):
         """use_sr: True to put the image as a VDI in an SR, False to place
         it on dom0's filesystem. The former is for VM disks, the latter for
@@ -171,12 +165,11 @@
         args['password'] = user.secret
         if use_sr:
             args['add_partition'] = 'true'
-        task = yield session.async_call_plugin('objectstore', fn, args)
-        uuid = yield session.wait_for_task(task)
-        defer.returnValue(uuid)
+        task = session.async_call_plugin('objectstore', fn, args)
+        uuid = session.wait_for_task(task)
+        return uuid
 
     @classmethod
-    @utils.deferredToThread
     def lookup(cls, session, i):
         """ Look the instance i up, and returns it if available """
         return VMHelper.lookup_blocking(session, i)
@@ -194,7 +187,6 @@
         return vms[0]
 
     @classmethod
-    @utils.deferredToThread
     def lookup_vm_vdis(cls, session, vm):
         """ Look for the VDIs that are attached to the VM """
         return VMHelper.lookup_vm_vdis_blocking(session, vm)
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 9bfd07267..3034df9e1 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -20,8 +20,6 @@ Management class for VM-related functions (spawn, reboot, etc).
 """
 
 import logging
 
-from twisted.internet import defer
-
 from nova import db
 from nova import context
 
@@ -49,10 +47,9 @@
         return [self._session.get_xenapi().VM.get_name_label(vm) \
                 for vm in self._session.get_xenapi().VM.get_all()]
 
-    @defer.inlineCallbacks
     def spawn(self, instance):
         """ Create VM instance """
-        vm = yield VMHelper.lookup(self._session, instance.name)
+        vm = VMHelper.lookup(self._session, instance.name)
         if vm is not None:
             raise Exception('Attempted to create non-unique name %s' %
                             instance.name)
@@ -60,66 +57,63 @@
         bridge = db.project_get_network(context.get_admin_context(),
                                         instance.project_id).bridge
         network_ref = \
-            yield NetworkHelper.find_network_with_bridge(self._session, bridge)
+            NetworkHelper.find_network_with_bridge(self._session, bridge)
 
         user = AuthManager().get_user(instance.user_id)
         project = AuthManager().get_project(instance.project_id)
-        vdi_uuid = yield VMHelper.fetch_image(self._session,
-                instance.image_id, user, project, True)
-        kernel = yield VMHelper.fetch_image(self._session,
-                instance.kernel_id, user, project, False)
-        ramdisk = yield VMHelper.fetch_image(self._session,
-                instance.ramdisk_id, user, project, False)
-        vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
-        vm_ref = yield VMHelper.create_vm(self._session,
-                instance, kernel, ramdisk)
-        yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
+        vdi_uuid = VMHelper.fetch_image(
+            self._session, instance.image_id, user, project, True)
+        kernel = VMHelper.fetch_image(
+            self._session, instance.kernel_id, user, project, False)
+        ramdisk = VMHelper.fetch_image(
+            self._session, instance.ramdisk_id, user, project, False)
+        vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
+        vm_ref = VMHelper.create_vm(
+            self._session, instance, kernel, ramdisk)
+        VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
         if network_ref:
-            yield VMHelper.create_vif(self._session, vm_ref,
-                                      network_ref, instance.mac_address)
+            VMHelper.create_vif(self._session, vm_ref,
+                                network_ref, instance.mac_address)
         logging.debug('Starting VM %s...', vm_ref)
-        yield self._session.call_xenapi('VM.start', vm_ref, False, False)
+        self._session.call_xenapi('VM.start', vm_ref, False, False)
         logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
 
-    @defer.inlineCallbacks
     def reboot(self, instance):
         """ Reboot VM instance """
         instance_name = instance.name
-        vm = yield VMHelper.lookup(self._session, instance_name)
+        vm = VMHelper.lookup(self._session, instance_name)
         if vm is None:
             raise Exception('instance not present %s' % instance_name)
-        task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm)
-        yield self._session.wait_for_task(task)
+        task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
+        self._session.wait_for_task(task)
 
-    @defer.inlineCallbacks
     def destroy(self, instance):
         """ Destroy VM instance """
-        vm = yield VMHelper.lookup(self._session, instance.name)
+        vm = VMHelper.lookup(self._session, instance.name)
         if vm is None:
             # Don't complain, just return. This lets us clean up instances
             # that have already disappeared from the underlying platform.
-            defer.returnValue(None)
+            return
 
         # Get the VDIs related to the VM
-        vdis = yield VMHelper.lookup_vm_vdis(self._session, vm)
+        vdis = VMHelper.lookup_vm_vdis(self._session, vm)
         try:
-            task = yield self._session.call_xenapi('Async.VM.hard_shutdown',
+            task = self._session.call_xenapi('Async.VM.hard_shutdown',
                                                    vm)
-            yield self._session.wait_for_task(task)
+            self._session.wait_for_task(task)
         except XenAPI.Failure, exc:
             logging.warn(exc)
         # Disk clean-up
         if vdis:
             for vdi in vdis:
                 try:
-                    task = yield self._session.call_xenapi('Async.VDI.destroy',
-                                                           vdi)
-                    yield self._session.wait_for_task(task)
+                    task = self._session.call_xenapi('Async.VDI.destroy', vdi)
+                    self._session.wait_for_task(task)
                 except XenAPI.Failure, exc:
                     logging.warn(exc)
         try:
-            task = yield self._session.call_xenapi('Async.VM.destroy', vm)
-            yield self._session.wait_for_task(task)
+            task = self._session.call_xenapi('Async.VM.destroy', vm)
+            self._session.wait_for_task(task)
         except XenAPI.Failure, exc:
             logging.warn(exc)
 
@@ -131,14 +125,13 @@
         rec = self._session.get_xenapi().VM.get_record(vm)
         return VMHelper.compile_info(rec)
 
-    @defer.inlineCallbacks
     def get_diagnostics(self, instance_id):
         """Return data about VM diagnostics"""
-        vm = yield VMHelper.lookup(self._session, instance_id)
+        vm = VMHelper.lookup(self._session, instance_id)
         if vm is None:
             raise Exception("instance not present %s" % instance_id)
-        rec = yield self._session.get_xenapi().VM.get_record(vm)
-        defer.returnValue(VMHelper.compile_diagnostics(self._session, rec))
+        rec = self._session.get_xenapi().VM.get_record(vm)
+        return VMHelper.compile_diagnostics(self._session, rec)
 
     def get_console_output(self, instance):
         """ Return snapshot of console """
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index c8c451dfe..55aadb989 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -48,10 +48,11 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
 """
 
 import logging
+import sys
 import xmlrpclib
 
-from twisted.internet import defer
-from twisted.internet import reactor
+from eventlet import event
+from eventlet import tpool
 
 from nova import utils
 from nova import flags
@@ -159,53 +160,51 @@
         """ Return the xenapi host """
         return self._session.xenapi.session.get_this_host(self._session.handle)
 
-    @utils.deferredToThread
     def call_xenapi(self, method, *args):
-        """Call the specified XenAPI method on a background thread. Returns
-        a Deferred for the result."""
+        """Call the specified XenAPI method on a background thread."""
         f = self._session.xenapi
         for m in method.split('.'):
             f = f.__getattr__(m)
-        return f(*args)
+        return tpool.execute(f, *args)
 
-    @utils.deferredToThread
     def async_call_plugin(self, plugin, fn, args):
-        """Call Async.host.call_plugin on a background thread. Returns a
-        Deferred with the task reference."""
-        return _unwrap_plugin_exceptions(
-            self._session.xenapi.Async.host.call_plugin,
-            self.get_xenapi_host(), plugin, fn, args)
+        """Call Async.host.call_plugin on a background thread."""
+        return tpool.execute(_unwrap_plugin_exceptions,
+                             self._session.xenapi.Async.host.call_plugin,
+                             self.get_xenapi_host(), plugin, fn, args)
 
     def wait_for_task(self, task):
         """Return a Deferred that will give the result of the given task.
         The task is polled until it completes."""
-        d = defer.Deferred()
-        reactor.callLater(0, self._poll_task, task, d)
-        return d
 
-    @utils.deferredToThread
-    def _poll_task(self, task, deferred):
+        done = event.Event()
+        loop = utils.LoopingCall(self._poll_task, task, done)
+        loop.start(FLAGS.xenapi_task_poll_interval, now=True)
+        rv = done.wait()
+        loop.stop()
+        return rv
+
+    def _poll_task(self, task, done):
         """Poll the given XenAPI task, and fire the given Deferred if we
         get a result."""
         try:
             #logging.debug('Polling task %s...', task)
             status = self._session.xenapi.task.get_status(task)
             if status == 'pending':
-                reactor.callLater(FLAGS.xenapi_task_poll_interval,
-                                  self._poll_task, task, deferred)
+                return
             elif status == 'success':
                 result = self._session.xenapi.task.get_result(task)
                 logging.info(_('Task %s status: success. %s'), task, result)
-                deferred.callback(_parse_xmlrpc_value(result))
+                done.send(_parse_xmlrpc_value(result))
             else:
                 error_info = self._session.xenapi.task.get_error_info(task)
                 logging.warn(_('Task %s status: %s. %s'), task, status,
                              error_info)
-                deferred.errback(XenAPI.Failure(error_info))
-            #logging.debug('Polling task %s done.', task)
+                done.send_exception(XenAPI.Failure(error_info))
+                #logging.debug('Polling task %s done.', task)
         except XenAPI.Failure, exc:
             logging.warn(exc)
-            deferred.errback(exc)
+            done.send_exception(*sys.exc_info())
 
 
 def _unwrap_plugin_exceptions(func, *args, **kwargs):
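Blocking C-library and XML-RPC calls in the diff above (libvirt's `nwfilterDefineXML`, the XenAPI session methods) are pushed onto native threads with `eventlet.tpool` so they cannot stall the hub. Below is a minimal, stand-alone sketch of that pattern, not the code from this commit; it assumes `conn` is an already-open libvirt connection object, which is outside the scope of the diff.

```python
# Minimal sketch of the tpool pattern used above for blocking driver calls.
# Assumes `conn` is an open libvirt connection; obtaining it is not shown.
from eventlet import tpool


def define_filter(conn, xml):
    # nwfilterDefineXML is a blocking libvirt call; running it through
    # tpool.execute moves it to a native OS thread, so only the calling
    # greenthread waits while the rest of the service keeps running.
    return tpool.execute(conn.nwfilterDefineXML, xml)
```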
