summaryrefslogtreecommitdiffstats
path: root/nova/virt
diff options
context:
space:
mode:
authorSoren Hansen <soren.hansen@rackspace.com>2010-12-14 18:11:30 +0100
committerSoren Hansen <soren.hansen@rackspace.com>2010-12-14 18:11:30 +0100
commit31c1407f7fe39b5a366e75b3131769efbfb50b57 (patch)
tree44a5e436b947934676cd1e3b7f1d0b1f28a062a8 /nova/virt
parentbfe019e0de486eea09e4702262cd228791a4694c (diff)
parent911f11139926a772a5d90b8ec65cc4cb7ad850e9 (diff)
downloadnova-31c1407f7fe39b5a366e75b3131769efbfb50b57.tar.gz
nova-31c1407f7fe39b5a366e75b3131769efbfb50b57.tar.xz
nova-31c1407f7fe39b5a366e75b3131769efbfb50b57.zip
Merge trunk
Diffstat (limited to 'nova/virt')
-rw-r--r--nova/virt/connection.py4
-rw-r--r--nova/virt/libvirt_conn.py18
-rw-r--r--nova/virt/xenapi.py444
-rw-r--r--nova/virt/xenapi/__init__.py15
-rw-r--r--nova/virt/xenapi/network_utils.py45
-rw-r--r--nova/virt/xenapi/vm_utils.py268
-rw-r--r--nova/virt/xenapi/vmops.py146
-rw-r--r--nova/virt/xenapi/volumeops.py32
-rw-r--r--nova/virt/xenapi_conn.py242
9 files changed, 759 insertions, 455 deletions
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 11f0fa8ce..c40bb4bb4 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -25,7 +25,7 @@ import sys
from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
-from nova.virt import xenapi
+from nova.virt import xenapi_conn
FLAGS = flags.FLAGS
@@ -61,7 +61,7 @@ def get_connection(read_only=False):
elif t == 'libvirt':
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
- conn = xenapi.get_connection(read_only)
+ conn = xenapi_conn.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 2865c18ac..5dcb05b1f 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -421,14 +421,14 @@ class LibvirtConnection(object):
yield images.fetch(inst.image_id, basepath('disk-raw'), user,
project)
- using_kernel = inst.kernel_id
- if using_kernel:
+ if inst.kernel_id:
if not os.path.exists(basepath('kernel')):
- yield images.fetch(inst.kernel_id, basepath('kernel'), user,
- project)
- if not os.path.exists(basepath('ramdisk')):
- yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
- project)
+ yield images.fetch(inst.kernel_id, basepath('kernel'),
+ user, project)
+ if inst.ramdisk_id:
+ if not os.path.exists(basepath('ramdisk')):
+ yield images.fetch(inst.ramdisk_id, basepath('ramdisk'),
+ user, project)
execute = lambda cmd, process_input = None, check_exit_code = True: \
process.simple_execute(cmd=cmd,
@@ -439,7 +439,7 @@ class LibvirtConnection(object):
# partitioned disk image where the target partition is the first
# partition
target_partition = None
- if not using_kernel:
+ if not inst.kernel_id:
target_partition = "1"
key = str(inst['key_data'])
@@ -472,7 +472,7 @@ class LibvirtConnection(object):
' into image %s (%s)',
inst['name'], inst.image_id, e)
- if using_kernel:
+ if inst.kernel_id:
if os.path.exists(basepath('disk')):
yield process.simple_execute('rm -f %s' % basepath('disk'))
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
deleted file mode 100644
index 3169562a5..000000000
--- a/nova/virt/xenapi.py
+++ /dev/null
@@ -1,444 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (c) 2010 Citrix Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-A connection to XenServer or Xen Cloud Platform.
-
-The concurrency model for this class is as follows:
-
-All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
-deferredToThread). They are remote calls, and so may hang for the usual
-reasons. They should not be allowed to block the reactor thread.
-
-All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion. Polling is handled using reactor.callLater.
-
-This combination of techniques means that we don't block the reactor thread at
-all, and at the same time we don't hold lots of threads waiting for
-long-running operations.
-
-FIXME: get_info currently doesn't conform to these rules, and will block the
-reactor thread if the VM.get_by_name_label or VM.get_record calls block.
-
-**Related Flags**
-
-:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
-:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
- Platform (default: root).
-:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
- Platform.
-:xenapi_task_poll_interval: The interval (seconds) used for polling of
- remote tasks (Async.VM.start, etc)
- (default: 0.5).
-
-"""
-
-import logging
-import xmlrpclib
-
-from twisted.internet import defer
-from twisted.internet import reactor
-from twisted.internet import task
-
-from nova import db
-from nova import flags
-from nova import process
-from nova import utils
-from nova.auth.manager import AuthManager
-from nova.compute import instance_types
-from nova.compute import power_state
-from nova.virt import images
-
-XenAPI = None
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string('xenapi_connection_url',
- None,
- 'URL for connection to XenServer/Xen Cloud Platform.'
- ' Required if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_username',
- 'root',
- 'Username for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_password',
- None,
- 'Password for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_float('xenapi_task_poll_interval',
- 0.5,
- 'The interval used for polling of remote tasks '
- '(Async.VM.start, etc). Used only if '
- 'connection_type=xenapi.')
-
-
-XENAPI_POWER_STATE = {
- 'Halted': power_state.SHUTDOWN,
- 'Running': power_state.RUNNING,
- 'Paused': power_state.PAUSED,
- 'Suspended': power_state.SHUTDOWN, # FIXME
- 'Crashed': power_state.CRASHED}
-
-
-def get_connection(_):
- """Note that XenAPI doesn't have a read-only connection mode, so
- the read_only parameter is ignored."""
- # This is loaded late so that there's no need to install this
- # library when not using XenAPI.
- global XenAPI
- if XenAPI is None:
- XenAPI = __import__('XenAPI')
- url = FLAGS.xenapi_connection_url
- username = FLAGS.xenapi_connection_username
- password = FLAGS.xenapi_connection_password
- if not url or password is None:
- raise Exception('Must specify xenapi_connection_url, '
- 'xenapi_connection_username (optionally), and '
- 'xenapi_connection_password to use '
- 'connection_type=xenapi')
- return XenAPIConnection(url, username, password)
-
-
-class XenAPIConnection(object):
- def __init__(self, url, user, pw):
- self._conn = XenAPI.Session(url)
- self._conn.login_with_password(user, pw)
-
- def list_instances(self):
- return [self._conn.xenapi.VM.get_name_label(vm) \
- for vm in self._conn.xenapi.VM.get_all()]
-
- @defer.inlineCallbacks
- def spawn(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is not None:
- raise Exception('Attempted to create non-unique name %s' %
- instance.name)
-
- network = db.project_get_network(None, instance.project_id)
- network_ref = \
- yield self._find_network_with_bridge(network.bridge)
-
- user = AuthManager().get_user(instance.user_id)
- project = AuthManager().get_project(instance.project_id)
- vdi_uuid = yield self._fetch_image(
- instance.image_id, user, project, True)
- kernel = yield self._fetch_image(
- instance.kernel_id, user, project, False)
- ramdisk = yield self._fetch_image(
- instance.ramdisk_id, user, project, False)
- vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
-
- vm_ref = yield self._create_vm(instance, kernel, ramdisk)
- yield self._create_vbd(vm_ref, vdi_ref, 0, True)
- if network_ref:
- yield self._create_vif(vm_ref, network_ref, instance.mac_address)
- logging.debug('Starting VM %s...', vm_ref)
- yield self._call_xenapi('VM.start', vm_ref, False, False)
- logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
-
- @defer.inlineCallbacks
- def _create_vm(self, instance, kernel, ramdisk):
- """Create a VM record. Returns a Deferred that gives the new
- VM reference."""
-
- instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
- mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
- vcpus = str(instance_type['vcpus'])
- rec = {
- 'name_label': instance.name,
- 'name_description': '',
- 'is_a_template': False,
- 'memory_static_min': '0',
- 'memory_static_max': mem,
- 'memory_dynamic_min': mem,
- 'memory_dynamic_max': mem,
- 'VCPUs_at_startup': vcpus,
- 'VCPUs_max': vcpus,
- 'VCPUs_params': {},
- 'actions_after_shutdown': 'destroy',
- 'actions_after_reboot': 'restart',
- 'actions_after_crash': 'destroy',
- 'PV_bootloader': '',
- 'PV_kernel': kernel,
- 'PV_ramdisk': ramdisk,
- 'PV_args': 'root=/dev/xvda1',
- 'PV_bootloader_args': '',
- 'PV_legacy_args': '',
- 'HVM_boot_policy': '',
- 'HVM_boot_params': {},
- 'platform': {},
- 'PCI_bus': '',
- 'recommendations': '',
- 'affinity': '',
- 'user_version': '0',
- 'other_config': {},
- }
- logging.debug('Created VM %s...', instance.name)
- vm_ref = yield self._call_xenapi('VM.create', rec)
- logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- defer.returnValue(vm_ref)
-
- @defer.inlineCallbacks
- def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
- """Create a VBD record. Returns a Deferred that gives the new
- VBD reference."""
-
- vbd_rec = {}
- vbd_rec['VM'] = vm_ref
- vbd_rec['VDI'] = vdi_ref
- vbd_rec['userdevice'] = str(userdevice)
- vbd_rec['bootable'] = bootable
- vbd_rec['mode'] = 'RW'
- vbd_rec['type'] = 'disk'
- vbd_rec['unpluggable'] = True
- vbd_rec['empty'] = False
- vbd_rec['other_config'] = {}
- vbd_rec['qos_algorithm_type'] = ''
- vbd_rec['qos_algorithm_params'] = {}
- vbd_rec['qos_supported_algorithms'] = []
- logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
- logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
- vdi_ref)
- defer.returnValue(vbd_ref)
-
- @defer.inlineCallbacks
- def _create_vif(self, vm_ref, network_ref, mac_address):
- """Create a VIF record. Returns a Deferred that gives the new
- VIF reference."""
-
- vif_rec = {}
- vif_rec['device'] = '0'
- vif_rec['network'] = network_ref
- vif_rec['VM'] = vm_ref
- vif_rec['MAC'] = mac_address
- vif_rec['MTU'] = '1500'
- vif_rec['other_config'] = {}
- vif_rec['qos_algorithm_type'] = ''
- vif_rec['qos_algorithm_params'] = {}
- logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
- network_ref)
- vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
- logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
- vm_ref, network_ref)
- defer.returnValue(vif_ref)
-
- @defer.inlineCallbacks
- def _find_network_with_bridge(self, bridge):
- expr = 'field "bridge" = "%s"' % bridge
- networks = yield self._call_xenapi('network.get_all_records_where',
- expr)
- if len(networks) == 1:
- defer.returnValue(networks.keys()[0])
- elif len(networks) > 1:
- raise Exception('Found non-unique network for bridge %s' % bridge)
- else:
- raise Exception('Found no network for bridge %s' % bridge)
-
- @defer.inlineCallbacks
- def _fetch_image(self, image, user, project, use_sr):
- """use_sr: True to put the image as a VDI in an SR, False to place
- it on dom0's filesystem. The former is for VM disks, the latter for
- its kernel and ramdisk (if external kernels are being used).
- Returns a Deferred that gives the new VDI UUID."""
-
- url = images.image_url(image)
- access = AuthManager().get_access_key(user, project)
- logging.debug("Asking xapi to fetch %s as %s" % (url, access))
- fn = use_sr and 'get_vdi' or 'get_kernel'
- args = {}
- args['src_url'] = url
- args['username'] = access
- args['password'] = user.secret
- if use_sr:
- args['add_partition'] = 'true'
- task = yield self._async_call_plugin('objectstore', fn, args)
- uuid = yield self._wait_for_task(task)
- defer.returnValue(uuid)
-
- @defer.inlineCallbacks
- def reboot(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- raise Exception('instance not present %s' % instance.name)
- task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
- yield self._wait_for_task(task)
-
- @defer.inlineCallbacks
- def destroy(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- # Don't complain, just return. This lets us clean up instances
- # that have already disappeared from the underlying platform.
- defer.returnValue(None)
- # Get the VDIs related to the VM
- vdis = yield self._lookup_vm_vdis(vm)
- try:
- task = yield self._call_xenapi('Async.VM.hard_shutdown', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- # Disk clean-up
- if vdis:
- for vdi in vdis:
- try:
- task = yield self._call_xenapi('Async.VDI.destroy', vdi)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- try:
- task = yield self._call_xenapi('Async.VM.destroy', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
-
- def get_info(self, instance_id):
- vm = self._lookup_blocking(instance_id)
- if vm is None:
- raise Exception('instance not present %s' % instance_id)
- rec = self._conn.xenapi.VM.get_record(vm)
- return {'state': XENAPI_POWER_STATE[rec['power_state']],
- 'max_mem': long(rec['memory_static_max']) >> 10,
- 'mem': long(rec['memory_dynamic_max']) >> 10,
- 'num_cpu': rec['VCPUs_max'],
- 'cpu_time': 0}
-
- def get_console_output(self, instance):
- return 'FAKE CONSOLE OUTPUT'
-
- @utils.deferredToThread
- def _lookup(self, i):
- return self._lookup_blocking(i)
-
- def _lookup_blocking(self, i):
- vms = self._conn.xenapi.VM.get_by_name_label(i)
- n = len(vms)
- if n == 0:
- return None
- elif n > 1:
- raise Exception('duplicate name found: %s' % i)
- else:
- return vms[0]
-
- @utils.deferredToThread
- def _lookup_vm_vdis(self, vm):
- return self._lookup_vm_vdis_blocking(vm)
-
- def _lookup_vm_vdis_blocking(self, vm):
- # Firstly we get the VBDs, then the VDIs.
- # TODO: do we leave the read-only devices?
- vbds = self._conn.xenapi.VM.get_VBDs(vm)
- vdis = []
- if vbds:
- for vbd in vbds:
- try:
- vdi = self._conn.xenapi.VBD.get_VDI(vbd)
- # Test valid VDI
- record = self._conn.xenapi.VDI.get_record(vdi)
- except Exception, exc:
- logging.warn(exc)
- else:
- vdis.append(vdi)
- if len(vdis) > 0:
- return vdis
- else:
- return None
-
- def _wait_for_task(self, task):
- """Return a Deferred that will give the result of the given task.
- The task is polled until it completes."""
- d = defer.Deferred()
- reactor.callLater(0, self._poll_task, task, d)
- return d
-
- @utils.deferredToThread
- def _poll_task(self, task, deferred):
- """Poll the given XenAPI task, and fire the given Deferred if we
- get a result."""
- try:
- #logging.debug('Polling task %s...', task)
- status = self._conn.xenapi.task.get_status(task)
- if status == 'pending':
- reactor.callLater(FLAGS.xenapi_task_poll_interval,
- self._poll_task, task, deferred)
- elif status == 'success':
- result = self._conn.xenapi.task.get_result(task)
- logging.info('Task %s status: success. %s', task, result)
- deferred.callback(_parse_xmlrpc_value(result))
- else:
- error_info = self._conn.xenapi.task.get_error_info(task)
- logging.warn('Task %s status: %s. %s', task, status,
- error_info)
- deferred.errback(XenAPI.Failure(error_info))
- #logging.debug('Polling task %s done.', task)
- except Exception, exc:
- logging.warn(exc)
- deferred.errback(exc)
-
- @utils.deferredToThread
- def _call_xenapi(self, method, *args):
- """Call the specified XenAPI method on a background thread. Returns
- a Deferred for the result."""
- f = self._conn.xenapi
- for m in method.split('.'):
- f = f.__getattr__(m)
- return f(*args)
-
- @utils.deferredToThread
- def _async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread. Returns a
- Deferred with the task reference."""
- return _unwrap_plugin_exceptions(
- self._conn.xenapi.Async.host.call_plugin,
- self._get_xenapi_host(), plugin, fn, args)
-
- def _get_xenapi_host(self):
- return self._conn.xenapi.session.get_this_host(self._conn.handle)
-
-
-def _unwrap_plugin_exceptions(func, *args, **kwargs):
- try:
- return func(*args, **kwargs)
- except XenAPI.Failure, exc:
- logging.debug("Got exception: %s", exc)
- if (len(exc.details) == 4 and
- exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
- exc.details[2] == 'Failure'):
- params = None
- try:
- params = eval(exc.details[3])
- except:
- raise exc
- raise XenAPI.Failure(params)
- else:
- raise
- except xmlrpclib.ProtocolError, exc:
- logging.debug("Got exception: %s", exc)
- raise
-
-
-def _parse_xmlrpc_value(val):
- """Parse the given value as if it were an XML-RPC value. This is
- sometimes used as the format for the task.result field."""
- if not val:
- return val
- x = xmlrpclib.loads(
- '<?xml version="1.0"?><methodResponse><params><param>' +
- val +
- '</param></params></methodResponse>')
- return x[0][0]
diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py
new file mode 100644
index 000000000..3d598c463
--- /dev/null
+++ b/nova/virt/xenapi/__init__.py
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py
new file mode 100644
index 000000000..8cb4cce3a
--- /dev/null
+++ b/nova/virt/xenapi/network_utils.py
@@ -0,0 +1,45 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of network
+records and their attributes like bridges, PIFs, QoS, as well as
+their lookup functions.
+"""
+
+from twisted.internet import defer
+
+
+class NetworkHelper():
+ """
+ The class that wraps the helper methods together.
+ """
+ def __init__(self):
+ return
+
+ @classmethod
+ @defer.inlineCallbacks
+ def find_network_with_bridge(cls, session, bridge):
+ """ Return the network on which the bridge is attached, if found """
+ expr = 'field "bridge" = "%s"' % bridge
+ networks = yield session.call_xenapi('network.get_all_records_where',
+ expr)
+ if len(networks) == 1:
+ defer.returnValue(networks.keys()[0])
+ elif len(networks) > 1:
+ raise Exception('Found non-unique network for bridge %s' % bridge)
+ else:
+ raise Exception('Found no network for bridge %s' % bridge)
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
new file mode 100644
index 000000000..77edb576e
--- /dev/null
+++ b/nova/virt/xenapi/vm_utils.py
@@ -0,0 +1,268 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of VM records and
+their attributes like VDIs, VIFs, as well as their lookup functions.
+"""
+
+import logging
+import urllib
+
+from twisted.internet import defer
+from xml.dom import minidom
+
+from nova import flags
+from nova import utils
+
+from nova.auth.manager import AuthManager
+from nova.compute import instance_types
+from nova.compute import power_state
+from nova.virt import images
+
+FLAGS = flags.FLAGS
+
+XENAPI_POWER_STATE = {
+ 'Halted': power_state.SHUTDOWN,
+ 'Running': power_state.RUNNING,
+ 'Paused': power_state.PAUSED,
+ 'Suspended': power_state.SHUTDOWN, # FIXME
+ 'Crashed': power_state.CRASHED}
+
+XenAPI = None
+
+
+class VMHelper():
+ """
+ The class that wraps the helper methods together.
+ """
+ def __init__(self):
+ return
+
+ @classmethod
+ def late_import(cls):
+ """
+ Load the XenAPI module in for helper class, if required.
+ This is to avoid to install the XenAPI library when other
+ hypervisors are used
+ """
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vm(cls, session, instance, kernel, ramdisk):
+ """Create a VM record. Returns a Deferred that gives the new
+ VM reference."""
+
+ instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
+ mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
+ vcpus = str(instance_type['vcpus'])
+ rec = {
+ 'name_label': instance.name,
+ 'name_description': '',
+ 'is_a_template': False,
+ 'memory_static_min': '0',
+ 'memory_static_max': mem,
+ 'memory_dynamic_min': mem,
+ 'memory_dynamic_max': mem,
+ 'VCPUs_at_startup': vcpus,
+ 'VCPUs_max': vcpus,
+ 'VCPUs_params': {},
+ 'actions_after_shutdown': 'destroy',
+ 'actions_after_reboot': 'restart',
+ 'actions_after_crash': 'destroy',
+ 'PV_bootloader': '',
+ 'PV_kernel': kernel,
+ 'PV_ramdisk': ramdisk,
+ 'PV_args': 'root=/dev/xvda1',
+ 'PV_bootloader_args': '',
+ 'PV_legacy_args': '',
+ 'HVM_boot_policy': '',
+ 'HVM_boot_params': {},
+ 'platform': {},
+ 'PCI_bus': '',
+ 'recommendations': '',
+ 'affinity': '',
+ 'user_version': '0',
+ 'other_config': {},
+ }
+ logging.debug('Created VM %s...', instance.name)
+ vm_ref = yield session.call_xenapi('VM.create', rec)
+ logging.debug('Created VM %s as %s.', instance.name, vm_ref)
+ defer.returnValue(vm_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
+ """Create a VBD record. Returns a Deferred that gives the new
+ VBD reference."""
+
+ vbd_rec = {}
+ vbd_rec['VM'] = vm_ref
+ vbd_rec['VDI'] = vdi_ref
+ vbd_rec['userdevice'] = str(userdevice)
+ vbd_rec['bootable'] = bootable
+ vbd_rec['mode'] = 'RW'
+ vbd_rec['type'] = 'disk'
+ vbd_rec['unpluggable'] = True
+ vbd_rec['empty'] = False
+ vbd_rec['other_config'] = {}
+ vbd_rec['qos_algorithm_type'] = ''
+ vbd_rec['qos_algorithm_params'] = {}
+ vbd_rec['qos_supported_algorithms'] = []
+ logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
+ vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec)
+ logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
+ vdi_ref)
+ defer.returnValue(vbd_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vif(cls, session, vm_ref, network_ref, mac_address):
+ """Create a VIF record. Returns a Deferred that gives the new
+ VIF reference."""
+
+ vif_rec = {}
+ vif_rec['device'] = '0'
+ vif_rec['network'] = network_ref
+ vif_rec['VM'] = vm_ref
+ vif_rec['MAC'] = mac_address
+ vif_rec['MTU'] = '1500'
+ vif_rec['other_config'] = {}
+ vif_rec['qos_algorithm_type'] = ''
+ vif_rec['qos_algorithm_params'] = {}
+ logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
+ network_ref)
+ vif_ref = yield session.call_xenapi('VIF.create', vif_rec)
+ logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
+ vm_ref, network_ref)
+ defer.returnValue(vif_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def fetch_image(cls, session, image, user, project, use_sr):
+ """use_sr: True to put the image as a VDI in an SR, False to place
+ it on dom0's filesystem. The former is for VM disks, the latter for
+ its kernel and ramdisk (if external kernels are being used).
+ Returns a Deferred that gives the new VDI UUID."""
+
+ url = images.image_url(image)
+ access = AuthManager().get_access_key(user, project)
+ logging.debug("Asking xapi to fetch %s as %s", url, access)
+ fn = use_sr and 'get_vdi' or 'get_kernel'
+ args = {}
+ args['src_url'] = url
+ args['username'] = access
+ args['password'] = user.secret
+ if use_sr:
+ args['add_partition'] = 'true'
+ task = yield session.async_call_plugin('objectstore', fn, args)
+ uuid = yield session.wait_for_task(task)
+ defer.returnValue(uuid)
+
+ @classmethod
+ @utils.deferredToThread
+ def lookup(cls, session, i):
+ """ Look the instance i up, and returns it if available """
+ return VMHelper.lookup_blocking(session, i)
+
+ @classmethod
+ def lookup_blocking(cls, session, i):
+ """ Synchronous lookup """
+ vms = session.get_xenapi().VM.get_by_name_label(i)
+ n = len(vms)
+ if n == 0:
+ return None
+ elif n > 1:
+ raise Exception('duplicate name found: %s' % i)
+ else:
+ return vms[0]
+
+ @classmethod
+ @utils.deferredToThread
+ def lookup_vm_vdis(cls, session, vm):
+ """ Look for the VDIs that are attached to the VM """
+ return VMHelper.lookup_vm_vdis_blocking(session, vm)
+
+ @classmethod
+ def lookup_vm_vdis_blocking(cls, session, vm):
+ """ Synchronous lookup_vm_vdis """
+ # Firstly we get the VBDs, then the VDIs.
+ # TODO(Armando): do we leave the read-only devices?
+ vbds = session.get_xenapi().VM.get_VBDs(vm)
+ vdis = []
+ if vbds:
+ for vbd in vbds:
+ try:
+ vdi = session.get_xenapi().VBD.get_VDI(vbd)
+ # Test valid VDI
+ record = session.get_xenapi().VDI.get_record(vdi)
+ logging.debug('VDI %s is still available', record['uuid'])
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ else:
+ vdis.append(vdi)
+ if len(vdis) > 0:
+ return vdis
+ else:
+ return None
+
+ @classmethod
+ def compile_info(cls, record):
+ return {'state': XENAPI_POWER_STATE[record['power_state']],
+ 'max_mem': long(record['memory_static_max']) >> 10,
+ 'mem': long(record['memory_dynamic_max']) >> 10,
+ 'num_cpu': record['VCPUs_max'],
+ 'cpu_time': 0}
+
+ @classmethod
+ def compile_diagnostics(cls, session, record):
+ """Compile VM diagnostics data"""
+ try:
+ host = session.get_xenapi_host()
+ host_ip = session.get_xenapi().host.get_record(host)["address"]
+ metrics = session.get_xenapi().VM_guest_metrics.get_record(
+ record["guest_metrics"])
+ diags = {
+ "Kernel": metrics["os_version"]["uname"],
+ "Distro": metrics["os_version"]["name"]}
+ xml = get_rrd(host_ip, record["uuid"])
+ if xml:
+ rrd = minidom.parseString(xml)
+ for i, node in enumerate(rrd.firstChild.childNodes):
+ # We don't want all of the extra garbage
+ if i >= 3 and i <= 11:
+ ref = node.childNodes
+ # Name and Value
+ diags[ref[0].firstChild.data] = ref[6].firstChild.data
+ return diags
+ except XenAPI.Failure as e:
+ return {"Unable to retrieve diagnostics": e}
+
+
+def get_rrd(host, uuid):
+ """Return the VM RRD XML as a string"""
+ try:
+ xml = urllib.urlopen("http://%s:%s@%s/vm_rrd?uuid=%s" % (
+ FLAGS.xenapi_connection_username,
+ FLAGS.xenapi_connection_password,
+ host,
+ uuid))
+ return xml.read()
+ except IOError:
+ return None
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
new file mode 100644
index 000000000..9bfd07267
--- /dev/null
+++ b/nova/virt/xenapi/vmops.py
@@ -0,0 +1,146 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for VM-related functions (spawn, reboot, etc).
+"""
+
+import logging
+
+from twisted.internet import defer
+
+from nova import db
+from nova import context
+
+from nova.auth.manager import AuthManager
+from nova.virt.xenapi.network_utils import NetworkHelper
+from nova.virt.xenapi.vm_utils import VMHelper
+
+XenAPI = None
+
+
+class VMOps(object):
+ """
+ Management class for VM-related tasks
+ """
+ def __init__(self, session):
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+ self._session = session
+ # Load XenAPI module in the helper class
+ VMHelper.late_import()
+
+ def list_instances(self):
+ """ List VM instances """
+ return [self._session.get_xenapi().VM.get_name_label(vm) \
+ for vm in self._session.get_xenapi().VM.get_all()]
+
+ @defer.inlineCallbacks
+ def spawn(self, instance):
+ """ Create VM instance """
+ vm = yield VMHelper.lookup(self._session, instance.name)
+ if vm is not None:
+ raise Exception('Attempted to create non-unique name %s' %
+ instance.name)
+
+ bridge = db.project_get_network(context.get_admin_context(),
+ instance.project_id).bridge
+ network_ref = \
+ yield NetworkHelper.find_network_with_bridge(self._session, bridge)
+
+ user = AuthManager().get_user(instance.user_id)
+ project = AuthManager().get_project(instance.project_id)
+ vdi_uuid = yield VMHelper.fetch_image(self._session,
+ instance.image_id, user, project, True)
+ kernel = yield VMHelper.fetch_image(self._session,
+ instance.kernel_id, user, project, False)
+ ramdisk = yield VMHelper.fetch_image(self._session,
+ instance.ramdisk_id, user, project, False)
+ vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
+ vm_ref = yield VMHelper.create_vm(self._session,
+ instance, kernel, ramdisk)
+ yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
+ if network_ref:
+ yield VMHelper.create_vif(self._session, vm_ref,
+ network_ref, instance.mac_address)
+ logging.debug('Starting VM %s...', vm_ref)
+ yield self._session.call_xenapi('VM.start', vm_ref, False, False)
+ logging.info('Spawning VM %s created %s.', instance.name,
+ vm_ref)
+
+ @defer.inlineCallbacks
+ def reboot(self, instance):
+ """ Reboot VM instance """
+ instance_name = instance.name
+ vm = yield VMHelper.lookup(self._session, instance_name)
+ if vm is None:
+ raise Exception('instance not present %s' % instance_name)
+ task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm)
+ yield self._session.wait_for_task(task)
+
+ @defer.inlineCallbacks
+ def destroy(self, instance):
+ """ Destroy VM instance """
+ vm = yield VMHelper.lookup(self._session, instance.name)
+ if vm is None:
+ # Don't complain, just return. This lets us clean up instances
+ # that have already disappeared from the underlying platform.
+ defer.returnValue(None)
+ # Get the VDIs related to the VM
+ vdis = yield VMHelper.lookup_vm_vdis(self._session, vm)
+ try:
+ task = yield self._session.call_xenapi('Async.VM.hard_shutdown',
+ vm)
+ yield self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ # Disk clean-up
+ if vdis:
+ for vdi in vdis:
+ try:
+ task = yield self._session.call_xenapi('Async.VDI.destroy',
+ vdi)
+ yield self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ try:
+ task = yield self._session.call_xenapi('Async.VM.destroy', vm)
+ yield self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+
+ def get_info(self, instance_id):
+ """ Return data about VM instance """
+ vm = VMHelper.lookup_blocking(self._session, instance_id)
+ if vm is None:
+ raise Exception('instance not present %s' % instance_id)
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ return VMHelper.compile_info(rec)
+
+ @defer.inlineCallbacks
+ def get_diagnostics(self, instance_id):
+ """Return data about VM diagnostics"""
+ vm = yield VMHelper.lookup(self._session, instance_id)
+ if vm is None:
+ raise Exception("instance not present %s" % instance_id)
+ rec = yield self._session.get_xenapi().VM.get_record(vm)
+ defer.returnValue(VMHelper.compile_diagnostics(self._session, rec))
+
+ def get_console_output(self, instance):
+ """ Return snapshot of console """
+ # TODO: implement this to fix pylint!
+ return 'FAKE CONSOLE OUTPUT of instance'
diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py
new file mode 100644
index 000000000..a4c7a3861
--- /dev/null
+++ b/nova/virt/xenapi/volumeops.py
@@ -0,0 +1,32 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for Storage-related functions (attach, detach, etc).
+"""
+
+
+class VolumeOps(object):
+ def __init__(self, session):
+ self._session = session
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ # FIXME: that's going to be sorted when iscsi-xenapi lands in branch
+ return True
+
+ def detach_volume(self, instance_name, mountpoint):
+ # FIXME: that's going to be sorted when iscsi-xenapi lands in branch
+ return True
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
new file mode 100644
index 000000000..2153810c8
--- /dev/null
+++ b/nova/virt/xenapi_conn.py
@@ -0,0 +1,242 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
+
+**Related Flags**
+
+:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
+:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
+ Platform (default: root).
+:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
+ Platform.
+:xenapi_task_poll_interval: The interval (seconds) used for polling of
+ remote tasks (Async.VM.start, etc)
+ (default: 0.5).
+
+"""
+
+import logging
+import xmlrpclib
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from nova import utils
+from nova import flags
+from nova.virt.xenapi.vmops import VMOps
+from nova.virt.xenapi.volumeops import VolumeOps
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('xenapi_connection_url',
+ None,
+ 'URL for connection to XenServer/Xen Cloud Platform.'
+ ' Required if connection_type=xenapi.')
+flags.DEFINE_string('xenapi_connection_username',
+ 'root',
+ 'Username for connection to XenServer/Xen Cloud Platform.'
+ ' Used only if connection_type=xenapi.')
+flags.DEFINE_string('xenapi_connection_password',
+ None,
+ 'Password for connection to XenServer/Xen Cloud Platform.'
+ ' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+ 0.5,
+ 'The interval used for polling of remote tasks '
+ '(Async.VM.start, etc). Used only if '
+ 'connection_type=xenapi.')
+
+XenAPI = None
+
+
+def get_connection(_):
+ """Note that XenAPI doesn't have a read-only connection mode, so
+ the read_only parameter is ignored."""
+ # This is loaded late so that there's no need to install this
+ # library when not using XenAPI.
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+ url = FLAGS.xenapi_connection_url
+ username = FLAGS.xenapi_connection_username
+ password = FLAGS.xenapi_connection_password
+ if not url or password is None:
+ raise Exception('Must specify xenapi_connection_url, '
+ 'xenapi_connection_username (optionally), and '
+ 'xenapi_connection_password to use '
+ 'connection_type=xenapi')
+ return XenAPIConnection(url, username, password)
+
+
+class XenAPIConnection(object):
+ """ A connection to XenServer or Xen Cloud Platform """
+ def __init__(self, url, user, pw):
+ session = XenAPISession(url, user, pw)
+ self._vmops = VMOps(session)
+ self._volumeops = VolumeOps(session)
+
+ def list_instances(self):
+ """ List VM instances """
+ return self._vmops.list_instances()
+
+ def spawn(self, instance):
+ """ Create VM instance """
+ self._vmops.spawn(instance)
+
+ def reboot(self, instance):
+ """ Reboot VM instance """
+ self._vmops.reboot(instance)
+
+ def destroy(self, instance):
+ """ Destroy VM instance """
+ self._vmops.destroy(instance)
+
+ def get_info(self, instance_id):
+ """ Return data about VM instance """
+ return self._vmops.get_info(instance_id)
+
+ def get_diagnostics(self, instance_id):
+ """Return data about VM diagnostics"""
+ return self._vmops.get_diagnostics(instance_id)
+
+ def get_console_output(self, instance):
+ """ Return snapshot of console """
+ return self._vmops.get_console_output(instance)
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ """ Attach volume storage to VM instance """
+ return self._volumeops.attach_volume(instance_name,
+ device_path,
+ mountpoint)
+
+ def detach_volume(self, instance_name, mountpoint):
+ """ Detach volume storage to VM instance """
+ return self._volumeops.detach_volume(instance_name, mountpoint)
+
+
+class XenAPISession(object):
+ """ The session to invoke XenAPI SDK calls """
+ def __init__(self, url, user, pw):
+ self._session = XenAPI.Session(url)
+ self._session.login_with_password(user, pw)
+
+ def get_xenapi(self):
+ """ Return the xenapi object """
+ return self._session.xenapi
+
+ def get_xenapi_host(self):
+ """ Return the xenapi host """
+ return self._session.xenapi.session.get_this_host(self._session.handle)
+
+ @utils.deferredToThread
+ def call_xenapi(self, method, *args):
+ """Call the specified XenAPI method on a background thread. Returns
+ a Deferred for the result."""
+ f = self._session.xenapi
+ for m in method.split('.'):
+ f = f.__getattr__(m)
+ return f(*args)
+
+ @utils.deferredToThread
+ def async_call_plugin(self, plugin, fn, args):
+ """Call Async.host.call_plugin on a background thread. Returns a
+ Deferred with the task reference."""
+ return _unwrap_plugin_exceptions(
+ self._session.xenapi.Async.host.call_plugin,
+ self.get_xenapi_host(), plugin, fn, args)
+
+ def wait_for_task(self, task):
+ """Return a Deferred that will give the result of the given task.
+ The task is polled until it completes."""
+ d = defer.Deferred()
+ reactor.callLater(0, self._poll_task, task, d)
+ return d
+
+ @utils.deferredToThread
+ def _poll_task(self, task, deferred):
+ """Poll the given XenAPI task, and fire the given Deferred if we
+ get a result."""
+ try:
+ #logging.debug('Polling task %s...', task)
+ status = self._session.xenapi.task.get_status(task)
+ if status == 'pending':
+ reactor.callLater(FLAGS.xenapi_task_poll_interval,
+ self._poll_task, task, deferred)
+ elif status == 'success':
+ result = self._session.xenapi.task.get_result(task)
+ logging.info('Task %s status: success. %s', task, result)
+ deferred.callback(_parse_xmlrpc_value(result))
+ else:
+ error_info = self._session.xenapi.task.get_error_info(task)
+ logging.warn('Task %s status: %s. %s', task, status,
+ error_info)
+ deferred.errback(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ deferred.errback(exc)
+
+
+def _unwrap_plugin_exceptions(func, *args, **kwargs):
+ """ Parse exception details """
+ try:
+ return func(*args, **kwargs)
+ except XenAPI.Failure, exc:
+ logging.debug("Got exception: %s", exc)
+ if (len(exc.details) == 4 and
+ exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
+ exc.details[2] == 'Failure'):
+ params = None
+ try:
+ params = eval(exc.details[3])
+ except:
+ raise exc
+ raise XenAPI.Failure(params)
+ else:
+ raise
+ except xmlrpclib.ProtocolError, exc:
+ logging.debug("Got exception: %s", exc)
+ raise
+
+
+def _parse_xmlrpc_value(val):
+ """Parse the given value as if it were an XML-RPC value. This is
+ sometimes used as the format for the task.result field."""
+ if not val:
+ return val
+ x = xmlrpclib.loads(
+ '<?xml version="1.0"?><methodResponse><params><param>' +
+ val +
+ '</param></params></methodResponse>')
+ return x[0][0]