summaryrefslogtreecommitdiffstats
path: root/nova/virt
diff options
context:
space:
mode:
authorjaypipes@gmail.com <>2010-12-17 11:10:11 -0500
committerjaypipes@gmail.com <>2010-12-17 11:10:11 -0500
commitafae367d63c6d38faa5d9be54725df29c2a5b903 (patch)
treee2282e9bbc6ef6d4c5f339a4fde38127a619f354 /nova/virt
parentd283922defdda6ede5fa2e09656cd8d411a90096 (diff)
parentcd460a1f661eea7e050891f50a8218fdf24f2c6f (diff)
downloadnova-afae367d63c6d38faa5d9be54725df29c2a5b903.tar.gz
nova-afae367d63c6d38faa5d9be54725df29c2a5b903.tar.xz
nova-afae367d63c6d38faa5d9be54725df29c2a5b903.zip
Merge eventlet and resolve all conflicts
Diffstat (limited to 'nova/virt')
-rw-r--r--nova/virt/fake.py10
-rw-r--r--nova/virt/images.py11
-rw-r--r--nova/virt/libvirt_conn.py177
-rw-r--r--nova/virt/xenapi/network_utils.py10
-rw-r--r--nova/virt/xenapi/vm_utils.py28
-rw-r--r--nova/virt/xenapi/vmops.py67
-rw-r--r--nova/virt/xenapi_conn.py45
7 files changed, 153 insertions, 195 deletions
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index 26b01af91..b7bb0d7d7 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -25,8 +25,6 @@ semantics of real hypervisor connections.
"""
-from twisted.internet import defer
-
from nova import exception
from nova.compute import power_state
@@ -107,7 +105,6 @@ class FakeConnection(object):
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
- return defer.succeed(None)
def reboot(self, instance):
"""
@@ -119,19 +116,19 @@ class FakeConnection(object):
The work will be done asynchronously. This function returns a
Deferred that allows the caller to detect when it is complete.
"""
- return defer.succeed(None)
+ pass
def rescue(self, instance):
"""
Rescue the specified instance.
"""
- return defer.succeed(None)
+ pass
def unrescue(self, instance):
"""
Unrescue the specified instance.
"""
- return defer.succeed(None)
+ pass
def destroy(self, instance):
"""
@@ -144,7 +141,6 @@ class FakeConnection(object):
Deferred that allows the caller to detect when it is complete.
"""
del self.instances[instance.name]
- return defer.succeed(None)
def attach_volume(self, instance_name, device_path, mountpoint):
"""Attach the disk at device_path to the instance at mountpoint"""
diff --git a/nova/virt/images.py b/nova/virt/images.py
index 981aa5cf3..1c9b2e093 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -26,7 +26,7 @@ import time
import urlparse
from nova import flags
-from nova import process
+from nova import utils
from nova.auth import manager
from nova.auth import signer
from nova.objectstore import image
@@ -50,7 +50,7 @@ def _fetch_s3_image(image, path, user, project):
# This should probably move somewhere else, like e.g. a download_as
# method on User objects and at the same time get rewritten to use
- # twisted web client.
+ # a web client.
headers = {}
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
@@ -63,15 +63,16 @@ def _fetch_s3_image(image, path, user, project):
cmd = ['/usr/bin/curl', '--fail', '--silent', url]
for (k, v) in headers.iteritems():
- cmd += ['-H', '%s: %s' % (k, v)]
+ cmd += ['-H', '"%s: %s"' % (k, v)]
cmd += ['-o', path]
- return process.SharedPool().execute(executable=cmd[0], args=cmd[1:])
+ cmd_out = ' '.join(cmd)
+ return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
source = _image_path('%s/image' % image)
- return process.simple_execute('cp %s %s' % (source, path))
+ return utils.execute('cp %s %s' % (source, path))
def _image_path(path):
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index bc435f4b5..f485d078e 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -45,16 +45,15 @@ import logging
import os
import shutil
+from eventlet import event
+from eventlet import tpool
+
import IPy
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.internet import threads
from nova import context
from nova import db
from nova import exception
from nova import flags
-from nova import process
from nova import utils
#from nova.api import context
from nova.auth import manager
@@ -184,14 +183,12 @@ class LibvirtConnection(object):
except Exception as _err:
pass
# If the instance is already terminated, we're still happy
- d = defer.Deferred()
- if cleanup:
- d.addCallback(lambda _: self._cleanup(instance))
- # FIXME: What does this comment mean?
- # TODO(termie): short-circuit me for tests
- # WE'LL save this for when we do shutdown,
+
+ done = event.Event()
+
+ # We'll save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_shutdown():
try:
@@ -200,17 +197,26 @@ class LibvirtConnection(object):
instance['id'], state)
if state == power_state.SHUTDOWN:
timer.stop()
- d.callback(None)
except Exception:
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_shutdown
- timer.start(interval=0.5, now=True)
- return d
+ timer_done = timer.start(interval=0.5, now=True)
+
+ # NOTE(termie): this is strictly superfluous (we could put the
+ # cleanup code in the timer), but this emulates the
+ # previous model so I am keeping it around until
+ # everything has been vetted a bit
+ def _wait_for_timer():
+ timer_done.wait()
+ self._cleanup(instance)
+ done.send()
+
+ greenthread.spawn(_wait_for_timer)
+ return done
def _cleanup(self, instance):
target = os.path.join(FLAGS.instances_path, instance['name'])
@@ -219,7 +225,6 @@ class LibvirtConnection(object):
if os.path.exists(target):
shutil.rmtree(target)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, instance_name, device_path, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@@ -230,7 +235,6 @@ class LibvirtConnection(object):
<target dev='%s' bus='virtio'/>
</disk>""" % (device_path, mount_device)
virt_dom.attachDevice(xml)
- yield
def _get_disk_xml(self, xml, device):
"""Returns the xml for the disk mounted at device"""
@@ -252,7 +256,6 @@ class LibvirtConnection(object):
if doc != None:
doc.freeDoc()
- @defer.inlineCallbacks
@exception.wrap_exception
def detach_volume(self, instance_name, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@@ -261,17 +264,13 @@ class LibvirtConnection(object):
if not xml:
raise exception.NotFound(_("No disk at %s") % mount_device)
virt_dom.detachDevice(xml)
- yield
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot(self, instance):
- yield self.destroy(instance, False)
+ self.destroy(instance, False)
xml = self.to_xml(instance)
- yield self._conn.createXML(xml, 0)
-
- d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ self._conn.createXML(xml, 0)
+ timer = utils.LoopingCall(f=None)
def _wait_for_reboot():
try:
@@ -281,33 +280,28 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug(_('instance %s: rebooted'), instance['name'])
timer.stop()
- d.callback(None)
except Exception, exn:
logging.error(_('_wait_for_reboot failed: %s'), exn)
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_reboot
- timer.start(interval=0.5, now=True)
- yield d
+ return timer.start(interval=0.5, now=True)
- @defer.inlineCallbacks
@exception.wrap_exception
def rescue(self, instance):
- yield self.destroy(instance, False)
+ self.destroy(instance, False)
xml = self.to_xml(instance, rescue=True)
rescue_images = {'image_id': FLAGS.rescue_image_id,
'kernel_id': FLAGS.rescue_kernel_id,
'ramdisk_id': FLAGS.rescue_ramdisk_id}
- yield self._create_image(instance, xml, 'rescue-', rescue_images)
- yield self._conn.createXML(xml, 0)
+ self._create_image(instance, xml, 'rescue-', rescue_images)
+ self._conn.createXML(xml, 0)
- d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_rescue():
try:
@@ -316,27 +310,22 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug(_('instance %s: rescued'), instance['name'])
timer.stop()
- d.callback(None)
except Exception, exn:
logging.error(_('_wait_for_rescue failed: %s'), exn)
db.instance_set_state(None,
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_rescue
- timer.start(interval=0.5, now=True)
- yield d
+ return timer.start(interval=0.5, now=True)
- @defer.inlineCallbacks
@exception.wrap_exception
def unrescue(self, instance):
# NOTE(vish): Because reboot destroys and recreates an instance using
# the normal xml file, we can just call reboot here
- yield self.reboot(instance)
+ self.reboot(instance)
- @defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
xml = self.to_xml(instance)
@@ -344,14 +333,12 @@ class LibvirtConnection(object):
instance['id'],
power_state.NOSTATE,
'launching')
- yield NWFilterFirewall(self._conn).\
- setup_nwfilters_for_instance(instance)
- yield self._create_image(instance, xml)
- yield self._conn.createXML(xml, 0)
+ NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
+ self._create_image(instance, xml)
+ self._conn.createXML(xml, 0)
logging.debug(_("instance %s: is running"), instance['name'])
- local_d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_boot():
try:
@@ -361,7 +348,6 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug(_('instance %s: booted'), instance['name'])
timer.stop()
- local_d.callback(None)
except:
logging.exception(_('instance %s: failed to boot'),
instance['name'])
@@ -369,10 +355,9 @@ class LibvirtConnection(object):
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- local_d.callback(None)
+
timer.f = _wait_for_boot
- timer.start(interval=0.5, now=True)
- yield local_d
+ return timer.start(interval=0.5, now=True)
def _flush_xen_console(self, virsh_output):
logging.info('virsh said: %r' % (virsh_output,))
@@ -380,10 +365,9 @@ class LibvirtConnection(object):
if virsh_output.startswith('/dev/'):
logging.info(_('cool, it\'s a device'))
- d = process.simple_execute("sudo dd if=%s iflag=nonblock" %
- virsh_output, check_exit_code=False)
- d.addCallback(lambda r: r[0])
- return d
+ out, err = utils.execute("sudo dd if=%s iflag=nonblock" %
+ virsh_output, check_exit_code=False)
+ return out
else:
return ''
@@ -403,21 +387,20 @@ class LibvirtConnection(object):
def get_console_output(self, instance):
console_log = os.path.join(FLAGS.instances_path, instance['name'],
'console.log')
- d = process.simple_execute('sudo chown %d %s' % (os.getuid(),
- console_log))
+
+ utils.execute('sudo chown %d %s' % (os.getuid(), console_log))
+
if FLAGS.libvirt_type == 'xen':
- # Xen is spethial
- d.addCallback(lambda _:
- process.simple_execute("virsh ttyconsole %s" %
- instance['name']))
- d.addCallback(self._flush_xen_console)
- d.addCallback(self._append_to_file, console_log)
+ # Xen is special
+ virsh_output = utils.execute("virsh ttyconsole %s" %
+ instance['name'])
+ data = self._flush_xen_console(virsh_output)
+ fpath = self._append_to_file(data, console_log)
else:
- d.addCallback(lambda _: defer.succeed(console_log))
- d.addCallback(self._dump_file)
- return d
+ fpath = console_log
+
+ return self._dump_file(fpath)
- @defer.inlineCallbacks
def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None):
# syntactic nicety
basepath = lambda fname = '', prefix = prefix: os.path.join(
@@ -426,8 +409,8 @@ class LibvirtConnection(object):
prefix + fname)
# ensure directories exist and are writable
- yield process.simple_execute('mkdir -p %s' % basepath(prefix=''))
- yield process.simple_execute('chmod 0777 %s' % basepath(prefix=''))
+ utils.execute('mkdir -p %s' % basepath(prefix=''))
+ utils.execute('chmod 0777 %s' % basepath(prefix=''))
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
@@ -448,19 +431,19 @@ class LibvirtConnection(object):
'kernel_id': inst['kernel_id'],
'ramdisk_id': inst['ramdisk_id']}
if not os.path.exists(basepath('disk')):
- yield images.fetch(inst.image_id, basepath('disk-raw'), user,
- project)
+ images.fetch(inst.image_id, basepath('disk-raw'), user,
+ project)
if not os.path.exists(basepath('kernel')):
- yield images.fetch(inst.kernel_id, basepath('kernel'), user,
- project)
+ images.fetch(inst.kernel_id, basepath('kernel'), user,
+ project)
if not os.path.exists(basepath('ramdisk')):
- yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
- project)
+ images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
+ project)
- execute = lambda cmd, process_input = None, check_exit_code = True: \
- process.simple_execute(cmd=cmd,
- process_input=process_input,
- check_exit_code=check_exit_code)
+ def execute(cmd, process_input=None, check_exit_code=True):
+ return utils.execute(cmd=cmd,
+ process_input=process_input,
+ check_exit_code=check_exit_code)
key = str(inst['key_data'])
net = None
@@ -482,11 +465,11 @@ class LibvirtConnection(object):
if net:
logging.info(_('instance %s: injecting net into image %s'),
inst['name'], inst.image_id)
- yield disk.inject_data(basepath('disk-raw'), key, net,
- execute=execute)
+ disk.inject_data(basepath('disk-raw'), key, net,
+ execute=execute)
if os.path.exists(basepath('disk')):
- yield process.simple_execute('rm -f %s' % basepath('disk'))
+ utils.execute('rm -f %s' % basepath('disk'))
local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type]
['local_gb']
@@ -495,12 +478,11 @@ class LibvirtConnection(object):
resize = True
if inst['instance_type'] == 'm1.tiny' or prefix == 'rescue-':
resize = False
- yield disk.partition(basepath('disk-raw'), basepath('disk'),
- local_bytes, resize, execute=execute)
+ disk.partition(basepath('disk-raw'), basepath('disk'),
+ local_bytes, resize, execute=execute)
if FLAGS.libvirt_type == 'uml':
- yield process.simple_execute('sudo chown root %s' %
- basepath('disk'))
+ utils.execute('sudo chown root %s' % basepath('disk'))
def to_xml(self, instance, rescue=False):
# TODO(termie): cache?
@@ -761,15 +743,15 @@ class NWFilterFirewall(object):
def _define_filter(self, xml):
if callable(xml):
xml = xml()
- d = threads.deferToThread(self._conn.nwfilterDefineXML, xml)
- return d
+
+ # execute in a native thread and block current greenthread until done
+ tpool.execute(self._conn.nwfilterDefineXML, xml)
@staticmethod
def _get_net_and_mask(cidr):
net = IPy.IP(cidr)
return str(net.net()), str(net.netmask())
- @defer.inlineCallbacks
def setup_nwfilters_for_instance(self, instance):
"""
Creates an NWFilter for the given instance. In the process,
@@ -777,10 +759,10 @@ class NWFilterFirewall(object):
the base filter are all in place.
"""
- yield self._define_filter(self.nova_base_ipv4_filter)
- yield self._define_filter(self.nova_base_ipv6_filter)
- yield self._define_filter(self.nova_dhcp_filter)
- yield self._define_filter(self.nova_base_filter)
+ self._define_filter(self.nova_base_ipv4_filter)
+ self._define_filter(self.nova_base_ipv6_filter)
+ self._define_filter(self.nova_dhcp_filter)
+ self._define_filter(self.nova_base_filter)
nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \
" <filterref filter='nova-base' />\n" % \
@@ -792,20 +774,19 @@ class NWFilterFirewall(object):
net, mask = self._get_net_and_mask(network_ref['cidr'])
project_filter = self.nova_project_filter(instance['project_id'],
net, mask)
- yield self._define_filter(project_filter)
+ self._define_filter(project_filter)
nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \
instance['project_id']
for security_group in instance.security_groups:
- yield self.ensure_security_group_filter(security_group['id'])
+ self.ensure_security_group_filter(security_group['id'])
nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \
security_group['id']
nwfilter_xml += "</filter>"
- yield self._define_filter(nwfilter_xml)
- return
+ self._define_filter(nwfilter_xml)
def ensure_security_group_filter(self, security_group_id):
return self._define_filter(
diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py
index 8cb4cce3a..012954394 100644
--- a/nova/virt/xenapi/network_utils.py
+++ b/nova/virt/xenapi/network_utils.py
@@ -20,8 +20,6 @@ records and their attributes like bridges, PIFs, QoS, as well as
their lookup functions.
"""
-from twisted.internet import defer
-
class NetworkHelper():
"""
@@ -31,14 +29,12 @@ class NetworkHelper():
return
@classmethod
- @defer.inlineCallbacks
def find_network_with_bridge(cls, session, bridge):
- """ Return the network on which the bridge is attached, if found """
+ """ Return the network on which the bridge is attached, if found."""
expr = 'field "bridge" = "%s"' % bridge
- networks = yield session.call_xenapi('network.get_all_records_where',
- expr)
+ networks = session.call_xenapi('network.get_all_records_where', expr)
if len(networks) == 1:
- defer.returnValue(networks.keys()[0])
+ return networks.keys()[0]
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index 77edb576e..2f5d78e75 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -21,18 +21,16 @@ their attributes like VDIs, VIFs, as well as their lookup functions.
import logging
import urllib
-
-from twisted.internet import defer
from xml.dom import minidom
from nova import flags
from nova import utils
-
from nova.auth.manager import AuthManager
from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import images
+
FLAGS = flags.FLAGS
XENAPI_POWER_STATE = {
@@ -64,7 +62,6 @@ class VMHelper():
XenAPI = __import__('XenAPI')
@classmethod
- @defer.inlineCallbacks
def create_vm(cls, session, instance, kernel, ramdisk):
"""Create a VM record. Returns a Deferred that gives the new
VM reference."""
@@ -102,12 +99,11 @@ class VMHelper():
'other_config': {},
}
logging.debug('Created VM %s...', instance.name)
- vm_ref = yield session.call_xenapi('VM.create', rec)
+ vm_ref = session.call_xenapi('VM.create', rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- defer.returnValue(vm_ref)
+ return vm_ref
@classmethod
- @defer.inlineCallbacks
def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
"""Create a VBD record. Returns a Deferred that gives the new
VBD reference."""
@@ -126,13 +122,12 @@ class VMHelper():
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec)
+ vbd_ref = session.call_xenapi('VBD.create', vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
- defer.returnValue(vbd_ref)
+ return vbd_ref
@classmethod
- @defer.inlineCallbacks
def create_vif(cls, session, vm_ref, network_ref, mac_address):
"""Create a VIF record. Returns a Deferred that gives the new
VIF reference."""
@@ -148,13 +143,12 @@ class VMHelper():
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
- vif_ref = yield session.call_xenapi('VIF.create', vif_rec)
+ vif_ref = session.call_xenapi('VIF.create', vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
- defer.returnValue(vif_ref)
+ return vif_ref
@classmethod
- @defer.inlineCallbacks
def fetch_image(cls, session, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
@@ -171,12 +165,11 @@ class VMHelper():
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
- task = yield session.async_call_plugin('objectstore', fn, args)
- uuid = yield session.wait_for_task(task)
- defer.returnValue(uuid)
+ task = session.async_call_plugin('objectstore', fn, args)
+ uuid = session.wait_for_task(task)
+ return uuid
@classmethod
- @utils.deferredToThread
def lookup(cls, session, i):
""" Look the instance i up, and returns it if available """
return VMHelper.lookup_blocking(session, i)
@@ -194,7 +187,6 @@ class VMHelper():
return vms[0]
@classmethod
- @utils.deferredToThread
def lookup_vm_vdis(cls, session, vm):
""" Look for the VDIs that are attached to the VM """
return VMHelper.lookup_vm_vdis_blocking(session, vm)
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 9bfd07267..3034df9e1 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -20,8 +20,6 @@ Management class for VM-related functions (spawn, reboot, etc).
import logging
-from twisted.internet import defer
-
from nova import db
from nova import context
@@ -49,10 +47,9 @@ class VMOps(object):
return [self._session.get_xenapi().VM.get_name_label(vm) \
for vm in self._session.get_xenapi().VM.get_all()]
- @defer.inlineCallbacks
def spawn(self, instance):
""" Create VM instance """
- vm = yield VMHelper.lookup(self._session, instance.name)
+ vm = VMHelper.lookup(self._session, instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
@@ -60,66 +57,63 @@ class VMOps(object):
bridge = db.project_get_network(context.get_admin_context(),
instance.project_id).bridge
network_ref = \
- yield NetworkHelper.find_network_with_bridge(self._session, bridge)
+ NetworkHelper.find_network_with_bridge(self._session, bridge)
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
- vdi_uuid = yield VMHelper.fetch_image(self._session,
- instance.image_id, user, project, True)
- kernel = yield VMHelper.fetch_image(self._session,
- instance.kernel_id, user, project, False)
- ramdisk = yield VMHelper.fetch_image(self._session,
- instance.ramdisk_id, user, project, False)
- vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
- vm_ref = yield VMHelper.create_vm(self._session,
- instance, kernel, ramdisk)
- yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
+ vdi_uuid = VMHelper.fetch_image(
+ self._session, instance.image_id, user, project, True)
+ kernel = VMHelper.fetch_image(
+ self._session, instance.kernel_id, user, project, False)
+ ramdisk = VMHelper.fetch_image(
+ self._session, instance.ramdisk_id, user, project, False)
+ vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
+ vm_ref = VMHelper.create_vm(
+ self._session, instance, kernel, ramdisk)
+ VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
if network_ref:
- yield VMHelper.create_vif(self._session, vm_ref,
- network_ref, instance.mac_address)
+ VMHelper.create_vif(self._session, vm_ref,
+ network_ref, instance.mac_address)
logging.debug('Starting VM %s...', vm_ref)
- yield self._session.call_xenapi('VM.start', vm_ref, False, False)
+ self._session.call_xenapi('VM.start', vm_ref, False, False)
logging.info('Spawning VM %s created %s.', instance.name,
vm_ref)
- @defer.inlineCallbacks
def reboot(self, instance):
""" Reboot VM instance """
instance_name = instance.name
- vm = yield VMHelper.lookup(self._session, instance_name)
+ vm = VMHelper.lookup(self._session, instance_name)
if vm is None:
raise Exception('instance not present %s' % instance_name)
- task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm)
- yield self._session.wait_for_task(task)
+ task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
+ self._session.wait_for_task(task)
- @defer.inlineCallbacks
def destroy(self, instance):
""" Destroy VM instance """
- vm = yield VMHelper.lookup(self._session, instance.name)
+ vm = VMHelper.lookup(self._session, instance.name)
if vm is None:
# Don't complain, just return. This lets us clean up instances
# that have already disappeared from the underlying platform.
- defer.returnValue(None)
+ return
# Get the VDIs related to the VM
- vdis = yield VMHelper.lookup_vm_vdis(self._session, vm)
+ vdis = VMHelper.lookup_vm_vdis(self._session, vm)
try:
- task = yield self._session.call_xenapi('Async.VM.hard_shutdown',
+ task = self._session.call_xenapi('Async.VM.hard_shutdown',
vm)
- yield self._session.wait_for_task(task)
+ self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
# Disk clean-up
if vdis:
for vdi in vdis:
try:
- task = yield self._session.call_xenapi('Async.VDI.destroy',
- vdi)
- yield self._session.wait_for_task(task)
+ task = self._session.call_xenapi('Async.VDI.destroy', vdi)
+ self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
try:
- task = yield self._session.call_xenapi('Async.VM.destroy', vm)
- yield self._session.wait_for_task(task)
+ task = self._session.call_xenapi('Async.VM.destroy', vm)
+ self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
@@ -131,14 +125,13 @@ class VMOps(object):
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_info(rec)
- @defer.inlineCallbacks
def get_diagnostics(self, instance_id):
"""Return data about VM diagnostics"""
- vm = yield VMHelper.lookup(self._session, instance_id)
+ vm = VMHelper.lookup(self._session, instance_id)
if vm is None:
raise Exception("instance not present %s" % instance_id)
- rec = yield self._session.get_xenapi().VM.get_record(vm)
- defer.returnValue(VMHelper.compile_diagnostics(self._session, rec))
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ return VMHelper.compile_diagnostics(self._session, rec)
def get_console_output(self, instance):
""" Return snapshot of console """
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index c8c451dfe..55aadb989 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -48,10 +48,11 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import logging
+import sys
import xmlrpclib
-from twisted.internet import defer
-from twisted.internet import reactor
+from eventlet import event
+from eventlet import tpool
from nova import utils
from nova import flags
@@ -159,53 +160,51 @@ class XenAPISession(object):
""" Return the xenapi host """
return self._session.xenapi.session.get_this_host(self._session.handle)
- @utils.deferredToThread
def call_xenapi(self, method, *args):
- """Call the specified XenAPI method on a background thread. Returns
- a Deferred for the result."""
+ """Call the specified XenAPI method on a background thread."""
f = self._session.xenapi
for m in method.split('.'):
f = f.__getattr__(m)
- return f(*args)
+ return tpool.execute(f, *args)
- @utils.deferredToThread
def async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread. Returns a
- Deferred with the task reference."""
- return _unwrap_plugin_exceptions(
- self._session.xenapi.Async.host.call_plugin,
- self.get_xenapi_host(), plugin, fn, args)
+ """Call Async.host.call_plugin on a background thread."""
+ return tpool.execute(_unwrap_plugin_exceptions,
+ self._session.xenapi.Async.host.call_plugin,
+ self.get_xenapi_host(), plugin, fn, args)
def wait_for_task(self, task):
"""Return a Deferred that will give the result of the given task.
The task is polled until it completes."""
- d = defer.Deferred()
- reactor.callLater(0, self._poll_task, task, d)
- return d
- @utils.deferredToThread
- def _poll_task(self, task, deferred):
+ done = event.Event()
+ loop = utils.LoopingCall(self._poll_task, task, done)
+ loop.start(FLAGS.xenapi_task_poll_interval, now=True)
+ rv = done.wait()
+ loop.stop()
+ return rv
+
+ def _poll_task(self, task, done):
"""Poll the given XenAPI task, and fire the given Deferred if we
get a result."""
try:
#logging.debug('Polling task %s...', task)
status = self._session.xenapi.task.get_status(task)
if status == 'pending':
- reactor.callLater(FLAGS.xenapi_task_poll_interval,
- self._poll_task, task, deferred)
+ return
elif status == 'success':
result = self._session.xenapi.task.get_result(task)
logging.info(_('Task %s status: success. %s'), task, result)
- deferred.callback(_parse_xmlrpc_value(result))
+ done.send(_parse_xmlrpc_value(result))
else:
error_info = self._session.xenapi.task.get_error_info(task)
logging.warn(_('Task %s status: %s. %s'), task, status,
error_info)
- deferred.errback(XenAPI.Failure(error_info))
- #logging.debug('Polling task %s done.', task)
+ done.send_exception(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
except XenAPI.Failure, exc:
logging.warn(exc)
- deferred.errback(exc)
+ done.send_exception(*sys.exc_info())
def _unwrap_plugin_exceptions(func, *args, **kwargs):