summaryrefslogtreecommitdiffstats
path: root/nova/compute
diff options
context:
space:
mode:
authormasumotok <masumotok@nttdata.co.jp>2010-12-31 04:03:37 +0900
committermasumotok <masumotok@nttdata.co.jp>2010-12-31 04:03:37 +0900
commitbf7bc8725fcc26cc5074ea1628bcba6ac6093768 (patch)
treedd845322f758c2353f295afa44d4e6455564ffff /nova/compute
parent85acbbe916df8b2d18f0dc3a0b8cad9fcfdd6907 (diff)
parentbd6a3cb1acb68ac2252c1bafc531b3b12f9746d8 (diff)
merge recent revision (version of 2010/12/28)
Change: 1. Use greenthread instead of defer at nova.virt.libvirt_conn.live_migration. 2. Move nova.scheduler.manager.live_migration to nova.scheduler.driver 3. Move nova.scheduler.manager.has_enough_resource to nova.scheduler.driver 4. Any check routine in nova-manage.instance.live_migration is moved to nova.scheduler.driver.schedule_live_migration.
Diffstat (limited to 'nova/compute')
-rw-r--r--nova/compute/api.py241
-rw-r--r--nova/compute/disk.py106
-rw-r--r--nova/compute/instance_types.py3
-rw-r--r--nova/compute/manager.py239
-rw-r--r--nova/compute/monitor.py12
5 files changed, 398 insertions, 203 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 929342a1e..038f52046 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -20,6 +20,7 @@
Handles all API requests relating to instances (guest vms).
"""
+import datetime
import logging
import time
@@ -43,17 +44,37 @@ def generate_default_hostname(internal_id):
class ComputeAPI(base.Base):
"""API for interacting with the compute manager."""
- def __init__(self, **kwargs):
- self.network_manager = utils.import_object(FLAGS.network_manager)
+ def __init__(self, network_manager=None, image_service=None, **kwargs):
+ if not network_manager:
+ network_manager = utils.import_object(FLAGS.network_manager)
+ self.network_manager = network_manager
+ if not image_service:
+ image_service = utils.import_object(FLAGS.image_service)
+ self.image_service = image_service
super(ComputeAPI, self).__init__(**kwargs)
- # TODO(eday): network_topic arg should go away once we push network
- # allocation into the scheduler or compute worker.
- def create_instances(self, context, instance_type, image_service, image_id,
- network_topic, min_count=1, max_count=1,
- kernel_id=None, ramdisk_id=None, name='',
- description='', user_data='', key_name=None,
+ def get_network_topic(self, context, instance_id):
+ try:
+ instance = self.db.instance_get_by_internal_id(context,
+ instance_id)
+ except exception.NotFound as e:
+ logging.warning("Instance %d was not found in get_network_topic",
+ instance_id)
+ raise e
+
+ host = instance['host']
+ if not host:
+ raise exception.Error("Instance %d has no host" % instance_id)
+ topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
+ return rpc.call(context,
+ topic,
+ {"method": "get_network_topic", "args": {'fake': 1}})
+
+ def create_instances(self, context, instance_type, image_id, min_count=1,
+ max_count=1, kernel_id=None, ramdisk_id=None,
+ display_name='', description='', key_name=None,
key_data=None, security_group='default',
+ user_data=None,
generate_hostname=generate_default_hostname):
"""Create the number of instances requested if quote and
other arguments check out ok."""
@@ -69,15 +90,20 @@ class ComputeAPI(base.Base):
is_vpn = image_id == FLAGS.vpn_image_id
if not is_vpn:
- image = image_service.show(context, image_id)
+ image = self.image_service.show(context, image_id)
+
+ # If kernel_id/ramdisk_id isn't explicitly set in API call
+ # we take the defaults from the image's metadata
if kernel_id is None:
- kernel_id = image.get('kernelId', FLAGS.default_kernel)
+ kernel_id = image.get('kernelId', None)
if ramdisk_id is None:
- ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
+ ramdisk_id = image.get('ramdiskId', None)
# Make sure we have access to kernel and ramdisk
- image_service.show(context, kernel_id)
- image_service.show(context, ramdisk_id)
+ if kernel_id:
+ self.image_service.show(context, kernel_id)
+ if ramdisk_id:
+ self.image_service.show(context, ramdisk_id)
if security_group is None:
security_group = ['default']
@@ -100,8 +126,8 @@ class ComputeAPI(base.Base):
base_options = {
'reservation_id': utils.generate_uid('r'),
'image_id': image_id,
- 'kernel_id': kernel_id,
- 'ramdisk_id': ramdisk_id,
+ 'kernel_id': kernel_id or '',
+ 'ramdisk_id': ramdisk_id or '',
'state_description': 'scheduling',
'user_id': context.user_id,
'project_id': context.project_id,
@@ -110,41 +136,41 @@ class ComputeAPI(base.Base):
'memory_mb': type_data['memory_mb'],
'vcpus': type_data['vcpus'],
'local_gb': type_data['local_gb'],
- 'display_name': name,
+ 'display_name': display_name,
'display_description': description,
+ 'user_data': user_data or '',
'key_name': key_name,
'key_data': key_data}
elevated = context.elevated()
instances = []
- logging.debug("Going to run %s instances...", num_instances)
+ logging.debug(_("Going to run %s instances..."), num_instances)
for num in range(num_instances):
instance = dict(mac_address=utils.generate_mac(),
launch_index=num,
**base_options)
- instance_ref = self.create_instance(context, security_groups,
- **instance)
- instance_id = instance_ref['id']
- internal_id = instance_ref['internal_id']
- hostname = generate_hostname(internal_id)
- self.update_instance(context, instance_id, hostname=hostname)
- instances.append(dict(id=instance_id, internal_id=internal_id,
- hostname=hostname, **instance))
-
- # TODO(vish): This probably should be done in the scheduler
- # or in compute as a call. The network should be
- # allocated after the host is assigned and setup
- # can happen at the same time.
- address = self.network_manager.allocate_fixed_ip(context,
- instance_id,
- is_vpn)
- rpc.cast(elevated,
- network_topic,
- {"method": "setup_fixed_ip",
- "args": {"address": address}})
-
- logging.debug("Casting to scheduler for %s/%s's instance %s" %
- (context.project_id, context.user_id, instance_id))
+ instance = self.db.instance_create(context, instance)
+ instance_id = instance['id']
+ internal_id = instance['internal_id']
+
+ elevated = context.elevated()
+ if not security_groups:
+ security_groups = []
+ for security_group_id in security_groups:
+ self.db.instance_add_security_group(elevated,
+ instance_id,
+ security_group_id)
+
+ # Set sane defaults if not specified
+ updates = dict(hostname=generate_hostname(internal_id))
+ if 'display_name' not in instance:
+ updates['display_name'] = "Server %s" % internal_id
+
+ instance = self.update_instance(context, instance_id, **updates)
+ instances.append(instance)
+
+ logging.debug(_("Casting to scheduler for %s/%s's instance %s"),
+ context.project_id, context.user_id, instance_id)
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "run_instance",
@@ -154,6 +180,12 @@ class ComputeAPI(base.Base):
return instances
def ensure_default_security_group(self, context):
+ """ Create security group for the security context if it
+ does not already exist
+
+ :param context: the security context
+
+ """
try:
db.security_group_get_by_name(context, context.project_id,
'default')
@@ -162,40 +194,7 @@ class ComputeAPI(base.Base):
'description': 'default',
'user_id': context.user_id,
'project_id': context.project_id}
- group = db.security_group_create(context, values)
-
- def create_instance(self, context, security_groups=None, **kwargs):
- """Creates the instance in the datastore and returns the
- new instance as a mapping
-
- :param context: The security context
- :param security_groups: list of security group ids to
- attach to the instance
- :param kwargs: All additional keyword args are treated
- as data fields of the instance to be
- created
-
- :retval Returns a mapping of the instance information
- that has just been created
-
- """
- instance_ref = self.db.instance_create(context, kwargs)
- inst_id = instance_ref['id']
- # Set sane defaults if not specified
- if kwargs.get('display_name') is None:
- display_name = "Server %s" % instance_ref['internal_id']
- instance_ref['display_name'] = display_name
- self.db.instance_update(context, inst_id,
- {'display_name': display_name})
-
- elevated = context.elevated()
- if not security_groups:
- security_groups = []
- for security_group_id in security_groups:
- self.db.instance_add_security_group(elevated,
- inst_id,
- security_group_id)
- return instance_ref
+ db.security_group_create(context, values)
def update_instance(self, context, instance_id, **kwargs):
"""Updates the instance in the datastore.
@@ -209,4 +208,96 @@ class ComputeAPI(base.Base):
:retval None
"""
- self.db.instance_update(context, instance_id, kwargs)
+ return self.db.instance_update(context, instance_id, kwargs)
+
+ def delete_instance(self, context, instance_id):
+ logging.debug("Going to try and terminate %d" % instance_id)
+ try:
+ instance = self.db.instance_get_by_internal_id(context,
+ instance_id)
+ except exception.NotFound as e:
+ logging.warning(_("Instance %d was not found during terminate"),
+ instance_id)
+ raise e
+
+ if (instance['state_description'] == 'terminating'):
+ logging.warning(_("Instance %d is already being terminated"),
+ instance_id)
+ return
+
+ self.update_instance(context,
+ instance['id'],
+ state_description='terminating',
+ state=0,
+ terminated_at=datetime.datetime.utcnow())
+
+ host = instance['host']
+ logging.error('terminate %s %s %s %s',context, FLAGS.compute_topic, host, self.db.queue_get_for(context, FLAGS.compute_topic, host))
+ if host:
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "terminate_instance",
+ "args": {"instance_id": instance['id']}})
+ else:
+ self.db.instance_destroy(context, instance['id'])
+
+ def get_instances(self, context, project_id=None):
+ """Get all instances, possibly filtered by project ID or
+ user ID. If there is no filter and the context is an admin,
+        it will retrieve all instances in the system."""
+ if project_id or not context.is_admin:
+ if not context.project:
+ return self.db.instance_get_all_by_user(context,
+ context.user_id)
+ if project_id is None:
+ project_id = context.project_id
+ return self.db.instance_get_all_by_project(context, project_id)
+ return self.db.instance_get_all(context)
+
+ def get_instance(self, context, instance_id):
+ return self.db.instance_get_by_internal_id(context, instance_id)
+
+ def reboot(self, context, instance_id):
+ """Reboot the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "reboot_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def pause(self, context, instance_id):
+ """Pause the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "pause_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def unpause(self, context, instance_id):
+ """Unpause the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "unpause_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def rescue(self, context, instance_id):
+ """Rescue the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "rescue_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def unrescue(self, context, instance_id):
+ """Unrescue the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "unrescue_instance",
+ "args": {"instance_id": instance['id']}})
diff --git a/nova/compute/disk.py b/nova/compute/disk.py
index 4338d39f0..814a258cd 100644
--- a/nova/compute/disk.py
+++ b/nova/compute/disk.py
@@ -26,8 +26,6 @@ import logging
import os
import tempfile
-from twisted.internet import defer
-
from nova import exception
from nova import flags
@@ -39,7 +37,6 @@ flags.DEFINE_integer('block_size', 1024 * 1024 * 256,
'block_size to use for dd')
-@defer.inlineCallbacks
def partition(infile, outfile, local_bytes=0, resize=True,
local_type='ext2', execute=None):
"""
@@ -64,18 +61,18 @@ def partition(infile, outfile, local_bytes=0, resize=True,
file_size = os.path.getsize(infile)
if resize and file_size < FLAGS.minimum_root_size:
last_sector = FLAGS.minimum_root_size / sector_size - 1
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (infile, last_sector, sector_size))
- yield execute('e2fsck -fp %s' % infile, check_exit_code=False)
- yield execute('resize2fs %s' % infile)
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (infile, last_sector, sector_size))
+ execute('e2fsck -fp %s' % infile, check_exit_code=False)
+ execute('resize2fs %s' % infile)
file_size = FLAGS.minimum_root_size
elif file_size % sector_size != 0:
- logging.warn("Input partition size not evenly divisible by"
- " sector size: %d / %d", file_size, sector_size)
+ logging.warn(_("Input partition size not evenly divisible by"
+ " sector size: %d / %d"), file_size, sector_size)
primary_sectors = file_size / sector_size
if local_bytes % sector_size != 0:
- logging.warn("Bytes for local storage not evenly divisible"
- " by sector size: %d / %d", local_bytes, sector_size)
+ logging.warn(_("Bytes for local storage not evenly divisible"
+ " by sector size: %d / %d"), local_bytes, sector_size)
local_sectors = local_bytes / sector_size
mbr_last = 62 # a
@@ -86,30 +83,36 @@ def partition(infile, outfile, local_bytes=0, resize=True,
last_sector = local_last # e
# create an empty file
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, mbr_last, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, mbr_last, sector_size))
# make mbr partition
- yield execute('parted --script %s mklabel msdos' % outfile)
+ execute('parted --script %s mklabel msdos' % outfile)
# append primary file
- yield execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
- % (infile, outfile, FLAGS.block_size))
+ execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
+ % (infile, outfile, FLAGS.block_size))
# make primary partition
- yield execute('parted --script %s mkpart primary %ds %ds'
- % (outfile, primary_first, primary_last))
+ execute('parted --script %s mkpart primary %ds %ds'
+ % (outfile, primary_first, primary_last))
if local_bytes > 0:
# make the file bigger
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, last_sector, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, last_sector, sector_size))
# make and format local partition
- yield execute('parted --script %s mkpartfs primary %s %ds %ds'
- % (outfile, local_type, local_first, local_last))
+ execute('parted --script %s mkpartfs primary %s %ds %ds'
+ % (outfile, local_type, local_first, local_last))
+
+
+def extend(image, size, execute):
+ file_size = os.path.getsize(image)
+ if file_size >= size:
+ return
+ return execute('truncate -s size %s' % (image,))
-@defer.inlineCallbacks
def inject_data(image, key=None, net=None, partition=None, execute=None):
"""Injects a ssh key and optionally net data into a disk image.
@@ -119,74 +122,83 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
If partition is not specified it mounts the image as a single partition.
"""
- out, err = yield execute('sudo losetup -f --show %s' % image)
+ out, err = execute('sudo losetup --find --show %s' % image)
if err:
- raise exception.Error('Could not attach image to loopback: %s' % err)
+ raise exception.Error(_('Could not attach image to loopback: %s')
+ % err)
device = out.strip()
try:
if not partition is None:
# create partition
- out, err = yield execute('sudo kpartx -a %s' % device)
+ out, err = execute('sudo kpartx -a %s' % device)
if err:
- raise exception.Error('Failed to load partition: %s' % err)
+ raise exception.Error(_('Failed to load partition: %s') % err)
mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1],
partition)
else:
mapped_device = device
- out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
+
+ # We can only loopback mount raw images. If the device isn't there,
+ # it's normally because it's a .vmdk or a .vdi etc
+ if not os.path.exists(mapped_device):
+ raise exception.Error('Mapped device was not found (we can'
+ ' only inject raw disk images): %s' %
+ mapped_device)
+
+ # Configure ext2fs so that it doesn't auto-check every N boots
+ out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
tmpdir = tempfile.mkdtemp()
try:
# mount loopback to dir
- out, err = yield execute(
+ out, err = execute(
'sudo mount %s %s' % (mapped_device, tmpdir))
if err:
- raise exception.Error('Failed to mount filesystem: %s' % err)
+ raise exception.Error(_('Failed to mount filesystem: %s')
+ % err)
try:
if key:
# inject key file
- yield _inject_key_into_fs(key, tmpdir, execute=execute)
+ _inject_key_into_fs(key, tmpdir, execute=execute)
if net:
- yield _inject_net_into_fs(net, tmpdir, execute=execute)
+ _inject_net_into_fs(net, tmpdir, execute=execute)
finally:
# unmount device
- yield execute('sudo umount %s' % mapped_device)
+ execute('sudo umount %s' % mapped_device)
finally:
# remove temporary directory
- yield execute('rmdir %s' % tmpdir)
+ execute('rmdir %s' % tmpdir)
if not partition is None:
# remove partitions
- yield execute('sudo kpartx -d %s' % device)
+ execute('sudo kpartx -d %s' % device)
finally:
# remove loopback
- yield execute('sudo losetup -d %s' % device)
+ execute('sudo losetup --detach %s' % device)
-@defer.inlineCallbacks
def _inject_key_into_fs(key, fs, execute=None):
"""Add the given public ssh key to root's authorized_keys.
key is an ssh key string.
fs is the path to the base of the filesystem into which to inject the key.
"""
- sshdir = os.path.join(os.path.join(fs, 'root'), '.ssh')
- yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
- yield execute('sudo chown root %s' % sshdir)
- yield execute('sudo chmod 700 %s' % sshdir)
+ sshdir = os.path.join(fs, 'root', '.ssh')
+ execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
+ execute('sudo chown root %s' % sshdir)
+ execute('sudo chmod 700 %s' % sshdir)
keyfile = os.path.join(sshdir, 'authorized_keys')
- yield execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
+ execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
-@defer.inlineCallbacks
def _inject_net_into_fs(net, fs, execute=None):
"""Inject /etc/network/interfaces into the filesystem rooted at fs.
net is the contents of /etc/network/interfaces.
"""
netdir = os.path.join(os.path.join(fs, 'etc'), 'network')
- yield execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
- yield execute('sudo chown root:root %s' % netdir)
- yield execute('sudo chmod 755 %s' % netdir)
+ execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
+ execute('sudo chown root:root %s' % netdir)
+ execute('sudo chmod 755 %s' % netdir)
netfile = os.path.join(netdir, 'interfaces')
- yield execute('sudo tee %s' % netfile, net)
+ execute('sudo tee %s' % netfile, net)
diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py
index a2679e0fc..196d6a8df 100644
--- a/nova/compute/instance_types.py
+++ b/nova/compute/instance_types.py
@@ -22,6 +22,7 @@ The built-in instance properties.
"""
from nova import flags
+from nova import exception
FLAGS = flags.FLAGS
INSTANCE_TYPES = {
@@ -37,7 +38,7 @@ def get_by_type(instance_type):
if instance_type is None:
return FLAGS.default_instance_type
if instance_type not in INSTANCE_TYPES:
- raise exception.ApiError("Unknown instance type: %s",
+ raise exception.ApiError(_("Unknown instance type: %s"),
instance_type)
return instance_type
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 66bbb8d5a..dc660d2a6 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -42,11 +42,10 @@ import os
import time
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import manager
+from nova import rpc
from nova import utils
from nova.compute import power_state
from nova import rpc
@@ -57,8 +56,10 @@ flags.DEFINE_string('instances_path', '$state_path/instances',
'where instances are stored on disk')
flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
'Driver to use for controlling virtualization')
-flags.DEFINE_string('live_migration_timeout', 30,
+flags.DEFINE_string('live_migration_timeout', 10,
'Timeout value for pre_live_migration is completed.')
+flags.DEFINE_string('stub_network', False,
+ 'Stub network related code')
class ComputeManager(manager.Manager):
@@ -76,6 +77,12 @@ class ComputeManager(manager.Manager):
self.volume_manager = utils.import_object(FLAGS.volume_manager)
super(ComputeManager, self).__init__(*args, **kwargs)
+ def init_host(self):
+ """Do any initialization that needs to be run if this is a
+ standalone service.
+ """
+ self.driver.init_host()
+
def _update_state(self, context, instance_id):
"""Update the state of an instance from the driver info."""
# FIXME(ja): include other fields from state?
@@ -87,26 +94,56 @@ class ComputeManager(manager.Manager):
state = power_state.NOSTATE
self.db.instance_set_state(context, instance_id, state)
- @defer.inlineCallbacks
+ def get_network_topic(self, context, **_kwargs):
+ """Retrieves the network host for a project on this host"""
+ # TODO(vish): This method should be memoized. This will make
+ # the call to get_network_host cheaper, so that
+        #             it can pass messages instead of checking the db
+ # locally.
+ if FLAGS.stub_network:
+ host = FLAGS.network_host
+ else:
+ host = self.network_manager.get_network_host(context)
+ return self.db.queue_get_for(context,
+ FLAGS.network_topic,
+ host)
+
@exception.wrap_exception
def refresh_security_group(self, context, security_group_id, **_kwargs):
"""This call passes stright through to the virtualization driver."""
- yield self.driver.refresh_security_group(security_group_id)
+ self.driver.refresh_security_group(security_group_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def run_instance(self, context, instance_id, **_kwargs):
"""Launch a new instance with specified options."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
if instance_ref['name'] in self.driver.list_instances():
- raise exception.Error("Instance has already been created")
- logging.debug("instance %s: starting...", instance_id)
- self.network_manager.setup_compute_network(context, instance_id)
+ raise exception.Error(_("Instance has already been created"))
self.db.instance_update(context,
instance_id,
{'host': self.host, 'launched_on':self.host})
+ self.db.instance_set_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'networking')
+
+ is_vpn = instance_ref['image_id'] == FLAGS.vpn_image_id
+ # NOTE(vish): This could be a cast because we don't do anything
+ # with the address currently, but I'm leaving it as
+ # a call to ensure that network setup completes. We
+ # will eventually also need to save the address here.
+ if not FLAGS.stub_network:
+ address = rpc.call(context,
+ self.get_network_topic(context),
+ {"method": "allocate_fixed_ip",
+ "args": {"instance_id": instance_id,
+ "vpn": is_vpn}})
+
+ self.network_manager.setup_compute_network(context,
+ instance_id)
+
# TODO(vish) check to make sure the availability zone matches
self.db.instance_set_state(context,
instance_id,
@@ -114,13 +151,13 @@ class ComputeManager(manager.Manager):
'spawning')
try:
- yield self.driver.spawn(instance_ref)
+ self.driver.spawn(instance_ref)
now = datetime.datetime.utcnow()
self.db.instance_update(context,
instance_id,
{'launched_at': now})
except Exception: # pylint: disable-msg=W0702
- logging.exception("instance %s: Failed to spawn",
+ logging.exception(_("instance %s: Failed to spawn"),
instance_ref['name'])
self.db.instance_set_state(context,
instance_id,
@@ -128,104 +165,167 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
context = context.elevated()
- logging.debug("instance %s: terminating", instance_id)
instance_ref = self.db.instance_get(context, instance_id)
+
+ if not FLAGS.stub_network:
+ address = self.db.instance_get_floating_address(context,
+ instance_ref['id'])
+ if address:
+ logging.debug(_("Disassociating address %s") % address)
+ # NOTE(vish): Right now we don't really care if the ip is
+ # disassociated. We may need to worry about
+ # checking this later.
+ rpc.cast(context,
+ self.get_network_topic(context),
+ {"method": "disassociate_floating_ip",
+ "args": {"floating_address": address}})
+
+ address = self.db.instance_get_fixed_address(context,
+ instance_ref['id'])
+ if address:
+ logging.debug(_("Deallocating address %s") % address)
+ # NOTE(vish): Currently, nothing needs to be done on the
+ # network node until release. If this changes,
+ # we will need to cast here.
+ self.network_manager.deallocate_fixed_ip(context.elevated(),
+ address)
+
+ logging.debug(_("instance %s: terminating"), instance_id)
+
volumes = instance_ref.get('volumes', []) or []
for volume in volumes:
self.detach_volume(context, instance_id, volume['id'])
if instance_ref['state'] == power_state.SHUTOFF:
self.db.instance_destroy(context, instance_id)
- raise exception.Error('trying to destroy already destroyed'
- ' instance: %s' % instance_id)
- yield self.driver.destroy(instance_ref)
+ raise exception.Error(_('trying to destroy already destroyed'
+ ' instance: %s') % instance_id)
+ self.driver.destroy(instance_ref)
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
context = context.elevated()
- instance_ref = self.db.instance_get(context, instance_id)
self._update_state(context, instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
if instance_ref['state'] != power_state.RUNNING:
- logging.warn('trying to reboot a non-running '
- 'instance: %s (state: %s excepted: %s)',
+ logging.warn(_('trying to reboot a non-running '
+ 'instance: %s (state: %s excepted: %s)'),
instance_ref['internal_id'],
instance_ref['state'],
power_state.RUNNING)
- logging.debug('instance %s: rebooting', instance_ref['name'])
+ logging.debug(_('instance %s: rebooting'), instance_ref['name'])
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
'rebooting')
- yield self.driver.reboot(instance_ref)
+ self.network_manager.setup_compute_network(context, instance_id)
+ self.driver.reboot(instance_ref)
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def rescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug('instance %s: rescuing',
+ logging.debug(_('instance %s: rescuing'),
instance_ref['internal_id'])
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
'rescuing')
- yield self.driver.rescue(instance_ref)
+ self.network_manager.setup_compute_network(context, instance_id)
+ self.driver.rescue(instance_ref)
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def unrescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug('instance %s: unrescuing',
+ logging.debug(_('instance %s: unrescuing'),
instance_ref['internal_id'])
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
'unrescuing')
- yield self.driver.unrescue(instance_ref)
+ self.driver.unrescue(instance_ref)
self._update_state(context, instance_id)
+ @staticmethod
+ def _update_state_callback(self, context, instance_id, result):
+ """Update instance state when async task completes."""
+ self._update_state(context, instance_id)
+
+ @exception.wrap_exception
+ def pause_instance(self, context, instance_id):
+ """Pause an instance on this server."""
+ context = context.elevated()
+ instance_ref = self.db.instance_get(context, instance_id)
+
+ logging.debug('instance %s: pausing',
+ instance_ref['internal_id'])
+ self.db.instance_set_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'pausing')
+ self.driver.pause(instance_ref,
+ lambda result: self._update_state_callback(self,
+ context,
+ instance_id,
+ result))
+
+ @exception.wrap_exception
+ def unpause_instance(self, context, instance_id):
+ """Unpause a paused instance on this server."""
+ context = context.elevated()
+ instance_ref = self.db.instance_get(context, instance_id)
+
+ logging.debug('instance %s: unpausing',
+ instance_ref['internal_id'])
+ self.db.instance_set_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'unpausing')
+ self.driver.unpause(instance_ref,
+ lambda result: self._update_state_callback(self,
+ context,
+ instance_id,
+ result))
+
@exception.wrap_exception
def get_console_output(self, context, instance_id):
"""Send the console output for an instance."""
context = context.elevated()
- logging.debug("instance %s: getting console output", instance_id)
+ logging.debug(_("instance %s: getting console output"), instance_id)
instance_ref = self.db.instance_get(context, instance_id)
return self.driver.get_console_output(instance_ref)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
context = context.elevated()
- logging.debug("instance %s: attaching volume %s to %s", instance_id,
+ logging.debug(_("instance %s: attaching volume %s to %s"), instance_id,
volume_id, mountpoint)
instance_ref = self.db.instance_get(context, instance_id)
- dev_path = yield self.volume_manager.setup_compute_volume(context,
- volume_id)
+ dev_path = self.volume_manager.setup_compute_volume(context,
+ volume_id)
try:
- yield self.driver.attach_volume(instance_ref['name'],
- dev_path,
- mountpoint)
+ self.driver.attach_volume(instance_ref['name'],
+ dev_path,
+ mountpoint)
self.db.volume_attached(context,
volume_id,
instance_id,
@@ -234,36 +334,35 @@ class ComputeManager(manager.Manager):
# NOTE(vish): The inline callback eats the exception info so we
# log the traceback here and reraise the same
        #             exception below.
- logging.exception("instance %s: attach failed %s, removing",
+ logging.exception(_("instance %s: attach failed %s, removing"),
instance_id, mountpoint)
- yield self.volume_manager.remove_compute_volume(context,
- volume_id)
+ self.volume_manager.remove_compute_volume(context,
+ volume_id)
raise exc
- defer.returnValue(True)
- @defer.inlineCallbacks
+ return True
+
@exception.wrap_exception
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
context = context.elevated()
- logging.debug("instance %s: detaching volume %s",
+ logging.debug(_("instance %s: detaching volume %s"),
instance_id,
volume_id)
instance_ref = self.db.instance_get(context, instance_id)
volume_ref = self.db.volume_get(context, volume_id)
if instance_ref['name'] not in self.driver.list_instances():
- logging.warn("Detaching volume from unknown instance %s",
+ logging.warn(_("Detaching volume from unknown instance %s"),
instance_ref['name'])
else:
- yield self.driver.detach_volume(instance_ref['name'],
- volume_ref['mountpoint'])
- yield self.volume_manager.remove_compute_volume(context, volume_id)
+ self.driver.detach_volume(instance_ref['name'],
+ volume_ref['mountpoint'])
+ self.volume_manager.remove_compute_volume(context, volume_id)
self.db.volume_detached(context, volume_id)
- defer.returnValue(True)
+ return True
def compareCPU(self, context, xml):
""" Check the host cpu is compatible to a cpu given by xml."""
- logging.warn('good!')
return self.driver.compareCPU(xml)
def get_memory_mb(self):
@@ -281,7 +380,7 @@ class ComputeManager(manager.Manager):
def pre_live_migration(self, context, instance_id, dest):
"""Any preparation for live migration at dst host."""
- # 1. getting volume info ( shlf/slot number )
+        # Getting volume info (shelf/slot number)
instance_ref = db.instance_get(context, instance_id)
ec2_id = instance_ref['hostname']
@@ -289,40 +388,35 @@ class ComputeManager(manager.Manager):
try:
volumes = db.volume_get_by_ec2_id(context, ec2_id)
except exception.NotFound:
- logging.debug('%s has no volume.', ec2_id)
+ logging.info(_('%s has no volume.'), ec2_id)
shelf_slots = {}
for vol in volumes:
shelf, slot = db.volume_get_shelf_and_blade(context, vol['id'])
shelf_slots[vol.id] = (shelf, slot)
- # 2. getting fixed ips
+ # Getting fixed ips
fixed_ip = db.instance_get_fixed_address(context, instance_id)
if None == fixed_ip:
exc_type = 'NotFoundError'
- val = '%s(%s) doesnt have fixed_ip ' % (instance_id, ec2_id)
+ val = _('%s(%s) doesnt have fixed_ip') % (instance_id, ec2_id)
tb = ''.join(traceback.format_tb(sys.exc_info()[2]))
raise rpc.RemoteError(exc_type, val, tb)
- # 3. if any volume is mounted, prepare here.
+ # If any volume is mounted, prepare here.
if 0 != len(shelf_slots):
pass
- # 4. Creating nova-instance-instance-xxx, this is written to libvirt.xml,
- # and can be seen when executin "virsh nwfiter-list" On destination host,
- # this nwfilter is necessary.
- # In addition this method is creating security rule ingress rule onto
- # destination host.
+        # Creating nova-instance-instance-xxx; this is written to libvirt.xml
+        # and can be seen when executing "virsh nwfilter-list".
+        # On the destination host, this nwfilter is necessary.
+        # In addition, this method creates a security-group ingress rule on
+        # the destination host.
self.driver.setup_nwfilters_for_instance(instance_ref)
# 5. bridge settings
-<<<<<<< TREE
- self.network_manager.setup_compute_network(instance_id)
- return True
-=======
self.network_manager.setup_compute_network(context, instance_id)
return True
->>>>>>> MERGE-SOURCE
def nwfilter_for_instance_exists(self, context, instance_id):
"""Check nova-instance-instance-xxx filter exists """
@@ -332,7 +426,7 @@ class ComputeManager(manager.Manager):
def live_migration(self, context, instance_id, dest):
"""executes live migration."""
- # 1. ask dest host to preparing live migration.
+        # Ask the destination host to prepare for live migration.
compute_topic = db.queue_get_for(context, FLAGS.compute_topic, dest)
ret = rpc.call(context,
compute_topic,
@@ -341,16 +435,16 @@ class ComputeManager(manager.Manager):
'dest': dest}})
if True != ret:
- logging.error('Live migration failed(err at %s)', dest)
+ logging.error(_('Pre live migration failed(err at %s)'), dest)
db.instance_set_state(context,
instance_id,
power_state.RUNNING,
'running')
return
- # waiting for setting up nwfilter(nova-instance-instance-xxx)
+        # Wait for the nwfilter (e.g. nova-instance-instance-xxx) to be set up;
        # otherwise, live migration fails.
- timeout_count = range(FLAGS.live_migration_timeout)
+ timeout_count = range(FLAGS.live_migration_timeout * 2)
while 0 != len(timeout_count):
ret = rpc.call(context,
compute_topic,
@@ -360,17 +454,14 @@ class ComputeManager(manager.Manager):
break
timeout_count.pop()
- time.sleep(1)
+ time.sleep(0.5)
if not ret:
- logging.error('Timeout for pre_live_migration at %s', dest)
+ logging.error(_('Timeout for pre_live_migration at %s'), dest)
return
- # 2. executing live migration
+ # Executing live migration
        # live_migration might raise a ProcessExecutionError, but
        # nothing is recovered in this version.
instance_ref = db.instance_get(context, instance_id)
- ret = self.driver.live_migration(instance_ref, dest)
- if not ret:
- logging.debug('Fail to live migration')
- return
+ self.driver.live_migration(context, instance_ref, dest)
diff --git a/nova/compute/monitor.py b/nova/compute/monitor.py
index 22653113a..60c347a5e 100644
--- a/nova/compute/monitor.py
+++ b/nova/compute/monitor.py
@@ -255,7 +255,7 @@ class Instance(object):
Updates the instances statistics and stores the resulting graphs
in the internal object store on the cloud controller.
"""
- logging.debug('updating %s...', self.instance_id)
+ logging.debug(_('updating %s...'), self.instance_id)
try:
data = self.fetch_cpu_stats()
@@ -285,7 +285,7 @@ class Instance(object):
graph_disk(self, '1w')
graph_disk(self, '1m')
except Exception:
- logging.exception('unexpected error during update')
+ logging.exception(_('unexpected error during update'))
self.last_updated = utcnow()
@@ -351,7 +351,7 @@ class Instance(object):
rd += rd_bytes
wr += wr_bytes
except TypeError:
- logging.error('Cannot get blockstats for "%s" on "%s"',
+ logging.error(_('Cannot get blockstats for "%s" on "%s"'),
disk, self.instance_id)
raise
@@ -373,7 +373,7 @@ class Instance(object):
rx += stats[0]
tx += stats[4]
except TypeError:
- logging.error('Cannot get ifstats for "%s" on "%s"',
+ logging.error(_('Cannot get ifstats for "%s" on "%s"'),
interface, self.instance_id)
raise
@@ -408,7 +408,7 @@ class InstanceMonitor(object, service.Service):
try:
conn = virt_connection.get_connection(read_only=True)
except Exception, exn:
- logging.exception('unexpected exception getting connection')
+ logging.exception(_('unexpected exception getting connection'))
time.sleep(FLAGS.monitoring_instances_delay)
return
@@ -423,7 +423,7 @@ class InstanceMonitor(object, service.Service):
if not domain_id in self._instances:
instance = Instance(conn, domain_id)
self._instances[domain_id] = instance
- logging.debug('Found instance: %s', domain_id)
+ logging.debug(_('Found instance: %s'), domain_id)
for key in self._instances.keys():
instance = self._instances[key]