| author | Tushar Patil <tushar.vitthal.patil@gmail.com> | 2011-03-14 12:09:57 -0700 |
|---|---|---|
| committer | Tushar Patil <tushar.vitthal.patil@gmail.com> | 2011-03-14 12:09:57 -0700 |
| commit | 11086de8ac1606e490bccbca6de1befe34c1d980 (patch) | |
| tree | 6bf91aa2805c6f026b06ddcd17c3759f433fa224 /nova/virt | |
| parent | 20c0b63425090621e86f913bba3dcec79dc191c7 (diff) | |
| parent | 8c2a4a565e718e594a2f42ff84eb4b9017ef15a7 (diff) | |
removed conflicts and merged with trunk
Diffstat (limited to 'nova/virt')
| -rw-r--r-- | nova/virt/cpuinfo.xml.template | 9 |
| -rw-r--r-- | nova/virt/fake.py | 21 |
| -rw-r--r-- | nova/virt/libvirt_conn.py | 369 |
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py | 101 |
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 193 |
| -rw-r--r-- | nova/virt/xenapi/volume_utils.py | 6 |
| -rw-r--r-- | nova/virt/xenapi_conn.py | 27 |

7 files changed, 575 insertions(+), 151 deletions(-)
diff --git a/nova/virt/cpuinfo.xml.template b/nova/virt/cpuinfo.xml.template
new file mode 100644
index 000000000..48842b29d
--- /dev/null
+++ b/nova/virt/cpuinfo.xml.template
@@ -0,0 +1,9 @@
+<cpu>
+    <arch>$arch</arch>
+    <model>$model</model>
+    <vendor>$vendor</vendor>
+    <topology sockets="$topology.sockets" cores="$topology.cores" threads="$topology.threads"/>
+#for $var in $features
+    <features name="$var" />
+#end for
+</cpu>
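The template above is Cheetah markup (`$` placeholders, `#for` directives), rendered at runtime by `compare_cpu()` further down in this diff. A minimal sketch of that rendering, assuming the Cheetah package and a checkout of the source tree; all CPU values here are invented:

```python
# Hedged sketch: fill cpuinfo.xml.template the way compare_cpu() does.
from Cheetah.Template import Template

cpu_info = {'arch': 'x86_64', 'model': 'Nehalem', 'vendor': 'Intel',
            'topology': {'sockets': '1', 'cores': '2', 'threads': '2'},
            'features': ['tpr_shadow', 'vnmi', 'flexpriority']}

source = open('nova/virt/cpuinfo.xml.template').read()
# Cheetah's dotted lookup resolves $topology.sockets into the dict above.
print(Template(source, searchList=[cpu_info]))
```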
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index c744acf91..3a06284a1 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -407,6 +407,27 @@ class FakeConnection(object):
         """
         return True
 
+    def update_available_resource(self, ctxt, host):
+        """This method is supported only by libvirt."""
+        return
+
+    def compare_cpu(self, xml):
+        """This method is supported only by libvirt."""
+        raise NotImplementedError('This method is supported only by libvirt.')
+
+    def ensure_filtering_rules_for_instance(self, instance_ref):
+        """This method is supported only by libvirt."""
+        raise NotImplementedError('This method is supported only by libvirt.')
+
+    def live_migration(self, context, instance_ref, dest,
+                       post_method, recover_method):
+        """This method is supported only by libvirt."""
+        return
+
+    def unfilter_instance(self, instance_ref):
+        """This method is supported only by libvirt."""
+        raise NotImplementedError('This method is supported only by libvirt.')
+
 
 class FakeInstance(object):
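These stubs pin down the driver contract: libvirt-only calls either silently no-op or raise `NotImplementedError` on other drivers. A hedged sketch of what that means for a caller; constructing `FakeConnection` directly is illustrative only, since nova normally obtains a driver via `get_connection()`:

```python
# Sketch under assumptions: FakeConnection() built directly for brevity.
from nova.virt.fake import FakeConnection

conn = FakeConnection()
conn.live_migration(None, None, 'dest-host', None, None)  # silently no-ops
try:
    conn.compare_cpu('<cpu/>')  # libvirt-only features raise instead
except NotImplementedError as e:
    print(e)
```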
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 5e70951bd..9efbb3342 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -36,10 +36,13 @@ Supports KVM, QEMU, UML, and XEN.
 
 """
 
+import multiprocessing
 import os
 import shutil
+import sys
 import random
 import subprocess
+import time
 import uuid
 
 from xml.dom import minidom
@@ -69,6 +72,7 @@ Template = None
 LOG = logging.getLogger('nova.virt.libvirt_conn')
 
 FLAGS = flags.FLAGS
+flags.DECLARE('live_migration_retry_count', 'nova.compute.manager')
 # TODO(vish): These flags should probably go into a shared location
 flags.DEFINE_string('rescue_image_id', 'ami-rescue', 'Rescue ami image')
 flags.DEFINE_string('rescue_kernel_id', 'aki-rescue', 'Rescue aki image')
@@ -99,6 +103,17 @@ flags.DEFINE_string('ajaxterm_portrange',
 flags.DEFINE_string('firewall_driver',
                     'nova.virt.libvirt_conn.IptablesFirewallDriver',
                     'Firewall driver (defaults to iptables)')
+flags.DEFINE_string('cpuinfo_xml_template',
+                    utils.abspath('virt/cpuinfo.xml.template'),
+                    'CpuInfo XML template (used only for live migration now)')
+flags.DEFINE_string('live_migration_uri',
+                    "qemu+tcp://%s/system",
+                    'Live migration target URI; %s is replaced with the '
+                    'destination hostname')
+flags.DEFINE_string('live_migration_flag',
+                    "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER",
+                    'Define live migration behavior.')
+flags.DEFINE_integer('live_migration_bandwidth', 0,
+                     'Maximum bandwidth to be used during migration, in Mbps')
 
 
 def get_connection(read_only):
@@ -146,6 +161,7 @@ class LibvirtConnection(object):
 
         self.libvirt_xml = open(FLAGS.libvirt_xml_template).read()
         self.interfaces_xml = open(FLAGS.injected_network_template).read()
+        self.cpuinfo_xml = open(FLAGS.cpuinfo_xml_template).read()
         self._wrapped_conn = None
         self.read_only = read_only
 
@@ -859,6 +875,158 @@ class LibvirtConnection(object):
 
         return interfaces
 
+    def get_vcpu_total(self):
+        """Get the number of vcpus on the physical computer.
+
+        :returns: the number of cpu cores.
+
+        """
+
+        # On certain platforms, this will raise a NotImplementedError.
+        try:
+            return multiprocessing.cpu_count()
+        except NotImplementedError:
+            LOG.warn(_("Cannot get the number of cpus, because this "
+                       "function is not implemented for this platform. "
+                       "This error can be safely ignored for now."))
+            return 0
+
+    def get_memory_mb_total(self):
+        """Get the total memory size (MB) of the physical computer.
+
+        :returns: the total amount of memory (MB).
+
+        """
+
+        if sys.platform.upper() != 'LINUX2':
+            return 0
+
+        meminfo = open('/proc/meminfo').read().split()
+        idx = meminfo.index('MemTotal:')
+        # transforming kB to MB.
+        return int(meminfo[idx + 1]) / 1024
+
+    def get_local_gb_total(self):
+        """Get the total hdd size (GB) of the physical computer.
+
+        :returns:
+            The total amount of HDD (GB).
+            Note that this value shows the partition where
+            NOVA-INST-DIR/instances is mounted.
+
+        """
+
+        hddinfo = os.statvfs(FLAGS.instances_path)
+        return hddinfo.f_frsize * hddinfo.f_blocks / 1024 / 1024 / 1024
+
+    def get_vcpu_used(self):
+        """Get the number of vcpus currently in use on the physical computer.
+
+        :returns: the total number of vcpus currently in use.
+
+        """
+
+        total = 0
+        for dom_id in self._conn.listDomainsID():
+            dom = self._conn.lookupByID(dom_id)
+            total += len(dom.vcpus()[1])
+        return total
+
+    def get_memory_mb_used(self):
+        """Get the memory usage (MB) of the physical computer.
+
+        :returns: the total usage of memory (MB).
+
+        """
+
+        if sys.platform.upper() != 'LINUX2':
+            return 0
+
+        m = open('/proc/meminfo').read().split()
+        idx1 = m.index('MemFree:')
+        idx2 = m.index('Buffers:')
+        idx3 = m.index('Cached:')
+        avail = (int(m[idx1 + 1]) + int(m[idx2 + 1]) + int(m[idx3 + 1])) / 1024
+        return self.get_memory_mb_total() - avail
+
+    def get_local_gb_used(self):
+        """Get the hdd usage (GB) of the physical computer.
+
+        :returns:
+            The total usage of HDD (GB).
+            Note that this value shows the partition where
+            NOVA-INST-DIR/instances is mounted.
+
+        """
+
+        hddinfo = os.statvfs(FLAGS.instances_path)
+        avail = hddinfo.f_frsize * hddinfo.f_bavail / 1024 / 1024 / 1024
+        return self.get_local_gb_total() - avail
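The memory probes above parse `/proc/meminfo` directly, counting `Buffers` and `Cached` as available. The same arithmetic as a standalone snippet (Linux only):

```python
# Standalone illustration of the /proc/meminfo math used by
# get_memory_mb_total() / get_memory_mb_used() above.
m = open('/proc/meminfo').read().split()
total_mb = int(m[m.index('MemTotal:') + 1]) / 1024
avail_mb = sum(int(m[m.index(k) + 1])
               for k in ('MemFree:', 'Buffers:', 'Cached:')) / 1024
print(total_mb - avail_mb)  # "used" MB, as the driver reports it
```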
+ + """ + + LOG.info(_('Instance launched has CPU info:\n%s') % cpu_info) + dic = utils.loads(cpu_info) + xml = str(Template(self.cpuinfo_xml, searchList=dic)) + LOG.info(_('to xml...\n:%s ' % xml)) + + u = "http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult" + m = _("CPU doesn't have compatibility.\n\n%(ret)s\n\nRefer to %(u)s") + # unknown character exists in xml, then libvirt complains + try: + ret = self._conn.compareCPU(xml, 0) + except libvirt.libvirtError, e: + ret = e.message + LOG.error(m % locals()) + raise + + if ret <= 0: + raise exception.Invalid(m % locals()) + + return + + def ensure_filtering_rules_for_instance(self, instance_ref): + """Setting up filtering rules and waiting for its completion. + + To migrate an instance, filtering rules to hypervisors + and firewalls are inevitable on destination host. + ( Waiting only for filterling rules to hypervisor, + since filtering rules to firewall rules can be set faster). + + Concretely, the below method must be called. + - setup_basic_filtering (for nova-basic, etc.) + - prepare_instance_filter(for nova-instance-instance-xxx, etc.) + + to_xml may have to be called since it defines PROJNET, PROJMASK. + but libvirt migrates those value through migrateToURI(), + so , no need to be called. + + Don't use thread for this method since migration should + not be started when setting-up filtering rules operations + are not completed. + + :params instance_ref: nova.db.sqlalchemy.models.Instance object + + """ + + # If any instances never launch at destination host, + # basic-filtering must be set here. + self.firewall_driver.setup_basic_filtering(instance_ref) + # setting up n)ova-instance-instance-xx mainly. + self.firewall_driver.prepare_instance_filter(instance_ref) + + # wait for completion + timeout_count = range(FLAGS.live_migration_retry_count) + while timeout_count: + try: + filter_name = 'nova-instance-%s' % instance_ref.name + self._conn.nwfilterLookupByName(filter_name) + break + except libvirt.libvirtError: + timeout_count.pop() + if len(timeout_count) == 0: + ec2_id = instance_ref['hostname'] + iname = instance_ref.name + msg = _('Timeout migrating for %(ec2_id)s(%(iname)s)') + raise exception.Error(msg % locals()) + time.sleep(1) + + def live_migration(self, ctxt, instance_ref, dest, + post_method, recover_method): + """Spawning live_migration operation for distributing high-load. + + :params ctxt: security context + :params instance_ref: + nova.db.sqlalchemy.models.Instance object + instance object that is migrated. + :params dest: destination host + :params post_method: + post operation method. + expected nova.compute.manager.post_live_migration. + :params recover_method: + recovery method when any exception occurs. + expected nova.compute.manager.recover_live_migration. + + """ + + greenthread.spawn(self._live_migration, ctxt, instance_ref, dest, + post_method, recover_method) + + def _live_migration(self, ctxt, instance_ref, dest, + post_method, recover_method): + """Do live migration. + + :params ctxt: security context + :params instance_ref: + nova.db.sqlalchemy.models.Instance object + instance object that is migrated. + :params dest: destination host + :params post_method: + post operation method. + expected nova.compute.manager.post_live_migration. + :params recover_method: + recovery method when any exception occurs. + expected nova.compute.manager.recover_live_migration. + + """ + + # Do live migration. 
     def block_stats(self, instance_name, disk):
         """
         Note that this function takes an instance name, not an Instance, so
@@ -889,6 +1057,207 @@ class LibvirtConnection(object):
     def refresh_security_group_members(self, security_group_id):
         self.firewall_driver.refresh_security_group_members(security_group_id)
 
+    def update_available_resource(self, ctxt, host):
+        """Updates compute manager resource info on the ComputeNode table.
+
+        This method is called when nova-compute launches, and
+        whenever admin executes "nova-manage service update_resource".
+
+        :param ctxt: security context
+        :param host: hostname that compute manager is currently running on
+
+        """
+
+        try:
+            service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]
+        except exception.NotFound:
+            raise exception.Invalid(_("Cannot update compute manager "
+                                      "specific info, because no service "
+                                      "record was found."))
+
+        # Updating host information
+        dic = {'vcpus': self.get_vcpu_total(),
+               'memory_mb': self.get_memory_mb_total(),
+               'local_gb': self.get_local_gb_total(),
+               'vcpus_used': self.get_vcpu_used(),
+               'memory_mb_used': self.get_memory_mb_used(),
+               'local_gb_used': self.get_local_gb_used(),
+               'hypervisor_type': self.get_hypervisor_type(),
+               'hypervisor_version': self.get_hypervisor_version(),
+               'cpu_info': self.get_cpu_info()}
+
+        compute_node_ref = service_ref['compute_node']
+        if not compute_node_ref:
+            LOG.info(_('Compute_service record created for %s ') % host)
+            dic['service_id'] = service_ref['id']
+            db.compute_node_create(ctxt, dic)
+        else:
+            LOG.info(_('Compute_service record updated for %s ') % host)
+            db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
+
+    def compare_cpu(self, cpu_info):
+        """Check that the host cpu is compatible with the cpu given by xml.
+
+        "xml" must be a part of libvirt.openReadonly().getCapabilities().
+        The return value follows virCPUCompareResult: a value greater
+        than 0 means the cpus are compatible and live migration can
+        proceed.
+        'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult'
+
+        :param cpu_info: json string describing cpu features
+                         (see get_cpu_info())
+        :returns:
+            None if the given cpu info is compatible with this server;
+            otherwise an exception is raised.
+
+        """
+
+        LOG.info(_('Instance launched has CPU info:\n%s') % cpu_info)
+        dic = utils.loads(cpu_info)
+        xml = str(Template(self.cpuinfo_xml, searchList=dic))
+        LOG.info(_('to xml...\n:%s ') % xml)
+
+        u = "http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult"
+        m = _("CPU doesn't have compatibility.\n\n%(ret)s\n\nRefer to %(u)s")
+        # if an unknown character exists in the xml, libvirt complains
+        try:
+            ret = self._conn.compareCPU(xml, 0)
+        except libvirt.libvirtError, e:
+            ret = e.message
+            LOG.error(m % locals())
+            raise
+
+        if ret <= 0:
+            raise exception.Invalid(m % locals())
+
+        return
+
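`compare_cpu()` ultimately delegates to libvirt's `compareCPU()`. A minimal sketch of that primitive outside nova, assuming the libvirt Python bindings and a reachable libvirtd:

```python
# Hedged sketch of the libvirt call behind compare_cpu().
import libvirt

conn = libvirt.openReadOnly(None)
ret = conn.compareCPU("<cpu><arch>x86_64</arch><model>qemu64</model></cpu>", 0)
# virCPUCompareResult: -1 = error, 0 = incompatible,
#                       1 = identical, 2 = superset
print("compatible" if ret > 0 else "not compatible")
```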
+    def ensure_filtering_rules_for_instance(self, instance_ref):
+        """Set up filtering rules and wait for their completion.
+
+        To migrate an instance, filtering rules for hypervisors
+        and firewalls are inevitable on the destination host.
+        (We wait only for the filtering rules to the hypervisor,
+        since firewall rules can be set faster.)
+
+        Concretely, the methods below must be called.
+        - setup_basic_filtering (for nova-basic, etc.)
+        - prepare_instance_filter (for nova-instance-instance-xxx, etc.)
+
+        to_xml may have to be called since it defines PROJNET and PROJMASK,
+        but libvirt migrates those values through migrateToURI(),
+        so there is no need to call it here.
+
+        Don't use a thread for this method, since migration should
+        not start before the filtering-rule setup has completed.
+
+        :params instance_ref: nova.db.sqlalchemy.models.Instance object
+
+        """
+
+        # If no instance has ever launched at the destination host,
+        # basic-filtering must be set here.
+        self.firewall_driver.setup_basic_filtering(instance_ref)
+        # setting up nova-instance-instance-xx mainly.
+        self.firewall_driver.prepare_instance_filter(instance_ref)
+
+        # wait for completion
+        timeout_count = range(FLAGS.live_migration_retry_count)
+        while timeout_count:
+            try:
+                filter_name = 'nova-instance-%s' % instance_ref.name
+                self._conn.nwfilterLookupByName(filter_name)
+                break
+            except libvirt.libvirtError:
+                timeout_count.pop()
+                if len(timeout_count) == 0:
+                    ec2_id = instance_ref['hostname']
+                    iname = instance_ref.name
+                    msg = _('Timeout migrating for %(ec2_id)s(%(iname)s)')
+                    raise exception.Error(msg % locals())
+                time.sleep(1)
+
+    def live_migration(self, ctxt, instance_ref, dest,
+                       post_method, recover_method):
+        """Spawn a live_migration operation for distributing high load.
+
+        :params ctxt: security context
+        :params instance_ref:
+            nova.db.sqlalchemy.models.Instance object
+            instance object that is migrated.
+        :params dest: destination host
+        :params post_method:
+            post operation method.
+            expected nova.compute.manager.post_live_migration.
+        :params recover_method:
+            recovery method when any exception occurs.
+            expected nova.compute.manager.recover_live_migration.
+
+        """
+
+        greenthread.spawn(self._live_migration, ctxt, instance_ref, dest,
+                          post_method, recover_method)
+
+    def _live_migration(self, ctxt, instance_ref, dest,
+                        post_method, recover_method):
+        """Do live migration.
+
+        :params ctxt: security context
+        :params instance_ref:
+            nova.db.sqlalchemy.models.Instance object
+            instance object that is migrated.
+        :params dest: destination host
+        :params post_method:
+            post operation method.
+            expected nova.compute.manager.post_live_migration.
+        :params recover_method:
+            recovery method when any exception occurs.
+            expected nova.compute.manager.recover_live_migration.
+
+        """
+
+        # Do live migration.
+        try:
+            flaglist = FLAGS.live_migration_flag.split(',')
+            flagvals = [getattr(libvirt, x.strip()) for x in flaglist]
+            logical_sum = reduce(lambda x, y: x | y, flagvals)
+
+            if self.read_only:
+                tmpconn = self._connect(self.libvirt_uri, False)
+                dom = tmpconn.lookupByName(instance_ref.name)
+                dom.migrateToURI(FLAGS.live_migration_uri % dest,
+                                 logical_sum,
+                                 None,
+                                 FLAGS.live_migration_bandwidth)
+                tmpconn.close()
+            else:
+                dom = self._conn.lookupByName(instance_ref.name)
+                dom.migrateToURI(FLAGS.live_migration_uri % dest,
+                                 logical_sum,
+                                 None,
+                                 FLAGS.live_migration_bandwidth)
+
+        except Exception:
+            recover_method(ctxt, instance_ref)
+            raise
+
+        # Waiting for completion of live_migration.
+        timer = utils.LoopingCall(f=None)
+
+        def wait_for_live_migration():
+            """waiting for live migration completion"""
+            try:
+                self.get_info(instance_ref.name)['state']
+            except exception.NotFound:
+                timer.stop()
+                post_method(ctxt, instance_ref, dest)
+
+        timer.f = wait_for_live_migration
+        timer.start(interval=0.5, now=True)
+
+    def unfilter_instance(self, instance_ref):
+        """See comments of the same method in firewall_driver."""
+        self.firewall_driver.unfilter_instance(instance_ref)
+
 
 class FirewallDriver(object):
     def prepare_instance_filter(self, instance):
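The `live_migration_flag` string declared near the top of this diff becomes an integer bitmask at migration time. The exact transformation, isolated from `_live_migration()` above:

```python
# How the flag string is resolved: look each name up on the libvirt
# module, then OR the values together. reduce() is a builtin on the
# Python 2 versions nova targets.
import libvirt

names = "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER".split(',')
flagvals = [getattr(libvirt, name.strip()) for name in names]
logical_sum = reduce(lambda x, y: x | y, flagvals)
print(logical_sum)
```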
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index 8dd246178..f07b57796 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -202,13 +202,13 @@ class VMHelper(HelperBase):
     @classmethod
     def find_vbd_by_number(cls, session, vm_ref, number):
         """Get the VBD reference from the device number"""
-        vbds = session.get_xenapi().VM.get_VBDs(vm_ref)
-        if vbds:
-            for vbd in vbds:
+        vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
+        if vbd_refs:
+            for vbd_ref in vbd_refs:
                 try:
-                    vbd_rec = session.get_xenapi().VBD.get_record(vbd)
+                    vbd_rec = session.get_xenapi().VBD.get_record(vbd_ref)
                     if vbd_rec['userdevice'] == str(number):
-                        return vbd
+                        return vbd_ref
                 except cls.XenAPI.Failure, exc:
                     LOG.exception(exc)
         raise StorageError(_('VBD not found in instance %s') % vm_ref)
@@ -443,29 +443,29 @@ class VMHelper(HelperBase):
             vdi_size += MBR_SIZE_BYTES
 
         name_label = get_name_label_for_image(image)
-        vdi = cls.create_vdi(session, sr_ref, name_label, vdi_size, False)
+        vdi_ref = cls.create_vdi(session, sr_ref, name_label, vdi_size, False)
 
-        with_vdi_attached_here(session, vdi, False,
+        with_vdi_attached_here(session, vdi_ref, False,
                                lambda dev:
                                    _stream_disk(dev, image_type,
                                                 virtual_size, image_file))
 
         if image_type == ImageType.KERNEL_RAMDISK:
             #we need to invoke a plugin for copying VDI's
             #content into proper path
-            LOG.debug(_("Copying VDI %s to /boot/guest on dom0"), vdi)
+            LOG.debug(_("Copying VDI %s to /boot/guest on dom0"), vdi_ref)
             fn = "copy_kernel_vdi"
             args = {}
-            args['vdi-ref'] = vdi
+            args['vdi-ref'] = vdi_ref
             #let the plugin copy the correct number of bytes
             args['image-size'] = str(vdi_size)
             task = session.async_call_plugin('glance', fn, args)
             filename = session.wait_for_task(task, instance_id)
             #remove the VDI as it is not needed anymore
-            session.get_xenapi().VDI.destroy(vdi)
-            LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi)
+            session.get_xenapi().VDI.destroy(vdi_ref)
+            LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)
             return filename
         else:
-            return session.get_xenapi().VDI.get_uuid(vdi)
+            return session.get_xenapi().VDI.get_uuid(vdi_ref)
 
     @classmethod
     def determine_disk_image_type(cls, instance):
@@ -633,37 +633,38 @@ class VMHelper(HelperBase):
         return is_pv
 
     @classmethod
-    def lookup(cls, session, i):
+    def lookup(cls, session, name_label):
         """Look the instance i up, and returns it if available"""
-        vms = session.get_xenapi().VM.get_by_name_label(i)
-        n = len(vms)
+        vm_refs = session.get_xenapi().VM.get_by_name_label(name_label)
+        n = len(vm_refs)
         if n == 0:
             return None
         elif n > 1:
-            raise exception.Duplicate(_('duplicate name found: %s') % i)
+            raise exception.Duplicate(_('duplicate name found: %s') %
+                                      name_label)
         else:
-            return vms[0]
+            return vm_refs[0]
 
     @classmethod
-    def lookup_vm_vdis(cls, session, vm):
+    def lookup_vm_vdis(cls, session, vm_ref):
         """Look for the VDIs that are attached to the VM"""
         # Firstly we get the VBDs, then the VDIs.
         # TODO(Armando): do we leave the read-only devices?
-        vbds = session.get_xenapi().VM.get_VBDs(vm)
-        vdis = []
-        if vbds:
-            for vbd in vbds:
+        vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
+        vdi_refs = []
+        if vbd_refs:
+            for vbd_ref in vbd_refs:
                 try:
-                    vdi = session.get_xenapi().VBD.get_VDI(vbd)
+                    vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref)
                     # Test valid VDI
-                    record = session.get_xenapi().VDI.get_record(vdi)
+                    record = session.get_xenapi().VDI.get_record(vdi_ref)
                     LOG.debug(_('VDI %s is still available'),
                               record['uuid'])
                 except cls.XenAPI.Failure, exc:
                     LOG.exception(exc)
                 else:
-                    vdis.append(vdi)
-        if len(vdis) > 0:
-            return vdis
+                    vdi_refs.append(vdi_ref)
+        if len(vdi_refs) > 0:
+            return vdi_refs
         else:
             return None
@@ -839,16 +840,16 @@ def safe_find_sr(session):
 
 def find_sr(session):
     """Return the storage repository to hold VM images"""
     host = session.get_xenapi_host()
-    srs = session.get_xenapi().SR.get_all()
-    for sr in srs:
-        sr_rec = session.get_xenapi().SR.get_record(sr)
+    sr_refs = session.get_xenapi().SR.get_all()
+    for sr_ref in sr_refs:
+        sr_rec = session.get_xenapi().SR.get_record(sr_ref)
         if not ('i18n-key' in sr_rec['other_config'] and
                 sr_rec['other_config']['i18n-key'] == 'local-storage'):
             continue
-        for pbd in sr_rec['PBDs']:
-            pbd_rec = session.get_xenapi().PBD.get_record(pbd)
+        for pbd_ref in sr_rec['PBDs']:
+            pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref)
             if pbd_rec['host'] == host:
-                return sr
+                return sr_ref
     return None
@@ -873,11 +874,11 @@ def remap_vbd_dev(dev):
     return remapped_dev
 
 
-def with_vdi_attached_here(session, vdi, read_only, f):
+def with_vdi_attached_here(session, vdi_ref, read_only, f):
     this_vm_ref = get_this_vm_ref(session)
     vbd_rec = {}
     vbd_rec['VM'] = this_vm_ref
-    vbd_rec['VDI'] = vdi
+    vbd_rec['VDI'] = vdi_ref
     vbd_rec['userdevice'] = 'autodetect'
     vbd_rec['bootable'] = False
     vbd_rec['mode'] = read_only and 'RO' or 'RW'
@@ -888,28 +889,28 @@
     vbd_rec['qos_algorithm_type'] = ''
     vbd_rec['qos_algorithm_params'] = {}
    vbd_rec['qos_supported_algorithms'] = []
-    LOG.debug(_('Creating VBD for VDI %s ... '), vdi)
-    vbd = session.get_xenapi().VBD.create(vbd_rec)
-    LOG.debug(_('Creating VBD for VDI %s done.'), vdi)
+    LOG.debug(_('Creating VBD for VDI %s ... '), vdi_ref)
+    vbd_ref = session.get_xenapi().VBD.create(vbd_rec)
+    LOG.debug(_('Creating VBD for VDI %s done.'), vdi_ref)
     try:
-        LOG.debug(_('Plugging VBD %s ... '), vbd)
-        session.get_xenapi().VBD.plug(vbd)
-        LOG.debug(_('Plugging VBD %s done.'), vbd)
-        orig_dev = session.get_xenapi().VBD.get_device(vbd)
-        LOG.debug(_('VBD %(vbd)s plugged as %(orig_dev)s') % locals())
+        LOG.debug(_('Plugging VBD %s ... '), vbd_ref)
+        session.get_xenapi().VBD.plug(vbd_ref)
+        LOG.debug(_('Plugging VBD %s done.'), vbd_ref)
+        orig_dev = session.get_xenapi().VBD.get_device(vbd_ref)
+        LOG.debug(_('VBD %(vbd_ref)s plugged as %(orig_dev)s') % locals())
         dev = remap_vbd_dev(orig_dev)
         if dev != orig_dev:
-            LOG.debug(_('VBD %(vbd)s plugged into wrong dev, '
+            LOG.debug(_('VBD %(vbd_ref)s plugged into wrong dev, '
                         'remapping to %(dev)s') % locals())
         return f(dev)
     finally:
-        LOG.debug(_('Destroying VBD for VDI %s ... '), vdi)
-        vbd_unplug_with_retry(session, vbd)
-        ignore_failure(session.get_xenapi().VBD.destroy, vbd)
-        LOG.debug(_('Destroying VBD for VDI %s done.'), vdi)
+        LOG.debug(_('Destroying VBD for VDI %s ... '), vdi_ref)
+        vbd_unplug_with_retry(session, vbd_ref)
+        ignore_failure(session.get_xenapi().VBD.destroy, vbd_ref)
+        LOG.debug(_('Destroying VBD for VDI %s done.'), vdi_ref)
 
 
-def vbd_unplug_with_retry(session, vbd):
+def vbd_unplug_with_retry(session, vbd_ref):
     """Call VBD.unplug on the given VBD, with a retry if we get
     DEVICE_DETACH_REJECTED.  For reasons which I don't understand, we're
     seeing the device still in use, even when all processes using the device
@@ -917,7 +918,7 @@
     # FIXME(sirp): We can use LoopingCall here w/o blocking sleep()
     while True:
         try:
-            session.get_xenapi().VBD.unplug(vbd)
+            session.get_xenapi().VBD.unplug(vbd_ref)
             LOG.debug(_('VBD.unplug successful first time.'))
             return
         except VMHelper.XenAPI.Failure, e:
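All of the renames in this file follow the naming scheme documented at the end of this diff in xenapi_conn.py: `_ref` for opaque handles, `_rec` for record objects, `_uuid` for UUIDs. A typical read under that scheme, assuming a live XenAPI session object:

```python
# Hedged sketch: session is a XenAPISession; the instance name is invented.
vm_refs = session.get_xenapi().VM.get_by_name_label('instance-00000001')
vm_ref = vm_refs[0]                                  # opaque handle
vm_rec = session.get_xenapi().VM.get_record(vm_ref)  # full record (a dict)
print(vm_rec['power_state'])
```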
self._session.call_xenapi("Async.VM.hard_shutdown", + vm_ref) else: - task = self._session.call_xenapi('Async.VM.clean_shutdown', vm) + task = self._session.call_xenapi("Async.VM.clean_shutdown", + vm_ref) self._session.wait_for_task(task, instance.id) except self.XenAPI.Failure, exc: LOG.exception(exc) - def _destroy_vdis(self, instance, vm): - """Destroys all VDIs associated with a VM """ + def _destroy_vdis(self, instance, vm_ref): + """Destroys all VDIs associated with a VM""" instance_id = instance.id LOG.debug(_("Destroying VDIs for Instance %(instance_id)s") % locals()) - vdis = VMHelper.lookup_vm_vdis(self._session, vm) + vdi_refs = VMHelper.lookup_vm_vdis(self._session, vm_ref) - if not vdis: + if not vdi_refs: return - for vdi in vdis: + for vdi_ref in vdi_refs: try: - task = self._session.call_xenapi('Async.VDI.destroy', vdi) + task = self._session.call_xenapi('Async.VDI.destroy', vdi_ref) self._session.wait_for_task(task, instance.id) except self.XenAPI.Failure, exc: LOG.exception(exc) - def _destroy_kernel_ramdisk(self, instance, vm): + def _destroy_kernel_ramdisk(self, instance, vm_ref): """ Three situations can occur: @@ -499,8 +501,8 @@ class VMOps(object): "both" % locals())) # 3. We have both kernel and ramdisk - (kernel, ramdisk) = VMHelper.lookup_kernel_ramdisk( - self._session, vm) + (kernel, ramdisk) = VMHelper.lookup_kernel_ramdisk(self._session, + vm_ref) LOG.debug(_("Removing kernel/ramdisk files")) @@ -511,11 +513,11 @@ class VMOps(object): LOG.debug(_("kernel/ramdisk files removed")) - def _destroy_vm(self, instance, vm): - """Destroys a VM record """ + def _destroy_vm(self, instance, vm_ref): + """Destroys a VM record""" instance_id = instance.id try: - task = self._session.call_xenapi('Async.VM.destroy', vm) + task = self._session.call_xenapi('Async.VM.destroy', vm_ref) self._session.wait_for_task(task, instance_id) except self.XenAPI.Failure, exc: LOG.exception(exc) @@ -531,10 +533,10 @@ class VMOps(object): """ instance_id = instance.id LOG.info(_("Destroying VM for Instance %(instance_id)s") % locals()) - vm = VMHelper.lookup(self._session, instance.name) - return self._destroy(instance, vm, shutdown=True) + vm_ref = VMHelper.lookup(self._session, instance.name) + return self._destroy(instance, vm_ref, shutdown=True) - def _destroy(self, instance, vm, shutdown=True, + def _destroy(self, instance, vm_ref, shutdown=True, destroy_kernel_ramdisk=True): """ Destroys VM instance by performing: @@ -544,17 +546,17 @@ class VMOps(object): 3. Destroying kernel and ramdisk files (if necessary) 4. 
 
-    def _destroy_vdis(self, instance, vm):
-        """Destroys all VDIs associated with a VM """
+    def _destroy_vdis(self, instance, vm_ref):
+        """Destroys all VDIs associated with a VM"""
         instance_id = instance.id
         LOG.debug(_("Destroying VDIs for Instance %(instance_id)s")
                   % locals())
-        vdis = VMHelper.lookup_vm_vdis(self._session, vm)
+        vdi_refs = VMHelper.lookup_vm_vdis(self._session, vm_ref)
 
-        if not vdis:
+        if not vdi_refs:
             return
 
-        for vdi in vdis:
+        for vdi_ref in vdi_refs:
             try:
-                task = self._session.call_xenapi('Async.VDI.destroy', vdi)
+                task = self._session.call_xenapi('Async.VDI.destroy', vdi_ref)
                 self._session.wait_for_task(task, instance.id)
             except self.XenAPI.Failure, exc:
                 LOG.exception(exc)
 
-    def _destroy_kernel_ramdisk(self, instance, vm):
+    def _destroy_kernel_ramdisk(self, instance, vm_ref):
         """
         Three situations can occur:
@@ -499,8 +501,8 @@ class VMOps(object):
                         "both" % locals()))
 
         # 3. We have both kernel and ramdisk
-        (kernel, ramdisk) = VMHelper.lookup_kernel_ramdisk(
-            self._session, vm)
+        (kernel, ramdisk) = VMHelper.lookup_kernel_ramdisk(self._session,
+                                                           vm_ref)
 
         LOG.debug(_("Removing kernel/ramdisk files"))
@@ -511,11 +513,11 @@ class VMOps(object):
 
         LOG.debug(_("kernel/ramdisk files removed"))
 
-    def _destroy_vm(self, instance, vm):
-        """Destroys a VM record """
+    def _destroy_vm(self, instance, vm_ref):
+        """Destroys a VM record"""
         instance_id = instance.id
         try:
-            task = self._session.call_xenapi('Async.VM.destroy', vm)
+            task = self._session.call_xenapi('Async.VM.destroy', vm_ref)
             self._session.wait_for_task(task, instance_id)
         except self.XenAPI.Failure, exc:
             LOG.exception(exc)
@@ -531,10 +533,10 @@ class VMOps(object):
         """
         instance_id = instance.id
         LOG.info(_("Destroying VM for Instance %(instance_id)s") % locals())
-        vm = VMHelper.lookup(self._session, instance.name)
-        return self._destroy(instance, vm, shutdown=True)
+        vm_ref = VMHelper.lookup(self._session, instance.name)
+        return self._destroy(instance, vm_ref, shutdown=True)
 
-    def _destroy(self, instance, vm, shutdown=True,
+    def _destroy(self, instance, vm_ref, shutdown=True,
                  destroy_kernel_ramdisk=True):
         """
         Destroys VM instance by performing:
 
             1. A shutdown if requested
             2. Destroying associated VDIs
@@ -544,17 +546,17 @@ class VMOps(object):
             3. Destroying kernel and ramdisk files (if necessary)
            4. Destroying the actual VM record
         """
-        if vm is None:
+        if vm_ref is None:
             LOG.warning(_("VM is not present, skipping destroy..."))
             return
 
         if shutdown:
-            self._shutdown(instance, vm)
+            self._shutdown(instance, vm_ref)
 
-        self._destroy_vdis(instance, vm)
+        self._destroy_vdis(instance, vm_ref)
         if destroy_kernel_ramdisk:
-            self._destroy_kernel_ramdisk(instance, vm)
-        self._destroy_vm(instance, vm)
+            self._destroy_kernel_ramdisk(instance, vm_ref)
+        self._destroy_vm(instance, vm_ref)
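Every mutation in this class follows the same asynchronous pattern seen in `_shutdown()` and `_destroy_vm()`: queue the operation with an `Async.` prefix, then block on the returned task so failures surface through `wait_for_task`. Isolated, assuming a live session:

```python
# Hedged sketch of the Async-task pattern; session, vm_ref and
# instance_id are assumed to come from a running VMOps context.
task = session.call_xenapi('Async.VM.hard_shutdown', vm_ref)
session.wait_for_task(task, instance_id)  # raises on XenAPI failure
```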
 
     def _wait_with_callback(self, instance_id, task, callback):
         ret = None
@@ -566,26 +568,27 @@ class VMOps(object):
 
     def pause(self, instance, callback):
         """Pause VM instance"""
-        vm = self._get_vm_opaque_ref(instance)
-        task = self._session.call_xenapi('Async.VM.pause', vm)
+        vm_ref = self._get_vm_opaque_ref(instance)
+        task = self._session.call_xenapi('Async.VM.pause', vm_ref)
         self._wait_with_callback(instance.id, task, callback)
 
     def unpause(self, instance, callback):
         """Unpause VM instance"""
-        vm = self._get_vm_opaque_ref(instance)
-        task = self._session.call_xenapi('Async.VM.unpause', vm)
+        vm_ref = self._get_vm_opaque_ref(instance)
+        task = self._session.call_xenapi('Async.VM.unpause', vm_ref)
         self._wait_with_callback(instance.id, task, callback)
 
     def suspend(self, instance, callback):
         """suspend the specified instance"""
-        vm = self._get_vm_opaque_ref(instance)
-        task = self._session.call_xenapi('Async.VM.suspend', vm)
+        vm_ref = self._get_vm_opaque_ref(instance)
+        task = self._session.call_xenapi('Async.VM.suspend', vm_ref)
         self._wait_with_callback(instance.id, task, callback)
 
     def resume(self, instance, callback):
         """resume the specified instance"""
-        vm = self._get_vm_opaque_ref(instance)
-        task = self._session.call_xenapi('Async.VM.resume', vm, False, True)
+        vm_ref = self._get_vm_opaque_ref(instance)
+        task = self._session.call_xenapi('Async.VM.resume', vm_ref, False,
+                                         True)
         self._wait_with_callback(instance.id, task, callback)
 
     def rescue(self, instance, callback):
@@ -595,29 +598,26 @@ class VMOps(object):
         - spawn a rescue VM (the vm name-label will be instance-N-rescue)
 
         """
-        rescue_vm = VMHelper.lookup(self._session, instance.name + "-rescue")
-        if rescue_vm:
+        rescue_vm_ref = VMHelper.lookup(self._session,
+                                        instance.name + "-rescue")
+        if rescue_vm_ref:
             raise RuntimeError(_(
                 "Instance is already in Rescue Mode: %s" % instance.name))
 
-        vm = self._get_vm_opaque_ref(instance)
-        self._shutdown(instance, vm)
-        self._acquire_bootlock(vm)
+        vm_ref = self._get_vm_opaque_ref(instance)
+        self._shutdown(instance, vm_ref)
+        self._acquire_bootlock(vm_ref)
 
         instance._rescue = True
         self.spawn(instance)
-        rescue_vm = self._get_vm_opaque_ref(instance)
+        rescue_vm_ref = self._get_vm_opaque_ref(instance)
 
-        vbd = self._session.get_xenapi().VM.get_VBDs(vm)[0]
-        vdi_ref = self._session.get_xenapi().VBD.get_record(vbd)["VDI"]
-        vbd_ref = VMHelper.create_vbd(
-            self._session,
-            rescue_vm,
-            vdi_ref,
-            1,
-            False)
+        vbd_ref = self._session.get_xenapi().VM.get_VBDs(vm_ref)[0]
+        vdi_ref = self._session.get_xenapi().VBD.get_record(vbd_ref)["VDI"]
+        rescue_vbd_ref = VMHelper.create_vbd(self._session, rescue_vm_ref,
+                                             vdi_ref, 1, False)
 
-        self._session.call_xenapi("Async.VBD.plug", vbd_ref)
+        self._session.call_xenapi("Async.VBD.plug", rescue_vbd_ref)
 
     def unrescue(self, instance, callback):
         """Unrescue the specified instance
@@ -626,51 +626,53 @@ class VMOps(object):
         - release the bootlock to allow the instance VM to start
 
         """
-        rescue_vm = VMHelper.lookup(self._session, instance.name + "-rescue")
+        rescue_vm_ref = VMHelper.lookup(self._session,
+                                        instance.name + "-rescue")
 
-        if not rescue_vm:
+        if not rescue_vm_ref:
             raise exception.NotFound(_(
                 "Instance is not in Rescue Mode: %s" % instance.name))
 
-        original_vm = self._get_vm_opaque_ref(instance)
-        vbds = self._session.get_xenapi().VM.get_VBDs(rescue_vm)
+        original_vm_ref = self._get_vm_opaque_ref(instance)
+        vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref)
 
         instance._rescue = False
 
-        for vbd_ref in vbds:
-            vbd = self._session.get_xenapi().VBD.get_record(vbd_ref)
-            if vbd["userdevice"] == "1":
+        for vbd_ref in vbd_refs:
+            _vbd_ref = self._session.get_xenapi().VBD.get_record(vbd_ref)
+            if _vbd_ref["userdevice"] == "1":
                 VMHelper.unplug_vbd(self._session, vbd_ref)
                 VMHelper.destroy_vbd(self._session, vbd_ref)
 
-        task1 = self._session.call_xenapi("Async.VM.hard_shutdown", rescue_vm)
+        task1 = self._session.call_xenapi("Async.VM.hard_shutdown",
+                                          rescue_vm_ref)
         self._session.wait_for_task(task1, instance.id)
 
-        vdis = VMHelper.lookup_vm_vdis(self._session, rescue_vm)
-        for vdi in vdis:
+        vdi_refs = VMHelper.lookup_vm_vdis(self._session, rescue_vm_ref)
+        for vdi_ref in vdi_refs:
             try:
-                task = self._session.call_xenapi('Async.VDI.destroy', vdi)
+                task = self._session.call_xenapi('Async.VDI.destroy', vdi_ref)
                 self._session.wait_for_task(task, instance.id)
             except self.XenAPI.Failure:
                 continue
 
-        task2 = self._session.call_xenapi('Async.VM.destroy', rescue_vm)
+        task2 = self._session.call_xenapi('Async.VM.destroy', rescue_vm_ref)
         self._session.wait_for_task(task2, instance.id)
 
-        self._release_bootlock(original_vm)
-        self._start(instance, original_vm)
+        self._release_bootlock(original_vm_ref)
+        self._start(instance, original_vm_ref)
 
     def get_info(self, instance):
         """Return data about VM instance"""
-        vm = self._get_vm_opaque_ref(instance)
-        rec = self._session.get_xenapi().VM.get_record(vm)
-        return VMHelper.compile_info(rec)
+        vm_ref = self._get_vm_opaque_ref(instance)
+        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        return VMHelper.compile_info(vm_rec)
 
     def get_diagnostics(self, instance):
         """Return data about VM diagnostics"""
-        vm = self._get_vm_opaque_ref(instance)
-        rec = self._session.get_xenapi().VM.get_record(vm)
-        return VMHelper.compile_diagnostics(self._session, rec)
+        vm_ref = self._get_vm_opaque_ref(instance)
+        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        return VMHelper.compile_diagnostics(self._session, vm_rec)
 
     def get_console_output(self, instance):
         """Return snapshot of console"""
@@ -693,9 +695,9 @@ class VMOps(object):
         # at this stage even though they aren't implemented because these will
         # be needed for multi-nic and there was no sense writing it for single
         # network/single IP and then having to turn around and re-write it
-        vm_opaque_ref = self._get_vm_opaque_ref(instance.id)
+        vm_ref = self._get_vm_opaque_ref(instance.id)
         logging.debug(_("injecting network info to xenstore for vm: |%s|"),
-                      vm_opaque_ref)
+                      vm_ref)
         admin_context = context.get_admin_context()
         IPs = db.fixed_ip_get_all_by_instance(admin_context, instance['id'])
         networks = db.network_get_all_by_instance(admin_context,
@@ -727,11 +729,10 @@ class VMOps(object):
                        'ips': [ip_dict(ip) for ip in network_IPs],
                        'ip6s': [ip6_dict(ip) for ip in network_IPs]}
 
-            self.write_to_param_xenstore(vm_opaque_ref, {location: mapping})
+            self.write_to_param_xenstore(vm_ref, {location: mapping})
 
             try:
-                self.write_to_xenstore(vm_opaque_ref, location,
-                                       mapping['location'])
+                self.write_to_xenstore(vm_ref, location, mapping['location'])
             except KeyError:
                 # catch KeyError for domid if instance isn't running
                 pass
@@ -743,8 +744,8 @@ class VMOps(object):
 
         Creates vifs for an instance
 
         """
-        vm_opaque_ref = self._get_vm_opaque_ref(instance.id)
-        logging.debug(_("creating vif(s) for vm: |%s|"), vm_opaque_ref)
+        vm_ref = self._get_vm_opaque_ref(instance.id)
+        logging.debug(_("creating vif(s) for vm: |%s|"), vm_ref)
         if networks is None:
             networks = db.network_get_all_by_instance(admin_context,
                                                       instance['id'])
@@ -764,12 +765,8 @@ class VMOps(object):
             except AttributeError:
                 device = "0"
 
-            VMHelper.create_vif(
-                self._session,
-                vm_opaque_ref,
-                network_ref,
-                instance.mac_address,
-                device)
+            VMHelper.create_vif(self._session, vm_ref, network_ref,
+                                instance.mac_address, device)
 
     def reset_network(self, instance):
         """
@@ -833,9 +830,9 @@ class VMOps(object):
         Any errors raised by the plugin will in turn raise a RuntimeError here.
         """
         instance_id = vm.id
-        vm = self._get_vm_opaque_ref(vm)
-        rec = self._session.get_xenapi().VM.get_record(vm)
-        args = {'dom_id': rec['domid'], 'path': path}
+        vm_ref = self._get_vm_opaque_ref(vm)
+        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        args = {'dom_id': vm_rec['domid'], 'path': path}
         args.update(addl_args)
         try:
             task = self._session.async_call_plugin(plugin, method, args)
@@ -915,9 +912,9 @@ class VMOps(object):
         value for 'keys' is passed, the returned dict is filtered to only
         return the values for those keys.
         """
-        vm = self._get_vm_opaque_ref(instance_or_vm)
+        vm_ref = self._get_vm_opaque_ref(instance_or_vm)
         data = self._session.call_xenapi_request('VM.get_xenstore_data',
-                                                 (vm, ))
+                                                 (vm_ref, ))
         ret = {}
         if keys is None:
             keys = data.keys()
@@ -935,11 +932,11 @@ class VMOps(object):
         """Takes a key/value pair and adds it to the xenstore parameter
         record for the given vm instance. If the key exists in xenstore,
         it is overwritten"""
-        vm = self._get_vm_opaque_ref(instance_or_vm)
+        vm_ref = self._get_vm_opaque_ref(instance_or_vm)
         self.remove_from_param_xenstore(instance_or_vm, key)
         jsonval = json.dumps(val)
         self._session.call_xenapi_request('VM.add_to_xenstore_data',
-                                          (vm, key, jsonval))
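A sketch of what `write_to_param_xenstore()` reduces to for a single key; the mapping and the key shown are hypothetical, and values are JSON-encoded on the way in:

```python
# Hedged sketch: session and vm_ref assumed live; key/values invented.
import json

mapping = {'label': 'public', 'gateway': '10.0.0.1',
           'ips': [{'ip': '10.0.0.3', 'netmask': '255.255.255.0'}]}
session.call_xenapi_request('VM.add_to_xenstore_data',
                            (vm_ref, 'vm-data/networking/02163e000001',
                             json.dumps(mapping)))
```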
""" - vm = self._get_vm_opaque_ref(instance_or_vm) + vm_ref = self._get_vm_opaque_ref(instance_or_vm) if isinstance(key_or_keys, basestring): keys = [key_or_keys] else: keys = key_or_keys for key in keys: self._session.call_xenapi_request('VM.remove_from_xenstore_data', - (vm, key)) + (vm_ref, key)) def clear_param_xenstore(self, instance_or_vm): """Removes all data from the xenstore parameter record for this VM.""" diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py index d5ebd29d5..72284ac02 100644 --- a/nova/virt/xenapi/volume_utils.py +++ b/nova/virt/xenapi/volume_utils.py @@ -117,16 +117,16 @@ class VolumeHelper(HelperBase): def introduce_vdi(cls, session, sr_ref): """Introduce VDI in the host""" try: - vdis = session.get_xenapi().SR.get_VDIs(sr_ref) + vdi_refs = session.get_xenapi().SR.get_VDIs(sr_ref) except cls.XenAPI.Failure, exc: LOG.exception(exc) raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref) try: - vdi_rec = session.get_xenapi().VDI.get_record(vdis[0]) + vdi_rec = session.get_xenapi().VDI.get_record(vdi_refs[0]) except cls.XenAPI.Failure, exc: LOG.exception(exc) raise StorageError(_('Unable to get record' - ' of VDI %s on') % vdis[0]) + ' of VDI %s on') % vdi_refs[0]) else: try: return session.get_xenapi().VDI.introduce( diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index b63a5f8c3..da42a83b6 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -49,6 +49,12 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block. address for the nova-volume host :target_port: iSCSI Target Port, 3260 Default :iqn_prefix: IQN Prefix, e.g. 'iqn.2010-10.org.openstack' + +**Variable Naming Scheme** + +- suffix "_ref" for opaque references +- suffix "_uuid" for UUIDs +- suffix "_rec" for record objects """ import sys @@ -263,6 +269,27 @@ class XenAPIConnection(object): 'username': FLAGS.xenapi_connection_username, 'password': FLAGS.xenapi_connection_password} + def update_available_resource(self, ctxt, host): + """This method is supported only by libvirt.""" + return + + def compare_cpu(self, xml): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + + def ensure_filtering_rules_for_instance(self, instance_ref): + """This method is supported only libvirt.""" + return + + def live_migration(self, context, instance_ref, dest, + post_method, recover_method): + """This method is supported only by libvirt.""" + return + + def unfilter_instance(self, instance_ref): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + class XenAPISession(object): """The session to invoke XenAPI SDK calls""" |
