path: root/nova/virt
author     Justin Santa Barbara <justinsb@justinsb-desktop>  2010-10-14 12:59:36 -0700
committer  Justin Santa Barbara <justinsb@justinsb-desktop>  2010-10-14 12:59:36 -0700
commit     d8643f1e15f241db96893d1ea41083a2bee65dbd (patch)
tree       12e9e85733306f97b12b99339edbe49ef4031418 /nova/virt
parent     759bab6059ef2e4c463a73e12fe85fe4b147eba7 (diff)
parent     3363b133a927509432cb42d77abf18d3d5248abf (diff)
download   nova-d8643f1e15f241db96893d1ea41083a2bee65dbd.tar.gz
           nova-d8643f1e15f241db96893d1ea41083a2bee65dbd.tar.xz
           nova-d8643f1e15f241db96893d1ea41083a2bee65dbd.zip
Merged with trunk, fixed broken stuff
Diffstat (limited to 'nova/virt')
-rw-r--r--   nova/virt/connection.py                  6
-rw-r--r--   nova/virt/fake.py                       38
-rw-r--r--   nova/virt/images.py                      1
-rw-r--r--   nova/virt/interfaces.template            1
-rw-r--r--   nova/virt/libvirt.qemu.xml.template      5
-rw-r--r--   nova/virt/libvirt.uml.xml.template      27
-rw-r--r--   nova/virt/libvirt.xen.xml.template      35
-rw-r--r--   nova/virt/libvirt_conn.py              492
-rw-r--r--   nova/virt/xenapi.py                    241
9 files changed, 671 insertions(+), 175 deletions(-)
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 90bc7fa0a..34e37adf7 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -17,6 +17,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+"""Abstraction of the underlying virtualization API"""
+
+import logging
+import sys
+
from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
@@ -35,7 +40,6 @@ def get_connection(read_only=False):
Any object returned here must conform to the interface documented by
FakeConnection.
"""
-
# TODO(termie): maybe lazy load after initial check for permissions
# TODO(termie): check whether we can be disconnected
t = FLAGS.connection_type
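A minimal usage sketch of the driver selection above (not part of this diff; the 'fake' value and the list_instances() call are assumptions based on the FakeConnection interface):

    from nova import flags
    from nova.virt import connection

    FLAGS = flags.FLAGS
    FLAGS.connection_type = 'fake'   # one of: libvirt, xenapi, fake

    # get_connection() returns an object conforming to the interface
    # documented by FakeConnection.
    conn = connection.get_connection(read_only=True)
    print conn.list_instances()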
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index 155833f3f..dc6112f20 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -39,12 +39,12 @@ class FakeConnection(object):
The interface to this class talks in terms of 'instances' (Amazon EC2 and
internal Nova terminology), by which we mean 'running virtual machine'
(XenAPI terminology) or domain (Xen or libvirt terminology).
-
+
An instance has an ID, which is the identifier chosen by Nova to represent
the instance further up the stack. This is unfortunately also called a
'name' elsewhere. As far as this layer is concerned, 'instance ID' and
'instance name' are synonyms.
-
+
Note that the instance ID or name is not human-readable or
customer-controlled -- it's an internal ID chosen by Nova. At the
nova.virt layer, instances do not have human-readable names at all -- such
@@ -101,7 +101,7 @@ class FakeConnection(object):
cleaned up, and the virtualization platform should be in the state
that it was before this call began.
"""
-
+
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
@@ -132,7 +132,15 @@ class FakeConnection(object):
del self.instances[instance.name]
return defer.succeed(None)
- def get_info(self, instance_id):
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ """Attach the disk at device_path to the instance at mountpoint"""
+ return True
+
+ def detach_volume(self, instance_name, mountpoint):
+ """Detach the disk attached to the instance at mountpoint"""
+ return True
+
+ def get_info(self, instance_name):
"""
Get a block of information about the given instance. This is returned
as a dictionary containing 'state': The power_state of the instance,
@@ -141,42 +149,42 @@ class FakeConnection(object):
of virtual CPUs the instance has, 'cpu_time': The total CPU time used
by the instance, in nanoseconds.
"""
- i = self.instances[instance_id]
+ i = self.instances[instance_name]
return {'state': i._state,
'max_mem': 0,
'mem': 0,
'num_cpu': 2,
'cpu_time': 0}
- def list_disks(self, instance_id):
+ def list_disks(self, instance_name):
"""
Return the IDs of all the virtual disks attached to the specified
instance, as a list. These IDs are opaque to the caller (they are
only useful for giving back to this layer as a parameter to
disk_stats). These IDs only need to be unique for a given instance.
-
+
Note that this function takes an instance ID, not a
compute.service.Instance, so that it can be called by compute.monitor.
"""
return ['A_DISK']
- def list_interfaces(self, instance_id):
+ def list_interfaces(self, instance_name):
"""
Return the IDs of all the virtual network interfaces attached to the
specified instance, as a list. These IDs are opaque to the caller
(they are only useful for giving back to this layer as a parameter to
interface_stats). These IDs only need to be unique for a given
instance.
-
+
Note that this function takes an instance ID, not a
compute.service.Instance, so that it can be called by compute.monitor.
"""
return ['A_VIF']
- def block_stats(self, instance_id, disk_id):
+ def block_stats(self, instance_name, disk_id):
"""
Return performance counters associated with the given disk_id on the
- given instance_id. These are returned as [rd_req, rd_bytes, wr_req,
+ given instance_name. These are returned as [rd_req, rd_bytes, wr_req,
wr_bytes, errs], where rd indicates read, wr indicates write, req is
the total number of I/O requests made, bytes is the total number of
bytes transferred, and errs is the number of requests held up due to a
@@ -188,13 +196,13 @@ class FakeConnection(object):
statistics can be retrieved directly in aggregate form, without Nova
having to do the aggregation. On those platforms, this method is
unused.
-
+
Note that this function takes an instance ID, not a
compute.service.Instance, so that it can be called by compute.monitor.
"""
return [0L, 0L, 0L, 0L, None]
- def interface_stats(self, instance_id, iface_id):
+ def interface_stats(self, instance_name, iface_id):
"""
Return performance counters associated with the given iface_id on the
given instance_id. These are returned as [rx_bytes, rx_packets,
@@ -209,12 +217,14 @@ class FakeConnection(object):
statistics can be retrieved directly in aggregate form, without Nova
having to do the aggregation. On those platforms, this method is
unused.
-
+
Note that this function takes an instance ID, not a
compute.service.Instance, so that it can be called by compute.monitor.
"""
return [0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L]
+ def get_console_output(self, instance):
+ return 'FAKE CONSOLE OUTPUT'
class FakeInstance(object):
def __init__(self):
diff --git a/nova/virt/images.py b/nova/virt/images.py
index a60bcc4c1..dc50764d9 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -29,6 +29,7 @@ from nova import flags
from nova import process
from nova.auth import manager
from nova.auth import signer
+from nova.objectstore import image
FLAGS = flags.FLAGS
diff --git a/nova/virt/interfaces.template b/nova/virt/interfaces.template
index 11df301f6..87b92b84a 100644
--- a/nova/virt/interfaces.template
+++ b/nova/virt/interfaces.template
@@ -10,7 +10,6 @@ auto eth0
iface eth0 inet static
address %(address)s
netmask %(netmask)s
- network %(network)s
broadcast %(broadcast)s
gateway %(gateway)s
dns-nameservers %(dns)s
diff --git a/nova/virt/libvirt.qemu.xml.template b/nova/virt/libvirt.qemu.xml.template
index 8a7c64f88..739eceaaa 100644
--- a/nova/virt/libvirt.qemu.xml.template
+++ b/nova/virt/libvirt.qemu.xml.template
@@ -24,11 +24,14 @@
<source bridge='${bridge_name}'/>
<mac address='${mac_address}'/>
<!-- <model type='virtio'/> CANT RUN virtio network right now -->
+ <filterref filter="nova-instance-%(name)s">
+ <parameter name="IP" value="%(ip_address)s" />
+ <parameter name="DHCPSERVER" value="%(dhcp_server)s" />
+ </filterref>
</interface>
<serial type="file">
<source path='${basepath}/console.log'/>
<target port='1'/>
</serial>
</devices>
- <nova>${nova}</nova>
</domain>
diff --git a/nova/virt/libvirt.uml.xml.template b/nova/virt/libvirt.uml.xml.template
index 6f4290f98..f6e5fad69 100644
--- a/nova/virt/libvirt.uml.xml.template
+++ b/nova/virt/libvirt.uml.xml.template
@@ -1,25 +1,26 @@
-<domain type='%(type)s'>
- <name>%(name)s</name>
- <memory>%(memory_kb)s</memory>
+<domain type='${type}'>
+ <name>${name}</name>
+ <memory>${memory_kb}</memory>
<os>
- <type>%(type)s</type>
+ <type>${type}</type>
<kernel>/usr/bin/linux</kernel>
<root>/dev/ubda1</root>
</os>
<devices>
<disk type='file'>
- <source file='%(basepath)s/disk'/>
+ <source file='${disk}'/>
<target dev='ubd0' bus='uml'/>
</disk>
<interface type='bridge'>
- <source bridge='%(bridge_name)s'/>
- <mac address='%(mac_address)s'/>
+ <source bridge='${bridge_name}'/>
+ <mac address='${mac_address}'/>
+ <filterref filter="nova-instance-${name}">
+ <parameter name="IP" value="${ip_address}" />
+ <parameter name="DHCPSERVER" value="${dhcp_server}" />
+ </filterref>
</interface>
- <console type="pty" />
- <serial type="file">
- <source path='%(basepath)s/console.log'/>
- <target port='1'/>
- </serial>
+ <console type="file">
+ <source path='${basepath}/console.log'/>
+ </console>
</devices>
- <nova>%(nova)s</nova>
</domain>
diff --git a/nova/virt/libvirt.xen.xml.template b/nova/virt/libvirt.xen.xml.template
new file mode 100644
index 000000000..9508ad3b7
--- /dev/null
+++ b/nova/virt/libvirt.xen.xml.template
@@ -0,0 +1,35 @@
+<domain type='${type}'>
+ <name>${name}</name>
+ <os>
+ <type>linux</type>
+#if $getVar('kernel', None)
+ <kernel>${kernel}</kernel>
+ #if $getVar('ramdisk', None)
+ <initrd>${ramdisk}</initrd>
+ #end if
+ <cmdline>root=/dev/vda1 console=ttyS0</cmdline>
+#end if
+ <root>/dev/xvda1</root>
+ <cmdline>ro</cmdline>
+ </os>
+ <features>
+ <acpi/>
+ </features>
+ <memory>${memory_kb}</memory>
+ <vcpu>${vcpus}</vcpu>
+ <devices>
+ <disk type='file'>
+ <source file='${disk}'/>
+ <target dev='sda' />
+ </disk>
+ <interface type='bridge'>
+ <source bridge='${bridge_name}'/>
+ <mac address='${mac_address}'/>
+ </interface>
+ <console type="file">
+ <source path='${basepath}/console.log'/>
+ <target port='1'/>
+ </console>
+ </devices>
+</domain>
+
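The templates above are Cheetah templates (${var}, #if $getVar(...)), rendered by to_xml() in libvirt_conn.py below via Template(..., searchList=[xml_info]). A standalone rendering sketch with made-up values (paths and addresses here are illustrative only):

    from Cheetah.Template import Template

    template_source = open('nova/virt/libvirt.xen.xml.template').read()
    xml_info = {'type': 'xen',
                'name': 'instance-00000001',
                'basepath': '/var/lib/nova/instances/instance-00000001',
                'memory_kb': 2048 * 1024,
                'vcpus': 2,
                'bridge_name': 'br100',
                'mac_address': '02:16:3e:00:00:01',
                'disk': '/var/lib/nova/instances/instance-00000001/disk',
                'kernel': '/var/lib/nova/instances/instance-00000001/kernel',
                'ramdisk': '/var/lib/nova/instances/instance-00000001/ramdisk'}
    libvirt_xml = str(Template(template_source, searchList=[xml_info]))
    print libvirt_xml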
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 7f7c4a131..ba5d6dbac 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -21,18 +21,21 @@
A connection to a hypervisor (e.g. KVM) through libvirt.
"""
-import json
import logging
-import os.path
+import os
import shutil
+import IPy
from twisted.internet import defer
from twisted.internet import task
+from twisted.internet import threads
+from nova import db
from nova import exception
from nova import flags
from nova import process
from nova import utils
+#from nova.api import context
from nova.auth import manager
from nova.compute import disk
from nova.compute import instance_types
@@ -49,6 +52,9 @@ FLAGS = flags.FLAGS
flags.DEFINE_string('libvirt_xml_template',
utils.abspath('virt/libvirt.qemu.xml.template'),
'Libvirt XML Template for QEmu/KVM')
+flags.DEFINE_string('libvirt_xen_xml_template',
+ utils.abspath('virt/libvirt.xen.xml.template'),
+ 'Libvirt XML Template for Xen')
flags.DEFINE_string('libvirt_uml_xml_template',
utils.abspath('virt/libvirt.uml.xml.template'),
'Libvirt XML Template for user-mode-linux')
@@ -57,11 +63,14 @@ flags.DEFINE_string('injected_network_template',
'Template file for injected network')
flags.DEFINE_string('libvirt_type',
'kvm',
- 'Libvirt domain type (valid options are: kvm, qemu, uml)')
+ 'Libvirt domain type (valid options are: kvm, qemu, uml, xen)')
flags.DEFINE_string('libvirt_uri',
'',
'Override the default libvirt URI (which is dependent'
' on libvirt_type)')
+flags.DEFINE_bool('allow_project_net_traffic',
+ True,
+ 'Whether to allow in project network traffic')
def get_connection(read_only):
# These are loaded late so that there's no need to install these
@@ -85,14 +94,29 @@ class LibvirtConnection(object):
@property
def _conn(self):
- if not self._wrapped_conn:
+ if not self._wrapped_conn or not self._test_connection():
+ logging.debug('Connecting to libvirt: %s' % self.libvirt_uri)
self._wrapped_conn = self._connect(self.libvirt_uri, self.read_only)
return self._wrapped_conn
+ def _test_connection(self):
+ try:
+ self._wrapped_conn.getInfo()
+ return True
+ except libvirt.libvirtError as e:
+ if e.get_error_code() == libvirt.VIR_ERR_SYSTEM_ERROR and \
+ e.get_error_domain() == libvirt.VIR_FROM_REMOTE:
+ logging.debug('Connection to libvirt broke')
+ return False
+ raise
+
def get_uri_and_template(self):
if FLAGS.libvirt_type == 'uml':
uri = FLAGS.libvirt_uri or 'uml:///system'
template_file = FLAGS.libvirt_uml_xml_template
+ elif FLAGS.libvirt_type == 'xen':
+ uri = FLAGS.libvirt_uri or 'xen:///'
+ template_file = FLAGS.libvirt_xen_xml_template
else:
uri = FLAGS.libvirt_uri or 'qemu:///system'
template_file = FLAGS.libvirt_xml_template
@@ -113,26 +137,29 @@ class LibvirtConnection(object):
def destroy(self, instance):
try:
- virt_dom = self._conn.lookupByName(instance.name)
+ virt_dom = self._conn.lookupByName(instance['name'])
virt_dom.destroy()
- except Exception, _err:
+ except Exception as _err:
pass
# If the instance is already terminated, we're still happy
d = defer.Deferred()
d.addCallback(lambda _: self._cleanup(instance))
# FIXME: What does this comment mean?
# TODO(termie): short-circuit me for tests
- # WE'LL save this for when we do shutdown,
+ # WE'LL save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
timer = task.LoopingCall(f=None)
def _wait_for_shutdown():
try:
- instance.update_state()
- if instance.state == power_state.SHUTDOWN:
+ state = self.get_info(instance['name'])['state']
+ db.instance_set_state(None, instance['id'], state)
+ if state == power_state.SHUTDOWN:
timer.stop()
d.callback(None)
except Exception:
- instance.set_state(power_state.SHUTDOWN)
+ db.instance_set_state(None,
+ instance['id'],
+ power_state.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_shutdown
@@ -140,30 +167,51 @@ class LibvirtConnection(object):
return d
def _cleanup(self, instance):
- target = os.path.abspath(instance.datamodel['basepath'])
- logging.info("Deleting instance files at %s", target)
+ target = os.path.join(FLAGS.instances_path, instance['name'])
+ logging.info('instance %s: deleting instance files %s',
+ instance['name'], target)
if os.path.exists(target):
shutil.rmtree(target)
@defer.inlineCallbacks
@exception.wrap_exception
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ yield process.simple_execute("sudo virsh attach-disk %s %s %s" %
+ (instance_name,
+ device_path,
+ mountpoint.rpartition('/dev/')[2]))
+
+ @defer.inlineCallbacks
+ @exception.wrap_exception
+ def detach_volume(self, instance_name, mountpoint):
+ # NOTE(vish): despite the documentation, virsh detach-disk just
+ # wants the device name without the leading /dev/
+ yield process.simple_execute("sudo virsh detach-disk %s %s" %
+ (instance_name,
+ mountpoint.rpartition('/dev/')[2]))
+
+ @defer.inlineCallbacks
+ @exception.wrap_exception
def reboot(self, instance):
- xml = self.toXml(instance)
- yield self._conn.lookupByName(instance.name).destroy()
+ xml = self.to_xml(instance)
+ yield self._conn.lookupByName(instance['name']).destroy()
yield self._conn.createXML(xml, 0)
d = defer.Deferred()
timer = task.LoopingCall(f=None)
def _wait_for_reboot():
try:
- instance.update_state()
- if instance.is_running():
- logging.debug('rebooted instance %s' % instance.name)
+ state = self.get_info(instance['name'])['state']
+ db.instance_set_state(None, instance['id'], state)
+ if state == power_state.RUNNING:
+ logging.debug('instance %s: rebooted', instance['name'])
timer.stop()
d.callback(None)
except Exception, exn:
- logging.error('_wait_for_reboot failed: %s' % exn)
- instance.set_state(power_state.SHUTDOWN)
+ logging.error('_wait_for_reboot failed: %s', exn)
+ db.instance_set_state(None,
+ instance['id'],
+ power_state.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_reboot
@@ -173,38 +221,86 @@ class LibvirtConnection(object):
@defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
- xml = self.toXml(instance)
- instance.set_state(power_state.NOSTATE, 'launching')
+ xml = self.to_xml(instance)
+ db.instance_set_state(None,
+ instance['id'],
+ power_state.NOSTATE,
+ 'launching')
+ yield NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
yield self._create_image(instance, xml)
yield self._conn.createXML(xml, 0)
# TODO(termie): this should actually register
# a callback to check for successful boot
- logging.debug("Instance is running")
+ logging.debug("instance %s: is running", instance['name'])
local_d = defer.Deferred()
timer = task.LoopingCall(f=None)
def _wait_for_boot():
try:
- instance.update_state()
- if instance.is_running():
- logging.debug('booted instance %s' % instance.name)
+ state = self.get_info(instance['name'])['state']
+ db.instance_set_state(None, instance['id'], state)
+ if state == power_state.RUNNING:
+ logging.debug('instance %s: booted', instance['name'])
timer.stop()
local_d.callback(None)
- except Exception, exn:
- logging.error("_wait_for_boot exception %s" % exn)
- self.set_state(power_state.SHUTDOWN)
- logging.error('Failed to boot instance %s' % instance.name)
+ except:
+ logging.exception('instance %s: failed to boot',
+ instance['name'])
+ db.instance_set_state(None,
+ instance['id'],
+ power_state.SHUTDOWN)
timer.stop()
local_d.callback(None)
timer.f = _wait_for_boot
timer.start(interval=0.5, now=True)
yield local_d
+ def _flush_xen_console(self, virsh_output):
+ logging.info('virsh said: %r' % (virsh_output,))
+ virsh_output = virsh_output[0].strip()
+
+ if virsh_output.startswith('/dev/'):
+ logging.info('cool, it\'s a device')
+ d = process.simple_execute("sudo dd if=%s iflag=nonblock" % virsh_output, check_exit_code=False)
+ d.addCallback(lambda r:r[0])
+ return d
+ else:
+ return ''
+
+ def _append_to_file(self, data, fpath):
+ logging.info('data: %r, fpath: %r' % (data, fpath))
+ fp = open(fpath, 'a+')
+ fp.write(data)
+ return fpath
+
+ def _dump_file(self, fpath):
+ fp = open(fpath, 'r+')
+ contents = fp.read()
+ logging.info('Contents: %r' % (contents,))
+ return contents
+
+ @exception.wrap_exception
+ def get_console_output(self, instance):
+ console_log = os.path.join(FLAGS.instances_path, instance['internal_id'], 'console.log')
+ logging.info('console_log: %s' % console_log)
+ logging.info('FLAGS.libvirt_type: %s' % FLAGS.libvirt_type)
+ if FLAGS.libvirt_type == 'xen':
+ # Xen is special
+ d = process.simple_execute("virsh ttyconsole %s" % instance['name'])
+ d.addCallback(self._flush_xen_console)
+ d.addCallback(self._append_to_file, console_log)
+ else:
+ d = defer.succeed(console_log)
+ d.addCallback(self._dump_file)
+ return d
+
+
@defer.inlineCallbacks
- def _create_image(self, instance, libvirt_xml):
+ def _create_image(self, inst, libvirt_xml):
# syntactic nicety
- data = instance.datamodel
- basepath = lambda x='': self.basepath(instance, x)
+ basepath = lambda fname='': os.path.join(FLAGS.instances_path,
+ inst['name'],
+ fname)
# ensure directories exist and are writable
yield process.simple_execute('mkdir -p %s' % basepath())
@@ -213,75 +309,100 @@ class LibvirtConnection(object):
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
- logging.info('Creating image for: %s', data['instance_id'])
+ logging.info('instance %s: Creating image', inst['name'])
f = open(basepath('libvirt.xml'), 'w')
f.write(libvirt_xml)
f.close()
- user = manager.AuthManager().get_user(data['user_id'])
- project = manager.AuthManager().get_project(data['project_id'])
+ os.close(os.open(basepath('console.log'), os.O_CREAT | os.O_WRONLY, 0660))
+
+ user = manager.AuthManager().get_user(inst['user_id'])
+ project = manager.AuthManager().get_project(inst['project_id'])
+
if not os.path.exists(basepath('disk')):
- yield images.fetch(data['image_id'], basepath('disk-raw'), user, project)
-
- using_kernel = data['kernel_id'] and True
+ yield images.fetch(inst.image_id, basepath('disk-raw'), user, project)
+ using_kernel = inst.kernel_id and True
if using_kernel:
if not os.path.exists(basepath('kernel')):
- yield images.fetch(data['kernel_id'], basepath('kernel'), user, project)
+ yield images.fetch(inst.kernel_id, basepath('kernel'), user, project)
if not os.path.exists(basepath('ramdisk')):
- yield images.fetch(data['ramdisk_id'], basepath('ramdisk'), user, project)
+ yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user, project)
execute = lambda cmd, process_input=None: \
process.simple_execute(cmd=cmd,
process_input=process_input,
check_exit_code=True)
- # For now, we assume that if we're not using a kernel, we're using a partitioned disk image
- # where the target partition is the first partition
+ # For now, we assume that if we're not using a kernel, we're using a
+ # partitioned disk image where the target partition is the first
+ # partition
target_partition = None
if not using_kernel:
target_partition = "1"
- key = data['key_data']
+ key = str(inst['key_data'])
net = None
- if data.get('inject_network', False):
+ network_ref = db.network_get_by_instance(None, inst['id'])
+ if network_ref['injected']:
+ address = db.instance_get_fixed_address(None, inst['id'])
with open(FLAGS.injected_network_template) as f:
- net = f.read() % {'address': data['private_dns_name'],
- 'network': data['network_network'],
- 'netmask': data['network_netmask'],
- 'gateway': data['network_gateway'],
- 'broadcast': data['network_broadcast'],
- 'dns': data['network_dns']}
+ net = f.read() % {'address': address,
+ 'netmask': network_ref['netmask'],
+ 'gateway': network_ref['gateway'],
+ 'broadcast': network_ref['broadcast'],
+ 'dns': network_ref['dns']}
if key or net:
- logging.info('Injecting data into image %s', data['image_id'])
+ if key:
+ logging.info('instance %s: injecting key into image %s',
+ inst['name'], inst.image_id)
+ if net:
+ logging.info('instance %s: injecting net into image %s',
+ inst['name'], inst.image_id)
try:
- yield disk.inject_data(basepath('disk-raw'), key=key, net=net, dns=dns, remove_network_udev=True, partition=target_partition, execute=execute)
+ yield disk.inject_data(basepath('disk-raw'), key, net,
+ partition=target_partition,
+ execute=execute)
except Exception as e:
# This could be a windows image, or a vmdk format disk
- logging.warn('Could not inject data; ignoring. (%s)' % e)
+ logging.warn('instance %s: ignoring error injecting data'
+ ' into image %s (%s)',
+ inst['name'], inst.image_id, e)
if using_kernel:
if os.path.exists(basepath('disk')):
yield process.simple_execute('rm -f %s' % basepath('disk'))
- bytes = (instance_types.INSTANCE_TYPES[data['instance_type']]['local_gb']
+ bytes = (instance_types.INSTANCE_TYPES[inst.instance_type]['local_gb']
* 1024 * 1024 * 1024)
yield disk.partition(
basepath('disk-raw'), basepath('disk'), bytes, execute=execute)
- def basepath(self, instance, path=''):
- return os.path.abspath(os.path.join(instance.datamodel['basepath'], path))
+ if FLAGS.libvirt_type == 'uml':
+ yield process.simple_execute('sudo chown root %s' %
+ basepath('disk'))
- def toXml(self, instance):
+ def to_xml(self, instance):
# TODO(termie): cache?
- logging.debug("Starting the toXML method")
- template_contents = open(FLAGS.libvirt_xml_template).read()
- xml_info = instance.datamodel.copy()
- # TODO(joshua): Make this xml express the attached disks as well
-
- # TODO(termie): lazy lazy hack because xml is annoying
- xml_info['nova'] = json.dumps(instance.datamodel.copy())
-
+ logging.debug('instance %s: starting toXML method', instance['name'])
+ network = db.project_get_network(None,
+ instance['project_id'])
+ # FIXME(vish): stick this in db
+ instance_type = instance_types.INSTANCE_TYPES[instance['instance_type']]
+ ip_address = db.instance_get_fixed_address({}, instance['id'])
+ # Assume that the gateway also acts as the dhcp server.
+ dhcp_server = network['gateway']
+ xml_info = {'type': FLAGS.libvirt_type,
+ 'name': instance['name'],
+ 'basepath': os.path.join(FLAGS.instances_path,
+ instance['name']),
+ 'memory_kb': instance_type['memory_mb'] * 1024,
+ 'vcpus': instance_type['vcpus'],
+ 'bridge_name': network['bridge'],
+ 'mac_address': instance['mac_address'],
+ 'ip_address': ip_address,
+ 'dhcp_server': dhcp_server }
+
if xml_info['kernel_id']:
xml_info['kernel'] = xml_info['basepath'] + "/kernel"
@@ -293,16 +414,13 @@ class LibvirtConnection(object):
else:
xml_info['disk'] = xml_info['basepath'] + "/disk-raw"
- xml_info['type'] = FLAGS.libvirt_type
-
- libvirt_xml = str(Template(template_contents, searchList=[ xml_info ] ))
-
- logging.debug("Finished the toXML method")
+ xml = str(Template(self.libvirt_xml, searchList=[ xml_info ] ))
+ logging.debug('instance %s: finished toXML method', instance['name'])
- return libvirt_xml
+ return xml
- def get_info(self, instance_id):
- virt_dom = self._conn.lookupByName(instance_id)
+ def get_info(self, instance_name):
+ virt_dom = self._conn.lookupByName(instance_name)
(state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info()
return {'state': state,
'max_mem': max_mem,
@@ -310,8 +428,14 @@ class LibvirtConnection(object):
'num_cpu': num_cpu,
'cpu_time': cpu_time}
- def get_disks(self, instance_id):
- domain = self._conn.lookupByName(instance_id)
+ def get_disks(self, instance_name):
+ """
+ Note that this function takes an instance name, not an Instance, so
+ that it can be called by monitor.
+
+ Returns a list of all block devices for this domain.
+ """
+ domain = self._conn.lookupByName(instance_name)
# TODO(devcamcar): Replace libxml2 with etree.
xml = domain.XMLDesc(0)
doc = None
@@ -346,8 +470,14 @@ class LibvirtConnection(object):
return disks
- def get_interfaces(self, instance_id):
- domain = self._conn.lookupByName(instance_id)
+ def get_interfaces(self, instance_name):
+ """
+ Note that this function takes an instance name, not an Instance, so
+ that it can be called by monitor.
+
+ Returns a list of all network interfaces for this instance.
+ """
+ domain = self._conn.lookupByName(instance_name)
# TODO(devcamcar): Replace libxml2 with etree.
xml = domain.XMLDesc(0)
doc = None
@@ -382,10 +512,210 @@ class LibvirtConnection(object):
return interfaces
- def block_stats(self, instance_id, disk):
- domain = self._conn.lookupByName(instance_id)
+ def block_stats(self, instance_name, disk):
+ """
+ Note that this function takes an instance name, not an Instance, so
+ that it can be called by monitor.
+ """
+ domain = self._conn.lookupByName(instance_name)
return domain.blockStats(disk)
- def interface_stats(self, instance_id, interface):
- domain = self._conn.lookupByName(instance_id)
+ def interface_stats(self, instance_name, interface):
+ """
+ Note that this function takes an instance name, not an Instance, so
+ that it can be called by monitor.
+ """
+ domain = self._conn.lookupByName(instance_name)
return domain.interfaceStats(interface)
+
+
+ def refresh_security_group(self, security_group_id):
+ fw = NWFilterFirewall(self._conn)
+ fw.ensure_security_group_filter(security_group_id)
+
+
+class NWFilterFirewall(object):
+ """
+ This class implements a network filtering mechanism versatile
+ enough for EC2 style Security Group filtering by leveraging
+ libvirt's nwfilter.
+
+ First, all instances get a filter ("nova-base-filter") applied.
+ This filter drops all incoming ipv4 and ipv6 connections.
+ Outgoing connections are never blocked.
+
+ Second, every security group maps to a nwfilter filter(*).
+ NWFilters can be updated at runtime and changes are applied
+ immediately, so changes to security groups can be applied at
+ runtime (as mandated by the spec).
+
+ Security group rules are named "nova-secgroup-<id>" where <id>
+ is the internal id of the security group. They're applied only on
+ hosts that have instances in the security group in question.
+
+ Updates to security groups are done by updating the data model
+ (in response to API calls) followed by a request sent to all
+ the nodes with instances in the security group to refresh the
+ security group.
+
+ Each instance has its own NWFilter, which references the above
+ mentioned security group NWFilters. This was done because
+ interfaces can only reference one filter while filters can
+ reference multiple other filters. This has the added benefit of
+ actually being able to add and remove security groups from an
+ instance at run time. This functionality is not exposed anywhere,
+ though.
+
+ Outstanding questions:
+
+ The name is unique, so would there be any good reason to sync
+ the uuid across the nodes (by assigning it from the datamodel)?
+
+
+ (*) This sentence brought to you by the redundancy department of
+ redundancy.
+ """
+
+ def __init__(self, get_connection):
+ self._conn = get_connection
+
+
+ nova_base_filter = '''<filter name='nova-base' chain='root'>
+ <uuid>26717364-50cf-42d1-8185-29bf893ab110</uuid>
+ <filterref filter='no-mac-spoofing'/>
+ <filterref filter='no-ip-spoofing'/>
+ <filterref filter='no-arp-spoofing'/>
+ <filterref filter='allow-dhcp-server'/>
+ <filterref filter='nova-allow-dhcp-server'/>
+ <filterref filter='nova-base-ipv4'/>
+ <filterref filter='nova-base-ipv6'/>
+ </filter>'''
+
+ nova_dhcp_filter = '''<filter name='nova-allow-dhcp-server' chain='ipv4'>
+ <uuid>891e4787-e5c0-d59b-cbd6-41bc3c6b36fc</uuid>
+ <rule action='accept' direction='out'
+ priority='100'>
+ <udp srcipaddr='0.0.0.0'
+ dstipaddr='255.255.255.255'
+ srcportstart='68'
+ dstportstart='67'/>
+ </rule>
+ <rule action='accept' direction='in' priority='100'>
+ <udp srcipaddr='$DHCPSERVER'
+ srcportstart='67'
+ dstportstart='68'/>
+ </rule>
+ </filter>'''
+
+ def nova_base_ipv4_filter(self):
+ retval = "<filter name='nova-base-ipv4' chain='ipv4'>"
+ for protocol in ['tcp', 'udp', 'icmp']:
+ for direction,action,priority in [('out','accept', 399),
+ ('inout','drop', 400)]:
+ retval += """<rule action='%s' direction='%s' priority='%d'>
+ <%s />
+ </rule>""" % (action, direction,
+ priority, protocol)
+ retval += '</filter>'
+ return retval
+
+
+ def nova_base_ipv6_filter(self):
+ retval = "<filter name='nova-base-ipv6' chain='ipv6'>"
+ for protocol in ['tcp', 'udp', 'icmp']:
+ for direction,action,priority in [('out','accept',399),
+ ('inout','drop',400)]:
+ retval += """<rule action='%s' direction='%s' priority='%d'>
+ <%s-ipv6 />
+ </rule>""" % (action, direction,
+ priority, protocol)
+ retval += '</filter>'
+ return retval
+
+
+ def nova_project_filter(self, project, net, mask):
+ retval = "<filter name='nova-project-%s' chain='ipv4'>" % project
+ for protocol in ['tcp', 'udp', 'icmp']:
+ retval += """<rule action='accept' direction='in' priority='200'>
+ <%s srcipaddr='%s' srcipmask='%s' />
+ </rule>""" % (protocol, net, mask)
+ retval += '</filter>'
+ return retval
+
+
+ def _define_filter(self, xml):
+ if callable(xml):
+ xml = xml()
+ d = threads.deferToThread(self._conn.nwfilterDefineXML, xml)
+ return d
+
+
+ @staticmethod
+ def _get_net_and_mask(cidr):
+ net = IPy.IP(cidr)
+ return str(net.net()), str(net.netmask())
+
+ @defer.inlineCallbacks
+ def setup_nwfilters_for_instance(self, instance):
+ """
+ Creates an NWFilter for the given instance. In the process,
+ it makes sure the filters for the security groups as well as
+ the base filter are all in place.
+ """
+
+ yield self._define_filter(self.nova_base_ipv4_filter)
+ yield self._define_filter(self.nova_base_ipv6_filter)
+ yield self._define_filter(self.nova_dhcp_filter)
+ yield self._define_filter(self.nova_base_filter)
+
+ nwfilter_xml = ("<filter name='nova-instance-%s' chain='root'>\n" +
+ " <filterref filter='nova-base' />\n"
+ ) % instance['name']
+
+ if FLAGS.allow_project_net_traffic:
+ network_ref = db.project_get_network({}, instance['project_id'])
+ net, mask = self._get_net_and_mask(network_ref['cidr'])
+ project_filter = self.nova_project_filter(instance['project_id'],
+ net, mask)
+ yield self._define_filter(project_filter)
+
+ nwfilter_xml += (" <filterref filter='nova-project-%s' />\n"
+ ) % instance['project_id']
+
+ for security_group in instance.security_groups:
+ yield self.ensure_security_group_filter(security_group['id'])
+
+ nwfilter_xml += (" <filterref filter='nova-secgroup-%d' />\n"
+ ) % security_group['id']
+ nwfilter_xml += "</filter>"
+
+ yield self._define_filter(nwfilter_xml)
+ return
+
+ def ensure_security_group_filter(self, security_group_id):
+ return self._define_filter(
+ self.security_group_to_nwfilter_xml(security_group_id))
+
+
+ def security_group_to_nwfilter_xml(self, security_group_id):
+ security_group = db.security_group_get({}, security_group_id)
+ rule_xml = ""
+ for rule in security_group.rules:
+ rule_xml += "<rule action='accept' direction='in' priority='300'>"
+ if rule.cidr:
+ net, mask = self._get_net_and_mask(rule.cidr)
+ rule_xml += "<%s srcipaddr='%s' srcipmask='%s' " % (rule.protocol, net, mask)
+ if rule.protocol in ['tcp', 'udp']:
+ rule_xml += "dstportstart='%s' dstportend='%s' " % \
+ (rule.from_port, rule.to_port)
+ elif rule.protocol == 'icmp':
+ logging.info('rule.protocol: %r, rule.from_port: %r, rule.to_port: %r' % (rule.protocol, rule.from_port, rule.to_port))
+ if rule.from_port != -1:
+ rule_xml += "type='%s' " % rule.from_port
+ if rule.to_port != -1:
+ rule_xml += "code='%s' " % rule.to_port
+
+ rule_xml += '/>\n'
+ rule_xml += "</rule>\n"
+ xml = '''<filter name='nova-secgroup-%s' chain='ipv4'>%s</filter>''' % (security_group_id, rule_xml,)
+ return xml
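To illustrate the NWFilterFirewall layering described above, this is roughly the per-instance filter XML that setup_nwfilters_for_instance() assembles for a hypothetical instance in project 'proj1' with one security group (id 42); the instance filter only references the base, project and security-group filters, which are defined separately:

    nwfilter_xml = ("<filter name='nova-instance-instance-00000001' chain='root'>\n"
                    "  <filterref filter='nova-base' />\n"
                    "  <filterref filter='nova-project-proj1' />\n"
                    "  <filterref filter='nova-secgroup-42' />\n"
                    "</filter>")
    # Defined through libvirt off the reactor thread, as in _define_filter():
    #   threads.deferToThread(conn.nwfilterDefineXML, nwfilter_xml)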
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
index 2f5994983..04e830b64 100644
--- a/nova/virt/xenapi.py
+++ b/nova/virt/xenapi.py
@@ -16,18 +16,38 @@
"""
A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import logging
import xmlrpclib
from twisted.internet import defer
+from twisted.internet import reactor
from twisted.internet import task
-from nova import exception
+from nova import db
from nova import flags
from nova import process
+from nova import utils
from nova.auth.manager import AuthManager
+from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import images
@@ -47,6 +67,11 @@ flags.DEFINE_string('xenapi_connection_password',
None,
'Password for connection to XenServer/Xen Cloud Platform.'
' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+ 0.5,
+ 'The interval used for polling of remote tasks '
+ '(Async.VM.start, etc). Used only if '
+ 'connection_type=xenapi.')
XENAPI_POWER_STATE = {
@@ -80,48 +105,46 @@ class XenAPIConnection(object):
self._conn.login_with_password(user, pw)
def list_instances(self):
- result = [self._conn.xenapi.VM.get_name_label(vm) \
- for vm in self._conn.xenapi.VM.get_all()]
+ return [self._conn.xenapi.VM.get_name_label(vm) \
+ for vm in self._conn.xenapi.VM.get_all()]
@defer.inlineCallbacks
- @exception.wrap_exception
def spawn(self, instance):
- vm = yield self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
- if 'bridge_name' in instance.datamodel:
- network_ref = \
- yield self._find_network_with_bridge(
- instance.datamodel['bridge_name'])
- else:
- network_ref = None
-
- if 'mac_address' in instance.datamodel:
- mac_address = instance.datamodel['mac_address']
- else:
- mac_address = ''
-
- user = AuthManager().get_user(instance.datamodel['user_id'])
- project = AuthManager().get_project(instance.datamodel['project_id'])
- vdi_uuid = yield self.fetch_image(
- instance.datamodel['image_id'], user, project, True)
- kernel = yield self.fetch_image(
- instance.datamodel['kernel_id'], user, project, False)
- ramdisk = yield self.fetch_image(
- instance.datamodel['ramdisk_id'], user, project, False)
- vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
-
- vm_ref = yield self.create_vm(instance, kernel, ramdisk)
- yield self.create_vbd(vm_ref, vdi_ref, 0, True)
+ network = db.project_get_network(None, instance.project_id)
+ network_ref = \
+ yield self._find_network_with_bridge(network.bridge)
+
+ user = AuthManager().get_user(instance.user_id)
+ project = AuthManager().get_project(instance.project_id)
+ vdi_uuid = yield self._fetch_image(
+ instance.image_id, user, project, True)
+ kernel = yield self._fetch_image(
+ instance.kernel_id, user, project, False)
+ ramdisk = yield self._fetch_image(
+ instance.ramdisk_id, user, project, False)
+ vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
+
+ vm_ref = yield self._create_vm(instance, kernel, ramdisk)
+ yield self._create_vbd(vm_ref, vdi_ref, 0, True)
if network_ref:
- yield self._create_vif(vm_ref, network_ref, mac_address)
- yield self._conn.xenapi.VM.start(vm_ref, False, False)
+ yield self._create_vif(vm_ref, network_ref, instance.mac_address)
+ logging.debug('Starting VM %s...', vm_ref)
+ yield self._call_xenapi('VM.start', vm_ref, False, False)
+ logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
- def create_vm(self, instance, kernel, ramdisk):
- mem = str(long(instance.datamodel['memory_kb']) * 1024)
- vcpus = str(instance.datamodel['vcpus'])
+ @defer.inlineCallbacks
+ def _create_vm(self, instance, kernel, ramdisk):
+ """Create a VM record. Returns a Deferred that gives the new
+ VM reference."""
+
+ instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
+ mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
+ vcpus = str(instance_type['vcpus'])
rec = {
'name_label': instance.name,
'name_description': '',
@@ -152,11 +175,15 @@ class XenAPIConnection(object):
'other_config': {},
}
logging.debug('Created VM %s...', instance.name)
- vm_ref = self._conn.xenapi.VM.create(rec)
+ vm_ref = yield self._call_xenapi('VM.create', rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- return vm_ref
+ defer.returnValue(vm_ref)
- def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+ @defer.inlineCallbacks
+ def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+ """Create a VBD record. Returns a Deferred that gives the new
+ VBD reference."""
+
vbd_rec = {}
vbd_rec['VM'] = vm_ref
vbd_rec['VDI'] = vdi_ref
@@ -171,12 +198,16 @@ class XenAPIConnection(object):
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
+ vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
- return vbd_ref
+ defer.returnValue(vbd_ref)
+ @defer.inlineCallbacks
def _create_vif(self, vm_ref, network_ref, mac_address):
+ """Create a VIF record. Returns a Deferred that gives the new
+ VIF reference."""
+
vif_rec = {}
vif_rec['device'] = '0'
vif_rec['network']= network_ref
@@ -188,25 +219,29 @@ class XenAPIConnection(object):
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
- vif_ref = self._conn.xenapi.VIF.create(vif_rec)
+ vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
- return vif_ref
+ defer.returnValue(vif_ref)
+ @defer.inlineCallbacks
def _find_network_with_bridge(self, bridge):
expr = 'field "bridge" = "%s"' % bridge
- networks = self._conn.xenapi.network.get_all_records_where(expr)
+ networks = yield self._call_xenapi('network.get_all_records_where',
+ expr)
if len(networks) == 1:
- return networks.keys()[0]
+ defer.returnValue(networks.keys()[0])
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:
raise Exception('Found no network for bridge %s' % bridge)
- def fetch_image(self, image, user, project, use_sr):
+ @defer.inlineCallbacks
+ def _fetch_image(self, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
- its kernel and ramdisk (if external kernels are being used)."""
+ its kernel and ramdisk (if external kernels are being used).
+ Returns a Deferred that gives the new VDI UUID."""
url = images.image_url(image)
access = AuthManager().get_access_key(user, project)
@@ -218,22 +253,38 @@ class XenAPIConnection(object):
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
- return self._call_plugin('objectstore', fn, args)
+ task = yield self._async_call_plugin('objectstore', fn, args)
+ uuid = yield self._wait_for_task(task)
+ defer.returnValue(uuid)
+ @defer.inlineCallbacks
def reboot(self, instance):
- vm = self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
- yield self._conn.xenapi.VM.clean_reboot(vm)
+ task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
+ yield self._wait_for_task(task)
+ @defer.inlineCallbacks
def destroy(self, instance):
- vm = self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is None:
- raise Exception('instance not present %s' % instance.name)
- yield self._conn.xenapi.VM.destroy(vm)
+ # Don't complain, just return. This lets us clean up instances
+ # that have already disappeared from the underlying platform.
+ defer.returnValue(None)
+ try:
+ task = yield self._call_xenapi('Async.VM.hard_shutdown', vm)
+ yield self._wait_for_task(task)
+ except Exception, exc:
+ logging.warn(exc)
+ try:
+ task = yield self._call_xenapi('Async.VM.destroy', vm)
+ yield self._wait_for_task(task)
+ except Exception, exc:
+ logging.warn(exc)
def get_info(self, instance_id):
- vm = self.lookup(instance_id)
+ vm = self._lookup_blocking(instance_id)
if vm is None:
raise Exception('instance not present %s' % instance_id)
rec = self._conn.xenapi.VM.get_record(vm)
@@ -243,7 +294,14 @@ class XenAPIConnection(object):
'num_cpu': rec['VCPUs_max'],
'cpu_time': 0}
- def lookup(self, i):
+ def get_console_output(self, instance):
+ return 'FAKE CONSOLE OUTPUT'
+
+ @utils.deferredToThread
+ def _lookup(self, i):
+ return self._lookup_blocking(i)
+
+ def _lookup_blocking(self, i):
vms = self._conn.xenapi.VM.get_by_name_label(i)
n = len(vms)
if n == 0:
@@ -253,9 +311,52 @@ class XenAPIConnection(object):
else:
return vms[0]
- def _call_plugin(self, plugin, fn, args):
+ def _wait_for_task(self, task):
+ """Return a Deferred that will give the result of the given task.
+ The task is polled until it completes."""
+ d = defer.Deferred()
+ reactor.callLater(0, self._poll_task, task, d)
+ return d
+
+ @utils.deferredToThread
+ def _poll_task(self, task, deferred):
+ """Poll the given XenAPI task, and fire the given Deferred if we
+ get a result."""
+ try:
+ #logging.debug('Polling task %s...', task)
+ status = self._conn.xenapi.task.get_status(task)
+ if status == 'pending':
+ reactor.callLater(FLAGS.xenapi_task_poll_interval,
+ self._poll_task, task, deferred)
+ elif status == 'success':
+ result = self._conn.xenapi.task.get_result(task)
+ logging.info('Task %s status: success. %s', task, result)
+ deferred.callback(_parse_xmlrpc_value(result))
+ else:
+ error_info = self._conn.xenapi.task.get_error_info(task)
+ logging.warn('Task %s status: %s. %s', task, status,
+ error_info)
+ deferred.errback(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
+ except Exception, exc:
+ logging.warn(exc)
+ deferred.errback(exc)
+
+ @utils.deferredToThread
+ def _call_xenapi(self, method, *args):
+ """Call the specified XenAPI method on a background thread. Returns
+ a Deferred for the result."""
+ f = self._conn.xenapi
+ for m in method.split('.'):
+ f = f.__getattr__(m)
+ return f(*args)
+
+ @utils.deferredToThread
+ def _async_call_plugin(self, plugin, fn, args):
+ """Call Async.host.call_plugin on a background thread. Returns a
+ Deferred with the task reference."""
return _unwrap_plugin_exceptions(
- self._conn.xenapi.host.call_plugin,
+ self._conn.xenapi.Async.host.call_plugin,
self._get_xenapi_host(), plugin, fn, args)
def _get_xenapi_host(self):
@@ -265,19 +366,31 @@ class XenAPIConnection(object):
def _unwrap_plugin_exceptions(func, *args, **kwargs):
try:
return func(*args, **kwargs)
- except XenAPI.Failure, exn:
- logging.debug("Got exception: %s", exn)
- if (len(exn.details) == 4 and
- exn.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
- exn.details[2] == 'Failure'):
+ except XenAPI.Failure, exc:
+ logging.debug("Got exception: %s", exc)
+ if (len(exc.details) == 4 and
+ exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
+ exc.details[2] == 'Failure'):
params = None
try:
- params = eval(exn.details[3])
+ params = eval(exc.details[3])
except:
- raise exn
+ raise exc
raise XenAPI.Failure(params)
else:
raise
- except xmlrpclib.ProtocolError, exn:
- logging.debug("Got exception: %s", exn)
+ except xmlrpclib.ProtocolError, exc:
+ logging.debug("Got exception: %s", exc)
raise
+
+
+def _parse_xmlrpc_value(val):
+ """Parse the given value as if it were an XML-RPC value. This is
+ sometimes used as the format for the task.result field."""
+ if not val:
+ return val
+ x = xmlrpclib.loads(
+ '<?xml version="1.0"?><methodResponse><params><param>' +
+ val +
+ '</param></params></methodResponse>')
+ return x[0][0]
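A stripped-down sketch of the polling model described in the xenapi.py docstring: the async XenAPI call returns a task, which is polled from a background thread until it leaves the 'pending' state, and the result is delivered through a Deferred. The names here (wait_for_task, session, POLL_INTERVAL) are illustrative, not part of this commit:

    from twisted.internet import defer, reactor, threads

    POLL_INTERVAL = 0.5  # mirrors FLAGS.xenapi_task_poll_interval

    def wait_for_task(session, task):
        """Return a Deferred that fires with the task result on completion."""
        d = defer.Deferred()

        def check_in_thread():
            # Blocking XenAPI calls stay off the reactor thread.
            status = session.xenapi.task.get_status(task)
            if status == 'pending':
                return ('pending', None)
            elif status == 'success':
                return ('success', session.xenapi.task.get_result(task))
            return ('failure', session.xenapi.task.get_error_info(task))

        def handle(result):
            status, value = result
            if status == 'pending':
                reactor.callLater(POLL_INTERVAL, poll)
            elif status == 'success':
                d.callback(value)
            else:
                d.errback(Exception(value))

        def poll():
            threads.deferToThread(check_in_thread).addCallback(handle)

        reactor.callLater(0, poll)
        return d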