From cac332c39645286a11c009094a86f62d02752183 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Tue, 31 Jan 2012 14:49:04 +0000 Subject: Refactoring required for blueprint xenapi-live-migration This refactoring of the libvirt live migration code is required to enable live migration in the xenapi driver. This change ensures libvirt specific checks are performed only when the libvirt driver is enabled. The complication is that some of these checks require information to be passed between the source and destination hosts. For example, when comparing CPU flags. Change-Id: I7389f0b7f03313d7f04b907f481787dadf0716fd --- nova/compute/manager.py | 141 +++++++++++++++++++++--------------------------- nova/compute/rpcapi.py | 37 ++++++++----- 2 files changed, 84 insertions(+), 94 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 6706f9238..326ef28de 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -69,6 +69,7 @@ from nova.openstack.common import jsonutils from nova.openstack.common import log as logging from nova.openstack.common.notifier import api as notifier from nova.openstack.common import rpc +from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common import timeutils from nova import utils from nova.virt import driver @@ -234,7 +235,7 @@ def _get_additional_capabilities(): class ComputeManager(manager.SchedulerDependentManager): """Manages the running instances from creation to destruction.""" - RPC_API_VERSION = '1.1' + RPC_API_VERSION = '1.2' def __init__(self, compute_driver=None, *args, **kwargs): """Load configuration options and connect to the hypervisor.""" @@ -1892,74 +1893,72 @@ class ComputeManager(manager.SchedulerDependentManager): except exception.NotFound: pass - @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) - def compare_cpu(self, context, cpu_info): - """Checks that the host cpu is compatible with a cpu given by xml. + def get_instance_disk_info(self, context, instance_name): + """Getting infomation of instance's current disk. + + Implementation nova.virt.libvirt.connection. :param context: security context - :param cpu_info: json string obtained from virConnect.getCapabilities - :returns: See driver.compare_cpu + :param instance_name: instance name """ - return self.driver.compare_cpu(cpu_info) + return self.driver.get_instance_disk_info(instance_name) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) - def create_shared_storage_test_file(self, context): - """Makes tmpfile under FLAGS.instance_path. - - This method creates a temporary file that acts as an indicator to - other compute nodes that utilize the same shared storage as this node. - (create|check|cleanup)_shared_storage_test_file() are a set and should - be run together. - - :param context: security context - :returns: tmpfile name(basename) + def compare_cpu(self, context, cpu_info): + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) - """ - dirpath = FLAGS.instances_path - fd, tmp_file = tempfile.mkstemp(dir=dirpath) - LOG.debug(_("Creating tmpfile %s to notify to other " - "compute nodes that they should mount " - "the same storage.") % tmp_file) - os.close(fd) - return os.path.basename(tmp_file) + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def create_shared_storage_test_file(self, context): + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def check_shared_storage_test_file(self, context, filename): - """Confirms existence of the tmpfile under FLAGS.instances_path. - Cannot confirm tmpfile return False. - - :param context: security context - :param filename: confirm existence of FLAGS.instances_path/thisfile - - """ - tmp_file = os.path.join(FLAGS.instances_path, filename) - if not os.path.exists(tmp_file): - return False - else: - return True + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def cleanup_shared_storage_test_file(self, context, filename): - """Removes existence of the tmpfile under FLAGS.instances_path. + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) - :param context: security context - :param filename: remove existence of FLAGS.instances_path/thisfile + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def check_can_live_migrate_destination(self, ctxt, instance_id, + block_migration=False, + disk_over_commit=False): + """Check if it is possible to execute live migration. + This runs checks on the destination host, and then calls + back to the source host to check the results. + + :param context: security context + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param block_migration: if true, prepare for block migration + :param disk_over_commit: if true, allow disk over commit """ - tmp_file = os.path.join(FLAGS.instances_path, filename) - os.remove(tmp_file) + instance_ref = self.db.instance_get(ctxt, instance_id) + dest_check_data = self.driver.check_can_live_migrate_destination(ctxt, + instance_ref, block_migration, disk_over_commit) + try: + self.compute_rpcapi.check_can_live_migrate_source(ctxt, + instance_ref, dest_check_data) + finally: + self.driver.check_can_live_migrate_destination_cleanup(ctxt, + dest_check_data) - def get_instance_disk_info(self, context, instance_name): - """Get information about instance's current disk. + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def check_can_live_migrate_source(self, ctxt, instance_id, + dest_check_data): + """Check if it is possible to execute live migration. - Implementation nova.virt.libvirt.connection. + This checks if the live migration can succeed, based on the + results from check_can_live_migrate_destination. :param context: security context - :param instance_name: instance name - + :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param dest_check_data: result of check_can_live_migrate_destination """ - return self.driver.get_instance_disk_info(instance_name) + instance_ref = self.db.instance_get(ctxt, instance_id) + self.driver.check_can_live_migrate_source(ctxt, instance_ref, + dest_check_data) def pre_live_migration(self, context, instance_id, block_migration=False, disk=None): @@ -1979,40 +1978,21 @@ class ComputeManager(manager.SchedulerDependentManager): if not block_device_info['block_device_mapping']: LOG.info(_('Instance has no volume.'), instance=instance_ref) - self.driver.pre_live_migration(block_device_info) - - # NOTE(tr3buchet): setup networks on destination host - self.network_api.setup_networks_on_host(context, instance_ref, - self.host) - - # Bridge settings. - # Call this method prior to ensure_filtering_rules_for_instance, - # since bridge is not set up, ensure_filtering_rules_for instance - # fails. - # - # Retry operation is necessary because continuously request comes, - # concorrent request occurs to iptables, then it complains. network_info = self._get_instance_nw_info(context, instance_ref) # TODO(tr3buchet): figure out how on the earth this is necessary fixed_ips = network_info.fixed_ips() if not fixed_ips: - raise exception.FixedIpNotFoundForInstance(instance_id=instance_id) + raise exception.FixedIpNotFoundForInstance( + instance_id=instance_id) - max_retry = FLAGS.live_migration_retry_count - for cnt in range(max_retry): - try: - self.driver.plug_vifs(instance_ref, - self._legacy_nw_info(network_info)) - break - except exception.ProcessExecutionError: - if cnt == max_retry - 1: - raise - else: - LOG.warn(_("plug_vifs() failed %(cnt)d." - "Retry up to %(max_retry)d for %(hostname)s.") - % locals(), instance=instance_ref) - time.sleep(1) + self.driver.pre_live_migration(context, instance_ref, + block_device_info, + self._legacy_nw_info(network_info)) + + # NOTE(tr3buchet): setup networks on destination host + self.network_api.setup_networks_on_host(context, instance_ref, + self.host) # Creating filters to hypervisors and firewalls. # An example is that nova-instance-instance-xxx, @@ -2083,12 +2063,11 @@ class ComputeManager(manager.SchedulerDependentManager): and mainly updating database record. :param ctxt: security context - :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param instance_ref: nova.db.sqlalchemy.models.Instance :param dest: destination host :param block_migration: if true, prepare for block migration """ - LOG.info(_('post_live_migration() is started..'), instance=instance_ref) @@ -2161,8 +2140,8 @@ class ComputeManager(manager.SchedulerDependentManager): "This error can be safely ignored."), instance=instance_ref) - def post_live_migration_at_destination(self, context, - instance_id, block_migration=False): + def post_live_migration_at_destination(self, context, instance_id, + block_migration=False): """Post operations for live migration . :param context: security context @@ -2204,7 +2183,7 @@ class ComputeManager(manager.SchedulerDependentManager): """Recovers Instance/volume state from migrating -> running. :param context: security context - :param instance_id: nova.db.sqlalchemy.models.Instance.Id + :param instance_ref: nova.db.sqlalchemy.models.Instance :param dest: This method is called from live migration src host. This param specifies destination host. diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index d740925a7..332b8153e 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -21,6 +21,7 @@ Client side of the compute RPC API. from nova import exception from nova import flags from nova.openstack.common import rpc +from nova.openstack.common.rpc import common as rpc_common import nova.openstack.common.rpc.proxy @@ -55,6 +56,7 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): 1.0 - Initial version. 1.1 - Adds get_host_uptime() + 1.2 - Adds check_can_live_migrate_[destination|source] ''' BASE_RPC_API_VERSION = '1.0' @@ -88,19 +90,33 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): mountpoint=mountpoint), topic=_compute_topic(self.topic, ctxt, None, instance)) + def check_can_live_migrate_destination(self, ctxt, instance, destination, + block_migration, disk_over_commit): + self.call(ctxt, self.make_msg('check_can_live_migrate_destination', + instance_id=instance['id'], + block_migration=block_migration, + disk_over_commit=disk_over_commit), + topic=_compute_topic(self.topic, ctxt, destination, None), + version='1.2') + + def check_can_live_migrate_source(self, ctxt, instance, dest_check_data): + self.call(ctxt, self.make_msg('check_can_live_migrate_source', + instance_id=instance['id'], + dest_check_data=dest_check_data), + topic=_compute_topic(self.topic, ctxt, None, instance), + version='1.2') + def check_shared_storage_test_file(self, ctxt, filename, host): - return self.call(ctxt, self.make_msg('check_shared_storage_test_file', - filename=filename), - topic=_compute_topic(self.topic, ctxt, host, None)) + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) def cleanup_shared_storage_test_file(self, ctxt, filename, host): - self.cast(ctxt, self.make_msg('cleanup_shared_storage_test_file', - filename=filename), - topic=_compute_topic(self.topic, ctxt, host, None)) + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) def compare_cpu(self, ctxt, cpu_info, host): - return self.call(ctxt, self.make_msg('compare_cpu', cpu_info=cpu_info), - topic=_compute_topic(self.topic, ctxt, host, None)) + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) + + def create_shared_storage_test_file(self, ctxt, host): + raise rpc_common.RPCException(message=_('Deprecated from version 1.2')) def confirm_resize(self, ctxt, instance, migration_id, host, cast=True): @@ -109,11 +125,6 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): instance_uuid=instance['uuid'], migration_id=migration_id), topic=_compute_topic(self.topic, ctxt, host, instance)) - def create_shared_storage_test_file(self, ctxt, host): - return self.call(ctxt, - self.make_msg('create_shared_storage_test_file'), - topic=_compute_topic(self.topic, ctxt, host, None)) - def detach_volume(self, ctxt, instance, volume_id): self.cast(ctxt, self.make_msg('detach_volume', instance_uuid=instance['uuid'], volume_id=volume_id), -- cgit