diff options
author | Justin Santa Barbara <justin@fathomdb.com> | 2011-03-14 22:29:30 -0700 |
---|---|---|
committer | Justin Santa Barbara <justin@fathomdb.com> | 2011-03-14 22:29:30 -0700 |
commit | 855da579147709b8070811751b3b3f684f9f78d6 (patch) | |
tree | d2039e0d03d6182c9eb9114ba4d777a82c361805 | |
parent | 2b20306fcaddcb6b9bc57fb55b17230d709cd1ce (diff) | |
parent | 5da32f8b917d461388d0186af52946a3f7d2c665 (diff) | |
download | nova-855da579147709b8070811751b3b3f684f9f78d6.tar.gz nova-855da579147709b8070811751b3b3f684f9f78d6.tar.xz nova-855da579147709b8070811751b3b3f684f9f78d6.zip |
Merged with trunk. Had to hold bazaar's hand as it got lost again.
51 files changed, 4654 insertions, 649 deletions
diff --git a/MANIFEST.in b/MANIFEST.in index 2ceed34f3..bf30d1546 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -25,6 +25,7 @@ include nova/db/sqlalchemy/migrate_repo/migrate.cfg include nova/db/sqlalchemy/migrate_repo/README include nova/virt/interfaces.template include nova/virt/libvirt*.xml.template +include nova/virt/cpuinfo.xml.template include nova/tests/CA/ include nova/tests/CA/cacert.pem include nova/tests/CA/private/ diff --git a/bin/nova-dhcpbridge b/bin/nova-dhcpbridge index 3dd9de367..7ef51feba 100755 --- a/bin/nova-dhcpbridge +++ b/bin/nova-dhcpbridge @@ -94,7 +94,7 @@ def init_leases(interface): """Get the list of hosts for an interface.""" ctxt = context.get_admin_context() network_ref = db.network_get_by_bridge(ctxt, interface) - return linux_net.get_dhcp_hosts(ctxt, network_ref['id']) + return linux_net.get_dhcp_leases(ctxt, network_ref['id']) def main(): diff --git a/bin/nova-manage b/bin/nova-manage index e001552d5..1eb4e5418 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -276,7 +276,7 @@ def _db_error(caught_exception): print caught_exception print _("The above error may show that the database has not " "been created.\nPlease create a database using " - "nova-manage sync db before running this command.") + "'nova-manage db sync' before running this command.") exit(1) @@ -437,6 +437,8 @@ class ProjectCommands(object): "been created.\nPlease create a database by running a " "nova-api server on this host.") +AccountCommands = ProjectCommands + class FixedIpCommands(object): """Class for managing fixed ip.""" @@ -558,6 +560,40 @@ class NetworkCommands(object): db.network_delete_safe(context.get_admin_context(), network.id) +class VmCommands(object): + """Class for mangaging VM instances.""" + + def live_migration(self, ec2_id, dest): + """Migrates a running instance to a new machine. + + :param ec2_id: instance id which comes from euca-describe-instance. + :param dest: destination host name. + + """ + + ctxt = context.get_admin_context() + instance_id = ec2_id_to_id(ec2_id) + + if FLAGS.connection_type != 'libvirt': + msg = _('Only KVM is supported for now. Sorry!') + raise exception.Error(msg) + + if (FLAGS.volume_driver != 'nova.volume.driver.AOEDriver' and \ + FLAGS.volume_driver != 'nova.volume.driver.ISCSIDriver'): + msg = _("Support only AOEDriver and ISCSIDriver. Sorry!") + raise exception.Error(msg) + + rpc.call(ctxt, + FLAGS.scheduler_topic, + {"method": "live_migration", + "args": {"instance_id": instance_id, + "dest": dest, + "topic": FLAGS.compute_topic}}) + + print _('Migration of %s initiated.' + 'Check its progress using euca-describe-instances.') % ec2_id + + class ServiceCommands(object): """Enable and disable running services""" @@ -602,6 +638,59 @@ class ServiceCommands(object): return db.service_update(ctxt, svc['id'], {'disabled': True}) + def describe_resource(self, host): + """Describes cpu/memory/hdd info for host. + + :param host: hostname. + + """ + + result = rpc.call(context.get_admin_context(), + FLAGS.scheduler_topic, + {"method": "show_host_resources", + "args": {"host": host}}) + + if type(result) != dict: + print _('An unexpected error has occurred.') + print _('[Result]'), result + else: + cpu = result['resource']['vcpus'] + mem = result['resource']['memory_mb'] + hdd = result['resource']['local_gb'] + cpu_u = result['resource']['vcpus_used'] + mem_u = result['resource']['memory_mb_used'] + hdd_u = result['resource']['local_gb_used'] + + print 'HOST\t\t\tPROJECT\t\tcpu\tmem(mb)\tdisk(gb)' + print '%s(total)\t\t\t%s\t%s\t%s' % (host, cpu, mem, hdd) + print '%s(used)\t\t\t%s\t%s\t%s' % (host, cpu_u, mem_u, hdd_u) + for p_id, val in result['usage'].items(): + print '%s\t\t%s\t\t%s\t%s\t%s' % (host, + p_id, + val['vcpus'], + val['memory_mb'], + val['local_gb']) + + def update_resource(self, host): + """Updates available vcpu/memory/disk info for host. + + :param host: hostname. + + """ + + ctxt = context.get_admin_context() + service_refs = db.service_get_all_by_host(ctxt, host) + if len(service_refs) <= 0: + raise exception.Invalid(_('%s does not exist.') % host) + + service_refs = [s for s in service_refs if s['topic'] == 'compute'] + if len(service_refs) <= 0: + raise exception.Invalid(_('%s is not compute node.') % host) + + rpc.call(ctxt, + db.queue_get_for(ctxt, FLAGS.compute_topic, host), + {"method": "update_available_resource"}) + class LogCommands(object): def request(self, request_id, logfile='/var/log/nova.log'): @@ -898,6 +987,7 @@ class ImageCommands(object): CATEGORIES = [ ('user', UserCommands), + ('account', AccountCommands), ('project', ProjectCommands), ('role', RoleCommands), ('shell', ShellCommands), @@ -905,6 +995,7 @@ CATEGORIES = [ ('fixed', FixedIpCommands), ('floating', FloatingIpCommands), ('network', NetworkCommands), + ('vm', VmCommands), ('service', ServiceCommands), ('log', LogCommands), ('db', DbCommands), diff --git a/contrib/nova.sh b/contrib/nova.sh index d6c9b1081..55dfb971c 100755 --- a/contrib/nova.sh +++ b/contrib/nova.sh @@ -76,6 +76,7 @@ if [ "$CMD" == "install" ]; then sudo apt-get install -y python-migrate python-eventlet python-gflags python-ipy python-tempita sudo apt-get install -y python-libvirt python-libxml2 python-routes python-cheetah sudo apt-get install -y python-netaddr python-paste python-pastedeploy python-glance + sudo apt-get install -y python-multiprocessing if [ "$USE_IPV6" == 1 ]; then sudo apt-get install -y radvd diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index b1917e9ea..40a9da0e7 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -147,8 +147,6 @@ class CloudController(object): instance_ref['id']) ec2_id = ec2utils.id_to_ec2_id(instance_ref['id']) image_ec2_id = self._image_ec2_id(instance_ref['image_id'], 'machine') - k_ec2_id = self._image_ec2_id(instance_ref['kernel_id'], 'kernel') - r_ec2_id = self._image_ec2_id(instance_ref['ramdisk_id'], 'ramdisk') data = { 'user-data': base64.b64decode(instance_ref['user_data']), 'meta-data': { @@ -167,8 +165,6 @@ class CloudController(object): 'instance-type': instance_ref['instance_type'], 'local-hostname': hostname, 'local-ipv4': address, - 'kernel-id': k_ec2_id, - 'ramdisk-id': r_ec2_id, 'placement': {'availability-zone': availability_zone}, 'public-hostname': hostname, 'public-ipv4': floating_ip or '', @@ -176,6 +172,13 @@ class CloudController(object): 'reservation-id': instance_ref['reservation_id'], 'security-groups': '', 'mpi': mpi}} + + for image_type in ['kernel', 'ramdisk']: + if '%s_id' % image_type in instance_ref: + ec2_id = self._image_ec2_id(instance_ref['%s_id' % image_type], + image_type) + data['meta-data']['%s-id' % image_type] = ec2_id + if False: # TODO(vish): store ancestor ids data['ancestor-ami-ids'] = [] if False: # TODO(vish): store product codes @@ -562,7 +565,7 @@ class CloudController(object): if context.is_admin: v['status'] = '%s (%s, %s, %s, %s)' % ( volume['status'], - volume['user_id'], + volume['project_id'], volume['host'], instance_data, volume['mountpoint']) diff --git a/nova/api/openstack/__init__.py b/nova/api/openstack/__init__.py index ab9dbb780..ce3cff337 100644 --- a/nova/api/openstack/__init__.py +++ b/nova/api/openstack/__init__.py @@ -27,6 +27,7 @@ import webob.exc from nova import flags from nova import log as logging from nova import wsgi +from nova.api.openstack import accounts from nova.api.openstack import faults from nova.api.openstack import backup_schedules from nova.api.openstack import consoles @@ -34,6 +35,7 @@ from nova.api.openstack import flavors from nova.api.openstack import images from nova.api.openstack import servers from nova.api.openstack import shared_ip_groups +from nova.api.openstack import users from nova.api.openstack import zones @@ -89,6 +91,13 @@ class APIRouter(wsgi.Router): mapper.resource("zone", "zones", controller=zones.Controller(), collection={'detail': 'GET', 'info': 'GET'}), + mapper.resource("user", "users", controller=users.Controller(), + collection={'detail': 'GET'}) + + mapper.resource("account", "accounts", + controller=accounts.Controller(), + collection={'detail': 'GET'}) + mapper.resource("server", "servers", controller=servers.Controller(), collection={'detail': 'GET'}, member=server_members) diff --git a/nova/api/openstack/accounts.py b/nova/api/openstack/accounts.py new file mode 100644 index 000000000..2510ffb61 --- /dev/null +++ b/nova/api/openstack/accounts.py @@ -0,0 +1,85 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import common + +from nova import exception +from nova import flags +from nova import log as logging +from nova import wsgi + +from nova.auth import manager +from nova.api.openstack import faults + +FLAGS = flags.FLAGS +LOG = logging.getLogger('nova.api.openstack') + + +def _translate_keys(account): + return dict(id=account.id, + name=account.name, + description=account.description, + manager=account.project_manager_id) + + +class Controller(wsgi.Controller): + + _serialization_metadata = { + 'application/xml': { + "attributes": { + "account": ["id", "name", "description", "manager"]}}} + + def __init__(self): + self.manager = manager.AuthManager() + + def _check_admin(self, context): + """We cannot depend on the db layer to check for admin access + for the auth manager, so we do it here""" + if not context.is_admin: + raise exception.NotAuthorized(_("Not admin user.")) + + def index(self, req): + raise faults.Fault(exc.HTTPNotImplemented()) + + def detail(self, req): + raise faults.Fault(exc.HTTPNotImplemented()) + + def show(self, req, id): + """Return data about the given account id""" + account = self.manager.get_project(id) + return dict(account=_translate_keys(account)) + + def delete(self, req, id): + self._check_admin(req.environ['nova.context']) + self.manager.delete_project(id) + return {} + + def create(self, req): + """We use update with create-or-update semantics + because the id comes from an external source""" + raise faults.Fault(exc.HTTPNotImplemented()) + + def update(self, req, id): + """This is really create or update.""" + self._check_admin(req.environ['nova.context']) + env = self._deserialize(req.body, req.get_content_type()) + description = env['account'].get('description') + manager = env['account'].get('manager') + try: + account = self.manager.get_project(id) + self.manager.modify_project(id, manager, description) + except exception.NotFound: + account = self.manager.create_project(id, manager, description) + return dict(account=_translate_keys(account)) diff --git a/nova/api/openstack/auth.py b/nova/api/openstack/auth.py index de8905f46..4c6b58eff 100644 --- a/nova/api/openstack/auth.py +++ b/nova/api/openstack/auth.py @@ -28,11 +28,13 @@ from nova import context from nova import db from nova import exception from nova import flags +from nova import log as logging from nova import manager from nova import utils from nova import wsgi from nova.api.openstack import faults +LOG = logging.getLogger('nova.api.openstack') FLAGS = flags.FLAGS @@ -50,14 +52,23 @@ class AuthMiddleware(wsgi.Middleware): def __call__(self, req): if not self.has_authentication(req): return self.authenticate(req) - user = self.get_user_by_authentication(req) - + accounts = self.auth.get_projects(user=user) if not user: return faults.Fault(webob.exc.HTTPUnauthorized()) - project = self.auth.get_project(FLAGS.default_project) - req.environ['nova.context'] = context.RequestContext(user, project) + if accounts: + #we are punting on this til auth is settled, + #and possibly til api v1.1 (mdragon) + account = accounts[0] + else: + return faults.Fault(webob.exc.HTTPUnauthorized()) + + if not self.auth.is_admin(user) and \ + not self.auth.is_project_member(user, account): + return faults.Fault(webob.exc.HTTPUnauthorized()) + + req.environ['nova.context'] = context.RequestContext(user, account) return self.application def has_authentication(self, req): @@ -125,14 +136,15 @@ class AuthMiddleware(wsgi.Middleware): """ ctxt = context.get_admin_context() user = self.auth.get_user_from_access_key(key) + if user and user.name == username: token_hash = hashlib.sha1('%s%s%f' % (username, key, time.time())).hexdigest() token_dict = {} token_dict['token_hash'] = token_hash token_dict['cdn_management_url'] = '' - # Same as auth url, e.g. http://foo.org:8774/baz/v1.0 - token_dict['server_management_url'] = req.url + os_url = req.url + token_dict['server_management_url'] = os_url token_dict['storage_url'] = '' token_dict['user_id'] = user.id token = self.db.auth_token_create(ctxt, token_dict) diff --git a/nova/api/openstack/users.py b/nova/api/openstack/users.py new file mode 100644 index 000000000..ebd0f4512 --- /dev/null +++ b/nova/api/openstack/users.py @@ -0,0 +1,93 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import common + +from nova import exception +from nova import flags +from nova import log as logging +from nova import wsgi + +from nova.auth import manager + +FLAGS = flags.FLAGS +LOG = logging.getLogger('nova.api.openstack') + + +def _translate_keys(user): + return dict(id=user.id, + name=user.name, + access=user.access, + secret=user.secret, + admin=user.admin) + + +class Controller(wsgi.Controller): + + _serialization_metadata = { + 'application/xml': { + "attributes": { + "user": ["id", "name", "access", "secret", "admin"]}}} + + def __init__(self): + self.manager = manager.AuthManager() + + def _check_admin(self, context): + """We cannot depend on the db layer to check for admin access + for the auth manager, so we do it here""" + if not context.is_admin: + raise exception.NotAuthorized(_("Not admin user")) + + def index(self, req): + """Return all users in brief""" + users = self.manager.get_users() + users = common.limited(users, req) + users = [_translate_keys(user) for user in users] + return dict(users=users) + + def detail(self, req): + """Return all users in detail""" + return self.index(req) + + def show(self, req, id): + """Return data about the given user id""" + user = self.manager.get_user(id) + return dict(user=_translate_keys(user)) + + def delete(self, req, id): + self._check_admin(req.environ['nova.context']) + self.manager.delete_user(id) + return {} + + def create(self, req): + self._check_admin(req.environ['nova.context']) + env = self._deserialize(req.body, req.get_content_type()) + is_admin = env['user'].get('admin') in ('T', 'True', True) + name = env['user'].get('name') + access = env['user'].get('access') + secret = env['user'].get('secret') + user = self.manager.create_user(name, access, secret, is_admin) + return dict(user=_translate_keys(user)) + + def update(self, req, id): + self._check_admin(req.environ['nova.context']) + env = self._deserialize(req.body, req.get_content_type()) + is_admin = env['user'].get('admin') + if is_admin is not None: + is_admin = is_admin in ('T', 'True', True) + access = env['user'].get('access') + secret = env['user'].get('secret') + self.manager.modify_user(id, access, secret, is_admin) + return dict(user=_translate_keys(self.manager.get_user(id))) diff --git a/nova/compute/api.py b/nova/compute/api.py index f5638ba0b..61f8b2a6a 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -125,6 +125,11 @@ class API(base.Base): raise quota.QuotaError(msg, "MetadataLimitExceeded") image = self.image_service.show(context, image_id) + + os_type = None + if 'properties' in image and 'os_type' in image['properties']: + os_type = image['properties']['os_type'] + if kernel_id is None: kernel_id = image['properties'].get('kernel_id', None) if ramdisk_id is None: @@ -181,7 +186,8 @@ class API(base.Base): 'key_data': key_data, 'locked': False, 'metadata': metadata, - 'availability_zone': availability_zone} + 'availability_zone': availability_zone, + 'os_type': os_type} elevated = context.elevated() instances = [] LOG.debug(_("Going to run %s instances..."), num_instances) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index b35216dd3..0cab10fc3 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -36,9 +36,12 @@ terminating it. import base64 import datetime +import os import random import string import socket +import tempfile +import time import functools from nova import exception @@ -61,6 +64,9 @@ flags.DEFINE_integer('password_length', 12, flags.DEFINE_string('console_host', socket.gethostname(), 'Console proxy host to use to connect to instances on' 'this host.') +flags.DEFINE_integer('live_migration_retry_count', 30, + ("Retry count needed in live_migration." + " sleep 1 sec for each count")) LOG = logging.getLogger('nova.compute.manager') @@ -181,7 +187,7 @@ class ComputeManager(manager.Manager): context=context) self.db.instance_update(context, instance_id, - {'host': self.host}) + {'host': self.host, 'launched_on': self.host}) self.db.instance_set_state(context, instance_id, @@ -723,3 +729,248 @@ class ComputeManager(manager.Manager): self.volume_manager.remove_compute_volume(context, volume_id) self.db.volume_detached(context, volume_id) return True + + @exception.wrap_exception + def compare_cpu(self, context, cpu_info): + """Checks the host cpu is compatible to a cpu given by xml. + + :param context: security context + :param cpu_info: json string obtained from virConnect.getCapabilities + :returns: See driver.compare_cpu + + """ + return self.driver.compare_cpu(cpu_info) + + @exception.wrap_exception + def create_shared_storage_test_file(self, context): + """Makes tmpfile under FLAGS.instance_path. + + This method enables compute nodes to recognize that they mounts + same shared storage. (create|check|creanup)_shared_storage_test_file() + is a pair. + + :param context: security context + :returns: tmpfile name(basename) + + """ + + dirpath = FLAGS.instances_path + fd, tmp_file = tempfile.mkstemp(dir=dirpath) + LOG.debug(_("Creating tmpfile %s to notify to other " + "compute nodes that they should mount " + "the same storage.") % tmp_file) + os.close(fd) + return os.path.basename(tmp_file) + + @exception.wrap_exception + def check_shared_storage_test_file(self, context, filename): + """Confirms existence of the tmpfile under FLAGS.instances_path. + + :param context: security context + :param filename: confirm existence of FLAGS.instances_path/thisfile + + """ + + tmp_file = os.path.join(FLAGS.instances_path, filename) + if not os.path.exists(tmp_file): + raise exception.NotFound(_('%s not found') % tmp_file) + + @exception.wrap_exception + def cleanup_shared_storage_test_file(self, context, filename): + """Removes existence of the tmpfile under FLAGS.instances_path. + + :param context: security context + :param filename: remove existence of FLAGS.instances_path/thisfile + + """ + + tmp_file = os.path.join(FLAGS.instances_path, filename) + os.remove(tmp_file) + + @exception.wrap_exception + def update_available_resource(self, context): + """See comments update_resource_info. + + :param context: security context + :returns: See driver.update_available_resource() + + """ + + return self.driver.update_available_resource(context, self.host) + + def pre_live_migration(self, context, instance_id): + """Preparations for live migration at dest host. + + :param context: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + + """ + + # Getting instance info + instance_ref = self.db.instance_get(context, instance_id) + ec2_id = instance_ref['hostname'] + + # Getting fixed ips + fixed_ip = self.db.instance_get_fixed_address(context, instance_id) + if not fixed_ip: + msg = _("%(instance_id)s(%(ec2_id)s) does not have fixed_ip.") + raise exception.NotFound(msg % locals()) + + # If any volume is mounted, prepare here. + if not instance_ref['volumes']: + LOG.info(_("%s has no volume."), ec2_id) + else: + for v in instance_ref['volumes']: + self.volume_manager.setup_compute_volume(context, v['id']) + + # Bridge settings. + # Call this method prior to ensure_filtering_rules_for_instance, + # since bridge is not set up, ensure_filtering_rules_for instance + # fails. + # + # Retry operation is necessary because continuously request comes, + # concorrent request occurs to iptables, then it complains. + max_retry = FLAGS.live_migration_retry_count + for cnt in range(max_retry): + try: + self.network_manager.setup_compute_network(context, + instance_id) + break + except exception.ProcessExecutionError: + if cnt == max_retry - 1: + raise + else: + LOG.warn(_("setup_compute_network() failed %(cnt)d." + "Retry up to %(max_retry)d for %(ec2_id)s.") + % locals()) + time.sleep(1) + + # Creating filters to hypervisors and firewalls. + # An example is that nova-instance-instance-xxx, + # which is written to libvirt.xml(Check "virsh nwfilter-list") + # This nwfilter is necessary on the destination host. + # In addition, this method is creating filtering rule + # onto destination host. + self.driver.ensure_filtering_rules_for_instance(instance_ref) + + def live_migration(self, context, instance_id, dest): + """Executing live migration. + + :param context: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param dest: destination host + + """ + + # Get instance for error handling. + instance_ref = self.db.instance_get(context, instance_id) + i_name = instance_ref.name + + try: + # Checking volume node is working correctly when any volumes + # are attached to instances. + if instance_ref['volumes']: + rpc.call(context, + FLAGS.volume_topic, + {"method": "check_for_export", + "args": {'instance_id': instance_id}}) + + # Asking dest host to preparing live migration. + rpc.call(context, + self.db.queue_get_for(context, FLAGS.compute_topic, dest), + {"method": "pre_live_migration", + "args": {'instance_id': instance_id}}) + + except Exception: + msg = _("Pre live migration for %(i_name)s failed at %(dest)s") + LOG.error(msg % locals()) + self.recover_live_migration(context, instance_ref) + raise + + # Executing live migration + # live_migration might raises exceptions, but + # nothing must be recovered in this version. + self.driver.live_migration(context, instance_ref, dest, + self.post_live_migration, + self.recover_live_migration) + + def post_live_migration(self, ctxt, instance_ref, dest): + """Post operations for live migration. + + This method is called from live_migration + and mainly updating database record. + + :param ctxt: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param dest: destination host + + """ + + LOG.info(_('post_live_migration() is started..')) + instance_id = instance_ref['id'] + + # Detaching volumes. + try: + for vol in self.db.volume_get_all_by_instance(ctxt, instance_id): + self.volume_manager.remove_compute_volume(ctxt, vol['id']) + except exception.NotFound: + pass + + # Releasing vlan. + # (not necessary in current implementation?) + + # Releasing security group ingress rule. + self.driver.unfilter_instance(instance_ref) + + # Database updating. + i_name = instance_ref.name + try: + # Not return if floating_ip is not found, otherwise, + # instance never be accessible.. + floating_ip = self.db.instance_get_floating_address(ctxt, + instance_id) + if not floating_ip: + LOG.info(_('No floating_ip is found for %s.'), i_name) + else: + floating_ip_ref = self.db.floating_ip_get_by_address(ctxt, + floating_ip) + self.db.floating_ip_update(ctxt, + floating_ip_ref['address'], + {'host': dest}) + except exception.NotFound: + LOG.info(_('No floating_ip is found for %s.'), i_name) + except: + LOG.error(_("Live migration: Unexpected error:" + "%s cannot inherit floating ip..") % i_name) + + # Restore instance/volume state + self.recover_live_migration(ctxt, instance_ref, dest) + + LOG.info(_('Migrating %(i_name)s to %(dest)s finished successfully.') + % locals()) + LOG.info(_("You may see the error \"libvirt: QEMU error: " + "Domain not found: no domain with matching name.\" " + "This error can be safely ignored.")) + + def recover_live_migration(self, ctxt, instance_ref, host=None): + """Recovers Instance/volume state from migrating -> running. + + :param ctxt: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param host: + DB column value is updated by this hostname. + if none, the host instance currently running is selected. + + """ + + if not host: + host = instance_ref['host'] + + self.db.instance_update(ctxt, + instance_ref['id'], + {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': host}) + + for volume in instance_ref['volumes']: + self.db.volume_update(ctxt, volume['id'], {'status': 'in-use'}) diff --git a/nova/db/api.py b/nova/db/api.py index aa86f0af1..0aa846d61 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -104,6 +104,11 @@ def service_get_all_by_host(context, host): return IMPL.service_get_all_by_host(context, host) +def service_get_all_compute_by_host(context, host): + """Get all compute services for a given host.""" + return IMPL.service_get_all_compute_by_host(context, host) + + def service_get_all_compute_sorted(context): """Get all compute services sorted by instance count. @@ -153,6 +158,29 @@ def service_update(context, service_id, values): ################### +def compute_node_get(context, compute_id, session=None): + """Get an computeNode or raise if it does not exist.""" + return IMPL.compute_node_get(context, compute_id) + + +def compute_node_create(context, values): + """Create a computeNode from the values dictionary.""" + return IMPL.compute_node_create(context, values) + + +def compute_node_update(context, compute_id, values): + """Set the given properties on an computeNode and update it. + + Raises NotFound if computeNode does not exist. + + """ + + return IMPL.compute_node_update(context, compute_id, values) + + +################### + + def certificate_create(context, values): """Create a certificate from the values dictionary.""" return IMPL.certificate_create(context, values) @@ -257,6 +285,11 @@ def floating_ip_get_by_address(context, address): return IMPL.floating_ip_get_by_address(context, address) +def floating_ip_update(context, address, values): + """Update a floating ip by address or raise if it doesn't exist.""" + return IMPL.floating_ip_update(context, address, values) + + #################### def migration_update(context, id, values): @@ -441,6 +474,27 @@ def instance_add_security_group(context, instance_id, security_group_id): security_group_id) +def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id): + """Get instances.vcpus by host and project.""" + return IMPL.instance_get_vcpu_sum_by_host_and_project(context, + hostname, + proj_id) + + +def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id): + """Get amount of memory by host and project.""" + return IMPL.instance_get_memory_sum_by_host_and_project(context, + hostname, + proj_id) + + +def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id): + """Get total amount of disk by host and project.""" + return IMPL.instance_get_disk_sum_by_host_and_project(context, + hostname, + proj_id) + + def instance_action_create(context, values): """Create an instance action from the values dictionary.""" return IMPL.instance_action_create(context, values) @@ -765,6 +819,11 @@ def volume_get_all_by_host(context, host): return IMPL.volume_get_all_by_host(context, host) +def volume_get_all_by_instance(context, instance_id): + """Get all volumes belonging to a instance.""" + return IMPL.volume_get_all_by_instance(context, instance_id) + + def volume_get_all_by_project(context, project_id): """Get all volumes belonging to a project.""" return IMPL.volume_get_all_by_project(context, project_id) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 3e94082df..56998ce05 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -34,6 +34,7 @@ from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload_all from sqlalchemy.sql import exists from sqlalchemy.sql import func +from sqlalchemy.sql.expression import literal_column FLAGS = flags.FLAGS @@ -118,6 +119,11 @@ def service_destroy(context, service_id): service_ref = service_get(context, service_id, session=session) service_ref.delete(session=session) + if service_ref.topic == 'compute' and \ + len(service_ref.compute_node) != 0: + for c in service_ref.compute_node: + c.delete(session=session) + @require_admin_context def service_get(context, service_id, session=None): @@ -125,6 +131,7 @@ def service_get(context, service_id, session=None): session = get_session() result = session.query(models.Service).\ + options(joinedload('compute_node')).\ filter_by(id=service_id).\ filter_by(deleted=can_read_deleted(context)).\ first() @@ -175,6 +182,24 @@ def service_get_all_by_host(context, host): @require_admin_context +def service_get_all_compute_by_host(context, host): + topic = 'compute' + session = get_session() + result = session.query(models.Service).\ + options(joinedload('compute_node')).\ + filter_by(deleted=False).\ + filter_by(host=host).\ + filter_by(topic=topic).\ + all() + + if not result: + raise exception.NotFound(_("%s does not exist or is not " + "a compute node.") % host) + + return result + + +@require_admin_context def _service_get_all_topic_subquery(context, session, topic, subq, label): sort_value = getattr(subq.c, label) return session.query(models.Service, func.coalesce(sort_value, 0)).\ @@ -285,6 +310,42 @@ def service_update(context, service_id, values): @require_admin_context +def compute_node_get(context, compute_id, session=None): + if not session: + session = get_session() + + result = session.query(models.ComputeNode).\ + filter_by(id=compute_id).\ + filter_by(deleted=can_read_deleted(context)).\ + first() + + if not result: + raise exception.NotFound(_('No computeNode for id %s') % compute_id) + + return result + + +@require_admin_context +def compute_node_create(context, values): + compute_node_ref = models.ComputeNode() + compute_node_ref.update(values) + compute_node_ref.save() + return compute_node_ref + + +@require_admin_context +def compute_node_update(context, compute_id, values): + session = get_session() + with session.begin(): + compute_ref = compute_node_get(context, compute_id, session=session) + compute_ref.update(values) + compute_ref.save(session=session) + + +################### + + +@require_admin_context def certificate_get(context, certificate_id, session=None): if not session: session = get_session() @@ -505,6 +566,16 @@ def floating_ip_get_by_address(context, address, session=None): return result +@require_context +def floating_ip_update(context, address, values): + session = get_session() + with session.begin(): + floating_ip_ref = floating_ip_get_by_address(context, address, session) + for (key, value) in values.iteritems(): + floating_ip_ref[key] = value + floating_ip_ref.save(session=session) + + ################### @@ -577,18 +648,17 @@ def fixed_ip_disassociate(context, address): @require_admin_context def fixed_ip_disassociate_all_by_timeout(_context, host, time): session = get_session() - # NOTE(vish): The nested select is because sqlite doesn't support - # JOINs in UPDATEs. - result = session.execute('UPDATE fixed_ips SET instance_id = NULL, ' - 'leased = 0 ' - 'WHERE network_id IN (SELECT id FROM networks ' - 'WHERE host = :host) ' - 'AND updated_at < :time ' - 'AND instance_id IS NOT NULL ' - 'AND allocated = 0', - {'host': host, - 'time': time}) - return result.rowcount + inner_q = session.query(models.Network.id).\ + filter_by(host=host).\ + subquery() + result = session.query(models.FixedIp).\ + filter(models.FixedIp.network_id.in_(inner_q)).\ + filter(models.FixedIp.updated_at < time).\ + filter(models.FixedIp.instance_id != None).\ + filter_by(allocated=0).\ + update({'instance_id': None, + 'leased': 0}) + return result @require_admin_context @@ -701,14 +771,16 @@ def instance_data_get_for_project(context, project_id): def instance_destroy(context, instance_id): session = get_session() with session.begin(): - session.execute('update instances set deleted=1,' - 'deleted_at=:at where id=:id', - {'id': instance_id, - 'at': datetime.datetime.utcnow()}) - session.execute('update security_group_instance_association ' - 'set deleted=1,deleted_at=:at where instance_id=:id', - {'id': instance_id, - 'at': datetime.datetime.utcnow()}) + session.query(models.Instance).\ + filter_by(id=instance_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) + session.query(models.SecurityGroupInstanceAssociation).\ + filter_by(instance_id=instance_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) @require_context @@ -905,6 +977,45 @@ def instance_add_security_group(context, instance_id, security_group_id): @require_context +def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id): + session = get_session() + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(project_id=proj_id).\ + filter_by(deleted=False).\ + value(func.sum(models.Instance.vcpus)) + if not result: + return 0 + return result + + +@require_context +def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id): + session = get_session() + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(project_id=proj_id).\ + filter_by(deleted=False).\ + value(func.sum(models.Instance.memory_mb)) + if not result: + return 0 + return result + + +@require_context +def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id): + session = get_session() + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(project_id=proj_id).\ + filter_by(deleted=False).\ + value(func.sum(models.Instance.local_gb)) + if not result: + return 0 + return result + + +@require_context def instance_action_create(context, values): """Create an instance action from the values dictionary.""" action_ref = models.InstanceActions() @@ -950,9 +1061,11 @@ def key_pair_destroy_all_by_user(context, user_id): authorize_user_context(context, user_id) session = get_session() with session.begin(): - # TODO(vish): do we have to use sql here? - session.execute('update key_pairs set deleted=1 where user_id=:id', - {'id': user_id}) + session.query(models.KeyPair).\ + filter_by(user_id=user_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) @require_context @@ -1072,7 +1185,9 @@ def network_disassociate(context, network_id): @require_admin_context def network_disassociate_all(context): session = get_session() - session.execute('update networks set project_id=NULL') + session.query(models.Network).\ + update({'project_id': None, + 'updated_at': literal_column('updated_at')}) @require_context @@ -1454,15 +1569,17 @@ def volume_data_get_for_project(context, project_id): def volume_destroy(context, volume_id): session = get_session() with session.begin(): - # TODO(vish): do we have to use sql here? - session.execute('update volumes set deleted=1 where id=:id', - {'id': volume_id}) - session.execute('update export_devices set volume_id=NULL ' - 'where volume_id=:id', - {'id': volume_id}) - session.execute('update iscsi_targets set volume_id=NULL ' - 'where volume_id=:id', - {'id': volume_id}) + session.query(models.Volume).\ + filter_by(id=volume_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) + session.query(models.ExportDevice).\ + filter_by(volume_id=volume_id).\ + update({'volume_id': None}) + session.query(models.IscsiTarget).\ + filter_by(volume_id=volume_id).\ + update({'volume_id': None}) @require_admin_context @@ -1522,6 +1639,18 @@ def volume_get_all_by_host(context, host): all() +@require_admin_context +def volume_get_all_by_instance(context, instance_id): + session = get_session() + result = session.query(models.Volume).\ + filter_by(instance_id=instance_id).\ + filter_by(deleted=False).\ + all() + if not result: + raise exception.NotFound(_('No volume for instance %s') % instance_id) + return result + + @require_context def volume_get_all_by_project(context, project_id): authorize_project_context(context, project_id) @@ -1682,17 +1811,21 @@ def security_group_create(context, values): def security_group_destroy(context, security_group_id): session = get_session() with session.begin(): - # TODO(vish): do we have to use sql here? - session.execute('update security_groups set deleted=1 where id=:id', - {'id': security_group_id}) - session.execute('update security_group_instance_association ' - 'set deleted=1,deleted_at=:at ' - 'where security_group_id=:id', - {'id': security_group_id, - 'at': datetime.datetime.utcnow()}) - session.execute('update security_group_rules set deleted=1 ' - 'where group_id=:id', - {'id': security_group_id}) + session.query(models.SecurityGroup).\ + filter_by(id=security_group_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) + session.query(models.SecurityGroupInstanceAssociation).\ + filter_by(security_group_id=security_group_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) + session.query(models.SecurityGroupIngressRule).\ + filter_by(group_id=security_group_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) @require_context @@ -1700,9 +1833,14 @@ def security_group_destroy_all(context, session=None): if not session: session = get_session() with session.begin(): - # TODO(vish): do we have to use sql here? - session.execute('update security_groups set deleted=1') - session.execute('update security_group_rules set deleted=1') + session.query(models.SecurityGroup).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) + session.query(models.SecurityGroupIngressRule).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) ################### @@ -1831,12 +1969,15 @@ def user_create(_context, values): def user_delete(context, id): session = get_session() with session.begin(): - session.execute('delete from user_project_association ' - 'where user_id=:id', {'id': id}) - session.execute('delete from user_role_association ' - 'where user_id=:id', {'id': id}) - session.execute('delete from user_project_role_association ' - 'where user_id=:id', {'id': id}) + session.query(models.UserProjectAssociation).\ + filter_by(user_id=id).\ + delete() + session.query(models.UserRoleAssociation).\ + filter_by(user_id=id).\ + delete() + session.query(models.UserProjectRoleAssociation).\ + filter_by(user_id=id).\ + delete() user_ref = user_get(context, id, session=session) session.delete(user_ref) @@ -1893,8 +2034,11 @@ def project_get_by_user(context, user_id): session = get_session() user = session.query(models.User).\ filter_by(deleted=can_read_deleted(context)).\ + filter_by(id=user_id).\ options(joinedload_all('projects')).\ first() + if not user: + raise exception.NotFound(_('Invalid user_id %s') % user_id) return user.projects @@ -1927,10 +2071,12 @@ def project_update(context, project_id, values): def project_delete(context, id): session = get_session() with session.begin(): - session.execute('delete from user_project_association ' - 'where project_id=:id', {'id': id}) - session.execute('delete from user_project_role_association ' - 'where project_id=:id', {'id': id}) + session.query(models.UserProjectAssociation).\ + filter_by(project_id=id).\ + delete() + session.query(models.UserProjectRoleAssociation).\ + filter_by(project_id=id).\ + delete() project_ref = project_get(context, id, session=session) session.delete(project_ref) @@ -1955,11 +2101,11 @@ def user_get_roles_for_project(context, user_id, project_id): def user_remove_project_role(context, user_id, project_id, role): session = get_session() with session.begin(): - session.execute('delete from user_project_role_association where ' - 'user_id=:user_id and project_id=:project_id and ' - 'role=:role', {'user_id': user_id, - 'project_id': project_id, - 'role': role}) + session.query(models.UserProjectRoleAssociation).\ + filter_by(user_id=user_id).\ + filter_by(project_id=project_id).\ + filter_by(role=role).\ + delete() def user_remove_role(context, user_id, role): @@ -2110,8 +2256,9 @@ def console_delete(context, console_id): session = get_session() with session.begin(): # consoles are meant to be transient. (mdragon) - session.execute('delete from consoles ' - 'where id=:id', {'id': console_id}) + session.query(models.Console).\ + filter_by(id=console_id).\ + delete() def console_get_by_pool_instance(context, pool_id, instance_id): @@ -2267,8 +2414,9 @@ def zone_update(context, zone_id, values): def zone_delete(context, zone_id): session = get_session() with session.begin(): - session.execute('delete from zones ' - 'where id=:id', {'id': zone_id}) + session.query(models.Zone).\ + filter_by(id=zone_id).\ + delete() @require_admin_context diff --git a/nova/db/sqlalchemy/migrate_repo/versions/010_add_os_type_to_instances.py b/nova/db/sqlalchemy/migrate_repo/versions/010_add_os_type_to_instances.py new file mode 100644 index 000000000..eb3066894 --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/010_add_os_type_to_instances.py @@ -0,0 +1,51 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import * +from sqlalchemy.sql import text +from migrate import * + +from nova import log as logging + + +meta = MetaData() + +instances = Table('instances', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + +instances_os_type = Column('os_type', + String(length=255, convert_unicode=False, + assert_unicode=None, unicode_error=None, + _warn_on_bytestring=False), + nullable=True) + + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; + # bind migrate_engine to your metadata + meta.bind = migrate_engine + + instances.create_column(instances_os_type) + migrate_engine.execute(instances.update()\ + .where(instances.c.os_type == None)\ + .values(os_type='linux')) + + +def downgrade(migrate_engine): + meta.bind = migrate_engine + + instances.drop_column('os_type') diff --git a/nova/db/sqlalchemy/migrate_repo/versions/011_live_migration.py b/nova/db/sqlalchemy/migrate_repo/versions/011_live_migration.py new file mode 100644 index 000000000..23ccccb4e --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/011_live_migration.py @@ -0,0 +1,83 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from migrate import * +from nova import log as logging +from sqlalchemy import * + + +meta = MetaData() + +instances = Table('instances', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + +# +# New Tables +# + +compute_nodes = Table('compute_nodes', meta, + Column('created_at', DateTime(timezone=False)), + Column('updated_at', DateTime(timezone=False)), + Column('deleted_at', DateTime(timezone=False)), + Column('deleted', Boolean(create_constraint=True, name=None)), + Column('id', Integer(), primary_key=True, nullable=False), + Column('service_id', Integer(), nullable=False), + + Column('vcpus', Integer(), nullable=False), + Column('memory_mb', Integer(), nullable=False), + Column('local_gb', Integer(), nullable=False), + Column('vcpus_used', Integer(), nullable=False), + Column('memory_mb_used', Integer(), nullable=False), + Column('local_gb_used', Integer(), nullable=False), + Column('hypervisor_type', + Text(convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=False), + Column('hypervisor_version', Integer(), nullable=False), + Column('cpu_info', + Text(convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=False), + ) + + +# +# Tables to alter +# +instances_launched_on = Column( + 'launched_on', + Text(convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=True) + + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; + # bind migrate_engine to your metadata + meta.bind = migrate_engine + + try: + compute_nodes.create() + except Exception: + logging.info(repr(compute_nodes)) + logging.exception('Exception while creating table') + meta.drop_all(tables=[compute_nodes]) + raise + + instances.create_column(instances_launched_on) diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 6ef284e65..162f6fded 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -113,6 +113,41 @@ class Service(BASE, NovaBase): availability_zone = Column(String(255), default='nova') +class ComputeNode(BASE, NovaBase): + """Represents a running compute service on a host.""" + + __tablename__ = 'compute_nodes' + id = Column(Integer, primary_key=True) + service_id = Column(Integer, ForeignKey('services.id'), nullable=True) + service = relationship(Service, + backref=backref('compute_node'), + foreign_keys=service_id, + primaryjoin='and_(' + 'ComputeNode.service_id == Service.id,' + 'ComputeNode.deleted == False)') + + vcpus = Column(Integer, nullable=True) + memory_mb = Column(Integer, nullable=True) + local_gb = Column(Integer, nullable=True) + vcpus_used = Column(Integer, nullable=True) + memory_mb_used = Column(Integer, nullable=True) + local_gb_used = Column(Integer, nullable=True) + hypervisor_type = Column(Text, nullable=True) + hypervisor_version = Column(Integer, nullable=True) + + # Note(masumotok): Expected Strings example: + # + # '{"arch":"x86_64", + # "model":"Nehalem", + # "topology":{"sockets":1, "threads":2, "cores":3}, + # "features":["tdtscp", "xtpr"]}' + # + # Points are "json translatable" and it must have all dictionary keys + # above, since it is copied from <cpu> tag of getCapabilities() + # (See libvirt.virtConnection). + cpu_info = Column(Text, nullable=True) + + class Certificate(BASE, NovaBase): """Represents a an x509 certificate""" __tablename__ = 'certificates' @@ -191,8 +226,13 @@ class Instance(BASE, NovaBase): display_name = Column(String(255)) display_description = Column(String(255)) + # To remember on which host a instance booted. + # An instance may have moved to another host by live migraiton. + launched_on = Column(Text) locked = Column(Boolean) + os_type = Column(String(255)) + # TODO(vish): see Ewan's email about state improvements, probably # should be in a driver base class or some such # vmstate_state = running, halted, suspended, paused diff --git a/nova/network/linux_net.py b/nova/network/linux_net.py index c0bd76adf..7106e6164 100644 --- a/nova/network/linux_net.py +++ b/nova/network/linux_net.py @@ -17,7 +17,11 @@ Implements vlans, bridges, and iptables rules using linux utilities. """ +import inspect import os +import calendar + +from eventlet import semaphore from nova import db from nova import exception @@ -25,7 +29,6 @@ from nova import flags from nova import log as logging from nova import utils - LOG = logging.getLogger("nova.linux_net") @@ -52,10 +55,10 @@ flags.DEFINE_string('dhcpbridge', _bin_file('nova-dhcpbridge'), 'location of nova-dhcpbridge') flags.DEFINE_string('routing_source_ip', '$my_ip', 'Public IP of network host') -flags.DEFINE_bool('use_nova_chains', False, - 'use the nova_ routing chains instead of default') flags.DEFINE_string('input_chain', 'INPUT', 'chain to add nova_input to') +flags.DEFINE_integer('dhcp_lease_time', 120, + 'Lifetime of a DHCP lease') flags.DEFINE_string('dns_server', None, 'if set, uses specific dns server for dnsmasq') @@ -63,79 +66,332 @@ flags.DEFINE_string('dmz_cidr', '10.128.0.0/24', 'dmz range that should be accepted') +binary_name = os.path.basename(inspect.stack()[-1][1]) + + +class IptablesRule(object): + """An iptables rule + + You shouldn't need to use this class directly, it's only used by + IptablesManager + """ + def __init__(self, chain, rule, wrap=True, top=False): + self.chain = chain + self.rule = rule + self.wrap = wrap + self.top = top + + def __eq__(self, other): + return ((self.chain == other.chain) and + (self.rule == other.rule) and + (self.top == other.top) and + (self.wrap == other.wrap)) + + def __ne__(self, other): + return not self == other + + def __str__(self): + if self.wrap: + chain = '%s-%s' % (binary_name, self.chain) + else: + chain = self.chain + return '-A %s %s' % (chain, self.rule) + + +class IptablesTable(object): + """An iptables table""" + + def __init__(self): + self.rules = [] + self.chains = set() + self.unwrapped_chains = set() + + def add_chain(self, name, wrap=True): + """Adds a named chain to the table + + The chain name is wrapped to be unique for the component creating + it, so different components of Nova can safely create identically + named chains without interfering with one another. + + At the moment, its wrapped name is <binary name>-<chain name>, + so if nova-compute creates a chain named "OUTPUT", it'll actually + end up named "nova-compute-OUTPUT". + """ + if wrap: + self.chains.add(name) + else: + self.unwrapped_chains.add(name) + + def remove_chain(self, name, wrap=True): + """Remove named chain + + This removal "cascades". All rule in the chain are removed, as are + all rules in other chains that jump to it. + + If the chain is not found, this is merely logged. + """ + if wrap: + chain_set = self.chains + else: + chain_set = self.unwrapped_chains + + if name not in chain_set: + LOG.debug(_("Attempted to remove chain %s which doesn't exist"), + name) + return + + chain_set.remove(name) + self.rules = filter(lambda r: r.chain != name, self.rules) + + if wrap: + jump_snippet = '-j %s-%s' % (binary_name, name) + else: + jump_snippet = '-j %s' % (name,) + + self.rules = filter(lambda r: jump_snippet not in r.rule, self.rules) + + def add_rule(self, chain, rule, wrap=True, top=False): + """Add a rule to the table + + This is just like what you'd feed to iptables, just without + the "-A <chain name>" bit at the start. + + However, if you need to jump to one of your wrapped chains, + prepend its name with a '$' which will ensure the wrapping + is applied correctly. + """ + if wrap and chain not in self.chains: + raise ValueError(_("Unknown chain: %r") % chain) + + if '$' in rule: + rule = ' '.join(map(self._wrap_target_chain, rule.split(' '))) + + self.rules.append(IptablesRule(chain, rule, wrap, top)) + + def _wrap_target_chain(self, s): + if s.startswith('$'): + return '%s-%s' % (binary_name, s[1:]) + return s + + def remove_rule(self, chain, rule, wrap=True, top=False): + """Remove a rule from a chain + + Note: The rule must be exactly identical to the one that was added. + You cannot switch arguments around like you can with the iptables + CLI tool. + """ + try: + self.rules.remove(IptablesRule(chain, rule, wrap, top)) + except ValueError: + LOG.debug(_("Tried to remove rule that wasn't there:" + " %(chain)r %(rule)r %(wrap)r %(top)r"), + {'chain': chain, 'rule': rule, + 'top': top, 'wrap': wrap}) + + +class IptablesManager(object): + """Wrapper for iptables + + See IptablesTable for some usage docs + + A number of chains are set up to begin with. + + First, nova-filter-top. It's added at the top of FORWARD and OUTPUT. Its + name is not wrapped, so it's shared between the various nova workers. It's + intended for rules that need to live at the top of the FORWARD and OUTPUT + chains. It's in both the ipv4 and ipv6 set of tables. + + For ipv4 and ipv6, the builtin INPUT, OUTPUT, and FORWARD filter chains are + wrapped, meaning that the "real" INPUT chain has a rule that jumps to the + wrapped INPUT chain, etc. Additionally, there's a wrapped chain named + "local" which is jumped to from nova-filter-top. + + For ipv4, the builtin PREROUTING, OUTPUT, and POSTROUTING nat chains are + wrapped in the same was as the builtin filter chains. Additionally, there's + a snat chain that is applied after the POSTROUTING chain. + """ + def __init__(self, execute=None): + if not execute: + if FLAGS.fake_network: + self.execute = lambda *args, **kwargs: ('', '') + else: + self.execute = utils.execute + else: + self.execute = execute + + self.ipv4 = {'filter': IptablesTable(), + 'nat': IptablesTable()} + self.ipv6 = {'filter': IptablesTable()} + + # Add a nova-filter-top chain. It's intended to be shared + # among the various nova components. It sits at the very top + # of FORWARD and OUTPUT. + for tables in [self.ipv4, self.ipv6]: + tables['filter'].add_chain('nova-filter-top', wrap=False) + tables['filter'].add_rule('FORWARD', '-j nova-filter-top', + wrap=False, top=True) + tables['filter'].add_rule('OUTPUT', '-j nova-filter-top', + wrap=False, top=True) + + tables['filter'].add_chain('local') + tables['filter'].add_rule('nova-filter-top', '-j $local', + wrap=False) + + # Wrap the builtin chains + builtin_chains = {4: {'filter': ['INPUT', 'OUTPUT', 'FORWARD'], + 'nat': ['PREROUTING', 'OUTPUT', 'POSTROUTING']}, + 6: {'filter': ['INPUT', 'OUTPUT', 'FORWARD']}} + + for ip_version in builtin_chains: + if ip_version == 4: + tables = self.ipv4 + elif ip_version == 6: + tables = self.ipv6 + + for table, chains in builtin_chains[ip_version].iteritems(): + for chain in chains: + tables[table].add_chain(chain) + tables[table].add_rule(chain, '-j $%s' % (chain,), + wrap=False) + + # Add a nova-postrouting-bottom chain. It's intended to be shared + # among the various nova components. We set it as the last chain + # of POSTROUTING chain. + self.ipv4['nat'].add_chain('nova-postrouting-bottom', wrap=False) + self.ipv4['nat'].add_rule('POSTROUTING', '-j nova-postrouting-bottom', + wrap=False) + + # We add a snat chain to the shared nova-postrouting-bottom chain + # so that it's applied last. + self.ipv4['nat'].add_chain('snat') + self.ipv4['nat'].add_rule('nova-postrouting-bottom', '-j $snat', + wrap=False) + + # And then we add a floating-snat chain and jump to first thing in + # the snat chain. + self.ipv4['nat'].add_chain('floating-snat') + self.ipv4['nat'].add_rule('snat', '-j $floating-snat') + + self.semaphore = semaphore.Semaphore() + + @utils.synchronized('iptables') + def apply(self): + """Apply the current in-memory set of iptables rules + + This will blow away any rules left over from previous runs of the + same component of Nova, and replace them with our current set of + rules. This happens atomically, thanks to iptables-restore. + + We wrap the call in a semaphore lock, so that we don't race with + ourselves. In the event of a race with another component running + an iptables-* command at the same time, we retry up to 5 times. + """ + with self.semaphore: + s = [('iptables', self.ipv4)] + if FLAGS.use_ipv6: + s += [('ip6tables', self.ipv6)] + + for cmd, tables in s: + for table in tables: + current_table, _ = self.execute('sudo', + '%s-save' % (cmd,), + '-t', '%s' % (table,), + attempts=5) + current_lines = current_table.split('\n') + new_filter = self._modify_rules(current_lines, + tables[table]) + self.execute('sudo', '%s-restore' % (cmd,), + process_input='\n'.join(new_filter), + attempts=5) + + def _modify_rules(self, current_lines, table, binary=None): + unwrapped_chains = table.unwrapped_chains + chains = table.chains + rules = table.rules + + # Remove any trace of our rules + new_filter = filter(lambda line: binary_name not in line, + current_lines) + + seen_chains = False + rules_index = 0 + for rules_index, rule in enumerate(new_filter): + if not seen_chains: + if rule.startswith(':'): + seen_chains = True + else: + if not rule.startswith(':'): + break + + our_rules = [] + for rule in rules: + rule_str = str(rule) + if rule.top: + # rule.top == True means we want this rule to be at the top. + # Further down, we weed out duplicates from the bottom of the + # list, so here we remove the dupes ahead of time. + new_filter = filter(lambda s: s.strip() != rule_str.strip(), + new_filter) + our_rules += [rule_str] + + new_filter[rules_index:rules_index] = our_rules + + new_filter[rules_index:rules_index] = [':%s - [0:0]' % \ + (name,) \ + for name in unwrapped_chains] + new_filter[rules_index:rules_index] = [':%s-%s - [0:0]' % \ + (binary_name, name,) \ + for name in chains] + + seen_lines = set() + + def _weed_out_duplicates(line): + line = line.strip() + if line in seen_lines: + return False + else: + seen_lines.add(line) + return True + + # We filter duplicates, letting the *last* occurrence take + # precendence. + new_filter.reverse() + new_filter = filter(_weed_out_duplicates, new_filter) + new_filter.reverse() + return new_filter + + +iptables_manager = IptablesManager() + + def metadata_forward(): """Create forwarding rule for metadata""" - _confirm_rule("PREROUTING", '-t', 'nat', '-s', '0.0.0.0/0', - '-d', '169.254.169.254/32', '-p', 'tcp', '-m', 'tcp', - '--dport', '80', '-j', 'DNAT', - '--to-destination', - '%s:%s' % (FLAGS.ec2_dmz_host, FLAGS.ec2_port)) + iptables_manager.ipv4['nat'].add_rule("PREROUTING", + "-s 0.0.0.0/0 -d 169.254.169.254/32 " + "-p tcp -m tcp --dport 80 -j DNAT " + "--to-destination %s:%s" % \ + (FLAGS.ec2_dmz_host, FLAGS.ec2_port)) + iptables_manager.apply() def init_host(): """Basic networking setup goes here""" - - if FLAGS.use_nova_chains: - _execute('sudo', 'iptables', '-N', 'nova_input', check_exit_code=False) - _execute('sudo', 'iptables', '-D', FLAGS.input_chain, - '-j', 'nova_input', - check_exit_code=False) - _execute('sudo', 'iptables', '-A', FLAGS.input_chain, - '-j', 'nova_input') - _execute('sudo', 'iptables', '-N', 'nova_forward', - check_exit_code=False) - _execute('sudo', 'iptables', '-D', 'FORWARD', '-j', 'nova_forward', - check_exit_code=False) - _execute('sudo', 'iptables', '-A', 'FORWARD', '-j', 'nova_forward') - _execute('sudo', 'iptables', '-N', 'nova_output', - check_exit_code=False) - _execute('sudo', 'iptables', '-D', 'OUTPUT', '-j', 'nova_output', - check_exit_code=False) - _execute('sudo', 'iptables', '-A', 'OUTPUT', '-j', 'nova_output') - _execute('sudo', 'iptables', '-t', 'nat', '-N', 'nova_prerouting', - check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-D', 'PREROUTING', - '-j', 'nova_prerouting', check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-A', 'PREROUTING', - '-j', 'nova_prerouting') - _execute('sudo', 'iptables', '-t', 'nat', '-N', 'nova_postrouting', - check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-D', 'POSTROUTING', - '-j', 'nova_postrouting', check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-A', 'POSTROUTING', - '-j', 'nova_postrouting') - _execute('sudo', 'iptables', '-t', 'nat', '-N', 'nova_snatting', - check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-D', 'POSTROUTING', - '-j nova_snatting', check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-A', 'POSTROUTING', - '-j', 'nova_snatting') - _execute('sudo', 'iptables', '-t', 'nat', '-N', 'nova_output', - check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-D', 'OUTPUT', - '-j nova_output', check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-A', 'OUTPUT', - '-j', 'nova_output') - else: - # NOTE(vish): This makes it easy to ensure snatting rules always - # come after the accept rules in the postrouting chain - _execute('sudo', 'iptables', '-t', 'nat', '-N', 'SNATTING', - check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-D', 'POSTROUTING', - '-j', 'SNATTING', check_exit_code=False) - _execute('sudo', 'iptables', '-t', 'nat', '-A', 'POSTROUTING', - '-j', 'SNATTING') - # NOTE(devcamcar): Cloud public SNAT entries and the default # SNAT rule for outbound traffic. - _confirm_rule("SNATTING", '-t', 'nat', '-s', FLAGS.fixed_range, - '-j', 'SNAT', '--to-source', FLAGS.routing_source_ip, - append=True) + iptables_manager.ipv4['nat'].add_rule("snat", + "-s %s -j SNAT --to-source %s" % \ + (FLAGS.fixed_range, + FLAGS.routing_source_ip)) - _confirm_rule("POSTROUTING", '-t', 'nat', '-s', FLAGS.fixed_range, - '-d', FLAGS.dmz_cidr, '-j', 'ACCEPT') - _confirm_rule("POSTROUTING", '-t', 'nat', '-s', FLAGS.fixed_range, - '-d', FLAGS.fixed_range, '-j', 'ACCEPT') + iptables_manager.ipv4['nat'].add_rule("POSTROUTING", + "-s %s -d %s -j ACCEPT" % \ + (FLAGS.fixed_range, FLAGS.dmz_cidr)) + + iptables_manager.ipv4['nat'].add_rule("POSTROUTING", + "-s %(range)s -d %(range)s " + "-j ACCEPT" % \ + {'range': FLAGS.fixed_range}) + iptables_manager.apply() def bind_floating_ip(floating_ip, check_exit_code=True): @@ -153,31 +409,36 @@ def unbind_floating_ip(floating_ip): def ensure_vlan_forward(public_ip, port, private_ip): """Sets up forwarding rules for vlan""" - _confirm_rule("FORWARD", '-d', private_ip, '-p', 'udp', - '--dport', '1194', '-j', 'ACCEPT') - _confirm_rule("PREROUTING", '-t', 'nat', '-d', public_ip, '-p', 'udp', - '--dport', port, '-j', 'DNAT', '--to', '%s:1194' - % private_ip) + iptables_manager.ipv4['filter'].add_rule("FORWARD", + "-d %s -p udp " + "--dport 1194 " + "-j ACCEPT" % private_ip) + iptables_manager.ipv4['nat'].add_rule("PREROUTING", + "-d %s -p udp " + "--dport %s -j DNAT --to %s:1194" % + (public_ip, port, private_ip)) + iptables_manager.apply() def ensure_floating_forward(floating_ip, fixed_ip): """Ensure floating ip forwarding rule""" - _confirm_rule("PREROUTING", '-t', 'nat', '-d', floating_ip, '-j', 'DNAT', - '--to', fixed_ip) - _confirm_rule("OUTPUT", '-t', 'nat', '-d', floating_ip, '-j', 'DNAT', - '--to', fixed_ip) - _confirm_rule("SNATTING", '-t', 'nat', '-s', fixed_ip, '-j', 'SNAT', - '--to', floating_ip) + for chain, rule in floating_forward_rules(floating_ip, fixed_ip): + iptables_manager.ipv4['nat'].add_rule(chain, rule) + iptables_manager.apply() def remove_floating_forward(floating_ip, fixed_ip): """Remove forwarding for floating ip""" - _remove_rule("PREROUTING", '-t', 'nat', '-d', floating_ip, '-j', 'DNAT', - '--to', fixed_ip) - _remove_rule("OUTPUT", '-t', 'nat', '-d', floating_ip, '-j', 'DNAT', - '--to', fixed_ip) - _remove_rule("SNATTING", '-t', 'nat', '-s', fixed_ip, '-j', 'SNAT', - '--to', floating_ip) + for chain, rule in floating_forward_rules(floating_ip, fixed_ip): + iptables_manager.ipv4['nat'].remove_rule(chain, rule) + iptables_manager.apply() + + +def floating_forward_rules(floating_ip, fixed_ip): + return [("PREROUTING", "-d %s -j DNAT --to %s" % (floating_ip, fixed_ip)), + ("OUTPUT", "-d %s -j DNAT --to %s" % (floating_ip, fixed_ip)), + ("floating-snat", + "-s %s -j SNAT --to %s" % (fixed_ip, floating_ip))] def ensure_vlan_bridge(vlan_num, bridge, net_attrs=None): @@ -255,11 +516,9 @@ def ensure_bridge(bridge, interface, net_attrs=None): for line in out.split("\n"): fields = line.split() if fields and fields[0] == "inet": - params = ' '.join(fields[1:-1]) - _execute('sudo', 'ip', 'addr', - 'del', params, 'dev', fields[-1]) - _execute('sudo', 'ip', 'addr', - 'add', params, 'dev', bridge) + params = fields[1:-1] + _execute(*_ip_bridge_cmd('del', params, fields[-1])) + _execute(*_ip_bridge_cmd('add', params, bridge)) if gateway: _execute('sudo', 'route', 'add', '0.0.0.0', 'gw', gateway) out, err = _execute('sudo', 'brctl', 'addif', bridge, interface, @@ -269,23 +528,25 @@ def ensure_bridge(bridge, interface, net_attrs=None): "enslave it to bridge %s.\n" % (interface, bridge)): raise exception.Error("Failed to add interface: %s" % err) - if FLAGS.use_nova_chains: - (out, err) = _execute('sudo', 'iptables', '-N', 'nova_forward', - check_exit_code=False) - if err != 'iptables: Chain already exists.\n': - # NOTE(vish): chain didn't exist link chain - _execute('sudo', 'iptables', '-D', 'FORWARD', '-j', 'nova_forward', - check_exit_code=False) - _execute('sudo', 'iptables', '-A', 'FORWARD', '-j', 'nova_forward') + iptables_manager.ipv4['filter'].add_rule("FORWARD", + "--in-interface %s -j ACCEPT" % \ + bridge) + iptables_manager.ipv4['filter'].add_rule("FORWARD", + "--out-interface %s -j ACCEPT" % \ + bridge) + - _confirm_rule("FORWARD", '--in-interface', bridge, '-j', 'ACCEPT') - _confirm_rule("FORWARD", '--out-interface', bridge, '-j', 'ACCEPT') - _execute('sudo', 'iptables', '-N', 'nova-local', check_exit_code=False) - _confirm_rule("FORWARD", '-j', 'nova-local') +def get_dhcp_leases(context, network_id): + """Return a network's hosts config in dnsmasq leasefile format""" + hosts = [] + for fixed_ip_ref in db.network_get_associated_fixed_ips(context, + network_id): + hosts.append(_host_lease(fixed_ip_ref)) + return '\n'.join(hosts) def get_dhcp_hosts(context, network_id): - """Get a string containing a network's hosts config in dnsmasq format""" + """Get a string containing a network's hosts config in dhcp-host format""" hosts = [] for fixed_ip_ref in db.network_get_associated_fixed_ips(context, network_id): @@ -376,8 +637,24 @@ interface %s utils.get_my_linklocal(network_ref['bridge'])}) +def _host_lease(fixed_ip_ref): + """Return a host string for an address in leasefile format""" + instance_ref = fixed_ip_ref['instance'] + if instance_ref['updated_at']: + timestamp = instance_ref['updated_at'] + else: + timestamp = instance_ref['created_at'] + + seconds_since_epoch = calendar.timegm(timestamp.utctimetuple()) + + return "%d %s %s %s *" % (seconds_since_epoch + FLAGS.dhcp_lease_time, + instance_ref['mac_address'], + fixed_ip_ref['address'], + instance_ref['hostname'] or '*') + + def _host_dhcp(fixed_ip_ref): - """Return a host string for an address""" + """Return a host string for an address in dhcp-host format""" instance_ref = fixed_ip_ref['instance'] return "%s,%s.%s,%s" % (instance_ref['mac_address'], instance_ref['hostname'], @@ -401,27 +678,6 @@ def _device_exists(device): return not err -def _confirm_rule(chain, *cmd, **kwargs): - append = kwargs.get('append', False) - """Delete and re-add iptables rule""" - if FLAGS.use_nova_chains: - chain = "nova_%s" % chain.lower() - if append: - loc = "-A" - else: - loc = "-I" - _execute('sudo', 'iptables', '--delete', chain, *cmd, - check_exit_code=False) - _execute('sudo', 'iptables', loc, chain, *cmd) - - -def _remove_rule(chain, *cmd): - """Remove iptables rule""" - if FLAGS.use_nova_chains: - chain = "%s" % chain.lower() - _execute('sudo', 'iptables', '--delete', chain, *cmd) - - def _dnsmasq_cmd(net): """Builds dnsmasq command""" cmd = ['sudo', '-E', 'dnsmasq', @@ -509,3 +765,12 @@ def _ra_pid_for(bridge): if os.path.exists(pid_file): with open(pid_file, 'r') as f: return int(f.read()) + + +def _ip_bridge_cmd(action, params, device): + """Build commands to add/del ips to bridges/devices""" + + cmd = ['sudo', 'ip', 'addr', action] + cmd.extend(params) + cmd.extend(['dev', device]) + return cmd diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index 66e46c1b9..ed3dfe1c0 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -26,10 +26,14 @@ import datetime from nova import db from nova import exception from nova import flags +from nova import log as logging +from nova import rpc +from nova.compute import power_state FLAGS = flags.FLAGS flags.DEFINE_integer('service_down_time', 60, 'maximum time since last checkin for up service') +flags.DECLARE('instances_path', 'nova.compute.manager') class NoValidHost(exception.Error): @@ -64,3 +68,236 @@ class Scheduler(object): def schedule(self, context, topic, *_args, **_kwargs): """Must override at least this method for scheduler to work.""" raise NotImplementedError(_("Must implement a fallback schedule")) + + def schedule_live_migration(self, context, instance_id, dest): + """Live migration scheduling method. + + :param context: + :param instance_id: + :param dest: destination host + :return: + The host where instance is running currently. + Then scheduler send request that host. + + """ + + # Whether instance exists and is running. + instance_ref = db.instance_get(context, instance_id) + + # Checking instance. + self._live_migration_src_check(context, instance_ref) + + # Checking destination host. + self._live_migration_dest_check(context, instance_ref, dest) + + # Common checking. + self._live_migration_common_check(context, instance_ref, dest) + + # Changing instance_state. + db.instance_set_state(context, + instance_id, + power_state.PAUSED, + 'migrating') + + # Changing volume state + for volume_ref in instance_ref['volumes']: + db.volume_update(context, + volume_ref['id'], + {'status': 'migrating'}) + + # Return value is necessary to send request to src + # Check _schedule() in detail. + src = instance_ref['host'] + return src + + def _live_migration_src_check(self, context, instance_ref): + """Live migration check routine (for src host). + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + + """ + + # Checking instance is running. + if (power_state.RUNNING != instance_ref['state'] or \ + 'running' != instance_ref['state_description']): + ec2_id = instance_ref['hostname'] + raise exception.Invalid(_('Instance(%s) is not running') % ec2_id) + + # Checing volume node is running when any volumes are mounted + # to the instance. + if len(instance_ref['volumes']) != 0: + services = db.service_get_all_by_topic(context, 'volume') + if len(services) < 1 or not self.service_is_up(services[0]): + raise exception.Invalid(_("volume node is not alive" + "(time synchronize problem?)")) + + # Checking src host exists and compute node + src = instance_ref['host'] + services = db.service_get_all_compute_by_host(context, src) + + # Checking src host is alive. + if not self.service_is_up(services[0]): + raise exception.Invalid(_("%s is not alive(time " + "synchronize problem?)") % src) + + def _live_migration_dest_check(self, context, instance_ref, dest): + """Live migration check routine (for destination host). + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + # Checking dest exists and compute node. + dservice_refs = db.service_get_all_compute_by_host(context, dest) + dservice_ref = dservice_refs[0] + + # Checking dest host is alive. + if not self.service_is_up(dservice_ref): + raise exception.Invalid(_("%s is not alive(time " + "synchronize problem?)") % dest) + + # Checking whether The host where instance is running + # and dest is not same. + src = instance_ref['host'] + if dest == src: + ec2_id = instance_ref['hostname'] + raise exception.Invalid(_("%(dest)s is where %(ec2_id)s is " + "running now. choose other host.") + % locals()) + + # Checking dst host still has enough capacities. + self.assert_compute_node_has_enough_resources(context, + instance_ref, + dest) + + def _live_migration_common_check(self, context, instance_ref, dest): + """Live migration common check routine. + + Below checkings are followed by + http://wiki.libvirt.org/page/TodoPreMigrationChecks + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + # Checking shared storage connectivity + self.mounted_on_same_shared_storage(context, instance_ref, dest) + + # Checking dest exists. + dservice_refs = db.service_get_all_compute_by_host(context, dest) + dservice_ref = dservice_refs[0]['compute_node'][0] + + # Checking original host( where instance was launched at) exists. + try: + oservice_refs = db.service_get_all_compute_by_host(context, + instance_ref['launched_on']) + except exception.NotFound: + raise exception.Invalid(_("host %s where instance was launched " + "does not exist.") + % instance_ref['launched_on']) + oservice_ref = oservice_refs[0]['compute_node'][0] + + # Checking hypervisor is same. + orig_hypervisor = oservice_ref['hypervisor_type'] + dest_hypervisor = dservice_ref['hypervisor_type'] + if orig_hypervisor != dest_hypervisor: + raise exception.Invalid(_("Different hypervisor type" + "(%(orig_hypervisor)s->" + "%(dest_hypervisor)s)')" % locals())) + + # Checkng hypervisor version. + orig_hypervisor = oservice_ref['hypervisor_version'] + dest_hypervisor = dservice_ref['hypervisor_version'] + if orig_hypervisor > dest_hypervisor: + raise exception.Invalid(_("Older hypervisor version" + "(%(orig_hypervisor)s->" + "%(dest_hypervisor)s)") % locals()) + + # Checking cpuinfo. + try: + rpc.call(context, + db.queue_get_for(context, FLAGS.compute_topic, dest), + {"method": 'compare_cpu', + "args": {'cpu_info': oservice_ref['cpu_info']}}) + + except rpc.RemoteError: + src = instance_ref['host'] + logging.exception(_("host %(dest)s is not compatible with " + "original host %(src)s.") % locals()) + raise + + def assert_compute_node_has_enough_resources(self, context, + instance_ref, dest): + """Checks if destination host has enough resource for live migration. + + Currently, only memory checking has been done. + If storage migration(block migration, meaning live-migration + without any shared storage) will be available, local storage + checking is also necessary. + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + # Getting instance information + ec2_id = instance_ref['hostname'] + + # Getting host information + service_refs = db.service_get_all_compute_by_host(context, dest) + compute_node_ref = service_refs[0]['compute_node'][0] + + mem_total = int(compute_node_ref['memory_mb']) + mem_used = int(compute_node_ref['memory_mb_used']) + mem_avail = mem_total - mem_used + mem_inst = instance_ref['memory_mb'] + if mem_avail <= mem_inst: + raise exception.NotEmpty(_("Unable to migrate %(ec2_id)s " + "to destination: %(dest)s " + "(host:%(mem_avail)s " + "<= instance:%(mem_inst)s)") + % locals()) + + def mounted_on_same_shared_storage(self, context, instance_ref, dest): + """Check if the src and dest host mount same shared storage. + + At first, dest host creates temp file, and src host can see + it if they mounts same shared storage. Then src host erase it. + + :param context: security context + :param instance_ref: nova.db.sqlalchemy.models.Instance object + :param dest: destination host + + """ + + src = instance_ref['host'] + dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest) + src_t = db.queue_get_for(context, FLAGS.compute_topic, src) + + try: + # create tmpfile at dest host + filename = rpc.call(context, dst_t, + {"method": 'create_shared_storage_test_file'}) + + # make sure existence at src host. + rpc.call(context, src_t, + {"method": 'check_shared_storage_test_file', + "args": {'filename': filename}}) + + except rpc.RemoteError: + ipath = FLAGS.instances_path + logging.error(_("Cannot confirm tmpfile at %(ipath)s is on " + "same shared storage between %(src)s " + "and %(dest)s.") % locals()) + raise + + finally: + rpc.call(context, dst_t, + {"method": 'cleanup_shared_storage_test_file', + "args": {'filename': filename}}) diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index c94397210..053a53356 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -77,3 +77,55 @@ class SchedulerManager(manager.Manager): {"method": method, "args": kwargs}) LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals()) + + # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin. + # Based on bexar design summit discussion, + # just put this here for bexar release. + def show_host_resources(self, context, host, *args): + """Shows the physical/usage resource given by hosts. + + :param context: security context + :param host: hostname + :returns: + example format is below. + {'resource':D, 'usage':{proj_id1:D, proj_id2:D}} + D: {'vcpus':3, 'memory_mb':2048, 'local_gb':2048} + + """ + + compute_ref = db.service_get_all_compute_by_host(context, host) + compute_ref = compute_ref[0] + + # Getting physical resource information + compute_node_ref = compute_ref['compute_node'][0] + resource = {'vcpus': compute_node_ref['vcpus'], + 'memory_mb': compute_node_ref['memory_mb'], + 'local_gb': compute_node_ref['local_gb'], + 'vcpus_used': compute_node_ref['vcpus_used'], + 'memory_mb_used': compute_node_ref['memory_mb_used'], + 'local_gb_used': compute_node_ref['local_gb_used']} + + # Getting usage resource information + usage = {} + instance_refs = db.instance_get_all_by_host(context, + compute_ref['host']) + if not instance_refs: + return {'resource': resource, 'usage': usage} + + project_ids = [i['project_id'] for i in instance_refs] + project_ids = list(set(project_ids)) + for project_id in project_ids: + vcpus = db.instance_get_vcpu_sum_by_host_and_project(context, + host, + project_id) + mem = db.instance_get_memory_sum_by_host_and_project(context, + host, + project_id) + hdd = db.instance_get_disk_sum_by_host_and_project(context, + host, + project_id) + usage[project_id] = {'vcpus': int(vcpus), + 'memory_mb': int(mem), + 'local_gb': int(hdd)} + + return {'resource': resource, 'usage': usage} diff --git a/nova/service.py b/nova/service.py index af20db01c..d60df987c 100644 --- a/nova/service.py +++ b/nova/service.py @@ -92,6 +92,9 @@ class Service(object): except exception.NotFound: self._create_service_ref(ctxt) + if 'nova-compute' == self.binary: + self.manager.update_available_resource(ctxt) + conn1 = rpc.Connection.instance(new=True) conn2 = rpc.Connection.instance(new=True) if self.report_interval: diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index 2c4e57246..e50d11a3d 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -27,7 +27,6 @@ from paste import urlmap from glance import client as glance_client from glance.common import exception as glance_exc -from nova import auth from nova import context from nova import exception as exc from nova import flags @@ -36,6 +35,7 @@ import nova.api.openstack.auth from nova.api import openstack from nova.api.openstack import auth from nova.api.openstack import ratelimiting +from nova.auth.manager import User, Project from nova.image import glance from nova.image import local from nova.image import service @@ -229,19 +229,97 @@ class FakeAuthDatabase(object): class FakeAuthManager(object): auth_data = {} + projects = {} + + @classmethod + def clear_fakes(cls): + cls.auth_data = {} + cls.projects = {} + + @classmethod + def reset_fake_data(cls): + cls.auth_data = dict(acc1=User('guy1', 'guy1', 'acc1', + 'fortytwo!', False)) + cls.projects = dict(testacct=Project('testacct', + 'testacct', + 'guy1', + 'test', + [])) def add_user(self, key, user): FakeAuthManager.auth_data[key] = user + def get_users(self): + return FakeAuthManager.auth_data.values() + def get_user(self, uid): for k, v in FakeAuthManager.auth_data.iteritems(): if v.id == uid: return v return None - def get_project(self, pid): + def delete_user(self, uid): + for k, v in FakeAuthManager.auth_data.items(): + if v.id == uid: + del FakeAuthManager.auth_data[k] return None + def create_user(self, name, access=None, secret=None, admin=False): + u = User(name, name, access, secret, admin) + FakeAuthManager.auth_data[access] = u + return u + + def modify_user(self, user_id, access=None, secret=None, admin=None): + user = None + for k, v in FakeAuthManager.auth_data.iteritems(): + if v.id == user_id: + user = v + if user: + user.access = access + user.secret = secret + if admin is not None: + user.admin = admin + + def is_admin(self, user): + return user.admin + + def is_project_member(self, user, project): + return ((user.id in project.member_ids) or + (user.id == project.project_manager_id)) + + def create_project(self, name, manager_user, description=None, + member_users=None): + member_ids = [User.safe_id(m) for m in member_users] \ + if member_users else [] + p = Project(name, name, User.safe_id(manager_user), + description, member_ids) + FakeAuthManager.projects[name] = p + return p + + def delete_project(self, pid): + if pid in FakeAuthManager.projects: + del FakeAuthManager.projects[pid] + + def modify_project(self, project, manager_user=None, description=None): + p = FakeAuthManager.projects.get(project) + p.project_manager_id = User.safe_id(manager_user) + p.description = description + + def get_project(self, pid): + p = FakeAuthManager.projects.get(pid) + if p: + return p + else: + raise exc.NotFound + + def get_projects(self, user=None): + if not user: + return FakeAuthManager.projects.values() + else: + return [p for p in FakeAuthManager.projects.values() + if (user.id in p.member_ids) or + (user.id == p.project_manager_id)] + def get_user_from_access_key(self, key): return FakeAuthManager.auth_data.get(key, None) diff --git a/nova/tests/api/openstack/test_accounts.py b/nova/tests/api/openstack/test_accounts.py new file mode 100644 index 000000000..60edce769 --- /dev/null +++ b/nova/tests/api/openstack/test_accounts.py @@ -0,0 +1,125 @@ +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import json + +import stubout +import webob + +import nova.api +import nova.api.openstack.auth +from nova import context +from nova import flags +from nova import test +from nova.auth.manager import User +from nova.tests.api.openstack import fakes + + +FLAGS = flags.FLAGS +FLAGS.verbose = True + + +def fake_init(self): + self.manager = fakes.FakeAuthManager() + + +def fake_admin_check(self, req): + return True + + +class AccountsTest(test.TestCase): + def setUp(self): + super(AccountsTest, self).setUp() + self.stubs = stubout.StubOutForTesting() + self.stubs.Set(nova.api.openstack.accounts.Controller, '__init__', + fake_init) + self.stubs.Set(nova.api.openstack.accounts.Controller, '_check_admin', + fake_admin_check) + fakes.FakeAuthManager.clear_fakes() + fakes.FakeAuthDatabase.data = {} + fakes.stub_out_networking(self.stubs) + fakes.stub_out_rate_limiting(self.stubs) + fakes.stub_out_auth(self.stubs) + + self.allow_admin = FLAGS.allow_admin_api + FLAGS.allow_admin_api = True + fakemgr = fakes.FakeAuthManager() + joeuser = User('guy1', 'guy1', 'acc1', 'fortytwo!', False) + superuser = User('guy2', 'guy2', 'acc2', 'swordfish', True) + fakemgr.add_user(joeuser.access, joeuser) + fakemgr.add_user(superuser.access, superuser) + fakemgr.create_project('test1', joeuser) + fakemgr.create_project('test2', superuser) + + def tearDown(self): + self.stubs.UnsetAll() + FLAGS.allow_admin_api = self.allow_admin + super(AccountsTest, self).tearDown() + + def test_get_account(self): + req = webob.Request.blank('/v1.0/accounts/test1') + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + + self.assertEqual(res_dict['account']['id'], 'test1') + self.assertEqual(res_dict['account']['name'], 'test1') + self.assertEqual(res_dict['account']['manager'], 'guy1') + self.assertEqual(res.status_int, 200) + + def test_account_delete(self): + req = webob.Request.blank('/v1.0/accounts/test1') + req.method = 'DELETE' + res = req.get_response(fakes.wsgi_app()) + self.assertTrue('test1' not in fakes.FakeAuthManager.projects) + self.assertEqual(res.status_int, 200) + + def test_account_create(self): + body = dict(account=dict(description='test account', + manager='guy1')) + req = webob.Request.blank('/v1.0/accounts/newacct') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_dict['account']['id'], 'newacct') + self.assertEqual(res_dict['account']['name'], 'newacct') + self.assertEqual(res_dict['account']['description'], 'test account') + self.assertEqual(res_dict['account']['manager'], 'guy1') + self.assertTrue('newacct' in + fakes.FakeAuthManager.projects) + self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3) + + def test_account_update(self): + body = dict(account=dict(description='test account', + manager='guy2')) + req = webob.Request.blank('/v1.0/accounts/test1') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_dict['account']['id'], 'test1') + self.assertEqual(res_dict['account']['name'], 'test1') + self.assertEqual(res_dict['account']['description'], 'test account') + self.assertEqual(res_dict['account']['manager'], 'guy2') + self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2) diff --git a/nova/tests/api/openstack/test_adminapi.py b/nova/tests/api/openstack/test_adminapi.py index dfce1b127..4568cb9f5 100644 --- a/nova/tests/api/openstack/test_adminapi.py +++ b/nova/tests/api/openstack/test_adminapi.py @@ -35,7 +35,7 @@ class AdminAPITest(test.TestCase): def setUp(self): super(AdminAPITest, self).setUp() self.stubs = stubout.StubOutForTesting() - fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthManager.reset_fake_data() fakes.FakeAuthDatabase.data = {} fakes.stub_out_networking(self.stubs) fakes.stub_out_rate_limiting(self.stubs) diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py index ff8d42a14..aaaa4e415 100644 --- a/nova/tests/api/openstack/test_auth.py +++ b/nova/tests/api/openstack/test_auth.py @@ -65,7 +65,9 @@ class Test(test.TestCase): def test_authorize_token(self): f = fakes.FakeAuthManager() - f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None)) + u = nova.auth.manager.User(1, 'herp', None, None, None) + f.add_user('derp', u) + f.create_project('test', u) req = webob.Request.blank('/v1.0/', {'HTTP_HOST': 'foo'}) req.headers['X-Auth-User'] = 'herp' @@ -176,7 +178,9 @@ class TestLimiter(test.TestCase): def test_authorize_token(self): f = fakes.FakeAuthManager() - f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None)) + u = nova.auth.manager.User(1, 'herp', None, None, None) + f.add_user('derp', u) + f.create_project('test', u) req = webob.Request.blank('/v1.0/') req.headers['X-Auth-User'] = 'herp' diff --git a/nova/tests/api/openstack/test_flavors.py b/nova/tests/api/openstack/test_flavors.py index 319767bb5..8280a505f 100644 --- a/nova/tests/api/openstack/test_flavors.py +++ b/nova/tests/api/openstack/test_flavors.py @@ -30,7 +30,7 @@ class FlavorsTest(test.TestCase): def setUp(self): super(FlavorsTest, self).setUp() self.stubs = stubout.StubOutForTesting() - fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthManager.reset_fake_data() fakes.FakeAuthDatabase.data = {} fakes.stub_out_networking(self.stubs) fakes.stub_out_rate_limiting(self.stubs) diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py index eb5039bdb..76f758929 100644 --- a/nova/tests/api/openstack/test_images.py +++ b/nova/tests/api/openstack/test_images.py @@ -205,7 +205,7 @@ class ImageControllerWithGlanceServiceTest(test.TestCase): self.orig_image_service = FLAGS.image_service FLAGS.image_service = 'nova.image.glance.GlanceImageService' self.stubs = stubout.StubOutForTesting() - fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthManager.reset_fake_data() fakes.FakeAuthDatabase.data = {} fakes.stub_out_networking(self.stubs) fakes.stub_out_rate_limiting(self.stubs) diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py index c1e05b18a..5d7a208e9 100644 --- a/nova/tests/api/openstack/test_servers.py +++ b/nova/tests/api/openstack/test_servers.py @@ -120,7 +120,7 @@ class ServersTest(test.TestCase): def setUp(self): super(ServersTest, self).setUp() self.stubs = stubout.StubOutForTesting() - fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthManager.reset_fake_data() fakes.FakeAuthDatabase.data = {} fakes.stub_out_networking(self.stubs) fakes.stub_out_rate_limiting(self.stubs) @@ -440,7 +440,8 @@ class ServersTest(test.TestCase): body = dict(server=dict( name='server_test', imageId=2, flavorId=2, metadata={}, personality={})) - req = webob.Request.blank('/v1.0/servers/1/inject_network_info') + req = webob.Request.blank( + '/v1.0/servers/1/inject_network_info') req.method = 'POST' req.content_type = 'application/json' req.body = json.dumps(body) diff --git a/nova/tests/api/openstack/test_users.py b/nova/tests/api/openstack/test_users.py new file mode 100644 index 000000000..2dda4319b --- /dev/null +++ b/nova/tests/api/openstack/test_users.py @@ -0,0 +1,141 @@ +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +import stubout +import webob + +import nova.api +import nova.api.openstack.auth +from nova import context +from nova import flags +from nova import test +from nova.auth.manager import User, Project +from nova.tests.api.openstack import fakes + + +FLAGS = flags.FLAGS +FLAGS.verbose = True + + +def fake_init(self): + self.manager = fakes.FakeAuthManager() + + +def fake_admin_check(self, req): + return True + + +class UsersTest(test.TestCase): + def setUp(self): + super(UsersTest, self).setUp() + self.stubs = stubout.StubOutForTesting() + self.stubs.Set(nova.api.openstack.users.Controller, '__init__', + fake_init) + self.stubs.Set(nova.api.openstack.users.Controller, '_check_admin', + fake_admin_check) + fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthManager.projects = dict(testacct=Project('testacct', + 'testacct', + 'guy1', + 'test', + [])) + fakes.FakeAuthDatabase.data = {} + fakes.stub_out_networking(self.stubs) + fakes.stub_out_rate_limiting(self.stubs) + fakes.stub_out_auth(self.stubs) + + self.allow_admin = FLAGS.allow_admin_api + FLAGS.allow_admin_api = True + fakemgr = fakes.FakeAuthManager() + fakemgr.add_user('acc1', User('guy1', 'guy1', 'acc1', + 'fortytwo!', False)) + fakemgr.add_user('acc2', User('guy2', 'guy2', 'acc2', + 'swordfish', True)) + + def tearDown(self): + self.stubs.UnsetAll() + FLAGS.allow_admin_api = self.allow_admin + super(UsersTest, self).tearDown() + + def test_get_user_list(self): + req = webob.Request.blank('/v1.0/users') + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(len(res_dict['users']), 2) + + def test_get_user_by_id(self): + req = webob.Request.blank('/v1.0/users/guy2') + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + + self.assertEqual(res_dict['user']['id'], 'guy2') + self.assertEqual(res_dict['user']['name'], 'guy2') + self.assertEqual(res_dict['user']['secret'], 'swordfish') + self.assertEqual(res_dict['user']['admin'], True) + self.assertEqual(res.status_int, 200) + + def test_user_delete(self): + req = webob.Request.blank('/v1.0/users/guy1') + req.method = 'DELETE' + res = req.get_response(fakes.wsgi_app()) + self.assertTrue('guy1' not in [u.id for u in + fakes.FakeAuthManager.auth_data.values()]) + self.assertEqual(res.status_int, 200) + + def test_user_create(self): + body = dict(user=dict(name='test_guy', + access='acc3', + secret='invasionIsInNormandy', + admin=True)) + req = webob.Request.blank('/v1.0/users') + req.headers["Content-Type"] = "application/json" + req.method = 'POST' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_dict['user']['id'], 'test_guy') + self.assertEqual(res_dict['user']['name'], 'test_guy') + self.assertEqual(res_dict['user']['access'], 'acc3') + self.assertEqual(res_dict['user']['secret'], 'invasionIsInNormandy') + self.assertEqual(res_dict['user']['admin'], True) + self.assertTrue('test_guy' in [u.id for u in + fakes.FakeAuthManager.auth_data.values()]) + self.assertEqual(len(fakes.FakeAuthManager.auth_data.values()), 3) + + def test_user_update(self): + body = dict(user=dict(name='guy2', + access='acc2', + secret='invasionIsInNormandy')) + req = webob.Request.blank('/v1.0/users/guy2') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_dict = json.loads(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_dict['user']['id'], 'guy2') + self.assertEqual(res_dict['user']['name'], 'guy2') + self.assertEqual(res_dict['user']['access'], 'acc2') + self.assertEqual(res_dict['user']['secret'], 'invasionIsInNormandy') + self.assertEqual(res_dict['user']['admin'], True) diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py index 4f4fabf12..38399bb3f 100644 --- a/nova/tests/api/openstack/test_zones.py +++ b/nova/tests/api/openstack/test_zones.py @@ -58,7 +58,7 @@ def zone_get_all_scheduler(*args): dict(id=1, api_url='http://example.com', username='bob', password='xxx'), dict(id=2, api_url='http://example.org', username='alice', - password='qwerty') + password='qwerty'), ] @@ -71,7 +71,7 @@ def zone_get_all_db(context): dict(id=1, api_url='http://example.com', username='bob', password='xxx'), dict(id=2, api_url='http://example.org', username='alice', - password='qwerty') + password='qwerty'), ] @@ -79,7 +79,7 @@ class ZonesTest(test.TestCase): def setUp(self): super(ZonesTest, self).setUp() self.stubs = stubout.StubOutForTesting() - fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthManager.reset_fake_data() fakes.FakeAuthDatabase.data = {} fakes.stub_out_networking(self.stubs) fakes.stub_out_rate_limiting(self.stubs) diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py index d760dc456..142f6b1c6 100644 --- a/nova/tests/db/fakes.py +++ b/nova/tests/db/fakes.py @@ -77,6 +77,7 @@ def stub_out_db_instance_api(stubs): 'mac_address': values['mac_address'], 'vcpus': type_data['vcpus'], 'local_gb': type_data['local_gb'], + 'os_type': values['os_type'] } return FakeModel(base_options) diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index 643b2e93a..e486050be 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -20,6 +20,7 @@ Tests For Compute """ import datetime +import mox from nova import compute from nova import context @@ -27,15 +28,20 @@ from nova import db from nova import exception from nova import flags from nova import log as logging +from nova import rpc from nova import test from nova import utils from nova.auth import manager from nova.compute import instance_types +from nova.compute import manager as compute_manager +from nova.compute import power_state +from nova.db.sqlalchemy import models from nova.image import local LOG = logging.getLogger('nova.tests.compute') FLAGS = flags.FLAGS flags.DECLARE('stub_network', 'nova.compute.manager') +flags.DECLARE('live_migration_retry_count', 'nova.compute.manager') class ComputeTestCase(test.TestCase): @@ -83,6 +89,41 @@ class ComputeTestCase(test.TestCase): 'project_id': self.project.id} return db.security_group_create(self.context, values) + def _get_dummy_instance(self): + """Get mock-return-value instance object + Use this when any testcase executed later than test_run_terminate + """ + vol1 = models.Volume() + vol1['id'] = 1 + vol2 = models.Volume() + vol2['id'] = 2 + instance_ref = models.Instance() + instance_ref['id'] = 1 + instance_ref['volumes'] = [vol1, vol2] + instance_ref['hostname'] = 'i-00000001' + instance_ref['host'] = 'dummy' + return instance_ref + + def test_create_instance_defaults_display_name(self): + """Verify that an instance cannot be created without a display_name.""" + cases = [dict(), dict(display_name=None)] + for instance in cases: + ref = self.compute_api.create(self.context, + FLAGS.default_instance_type, None, **instance) + try: + self.assertNotEqual(ref[0]['display_name'], None) + finally: + db.instance_destroy(self.context, ref[0]['id']) + + def test_create_instance_associates_security_groups(self): + """Make sure create associates security groups""" + group = self._create_group() + instance_ref = models.Instance() + instance_ref['id'] = 1 + instance_ref['volumes'] = [{'id': 1}, {'id': 2}] + instance_ref['hostname'] = 'i-00000001' + return instance_ref + def test_create_instance_defaults_display_name(self): """Verify that an instance cannot be created without a display_name.""" cases = [dict(), dict(display_name=None)] @@ -301,3 +342,256 @@ class ComputeTestCase(test.TestCase): self.compute.terminate_instance(self.context, instance_id) type = instance_types.get_by_flavor_id("1") self.assertEqual(type, 'm1.tiny') + + def _setup_other_managers(self): + self.volume_manager = utils.import_object(FLAGS.volume_manager) + self.network_manager = utils.import_object(FLAGS.network_manager) + self.compute_driver = utils.import_object(FLAGS.compute_driver) + + def test_pre_live_migration_instance_has_no_fixed_ip(self): + """Confirm raising exception if instance doesn't have fixed_ip.""" + instance_ref = self._get_dummy_instance() + c = context.get_admin_context() + i_id = instance_ref['id'] + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_id).AndReturn(instance_ref) + dbmock.instance_get_fixed_address(c, i_id).AndReturn(None) + + self.compute.db = dbmock + self.mox.ReplayAll() + self.assertRaises(exception.NotFound, + self.compute.pre_live_migration, + c, instance_ref['id']) + + def test_pre_live_migration_instance_has_volume(self): + """Confirm setup_compute_volume is called when volume is mounted.""" + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + + self._setup_other_managers() + dbmock = self.mox.CreateMock(db) + volmock = self.mox.CreateMock(self.volume_manager) + netmock = self.mox.CreateMock(self.network_manager) + drivermock = self.mox.CreateMock(self.compute_driver) + + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy') + for i in range(len(i_ref['volumes'])): + vid = i_ref['volumes'][i]['id'] + volmock.setup_compute_volume(c, vid).InAnyOrder('g1') + netmock.setup_compute_network(c, i_ref['id']) + drivermock.ensure_filtering_rules_for_instance(i_ref) + + self.compute.db = dbmock + self.compute.volume_manager = volmock + self.compute.network_manager = netmock + self.compute.driver = drivermock + + self.mox.ReplayAll() + ret = self.compute.pre_live_migration(c, i_ref['id']) + self.assertEqual(ret, None) + + def test_pre_live_migration_instance_has_no_volume(self): + """Confirm log meg when instance doesn't mount any volumes.""" + i_ref = self._get_dummy_instance() + i_ref['volumes'] = [] + c = context.get_admin_context() + + self._setup_other_managers() + dbmock = self.mox.CreateMock(db) + netmock = self.mox.CreateMock(self.network_manager) + drivermock = self.mox.CreateMock(self.compute_driver) + + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy') + self.mox.StubOutWithMock(compute_manager.LOG, 'info') + compute_manager.LOG.info(_("%s has no volume."), i_ref['hostname']) + netmock.setup_compute_network(c, i_ref['id']) + drivermock.ensure_filtering_rules_for_instance(i_ref) + + self.compute.db = dbmock + self.compute.network_manager = netmock + self.compute.driver = drivermock + + self.mox.ReplayAll() + ret = self.compute.pre_live_migration(c, i_ref['id']) + self.assertEqual(ret, None) + + def test_pre_live_migration_setup_compute_node_fail(self): + """Confirm operation setup_compute_network() fails. + + It retries and raise exception when timeout exceeded. + + """ + + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + + self._setup_other_managers() + dbmock = self.mox.CreateMock(db) + netmock = self.mox.CreateMock(self.network_manager) + volmock = self.mox.CreateMock(self.volume_manager) + + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy') + for i in range(len(i_ref['volumes'])): + volmock.setup_compute_volume(c, i_ref['volumes'][i]['id']) + for i in range(FLAGS.live_migration_retry_count): + netmock.setup_compute_network(c, i_ref['id']).\ + AndRaise(exception.ProcessExecutionError()) + + self.compute.db = dbmock + self.compute.network_manager = netmock + self.compute.volume_manager = volmock + + self.mox.ReplayAll() + self.assertRaises(exception.ProcessExecutionError, + self.compute.pre_live_migration, + c, i_ref['id']) + + def test_live_migration_works_correctly_with_volume(self): + """Confirm check_for_export to confirm volume health check.""" + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + self.mox.StubOutWithMock(rpc, 'call') + rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export", + "args": {'instance_id': i_ref['id']}}) + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}) + self.mox.StubOutWithMock(self.compute.driver, 'live_migration') + self.compute.driver.live_migration(c, i_ref, i_ref['host'], + self.compute.post_live_migration, + self.compute.recover_live_migration) + + self.compute.db = dbmock + self.mox.ReplayAll() + ret = self.compute.live_migration(c, i_ref['id'], i_ref['host']) + self.assertEqual(ret, None) + + def test_live_migration_dest_raises_exception(self): + """Confirm exception when pre_live_migration fails.""" + i_ref = self._get_dummy_instance() + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + self.mox.StubOutWithMock(rpc, 'call') + rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export", + "args": {'instance_id': i_ref['id']}}) + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}).\ + AndRaise(rpc.RemoteError('', '', '')) + dbmock.instance_update(c, i_ref['id'], {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': i_ref['host']}) + for v in i_ref['volumes']: + dbmock.volume_update(c, v['id'], {'status': 'in-use'}) + + self.compute.db = dbmock + self.mox.ReplayAll() + self.assertRaises(rpc.RemoteError, + self.compute.live_migration, + c, i_ref['id'], i_ref['host']) + + def test_live_migration_dest_raises_exception_no_volume(self): + """Same as above test(input pattern is different) """ + i_ref = self._get_dummy_instance() + i_ref['volumes'] = [] + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + self.mox.StubOutWithMock(rpc, 'call') + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}).\ + AndRaise(rpc.RemoteError('', '', '')) + dbmock.instance_update(c, i_ref['id'], {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': i_ref['host']}) + + self.compute.db = dbmock + self.mox.ReplayAll() + self.assertRaises(rpc.RemoteError, + self.compute.live_migration, + c, i_ref['id'], i_ref['host']) + + def test_live_migration_works_correctly_no_volume(self): + """Confirm live_migration() works as expected correctly.""" + i_ref = self._get_dummy_instance() + i_ref['volumes'] = [] + c = context.get_admin_context() + topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) + + dbmock = self.mox.CreateMock(db) + dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) + self.mox.StubOutWithMock(rpc, 'call') + dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\ + AndReturn(topic) + rpc.call(c, topic, {"method": "pre_live_migration", + "args": {'instance_id': i_ref['id']}}) + self.mox.StubOutWithMock(self.compute.driver, 'live_migration') + self.compute.driver.live_migration(c, i_ref, i_ref['host'], + self.compute.post_live_migration, + self.compute.recover_live_migration) + + self.compute.db = dbmock + self.mox.ReplayAll() + ret = self.compute.live_migration(c, i_ref['id'], i_ref['host']) + self.assertEqual(ret, None) + + def test_post_live_migration_working_correctly(self): + """Confirm post_live_migration() works as expected correctly.""" + dest = 'desthost' + flo_addr = '1.2.1.2' + + # Preparing datas + c = context.get_admin_context() + instance_id = self._create_instance() + i_ref = db.instance_get(c, instance_id) + db.instance_update(c, i_ref['id'], {'state_description': 'migrating', + 'state': power_state.PAUSED}) + v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id}) + fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1', + 'instance_id': instance_id}) + fix_ref = db.fixed_ip_get_by_address(c, fix_addr) + flo_ref = db.floating_ip_create(c, {'address': flo_addr, + 'fixed_ip_id': fix_ref['id']}) + # reload is necessary before setting mocks + i_ref = db.instance_get(c, instance_id) + + # Preparing mocks + self.mox.StubOutWithMock(self.compute.volume_manager, + 'remove_compute_volume') + for v in i_ref['volumes']: + self.compute.volume_manager.remove_compute_volume(c, v['id']) + self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance') + self.compute.driver.unfilter_instance(i_ref) + + # executing + self.mox.ReplayAll() + ret = self.compute.post_live_migration(c, i_ref, dest) + + # make sure every data is rewritten to dest + i_ref = db.instance_get(c, i_ref['id']) + c1 = (i_ref['host'] == dest) + flo_refs = db.floating_ip_get_all_by_host(c, dest) + c2 = (len(flo_refs) != 0 and flo_refs[0]['address'] == flo_addr) + + # post operaton + self.assertTrue(c1 and c2) + db.instance_destroy(c, instance_id) + db.volume_destroy(c, v_ref['id']) + db.floating_ip_destroy(c, flo_addr) diff --git a/nova/tests/test_network.py b/nova/tests/test_network.py index 19099ff4c..1e634b388 100644 --- a/nova/tests/test_network.py +++ b/nova/tests/test_network.py @@ -20,6 +20,7 @@ Unit Tests for network code """ import IPy import os +import time from nova import context from nova import db @@ -29,11 +30,153 @@ from nova import log as logging from nova import test from nova import utils from nova.auth import manager +from nova.network import linux_net FLAGS = flags.FLAGS LOG = logging.getLogger('nova.tests.network') +class IptablesManagerTestCase(test.TestCase): + sample_filter = ['#Generated by iptables-save on Fri Feb 18 15:17:05 2011', + '*filter', + ':INPUT ACCEPT [2223527:305688874]', + ':FORWARD ACCEPT [0:0]', + ':OUTPUT ACCEPT [2172501:140856656]', + ':nova-compute-FORWARD - [0:0]', + ':nova-compute-INPUT - [0:0]', + ':nova-compute-local - [0:0]', + ':nova-compute-OUTPUT - [0:0]', + ':nova-filter-top - [0:0]', + '-A FORWARD -j nova-filter-top ', + '-A OUTPUT -j nova-filter-top ', + '-A nova-filter-top -j nova-compute-local ', + '-A INPUT -j nova-compute-INPUT ', + '-A OUTPUT -j nova-compute-OUTPUT ', + '-A FORWARD -j nova-compute-FORWARD ', + '-A INPUT -i virbr0 -p udp -m udp --dport 53 -j ACCEPT ', + '-A INPUT -i virbr0 -p tcp -m tcp --dport 53 -j ACCEPT ', + '-A INPUT -i virbr0 -p udp -m udp --dport 67 -j ACCEPT ', + '-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ', + '-A FORWARD -s 192.168.122.0/24 -i virbr0 -j ACCEPT ', + '-A FORWARD -i virbr0 -o virbr0 -j ACCEPT ', + '-A FORWARD -o virbr0 -j REJECT --reject-with ' + 'icmp-port-unreachable ', + '-A FORWARD -i virbr0 -j REJECT --reject-with ' + 'icmp-port-unreachable ', + 'COMMIT', + '# Completed on Fri Feb 18 15:17:05 2011'] + + sample_nat = ['# Generated by iptables-save on Fri Feb 18 15:17:05 2011', + '*nat', + ':PREROUTING ACCEPT [3936:762355]', + ':INPUT ACCEPT [2447:225266]', + ':OUTPUT ACCEPT [63491:4191863]', + ':POSTROUTING ACCEPT [63112:4108641]', + ':nova-compute-OUTPUT - [0:0]', + ':nova-compute-floating-ip-snat - [0:0]', + ':nova-compute-SNATTING - [0:0]', + ':nova-compute-PREROUTING - [0:0]', + ':nova-compute-POSTROUTING - [0:0]', + ':nova-postrouting-bottom - [0:0]', + '-A PREROUTING -j nova-compute-PREROUTING ', + '-A OUTPUT -j nova-compute-OUTPUT ', + '-A POSTROUTING -j nova-compute-POSTROUTING ', + '-A POSTROUTING -j nova-postrouting-bottom ', + '-A nova-postrouting-bottom -j nova-compute-SNATTING ', + '-A nova-compute-SNATTING -j nova-compute-floating-ip-snat ', + 'COMMIT', + '# Completed on Fri Feb 18 15:17:05 2011'] + + def setUp(self): + super(IptablesManagerTestCase, self).setUp() + self.manager = linux_net.IptablesManager() + + def test_filter_rules_are_wrapped(self): + current_lines = self.sample_filter + + table = self.manager.ipv4['filter'] + table.add_rule('FORWARD', '-s 1.2.3.4/5 -j DROP') + new_lines = self.manager._modify_rules(current_lines, table) + self.assertTrue('-A run_tests.py-FORWARD ' + '-s 1.2.3.4/5 -j DROP' in new_lines) + + table.remove_rule('FORWARD', '-s 1.2.3.4/5 -j DROP') + new_lines = self.manager._modify_rules(current_lines, table) + self.assertTrue('-A run_tests.py-FORWARD ' + '-s 1.2.3.4/5 -j DROP' not in new_lines) + + def test_nat_rules(self): + current_lines = self.sample_nat + new_lines = self.manager._modify_rules(current_lines, + self.manager.ipv4['nat']) + + for line in [':nova-compute-OUTPUT - [0:0]', + ':nova-compute-floating-ip-snat - [0:0]', + ':nova-compute-SNATTING - [0:0]', + ':nova-compute-PREROUTING - [0:0]', + ':nova-compute-POSTROUTING - [0:0]']: + self.assertTrue(line in new_lines, "One of nova-compute's chains " + "went missing.") + + seen_lines = set() + for line in new_lines: + line = line.strip() + self.assertTrue(line not in seen_lines, + "Duplicate line: %s" % line) + seen_lines.add(line) + + last_postrouting_line = '' + + for line in new_lines: + if line.startswith('-A POSTROUTING'): + last_postrouting_line = line + + self.assertTrue('-j nova-postrouting-bottom' in last_postrouting_line, + "Last POSTROUTING rule does not jump to " + "nova-postouting-bottom: %s" % last_postrouting_line) + + for chain in ['POSTROUTING', 'PREROUTING', 'OUTPUT']: + self.assertTrue('-A %s -j run_tests.py-%s' \ + % (chain, chain) in new_lines, + "Built-in chain %s not wrapped" % (chain,)) + + def test_filter_rules(self): + current_lines = self.sample_filter + new_lines = self.manager._modify_rules(current_lines, + self.manager.ipv4['filter']) + + for line in [':nova-compute-FORWARD - [0:0]', + ':nova-compute-INPUT - [0:0]', + ':nova-compute-local - [0:0]', + ':nova-compute-OUTPUT - [0:0]']: + self.assertTrue(line in new_lines, "One of nova-compute's chains" + " went missing.") + + seen_lines = set() + for line in new_lines: + line = line.strip() + self.assertTrue(line not in seen_lines, + "Duplicate line: %s" % line) + seen_lines.add(line) + + for chain in ['FORWARD', 'OUTPUT']: + for line in new_lines: + if line.startswith('-A %s' % chain): + self.assertTrue('-j nova-filter-top' in line, + "First %s rule does not " + "jump to nova-filter-top" % chain) + break + + self.assertTrue('-A nova-filter-top ' + '-j run_tests.py-local' in new_lines, + "nova-filter-top does not jump to wrapped local chain") + + for chain in ['INPUT', 'OUTPUT', 'FORWARD']: + self.assertTrue('-A %s -j run_tests.py-%s' \ + % (chain, chain) in new_lines, + "Built-in chain %s not wrapped" % (chain,)) + + class NetworkTestCase(test.TestCase): """Test cases for network code""" def setUp(self): @@ -321,6 +464,31 @@ class NetworkTestCase(test.TestCase): network['id']) self.assertEqual(ip_count, num_available_ips) + def test_dhcp_lease_output(self): + admin_ctxt = context.get_admin_context() + address = self._create_address(0, self.instance_id) + lease_ip(address) + network_ref = db.network_get_by_instance(admin_ctxt, self.instance_id) + leases = linux_net.get_dhcp_leases(context.get_admin_context(), + network_ref['id']) + for line in leases.split('\n'): + seconds, mac, ip, hostname, client_id = line.split(' ') + self.assertTrue(int(seconds) > time.time(), 'Lease expires in ' + 'the past') + octets = mac.split(':') + self.assertEqual(len(octets), 6, "Wrong number of octets " + "in %s" % (max,)) + for octet in octets: + self.assertEqual(len(octet), 2, "Oddly sized octet: %s" + % (octet,)) + # This will throw an exception if the octet is invalid + int(octet, 16) + + # And this will raise an exception in case of an invalid IP + IPy.IP(ip) + + release_ip(address) + def is_allocated_in_project(address, project_id): """Returns true if address is in specified project""" diff --git a/nova/tests/test_scheduler.py b/nova/tests/test_scheduler.py index bb279ac4b..244e43bd9 100644 --- a/nova/tests/test_scheduler.py +++ b/nova/tests/test_scheduler.py @@ -20,10 +20,12 @@ Tests For Scheduler """ import datetime +import mox from mox import IgnoreArg from nova import context from nova import db +from nova import exception from nova import flags from nova import service from nova import test @@ -32,11 +34,14 @@ from nova import utils from nova.auth import manager as auth_manager from nova.scheduler import manager from nova.scheduler import driver +from nova.compute import power_state +from nova.db.sqlalchemy import models FLAGS = flags.FLAGS flags.DECLARE('max_cores', 'nova.scheduler.simple') flags.DECLARE('stub_network', 'nova.compute.manager') +flags.DECLARE('instances_path', 'nova.compute.manager') class TestDriver(driver.Scheduler): @@ -54,6 +59,34 @@ class SchedulerTestCase(test.TestCase): super(SchedulerTestCase, self).setUp() self.flags(scheduler_driver='nova.tests.test_scheduler.TestDriver') + def _create_compute_service(self): + """Create compute-manager(ComputeNode and Service record).""" + ctxt = context.get_admin_context() + dic = {'host': 'dummy', 'binary': 'nova-compute', 'topic': 'compute', + 'report_count': 0, 'availability_zone': 'dummyzone'} + s_ref = db.service_create(ctxt, dic) + + dic = {'service_id': s_ref['id'], + 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100, + 'vcpus_used': 16, 'memory_mb_used': 32, 'local_gb_used': 10, + 'hypervisor_type': 'qemu', 'hypervisor_version': 12003, + 'cpu_info': ''} + db.compute_node_create(ctxt, dic) + + return db.service_get(ctxt, s_ref['id']) + + def _create_instance(self, **kwargs): + """Create a test instance""" + ctxt = context.get_admin_context() + inst = {} + inst['user_id'] = 'admin' + inst['project_id'] = kwargs.get('project_id', 'fake') + inst['host'] = kwargs.get('host', 'dummy') + inst['vcpus'] = kwargs.get('vcpus', 1) + inst['memory_mb'] = kwargs.get('memory_mb', 10) + inst['local_gb'] = kwargs.get('local_gb', 20) + return db.instance_create(ctxt, inst) + def test_fallback(self): scheduler = manager.SchedulerManager() self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) @@ -76,6 +109,73 @@ class SchedulerTestCase(test.TestCase): self.mox.ReplayAll() scheduler.named_method(ctxt, 'topic', num=7) + def test_show_host_resources_host_not_exit(self): + """A host given as an argument does not exists.""" + + scheduler = manager.SchedulerManager() + dest = 'dummydest' + ctxt = context.get_admin_context() + + try: + scheduler.show_host_resources(ctxt, dest) + except exception.NotFound, e: + c1 = (e.message.find(_("does not exist or is not a " + "compute node.")) >= 0) + self.assertTrue(c1) + + def _dic_is_equal(self, dic1, dic2, keys=None): + """Compares 2 dictionary contents(Helper method)""" + if not keys: + keys = ['vcpus', 'memory_mb', 'local_gb', + 'vcpus_used', 'memory_mb_used', 'local_gb_used'] + + for key in keys: + if not (dic1[key] == dic2[key]): + return False + return True + + def test_show_host_resources_no_project(self): + """No instance are running on the given host.""" + + scheduler = manager.SchedulerManager() + ctxt = context.get_admin_context() + s_ref = self._create_compute_service() + + result = scheduler.show_host_resources(ctxt, s_ref['host']) + + # result checking + c1 = ('resource' in result and 'usage' in result) + compute_node = s_ref['compute_node'][0] + c2 = self._dic_is_equal(result['resource'], compute_node) + c3 = result['usage'] == {} + self.assertTrue(c1 and c2 and c3) + db.service_destroy(ctxt, s_ref['id']) + + def test_show_host_resources_works_correctly(self): + """Show_host_resources() works correctly as expected.""" + + scheduler = manager.SchedulerManager() + ctxt = context.get_admin_context() + s_ref = self._create_compute_service() + i_ref1 = self._create_instance(project_id='p-01', host=s_ref['host']) + i_ref2 = self._create_instance(project_id='p-02', vcpus=3, + host=s_ref['host']) + + result = scheduler.show_host_resources(ctxt, s_ref['host']) + + c1 = ('resource' in result and 'usage' in result) + compute_node = s_ref['compute_node'][0] + c2 = self._dic_is_equal(result['resource'], compute_node) + c3 = result['usage'].keys() == ['p-01', 'p-02'] + keys = ['vcpus', 'memory_mb', 'local_gb'] + c4 = self._dic_is_equal(result['usage']['p-01'], i_ref1, keys) + c5 = self._dic_is_equal(result['usage']['p-02'], i_ref2, keys) + self.assertTrue(c1 and c2 and c3 and c4 and c5) + + db.service_destroy(ctxt, s_ref['id']) + db.instance_destroy(ctxt, i_ref1['id']) + db.instance_destroy(ctxt, i_ref2['id']) + class ZoneSchedulerTestCase(test.TestCase): """Test case for zone scheduler""" @@ -161,9 +261,15 @@ class SimpleDriverTestCase(test.TestCase): inst['project_id'] = self.project.id inst['instance_type'] = 'm1.tiny' inst['mac_address'] = utils.generate_mac() + inst['vcpus'] = kwargs.get('vcpus', 1) inst['ami_launch_index'] = 0 - inst['vcpus'] = 1 inst['availability_zone'] = kwargs.get('availability_zone', None) + inst['host'] = kwargs.get('host', 'dummy') + inst['memory_mb'] = kwargs.get('memory_mb', 20) + inst['local_gb'] = kwargs.get('local_gb', 30) + inst['launched_on'] = kwargs.get('launghed_on', 'dummy') + inst['state_description'] = kwargs.get('state_description', 'running') + inst['state'] = kwargs.get('state', power_state.RUNNING) return db.instance_create(self.context, inst)['id'] def _create_volume(self): @@ -173,6 +279,211 @@ class SimpleDriverTestCase(test.TestCase): vol['availability_zone'] = 'test' return db.volume_create(self.context, vol)['id'] + def _create_compute_service(self, **kwargs): + """Create a compute service.""" + + dic = {'binary': 'nova-compute', 'topic': 'compute', + 'report_count': 0, 'availability_zone': 'dummyzone'} + dic['host'] = kwargs.get('host', 'dummy') + s_ref = db.service_create(self.context, dic) + if 'created_at' in kwargs.keys() or 'updated_at' in kwargs.keys(): + t = datetime.datetime.utcnow() - datetime.timedelta(0) + dic['created_at'] = kwargs.get('created_at', t) + dic['updated_at'] = kwargs.get('updated_at', t) + db.service_update(self.context, s_ref['id'], dic) + + dic = {'service_id': s_ref['id'], + 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100, + 'vcpus_used': 16, 'local_gb_used': 10, + 'hypervisor_type': 'qemu', 'hypervisor_version': 12003, + 'cpu_info': ''} + dic['memory_mb_used'] = kwargs.get('memory_mb_used', 32) + dic['hypervisor_type'] = kwargs.get('hypervisor_type', 'qemu') + dic['hypervisor_version'] = kwargs.get('hypervisor_version', 12003) + db.compute_node_create(self.context, dic) + return db.service_get(self.context, s_ref['id']) + + def test_doesnt_report_disabled_hosts_as_up(self): + """Ensures driver doesn't find hosts before they are enabled""" + # NOTE(vish): constructing service without create method + # because we are going to use it without queue + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute') + s2 = db.service_get_by_args(self.context, 'host2', 'nova-compute') + db.service_update(self.context, s1['id'], {'disabled': True}) + db.service_update(self.context, s2['id'], {'disabled': True}) + hosts = self.scheduler.driver.hosts_up(self.context, 'compute') + self.assertEqual(0, len(hosts)) + compute1.kill() + compute2.kill() + + def test_reports_enabled_hosts_as_up(self): + """Ensures driver can find the hosts that are up""" + # NOTE(vish): constructing service without create method + # because we are going to use it without queue + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + hosts = self.scheduler.driver.hosts_up(self.context, 'compute') + self.assertEqual(2, len(hosts)) + compute1.kill() + compute2.kill() + + def test_least_busy_host_gets_instance(self): + """Ensures the host with less cores gets the next one""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + instance_id1 = self._create_instance() + compute1.run_instance(self.context, instance_id1) + instance_id2 = self._create_instance() + host = self.scheduler.driver.schedule_run_instance(self.context, + instance_id2) + self.assertEqual(host, 'host2') + compute1.terminate_instance(self.context, instance_id1) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + compute2.kill() + + def test_specific_host_gets_instance(self): + """Ensures if you set availability_zone it launches on that zone""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + instance_id1 = self._create_instance() + compute1.run_instance(self.context, instance_id1) + instance_id2 = self._create_instance(availability_zone='nova:host1') + host = self.scheduler.driver.schedule_run_instance(self.context, + instance_id2) + self.assertEqual('host1', host) + compute1.terminate_instance(self.context, instance_id1) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + compute2.kill() + + def test_wont_sechedule_if_specified_host_is_down(self): + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute') + now = datetime.datetime.utcnow() + delta = datetime.timedelta(seconds=FLAGS.service_down_time * 2) + past = now - delta + db.service_update(self.context, s1['id'], {'updated_at': past}) + instance_id2 = self._create_instance(availability_zone='nova:host1') + self.assertRaises(driver.WillNotSchedule, + self.scheduler.driver.schedule_run_instance, + self.context, + instance_id2) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + + def test_will_schedule_on_disabled_host_if_specified(self): + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute') + db.service_update(self.context, s1['id'], {'disabled': True}) + instance_id2 = self._create_instance(availability_zone='nova:host1') + host = self.scheduler.driver.schedule_run_instance(self.context, + instance_id2) + self.assertEqual('host1', host) + db.instance_destroy(self.context, instance_id2) + compute1.kill() + + def test_too_many_cores(self): + """Ensures we don't go over max cores""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute1.start() + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2.start() + instance_ids1 = [] + instance_ids2 = [] + for index in xrange(FLAGS.max_cores): + instance_id = self._create_instance() + compute1.run_instance(self.context, instance_id) + instance_ids1.append(instance_id) + instance_id = self._create_instance() + compute2.run_instance(self.context, instance_id) + instance_ids2.append(instance_id) + instance_id = self._create_instance() + self.assertRaises(driver.NoValidHost, + self.scheduler.driver.schedule_run_instance, + self.context, + instance_id) + for instance_id in instance_ids1: + compute1.terminate_instance(self.context, instance_id) + for instance_id in instance_ids2: + compute2.terminate_instance(self.context, instance_id) + compute1.kill() + compute2.kill() + + def test_least_busy_host_gets_volume(self): + """Ensures the host with less gigabytes gets the next one""" + volume1 = service.Service('host1', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume1.start() + volume2 = service.Service('host2', + 'nova-volume', + 'volume', + FLAGS.volume_manager) + volume2.start() + volume_id1 = self._create_volume() + volume1.create_volume(self.context, volume_id1) + volume_id2 = self._create_volume() + host = self.scheduler.driver.schedule_create_volume(self.context, + volume_id2) + self.assertEqual(host, 'host2') + volume1.delete_volume(self.context, volume_id1) + db.volume_destroy(self.context, volume_id2) + dic = {'service_id': s_ref['id'], + 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100, + 'vcpus_used': 16, 'memory_mb_used': 12, 'local_gb_used': 10, + 'hypervisor_type': 'qemu', 'hypervisor_version': 12003, + 'cpu_info': ''} + def test_doesnt_report_disabled_hosts_as_up(self): """Ensures driver doesn't find hosts before they are enabled""" compute1 = self.start_service('compute', host='host1') @@ -316,3 +627,313 @@ class SimpleDriverTestCase(test.TestCase): volume2.delete_volume(self.context, volume_id) volume1.kill() volume2.kill() + + def test_scheduler_live_migration_with_volume(self): + """scheduler_live_migration() works correctly as expected. + + Also, checks instance state is changed from 'running' -> 'migrating'. + + """ + + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + dic = {'instance_id': instance_id, 'size': 1} + v_ref = db.volume_create(self.context, dic) + + # cannot check 2nd argument b/c the addresses of instance object + # is different. + driver_i = self.scheduler.driver + nocare = mox.IgnoreArg() + self.mox.StubOutWithMock(driver_i, '_live_migration_src_check') + self.mox.StubOutWithMock(driver_i, '_live_migration_dest_check') + self.mox.StubOutWithMock(driver_i, '_live_migration_common_check') + driver_i._live_migration_src_check(nocare, nocare) + driver_i._live_migration_dest_check(nocare, nocare, i_ref['host']) + driver_i._live_migration_common_check(nocare, nocare, i_ref['host']) + self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) + kwargs = {'instance_id': instance_id, 'dest': i_ref['host']} + rpc.cast(self.context, + db.queue_get_for(nocare, FLAGS.compute_topic, i_ref['host']), + {"method": 'live_migration', "args": kwargs}) + + self.mox.ReplayAll() + self.scheduler.live_migration(self.context, FLAGS.compute_topic, + instance_id=instance_id, + dest=i_ref['host']) + + i_ref = db.instance_get(self.context, instance_id) + self.assertTrue(i_ref['state_description'] == 'migrating') + db.instance_destroy(self.context, instance_id) + db.volume_destroy(self.context, v_ref['id']) + + def test_live_migration_src_check_instance_not_running(self): + """The instance given by instance_id is not running.""" + + instance_id = self._create_instance(state_description='migrating') + i_ref = db.instance_get(self.context, instance_id) + + try: + self.scheduler.driver._live_migration_src_check(self.context, + i_ref) + except exception.Invalid, e: + c = (e.message.find('is not running') > 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + + def test_live_migration_src_check_volume_node_not_alive(self): + """Raise exception when volume node is not alive.""" + + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + dic = {'instance_id': instance_id, 'size': 1} + v_ref = db.volume_create(self.context, {'instance_id': instance_id, + 'size': 1}) + t1 = datetime.datetime.utcnow() - datetime.timedelta(1) + dic = {'created_at': t1, 'updated_at': t1, 'binary': 'nova-volume', + 'topic': 'volume', 'report_count': 0} + s_ref = db.service_create(self.context, dic) + + try: + self.scheduler.driver.schedule_live_migration(self.context, + instance_id, + i_ref['host']) + except exception.Invalid, e: + c = (e.message.find('volume node is not alive') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.volume_destroy(self.context, v_ref['id']) + + def test_live_migration_src_check_compute_node_not_alive(self): + """Confirms src-compute node is alive.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + t = datetime.datetime.utcnow() - datetime.timedelta(10) + s_ref = self._create_compute_service(created_at=t, updated_at=t, + host=i_ref['host']) + + try: + self.scheduler.driver._live_migration_src_check(self.context, + i_ref) + except exception.Invalid, e: + c = (e.message.find('is not alive') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_src_check_works_correctly(self): + """Confirms this method finishes with no error.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host=i_ref['host']) + + ret = self.scheduler.driver._live_migration_src_check(self.context, + i_ref) + + self.assertTrue(ret == None) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_not_alive(self): + """Confirms exception raises in case dest host does not exist.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + t = datetime.datetime.utcnow() - datetime.timedelta(10) + s_ref = self._create_compute_service(created_at=t, updated_at=t, + host=i_ref['host']) + + try: + self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + i_ref['host']) + except exception.Invalid, e: + c = (e.message.find('is not alive') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_service_same_host(self): + """Confirms exceptioin raises in case dest and src is same host.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host=i_ref['host']) + + try: + self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + i_ref['host']) + except exception.Invalid, e: + c = (e.message.find('choose other host') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_service_lack_memory(self): + """Confirms exception raises when dest doesn't have enough memory.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host='somewhere', + memory_mb_used=12) + + try: + self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + 'somewhere') + except exception.NotEmpty, e: + c = (e.message.find('Unable to migrate') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_dest_check_service_works_correctly(self): + """Confirms method finishes with no error.""" + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + s_ref = self._create_compute_service(host='somewhere', + memory_mb_used=5) + + ret = self.scheduler.driver._live_migration_dest_check(self.context, + i_ref, + 'somewhere') + self.assertTrue(ret == None) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_common_check_service_orig_not_exists(self): + """Destination host does not exist.""" + + dest = 'dummydest' + # mocks for live_migration_common_check() + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + t1 = datetime.datetime.utcnow() - datetime.timedelta(10) + s_ref = self._create_compute_service(created_at=t1, updated_at=t1, + host=dest) + + # mocks for mounted_on_same_shared_storage() + fpath = '/test/20110127120000' + self.mox.StubOutWithMock(driver, 'rpc', use_mock_anything=True) + topic = FLAGS.compute_topic + driver.rpc.call(mox.IgnoreArg(), + db.queue_get_for(self.context, topic, dest), + {"method": 'create_shared_storage_test_file'}).AndReturn(fpath) + driver.rpc.call(mox.IgnoreArg(), + db.queue_get_for(mox.IgnoreArg(), topic, i_ref['host']), + {"method": 'check_shared_storage_test_file', + "args": {'filename': fpath}}) + driver.rpc.call(mox.IgnoreArg(), + db.queue_get_for(mox.IgnoreArg(), topic, dest), + {"method": 'cleanup_shared_storage_test_file', + "args": {'filename': fpath}}) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except exception.Invalid, e: + c = (e.message.find('does not exist') >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + + def test_live_migration_common_check_service_different_hypervisor(self): + """Original host and dest host has different hypervisor type.""" + dest = 'dummydest' + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + + # compute service for destination + s_ref = self._create_compute_service(host=i_ref['host']) + # compute service for original host + s_ref2 = self._create_compute_service(host=dest, hypervisor_type='xen') + + # mocks + driver = self.scheduler.driver + self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage') + driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except exception.Invalid, e: + c = (e.message.find(_('Different hypervisor type')) >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.service_destroy(self.context, s_ref2['id']) + + def test_live_migration_common_check_service_different_version(self): + """Original host and dest host has different hypervisor version.""" + dest = 'dummydest' + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + + # compute service for destination + s_ref = self._create_compute_service(host=i_ref['host']) + # compute service for original host + s_ref2 = self._create_compute_service(host=dest, + hypervisor_version=12002) + + # mocks + driver = self.scheduler.driver + self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage') + driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except exception.Invalid, e: + c = (e.message.find(_('Older hypervisor version')) >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.service_destroy(self.context, s_ref2['id']) + + def test_live_migration_common_check_checking_cpuinfo_fail(self): + """Raise excetion when original host doen't have compatible cpu.""" + + dest = 'dummydest' + instance_id = self._create_instance() + i_ref = db.instance_get(self.context, instance_id) + + # compute service for destination + s_ref = self._create_compute_service(host=i_ref['host']) + # compute service for original host + s_ref2 = self._create_compute_service(host=dest) + + # mocks + driver = self.scheduler.driver + self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage') + driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest) + self.mox.StubOutWithMock(rpc, 'call', use_mock_anything=True) + rpc.call(mox.IgnoreArg(), mox.IgnoreArg(), + {"method": 'compare_cpu', + "args": {'cpu_info': s_ref2['compute_node'][0]['cpu_info']}}).\ + AndRaise(rpc.RemoteError("doesn't have compatibility to", "", "")) + + self.mox.ReplayAll() + try: + self.scheduler.driver._live_migration_common_check(self.context, + i_ref, + dest) + except rpc.RemoteError, e: + c = (e.message.find(_("doesn't have compatibility to")) >= 0) + + self.assertTrue(c) + db.instance_destroy(self.context, instance_id) + db.service_destroy(self.context, s_ref['id']) + db.service_destroy(self.context, s_ref2['id']) diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py index 45d9afa6c..393f9d20b 100644 --- a/nova/tests/test_service.py +++ b/nova/tests/test_service.py @@ -30,6 +30,7 @@ from nova import rpc from nova import test from nova import service from nova import manager +from nova.compute import manager as compute_manager FLAGS = flags.FLAGS flags.DEFINE_string("fake_manager", "nova.tests.test_service.FakeManager", @@ -251,3 +252,43 @@ class ServiceTestCase(test.TestCase): serv.report_state() self.assert_(not serv.model_disconnected) + + def test_compute_can_update_available_resource(self): + """Confirm compute updates their record of compute-service table.""" + host = 'foo' + binary = 'nova-compute' + topic = 'compute' + + # Any mocks are not working without UnsetStubs() here. + self.mox.UnsetStubs() + ctxt = context.get_admin_context() + service_ref = db.service_create(ctxt, {'host': host, + 'binary': binary, + 'topic': topic}) + serv = service.Service(host, + binary, + topic, + 'nova.compute.manager.ComputeManager') + + # This testcase want to test calling update_available_resource. + # No need to call periodic call, then below variable must be set 0. + serv.report_interval = 0 + serv.periodic_interval = 0 + + # Creating mocks + self.mox.StubOutWithMock(service.rpc.Connection, 'instance') + service.rpc.Connection.instance(new=mox.IgnoreArg()) + service.rpc.Connection.instance(new=mox.IgnoreArg()) + self.mox.StubOutWithMock(serv.manager.driver, + 'update_available_resource') + serv.manager.driver.update_available_resource(mox.IgnoreArg(), host) + + # Just doing start()-stop(), not confirm new db record is created, + # because update_available_resource() works only in + # libvirt environment. This testcase confirms + # update_available_resource() is called. Otherwise, mox complains. + self.mox.ReplayAll() + serv.start() + serv.stop() + + db.service_destroy(ctxt, service_ref['id']) diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py index 0625296a4..b214f5ce7 100644 --- a/nova/tests/test_virt.py +++ b/nova/tests/test_virt.py @@ -14,21 +14,29 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet +import mox import os +import re +import sys -import eventlet from xml.etree.ElementTree import fromstring as xml_to_tree from xml.dom.minidom import parseString as xml_to_dom from nova import context from nova import db +from nova import exception from nova import flags from nova import test from nova import utils from nova.api.ec2 import cloud from nova.auth import manager +from nova.compute import manager as compute_manager +from nova.compute import power_state +from nova.db.sqlalchemy import models from nova.virt import libvirt_conn +libvirt = None FLAGS = flags.FLAGS flags.DECLARE('instances_path', 'nova.compute.manager') @@ -103,11 +111,28 @@ class LibvirtConnTestCase(test.TestCase): libvirt_conn._late_load_cheetah() self.flags(fake_call=True) self.manager = manager.AuthManager() + + try: + pjs = self.manager.get_projects() + pjs = [p for p in pjs if p.name == 'fake'] + if 0 != len(pjs): + self.manager.delete_project(pjs[0]) + + users = self.manager.get_users() + users = [u for u in users if u.name == 'fake'] + if 0 != len(users): + self.manager.delete_user(users[0]) + except Exception, e: + pass + + users = self.manager.get_users() self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True) self.project = self.manager.create_project('fake', 'fake', 'fake') self.network = utils.import_object(FLAGS.network_manager) + self.context = context.get_admin_context() FLAGS.instances_path = '' + self.call_libvirt_dependant_setup = False test_ip = '10.11.12.13' test_instance = {'memory_kb': '1024000', @@ -119,6 +144,58 @@ class LibvirtConnTestCase(test.TestCase): 'bridge': 'br101', 'instance_type': 'm1.small'} + def lazy_load_library_exists(self): + """check if libvirt is available.""" + # try to connect libvirt. if fail, skip test. + try: + import libvirt + import libxml2 + except ImportError: + return False + global libvirt + libvirt = __import__('libvirt') + libvirt_conn.libvirt = __import__('libvirt') + libvirt_conn.libxml2 = __import__('libxml2') + return True + + def create_fake_libvirt_mock(self, **kwargs): + """Defining mocks for LibvirtConnection(libvirt is not used).""" + + # A fake libvirt.virConnect + class FakeLibvirtConnection(object): + pass + + # A fake libvirt_conn.IptablesFirewallDriver + class FakeIptablesFirewallDriver(object): + + def __init__(self, **kwargs): + pass + + def setattr(self, key, val): + self.__setattr__(key, val) + + # Creating mocks + fake = FakeLibvirtConnection() + fakeip = FakeIptablesFirewallDriver + # Customizing above fake if necessary + for key, val in kwargs.items(): + fake.__setattr__(key, val) + + # Inevitable mocks for libvirt_conn.LibvirtConnection + self.mox.StubOutWithMock(libvirt_conn.utils, 'import_class') + libvirt_conn.utils.import_class(mox.IgnoreArg()).AndReturn(fakeip) + self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn') + libvirt_conn.LibvirtConnection._conn = fake + + def create_service(self, **kwargs): + service_ref = {'host': kwargs.get('host', 'dummy'), + 'binary': 'nova-compute', + 'topic': 'compute', + 'report_count': 0, + 'availability_zone': 'zone'} + + return db.service_create(context.get_admin_context(), service_ref) + def test_xml_and_uri_no_ramdisk_no_kernel(self): instance_data = dict(self.test_instance) self._check_xml_and_uri(instance_data, @@ -258,8 +335,8 @@ class LibvirtConnTestCase(test.TestCase): expected_result, '%s failed common check %d' % (xml, i)) - # This test is supposed to make sure we don't override a specifically - # set uri + # This test is supposed to make sure we don't + # override a specifically set uri # # Deliberately not just assigning this string to FLAGS.libvirt_uri and # checking against that later on. This way we make sure the @@ -273,6 +350,150 @@ class LibvirtConnTestCase(test.TestCase): self.assertEquals(uri, testuri) db.instance_destroy(user_context, instance_ref['id']) + def test_update_available_resource_works_correctly(self): + """Confirm compute_node table is updated successfully.""" + org_path = FLAGS.instances_path = '' + FLAGS.instances_path = '.' + + # Prepare mocks + def getVersion(): + return 12003 + + def getType(): + return 'qemu' + + def listDomainsID(): + return [] + + service_ref = self.create_service(host='dummy') + self.create_fake_libvirt_mock(getVersion=getVersion, + getType=getType, + listDomainsID=listDomainsID) + self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, + 'get_cpu_info') + libvirt_conn.LibvirtConnection.get_cpu_info().AndReturn('cpuinfo') + + # Start test + self.mox.ReplayAll() + conn = libvirt_conn.LibvirtConnection(False) + conn.update_available_resource(self.context, 'dummy') + service_ref = db.service_get(self.context, service_ref['id']) + compute_node = service_ref['compute_node'][0] + + if sys.platform.upper() == 'LINUX2': + self.assertTrue(compute_node['vcpus'] >= 0) + self.assertTrue(compute_node['memory_mb'] > 0) + self.assertTrue(compute_node['local_gb'] > 0) + self.assertTrue(compute_node['vcpus_used'] == 0) + self.assertTrue(compute_node['memory_mb_used'] > 0) + self.assertTrue(compute_node['local_gb_used'] > 0) + self.assertTrue(len(compute_node['hypervisor_type']) > 0) + self.assertTrue(compute_node['hypervisor_version'] > 0) + else: + self.assertTrue(compute_node['vcpus'] >= 0) + self.assertTrue(compute_node['memory_mb'] == 0) + self.assertTrue(compute_node['local_gb'] > 0) + self.assertTrue(compute_node['vcpus_used'] == 0) + self.assertTrue(compute_node['memory_mb_used'] == 0) + self.assertTrue(compute_node['local_gb_used'] > 0) + self.assertTrue(len(compute_node['hypervisor_type']) > 0) + self.assertTrue(compute_node['hypervisor_version'] > 0) + + db.service_destroy(self.context, service_ref['id']) + FLAGS.instances_path = org_path + + def test_update_resource_info_no_compute_record_found(self): + """Raise exception if no recorde found on services table.""" + org_path = FLAGS.instances_path = '' + FLAGS.instances_path = '.' + self.create_fake_libvirt_mock() + + self.mox.ReplayAll() + conn = libvirt_conn.LibvirtConnection(False) + self.assertRaises(exception.Invalid, + conn.update_available_resource, + self.context, 'dummy') + + FLAGS.instances_path = org_path + + def test_ensure_filtering_rules_for_instance_timeout(self): + """ensure_filtering_fules_for_instance() finishes with timeout.""" + # Skip if non-libvirt environment + if not self.lazy_load_library_exists(): + return + + # Preparing mocks + def fake_none(self): + return + + def fake_raise(self): + raise libvirt.libvirtError('ERR') + + self.create_fake_libvirt_mock(nwfilterLookupByName=fake_raise) + instance_ref = db.instance_create(self.context, self.test_instance) + + # Start test + self.mox.ReplayAll() + try: + conn = libvirt_conn.LibvirtConnection(False) + conn.firewall_driver.setattr('setup_basic_filtering', fake_none) + conn.firewall_driver.setattr('prepare_instance_filter', fake_none) + conn.ensure_filtering_rules_for_instance(instance_ref) + except exception.Error, e: + c1 = (0 <= e.message.find('Timeout migrating for')) + self.assertTrue(c1) + + db.instance_destroy(self.context, instance_ref['id']) + + def test_live_migration_raises_exception(self): + """Confirms recover method is called when exceptions are raised.""" + # Skip if non-libvirt environment + if not self.lazy_load_library_exists(): + return + + # Preparing data + self.compute = utils.import_object(FLAGS.compute_manager) + instance_dict = {'host': 'fake', 'state': power_state.RUNNING, + 'state_description': 'running'} + instance_ref = db.instance_create(self.context, self.test_instance) + instance_ref = db.instance_update(self.context, instance_ref['id'], + instance_dict) + vol_dict = {'status': 'migrating', 'size': 1} + volume_ref = db.volume_create(self.context, vol_dict) + db.volume_attached(self.context, volume_ref['id'], instance_ref['id'], + '/dev/fake') + + # Preparing mocks + vdmock = self.mox.CreateMock(libvirt.virDomain) + self.mox.StubOutWithMock(vdmock, "migrateToURI") + vdmock.migrateToURI(FLAGS.live_migration_uri % 'dest', + mox.IgnoreArg(), + None, FLAGS.live_migration_bandwidth).\ + AndRaise(libvirt.libvirtError('ERR')) + + def fake_lookup(instance_name): + if instance_name == instance_ref.name: + return vdmock + + self.create_fake_libvirt_mock(lookupByName=fake_lookup) + + # Start test + self.mox.ReplayAll() + conn = libvirt_conn.LibvirtConnection(False) + self.assertRaises(libvirt.libvirtError, + conn._live_migration, + self.context, instance_ref, 'dest', '', + self.compute.recover_live_migration) + + instance_ref = db.instance_get(self.context, instance_ref['id']) + self.assertTrue(instance_ref['state_description'] == 'running') + self.assertTrue(instance_ref['state'] == power_state.RUNNING) + volume_ref = db.volume_get(self.context, volume_ref['id']) + self.assertTrue(volume_ref['status'] == 'in-use') + + db.volume_destroy(self.context, volume_ref['id']) + db.instance_destroy(self.context, instance_ref['id']) + def tearDown(self): self.manager.delete_project(self.project) self.manager.delete_user(self.user) @@ -301,16 +522,22 @@ class IptablesFirewallTestCase(test.TestCase): self.manager.delete_user(self.user) super(IptablesFirewallTestCase, self).tearDown() - in_rules = [ + in_nat_rules = [ + '# Generated by iptables-save v1.4.10 on Sat Feb 19 00:03:19 2011', + '*nat', + ':PREROUTING ACCEPT [1170:189210]', + ':INPUT ACCEPT [844:71028]', + ':OUTPUT ACCEPT [5149:405186]', + ':POSTROUTING ACCEPT [5063:386098]', + ] + + in_filter_rules = [ '# Generated by iptables-save v1.4.4 on Mon Dec 6 11:54:13 2010', '*filter', ':INPUT ACCEPT [969615:281627771]', ':FORWARD ACCEPT [0:0]', ':OUTPUT ACCEPT [915599:63811649]', ':nova-block-ipv4 - [0:0]', - '-A INPUT -i virbr0 -p udp -m udp --dport 53 -j ACCEPT ', - '-A INPUT -i virbr0 -p tcp -m tcp --dport 53 -j ACCEPT ', - '-A INPUT -i virbr0 -p udp -m udp --dport 67 -j ACCEPT ', '-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ', '-A FORWARD -d 192.168.122.0/24 -o virbr0 -m state --state RELATED' ',ESTABLISHED -j ACCEPT ', @@ -322,7 +549,7 @@ class IptablesFirewallTestCase(test.TestCase): '# Completed on Mon Dec 6 11:54:13 2010', ] - in6_rules = [ + in6_filter_rules = [ '# Generated by ip6tables-save v1.4.4 on Tue Jan 18 23:47:56 2011', '*filter', ':INPUT ACCEPT [349155:75810423]', @@ -385,21 +612,31 @@ class IptablesFirewallTestCase(test.TestCase): def fake_iptables_execute(*cmd, **kwargs): process_input = kwargs.get('process_input', None) if cmd == ('sudo', 'ip6tables-save', '-t', 'filter'): - return '\n'.join(self.in6_rules), None + return '\n'.join(self.in6_filter_rules), None if cmd == ('sudo', 'iptables-save', '-t', 'filter'): - return '\n'.join(self.in_rules), None + return '\n'.join(self.in_filter_rules), None + if cmd == ('sudo', 'iptables-save', '-t', 'nat'): + return '\n'.join(self.in_nat_rules), None if cmd == ('sudo', 'iptables-restore'): - self.out_rules = process_input.split('\n') + lines = process_input.split('\n') + if '*filter' in lines: + self.out_rules = lines return '', '' if cmd == ('sudo', 'ip6tables-restore'): - self.out6_rules = process_input.split('\n') + lines = process_input.split('\n') + if '*filter' in lines: + self.out6_rules = lines return '', '' - self.fw.execute = fake_iptables_execute + print cmd, kwargs + + from nova.network import linux_net + linux_net.iptables_manager.execute = fake_iptables_execute self.fw.prepare_instance_filter(instance_ref) self.fw.apply_instance_filter(instance_ref) - in_rules = filter(lambda l: not l.startswith('#'), self.in_rules) + in_rules = filter(lambda l: not l.startswith('#'), + self.in_filter_rules) for rule in in_rules: if not 'nova' in rule: self.assertTrue(rule in self.out_rules, @@ -422,17 +659,18 @@ class IptablesFirewallTestCase(test.TestCase): self.assertTrue(security_group_chain, "The security group chain wasn't added") - self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -j ACCEPT' % \ - security_group_chain in self.out_rules, + regex = re.compile('-A .* -p icmp -s 192.168.11.0/24 -j ACCEPT') + self.assertTrue(len(filter(regex.match, self.out_rules)) > 0, "ICMP acceptance rule wasn't added") - self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -m icmp --icmp-type ' - '8 -j ACCEPT' % security_group_chain in self.out_rules, + regex = re.compile('-A .* -p icmp -s 192.168.11.0/24 -m icmp ' + '--icmp-type 8 -j ACCEPT') + self.assertTrue(len(filter(regex.match, self.out_rules)) > 0, "ICMP Echo Request acceptance rule wasn't added") - self.assertTrue('-A %s -p tcp -s 192.168.10.0/24 -m multiport ' - '--dports 80:81 -j ACCEPT' % security_group_chain \ - in self.out_rules, + regex = re.compile('-A .* -p tcp -s 192.168.10.0/24 -m multiport ' + '--dports 80:81 -j ACCEPT') + self.assertTrue(len(filter(regex.match, self.out_rules)) > 0, "TCP port 80/81 acceptance rule wasn't added") db.instance_destroy(admin_ctxt, instance_ref['id']) diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py index f698c85b5..1b1d72092 100644 --- a/nova/tests/test_volume.py +++ b/nova/tests/test_volume.py @@ -20,6 +20,8 @@ Tests for Volume Code. """ +import cStringIO + from nova import context from nova import exception from nova import db @@ -173,3 +175,196 @@ class VolumeTestCase(test.TestCase): # each of them having a different FLAG for storage_node # This will allow us to test cross-node interactions pass + + +class DriverTestCase(test.TestCase): + """Base Test class for Drivers.""" + driver_name = "nova.volume.driver.FakeAOEDriver" + + def setUp(self): + super(DriverTestCase, self).setUp() + self.flags(volume_driver=self.driver_name, + logging_default_format_string="%(message)s") + self.volume = utils.import_object(FLAGS.volume_manager) + self.context = context.get_admin_context() + self.output = "" + + def _fake_execute(_command, *_args, **_kwargs): + """Fake _execute.""" + return self.output, None + self.volume.driver._execute = _fake_execute + self.volume.driver._sync_execute = _fake_execute + + log = logging.getLogger() + self.stream = cStringIO.StringIO() + log.addHandler(logging.StreamHandler(self.stream)) + + inst = {} + self.instance_id = db.instance_create(self.context, inst)['id'] + + def tearDown(self): + super(DriverTestCase, self).tearDown() + + def _attach_volume(self): + """Attach volumes to an instance. This function also sets + a fake log message.""" + return [] + + def _detach_volume(self, volume_id_list): + """Detach volumes from an instance.""" + for volume_id in volume_id_list: + db.volume_detached(self.context, volume_id) + self.volume.delete_volume(self.context, volume_id) + + +class AOETestCase(DriverTestCase): + """Test Case for AOEDriver""" + driver_name = "nova.volume.driver.AOEDriver" + + def setUp(self): + super(AOETestCase, self).setUp() + + def tearDown(self): + super(AOETestCase, self).tearDown() + + def _attach_volume(self): + """Attach volumes to an instance. This function also sets + a fake log message.""" + volume_id_list = [] + for index in xrange(3): + vol = {} + vol['size'] = 0 + volume_id = db.volume_create(self.context, + vol)['id'] + self.volume.create_volume(self.context, volume_id) + + # each volume has a different mountpoint + mountpoint = "/dev/sd" + chr((ord('b') + index)) + db.volume_attached(self.context, volume_id, self.instance_id, + mountpoint) + + (shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context, + volume_id) + self.output += "%s %s eth0 /dev/nova-volumes/vol-foo auto run\n" \ + % (shelf_id, blade_id) + + volume_id_list.append(volume_id) + + return volume_id_list + + def test_check_for_export_with_no_volume(self): + """No log message when no volume is attached to an instance.""" + self.stream.truncate(0) + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + + def test_check_for_export_with_all_vblade_processes(self): + """No log message when all the vblade processes are running.""" + volume_id_list = self._attach_volume() + + self.stream.truncate(0) + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + + self._detach_volume(volume_id_list) + + def test_check_for_export_with_vblade_process_missing(self): + """Output a warning message when some vblade processes aren't + running.""" + volume_id_list = self._attach_volume() + + # the first vblade process isn't running + self.output = self.output.replace("run", "down", 1) + (shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context, + volume_id_list[0]) + + msg_is_match = False + self.stream.truncate(0) + try: + self.volume.check_for_export(self.context, self.instance_id) + except exception.ProcessExecutionError, e: + volume_id = volume_id_list[0] + msg = _("Cannot confirm exported volume id:%(volume_id)s. " + "vblade process for e%(shelf_id)s.%(blade_id)s " + "isn't running.") % locals() + + msg_is_match = (0 <= e.message.find(msg)) + + self.assertTrue(msg_is_match) + self._detach_volume(volume_id_list) + + +class ISCSITestCase(DriverTestCase): + """Test Case for ISCSIDriver""" + driver_name = "nova.volume.driver.ISCSIDriver" + + def setUp(self): + super(ISCSITestCase, self).setUp() + + def tearDown(self): + super(ISCSITestCase, self).tearDown() + + def _attach_volume(self): + """Attach volumes to an instance. This function also sets + a fake log message.""" + volume_id_list = [] + for index in xrange(3): + vol = {} + vol['size'] = 0 + vol_ref = db.volume_create(self.context, vol) + self.volume.create_volume(self.context, vol_ref['id']) + vol_ref = db.volume_get(self.context, vol_ref['id']) + + # each volume has a different mountpoint + mountpoint = "/dev/sd" + chr((ord('b') + index)) + db.volume_attached(self.context, vol_ref['id'], self.instance_id, + mountpoint) + volume_id_list.append(vol_ref['id']) + + return volume_id_list + + def test_check_for_export_with_no_volume(self): + """No log message when no volume is attached to an instance.""" + self.stream.truncate(0) + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + + def test_check_for_export_with_all_volume_exported(self): + """No log message when all the vblade processes are running.""" + volume_id_list = self._attach_volume() + + self.mox.StubOutWithMock(self.volume.driver, '_execute') + for i in volume_id_list: + tid = db.volume_get_iscsi_target_num(self.context, i) + self.volume.driver._execute("sudo ietadm --op show --tid=%(tid)d" + % locals()) + + self.stream.truncate(0) + self.mox.ReplayAll() + self.volume.check_for_export(self.context, self.instance_id) + self.assertEqual(self.stream.getvalue(), '') + self.mox.UnsetStubs() + + self._detach_volume(volume_id_list) + + def test_check_for_export_with_some_volume_missing(self): + """Output a warning message when some volumes are not recognied + by ietd.""" + volume_id_list = self._attach_volume() + + # the first vblade process isn't running + tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0]) + self.mox.StubOutWithMock(self.volume.driver, '_execute') + self.volume.driver._execute("sudo ietadm --op show --tid=%(tid)d" + % locals()).AndRaise(exception.ProcessExecutionError()) + + self.mox.ReplayAll() + self.assertRaises(exception.ProcessExecutionError, + self.volume.check_for_export, + self.context, + self.instance_id) + msg = _("Cannot confirm exported volume id:%s.") % volume_id_list[0] + self.assertTrue(0 <= self.stream.getvalue().find(msg)) + self.mox.UnsetStubs() + + self._detach_volume(volume_id_list) diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index c26dc8639..cd125a301 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -18,6 +18,7 @@ Test suite for XenAPI """ +import functools import stubout from nova import db @@ -41,6 +42,21 @@ from nova.tests.glance import stubs as glance_stubs FLAGS = flags.FLAGS +def stub_vm_utils_with_vdi_attached_here(function, should_return=True): + """ + vm_utils.with_vdi_attached_here needs to be stubbed out because it + calls down to the filesystem to attach a vdi. This provides a + decorator to handle that. + """ + @functools.wraps(function) + def decorated_function(self, *args, **kwargs): + orig_with_vdi_attached_here = vm_utils.with_vdi_attached_here + vm_utils.with_vdi_attached_here = lambda *x: should_return + function(self, *args, **kwargs) + vm_utils.with_vdi_attached_here = orig_with_vdi_attached_here + return decorated_function + + class XenAPIVolumeTestCase(test.TestCase): """ Unit tests for Volume operations @@ -62,6 +78,7 @@ class XenAPIVolumeTestCase(test.TestCase): 'ramdisk_id': 3, 'instance_type': 'm1.large', 'mac_address': 'aa:bb:cc:dd:ee:ff', + 'os_type': 'linux' } def _create_volume(self, size='0'): @@ -219,7 +236,7 @@ class XenAPIVMTestCase(test.TestCase): check() - def check_vm_record(self, conn): + def create_vm_record(self, conn, os_type): instances = conn.list_instances() self.assertEquals(instances, [1]) @@ -231,28 +248,63 @@ class XenAPIVMTestCase(test.TestCase): in xenapi_fake.get_all_records('VM').iteritems() if not rec['is_control_domain']] vm = vms[0] + self.vm_info = vm_info + self.vm = vm + def check_vm_record(self, conn): # Check that m1.large above turned into the right thing. instance_type = db.instance_type_get_by_name(conn, 'm1.large') mem_kib = long(instance_type['memory_mb']) << 10 mem_bytes = str(mem_kib << 10) vcpus = instance_type['vcpus'] - self.assertEquals(vm_info['max_mem'], mem_kib) - self.assertEquals(vm_info['mem'], mem_kib) - self.assertEquals(vm['memory_static_max'], mem_bytes) - self.assertEquals(vm['memory_dynamic_max'], mem_bytes) - self.assertEquals(vm['memory_dynamic_min'], mem_bytes) - self.assertEquals(vm['VCPUs_max'], str(vcpus)) - self.assertEquals(vm['VCPUs_at_startup'], str(vcpus)) + self.assertEquals(self.vm_info['max_mem'], mem_kib) + self.assertEquals(self.vm_info['mem'], mem_kib) + self.assertEquals(self.vm['memory_static_max'], mem_bytes) + self.assertEquals(self.vm['memory_dynamic_max'], mem_bytes) + self.assertEquals(self.vm['memory_dynamic_min'], mem_bytes) + self.assertEquals(self.vm['VCPUs_max'], str(vcpus)) + self.assertEquals(self.vm['VCPUs_at_startup'], str(vcpus)) # Check that the VM is running according to Nova - self.assertEquals(vm_info['state'], power_state.RUNNING) + self.assertEquals(self.vm_info['state'], power_state.RUNNING) # Check that the VM is running according to XenAPI. - self.assertEquals(vm['power_state'], 'Running') + self.assertEquals(self.vm['power_state'], 'Running') + + def check_vm_params_for_windows(self): + self.assertEquals(self.vm['platform']['nx'], 'true') + self.assertEquals(self.vm['HVM_boot_params'], {'order': 'dc'}) + self.assertEquals(self.vm['HVM_boot_policy'], 'BIOS order') + + # check that these are not set + self.assertEquals(self.vm['PV_args'], '') + self.assertEquals(self.vm['PV_bootloader'], '') + self.assertEquals(self.vm['PV_kernel'], '') + self.assertEquals(self.vm['PV_ramdisk'], '') + + def check_vm_params_for_linux(self): + self.assertEquals(self.vm['platform']['nx'], 'false') + self.assertEquals(self.vm['PV_args'], 'clocksource=jiffies') + self.assertEquals(self.vm['PV_bootloader'], 'pygrub') + + # check that these are not set + self.assertEquals(self.vm['PV_kernel'], '') + self.assertEquals(self.vm['PV_ramdisk'], '') + self.assertEquals(self.vm['HVM_boot_params'], {}) + self.assertEquals(self.vm['HVM_boot_policy'], '') + + def check_vm_params_for_linux_with_external_kernel(self): + self.assertEquals(self.vm['platform']['nx'], 'false') + self.assertEquals(self.vm['PV_args'], 'root=/dev/xvda1') + self.assertNotEquals(self.vm['PV_kernel'], '') + self.assertNotEquals(self.vm['PV_ramdisk'], '') + + # check that these are not set + self.assertEquals(self.vm['HVM_boot_params'], {}) + self.assertEquals(self.vm['HVM_boot_policy'], '') def _test_spawn(self, image_id, kernel_id, ramdisk_id, - instance_type="m1.large"): + instance_type="m1.large", os_type="linux"): stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) values = {'name': 1, 'id': 1, @@ -263,10 +315,12 @@ class XenAPIVMTestCase(test.TestCase): 'ramdisk_id': ramdisk_id, 'instance_type': instance_type, 'mac_address': 'aa:bb:cc:dd:ee:ff', + 'os_type': os_type } conn = xenapi_conn.get_connection(False) instance = db.instance_create(values) conn.spawn(instance) + self.create_vm_record(conn, os_type) self.check_vm_record(conn) def test_spawn_not_enough_memory(self): @@ -283,24 +337,37 @@ class XenAPIVMTestCase(test.TestCase): FLAGS.xenapi_image_service = 'objectstore' self._test_spawn(1, 2, 3) + @stub_vm_utils_with_vdi_attached_here def test_spawn_raw_glance(self): FLAGS.xenapi_image_service = 'glance' self._test_spawn(glance_stubs.FakeGlance.IMAGE_RAW, None, None) + self.check_vm_params_for_linux() + + def test_spawn_vhd_glance_linux(self): + FLAGS.xenapi_image_service = 'glance' + self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None, + os_type="linux") + self.check_vm_params_for_linux() - def test_spawn_vhd_glance(self): + def test_spawn_vhd_glance_windows(self): FLAGS.xenapi_image_service = 'glance' - self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None) + self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None, + os_type="windows") + self.check_vm_params_for_windows() def test_spawn_glance(self): FLAGS.xenapi_image_service = 'glance' self._test_spawn(glance_stubs.FakeGlance.IMAGE_MACHINE, glance_stubs.FakeGlance.IMAGE_KERNEL, glance_stubs.FakeGlance.IMAGE_RAMDISK) + self.check_vm_params_for_linux_with_external_kernel() def tearDown(self): super(XenAPIVMTestCase, self).tearDown() self.manager.delete_project(self.project) self.manager.delete_user(self.user) + self.vm_info = None + self.vm = None self.stubs.UnsetAll() def _create_instance(self): @@ -314,7 +381,8 @@ class XenAPIVMTestCase(test.TestCase): 'kernel_id': 2, 'ramdisk_id': 3, 'instance_type': 'm1.large', - 'mac_address': 'aa:bb:cc:dd:ee:ff'} + 'mac_address': 'aa:bb:cc:dd:ee:ff', + 'os_type': 'linux'} instance = db.instance_create(values) self.conn.spawn(instance) return instance @@ -372,6 +440,7 @@ class XenAPIMigrateInstance(test.TestCase): 'ramdisk_id': None, 'instance_type': 'm1.large', 'mac_address': 'aa:bb:cc:dd:ee:ff', + 'os_type': 'linux' } stubs.stub_out_migration_methods(self.stubs) glance_stubs.stubout_glance_client(self.stubs, @@ -410,6 +479,7 @@ class XenAPIDetermineDiskImageTestCase(test.TestCase): self.fake_instance = FakeInstance() self.fake_instance.id = 42 + self.fake_instance.os_type = 'linux' def assert_disk_type(self, disk_type): dt = vm_utils.VMHelper.determine_disk_image_type( diff --git a/nova/utils.py b/nova/utils.py index 0466fecf4..87e726394 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -139,34 +139,44 @@ def execute(*cmd, **kwargs): stdin = kwargs.get('stdin', subprocess.PIPE) stdout = kwargs.get('stdout', subprocess.PIPE) stderr = kwargs.get('stderr', subprocess.PIPE) + attempts = kwargs.get('attempts', 1) cmd = map(str, cmd) - LOG.debug(_("Running cmd (subprocess): %s"), ' '.join(cmd)) - env = os.environ.copy() - if addl_env: - env.update(addl_env) - obj = subprocess.Popen(cmd, stdin=stdin, - stdout=stdout, stderr=stderr, env=env) - result = None - if process_input != None: - result = obj.communicate(process_input) - else: - result = obj.communicate() - obj.stdin.close() - if obj.returncode: - LOG.debug(_("Result was %s") % obj.returncode) - if type(check_exit_code) == types.IntType \ - and obj.returncode != check_exit_code: - (stdout, stderr) = result - raise ProcessExecutionError(exit_code=obj.returncode, - stdout=stdout, - stderr=stderr, - cmd=' '.join(cmd)) - # NOTE(termie): this appears to be necessary to let the subprocess call - # clean something up in between calls, without it two - # execute calls in a row hangs the second one - greenthread.sleep(0) - return result + while attempts > 0: + attempts -= 1 + try: + LOG.debug(_("Running cmd (subprocess): %s"), ' '.join(cmd)) + env = os.environ.copy() + if addl_env: + env.update(addl_env) + obj = subprocess.Popen(cmd, stdin=stdin, + stdout=stdout, stderr=stderr, env=env) + result = None + if process_input != None: + result = obj.communicate(process_input) + else: + result = obj.communicate() + obj.stdin.close() + if obj.returncode: + LOG.debug(_("Result was %s") % obj.returncode) + if type(check_exit_code) == types.IntType \ + and obj.returncode != check_exit_code: + (stdout, stderr) = result + raise ProcessExecutionError(exit_code=obj.returncode, + stdout=stdout, + stderr=stderr, + cmd=' '.join(cmd)) + # NOTE(termie): this appears to be necessary to let the subprocess + # call clean something up in between calls, without + # it two execute calls in a row hangs the second one + greenthread.sleep(0) + return result + except ProcessExecutionError: + if not attempts: + raise + else: + LOG.debug(_("%r failed. Retrying."), cmd) + greenthread.sleep(random.randint(20, 200) / 100.0) def ssh_execute(ssh, cmd, process_input=None, diff --git a/nova/virt/cpuinfo.xml.template b/nova/virt/cpuinfo.xml.template new file mode 100644 index 000000000..48842b29d --- /dev/null +++ b/nova/virt/cpuinfo.xml.template @@ -0,0 +1,9 @@ +<cpu> + <arch>$arch</arch> + <model>$model</model> + <vendor>$vendor</vendor> + <topology sockets="$topology.sockets" cores="$topology.cores" threads="$topology.threads"/> +#for $var in $features + <features name="$var" /> +#end for +</cpu> diff --git a/nova/virt/disk.py b/nova/virt/disk.py index 5d499c42c..9abe44cc3 100644 --- a/nova/virt/disk.py +++ b/nova/virt/disk.py @@ -189,4 +189,4 @@ def _inject_net_into_fs(net, fs): utils.execute('sudo', 'chown', 'root:root', netdir) utils.execute('sudo', 'chmod', 755, netdir) netfile = os.path.join(netdir, 'interfaces') - utils.execute('sudo', 'tee', netfile, net) + utils.execute('sudo', 'tee', netfile, process_input=net) diff --git a/nova/virt/fake.py b/nova/virt/fake.py index c744acf91..3a06284a1 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -407,6 +407,27 @@ class FakeConnection(object): """ return True + def update_available_resource(self, ctxt, host): + """This method is supported only by libvirt.""" + return + + def compare_cpu(self, xml): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + + def ensure_filtering_rules_for_instance(self, instance_ref): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + + def live_migration(self, context, instance_ref, dest, + post_method, recover_method): + """This method is supported only by libvirt.""" + return + + def unfilter_instance(self, instance_ref): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + class FakeInstance(object): diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index 44b07213a..7994e9547 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -36,10 +36,13 @@ Supports KVM, QEMU, UML, and XEN. """ +import multiprocessing import os import shutil +import sys import random import subprocess +import time import uuid from xml.dom import minidom @@ -56,7 +59,6 @@ from nova import flags from nova import log as logging #from nova import test from nova import utils -#from nova.api import context from nova.auth import manager from nova.compute import instance_types from nova.compute import power_state @@ -70,6 +72,7 @@ Template = None LOG = logging.getLogger('nova.virt.libvirt_conn') FLAGS = flags.FLAGS +flags.DECLARE('live_migration_retry_count', 'nova.compute.manager') # TODO(vish): These flags should probably go into a shared location flags.DEFINE_string('rescue_image_id', 'ami-rescue', 'Rescue ami image') flags.DEFINE_string('rescue_kernel_id', 'aki-rescue', 'Rescue aki image') @@ -100,6 +103,17 @@ flags.DEFINE_string('ajaxterm_portrange', flags.DEFINE_string('firewall_driver', 'nova.virt.libvirt_conn.IptablesFirewallDriver', 'Firewall driver (defaults to iptables)') +flags.DEFINE_string('cpuinfo_xml_template', + utils.abspath('virt/cpuinfo.xml.template'), + 'CpuInfo XML Template (Used only live migration now)') +flags.DEFINE_string('live_migration_uri', + "qemu+tcp://%s/system", + 'Define protocol used by live_migration feature') +flags.DEFINE_string('live_migration_flag', + "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER", + 'Define live migration behavior.') +flags.DEFINE_integer('live_migration_bandwidth', 0, + 'Define live migration behavior') def get_connection(read_only): @@ -146,6 +160,7 @@ class LibvirtConnection(object): self.libvirt_uri = self.get_uri() self.libvirt_xml = open(FLAGS.libvirt_xml_template).read() + self.cpuinfo_xml = open(FLAGS.cpuinfo_xml_template).read() self._wrapped_conn = None self.read_only = read_only @@ -347,19 +362,19 @@ class LibvirtConnection(object): @exception.wrap_exception def pause(self, instance, callback): - raise exception.APIError("pause not supported for libvirt.") + raise exception.ApiError("pause not supported for libvirt.") @exception.wrap_exception def unpause(self, instance, callback): - raise exception.APIError("unpause not supported for libvirt.") + raise exception.ApiError("unpause not supported for libvirt.") @exception.wrap_exception def suspend(self, instance, callback): - raise exception.APIError("suspend not supported for libvirt") + raise exception.ApiError("suspend not supported for libvirt") @exception.wrap_exception def resume(self, instance, callback): - raise exception.APIError("resume not supported for libvirt") + raise exception.ApiError("resume not supported for libvirt") @exception.wrap_exception def rescue(self, instance, callback=None): @@ -764,7 +779,7 @@ class LibvirtConnection(object): 'cpu_time': cpu_time} def get_diagnostics(self, instance_name): - raise exception.APIError(_("diagnostics are not supported " + raise exception.ApiError(_("diagnostics are not supported " "for libvirt")) def get_disks(self, instance_name): @@ -851,6 +866,158 @@ class LibvirtConnection(object): return interfaces + def get_vcpu_total(self): + """Get vcpu number of physical computer. + + :returns: the number of cpu core. + + """ + + # On certain platforms, this will raise a NotImplementedError. + try: + return multiprocessing.cpu_count() + except NotImplementedError: + LOG.warn(_("Cannot get the number of cpu, because this " + "function is not implemented for this platform. " + "This error can be safely ignored for now.")) + return 0 + + def get_memory_mb_total(self): + """Get the total memory size(MB) of physical computer. + + :returns: the total amount of memory(MB). + + """ + + if sys.platform.upper() != 'LINUX2': + return 0 + + meminfo = open('/proc/meminfo').read().split() + idx = meminfo.index('MemTotal:') + # transforming kb to mb. + return int(meminfo[idx + 1]) / 1024 + + def get_local_gb_total(self): + """Get the total hdd size(GB) of physical computer. + + :returns: + The total amount of HDD(GB). + Note that this value shows a partition where + NOVA-INST-DIR/instances mounts. + + """ + + hddinfo = os.statvfs(FLAGS.instances_path) + return hddinfo.f_frsize * hddinfo.f_blocks / 1024 / 1024 / 1024 + + def get_vcpu_used(self): + """ Get vcpu usage number of physical computer. + + :returns: The total number of vcpu that currently used. + + """ + + total = 0 + for dom_id in self._conn.listDomainsID(): + dom = self._conn.lookupByID(dom_id) + total += len(dom.vcpus()[1]) + return total + + def get_memory_mb_used(self): + """Get the free memory size(MB) of physical computer. + + :returns: the total usage of memory(MB). + + """ + + if sys.platform.upper() != 'LINUX2': + return 0 + + m = open('/proc/meminfo').read().split() + idx1 = m.index('MemFree:') + idx2 = m.index('Buffers:') + idx3 = m.index('Cached:') + avail = (int(m[idx1 + 1]) + int(m[idx2 + 1]) + int(m[idx3 + 1])) / 1024 + return self.get_memory_mb_total() - avail + + def get_local_gb_used(self): + """Get the free hdd size(GB) of physical computer. + + :returns: + The total usage of HDD(GB). + Note that this value shows a partition where + NOVA-INST-DIR/instances mounts. + + """ + + hddinfo = os.statvfs(FLAGS.instances_path) + avail = hddinfo.f_frsize * hddinfo.f_bavail / 1024 / 1024 / 1024 + return self.get_local_gb_total() - avail + + def get_hypervisor_type(self): + """Get hypervisor type. + + :returns: hypervisor type (ex. qemu) + + """ + + return self._conn.getType() + + def get_hypervisor_version(self): + """Get hypervisor version. + + :returns: hypervisor version (ex. 12003) + + """ + + return self._conn.getVersion() + + def get_cpu_info(self): + """Get cpuinfo information. + + Obtains cpu feature from virConnect.getCapabilities, + and returns as a json string. + + :return: see above description + + """ + + xml = self._conn.getCapabilities() + xml = libxml2.parseDoc(xml) + nodes = xml.xpathEval('//cpu') + if len(nodes) != 1: + raise exception.Invalid(_("Invalid xml. '<cpu>' must be 1," + "but %d\n") % len(nodes) + + xml.serialize()) + + cpu_info = dict() + cpu_info['arch'] = xml.xpathEval('//cpu/arch')[0].getContent() + cpu_info['model'] = xml.xpathEval('//cpu/model')[0].getContent() + cpu_info['vendor'] = xml.xpathEval('//cpu/vendor')[0].getContent() + + topology_node = xml.xpathEval('//cpu/topology')[0].get_properties() + topology = dict() + while topology_node != None: + name = topology_node.get_name() + topology[name] = topology_node.getContent() + topology_node = topology_node.get_next() + + keys = ['cores', 'sockets', 'threads'] + tkeys = topology.keys() + if list(set(tkeys)) != list(set(keys)): + ks = ', '.join(keys) + raise exception.Invalid(_("Invalid xml: topology(%(topology)s) " + "must have %(ks)s") % locals()) + + feature_nodes = xml.xpathEval('//cpu/feature') + features = list() + for nodes in feature_nodes: + features.append(nodes.get_properties().getContent()) + + cpu_info['topology'] = topology + cpu_info['features'] = features + return utils.dumps(cpu_info) + def block_stats(self, instance_name, disk): """ Note that this function takes an instance name, not an Instance, so @@ -881,6 +1048,207 @@ class LibvirtConnection(object): def refresh_security_group_members(self, security_group_id): self.firewall_driver.refresh_security_group_members(security_group_id) + def update_available_resource(self, ctxt, host): + """Updates compute manager resource info on ComputeNode table. + + This method is called when nova-coompute launches, and + whenever admin executes "nova-manage service update_resource". + + :param ctxt: security context + :param host: hostname that compute manager is currently running + + """ + + try: + service_ref = db.service_get_all_compute_by_host(ctxt, host)[0] + except exception.NotFound: + raise exception.Invalid(_("Cannot update compute manager " + "specific info, because no service " + "record was found.")) + + # Updating host information + dic = {'vcpus': self.get_vcpu_total(), + 'memory_mb': self.get_memory_mb_total(), + 'local_gb': self.get_local_gb_total(), + 'vcpus_used': self.get_vcpu_used(), + 'memory_mb_used': self.get_memory_mb_used(), + 'local_gb_used': self.get_local_gb_used(), + 'hypervisor_type': self.get_hypervisor_type(), + 'hypervisor_version': self.get_hypervisor_version(), + 'cpu_info': self.get_cpu_info()} + + compute_node_ref = service_ref['compute_node'] + if not compute_node_ref: + LOG.info(_('Compute_service record created for %s ') % host) + dic['service_id'] = service_ref['id'] + db.compute_node_create(ctxt, dic) + else: + LOG.info(_('Compute_service record updated for %s ') % host) + db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic) + + def compare_cpu(self, cpu_info): + """Checks the host cpu is compatible to a cpu given by xml. + + "xml" must be a part of libvirt.openReadonly().getCapabilities(). + return values follows by virCPUCompareResult. + if 0 > return value, do live migration. + 'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult' + + :param cpu_info: json string that shows cpu feature(see get_cpu_info()) + :returns: + None. if given cpu info is not compatible to this server, + raise exception. + + """ + + LOG.info(_('Instance launched has CPU info:\n%s') % cpu_info) + dic = utils.loads(cpu_info) + xml = str(Template(self.cpuinfo_xml, searchList=dic)) + LOG.info(_('to xml...\n:%s ' % xml)) + + u = "http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult" + m = _("CPU doesn't have compatibility.\n\n%(ret)s\n\nRefer to %(u)s") + # unknown character exists in xml, then libvirt complains + try: + ret = self._conn.compareCPU(xml, 0) + except libvirt.libvirtError, e: + ret = e.message + LOG.error(m % locals()) + raise + + if ret <= 0: + raise exception.Invalid(m % locals()) + + return + + def ensure_filtering_rules_for_instance(self, instance_ref): + """Setting up filtering rules and waiting for its completion. + + To migrate an instance, filtering rules to hypervisors + and firewalls are inevitable on destination host. + ( Waiting only for filterling rules to hypervisor, + since filtering rules to firewall rules can be set faster). + + Concretely, the below method must be called. + - setup_basic_filtering (for nova-basic, etc.) + - prepare_instance_filter(for nova-instance-instance-xxx, etc.) + + to_xml may have to be called since it defines PROJNET, PROJMASK. + but libvirt migrates those value through migrateToURI(), + so , no need to be called. + + Don't use thread for this method since migration should + not be started when setting-up filtering rules operations + are not completed. + + :params instance_ref: nova.db.sqlalchemy.models.Instance object + + """ + + # If any instances never launch at destination host, + # basic-filtering must be set here. + self.firewall_driver.setup_basic_filtering(instance_ref) + # setting up n)ova-instance-instance-xx mainly. + self.firewall_driver.prepare_instance_filter(instance_ref) + + # wait for completion + timeout_count = range(FLAGS.live_migration_retry_count) + while timeout_count: + try: + filter_name = 'nova-instance-%s' % instance_ref.name + self._conn.nwfilterLookupByName(filter_name) + break + except libvirt.libvirtError: + timeout_count.pop() + if len(timeout_count) == 0: + ec2_id = instance_ref['hostname'] + iname = instance_ref.name + msg = _('Timeout migrating for %(ec2_id)s(%(iname)s)') + raise exception.Error(msg % locals()) + time.sleep(1) + + def live_migration(self, ctxt, instance_ref, dest, + post_method, recover_method): + """Spawning live_migration operation for distributing high-load. + + :params ctxt: security context + :params instance_ref: + nova.db.sqlalchemy.models.Instance object + instance object that is migrated. + :params dest: destination host + :params post_method: + post operation method. + expected nova.compute.manager.post_live_migration. + :params recover_method: + recovery method when any exception occurs. + expected nova.compute.manager.recover_live_migration. + + """ + + greenthread.spawn(self._live_migration, ctxt, instance_ref, dest, + post_method, recover_method) + + def _live_migration(self, ctxt, instance_ref, dest, + post_method, recover_method): + """Do live migration. + + :params ctxt: security context + :params instance_ref: + nova.db.sqlalchemy.models.Instance object + instance object that is migrated. + :params dest: destination host + :params post_method: + post operation method. + expected nova.compute.manager.post_live_migration. + :params recover_method: + recovery method when any exception occurs. + expected nova.compute.manager.recover_live_migration. + + """ + + # Do live migration. + try: + flaglist = FLAGS.live_migration_flag.split(',') + flagvals = [getattr(libvirt, x.strip()) for x in flaglist] + logical_sum = reduce(lambda x, y: x | y, flagvals) + + if self.read_only: + tmpconn = self._connect(self.libvirt_uri, False) + dom = tmpconn.lookupByName(instance_ref.name) + dom.migrateToURI(FLAGS.live_migration_uri % dest, + logical_sum, + None, + FLAGS.live_migration_bandwidth) + tmpconn.close() + else: + dom = self._conn.lookupByName(instance_ref.name) + dom.migrateToURI(FLAGS.live_migration_uri % dest, + logical_sum, + None, + FLAGS.live_migration_bandwidth) + + except Exception: + recover_method(ctxt, instance_ref) + raise + + # Waiting for completion of live_migration. + timer = utils.LoopingCall(f=None) + + def wait_for_live_migration(): + """waiting for live migration completion""" + try: + self.get_info(instance_ref.name)['state'] + except exception.NotFound: + timer.stop() + post_method(ctxt, instance_ref, dest) + + timer.f = wait_for_live_migration + timer.start(interval=0.5, now=True) + + def unfilter_instance(self, instance_ref): + """See comments of same method in firewall_driver.""" + self.firewall_driver.unfilter_instance(instance_ref) + class FirewallDriver(object): def prepare_instance_filter(self, instance): @@ -1222,10 +1590,14 @@ class NWFilterFirewall(FirewallDriver): class IptablesFirewallDriver(FirewallDriver): def __init__(self, execute=None, **kwargs): - self.execute = execute or utils.execute + from nova.network import linux_net + self.iptables = linux_net.iptables_manager self.instances = {} self.nwfilter = NWFilterFirewall(kwargs['get_connection']) + self.iptables.ipv4['filter'].add_chain('sg-fallback') + self.iptables.ipv4['filter'].add_rule('sg-fallback', '-j DROP') + def setup_basic_filtering(self, instance): """Use NWFilter from libvirt for this.""" return self.nwfilter.setup_basic_filtering(instance) @@ -1234,128 +1606,96 @@ class IptablesFirewallDriver(FirewallDriver): """No-op. Everything is done in prepare_instance_filter""" pass - def remove_instance(self, instance): - if instance['id'] in self.instances: - del self.instances[instance['id']] + def unfilter_instance(self, instance): + if self.instances.pop(instance['id'], None): + self.remove_filters_for_instance(instance) + self.iptables.apply() else: LOG.info(_('Attempted to unfilter instance %s which is not ' - 'filtered'), instance['id']) + 'filtered'), instance['id']) - def add_instance(self, instance): + def prepare_instance_filter(self, instance): self.instances[instance['id']] = instance + self.add_filters_for_instance(instance) + self.iptables.apply() - def unfilter_instance(self, instance): - self.remove_instance(instance) - self.apply_ruleset() + def add_filters_for_instance(self, instance): + chain_name = self._instance_chain_name(instance) - def prepare_instance_filter(self, instance): - self.add_instance(instance) - self.apply_ruleset() - - def apply_ruleset(self): - current_filter, _ = self.execute('sudo', 'iptables-save', - '-t', 'filter') - current_lines = current_filter.split('\n') - new_filter = self.modify_rules(current_lines, 4) - self.execute('sudo', 'iptables-restore', - process_input='\n'.join(new_filter)) - if(FLAGS.use_ipv6): - current_filter, _ = self.execute('sudo', 'ip6tables-save', - '-t', 'filter') - current_lines = current_filter.split('\n') - new_filter = self.modify_rules(current_lines, 6) - self.execute('sudo', 'ip6tables-restore', - process_input='\n'.join(new_filter)) - - def modify_rules(self, current_lines, ip_version=4): + self.iptables.ipv4['filter'].add_chain(chain_name) + ipv4_address = self._ip_for_instance(instance) + self.iptables.ipv4['filter'].add_rule('local', + '-d %s -j $%s' % + (ipv4_address, chain_name)) + + if FLAGS.use_ipv6: + self.iptables.ipv6['filter'].add_chain(chain_name) + ipv6_address = self._ip_for_instance_v6(instance) + self.iptables.ipv6['filter'].add_rule('local', + '-d %s -j $%s' % + (ipv6_address, + chain_name)) + + ipv4_rules, ipv6_rules = self.instance_rules(instance) + + for rule in ipv4_rules: + self.iptables.ipv4['filter'].add_rule(chain_name, rule) + + if FLAGS.use_ipv6: + for rule in ipv6_rules: + self.iptables.ipv6['filter'].add_rule(chain_name, rule) + + def remove_filters_for_instance(self, instance): + chain_name = self._instance_chain_name(instance) + + self.iptables.ipv4['filter'].remove_chain(chain_name) + if FLAGS.use_ipv6: + self.iptables.ipv6['filter'].remove_chain(chain_name) + + def instance_rules(self, instance): ctxt = context.get_admin_context() - # Remove any trace of nova rules. - new_filter = filter(lambda l: 'nova-' not in l, current_lines) - - seen_chains = False - for rules_index in range(len(new_filter)): - if not seen_chains: - if new_filter[rules_index].startswith(':'): - seen_chains = True - elif seen_chains == 1: - if not new_filter[rules_index].startswith(':'): - break - our_chains = [':nova-fallback - [0:0]'] - our_rules = ['-A nova-fallback -j DROP'] - - our_chains += [':nova-local - [0:0]'] - our_rules += ['-A FORWARD -j nova-local'] - our_rules += ['-A OUTPUT -j nova-local'] - - security_groups = {} - # Add our chains - # First, we add instance chains and rules - for instance_id in self.instances: - instance = self.instances[instance_id] - chain_name = self._instance_chain_name(instance) - if(ip_version == 4): - ip_address = self._ip_for_instance(instance) - elif(ip_version == 6): - ip_address = self._ip_for_instance_v6(instance) - - our_chains += [':%s - [0:0]' % chain_name] - - # Jump to the per-instance chain - our_rules += ['-A nova-local -d %s -j %s' % (ip_address, - chain_name)] - - # Always drop invalid packets - our_rules += ['-A %s -m state --state ' - 'INVALID -j DROP' % (chain_name,)] - - # Allow established connections - our_rules += ['-A %s -m state --state ' - 'ESTABLISHED,RELATED -j ACCEPT' % (chain_name,)] - - # Jump to each security group chain in turn - for security_group in \ - db.security_group_get_by_instance(ctxt, - instance['id']): - security_groups[security_group['id']] = security_group - - sg_chain_name = self._security_group_chain_name( - security_group['id']) + ipv4_rules = [] + ipv6_rules = [] - our_rules += ['-A %s -j %s' % (chain_name, sg_chain_name)] - - if(ip_version == 4): - # Allow DHCP responses - dhcp_server = self._dhcp_server_for_instance(instance) - our_rules += ['-A %s -s %s -p udp --sport 67 --dport 68 ' - '-j ACCEPT ' % (chain_name, dhcp_server)] - #Allow project network traffic - if (FLAGS.allow_project_net_traffic): - cidr = self._project_cidr_for_instance(instance) - our_rules += ['-A %s -s %s -j ACCEPT' % (chain_name, cidr)] - elif(ip_version == 6): - # Allow RA responses - ra_server = self._ra_server_for_instance(instance) - if ra_server: - our_rules += ['-A %s -s %s -p icmpv6 -j ACCEPT' % - (chain_name, ra_server + "/128")] - #Allow project network traffic - if (FLAGS.allow_project_net_traffic): - cidrv6 = self._project_cidrv6_for_instance(instance) - our_rules += ['-A %s -s %s -j ACCEPT' % - (chain_name, cidrv6)] - - # If nothing matches, jump to the fallback chain - our_rules += ['-A %s -j nova-fallback' % (chain_name,)] + # Always drop invalid packets + ipv4_rules += ['-m state --state ' 'INVALID -j DROP'] + ipv6_rules += ['-m state --state ' 'INVALID -j DROP'] - # then, security group chains and rules - for security_group_id in security_groups: - chain_name = self._security_group_chain_name(security_group_id) - our_chains += [':%s - [0:0]' % chain_name] + # Allow established connections + ipv4_rules += ['-m state --state ESTABLISHED,RELATED -j ACCEPT'] + ipv6_rules += ['-m state --state ESTABLISHED,RELATED -j ACCEPT'] + + dhcp_server = self._dhcp_server_for_instance(instance) + ipv4_rules += ['-s %s -p udp --sport 67 --dport 68 ' + '-j ACCEPT' % (dhcp_server,)] + + #Allow project network traffic + if FLAGS.allow_project_net_traffic: + cidr = self._project_cidr_for_instance(instance) + ipv4_rules += ['-s %s -j ACCEPT' % (cidr,)] + + # We wrap these in FLAGS.use_ipv6 because they might cause + # a DB lookup. The other ones are just list operations, so + # they're not worth the clutter. + if FLAGS.use_ipv6: + # Allow RA responses + ra_server = self._ra_server_for_instance(instance) + if ra_server: + ipv6_rules += ['-s %s/128 -p icmpv6 -j ACCEPT' % (ra_server,)] + + #Allow project network traffic + if FLAGS.allow_project_net_traffic: + cidrv6 = self._project_cidrv6_for_instance(instance) + ipv6_rules += ['-s %s -j ACCEPT' % (cidrv6,)] - rules = \ - db.security_group_rule_get_by_security_group(ctxt, - security_group_id) + security_groups = db.security_group_get_by_instance(ctxt, + instance['id']) + + # then, security group chains and rules + for security_group in security_groups: + rules = db.security_group_rule_get_by_security_group(ctxt, + security_group['id']) for rule in rules: logging.info('%r', rule) @@ -1366,14 +1706,16 @@ class IptablesFirewallDriver(FirewallDriver): continue version = _get_ip_version(rule.cidr) - if version != ip_version: - continue + if version == 4: + rules = ipv4_rules + else: + rules = ipv6_rules protocol = rule.protocol if version == 6 and rule.protocol == 'icmp': protocol = 'icmpv6' - args = ['-A', chain_name, '-p', protocol, '-s', rule.cidr] + args = ['-p', protocol, '-s', rule.cidr] if rule.protocol in ['udp', 'tcp']: if rule.from_port == rule.to_port: @@ -1394,32 +1736,39 @@ class IptablesFirewallDriver(FirewallDriver): icmp_type_arg += '/%s' % icmp_code if icmp_type_arg: - if(ip_version == 4): + if version == 4: args += ['-m', 'icmp', '--icmp-type', icmp_type_arg] - elif(ip_version == 6): + elif version == 6: args += ['-m', 'icmp6', '--icmpv6-type', icmp_type_arg] args += ['-j ACCEPT'] - our_rules += [' '.join(args)] + rules += [' '.join(args)] + + ipv4_rules += ['-j $sg-fallback'] + ipv6_rules += ['-j $sg-fallback'] - new_filter[rules_index:rules_index] = our_rules - new_filter[rules_index:rules_index] = our_chains - logging.info('new_filter: %s', '\n'.join(new_filter)) - return new_filter + return ipv4_rules, ipv6_rules def refresh_security_group_members(self, security_group): pass def refresh_security_group_rules(self, security_group): - self.apply_ruleset() + # We use the semaphore to make sure noone applies the rule set + # after we've yanked the existing rules but before we've put in + # the new ones. + with self.iptables.semaphore: + for instance in self.instances.values(): + self.remove_filters_for_instance(instance) + self.add_filters_for_instance(instance) + self.iptables.apply() def _security_group_chain_name(self, security_group_id): return 'nova-sg-%s' % (security_group_id,) def _instance_chain_name(self, instance): - return 'nova-inst-%s' % (instance['id'],) + return 'inst-%s' % (instance['id'],) def _ip_for_instance(self, instance): return db.instance_get_fixed_address(context.get_admin_context(), diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index 4e6c71446..f07b57796 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -41,9 +41,11 @@ from nova.virt.xenapi import HelperBase from nova.virt.xenapi.volume_utils import StorageError -FLAGS = flags.FLAGS LOG = logging.getLogger("nova.virt.xenapi.vm_utils") +FLAGS = flags.FLAGS +flags.DEFINE_string('default_os_type', 'linux', 'Default OS type') + XENAPI_POWER_STATE = { 'Halted': power_state.SHUTDOWN, 'Running': power_state.RUNNING, @@ -80,10 +82,19 @@ class VMHelper(HelperBase): """ @classmethod - def create_vm(cls, session, instance, kernel, ramdisk, pv_kernel=False): + def create_vm(cls, session, instance, kernel, ramdisk, + use_pv_kernel=False): """Create a VM record. Returns a Deferred that gives the new VM reference. - the pv_kernel flag indicates whether the guest is HVM or PV + the use_pv_kernel flag indicates whether the guest is HVM or PV + + There are 3 scenarios: + + 1. Using paravirtualization, kernel passed in + + 2. Using paravirtualization, kernel within the image + + 3. Using hardware virtualization """ instance_type = instance_types.\ @@ -91,52 +102,62 @@ class VMHelper(HelperBase): mem = str(long(instance_type['memory_mb']) * 1024 * 1024) vcpus = str(instance_type['vcpus']) rec = { - 'name_label': instance.name, - 'name_description': '', + 'actions_after_crash': 'destroy', + 'actions_after_reboot': 'restart', + 'actions_after_shutdown': 'destroy', + 'affinity': '', + 'blocked_operations': {}, + 'ha_always_run': False, + 'ha_restart_priority': '', + 'HVM_boot_params': {}, + 'HVM_boot_policy': '', 'is_a_template': False, - 'memory_static_min': '0', - 'memory_static_max': mem, 'memory_dynamic_min': mem, 'memory_dynamic_max': mem, - 'VCPUs_at_startup': vcpus, - 'VCPUs_max': vcpus, - 'VCPUs_params': {}, - 'actions_after_shutdown': 'destroy', - 'actions_after_reboot': 'restart', - 'actions_after_crash': 'destroy', - 'PV_bootloader': '', - 'PV_kernel': '', - 'PV_ramdisk': '', + 'memory_static_min': '0', + 'memory_static_max': mem, + 'memory_target': mem, + 'name_description': '', + 'name_label': instance.name, + 'other_config': {'allowvssprovider': False}, + 'other_config': {}, + 'PCI_bus': '', + 'platform': {'acpi': 'true', 'apic': 'true', 'pae': 'true', + 'viridian': 'true', 'timeoffset': '0'}, 'PV_args': '', + 'PV_bootloader': '', 'PV_bootloader_args': '', + 'PV_kernel': '', 'PV_legacy_args': '', - 'HVM_boot_policy': '', - 'HVM_boot_params': {}, - 'platform': {}, - 'PCI_bus': '', + 'PV_ramdisk': '', 'recommendations': '', - 'affinity': '', + 'tags': [], 'user_version': '0', - 'other_config': {}, + 'VCPUs_at_startup': vcpus, + 'VCPUs_max': vcpus, + 'VCPUs_params': {}, + 'xenstore_data': {} } - #Complete VM configuration record according to the image type - #non-raw/raw with PV kernel/raw in HVM mode - if instance.kernel_id: - rec['PV_bootloader'] = '' - rec['PV_kernel'] = kernel - rec['PV_ramdisk'] = ramdisk - rec['PV_args'] = 'root=/dev/xvda1' - rec['PV_bootloader_args'] = '' - rec['PV_legacy_args'] = '' - else: - if pv_kernel: - rec['PV_args'] = 'noninteractive' - rec['PV_bootloader'] = 'pygrub' + + # Complete VM configuration record according to the image type + # non-raw/raw with PV kernel/raw in HVM mode + if use_pv_kernel: + rec['platform']['nx'] = 'false' + if instance.kernel_id: + # 1. Kernel explicitly passed in, use that + rec['PV_args'] = 'root=/dev/xvda1' + rec['PV_kernel'] = kernel + rec['PV_ramdisk'] = ramdisk else: - rec['HVM_boot_policy'] = 'BIOS order' - rec['HVM_boot_params'] = {'order': 'dc'} - rec['platform'] = {'acpi': 'true', 'apic': 'true', - 'pae': 'true', 'viridian': 'true'} + # 2. Use kernel within the image + rec['PV_args'] = 'clocksource=jiffies' + rec['PV_bootloader'] = 'pygrub' + else: + # 3. Using hardware virtualization + rec['platform']['nx'] = 'true' + rec['HVM_boot_params'] = {'order': 'dc'} + rec['HVM_boot_policy'] = 'BIOS order' + LOG.debug(_('Created VM %s...'), instance.name) vm_ref = session.call_xenapi('VM.create', rec) instance_name = instance.name @@ -181,13 +202,13 @@ class VMHelper(HelperBase): @classmethod def find_vbd_by_number(cls, session, vm_ref, number): """Get the VBD reference from the device number""" - vbds = session.get_xenapi().VM.get_VBDs(vm_ref) - if vbds: - for vbd in vbds: + vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref) + if vbd_refs: + for vbd_ref in vbd_refs: try: - vbd_rec = session.get_xenapi().VBD.get_record(vbd) + vbd_rec = session.get_xenapi().VBD.get_record(vbd_ref) if vbd_rec['userdevice'] == str(number): - return vbd + return vbd_ref except cls.XenAPI.Failure, exc: LOG.exception(exc) raise StorageError(_('VBD not found in instance %s') % vm_ref) @@ -319,7 +340,7 @@ class VMHelper(HelperBase): return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid) @classmethod - def upload_image(cls, session, instance_id, vdi_uuids, image_id): + def upload_image(cls, session, instance, vdi_uuids, image_id): """ Requests that the Glance plugin bundle the specified VDIs and push them into Glance using the specified human-friendly name. """ @@ -328,15 +349,18 @@ class VMHelper(HelperBase): logging.debug(_("Asking xapi to upload %(vdi_uuids)s as" " ID %(image_id)s") % locals()) + os_type = instance.os_type or FLAGS.default_os_type + params = {'vdi_uuids': vdi_uuids, 'image_id': image_id, 'glance_host': FLAGS.glance_host, 'glance_port': FLAGS.glance_port, - 'sr_path': cls.get_sr_path(session)} + 'sr_path': cls.get_sr_path(session), + 'os_type': os_type} kwargs = {'params': pickle.dumps(params)} task = session.async_call_plugin('glance', 'upload_vhd', kwargs) - session.wait_for_task(task, instance_id) + session.wait_for_task(task, instance.id) @classmethod def fetch_image(cls, session, instance_id, image, user, project, @@ -419,29 +443,29 @@ class VMHelper(HelperBase): vdi_size += MBR_SIZE_BYTES name_label = get_name_label_for_image(image) - vdi = cls.create_vdi(session, sr_ref, name_label, vdi_size, False) + vdi_ref = cls.create_vdi(session, sr_ref, name_label, vdi_size, False) - with_vdi_attached_here(session, vdi, False, + with_vdi_attached_here(session, vdi_ref, False, lambda dev: _stream_disk(dev, image_type, virtual_size, image_file)) if image_type == ImageType.KERNEL_RAMDISK: #we need to invoke a plugin for copying VDI's #content into proper path - LOG.debug(_("Copying VDI %s to /boot/guest on dom0"), vdi) + LOG.debug(_("Copying VDI %s to /boot/guest on dom0"), vdi_ref) fn = "copy_kernel_vdi" args = {} - args['vdi-ref'] = vdi + args['vdi-ref'] = vdi_ref #let the plugin copy the correct number of bytes args['image-size'] = str(vdi_size) task = session.async_call_plugin('glance', fn, args) filename = session.wait_for_task(task, instance_id) #remove the VDI as it is not needed anymore - session.get_xenapi().VDI.destroy(vdi) - LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi) + session.get_xenapi().VDI.destroy(vdi_ref) + LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref) return filename else: - return session.get_xenapi().VDI.get_uuid(vdi) + return session.get_xenapi().VDI.get_uuid(vdi_ref) @classmethod def determine_disk_image_type(cls, instance): @@ -533,17 +557,33 @@ class VMHelper(HelperBase): return uuid @classmethod - def lookup_image(cls, session, instance_id, vdi_ref): + def determine_is_pv(cls, session, instance_id, vdi_ref, disk_image_type, + os_type): """ - Determine if VDI is using a PV kernel + Determine whether the VM will use a paravirtualized kernel or if it + will use hardware virtualization. + + 1. Objectstore (any image type): + We use plugin to figure out whether the VDI uses PV + + 2. Glance (VHD): then we use `os_type`, raise if not set + + 3. Glance (DISK_RAW): use Pygrub to figure out if pv kernel is + available + + 4. Glance (DISK): pv is assumed """ if FLAGS.xenapi_image_service == 'glance': - return cls._lookup_image_glance(session, vdi_ref) + # 2, 3, 4: Glance + return cls._determine_is_pv_glance( + session, vdi_ref, disk_image_type, os_type) else: - return cls._lookup_image_objectstore(session, instance_id, vdi_ref) + # 1. Objecstore + return cls._determine_is_pv_objectstore(session, instance_id, + vdi_ref) @classmethod - def _lookup_image_objectstore(cls, session, instance_id, vdi_ref): + def _determine_is_pv_objectstore(cls, session, instance_id, vdi_ref): LOG.debug(_("Looking up vdi %s for PV kernel"), vdi_ref) fn = "is_vdi_pv" args = {} @@ -559,42 +599,72 @@ class VMHelper(HelperBase): return pv @classmethod - def _lookup_image_glance(cls, session, vdi_ref): + def _determine_is_pv_glance(cls, session, vdi_ref, disk_image_type, + os_type): + """ + For a Glance image, determine if we need paravirtualization. + + The relevant scenarios are: + 2. Glance (VHD): then we use `os_type`, raise if not set + + 3. Glance (DISK_RAW): use Pygrub to figure out if pv kernel is + available + + 4. Glance (DISK): pv is assumed + """ + LOG.debug(_("Looking up vdi %s for PV kernel"), vdi_ref) - return with_vdi_attached_here(session, vdi_ref, True, _is_vdi_pv) + if disk_image_type == ImageType.DISK_VHD: + # 2. VHD + if os_type == 'windows': + is_pv = False + else: + is_pv = True + elif disk_image_type == ImageType.DISK_RAW: + # 3. RAW + is_pv = with_vdi_attached_here(session, vdi_ref, True, _is_vdi_pv) + elif disk_image_type == ImageType.DISK: + # 4. Disk + is_pv = True + else: + raise exception.Error(_("Unknown image format %(disk_image_type)s") + % locals()) + + return is_pv @classmethod - def lookup(cls, session, i): + def lookup(cls, session, name_label): """Look the instance i up, and returns it if available""" - vms = session.get_xenapi().VM.get_by_name_label(i) - n = len(vms) + vm_refs = session.get_xenapi().VM.get_by_name_label(name_label) + n = len(vm_refs) if n == 0: return None elif n > 1: - raise exception.Duplicate(_('duplicate name found: %s') % i) + raise exception.Duplicate(_('duplicate name found: %s') % + name_label) else: - return vms[0] + return vm_refs[0] @classmethod - def lookup_vm_vdis(cls, session, vm): + def lookup_vm_vdis(cls, session, vm_ref): """Look for the VDIs that are attached to the VM""" # Firstly we get the VBDs, then the VDIs. # TODO(Armando): do we leave the read-only devices? - vbds = session.get_xenapi().VM.get_VBDs(vm) - vdis = [] - if vbds: - for vbd in vbds: + vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref) + vdi_refs = [] + if vbd_refs: + for vbd_ref in vbd_refs: try: - vdi = session.get_xenapi().VBD.get_VDI(vbd) + vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref) # Test valid VDI - record = session.get_xenapi().VDI.get_record(vdi) + record = session.get_xenapi().VDI.get_record(vdi_ref) LOG.debug(_('VDI %s is still available'), record['uuid']) except cls.XenAPI.Failure, exc: LOG.exception(exc) else: - vdis.append(vdi) - if len(vdis) > 0: - return vdis + vdi_refs.append(vdi_ref) + if len(vdi_refs) > 0: + return vdi_refs else: return None @@ -770,16 +840,16 @@ def safe_find_sr(session): def find_sr(session): """Return the storage repository to hold VM images""" host = session.get_xenapi_host() - srs = session.get_xenapi().SR.get_all() - for sr in srs: - sr_rec = session.get_xenapi().SR.get_record(sr) + sr_refs = session.get_xenapi().SR.get_all() + for sr_ref in sr_refs: + sr_rec = session.get_xenapi().SR.get_record(sr_ref) if not ('i18n-key' in sr_rec['other_config'] and sr_rec['other_config']['i18n-key'] == 'local-storage'): continue - for pbd in sr_rec['PBDs']: - pbd_rec = session.get_xenapi().PBD.get_record(pbd) + for pbd_ref in sr_rec['PBDs']: + pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref) if pbd_rec['host'] == host: - return sr + return sr_ref return None @@ -804,11 +874,11 @@ def remap_vbd_dev(dev): return remapped_dev -def with_vdi_attached_here(session, vdi, read_only, f): +def with_vdi_attached_here(session, vdi_ref, read_only, f): this_vm_ref = get_this_vm_ref(session) vbd_rec = {} vbd_rec['VM'] = this_vm_ref - vbd_rec['VDI'] = vdi + vbd_rec['VDI'] = vdi_ref vbd_rec['userdevice'] = 'autodetect' vbd_rec['bootable'] = False vbd_rec['mode'] = read_only and 'RO' or 'RW' @@ -819,28 +889,28 @@ def with_vdi_attached_here(session, vdi, read_only, f): vbd_rec['qos_algorithm_type'] = '' vbd_rec['qos_algorithm_params'] = {} vbd_rec['qos_supported_algorithms'] = [] - LOG.debug(_('Creating VBD for VDI %s ... '), vdi) - vbd = session.get_xenapi().VBD.create(vbd_rec) - LOG.debug(_('Creating VBD for VDI %s done.'), vdi) + LOG.debug(_('Creating VBD for VDI %s ... '), vdi_ref) + vbd_ref = session.get_xenapi().VBD.create(vbd_rec) + LOG.debug(_('Creating VBD for VDI %s done.'), vdi_ref) try: - LOG.debug(_('Plugging VBD %s ... '), vbd) - session.get_xenapi().VBD.plug(vbd) - LOG.debug(_('Plugging VBD %s done.'), vbd) - orig_dev = session.get_xenapi().VBD.get_device(vbd) - LOG.debug(_('VBD %(vbd)s plugged as %(orig_dev)s') % locals()) + LOG.debug(_('Plugging VBD %s ... '), vbd_ref) + session.get_xenapi().VBD.plug(vbd_ref) + LOG.debug(_('Plugging VBD %s done.'), vbd_ref) + orig_dev = session.get_xenapi().VBD.get_device(vbd_ref) + LOG.debug(_('VBD %(vbd_ref)s plugged as %(orig_dev)s') % locals()) dev = remap_vbd_dev(orig_dev) if dev != orig_dev: - LOG.debug(_('VBD %(vbd)s plugged into wrong dev, ' + LOG.debug(_('VBD %(vbd_ref)s plugged into wrong dev, ' 'remapping to %(dev)s') % locals()) return f(dev) finally: - LOG.debug(_('Destroying VBD for VDI %s ... '), vdi) - vbd_unplug_with_retry(session, vbd) - ignore_failure(session.get_xenapi().VBD.destroy, vbd) - LOG.debug(_('Destroying VBD for VDI %s done.'), vdi) + LOG.debug(_('Destroying VBD for VDI %s ... '), vdi_ref) + vbd_unplug_with_retry(session, vbd_ref) + ignore_failure(session.get_xenapi().VBD.destroy, vbd_ref) + LOG.debug(_('Destroying VBD for VDI %s done.'), vdi_ref) -def vbd_unplug_with_retry(session, vbd): +def vbd_unplug_with_retry(session, vbd_ref): """Call VBD.unplug on the given VBD, with a retry if we get DEVICE_DETACH_REJECTED. For reasons which I don't understand, we're seeing the device still in use, even when all processes using the device @@ -848,7 +918,7 @@ def vbd_unplug_with_retry(session, vbd): # FIXME(sirp): We can use LoopingCall here w/o blocking sleep() while True: try: - session.get_xenapi().VBD.unplug(vbd) + session.get_xenapi().VBD.unplug(vbd_ref) LOG.debug(_('VBD.unplug successful first time.')) return except VMHelper.XenAPI.Failure, e: diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 562ecd4d5..fcb290d03 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -55,12 +55,12 @@ class VMOps(object): def list_instances(self): """List VM instances""" - vms = [] - for vm in self._session.get_xenapi().VM.get_all(): - rec = self._session.get_xenapi().VM.get_record(vm) - if not rec["is_a_template"] and not rec["is_control_domain"]: - vms.append(rec["name_label"]) - return vms + vm_refs = [] + for vm_ref in self._session.get_xenapi().VM.get_all(): + vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) + if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]: + vm_refs.append(vm_rec["name_label"]) + return vm_refs def _start(self, instance, vm_ref=None): """Power on a VM instance""" @@ -87,8 +87,8 @@ class VMOps(object): def _spawn_with_disk(self, instance, vdi_uuid): """Create VM instance""" instance_name = instance.name - vm = VMHelper.lookup(self._session, instance_name) - if vm is not None: + vm_ref = VMHelper.lookup(self._session, instance_name) + if vm_ref is not None: raise exception.Duplicate(_('Attempted to create' ' non-unique name %s') % instance_name) @@ -104,31 +104,26 @@ class VMOps(object): user = AuthManager().get_user(instance.user_id) project = AuthManager().get_project(instance.project_id) - kernel = ramdisk = pv_kernel = None - # Are we building from a pre-existing disk? vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid) disk_image_type = VMHelper.determine_disk_image_type(instance) - if disk_image_type == ImageType.DISK_RAW: - # Have a look at the VDI and see if it has a PV kernel - pv_kernel = VMHelper.lookup_image(self._session, instance.id, - vdi_ref) - elif disk_image_type == ImageType.DISK_VHD: - # TODO(sirp): Assuming PV for now; this will need to be - # configurable as Windows will use HVM. - pv_kernel = True + kernel = None if instance.kernel_id: kernel = VMHelper.fetch_image(self._session, instance.id, instance.kernel_id, user, project, ImageType.KERNEL_RAMDISK) + ramdisk = None if instance.ramdisk_id: ramdisk = VMHelper.fetch_image(self._session, instance.id, instance.ramdisk_id, user, project, ImageType.KERNEL_RAMDISK) - vm_ref = VMHelper.create_vm(self._session, - instance, kernel, ramdisk, pv_kernel) + use_pv_kernel = VMHelper.determine_is_pv(self._session, instance.id, + vdi_ref, disk_image_type, instance.os_type) + vm_ref = VMHelper.create_vm(self._session, instance, kernel, ramdisk, + use_pv_kernel) + VMHelper.create_vbd(session=self._session, vm_ref=vm_ref, vdi_ref=vdi_ref, userdevice=0, bootable=True) @@ -266,7 +261,7 @@ class VMOps(object): template_vm_ref, template_vdi_uuids = self._get_snapshot(instance) # call plugin to ship snapshot off to glance VMHelper.upload_image( - self._session, instance.id, template_vdi_uuids, image_id) + self._session, instance, template_vdi_uuids, image_id) finally: if template_vm_ref: self._destroy(instance, template_vm_ref, @@ -371,8 +366,8 @@ class VMOps(object): def reboot(self, instance): """Reboot VM instance""" - vm = self._get_vm_opaque_ref(instance) - task = self._session.call_xenapi('Async.VM.clean_reboot', vm) + vm_ref = self._get_vm_opaque_ref(instance) + task = self._session.call_xenapi('Async.VM.clean_reboot', vm_ref) self._session.wait_for_task(task, instance.id) def set_admin_password(self, instance, new_pass): @@ -439,7 +434,7 @@ class VMOps(object): raise RuntimeError(resp_dict['message']) return resp_dict['message'] - def _shutdown(self, instance, vm, hard=True): + def _shutdown(self, instance, vm_ref, hard=True): """Shutdown an instance""" state = self.get_info(instance['name'])['state'] if state == power_state.SHUTDOWN: @@ -453,31 +448,33 @@ class VMOps(object): try: task = None if hard: - task = self._session.call_xenapi("Async.VM.hard_shutdown", vm) + task = self._session.call_xenapi("Async.VM.hard_shutdown", + vm_ref) else: - task = self._session.call_xenapi('Async.VM.clean_shutdown', vm) + task = self._session.call_xenapi("Async.VM.clean_shutdown", + vm_ref) self._session.wait_for_task(task, instance.id) except self.XenAPI.Failure, exc: LOG.exception(exc) - def _destroy_vdis(self, instance, vm): - """Destroys all VDIs associated with a VM """ + def _destroy_vdis(self, instance, vm_ref): + """Destroys all VDIs associated with a VM""" instance_id = instance.id LOG.debug(_("Destroying VDIs for Instance %(instance_id)s") % locals()) - vdis = VMHelper.lookup_vm_vdis(self._session, vm) + vdi_refs = VMHelper.lookup_vm_vdis(self._session, vm_ref) - if not vdis: + if not vdi_refs: return - for vdi in vdis: + for vdi_ref in vdi_refs: try: - task = self._session.call_xenapi('Async.VDI.destroy', vdi) + task = self._session.call_xenapi('Async.VDI.destroy', vdi_ref) self._session.wait_for_task(task, instance.id) except self.XenAPI.Failure, exc: LOG.exception(exc) - def _destroy_kernel_ramdisk(self, instance, vm): + def _destroy_kernel_ramdisk(self, instance, vm_ref): """ Three situations can occur: @@ -504,8 +501,8 @@ class VMOps(object): "both" % locals())) # 3. We have both kernel and ramdisk - (kernel, ramdisk) = VMHelper.lookup_kernel_ramdisk( - self._session, vm) + (kernel, ramdisk) = VMHelper.lookup_kernel_ramdisk(self._session, + vm_ref) LOG.debug(_("Removing kernel/ramdisk files")) @@ -516,11 +513,11 @@ class VMOps(object): LOG.debug(_("kernel/ramdisk files removed")) - def _destroy_vm(self, instance, vm): - """Destroys a VM record """ + def _destroy_vm(self, instance, vm_ref): + """Destroys a VM record""" instance_id = instance.id try: - task = self._session.call_xenapi('Async.VM.destroy', vm) + task = self._session.call_xenapi('Async.VM.destroy', vm_ref) self._session.wait_for_task(task, instance_id) except self.XenAPI.Failure, exc: LOG.exception(exc) @@ -536,10 +533,10 @@ class VMOps(object): """ instance_id = instance.id LOG.info(_("Destroying VM for Instance %(instance_id)s") % locals()) - vm = VMHelper.lookup(self._session, instance.name) - return self._destroy(instance, vm, shutdown=True) + vm_ref = VMHelper.lookup(self._session, instance.name) + return self._destroy(instance, vm_ref, shutdown=True) - def _destroy(self, instance, vm, shutdown=True, + def _destroy(self, instance, vm_ref, shutdown=True, destroy_kernel_ramdisk=True): """ Destroys VM instance by performing: @@ -549,17 +546,17 @@ class VMOps(object): 3. Destroying kernel and ramdisk files (if necessary) 4. Destroying that actual VM record """ - if vm is None: + if vm_ref is None: LOG.warning(_("VM is not present, skipping destroy...")) return if shutdown: - self._shutdown(instance, vm) + self._shutdown(instance, vm_ref) - self._destroy_vdis(instance, vm) + self._destroy_vdis(instance, vm_ref) if destroy_kernel_ramdisk: - self._destroy_kernel_ramdisk(instance, vm) - self._destroy_vm(instance, vm) + self._destroy_kernel_ramdisk(instance, vm_ref) + self._destroy_vm(instance, vm_ref) def _wait_with_callback(self, instance_id, task, callback): ret = None @@ -571,26 +568,27 @@ class VMOps(object): def pause(self, instance, callback): """Pause VM instance""" - vm = self._get_vm_opaque_ref(instance) - task = self._session.call_xenapi('Async.VM.pause', vm) + vm_ref = self._get_vm_opaque_ref(instance) + task = self._session.call_xenapi('Async.VM.pause', vm_ref) self._wait_with_callback(instance.id, task, callback) def unpause(self, instance, callback): """Unpause VM instance""" - vm = self._get_vm_opaque_ref(instance) - task = self._session.call_xenapi('Async.VM.unpause', vm) + vm_ref = self._get_vm_opaque_ref(instance) + task = self._session.call_xenapi('Async.VM.unpause', vm_ref) self._wait_with_callback(instance.id, task, callback) def suspend(self, instance, callback): """suspend the specified instance""" - vm = self._get_vm_opaque_ref(instance) - task = self._session.call_xenapi('Async.VM.suspend', vm) + vm_ref = self._get_vm_opaque_ref(instance) + task = self._session.call_xenapi('Async.VM.suspend', vm_ref) self._wait_with_callback(instance.id, task, callback) def resume(self, instance, callback): """resume the specified instance""" - vm = self._get_vm_opaque_ref(instance) - task = self._session.call_xenapi('Async.VM.resume', vm, False, True) + vm_ref = self._get_vm_opaque_ref(instance) + task = self._session.call_xenapi('Async.VM.resume', vm_ref, False, + True) self._wait_with_callback(instance.id, task, callback) def rescue(self, instance, callback): @@ -600,29 +598,26 @@ class VMOps(object): - spawn a rescue VM (the vm name-label will be instance-N-rescue) """ - rescue_vm = VMHelper.lookup(self._session, instance.name + "-rescue") - if rescue_vm: + rescue_vm_ref = VMHelper.lookup(self._session, + instance.name + "-rescue") + if rescue_vm_ref: raise RuntimeError(_( "Instance is already in Rescue Mode: %s" % instance.name)) - vm = self._get_vm_opaque_ref(instance) - self._shutdown(instance, vm) - self._acquire_bootlock(vm) + vm_ref = self._get_vm_opaque_ref(instance) + self._shutdown(instance, vm_ref) + self._acquire_bootlock(vm_ref) instance._rescue = True self.spawn(instance) - rescue_vm = self._get_vm_opaque_ref(instance) + rescue_vm_ref = self._get_vm_opaque_ref(instance) - vbd = self._session.get_xenapi().VM.get_VBDs(vm)[0] - vdi_ref = self._session.get_xenapi().VBD.get_record(vbd)["VDI"] - vbd_ref = VMHelper.create_vbd( - self._session, - rescue_vm, - vdi_ref, - 1, - False) + vbd_ref = self._session.get_xenapi().VM.get_VBDs(vm_ref)[0] + vdi_ref = self._session.get_xenapi().VBD.get_record(vbd_ref)["VDI"] + rescue_vbd_ref = VMHelper.create_vbd(self._session, rescue_vm_ref, + vdi_ref, 1, False) - self._session.call_xenapi("Async.VBD.plug", vbd_ref) + self._session.call_xenapi("Async.VBD.plug", rescue_vbd_ref) def unrescue(self, instance, callback): """Unrescue the specified instance @@ -631,51 +626,53 @@ class VMOps(object): - release the bootlock to allow the instance VM to start """ - rescue_vm = VMHelper.lookup(self._session, instance.name + "-rescue") + rescue_vm_ref = VMHelper.lookup(self._session, + instance.name + "-rescue") - if not rescue_vm: + if not rescue_vm_ref: raise exception.NotFound(_( "Instance is not in Rescue Mode: %s" % instance.name)) - original_vm = self._get_vm_opaque_ref(instance) - vbds = self._session.get_xenapi().VM.get_VBDs(rescue_vm) + original_vm_ref = self._get_vm_opaque_ref(instance) + vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref) instance._rescue = False - for vbd_ref in vbds: - vbd = self._session.get_xenapi().VBD.get_record(vbd_ref) - if vbd["userdevice"] == "1": + for vbd_ref in vbd_refs: + _vbd_ref = self._session.get_xenapi().VBD.get_record(vbd_ref) + if _vbd_ref["userdevice"] == "1": VMHelper.unplug_vbd(self._session, vbd_ref) VMHelper.destroy_vbd(self._session, vbd_ref) - task1 = self._session.call_xenapi("Async.VM.hard_shutdown", rescue_vm) + task1 = self._session.call_xenapi("Async.VM.hard_shutdown", + rescue_vm_ref) self._session.wait_for_task(task1, instance.id) - vdis = VMHelper.lookup_vm_vdis(self._session, rescue_vm) - for vdi in vdis: + vdi_refs = VMHelper.lookup_vm_vdis(self._session, rescue_vm_ref) + for vdi_ref in vdi_refs: try: - task = self._session.call_xenapi('Async.VDI.destroy', vdi) + task = self._session.call_xenapi('Async.VDI.destroy', vdi_ref) self._session.wait_for_task(task, instance.id) except self.XenAPI.Failure: continue - task2 = self._session.call_xenapi('Async.VM.destroy', rescue_vm) + task2 = self._session.call_xenapi('Async.VM.destroy', rescue_vm_ref) self._session.wait_for_task(task2, instance.id) - self._release_bootlock(original_vm) - self._start(instance, original_vm) + self._release_bootlock(original_vm_ref) + self._start(instance, original_vm_ref) def get_info(self, instance): """Return data about VM instance""" - vm = self._get_vm_opaque_ref(instance) - rec = self._session.get_xenapi().VM.get_record(vm) - return VMHelper.compile_info(rec) + vm_ref = self._get_vm_opaque_ref(instance) + vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) + return VMHelper.compile_info(vm_rec) def get_diagnostics(self, instance): """Return data about VM diagnostics""" - vm = self._get_vm_opaque_ref(instance) - rec = self._session.get_xenapi().VM.get_record(vm) - return VMHelper.compile_diagnostics(self._session, rec) + vm_ref = self._get_vm_opaque_ref(instance) + vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) + return VMHelper.compile_diagnostics(self._session, vm_rec) def get_console_output(self, instance): """Return snapshot of console""" @@ -698,9 +695,9 @@ class VMOps(object): # at this stage even though they aren't implemented because these will # be needed for multi-nic and there was no sense writing it for single # network/single IP and then having to turn around and re-write it - vm_opaque_ref = self._get_vm_opaque_ref(instance.id) + vm_ref = self._get_vm_opaque_ref(instance.id) logging.debug(_("injecting network info to xenstore for vm: |%s|"), - vm_opaque_ref) + vm_ref) admin_context = context.get_admin_context() IPs = db.fixed_ip_get_all_by_instance(admin_context, instance['id']) networks = db.network_get_all_by_instance(admin_context, @@ -731,11 +728,10 @@ class VMOps(object): 'ips': [ip_dict(ip) for ip in network_IPs], 'ip6s': [ip6_dict(ip) for ip in network_IPs]} - self.write_to_param_xenstore(vm_opaque_ref, {location: mapping}) + self.write_to_param_xenstore(vm_ref, {location: mapping}) try: - self.write_to_xenstore(vm_opaque_ref, location, - mapping['location']) + self.write_to_xenstore(vm_ref, location, mapping['location']) except KeyError: # catch KeyError for domid if instance isn't running pass @@ -747,8 +743,8 @@ class VMOps(object): Creates vifs for an instance """ - vm_opaque_ref = self._get_vm_opaque_ref(instance.id) - logging.debug(_("creating vif(s) for vm: |%s|"), vm_opaque_ref) + vm_ref = self._get_vm_opaque_ref(instance.id) + logging.debug(_("creating vif(s) for vm: |%s|"), vm_ref) if networks is None: networks = db.network_get_all_by_instance(admin_context, instance['id']) @@ -768,12 +764,8 @@ class VMOps(object): except AttributeError: device = "0" - VMHelper.create_vif( - self._session, - vm_opaque_ref, - network_ref, - instance.mac_address, - device) + VMHelper.create_vif(self._session, vm_ref, network_ref, + instance.mac_address, device) def reset_network(self, instance): """ @@ -837,9 +829,9 @@ class VMOps(object): Any errors raised by the plugin will in turn raise a RuntimeError here. """ instance_id = vm.id - vm = self._get_vm_opaque_ref(vm) - rec = self._session.get_xenapi().VM.get_record(vm) - args = {'dom_id': rec['domid'], 'path': path} + vm_ref = self._get_vm_opaque_ref(vm) + vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) + args = {'dom_id': vm_rec['domid'], 'path': path} args.update(addl_args) try: task = self._session.async_call_plugin(plugin, method, args) @@ -919,9 +911,9 @@ class VMOps(object): value for 'keys' is passed, the returned dict is filtered to only return the values for those keys. """ - vm = self._get_vm_opaque_ref(instance_or_vm) + vm_ref = self._get_vm_opaque_ref(instance_or_vm) data = self._session.call_xenapi_request('VM.get_xenstore_data', - (vm, )) + (vm_ref, )) ret = {} if keys is None: keys = data.keys() @@ -939,11 +931,11 @@ class VMOps(object): """Takes a key/value pair and adds it to the xenstore parameter record for the given vm instance. If the key exists in xenstore, it is overwritten""" - vm = self._get_vm_opaque_ref(instance_or_vm) + vm_ref = self._get_vm_opaque_ref(instance_or_vm) self.remove_from_param_xenstore(instance_or_vm, key) jsonval = json.dumps(val) self._session.call_xenapi_request('VM.add_to_xenstore_data', - (vm, key, jsonval)) + (vm_ref, key, jsonval)) def write_to_param_xenstore(self, instance_or_vm, mapping): """Takes a dict and writes each key/value pair to the xenstore @@ -958,14 +950,14 @@ class VMOps(object): them from the xenstore parameter record data for the given VM. If the key doesn't exist, the request is ignored. """ - vm = self._get_vm_opaque_ref(instance_or_vm) + vm_ref = self._get_vm_opaque_ref(instance_or_vm) if isinstance(key_or_keys, basestring): keys = [key_or_keys] else: keys = key_or_keys for key in keys: self._session.call_xenapi_request('VM.remove_from_xenstore_data', - (vm, key)) + (vm_ref, key)) def clear_param_xenstore(self, instance_or_vm): """Removes all data from the xenstore parameter record for this VM.""" diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py index d5ebd29d5..72284ac02 100644 --- a/nova/virt/xenapi/volume_utils.py +++ b/nova/virt/xenapi/volume_utils.py @@ -117,16 +117,16 @@ class VolumeHelper(HelperBase): def introduce_vdi(cls, session, sr_ref): """Introduce VDI in the host""" try: - vdis = session.get_xenapi().SR.get_VDIs(sr_ref) + vdi_refs = session.get_xenapi().SR.get_VDIs(sr_ref) except cls.XenAPI.Failure, exc: LOG.exception(exc) raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref) try: - vdi_rec = session.get_xenapi().VDI.get_record(vdis[0]) + vdi_rec = session.get_xenapi().VDI.get_record(vdi_refs[0]) except cls.XenAPI.Failure, exc: LOG.exception(exc) raise StorageError(_('Unable to get record' - ' of VDI %s on') % vdis[0]) + ' of VDI %s on') % vdi_refs[0]) else: try: return session.get_xenapi().VDI.introduce( diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index b63a5f8c3..da42a83b6 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -49,6 +49,12 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block. address for the nova-volume host :target_port: iSCSI Target Port, 3260 Default :iqn_prefix: IQN Prefix, e.g. 'iqn.2010-10.org.openstack' + +**Variable Naming Scheme** + +- suffix "_ref" for opaque references +- suffix "_uuid" for UUIDs +- suffix "_rec" for record objects """ import sys @@ -263,6 +269,27 @@ class XenAPIConnection(object): 'username': FLAGS.xenapi_connection_username, 'password': FLAGS.xenapi_connection_password} + def update_available_resource(self, ctxt, host): + """This method is supported only by libvirt.""" + return + + def compare_cpu(self, xml): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + + def ensure_filtering_rules_for_instance(self, instance_ref): + """This method is supported only libvirt.""" + return + + def live_migration(self, context, instance_ref, dest, + post_method, recover_method): + """This method is supported only by libvirt.""" + return + + def unfilter_instance(self, instance_ref): + """This method is supported only by libvirt.""" + raise NotImplementedError('This method is supported only by libvirt.') + class XenAPISession(object): """The session to invoke XenAPI SDK calls""" diff --git a/nova/volume/driver.py b/nova/volume/driver.py index 45cc800e7..7b4bacdec 100644 --- a/nova/volume/driver.py +++ b/nova/volume/driver.py @@ -143,6 +143,10 @@ class VolumeDriver(object): """Undiscover volume on a remote host.""" raise NotImplementedError() + def check_for_export(self, context, volume_id): + """Make sure volume is exported.""" + raise NotImplementedError() + class AOEDriver(VolumeDriver): """Implements AOE specific volume commands.""" @@ -198,15 +202,45 @@ class AOEDriver(VolumeDriver): self._try_execute('sudo', 'vblade-persist', 'destroy', shelf_id, blade_id) - def discover_volume(self, _volume): + def discover_volume(self, context, _volume): """Discover volume on a remote host.""" - self._execute('sudo', 'aoe-discover') - self._execute('sudo', 'aoe-stat', check_exit_code=False) + (shelf_id, + blade_id) = self.db.volume_get_shelf_and_blade(context, + _volume['id']) + self._execute("sudo aoe-discover") + out, err = self._execute("sudo aoe-stat", check_exit_code=False) + device_path = 'e%(shelf_id)d.%(blade_id)d' % locals() + if out.find(device_path) >= 0: + return "/dev/etherd/%s" % device_path + else: + return def undiscover_volume(self, _volume): """Undiscover volume on a remote host.""" pass + def check_for_export(self, context, volume_id): + """Make sure volume is exported.""" + (shelf_id, + blade_id) = self.db.volume_get_shelf_and_blade(context, + volume_id) + cmd = "sudo vblade-persist ls --no-header" + out, _err = self._execute(cmd) + exported = False + for line in out.split('\n'): + param = line.split(' ') + if len(param) == 6 and param[0] == str(shelf_id) \ + and param[1] == str(blade_id) and param[-1] == "run": + exported = True + break + if not exported: + # Instance will be terminated in this case. + desc = _("Cannot confirm exported volume id:%(volume_id)s. " + "vblade process for e%(shelf_id)s.%(blade_id)s " + "isn't running.") % locals() + raise exception.ProcessExecutionError(out, _err, cmd=cmd, + description=desc) + class FakeAOEDriver(AOEDriver): """Logs calls instead of executing.""" @@ -402,7 +436,7 @@ class ISCSIDriver(VolumeDriver): (property_key, property_value)) return self._run_iscsiadm(iscsi_properties, iscsi_command) - def discover_volume(self, volume): + def discover_volume(self, context, volume): """Discover volume on a remote host.""" iscsi_properties = self._get_iscsi_properties(volume) @@ -461,6 +495,20 @@ class ISCSIDriver(VolumeDriver): self._run_iscsiadm(iscsi_properties, "--logout") self._run_iscsiadm(iscsi_properties, "--op delete") + def check_for_export(self, context, volume_id): + """Make sure volume is exported.""" + + tid = self.db.volume_get_iscsi_target_num(context, volume_id) + try: + self._execute("sudo ietadm --op show --tid=%(tid)d" % locals()) + except exception.ProcessExecutionError, e: + # Instances remount read-only in this case. + # /etc/init.d/iscsitarget restart and rebooting nova-volume + # is better since ensure_export() works at boot time. + logging.error(_("Cannot confirm exported volume " + "id:%(volume_id)s.") % locals()) + raise + class FakeISCSIDriver(ISCSIDriver): """Logs calls instead of executing.""" diff --git a/nova/volume/manager.py b/nova/volume/manager.py index 3e8bc16b3..9dea35b35 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -160,7 +160,7 @@ class VolumeManager(manager.Manager): if volume_ref['host'] == self.host and FLAGS.use_local_volumes: path = self.driver.local_path(volume_ref) else: - path = self.driver.discover_volume(volume_ref) + path = self.driver.discover_volume(context, volume_ref) return path def remove_compute_volume(self, context, volume_id): @@ -171,3 +171,9 @@ class VolumeManager(manager.Manager): return True else: self.driver.undiscover_volume(volume_ref) + + def check_for_export(self, context, instance_id): + """Make sure whether volume is exported.""" + instance_ref = self.db.instance_get(context, instance_id) + for volume in instance_ref['volumes']: + self.driver.check_for_export(context, volume['id']) diff --git a/nova/wsgi.py b/nova/wsgi.py index 2d18da8fb..ba0819466 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -436,7 +436,8 @@ class Serializer(object): try: return handlers[content_type] except Exception: - raise exception.InvalidContentType() + raise exception.InvalidContentType(_("Invalid content type %s" + % content_type)) def _from_json(self, datastring): return utils.loads(datastring) diff --git a/plugins/xenserver/networking/etc/xensource/scripts/vif_rules.py b/plugins/xenserver/networking/etc/xensource/scripts/vif_rules.py index d2b2d61e6..48122e6d6 100755 --- a/plugins/xenserver/networking/etc/xensource/scripts/vif_rules.py +++ b/plugins/xenserver/networking/etc/xensource/scripts/vif_rules.py @@ -54,6 +54,7 @@ def main(dom_id, command, only_this_vif=None): def execute(*command, return_stdout=False): devnull = open(os.devnull, 'w') + command = map(str, command) proc = subprocess.Popen(command, close_fds=True, stdout=subprocess.PIPE, stderr=devnull) devnull.close() @@ -71,13 +72,13 @@ def apply_iptables_rules(command, params): iptables = lambda *rule: execute('/sbin/iptables', *rule) iptables('-D', 'FORWARD', '-m', 'physdev', - '--physdev-in', '%(VIF)s' % params, - '-s', '%(IP)s' % params, + '--physdev-in', params['VIF'], + '-s', params['IP'], '-j', 'ACCEPT') if command == 'online': iptables('-A', 'FORWARD', '-m', 'physdev', - '--physdev-in', '%(VIF)s' % params, - '-s', '%(IP)s' % params, + '--physdev-in', params['VIF'], + '-s', params['IP'], '-j', 'ACCEPT') @@ -85,25 +86,24 @@ def apply_arptables_rules(command, params): arptables = lambda *rule: execute('/sbin/arptables', *rule) arptables('-D', 'FORWARD', '--opcode', 'Request', - '--in-interface', '%(VIF)s' % params, - '--source-ip', '%(IP)s' % params, - '--source-mac', '%(MAC)s' % params, + '--in-interface', params['VIF'], + '--source-ip', params['IP'], + '--source-mac', params['MAC'], '-j', 'ACCEPT') arptables('-D', 'FORWARD', '--opcode', 'Reply', - '--in-interface', '%(VIF)s' % params, - '--source-ip', '%(IP)s' % params, - '--source-mac', '%(MAC)s' % params, + '--in-interface', params['VIF'], + '--source-ip', params['IP'], + '--source-mac', params['MAC'], '-j', 'ACCEPT') if command == 'online': arptables('-A', 'FORWARD', '--opcode', 'Request', - '--in-interface', '%(VIF)s' % params - '--source-ip', '%(IP)s' % params, - '--source-mac', '%(MAC)s' % params, + '--in-interface', params['VIF'], + '--source-mac', params['MAC'], '-j', 'ACCEPT') arptables('-A', 'FORWARD', '--opcode', 'Reply', - '--in-interface', '%(VIF)s' % params, - '--source-ip', '%(IP)s' % params, - '--source-mac', '%(MAC)s' % params, + '--in-interface', params['VIF'], + '--source-ip', params['IP'], + '--source-mac', params['MAC'], '-j', 'ACCEPT') @@ -130,7 +130,7 @@ def apply_ebtables_rules(command, params): '-i', params['VIF'], '-j', 'DROP') if command == 'online': ebtables('-I', 'FORWARD', '1', '-s', '!', params['MAC'], - '-i', '%(VIF)s', '-j', 'DROP') + '-i', params['VIF'], '-j', 'DROP') if __name__ == "__main__": diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance index 201b99fda..c996f6ef4 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance @@ -191,7 +191,7 @@ def _prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids): os.link(source, link_name) -def _upload_tarball(staging_path, image_id, glance_host, glance_port): +def _upload_tarball(staging_path, image_id, glance_host, glance_port, os_type): """ Create a tarball of the image and then stream that into Glance using chunked-transfer-encoded HTTP. @@ -215,7 +215,10 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port): 'x-image-meta-is-public': 'True', 'x-image-meta-status': 'queued', 'x-image-meta-disk-format': 'vhd', - 'x-image-meta-container-format': 'ovf'} + 'x-image-meta-container-format': 'ovf', + 'x-image-meta-property-os-type': os_type + } + for header, value in headers.iteritems(): conn.putheader(header, value) conn.endheaders() @@ -337,11 +340,13 @@ def upload_vhd(session, args): glance_host = params["glance_host"] glance_port = params["glance_port"] sr_path = params["sr_path"] + os_type = params["os_type"] staging_path = _make_staging_area(sr_path) try: _prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids) - _upload_tarball(staging_path, image_id, glance_host, glance_port) + _upload_tarball(staging_path, image_id, glance_host, glance_port, + os_type) finally: _cleanup_staging_area(staging_path) |