summaryrefslogtreecommitdiffstats
path: root/nova/virt
diff options
context:
space:
mode:
authorArmando Migliaccio <armando.migliaccio@citrix.com>2010-11-29 17:37:34 +0000
committerArmando Migliaccio <armando.migliaccio@citrix.com>2010-11-29 17:37:34 +0000
commit004704f0fec2280f75ec0cf9757008228b01410d (patch)
tree7dbbf758da9b61d451bb8c14b2c7e0a605144814 /nova/virt
parent04b1740c991d6d499364c21c2524c46ed5fc2522 (diff)
parenta82581cbada92d0e274438757f7beb3ed335da1b (diff)
brought the xenapi refactoring in plus trunk changes
Diffstat (limited to 'nova/virt')
-rw-r--r--nova/virt/connection.py4
-rw-r--r--nova/virt/xenapi.py642
-rw-r--r--nova/virt/xenapi/__init__.py15
-rw-r--r--nova/virt/xenapi/network_utils.py40
-rw-r--r--nova/virt/xenapi/novadeps.py103
-rw-r--r--nova/virt/xenapi/vm_utils.py190
-rw-r--r--nova/virt/xenapi/vmops.py123
-rw-r--r--nova/virt/xenapi/volumeops.py82
-rw-r--r--nova/virt/xenapi_conn.py227
9 files changed, 782 insertions, 644 deletions
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 11f0fa8ce..c40bb4bb4 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -25,7 +25,7 @@ import sys
from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
-from nova.virt import xenapi
+from nova.virt import xenapi_conn
FLAGS = flags.FLAGS
@@ -61,7 +61,7 @@ def get_connection(read_only=False):
elif t == 'libvirt':
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
- conn = xenapi.get_connection(read_only)
+ conn = xenapi_conn.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
deleted file mode 100644
index f2ba71306..000000000
--- a/nova/virt/xenapi.py
+++ /dev/null
@@ -1,642 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (c) 2010 Citrix Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-A connection to XenServer or Xen Cloud Platform.
-
-The concurrency model for this class is as follows:
-
-All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
-deferredToThread). They are remote calls, and so may hang for the usual
-reasons. They should not be allowed to block the reactor thread.
-
-All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion. Polling is handled using reactor.callLater.
-
-This combination of techniques means that we don't block the reactor thread at
-all, and at the same time we don't hold lots of threads waiting for
-long-running operations.
-
-FIXME: get_info currently doesn't conform to these rules, and will block the
-reactor thread if the VM.get_by_name_label or VM.get_record calls block.
-
-**Related Flags**
-
-:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
-:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
- Platform (default: root).
-:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
- Platform.
-:xenapi_task_poll_interval: The interval (seconds) used for polling of
- remote tasks (Async.VM.start, etc)
- (default: 0.5).
-
-"""
-
-import logging
-import xmlrpclib
-
-from twisted.internet import defer
-from twisted.internet import reactor
-from twisted.internet import task
-
-from nova import db
-from nova import flags
-from nova import process
-from nova import utils
-from nova.auth.manager import AuthManager
-from nova.compute import instance_types
-from nova.compute import power_state
-from nova.virt import images
-
-from xml.dom.minidom import parseString
-
-
-XenAPI = None
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string('xenapi_connection_url',
- None,
- 'URL for connection to XenServer/Xen Cloud Platform.'
- ' Required if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_username',
- 'root',
- 'Username for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_password',
- None,
- 'Password for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_float('xenapi_task_poll_interval',
- 0.5,
- 'The interval used for polling of remote tasks '
- '(Async.VM.start, etc). Used only if '
- 'connection_type=xenapi.')
-
-
-XENAPI_POWER_STATE = {
- 'Halted': power_state.SHUTDOWN,
- 'Running': power_state.RUNNING,
- 'Paused': power_state.PAUSED,
- 'Suspended': power_state.SHUTDOWN, # FIXME
- 'Crashed': power_state.CRASHED}
-
-
-def get_connection(_):
- """Note that XenAPI doesn't have a read-only connection mode, so
- the read_only parameter is ignored."""
- # This is loaded late so that there's no need to install this
- # library when not using XenAPI.
- global XenAPI
- if XenAPI is None:
- XenAPI = __import__('XenAPI')
- url = FLAGS.xenapi_connection_url
- username = FLAGS.xenapi_connection_username
- password = FLAGS.xenapi_connection_password
- if not url or password is None:
- raise Exception('Must specify xenapi_connection_url, '
- 'xenapi_connection_username (optionally), and '
- 'xenapi_connection_password to use '
- 'connection_type=xenapi')
- return XenAPIConnection(url, username, password)
-
-
-class XenAPIConnection(object):
- def __init__(self, url, user, pw):
- self._conn = XenAPI.Session(url)
- self._conn.login_with_password(user, pw)
-
- def list_instances(self):
- return [self._conn.xenapi.VM.get_name_label(vm) \
- for vm in self._conn.xenapi.VM.get_all()]
-
- @defer.inlineCallbacks
- def spawn(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is not None:
- raise Exception('Attempted to create non-unique name %s' %
- instance.name)
-
- network = db.project_get_network(None, instance.project_id)
- network_ref = \
- yield self._find_network_with_bridge(network.bridge)
-
- user = AuthManager().get_user(instance.user_id)
- project = AuthManager().get_project(instance.project_id)
- vdi_uuid = yield self._fetch_image(
- instance.image_id, user, project, True)
- kernel = yield self._fetch_image(
- instance.kernel_id, user, project, False)
- ramdisk = yield self._fetch_image(
- instance.ramdisk_id, user, project, False)
- vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
-
- vm_ref = yield self._create_vm(instance, kernel, ramdisk)
- yield self._create_vbd(vm_ref, vdi_ref, 0, True, True, False)
- if network_ref:
- yield self._create_vif(vm_ref, network_ref, instance.mac_address)
- logging.debug('Starting VM %s...', vm_ref)
- yield self._call_xenapi('VM.start', vm_ref, False, False)
- logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
-
- @defer.inlineCallbacks
- def _create_vm(self, instance, kernel, ramdisk):
- """Create a VM record. Returns a Deferred that gives the new
- VM reference."""
-
- instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
- mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
- vcpus = str(instance_type['vcpus'])
- rec = {
- 'name_label': instance.name,
- 'name_description': '',
- 'is_a_template': False,
- 'memory_static_min': '0',
- 'memory_static_max': mem,
- 'memory_dynamic_min': mem,
- 'memory_dynamic_max': mem,
- 'VCPUs_at_startup': vcpus,
- 'VCPUs_max': vcpus,
- 'VCPUs_params': {},
- 'actions_after_shutdown': 'destroy',
- 'actions_after_reboot': 'restart',
- 'actions_after_crash': 'destroy',
- 'PV_bootloader': '',
- 'PV_kernel': kernel,
- 'PV_ramdisk': ramdisk,
- 'PV_args': 'root=/dev/xvda1',
- 'PV_bootloader_args': '',
- 'PV_legacy_args': '',
- 'HVM_boot_policy': '',
- 'HVM_boot_params': {},
- 'platform': {},
- 'PCI_bus': '',
- 'recommendations': '',
- 'affinity': '',
- 'user_version': '0',
- 'other_config': {},
- }
- logging.debug('Created VM %s...', instance.name)
- vm_ref = yield self._call_xenapi('VM.create', rec)
- logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- defer.returnValue(vm_ref)
-
- @defer.inlineCallbacks
- def _create_vdi(self, sr_ref, size, type, label, description, read_only, sharable):
- """Create a VDI record. Returns a Deferred that gives the new
- VDI reference."""
-
- vdi_rec = {}
- vdi_rec['read_only'] = read_only
- vdi_rec['SR'] = sr_ref
- vdi_rec['virtual_size'] = str(size)
- vdi_rec['name_label'] = label
- vdi_rec['name_description'] = description
- vdi_rec['sharable'] = sharable
- vdi_rec['type'] = type
- vdi_rec['other_config'] = {}
- vdi_ref = yield self._call_xenapi('VDI.create', vdi_rec)
- defer.returnValue(vdi_ref)
-
- @defer.inlineCallbacks
- def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable, unpluggable, empty):
- """Create a VBD record. Returns a Deferred that gives the new
- VBD reference."""
-
- vbd_rec = {}
- vbd_rec['VM'] = vm_ref
- vbd_rec['VDI'] = vdi_ref
- vbd_rec['userdevice'] = str(userdevice)
- vbd_rec['bootable'] = bootable
- vbd_rec['mode'] = 'RW'
- vbd_rec['type'] = 'disk'
- vbd_rec['unpluggable'] = unpluggable
- vbd_rec['empty'] = empty
- vbd_rec['other_config'] = {}
- vbd_rec['qos_algorithm_type'] = ''
- vbd_rec['qos_algorithm_params'] = {}
- vbd_rec['qos_supported_algorithms'] = []
- logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
- logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
- vdi_ref)
- defer.returnValue(vbd_ref)
-
- @defer.inlineCallbacks
- def _create_vif(self, vm_ref, network_ref, mac_address):
- """Create a VIF record. Returns a Deferred that gives the new
- VIF reference."""
-
- vif_rec = {}
- vif_rec['device'] = '0'
- vif_rec['network'] = network_ref
- vif_rec['VM'] = vm_ref
- vif_rec['MAC'] = mac_address
- vif_rec['MTU'] = '1500'
- vif_rec['other_config'] = {}
- vif_rec['qos_algorithm_type'] = ''
- vif_rec['qos_algorithm_params'] = {}
- logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
- network_ref)
- vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
- logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
- vm_ref, network_ref)
- defer.returnValue(vif_ref)
-
- @defer.inlineCallbacks
- def _find_network_with_bridge(self, bridge):
- expr = 'field "bridge" = "%s"' % bridge
- networks = yield self._call_xenapi('network.get_all_records_where',
- expr)
- if len(networks) == 1:
- defer.returnValue(networks.keys()[0])
- elif len(networks) > 1:
- raise Exception('Found non-unique network for bridge %s' % bridge)
- else:
- raise Exception('Found no network for bridge %s' % bridge)
-
- @defer.inlineCallbacks
- def _fetch_image(self, image, user, project, use_sr):
- """use_sr: True to put the image as a VDI in an SR, False to place
- it on dom0's filesystem. The former is for VM disks, the latter for
- its kernel and ramdisk (if external kernels are being used).
- Returns a Deferred that gives the new VDI UUID."""
-
- url = images.image_url(image)
- access = AuthManager().get_access_key(user, project)
- logging.debug("Asking xapi to fetch %s as %s" % (url, access))
- fn = use_sr and 'get_vdi' or 'get_kernel'
- args = {}
- args['src_url'] = url
- args['username'] = access
- args['password'] = user.secret
- if use_sr:
- args['add_partition'] = 'true'
- task = yield self._async_call_plugin('objectstore', fn, args)
- uuid = yield self._wait_for_task(task)
- defer.returnValue(uuid)
-
- @defer.inlineCallbacks
- def reboot(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- raise Exception('instance not present %s' % instance.name)
- task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
- yield self._wait_for_task(task)
-
- @defer.inlineCallbacks
- def destroy(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- # Don't complain, just return. This lets us clean up instances
- # that have already disappeared from the underlying platform.
- defer.returnValue(None)
- # Get the VDIs related to the VM
- vdis = yield self._lookup_vm_vdis(vm)
- try:
- task = yield self._call_xenapi('Async.VM.hard_shutdown', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- # Disk clean-up
- if vdis:
- for vdi in vdis:
- try:
- task = yield self._call_xenapi('Async.VDI.destroy', vdi)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- try:
- task = yield self._call_xenapi('Async.VM.destroy', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
-
- @defer.inlineCallbacks
- def attach_volume(self, instance_name, device_path, mountpoint):
- # NOTE: No Resource Pool concept so far
- logging.debug("Attach_volume: %s, %s, %s",
- instance_name, device_path, mountpoint)
- volume_info = _parse_volume_info(device_path, mountpoint)
- # Create the iSCSI SR, and the PDB through which hosts access SRs.
- # But first, retrieve target info, like Host, IQN, LUN and SCSIID
- target = yield self._get_target(volume_info)
- label = 'SR-%s' % volume_info['volumeId']
- description = 'Attached-to:%s' % instance_name
- # Create SR and check the physical space available for the VDI allocation
- sr_ref = yield self._create_sr(target, label, description)
- disk_size = int(target['size'])
- #disk_size = yield self._get_sr_available_space(sr_ref)
- # Create VDI and attach VBD to VM
- vm_ref = yield self._lookup(instance_name)
- logging.debug("Mounting disk of: %s GB", (disk_size / (1024*1024*1024.0)))
- try:
- vdi_ref = yield self._create_vdi(sr_ref, disk_size,
- 'user', volume_info['volumeId'], '',
- False, False)
- except Exception, exc:
- logging.warn(exc)
- yield self._destroy_sr(sr_ref)
- raise Exception('Unable to create VDI on SR %s for instance %s'
- % (sr_ref,
- instance_name))
- else:
- try:
- userdevice = 2 # FIXME: this depends on the numbers of attached disks
- vbd_ref = yield self._create_vbd(vm_ref, vdi_ref, userdevice, False, True, False)
- except Exception, exc:
- logging.warn(exc)
- yield self._destroy_sr(sr_ref)
- raise Exception('Unable to create VBD on SR %s for instance %s'
- % (sr_ref,
- instance_name))
- else:
- try:
- raise Exception('')
- task = yield self._call_xenapi('Async.VBD.plug', vbd_ref)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- yield self._destroy_sr(sr_ref)
- raise Exception('Unable to attach volume to instance %s' % instance_name)
-
- yield True
-
- @defer.inlineCallbacks
- def detach_volume(self, instance_name, mountpoint):
- logging.debug("Detach_volume: %s, %s, %s", instance_name, mountpoint)
- # Detach VBD from VM
- # Forget SR/PDB info associated with host
- # TODO: can we avoid destroying the SR every time we detach?
- yield True
-
- def get_info(self, instance_id):
- vm = self._lookup_blocking(instance_id)
- if vm is None:
- raise Exception('instance not present %s' % instance_id)
- rec = self._conn.xenapi.VM.get_record(vm)
- return {'state': XENAPI_POWER_STATE[rec['power_state']],
- 'max_mem': long(rec['memory_static_max']) >> 10,
- 'mem': long(rec['memory_dynamic_max']) >> 10,
- 'num_cpu': rec['VCPUs_max'],
- 'cpu_time': 0}
-
- def get_console_output(self, instance):
- return 'FAKE CONSOLE OUTPUT'
-
- @utils.deferredToThread
- def _lookup(self, i):
- return self._lookup_blocking(i)
-
- def _lookup_blocking(self, i):
- vms = self._conn.xenapi.VM.get_by_name_label(i)
- n = len(vms)
- if n == 0:
- return None
- elif n > 1:
- raise Exception('duplicate name found: %s' % i)
- else:
- return vms[0]
-
- @utils.deferredToThread
- def _get_target(self, volume_info):
- return self._get_target_blocking(volume_info)
-
- def _get_target_blocking(self, volume_info):
- target = {}
- target['target'] = volume_info['targetHost']
- target['port'] = volume_info['targetPort']
- target['targetIQN'] = volume_info['iqn']
- # We expect SR_BACKEND_FAILURE_107 to retrieve params to create the SR
- try:
- self._conn.xenapi.SR.create(self._get_xenapi_host(),
- target, '-1', '', '',
- 'iscsi', '', False, {})
- except XenAPI.Failure, exc:
- if exc.details[0] == 'SR_BACKEND_FAILURE_107':
- xml_response = parseString(exc.details[3])
- isciTargets = xml_response.getElementsByTagName('iscsi-target')
- # Make sure that only the correct Lun is visible
- if len(isciTargets) > 1:
- raise Exception('More than one ISCSI Target available')
- isciLuns = isciTargets.item(0).getElementsByTagName('LUN')
- if len(isciLuns) > 1:
- raise Exception('More than one ISCSI Lun available')
- # Parse params from the xml response into the dictionary
- for n in isciLuns.item(0).childNodes:
- if n.nodeType == 1:
- target[n.nodeName] = str(n.firstChild.data).strip()
- else:
- logging.warn(exc)
- raise Exception('Unable to access SR')
- return target
-
- @utils.deferredToThread
- def _get_sr_available_space(self, sr_ref):
- return self._get_sr_available_space_blocking(sr_ref)
-
- def _get_sr_available_space_blocking(self, sr_ref):
- pu = self._conn.xenapi.SR.get_physical_utilisation(sr_ref)
- ps = self._conn.xenapi.SR.get_physical_size(sr_ref)
- return (int(ps) - int(pu)) - (8 * 1024 * 1024)
-
- @utils.deferredToThread
- def _create_sr(self, target, label, description):
- return self._create_sr_blocking(target, label, description)
-
- def _create_sr_blocking(self, target, label, description):
- # TODO: we might want to put all these string literals into constants
- sr_ref = self._conn.xenapi.SR.get_by_name_label(label)
- if sr_ref is None:
- sr_ref = self._conn.xenapi.SR.create(self._get_xenapi_host(),
- target,
- target['size'],
- label,
- description,
- 'iscsi',
- '',
- True, {})
- if sr_ref:
- #self._conn.xenapi.SR.scan(sr_ref)
- return sr_ref
- else:
- raise Exception('Unable to create SR')
-
- @defer.inlineCallbacks
- def _destroy_sr(self, sr_ref):
- # Some clean-up depending on the state of the SR
- # Remove VBDs
- #vbds = yield self._conn.xenapi.SR.get_VBDs(sr_ref)
- #for vbd_ref in vbds:
- # try:
- # task = yield self._call_xenapi('Async.VBD.destroy', vbd_ref)
- # yield self._wait_for_task(task)
- # except Exception, exc:
- # logging.warn(exc)
- # Remove VDIs
- #=======================================================================
- # vdis = yield self._conn.xenapi.SR.get_VDIs(sr_ref)
- # for vdi_ref in vdis:
- # try:
- # task = yield self._call_xenapi('Async.VDI.destroy', vdi_ref)
- # yield self._wait_for_task(task)
- # except Exception, exc:
- # logging.warn(exc)
- #=======================================================================
- sr_rec = self._conn.xenapi.SR.get_record(sr_ref)
- # Detach from host
- pbds = yield self._conn.xenapi.SR.get_PBDs(sr_ref)
- for pbd_ref in pbds:
- try:
- task = yield self._call_xenapi('Async.PBD.unplug', pbd_ref)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- # Destroy SR
- try:
- task = yield self._call_xenapi('Async.SR.destroy', sr_ref)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
-
- def _sr_dispose_action(self, action, records):
- for rec_ref in records:
- try:
- task = yield self._call_xenapi(action, rec_ref)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
-
- @utils.deferredToThread
- def _lookup_vm_vdis(self, vm):
- return self._lookup_vm_vdis_blocking(vm)
-
- def _lookup_vm_vdis_blocking(self, vm):
- # Firstly we get the VBDs, then the VDIs.
- # TODO: do we leave the read-only devices?
- vbds = self._conn.xenapi.VM.get_VBDs(vm)
- vdis = []
- if vbds:
- for vbd in vbds:
- try:
- vdi = self._conn.xenapi.VBD.get_VDI(vbd)
- # Test valid VDI
- record = self._conn.xenapi.VDI.get_record(vdi)
- except Exception, exc:
- logging.warn(exc)
- else:
- vdis.append(vdi)
- if len(vdis) > 0:
- return vdis
- else:
- return None
-
- def _wait_for_task(self, task):
- """Return a Deferred that will give the result of the given task.
- The task is polled until it completes."""
- d = defer.Deferred()
- reactor.callLater(0, self._poll_task, task, d)
- return d
-
- @utils.deferredToThread
- def _poll_task(self, task, deferred):
- """Poll the given XenAPI task, and fire the given Deferred if we
- get a result."""
- try:
- #logging.debug('Polling task %s...', task)
- status = self._conn.xenapi.task.get_status(task)
- if status == 'pending':
- reactor.callLater(FLAGS.xenapi_task_poll_interval,
- self._poll_task, task, deferred)
- elif status == 'success':
- result = self._conn.xenapi.task.get_result(task)
- logging.info('Task %s status: success. %s', task, result)
- deferred.callback(_parse_xmlrpc_value(result))
- else:
- error_info = self._conn.xenapi.task.get_error_info(task)
- logging.warn('Task %s status: %s. %s', task, status,
- error_info)
- deferred.errback(XenAPI.Failure(error_info))
- #logging.debug('Polling task %s done.', task)
- except Exception, exc:
- logging.warn(exc)
- deferred.errback(exc)
-
- @utils.deferredToThread
- def _call_xenapi(self, method, *args):
- """Call the specified XenAPI method on a background thread. Returns
- a Deferred for the result."""
- f = self._conn.xenapi
- for m in method.split('.'):
- f = f.__getattr__(m)
- return f(*args)
-
- @utils.deferredToThread
- def _async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread. Returns a
- Deferred with the task reference."""
- return _unwrap_plugin_exceptions(
- self._conn.xenapi.Async.host.call_plugin,
- self._get_xenapi_host(), plugin, fn, args)
-
- def _get_xenapi_host(self):
- return self._conn.xenapi.session.get_this_host(self._conn.handle)
-
-
-def _unwrap_plugin_exceptions(func, *args, **kwargs):
- try:
- return func(*args, **kwargs)
- except XenAPI.Failure, exc:
- logging.debug("Got exception: %s", exc)
- if (len(exc.details) == 4 and
- exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
- exc.details[2] == 'Failure'):
- params = None
- try:
- params = eval(exc.details[3])
- except:
- raise exc
- raise XenAPI.Failure(params)
- else:
- raise
- except xmlrpclib.ProtocolError, exc:
- logging.debug("Got exception: %s", exc)
- raise
-
-
-def _parse_xmlrpc_value(val):
- """Parse the given value as if it were an XML-RPC value. This is
- sometimes used as the format for the task.result field."""
- if not val:
- return val
- x = xmlrpclib.loads(
- '<?xml version="1.0"?><methodResponse><params><param>' +
- val +
- '</param></params></methodResponse>')
- return x[0][0]
-
-
-def _parse_volume_info(device_path, mountpoint):
- volume_info = {}
- volume_info['volumeId'] = 'vol-qurmrzn9'
- # Because XCP/XS want an x beforehand
- volume_info['mountpoint'] = '/dev/xvdc' # translate
- volume_info['targetHost'] = '10.70.177.40'
- volume_info['targetPort'] = '3260' # default 3260
- volume_info['iqn'] = 'iqn.2010-10.org.openstack:vol-qurmrzn9'
- return volume_info
diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py
new file mode 100644
index 000000000..3d598c463
--- /dev/null
+++ b/nova/virt/xenapi/__init__.py
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py
new file mode 100644
index 000000000..b58b9159c
--- /dev/null
+++ b/nova/virt/xenapi/network_utils.py
@@ -0,0 +1,40 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of network records and
+their attributes like bridges, PIFs, QoS, as well as their lookup functions.
+"""
+
+from twisted.internet import defer
+
+
+class NetworkHelper():
+ def __init__(self, session):
+ return
+
+ @classmethod
+ @defer.inlineCallbacks
+ def find_network_with_bridge(self, session, bridge):
+ expr = 'field "bridge" = "%s"' % bridge
+ networks = yield session.call_xenapi('network.get_all_records_where',
+ expr)
+ if len(networks) == 1:
+ defer.returnValue(networks.keys()[0])
+ elif len(networks) > 1:
+ raise Exception('Found non-unique network for bridge %s' % bridge)
+ else:
+ raise Exception('Found no network for bridge %s' % bridge)
diff --git a/nova/virt/xenapi/novadeps.py b/nova/virt/xenapi/novadeps.py
new file mode 100644
index 000000000..ba62468fb
--- /dev/null
+++ b/nova/virt/xenapi/novadeps.py
@@ -0,0 +1,103 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova import db
+from nova import flags
+from nova import process
+from nova import utils
+from nova import context
+
+from nova.compute import power_state
+from nova.auth.manager import AuthManager
+from nova.compute import instance_types
+from nova.virt import images
+
+XENAPI_POWER_STATE = {
+ 'Halted': power_state.SHUTDOWN,
+ 'Running': power_state.RUNNING,
+ 'Paused': power_state.PAUSED,
+ 'Suspended': power_state.SHUTDOWN, # FIXME
+ 'Crashed': power_state.CRASHED}
+
+
+class Instance(object):
+
+ @classmethod
+ def get_name(self, instance):
+ return instance.name
+
+ @classmethod
+ def get_type(self, instance):
+ return instance_types.INSTANCE_TYPES[instance.instance_type]
+
+ @classmethod
+ def get_project(self, instance):
+ return AuthManager().get_project(instance.project_id)
+
+ @classmethod
+ def get_project_id(self, instance):
+ return instance.project_id
+
+ @classmethod
+ def get_image_id(self, instance):
+ return instance.image_id
+
+ @classmethod
+ def get_kernel_id(self, instance):
+ return instance.kernel_id
+
+ @classmethod
+ def get_ramdisk_id(self, instance):
+ return instance.ramdisk_id
+
+ @classmethod
+ def get_network(self, instance):
+ # TODO: is ge_admin_context the right context to retrieve?
+ return db.project_get_network(context.get_admin_context(),
+ instance.project_id)
+
+ @classmethod
+ def get_mac(self, instance):
+ return instance.mac_address
+
+ @classmethod
+ def get_user(self, instance):
+ return AuthManager().get_user(instance.user_id)
+
+
+class Network(object):
+
+ @classmethod
+ def get_bridge(self, network):
+ return network.bridge
+
+
+class Image(object):
+
+ @classmethod
+ def get_url(self, image):
+ return images.image_url(image)
+
+
+class User(object):
+
+ @classmethod
+ def get_access(self, user, project):
+ return AuthManager().get_access_key(user, project)
+
+ @classmethod
+ def get_secret(self, user):
+ return user.secret
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
new file mode 100644
index 000000000..b68df2791
--- /dev/null
+++ b/nova/virt/xenapi/vm_utils.py
@@ -0,0 +1,190 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of VM records and
+their attributes like VDIs, VIFs, as well as their lookup functions.
+"""
+
+import logging
+
+from twisted.internet import defer
+
+from nova import utils
+
+from novadeps import Instance
+from novadeps import Image
+from novadeps import User
+
+
+class VMHelper():
+ def __init__(self, session):
+ return
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vm(self, session, instance, kernel, ramdisk):
+ """Create a VM record. Returns a Deferred that gives the new
+ VM reference."""
+
+ instance_type = Instance.get_type(instance)
+ mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
+ vcpus = str(instance_type['vcpus'])
+ rec = {
+ 'name_label': instance.name,
+ 'name_description': '',
+ 'is_a_template': False,
+ 'memory_static_min': '0',
+ 'memory_static_max': mem,
+ 'memory_dynamic_min': mem,
+ 'memory_dynamic_max': mem,
+ 'VCPUs_at_startup': vcpus,
+ 'VCPUs_max': vcpus,
+ 'VCPUs_params': {},
+ 'actions_after_shutdown': 'destroy',
+ 'actions_after_reboot': 'restart',
+ 'actions_after_crash': 'destroy',
+ 'PV_bootloader': '',
+ 'PV_kernel': kernel,
+ 'PV_ramdisk': ramdisk,
+ 'PV_args': 'root=/dev/xvda1',
+ 'PV_bootloader_args': '',
+ 'PV_legacy_args': '',
+ 'HVM_boot_policy': '',
+ 'HVM_boot_params': {},
+ 'platform': {},
+ 'PCI_bus': '',
+ 'recommendations': '',
+ 'affinity': '',
+ 'user_version': '0',
+ 'other_config': {},
+ }
+ logging.debug('Created VM %s...', Instance.get_name(instance))
+ vm_ref = yield session.call_xenapi('VM.create', rec)
+ logging.debug('Created VM %s as %s.',
+ Instance.get_name(instance), vm_ref)
+ defer.returnValue(vm_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vbd(self, session, vm_ref, vdi_ref, userdevice, bootable):
+ """Create a VBD record. Returns a Deferred that gives the new
+ VBD reference."""
+
+ vbd_rec = {}
+ vbd_rec['VM'] = vm_ref
+ vbd_rec['VDI'] = vdi_ref
+ vbd_rec['userdevice'] = str(userdevice)
+ vbd_rec['bootable'] = bootable
+ vbd_rec['mode'] = 'RW'
+ vbd_rec['type'] = 'disk'
+ vbd_rec['unpluggable'] = True
+ vbd_rec['empty'] = False
+ vbd_rec['other_config'] = {}
+ vbd_rec['qos_algorithm_type'] = ''
+ vbd_rec['qos_algorithm_params'] = {}
+ vbd_rec['qos_supported_algorithms'] = []
+ logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
+ vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec)
+ logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
+ vdi_ref)
+ defer.returnValue(vbd_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vif(self, session, vm_ref, network_ref, mac_address):
+ """Create a VIF record. Returns a Deferred that gives the new
+ VIF reference."""
+
+ vif_rec = {}
+ vif_rec['device'] = '0'
+ vif_rec['network'] = network_ref
+ vif_rec['VM'] = vm_ref
+ vif_rec['MAC'] = mac_address
+ vif_rec['MTU'] = '1500'
+ vif_rec['other_config'] = {}
+ vif_rec['qos_algorithm_type'] = ''
+ vif_rec['qos_algorithm_params'] = {}
+ logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
+ network_ref)
+ vif_ref = yield session.call_xenapi('VIF.create', vif_rec)
+ logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
+ vm_ref, network_ref)
+ defer.returnValue(vif_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def fetch_image(self, session, image, user, project, use_sr):
+ """use_sr: True to put the image as a VDI in an SR, False to place
+ it on dom0's filesystem. The former is for VM disks, the latter for
+ its kernel and ramdisk (if external kernels are being used).
+ Returns a Deferred that gives the new VDI UUID."""
+
+ url = Image.get_url(image)
+ access = User.get_access(user, project)
+ logging.debug("Asking xapi to fetch %s as %s" % (url, access))
+ fn = use_sr and 'get_vdi' or 'get_kernel'
+ args = {}
+ args['src_url'] = url
+ args['username'] = access
+ args['password'] = User.get_secret(user)
+ if use_sr:
+ args['add_partition'] = 'true'
+ task = yield session.async_call_plugin('objectstore', fn, args)
+ uuid = yield session.wait_for_task(task)
+ defer.returnValue(uuid)
+
+ @classmethod
+ @utils.deferredToThread
+ def lookup(self, session, i):
+ return VMHelper.lookup_blocking(session, i)
+
+ @classmethod
+ def lookup_blocking(self, session, i):
+ vms = session.get_xenapi().VM.get_by_name_label(i)
+ n = len(vms)
+ if n == 0:
+ return None
+ elif n > 1:
+ raise Exception('duplicate name found: %s' % i)
+ else:
+ return vms[0]
+
+ @classmethod
+ @utils.deferredToThread
+ def lookup_vm_vdis(self, session, vm):
+ return VMHelper.lookup_vm_vdis_blocking(session, vm)
+
+ @classmethod
+ def lookup_vm_vdis_blocking(self, session, vm):
+ # Firstly we get the VBDs, then the VDIs.
+ # TODO: do we leave the read-only devices?
+ vbds = session.get_xenapi().VM.get_VBDs(vm)
+ vdis = []
+ if vbds:
+ for vbd in vbds:
+ try:
+ vdi = session.get_xenapi().VBD.get_VDI(vbd)
+ # Test valid VDI
+ record = session.get_xenapi().VDI.get_record(vdi)
+ except Exception, exc:
+ logging.warn(exc)
+ else:
+ vdis.append(vdi)
+ if len(vdis) > 0:
+ return vdis
+ else:
+ return None
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
new file mode 100644
index 000000000..d6ea5e7db
--- /dev/null
+++ b/nova/virt/xenapi/vmops.py
@@ -0,0 +1,123 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for VM-related functions (spawn, reboot, etc).
+"""
+
+import logging
+
+from twisted.internet import defer
+
+from novadeps import XENAPI_POWER_STATE
+from novadeps import Instance
+from novadeps import Network
+
+from vm_utils import VMHelper
+from network_utils import NetworkHelper
+
+
class VMOps(object):
    """Management class for VM-related functions (spawn, reboot, destroy,
    get_info) on top of a XenAPISession."""

    def __init__(self, session):
        # XenAPISession wrapper used for every XenAPI call this class makes.
        self._session = session

    def list_instances(self):
        """Return the name labels of all VMs known to the connected host."""
        return [self._session.get_xenapi().VM.get_name_label(vm) \
            for vm in self._session.get_xenapi().VM.get_all()]

    @defer.inlineCallbacks
    def spawn(self, instance):
        """Create and start a VM for *instance*.

        Fetches the root disk image into an SR and the kernel/ramdisk onto
        dom0, builds the VM record with a VBD on device 0 and (when the
        instance's bridge maps to a host network) a VIF, then starts the VM.
        Raises Exception if a VM with the same name label already exists.
        Returns a Deferred.
        """
        vm = yield VMHelper.lookup(self._session, Instance.get_name(instance))
        if vm is not None:
            raise Exception('Attempted to create non-unique name %s' %
                            Instance.get_name(instance))

        bridge = Network.get_bridge(Instance.get_network(instance))
        network_ref = \
            yield NetworkHelper.find_network_with_bridge(self._session, bridge)

        user = Instance.get_user(instance)
        project = Instance.get_project(instance)
        # use_sr=True for the root disk; False sends kernel/ramdisk to
        # dom0's filesystem instead of an SR (see VMHelper.fetch_image).
        vdi_uuid = yield VMHelper.fetch_image(self._session,
            Instance.get_image_id(instance), user, project, True)
        kernel = yield VMHelper.fetch_image(self._session,
            Instance.get_kernel_id(instance), user, project, False)
        ramdisk = yield VMHelper.fetch_image(self._session,
            Instance.get_ramdisk_id(instance), user, project, False)
        vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
        vm_ref = yield VMHelper.create_vm(self._session,
            instance, kernel, ramdisk)
        yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
        if network_ref:
            yield VMHelper.create_vif(self._session, vm_ref,
                network_ref, Instance.get_mac(instance))
        logging.debug('Starting VM %s...', vm_ref)
        yield self._session.call_xenapi('VM.start', vm_ref, False, False)
        logging.info('Spawning VM %s created %s.', Instance.get_name(instance),
                        vm_ref)

    @defer.inlineCallbacks
    def reboot(self, instance):
        """Cleanly reboot the VM for *instance*; raises if the VM is not
        present on the host.  Returns a Deferred."""
        instance_name = Instance.get_name(instance)
        vm = yield VMHelper.lookup(self._session, instance_name)
        if vm is None:
            raise Exception('instance not present %s' % instance_name)
        task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm)
        yield self._session.wait_for_task(task)

    @defer.inlineCallbacks
    def destroy(self, instance):
        """Hard-shutdown the VM for *instance*, destroy its VDIs, then
        destroy the VM record itself.  Each step is best-effort: failures
        are logged and clean-up continues.  Returns a Deferred."""
        vm = yield VMHelper.lookup(self._session, Instance.get_name(instance))
        if vm is None:
            # Don't complain, just return. This lets us clean up instances
            # that have already disappeared from the underlying platform.
            defer.returnValue(None)
        # Get the VDIs related to the VM
        vdis = yield VMHelper.lookup_vm_vdis(self._session, vm)
        try:
            task = yield self._session.call_xenapi('Async.VM.hard_shutdown',
                                                   vm)
            yield self._session.wait_for_task(task)
        except Exception, exc:
            logging.warn(exc)
        # Disk clean-up
        if vdis:
            for vdi in vdis:
                try:
                    task = yield self._session.call_xenapi('Async.VDI.destroy',
                                                           vdi)
                    yield self._session.wait_for_task(task)
                except Exception, exc:
                    logging.warn(exc)
        try:
            task = yield self._session.call_xenapi('Async.VM.destroy', vm)
            yield self._session.wait_for_task(task)
        except Exception, exc:
            logging.warn(exc)

    def get_info(self, instance_id):
        """Return a dict of power state, memory and CPU info for the named
        instance.

        NOTE: this runs synchronously and will block the reactor thread if
        the XenAPI calls block (see the xenapi_conn module docstring).
        """
        vm = VMHelper.lookup_blocking(self._session, instance_id)
        if vm is None:
            raise Exception('instance not present %s' % instance_id)
        rec = self._session.get_xenapi().VM.get_record(vm)
        # >> 10 divides by 1024 -- assumes the XenAPI memory fields are in
        # bytes, yielding KB here.  TODO confirm against the XenAPI docs.
        return {'state': XENAPI_POWER_STATE[rec['power_state']],
                'max_mem': long(rec['memory_static_max']) >> 10,
                'mem': long(rec['memory_dynamic_max']) >> 10,
                'num_cpu': rec['VCPUs_max'],
                'cpu_time': 0}

    def get_console_output(self, instance):
        """Return console output for *instance* -- currently a hard-coded
        stub."""
        return 'FAKE CONSOLE OUTPUT'
diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py
new file mode 100644
index 000000000..5aefa0611
--- /dev/null
+++ b/nova/virt/xenapi/volumeops.py
@@ -0,0 +1,82 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for Storage-related functions (attach, detach, etc).
+"""
+
+
+class VolumeOps(object):
+ def __init__(self, session):
+ self._session = session
+
+ @defer.inlineCallbacks
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ # NOTE: No Resource Pool concept so far
+ logging.debug("Attach_volume: %s, %s, %s",
+ instance_name, device_path, mountpoint)
+ volume_info = _parse_volume_info(device_path, mountpoint)
+ # Create the iSCSI SR, and the PDB through which hosts access SRs.
+ # But first, retrieve target info, like Host, IQN, LUN and SCSIID
+ target = yield self._get_target(volume_info)
+ label = 'SR-%s' % volume_info['volumeId']
+ description = 'Attached-to:%s' % instance_name
+ # Create SR and check the physical space available for the VDI allocation
+ sr_ref = yield self._create_sr(target, label, description)
+ disk_size = int(target['size'])
+ #disk_size = yield self._get_sr_available_space(sr_ref)
+ # Create VDI and attach VBD to VM
+ vm_ref = yield self._lookup(instance_name)
+ logging.debug("Mounting disk of: %s GB", (disk_size / (1024*1024*1024.0)))
+ try:
+ vdi_ref = yield self._create_vdi(sr_ref, disk_size,
+ 'user', volume_info['volumeId'], '',
+ False, False)
+ except Exception, exc:
+ logging.warn(exc)
+ yield self._destroy_sr(sr_ref)
+ raise Exception('Unable to create VDI on SR %s for instance %s'
+ % (sr_ref,
+ instance_name))
+ else:
+ try:
+ userdevice = 2 # FIXME: this depends on the numbers of attached disks
+ vbd_ref = yield self._create_vbd(vm_ref, vdi_ref, userdevice, False, True, False)
+ except Exception, exc:
+ logging.warn(exc)
+ yield self._destroy_sr(sr_ref)
+ raise Exception('Unable to create VBD on SR %s for instance %s'
+ % (sr_ref,
+ instance_name))
+ else:
+ try:
+ raise Exception('')
+ task = yield self._call_xenapi('Async.VBD.plug', vbd_ref)
+ yield self._wait_for_task(task)
+ except Exception, exc:
+ logging.warn(exc)
+ yield self._destroy_sr(sr_ref)
+ raise Exception('Unable to attach volume to instance %s' % instance_name)
+
+ yield True
+
+ @defer.inlineCallbacks
+ def detach_volume(self, instance_name, mountpoint):
+ logging.debug("Detach_volume: %s, %s, %s", instance_name, mountpoint)
+ # Detach VBD from VM
+ # Forget SR/PDB info associated with host
+ # TODO: can we avoid destroying the SR every time we detach?
+ yield True \ No newline at end of file
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
new file mode 100644
index 000000000..0a73b4774
--- /dev/null
+++ b/nova/virt/xenapi_conn.py
@@ -0,0 +1,227 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
+
+**Related Flags**
+
+:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
+:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
+ Platform (default: root).
+:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
+ Platform.
+:xenapi_task_poll_interval: The interval (seconds) used for polling of
+ remote tasks (Async.VM.start, etc)
+ (default: 0.5).
+
+"""
+
+import logging
+import xmlrpclib
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from nova import flags
+from nova import utils
+
+from xenapi.vmops import VMOps
+from xenapi.volumeops import VolumeOps
+
+XenAPI = None
+
+
FLAGS = flags.FLAGS
# Driver configuration flags; all are only consulted when
# connection_type=xenapi (see get_connection below and the module docstring).
flags.DEFINE_string('xenapi_connection_url',
                    None,
                    'URL for connection to XenServer/Xen Cloud Platform.'
                    ' Required if connection_type=xenapi.')
flags.DEFINE_string('xenapi_connection_username',
                    'root',
                    'Username for connection to XenServer/Xen Cloud Platform.'
                    ' Used only if connection_type=xenapi.')
flags.DEFINE_string('xenapi_connection_password',
                    None,
                    'Password for connection to XenServer/Xen Cloud Platform.'
                    ' Used only if connection_type=xenapi.')
flags.DEFINE_float('xenapi_task_poll_interval',
                   0.5,
                   'The interval used for polling of remote tasks '
                   '(Async.VM.start, etc). Used only if '
                   'connection_type=xenapi.')
+
+
def get_connection(_):
    """Return a XenAPIConnection built from the xenapi_* flags.

    XenAPI doesn't have a read-only connection mode, so the read_only
    parameter is ignored.
    """
    global XenAPI
    if XenAPI is None:
        # Loaded lazily so the XenAPI library need not be installed unless
        # the xenapi connection type is actually in use.
        XenAPI = __import__('XenAPI')
    url = FLAGS.xenapi_connection_url
    username = FLAGS.xenapi_connection_username
    password = FLAGS.xenapi_connection_password
    if url and password is not None:
        return XenAPIConnection(url, username, password)
    raise Exception('Must specify xenapi_connection_url, '
                    'xenapi_connection_username (optionally), and '
                    'xenapi_connection_password to use '
                    'connection_type=xenapi')
+
+
class XenAPIConnection(object):
    """Thin facade over VMOps and VolumeOps exposing the nova virt driver
    interface for XenServer/Xen Cloud Platform."""

    def __init__(self, url, user, pw):
        session = XenAPISession(url, user, pw)
        self._vmops = VMOps(session)
        self._volumeops = VolumeOps(session)

    def list_instances(self):
        """Return the name labels of all VMs on the connected host."""
        return self._vmops.list_instances()

    def spawn(self, instance):
        """Create and start a VM for *instance*.  Returns a Deferred.

        BUG FIX: the Deferred from VMOps was previously discarded, so
        callers could neither wait for completion nor observe errors
        (unhandled Deferred); it is now returned.  Callers that ignore
        the return value are unaffected.
        """
        return self._vmops.spawn(instance)

    def reboot(self, instance):
        """Reboot a VM instance.  Returns a Deferred (see spawn)."""
        return self._vmops.reboot(instance)

    def destroy(self, instance):
        """Destroy a VM instance and its disks.  Returns a Deferred
        (see spawn)."""
        return self._vmops.destroy(instance)

    def get_info(self, instance_id):
        """Return power state / memory / CPU info for the named instance."""
        return self._vmops.get_info(instance_id)

    def get_console_output(self, instance):
        """Return console output for *instance* (currently a stub)."""
        return self._vmops.get_console_output(instance)

    def attach_volume(self, instance_name, device_path, mountpoint):
        """Attach a block volume to a VM; returns a Deferred."""
        return self._volumeops.attach_volume(instance_name,
                                             device_path,
                                             mountpoint)

    def detach_volume(self, instance_name, mountpoint):
        """Detach a block volume from a VM; returns a Deferred."""
        return self._volumeops.detach_volume(instance_name, mountpoint)
+
+
class XenAPISession(object):
    """The session to the XenAPI host, plus helpers that keep remote calls
    off the reactor thread (see the module docstring for the concurrency
    model)."""

    def __init__(self, url, user, pw):
        # NOTE: login happens synchronously at construction time.
        self._session = XenAPI.Session(url)
        self._session.login_with_password(user, pw)

    def get_xenapi(self):
        """Return the xenapi proxy object of the underlying session."""
        return self._session.xenapi

    def get_xenapi_host(self):
        """Return the host ref of the host this session is connected to."""
        return self._session.xenapi.session.get_this_host(self._session.handle)

    @utils.deferredToThread
    def call_xenapi(self, method, *args):
        """Call the specified XenAPI method on a background thread. Returns
        a Deferred for the result."""
        # Resolve dotted names like 'Async.VM.start' attribute by attribute.
        f = self._session.xenapi
        for m in method.split('.'):
            f = f.__getattr__(m)
        return f(*args)

    @utils.deferredToThread
    def async_call_plugin(self, plugin, fn, args):
        """Call Async.host.call_plugin on a background thread. Returns a
        Deferred with the task reference."""
        return _unwrap_plugin_exceptions(
            self._session.xenapi.Async.host.call_plugin,
            self.get_xenapi_host(), plugin, fn, args)

    def wait_for_task(self, task):
        """Return a Deferred that will give the result of the given task.
        The task is polled until it completes."""
        d = defer.Deferred()
        # Schedule the first poll immediately; _poll_task reschedules itself
        # while the task is still pending.
        reactor.callLater(0, self._poll_task, task, d)
        return d

    @utils.deferredToThread
    def _poll_task(self, task, deferred):
        """Poll the given XenAPI task, and fire the given Deferred if we
        get a result."""
        try:
            #logging.debug('Polling task %s...', task)
            status = self._session.xenapi.task.get_status(task)
            if status == 'pending':
                # Still running: poll again after the configured interval.
                reactor.callLater(FLAGS.xenapi_task_poll_interval,
                                  self._poll_task, task, deferred)
            elif status == 'success':
                result = self._session.xenapi.task.get_result(task)
                logging.info('Task %s status: success. %s', task, result)
                # task.result is sometimes an XML-RPC-encoded value.
                deferred.callback(_parse_xmlrpc_value(result))
            else:
                # Any non-pending, non-success status is treated as failure.
                error_info = self._session.xenapi.task.get_error_info(task)
                logging.warn('Task %s status: %s. %s', task, status,
                             error_info)
                deferred.errback(XenAPI.Failure(error_info))
            #logging.debug('Polling task %s done.', task)
        except Exception, exc:
            # Propagate polling errors to the caller instead of losing them.
            logging.warn(exc)
            deferred.errback(exc)
+
+
+def _unwrap_plugin_exceptions(func, *args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except XenAPI.Failure, exc:
+ logging.debug("Got exception: %s", exc)
+ if (len(exc.details) == 4 and
+ exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
+ exc.details[2] == 'Failure'):
+ params = None
+ try:
+ params = eval(exc.details[3])
+ except:
+ raise exc
+ raise XenAPI.Failure(params)
+ else:
+ raise
+ except xmlrpclib.ProtocolError, exc:
+ logging.debug("Got exception: %s", exc)
+ raise
+
+
+def _parse_xmlrpc_value(val):
+ """Parse the given value as if it were an XML-RPC value. This is
+ sometimes used as the format for the task.result field."""
+ if not val:
+ return val
+ x = xmlrpclib.loads(
+ '<?xml version="1.0"?><methodResponse><params><param>' +
+ val +
+ '</param></params></methodResponse>')
+ return x[0][0]