diff options
| author | Vishvananda Ishaya <vishvananda@gmail.com> | 2011-01-18 12:42:06 -0800 |
|---|---|---|
| committer | Vishvananda Ishaya <vishvananda@gmail.com> | 2011-01-18 12:42:06 -0800 |
| commit | ef9b60f4b8d22f16c0606c5b3c2d2d40d76eac02 (patch) | |
| tree | cd0edbb59607c2b4c68301e3e45efc37dc30da54 /nova/virt | |
| parent | 47a2dc24b08ca4be7d114d95b42dc4faf19d9fad (diff) | |
| parent | 4577c11923f05ba60ec898186d3f959d86e5de4c (diff) | |
| download | nova-ef9b60f4b8d22f16c0606c5b3c2d2d40d76eac02.tar.gz nova-ef9b60f4b8d22f16c0606c5b3c2d2d40d76eac02.tar.xz nova-ef9b60f4b8d22f16c0606c5b3c2d2d40d76eac02.zip | |
merged trunk
Diffstat (limited to 'nova/virt')
| -rw-r--r-- | nova/virt/cpuinfo.xml.template | 9 | ||||
| -rw-r--r-- | nova/virt/fake.py | 32 | ||||
| -rw-r--r-- | nova/virt/libvirt_conn.py | 292 | ||||
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py | 23 | ||||
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 6 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 38 |
6 files changed, 384 insertions, 16 deletions
diff --git a/nova/virt/cpuinfo.xml.template b/nova/virt/cpuinfo.xml.template new file mode 100644 index 000000000..48842b29d --- /dev/null +++ b/nova/virt/cpuinfo.xml.template @@ -0,0 +1,9 @@ +<cpu> + <arch>$arch</arch> + <model>$model</model> + <vendor>$vendor</vendor> + <topology sockets="$topology.sockets" cores="$topology.cores" threads="$topology.threads"/> +#for $var in $features + <features name="$var" /> +#end for +</cpu> diff --git a/nova/virt/fake.py b/nova/virt/fake.py index a57a8f43b..80ae7f34c 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -310,6 +310,38 @@ class FakeConnection(object): 'username': 'fakeuser', 'password': 'fakepassword'} + def get_cpu_info(self): + """This method is supported only libvirt. """ + return + + def get_vcpu_number(self): + """This method is supported only libvirt. """ + return -1 + + def get_memory_mb(self): + """This method is supported only libvirt..""" + return -1 + + def get_local_gb(self): + """This method is supported only libvirt..""" + return -1 + + def get_hypervisor_type(self): + """This method is supported only libvirt..""" + return + + def get_hypervisor_version(self): + """This method is supported only libvirt..""" + return -1 + + def compare_cpu(self, xml): + """This method is supported only libvirt..""" + raise NotImplementedError('This method is supported only libvirt.') + + def live_migration(self, context, instance_ref, dest): + """This method is supported only libvirt..""" + raise NotImplementedError('This method is supported only libvirt.') + class FakeInstance(object): diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index 4223defd5..ab4ceb6a4 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -36,8 +36,11 @@ Supports KVM, QEMU, UML, and XEN. """ +import json import os import shutil +import re +import time import random import subprocess import uuid @@ -80,6 +83,9 @@ flags.DEFINE_string('injected_network_template', flags.DEFINE_string('libvirt_xml_template', utils.abspath('virt/libvirt.xml.template'), 'Libvirt XML Template') +flags.DEFINE_string('cpuinfo_xml_template', + utils.abspath('virt/cpuinfo.xml.template'), + 'CpuInfo XML Template (used only live migration now)') flags.DEFINE_string('libvirt_type', 'kvm', 'Libvirt domain type (valid options are: ' @@ -88,6 +94,16 @@ flags.DEFINE_string('libvirt_uri', '', 'Override the default libvirt URI (which is dependent' ' on libvirt_type)') +flags.DEFINE_string('live_migration_uri', + "qemu+tcp://%s/system", + 'Define protocol used by live_migration feature') +flags.DEFINE_string('live_migration_flag', + "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER", + 'Define live migration behavior.') +flags.DEFINE_integer('live_migration_bandwidth', 0, + 'Define live migration behavior') +flags.DEFINE_string('live_migration_timeout_sec', 10, + 'Timeout second for pre_live_migration is completed.') flags.DEFINE_bool('allow_project_net_traffic', True, 'Whether to allow in project network traffic') @@ -146,6 +162,7 @@ class LibvirtConnection(object): self.libvirt_uri = self.get_uri() self.libvirt_xml = open(FLAGS.libvirt_xml_template).read() + self.cpuinfo_xml = open(FLAGS.cpuinfo_xml_template).read() self._wrapped_conn = None self.read_only = read_only @@ -295,7 +312,7 @@ class LibvirtConnection(object): virt_dom.detachDevice(xml) @exception.wrap_exception - def snapshot(self, instance, name): + def snapshot(self, instance, image_id): """ Create snapshot from a running VM instance """ raise NotImplementedError( _("Instance snapshotting is not supported for libvirt" @@ -818,6 +835,74 @@ class LibvirtConnection(object): return interfaces + def get_vcpu_number(self): + """ Get vcpu number of physical computer. """ + return self._conn.getMaxVcpus(None) + + def get_memory_mb(self): + """Get the memory size of physical computer .""" + meminfo = open('/proc/meminfo').read().split() + idx = meminfo.index('MemTotal:') + # transforming kb to mb. + return int(meminfo[idx + 1]) / 1024 + + def get_local_gb(self): + """Get the hdd size of physical computer .""" + hddinfo = os.statvfs(FLAGS.instances_path) + return hddinfo.f_bsize * hddinfo.f_blocks / 1024 / 1024 / 1024 + + def get_hypervisor_type(self): + """ Get hypervisor type """ + return self._conn.getType() + + def get_hypervisor_version(self): + """ Get hypervisor version """ + return self._conn.getVersion() + + def get_cpu_info(self): + """ Get cpuinfo information """ + xmlstr = self._conn.getCapabilities() + xml = libxml2.parseDoc(xmlstr) + nodes = xml.xpathEval('//cpu') + if len(nodes) != 1: + msg = 'Unexpected xml format. tag "cpu" must be 1, but %d.' \ + % len(nodes) + msg += '\n' + xml.serialize() + raise exception.Invalid(_(msg)) + + arch = xml.xpathEval('//cpu/arch')[0].getContent() + model = xml.xpathEval('//cpu/model')[0].getContent() + vendor = xml.xpathEval('//cpu/vendor')[0].getContent() + + topology_node = xml.xpathEval('//cpu/topology')[0].get_properties() + topology = dict() + while topology_node != None: + name = topology_node.get_name() + topology[name] = topology_node.getContent() + topology_node = topology_node.get_next() + + keys = ['cores', 'sockets', 'threads'] + tkeys = topology.keys() + if list(set(tkeys)) != list(set(keys)): + msg = _('Invalid xml: topology(%s) must have %s') + raise exception.Invalid(msg % (str(topology), ', '.join(keys))) + + feature_nodes = xml.xpathEval('//cpu/feature') + features = list() + for nodes in feature_nodes: + feature_name = nodes.get_properties().getContent() + features.append(feature_name) + + template = ("""{"arch":"%s", "model":"%s", "vendor":"%s", """ + """"topology":{"cores":"%s", "threads":"%s", """ + """"sockets":"%s"}, "features":[%s]}""") + c = topology['cores'] + s = topology['sockets'] + t = topology['threads'] + f = ['"%s"' % x for x in features] + cpu_info = template % (arch, model, vendor, c, s, t, ', '.join(f)) + return cpu_info + def block_stats(self, instance_name, disk): """ Note that this function takes an instance name, not an Instance, so @@ -848,6 +933,208 @@ class LibvirtConnection(object): def refresh_security_group_members(self, security_group_id): self.firewall_driver.refresh_security_group_members(security_group_id) + def compare_cpu(self, cpu_info): + """ + Check the host cpu is compatible to a cpu given by xml. + "xml" must be a part of libvirt.openReadonly().getCapabilities(). + return values follows by virCPUCompareResult. + if 0 > return value, do live migration. + + 'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult' + """ + msg = _('Checking cpu_info: instance was launched this cpu.\n: %s ') + LOG.info(msg % cpu_info) + dic = json.loads(cpu_info) + xml = str(Template(self.cpuinfo_xml, searchList=dic)) + msg = _('to xml...\n: %s ') + LOG.info(msg % xml) + + url = 'http://libvirt.org/html/libvirt-libvirt.html' + url += '#virCPUCompareResult\n' + msg = 'CPU does not have compativility.\n' + msg += 'result:%d \n' + msg += 'Refer to %s' + msg = _(msg) + + # unknown character exists in xml, then libvirt complains + try: + ret = self._conn.compareCPU(xml, 0) + except libvirt.libvirtError, e: + LOG.error(msg % (ret, url)) + raise e + + if ret <= 0: + raise exception.Invalid(msg % (ret, url)) + + return + + def ensure_filtering_rules_for_instance(self, instance_ref): + """ Setting up inevitable filtering rules on compute node, + and waiting for its completion. + To migrate an instance, filtering rules to hypervisors + and firewalls are inevitable on destination host. + ( Waiting only for filterling rules to hypervisor, + since filtering rules to firewall rules can be set faster). + + Concretely, the below method must be called. + - setup_basic_filtering (for nova-basic, etc.) + - prepare_instance_filter(for nova-instance-instance-xxx, etc.) + + to_xml may have to be called since it defines PROJNET, PROJMASK. + but libvirt migrates those value through migrateToURI(), + so , no need to be called. + + Don't use thread for this method since migration should + not be started when setting-up filtering rules operations + are not completed.""" + + # Tf any instances never launch at destination host, + # basic-filtering must be set here. + self.nwfilter.setup_basic_filtering(instance_ref) + # setting up n)ova-instance-instance-xx mainly. + self.firewall_driver.prepare_instance_filter(instance_ref) + + # wait for completion + timeout_count = range(FLAGS.live_migration_timeout_sec * 2) + while len(timeout_count) != 0: + try: + filter_name = 'nova-instance-%s' % instance_ref.name + self._conn.nwfilterLookupByName(filter_name) + break + except libvirt.libvirtError: + timeout_count.pop() + if len(timeout_count) == 0: + ec2_id = instance_ref['hostname'] + msg = _('Timeout migrating for %s(%s)') + raise exception.Error(msg % (ec2_id, instance_ref.name)) + time.sleep(0.5) + + def live_migration(self, context, instance_ref, dest): + """ + Just spawning live_migration operation for + distributing high-load. + """ + greenthread.spawn(self._live_migration, context, instance_ref, dest) + + def _live_migration(self, context, instance_ref, dest): + """ Do live migration.""" + + # Do live migration. + try: + duri = FLAGS.live_migration_uri % dest + + flaglist = FLAGS.live_migration_flag.split(',') + flagvals = [getattr(libvirt, x.strip()) for x in flaglist] + logical_sum = reduce(lambda x, y: x | y, flagvals) + + bandwidth = FLAGS.live_migration_bandwidth + + if self.read_only: + tmpconn = self._connect(self.libvirt_uri, False) + dom = tmpconn.lookupByName(instance_ref.name) + dom.migrateToURI(duri, logical_sum, None, bandwidth) + tmpconn.close() + else: + dom = self._conn.lookupByName(instance_ref.name) + dom.migrateToURI(duri, logical_sum, None, bandwidth) + + except Exception, e: + id = instance_ref['id'] + db.instance_set_state(context, id, power_state.RUNNING, 'running') + for v in instance_ref['volumes']: + db.volume_update(context, + v['id'], + {'status': 'in-use'}) + + raise e + + # Waiting for completion of live_migration. + timer = utils.LoopingCall(f=None) + + def wait_for_live_migration(): + + try: + state = self.get_info(instance_ref.name)['state'] + except exception.NotFound: + timer.stop() + self._post_live_migration(context, instance_ref, dest) + + timer.f = wait_for_live_migration + timer.start(interval=0.5, now=True) + + def _post_live_migration(self, context, instance_ref, dest): + """ + Post operations for live migration. + Mainly, database updating. + """ + LOG.info('post livemigration operation is started..') + # Detaching volumes. + # (not necessary in current version ) + + # Releasing vlan. + # (not necessary in current implementation?) + + # Releasing security group ingress rule. + if FLAGS.firewall_driver == \ + 'nova.virt.libvirt_conn.IptablesFirewallDriver': + try: + self.firewall_driver.unfilter_instance(instance_ref) + except KeyError, e: + pass + + # Database updating. + ec2_id = instance_ref['hostname'] + + instance_id = instance_ref['id'] + fixed_ip = db.instance_get_fixed_address(context, instance_id) + # Not return if fixed_ip is not found, otherwise, + # instance never be accessible.. + if None == fixed_ip: + logging.warn('fixed_ip is not found for %s ' % ec2_id) + db.fixed_ip_update(context, fixed_ip, {'host': dest}) + network_ref = db.fixed_ip_get_network(context, fixed_ip) + db.network_update(context, network_ref['id'], {'host': dest}) + + try: + floating_ip \ + = db.instance_get_floating_address(context, instance_id) + # Not return if floating_ip is not found, otherwise, + # instance never be accessible.. + if None == floating_ip: + logging.error('floating_ip is not found for %s ' % ec2_id) + else: + floating_ip_ref = db.floating_ip_get_by_address(context, + floating_ip) + db.floating_ip_update(context, + floating_ip_ref['address'], + {'host': dest}) + except exception.NotFound: + logging.debug('%s doesnt have floating_ip.. ' % ec2_id) + except: + msg = 'Live migration: Unexpected error:' + msg += '%s cannot inherit floating ip.. ' % ec2_id + logging.error(_(msg)) + + # Restore instance/volume state + db.instance_update(context, + instance_id, + {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': dest}) + + for v in instance_ref['volumes']: + db.volume_update(context, + v['id'], + {'status': 'in-use'}) + + logging.info(_('Live migrating %s to %s finishes successfully') + % (ec2_id, dest)) + msg = _(("""Known error: the below error is nomally occurs.\n""" + """Just check if iinstance is successfully migrated.\n""" + """libvir: QEMU error : Domain not found: no domain """ + """with matching name..""")) + logging.info(msg) + class FirewallDriver(object): def prepare_instance_filter(self, instance): @@ -995,8 +1282,7 @@ class NWFilterFirewall(FirewallDriver): ['no-mac-spoofing', 'no-ip-spoofing', 'no-arp-spoofing', - 'allow-dhcp-server' - ])) + 'allow-dhcp-server'])) self._define_filter(self.nova_base_ipv4_filter) self._define_filter(self.nova_base_ipv6_filter) self._define_filter(self.nova_dhcp_filter) diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index a91c8ea27..eb0393d2a 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -236,14 +236,15 @@ class VMHelper(HelperBase): return template_vm_ref, [template_vdi_uuid, parent_uuid] @classmethod - def upload_image(cls, session, instance_id, vdi_uuids, image_name): + def upload_image(cls, session, instance_id, vdi_uuids, image_id): """ Requests that the Glance plugin bundle the specified VDIs and push them into Glance using the specified human-friendly name. """ - LOG.debug(_("Asking xapi to upload %s as '%s'"), vdi_uuids, image_name) + logging.debug(_("Asking xapi to upload %s as ID %s"), + vdi_uuids, image_id) params = {'vdi_uuids': vdi_uuids, - 'image_name': image_name, + 'image_id': image_id, 'glance_host': FLAGS.glance_host, 'glance_port': FLAGS.glance_port} @@ -424,9 +425,16 @@ def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref, * parent_vhd snapshot """ - #TODO(sirp): we need to timeout this req after a while + max_attempts = FLAGS.xenapi_vhd_coalesce_max_attempts + attempts = {'counter': 0} def _poll_vhds(): + attempts['counter'] += 1 + if attempts['counter'] > max_attempts: + msg = (_("VHD coalesce attempts exceeded (%d > %d), giving up...") + % (attempts['counter'], max_attempts)) + raise exception.Error(msg) + scan_sr(session, instance_id, sr_ref) parent_uuid = get_vhd_parent_uuid(session, vdi_ref) if original_parent_uuid and (parent_uuid != original_parent_uuid): @@ -434,13 +442,12 @@ def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref, "waiting for coalesce..."), parent_uuid, original_parent_uuid) else: - done.send(parent_uuid) + # Breakout of the loop (normally) and return the parent_uuid + raise utils.LoopingCallDone(parent_uuid) - done = event.Event() loop = utils.LoopingCall(_poll_vhds) loop.start(FLAGS.xenapi_vhd_coalesce_poll_interval, now=True) - parent_uuid = done.wait() - loop.stop() + parent_uuid = loop.wait() return parent_uuid diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 6e359ef82..5e414bab4 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -161,11 +161,11 @@ class VMOps(object): raise Exception(_('Instance not present %s') % instance_name) return vm - def snapshot(self, instance, name): + def snapshot(self, instance, image_id): """ Create snapshot from a running VM instance :param instance: instance to be snapshotted - :param name: name/label to be given to the snapshot + :param image_id: id of image to upload to Steps involved in a XenServer snapshot: @@ -201,7 +201,7 @@ class VMOps(object): try: # call plugin to ship snapshot off to glance VMHelper.upload_image( - self._session, instance.id, template_vdi_uuids, name) + self._session, instance.id, template_vdi_uuids, image_id) finally: self._destroy(instance, template_vm_ref, shutdown=False) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index 689844f34..c10f73fe7 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -93,6 +93,10 @@ flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval', 5.0, 'The interval used for polling of coalescing vhds.' ' Used only if connection_type=xenapi.') +flags.DEFINE_integer('xenapi_vhd_coalesce_max_attempts', + 5, + 'Max number of times to poll for VHD to coalesce.' + ' Used only if connection_type=xenapi.') flags.DEFINE_string('target_host', None, 'iSCSI Target Host') @@ -141,9 +145,9 @@ class XenAPIConnection(object): """Create VM instance""" self._vmops.spawn(instance) - def snapshot(self, instance, name): + def snapshot(self, instance, image_id): """ Create snapshot from a running VM instance """ - self._vmops.snapshot(instance, name) + self._vmops.snapshot(instance, image_id) def reboot(self, instance): """Reboot VM instance""" @@ -205,6 +209,36 @@ class XenAPIConnection(object): 'username': FLAGS.xenapi_connection_username, 'password': FLAGS.xenapi_connection_password} + def get_cpu_info(self): + """This method is supported only libvirt. """ + return + + def get_vcpu_number(self): + """This method is supported only libvirt. """ + return -1 + + def get_memory_mb(self): + """This method is supported only libvirt..""" + return -1 + + def get_local_gb(self): + """This method is supported only libvirt..""" + return -1 + + def get_hypervisor_type(self): + """This method is supported only libvirt..""" + return + + def get_hypervisor_version(self): + """This method is supported only libvirt..""" + return -1 + + def compare_cpu(self, xml): + raise NotImplementedError('This method is supported only libvirt.') + + def live_migration(self, context, instance_ref, dest): + raise NotImplementedError('This method is supported only libvirt.') + class XenAPISession(object): """The session to invoke XenAPI SDK calls""" |
