diff options
| author | masumotok <masumotok@nttdata.co.jp> | 2010-12-31 04:03:37 +0900 |
|---|---|---|
| committer | masumotok <masumotok@nttdata.co.jp> | 2010-12-31 04:03:37 +0900 |
| commit | bf7bc8725fcc26cc5074ea1628bcba6ac6093768 (patch) | |
| tree | dd845322f758c2353f295afa44d4e6455564ffff /nova/compute | |
| parent | 85acbbe916df8b2d18f0dc3a0b8cad9fcfdd6907 (diff) | |
| parent | bd6a3cb1acb68ac2252c1bafc531b3b12f9746d8 (diff) | |
merge recent revision(version of 2010/12/28)
Change:
1. Use greenthread instead of defer at nova.virt.libvirt_conn.live_migration.
2. Move nova.scheduler.manager.live_migration to nova.scheduler.driver
3. Move nova.scheduler.manager.has_enough_resource to nova.scheduler.driver
4. Any check routine in nova-manage.instance.live_migration is moved to
nova.scheduler.driver.schedule_live_migration.
Diffstat (limited to 'nova/compute')
| -rw-r--r-- | nova/compute/api.py | 241 | ||||
| -rw-r--r-- | nova/compute/disk.py | 106 | ||||
| -rw-r--r-- | nova/compute/instance_types.py | 3 | ||||
| -rw-r--r-- | nova/compute/manager.py | 239 | ||||
| -rw-r--r-- | nova/compute/monitor.py | 12 |
5 files changed, 398 insertions, 203 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py index 929342a1e..038f52046 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -20,6 +20,7 @@ Handles all API requests relating to instances (guest vms). """ +import datetime import logging import time @@ -43,17 +44,37 @@ def generate_default_hostname(internal_id): class ComputeAPI(base.Base): """API for interacting with the compute manager.""" - def __init__(self, **kwargs): - self.network_manager = utils.import_object(FLAGS.network_manager) + def __init__(self, network_manager=None, image_service=None, **kwargs): + if not network_manager: + network_manager = utils.import_object(FLAGS.network_manager) + self.network_manager = network_manager + if not image_service: + image_service = utils.import_object(FLAGS.image_service) + self.image_service = image_service super(ComputeAPI, self).__init__(**kwargs) - # TODO(eday): network_topic arg should go away once we push network - # allocation into the scheduler or compute worker. - def create_instances(self, context, instance_type, image_service, image_id, - network_topic, min_count=1, max_count=1, - kernel_id=None, ramdisk_id=None, name='', - description='', user_data='', key_name=None, + def get_network_topic(self, context, instance_id): + try: + instance = self.db.instance_get_by_internal_id(context, + instance_id) + except exception.NotFound as e: + logging.warning("Instance %d was not found in get_network_topic", + instance_id) + raise e + + host = instance['host'] + if not host: + raise exception.Error("Instance %d has no host" % instance_id) + topic = self.db.queue_get_for(context, FLAGS.compute_topic, host) + return rpc.call(context, + topic, + {"method": "get_network_topic", "args": {'fake': 1}}) + + def create_instances(self, context, instance_type, image_id, min_count=1, + max_count=1, kernel_id=None, ramdisk_id=None, + display_name='', description='', key_name=None, key_data=None, security_group='default', + user_data=None, generate_hostname=generate_default_hostname): """Create the number of instances requested if quote and other arguments check out ok.""" @@ -69,15 +90,20 @@ class ComputeAPI(base.Base): is_vpn = image_id == FLAGS.vpn_image_id if not is_vpn: - image = image_service.show(context, image_id) + image = self.image_service.show(context, image_id) + + # If kernel_id/ramdisk_id isn't explicitly set in API call + # we take the defaults from the image's metadata if kernel_id is None: - kernel_id = image.get('kernelId', FLAGS.default_kernel) + kernel_id = image.get('kernelId', None) if ramdisk_id is None: - ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk) + ramdisk_id = image.get('ramdiskId', None) # Make sure we have access to kernel and ramdisk - image_service.show(context, kernel_id) - image_service.show(context, ramdisk_id) + if kernel_id: + self.image_service.show(context, kernel_id) + if ramdisk_id: + self.image_service.show(context, ramdisk_id) if security_group is None: security_group = ['default'] @@ -100,8 +126,8 @@ class ComputeAPI(base.Base): base_options = { 'reservation_id': utils.generate_uid('r'), 'image_id': image_id, - 'kernel_id': kernel_id, - 'ramdisk_id': ramdisk_id, + 'kernel_id': kernel_id or '', + 'ramdisk_id': ramdisk_id or '', 'state_description': 'scheduling', 'user_id': context.user_id, 'project_id': context.project_id, @@ -110,41 +136,41 @@ class ComputeAPI(base.Base): 'memory_mb': type_data['memory_mb'], 'vcpus': type_data['vcpus'], 'local_gb': type_data['local_gb'], - 'display_name': name, + 'display_name': display_name, 'display_description': description, + 'user_data': user_data or '', 'key_name': key_name, 'key_data': key_data} elevated = context.elevated() instances = [] - logging.debug("Going to run %s instances...", num_instances) + logging.debug(_("Going to run %s instances..."), num_instances) for num in range(num_instances): instance = dict(mac_address=utils.generate_mac(), launch_index=num, **base_options) - instance_ref = self.create_instance(context, security_groups, - **instance) - instance_id = instance_ref['id'] - internal_id = instance_ref['internal_id'] - hostname = generate_hostname(internal_id) - self.update_instance(context, instance_id, hostname=hostname) - instances.append(dict(id=instance_id, internal_id=internal_id, - hostname=hostname, **instance)) - - # TODO(vish): This probably should be done in the scheduler - # or in compute as a call. The network should be - # allocated after the host is assigned and setup - # can happen at the same time. - address = self.network_manager.allocate_fixed_ip(context, - instance_id, - is_vpn) - rpc.cast(elevated, - network_topic, - {"method": "setup_fixed_ip", - "args": {"address": address}}) - - logging.debug("Casting to scheduler for %s/%s's instance %s" % - (context.project_id, context.user_id, instance_id)) + instance = self.db.instance_create(context, instance) + instance_id = instance['id'] + internal_id = instance['internal_id'] + + elevated = context.elevated() + if not security_groups: + security_groups = [] + for security_group_id in security_groups: + self.db.instance_add_security_group(elevated, + instance_id, + security_group_id) + + # Set sane defaults if not specified + updates = dict(hostname=generate_hostname(internal_id)) + if 'display_name' not in instance: + updates['display_name'] = "Server %s" % internal_id + + instance = self.update_instance(context, instance_id, **updates) + instances.append(instance) + + logging.debug(_("Casting to scheduler for %s/%s's instance %s"), + context.project_id, context.user_id, instance_id) rpc.cast(context, FLAGS.scheduler_topic, {"method": "run_instance", @@ -154,6 +180,12 @@ class ComputeAPI(base.Base): return instances def ensure_default_security_group(self, context): + """ Create security group for the security context if it + does not already exist + + :param context: the security context + + """ try: db.security_group_get_by_name(context, context.project_id, 'default') @@ -162,40 +194,7 @@ class ComputeAPI(base.Base): 'description': 'default', 'user_id': context.user_id, 'project_id': context.project_id} - group = db.security_group_create(context, values) - - def create_instance(self, context, security_groups=None, **kwargs): - """Creates the instance in the datastore and returns the - new instance as a mapping - - :param context: The security context - :param security_groups: list of security group ids to - attach to the instance - :param kwargs: All additional keyword args are treated - as data fields of the instance to be - created - - :retval Returns a mapping of the instance information - that has just been created - - """ - instance_ref = self.db.instance_create(context, kwargs) - inst_id = instance_ref['id'] - # Set sane defaults if not specified - if kwargs.get('display_name') is None: - display_name = "Server %s" % instance_ref['internal_id'] - instance_ref['display_name'] = display_name - self.db.instance_update(context, inst_id, - {'display_name': display_name}) - - elevated = context.elevated() - if not security_groups: - security_groups = [] - for security_group_id in security_groups: - self.db.instance_add_security_group(elevated, - inst_id, - security_group_id) - return instance_ref + db.security_group_create(context, values) def update_instance(self, context, instance_id, **kwargs): """Updates the instance in the datastore. @@ -209,4 +208,96 @@ class ComputeAPI(base.Base): :retval None """ - self.db.instance_update(context, instance_id, kwargs) + return self.db.instance_update(context, instance_id, kwargs) + + def delete_instance(self, context, instance_id): + logging.debug("Going to try and terminate %d" % instance_id) + try: + instance = self.db.instance_get_by_internal_id(context, + instance_id) + except exception.NotFound as e: + logging.warning(_("Instance %d was not found during terminate"), + instance_id) + raise e + + if (instance['state_description'] == 'terminating'): + logging.warning(_("Instance %d is already being terminated"), + instance_id) + return + + self.update_instance(context, + instance['id'], + state_description='terminating', + state=0, + terminated_at=datetime.datetime.utcnow()) + + host = instance['host'] + logging.error('terminate %s %s %s %s',context, FLAGS.compute_topic, host, self.db.queue_get_for(context, FLAGS.compute_topic, host)) + if host: + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "terminate_instance", + "args": {"instance_id": instance['id']}}) + else: + self.db.instance_destroy(context, instance['id']) + + def get_instances(self, context, project_id=None): + """Get all instances, possibly filtered by project ID or + user ID. If there is no filter and the context is an admin, + it will retreive all instances in the system.""" + if project_id or not context.is_admin: + if not context.project: + return self.db.instance_get_all_by_user(context, + context.user_id) + if project_id is None: + project_id = context.project_id + return self.db.instance_get_all_by_project(context, project_id) + return self.db.instance_get_all(context) + + def get_instance(self, context, instance_id): + return self.db.instance_get_by_internal_id(context, instance_id) + + def reboot(self, context, instance_id): + """Reboot the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "reboot_instance", + "args": {"instance_id": instance['id']}}) + + def pause(self, context, instance_id): + """Pause the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "pause_instance", + "args": {"instance_id": instance['id']}}) + + def unpause(self, context, instance_id): + """Unpause the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "unpause_instance", + "args": {"instance_id": instance['id']}}) + + def rescue(self, context, instance_id): + """Rescue the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "rescue_instance", + "args": {"instance_id": instance['id']}}) + + def unrescue(self, context, instance_id): + """Unrescue the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "unrescue_instance", + "args": {"instance_id": instance['id']}}) diff --git a/nova/compute/disk.py b/nova/compute/disk.py index 4338d39f0..814a258cd 100644 --- a/nova/compute/disk.py +++ b/nova/compute/disk.py @@ -26,8 +26,6 @@ import logging import os import tempfile -from twisted.internet import defer - from nova import exception from nova import flags @@ -39,7 +37,6 @@ flags.DEFINE_integer('block_size', 1024 * 1024 * 256, 'block_size to use for dd') -@defer.inlineCallbacks def partition(infile, outfile, local_bytes=0, resize=True, local_type='ext2', execute=None): """ @@ -64,18 +61,18 @@ def partition(infile, outfile, local_bytes=0, resize=True, file_size = os.path.getsize(infile) if resize and file_size < FLAGS.minimum_root_size: last_sector = FLAGS.minimum_root_size / sector_size - 1 - yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' - % (infile, last_sector, sector_size)) - yield execute('e2fsck -fp %s' % infile, check_exit_code=False) - yield execute('resize2fs %s' % infile) + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (infile, last_sector, sector_size)) + execute('e2fsck -fp %s' % infile, check_exit_code=False) + execute('resize2fs %s' % infile) file_size = FLAGS.minimum_root_size elif file_size % sector_size != 0: - logging.warn("Input partition size not evenly divisible by" - " sector size: %d / %d", file_size, sector_size) + logging.warn(_("Input partition size not evenly divisible by" + " sector size: %d / %d"), file_size, sector_size) primary_sectors = file_size / sector_size if local_bytes % sector_size != 0: - logging.warn("Bytes for local storage not evenly divisible" - " by sector size: %d / %d", local_bytes, sector_size) + logging.warn(_("Bytes for local storage not evenly divisible" + " by sector size: %d / %d"), local_bytes, sector_size) local_sectors = local_bytes / sector_size mbr_last = 62 # a @@ -86,30 +83,36 @@ def partition(infile, outfile, local_bytes=0, resize=True, last_sector = local_last # e # create an empty file - yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' - % (outfile, mbr_last, sector_size)) + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (outfile, mbr_last, sector_size)) # make mbr partition - yield execute('parted --script %s mklabel msdos' % outfile) + execute('parted --script %s mklabel msdos' % outfile) # append primary file - yield execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append' - % (infile, outfile, FLAGS.block_size)) + execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append' + % (infile, outfile, FLAGS.block_size)) # make primary partition - yield execute('parted --script %s mkpart primary %ds %ds' - % (outfile, primary_first, primary_last)) + execute('parted --script %s mkpart primary %ds %ds' + % (outfile, primary_first, primary_last)) if local_bytes > 0: # make the file bigger - yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' - % (outfile, last_sector, sector_size)) + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (outfile, last_sector, sector_size)) # make and format local partition - yield execute('parted --script %s mkpartfs primary %s %ds %ds' - % (outfile, local_type, local_first, local_last)) + execute('parted --script %s mkpartfs primary %s %ds %ds' + % (outfile, local_type, local_first, local_last)) + + +def extend(image, size, execute): + file_size = os.path.getsize(image) + if file_size >= size: + return + return execute('truncate -s size %s' % (image,)) -@defer.inlineCallbacks def inject_data(image, key=None, net=None, partition=None, execute=None): """Injects a ssh key and optionally net data into a disk image. @@ -119,74 +122,83 @@ def inject_data(image, key=None, net=None, partition=None, execute=None): If partition is not specified it mounts the image as a single partition. """ - out, err = yield execute('sudo losetup -f --show %s' % image) + out, err = execute('sudo losetup --find --show %s' % image) if err: - raise exception.Error('Could not attach image to loopback: %s' % err) + raise exception.Error(_('Could not attach image to loopback: %s') + % err) device = out.strip() try: if not partition is None: # create partition - out, err = yield execute('sudo kpartx -a %s' % device) + out, err = execute('sudo kpartx -a %s' % device) if err: - raise exception.Error('Failed to load partition: %s' % err) + raise exception.Error(_('Failed to load partition: %s') % err) mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1], partition) else: mapped_device = device - out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device) + + # We can only loopback mount raw images. If the device isn't there, + # it's normally because it's a .vmdk or a .vdi etc + if not os.path.exists(mapped_device): + raise exception.Error('Mapped device was not found (we can' + ' only inject raw disk images): %s' % + mapped_device) + + # Configure ext2fs so that it doesn't auto-check every N boots + out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device) tmpdir = tempfile.mkdtemp() try: # mount loopback to dir - out, err = yield execute( + out, err = execute( 'sudo mount %s %s' % (mapped_device, tmpdir)) if err: - raise exception.Error('Failed to mount filesystem: %s' % err) + raise exception.Error(_('Failed to mount filesystem: %s') + % err) try: if key: # inject key file - yield _inject_key_into_fs(key, tmpdir, execute=execute) + _inject_key_into_fs(key, tmpdir, execute=execute) if net: - yield _inject_net_into_fs(net, tmpdir, execute=execute) + _inject_net_into_fs(net, tmpdir, execute=execute) finally: # unmount device - yield execute('sudo umount %s' % mapped_device) + execute('sudo umount %s' % mapped_device) finally: # remove temporary directory - yield execute('rmdir %s' % tmpdir) + execute('rmdir %s' % tmpdir) if not partition is None: # remove partitions - yield execute('sudo kpartx -d %s' % device) + execute('sudo kpartx -d %s' % device) finally: # remove loopback - yield execute('sudo losetup -d %s' % device) + execute('sudo losetup --detach %s' % device) -@defer.inlineCallbacks def _inject_key_into_fs(key, fs, execute=None): """Add the given public ssh key to root's authorized_keys. key is an ssh key string. fs is the path to the base of the filesystem into which to inject the key. """ - sshdir = os.path.join(os.path.join(fs, 'root'), '.ssh') - yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter - yield execute('sudo chown root %s' % sshdir) - yield execute('sudo chmod 700 %s' % sshdir) + sshdir = os.path.join(fs, 'root', '.ssh') + execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter + execute('sudo chown root %s' % sshdir) + execute('sudo chmod 700 %s' % sshdir) keyfile = os.path.join(sshdir, 'authorized_keys') - yield execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n') + execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n') -@defer.inlineCallbacks def _inject_net_into_fs(net, fs, execute=None): """Inject /etc/network/interfaces into the filesystem rooted at fs. net is the contents of /etc/network/interfaces. """ netdir = os.path.join(os.path.join(fs, 'etc'), 'network') - yield execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter - yield execute('sudo chown root:root %s' % netdir) - yield execute('sudo chmod 755 %s' % netdir) + execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter + execute('sudo chown root:root %s' % netdir) + execute('sudo chmod 755 %s' % netdir) netfile = os.path.join(netdir, 'interfaces') - yield execute('sudo tee %s' % netfile, net) + execute('sudo tee %s' % netfile, net) diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py index a2679e0fc..196d6a8df 100644 --- a/nova/compute/instance_types.py +++ b/nova/compute/instance_types.py @@ -22,6 +22,7 @@ The built-in instance properties. """ from nova import flags +from nova import exception FLAGS = flags.FLAGS INSTANCE_TYPES = { @@ -37,7 +38,7 @@ def get_by_type(instance_type): if instance_type is None: return FLAGS.default_instance_type if instance_type not in INSTANCE_TYPES: - raise exception.ApiError("Unknown instance type: %s", + raise exception.ApiError(_("Unknown instance type: %s"), instance_type) return instance_type diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 66bbb8d5a..dc660d2a6 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -42,11 +42,10 @@ import os import time -from twisted.internet import defer - from nova import exception from nova import flags from nova import manager +from nova import rpc from nova import utils from nova.compute import power_state from nova import rpc @@ -57,8 +56,10 @@ flags.DEFINE_string('instances_path', '$state_path/instances', 'where instances are stored on disk') flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection', 'Driver to use for controlling virtualization') -flags.DEFINE_string('live_migration_timeout', 30, +flags.DEFINE_string('live_migration_timeout', 10, 'Timeout value for pre_live_migration is completed.') +flags.DEFINE_string('stub_network', False, + 'Stub network related code') class ComputeManager(manager.Manager): @@ -76,6 +77,12 @@ class ComputeManager(manager.Manager): self.volume_manager = utils.import_object(FLAGS.volume_manager) super(ComputeManager, self).__init__(*args, **kwargs) + def init_host(self): + """Do any initialization that needs to be run if this is a + standalone service. + """ + self.driver.init_host() + def _update_state(self, context, instance_id): """Update the state of an instance from the driver info.""" # FIXME(ja): include other fields from state? @@ -87,26 +94,56 @@ class ComputeManager(manager.Manager): state = power_state.NOSTATE self.db.instance_set_state(context, instance_id, state) - @defer.inlineCallbacks + def get_network_topic(self, context, **_kwargs): + """Retrieves the network host for a project on this host""" + # TODO(vish): This method should be memoized. This will make + # the call to get_network_host cheaper, so that + # it can pas messages instead of checking the db + # locally. + if FLAGS.stub_network: + host = FLAGS.network_host + else: + host = self.network_manager.get_network_host(context) + return self.db.queue_get_for(context, + FLAGS.network_topic, + host) + @exception.wrap_exception def refresh_security_group(self, context, security_group_id, **_kwargs): """This call passes stright through to the virtualization driver.""" - yield self.driver.refresh_security_group(security_group_id) + self.driver.refresh_security_group(security_group_id) - @defer.inlineCallbacks @exception.wrap_exception def run_instance(self, context, instance_id, **_kwargs): """Launch a new instance with specified options.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) if instance_ref['name'] in self.driver.list_instances(): - raise exception.Error("Instance has already been created") - logging.debug("instance %s: starting...", instance_id) - self.network_manager.setup_compute_network(context, instance_id) + raise exception.Error(_("Instance has already been created")) self.db.instance_update(context, instance_id, {'host': self.host, 'launched_on':self.host}) + self.db.instance_set_state(context, + instance_id, + power_state.NOSTATE, + 'networking') + + is_vpn = instance_ref['image_id'] == FLAGS.vpn_image_id + # NOTE(vish): This could be a cast because we don't do anything + # with the address currently, but I'm leaving it as + # a call to ensure that network setup completes. We + # will eventually also need to save the address here. + if not FLAGS.stub_network: + address = rpc.call(context, + self.get_network_topic(context), + {"method": "allocate_fixed_ip", + "args": {"instance_id": instance_id, + "vpn": is_vpn}}) + + self.network_manager.setup_compute_network(context, + instance_id) + # TODO(vish) check to make sure the availability zone matches self.db.instance_set_state(context, instance_id, @@ -114,13 +151,13 @@ class ComputeManager(manager.Manager): 'spawning') try: - yield self.driver.spawn(instance_ref) + self.driver.spawn(instance_ref) now = datetime.datetime.utcnow() self.db.instance_update(context, instance_id, {'launched_at': now}) except Exception: # pylint: disable-msg=W0702 - logging.exception("instance %s: Failed to spawn", + logging.exception(_("instance %s: Failed to spawn"), instance_ref['name']) self.db.instance_set_state(context, instance_id, @@ -128,104 +165,167 @@ class ComputeManager(manager.Manager): self._update_state(context, instance_id) - @defer.inlineCallbacks @exception.wrap_exception def terminate_instance(self, context, instance_id): """Terminate an instance on this machine.""" context = context.elevated() - logging.debug("instance %s: terminating", instance_id) instance_ref = self.db.instance_get(context, instance_id) + + if not FLAGS.stub_network: + address = self.db.instance_get_floating_address(context, + instance_ref['id']) + if address: + logging.debug(_("Disassociating address %s") % address) + # NOTE(vish): Right now we don't really care if the ip is + # disassociated. We may need to worry about + # checking this later. + rpc.cast(context, + self.get_network_topic(context), + {"method": "disassociate_floating_ip", + "args": {"floating_address": address}}) + + address = self.db.instance_get_fixed_address(context, + instance_ref['id']) + if address: + logging.debug(_("Deallocating address %s") % address) + # NOTE(vish): Currently, nothing needs to be done on the + # network node until release. If this changes, + # we will need to cast here. + self.network_manager.deallocate_fixed_ip(context.elevated(), + address) + + logging.debug(_("instance %s: terminating"), instance_id) + volumes = instance_ref.get('volumes', []) or [] for volume in volumes: self.detach_volume(context, instance_id, volume['id']) if instance_ref['state'] == power_state.SHUTOFF: self.db.instance_destroy(context, instance_id) - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % instance_id) - yield self.driver.destroy(instance_ref) + raise exception.Error(_('trying to destroy already destroyed' + ' instance: %s') % instance_id) + self.driver.destroy(instance_ref) # TODO(ja): should we keep it in a terminated state for a bit? self.db.instance_destroy(context, instance_id) - @defer.inlineCallbacks @exception.wrap_exception def reboot_instance(self, context, instance_id): """Reboot an instance on this server.""" context = context.elevated() - instance_ref = self.db.instance_get(context, instance_id) self._update_state(context, instance_id) + instance_ref = self.db.instance_get(context, instance_id) if instance_ref['state'] != power_state.RUNNING: - logging.warn('trying to reboot a non-running ' - 'instance: %s (state: %s excepted: %s)', + logging.warn(_('trying to reboot a non-running ' + 'instance: %s (state: %s excepted: %s)'), instance_ref['internal_id'], instance_ref['state'], power_state.RUNNING) - logging.debug('instance %s: rebooting', instance_ref['name']) + logging.debug(_('instance %s: rebooting'), instance_ref['name']) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, 'rebooting') - yield self.driver.reboot(instance_ref) + self.network_manager.setup_compute_network(context, instance_id) + self.driver.reboot(instance_ref) self._update_state(context, instance_id) - @defer.inlineCallbacks @exception.wrap_exception def rescue_instance(self, context, instance_id): """Rescue an instance on this server.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug('instance %s: rescuing', + logging.debug(_('instance %s: rescuing'), instance_ref['internal_id']) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, 'rescuing') - yield self.driver.rescue(instance_ref) + self.network_manager.setup_compute_network(context, instance_id) + self.driver.rescue(instance_ref) self._update_state(context, instance_id) - @defer.inlineCallbacks @exception.wrap_exception def unrescue_instance(self, context, instance_id): """Rescue an instance on this server.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug('instance %s: unrescuing', + logging.debug(_('instance %s: unrescuing'), instance_ref['internal_id']) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, 'unrescuing') - yield self.driver.unrescue(instance_ref) + self.driver.unrescue(instance_ref) self._update_state(context, instance_id) + @staticmethod + def _update_state_callback(self, context, instance_id, result): + """Update instance state when async task completes.""" + self._update_state(context, instance_id) + + @exception.wrap_exception + def pause_instance(self, context, instance_id): + """Pause an instance on this server.""" + context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) + + logging.debug('instance %s: pausing', + instance_ref['internal_id']) + self.db.instance_set_state(context, + instance_id, + power_state.NOSTATE, + 'pausing') + self.driver.pause(instance_ref, + lambda result: self._update_state_callback(self, + context, + instance_id, + result)) + + @exception.wrap_exception + def unpause_instance(self, context, instance_id): + """Unpause a paused instance on this server.""" + context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) + + logging.debug('instance %s: unpausing', + instance_ref['internal_id']) + self.db.instance_set_state(context, + instance_id, + power_state.NOSTATE, + 'unpausing') + self.driver.unpause(instance_ref, + lambda result: self._update_state_callback(self, + context, + instance_id, + result)) + @exception.wrap_exception def get_console_output(self, context, instance_id): """Send the console output for an instance.""" context = context.elevated() - logging.debug("instance %s: getting console output", instance_id) + logging.debug(_("instance %s: getting console output"), instance_id) instance_ref = self.db.instance_get(context, instance_id) return self.driver.get_console_output(instance_ref) - @defer.inlineCallbacks @exception.wrap_exception def attach_volume(self, context, instance_id, volume_id, mountpoint): """Attach a volume to an instance.""" context = context.elevated() - logging.debug("instance %s: attaching volume %s to %s", instance_id, + logging.debug(_("instance %s: attaching volume %s to %s"), instance_id, volume_id, mountpoint) instance_ref = self.db.instance_get(context, instance_id) - dev_path = yield self.volume_manager.setup_compute_volume(context, - volume_id) + dev_path = self.volume_manager.setup_compute_volume(context, + volume_id) try: - yield self.driver.attach_volume(instance_ref['name'], - dev_path, - mountpoint) + self.driver.attach_volume(instance_ref['name'], + dev_path, + mountpoint) self.db.volume_attached(context, volume_id, instance_id, @@ -234,36 +334,35 @@ class ComputeManager(manager.Manager): # NOTE(vish): The inline callback eats the exception info so we # log the traceback here and reraise the same # ecxception below. - logging.exception("instance %s: attach failed %s, removing", + logging.exception(_("instance %s: attach failed %s, removing"), instance_id, mountpoint) - yield self.volume_manager.remove_compute_volume(context, - volume_id) + self.volume_manager.remove_compute_volume(context, + volume_id) raise exc - defer.returnValue(True) - @defer.inlineCallbacks + return True + @exception.wrap_exception def detach_volume(self, context, instance_id, volume_id): """Detach a volume from an instance.""" context = context.elevated() - logging.debug("instance %s: detaching volume %s", + logging.debug(_("instance %s: detaching volume %s"), instance_id, volume_id) instance_ref = self.db.instance_get(context, instance_id) volume_ref = self.db.volume_get(context, volume_id) if instance_ref['name'] not in self.driver.list_instances(): - logging.warn("Detaching volume from unknown instance %s", + logging.warn(_("Detaching volume from unknown instance %s"), instance_ref['name']) else: - yield self.driver.detach_volume(instance_ref['name'], - volume_ref['mountpoint']) - yield self.volume_manager.remove_compute_volume(context, volume_id) + self.driver.detach_volume(instance_ref['name'], + volume_ref['mountpoint']) + self.volume_manager.remove_compute_volume(context, volume_id) self.db.volume_detached(context, volume_id) - defer.returnValue(True) + return True def compareCPU(self, context, xml): """ Check the host cpu is compatible to a cpu given by xml.""" - logging.warn('good!') return self.driver.compareCPU(xml) def get_memory_mb(self): @@ -281,7 +380,7 @@ class ComputeManager(manager.Manager): def pre_live_migration(self, context, instance_id, dest): """Any preparation for live migration at dst host.""" - # 1. getting volume info ( shlf/slot number ) + # Getting volume info ( shlf/slot number ) instance_ref = db.instance_get(context, instance_id) ec2_id = instance_ref['hostname'] @@ -289,40 +388,35 @@ class ComputeManager(manager.Manager): try: volumes = db.volume_get_by_ec2_id(context, ec2_id) except exception.NotFound: - logging.debug('%s has no volume.', ec2_id) + logging.info(_('%s has no volume.'), ec2_id) shelf_slots = {} for vol in volumes: shelf, slot = db.volume_get_shelf_and_blade(context, vol['id']) shelf_slots[vol.id] = (shelf, slot) - # 2. getting fixed ips + # Getting fixed ips fixed_ip = db.instance_get_fixed_address(context, instance_id) if None == fixed_ip: exc_type = 'NotFoundError' - val = '%s(%s) doesnt have fixed_ip ' % (instance_id, ec2_id) + val = _('%s(%s) doesnt have fixed_ip') % (instance_id, ec2_id) tb = ''.join(traceback.format_tb(sys.exc_info()[2])) raise rpc.RemoteError(exc_type, val, tb) - # 3. if any volume is mounted, prepare here. + # If any volume is mounted, prepare here. if 0 != len(shelf_slots): pass - # 4. Creating nova-instance-instance-xxx, this is written to libvirt.xml, - # and can be seen when executin "virsh nwfiter-list" On destination host, - # this nwfilter is necessary. - # In addition this method is creating security rule ingress rule onto - # destination host. + # Creating nova-instance-instance-xxx, this is written to libvirt.xml, + # and can be seen when executin "virsh nwfiter-list" On destination host, + # this nwfilter is necessary. + # In addition this method is creating security rule ingress rule onto + # destination host. self.driver.setup_nwfilters_for_instance(instance_ref) # 5. bridge settings -<<<<<<< TREE - self.network_manager.setup_compute_network(instance_id) - return True -======= self.network_manager.setup_compute_network(context, instance_id) return True ->>>>>>> MERGE-SOURCE def nwfilter_for_instance_exists(self, context, instance_id): """Check nova-instance-instance-xxx filter exists """ @@ -332,7 +426,7 @@ class ComputeManager(manager.Manager): def live_migration(self, context, instance_id, dest): """executes live migration.""" - # 1. ask dest host to preparing live migration. + # Asking dest host to preparing live migration. compute_topic = db.queue_get_for(context, FLAGS.compute_topic, dest) ret = rpc.call(context, compute_topic, @@ -341,16 +435,16 @@ class ComputeManager(manager.Manager): 'dest': dest}}) if True != ret: - logging.error('Live migration failed(err at %s)', dest) + logging.error(_('Pre live migration failed(err at %s)'), dest) db.instance_set_state(context, instance_id, power_state.RUNNING, 'running') return - # waiting for setting up nwfilter(nova-instance-instance-xxx) + # Waiting for setting up nwfilter such as, nova-instance-instance-xxx. # otherwise, live migration fail. - timeout_count = range(FLAGS.live_migration_timeout) + timeout_count = range(FLAGS.live_migration_timeout * 2) while 0 != len(timeout_count): ret = rpc.call(context, compute_topic, @@ -360,17 +454,14 @@ class ComputeManager(manager.Manager): break timeout_count.pop() - time.sleep(1) + time.sleep(0.5) if not ret: - logging.error('Timeout for pre_live_migration at %s', dest) + logging.error(_('Timeout for pre_live_migration at %s'), dest) return - # 2. executing live migration + # Executing live migration # live_migration might raises ProcessExecution error, but # nothing must be recovered in this version. instance_ref = db.instance_get(context, instance_id) - ret = self.driver.live_migration(instance_ref, dest) - if not ret: - logging.debug('Fail to live migration') - return + self.driver.live_migration(context, instance_ref, dest) diff --git a/nova/compute/monitor.py b/nova/compute/monitor.py index 22653113a..60c347a5e 100644 --- a/nova/compute/monitor.py +++ b/nova/compute/monitor.py @@ -255,7 +255,7 @@ class Instance(object): Updates the instances statistics and stores the resulting graphs in the internal object store on the cloud controller. """ - logging.debug('updating %s...', self.instance_id) + logging.debug(_('updating %s...'), self.instance_id) try: data = self.fetch_cpu_stats() @@ -285,7 +285,7 @@ class Instance(object): graph_disk(self, '1w') graph_disk(self, '1m') except Exception: - logging.exception('unexpected error during update') + logging.exception(_('unexpected error during update')) self.last_updated = utcnow() @@ -351,7 +351,7 @@ class Instance(object): rd += rd_bytes wr += wr_bytes except TypeError: - logging.error('Cannot get blockstats for "%s" on "%s"', + logging.error(_('Cannot get blockstats for "%s" on "%s"'), disk, self.instance_id) raise @@ -373,7 +373,7 @@ class Instance(object): rx += stats[0] tx += stats[4] except TypeError: - logging.error('Cannot get ifstats for "%s" on "%s"', + logging.error(_('Cannot get ifstats for "%s" on "%s"'), interface, self.instance_id) raise @@ -408,7 +408,7 @@ class InstanceMonitor(object, service.Service): try: conn = virt_connection.get_connection(read_only=True) except Exception, exn: - logging.exception('unexpected exception getting connection') + logging.exception(_('unexpected exception getting connection')) time.sleep(FLAGS.monitoring_instances_delay) return @@ -423,7 +423,7 @@ class InstanceMonitor(object, service.Service): if not domain_id in self._instances: instance = Instance(conn, domain_id) self._instances[domain_id] = instance - logging.debug('Found instance: %s', domain_id) + logging.debug(_('Found instance: %s'), domain_id) for key in self._instances.keys(): instance = self._instances[key] |
