diff options
Diffstat (limited to 'nova/virt')
| -rw-r--r-- | nova/virt/connection.py | 4 | ||||
| -rw-r--r-- | nova/virt/fake.py | 10 | ||||
| -rw-r--r-- | nova/virt/images.py | 11 | ||||
| -rw-r--r-- | nova/virt/libvirt_conn.py | 177 | ||||
| -rw-r--r-- | nova/virt/xenapi.py | 444 | ||||
| -rw-r--r-- | nova/virt/xenapi/__init__.py | 15 | ||||
| -rw-r--r-- | nova/virt/xenapi/network_utils.py | 41 | ||||
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py | 260 | ||||
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 139 | ||||
| -rw-r--r-- | nova/virt/xenapi/volumeops.py | 32 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 241 |
11 files changed, 818 insertions, 556 deletions
diff --git a/nova/virt/connection.py b/nova/virt/connection.py index 11f0fa8ce..c40bb4bb4 100644 --- a/nova/virt/connection.py +++ b/nova/virt/connection.py @@ -25,7 +25,7 @@ import sys from nova import flags from nova.virt import fake from nova.virt import libvirt_conn -from nova.virt import xenapi +from nova.virt import xenapi_conn FLAGS = flags.FLAGS @@ -61,7 +61,7 @@ def get_connection(read_only=False): elif t == 'libvirt': conn = libvirt_conn.get_connection(read_only) elif t == 'xenapi': - conn = xenapi.get_connection(read_only) + conn = xenapi_conn.get_connection(read_only) else: raise Exception('Unknown connection type "%s"' % t) diff --git a/nova/virt/fake.py b/nova/virt/fake.py index f855523d3..77bc926c2 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -25,8 +25,6 @@ semantics of real hypervisor connections. """ -from twisted.internet import defer - from nova import exception from nova.compute import power_state @@ -107,7 +105,6 @@ class FakeConnection(object): fake_instance = FakeInstance() self.instances[instance.name] = fake_instance fake_instance._state = power_state.RUNNING - return defer.succeed(None) def reboot(self, instance): """ @@ -119,19 +116,19 @@ class FakeConnection(object): The work will be done asynchronously. This function returns a Deferred that allows the caller to detect when it is complete. """ - return defer.succeed(None) + pass def rescue(self, instance): """ Rescue the specified instance. """ - return defer.succeed(None) + pass def unrescue(self, instance): """ Unrescue the specified instance. """ - return defer.succeed(None) + pass def destroy(self, instance): """ @@ -144,7 +141,6 @@ class FakeConnection(object): Deferred that allows the caller to detect when it is complete. """ del self.instances[instance.name] - return defer.succeed(None) def attach_volume(self, instance_name, device_path, mountpoint): """Attach the disk at device_path to the instance at mountpoint""" diff --git a/nova/virt/images.py b/nova/virt/images.py index 981aa5cf3..1c9b2e093 100644 --- a/nova/virt/images.py +++ b/nova/virt/images.py @@ -26,7 +26,7 @@ import time import urlparse from nova import flags -from nova import process +from nova import utils from nova.auth import manager from nova.auth import signer from nova.objectstore import image @@ -50,7 +50,7 @@ def _fetch_s3_image(image, path, user, project): # This should probably move somewhere else, like e.g. a download_as # method on User objects and at the same time get rewritten to use - # twisted web client. + # a web client. headers = {} headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) @@ -63,15 +63,16 @@ def _fetch_s3_image(image, path, user, project): cmd = ['/usr/bin/curl', '--fail', '--silent', url] for (k, v) in headers.iteritems(): - cmd += ['-H', '%s: %s' % (k, v)] + cmd += ['-H', '"%s: %s"' % (k, v)] cmd += ['-o', path] - return process.SharedPool().execute(executable=cmd[0], args=cmd[1:]) + cmd_out = ' '.join(cmd) + return utils.execute(cmd_out) def _fetch_local_image(image, path, user, project): source = _image_path('%s/image' % image) - return process.simple_execute('cp %s %s' % (source, path)) + return utils.execute('cp %s %s' % (source, path)) def _image_path(path): diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index 18085089f..5a8c71850 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -45,16 +45,15 @@ import logging import os import shutil +from eventlet import event +from eventlet import tpool + import IPy -from twisted.internet import defer -from twisted.internet import task -from twisted.internet import threads from nova import context from nova import db from nova import exception from nova import flags -from nova import process from nova import utils #from nova.api import context from nova.auth import manager @@ -184,14 +183,12 @@ class LibvirtConnection(object): except Exception as _err: pass # If the instance is already terminated, we're still happy - d = defer.Deferred() - if cleanup: - d.addCallback(lambda _: self._cleanup(instance)) - # FIXME: What does this comment mean? - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, + + done = event.Event() + + # We'll save this for when we do shutdown, # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_shutdown(): try: @@ -200,17 +197,26 @@ class LibvirtConnection(object): instance['id'], state) if state == power_state.SHUTDOWN: timer.stop() - d.callback(None) except Exception: db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d + timer_done = timer.start(interval=0.5, now=True) + + # NOTE(termie): this is strictly superfluous (we could put the + # cleanup code in the timer), but this emulates the + # previous model so I am keeping it around until + # everything has been vetted a bit + def _wait_for_timer(): + timer_done.wait() + self._cleanup(instance) + done.send() + + greenthread.spawn(_wait_for_timer) + return done def _cleanup(self, instance): target = os.path.join(FLAGS.instances_path, instance['name']) @@ -219,7 +225,6 @@ class LibvirtConnection(object): if os.path.exists(target): shutil.rmtree(target) - @defer.inlineCallbacks @exception.wrap_exception def attach_volume(self, instance_name, device_path, mountpoint): virt_dom = self._conn.lookupByName(instance_name) @@ -230,7 +235,6 @@ class LibvirtConnection(object): <target dev='%s' bus='virtio'/> </disk>""" % (device_path, mount_device) virt_dom.attachDevice(xml) - yield def _get_disk_xml(self, xml, device): """Returns the xml for the disk mounted at device""" @@ -252,7 +256,6 @@ class LibvirtConnection(object): if doc != None: doc.freeDoc() - @defer.inlineCallbacks @exception.wrap_exception def detach_volume(self, instance_name, mountpoint): virt_dom = self._conn.lookupByName(instance_name) @@ -261,17 +264,13 @@ class LibvirtConnection(object): if not xml: raise exception.NotFound("No disk at %s" % mount_device) virt_dom.detachDevice(xml) - yield - @defer.inlineCallbacks @exception.wrap_exception def reboot(self, instance): - yield self.destroy(instance, False) + self.destroy(instance, False) xml = self.to_xml(instance) - yield self._conn.createXML(xml, 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) + self._conn.createXML(xml, 0) + timer = utils.LoopingCall(f=None) def _wait_for_reboot(): try: @@ -281,33 +280,28 @@ class LibvirtConnection(object): if state == power_state.RUNNING: logging.debug('instance %s: rebooted', instance['name']) timer.stop() - d.callback(None) except Exception, exn: logging.error('_wait_for_reboot failed: %s', exn) db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d + return timer.start(interval=0.5, now=True) - @defer.inlineCallbacks @exception.wrap_exception def rescue(self, instance): - yield self.destroy(instance, False) + self.destroy(instance, False) xml = self.to_xml(instance, rescue=True) rescue_images = {'image_id': FLAGS.rescue_image_id, 'kernel_id': FLAGS.rescue_kernel_id, 'ramdisk_id': FLAGS.rescue_ramdisk_id} - yield self._create_image(instance, xml, 'rescue-', rescue_images) - yield self._conn.createXML(xml, 0) + self._create_image(instance, xml, 'rescue-', rescue_images) + self._conn.createXML(xml, 0) - d = defer.Deferred() - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_rescue(): try: @@ -316,27 +310,22 @@ class LibvirtConnection(object): if state == power_state.RUNNING: logging.debug('instance %s: rescued', instance['name']) timer.stop() - d.callback(None) except Exception, exn: logging.error('_wait_for_rescue failed: %s', exn) db.instance_set_state(None, instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_rescue - timer.start(interval=0.5, now=True) - yield d + return timer.start(interval=0.5, now=True) - @defer.inlineCallbacks @exception.wrap_exception def unrescue(self, instance): # NOTE(vish): Because reboot destroys and recreates an instance using # the normal xml file, we can just call reboot here - yield self.reboot(instance) + self.reboot(instance) - @defer.inlineCallbacks @exception.wrap_exception def spawn(self, instance): xml = self.to_xml(instance) @@ -344,14 +333,12 @@ class LibvirtConnection(object): instance['id'], power_state.NOSTATE, 'launching') - yield NWFilterFirewall(self._conn).\ - setup_nwfilters_for_instance(instance) - yield self._create_image(instance, xml) - yield self._conn.createXML(xml, 0) + NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance) + self._create_image(instance, xml) + self._conn.createXML(xml, 0) logging.debug("instance %s: is running", instance['name']) - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_boot(): try: @@ -361,7 +348,6 @@ class LibvirtConnection(object): if state == power_state.RUNNING: logging.debug('instance %s: booted', instance['name']) timer.stop() - local_d.callback(None) except: logging.exception('instance %s: failed to boot', instance['name']) @@ -369,10 +355,9 @@ class LibvirtConnection(object): instance['id'], power_state.SHUTDOWN) timer.stop() - local_d.callback(None) + timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - yield local_d + return timer.start(interval=0.5, now=True) def _flush_xen_console(self, virsh_output): logging.info('virsh said: %r' % (virsh_output,)) @@ -380,10 +365,9 @@ class LibvirtConnection(object): if virsh_output.startswith('/dev/'): logging.info('cool, it\'s a device') - d = process.simple_execute("sudo dd if=%s iflag=nonblock" % - virsh_output, check_exit_code=False) - d.addCallback(lambda r: r[0]) - return d + out, err = utils.execute("sudo dd if=%s iflag=nonblock" % + virsh_output, check_exit_code=False) + return out else: return '' @@ -403,21 +387,20 @@ class LibvirtConnection(object): def get_console_output(self, instance): console_log = os.path.join(FLAGS.instances_path, instance['name'], 'console.log') - d = process.simple_execute('sudo chown %d %s' % (os.getuid(), - console_log)) + + utils.execute('sudo chown %d %s' % (os.getuid(), console_log)) + if FLAGS.libvirt_type == 'xen': - # Xen is spethial - d.addCallback(lambda _: - process.simple_execute("virsh ttyconsole %s" % - instance['name'])) - d.addCallback(self._flush_xen_console) - d.addCallback(self._append_to_file, console_log) + # Xen is special + virsh_output = utils.execute("virsh ttyconsole %s" % + instance['name']) + data = self._flush_xen_console(virsh_output) + fpath = self._append_to_file(data, console_log) else: - d.addCallback(lambda _: defer.succeed(console_log)) - d.addCallback(self._dump_file) - return d + fpath = console_log + + return self._dump_file(fpath) - @defer.inlineCallbacks def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None): # syntactic nicety basepath = lambda fname = '', prefix = prefix: os.path.join( @@ -426,8 +409,8 @@ class LibvirtConnection(object): prefix + fname) # ensure directories exist and are writable - yield process.simple_execute('mkdir -p %s' % basepath(prefix='')) - yield process.simple_execute('chmod 0777 %s' % basepath(prefix='')) + utils.execute('mkdir -p %s' % basepath(prefix='')) + utils.execute('chmod 0777 %s' % basepath(prefix='')) # TODO(termie): these are blocking calls, it would be great # if they weren't. @@ -448,19 +431,19 @@ class LibvirtConnection(object): 'kernel_id': inst['kernel_id'], 'ramdisk_id': inst['ramdisk_id']} if not os.path.exists(basepath('disk')): - yield images.fetch(inst.image_id, basepath('disk-raw'), user, - project) + images.fetch(inst.image_id, basepath('disk-raw'), user, + project) if not os.path.exists(basepath('kernel')): - yield images.fetch(inst.kernel_id, basepath('kernel'), user, - project) + images.fetch(inst.kernel_id, basepath('kernel'), user, + project) if not os.path.exists(basepath('ramdisk')): - yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user, - project) + images.fetch(inst.ramdisk_id, basepath('ramdisk'), user, + project) - execute = lambda cmd, process_input = None, check_exit_code = True: \ - process.simple_execute(cmd=cmd, - process_input=process_input, - check_exit_code=check_exit_code) + def execute(cmd, process_input=None, check_exit_code=True): + return utils.execute(cmd=cmd, + process_input=process_input, + check_exit_code=check_exit_code) key = str(inst['key_data']) net = None @@ -482,11 +465,11 @@ class LibvirtConnection(object): if net: logging.info('instance %s: injecting net into image %s', inst['name'], inst.image_id) - yield disk.inject_data(basepath('disk-raw'), key, net, - execute=execute) + disk.inject_data(basepath('disk-raw'), key, net, + execute=execute) if os.path.exists(basepath('disk')): - yield process.simple_execute('rm -f %s' % basepath('disk')) + utils.execute('rm -f %s' % basepath('disk')) local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type] ['local_gb'] @@ -495,12 +478,11 @@ class LibvirtConnection(object): resize = True if inst['instance_type'] == 'm1.tiny' or prefix == 'rescue-': resize = False - yield disk.partition(basepath('disk-raw'), basepath('disk'), - local_bytes, resize, execute=execute) + disk.partition(basepath('disk-raw'), basepath('disk'), + local_bytes, resize, execute=execute) if FLAGS.libvirt_type == 'uml': - yield process.simple_execute('sudo chown root %s' % - basepath('disk')) + utils.execute('sudo chown root %s' % basepath('disk')) def to_xml(self, instance, rescue=False): # TODO(termie): cache? @@ -758,15 +740,15 @@ class NWFilterFirewall(object): def _define_filter(self, xml): if callable(xml): xml = xml() - d = threads.deferToThread(self._conn.nwfilterDefineXML, xml) - return d + + # execute in a native thread and block current greenthread until done + tpool.execute(self._conn.nwfilterDefineXML, xml) @staticmethod def _get_net_and_mask(cidr): net = IPy.IP(cidr) return str(net.net()), str(net.netmask()) - @defer.inlineCallbacks def setup_nwfilters_for_instance(self, instance): """ Creates an NWFilter for the given instance. In the process, @@ -774,10 +756,10 @@ class NWFilterFirewall(object): the base filter are all in place. """ - yield self._define_filter(self.nova_base_ipv4_filter) - yield self._define_filter(self.nova_base_ipv6_filter) - yield self._define_filter(self.nova_dhcp_filter) - yield self._define_filter(self.nova_base_filter) + self._define_filter(self.nova_base_ipv4_filter) + self._define_filter(self.nova_base_ipv6_filter) + self._define_filter(self.nova_dhcp_filter) + self._define_filter(self.nova_base_filter) nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \ " <filterref filter='nova-base' />\n" % \ @@ -789,20 +771,19 @@ class NWFilterFirewall(object): net, mask = self._get_net_and_mask(network_ref['cidr']) project_filter = self.nova_project_filter(instance['project_id'], net, mask) - yield self._define_filter(project_filter) + self._define_filter(project_filter) nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \ instance['project_id'] for security_group in instance.security_groups: - yield self.ensure_security_group_filter(security_group['id']) + self.ensure_security_group_filter(security_group['id']) nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \ security_group['id'] nwfilter_xml += "</filter>" - yield self._define_filter(nwfilter_xml) - return + self._define_filter(nwfilter_xml) def ensure_security_group_filter(self, security_group_id): return self._define_filter( diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py deleted file mode 100644 index 3169562a5..000000000 --- a/nova/virt/xenapi.py +++ /dev/null @@ -1,444 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (c) 2010 Citrix Systems, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -A connection to XenServer or Xen Cloud Platform. - -The concurrency model for this class is as follows: - -All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator -deferredToThread). They are remote calls, and so may hang for the usual -reasons. They should not be allowed to block the reactor thread. - -All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async -(using XenAPI.VM.async_start etc). These return a task, which can then be -polled for completion. Polling is handled using reactor.callLater. - -This combination of techniques means that we don't block the reactor thread at -all, and at the same time we don't hold lots of threads waiting for -long-running operations. - -FIXME: get_info currently doesn't conform to these rules, and will block the -reactor thread if the VM.get_by_name_label or VM.get_record calls block. - -**Related Flags** - -:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. -:xenapi_connection_username: Username for connection to XenServer/Xen Cloud - Platform (default: root). -:xenapi_connection_password: Password for connection to XenServer/Xen Cloud - Platform. -:xenapi_task_poll_interval: The interval (seconds) used for polling of - remote tasks (Async.VM.start, etc) - (default: 0.5). - -""" - -import logging -import xmlrpclib - -from twisted.internet import defer -from twisted.internet import reactor -from twisted.internet import task - -from nova import db -from nova import flags -from nova import process -from nova import utils -from nova.auth.manager import AuthManager -from nova.compute import instance_types -from nova.compute import power_state -from nova.virt import images - -XenAPI = None - - -FLAGS = flags.FLAGS -flags.DEFINE_string('xenapi_connection_url', - None, - 'URL for connection to XenServer/Xen Cloud Platform.' - ' Required if connection_type=xenapi.') -flags.DEFINE_string('xenapi_connection_username', - 'root', - 'Username for connection to XenServer/Xen Cloud Platform.' - ' Used only if connection_type=xenapi.') -flags.DEFINE_string('xenapi_connection_password', - None, - 'Password for connection to XenServer/Xen Cloud Platform.' - ' Used only if connection_type=xenapi.') -flags.DEFINE_float('xenapi_task_poll_interval', - 0.5, - 'The interval used for polling of remote tasks ' - '(Async.VM.start, etc). Used only if ' - 'connection_type=xenapi.') - - -XENAPI_POWER_STATE = { - 'Halted': power_state.SHUTDOWN, - 'Running': power_state.RUNNING, - 'Paused': power_state.PAUSED, - 'Suspended': power_state.SHUTDOWN, # FIXME - 'Crashed': power_state.CRASHED} - - -def get_connection(_): - """Note that XenAPI doesn't have a read-only connection mode, so - the read_only parameter is ignored.""" - # This is loaded late so that there's no need to install this - # library when not using XenAPI. - global XenAPI - if XenAPI is None: - XenAPI = __import__('XenAPI') - url = FLAGS.xenapi_connection_url - username = FLAGS.xenapi_connection_username - password = FLAGS.xenapi_connection_password - if not url or password is None: - raise Exception('Must specify xenapi_connection_url, ' - 'xenapi_connection_username (optionally), and ' - 'xenapi_connection_password to use ' - 'connection_type=xenapi') - return XenAPIConnection(url, username, password) - - -class XenAPIConnection(object): - def __init__(self, url, user, pw): - self._conn = XenAPI.Session(url) - self._conn.login_with_password(user, pw) - - def list_instances(self): - return [self._conn.xenapi.VM.get_name_label(vm) \ - for vm in self._conn.xenapi.VM.get_all()] - - @defer.inlineCallbacks - def spawn(self, instance): - vm = yield self._lookup(instance.name) - if vm is not None: - raise Exception('Attempted to create non-unique name %s' % - instance.name) - - network = db.project_get_network(None, instance.project_id) - network_ref = \ - yield self._find_network_with_bridge(network.bridge) - - user = AuthManager().get_user(instance.user_id) - project = AuthManager().get_project(instance.project_id) - vdi_uuid = yield self._fetch_image( - instance.image_id, user, project, True) - kernel = yield self._fetch_image( - instance.kernel_id, user, project, False) - ramdisk = yield self._fetch_image( - instance.ramdisk_id, user, project, False) - vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid) - - vm_ref = yield self._create_vm(instance, kernel, ramdisk) - yield self._create_vbd(vm_ref, vdi_ref, 0, True) - if network_ref: - yield self._create_vif(vm_ref, network_ref, instance.mac_address) - logging.debug('Starting VM %s...', vm_ref) - yield self._call_xenapi('VM.start', vm_ref, False, False) - logging.info('Spawning VM %s created %s.', instance.name, vm_ref) - - @defer.inlineCallbacks - def _create_vm(self, instance, kernel, ramdisk): - """Create a VM record. Returns a Deferred that gives the new - VM reference.""" - - instance_type = instance_types.INSTANCE_TYPES[instance.instance_type] - mem = str(long(instance_type['memory_mb']) * 1024 * 1024) - vcpus = str(instance_type['vcpus']) - rec = { - 'name_label': instance.name, - 'name_description': '', - 'is_a_template': False, - 'memory_static_min': '0', - 'memory_static_max': mem, - 'memory_dynamic_min': mem, - 'memory_dynamic_max': mem, - 'VCPUs_at_startup': vcpus, - 'VCPUs_max': vcpus, - 'VCPUs_params': {}, - 'actions_after_shutdown': 'destroy', - 'actions_after_reboot': 'restart', - 'actions_after_crash': 'destroy', - 'PV_bootloader': '', - 'PV_kernel': kernel, - 'PV_ramdisk': ramdisk, - 'PV_args': 'root=/dev/xvda1', - 'PV_bootloader_args': '', - 'PV_legacy_args': '', - 'HVM_boot_policy': '', - 'HVM_boot_params': {}, - 'platform': {}, - 'PCI_bus': '', - 'recommendations': '', - 'affinity': '', - 'user_version': '0', - 'other_config': {}, - } - logging.debug('Created VM %s...', instance.name) - vm_ref = yield self._call_xenapi('VM.create', rec) - logging.debug('Created VM %s as %s.', instance.name, vm_ref) - defer.returnValue(vm_ref) - - @defer.inlineCallbacks - def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): - """Create a VBD record. Returns a Deferred that gives the new - VBD reference.""" - - vbd_rec = {} - vbd_rec['VM'] = vm_ref - vbd_rec['VDI'] = vdi_ref - vbd_rec['userdevice'] = str(userdevice) - vbd_rec['bootable'] = bootable - vbd_rec['mode'] = 'RW' - vbd_rec['type'] = 'disk' - vbd_rec['unpluggable'] = True - vbd_rec['empty'] = False - vbd_rec['other_config'] = {} - vbd_rec['qos_algorithm_type'] = '' - vbd_rec['qos_algorithm_params'] = {} - vbd_rec['qos_supported_algorithms'] = [] - logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) - vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec) - logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, - vdi_ref) - defer.returnValue(vbd_ref) - - @defer.inlineCallbacks - def _create_vif(self, vm_ref, network_ref, mac_address): - """Create a VIF record. Returns a Deferred that gives the new - VIF reference.""" - - vif_rec = {} - vif_rec['device'] = '0' - vif_rec['network'] = network_ref - vif_rec['VM'] = vm_ref - vif_rec['MAC'] = mac_address - vif_rec['MTU'] = '1500' - vif_rec['other_config'] = {} - vif_rec['qos_algorithm_type'] = '' - vif_rec['qos_algorithm_params'] = {} - logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, - network_ref) - vif_ref = yield self._call_xenapi('VIF.create', vif_rec) - logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, - vm_ref, network_ref) - defer.returnValue(vif_ref) - - @defer.inlineCallbacks - def _find_network_with_bridge(self, bridge): - expr = 'field "bridge" = "%s"' % bridge - networks = yield self._call_xenapi('network.get_all_records_where', - expr) - if len(networks) == 1: - defer.returnValue(networks.keys()[0]) - elif len(networks) > 1: - raise Exception('Found non-unique network for bridge %s' % bridge) - else: - raise Exception('Found no network for bridge %s' % bridge) - - @defer.inlineCallbacks - def _fetch_image(self, image, user, project, use_sr): - """use_sr: True to put the image as a VDI in an SR, False to place - it on dom0's filesystem. The former is for VM disks, the latter for - its kernel and ramdisk (if external kernels are being used). - Returns a Deferred that gives the new VDI UUID.""" - - url = images.image_url(image) - access = AuthManager().get_access_key(user, project) - logging.debug("Asking xapi to fetch %s as %s" % (url, access)) - fn = use_sr and 'get_vdi' or 'get_kernel' - args = {} - args['src_url'] = url - args['username'] = access - args['password'] = user.secret - if use_sr: - args['add_partition'] = 'true' - task = yield self._async_call_plugin('objectstore', fn, args) - uuid = yield self._wait_for_task(task) - defer.returnValue(uuid) - - @defer.inlineCallbacks - def reboot(self, instance): - vm = yield self._lookup(instance.name) - if vm is None: - raise Exception('instance not present %s' % instance.name) - task = yield self._call_xenapi('Async.VM.clean_reboot', vm) - yield self._wait_for_task(task) - - @defer.inlineCallbacks - def destroy(self, instance): - vm = yield self._lookup(instance.name) - if vm is None: - # Don't complain, just return. This lets us clean up instances - # that have already disappeared from the underlying platform. - defer.returnValue(None) - # Get the VDIs related to the VM - vdis = yield self._lookup_vm_vdis(vm) - try: - task = yield self._call_xenapi('Async.VM.hard_shutdown', vm) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - # Disk clean-up - if vdis: - for vdi in vdis: - try: - task = yield self._call_xenapi('Async.VDI.destroy', vdi) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - try: - task = yield self._call_xenapi('Async.VM.destroy', vm) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - - def get_info(self, instance_id): - vm = self._lookup_blocking(instance_id) - if vm is None: - raise Exception('instance not present %s' % instance_id) - rec = self._conn.xenapi.VM.get_record(vm) - return {'state': XENAPI_POWER_STATE[rec['power_state']], - 'max_mem': long(rec['memory_static_max']) >> 10, - 'mem': long(rec['memory_dynamic_max']) >> 10, - 'num_cpu': rec['VCPUs_max'], - 'cpu_time': 0} - - def get_console_output(self, instance): - return 'FAKE CONSOLE OUTPUT' - - @utils.deferredToThread - def _lookup(self, i): - return self._lookup_blocking(i) - - def _lookup_blocking(self, i): - vms = self._conn.xenapi.VM.get_by_name_label(i) - n = len(vms) - if n == 0: - return None - elif n > 1: - raise Exception('duplicate name found: %s' % i) - else: - return vms[0] - - @utils.deferredToThread - def _lookup_vm_vdis(self, vm): - return self._lookup_vm_vdis_blocking(vm) - - def _lookup_vm_vdis_blocking(self, vm): - # Firstly we get the VBDs, then the VDIs. - # TODO: do we leave the read-only devices? - vbds = self._conn.xenapi.VM.get_VBDs(vm) - vdis = [] - if vbds: - for vbd in vbds: - try: - vdi = self._conn.xenapi.VBD.get_VDI(vbd) - # Test valid VDI - record = self._conn.xenapi.VDI.get_record(vdi) - except Exception, exc: - logging.warn(exc) - else: - vdis.append(vdi) - if len(vdis) > 0: - return vdis - else: - return None - - def _wait_for_task(self, task): - """Return a Deferred that will give the result of the given task. - The task is polled until it completes.""" - d = defer.Deferred() - reactor.callLater(0, self._poll_task, task, d) - return d - - @utils.deferredToThread - def _poll_task(self, task, deferred): - """Poll the given XenAPI task, and fire the given Deferred if we - get a result.""" - try: - #logging.debug('Polling task %s...', task) - status = self._conn.xenapi.task.get_status(task) - if status == 'pending': - reactor.callLater(FLAGS.xenapi_task_poll_interval, - self._poll_task, task, deferred) - elif status == 'success': - result = self._conn.xenapi.task.get_result(task) - logging.info('Task %s status: success. %s', task, result) - deferred.callback(_parse_xmlrpc_value(result)) - else: - error_info = self._conn.xenapi.task.get_error_info(task) - logging.warn('Task %s status: %s. %s', task, status, - error_info) - deferred.errback(XenAPI.Failure(error_info)) - #logging.debug('Polling task %s done.', task) - except Exception, exc: - logging.warn(exc) - deferred.errback(exc) - - @utils.deferredToThread - def _call_xenapi(self, method, *args): - """Call the specified XenAPI method on a background thread. Returns - a Deferred for the result.""" - f = self._conn.xenapi - for m in method.split('.'): - f = f.__getattr__(m) - return f(*args) - - @utils.deferredToThread - def _async_call_plugin(self, plugin, fn, args): - """Call Async.host.call_plugin on a background thread. Returns a - Deferred with the task reference.""" - return _unwrap_plugin_exceptions( - self._conn.xenapi.Async.host.call_plugin, - self._get_xenapi_host(), plugin, fn, args) - - def _get_xenapi_host(self): - return self._conn.xenapi.session.get_this_host(self._conn.handle) - - -def _unwrap_plugin_exceptions(func, *args, **kwargs): - try: - return func(*args, **kwargs) - except XenAPI.Failure, exc: - logging.debug("Got exception: %s", exc) - if (len(exc.details) == 4 and - exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and - exc.details[2] == 'Failure'): - params = None - try: - params = eval(exc.details[3]) - except: - raise exc - raise XenAPI.Failure(params) - else: - raise - except xmlrpclib.ProtocolError, exc: - logging.debug("Got exception: %s", exc) - raise - - -def _parse_xmlrpc_value(val): - """Parse the given value as if it were an XML-RPC value. This is - sometimes used as the format for the task.result field.""" - if not val: - return val - x = xmlrpclib.loads( - '<?xml version="1.0"?><methodResponse><params><param>' + - val + - '</param></params></methodResponse>') - return x[0][0] diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py new file mode 100644 index 000000000..3d598c463 --- /dev/null +++ b/nova/virt/xenapi/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py new file mode 100644 index 000000000..012954394 --- /dev/null +++ b/nova/virt/xenapi/network_utils.py @@ -0,0 +1,41 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of network +records and their attributes like bridges, PIFs, QoS, as well as +their lookup functions. +""" + + +class NetworkHelper(): + """ + The class that wraps the helper methods together. + """ + def __init__(self): + return + + @classmethod + def find_network_with_bridge(cls, session, bridge): + """ Return the network on which the bridge is attached, if found.""" + expr = 'field "bridge" = "%s"' % bridge + networks = session.call_xenapi('network.get_all_records_where', expr) + if len(networks) == 1: + return networks.keys()[0] + elif len(networks) > 1: + raise Exception('Found non-unique network for bridge %s' % bridge) + else: + raise Exception('Found no network for bridge %s' % bridge) diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py new file mode 100644 index 000000000..2f5d78e75 --- /dev/null +++ b/nova/virt/xenapi/vm_utils.py @@ -0,0 +1,260 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of VM records and +their attributes like VDIs, VIFs, as well as their lookup functions. +""" + +import logging +import urllib +from xml.dom import minidom + +from nova import flags +from nova import utils +from nova.auth.manager import AuthManager +from nova.compute import instance_types +from nova.compute import power_state +from nova.virt import images + + +FLAGS = flags.FLAGS + +XENAPI_POWER_STATE = { + 'Halted': power_state.SHUTDOWN, + 'Running': power_state.RUNNING, + 'Paused': power_state.PAUSED, + 'Suspended': power_state.SHUTDOWN, # FIXME + 'Crashed': power_state.CRASHED} + +XenAPI = None + + +class VMHelper(): + """ + The class that wraps the helper methods together. + """ + def __init__(self): + return + + @classmethod + def late_import(cls): + """ + Load the XenAPI module in for helper class, if required. + This is to avoid to install the XenAPI library when other + hypervisors are used + """ + global XenAPI + if XenAPI is None: + XenAPI = __import__('XenAPI') + + @classmethod + def create_vm(cls, session, instance, kernel, ramdisk): + """Create a VM record. Returns a Deferred that gives the new + VM reference.""" + + instance_type = instance_types.INSTANCE_TYPES[instance.instance_type] + mem = str(long(instance_type['memory_mb']) * 1024 * 1024) + vcpus = str(instance_type['vcpus']) + rec = { + 'name_label': instance.name, + 'name_description': '', + 'is_a_template': False, + 'memory_static_min': '0', + 'memory_static_max': mem, + 'memory_dynamic_min': mem, + 'memory_dynamic_max': mem, + 'VCPUs_at_startup': vcpus, + 'VCPUs_max': vcpus, + 'VCPUs_params': {}, + 'actions_after_shutdown': 'destroy', + 'actions_after_reboot': 'restart', + 'actions_after_crash': 'destroy', + 'PV_bootloader': '', + 'PV_kernel': kernel, + 'PV_ramdisk': ramdisk, + 'PV_args': 'root=/dev/xvda1', + 'PV_bootloader_args': '', + 'PV_legacy_args': '', + 'HVM_boot_policy': '', + 'HVM_boot_params': {}, + 'platform': {}, + 'PCI_bus': '', + 'recommendations': '', + 'affinity': '', + 'user_version': '0', + 'other_config': {}, + } + logging.debug('Created VM %s...', instance.name) + vm_ref = session.call_xenapi('VM.create', rec) + logging.debug('Created VM %s as %s.', instance.name, vm_ref) + return vm_ref + + @classmethod + def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable): + """Create a VBD record. Returns a Deferred that gives the new + VBD reference.""" + + vbd_rec = {} + vbd_rec['VM'] = vm_ref + vbd_rec['VDI'] = vdi_ref + vbd_rec['userdevice'] = str(userdevice) + vbd_rec['bootable'] = bootable + vbd_rec['mode'] = 'RW' + vbd_rec['type'] = 'disk' + vbd_rec['unpluggable'] = True + vbd_rec['empty'] = False + vbd_rec['other_config'] = {} + vbd_rec['qos_algorithm_type'] = '' + vbd_rec['qos_algorithm_params'] = {} + vbd_rec['qos_supported_algorithms'] = [] + logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) + vbd_ref = session.call_xenapi('VBD.create', vbd_rec) + logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, + vdi_ref) + return vbd_ref + + @classmethod + def create_vif(cls, session, vm_ref, network_ref, mac_address): + """Create a VIF record. Returns a Deferred that gives the new + VIF reference.""" + + vif_rec = {} + vif_rec['device'] = '0' + vif_rec['network'] = network_ref + vif_rec['VM'] = vm_ref + vif_rec['MAC'] = mac_address + vif_rec['MTU'] = '1500' + vif_rec['other_config'] = {} + vif_rec['qos_algorithm_type'] = '' + vif_rec['qos_algorithm_params'] = {} + logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, + network_ref) + vif_ref = session.call_xenapi('VIF.create', vif_rec) + logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, + vm_ref, network_ref) + return vif_ref + + @classmethod + def fetch_image(cls, session, image, user, project, use_sr): + """use_sr: True to put the image as a VDI in an SR, False to place + it on dom0's filesystem. The former is for VM disks, the latter for + its kernel and ramdisk (if external kernels are being used). + Returns a Deferred that gives the new VDI UUID.""" + + url = images.image_url(image) + access = AuthManager().get_access_key(user, project) + logging.debug("Asking xapi to fetch %s as %s", url, access) + fn = use_sr and 'get_vdi' or 'get_kernel' + args = {} + args['src_url'] = url + args['username'] = access + args['password'] = user.secret + if use_sr: + args['add_partition'] = 'true' + task = session.async_call_plugin('objectstore', fn, args) + uuid = session.wait_for_task(task) + return uuid + + @classmethod + def lookup(cls, session, i): + """ Look the instance i up, and returns it if available """ + return VMHelper.lookup_blocking(session, i) + + @classmethod + def lookup_blocking(cls, session, i): + """ Synchronous lookup """ + vms = session.get_xenapi().VM.get_by_name_label(i) + n = len(vms) + if n == 0: + return None + elif n > 1: + raise Exception('duplicate name found: %s' % i) + else: + return vms[0] + + @classmethod + def lookup_vm_vdis(cls, session, vm): + """ Look for the VDIs that are attached to the VM """ + return VMHelper.lookup_vm_vdis_blocking(session, vm) + + @classmethod + def lookup_vm_vdis_blocking(cls, session, vm): + """ Synchronous lookup_vm_vdis """ + # Firstly we get the VBDs, then the VDIs. + # TODO(Armando): do we leave the read-only devices? + vbds = session.get_xenapi().VM.get_VBDs(vm) + vdis = [] + if vbds: + for vbd in vbds: + try: + vdi = session.get_xenapi().VBD.get_VDI(vbd) + # Test valid VDI + record = session.get_xenapi().VDI.get_record(vdi) + logging.debug('VDI %s is still available', record['uuid']) + except XenAPI.Failure, exc: + logging.warn(exc) + else: + vdis.append(vdi) + if len(vdis) > 0: + return vdis + else: + return None + + @classmethod + def compile_info(cls, record): + return {'state': XENAPI_POWER_STATE[record['power_state']], + 'max_mem': long(record['memory_static_max']) >> 10, + 'mem': long(record['memory_dynamic_max']) >> 10, + 'num_cpu': record['VCPUs_max'], + 'cpu_time': 0} + + @classmethod + def compile_diagnostics(cls, session, record): + """Compile VM diagnostics data""" + try: + host = session.get_xenapi_host() + host_ip = session.get_xenapi().host.get_record(host)["address"] + metrics = session.get_xenapi().VM_guest_metrics.get_record( + record["guest_metrics"]) + diags = { + "Kernel": metrics["os_version"]["uname"], + "Distro": metrics["os_version"]["name"]} + xml = get_rrd(host_ip, record["uuid"]) + if xml: + rrd = minidom.parseString(xml) + for i, node in enumerate(rrd.firstChild.childNodes): + # We don't want all of the extra garbage + if i >= 3 and i <= 11: + ref = node.childNodes + # Name and Value + diags[ref[0].firstChild.data] = ref[6].firstChild.data + return diags + except XenAPI.Failure as e: + return {"Unable to retrieve diagnostics": e} + + +def get_rrd(host, uuid): + """Return the VM RRD XML as a string""" + try: + xml = urllib.urlopen("http://%s:%s@%s/vm_rrd?uuid=%s" % ( + FLAGS.xenapi_connection_username, + FLAGS.xenapi_connection_password, + host, + uuid)) + return xml.read() + except IOError: + return None diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py new file mode 100644 index 000000000..3034df9e1 --- /dev/null +++ b/nova/virt/xenapi/vmops.py @@ -0,0 +1,139 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for VM-related functions (spawn, reboot, etc). +""" + +import logging + +from nova import db +from nova import context + +from nova.auth.manager import AuthManager +from nova.virt.xenapi.network_utils import NetworkHelper +from nova.virt.xenapi.vm_utils import VMHelper + +XenAPI = None + + +class VMOps(object): + """ + Management class for VM-related tasks + """ + def __init__(self, session): + global XenAPI + if XenAPI is None: + XenAPI = __import__('XenAPI') + self._session = session + # Load XenAPI module in the helper class + VMHelper.late_import() + + def list_instances(self): + """ List VM instances """ + return [self._session.get_xenapi().VM.get_name_label(vm) \ + for vm in self._session.get_xenapi().VM.get_all()] + + def spawn(self, instance): + """ Create VM instance """ + vm = VMHelper.lookup(self._session, instance.name) + if vm is not None: + raise Exception('Attempted to create non-unique name %s' % + instance.name) + + bridge = db.project_get_network(context.get_admin_context(), + instance.project_id).bridge + network_ref = \ + NetworkHelper.find_network_with_bridge(self._session, bridge) + + user = AuthManager().get_user(instance.user_id) + project = AuthManager().get_project(instance.project_id) + vdi_uuid = VMHelper.fetch_image( + self._session, instance.image_id, user, project, True) + kernel = VMHelper.fetch_image( + self._session, instance.kernel_id, user, project, False) + ramdisk = VMHelper.fetch_image( + self._session, instance.ramdisk_id, user, project, False) + vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid) + vm_ref = VMHelper.create_vm( + self._session, instance, kernel, ramdisk) + VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True) + if network_ref: + VMHelper.create_vif(self._session, vm_ref, + network_ref, instance.mac_address) + logging.debug('Starting VM %s...', vm_ref) + self._session.call_xenapi('VM.start', vm_ref, False, False) + logging.info('Spawning VM %s created %s.', instance.name, + vm_ref) + + def reboot(self, instance): + """ Reboot VM instance """ + instance_name = instance.name + vm = VMHelper.lookup(self._session, instance_name) + if vm is None: + raise Exception('instance not present %s' % instance_name) + task = self._session.call_xenapi('Async.VM.clean_reboot', vm) + self._session.wait_for_task(task) + + def destroy(self, instance): + """ Destroy VM instance """ + vm = VMHelper.lookup(self._session, instance.name) + if vm is None: + # Don't complain, just return. This lets us clean up instances + # that have already disappeared from the underlying platform. + return + # Get the VDIs related to the VM + vdis = VMHelper.lookup_vm_vdis(self._session, vm) + try: + task = self._session.call_xenapi('Async.VM.hard_shutdown', + vm) + self._session.wait_for_task(task) + except XenAPI.Failure, exc: + logging.warn(exc) + # Disk clean-up + if vdis: + for vdi in vdis: + try: + task = self._session.call_xenapi('Async.VDI.destroy', vdi) + self._session.wait_for_task(task) + except XenAPI.Failure, exc: + logging.warn(exc) + try: + task = self._session.call_xenapi('Async.VM.destroy', vm) + self._session.wait_for_task(task) + except XenAPI.Failure, exc: + logging.warn(exc) + + def get_info(self, instance_id): + """ Return data about VM instance """ + vm = VMHelper.lookup_blocking(self._session, instance_id) + if vm is None: + raise Exception('instance not present %s' % instance_id) + rec = self._session.get_xenapi().VM.get_record(vm) + return VMHelper.compile_info(rec) + + def get_diagnostics(self, instance_id): + """Return data about VM diagnostics""" + vm = VMHelper.lookup(self._session, instance_id) + if vm is None: + raise Exception("instance not present %s" % instance_id) + rec = self._session.get_xenapi().VM.get_record(vm) + return VMHelper.compile_diagnostics(self._session, rec) + + def get_console_output(self, instance): + """ Return snapshot of console """ + # TODO: implement this to fix pylint! + return 'FAKE CONSOLE OUTPUT of instance' diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py new file mode 100644 index 000000000..a4c7a3861 --- /dev/null +++ b/nova/virt/xenapi/volumeops.py @@ -0,0 +1,32 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for Storage-related functions (attach, detach, etc). +""" + + +class VolumeOps(object): + def __init__(self, session): + self._session = session + + def attach_volume(self, instance_name, device_path, mountpoint): + # FIXME: that's going to be sorted when iscsi-xenapi lands in branch + return True + + def detach_volume(self, instance_name, mountpoint): + # FIXME: that's going to be sorted when iscsi-xenapi lands in branch + return True diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py new file mode 100644 index 000000000..6beb08f5e --- /dev/null +++ b/nova/virt/xenapi_conn.py @@ -0,0 +1,241 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A connection to XenServer or Xen Cloud Platform. + +The concurrency model for this class is as follows: + +All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator +deferredToThread). They are remote calls, and so may hang for the usual +reasons. They should not be allowed to block the reactor thread. + +All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async +(using XenAPI.VM.async_start etc). These return a task, which can then be +polled for completion. Polling is handled using reactor.callLater. + +This combination of techniques means that we don't block the reactor thread at +all, and at the same time we don't hold lots of threads waiting for +long-running operations. + +FIXME: get_info currently doesn't conform to these rules, and will block the +reactor thread if the VM.get_by_name_label or VM.get_record calls block. + +**Related Flags** + +:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. +:xenapi_connection_username: Username for connection to XenServer/Xen Cloud + Platform (default: root). +:xenapi_connection_password: Password for connection to XenServer/Xen Cloud + Platform. +:xenapi_task_poll_interval: The interval (seconds) used for polling of + remote tasks (Async.VM.start, etc) + (default: 0.5). + +""" + +import logging +import sys +import xmlrpclib + +from eventlet import event +from eventlet import tpool + +from nova import utils +from nova import flags +from nova.virt.xenapi.vmops import VMOps +from nova.virt.xenapi.volumeops import VolumeOps + +FLAGS = flags.FLAGS +flags.DEFINE_string('xenapi_connection_url', + None, + 'URL for connection to XenServer/Xen Cloud Platform.' + ' Required if connection_type=xenapi.') +flags.DEFINE_string('xenapi_connection_username', + 'root', + 'Username for connection to XenServer/Xen Cloud Platform.' + ' Used only if connection_type=xenapi.') +flags.DEFINE_string('xenapi_connection_password', + None, + 'Password for connection to XenServer/Xen Cloud Platform.' + ' Used only if connection_type=xenapi.') +flags.DEFINE_float('xenapi_task_poll_interval', + 0.5, + 'The interval used for polling of remote tasks ' + '(Async.VM.start, etc). Used only if ' + 'connection_type=xenapi.') + +XenAPI = None + + +def get_connection(_): + """Note that XenAPI doesn't have a read-only connection mode, so + the read_only parameter is ignored.""" + # This is loaded late so that there's no need to install this + # library when not using XenAPI. + global XenAPI + if XenAPI is None: + XenAPI = __import__('XenAPI') + url = FLAGS.xenapi_connection_url + username = FLAGS.xenapi_connection_username + password = FLAGS.xenapi_connection_password + if not url or password is None: + raise Exception('Must specify xenapi_connection_url, ' + 'xenapi_connection_username (optionally), and ' + 'xenapi_connection_password to use ' + 'connection_type=xenapi') + return XenAPIConnection(url, username, password) + + +class XenAPIConnection(object): + """ A connection to XenServer or Xen Cloud Platform """ + def __init__(self, url, user, pw): + session = XenAPISession(url, user, pw) + self._vmops = VMOps(session) + self._volumeops = VolumeOps(session) + + def list_instances(self): + """ List VM instances """ + return self._vmops.list_instances() + + def spawn(self, instance): + """ Create VM instance """ + self._vmops.spawn(instance) + + def reboot(self, instance): + """ Reboot VM instance """ + self._vmops.reboot(instance) + + def destroy(self, instance): + """ Destroy VM instance """ + self._vmops.destroy(instance) + + def get_info(self, instance_id): + """ Return data about VM instance """ + return self._vmops.get_info(instance_id) + + def get_diagnostics(self, instance_id): + """Return data about VM diagnostics""" + return self._vmops.get_diagnostics(instance_id) + + def get_console_output(self, instance): + """ Return snapshot of console """ + return self._vmops.get_console_output(instance) + + def attach_volume(self, instance_name, device_path, mountpoint): + """ Attach volume storage to VM instance """ + return self._volumeops.attach_volume(instance_name, + device_path, + mountpoint) + + def detach_volume(self, instance_name, mountpoint): + """ Detach volume storage to VM instance """ + return self._volumeops.detach_volume(instance_name, mountpoint) + + +class XenAPISession(object): + """ The session to invoke XenAPI SDK calls """ + def __init__(self, url, user, pw): + self._session = XenAPI.Session(url) + self._session.login_with_password(user, pw) + + def get_xenapi(self): + """ Return the xenapi object """ + return self._session.xenapi + + def get_xenapi_host(self): + """ Return the xenapi host """ + return self._session.xenapi.session.get_this_host(self._session.handle) + + def call_xenapi(self, method, *args): + """Call the specified XenAPI method on a background thread.""" + f = self._session.xenapi + for m in method.split('.'): + f = f.__getattr__(m) + return tpool.execute(f, *args) + + def async_call_plugin(self, plugin, fn, args): + """Call Async.host.call_plugin on a background thread.""" + return tpool.execute(_unwrap_plugin_exceptions, + self._session.xenapi.Async.host.call_plugin, + self.get_xenapi_host(), plugin, fn, args) + + def wait_for_task(self, task): + """Return a Deferred that will give the result of the given task. + The task is polled until it completes.""" + + done = event.Event() + loop = utils.LoopingCall(self._poll_task, task, done) + loop.start(FLAGS.xenapi_task_poll_interval, now=True) + rv = done.wait() + loop.stop() + return rv + + def _poll_task(self, task, done): + """Poll the given XenAPI task, and fire the given Deferred if we + get a result.""" + try: + #logging.debug('Polling task %s...', task) + status = self._session.xenapi.task.get_status(task) + if status == 'pending': + return + elif status == 'success': + result = self._session.xenapi.task.get_result(task) + logging.info('Task %s status: success. %s', task, result) + done.send(_parse_xmlrpc_value(result)) + else: + error_info = self._session.xenapi.task.get_error_info(task) + logging.warn('Task %s status: %s. %s', task, status, + error_info) + done.send_exception(XenAPI.Failure(error_info)) + #logging.debug('Polling task %s done.', task) + except XenAPI.Failure, exc: + logging.warn(exc) + done.send_exception(*sys.exc_info()) + + +def _unwrap_plugin_exceptions(func, *args, **kwargs): + """ Parse exception details """ + try: + return func(*args, **kwargs) + except XenAPI.Failure, exc: + logging.debug("Got exception: %s", exc) + if (len(exc.details) == 4 and + exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and + exc.details[2] == 'Failure'): + params = None + try: + params = eval(exc.details[3]) + except: + raise exc + raise XenAPI.Failure(params) + else: + raise + except xmlrpclib.ProtocolError, exc: + logging.debug("Got exception: %s", exc) + raise + + +def _parse_xmlrpc_value(val): + """Parse the given value as if it were an XML-RPC value. This is + sometimes used as the format for the task.result field.""" + if not val: + return val + x = xmlrpclib.loads( + '<?xml version="1.0"?><methodResponse><params><param>' + + val + + '</param></params></methodResponse>') + return x[0][0] |
