diff options
author | Trey Morris <trey.morris@rackspace.com> | 2011-03-17 15:57:04 -0500 |
---|---|---|
committer | Trey Morris <trey.morris@rackspace.com> | 2011-03-17 15:57:04 -0500 |
commit | 400ca259f49a741cf2cefd86afcf2494ee0bd446 (patch) | |
tree | 881a11a7ba5a306f0d728c085a8021cb7e527bd5 | |
parent | 038d99d9fa4354bd617adfa332d69a87a9f7918e (diff) | |
parent | 88ae79505a84736ebdf57ba67c60ff16de5c9e87 (diff) | |
download | nova-400ca259f49a741cf2cefd86afcf2494ee0bd446.tar.gz nova-400ca259f49a741cf2cefd86afcf2494ee0bd446.tar.xz nova-400ca259f49a741cf2cefd86afcf2494ee0bd446.zip |
merged trunk, merged qos, slight refactor regarding merges
47 files changed, 4106 insertions, 160 deletions
@@ -28,6 +28,7 @@ <matt.dietz@rackspace.com> <matthewdietz@Matthew-Dietzs-MacBook-Pro.local> <matt.dietz@rackspace.com> <mdietz@openstack> <mordred@inaugust.com> <mordred@hudson> +<nirmal.ranganathan@rackspace.com> <nirmal.ranganathan@rackspace.coom> <paul@openstack.org> <paul.voccio@rackspace.com> <paul@openstack.org> <pvoccio@castor.local> <rconradharris@gmail.com> <rick.harris@rackspace.com> @@ -19,6 +19,7 @@ Devin Carlen <devin.carlen@gmail.com> Ed Leafe <ed@leafe.com> Eldar Nugaev <enugaev@griddynamics.com> Eric Day <eday@oddments.org> +Eric Windisch <eric@cloudscaling.com> Ewan Mellor <ewan.mellor@citrix.com> Hisaharu Ishii <ishii.hisaharu@lab.ntt.co.jp> Hisaki Ohara <hisaki.ohara@intel.com> @@ -39,6 +40,7 @@ Ken Pepple <ken.pepple@gmail.com> Kevin L. Mitchell <kevin.mitchell@rackspace.com> Koji Iida <iida.koji@lab.ntt.co.jp> Lorin Hochstein <lorin@isi.edu> +Mark Washenberger <mark.washenberger@rackspace.com> Masanori Itoh <itoumsn@nttdata.co.jp> Matt Dietz <matt.dietz@rackspace.com> Michael Gundlach <michael.gundlach@rackspace.com> diff --git a/MANIFEST.in b/MANIFEST.in index 2ceed34f3..bf30d1546 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -25,6 +25,7 @@ include nova/db/sqlalchemy/migrate_repo/migrate.cfg include nova/db/sqlalchemy/migrate_repo/README include nova/virt/interfaces.template include nova/virt/libvirt*.xml.template +include nova/virt/cpuinfo.xml.template include nova/tests/CA/ include nova/tests/CA/cacert.pem include nova/tests/CA/private/ diff --git a/bin/nova-manage b/bin/nova-manage index cb4d18614..a4d820209 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -446,10 +446,15 @@ class FixedIpCommands(object): def list(self, host=None): """Lists all fixed ips (optionally by host) arguments: [host]""" ctxt = context.get_admin_context() - if host == None: - fixed_ips = db.fixed_ip_get_all(ctxt) - else: - fixed_ips = db.fixed_ip_get_all_by_host(ctxt, host) + + try: + if host == None: + fixed_ips = db.fixed_ip_get_all(ctxt) + else: + fixed_ips = db.fixed_ip_get_all_by_host(ctxt, host) + except exception.NotFound as ex: + print "error: %s" % ex + sys.exit(2) print "%-18s\t%-15s\t%-17s\t%-15s\t%s" % (_('network'), _('IP address'), @@ -466,9 +471,9 @@ class FixedIpCommands(object): host = instance['host'] mac_address = instance['mac_address'] print "%-18s\t%-15s\t%-17s\t%-15s\t%s" % ( - fixed_ip['network']['cidr'], - fixed_ip['address'], - mac_address, hostname, host) + fixed_ip['network']['cidr'], + fixed_ip['address'], + mac_address, hostname, host) class FloatingIpCommands(object): @@ -560,6 +565,40 @@ class NetworkCommands(object): db.network_delete_safe(context.get_admin_context(), network.id) +class VmCommands(object): + """Class for mangaging VM instances.""" + + def live_migration(self, ec2_id, dest): + """Migrates a running instance to a new machine. + + :param ec2_id: instance id which comes from euca-describe-instance. + :param dest: destination host name. + + """ + + ctxt = context.get_admin_context() + instance_id = ec2utils.ec2_id_to_id(ec2_id) + + if FLAGS.connection_type != 'libvirt': + msg = _('Only KVM is supported for now. Sorry!') + raise exception.Error(msg) + + if (FLAGS.volume_driver != 'nova.volume.driver.AOEDriver' and \ + FLAGS.volume_driver != 'nova.volume.driver.ISCSIDriver'): + msg = _("Support only AOEDriver and ISCSIDriver. Sorry!") + raise exception.Error(msg) + + rpc.call(ctxt, + FLAGS.scheduler_topic, + {"method": "live_migration", + "args": {"instance_id": instance_id, + "dest": dest, + "topic": FLAGS.compute_topic}}) + + print _('Migration of %s initiated.' + 'Check its progress using euca-describe-instances.') % ec2_id + + class ServiceCommands(object): """Enable and disable running services""" @@ -604,6 +643,59 @@ class ServiceCommands(object): return db.service_update(ctxt, svc['id'], {'disabled': True}) + def describe_resource(self, host): + """Describes cpu/memory/hdd info for host. + + :param host: hostname. + + """ + + result = rpc.call(context.get_admin_context(), + FLAGS.scheduler_topic, + {"method": "show_host_resources", + "args": {"host": host}}) + + if type(result) != dict: + print _('An unexpected error has occurred.') + print _('[Result]'), result + else: + cpu = result['resource']['vcpus'] + mem = result['resource']['memory_mb'] + hdd = result['resource']['local_gb'] + cpu_u = result['resource']['vcpus_used'] + mem_u = result['resource']['memory_mb_used'] + hdd_u = result['resource']['local_gb_used'] + + print 'HOST\t\t\tPROJECT\t\tcpu\tmem(mb)\tdisk(gb)' + print '%s(total)\t\t\t%s\t%s\t%s' % (host, cpu, mem, hdd) + print '%s(used)\t\t\t%s\t%s\t%s' % (host, cpu_u, mem_u, hdd_u) + for p_id, val in result['usage'].items(): + print '%s\t\t%s\t\t%s\t%s\t%s' % (host, + p_id, + val['vcpus'], + val['memory_mb'], + val['local_gb']) + + def update_resource(self, host): + """Updates available vcpu/memory/disk info for host. + + :param host: hostname. + + """ + + ctxt = context.get_admin_context() + service_refs = db.service_get_all_by_host(ctxt, host) + if len(service_refs) <= 0: + raise exception.Invalid(_('%s does not exist.') % host) + + service_refs = [s for s in service_refs if s['topic'] == 'compute'] + if len(service_refs) <= 0: + raise exception.Invalid(_('%s is not compute node.') % host) + + rpc.call(ctxt, + db.queue_get_for(ctxt, FLAGS.compute_topic, host), + {"method": "update_available_resource"}) + class LogCommands(object): def request(self, request_id, logfile='/var/log/nova.log'): @@ -629,6 +721,49 @@ class DbCommands(object): print migration.db_version() +class InstanceCommands(object): + """Class for managing instances.""" + + def list(self, host=None, instance=None): + """Show a list of all instances""" + print "%-10s %-15s %-10s %-10s %-19s %-12s %-12s %-12s" \ + " %-10s %-10s %-10s %-5s" % ( + _('instance'), + _('node'), + _('type'), + _('state'), + _('launched'), + _('image'), + _('kernel'), + _('ramdisk'), + _('project'), + _('user'), + _('zone'), + _('index')) + + if host == None: + instances = db.instance_get_all(context.get_admin_context()) + else: + instances = db.instance_get_all_by_host( + context.get_admin_context(), host) + + for instance in instances: + print "%-10s %-15s %-10s %-10s %-19s %-12s %-12s %-12s" \ + " %-10s %-10s %-10s %-5d" % ( + instance['hostname'], + instance['host'], + instance['instance_type'], + instance['state_description'], + instance['launched_at'], + instance['image_id'], + instance['kernel_id'], + instance['ramdisk_id'], + instance['project_id'], + instance['user_id'], + instance['availability_zone'], + instance['launch_index']) + + class VolumeCommands(object): """Methods for dealing with a cloud in an odd state""" @@ -908,13 +1043,15 @@ CATEGORIES = [ ('fixed', FixedIpCommands), ('floating', FloatingIpCommands), ('network', NetworkCommands), + ('vm', VmCommands), ('service', ServiceCommands), ('log', LogCommands), ('db', DbCommands), ('volume', VolumeCommands), ('instance_type', InstanceTypeCommands), ('image', ImageCommands), - ('flavor', InstanceTypeCommands)] + ('flavor', InstanceTypeCommands), + ('instance', InstanceCommands)] def lazy_match(name, key_value_tuples): diff --git a/contrib/nova.sh b/contrib/nova.sh index d6c9b1081..55dfb971c 100755 --- a/contrib/nova.sh +++ b/contrib/nova.sh @@ -76,6 +76,7 @@ if [ "$CMD" == "install" ]; then sudo apt-get install -y python-migrate python-eventlet python-gflags python-ipy python-tempita sudo apt-get install -y python-libvirt python-libxml2 python-routes python-cheetah sudo apt-get install -y python-netaddr python-paste python-pastedeploy python-glance + sudo apt-get install -y python-multiprocessing if [ "$USE_IPV6" == 1 ]; then sudo apt-get install -y radvd diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index cadda97db..e257e44e7 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -147,8 +147,6 @@ class CloudController(object): instance_ref['id']) ec2_id = ec2utils.id_to_ec2_id(instance_ref['id']) image_ec2_id = self._image_ec2_id(instance_ref['image_id'], 'machine') - k_ec2_id = self._image_ec2_id(instance_ref['kernel_id'], 'kernel') - r_ec2_id = self._image_ec2_id(instance_ref['ramdisk_id'], 'ramdisk') data = { 'user-data': base64.b64decode(instance_ref['user_data']), 'meta-data': { @@ -167,8 +165,6 @@ class CloudController(object): 'instance-type': instance_ref['instance_type'], 'local-hostname': hostname, 'local-ipv4': address, - 'kernel-id': k_ec2_id, - 'ramdisk-id': r_ec2_id, 'placement': {'availability-zone': availability_zone}, 'public-hostname': hostname, 'public-ipv4': floating_ip or '', @@ -176,6 +172,13 @@ class CloudController(object): 'reservation-id': instance_ref['reservation_id'], 'security-groups': '', 'mpi': mpi}} + + for image_type in ['kernel', 'ramdisk']: + if '%s_id' % image_type in instance_ref: + ec2_id = self._image_ec2_id(instance_ref['%s_id' % image_type], + image_type) + data['meta-data']['%s-id' % image_type] = ec2_id + if False: # TODO(vish): store ancestor ids data['ancestor-ami-ids'] = [] if False: # TODO(vish): store product codes @@ -956,7 +959,7 @@ class CloudController(object): raise exception.NotFound(_('Image %s not found') % image_id) internal_id = image['id'] del(image['id']) - raise Exception(image) + image['properties']['is_public'] = (operation_type == 'add') return self.image_service.update(context, internal_id, image) diff --git a/nova/api/openstack/auth.py b/nova/api/openstack/auth.py index 4c6b58eff..f3a9bdeca 100644 --- a/nova/api/openstack/auth.py +++ b/nova/api/openstack/auth.py @@ -135,7 +135,11 @@ class AuthMiddleware(wsgi.Middleware): req - wsgi.Request object """ ctxt = context.get_admin_context() - user = self.auth.get_user_from_access_key(key) + + try: + user = self.auth.get_user_from_access_key(key) + except exception.NotFound: + user = None if user and user.name == username: token_hash = hashlib.sha1('%s%s%f' % (username, key, diff --git a/nova/api/openstack/flavors.py b/nova/api/openstack/flavors.py index f3d040ba3..1c440b3a9 100644 --- a/nova/api/openstack/flavors.py +++ b/nova/api/openstack/flavors.py @@ -36,7 +36,7 @@ class Controller(wsgi.Controller): def index(self, req): """Return all flavors in brief.""" - return dict(flavors=[dict(id=flavor['id'], name=flavor['name']) + return dict(flavors=[dict(id=flavor['flavorid'], name=flavor['name']) for flavor in self.detail(req)['flavors']]) def detail(self, req): @@ -48,6 +48,7 @@ class Controller(wsgi.Controller): """Return data about the given flavor id.""" ctxt = req.environ['nova.context'] values = db.instance_type_get_by_flavor_id(ctxt, id) + values['id'] = values['flavorid'] return dict(flavor=values) raise faults.Fault(exc.HTTPNotFound()) diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py index dc28a0782..2f26fa873 100644 --- a/nova/api/openstack/servers.py +++ b/nova/api/openstack/servers.py @@ -13,9 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 import hashlib import json import traceback +from xml.dom import minidom from webob import exc @@ -30,6 +32,7 @@ from nova.api.openstack import faults from nova.auth import manager as auth_manager from nova.compute import instance_types from nova.compute import power_state +from nova.quota import QuotaError import nova.api.openstack @@ -141,15 +144,19 @@ class Controller(wsgi.Controller): def create(self, req): """ Creates a new server for a given user """ - env = self._deserialize(req.body, req.get_content_type()) + env = self._deserialize_create(req) if not env: return faults.Fault(exc.HTTPUnprocessableEntity()) context = req.environ['nova.context'] + + key_name = None + key_data = None key_pairs = auth_manager.AuthManager.get_key_pairs(context) - if not key_pairs: - raise exception.NotFound(_("No keypairs defined")) - key_pair = key_pairs[0] + if key_pairs: + key_pair = key_pairs[0] + key_name = key_pair['name'] + key_data = key_pair['public_key'] image_id = common.get_image_id_from_image_hash(self._image_service, context, env['server']['imageId']) @@ -166,18 +173,24 @@ class Controller(wsgi.Controller): for k, v in env['server']['metadata'].items(): metadata.append({'key': k, 'value': v}) - instances = self.compute_api.create( - context, - instance_types.get_by_flavor_id(env['server']['flavorId']), - image_id, - kernel_id=kernel_id, - ramdisk_id=ramdisk_id, - display_name=env['server']['name'], - display_description=env['server']['name'], - key_name=key_pair['name'], - key_data=key_pair['public_key'], - metadata=metadata, - onset_files=env.get('onset_files', [])) + personality = env['server'].get('personality', []) + injected_files = self._get_injected_files(personality) + + try: + instances = self.compute_api.create( + context, + instance_types.get_by_flavor_id(env['server']['flavorId']), + image_id, + kernel_id=kernel_id, + ramdisk_id=ramdisk_id, + display_name=env['server']['name'], + display_description=env['server']['name'], + key_name=key_name, + key_data=key_data, + metadata=metadata, + injected_files=injected_files) + except QuotaError as error: + self._handle_quota_errors(error) server = _translate_keys(instances[0]) password = "%s%s" % (server['server']['name'][:4], @@ -187,6 +200,61 @@ class Controller(wsgi.Controller): password) return server + def _deserialize_create(self, request): + """ + Deserialize a create request + + Overrides normal behavior in the case of xml content + """ + if request.content_type == "application/xml": + deserializer = ServerCreateRequestXMLDeserializer() + return deserializer.deserialize(request.body) + else: + return self._deserialize(request.body, request.get_content_type()) + + def _get_injected_files(self, personality): + """ + Create a list of injected files from the personality attribute + + At this time, injected_files must be formatted as a list of + (file_path, file_content) pairs for compatibility with the + underlying compute service. + """ + injected_files = [] + for item in personality: + try: + path = item['path'] + contents = item['contents'] + except KeyError as key: + expl = _('Bad personality format: missing %s') % key + raise exc.HTTPBadRequest(explanation=expl) + except TypeError: + expl = _('Bad personality format') + raise exc.HTTPBadRequest(explanation=expl) + try: + contents = base64.b64decode(contents) + except TypeError: + expl = _('Personality content for %s cannot be decoded') % path + raise exc.HTTPBadRequest(explanation=expl) + injected_files.append((path, contents)) + return injected_files + + def _handle_quota_errors(self, error): + """ + Reraise quota errors as api-specific http exceptions + """ + if error.code == "OnsetFileLimitExceeded": + expl = _("Personality file limit exceeded") + raise exc.HTTPBadRequest(explanation=expl) + if error.code == "OnsetFilePathLimitExceeded": + expl = _("Personality file path too long") + raise exc.HTTPBadRequest(explanation=expl) + if error.code == "OnsetFileContentLimitExceeded": + expl = _("Personality file content too long") + raise exc.HTTPBadRequest(explanation=expl) + # if the original error is okay, just reraise it + raise error + def update(self, req, id): """ Updates the server name or password """ if len(req.body) == 0: @@ -476,3 +544,79 @@ class Controller(wsgi.Controller): _("Ramdisk not found for image %(image_id)s") % locals()) return kernel_id, ramdisk_id + + +class ServerCreateRequestXMLDeserializer(object): + """ + Deserializer to handle xml-formatted server create requests. + + Handles standard server attributes as well as optional metadata + and personality attributes + """ + + def deserialize(self, string): + """Deserialize an xml-formatted server create request""" + dom = minidom.parseString(string) + server = self._extract_server(dom) + return {'server': server} + + def _extract_server(self, node): + """Marshal the server attribute of a parsed request""" + server = {} + server_node = self._find_first_child_named(node, 'server') + for attr in ["name", "imageId", "flavorId"]: + server[attr] = server_node.getAttribute(attr) + metadata = self._extract_metadata(server_node) + if metadata is not None: + server["metadata"] = metadata + personality = self._extract_personality(server_node) + if personality is not None: + server["personality"] = personality + return server + + def _extract_metadata(self, server_node): + """Marshal the metadata attribute of a parsed request""" + metadata_node = self._find_first_child_named(server_node, "metadata") + if metadata_node is None: + return None + metadata = {} + for meta_node in self._find_children_named(metadata_node, "meta"): + key = meta_node.getAttribute("key") + metadata[key] = self._extract_text(meta_node) + return metadata + + def _extract_personality(self, server_node): + """Marshal the personality attribute of a parsed request""" + personality_node = \ + self._find_first_child_named(server_node, "personality") + if personality_node is None: + return None + personality = [] + for file_node in self._find_children_named(personality_node, "file"): + item = {} + if file_node.hasAttribute("path"): + item["path"] = file_node.getAttribute("path") + item["contents"] = self._extract_text(file_node) + personality.append(item) + return personality + + def _find_first_child_named(self, parent, name): + """Search a nodes children for the first child with a given name""" + for node in parent.childNodes: + if node.nodeName == name: + return node + return None + + def _find_children_named(self, parent, name): + """Return all of a nodes children who have the given name""" + for node in parent.childNodes: + if node.nodeName == name: + yield node + + def _extract_text(self, node): + """Get the text field contained by the given node""" + if len(node.childNodes) == 1: + child = node.childNodes[0] + if child.nodeType == child.TEXT_NODE: + return child.nodeValue + return "" diff --git a/nova/compute/api.py b/nova/compute/api.py index 61f8b2a6a..32577af82 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -80,13 +80,32 @@ class API(base.Base): topic, {"method": "get_network_topic", "args": {'fake': 1}}) + def _check_injected_file_quota(self, context, injected_files): + """ + Enforce quota limits on injected files + + Raises a QuotaError if any limit is exceeded + """ + if injected_files is None: + return + limit = quota.allowed_injected_files(context) + if len(injected_files) > limit: + raise quota.QuotaError(code="OnsetFileLimitExceeded") + path_limit = quota.allowed_injected_file_path_bytes(context) + content_limit = quota.allowed_injected_file_content_bytes(context) + for path, content in injected_files: + if len(path) > path_limit: + raise quota.QuotaError(code="OnsetFilePathLimitExceeded") + if len(content) > content_limit: + raise quota.QuotaError(code="OnsetFileContentLimitExceeded") + def create(self, context, instance_type, image_id, kernel_id=None, ramdisk_id=None, min_count=1, max_count=1, display_name='', display_description='', key_name=None, key_data=None, security_group='default', availability_zone=None, user_data=None, metadata=[], - onset_files=None): + injected_files=None): """Create the number of instances requested if quota and other arguments check out ok.""" @@ -124,6 +143,8 @@ class API(base.Base): LOG.warn(msg) raise quota.QuotaError(msg, "MetadataLimitExceeded") + self._check_injected_file_quota(context, injected_files) + image = self.image_service.show(context, image_id) os_type = None @@ -225,7 +246,7 @@ class API(base.Base): "args": {"topic": FLAGS.compute_topic, "instance_id": instance_id, "availability_zone": availability_zone, - "onset_files": onset_files}}) + "injected_files": injected_files}}) for group_id in security_groups: self.trigger_security_group_members_refresh(elevated, group_id) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index b35216dd3..92deca813 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -34,11 +34,13 @@ terminating it. :func:`nova.utils.import_object` """ -import base64 import datetime +import os import random import string import socket +import tempfile +import time import functools from nova import exception @@ -61,6 +63,9 @@ flags.DEFINE_integer('password_length', 12, flags.DEFINE_string('console_host', socket.gethostname(), 'Console proxy host to use to connect to instances on' 'this host.') +flags.DEFINE_integer('live_migration_retry_count', 30, + ("Retry count needed in live_migration." + " sleep 1 sec for each count")) LOG = logging.getLogger('nova.compute.manager') @@ -174,14 +179,14 @@ class ComputeManager(manager.Manager): """Launch a new instance with specified options.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - instance_ref.onset_files = kwargs.get('onset_files', []) + instance_ref.injected_files = kwargs.get('injected_files', []) if instance_ref['name'] in self.driver.list_instances(): raise exception.Error(_("Instance has already been created")) LOG.audit(_("instance %s: starting..."), instance_id, context=context) self.db.instance_update(context, instance_id, - {'host': self.host}) + {'host': self.host, 'launched_on': self.host}) self.db.instance_set_state(context, instance_id, @@ -353,15 +358,10 @@ class ComputeManager(manager.Manager): LOG.warn(_('trying to inject a file into a non-running ' 'instance: %(instance_id)s (state: %(instance_state)s ' 'expected: %(expected_state)s)') % locals()) - # Files/paths *should* be base64-encoded at this point, but - # double-check to make sure. - b64_path = utils.ensure_b64_encoding(path) - b64_contents = utils.ensure_b64_encoding(file_contents) - plain_path = base64.b64decode(b64_path) nm = instance_ref['name'] - msg = _('instance %(nm)s: injecting file to %(plain_path)s') % locals() + msg = _('instance %(nm)s: injecting file to %(path)s') % locals() LOG.audit(msg) - self.driver.inject_file(instance_ref, b64_path, b64_contents) + self.driver.inject_file(instance_ref, path, file_contents) @exception.wrap_exception @checks_instance_lock @@ -723,3 +723,248 @@ class ComputeManager(manager.Manager): self.volume_manager.remove_compute_volume(context, volume_id) self.db.volume_detached(context, volume_id) return True + + @exception.wrap_exception + def compare_cpu(self, context, cpu_info): + """Checks the host cpu is compatible to a cpu given by xml. + + :param context: security context + :param cpu_info: json string obtained from virConnect.getCapabilities + :returns: See driver.compare_cpu + + """ + return self.driver.compare_cpu(cpu_info) + + @exception.wrap_exception + def create_shared_storage_test_file(self, context): + """Makes tmpfile under FLAGS.instance_path. + + This method enables compute nodes to recognize that they mounts + same shared storage. (create|check|creanup)_shared_storage_test_file() + is a pair. + + :param context: security context + :returns: tmpfile name(basename) + + """ + + dirpath = FLAGS.instances_path + fd, tmp_file = tempfile.mkstemp(dir=dirpath) + LOG.debug(_("Creating tmpfile %s to notify to other " + "compute nodes that they should mount " + "the same storage.") % tmp_file) + os.close(fd) + return os.path.basename(tmp_file) + + @exception.wrap_exception + def check_shared_storage_test_file(self, context, filename): + """Confirms existence of the tmpfile under FLAGS.instances_path. + + :param context: security context + :param filename: confirm existence of FLAGS.instances_path/thisfile + + """ + + tmp_file = os.path.join(FLAGS.instances_path, filename) + if not os.path.exists(tmp_file): + raise exception.NotFound(_('%s not found') % tmp_file) + + @exception.wrap_exception + def cleanup_shared_storage_test_file(self, context, filename): + """Removes existence of the tmpfile under FLAGS.instances_path. + + :param context: security context + :param filename: remove existence of FLAGS.instances_path/thisfile + + """ + + tmp_file = os.path.join(FLAGS.instances_path, filename) + os.remove(tmp_file) + + @exception.wrap_exception + def update_available_resource(self, context): + """See comments update_resource_info. + + :param context: security context + :returns: See driver.update_available_resource() + + """ + + return self.driver.update_available_resource(context, self.host) + + def pre_live_migration(self, context, instance_id): + """Preparations for live migration at dest host. + + :param context: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + + """ + + # Getting instance info + instance_ref = self.db.instance_get(context, instance_id) + ec2_id = instance_ref['hostname'] + + # Getting fixed ips + fixed_ip = self.db.instance_get_fixed_address(context, instance_id) + if not fixed_ip: + msg = _("%(instance_id)s(%(ec2_id)s) does not have fixed_ip.") + raise exception.NotFound(msg % locals()) + + # If any volume is mounted, prepare here. + if not instance_ref['volumes']: + LOG.info(_("%s has no volume."), ec2_id) + else: + for v in instance_ref['volumes']: + self.volume_manager.setup_compute_volume(context, v['id']) + + # Bridge settings. + # Call this method prior to ensure_filtering_rules_for_instance, + # since bridge is not set up, ensure_filtering_rules_for instance + # fails. + # + # Retry operation is necessary because continuously request comes, + # concorrent request occurs to iptables, then it complains. + max_retry = FLAGS.live_migration_retry_count + for cnt in range(max_retry): + try: + self.network_manager.setup_compute_network(context, + instance_id) + break + except exception.ProcessExecutionError: + if cnt == max_retry - 1: + raise + else: + LOG.warn(_("setup_compute_network() failed %(cnt)d." + "Retry up to %(max_retry)d for %(ec2_id)s.") + % locals()) + time.sleep(1) + + # Creating filters to hypervisors and firewalls. + # An example is that nova-instance-instance-xxx, + # which is written to libvirt.xml(Check "virsh nwfilter-list") + # This nwfilter is necessary on the destination host. + # In addition, this method is creating filtering rule + # onto destination host. + self.driver.ensure_filtering_rules_for_instance(instance_ref) + + def live_migration(self, context, instance_id, dest): + """Executing live migration. + + :param context: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param dest: destination host + + """ + + # Get instance for error handling. + instance_ref = self.db.instance_get(context, instance_id) + i_name = instance_ref.name + + try: + # Checking volume node is working correctly when any volumes + # are attached to instances. + if instance_ref['volumes']: + rpc.call(context, + FLAGS.volume_topic, + {"method": "check_for_export", + "args": {'instance_id': instance_id}}) + + # Asking dest host to preparing live migration. + rpc.call(context, + self.db.queue_get_for(context, FLAGS.compute_topic, dest), + {"method": "pre_live_migration", + "args": {'instance_id': instance_id}}) + + except Exception: + msg = _("Pre live migration for %(i_name)s failed at %(dest)s") + LOG.error(msg % locals()) + self.recover_live_migration(context, instance_ref) + raise + + # Executing live migration + # live_migration might raises exceptions, but + # nothing must be recovered in this version. + self.driver.live_migration(context, instance_ref, dest, + self.post_live_migration, + self.recover_live_migration) + + def post_live_migration(self, ctxt, instance_ref, dest): + """Post operations for live migration. + + This method is called from live_migration + and mainly updating database record. + + :param ctxt: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param dest: destination host + + """ + + LOG.info(_('post_live_migration() is started..')) + instance_id = instance_ref['id'] + + # Detaching volumes. + try: + for vol in self.db.volume_get_all_by_instance(ctxt, instance_id): + self.volume_manager.remove_compute_volume(ctxt, vol['id']) + except exception.NotFound: + pass + + # Releasing vlan. + # (not necessary in current implementation?) + + # Releasing security group ingress rule. + self.driver.unfilter_instance(instance_ref) + + # Database updating. + i_name = instance_ref.name + try: + # Not return if floating_ip is not found, otherwise, + # instance never be accessible.. + floating_ip = self.db.instance_get_floating_address(ctxt, + instance_id) + if not floating_ip: + LOG.info(_('No floating_ip is found for %s.'), i_name) + else: + floating_ip_ref = self.db.floating_ip_get_by_address(ctxt, + floating_ip) + self.db.floating_ip_update(ctxt, + floating_ip_ref['address'], + {'host': dest}) + except exception.NotFound: + LOG.info(_('No floating_ip is found for %s.'), i_name) + except: + LOG.error(_("Live migration: Unexpected error:" + "%s cannot inherit floating ip..") % i_name) + + # Restore instance/volume state + self.recover_live_migration(ctxt, instance_ref, dest) + + LOG.info(_('Migrating %(i_name)s to %(dest)s finished successfully.') + % locals()) + LOG.info(_("You may see the error \"libvirt: QEMU error: " + "Domain not found: no domain with matching name.\" " + "This error can be safely ignored.")) + + def recover_live_migration(self, ctxt, instance_ref, host=None): + """Recovers Instance/volume state from migrating -> running. + + :param ctxt: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param host: + DB column value is updated by this hostname. + if none, the host instance currently running is selected. + + """ + + if not host: + host = instance_ref['host'] + + self.db.instance_update(ctxt, + instance_ref['id'], + {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': host}) + + for volume in instance_ref['volumes']: + self.db.volume_update(ctxt, volume['id'], {'status': 'in-use'}) diff --git a/nova/console/manager.py b/nova/console/manager.py index 57c75cf4f..bfa571ea9 100644 --- a/nova/console/manager.py +++ b/nova/console/manager.py @@ -69,7 +69,7 @@ class ConsoleProxyManager(manager.Manager): except exception.NotFound: logging.debug(_("Adding console")) if not password: - password = self.driver.generate_password() + password = utils.generate_password(8) if not port: port = self.driver.get_port(context) console_data = {'instance_name': name, diff --git a/nova/console/xvp.py b/nova/console/xvp.py index 68d8c8565..0cedfbb13 100644 --- a/nova/console/xvp.py +++ b/nova/console/xvp.py @@ -91,10 +91,6 @@ class XVPConsoleProxy(object): """Trim password to length, and encode""" return self._xvp_encrypt(password) - def generate_password(self, length=8): - """Returns random console password""" - return os.urandom(length * 2).encode('base64')[:length] - def _rebuild_xvp_conf(self, context): logging.debug(_("Rebuilding xvp conf")) pools = [pool for pool in diff --git a/nova/db/api.py b/nova/db/api.py index aa86f0af1..3cb0e5811 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -104,6 +104,11 @@ def service_get_all_by_host(context, host): return IMPL.service_get_all_by_host(context, host) +def service_get_all_compute_by_host(context, host): + """Get all compute services for a given host.""" + return IMPL.service_get_all_compute_by_host(context, host) + + def service_get_all_compute_sorted(context): """Get all compute services sorted by instance count. @@ -153,6 +158,29 @@ def service_update(context, service_id, values): ################### +def compute_node_get(context, compute_id, session=None): + """Get an computeNode or raise if it does not exist.""" + return IMPL.compute_node_get(context, compute_id) + + +def compute_node_create(context, values): + """Create a computeNode from the values dictionary.""" + return IMPL.compute_node_create(context, values) + + +def compute_node_update(context, compute_id, values): + """Set the given properties on an computeNode and update it. + + Raises NotFound if computeNode does not exist. + + """ + + return IMPL.compute_node_update(context, compute_id, values) + + +################### + + def certificate_create(context, values): """Create a certificate from the values dictionary.""" return IMPL.certificate_create(context, values) @@ -257,6 +285,11 @@ def floating_ip_get_by_address(context, address): return IMPL.floating_ip_get_by_address(context, address) +def floating_ip_update(context, address, values): + """Update a floating ip by address or raise if it doesn't exist.""" + return IMPL.floating_ip_update(context, address, values) + + #################### def migration_update(context, id, values): @@ -320,6 +353,11 @@ def fixed_ip_get_all(context): return IMPL.fixed_ip_get_all(context) +def fixed_ip_get_all_by_host(context, host): + """Get all defined fixed ips used by a host.""" + return IMPL.fixed_ip_get_all_by_host(context, host) + + def fixed_ip_get_by_address(context, address): """Get a fixed ip by address or raise if it does not exist.""" return IMPL.fixed_ip_get_by_address(context, address) @@ -441,6 +479,27 @@ def instance_add_security_group(context, instance_id, security_group_id): security_group_id) +def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id): + """Get instances.vcpus by host and project.""" + return IMPL.instance_get_vcpu_sum_by_host_and_project(context, + hostname, + proj_id) + + +def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id): + """Get amount of memory by host and project.""" + return IMPL.instance_get_memory_sum_by_host_and_project(context, + hostname, + proj_id) + + +def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id): + """Get total amount of disk by host and project.""" + return IMPL.instance_get_disk_sum_by_host_and_project(context, + hostname, + proj_id) + + def instance_action_create(context, values): """Create an instance action from the values dictionary.""" return IMPL.instance_action_create(context, values) @@ -765,6 +824,11 @@ def volume_get_all_by_host(context, host): return IMPL.volume_get_all_by_host(context, host) +def volume_get_all_by_instance(context, instance_id): + """Get all volumes belonging to a instance.""" + return IMPL.volume_get_all_by_instance(context, instance_id) + + def volume_get_all_by_project(context, project_id): """Get all volumes belonging to a project.""" return IMPL.volume_get_all_by_project(context, project_id) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 8b541757a..9d9b86c1d 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -119,6 +119,11 @@ def service_destroy(context, service_id): service_ref = service_get(context, service_id, session=session) service_ref.delete(session=session) + if service_ref.topic == 'compute' and \ + len(service_ref.compute_node) != 0: + for c in service_ref.compute_node: + c.delete(session=session) + @require_admin_context def service_get(context, service_id, session=None): @@ -126,6 +131,7 @@ def service_get(context, service_id, session=None): session = get_session() result = session.query(models.Service).\ + options(joinedload('compute_node')).\ filter_by(id=service_id).\ filter_by(deleted=can_read_deleted(context)).\ first() @@ -176,6 +182,24 @@ def service_get_all_by_host(context, host): @require_admin_context +def service_get_all_compute_by_host(context, host): + topic = 'compute' + session = get_session() + result = session.query(models.Service).\ + options(joinedload('compute_node')).\ + filter_by(deleted=False).\ + filter_by(host=host).\ + filter_by(topic=topic).\ + all() + + if not result: + raise exception.NotFound(_("%s does not exist or is not " + "a compute node.") % host) + + return result + + +@require_admin_context def _service_get_all_topic_subquery(context, session, topic, subq, label): sort_value = getattr(subq.c, label) return session.query(models.Service, func.coalesce(sort_value, 0)).\ @@ -286,6 +310,42 @@ def service_update(context, service_id, values): @require_admin_context +def compute_node_get(context, compute_id, session=None): + if not session: + session = get_session() + + result = session.query(models.ComputeNode).\ + filter_by(id=compute_id).\ + filter_by(deleted=can_read_deleted(context)).\ + first() + + if not result: + raise exception.NotFound(_('No computeNode for id %s') % compute_id) + + return result + + +@require_admin_context +def compute_node_create(context, values): + compute_node_ref = models.ComputeNode() + compute_node_ref.update(values) + compute_node_ref.save() + return compute_node_ref + + +@require_admin_context +def compute_node_update(context, compute_id, values): + session = get_session() + with session.begin(): + compute_ref = compute_node_get(context, compute_id, session=session) + compute_ref.update(values) + compute_ref.save(session=session) + + +################### + + +@require_admin_context def certificate_get(context, certificate_id, session=None): if not session: session = get_session() @@ -506,6 +566,16 @@ def floating_ip_get_by_address(context, address, session=None): return result +@require_context +def floating_ip_update(context, address, values): + session = get_session() + with session.begin(): + floating_ip_ref = floating_ip_get_by_address(context, address, session) + for (key, value) in values.iteritems(): + floating_ip_ref[key] = value + floating_ip_ref.save(session=session) + + ################### @@ -602,6 +672,22 @@ def fixed_ip_get_all(context, session=None): return result +@require_admin_context +def fixed_ip_get_all_by_host(context, host=None): + session = get_session() + + result = session.query(models.FixedIp).\ + join(models.FixedIp.instance).\ + filter_by(state=1).\ + filter_by(host=host).\ + all() + + if not result: + raise exception.NotFound(_('No fixed ips for this host defined')) + + return result + + @require_context def fixed_ip_get_by_address(context, address, session=None): if not session: @@ -676,6 +762,15 @@ def instance_create(context, values): context - request context object values - dict containing column values. """ + metadata = values.get('metadata') + metadata_refs = [] + if metadata: + for metadata_item in metadata: + metadata_ref = models.InstanceMetadata() + metadata_ref.update(metadata_item) + metadata_refs.append(metadata_ref) + values['metadata'] = metadata_refs + instance_ref = models.Instance() instance_ref.update(values) @@ -907,6 +1002,45 @@ def instance_add_security_group(context, instance_id, security_group_id): @require_context +def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id): + session = get_session() + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(project_id=proj_id).\ + filter_by(deleted=False).\ + value(func.sum(models.Instance.vcpus)) + if not result: + return 0 + return result + + +@require_context +def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id): + session = get_session() + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(project_id=proj_id).\ + filter_by(deleted=False).\ + value(func.sum(models.Instance.memory_mb)) + if not result: + return 0 + return result + + +@require_context +def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id): + session = get_session() + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(project_id=proj_id).\ + filter_by(deleted=False).\ + value(func.sum(models.Instance.local_gb)) + if not result: + return 0 + return result + + +@require_context def instance_action_create(context, values): """Create an instance action from the values dictionary.""" action_ref = models.InstanceActions() @@ -1530,6 +1664,18 @@ def volume_get_all_by_host(context, host): all() +@require_admin_context +def volume_get_all_by_instance(context, instance_id): + session = get_session() + result = session.query(models.Volume).\ + filter_by(instance_id=instance_id).\ + filter_by(deleted=False).\ + all() + if not result: + raise exception.NotFound(_('No volume for instance %s') % instance_id) + return result + + @require_context def volume_get_all_by_project(context, project_id): authorize_project_context(context, project_id) diff --git a/nova/db/sqlalchemy/migrate_repo/versions/011_live_migration.py b/nova/db/sqlalchemy/migrate_repo/versions/011_live_migration.py new file mode 100644 index 000000000..23ccccb4e --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/011_live_migration.py @@ -0,0 +1,83 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from migrate import * +from nova import log as logging +from sqlalchemy import * + + +meta = MetaData() + +instances = Table('instances', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + +# +# New Tables +# + +compute_nodes = Table('compute_nodes', meta, + Column('created_at', DateTime(timezone=False)), + Column('updated_at', DateTime(timezone=False)), + Column('deleted_at', DateTime(timezone=False)), + Column('deleted', Boolean(create_constraint=True, name=None)), + Column('id', Integer(), primary_key=True, nullable=False), + Column('service_id', Integer(), nullable=False), + + Column('vcpus', Integer(), nullable=False), + Column('memory_mb', Integer(), nullable=False), + Column('local_gb', Integer(), nullable=False), + Column('vcpus_used', Integer(), nullable=False), + Column('memory_mb_used', Integer(), nullable=False), + Column('local_gb_used', Integer(), nullable=False), + Column('hypervisor_type', + Text(convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=False), + Column('hypervisor_version', Integer(), nullable=False), + Column('cpu_info', + Text(convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=False), + ) + + +# +# Tables to alter +# +instances_launched_on = Column( + 'launched_on', + Text(convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=True) + + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; + # bind migrate_engine to your metadata + meta.bind = migrate_engine + + try: + compute_nodes.create() + except Exception: + logging.info(repr(compute_nodes)) + logging.exception('Exception while creating table') + meta.drop_all(tables=[compute_nodes]) + raise + + instances.create_column(instances_launched_on) diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 6bf058327..1845e85eb 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -113,6 +113,41 @@ class Service(BASE, NovaBase): availability_zone = Column(String(255), default='nova') +class ComputeNode(BASE, NovaBase): + """Represents a running compute service on a host.""" + + __tablename__ = 'compute_nodes' + id = Column(Integer, primary_key=True) + service_id = Column(Integer, ForeignKey('services.id'), nullable=True) + service = relationship(Service, + backref=backref('compute_node'), + foreign_keys=service_id, + primaryjoin='and_(' + 'ComputeNode.service_id == Service.id,' + 'ComputeNode.deleted == False)') + + vcpus = Column(Integer, nullable=True) + memory_mb = Column(Integer, nullable=True) + local_gb = Column(Integer, nullable=True) + vcpus_used = Column(Integer, nullable=True) + memory_mb_used = Column(Integer, nullable=True) + local_gb_used = Column(Integer, nullable=True) + hypervisor_type = Column(Text, nullable=True) + hypervisor_version = Column(Integer, nullable=True) + + # Note(masumotok): Expected Strings example: + # + # '{"arch":"x86_64", + # "model":"Nehalem", + # "topology":{"sockets":1, "threads":2, "cores":3}, + # "features":["tdtscp", "xtpr"]}' + # + # Points are "json translatable" and it must have all dictionary keys + # above, since it is copied from <cpu> tag of getCapabilities() + # (See libvirt.virtConnection). + cpu_info = Column(Text, nullable=True) + + class Certificate(BASE, NovaBase): """Represents a an x509 certificate""" __tablename__ = 'certificates' @@ -126,7 +161,7 @@ class Certificate(BASE, NovaBase): class Instance(BASE, NovaBase): """Represents a guest vm.""" __tablename__ = 'instances' - onset_files = [] + injected_files = [] id = Column(Integer, primary_key=True, autoincrement=True) @@ -191,6 +226,9 @@ class Instance(BASE, NovaBase): display_name = Column(String(255)) display_description = Column(String(255)) + # To remember on which host a instance booted. + # An instance may have moved to another host by live migraiton. + launched_on = Column(Text) locked = Column(Boolean) os_type = Column(String(255)) diff --git a/nova/quota.py b/nova/quota.py index 6b52a97fa..2b24c0b5b 100644 --- a/nova/quota.py +++ b/nova/quota.py @@ -37,6 +37,12 @@ flags.DEFINE_integer('quota_floating_ips', 10, 'number of floating ips allowed per project') flags.DEFINE_integer('quota_metadata_items', 128, 'number of metadata items allowed per instance') +flags.DEFINE_integer('quota_max_injected_files', 5, + 'number of injected files allowed') +flags.DEFINE_integer('quota_max_injected_file_content_bytes', 10 * 1024, + 'number of bytes allowed per injected file') +flags.DEFINE_integer('quota_max_injected_file_path_bytes', 255, + 'number of bytes allowed per injected file path') def get_quota(context, project_id): @@ -46,6 +52,7 @@ def get_quota(context, project_id): 'gigabytes': FLAGS.quota_gigabytes, 'floating_ips': FLAGS.quota_floating_ips, 'metadata_items': FLAGS.quota_metadata_items} + try: quota = db.quota_get(context, project_id) for key in rval.keys(): @@ -106,6 +113,21 @@ def allowed_metadata_items(context, num_metadata_items): return min(num_metadata_items, num_allowed_metadata_items) +def allowed_injected_files(context): + """Return the number of injected files allowed""" + return FLAGS.quota_max_injected_files + + +def allowed_injected_file_content_bytes(context): + """Return the number of bytes allowed per injected file content""" + return FLAGS.quota_max_injected_file_content_bytes + + +def allowed_injected_file_path_bytes(context): + """Return the number of bytes allowed in an injected file path""" + return FLAGS.quota_max_injected_file_path_bytes + + class QuotaError(exception.ApiError): """Quota Exceeeded""" pass diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index 66e46c1b9..ed3dfe1c0 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -26,10 +26,14 @@ import datetime from nova import db from nova import exception from nova import flags +from nova import log as logging +from nova import rpc +from nova.compute import power_state FLAGS = flags.FLAGS flags.DEFINE_integer('service_down_time', 60, 'maximum time since last checkin for up service') +flags.DECLARE('instances_path', 'nova.compute.manager') class NoValidHost(exception.Error): @@ -64,3 +68,236 @@ class Scheduler(object): def schedule(self, context, topic, *_args, **_kwargs): """Must override at least this method for scheduler to work.""" raise NotImplementedError(_("Must implement a fallback schedule")) + + def schedule_live_migration(self, context, instance_id, dest): + """Live migration scheduling method. + + :param context: + :param instance_id: + :param dest: destination host + :return: + The host where instance is running currently. + Then scheduler send request that host. + + """ + + # Whether instance exists and is running. + instance_ref = db.instance_get(context, instance_id) + + # Checking instance. + self._live_migration_src_check(context, instance_ref) + + # Checking destination host. + self._live_migration_dest_check(context, instance_ref, dest) + + # Common checking. + self._live_migration_common_check(context, instance_ref, dest) + + # Changing instance_state. + db.instance_set_state(context, + instance_id, + power_state.PAUSED, + 'migrating') + + # Changing volume state + for volume_ref in instance_ref['volumes']: + db.volume_update(context, + volume_ref['id'], + {'status': 'migrating'}) + + # Return value is necessary to send request to src + # Check _schedule() in detail. + src = instance_ref['host'] + return src + + def _live_migration_src_check(self, context, instance_ref): + """Live migration check routine (for src host). + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + + """ + + # Checking instance is running. + if (power_state.RUNNING != instance_ref['state'] or \ + 'running' != instance_ref['state_description']): + ec2_id = instance_ref['hostname'] + raise exception.Invalid(_('Instance(%s) is not running') % ec2_id) + + # Checing volume node is running when any volumes are mounted + # to the instance. + if len(instance_ref['volumes']) != 0: + services = db.service_get_all_by_topic(context, 'volume') + if len(services) < 1 or not self.service_is_up(services[0]): + raise exception.Invalid(_("volume node is not alive" + "(time synchronize problem?)")) + + # Checking src host exists and compute node + src = instance_ref['host'] + services = db.service_get_all_compute_by_host(context, src) + + # Checking src host is alive. + if not self.service_is_up(services[0]): + raise exception.Invalid(_("%s is not alive(time " + "synchronize problem?)") % src) + + def _live_migration_dest_check(self, context, instance_ref, dest): + """Live migration check routine (for destination host). + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + # Checking dest exists and compute node. + dservice_refs = db.service_get_all_compute_by_host(context, dest) + dservice_ref = dservice_refs[0] + + # Checking dest host is alive. + if not self.service_is_up(dservice_ref): + raise exception.Invalid(_("%s is not alive(time " + "synchronize problem?)") % dest) + + # Checking whether The host where instance is running + # and dest is not same. + src = instance_ref['host'] + if dest == src: + ec2_id = instance_ref['hostname'] + raise exception.Invalid(_("%(dest)s is where %(ec2_id)s is " + "running now. choose other host.") + % locals()) + + # Checking dst host still has enough capacities. + self.assert_compute_node_has_enough_resources(context, + instance_ref, + dest) + + def _live_migration_common_check(self, context, instance_ref, dest): + """Live migration common check routine. + + Below checkings are followed by + http://wiki.libvirt.org/page/TodoPreMigrationChecks + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + # Checking shared storage connectivity + self.mounted_on_same_shared_storage(context, instance_ref, dest) + + # Checking dest exists. + dservice_refs = db.service_get_all_compute_by_host(context, dest) + dservice_ref = dservice_refs[0]['compute_node'][0] + + # Checking original host( where instance was launched at) exists. + try: + oservice_refs = db.service_get_all_compute_by_host(context, + instance_ref['launched_on']) + except exception.NotFound: + raise exception.Invalid(_("host %s where instance was launched " + "does not exist.") + % instance_ref['launched_on']) + oservice_ref = oservice_refs[0]['compute_node'][0] + + # Checking hypervisor is same. + orig_hypervisor = oservice_ref['hypervisor_type'] + dest_hypervisor = dservice_ref['hypervisor_type'] + if orig_hypervisor != dest_hypervisor: + raise exception.Invalid(_("Different hypervisor type" + "(%(orig_hypervisor)s->" + "%(dest_hypervisor)s)')" % locals())) + + # Checkng hypervisor version. + orig_hypervisor = oservice_ref['hypervisor_version'] + dest_hypervisor = dservice_ref['hypervisor_version'] + if orig_hypervisor > dest_hypervisor: + raise exception.Invalid(_("Older hypervisor version" + "(%(orig_hypervisor)s->" + "%(dest_hypervisor)s)") % locals()) + + # Checking cpuinfo. + try: + rpc.call(context, + db.queue_get_for(context, FLAGS.compute_topic, dest), + {"method": 'compare_cpu', + "args": {'cpu_info': oservice_ref['cpu_info']}}) + + except rpc.RemoteError: + src = instance_ref['host'] + logging.exception(_("host %(dest)s is not compatible with " + "original host %(src)s.") % locals()) + raise + + def assert_compute_node_has_enough_resources(self, context, + instance_ref, dest): + """Checks if destination host has enough resource for live migration. + + Currently, only memory checking has been done. + If storage migration(block migration, meaning live-migration + without any shared storage) will be available, local storage + checking is also necessary. + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + # Getting instance information + ec2_id = instance_ref['hostname'] + + # Getting host information + service_refs = db.service_get_all_compute_by_host(context, dest) + compute_node_ref = service_refs[0]['compute_node'][0] + + mem_total = int(compute_node_ref['memory_mb']) + mem_used = int(compute_node_ref['memory_mb_used']) + mem_avail = mem_total - mem_used + mem_inst = instance_ref['memory_mb'] + if mem_avail <= mem_inst: + raise exception.NotEmpty(_("Unable to migrate %(ec2_id)s " + "to destination: %(dest)s " + "(host:%(mem_avail)s " + "<= instance:%(mem_inst)s)") + % locals()) + + def mounted_on_same_shared_storage(self, context, instance_ref, dest): + """Check if the src and dest host mount same shared storage. + + At first, dest host creates temp file, and src host can see + it if they mounts same shared storage. Then src host erase it. + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + src = instance_ref['host'] + dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest) + src_t = db.queue_get_for(context, FLAGS.compute_topic, src) + + try: + # create tmpfile at dest host + filename = rpc.call(context, dst_t, + {"method": 'create_shared_storage_test_file'}) + + # make sure existence at src host. + rpc.call(context, src_t, + {"method": 'check_shared_storage_test_file', + "args": {'filename': filename}}) + + except rpc.RemoteError: + ipath = FLAGS.instances_path + logging.error(_("Cannot confirm tmpfile at %(ipath)s is on " + "same shared storage between %(src)s " + "and %(dest)s.") % locals()) + raise + + finally: + rpc.call(context, dst_t, + {"method": 'cleanup_shared_storage_test_file', + "args": {'filename': filename}}) diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index c94397210..053a53356 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -77,3 +77,55 @@ class SchedulerManager(manager.Manager): {"method": method, "args": kwargs}) LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals()) + + # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin. + # Based on bexar design summit discussion, + # just put this here for bexar release. + def show_host_resources(self, context, host, *args): + """Shows the physical/usage resource given by hosts. + + :param context: security context + :param host: hostname + :returns: + example format is below. + {'resource':D, 'usage':{proj_id1:D, proj_id2:D}} + D: {'vcpus':3, 'memory_mb':2048, 'local_gb':2048} + + """ + + compute_ref = db.service_get_all_compute_by_host(context, host) + compute_ref = compute_ref[0] + + # Getting physical resource information + compute_node_ref = compute_ref['compute_node'][0] + resource = {'vcpus': compute_node_ref['vcpus'], + 'memory_mb': compute_node_ref['memory_mb'], + 'local_gb': compute_node_ref['local_gb'], + 'vcpus_used': compute_node_ref['vcpus_used'], + 'memory_mb_used': compute_node_ref['memory_mb_used'], + 'local_gb_used': compute_node_ref['local_gb_used']} + + # Getting usage resource information + usage = {} + instance_refs = db.instance_get_all_by_host(context, + compute_ref['host']) + if not instance_refs: + return {'resource': resource, 'usage': usage} + + project_ids = [i['project_id'] for i in instance_refs] + project_ids = list(set(project_ids)) + for project_id in project_ids: + vcpus = db.instance_get_vcpu_sum_by_host_and_project(context, + host, + project_id) + mem = db.instance_get_memory_sum_by_host_and_project(context, + host, + project_id) + hdd = db.instance_get_disk_sum_by_host_and_project(context, + host, + project_id) + usage[project_id] = {'vcpus': int(vcpus), + 'memory_mb': int(mem), + 'local_gb': int(hdd)} + + return {'resource': resource, 'usage': usage} diff --git a/nova/service.py b/nova/service.py index af20db01c..d60df987c 100644 --- a/nova/service.py +++ b/nova/service.py @@ -92,6 +92,9 @@ class Service(object): except exception.NotFound: self._create_service_ref(ctxt) + if 'nova-compute' == self.binary: + self.manager.update_available_resource(ctxt) + conn1 = rpc.Connection.instance(new=True) conn2 = rpc.Connection.instance(new=True) if self.report_interval: diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index e50d11a3d..0bbb1c890 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -69,8 +69,6 @@ def fake_auth_init(self, application): @webob.dec.wsgify def fake_wsgi(self, req): req.environ['nova.context'] = context.RequestContext(1, 1) - if req.body: - req.environ['inst_dict'] = json.loads(req.body) return self.application @@ -85,10 +83,17 @@ def wsgi_app(inner_application=None): return mapper -def stub_out_key_pair_funcs(stubs): +def stub_out_key_pair_funcs(stubs, have_key_pair=True): def key_pair(context, user_id): return [dict(name='key', public_key='public_key')] - stubs.Set(nova.db, 'key_pair_get_all_by_user', key_pair) + + def no_key_pair(context, user_id): + return [] + + if have_key_pair: + stubs.Set(nova.db, 'key_pair_get_all_by_user', key_pair) + else: + stubs.Set(nova.db, 'key_pair_get_all_by_user', no_key_pair) def stub_out_image_service(stubs): @@ -321,7 +326,10 @@ class FakeAuthManager(object): (user.id == p.project_manager_id)] def get_user_from_access_key(self, key): - return FakeAuthManager.auth_data.get(key, None) + try: + return FakeAuthManager.auth_data[key] + except KeyError: + raise exc.NotFound class FakeRateLimiter(object): diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py index aaaa4e415..0448ed701 100644 --- a/nova/tests/api/openstack/test_auth.py +++ b/nova/tests/api/openstack/test_auth.py @@ -51,11 +51,12 @@ class Test(test.TestCase): def test_authorize_user(self): f = fakes.FakeAuthManager() - f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None)) + f.add_user('user1_key', + nova.auth.manager.User(1, 'user1', None, None, None)) req = webob.Request.blank('/v1.0/') - req.headers['X-Auth-User'] = 'herp' - req.headers['X-Auth-Key'] = 'derp' + req.headers['X-Auth-User'] = 'user1' + req.headers['X-Auth-Key'] = 'user1_key' result = req.get_response(fakes.wsgi_app()) self.assertEqual(result.status, '204 No Content') self.assertEqual(len(result.headers['X-Auth-Token']), 40) @@ -65,13 +66,13 @@ class Test(test.TestCase): def test_authorize_token(self): f = fakes.FakeAuthManager() - u = nova.auth.manager.User(1, 'herp', None, None, None) - f.add_user('derp', u) - f.create_project('test', u) + u = nova.auth.manager.User(1, 'user1', None, None, None) + f.add_user('user1_key', u) + f.create_project('user1_project', u) req = webob.Request.blank('/v1.0/', {'HTTP_HOST': 'foo'}) - req.headers['X-Auth-User'] = 'herp' - req.headers['X-Auth-Key'] = 'derp' + req.headers['X-Auth-User'] = 'user1' + req.headers['X-Auth-Key'] = 'user1_key' result = req.get_response(fakes.wsgi_app()) self.assertEqual(result.status, '204 No Content') self.assertEqual(len(result.headers['X-Auth-Token']), 40) @@ -92,7 +93,7 @@ class Test(test.TestCase): def test_token_expiry(self): self.destroy_called = False - token_hash = 'bacon' + token_hash = 'token_hash' def destroy_token_mock(meh, context, token): self.destroy_called = True @@ -109,15 +110,26 @@ class Test(test.TestCase): bad_token) req = webob.Request.blank('/v1.0/') - req.headers['X-Auth-Token'] = 'bacon' + req.headers['X-Auth-Token'] = 'token_hash' result = req.get_response(fakes.wsgi_app()) self.assertEqual(result.status, '401 Unauthorized') self.assertEqual(self.destroy_called, True) - def test_bad_user(self): + def test_bad_user_bad_key(self): + req = webob.Request.blank('/v1.0/') + req.headers['X-Auth-User'] = 'unknown_user' + req.headers['X-Auth-Key'] = 'unknown_user_key' + result = req.get_response(fakes.wsgi_app()) + self.assertEqual(result.status, '401 Unauthorized') + + def test_bad_user_good_key(self): + f = fakes.FakeAuthManager() + u = nova.auth.manager.User(1, 'user1', None, None, None) + f.add_user('user1_key', u) + req = webob.Request.blank('/v1.0/') - req.headers['X-Auth-User'] = 'herp' - req.headers['X-Auth-Key'] = 'derp' + req.headers['X-Auth-User'] = 'unknown_user' + req.headers['X-Auth-Key'] = 'user1_key' result = req.get_response(fakes.wsgi_app()) self.assertEqual(result.status, '401 Unauthorized') @@ -128,7 +140,7 @@ class Test(test.TestCase): def test_bad_token(self): req = webob.Request.blank('/v1.0/') - req.headers['X-Auth-Token'] = 'baconbaconbacon' + req.headers['X-Auth-Token'] = 'unknown_token' result = req.get_response(fakes.wsgi_app()) self.assertEqual(result.status, '401 Unauthorized') @@ -137,11 +149,11 @@ class TestFunctional(test.TestCase): def test_token_expiry(self): ctx = context.get_admin_context() tok = db.auth_token_create(ctx, dict( - token_hash='bacon', + token_hash='test_token_hash', cdn_management_url='', server_management_url='', storage_url='', - user_id='ham', + user_id='user1', )) db.auth_token_update(ctx, tok.token_hash, dict( @@ -149,13 +161,13 @@ class TestFunctional(test.TestCase): )) req = webob.Request.blank('/v1.0/') - req.headers['X-Auth-Token'] = 'bacon' + req.headers['X-Auth-Token'] = 'test_token_hash' result = req.get_response(fakes.wsgi_app()) self.assertEqual(result.status, '401 Unauthorized') def test_token_doesnotexist(self): req = webob.Request.blank('/v1.0/') - req.headers['X-Auth-Token'] = 'ham' + req.headers['X-Auth-Token'] = 'nonexistant_token_hash' result = req.get_response(fakes.wsgi_app()) self.assertEqual(result.status, '401 Unauthorized') @@ -178,13 +190,13 @@ class TestLimiter(test.TestCase): def test_authorize_token(self): f = fakes.FakeAuthManager() - u = nova.auth.manager.User(1, 'herp', None, None, None) - f.add_user('derp', u) + u = nova.auth.manager.User(1, 'user1', None, None, None) + f.add_user('user1_key', u) f.create_project('test', u) req = webob.Request.blank('/v1.0/') - req.headers['X-Auth-User'] = 'herp' - req.headers['X-Auth-Key'] = 'derp' + req.headers['X-Auth-User'] = 'user1' + req.headers['X-Auth-Key'] = 'user1_key' result = req.get_response(fakes.wsgi_app()) self.assertEqual(len(result.headers['X-Auth-Token']), 40) diff --git a/nova/tests/api/openstack/test_flavors.py b/nova/tests/api/openstack/test_flavors.py index 8280a505f..30326dc50 100644 --- a/nova/tests/api/openstack/test_flavors.py +++ b/nova/tests/api/openstack/test_flavors.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import stubout import webob @@ -50,3 +51,5 @@ class FlavorsTest(test.TestCase): req = webob.Request.blank('/v1.0/flavors/1') res = req.get_response(fakes.wsgi_app()) self.assertEqual(res.status_int, 200) + body = json.loads(res.body) + self.assertEqual(body['flavor']['id'], 1) diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py index 5d7a208e9..03e00af2a 100644 --- a/nova/tests/api/openstack/test_servers.py +++ b/nova/tests/api/openstack/test_servers.py @@ -15,8 +15,11 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 import datetime import json +import unittest +from xml.dom import minidom import stubout import webob @@ -216,7 +219,8 @@ class ServersTest(test.TestCase): servers = json.loads(res.body)['servers'] self.assertEqual([s['id'] for s in servers], [1, 2]) - def test_create_instance(self): + def _test_create_instance_helper(self): + """Shared implementation for tests below that create instance""" def instance_create(context, inst): return {'id': '1', 'display_name': 'server_test'} @@ -271,6 +275,13 @@ class ServersTest(test.TestCase): self.assertEqual(res.status_int, 200) + def test_create_instance(self): + self._test_create_instance_helper() + + def test_create_instance_no_key_pair(self): + fakes.stub_out_key_pair_funcs(self.stubs, have_key_pair=False) + self._test_create_instance_helper() + def test_update_no_body(self): req = webob.Request.blank('/v1.0/servers/1') req.method = 'PUT' @@ -599,5 +610,529 @@ class ServersTest(test.TestCase): res = req.get_response(fakes.wsgi_app()) self.assertEqual(res.status_int, 400) + +class TestServerCreateRequestXMLDeserializer(unittest.TestCase): + + def setUp(self): + self.deserializer = servers.ServerCreateRequestXMLDeserializer() + + def test_minimal_request(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"/>""" + request = self.deserializer.deserialize(serial_request) + expected = {"server": { + "name": "new-server-test", + "imageId": "1", + "flavorId": "1", + }} + self.assertEquals(request, expected) + + def test_request_with_empty_metadata(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata/> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"server": { + "name": "new-server-test", + "imageId": "1", + "flavorId": "1", + "metadata": {}, + }} + self.assertEquals(request, expected) + + def test_request_with_empty_personality(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <personality/> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"server": { + "name": "new-server-test", + "imageId": "1", + "flavorId": "1", + "personality": [], + }} + self.assertEquals(request, expected) + + def test_request_with_empty_metadata_and_personality(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata/> + <personality/> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"server": { + "name": "new-server-test", + "imageId": "1", + "flavorId": "1", + "metadata": {}, + "personality": [], + }} + self.assertEquals(request, expected) + + def test_request_with_empty_metadata_and_personality_reversed(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <personality/> + <metadata/> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"server": { + "name": "new-server-test", + "imageId": "1", + "flavorId": "1", + "metadata": {}, + "personality": [], + }} + self.assertEquals(request, expected) + + def test_request_with_one_personality(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <personality> + <file path="/etc/conf">aabbccdd</file> + </personality> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = [{"path": "/etc/conf", "contents": "aabbccdd"}] + self.assertEquals(request["server"]["personality"], expected) + + def test_request_with_two_personalities(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> +<personality><file path="/etc/conf">aabbccdd</file> +<file path="/etc/sudoers">abcd</file></personality></server>""" + request = self.deserializer.deserialize(serial_request) + expected = [{"path": "/etc/conf", "contents": "aabbccdd"}, + {"path": "/etc/sudoers", "contents": "abcd"}] + self.assertEquals(request["server"]["personality"], expected) + + def test_request_second_personality_node_ignored(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <personality> + <file path="/etc/conf">aabbccdd</file> + </personality> + <personality> + <file path="/etc/ignoreme">anything</file> + </personality> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = [{"path": "/etc/conf", "contents": "aabbccdd"}] + self.assertEquals(request["server"]["personality"], expected) + + def test_request_with_one_personality_missing_path(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> +<personality><file>aabbccdd</file></personality></server>""" + request = self.deserializer.deserialize(serial_request) + expected = [{"contents": "aabbccdd"}] + self.assertEquals(request["server"]["personality"], expected) + + def test_request_with_one_personality_empty_contents(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> +<personality><file path="/etc/conf"></file></personality></server>""" + request = self.deserializer.deserialize(serial_request) + expected = [{"path": "/etc/conf", "contents": ""}] + self.assertEquals(request["server"]["personality"], expected) + + def test_request_with_one_personality_empty_contents_variation(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> +<personality><file path="/etc/conf"/></personality></server>""" + request = self.deserializer.deserialize(serial_request) + expected = [{"path": "/etc/conf", "contents": ""}] + self.assertEquals(request["server"]["personality"], expected) + + def test_request_with_one_metadata(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta key="alpha">beta</meta> + </metadata> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"alpha": "beta"} + self.assertEquals(request["server"]["metadata"], expected) + + def test_request_with_two_metadata(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta key="alpha">beta</meta> + <meta key="foo">bar</meta> + </metadata> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"alpha": "beta", "foo": "bar"} + self.assertEquals(request["server"]["metadata"], expected) + + def test_request_with_metadata_missing_value(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta key="alpha"></meta> + </metadata> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"alpha": ""} + self.assertEquals(request["server"]["metadata"], expected) + + def test_request_with_two_metadata_missing_value(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta key="alpha"/> + <meta key="delta"/> + </metadata> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"alpha": "", "delta": ""} + self.assertEquals(request["server"]["metadata"], expected) + + def test_request_with_metadata_missing_key(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta>beta</meta> + </metadata> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"": "beta"} + self.assertEquals(request["server"]["metadata"], expected) + + def test_request_with_two_metadata_missing_key(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta>beta</meta> + <meta>gamma</meta> + </metadata> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"": "gamma"} + self.assertEquals(request["server"]["metadata"], expected) + + def test_request_with_metadata_duplicate_key(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta key="foo">bar</meta> + <meta key="foo">baz</meta> + </metadata> +</server>""" + request = self.deserializer.deserialize(serial_request) + expected = {"foo": "baz"} + self.assertEquals(request["server"]["metadata"], expected) + + def test_canonical_request_from_docs(self): + serial_request = """ +<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" + name="new-server-test" imageId="1" flavorId="1"> + <metadata> + <meta key="My Server Name">Apache1</meta> + </metadata> + <personality> + <file path="/etc/banner.txt">\ +ICAgICAgDQoiQSBjbG91ZCBkb2VzIG5vdCBrbm93IHdoeSBp\ +dCBtb3ZlcyBpbiBqdXN0IHN1Y2ggYSBkaXJlY3Rpb24gYW5k\ +IGF0IHN1Y2ggYSBzcGVlZC4uLkl0IGZlZWxzIGFuIGltcHVs\ +c2lvbi4uLnRoaXMgaXMgdGhlIHBsYWNlIHRvIGdvIG5vdy4g\ +QnV0IHRoZSBza3kga25vd3MgdGhlIHJlYXNvbnMgYW5kIHRo\ +ZSBwYXR0ZXJucyBiZWhpbmQgYWxsIGNsb3VkcywgYW5kIHlv\ +dSB3aWxsIGtub3csIHRvbywgd2hlbiB5b3UgbGlmdCB5b3Vy\ +c2VsZiBoaWdoIGVub3VnaCB0byBzZWUgYmV5b25kIGhvcml6\ +b25zLiINCg0KLVJpY2hhcmQgQmFjaA==</file> + </personality> +</server>""" + expected = {"server": { + "name": "new-server-test", + "imageId": "1", + "flavorId": "1", + "metadata": { + "My Server Name": "Apache1", + }, + "personality": [ + { + "path": "/etc/banner.txt", + "contents": """\ +ICAgICAgDQoiQSBjbG91ZCBkb2VzIG5vdCBrbm93IHdoeSBp\ +dCBtb3ZlcyBpbiBqdXN0IHN1Y2ggYSBkaXJlY3Rpb24gYW5k\ +IGF0IHN1Y2ggYSBzcGVlZC4uLkl0IGZlZWxzIGFuIGltcHVs\ +c2lvbi4uLnRoaXMgaXMgdGhlIHBsYWNlIHRvIGdvIG5vdy4g\ +QnV0IHRoZSBza3kga25vd3MgdGhlIHJlYXNvbnMgYW5kIHRo\ +ZSBwYXR0ZXJucyBiZWhpbmQgYWxsIGNsb3VkcywgYW5kIHlv\ +dSB3aWxsIGtub3csIHRvbywgd2hlbiB5b3UgbGlmdCB5b3Vy\ +c2VsZiBoaWdoIGVub3VnaCB0byBzZWUgYmV5b25kIGhvcml6\ +b25zLiINCg0KLVJpY2hhcmQgQmFjaA==""", + }, + ], + }} + request = self.deserializer.deserialize(serial_request) + self.assertEqual(request, expected) + + +class TestServerInstanceCreation(test.TestCase): + + def setUp(self): + super(TestServerInstanceCreation, self).setUp() + self.stubs = stubout.StubOutForTesting() + fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthDatabase.data = {} + fakes.stub_out_auth(self.stubs) + fakes.stub_out_key_pair_funcs(self.stubs) + self.allow_admin = FLAGS.allow_admin_api + + def tearDown(self): + self.stubs.UnsetAll() + FLAGS.allow_admin_api = self.allow_admin + super(TestServerInstanceCreation, self).tearDown() + + def _setup_mock_compute_api_for_personality(self): + + class MockComputeAPI(object): + + def __init__(self): + self.injected_files = None + + def create(self, *args, **kwargs): + if 'injected_files' in kwargs: + self.injected_files = kwargs['injected_files'] + else: + self.injected_files = None + return [{'id': '1234', 'display_name': 'fakeinstance'}] + + def set_admin_password(self, *args, **kwargs): + pass + + def make_stub_method(canned_return): + def stub_method(*args, **kwargs): + return canned_return + return stub_method + + compute_api = MockComputeAPI() + self.stubs.Set(nova.compute, 'API', make_stub_method(compute_api)) + self.stubs.Set(nova.api.openstack.servers.Controller, + '_get_kernel_ramdisk_from_image', make_stub_method((1, 1))) + self.stubs.Set(nova.api.openstack.common, + 'get_image_id_from_image_hash', make_stub_method(2)) + return compute_api + + def _create_personality_request_dict(self, personality_files): + server = {} + server['name'] = 'new-server-test' + server['imageId'] = 1 + server['flavorId'] = 1 + if personality_files is not None: + personalities = [] + for path, contents in personality_files: + personalities.append({'path': path, 'contents': contents}) + server['personality'] = personalities + return {'server': server} + + def _get_create_request_json(self, body_dict): + req = webob.Request.blank('/v1.0/servers') + req.content_type = 'application/json' + req.method = 'POST' + req.body = json.dumps(body_dict) + return req + + def _run_create_instance_with_mock_compute_api(self, request): + compute_api = self._setup_mock_compute_api_for_personality() + response = request.get_response(fakes.wsgi_app()) + return compute_api, response + + def _format_xml_request_body(self, body_dict): + server = body_dict['server'] + body_parts = [] + body_parts.extend([ + '<?xml version="1.0" encoding="UTF-8"?>', + '<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0"', + ' name="%s" imageId="%s" flavorId="%s">' % ( + server['name'], server['imageId'], server['flavorId'])]) + if 'metadata' in server: + metadata = server['metadata'] + body_parts.append('<metadata>') + for item in metadata.iteritems(): + body_parts.append('<meta key="%s">%s</meta>' % item) + body_parts.append('</metadata>') + if 'personality' in server: + personalities = server['personality'] + body_parts.append('<personality>') + for file in personalities: + item = (file['path'], file['contents']) + body_parts.append('<file path="%s">%s</file>' % item) + body_parts.append('</personality>') + body_parts.append('</server>') + return ''.join(body_parts) + + def _get_create_request_xml(self, body_dict): + req = webob.Request.blank('/v1.0/servers') + req.content_type = 'application/xml' + req.accept = 'application/xml' + req.method = 'POST' + req.body = self._format_xml_request_body(body_dict) + return req + + def _create_instance_with_personality_json(self, personality): + body_dict = self._create_personality_request_dict(personality) + request = self._get_create_request_json(body_dict) + compute_api, response = \ + self._run_create_instance_with_mock_compute_api(request) + return request, response, compute_api.injected_files + + def _create_instance_with_personality_xml(self, personality): + body_dict = self._create_personality_request_dict(personality) + request = self._get_create_request_xml(body_dict) + compute_api, response = \ + self._run_create_instance_with_mock_compute_api(request) + return request, response, compute_api.injected_files + + def test_create_instance_with_no_personality(self): + request, response, injected_files = \ + self._create_instance_with_personality_json(personality=None) + self.assertEquals(response.status_int, 200) + self.assertEquals(injected_files, []) + + def test_create_instance_with_no_personality_xml(self): + request, response, injected_files = \ + self._create_instance_with_personality_xml(personality=None) + self.assertEquals(response.status_int, 200) + self.assertEquals(injected_files, []) + + def test_create_instance_with_personality(self): + path = '/my/file/path' + contents = '#!/bin/bash\necho "Hello, World!"\n' + b64contents = base64.b64encode(contents) + personality = [(path, b64contents)] + request, response, injected_files = \ + self._create_instance_with_personality_json(personality) + self.assertEquals(response.status_int, 200) + self.assertEquals(injected_files, [(path, contents)]) + + def test_create_instance_with_personality_xml(self): + path = '/my/file/path' + contents = '#!/bin/bash\necho "Hello, World!"\n' + b64contents = base64.b64encode(contents) + personality = [(path, b64contents)] + request, response, injected_files = \ + self._create_instance_with_personality_xml(personality) + self.assertEquals(response.status_int, 200) + self.assertEquals(injected_files, [(path, contents)]) + + def test_create_instance_with_personality_no_path(self): + personality = [('/remove/this/path', + base64.b64encode('my\n\file\ncontents'))] + body_dict = self._create_personality_request_dict(personality) + del body_dict['server']['personality'][0]['path'] + request = self._get_create_request_json(body_dict) + compute_api, response = \ + self._run_create_instance_with_mock_compute_api(request) + self.assertEquals(response.status_int, 400) + self.assertEquals(compute_api.injected_files, None) + + def _test_create_instance_with_personality_no_path_xml(self): + personality = [('/remove/this/path', + base64.b64encode('my\n\file\ncontents'))] + body_dict = self._create_personality_request_dict(personality) + request = self._get_create_request_xml(body_dict) + request.body = request.body.replace(' path="/remove/this/path"', '') + compute_api, response = \ + self._run_create_instance_with_mock_compute_api(request) + self.assertEquals(response.status_int, 400) + self.assertEquals(compute_api.injected_files, None) + + def test_create_instance_with_personality_no_contents(self): + personality = [('/test/path', + base64.b64encode('remove\nthese\ncontents'))] + body_dict = self._create_personality_request_dict(personality) + del body_dict['server']['personality'][0]['contents'] + request = self._get_create_request_json(body_dict) + compute_api, response = \ + self._run_create_instance_with_mock_compute_api(request) + self.assertEquals(response.status_int, 400) + self.assertEquals(compute_api.injected_files, None) + + def test_create_instance_with_personality_not_a_list(self): + personality = [('/test/path', base64.b64encode('test\ncontents\n'))] + body_dict = self._create_personality_request_dict(personality) + body_dict['server']['personality'] = \ + body_dict['server']['personality'][0] + request = self._get_create_request_json(body_dict) + compute_api, response = \ + self._run_create_instance_with_mock_compute_api(request) + self.assertEquals(response.status_int, 400) + self.assertEquals(compute_api.injected_files, None) + + def test_create_instance_with_personality_with_non_b64_content(self): + path = '/my/file/path' + contents = '#!/bin/bash\necho "Oh no!"\n' + personality = [(path, contents)] + request, response, injected_files = \ + self._create_instance_with_personality_json(personality) + self.assertEquals(response.status_int, 400) + self.assertEquals(injected_files, None) + + def test_create_instance_with_three_personalities(self): + files = [ + ('/etc/sudoers', 'ALL ALL=NOPASSWD: ALL\n'), + ('/etc/motd', 'Enjoy your root access!\n'), + ('/etc/dovecot.conf', 'dovecot\nconfig\nstuff\n'), + ] + personality = [] + for path, content in files: + personality.append((path, base64.b64encode(content))) + request, response, injected_files = \ + self._create_instance_with_personality_json(personality) + self.assertEquals(response.status_int, 200) + self.assertEquals(injected_files, files) + + def test_create_instance_personality_empty_content(self): + path = '/my/file/path' + contents = '' + personality = [(path, contents)] + request, response, injected_files = \ + self._create_instance_with_personality_json(personality) + self.assertEquals(response.status_int, 200) + self.assertEquals(injected_files, [(path, contents)]) + + def test_create_instance_admin_pass_json(self): + request, response, dummy = \ + self._create_instance_with_personality_json(None) + self.assertEquals(response.status_int, 200) + response = json.loads(response.body) + self.assertTrue('adminPass' in response['server']) + self.assertTrue(response['server']['adminPass'].startswith('fake')) + + def test_create_instance_admin_pass_xml(self): + request, response, dummy = \ + self._create_instance_with_personality_xml(None) + self.assertEquals(response.status_int, 200) + dom = minidom.parseString(response.body) + server = dom.childNodes[0] + self.assertEquals(server.nodeName, 'server') + self.assertTrue(server.getAttribute('adminPass').startswith('fake')) + + if __name__ == "__main__": unittest.main() diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py index 142f6b1c6..2d25d5fc5 100644 --- a/nova/tests/db/fakes.py +++ b/nova/tests/db/fakes.py @@ -28,13 +28,33 @@ def stub_out_db_instance_api(stubs): """ Stubs out the db API for creating Instances """ INSTANCE_TYPES = { - 'm1.tiny': dict(memory_mb=512, vcpus=1, local_gb=0, flavorid=1), - 'm1.small': dict(memory_mb=2048, vcpus=1, local_gb=20, flavorid=2), + 'm1.tiny': dict(memory_mb=512, + vcpus=1, + local_gb=0, + flavorid=1, + rxtx_cap=1), + 'm1.small': dict(memory_mb=2048, + vcpus=1, + local_gb=20, + flavorid=2, + rxtx_cap=2), 'm1.medium': - dict(memory_mb=4096, vcpus=2, local_gb=40, flavorid=3), - 'm1.large': dict(memory_mb=8192, vcpus=4, local_gb=80, flavorid=4), + dict(memory_mb=4096, + vcpus=2, + local_gb=40, + flavorid=3, + rxtx_cap=3), + 'm1.large': dict(memory_mb=8192, + vcpus=4, + local_gb=80, + flavorid=4, + rxtx_cap=4), 'm1.xlarge': - dict(memory_mb=16384, vcpus=8, local_gb=160, flavorid=5)} + dict(memory_mb=16384, + vcpus=8, + local_gb=160, + flavorid=5, + rxtx_cap=5)} class FakeModel(object): """ Stubs out for model """ @@ -77,8 +97,8 @@ def stub_out_db_instance_api(stubs): 'mac_address': values['mac_address'], 'vcpus': type_data['vcpus'], 'local_gb': type_data['local_gb'], - 'os_type': values['os_type'] - } + 'os_type': values['os_type']} + return FakeModel(base_options) def fake_network_get_by_instance(context, instance_id): diff --git a/nova/tests/integrated/__init__.py b/nova/tests/integrated/__init__.py new file mode 100644 index 000000000..10e0a91d7 --- /dev/null +++ b/nova/tests/integrated/__init__.py @@ -0,0 +1,20 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 Justin Santa Barbara +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`integrated` -- Tests whole systems, using mock services where needed +================================= +""" diff --git a/nova/tests/integrated/api/__init__.py b/nova/tests/integrated/api/__init__.py new file mode 100644 index 000000000..5798ab3d1 --- /dev/null +++ b/nova/tests/integrated/api/__init__.py @@ -0,0 +1,20 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 Justin Santa Barbara +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`api` -- OpenStack API client, for testing rather than production +================================= +""" diff --git a/nova/tests/integrated/api/client.py b/nova/tests/integrated/api/client.py new file mode 100644 index 000000000..245eb8c69 --- /dev/null +++ b/nova/tests/integrated/api/client.py @@ -0,0 +1,212 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 Justin Santa Barbara +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import httplib +import urlparse + +from nova import log as logging + + +LOG = logging.getLogger('nova.tests.api') + + +class OpenStackApiException(Exception): + def __init__(self, message=None, response=None): + self.response = response + if not message: + message = 'Unspecified error' + + if response: + _status = response.status + _body = response.read() + + message = _('%(message)s\nStatus Code: %(_status)s\n' + 'Body: %(_body)s') % locals() + + super(OpenStackApiException, self).__init__(message) + + +class OpenStackApiAuthenticationException(OpenStackApiException): + def __init__(self, response=None, message=None): + if not message: + message = _("Authentication error") + super(OpenStackApiAuthenticationException, self).__init__(message, + response) + + +class OpenStackApiNotFoundException(OpenStackApiException): + def __init__(self, response=None, message=None): + if not message: + message = _("Item not found") + super(OpenStackApiNotFoundException, self).__init__(message, response) + + +class TestOpenStackClient(object): + """ A really basic OpenStack API client that is under our control, + so we can make changes / insert hooks for testing""" + + def __init__(self, auth_user, auth_key, auth_uri): + super(TestOpenStackClient, self).__init__() + self.auth_result = None + self.auth_user = auth_user + self.auth_key = auth_key + self.auth_uri = auth_uri + + def request(self, url, method='GET', body=None, headers=None): + if headers is None: + headers = {} + + parsed_url = urlparse.urlparse(url) + port = parsed_url.port + hostname = parsed_url.hostname + scheme = parsed_url.scheme + + if scheme == 'http': + conn = httplib.HTTPConnection(hostname, + port=port) + elif scheme == 'https': + conn = httplib.HTTPSConnection(hostname, + port=port) + else: + raise OpenStackApiException("Unknown scheme: %s" % url) + + relative_url = parsed_url.path + if parsed_url.query: + relative_url = relative_url + parsed_url.query + LOG.info(_("Doing %(method)s on %(relative_url)s") % locals()) + if body: + LOG.info(_("Body: %s") % body) + + conn.request(method, relative_url, body, headers) + response = conn.getresponse() + return response + + def _authenticate(self): + if self.auth_result: + return self.auth_result + + auth_uri = self.auth_uri + headers = {'X-Auth-User': self.auth_user, + 'X-Auth-Key': self.auth_key} + response = self.request(auth_uri, + headers=headers) + + http_status = response.status + LOG.debug(_("%(auth_uri)s => code %(http_status)s") % locals()) + + # Until bug732866 is fixed, we can't check this properly... + #if http_status == 401: + if http_status != 204: + raise OpenStackApiAuthenticationException(response=response) + + auth_headers = {} + for k, v in response.getheaders(): + auth_headers[k] = v + + self.auth_result = auth_headers + return self.auth_result + + def api_request(self, relative_uri, check_response_status=None, **kwargs): + auth_result = self._authenticate() + + #NOTE(justinsb): httplib 'helpfully' converts headers to lower case + base_uri = auth_result['x-server-management-url'] + full_uri = base_uri + relative_uri + + headers = kwargs.setdefault('headers', {}) + headers['X-Auth-Token'] = auth_result['x-auth-token'] + + response = self.request(full_uri, **kwargs) + + http_status = response.status + LOG.debug(_("%(relative_uri)s => code %(http_status)s") % locals()) + + if check_response_status: + if not http_status in check_response_status: + if http_status == 404: + raise OpenStackApiNotFoundException(response=response) + else: + raise OpenStackApiException( + message=_("Unexpected status code"), + response=response) + + return response + + def _decode_json(self, response): + body = response.read() + LOG.debug(_("Decoding JSON: %s") % (body)) + return json.loads(body) + + def api_get(self, relative_uri, **kwargs): + kwargs.setdefault('check_response_status', [200]) + response = self.api_request(relative_uri, **kwargs) + return self._decode_json(response) + + def api_post(self, relative_uri, body, **kwargs): + kwargs['method'] = 'POST' + if body: + headers = kwargs.setdefault('headers', {}) + headers['Content-Type'] = 'application/json' + kwargs['body'] = json.dumps(body) + + kwargs.setdefault('check_response_status', [200]) + response = self.api_request(relative_uri, **kwargs) + return self._decode_json(response) + + def api_delete(self, relative_uri, **kwargs): + kwargs['method'] = 'DELETE' + kwargs.setdefault('check_response_status', [200, 202]) + return self.api_request(relative_uri, **kwargs) + + def get_server(self, server_id): + return self.api_get('/servers/%s' % server_id)['server'] + + def get_servers(self, detail=True): + rel_url = '/servers/detail' if detail else '/servers' + return self.api_get(rel_url)['servers'] + + def post_server(self, server): + return self.api_post('/servers', server)['server'] + + def delete_server(self, server_id): + return self.api_delete('/servers/%s' % server_id) + + def get_image(self, image_id): + return self.api_get('/images/%s' % image_id)['image'] + + def get_images(self, detail=True): + rel_url = '/images/detail' if detail else '/images' + return self.api_get(rel_url)['images'] + + def post_image(self, image): + return self.api_post('/images', image)['image'] + + def delete_image(self, image_id): + return self.api_delete('/images/%s' % image_id) + + def get_flavor(self, flavor_id): + return self.api_get('/flavors/%s' % flavor_id)['flavor'] + + def get_flavors(self, detail=True): + rel_url = '/flavors/detail' if detail else '/flavors' + return self.api_get(rel_url)['flavors'] + + def post_flavor(self, flavor): + return self.api_post('/flavors', flavor)['flavor'] + + def delete_flavor(self, flavor_id): + return self.api_delete('/flavors/%s' % flavor_id) diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index 643b2e93a..3651f4cef 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -20,6 +20,7 @@ Tests For Compute """ import datetime +import mox from nova import compute from nova import context @@ -27,15 +28,20 @@ from nova import db from nova import exception from nova import flags from nova import log as logging +from nova import rpc from nova import test from nova import utils from nova.auth import manager from nova.compute import instance_types +from nova.compute import manager as compute_manager +from nova.compute import power_state +from nova.db.sqlalchemy import models from nova.image import local LOG = logging.getLogger('nova.tests.compute') FLAGS = flags.FLAGS flags.DECLARE('stub_network', 'nova.compute.manager') +flags.DECLARE('live_migration_retry_count', 'nova.compute.manager') class ComputeTestCase(test.TestCase): @@ -83,6 +89,21 @@ class ComputeTestCase(test.TestCase): 'project_id': self.project.id} return db.security_group_create(self.context, values) + def _get_dummy_instance(self): + """Get mock-return-value instance object + Use this when any testcase executed later than test_run_terminate + """ + vol1 = models.Volume() + vol1['id'] = 1 + vol2 = models.Volume() + vol2['id'] = 2 + instance_ref = models.Instance() + instance_ref['id'] = 1 + instance_ref['volumes'] = [vol1, vol2] + instance_ref['hostname'] = 'i-00000001' + instance_ref['host'] = 'dummy' + return instance_ref + def test_create_instance_defaults_display_name(self): """Verify that an instance cannot be created without a display_name.""" cases = [dict(), dict(display_name=None)] @@ -301,3 +322,256 @@ class ComputeTestCase(test.TestCase): self.compute.terminate_instance(self.context, instance_id) type = instance_types.get_by_flavor_id("1") self.assertEqual(type, 'm1.tiny') + + def _setup_other_managers(self): + self.volume_manager = utils.import_object(FLAGS.volume_manager) + self.network_manager = utils.import_object(FLAGS.network_manager) + self.compute_driver = utils.import_object(FLAGS.compute_driver) + + def test_pre_live_migration_instance_has_no_fixed_ip(self): + """Confirm raising exception if instance doesn't have fixed_ip.""" + instance_ref = self._get_dummy_instance() + c = context.get_admin_context() + i_id = instance_ref['id'] + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_id).AndReturn(instance_ref) + dbmock.instance_get_fixed_address(c, i_id).AndReturn(None) + + self.compute.db = dbmock + self.mox.ReplayAll() + self.assertRaises(exception.NotFound, + self.compute.pre_live_migration, + c, instance_ref['id']) + + def test_pre_live_migration_instance_has_volume(self): + """Confirm setup_compute_volume is called when volume is mounted.""" + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + + self._setup_other_managers() + dbmock = self.mox.CreateMock(db) + volmock = self.mox.CreateMock(self.volume_manager) + netmock = self.mox.CreateMock(self.network_manager) + drivermock = self.mox.CreateMock(self.compute_driver) + + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy') + for i in range(len(i_ref['volumes'])): + vid = i_ref['volumes'][i]['id'] + volmock.setup_compute_volume(c, vid).InAnyOrder('g1') + netmock.setup_compute_network(c, i_ref['id']) + drivermock.ensure_filtering_rules_for_instance(i_ref) + + self.compute.db = dbmock + self.compute.volume_manager = volmock + self.compute.network_manager = netmock + self.compute.driver = drivermock + + self.mox.ReplayAll() + ret = self.compute.pre_live_migration(c, i_ref['id']) + self.assertEqual(ret, None) + + def test_pre_live_migration_instance_has_no_volume(self): + """Confirm log meg when instance doesn't mount any volumes.""" + i_ref = self._get_dummy_instance() + i_ref['volumes'] = [] + c = context.get_admin_context() + + self._setup_other_managers() + dbmock = self.mox.CreateMock(db) + netmock = self.mox.CreateMock(self.network_manager) + drivermock = self.mox.CreateMock(self.compute_driver) + + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy') + self.mox.StubOutWithMock(compute_manager.LOG, 'info') + compute_manager.LOG.info(_("%s has no volume."), i_ref['hostname']) + netmock.setup_compute_network(c, i_ref['id']) + drivermock.ensure_filtering_rules_for_instance(i_ref) + + self.compute.db = dbmock + self.compute.network_manager = netmock + self.compute.driver = drivermock + + self.mox.ReplayAll() + ret = self.compute.pre_live_migration(c, i_ref['id']) + self.assertEqual(ret, None) + + def test_pre_live_migration_setup_compute_node_fail(self): + """Confirm operation setup_compute_network() fails. + + It retries and raise exception when timeout exceeded. + + """ + + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + + self._setup_other_managers() + dbmock = self.mox.CreateMock(db) + netmock = self.mox.CreateMock(self.network_manager) + volmock = self.mox.CreateMock(self.volume_manager) + + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy') + for i in range(len(i_ref['volumes'])): + volmock.setup_compute_volume(c, i_ref['volumes'][i]['id']) + for i in range(FLAGS.live_migration_retry_count): + netmock.setup_compute_network(c, i_ref['id']).\ + AndRaise(exception.ProcessExecutionError()) + + self.compute.db = dbmock + self.compute.network_manager = netmock + self.compute.volume_manager = volmock + + self.mox.ReplayAll() + self.assertRaises(exception.ProcessExecutionError, + self.compute.pre_live_migration, + c, i_ref['id']) + + def test_live_migration_works_correctly_with_volume(self): + """Confirm check_for_export to confirm volume health check.""" + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + self.mox.StubOutWithMock(rpc, 'call') + rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export", + "args": {'instance_id': i_ref['id']}}) + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}) + self.mox.StubOutWithMock(self.compute.driver, 'live_migration') + self.compute.driver.live_migration(c, i_ref, i_ref['host'], + self.compute.post_live_migration, + self.compute.recover_live_migration) + + self.compute.db = dbmock + self.mox.ReplayAll() + ret = self.compute.live_migration(c, i_ref['id'], i_ref['host']) + self.assertEqual(ret, None) + + def test_live_migration_dest_raises_exception(self): + """Confirm exception when pre_live_migration fails.""" + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + self.mox.StubOutWithMock(rpc, 'call') + rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export", + "args": {'instance_id': i_ref['id']}}) + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}).\ + AndRaise(rpc.RemoteError('', '', '')) + dbmock.instance_update(c, i_ref['id'], {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': i_ref['host']}) + for v in i_ref['volumes']: + dbmock.volume_update(c, v['id'], {'status': 'in-use'}) + + self.compute.db = dbmock + self.mox.ReplayAll() + self.assertRaises(rpc.RemoteError, + self.compute.live_migration, + c, i_ref['id'], i_ref['host']) + + def test_live_migration_dest_raises_exception_no_volume(self): + """Same as above test(input pattern is different) """ + i_ref = self._get_dummy_instance() + i_ref['volumes'] = [] + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + self.mox.StubOutWithMock(rpc, 'call') + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}).\ + AndRaise(rpc.RemoteError('', '', '')) + dbmock.instance_update(c, i_ref['id'], {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': i_ref['host']}) + + self.compute.db = dbmock + self.mox.ReplayAll() + self.assertRaises(rpc.RemoteError, + self.compute.live_migration, + c, i_ref['id'], i_ref['host']) + + def test_live_migration_works_correctly_no_volume(self): + """Confirm live_migration() works as expected correctly.""" + i_ref = self._get_dummy_instance() + i_ref['volumes'] = [] + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + self.mox.StubOutWithMock(rpc, 'call') + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}) + self.mox.StubOutWithMock(self.compute.driver, 'live_migration') + self.compute.driver.live_migration(c, i_ref, i_ref['host'], + self.compute.post_live_migration, + self.compute.recover_live_migration) + + self.compute.db = dbmock + self.mox.ReplayAll() + ret = self.compute.live_migration(c, i_ref['id'], i_ref['host']) + self.assertEqual(ret, None) + + def test_post_live_migration_working_correctly(self): + """Confirm post_live_migration() works as expected correctly.""" + dest = 'desthost' + flo_addr = '1.2.1.2' + + # Preparing datas + c = context.get_admin_context() + instance_id = self._create_instance() + i_ref = db.instance_get(c, instance_id) + db.instance_update(c, i_ref['id'], {'state_description': 'migrating', + 'state': power_state.PAUSED}) + v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id}) + fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1', + 'instance_id': instance_id}) + fix_ref = db.fixed_ip_get_by_address(c, fix_addr) + flo_ref = db.floating_ip_create(c, {'address': flo_addr, + 'fixed_ip_id': fix_ref['id']}) + # reload is necessary before setting mocks + i_ref = db.instance_get(c, instance_id) + + # Preparing mocks + self.mox.StubOutWithMock(self.compute.volume_manager, + 'remove_compute_volume') + for v in i_ref['volumes']: + self.compute.volume_manager.remove_compute_volume(c, v['id']) + self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance') + self.compute.driver.unfilter_instance(i_ref) + + # executing + self.mox.ReplayAll() + ret = self.compute.post_live_migration(c, i_ref, dest) + + # make sure every data is rewritten to dest + i_ref = db.instance_get(c, i_ref['id']) + c1 = (i_ref['host'] == dest) + flo_refs = db.floating_ip_get_all_by_host(c, dest) + c2 = (len(flo_refs) != 0 and flo_refs[0]['address'] == flo_addr) + + # post operaton + self.assertTrue(c1 and c2) + db.instance_destroy(c, instance_id) + db.volume_destroy(c, v_ref['id']) + db.floating_ip_destroy(c, flo_addr) diff --git a/nova/tests/test_misc.py b/nova/tests/test_misc.py index a658e4978..1fbaf304f 100644 --- a/nova/tests/test_misc.py +++ b/nova/tests/test_misc.py @@ -24,18 +24,19 @@ from nova.utils import parse_mailmap, str_dict_replace, synchronized class ProjectTestCase(test.TestCase): def test_authors_up_to_date(self): - if os.path.exists('.bzr'): + topdir = os.path.normpath(os.path.dirname(__file__) + '/../../') + if os.path.exists(os.path.join(topdir, '.bzr')): contributors = set() - mailmap = parse_mailmap('.mailmap') + mailmap = parse_mailmap(os.path.join(topdir, '.mailmap')) import bzrlib.workingtree - tree = bzrlib.workingtree.WorkingTree.open('.') + tree = bzrlib.workingtree.WorkingTree.open(topdir) tree.lock_read() try: parents = tree.get_parent_ids() g = tree.branch.repository.get_graph() - for p in parents[1:]: + for p in parents: rev_ids = [r for r, _ in g.iter_ancestry(parents) if r != "null:"] revs = tree.branch.repository.get_revisions(rev_ids) @@ -44,7 +45,8 @@ class ProjectTestCase(test.TestCase): email = author.split(' ')[-1] contributors.add(str_dict_replace(email, mailmap)) - authors_file = open('Authors', 'r').read() + authors_file = open(os.path.join(topdir, 'Authors'), + 'r').read() missing = set() for contributor in contributors: diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py index 45b544753..c65bc459d 100644 --- a/nova/tests/test_quota.py +++ b/nova/tests/test_quota.py @@ -33,6 +33,12 @@ FLAGS = flags.FLAGS class QuotaTestCase(test.TestCase): + + class StubImageService(object): + + def show(self, *args, **kwargs): + return {"properties": {}} + def setUp(self): super(QuotaTestCase, self).setUp() self.flags(connection_type='fake', @@ -193,3 +199,67 @@ class QuotaTestCase(test.TestCase): instance_type='m1.small', image_id='fake', metadata=metadata) + + def test_allowed_injected_files(self): + self.assertEqual( + quota.allowed_injected_files(self.context), + FLAGS.quota_max_injected_files) + + def _create_with_injected_files(self, files): + api = compute.API(image_service=self.StubImageService()) + api.create(self.context, min_count=1, max_count=1, + instance_type='m1.small', image_id='fake', + injected_files=files) + + def test_no_injected_files(self): + api = compute.API(image_service=self.StubImageService()) + api.create(self.context, instance_type='m1.small', image_id='fake') + + def test_max_injected_files(self): + files = [] + for i in xrange(FLAGS.quota_max_injected_files): + files.append(('/my/path%d' % i, 'config = test\n')) + self._create_with_injected_files(files) # no QuotaError + + def test_too_many_injected_files(self): + files = [] + for i in xrange(FLAGS.quota_max_injected_files + 1): + files.append(('/my/path%d' % i, 'my\ncontent%d\n' % i)) + self.assertRaises(quota.QuotaError, + self._create_with_injected_files, files) + + def test_allowed_injected_file_content_bytes(self): + self.assertEqual( + quota.allowed_injected_file_content_bytes(self.context), + FLAGS.quota_max_injected_file_content_bytes) + + def test_max_injected_file_content_bytes(self): + max = FLAGS.quota_max_injected_file_content_bytes + content = ''.join(['a' for i in xrange(max)]) + files = [('/test/path', content)] + self._create_with_injected_files(files) # no QuotaError + + def test_too_many_injected_file_content_bytes(self): + max = FLAGS.quota_max_injected_file_content_bytes + content = ''.join(['a' for i in xrange(max + 1)]) + files = [('/test/path', content)] + self.assertRaises(quota.QuotaError, + self._create_with_injected_files, files) + + def test_allowed_injected_file_path_bytes(self): + self.assertEqual( + quota.allowed_injected_file_path_bytes(self.context), + FLAGS.quota_max_injected_file_path_bytes) + + def test_max_injected_file_path_bytes(self): + max = FLAGS.quota_max_injected_file_path_bytes + path = ''.join(['a' for i in xrange(max)]) + files = [(path, 'config = quotatest')] + self._create_with_injected_files(files) # no QuotaError + + def test_too_many_injected_file_path_bytes(self): + max = FLAGS.quota_max_injected_file_path_bytes + path = ''.join(['a' for i in xrange(max + 1)]) + files = [(path, 'config = quotatest')] + self.assertRaises(quota.QuotaError, + self._create_with_injected_files, files) diff --git a/nova/tests/test_scheduler.py b/nova/tests/test_scheduler.py index bb279ac4b..244e43bd9 100644 --- a/nova/tests/test_scheduler.py +++ b/nova/tests/test_scheduler.py @@ -20,10 +20,12 @@ Tests For Scheduler """ import datetime +import mox from mox import IgnoreArg from nova import context from nova import db +from nova import exception from nova import flags from nova import service from nova import test @@ -32,11 +34,14 @@ from nova import utils from nova.auth import manager as auth_manager from nova.scheduler import manager from nova.scheduler import driver +from nova.compute import power_state +from nova.db.sqlalchemy import models FLAGS = flags.FLAGS flags.DECLARE('max_cores', 'nova.scheduler.simple') flags.DECLARE('stub_network', 'nova.compute.manager') +flags.DECLARE('instances_path', 'nova.compute.manager') class TestDriver(driver.Scheduler): @@ -54,6 +59,34 @@ class SchedulerTestCase(test.TestCase): super(SchedulerTestCase, self).setUp() self.flags(scheduler_driver='nova.tests.test_scheduler.TestDriver') + def _create_compute_service(self): + """Create compute-manager(ComputeNode and Service record).""" + ctxt = context.get_admin_context() + dic = {'host': 'dummy', 'binary': 'nova-compute', 'topic': 'compute', + 'report_count': 0, 'availability_zone': 'dummyzone'} + s_ref = db.service_create(ctxt, dic) + + dic = {'service_id': s_ref['id'], + 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100, + 'vcpus_used': 16, 'memory_mb_used': 32, 'local_gb_used': 10, + 'hypervisor_type': 'qemu', 'hypervisor_version': 12003, + 'cpu_info': ''} + db.compute_node_create(ctxt, dic) + + return db.service_get(ctxt, s_ref['id']) + + def _create_instance(self, **kwargs): + """Create a test instance""" + ctxt = context.get_admin_context() + inst = {} + inst['user_id'] = 'admin' + inst['project_id'] = kwargs.get('project_id', 'fake') + inst['host'] = kwargs.get('host', 'dummy') + inst['vcpus'] = kwargs.get('vcpus', 1) + inst['memory_mb'] = kwargs.get('memory_mb', 10) + inst['local_gb'] = kwargs.get('local_gb', 20) + return db.instance_create(ctxt, inst) + def test_fallback(self): scheduler = manager.SchedulerManager() self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) @@ -76,6 +109,73 @@ class SchedulerTestCase(test.TestCase): self.mox.ReplayAll() scheduler.named_method(ctxt, 'topic', num=7) + def test_show_host_resources_host_not_exit(self): + """A host given as an argument does not exists.""" + + scheduler = manager.SchedulerManager() + dest = 'dummydest' + ctxt = context.get_admin_context() + + try: + scheduler.show_host_resources(ctxt, dest) + except exception.NotFound, e: + c1 = (e.message.find(_("does not exist or is not a " + "compute node.")) >= 0) + self.assertTrue(c1) + + def _dic_is_equal(self, dic1, dic2, keys=None): + """Compares 2 dictionary contents(Helper method)""" + if not keys: + keys = ['vcpus', 'memory_mb', 'local_gb', + 'vcpus_used', 'memory_mb_used', 'local_gb_used'] + + for key in keys: + if not (dic1[key] == dic2[key]): + return False + return True + + def test_show_host_resources_no_project(self): + """No instance are running on the given host.""" + + scheduler = manager.SchedulerManager() + ctxt = context.get_admin_context() + s_ref = self._create_compute_service() + + result = scheduler.show_host_resources(ctxt, s_ref['host']) + + # result checking + c1 = ('resource' in result and 'usage' in result) + compute_node = s_ref['compute_node'][0] + c2 = self._dic_is_equal(result['resource'], compute_node) + c3 = result['usage'] == {} + self.assertTrue(c1 and c2 and c3) + db.service_destroy(ctxt, s_ref['id']) + + def test_show_host_resources_works_correctly(self): + """Show_host_resources() works correctly as expected.""" + + scheduler = manager.SchedulerManager() + ctxt = context.get_admin_context() + s_ref = self._create_compute_service() + i_ref1 = self._create_instance(project_id='p-01', host=s_ref['host']) + i_ref2 = self._create_instance(project_id='p-02', vcpus=3, + host=s_ref['host']) + + result = scheduler.show_host_resources(ctxt, s_ref['host']) + + c1 = ('resource' in result and 'usage' in result) + compute_node = s_ref['compute_node'][0] + c2 = self._dic_is_equal(result['resource'], compute_node) + c3 = result['usage'].keys() == ['p-01', 'p-02'] + keys = ['vcpus', 'memory_mb', 'local_gb'] + c4 = self._dic_is_equal(result['usage']['p-01'], i_ref1, keys) + c5 = self._dic_is_equal(result['usage']['p-02'], i_ref2, keys) + self.assertTrue(c1 and c2 and c3 and c4 and c5) + + db.service_destroy(ctxt, s_ref['id']) + db.instance_destroy(ctxt, i_ref1['id']) + db.instance_destroy(ctxt, i_ref2['id']) + class ZoneSchedulerTestCase(test.TestCase): """Test case for zone scheduler""" @@ -161,9 +261,15 @@ class SimpleDriverTestCase(test.TestCase): inst['project_id'] = self.project.id inst['instance_type'] = 'm1.tiny' inst['mac_address'] = utils.generate_mac() + inst['vcpus'] = kwargs.get('vcpus', 1) inst['ami_launch_index'] = 0 - inst['vcpus'] = 1 inst['availability_zone'] = kwargs.get('availability_zone', None) + inst['host'] = kwargs.get('host', 'dummy') + inst['memory_mb'] = kwargs.get('memory_mb', 20) + inst['local_gb'] = kwargs.get('local_gb', 30) + inst['launched_on'] = kwargs.get('launghed_on', 'dummy') + inst['state_description'] = kwargs.get('state_description', 'running') + inst['state'] = kwargs.get('state', power_state.RUNNING) return db.instance_create(self.context, inst)['id'] def _create_volume(self): @@ -173,6 +279,211 @@ class SimpleDriverTestCase(test.TestCase): vol['availability_zone'] = 'test' return db.volume_create(self.context, vol)['id'] + def _create_compute_service(self, **kwargs): + """Create a compute service.""" + + dic = {'binary': 'nova-compute', 'topic': 'compute', + 'report_count': 0, 'availability_zone': 'dummyzone'} + dic['host'] = kwargs.get('host', 'dummy') + s_ref = db.service_create(self.context, dic) + if 'created_at' in kwargs.keys() or 'updated_at' in kwargs.keys(): + t = datetime.datetime.utcnow() - datetime.timedelta(0) + dic['created_at'] = kwargs.get('created_at', t) + dic['updated_at'] = kwargs.get('updated_at', t) + db.service_update(self.context, s_ref['id'], dic) + + dic = {'service_id': s_ref['id'], + 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100, + 'vcpus_used': 16, 'local_gb_used': 10, + 'hypervisor_type': 'qemu', 'hypervisor_version': 12003, + 'cpu_info': ''} + dic['memory_mb_used'] = kwargs.get('memory_mb_used', 32) + dic['hypervisor_type'] = kwargs.get('hypervisor_type', 'qemu') + dic['hypervisor_version'] = kwargs.get('hypervisor_version', 12003) + db.compute_node_create(self.context, dic) + return db.service_get(self.context, s_ref['id']) + + def test_doesnt_report_disabled_hosts_as_up(self): + """Ensures driver doesn't find hosts before they are enabled""" + # NOTE(vish): constructing service without create method + # because we are going to use it without queue + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute') + s2 = db.service_get_by_args(self.context, 'host2', 'nova-compute') + db.service_update(self.context, s1['id'], {'disabled': True}) + db.service_update(self.context, s2['id'], {'disabled': True}) + hosts = self.scheduler.driver.hosts_up(self.context, 'compute') + self.assertEqual(0, len(hosts)) + compute1.kill() + compute2.kill() + + def test_reports_enabled_hosts_as_up(self): + """Ensures driver can find the hosts that are up""" + # NOTE(vish): constructing service without create method + # because we are going to use it without queue + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + hosts = self.scheduler.driver.hosts_up(self.context, 'compute') + self.assertEqual(2, len(hosts)) + compute1.kill() + compute2.kill() + + def test_least_busy_host_gets_instance(self): + """Ensures the host with less cores gets the next one""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + instance_id1 = self._create_instance() + compute1.run_instance(self.context, instance_id1) + instance_id2 = self._create_instance() + host = self.scheduler.driver.schedule_run_instance(self.context, + instance_id2) + self.assertEqual(host, 'host2') + compute1.terminate_instance(self.context, instance_id1) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + compute2.kill() + + def test_specific_host_gets_instance(self): + """Ensures if you set availability_zone it launches on that zone""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + instance_id1 = self._create_instance() + compute1.run_instance(self.context, instance_id1) + instance_id2 = self._create_instance(availability_zone='nova:host1') + host = self.scheduler.driver.schedule_run_instance(self.context, + instance_id2) + self.assertEqual('host1', host) + compute1.terminate_instance(self.context, instance_id1) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + compute2.kill() + + def test_wont_sechedule_if_specified_host_is_down(self): + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute') + now = datetime.datetime.utcnow() + delta = datetime.timedelta(seconds=FLAGS.service_down_time * 2) + past = now - delta + db.service_update(self.context, s1['id'], {'updated_at': past}) + instance_id2 = self._create_instance(availability_zone='nova:host1') + self.assertRaises(driver.WillNotSchedule, + self.scheduler.driver.schedule_run_instance, + self.context, + instance_id2) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + + def test_will_schedule_on_disabled_host_if_specified(self): + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute') + db.service_update(self.context, s1['id'], {'disabled': True}) + instance_id2 = self._create_instance(availability_zone='nova:host1') + host = self.scheduler.driver.schedule_run_instance(self.context, + instance_id2) + self.assertEqual('host1', host) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + + def test_too_many_cores(self): + """Ensures we don't go over max cores""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + instance_ids1 = [] + instance_ids2 = [] + for index in xrange(FLAGS.max_cores): + instance_id = self._create_instance() + compute1.run_instance(self.context, instance_id) + instance_ids1.append(instance_id) + instance_id = self._create_instance() + compute2.run_instance(self.context, instance_id) + instance_ids2.append(instance_id) + instance_id = self._create_instance() + self.assertRaises(driver.NoValidHost, + self.scheduler.driver.schedule_run_instance, + self.context, + instance_id) + for instance_id in instance_ids1: + compute1.terminate_instance(self.context, instance_id) + for instance_id in instance_ids2: + compute2.terminate_instance(self.context, instance_id) + compute1.kill() + compute2.kill() + + def test_least_busy_host_gets_volume(self): + """Ensures the host with less gigabytes gets the next one""" + volume1 = service.Service('host1', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume1.start() + volume2 = service.Service('host2', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume2.start() + volume_id1 = self._create_volume() + volume1.create_volume(self.context, volume_id1) + volume_id2 = self._create_volume() + host = self.scheduler.driver.schedule_create_volume(self.context, + volume_id2) + self.assertEqual(host, 'host2') + volume1.delete_volume(self.context, volume_id1) + db.volume_destroy(self.context, volume_id2) + dic = {'service_id': s_ref['id'], + 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100, + 'vcpus_used': 16, 'memory_mb_used': 12, 'local_gb_used': 10, + 'hypervisor_type': 'qemu', 'hypervisor_version': 12003, + 'cpu_info': ''} + def test_doesnt_report_disabled_hosts_as_up(self): """Ensures driver doesn't find hosts before they are enabled""" compute1 = self.start_service('compute', host='host1') @@ -316,3 +627,313 @@ class SimpleDriverTestCase(test.TestCase): volume2.delete_volume(self.context, volume_id) volume1.kill() volume2.kill() + + def test_scheduler_live_migration_with_volume(self): + """scheduler_live_migration() works correctly as expected. + + Also, checks instance state is changed from 'running' -> 'migrating'. + + """ + + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + dic = {'instance_id': instance_id, 'size': 1} + v_ref = db.volume_create(self.context, dic) + + # cannot check 2nd argument b/c the addresses of instance object + # is different. + driver_i = self.scheduler.driver + nocare = mox.IgnoreArg() + self.mox.StubOutWithMock(driver_i, '_live_migration_src_check') + self.mox.StubOutWithMock(driver_i, '_live_migration_dest_check') + self.mox.StubOutWithMock(driver_i, '_live_migration_common_check') + driver_i._live_migration_src_check(nocare, nocare) + driver_i._live_migration_dest_check(nocare, nocare, i_ref['host']) + driver_i._live_migration_common_check(nocare, nocare, i_ref['host']) + self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) + kwargs = {'instance_id': instance_id, 'dest': i_ref['host']} + rpc.cast(self.context, + db.queue_get_for(nocare, FLAGS.compute_topic, i_ref['host']), + {"method": 'live_migration', "args": kwargs}) + + self.mox.ReplayAll() + self.scheduler.live_migration(self.context, FLAGS.compute_topic, + instance_id=instance_id, + dest=i_ref['host']) + + i_ref = db.instance_get(self.context, instance_id) + self.assertTrue(i_ref['state_description'] == 'migrating') + db.instance_destroy(self.context, instance_id) + db.volume_destroy(self.context, v_ref['id']) + + def test_live_migration_src_check_instance_not_running(self): + """The instance given by instance_id is not running.""" + + instance_id = self._create_instance(state_description='migrating') + i_ref = db.instance_get(self.context, instance_id) + + try: + self.scheduler.driver._live_migration_src_check(self.context, + i_ref) + except exception.Invalid, e: + c = (e.message.find('is not running') > 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + + def test_live_migration_src_check_volume_node_not_alive(self): + """Raise exception when volume node is not alive.""" + + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + dic = {'instance_id': instance_id, 'size': 1} + v_ref = db.volume_create(self.context, {'instance_id': instance_id, + 'size': 1}) + t1 = datetime.datetime.utcnow() - datetime.timedelta(1) + dic = {'created_at': t1, 'updated_at': t1, 'binary': 'nova-volume', + 'topic': 'volume', 'report_count': 0} + s_ref = db.service_create(self.context, dic) + + try: + self.scheduler.driver.schedule_live_migration(self.context, + instance_id, + i_ref['host']) + except exception.Invalid, e: + c = (e.message.find('volume node is not alive') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.volume_destroy(self.context, v_ref['id']) + + def test_live_migration_src_check_compute_node_not_alive(self): + """Confirms src-compute node is alive.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + t = datetime.datetime.utcnow() - datetime.timedelta(10) + s_ref = self._create_compute_service(created_at=t, updated_at=t, + host=i_ref['host']) + + try: + self.scheduler.driver._live_migration_src_check(self.context, + i_ref) + except exception.Invalid, e: + c = (e.message.find('is not alive') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_src_check_works_correctly(self): + """Confirms this method finishes with no error.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host=i_ref['host']) + + ret = self.scheduler.driver._live_migration_src_check(self.context, + i_ref) + + self.assertTrue(ret == None) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_not_alive(self): + """Confirms exception raises in case dest host does not exist.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + t = datetime.datetime.utcnow() - datetime.timedelta(10) + s_ref = self._create_compute_service(created_at=t, updated_at=t, + host=i_ref['host']) + + try: + self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + i_ref['host']) + except exception.Invalid, e: + c = (e.message.find('is not alive') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_service_same_host(self): + """Confirms exceptioin raises in case dest and src is same host.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host=i_ref['host']) + + try: + self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + i_ref['host']) + except exception.Invalid, e: + c = (e.message.find('choose other host') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_service_lack_memory(self): + """Confirms exception raises when dest doesn't have enough memory.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host='somewhere', + memory_mb_used=12) + + try: + self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + 'somewhere') + except exception.NotEmpty, e: + c = (e.message.find('Unable to migrate') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_service_works_correctly(self): + """Confirms method finishes with no error.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host='somewhere', + memory_mb_used=5) + + ret = self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + 'somewhere') + self.assertTrue(ret == None) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_common_check_service_orig_not_exists(self): + """Destination host does not exist.""" + + dest = 'dummydest' + # mocks for live_migration_common_check() + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + t1 = datetime.datetime.utcnow() - datetime.timedelta(10) + s_ref = self._create_compute_service(created_at=t1, updated_at=t1, + host=dest) + + # mocks for mounted_on_same_shared_storage() + fpath = '/test/20110127120000' + self.mox.StubOutWithMock(driver, 'rpc', use_mock_anything=True) + topic = FLAGS.compute_topic + driver.rpc.call(mox.IgnoreArg(), + db.queue_get_for(self.context, topic, dest), + {"method": 'create_shared_storage_test_file'}).AndReturn(fpath) + driver.rpc.call(mox.IgnoreArg(), + db.queue_get_for(mox.IgnoreArg(), topic, i_ref['host']), + {"method": 'check_shared_storage_test_file', + "args": {'filename': fpath}}) + driver.rpc.call(mox.IgnoreArg(), + db.queue_get_for(mox.IgnoreArg(), topic, dest), + {"method": 'cleanup_shared_storage_test_file', + "args": {'filename': fpath}}) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except exception.Invalid, e: + c = (e.message.find('does not exist') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_common_check_service_different_hypervisor(self): + """Original host and dest host has different hypervisor type.""" + dest = 'dummydest' + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + + # compute service for destination + s_ref = self._create_compute_service(host=i_ref['host']) + # compute service for original host + s_ref2 = self._create_compute_service(host=dest, hypervisor_type='xen') + + # mocks + driver = self.scheduler.driver + self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage') + driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except exception.Invalid, e: + c = (e.message.find(_('Different hypervisor type')) >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.service_destroy(self.context, s_ref2['id']) + + def test_live_migration_common_check_service_different_version(self): + """Original host and dest host has different hypervisor version.""" + dest = 'dummydest' + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + + # compute service for destination + s_ref = self._create_compute_service(host=i_ref['host']) + # compute service for original host + s_ref2 = self._create_compute_service(host=dest, + hypervisor_version=12002) + + # mocks + driver = self.scheduler.driver + self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage') + driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except exception.Invalid, e: + c = (e.message.find(_('Older hypervisor version')) >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.service_destroy(self.context, s_ref2['id']) + + def test_live_migration_common_check_checking_cpuinfo_fail(self): + """Raise excetion when original host doen't have compatible cpu.""" + + dest = 'dummydest' + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + + # compute service for destination + s_ref = self._create_compute_service(host=i_ref['host']) + # compute service for original host + s_ref2 = self._create_compute_service(host=dest) + + # mocks + driver = self.scheduler.driver + self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage') + driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest) + self.mox.StubOutWithMock(rpc, 'call', use_mock_anything=True) + rpc.call(mox.IgnoreArg(), mox.IgnoreArg(), + {"method": 'compare_cpu', + "args": {'cpu_info': s_ref2['compute_node'][0]['cpu_info']}}).\ + AndRaise(rpc.RemoteError("doesn't have compatibility to", "", "")) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except rpc.RemoteError, e: + c = (e.message.find(_("doesn't have compatibility to")) >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.service_destroy(self.context, s_ref2['id']) diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py index 45d9afa6c..393f9d20b 100644 --- a/nova/tests/test_service.py +++ b/nova/tests/test_service.py @@ -30,6 +30,7 @@ from nova import rpc from nova import test from nova import service from nova import manager +from nova.compute import manager as compute_manager FLAGS = flags.FLAGS flags.DEFINE_string("fake_manager", "nova.tests.test_service.FakeManager", @@ -251,3 +252,43 @@ class ServiceTestCase(test.TestCase): serv.report_state() self.assert_(not serv.model_disconnected) + + def test_compute_can_update_available_resource(self): + """Confirm compute updates their record of compute-service table.""" + host = 'foo' + binary = 'nova-compute' + topic = 'compute' + + # Any mocks are not working without UnsetStubs() here. + self.mox.UnsetStubs() + ctxt = context.get_admin_context() + service_ref = db.service_create(ctxt, {'host': host, + 'binary': binary, + 'topic': topic}) + serv = service.Service(host, + binary, + topic, + 'nova.compute.manager.ComputeManager') + + # This testcase want to test calling update_available_resource. + # No need to call periodic call, then below variable must be set 0. + serv.report_interval = 0 + serv.periodic_interval = 0 + + # Creating mocks + self.mox.StubOutWithMock(service.rpc.Connection, 'instance') + service.rpc.Connection.instance(new=mox.IgnoreArg()) + service.rpc.Connection.instance(new=mox.IgnoreArg()) + self.mox.StubOutWithMock(serv.manager.driver, + 'update_available_resource') + serv.manager.driver.update_available_resource(mox.IgnoreArg(), host) + + # Just doing start()-stop(), not confirm new db record is created, + # because update_available_resource() works only in + # libvirt environment. This testcase confirms + # update_available_resource() is called. Otherwise, mox complains. + self.mox.ReplayAll() + serv.start() + serv.stop() + + db.service_destroy(ctxt, service_ref['id']) diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py index 27893043a..b214f5ce7 100644 --- a/nova/tests/test_virt.py +++ b/nova/tests/test_virt.py @@ -14,22 +14,29 @@ # License for the specific language governing permissions and limitations # under the License. -import re +import eventlet +import mox import os +import re +import sys -import eventlet from xml.etree.ElementTree import fromstring as xml_to_tree from xml.dom.minidom import parseString as xml_to_dom from nova import context from nova import db +from nova import exception from nova import flags from nova import test from nova import utils from nova.api.ec2 import cloud from nova.auth import manager +from nova.compute import manager as compute_manager +from nova.compute import power_state +from nova.db.sqlalchemy import models from nova.virt import libvirt_conn +libvirt = None FLAGS = flags.FLAGS flags.DECLARE('instances_path', 'nova.compute.manager') @@ -104,11 +111,28 @@ class LibvirtConnTestCase(test.TestCase): libvirt_conn._late_load_cheetah() self.flags(fake_call=True) self.manager = manager.AuthManager() + + try: + pjs = self.manager.get_projects() + pjs = [p for p in pjs if p.name == 'fake'] + if 0 != len(pjs): + self.manager.delete_project(pjs[0]) + + users = self.manager.get_users() + users = [u for u in users if u.name == 'fake'] + if 0 != len(users): + self.manager.delete_user(users[0]) + except Exception, e: + pass + + users = self.manager.get_users() self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True) self.project = self.manager.create_project('fake', 'fake', 'fake') self.network = utils.import_object(FLAGS.network_manager) + self.context = context.get_admin_context() FLAGS.instances_path = '' + self.call_libvirt_dependant_setup = False test_ip = '10.11.12.13' test_instance = {'memory_kb': '1024000', @@ -120,6 +144,58 @@ class LibvirtConnTestCase(test.TestCase): 'bridge': 'br101', 'instance_type': 'm1.small'} + def lazy_load_library_exists(self): + """check if libvirt is available.""" + # try to connect libvirt. if fail, skip test. + try: + import libvirt + import libxml2 + except ImportError: + return False + global libvirt + libvirt = __import__('libvirt') + libvirt_conn.libvirt = __import__('libvirt') + libvirt_conn.libxml2 = __import__('libxml2') + return True + + def create_fake_libvirt_mock(self, **kwargs): + """Defining mocks for LibvirtConnection(libvirt is not used).""" + + # A fake libvirt.virConnect + class FakeLibvirtConnection(object): + pass + + # A fake libvirt_conn.IptablesFirewallDriver + class FakeIptablesFirewallDriver(object): + + def __init__(self, **kwargs): + pass + + def setattr(self, key, val): + self.__setattr__(key, val) + + # Creating mocks + fake = FakeLibvirtConnection() + fakeip = FakeIptablesFirewallDriver + # Customizing above fake if necessary + for key, val in kwargs.items(): + fake.__setattr__(key, val) + + # Inevitable mocks for libvirt_conn.LibvirtConnection + self.mox.StubOutWithMock(libvirt_conn.utils, 'import_class') + libvirt_conn.utils.import_class(mox.IgnoreArg()).AndReturn(fakeip) + self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn') + libvirt_conn.LibvirtConnection._conn = fake + + def create_service(self, **kwargs): + service_ref = {'host': kwargs.get('host', 'dummy'), + 'binary': 'nova-compute', + 'topic': 'compute', + 'report_count': 0, + 'availability_zone': 'zone'} + + return db.service_create(context.get_admin_context(), service_ref) + def test_xml_and_uri_no_ramdisk_no_kernel(self): instance_data = dict(self.test_instance) self._check_xml_and_uri(instance_data, @@ -259,8 +335,8 @@ class LibvirtConnTestCase(test.TestCase): expected_result, '%s failed common check %d' % (xml, i)) - # This test is supposed to make sure we don't override a specifically - # set uri + # This test is supposed to make sure we don't + # override a specifically set uri # # Deliberately not just assigning this string to FLAGS.libvirt_uri and # checking against that later on. This way we make sure the @@ -274,6 +350,150 @@ class LibvirtConnTestCase(test.TestCase): self.assertEquals(uri, testuri) db.instance_destroy(user_context, instance_ref['id']) + def test_update_available_resource_works_correctly(self): + """Confirm compute_node table is updated successfully.""" + org_path = FLAGS.instances_path = '' + FLAGS.instances_path = '.' + + # Prepare mocks + def getVersion(): + return 12003 + + def getType(): + return 'qemu' + + def listDomainsID(): + return [] + + service_ref = self.create_service(host='dummy') + self.create_fake_libvirt_mock(getVersion=getVersion, + getType=getType, + listDomainsID=listDomainsID) + self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, + 'get_cpu_info') + libvirt_conn.LibvirtConnection.get_cpu_info().AndReturn('cpuinfo') + + # Start test + self.mox.ReplayAll() + conn = libvirt_conn.LibvirtConnection(False) + conn.update_available_resource(self.context, 'dummy') + service_ref = db.service_get(self.context, service_ref['id']) + compute_node = service_ref['compute_node'][0] + + if sys.platform.upper() == 'LINUX2': + self.assertTrue(compute_node['vcpus'] >= 0) + self.assertTrue(compute_node['memory_mb'] > 0) + self.assertTrue(compute_node['local_gb'] > 0) + self.assertTrue(compute_node['vcpus_used'] == 0) + self.assertTrue(compute_node['memory_mb_used'] > 0) + self.assertTrue(compute_node['local_gb_used'] > 0) + self.assertTrue(len(compute_node['hypervisor_type']) > 0) + self.assertTrue(compute_node['hypervisor_version'] > 0) + else: + self.assertTrue(compute_node['vcpus'] >= 0) + self.assertTrue(compute_node['memory_mb'] == 0) + self.assertTrue(compute_node['local_gb'] > 0) + self.assertTrue(compute_node['vcpus_used'] == 0) + self.assertTrue(compute_node['memory_mb_used'] == 0) + self.assertTrue(compute_node['local_gb_used'] > 0) + self.assertTrue(len(compute_node['hypervisor_type']) > 0) + self.assertTrue(compute_node['hypervisor_version'] > 0) + + db.service_destroy(self.context, service_ref['id']) + FLAGS.instances_path = org_path + + def test_update_resource_info_no_compute_record_found(self): + """Raise exception if no recorde found on services table.""" + org_path = FLAGS.instances_path = '' + FLAGS.instances_path = '.' + self.create_fake_libvirt_mock() + + self.mox.ReplayAll() + conn = libvirt_conn.LibvirtConnection(False) + self.assertRaises(exception.Invalid, + conn.update_available_resource, + self.context, 'dummy') + + FLAGS.instances_path = org_path + + def test_ensure_filtering_rules_for_instance_timeout(self): + """ensure_filtering_fules_for_instance() finishes with timeout.""" + # Skip if non-libvirt environment + if not self.lazy_load_library_exists(): + return + + # Preparing mocks + def fake_none(self): + return + + def fake_raise(self): + raise libvirt.libvirtError('ERR') + + self.create_fake_libvirt_mock(nwfilterLookupByName=fake_raise) + instance_ref = db.instance_create(self.context, self.test_instance) + + # Start test + self.mox.ReplayAll() + try: + conn = libvirt_conn.LibvirtConnection(False) + conn.firewall_driver.setattr('setup_basic_filtering', fake_none) + conn.firewall_driver.setattr('prepare_instance_filter', fake_none) + conn.ensure_filtering_rules_for_instance(instance_ref) + except exception.Error, e: + c1 = (0 <= e.message.find('Timeout migrating for')) + self.assertTrue(c1) + + db.instance_destroy(self.context, instance_ref['id']) + + def test_live_migration_raises_exception(self): + """Confirms recover method is called when exceptions are raised.""" + # Skip if non-libvirt environment + if not self.lazy_load_library_exists(): + return + + # Preparing data + self.compute = utils.import_object(FLAGS.compute_manager) + instance_dict = {'host': 'fake', 'state': power_state.RUNNING, + 'state_description': 'running'} + instance_ref = db.instance_create(self.context, self.test_instance) + instance_ref = db.instance_update(self.context, instance_ref['id'], + instance_dict) + vol_dict = {'status': 'migrating', 'size': 1} + volume_ref = db.volume_create(self.context, vol_dict) + db.volume_attached(self.context, volume_ref['id'], instance_ref['id'], + '/dev/fake') + + # Preparing mocks + vdmock = self.mox.CreateMock(libvirt.virDomain) + self.mox.StubOutWithMock(vdmock, "migrateToURI") + vdmock.migrateToURI(FLAGS.live_migration_uri % 'dest', + mox.IgnoreArg(), + None, FLAGS.live_migration_bandwidth).\ + AndRaise(libvirt.libvirtError('ERR')) + + def fake_lookup(instance_name): + if instance_name == instance_ref.name: + return vdmock + + self.create_fake_libvirt_mock(lookupByName=fake_lookup) + + # Start test + self.mox.ReplayAll() + conn = libvirt_conn.LibvirtConnection(False) + self.assertRaises(libvirt.libvirtError, + conn._live_migration, + self.context, instance_ref, 'dest', '', + self.compute.recover_live_migration) + + instance_ref = db.instance_get(self.context, instance_ref['id']) + self.assertTrue(instance_ref['state_description'] == 'running') + self.assertTrue(instance_ref['state'] == power_state.RUNNING) + volume_ref = db.volume_get(self.context, volume_ref['id']) + self.assertTrue(volume_ref['status'] == 'in-use') + + db.volume_destroy(self.context, volume_ref['id']) + db.instance_destroy(self.context, instance_ref['id']) + def tearDown(self): self.manager.delete_project(self.project) self.manager.delete_user(self.user) diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py index f698c85b5..1b1d72092 100644 --- a/nova/tests/test_volume.py +++ b/nova/tests/test_volume.py @@ -20,6 +20,8 @@ Tests for Volume Code. """ +import cStringIO + from nova import context from nova import exception from nova import db @@ -173,3 +175,196 @@ class VolumeTestCase(test.TestCase): # each of them having a different FLAG for storage_node # This will allow us to test cross-node interactions pass + + +class DriverTestCase(test.TestCase): + """Base Test class for Drivers.""" + driver_name = "nova.volume.driver.FakeAOEDriver" + + def setUp(self): + super(DriverTestCase, self).setUp() + self.flags(volume_driver=self.driver_name, + logging_default_format_string="%(message)s") + self.volume = utils.import_object(FLAGS.volume_manager) + self.context = context.get_admin_context() + self.output = "" + + def _fake_execute(_command, *_args, **_kwargs): + """Fake _execute.""" + return self.output, None + self.volume.driver._execute = _fake_execute + self.volume.driver._sync_execute = _fake_execute + + log = logging.getLogger() + self.stream = cStringIO.StringIO() + log.addHandler(logging.StreamHandler(self.stream)) + + inst = {} + self.instance_id = db.instance_create(self.context, inst)['id'] + + def tearDown(self): + super(DriverTestCase, self).tearDown() + + def _attach_volume(self): + """Attach volumes to an instance. This function also sets + a fake log message.""" + return [] + + def _detach_volume(self, volume_id_list): + """Detach volumes from an instance.""" + for volume_id in volume_id_list: + db.volume_detached(self.context, volume_id) + self.volume.delete_volume(self.context, volume_id) + + +class AOETestCase(DriverTestCase): + """Test Case for AOEDriver""" + driver_name = "nova.volume.driver.AOEDriver" + + def setUp(self): + super(AOETestCase, self).setUp() + + def tearDown(self): + super(AOETestCase, self).tearDown() + + def _attach_volume(self): + """Attach volumes to an instance. This function also sets + a fake log message.""" + volume_id_list = [] + for index in xrange(3): + vol = {} + vol['size'] = 0 + volume_id = db.volume_create(self.context, + vol)['id'] + self.volume.create_volume(self.context, volume_id) + + # each volume has a different mountpoint + mountpoint = "/dev/sd" + chr((ord('b') + index)) + db.volume_attached(self.context, volume_id, self.instance_id, + mountpoint) + + (shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context, + volume_id) + self.output += "%s %s eth0 /dev/nova-volumes/vol-foo auto run\n" \ + % (shelf_id, blade_id) + + volume_id_list.append(volume_id) + + return volume_id_list + + def test_check_for_export_with_no_volume(self): + """No log message when no volume is attached to an instance.""" + self.stream.truncate(0) + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + + def test_check_for_export_with_all_vblade_processes(self): + """No log message when all the vblade processes are running.""" + volume_id_list = self._attach_volume() + + self.stream.truncate(0) + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + + self._detach_volume(volume_id_list) + + def test_check_for_export_with_vblade_process_missing(self): + """Output a warning message when some vblade processes aren't + running.""" + volume_id_list = self._attach_volume() + + # the first vblade process isn't running + self.output = self.output.replace("run", "down", 1) + (shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context, + volume_id_list[0]) + + msg_is_match = False + self.stream.truncate(0) + try: + self.volume.check_for_export(self.context, self.instance_id) + except exception.ProcessExecutionError, e: + volume_id = volume_id_list[0] + msg = _("Cannot confirm exported volume id:%(volume_id)s. " + "vblade process for e%(shelf_id)s.%(blade_id)s " + "isn't running.") % locals() + + msg_is_match = (0 <= e.message.find(msg)) + + self.assertTrue(msg_is_match) + self._detach_volume(volume_id_list) + + +class ISCSITestCase(DriverTestCase): + """Test Case for ISCSIDriver""" + driver_name = "nova.volume.driver.ISCSIDriver" + + def setUp(self): + super(ISCSITestCase, self).setUp() + + def tearDown(self): + super(ISCSITestCase, self).tearDown() + + def _attach_volume(self): + """Attach volumes to an instance. This function also sets + a fake log message.""" + volume_id_list = [] + for index in xrange(3): + vol = {} + vol['size'] = 0 + vol_ref = db.volume_create(self.context, vol) + self.volume.create_volume(self.context, vol_ref['id']) + vol_ref = db.volume_get(self.context, vol_ref['id']) + + # each volume has a different mountpoint + mountpoint = "/dev/sd" + chr((ord('b') + index)) + db.volume_attached(self.context, vol_ref['id'], self.instance_id, + mountpoint) + volume_id_list.append(vol_ref['id']) + + return volume_id_list + + def test_check_for_export_with_no_volume(self): + """No log message when no volume is attached to an instance.""" + self.stream.truncate(0) + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + + def test_check_for_export_with_all_volume_exported(self): + """No log message when all the vblade processes are running.""" + volume_id_list = self._attach_volume() + + self.mox.StubOutWithMock(self.volume.driver, '_execute') + for i in volume_id_list: + tid = db.volume_get_iscsi_target_num(self.context, i) + self.volume.driver._execute("sudo ietadm --op show --tid=%(tid)d" + % locals()) + + self.stream.truncate(0) + self.mox.ReplayAll() + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + self.mox.UnsetStubs() + + self._detach_volume(volume_id_list) + + def test_check_for_export_with_some_volume_missing(self): + """Output a warning message when some volumes are not recognied + by ietd.""" + volume_id_list = self._attach_volume() + + # the first vblade process isn't running + tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0]) + self.mox.StubOutWithMock(self.volume.driver, '_execute') + self.volume.driver._execute("sudo ietadm --op show --tid=%(tid)d" + % locals()).AndRaise(exception.ProcessExecutionError()) + + self.mox.ReplayAll() + self.assertRaises(exception.ProcessExecutionError, + self.volume.check_for_export, + self.context, + self.instance_id) + msg = _("Cannot confirm exported volume id:%s.") % volume_id_list[0] + self.assertTrue(0 <= self.stream.getvalue().find(msg)) + self.mox.UnsetStubs() + + self._detach_volume(volume_id_list) diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index cd125a301..66a973a78 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -78,8 +78,7 @@ class XenAPIVolumeTestCase(test.TestCase): 'ramdisk_id': 3, 'instance_type': 'm1.large', 'mac_address': 'aa:bb:cc:dd:ee:ff', - 'os_type': 'linux' - } + 'os_type': 'linux'} def _create_volume(self, size='0'): """Create a volume object.""" @@ -315,8 +314,8 @@ class XenAPIVMTestCase(test.TestCase): 'ramdisk_id': ramdisk_id, 'instance_type': instance_type, 'mac_address': 'aa:bb:cc:dd:ee:ff', - 'os_type': os_type - } + 'os_type': os_type} + conn = xenapi_conn.get_connection(False) instance = db.instance_create(values) conn.spawn(instance) @@ -362,6 +361,14 @@ class XenAPIVMTestCase(test.TestCase): glance_stubs.FakeGlance.IMAGE_RAMDISK) self.check_vm_params_for_linux_with_external_kernel() + def test_spawn_with_network_qos(self): + self._create_instance() + for vif_ref in xenapi_fake.get_all('VIF'): + vif_rec = xenapi_fake.get_record('VIF', vif_ref) + self.assertEquals(vif_rec['qos_algorithm_type'], 'ratelimit') + self.assertEquals(vif_rec['qos_algorithm_params']['kbps'], + str(4 * 1024)) + def tearDown(self): super(XenAPIVMTestCase, self).tearDown() self.manager.delete_project(self.project) @@ -440,8 +447,8 @@ class XenAPIMigrateInstance(test.TestCase): 'ramdisk_id': None, 'instance_type': 'm1.large', 'mac_address': 'aa:bb:cc:dd:ee:ff', - 'os_type': 'linux' - } + 'os_type': 'linux'} + stubs.stub_out_migration_methods(self.stubs) glance_stubs.stubout_glance_client(self.stubs, glance_stubs.FakeGlance) diff --git a/nova/utils.py b/nova/utils.py index 87e726394..24b8da9ea 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -262,13 +262,25 @@ def generate_mac(): return ':'.join(map(lambda x: "%02x" % x, mac)) -def generate_password(length=20): - """Generate a random sequence of letters and digits - to be used as a password. Note that this is not intended - to represent the ultimate in security. +# Default symbols to use for passwords. Avoids visually confusing characters. +# ~6 bits per symbol +DEFAULT_PASSWORD_SYMBOLS = ("23456789" # Removed: 0,1 + "ABCDEFGHJKLMNPQRSTUVWXYZ" # Removed: I, O + "abcdefghijkmnopqrstuvwxyz") # Removed: l + + +# ~5 bits per symbol +EASIER_PASSWORD_SYMBOLS = ("23456789" # Removed: 0, 1 + "ABCDEFGHJKLMNPQRSTUVWXYZ") # Removed: I, O + + +def generate_password(length=20, symbols=DEFAULT_PASSWORD_SYMBOLS): + """Generate a random password from the supplied symbols. + + Believed to be reasonably secure (with a reasonable password length!) """ - chrs = string.letters + string.digits - return "".join([random.choice(chrs) for i in xrange(length)]) + r = random.SystemRandom() + return "".join([r.choice(symbols) for _i in xrange(length)]) def last_octet(address): @@ -518,6 +530,9 @@ def synchronized(name): def wrap(f): @functools.wraps(f) def inner(*args, **kwargs): + LOG.debug(_("Attempting to grab %(lock)s for method " + "%(method)s..." % {"lock": name, + "method": f.__name__})) lock = lockfile.FileLock(os.path.join(FLAGS.lock_path, 'nova-%s.lock' % name)) with lock: @@ -526,18 +541,6 @@ def synchronized(name): return wrap -def ensure_b64_encoding(val): - """Safety method to ensure that values expected to be base64-encoded - actually are. If they are, the value is returned unchanged. Otherwise, - the encoded value is returned. - """ - try: - dummy = base64.decode(val) - return val - except TypeError: - return base64.b64encode(val) - - def get_from_path(items, path): """ Returns a list of items matching the specified path. Takes an XPath-like expression e.g. prop1/prop2/prop3, and for each item in items, diff --git a/nova/virt/cpuinfo.xml.template b/nova/virt/cpuinfo.xml.template new file mode 100644 index 000000000..48842b29d --- /dev/null +++ b/nova/virt/cpuinfo.xml.template @@ -0,0 +1,9 @@ +<cpu> + <arch>$arch</arch> + <model>$model</model> + <vendor>$vendor</vendor> + <topology sockets="$topology.sockets" cores="$topology.cores" threads="$topology.threads"/> +#for $var in $features + <features name="$var" /> +#end for +</cpu> diff --git a/nova/virt/fake.py b/nova/virt/fake.py index c744acf91..3a06284a1 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -407,6 +407,27 @@ class FakeConnection(object): """ return True + def update_available_resource(self, ctxt, host): + """This method is supported only by libvirt.""" + return + + def compare_cpu(self, xml): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + + def ensure_filtering_rules_for_instance(self, instance_ref): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + + def live_migration(self, context, instance_ref, dest, + post_method, recover_method): + """This method is supported only by libvirt.""" + return + + def unfilter_instance(self, instance_ref): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + class FakeInstance(object): diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index 700a6bf9d..0a85da541 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -36,10 +36,13 @@ Supports KVM, QEMU, UML, and XEN. """ +import multiprocessing import os import shutil +import sys import random import subprocess +import time import uuid from xml.dom import minidom @@ -69,6 +72,7 @@ Template = None LOG = logging.getLogger('nova.virt.libvirt_conn') FLAGS = flags.FLAGS +flags.DECLARE('live_migration_retry_count', 'nova.compute.manager') # TODO(vish): These flags should probably go into a shared location flags.DEFINE_string('rescue_image_id', 'ami-rescue', 'Rescue ami image') flags.DEFINE_string('rescue_kernel_id', 'aki-rescue', 'Rescue aki image') @@ -99,6 +103,17 @@ flags.DEFINE_string('ajaxterm_portrange', flags.DEFINE_string('firewall_driver', 'nova.virt.libvirt_conn.IptablesFirewallDriver', 'Firewall driver (defaults to iptables)') +flags.DEFINE_string('cpuinfo_xml_template', + utils.abspath('virt/cpuinfo.xml.template'), + 'CpuInfo XML Template (Used only live migration now)') +flags.DEFINE_string('live_migration_uri', + "qemu+tcp://%s/system", + 'Define protocol used by live_migration feature') +flags.DEFINE_string('live_migration_flag', + "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER", + 'Define live migration behavior.') +flags.DEFINE_integer('live_migration_bandwidth', 0, + 'Define live migration behavior') def get_connection(read_only): @@ -145,6 +160,7 @@ class LibvirtConnection(object): self.libvirt_uri = self.get_uri() self.libvirt_xml = open(FLAGS.libvirt_xml_template).read() + self.cpuinfo_xml = open(FLAGS.cpuinfo_xml_template).read() self._wrapped_conn = None self.read_only = read_only @@ -346,19 +362,19 @@ class LibvirtConnection(object): @exception.wrap_exception def pause(self, instance, callback): - raise exception.APIError("pause not supported for libvirt.") + raise exception.ApiError("pause not supported for libvirt.") @exception.wrap_exception def unpause(self, instance, callback): - raise exception.APIError("unpause not supported for libvirt.") + raise exception.ApiError("unpause not supported for libvirt.") @exception.wrap_exception def suspend(self, instance, callback): - raise exception.APIError("suspend not supported for libvirt") + raise exception.ApiError("suspend not supported for libvirt") @exception.wrap_exception def resume(self, instance, callback): - raise exception.APIError("resume not supported for libvirt") + raise exception.ApiError("resume not supported for libvirt") @exception.wrap_exception def rescue(self, instance, callback=None): @@ -486,7 +502,7 @@ class LibvirtConnection(object): cmd = 'netcat', '0.0.0.0', port, '-w', '1' try: stdout, stderr = utils.execute(*cmd, process_input='') - except ProcessExecutionError: + except exception.ProcessExecutionError: return port raise Exception(_('Unable to find an open port')) @@ -763,7 +779,7 @@ class LibvirtConnection(object): 'cpu_time': cpu_time} def get_diagnostics(self, instance_name): - raise exception.APIError(_("diagnostics are not supported " + raise exception.ApiError(_("diagnostics are not supported " "for libvirt")) def get_disks(self, instance_name): @@ -850,6 +866,159 @@ class LibvirtConnection(object): return interfaces + def get_vcpu_total(self): + """Get vcpu number of physical computer. + + :returns: the number of cpu core. + + """ + + # On certain platforms, this will raise a NotImplementedError. + try: + return multiprocessing.cpu_count() + except NotImplementedError: + LOG.warn(_("Cannot get the number of cpu, because this " + "function is not implemented for this platform. " + "This error can be safely ignored for now.")) + return 0 + + def get_memory_mb_total(self): + """Get the total memory size(MB) of physical computer. + + :returns: the total amount of memory(MB). + + """ + + if sys.platform.upper() != 'LINUX2': + return 0 + + meminfo = open('/proc/meminfo').read().split() + idx = meminfo.index('MemTotal:') + # transforming kb to mb. + return int(meminfo[idx + 1]) / 1024 + + def get_local_gb_total(self): + """Get the total hdd size(GB) of physical computer. + + :returns: + The total amount of HDD(GB). + Note that this value shows a partition where + NOVA-INST-DIR/instances mounts. + + """ + + hddinfo = os.statvfs(FLAGS.instances_path) + return hddinfo.f_frsize * hddinfo.f_blocks / 1024 / 1024 / 1024 + + def get_vcpu_used(self): + """ Get vcpu usage number of physical computer. + + :returns: The total number of vcpu that currently used. + + """ + + total = 0 + for dom_id in self._conn.listDomainsID(): + dom = self._conn.lookupByID(dom_id) + total += len(dom.vcpus()[1]) + return total + + def get_memory_mb_used(self): + """Get the free memory size(MB) of physical computer. + + :returns: the total usage of memory(MB). + + """ + + if sys.platform.upper() != 'LINUX2': + return 0 + + m = open('/proc/meminfo').read().split() + idx1 = m.index('MemFree:') + idx2 = m.index('Buffers:') + idx3 = m.index('Cached:') + avail = (int(m[idx1 + 1]) + int(m[idx2 + 1]) + int(m[idx3 + 1])) / 1024 + return self.get_memory_mb_total() - avail + + def get_local_gb_used(self): + """Get the free hdd size(GB) of physical computer. + + :returns: + The total usage of HDD(GB). + Note that this value shows a partition where + NOVA-INST-DIR/instances mounts. + + """ + + hddinfo = os.statvfs(FLAGS.instances_path) + avail = hddinfo.f_frsize * hddinfo.f_bavail / 1024 / 1024 / 1024 + return self.get_local_gb_total() - avail + + def get_hypervisor_type(self): + """Get hypervisor type. + + :returns: hypervisor type (ex. qemu) + + """ + + return self._conn.getType() + + def get_hypervisor_version(self): + """Get hypervisor version. + + :returns: hypervisor version (ex. 12003) + + """ + + return self._conn.getVersion() + + def get_cpu_info(self): + """Get cpuinfo information. + + Obtains cpu feature from virConnect.getCapabilities, + and returns as a json string. + + :return: see above description + + """ + + xml = self._conn.getCapabilities() + xml = libxml2.parseDoc(xml) + nodes = xml.xpathEval('//host/cpu') + if len(nodes) != 1: + raise exception.Invalid(_("Invalid xml. '<cpu>' must be 1," + "but %d\n") % len(nodes) + + xml.serialize()) + + cpu_info = dict() + cpu_info['arch'] = xml.xpathEval('//host/cpu/arch')[0].getContent() + cpu_info['model'] = xml.xpathEval('//host/cpu/model')[0].getContent() + cpu_info['vendor'] = xml.xpathEval('//host/cpu/vendor')[0].getContent() + + topology_node = xml.xpathEval('//host/cpu/topology')[0]\ + .get_properties() + topology = dict() + while topology_node: + name = topology_node.get_name() + topology[name] = topology_node.getContent() + topology_node = topology_node.get_next() + + keys = ['cores', 'sockets', 'threads'] + tkeys = topology.keys() + if set(tkeys) != set(keys): + ks = ', '.join(keys) + raise exception.Invalid(_("Invalid xml: topology(%(topology)s) " + "must have %(ks)s") % locals()) + + feature_nodes = xml.xpathEval('//host/cpu/feature') + features = list() + for nodes in feature_nodes: + features.append(nodes.get_properties().getContent()) + + cpu_info['topology'] = topology + cpu_info['features'] = features + return utils.dumps(cpu_info) + def block_stats(self, instance_name, disk): """ Note that this function takes an instance name, not an Instance, so @@ -880,6 +1049,207 @@ class LibvirtConnection(object): def refresh_security_group_members(self, security_group_id): self.firewall_driver.refresh_security_group_members(security_group_id) + def update_available_resource(self, ctxt, host): + """Updates compute manager resource info on ComputeNode table. + + This method is called when nova-coompute launches, and + whenever admin executes "nova-manage service update_resource". + + :param ctxt: security context + :param host: hostname that compute manager is currently running + + """ + + try: + service_ref = db.service_get_all_compute_by_host(ctxt, host)[0] + except exception.NotFound: + raise exception.Invalid(_("Cannot update compute manager " + "specific info, because no service " + "record was found.")) + + # Updating host information + dic = {'vcpus': self.get_vcpu_total(), + 'memory_mb': self.get_memory_mb_total(), + 'local_gb': self.get_local_gb_total(), + 'vcpus_used': self.get_vcpu_used(), + 'memory_mb_used': self.get_memory_mb_used(), + 'local_gb_used': self.get_local_gb_used(), + 'hypervisor_type': self.get_hypervisor_type(), + 'hypervisor_version': self.get_hypervisor_version(), + 'cpu_info': self.get_cpu_info()} + + compute_node_ref = service_ref['compute_node'] + if not compute_node_ref: + LOG.info(_('Compute_service record created for %s ') % host) + dic['service_id'] = service_ref['id'] + db.compute_node_create(ctxt, dic) + else: + LOG.info(_('Compute_service record updated for %s ') % host) + db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic) + + def compare_cpu(self, cpu_info): + """Checks the host cpu is compatible to a cpu given by xml. + + "xml" must be a part of libvirt.openReadonly().getCapabilities(). + return values follows by virCPUCompareResult. + if 0 > return value, do live migration. + 'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult' + + :param cpu_info: json string that shows cpu feature(see get_cpu_info()) + :returns: + None. if given cpu info is not compatible to this server, + raise exception. + + """ + + LOG.info(_('Instance launched has CPU info:\n%s') % cpu_info) + dic = utils.loads(cpu_info) + xml = str(Template(self.cpuinfo_xml, searchList=dic)) + LOG.info(_('to xml...\n:%s ' % xml)) + + u = "http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult" + m = _("CPU doesn't have compatibility.\n\n%(ret)s\n\nRefer to %(u)s") + # unknown character exists in xml, then libvirt complains + try: + ret = self._conn.compareCPU(xml, 0) + except libvirt.libvirtError, e: + ret = e.message + LOG.error(m % locals()) + raise + + if ret <= 0: + raise exception.Invalid(m % locals()) + + return + + def ensure_filtering_rules_for_instance(self, instance_ref): + """Setting up filtering rules and waiting for its completion. + + To migrate an instance, filtering rules to hypervisors + and firewalls are inevitable on destination host. + ( Waiting only for filterling rules to hypervisor, + since filtering rules to firewall rules can be set faster). + + Concretely, the below method must be called. + - setup_basic_filtering (for nova-basic, etc.) + - prepare_instance_filter(for nova-instance-instance-xxx, etc.) + + to_xml may have to be called since it defines PROJNET, PROJMASK. + but libvirt migrates those value through migrateToURI(), + so , no need to be called. + + Don't use thread for this method since migration should + not be started when setting-up filtering rules operations + are not completed. + + :params instance_ref: nova.db.sqlalchemy.models.Instance object + + """ + + # If any instances never launch at destination host, + # basic-filtering must be set here. + self.firewall_driver.setup_basic_filtering(instance_ref) + # setting up n)ova-instance-instance-xx mainly. + self.firewall_driver.prepare_instance_filter(instance_ref) + + # wait for completion + timeout_count = range(FLAGS.live_migration_retry_count) + while timeout_count: + try: + filter_name = 'nova-instance-%s' % instance_ref.name + self._conn.nwfilterLookupByName(filter_name) + break + except libvirt.libvirtError: + timeout_count.pop() + if len(timeout_count) == 0: + ec2_id = instance_ref['hostname'] + iname = instance_ref.name + msg = _('Timeout migrating for %(ec2_id)s(%(iname)s)') + raise exception.Error(msg % locals()) + time.sleep(1) + + def live_migration(self, ctxt, instance_ref, dest, + post_method, recover_method): + """Spawning live_migration operation for distributing high-load. + + :params ctxt: security context + :params instance_ref: + nova.db.sqlalchemy.models.Instance object + instance object that is migrated. + :params dest: destination host + :params post_method: + post operation method. + expected nova.compute.manager.post_live_migration. + :params recover_method: + recovery method when any exception occurs. + expected nova.compute.manager.recover_live_migration. + + """ + + greenthread.spawn(self._live_migration, ctxt, instance_ref, dest, + post_method, recover_method) + + def _live_migration(self, ctxt, instance_ref, dest, + post_method, recover_method): + """Do live migration. + + :params ctxt: security context + :params instance_ref: + nova.db.sqlalchemy.models.Instance object + instance object that is migrated. + :params dest: destination host + :params post_method: + post operation method. + expected nova.compute.manager.post_live_migration. + :params recover_method: + recovery method when any exception occurs. + expected nova.compute.manager.recover_live_migration. + + """ + + # Do live migration. + try: + flaglist = FLAGS.live_migration_flag.split(',') + flagvals = [getattr(libvirt, x.strip()) for x in flaglist] + logical_sum = reduce(lambda x, y: x | y, flagvals) + + if self.read_only: + tmpconn = self._connect(self.libvirt_uri, False) + dom = tmpconn.lookupByName(instance_ref.name) + dom.migrateToURI(FLAGS.live_migration_uri % dest, + logical_sum, + None, + FLAGS.live_migration_bandwidth) + tmpconn.close() + else: + dom = self._conn.lookupByName(instance_ref.name) + dom.migrateToURI(FLAGS.live_migration_uri % dest, + logical_sum, + None, + FLAGS.live_migration_bandwidth) + + except Exception: + recover_method(ctxt, instance_ref) + raise + + # Waiting for completion of live_migration. + timer = utils.LoopingCall(f=None) + + def wait_for_live_migration(): + """waiting for live migration completion""" + try: + self.get_info(instance_ref.name)['state'] + except exception.NotFound: + timer.stop() + post_method(ctxt, instance_ref, dest) + + timer.f = wait_for_live_migration + timer.start(interval=0.5, now=True) + + def unfilter_instance(self, instance_ref): + """See comments of same method in firewall_driver.""" + self.firewall_driver.unfilter_instance(instance_ref) + class FirewallDriver(object): def prepare_instance_filter(self, instance): @@ -1228,6 +1598,8 @@ class IptablesFirewallDriver(FirewallDriver): self.iptables.ipv4['filter'].add_chain('sg-fallback') self.iptables.ipv4['filter'].add_rule('sg-fallback', '-j DROP') + self.iptables.ipv6['filter'].add_chain('sg-fallback') + self.iptables.ipv6['filter'].add_rule('sg-fallback', '-j DROP') def setup_basic_filtering(self, instance): """Use NWFilter from libvirt for this.""" diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index 1f03b4124..28ce215d8 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -136,8 +136,7 @@ class VMHelper(HelperBase): 'VCPUs_at_startup': vcpus, 'VCPUs_max': vcpus, 'VCPUs_params': {}, - 'xenstore_data': {} - } + 'xenstore_data': {}} # Complete VM configuration record according to the image type # non-raw/raw with PV kernel/raw in HVM mode @@ -234,7 +233,8 @@ class VMHelper(HelperBase): raise StorageError(_('Unable to destroy VBD %s') % vbd_ref) @classmethod - def create_vif(cls, session, vm_ref, network_ref, mac_address, dev): + def create_vif(cls, session, vm_ref, network_ref, mac_address, + dev, rxtx_cap=0): """Create a VIF record. Returns a Deferred that gives the new VIF reference.""" vif_rec = {} @@ -244,8 +244,9 @@ class VMHelper(HelperBase): vif_rec['MAC'] = mac_address vif_rec['MTU'] = '1500' vif_rec['other_config'] = {} - vif_rec['qos_algorithm_type'] = '' - vif_rec['qos_algorithm_params'] = {} + vif_rec['qos_algorithm_type'] = "ratelimit" if rxtx_cap else '' + vif_rec['qos_algorithm_params'] = \ + {"kbps": str(rxtx_cap * 1024)} if rxtx_cap else {} LOG.debug(_('Creating VIF for VM %(vm_ref)s,' ' network %(network_ref)s.') % locals()) vif_ref = session.call_xenapi('VIF.create', vif_rec) diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index a9a6800b1..a35d36b9e 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -19,6 +19,7 @@ Management class for VM-related functions (spawn, reboot, etc). """ +import base64 import json import M2Crypto import os @@ -131,12 +132,7 @@ class VMOps(object): # TODO(tr3buchet) - check to make sure we have network info, otherwise # create it now. This goes away once nova-multi-nic hits. if network_info is None: - admin_context = context.get_admin_context() - IPs = db.fixed_ip_get_all_by_instance(admin_context, - instance['id']) - networks = db.network_get_all_by_instance(admin_context, - instance['id']) - network_info = self._get_network_info(instance, networks, IPs) + network_info = self._get_network_info(instance) self.inject_network_info(instance, vm_ref, network_info) self.create_vifs(vm_ref, network_info) @@ -145,19 +141,20 @@ class VMOps(object): LOG.info(_('Spawning VM %(instance_name)s created %(vm_ref)s.') % locals()) - def _inject_onset_files(): - onset_files = instance.onset_files - if onset_files: + def _inject_files(): + injected_files = instance.injected_files + if injected_files: # Check if this is a JSON-encoded string and convert if needed. - if isinstance(onset_files, basestring): + if isinstance(injected_files, basestring): try: - onset_files = json.loads(onset_files) + injected_files = json.loads(injected_files) except ValueError: - LOG.exception(_("Invalid value for onset_files: '%s'") - % onset_files) - onset_files = [] + LOG.exception( + _("Invalid value for injected_files: '%s'") + % injected_files) + injected_files = [] # Inject any files, if specified - for path, contents in instance.onset_files: + for path, contents in instance.injected_files: LOG.debug(_("Injecting file path: '%s'") % path) self.inject_file(instance, path, contents) # NOTE(armando): Do we really need to do this in virt? @@ -173,7 +170,7 @@ class VMOps(object): if state == power_state.RUNNING: LOG.debug(_('Instance %s: booted'), instance_name) timer.stop() - _inject_onset_files() + _inject_files() return True except Exception, exc: LOG.warn(exc) @@ -417,17 +414,16 @@ class VMOps(object): raise RuntimeError(resp_dict['message']) return resp_dict['message'] - def inject_file(self, instance, b64_path, b64_contents): + def inject_file(self, instance, path, contents): """Write a file to the VM instance. The path to which it is to be - written and the contents of the file need to be supplied; both should + written and the contents of the file need to be supplied; both will be base64-encoded to prevent errors with non-ASCII characters being transmitted. If the agent does not support file injection, or the user has disabled it, a NotImplementedError will be raised. """ - # Files/paths *should* be base64-encoded at this point, but - # double-check to make sure. - b64_path = utils.ensure_b64_encoding(b64_path) - b64_contents = utils.ensure_b64_encoding(b64_contents) + # Files/paths must be base64-encoded for transmission to agent + b64_path = base64.b64encode(path) + b64_contents = base64.b64encode(contents) # Need to uniquely identify this request. transaction_id = str(uuid.uuid4()) @@ -694,9 +690,15 @@ class VMOps(object): return 'http://fakeajaxconsole/fake_url' # TODO(tr3buchet) - remove this function after nova multi-nic - def _get_network_info(self, instance, networks, IPs): + def _get_network_info(self, instance): """creates network info list for instance""" - + admin_context = context.get_admin_context() + IPs = db.fixed_ip_get_all_by_instance(admin_context, + instance['id']) + networks = db.network_get_all_by_instance(admin_context, + instance['id']) + flavor = db.instance_type_get_by_name(admin_context, + instance.['instance_type']) network_info = [] for network in networks: network_IPs = [ip for ip in IPs if ip.network_id == network.id] @@ -718,6 +720,7 @@ class VMOps(object): 'label': network['label'], 'gateway': network['gateway'], 'mac': instance.mac_address, + 'rxtx_cap': flavor['rxtx_cap'], 'dns': [network['dns']], 'ips': [ip_dict(ip) for ip in network_IPs], 'ip6s': [ip6_dict(ip) for ip in network_IPs]} @@ -758,11 +761,12 @@ class VMOps(object): for (network, info) in network_info: mac_address = info['mac'] bridge = network['bridge'] + rxtx_cap = info.pop('rxtx_cap') network_ref = \ NetworkHelper.find_network_with_bridge(self._session, bridge) VMHelper.create_vif(self._session, vm_ref, network_ref, - mac_address, device) + mac_address, device, rxtx_cap) device += 1 def reset_network(self, instance, vm_ref): diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index bfe290be3..da42a83b6 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -269,6 +269,27 @@ class XenAPIConnection(object): 'username': FLAGS.xenapi_connection_username, 'password': FLAGS.xenapi_connection_password} + def update_available_resource(self, ctxt, host): + """This method is supported only by libvirt.""" + return + + def compare_cpu(self, xml): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + + def ensure_filtering_rules_for_instance(self, instance_ref): + """This method is supported only libvirt.""" + return + + def live_migration(self, context, instance_ref, dest, + post_method, recover_method): + """This method is supported only by libvirt.""" + return + + def unfilter_instance(self, instance_ref): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + class XenAPISession(object): """The session to invoke XenAPI SDK calls""" diff --git a/nova/volume/driver.py b/nova/volume/driver.py index 45cc800e7..7b4bacdec 100644 --- a/nova/volume/driver.py +++ b/nova/volume/driver.py @@ -143,6 +143,10 @@ class VolumeDriver(object): """Undiscover volume on a remote host.""" raise NotImplementedError() + def check_for_export(self, context, volume_id): + """Make sure volume is exported.""" + raise NotImplementedError() + class AOEDriver(VolumeDriver): """Implements AOE specific volume commands.""" @@ -198,15 +202,45 @@ class AOEDriver(VolumeDriver): self._try_execute('sudo', 'vblade-persist', 'destroy', shelf_id, blade_id) - def discover_volume(self, _volume): + def discover_volume(self, context, _volume): """Discover volume on a remote host.""" - self._execute('sudo', 'aoe-discover') - self._execute('sudo', 'aoe-stat', check_exit_code=False) + (shelf_id, + blade_id) = self.db.volume_get_shelf_and_blade(context, + _volume['id']) + self._execute("sudo aoe-discover") + out, err = self._execute("sudo aoe-stat", check_exit_code=False) + device_path = 'e%(shelf_id)d.%(blade_id)d' % locals() + if out.find(device_path) >= 0: + return "/dev/etherd/%s" % device_path + else: + return def undiscover_volume(self, _volume): """Undiscover volume on a remote host.""" pass + def check_for_export(self, context, volume_id): + """Make sure volume is exported.""" + (shelf_id, + blade_id) = self.db.volume_get_shelf_and_blade(context, + volume_id) + cmd = "sudo vblade-persist ls --no-header" + out, _err = self._execute(cmd) + exported = False + for line in out.split('\n'): + param = line.split(' ') + if len(param) == 6 and param[0] == str(shelf_id) \ + and param[1] == str(blade_id) and param[-1] == "run": + exported = True + break + if not exported: + # Instance will be terminated in this case. + desc = _("Cannot confirm exported volume id:%(volume_id)s. " + "vblade process for e%(shelf_id)s.%(blade_id)s " + "isn't running.") % locals() + raise exception.ProcessExecutionError(out, _err, cmd=cmd, + description=desc) + class FakeAOEDriver(AOEDriver): """Logs calls instead of executing.""" @@ -402,7 +436,7 @@ class ISCSIDriver(VolumeDriver): (property_key, property_value)) return self._run_iscsiadm(iscsi_properties, iscsi_command) - def discover_volume(self, volume): + def discover_volume(self, context, volume): """Discover volume on a remote host.""" iscsi_properties = self._get_iscsi_properties(volume) @@ -461,6 +495,20 @@ class ISCSIDriver(VolumeDriver): self._run_iscsiadm(iscsi_properties, "--logout") self._run_iscsiadm(iscsi_properties, "--op delete") + def check_for_export(self, context, volume_id): + """Make sure volume is exported.""" + + tid = self.db.volume_get_iscsi_target_num(context, volume_id) + try: + self._execute("sudo ietadm --op show --tid=%(tid)d" % locals()) + except exception.ProcessExecutionError, e: + # Instances remount read-only in this case. + # /etc/init.d/iscsitarget restart and rebooting nova-volume + # is better since ensure_export() works at boot time. + logging.error(_("Cannot confirm exported volume " + "id:%(volume_id)s.") % locals()) + raise + class FakeISCSIDriver(ISCSIDriver): """Logs calls instead of executing.""" diff --git a/nova/volume/manager.py b/nova/volume/manager.py index 3e8bc16b3..9dea35b35 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -160,7 +160,7 @@ class VolumeManager(manager.Manager): if volume_ref['host'] == self.host and FLAGS.use_local_volumes: path = self.driver.local_path(volume_ref) else: - path = self.driver.discover_volume(volume_ref) + path = self.driver.discover_volume(context, volume_ref) return path def remove_compute_volume(self, context, volume_id): @@ -171,3 +171,9 @@ class VolumeManager(manager.Manager): return True else: self.driver.undiscover_volume(volume_ref) + + def check_for_export(self, context, instance_id): + """Make sure whether volume is exported.""" + instance_ref = self.db.instance_get(context, instance_id) + for volume in instance_ref['volumes']: + self.driver.check_for_export(context, volume['id']) diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance index c996f6ef4..db39cb0f4 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance @@ -216,7 +216,7 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port, os_type): 'x-image-meta-status': 'queued', 'x-image-meta-disk-format': 'vhd', 'x-image-meta-container-format': 'ovf', - 'x-image-meta-property-os-type': os_type + 'x-image-meta-property-os-type': os_type, } for header, value in headers.iteritems(): |