summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTodd Willey <todd@ansolabs.com>2011-05-31 16:20:35 -0400
committerTodd Willey <todd@ansolabs.com>2011-05-31 16:20:35 -0400
commiteca6c3098144c1bf917725d906a50141a5aaef4e (patch)
tree2ee52158c2c6ca49f6675359b1977bc53ca72621
parent4d7dbdc96e30afbd19ab525e9667f6e3aaaafbe9 (diff)
parent0b7104a8c6b2b2e3fed4a09b239439964aeb2774 (diff)
Merge Trunk.
-rw-r--r--Authors2
-rw-r--r--nova/api/ec2/__init__.py6
-rw-r--r--nova/api/ec2/cloud.py70
-rw-r--r--nova/api/openstack/contrib/volumes.py2
-rw-r--r--nova/api/openstack/images.py22
-rw-r--r--nova/api/openstack/servers.py10
-rw-r--r--nova/auth/novarc.template2
-rw-r--r--nova/compute/api.py13
-rw-r--r--nova/compute/manager.py5
-rw-r--r--nova/db/api.py39
-rw-r--r--nova/db/sqlalchemy/api.py115
-rw-r--r--nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py70
-rw-r--r--nova/db/sqlalchemy/migrate_repo/versions/020_add_snapshot_id_to_volumes.py47
-rw-r--r--nova/db/sqlalchemy/models.py27
-rw-r--r--nova/exception.py8
-rw-r--r--nova/fakerabbit.py31
-rw-r--r--nova/image/fake.py4
-rw-r--r--nova/image/glance.py8
-rw-r--r--nova/image/local.py4
-rw-r--r--nova/image/service.py4
-rw-r--r--nova/network/vmwareapi_net.py4
-rw-r--r--nova/rpc.py271
-rw-r--r--nova/service.py60
-rw-r--r--nova/test.py9
-rw-r--r--nova/tests/api/openstack/fakes.py4
-rw-r--r--nova/tests/api/openstack/test_images.py141
-rw-r--r--nova/tests/image/test_glance.py2
-rw-r--r--nova/tests/integrated/api/client.py10
-rw-r--r--nova/tests/integrated/integrated_helpers.py5
-rw-r--r--nova/tests/integrated/test_servers.py106
-rw-r--r--nova/tests/test_cloud.py91
-rw-r--r--nova/tests/test_quota.py2
-rw-r--r--nova/tests/test_rpc.py116
-rw-r--r--nova/tests/test_service.py59
-rw-r--r--nova/tests/test_volume.py50
-rw-r--r--nova/tests/test_xenapi.py49
-rw-r--r--nova/tests/xenapi/stubs.py26
-rw-r--r--nova/virt/libvirt.xml.template2
-rw-r--r--nova/virt/libvirt/connection.py1
-rw-r--r--nova/virt/vmwareapi/vmops.py6
-rw-r--r--nova/virt/xenapi/fake.py5
-rw-r--r--nova/virt/xenapi/vm_utils.py49
-rw-r--r--nova/virt/xenapi/vmops.py83
-rw-r--r--nova/vnc/__init__.py2
-rw-r--r--nova/volume/api.py57
-rw-r--r--nova/volume/driver.py134
-rw-r--r--nova/volume/manager.py59
-rw-r--r--plugins/xenserver/xenapi/etc/xapi.d/plugins/glance96
-rw-r--r--tools/pip-requires3
49 files changed, 1685 insertions, 306 deletions
diff --git a/Authors b/Authors
index 50f4680a9..8b30fba77 100644
--- a/Authors
+++ b/Authors
@@ -30,6 +30,7 @@ Gabe Westmaas <gabe.westmaas@rackspace.com>
Hisaharu Ishii <ishii.hisaharu@lab.ntt.co.jp>
Hisaki Ohara <hisaki.ohara@intel.com>
Ilya Alekseyev <ialekseev@griddynamics.com>
+Isaku Yamahata <yamahata@valinux.co.jp>
Jason Koelker <jason@koelker.net>
Jay Pipes <jaypipes@gmail.com>
Jesse Andrews <anotherjesse@gmail.com>
@@ -83,6 +84,7 @@ Trey Morris <trey.morris@rackspace.com>
Tushar Patil <tushar.vitthal.patil@gmail.com>
Vasiliy Shlykov <vash@vasiliyshlykov.org>
Vishvananda Ishaya <vishvananda@gmail.com>
+Vivek Y S <vivek.ys@gmail.com>
William Wolf <throughnothing@gmail.com>
Yoshiaki Tamura <yoshi@midokura.jp>
Youcef Laribi <Youcef.Laribi@eu.citrix.com>
diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py
index c13993dd3..1915d007d 100644
--- a/nova/api/ec2/__init__.py
+++ b/nova/api/ec2/__init__.py
@@ -327,6 +327,12 @@ class Executor(wsgi.Application):
ec2_id = ec2utils.id_to_ec2_id(ex.volume_id, 'vol-%08x')
message = _('Volume %s not found') % ec2_id
return self._error(req, context, type(ex).__name__, message)
+ except exception.SnapshotNotFound as ex:
+ LOG.info(_('SnapshotNotFound raised: %s'), unicode(ex),
+ context=context)
+ ec2_id = ec2utils.id_to_ec2_id(ex.snapshot_id, 'snap-%08x')
+ message = _('Snapshot %s not found') % ec2_id
+ return self._error(req, context, type(ex).__name__, message)
except exception.NotFound as ex:
LOG.info(_('NotFound raised: %s'), unicode(ex), context=context)
return self._error(req, context, type(ex).__name__, unicode(ex))
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index c35b6024e..79cc3b3bf 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -283,14 +283,50 @@ class CloudController(object):
owner=None,
restorable_by=None,
**kwargs):
- return {'snapshotSet': [{'snapshotId': 'fixme',
- 'volumeId': 'fixme',
- 'status': 'fixme',
- 'startTime': 'fixme',
- 'progress': 'fixme',
- 'ownerId': 'fixme',
- 'volumeSize': 0,
- 'description': 'fixme'}]}
+ if snapshot_id:
+ snapshots = []
+ for ec2_id in snapshot_id:
+ internal_id = ec2utils.ec2_id_to_id(ec2_id)
+ snapshot = self.volume_api.get_snapshot(
+ context,
+ snapshot_id=internal_id)
+ snapshots.append(snapshot)
+ else:
+ snapshots = self.volume_api.get_all_snapshots(context)
+ snapshots = [self._format_snapshot(context, s) for s in snapshots]
+ return {'snapshotSet': snapshots}
+
+ def _format_snapshot(self, context, snapshot):
+ s = {}
+ s['snapshotId'] = ec2utils.id_to_ec2_id(snapshot['id'], 'snap-%08x')
+ s['volumeId'] = ec2utils.id_to_ec2_id(snapshot['volume_id'],
+ 'vol-%08x')
+ s['status'] = snapshot['status']
+ s['startTime'] = snapshot['created_at']
+ s['progress'] = snapshot['progress']
+ s['ownerId'] = snapshot['project_id']
+ s['volumeSize'] = snapshot['volume_size']
+ s['description'] = snapshot['display_description']
+
+ s['display_name'] = snapshot['display_name']
+ s['display_description'] = snapshot['display_description']
+ return s
+
+ def create_snapshot(self, context, volume_id, **kwargs):
+ LOG.audit(_("Create snapshot of volume %s"), volume_id,
+ context=context)
+ volume_id = ec2utils.ec2_id_to_id(volume_id)
+ snapshot = self.volume_api.create_snapshot(
+ context,
+ volume_id=volume_id,
+ name=kwargs.get('display_name'),
+ description=kwargs.get('display_description'))
+ return self._format_snapshot(context, snapshot)
+
+ def delete_snapshot(self, context, snapshot_id, **kwargs):
+ snapshot_id = ec2utils.ec2_id_to_id(snapshot_id)
+ self.volume_api.delete_snapshot(context, snapshot_id=snapshot_id)
+ return True
def describe_key_pairs(self, context, key_name=None, **kwargs):
key_pairs = db.key_pair_get_all_by_user(context, context.user_id)
@@ -619,16 +655,30 @@ class CloudController(object):
'volumeId': v['volumeId']}]
else:
v['attachmentSet'] = [{}]
+ if volume.get('snapshot_id') != None:
+ v['snapshotId'] = ec2utils.id_to_ec2_id(volume['snapshot_id'],
+ 'snap-%08x')
+ else:
+ v['snapshotId'] = None
v['display_name'] = volume['display_name']
v['display_description'] = volume['display_description']
return v
- def create_volume(self, context, size, **kwargs):
- LOG.audit(_("Create volume of %s GB"), size, context=context)
+ def create_volume(self, context, **kwargs):
+ size = kwargs.get('size')
+ if kwargs.get('snapshot_id') != None:
+ snapshot_id = ec2utils.ec2_id_to_id(kwargs['snapshot_id'])
+ LOG.audit(_("Create volume from snapshot %s"), snapshot_id,
+ context=context)
+ else:
+ snapshot_id = None
+ LOG.audit(_("Create volume of %s GB"), size, context=context)
+
volume = self.volume_api.create(
context,
size=size,
+ snapshot_id=snapshot_id,
name=kwargs.get('display_name'),
description=kwargs.get('display_description'))
# TODO(vish): Instance should be None at db layer instead of
diff --git a/nova/api/openstack/contrib/volumes.py b/nova/api/openstack/contrib/volumes.py
index 18de2ec71..b22bd2846 100644
--- a/nova/api/openstack/contrib/volumes.py
+++ b/nova/api/openstack/contrib/volumes.py
@@ -135,7 +135,7 @@ class VolumeController(wsgi.Controller):
vol = env['volume']
size = vol['size']
LOG.audit(_("Create volume of %s GB"), size, context=context)
- new_volume = self.volume_api.create(context, size,
+ new_volume = self.volume_api.create(context, size, None,
vol.get('display_name'),
vol.get('display_description'))
diff --git a/nova/api/openstack/images.py b/nova/api/openstack/images.py
index 34d4c27fc..2e779da79 100644
--- a/nova/api/openstack/images.py
+++ b/nova/api/openstack/images.py
@@ -28,6 +28,8 @@ from nova.api.openstack.views import images as images_view
LOG = log.getLogger('nova.api.openstack.images')
FLAGS = flags.FLAGS
+SUPPORTED_FILTERS = ['name', 'status']
+
class Controller(common.OpenstackController):
"""Base `wsgi.Controller` for retrieving/displaying images."""
@@ -59,7 +61,8 @@ class Controller(common.OpenstackController):
:param req: `wsgi.Request` object
"""
context = req.environ['nova.context']
- images = self._image_service.index(context)
+ filters = self._get_filters(req)
+ images = self._image_service.index(context, filters)
images = common.limited(images, req)
builder = self.get_builder(req).build
return dict(images=[builder(image, detail=False) for image in images])
@@ -70,11 +73,26 @@ class Controller(common.OpenstackController):
:param req: `wsgi.Request` object.
"""
context = req.environ['nova.context']
- images = self._image_service.detail(context)
+ filters = self._get_filters(req)
+ images = self._image_service.detail(context, filters)
images = common.limited(images, req)
builder = self.get_builder(req).build
return dict(images=[builder(image, detail=True) for image in images])
+ def _get_filters(self, req):
+ """
+ Return a dictionary of query param filters from the request
+
+ :param req: the Request object coming from the wsgi layer
+ :retval a dict of key/value filters
+ """
+ filters = {}
+ for param in req.str_params:
+ if param in SUPPORTED_FILTERS or param.startswith('property-'):
+ filters[param] = req.str_params.get(param)
+
+ return filters
+
def show(self, req, id):
"""Return detailed information about a specific image.
diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py
index 5c10fc916..8e191c232 100644
--- a/nova/api/openstack/servers.py
+++ b/nova/api/openstack/servers.py
@@ -708,14 +708,16 @@ class ControllerV11(Controller):
image_id = common.get_id_from_href(image_ref)
personalities = info["rebuild"].get("personality", [])
- metadata = info["rebuild"].get("metadata", {})
+ metadata = info["rebuild"].get("metadata")
+ name = info["rebuild"].get("name")
- self._validate_metadata(metadata)
+ if metadata:
+ self._validate_metadata(metadata)
self._decode_personalities(personalities)
try:
- self.compute_api.rebuild(context, instance_id, image_id, metadata,
- personalities)
+ self.compute_api.rebuild(context, instance_id, image_id, name,
+ metadata, personalities)
except exception.BuildInProgress:
msg = _("Instance %d is currently being rebuilt.") % instance_id
LOG.debug(msg)
diff --git a/nova/auth/novarc.template b/nova/auth/novarc.template
index cda2ecc28..8170fcafe 100644
--- a/nova/auth/novarc.template
+++ b/nova/auth/novarc.template
@@ -1,4 +1,4 @@
-NOVA_KEY_DIR=$(pushd $(dirname $BASH_SOURCE)>/dev/null; pwd; popd>/dev/null)
+NOVA_KEY_DIR=$(dirname $(readlink -f ${BASH_SOURCE}))
export EC2_ACCESS_KEY="%(access)s:%(project)s"
export EC2_SECRET_KEY="%(secret)s"
export EC2_URL="%(ec2)s"
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 660c7666f..1455e2de9 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -540,7 +540,7 @@ class API(base.Base):
"""Reboot the given instance."""
self._cast_compute_message('reboot_instance', context, instance_id)
- def rebuild(self, context, instance_id, image_id, metadata=None,
+ def rebuild(self, context, instance_id, image_id, name=None, metadata=None,
files_to_inject=None):
"""Rebuild the given instance with the provided metadata."""
instance = db.api.instance_get(context, instance_id)
@@ -549,13 +549,16 @@ class API(base.Base):
msg = _("Instance already building")
raise exception.BuildInProgress(msg)
- metadata = metadata or {}
- self._check_metadata_properties_quota(context, metadata)
-
files_to_inject = files_to_inject or []
self._check_injected_file_quota(context, files_to_inject)
- self.db.instance_update(context, instance_id, {"metadata": metadata})
+ values = {}
+ if metadata is not None:
+ self._check_metadata_properties_quota(context, metadata)
+ values['metadata'] = metadata
+ if name is not None:
+ values['display_name'] = name
+ self.db.instance_update(context, instance_id, values)
rebuild_params = {
"image_id": image_id,
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index ff7aeb053..a2c66a957 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -336,7 +336,7 @@ class ComputeManager(manager.SchedulerDependentManager):
@exception.wrap_exception
@checks_instance_lock
- def rebuild_instance(self, context, instance_id, image_id):
+ def rebuild_instance(self, context, instance_id, **kwargs):
"""Destroy and re-make this instance.
A 'rebuild' effectively purges all existing data from the system and
@@ -354,7 +354,8 @@ class ComputeManager(manager.SchedulerDependentManager):
self._update_state(context, instance_id, power_state.BUILDING)
self.driver.destroy(instance_ref)
- instance_ref.image_id = image_id
+ instance_ref.image_id = kwargs.get('image_id')
+ instance_ref.injected_files = kwargs.get('injected_files', [])
self.driver.spawn(instance_ref)
self._update_image_id(context, instance_id, image_id)
diff --git a/nova/db/api.py b/nova/db/api.py
index f5c71019a..798b2881c 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -47,6 +47,8 @@ flags.DEFINE_string('instance_name_template', 'instance-%08x',
'Template string to be used to generate instance names')
flags.DEFINE_string('volume_name_template', 'volume-%08x',
'Template string to be used to generate instance names')
+flags.DEFINE_string('snapshot_name_template', 'snapshot-%08x',
+ 'Template string to be used to generate snapshot names')
IMPL = utils.LazyPluggable(FLAGS['db_backend'],
@@ -881,6 +883,43 @@ def volume_update(context, volume_id, values):
####################
+def snapshot_create(context, values):
+ """Create a snapshot from the values dictionary."""
+ return IMPL.snapshot_create(context, values)
+
+
+def snapshot_destroy(context, snapshot_id):
+ """Destroy the snapshot or raise if it does not exist."""
+ return IMPL.snapshot_destroy(context, snapshot_id)
+
+
+def snapshot_get(context, snapshot_id):
+ """Get a snapshot or raise if it does not exist."""
+ return IMPL.snapshot_get(context, snapshot_id)
+
+
+def snapshot_get_all(context):
+ """Get all snapshots."""
+ return IMPL.snapshot_get_all(context)
+
+
+def snapshot_get_all_by_project(context, project_id):
+ """Get all snapshots belonging to a project."""
+ return IMPL.snapshot_get_all_by_project(context, project_id)
+
+
+def snapshot_update(context, snapshot_id, values):
+ """Set the given properties on an snapshot and update it.
+
+ Raises NotFound if snapshot does not exist.
+
+ """
+ return IMPL.snapshot_update(context, snapshot_id, values)
+
+
+####################
+
+
def security_group_get_all(context):
"""Get all security groups."""
return IMPL.security_group_get_all(context)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 5f5d46b86..886dc2a43 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -771,6 +771,15 @@ def fixed_ip_update(context, address, values):
###################
+def _metadata_refs(metadata_dict):
+ metadata_refs = []
+ if metadata_dict:
+ for k, v in metadata_dict.iteritems():
+ metadata_ref = models.InstanceMetadata()
+ metadata_ref['key'] = k
+ metadata_ref['value'] = v
+ metadata_refs.append(metadata_ref)
+ return metadata_refs
@require_context
@@ -780,15 +789,7 @@ def instance_create(context, values):
context - request context object
values - dict containing column values.
"""
- metadata = values.get('metadata')
- metadata_refs = []
- if metadata:
- for k, v in metadata.iteritems():
- metadata_ref = models.InstanceMetadata()
- metadata_ref['key'] = k
- metadata_ref['value'] = v
- metadata_refs.append(metadata_ref)
- values['metadata'] = metadata_refs
+ values['metadata'] = _metadata_refs(values.get('metadata'))
instance_ref = models.Instance()
instance_ref.update(values)
@@ -1010,6 +1011,11 @@ def instance_set_state(context, instance_id, state, description=None):
@require_context
def instance_update(context, instance_id, values):
session = get_session()
+ metadata = values.get('metadata')
+ if metadata is not None:
+ instance_metadata_delete_all(context, instance_id)
+ instance_metadata_update_or_create(context, instance_id,
+ values.pop('metadata'))
with session.begin():
instance_ref = instance_get(context, instance_id, session=session)
instance_ref.update(values)
@@ -1790,6 +1796,82 @@ def volume_update(context, volume_id, values):
@require_context
+def snapshot_create(context, values):
+ snapshot_ref = models.Snapshot()
+ snapshot_ref.update(values)
+
+ session = get_session()
+ with session.begin():
+ snapshot_ref.save(session=session)
+ return snapshot_ref
+
+
+@require_admin_context
+def snapshot_destroy(context, snapshot_id):
+ session = get_session()
+ with session.begin():
+ session.query(models.Snapshot).\
+ filter_by(id=snapshot_id).\
+ update({'deleted': 1,
+ 'deleted_at': datetime.datetime.utcnow(),
+ 'updated_at': literal_column('updated_at')})
+
+
+@require_context
+def snapshot_get(context, snapshot_id, session=None):
+ if not session:
+ session = get_session()
+ result = None
+
+ if is_admin_context(context):
+ result = session.query(models.Snapshot).\
+ filter_by(id=snapshot_id).\
+ filter_by(deleted=can_read_deleted(context)).\
+ first()
+ elif is_user_context(context):
+ result = session.query(models.Snapshot).\
+ filter_by(project_id=context.project_id).\
+ filter_by(id=snapshot_id).\
+ filter_by(deleted=False).\
+ first()
+ if not result:
+ raise exception.SnapshotNotFound(snapshot_id=snapshot_id)
+
+ return result
+
+
+@require_admin_context
+def snapshot_get_all(context):
+ session = get_session()
+ return session.query(models.Snapshot).\
+ filter_by(deleted=can_read_deleted(context)).\
+ all()
+
+
+@require_context
+def snapshot_get_all_by_project(context, project_id):
+ authorize_project_context(context, project_id)
+
+ session = get_session()
+ return session.query(models.Snapshot).\
+ filter_by(project_id=project_id).\
+ filter_by(deleted=can_read_deleted(context)).\
+ all()
+
+
+@require_context
+def snapshot_update(context, snapshot_id, values):
+ session = get_session()
+ with session.begin():
+ snapshot_ref = snapshot_get(context, snapshot_id, session=session)
+ snapshot_ref.update(values)
+ snapshot_ref.save(session=session)
+
+
+###################
+
+
+@require_context
def security_group_get_all(context):
session = get_session()
return session.query(models.SecurityGroup).\
@@ -2569,6 +2651,17 @@ def instance_metadata_delete(context, instance_id, key):
@require_context
+def instance_metadata_delete_all(context, instance_id):
+ session = get_session()
+ session.query(models.InstanceMetadata).\
+ filter_by(instance_id=instance_id).\
+ filter_by(deleted=False).\
+ update({'deleted': True,
+ 'deleted_at': datetime.datetime.utcnow(),
+ 'updated_at': literal_column('updated_at')})
+
+
+@require_context
def instance_metadata_get_item(context, instance_id, key):
session = get_session()
@@ -2587,6 +2680,9 @@ def instance_metadata_get_item(context, instance_id, key):
@require_context
def instance_metadata_update_or_create(context, instance_id, metadata):
session = get_session()
+
+ original_metadata = instance_metadata_get(context, instance_id)
+
meta_ref = None
for key, value in metadata.iteritems():
try:
@@ -2598,4 +2694,5 @@ def instance_metadata_update_or_create(context, instance_id, metadata):
"instance_id": instance_id,
"deleted": 0})
meta_ref.save(session=session)
+
return metadata
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py b/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py
new file mode 100644
index 000000000..f16d6db56
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py
@@ -0,0 +1,70 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 MORITA Kazutaka.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column, Table, MetaData
+from sqlalchemy import Integer, DateTime, Boolean, String
+
+from nova import log as logging
+
+meta = MetaData()
+
+snapshots = Table('snapshots', meta,
+ Column('created_at', DateTime(timezone=False)),
+ Column('updated_at', DateTime(timezone=False)),
+ Column('deleted_at', DateTime(timezone=False)),
+ Column('deleted', Boolean(create_constraint=True, name=None)),
+ Column('id', Integer(), primary_key=True, nullable=False),
+ Column('volume_id', Integer(), nullable=False),
+ Column('user_id',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('project_id',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('status',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('progress',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('volume_size', Integer()),
+ Column('scheduled_at', DateTime(timezone=False)),
+ Column('display_name',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('display_description',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)))
+
+
+def upgrade(migrate_engine):
+ # Upgrade operations go here. Don't create your own engine;
+ # bind migrate_engine to your metadata
+ meta.bind = migrate_engine
+
+ try:
+ snapshots.create()
+ except Exception:
+ logging.info(repr(snapshots))
+ logging.exception('Exception while creating table')
+ meta.drop_all(tables=[snapshots])
+ raise
+
+
+def downgrade(migrate_engine):
+ # Operations to reverse the above upgrade go here.
+ snapshots.drop()
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/020_add_snapshot_id_to_volumes.py b/nova/db/sqlalchemy/migrate_repo/versions/020_add_snapshot_id_to_volumes.py
new file mode 100644
index 000000000..10bd9d5c9
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/020_add_snapshot_id_to_volumes.py
@@ -0,0 +1,47 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 MORITA Kazutaka.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column, Table, MetaData, Integer
+
+from nova import log as logging
+
+
+meta = MetaData()
+
+
+# Table stub-definitions
+# Just for the ForeignKey and column creation to succeed, these are not the
+# actual definitions of instances or services.
+#
+volumes = Table('volumes', meta,
+ Column('id', Integer(), primary_key=True, nullable=False),
+ )
+
+#
+# New Column
+#
+
+snapshot_id = Column('snapshot_id', Integer())
+
+
+def upgrade(migrate_engine):
+ # Upgrade operations go here. Don't create your own engine;
+ # bind migrate_engine to your metadata
+ meta.bind = migrate_engine
+
+ # Add columns to existing tables
+ volumes.create_column(snapshot_id)
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index dc10bcf00..e619eec6b 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -287,6 +287,8 @@ class Volume(BASE, NovaBase):
user_id = Column(String(255))
project_id = Column(String(255))
+ snapshot_id = Column(String(255))
+
host = Column(String(255)) # , ForeignKey('hosts.id'))
size = Column(Integer)
availability_zone = Column(String(255)) # TODO(vish): foreign key?
@@ -329,6 +331,31 @@ class Quota(BASE, NovaBase):
hard_limit = Column(Integer, nullable=True)
+class Snapshot(BASE, NovaBase):
+ """Represents a block storage device that can be attached to a vm."""
+ __tablename__ = 'snapshots'
+ id = Column(Integer, primary_key=True, autoincrement=True)
+
+ @property
+ def name(self):
+ return FLAGS.snapshot_name_template % self.id
+
+ @property
+ def volume_name(self):
+ return FLAGS.volume_name_template % self.volume_id
+
+ user_id = Column(String(255))
+ project_id = Column(String(255))
+
+ volume_id = Column(Integer)
+ status = Column(String(255))
+ progress = Column(String(255))
+ volume_size = Column(Integer)
+
+ display_name = Column(String(255))
+ display_description = Column(String(255))
+
+
class ExportDevice(BASE, NovaBase):
"""Represates a shelf and blade that a volume can be exported on."""
__tablename__ = 'export_devices'
diff --git a/nova/exception.py b/nova/exception.py
index 56c20d111..02c65fd64 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -271,6 +271,14 @@ class VolumeNotFoundForInstance(VolumeNotFound):
message = _("Volume not found for instance %(instance_id)s.")
+class SnapshotNotFound(NotFound):
+ message = _("Snapshot %(snapshot_id)s could not be found.")
+
+
+class VolumeIsBusy(Error):
+ message = _("deleting volume %(volume_name)s that has snapshot")
+
+
class ExportDeviceNotFoundForVolume(NotFound):
message = _("No export device found for volume %(volume_id)s.")
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py
index a7dee8caf..e7e9dab77 100644
--- a/nova/fakerabbit.py
+++ b/nova/fakerabbit.py
@@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit")
EXCHANGES = {}
QUEUES = {}
+CONSUMERS = {}
class Message(base.BaseMessage):
@@ -96,17 +97,29 @@ class Backend(base.BaseBackend):
' key %(routing_key)s') % locals())
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
- def declare_consumer(self, queue, callback, *args, **kwargs):
- self.current_queue = queue
- self.current_callback = callback
+ def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
+ global CONSUMERS
+ LOG.debug("Adding consumer %s", consumer_tag)
+ CONSUMERS[consumer_tag] = (queue, callback)
+
+ def cancel(self, consumer_tag):
+ global CONSUMERS
+ LOG.debug("Removing consumer %s", consumer_tag)
+ del CONSUMERS[consumer_tag]
def consume(self, limit=None):
+ global CONSUMERS
+ num = 0
while True:
- item = self.get(self.current_queue)
- if item:
- self.current_callback(item)
- raise StopIteration()
- greenthread.sleep(0)
+ for (queue, callback) in CONSUMERS.itervalues():
+ item = self.get(queue)
+ if item:
+ callback(item)
+ num += 1
+ yield
+ if limit and num == limit:
+ raise StopIteration()
+ greenthread.sleep(0.1)
def get(self, queue, no_ack=False):
global QUEUES
@@ -134,5 +147,7 @@ class Backend(base.BaseBackend):
def reset_all():
global EXCHANGES
global QUEUES
+ global CONSUMERS
EXCHANGES = {}
QUEUES = {}
+ CONSUMERS = {}
diff --git a/nova/image/fake.py b/nova/image/fake.py
index b400b2adb..8e84c8597 100644
--- a/nova/image/fake.py
+++ b/nova/image/fake.py
@@ -52,11 +52,11 @@ class FakeImageService(service.BaseImageService):
self.create(None, image)
super(FakeImageService, self).__init__()
- def index(self, context):
+ def index(self, context, filters=None):
"""Returns list of images."""
return copy.deepcopy(self.images.values())
- def detail(self, context):
+ def detail(self, context, filters=None):
"""Return list of detailed image information."""
return copy.deepcopy(self.images.values())
diff --git a/nova/image/glance.py b/nova/image/glance.py
index 193e37273..dec797619 100644
--- a/nova/image/glance.py
+++ b/nova/image/glance.py
@@ -58,23 +58,23 @@ class GlanceImageService(service.BaseImageService):
else:
self.client = client
- def index(self, context):
+ def index(self, context, filters=None):
"""Calls out to Glance for a list of images available."""
# NOTE(sirp): We need to use `get_images_detailed` and not
# `get_images` here because we need `is_public` and `properties`
# included so we can filter by user
filtered = []
- image_metas = self.client.get_images_detailed()
+ image_metas = self.client.get_images_detailed(filters=filters)
for image_meta in image_metas:
if self._is_image_available(context, image_meta):
meta_subset = utils.subset_dict(image_meta, ('id', 'name'))
filtered.append(meta_subset)
return filtered
- def detail(self, context):
+ def detail(self, context, filters=None):
"""Calls out to Glance for a list of detailed image information."""
filtered = []
- image_metas = self.client.get_images_detailed()
+ image_metas = self.client.get_images_detailed(filters=filters)
for image_meta in image_metas:
if self._is_image_available(context, image_meta):
base_image_meta = self._translate_to_base(image_meta)
diff --git a/nova/image/local.py b/nova/image/local.py
index 918180bae..677d5302b 100644
--- a/nova/image/local.py
+++ b/nova/image/local.py
@@ -63,7 +63,7 @@ class LocalImageService(service.BaseImageService):
images.append(unhexed_image_id)
return images
- def index(self, context):
+ def index(self, context, *args, **kwargs):
filtered = []
image_metas = self.detail(context)
for image_meta in image_metas:
@@ -71,7 +71,7 @@ class LocalImageService(service.BaseImageService):
filtered.append(meta)
return filtered
- def detail(self, context):
+ def detail(self, context, *args, **kwargs):
images = []
for image_id in self._ids():
try:
diff --git a/nova/image/service.py b/nova/image/service.py
index ab6749049..5361cfc89 100644
--- a/nova/image/service.py
+++ b/nova/image/service.py
@@ -46,7 +46,7 @@ class BaseImageService(object):
# the ImageService subclass
SERVICE_IMAGE_ATTRS = []
- def index(self, context):
+ def index(self, context, *args, **kwargs):
"""List images.
:returns: a sequence of mappings with the following signature
@@ -55,7 +55,7 @@ class BaseImageService(object):
"""
raise NotImplementedError
- def detail(self, context):
+ def detail(self, context, *args, **kwargs):
"""Detailed information about an images.
:returns: a sequence of mappings with the following signature
diff --git a/nova/network/vmwareapi_net.py b/nova/network/vmwareapi_net.py
index 373060add..04210c011 100644
--- a/nova/network/vmwareapi_net.py
+++ b/nova/network/vmwareapi_net.py
@@ -30,9 +30,7 @@ LOG = logging.getLogger("nova.network.vmwareapi_net")
FLAGS = flags.FLAGS
-flags.DEFINE_string('vlan_interface', 'vmnic0',
- 'Physical network adapter name in VMware ESX host for '
- 'vlan networking')
+FLAGS['vlan_interface'].SetDefault('vmnic0')
def ensure_vlan_bridge(vlan_num, bridge, net_attrs=None):
diff --git a/nova/rpc.py b/nova/rpc.py
index 2116f22c3..c5277c6a9 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -28,12 +28,15 @@ import json
import sys
import time
import traceback
+import types
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenpool
-from eventlet import greenthread
+from eventlet import pools
+from eventlet import queue
+import greenlet
from nova import context
from nova import exception
@@ -47,7 +50,10 @@ LOG = logging.getLogger('nova.rpc')
FLAGS = flags.FLAGS
-flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
+ 'Size of RPC thread pool')
+flags.DEFINE_integer('rpc_conn_pool_size', 30,
+ 'Size of RPC connection pool')
class Connection(carrot_connection.BrokerConnection):
@@ -90,6 +96,22 @@ class Connection(carrot_connection.BrokerConnection):
return cls.instance()
+class Pool(pools.Pool):
+ """Class that implements a Pool of Connections."""
+
+ # TODO(comstud): Timeout connections not used in a while
+ def create(self):
+ LOG.debug('Creating new connection')
+ return Connection.instance(new=True)
+
+# Create a ConnectionPool to use for RPC calls. We'll order the
+# pool as a stack (LIFO), so that we can potentially loop through and
+# timeout old unused connections at some point
+ConnectionPool = Pool(
+ max_size=FLAGS.rpc_conn_pool_size,
+ order_as_stack=True)
+
+
class Consumer(messaging.Consumer):
"""Consumer base class.
@@ -131,7 +153,9 @@ class Consumer(messaging.Consumer):
self.connection = Connection.recreate()
self.backend = self.connection.create_backend()
self.declare()
- super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
+ return super(Consumer, self).fetch(no_ack,
+ auto_ack,
+ enable_callbacks)
if self.failed_connection:
LOG.error(_('Reconnected to queue'))
self.failed_connection = False
@@ -159,13 +183,13 @@ class AdapterConsumer(Consumer):
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
super(AdapterConsumer, self).__init__(connection=connection,
topic=topic)
+ self.register_callback(self.process_data)
- def receive(self, *args, **kwargs):
- self.pool.spawn_n(self._receive, *args, **kwargs)
+ def process_data(self, message_data, message):
+ """Consumer callback to call a method on a proxy object.
- @exception.wrap_exception
- def _receive(self, message_data, message):
- """Magically looks for a method on the proxy object and calls it.
+ Parses the message for validity and fires off a thread to call the
+ proxy object method.
Message data should be a dictionary with two keys:
method: string representing the method to call
@@ -175,8 +199,8 @@ class AdapterConsumer(Consumer):
"""
LOG.debug(_('received %s') % message_data)
- msg_id = message_data.pop('_msg_id', None)
-
+ # This will be popped off in _unpack_context
+ msg_id = message_data.get('_msg_id', None)
ctxt = _unpack_context(message_data)
method = message_data.get('method')
@@ -188,8 +212,17 @@ class AdapterConsumer(Consumer):
# we just log the message and send an error string
# back to the caller
LOG.warn(_('no method for message: %s') % message_data)
- msg_reply(msg_id, _('No method for message: %s') % message_data)
+ if msg_id:
+ msg_reply(msg_id,
+ _('No method for message: %s') % message_data)
return
+ self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
+
+ @exception.wrap_exception
+ def _process_data(self, msg_id, ctxt, method, args):
+ """Thread that maigcally looks for a method on the proxy
+ object and calls it.
+ """
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
@@ -197,7 +230,18 @@ class AdapterConsumer(Consumer):
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
- msg_reply(msg_id, rval, None)
+ # Check if the result was a generator
+ if isinstance(rval, types.GeneratorType):
+ for x in rval:
+ msg_reply(msg_id, x, None)
+ else:
+ msg_reply(msg_id, rval, None)
+
+ # This final None tells multicall that it is done.
+ msg_reply(msg_id, None, None)
+ elif isinstance(rval, types.GeneratorType):
+ # NOTE(vish): this iterates through the generator
+ list(rval)
except Exception as e:
logging.exception('Exception during message handling')
if msg_id:
@@ -205,11 +249,6 @@ class AdapterConsumer(Consumer):
return
-class Publisher(messaging.Publisher):
- """Publisher base class."""
- pass
-
-
class TopicAdapterConsumer(AdapterConsumer):
"""Consumes messages on a specific topic."""
@@ -242,6 +281,58 @@ class FanoutAdapterConsumer(AdapterConsumer):
topic=topic, proxy=proxy)
+class ConsumerSet(object):
+ """Groups consumers to listen on together on a single connection."""
+
+ def __init__(self, connection, consumer_list):
+ self.consumer_list = set(consumer_list)
+ self.consumer_set = None
+ self.enabled = True
+ self.init(connection)
+
+ def init(self, conn):
+ if not conn:
+ conn = Connection.instance(new=True)
+ if self.consumer_set:
+ self.consumer_set.close()
+ self.consumer_set = messaging.ConsumerSet(conn)
+ for consumer in self.consumer_list:
+ consumer.connection = conn
+ # consumer.backend is set for us
+ self.consumer_set.add_consumer(consumer)
+
+ def reconnect(self):
+ self.init(None)
+
+ def wait(self, limit=None):
+ running = True
+ while running:
+ it = self.consumer_set.iterconsume(limit=limit)
+ if not it:
+ break
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+ except greenlet.GreenletExit:
+ running = False
+ break
+ except Exception as e:
+ LOG.exception(_("Exception while processing consumer"))
+ self.reconnect()
+ # Break to outer loop
+ break
+
+ def close(self):
+ self.consumer_set.close()
+
+
+class Publisher(messaging.Publisher):
+ """Publisher base class."""
+ pass
+
+
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic."""
@@ -306,16 +397,18 @@ def msg_reply(msg_id, reply=None, failure=None):
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
- conn = Connection.instance()
- publisher = DirectPublisher(connection=conn, msg_id=msg_id)
- try:
- publisher.send({'result': reply, 'failure': failure})
- except TypeError:
- publisher.send(
- {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure})
- publisher.close()
+
+ with ConnectionPool.item() as conn:
+ publisher = DirectPublisher(connection=conn, msg_id=msg_id)
+ try:
+ publisher.send({'result': reply, 'failure': failure})
+ except TypeError:
+ publisher.send(
+ {'result': dict((k, repr(v))
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure})
+
+ publisher.close()
class RemoteError(exception.Error):
@@ -347,8 +440,9 @@ def _unpack_context(msg):
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
+ context_dict['msg_id'] = msg.pop('_msg_id', None)
LOG.debug(_('unpacked context: %s'), context_dict)
- return context.RequestContext.from_dict(context_dict)
+ return RpcContext.from_dict(context_dict)
def _pack_context(msg, context):
@@ -360,70 +454,112 @@ def _pack_context(msg, context):
for args at some point.
"""
- context = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
- msg.update(context)
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in context.to_dict().iteritems()])
+ msg.update(context_d)
-def call(context, topic, msg):
- """Sends a message on a topic and wait for a response."""
+class RpcContext(context.RequestContext):
+ def __init__(self, *args, **kwargs):
+ msg_id = kwargs.pop('msg_id', None)
+ self.msg_id = msg_id
+ super(RpcContext, self).__init__(*args, **kwargs)
+
+ def reply(self, *args, **kwargs):
+ msg_reply(self.msg_id, *args, **kwargs)
+
+
+def multicall(context, topic, msg):
+ """Make a call that returns multiple times."""
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
- class WaitMessage(object):
- def __call__(self, data, message):
- """Acks message and sets result."""
- message.ack()
- if data['failure']:
- self.result = RemoteError(*data['failure'])
- else:
- self.result = data['result']
-
- wait_msg = WaitMessage()
- conn = Connection.instance()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+ con_conn = ConnectionPool.get()
+ consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
+ wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg)
- conn = Connection.instance()
- publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher = TopicPublisher(connection=con_conn, topic=topic)
publisher.send(msg)
publisher.close()
- try:
- consumer.wait(limit=1)
- except StopIteration:
- pass
- consumer.close()
- # NOTE(termie): this is a little bit of a change from the original
- # non-eventlet code where returning a Failure
- # instance from a deferred call is very similar to
- # raising an exception
- if isinstance(wait_msg.result, Exception):
- raise wait_msg.result
- return wait_msg.result
+ return wait_msg
+
+
+class MulticallWaiter(object):
+ def __init__(self, consumer):
+ self._consumer = consumer
+ self._results = queue.Queue()
+ self._closed = False
+
+ def close(self):
+ self._closed = True
+ self._consumer.close()
+ ConnectionPool.put(self._consumer.connection)
+
+ def __call__(self, data, message):
+ """Acks message and sets result."""
+ message.ack()
+ if data['failure']:
+ self._results.put(RemoteError(*data['failure']))
+ else:
+ self._results.put(data['result'])
+
+ def __iter__(self):
+ return self.wait()
+
+ def wait(self):
+ while True:
+ rv = None
+ while rv is None and not self._closed:
+ try:
+ rv = self._consumer.fetch(enable_callbacks=True)
+ except Exception:
+ self.close()
+ raise
+ time.sleep(0.01)
+
+ result = self._results.get()
+ if isinstance(result, Exception):
+ self.close()
+ raise result
+ if result == None:
+ self.close()
+ raise StopIteration
+ yield result
+
+
+def call(context, topic, msg):
+ """Sends a message on a topic and wait for a response."""
+ rv = multicall(context, topic, msg)
+ # NOTE(vish): return the last result from the multicall
+ rv = list(rv)
+ if not rv:
+ return
+ return rv[-1]
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context)
- conn = Connection.instance()
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
+ with ConnectionPool.item() as conn:
+ publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher.send(msg)
+ publisher.close()
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_pack_context(msg, context)
- conn = Connection.instance()
- publisher = FanoutPublisher(topic, connection=conn)
- publisher.send(msg)
- publisher.close()
+ with ConnectionPool.item() as conn:
+ publisher = FanoutPublisher(topic, connection=conn)
+ publisher.send(msg)
+ publisher.close()
def generic_response(message_data, message):
@@ -459,6 +595,7 @@ def send_message(topic, message, wait=True):
if wait:
consumer.wait()
+ consumer.close()
if __name__ == '__main__':
diff --git a/nova/service.py b/nova/service.py
index ab1238c3b..74f9f04d8 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -19,14 +19,11 @@
"""Generic Node baseclass for all workers that run on hosts."""
+import greenlet
import inspect
import os
-import sys
-import time
-from eventlet import event
from eventlet import greenthread
-from eventlet import greenpool
from nova import context
from nova import db
@@ -91,27 +88,37 @@ class Service(object):
if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt)
- conn1 = rpc.Connection.instance(new=True)
- conn2 = rpc.Connection.instance(new=True)
- conn3 = rpc.Connection.instance(new=True)
- if self.report_interval:
- consumer_all = rpc.TopicAdapterConsumer(
- connection=conn1,
- topic=self.topic,
- proxy=self)
- consumer_node = rpc.TopicAdapterConsumer(
- connection=conn2,
- topic='%s.%s' % (self.topic, self.host),
- proxy=self)
- fanout = rpc.FanoutAdapterConsumer(
- connection=conn3,
- topic=self.topic,
- proxy=self)
-
- self.timers.append(consumer_all.attach_to_eventlet())
- self.timers.append(consumer_node.attach_to_eventlet())
- self.timers.append(fanout.attach_to_eventlet())
+ self.conn = rpc.Connection.instance(new=True)
+ logging.debug("Creating Consumer connection for Service %s" %
+ self.topic)
+
+ # Share this same connection for these Consumers
+ consumer_all = rpc.TopicAdapterConsumer(
+ connection=self.conn,
+ topic=self.topic,
+ proxy=self)
+ consumer_node = rpc.TopicAdapterConsumer(
+ connection=self.conn,
+ topic='%s.%s' % (self.topic, self.host),
+ proxy=self)
+ fanout = rpc.FanoutAdapterConsumer(
+ connection=self.conn,
+ topic=self.topic,
+ proxy=self)
+ consumer_set = rpc.ConsumerSet(
+ connection=self.conn,
+ consumer_list=[consumer_all, consumer_node, fanout])
+
+ # Wait forever, processing these consumers
+ def _wait():
+ try:
+ consumer_set.wait()
+ finally:
+ consumer_set.close()
+
+ self.consumer_set_thread = greenthread.spawn(_wait)
+ if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
self.timers.append(pulse)
@@ -174,6 +181,11 @@ class Service(object):
logging.warn(_('Service killed that has no database entry'))
def stop(self):
+ self.consumer_set_thread.kill()
+ try:
+ self.consumer_set_thread.wait()
+ except greenlet.GreenletExit:
+ pass
for x in self.timers:
try:
x.stop()
diff --git a/nova/test.py b/nova/test.py
index 4deb2a175..80b2d0a74 100644
--- a/nova/test.py
+++ b/nova/test.py
@@ -31,17 +31,15 @@ import uuid
import unittest
import mox
-import shutil
import stubout
from eventlet import greenthread
-from nova import context
-from nova import db
from nova import fakerabbit
from nova import flags
from nova import rpc
from nova import service
from nova import wsgi
+from nova.virt import fake
FLAGS = flags.FLAGS
@@ -85,6 +83,7 @@ class TestCase(unittest.TestCase):
self._monkey_patch_attach()
self._monkey_patch_wsgi()
self._original_flags = FLAGS.FlagValuesDict()
+ rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
def tearDown(self):
"""Runs after each test method to tear down test environment."""
@@ -99,6 +98,10 @@ class TestCase(unittest.TestCase):
if FLAGS.fake_rabbit:
fakerabbit.reset_all()
+ if FLAGS.connection_type == 'fake':
+ if hasattr(fake.FakeConnection, '_instance'):
+ del fake.FakeConnection._instance
+
# Reset any overriden flags
self.reset_flags()
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index bf51239e6..8e0156afa 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -166,11 +166,11 @@ def stub_out_glance(stubs, initial_fixtures=None):
def __init__(self, initial_fixtures):
self.fixtures = initial_fixtures or []
- def fake_get_images(self):
+ def fake_get_images(self, filters=None):
return [dict(id=f['id'], name=f['name'])
for f in self.fixtures]
- def fake_get_images_detailed(self):
+ def fake_get_images_detailed(self, filters=None):
return copy.deepcopy(self.fixtures)
def fake_get_image_meta(self, image_id):
diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py
index 2c329f920..9f1f28611 100644
--- a/nova/tests/api/openstack/test_images.py
+++ b/nova/tests/api/openstack/test_images.py
@@ -28,6 +28,7 @@ import shutil
import tempfile
import xml.dom.minidom as minidom
+import mox
import stubout
import webob
@@ -708,6 +709,146 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
self.assertDictListMatch(expected, response_list)
+ def test_image_filter_with_name(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'name': 'testname'}
+ image_service.index(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images?name=testname')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.index(request)
+ mocker.VerifyAll()
+
+ def test_image_filter_with_status(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'status': 'ACTIVE'}
+ image_service.index(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images?status=ACTIVE')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.index(request)
+ mocker.VerifyAll()
+
+ def test_image_filter_with_property(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'property-test': '3'}
+ image_service.index(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images?property-test=3')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.index(request)
+ mocker.VerifyAll()
+
+ def test_image_filter_not_supported(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'status': 'ACTIVE'}
+ image_service.index(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images?status=ACTIVE&UNSUPPORTEDFILTER=testname')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.index(request)
+ mocker.VerifyAll()
+
+ def test_image_no_filters(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {}
+ image_service.index(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.index(request)
+ mocker.VerifyAll()
+
+ def test_image_detail_filter_with_name(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'name': 'testname'}
+ image_service.detail(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images/detail?name=testname')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.detail(request)
+ mocker.VerifyAll()
+
+ def test_image_detail_filter_with_status(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'status': 'ACTIVE'}
+ image_service.detail(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images/detail?status=ACTIVE')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.detail(request)
+ mocker.VerifyAll()
+
+ def test_image_detail_filter_with_property(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'property-test': '3'}
+ image_service.detail(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images/detail?property-test=3')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.detail(request)
+ mocker.VerifyAll()
+
+ def test_image_detail_filter_not_supported(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {'status': 'ACTIVE'}
+ image_service.detail(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images/detail?status=ACTIVE&UNSUPPORTEDFILTER=testname')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.detail(request)
+ mocker.VerifyAll()
+
+ def test_image_detail_no_filters(self):
+ mocker = mox.Mox()
+ image_service = mocker.CreateMockAnything()
+ context = object()
+ filters = {}
+ image_service.detail(context, filters).AndReturn([])
+ mocker.ReplayAll()
+ request = webob.Request.blank(
+ '/v1.1/images/detail')
+ request.environ['nova.context'] = context
+ controller = images.ControllerV11(image_service=image_service)
+ controller.detail(request)
+ mocker.VerifyAll()
+
def test_get_image_found(self):
req = webob.Request.blank('/v1.0/images/123')
res = req.get_response(fakes.wsgi_app())
diff --git a/nova/tests/image/test_glance.py b/nova/tests/image/test_glance.py
index 109905ded..6d108d494 100644
--- a/nova/tests/image/test_glance.py
+++ b/nova/tests/image/test_glance.py
@@ -34,7 +34,7 @@ class StubGlanceClient(object):
def get_image_meta(self, image_id):
return self.images[image_id]
- def get_images_detailed(self):
+ def get_images_detailed(self, filters=None):
return self.images.itervalues()
def get_image(self, image_id):
diff --git a/nova/tests/integrated/api/client.py b/nova/tests/integrated/api/client.py
index 7e20c9b00..eb9a3056e 100644
--- a/nova/tests/integrated/api/client.py
+++ b/nova/tests/integrated/api/client.py
@@ -152,7 +152,10 @@ class TestOpenStackClient(object):
def _decode_json(self, response):
body = response.read()
LOG.debug(_("Decoding JSON: %s") % (body))
- return json.loads(body)
+ if body:
+ return json.loads(body)
+ else:
+ return ""
def api_get(self, relative_uri, **kwargs):
kwargs.setdefault('check_response_status', [200])
@@ -166,7 +169,7 @@ class TestOpenStackClient(object):
headers['Content-Type'] = 'application/json'
kwargs['body'] = json.dumps(body)
- kwargs.setdefault('check_response_status', [200])
+ kwargs.setdefault('check_response_status', [200, 202])
response = self.api_request(relative_uri, **kwargs)
return self._decode_json(response)
@@ -185,6 +188,9 @@ class TestOpenStackClient(object):
def post_server(self, server):
return self.api_post('/servers', server)['server']
+ def post_server_action(self, server_id, data):
+ return self.api_post('/servers/%s/action' % server_id, data)
+
def delete_server(self, server_id):
return self.api_delete('/servers/%s' % server_id)
diff --git a/nova/tests/integrated/integrated_helpers.py b/nova/tests/integrated/integrated_helpers.py
index bc98921f0..7f590441e 100644
--- a/nova/tests/integrated/integrated_helpers.py
+++ b/nova/tests/integrated/integrated_helpers.py
@@ -154,10 +154,7 @@ class _IntegratedTestBase(test.TestCase):
# set up services
self.start_service('compute')
self.start_service('volume')
- # NOTE(justinsb): There's a bug here which is eluding me...
- # If we start the network_service, all is good, but then subsequent
- # tests fail: CloudTestCase.test_ajax_console in particular.
- #self.start_service('network')
+ self.start_service('network')
self.start_service('scheduler')
self._start_api_service()
diff --git a/nova/tests/integrated/test_servers.py b/nova/tests/integrated/test_servers.py
index e89d0100a..a67fa1bb5 100644
--- a/nova/tests/integrated/test_servers.py
+++ b/nova/tests/integrated/test_servers.py
@@ -179,6 +179,112 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
# Cleanup
self._delete_server(created_server_id)
+ def test_create_and_rebuild_server(self):
+ """Rebuild a server."""
+
+ # create a server with initially has no metadata
+ server = self._build_minimal_create_server_request()
+ server_post = {'server': server}
+ created_server = self.api.post_server(server_post)
+ LOG.debug("created_server: %s" % created_server)
+ self.assertTrue(created_server['id'])
+ created_server_id = created_server['id']
+
+ # rebuild the server with metadata
+ post = {}
+ post['rebuild'] = {
+ "imageRef": "https://localhost/v1.1/32278/images/2",
+ "name": "blah"
+ }
+
+ self.api.post_server_action(created_server_id, post)
+ LOG.debug("rebuilt server: %s" % created_server)
+ self.assertTrue(created_server['id'])
+
+ found_server = self.api.get_server(created_server_id)
+ self.assertEqual(created_server_id, found_server['id'])
+ self.assertEqual({}, found_server.get('metadata'))
+ self.assertEqual('blah', found_server.get('name'))
+
+ # Cleanup
+ self._delete_server(created_server_id)
+
+ def test_create_and_rebuild_server_with_metadata(self):
+ """Rebuild a server with metadata."""
+
+ # create a server with initially has no metadata
+ server = self._build_minimal_create_server_request()
+ server_post = {'server': server}
+ created_server = self.api.post_server(server_post)
+ LOG.debug("created_server: %s" % created_server)
+ self.assertTrue(created_server['id'])
+ created_server_id = created_server['id']
+
+ # rebuild the server with metadata
+ post = {}
+ post['rebuild'] = {
+ "imageRef": "https://localhost/v1.1/32278/images/2",
+ "name": "blah"
+ }
+
+ metadata = {}
+ for i in range(30):
+ metadata['key_%s' % i] = 'value_%s' % i
+
+ post['rebuild']['metadata'] = metadata
+
+ self.api.post_server_action(created_server_id, post)
+ LOG.debug("rebuilt server: %s" % created_server)
+ self.assertTrue(created_server['id'])
+
+ found_server = self.api.get_server(created_server_id)
+ self.assertEqual(created_server_id, found_server['id'])
+ self.assertEqual(metadata, found_server.get('metadata'))
+ self.assertEqual('blah', found_server.get('name'))
+
+ # Cleanup
+ self._delete_server(created_server_id)
+
+ def test_create_and_rebuild_server_with_metadata_removal(self):
+ """Rebuild a server with metadata."""
+
+ # create a server with initially has no metadata
+ server = self._build_minimal_create_server_request()
+ server_post = {'server': server}
+
+ metadata = {}
+ for i in range(30):
+ metadata['key_%s' % i] = 'value_%s' % i
+
+ server_post['server']['metadata'] = metadata
+
+ created_server = self.api.post_server(server_post)
+ LOG.debug("created_server: %s" % created_server)
+ self.assertTrue(created_server['id'])
+ created_server_id = created_server['id']
+
+ # rebuild the server with metadata
+ post = {}
+ post['rebuild'] = {
+ "imageRef": "https://localhost/v1.1/32278/images/2",
+ "name": "blah"
+ }
+
+ metadata = {}
+ post['rebuild']['metadata'] = metadata
+
+ self.api.post_server_action(created_server_id, post)
+ LOG.debug("rebuilt server: %s" % created_server)
+ self.assertTrue(created_server['id'])
+
+ found_server = self.api.get_server(created_server_id)
+ self.assertEqual(created_server_id, found_server['id'])
+ self.assertEqual(metadata, found_server.get('metadata'))
+ self.assertEqual('blah', found_server.get('name'))
+
+ # Cleanup
+ self._delete_server(created_server_id)
+
if __name__ == "__main__":
unittest.main()
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index 54c0454de..55ea6be02 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -17,13 +17,9 @@
# under the License.
from base64 import b64decode
-import json
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import shutil
-import tempfile
-import time
from eventlet import greenthread
@@ -33,12 +29,10 @@ from nova import db
from nova import flags
from nova import log as logging
from nova import rpc
-from nova import service
from nova import test
from nova import utils
from nova import exception
from nova.auth import manager
-from nova.compute import power_state
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
@@ -79,14 +73,21 @@ class CloudTestCase(test.TestCase):
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
+ # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
+ rpc_cast = rpc.cast
+
+ def finish_cast(*args, **kwargs):
+ rpc_cast(*args, **kwargs)
+ greenthread.sleep(0.2)
+
+ self.stubs.Set(rpc, 'cast', finish_cast)
+
def tearDown(self):
network_ref = db.project_get_network(self.context,
self.project.id)
db.network_disassociate(self.context, network_ref['id'])
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
- self.compute.kill()
- self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -113,7 +114,6 @@ class CloudTestCase(test.TestCase):
self.cloud.describe_addresses(self.context)
self.cloud.release_address(self.context,
public_ip=address)
- greenthread.sleep(0.3)
db.floating_ip_destroy(self.context, address)
def test_associate_disassociate_address(self):
@@ -129,12 +129,10 @@ class CloudTestCase(test.TestCase):
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
- greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
public_ip=address)
- greenthread.sleep(0.3)
self.network.deallocate_fixed_ip(self.context, fixed)
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
@@ -171,6 +169,25 @@ class CloudTestCase(test.TestCase):
db.volume_destroy(self.context, vol1['id'])
db.volume_destroy(self.context, vol2['id'])
+ def test_create_volume_from_snapshot(self):
+ """Makes sure create_volume works when we specify a snapshot."""
+ vol = db.volume_create(self.context, {'size': 1})
+ snap = db.snapshot_create(self.context, {'volume_id': vol['id'],
+ 'volume_size': vol['size'],
+ 'status': "available"})
+ snapshot_id = ec2utils.id_to_ec2_id(snap['id'], 'snap-%08x')
+
+ result = self.cloud.create_volume(self.context,
+ snapshot_id=snapshot_id)
+ volume_id = result['volumeId']
+ result = self.cloud.describe_volumes(self.context)
+ self.assertEqual(len(result['volumeSet']), 2)
+ self.assertEqual(result['volumeSet'][1]['volumeId'], volume_id)
+
+ db.volume_destroy(self.context, ec2utils.ec2_id_to_id(volume_id))
+ db.snapshot_destroy(self.context, snap['id'])
+ db.volume_destroy(self.context, vol['id'])
+
def test_describe_availability_zones(self):
"""Makes sure describe_availability_zones works and filters results."""
service1 = db.service_create(self.context, {'host': 'host1_zones',
@@ -188,6 +205,52 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, service1['id'])
db.service_destroy(self.context, service2['id'])
+ def test_describe_snapshots(self):
+ """Makes sure describe_snapshots works and filters results."""
+ vol = db.volume_create(self.context, {})
+ snap1 = db.snapshot_create(self.context, {'volume_id': vol['id']})
+ snap2 = db.snapshot_create(self.context, {'volume_id': vol['id']})
+ result = self.cloud.describe_snapshots(self.context)
+ self.assertEqual(len(result['snapshotSet']), 2)
+ snapshot_id = ec2utils.id_to_ec2_id(snap2['id'], 'snap-%08x')
+ result = self.cloud.describe_snapshots(self.context,
+ snapshot_id=[snapshot_id])
+ self.assertEqual(len(result['snapshotSet']), 1)
+ self.assertEqual(
+ ec2utils.ec2_id_to_id(result['snapshotSet'][0]['snapshotId']),
+ snap2['id'])
+ db.snapshot_destroy(self.context, snap1['id'])
+ db.snapshot_destroy(self.context, snap2['id'])
+ db.volume_destroy(self.context, vol['id'])
+
+ def test_create_snapshot(self):
+ """Makes sure create_snapshot works."""
+ vol = db.volume_create(self.context, {'status': "available"})
+ volume_id = ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x')
+
+ result = self.cloud.create_snapshot(self.context,
+ volume_id=volume_id)
+ snapshot_id = result['snapshotId']
+ result = self.cloud.describe_snapshots(self.context)
+ self.assertEqual(len(result['snapshotSet']), 1)
+ self.assertEqual(result['snapshotSet'][0]['snapshotId'], snapshot_id)
+
+ db.snapshot_destroy(self.context, ec2utils.ec2_id_to_id(snapshot_id))
+ db.volume_destroy(self.context, vol['id'])
+
+ def test_delete_snapshot(self):
+ """Makes sure delete_snapshot works."""
+ vol = db.volume_create(self.context, {'status': "available"})
+ snap = db.snapshot_create(self.context, {'volume_id': vol['id'],
+ 'status': "available"})
+ snapshot_id = ec2utils.id_to_ec2_id(snap['id'], 'snap-%08x')
+
+ result = self.cloud.delete_snapshot(self.context,
+ snapshot_id=snapshot_id)
+ self.assertTrue(result)
+
+ db.volume_destroy(self.context, vol['id'])
+
def test_describe_instances(self):
"""Makes sure describe_instances works and filters results."""
inst1 = db.instance_create(self.context, {'reservation_id': 'a',
@@ -306,31 +369,25 @@ class CloudTestCase(test.TestCase):
'instance_type': instance_type,
'max_count': max_count}
rv = self.cloud.run_instances(self.context, **kwargs)
- greenthread.sleep(0.3)
instance_id = rv['instancesSet'][0]['instanceId']
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
- greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
- greenthread.sleep(0.3)
def test_ajax_console(self):
kwargs = {'image_id': 'ami-1'}
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
- greenthread.sleep(0.3)
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
'%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
- greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
- greenthread.sleep(0.3)
def test_key_generation(self):
result = self._create_key('test')
diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py
index 916fca55e..ad73a3a69 100644
--- a/nova/tests/test_quota.py
+++ b/nova/tests/test_quota.py
@@ -250,6 +250,7 @@ class QuotaTestCase(test.TestCase):
volume.API().create,
self.context,
size=10,
+ snapshot_id=None,
name='',
description='')
for volume_id in volume_ids:
@@ -263,6 +264,7 @@ class QuotaTestCase(test.TestCase):
volume.API().create,
self.context,
size=10,
+ snapshot_id=None,
name='',
description='')
for volume_id in volume_ids:
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index 44d7c91eb..ffd748efe 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -31,7 +31,6 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase):
- """Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True)
@@ -43,14 +42,55 @@ class RpcTestCase(test.TestCase):
self.context = context.get_admin_context()
def test_call_succeed(self):
- """Get a value through rpc call"""
value = 42
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
+ def test_call_succeed_despite_multiple_returns(self):
+ value = 42
+ result = rpc.call(self.context, 'test', {"method": "echo_three_times",
+ "args": {"value": value}})
+ self.assertEqual(value + 2, result)
+
+ def test_call_succeed_despite_multiple_returns_yield(self):
+ value = 42
+ result = rpc.call(self.context, 'test',
+ {"method": "echo_three_times_yield",
+ "args": {"value": value}})
+ self.assertEqual(value + 2, result)
+
+ def test_multicall_succeed_once(self):
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo",
+ "args": {"value": value}})
+ for i, x in enumerate(result):
+ if i > 0:
+ self.fail('should only receive one response')
+ self.assertEqual(value + i, x)
+
+ def test_multicall_succeed_three_times(self):
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo_three_times",
+ "args": {"value": value}})
+ for i, x in enumerate(result):
+ self.assertEqual(value + i, x)
+
+ def test_multicall_succeed_three_times_yield(self):
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo_three_times_yield",
+ "args": {"value": value}})
+ for i, x in enumerate(result):
+ self.assertEqual(value + i, x)
+
def test_context_passed(self):
- """Makes sure a context is passed through rpc call"""
+ """Makes sure a context is passed through rpc call."""
value = 42
result = rpc.call(self.context,
'test', {"method": "context",
@@ -58,11 +98,12 @@ class RpcTestCase(test.TestCase):
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
- """Test that exception gets passed back properly
+ """Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
+
"""
value = 42
self.assertRaises(rpc.RemoteError,
@@ -81,7 +122,7 @@ class RpcTestCase(test.TestCase):
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
- """Test that we can do an rpc.call inside another call"""
+ """Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
@@ -108,25 +149,80 @@ class RpcTestCase(test.TestCase):
"value": value}})
self.assertEqual(value, result)
+ def test_connectionpool_single(self):
+ """Test that ConnectionPool recycles a single connection."""
+ conn1 = rpc.ConnectionPool.get()
+ rpc.ConnectionPool.put(conn1)
+ conn2 = rpc.ConnectionPool.get()
+ rpc.ConnectionPool.put(conn2)
+ self.assertEqual(conn1, conn2)
+
+ def test_connectionpool_double(self):
+ """Test that ConnectionPool returns and reuses separate connections.
+
+ When called consecutively we should get separate connections and upon
+ returning them those connections should be reused for future calls
+ before generating a new connection.
+
+ """
+ conn1 = rpc.ConnectionPool.get()
+ conn2 = rpc.ConnectionPool.get()
+
+ self.assertNotEqual(conn1, conn2)
+ rpc.ConnectionPool.put(conn1)
+ rpc.ConnectionPool.put(conn2)
+
+ conn3 = rpc.ConnectionPool.get()
+ conn4 = rpc.ConnectionPool.get()
+ self.assertEqual(conn1, conn3)
+ self.assertEqual(conn2, conn4)
+
+ def test_connectionpool_limit(self):
+ """Test connection pool limit and connection uniqueness."""
+ max_size = FLAGS.rpc_conn_pool_size
+ conns = []
+
+ for i in xrange(max_size):
+ conns.append(rpc.ConnectionPool.get())
+
+ self.assertFalse(rpc.ConnectionPool.free_items)
+ self.assertEqual(rpc.ConnectionPool.current_size,
+ rpc.ConnectionPool.max_size)
+ self.assertEqual(len(set(conns)), max_size)
+
class TestReceiver(object):
- """Simple Proxy class so the consumer has methods to call
+ """Simple Proxy class so the consumer has methods to call.
+
+ Uses static methods because we aren't actually storing any state.
- Uses static methods because we aren't actually storing any state"""
+ """
@staticmethod
def echo(context, value):
- """Simply returns whatever value is sent in"""
+ """Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
- """Returns dictionary version of context"""
+ """Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
+ def echo_three_times(context, value):
+ context.reply(value)
+ context.reply(value + 1)
+ context.reply(value + 2)
+
+ @staticmethod
+ def echo_three_times_yield(context, value):
+ yield value
+ yield value + 1
+ yield value + 2
+
+ @staticmethod
def fail(context, value):
- """Raises an exception with the value sent in"""
+ """Raises an exception with the value sent in."""
raise Exception(value)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index d48de2057..d1cc8bd61 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -106,7 +106,10 @@ class ServiceTestCase(test.TestCase):
# NOTE(vish): Create was moved out of mox replay to make sure that
# the looping calls are created in StartService.
- app = service.Service.create(host=host, binary=binary)
+ app = service.Service.create(host=host, binary=binary, topic=topic)
+
+ self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
+ service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(rpc,
'TopicAdapterConsumer',
@@ -114,6 +117,11 @@ class ServiceTestCase(test.TestCase):
self.mox.StubOutWithMock(rpc,
'FanoutAdapterConsumer',
use_mock_anything=True)
+
+ self.mox.StubOutWithMock(rpc,
+ 'ConsumerSet',
+ use_mock_anything=True)
+
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@@ -129,9 +137,14 @@ class ServiceTestCase(test.TestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.FanoutAdapterConsumer)
- rpc.TopicAdapterConsumer.attach_to_eventlet()
- rpc.TopicAdapterConsumer.attach_to_eventlet()
- rpc.FanoutAdapterConsumer.attach_to_eventlet()
+ def wait_func(self, limit=None):
+ return None
+
+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+ {'wait': wait_func})
+ rpc.ConsumerSet(connection=mox.IgnoreArg(),
+ consumer_list=mox.IsA(list)).AndReturn(mock_cset)
+ wait_func(mox.IgnoreArg())
service_create = {'host': host,
'binary': binary,
@@ -287,8 +300,42 @@ class ServiceTestCase(test.TestCase):
# Creating mocks
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
- service.rpc.Connection.instance(new=mox.IgnoreArg())
- service.rpc.Connection.instance(new=mox.IgnoreArg())
+
+ self.mox.StubOutWithMock(rpc,
+ 'TopicAdapterConsumer',
+ use_mock_anything=True)
+ self.mox.StubOutWithMock(rpc,
+ 'FanoutAdapterConsumer',
+ use_mock_anything=True)
+
+ self.mox.StubOutWithMock(rpc,
+ 'ConsumerSet',
+ use_mock_anything=True)
+
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.TopicAdapterConsumer)
+
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+ topic='%s.%s' % (topic, host),
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.TopicAdapterConsumer)
+
+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.FanoutAdapterConsumer)
+
+ def wait_func(self, limit=None):
+ return None
+
+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+ {'wait': wait_func})
+ rpc.ConsumerSet(connection=mox.IgnoreArg(),
+ consumer_list=mox.IsA(list)).AndReturn(mock_cset)
+ wait_func(mox.IgnoreArg())
+
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py
index 236d12434..4f10ee6af 100644
--- a/nova/tests/test_volume.py
+++ b/nova/tests/test_volume.py
@@ -45,10 +45,11 @@ class VolumeTestCase(test.TestCase):
self.context = context.get_admin_context()
@staticmethod
- def _create_volume(size='0'):
+ def _create_volume(size='0', snapshot_id=None):
"""Create a volume object."""
vol = {}
vol['size'] = size
+ vol['snapshot_id'] = snapshot_id
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['availability_zone'] = FLAGS.storage_availability_zone
@@ -69,6 +70,25 @@ class VolumeTestCase(test.TestCase):
self.context,
volume_id)
+ def test_create_volume_from_snapshot(self):
+ """Test volume can be created from a snapshot."""
+ volume_src_id = self._create_volume()
+ self.volume.create_volume(self.context, volume_src_id)
+ snapshot_id = self._create_snapshot(volume_src_id)
+ self.volume.create_snapshot(self.context, volume_src_id, snapshot_id)
+ volume_dst_id = self._create_volume(0, snapshot_id)
+ self.volume.create_volume(self.context, volume_dst_id, snapshot_id)
+ self.assertEqual(volume_dst_id, db.volume_get(
+ context.get_admin_context(),
+ volume_dst_id).id)
+ self.assertEqual(snapshot_id, db.volume_get(
+ context.get_admin_context(),
+ volume_dst_id).snapshot_id)
+
+ self.volume.delete_volume(self.context, volume_dst_id)
+ self.volume.delete_snapshot(self.context, snapshot_id)
+ self.volume.delete_volume(self.context, volume_src_id)
+
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
@@ -176,6 +196,34 @@ class VolumeTestCase(test.TestCase):
# This will allow us to test cross-node interactions
pass
+ @staticmethod
+ def _create_snapshot(volume_id, size='0'):
+ """Create a snapshot object."""
+ snap = {}
+ snap['volume_size'] = size
+ snap['user_id'] = 'fake'
+ snap['project_id'] = 'fake'
+ snap['volume_id'] = volume_id
+ snap['status'] = "creating"
+ return db.snapshot_create(context.get_admin_context(), snap)['id']
+
+ def test_create_delete_snapshot(self):
+ """Test snapshot can be created and deleted."""
+ volume_id = self._create_volume()
+ self.volume.create_volume(self.context, volume_id)
+ snapshot_id = self._create_snapshot(volume_id)
+ self.volume.create_snapshot(self.context, volume_id, snapshot_id)
+ self.assertEqual(snapshot_id,
+ db.snapshot_get(context.get_admin_context(),
+ snapshot_id).id)
+
+ self.volume.delete_snapshot(self.context, snapshot_id)
+ self.assertRaises(exception.NotFound,
+ db.snapshot_get,
+ self.context,
+ snapshot_id)
+ self.volume.delete_volume(self.context, volume_id)
+
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index be1e35697..9d56c1644 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -395,6 +395,29 @@ class XenAPIVMTestCase(test.TestCase):
os_type="linux")
self.check_vm_params_for_linux()
+ def test_spawn_vhd_glance_swapdisk(self):
+ # Change the default host_call_plugin to one that'll return
+ # a swap disk
+ orig_func = stubs.FakeSessionForVMTests.host_call_plugin
+
+ stubs.FakeSessionForVMTests.host_call_plugin = \
+ stubs.FakeSessionForVMTests.host_call_plugin_swap
+
+ try:
+ # We'll steal the above glance linux test
+ self.test_spawn_vhd_glance_linux()
+ finally:
+ # Make sure to put this back
+ stubs.FakeSessionForVMTests.host_call_plugin = orig_func
+
+ # We should have 2 VBDs.
+ self.assertEqual(len(self.vm['VBDs']), 2)
+ # Now test that we have 1.
+ self.tearDown()
+ self.setUp()
+ self.test_spawn_vhd_glance_linux()
+ self.assertEqual(len(self.vm['VBDs']), 1)
+
def test_spawn_vhd_glance_windows(self):
FLAGS.xenapi_image_service = 'glance'
self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None,
@@ -569,11 +592,29 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
bob_shared = self.bob.compute_shared(alice_pub)
self.assertEquals(alice_shared, bob_shared)
- def test_encryption(self):
- msg = "This is a top-secret message"
- enc = self.alice.encrypt(msg)
+ def _test_encryption(self, message):
+ enc = self.alice.encrypt(message)
+ self.assertFalse(enc.endswith('\n'))
dec = self.bob.decrypt(enc)
- self.assertEquals(dec, msg)
+ self.assertEquals(dec, message)
+
+ def test_encrypt_simple_message(self):
+ self._test_encryption('This is a simple message.')
+
+ def test_encrypt_message_with_newlines_at_end(self):
+ self._test_encryption('This message has a newline at the end.\n')
+
+ def test_encrypt_many_newlines_at_end(self):
+ self._test_encryption('Message with lotsa newlines.\n\n\n')
+
+ def test_encrypt_newlines_inside_message(self):
+ self._test_encryption('Message\nwith\ninterior\nnewlines.')
+
+ def test_encrypt_with_leading_newlines(self):
+ self._test_encryption('\n\nMessage with leading newlines.')
+
+ def test_encrypt_really_long_message(self):
+ self._test_encryption(''.join(['abcd' for i in xrange(1024)]))
def tearDown(self):
super(XenAPIDiffieHellmanTestCase, self).tearDown()
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 4833ccb07..35308d95f 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -17,6 +17,7 @@
"""Stubouts, mocks and fixtures for the test suite"""
import eventlet
+import json
from nova.virt import xenapi_conn
from nova.virt.xenapi import fake
from nova.virt.xenapi import volume_utils
@@ -37,7 +38,7 @@ def stubout_instance_snapshot(stubs):
sr_ref=sr_ref, sharable=False)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
vdi_uuid = vdi_rec['uuid']
- return vdi_uuid
+ return [dict(vdi_type='os', vdi_uuid=vdi_uuid)]
stubs.Set(vm_utils.VMHelper, 'fetch_image', fake_fetch_image)
@@ -132,11 +133,30 @@ class FakeSessionForVMTests(fake.SessionBase):
def __init__(self, uri):
super(FakeSessionForVMTests, self).__init__(uri)
- def host_call_plugin(self, _1, _2, _3, _4, _5):
+ def host_call_plugin(self, _1, _2, plugin, method, _5):
+ sr_ref = fake.get_all('SR')[0]
+ vdi_ref = fake.create_vdi('', False, sr_ref, False)
+ vdi_rec = fake.get_record('VDI', vdi_ref)
+ if plugin == "glance" and method == "download_vhd":
+ ret_str = json.dumps([dict(vdi_type='os',
+ vdi_uuid=vdi_rec['uuid'])])
+ else:
+ ret_str = vdi_rec['uuid']
+ return '<string>%s</string>' % ret_str
+
+ def host_call_plugin_swap(self, _1, _2, plugin, method, _5):
sr_ref = fake.get_all('SR')[0]
vdi_ref = fake.create_vdi('', False, sr_ref, False)
vdi_rec = fake.get_record('VDI', vdi_ref)
- return '<string>%s</string>' % vdi_rec['uuid']
+ if plugin == "glance" and method == "download_vhd":
+ swap_vdi_ref = fake.create_vdi('', False, sr_ref, False)
+ swap_vdi_rec = fake.get_record('VDI', swap_vdi_ref)
+ ret_str = json.dumps(
+ [dict(vdi_type='os', vdi_uuid=vdi_rec['uuid']),
+ dict(vdi_type='swap', vdi_uuid=swap_vdi_rec['uuid'])])
+ else:
+ ret_str = vdi_rec['uuid']
+ return '<string>%s</string>' % ret_str
def VM_start(self, _1, ref, _2, _3):
vm = fake.get_record('VM', ref)
diff --git a/nova/virt/libvirt.xml.template b/nova/virt/libvirt.xml.template
index de2497a76..20986d4d5 100644
--- a/nova/virt/libvirt.xml.template
+++ b/nova/virt/libvirt.xml.template
@@ -116,7 +116,7 @@
</serial>
#if $getVar('vncserver_host', False)
- <graphics type='vnc' port='-1' autoport='yes' keymap='en-us' listen='${vncserver_host}'/>
+ <graphics type='vnc' port='-1' autoport='yes' keymap='${vnc_keymap}' listen='${vncserver_host}'/>
#end if
</devices>
</domain>
diff --git a/nova/virt/libvirt/connection.py b/nova/virt/libvirt/connection.py
index dfeda3814..178ea0048 100644
--- a/nova/virt/libvirt/connection.py
+++ b/nova/virt/libvirt/connection.py
@@ -962,6 +962,7 @@ class LibvirtConnection(driver.ComputeDriver):
if FLAGS.vnc_enabled:
if FLAGS.libvirt_type != 'lxc':
xml_info['vncserver_host'] = FLAGS.vncserver_host
+ xml_info['vnc_keymap'] = FLAGS.vnc_keymap
if not rescue:
if instance['kernel_id']:
xml_info['kernel'] = xml_info['basepath'] + "/kernel"
diff --git a/nova/virt/vmwareapi/vmops.py b/nova/virt/vmwareapi/vmops.py
index c3e79a92f..6d7149841 100644
--- a/nova/virt/vmwareapi/vmops.py
+++ b/nova/virt/vmwareapi/vmops.py
@@ -590,11 +590,11 @@ class VMWareVMOps(object):
def pause(self, instance, callback):
"""Pause a VM instance."""
- raise exception.APIError("pause not supported for vmwareapi")
+ raise exception.ApiError("pause not supported for vmwareapi")
def unpause(self, instance, callback):
"""Un-Pause a VM instance."""
- raise exception.APIError("unpause not supported for vmwareapi")
+ raise exception.ApiError("unpause not supported for vmwareapi")
def suspend(self, instance, callback):
"""Suspend the specified instance."""
@@ -673,7 +673,7 @@ class VMWareVMOps(object):
def get_diagnostics(self, instance):
"""Return data about VM diagnostics."""
- raise exception.APIError("get_diagnostics not implemented for "
+ raise exception.ApiError("get_diagnostics not implemented for "
"vmwareapi")
def get_console_output(self, instance):
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py
index e36ef3288..76988b172 100644
--- a/nova/virt/xenapi/fake.py
+++ b/nova/virt/xenapi/fake.py
@@ -159,7 +159,10 @@ def after_VBD_create(vbd_ref, vbd_rec):
vbd_rec['device'] = ''
vm_ref = vbd_rec['VM']
vm_rec = _db_content['VM'][vm_ref]
- vm_rec['VBDs'] = [vbd_ref]
+ if vm_rec.get('VBDs', None):
+ vm_rec['VBDs'].append(vbd_ref)
+ else:
+ vm_rec['VBDs'] = [vbd_ref]
vm_name_label = _db_content['VM'][vm_ref]['name_label']
vbd_rec['vm_name_label'] = vm_name_label
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index 9f6cd608c..06ee8ee9b 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -19,6 +19,7 @@ Helper methods for operations related to the management of VM records and
their attributes like VDIs, VIFs, as well as their lookup functions.
"""
+import json
import os
import pickle
import re
@@ -376,6 +377,9 @@ class VMHelper(HelperBase):
xenapi_image_service = ['glance', 'objectstore']
glance_address = 'address for glance services'
glance_port = 'port for glance services'
+
+ Returns: A single filename if image_type is KERNEL_RAMDISK
+ A list of dictionaries that describe VDIs, otherwise
"""
access = AuthManager().get_access_key(user, project)
@@ -390,6 +394,10 @@ class VMHelper(HelperBase):
@classmethod
def _fetch_image_glance_vhd(cls, session, instance_id, image, access,
image_type):
+ """Tell glance to download an image and put the VHDs into the SR
+
+ Returns: A list of dictionaries that describe VDIs
+ """
LOG.debug(_("Asking xapi to fetch vhd image %(image)s")
% locals())
@@ -408,18 +416,26 @@ class VMHelper(HelperBase):
kwargs = {'params': pickle.dumps(params)}
task = session.async_call_plugin('glance', 'download_vhd', kwargs)
- vdi_uuid = session.wait_for_task(task, instance_id)
+ result = session.wait_for_task(task, instance_id)
+ # 'download_vhd' will return a json encoded string containing
+ # a list of dictionaries describing VDIs. The dictionary will
+ # contain 'vdi_type' and 'vdi_uuid' keys. 'vdi_type' can be
+ # 'os' or 'swap' right now.
+ vdis = json.loads(result)
+ for vdi in vdis:
+ LOG.debug(_("xapi 'download_vhd' returned VDI of "
+ "type '%(vdi_type)s' with UUID '%(vdi_uuid)s'" % vdi))
cls.scan_sr(session, instance_id, sr_ref)
+ # Pull out the UUID of the first VDI
+ vdi_uuid = vdis[0]['vdi_uuid']
# Set the name-label to ease debugging
vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
- name_label = get_name_label_for_image(image)
- session.get_xenapi().VDI.set_name_label(vdi_ref, name_label)
+ primary_name_label = get_name_label_for_image(image)
+ session.get_xenapi().VDI.set_name_label(vdi_ref, primary_name_label)
- LOG.debug(_("xapi 'download_vhd' returned VDI UUID %(vdi_uuid)s")
- % locals())
- return vdi_uuid
+ return vdis
@classmethod
def _fetch_image_glance_disk(cls, session, instance_id, image, access,
@@ -431,6 +447,8 @@ class VMHelper(HelperBase):
plugin; instead, it streams the disks through domU to the VDI
directly.
+ Returns: A single filename if image_type is KERNEL_RAMDISK
+ A list of dictionaries that describe VDIs, otherwise
"""
# FIXME(sirp): Since the Glance plugin seems to be required for the
# VHD disk, it may be worth using the plugin for both VHD and RAW and
@@ -476,7 +494,8 @@ class VMHelper(HelperBase):
LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)
return filename
else:
- return session.get_xenapi().VDI.get_uuid(vdi_ref)
+ vdi_uuid = session.get_xenapi().VDI.get_uuid(vdi_ref)
+ return [dict(vdi_type='os', vdi_uuid=vdi_uuid)]
@classmethod
def determine_disk_image_type(cls, instance):
@@ -535,6 +554,11 @@ class VMHelper(HelperBase):
@classmethod
def _fetch_image_glance(cls, session, instance_id, image, access,
image_type):
+ """Fetch image from glance based on image type.
+
+ Returns: A single filename if image_type is KERNEL_RAMDISK
+ A list of dictionaries that describe VDIs, otherwise
+ """
if image_type == ImageType.DISK_VHD:
return cls._fetch_image_glance_vhd(
session, instance_id, image, access, image_type)
@@ -545,6 +569,11 @@ class VMHelper(HelperBase):
@classmethod
def _fetch_image_objectstore(cls, session, instance_id, image, access,
secret, image_type):
+ """Fetch an image from objectstore.
+
+ Returns: A single filename if image_type is KERNEL_RAMDISK
+ A list of dictionaries that describe VDIs, otherwise
+ """
url = images.image_url(image)
LOG.debug(_("Asking xapi to fetch %(url)s as %(access)s") % locals())
if image_type == ImageType.KERNEL_RAMDISK:
@@ -562,8 +591,10 @@ class VMHelper(HelperBase):
if image_type == ImageType.DISK_RAW:
args['raw'] = 'true'
task = session.async_call_plugin('objectstore', fn, args)
- uuid = session.wait_for_task(task, instance_id)
- return uuid
+ uuid_or_fn = session.wait_for_task(task, instance_id)
+ if image_type != ImageType.KERNEL_RAMDISK:
+ return [dict(vdi_type='os', vdi_uuid=uuid_or_fn)]
+ return uuid_or_fn
@classmethod
def determine_is_pv(cls, session, instance_id, vdi_ref, disk_image_type,
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index be6ef48ea..2b3fb6a39 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -91,7 +91,8 @@ class VMOps(object):
def finish_resize(self, instance, disk_info):
vdi_uuid = self.link_disks(instance, disk_info['base_copy'],
disk_info['cow'])
- vm_ref = self._create_vm(instance, vdi_uuid)
+ vm_ref = self._create_vm(instance,
+ [dict(vdi_type='os', vdi_uuid=vdi_uuid)])
self.resize_instance(instance, vdi_uuid)
self._spawn(instance, vm_ref)
@@ -105,24 +106,25 @@ class VMOps(object):
LOG.debug(_("Starting instance %s"), instance.name)
self._session.call_xenapi('VM.start', vm_ref, False, False)
- def _create_disk(self, instance):
+ def _create_disks(self, instance):
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
disk_image_type = VMHelper.determine_disk_image_type(instance)
- vdi_uuid = VMHelper.fetch_image(self._session, instance.id,
- instance.image_id, user, project, disk_image_type)
- return vdi_uuid
+ vdis = VMHelper.fetch_image(self._session,
+ instance.id, instance.image_id, user, project,
+ disk_image_type)
+ return vdis
def spawn(self, instance, network_info=None):
- vdi_uuid = self._create_disk(instance)
- vm_ref = self._create_vm(instance, vdi_uuid, network_info)
+ vdis = self._create_disks(instance)
+ vm_ref = self._create_vm(instance, vdis, network_info)
self._spawn(instance, vm_ref)
def spawn_rescue(self, instance):
"""Spawn a rescue instance."""
self.spawn(instance)
- def _create_vm(self, instance, vdi_uuid, network_info=None):
+ def _create_vm(self, instance, vdis, network_info=None):
"""Create VM instance."""
instance_name = instance.name
vm_ref = VMHelper.lookup(self._session, instance_name)
@@ -141,28 +143,43 @@ class VMOps(object):
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
- # Are we building from a pre-existing disk?
- vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
-
disk_image_type = VMHelper.determine_disk_image_type(instance)
kernel = None
if instance.kernel_id:
kernel = VMHelper.fetch_image(self._session, instance.id,
- instance.kernel_id, user, project, ImageType.KERNEL_RAMDISK)
+ instance.kernel_id, user, project,
+ ImageType.KERNEL_RAMDISK)
ramdisk = None
if instance.ramdisk_id:
ramdisk = VMHelper.fetch_image(self._session, instance.id,
- instance.ramdisk_id, user, project, ImageType.KERNEL_RAMDISK)
-
- use_pv_kernel = VMHelper.determine_is_pv(self._session, instance.id,
- vdi_ref, disk_image_type, instance.os_type)
- vm_ref = VMHelper.create_vm(self._session, instance, kernel, ramdisk,
- use_pv_kernel)
-
+ instance.ramdisk_id, user, project,
+ ImageType.KERNEL_RAMDISK)
+
+ # Create the VM ref and attach the first disk
+ first_vdi_ref = self._session.call_xenapi('VDI.get_by_uuid',
+ vdis[0]['vdi_uuid'])
+ use_pv_kernel = VMHelper.determine_is_pv(self._session,
+ instance.id, first_vdi_ref, disk_image_type,
+ instance.os_type)
+ vm_ref = VMHelper.create_vm(self._session, instance,
+ kernel, ramdisk, use_pv_kernel)
VMHelper.create_vbd(session=self._session, vm_ref=vm_ref,
- vdi_ref=vdi_ref, userdevice=0, bootable=True)
+ vdi_ref=first_vdi_ref, userdevice=0, bootable=True)
+
+ # Attach any other disks
+ # userdevice 1 is reserved for rescue
+ userdevice = 2
+ for vdi in vdis[1:]:
+ # vdi['vdi_type'] is either 'os' or 'swap', but we don't
+ # really care what it is right here.
+ vdi_ref = self._session.call_xenapi('VDI.get_by_uuid',
+ vdi['vdi_uuid'])
+ VMHelper.create_vbd(session=self._session, vm_ref=vm_ref,
+ vdi_ref=vdi_ref, userdevice=userdevice,
+ bootable=False)
+ userdevice += 1
# TODO(tr3buchet) - check to make sure we have network info, otherwise
# create it now. This goes away once nova-multi-nic hits.
@@ -172,7 +189,7 @@ class VMOps(object):
# Alter the image before VM start for, e.g. network injection
if FLAGS.xenapi_inject_image:
VMHelper.preconfigure_instance(self._session, instance,
- vdi_ref, network_info)
+ first_vdi_ref, network_info)
self.create_vifs(vm_ref, network_info)
self.inject_network_info(instance, network_info, vm_ref)
@@ -1173,26 +1190,22 @@ class SimpleDH(object):
mpi = M2Crypto.m2.bn_to_mpi(bn)
return mpi
- def _run_ssl(self, text, which):
- base_cmd = ('openssl enc -aes-128-cbc -a -pass pass:%(shared)s '
- '-nosalt %(dec_flag)s')
- if which.lower()[0] == 'd':
- dec_flag = ' -d'
- else:
- dec_flag = ''
- shared = self._shared
- cmd = base_cmd % locals()
- proc = _runproc(cmd)
- proc.stdin.write(text + '\n')
+ def _run_ssl(self, text, extra_args=None):
+ if not extra_args:
+ extra_args = ''
+ cmd = 'enc -aes-128-cbc -A -a -pass pass:%s -nosalt %s' % (
+ self._shared, extra_args)
+ proc = _runproc('openssl %s' % cmd)
+ proc.stdin.write(text)
proc.stdin.close()
proc.wait()
err = proc.stderr.read()
if err:
raise RuntimeError(_('OpenSSL error: %s') % err)
- return proc.stdout.read().strip('\n')
+ return proc.stdout.read()
def encrypt(self, text):
- return self._run_ssl(text, 'enc')
+ return self._run_ssl(text).strip('\n')
def decrypt(self, text):
- return self._run_ssl(text, 'dec')
+ return self._run_ssl(text, '-d')
diff --git a/nova/vnc/__init__.py b/nova/vnc/__init__.py
index b5b00e44e..859bfd65f 100644
--- a/nova/vnc/__init__.py
+++ b/nova/vnc/__init__.py
@@ -32,3 +32,5 @@ flags.DEFINE_string('vncserver_host', '0.0.0.0',
'the host interface on which vnc server should listen')
flags.DEFINE_bool('vnc_enabled', True,
'enable vnc related features')
+flags.DEFINE_string('vnc_keymap', 'en-us',
+ 'keymap for vnc')
diff --git a/nova/volume/api.py b/nova/volume/api.py
index 09befb647..5804955f7 100644
--- a/nova/volume/api.py
+++ b/nova/volume/api.py
@@ -39,7 +39,14 @@ LOG = logging.getLogger('nova.volume')
class API(base.Base):
"""API for interacting with the volume manager."""
- def create(self, context, size, name, description):
+ def create(self, context, size, snapshot_id, name, description):
+ if snapshot_id != None:
+ snapshot = self.get_snapshot(context, snapshot_id)
+ if snapshot['status'] != "available":
+ raise exception.ApiError(
+ _("Snapshot status must be available"))
+ size = snapshot['volume_size']
+
if quota.allowed_volumes(context, 1, size) < 1:
pid = context.project_id
LOG.warn(_("Quota exceeeded for %(pid)s, tried to create"
@@ -51,6 +58,7 @@ class API(base.Base):
'size': size,
'user_id': context.user_id,
'project_id': context.project_id,
+ 'snapshot_id': snapshot_id,
'availability_zone': FLAGS.storage_availability_zone,
'status': "creating",
'attach_status': "detached",
@@ -62,7 +70,8 @@ class API(base.Base):
FLAGS.scheduler_topic,
{"method": "create_volume",
"args": {"topic": FLAGS.volume_topic,
- "volume_id": volume['id']}})
+ "volume_id": volume['id'],
+ "snapshot_id": snapshot_id}})
return volume
def delete(self, context, volume_id):
@@ -90,6 +99,15 @@ class API(base.Base):
return self.db.volume_get_all(context)
return self.db.volume_get_all_by_project(context, context.project_id)
+ def get_snapshot(self, context, snapshot_id):
+ rv = self.db.snapshot_get(context, snapshot_id)
+ return dict(rv.iteritems())
+
+ def get_all_snapshots(self, context):
+ if context.is_admin:
+ return self.db.snapshot_get_all(context)
+ return self.db.snapshot_get_all_by_project(context, context.project_id)
+
def check_attach(self, context, volume_id):
volume = self.get(context, volume_id)
# TODO(vish): abstract status checking?
@@ -110,3 +128,38 @@ class API(base.Base):
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "remove_volume",
"args": {'volume_id': volume_id}})
+
+ def create_snapshot(self, context, volume_id, name, description):
+ volume = self.get(context, volume_id)
+ if volume['status'] != "available":
+ raise exception.ApiError(_("Volume status must be available"))
+
+ options = {
+ 'volume_id': volume_id,
+ 'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'status': "creating",
+ 'progress': '0%',
+ 'volume_size': volume['size'],
+ 'display_name': name,
+ 'display_description': description}
+
+ snapshot = self.db.snapshot_create(context, options)
+ rpc.cast(context,
+ FLAGS.scheduler_topic,
+ {"method": "create_snapshot",
+ "args": {"topic": FLAGS.volume_topic,
+ "volume_id": volume_id,
+ "snapshot_id": snapshot['id']}})
+ return snapshot
+
+ def delete_snapshot(self, context, snapshot_id):
+ snapshot = self.get_snapshot(context, snapshot_id)
+ if snapshot['status'] != "available":
+ raise exception.ApiError(_("Snapshot status must be available"))
+ self.db.snapshot_update(context, snapshot_id, {'status': 'deleting'})
+ rpc.cast(context,
+ FLAGS.scheduler_topic,
+ {"method": "delete_snapshot",
+ "args": {"topic": FLAGS.volume_topic,
+ "snapshot_id": snapshot_id}})
diff --git a/nova/volume/driver.py b/nova/volume/driver.py
index 55307ad9b..87e13277f 100644
--- a/nova/volume/driver.py
+++ b/nova/volume/driver.py
@@ -90,42 +90,97 @@ class VolumeDriver(object):
raise exception.Error(_("volume group %s doesn't exist")
% FLAGS.volume_group)
- def create_volume(self, volume):
- """Creates a logical volume. Can optionally return a Dictionary of
- changes to the volume object to be persisted."""
- if int(volume['size']) == 0:
- sizestr = '100M'
- else:
- sizestr = '%sG' % volume['size']
+ def _create_volume(self, volume_name, sizestr):
self._try_execute('sudo', 'lvcreate', '-L', sizestr, '-n',
- volume['name'],
- FLAGS.volume_group)
+ volume_name, FLAGS.volume_group)
- def delete_volume(self, volume):
- """Deletes a logical volume."""
+ def _copy_volume(self, srcstr, deststr, size_in_g):
+ self._execute('sudo', 'dd', 'if=%s' % srcstr, 'of=%s' % deststr,
+ 'count=%d' % (size_in_g * 1024), 'bs=1M')
+
+ def _volume_not_present(self, volume_name):
+ path_name = '%s/%s' % (FLAGS.volume_group, volume_name)
try:
- self._try_execute('sudo', 'lvdisplay',
- '%s/%s' %
- (FLAGS.volume_group,
- volume['name']))
+ self._try_execute('sudo', 'lvdisplay', path_name)
except Exception as e:
- # If the volume isn't present, then don't attempt to delete
+ # If the volume isn't present
return True
+ return False
+ def _delete_volume(self, volume, size_in_g):
+ """Deletes a logical volume."""
# zero out old volumes to prevent data leaking between users
# TODO(ja): reclaiming space should be done lazy and low priority
- self._execute('sudo', 'dd', 'if=/dev/zero',
- 'of=%s' % self.local_path(volume),
- 'count=%d' % (volume['size'] * 1024),
- 'bs=1M')
+ self._copy_volume('/dev/zero', self.local_path(volume), size_in_g)
self._try_execute('sudo', 'lvremove', '-f', "%s/%s" %
(FLAGS.volume_group,
- volume['name']))
+ self._escape_snapshot(volume['name'])))
+
+ def _sizestr(self, size_in_g):
+ if int(size_in_g) == 0:
+ return '100M'
+ return '%sG' % size_in_g
+
+ # Linux LVM reserves name that starts with snapshot, so that
+ # such volume name can't be created. Mangle it.
+ def _escape_snapshot(self, snapshot_name):
+ if not snapshot_name.startswith('snapshot'):
+ return snapshot_name
+ return '_' + snapshot_name
+
+ def create_volume(self, volume):
+ """Creates a logical volume. Can optionally return a Dictionary of
+ changes to the volume object to be persisted."""
+ self._create_volume(volume['name'], self._sizestr(volume['size']))
+
+ def create_volume_from_snapshot(self, volume, snapshot):
+ """Creates a volume from a snapshot."""
+ self._create_volume(volume['name'], self._sizestr(volume['size']))
+ self._copy_volume(self.local_path(snapshot), self.local_path(volume),
+ snapshot['volume_size'])
+
+ def delete_volume(self, volume):
+ """Deletes a logical volume."""
+ if self._volume_not_present(volume['name']):
+ # If the volume isn't present, then don't attempt to delete
+ return True
+
+ # TODO(yamahata): lvm can't delete origin volume only without
+ # deleting derived snapshots. Can we do something fancy?
+ out, err = self._execute('sudo', 'lvdisplay', '--noheading',
+ '-C', '-o', 'Attr',
+ '%s/%s' % (FLAGS.volume_group,
+ volume['name']))
+ # fake_execute returns None resulting unit test error
+ if out:
+ out = out.strip()
+ if (out[0] == 'o') or (out[0] == 'O'):
+ raise exception.VolumeIsBusy(volume_name=volume['name'])
+
+ self._delete_volume(volume, volume['size'])
+
+ def create_snapshot(self, snapshot):
+ """Creates a snapshot."""
+ orig_lv_name = "%s/%s" % (FLAGS.volume_group, snapshot['volume_name'])
+ self._try_execute('sudo', 'lvcreate', '-L',
+ self._sizestr(snapshot['volume_size']),
+ '--name', self._escape_snapshot(snapshot['name']),
+ '--snapshot', orig_lv_name)
+
+ def delete_snapshot(self, snapshot):
+ """Deletes a snapshot."""
+ if self._volume_not_present(self._escape_snapshot(snapshot['name'])):
+ # If the snapshot isn't present, then don't attempt to delete
+ return True
+
+ # TODO(yamahata): zeroing out the whole snapshot triggers COW.
+ # it's quite slow.
+ self._delete_volume(snapshot, snapshot['volume_size'])
def local_path(self, volume):
# NOTE(vish): stops deprecation warning
escaped_group = FLAGS.volume_group.replace('-', '--')
- escaped_name = volume['name'].replace('-', '--')
+ escaped_name = self._escape_snapshot(volume['name']).replace('-', '--')
return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
def ensure_export(self, context, volume):
@@ -559,6 +614,18 @@ class RBDDriver(VolumeDriver):
self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
'rm', volume['name'])
+ def create_snapshot(self, snapshot):
+ """Creates an rbd snapshot"""
+ self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
+ 'snap', 'create', '--snap', snapshot['name'],
+ snapshot['volume_name'])
+
+ def delete_snapshot(self, snapshot):
+ """Deletes an rbd snapshot"""
+ self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
+ 'snap', 'rm', '--snap', snapshot['name'],
+ snapshot['volume_name'])
+
def local_path(self, volume):
"""Returns the path of the rbd volume."""
# This is the same as the remote path
@@ -600,18 +667,31 @@ class SheepdogDriver(VolumeDriver):
def create_volume(self, volume):
"""Creates a sheepdog volume"""
- if int(volume['size']) == 0:
- sizestr = '100M'
- else:
- sizestr = '%sG' % volume['size']
self._try_execute('qemu-img', 'create',
"sheepdog:%s" % volume['name'],
- sizestr)
+ self._sizestr(volume['size']))
+
+ def create_volume_from_snapshot(self, volume, snapshot):
+ """Creates a sheepdog volume from a snapshot."""
+ self._try_execute('qemu-img', 'create', '-b',
+ "sheepdog:%s:%s" % (snapshot['volume_name'],
+ snapshot['name']),
+ "sheepdog:%s" % volume['name'])
def delete_volume(self, volume):
"""Deletes a logical volume"""
self._try_execute('collie', 'vdi', 'delete', volume['name'])
+ def create_snapshot(self, snapshot):
+ """Creates a sheepdog snapshot"""
+ self._try_execute('qemu-img', 'snapshot', '-c', snapshot['name'],
+ "sheepdog:%s" % snapshot['volume_name'])
+
+ def delete_snapshot(self, snapshot):
+ """Deletes a sheepdog snapshot"""
+ self._try_execute('collie', 'vdi', 'delete', snapshot['volume_name'],
+ '-s', snapshot['name'])
+
def local_path(self, volume):
return "sheepdog:%s" % volume['name']
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index 2178389ce..ff53f0701 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -90,7 +90,7 @@ class VolumeManager(manager.SchedulerDependentManager):
else:
LOG.info(_("volume %s: skipping export"), volume['name'])
- def create_volume(self, context, volume_id):
+ def create_volume(self, context, volume_id, snapshot_id=None):
"""Creates and exports the volume."""
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
@@ -108,7 +108,13 @@ class VolumeManager(manager.SchedulerDependentManager):
vol_size = volume_ref['size']
LOG.debug(_("volume %(vol_name)s: creating lv of"
" size %(vol_size)sG") % locals())
- model_update = self.driver.create_volume(volume_ref)
+ if snapshot_id == None:
+ model_update = self.driver.create_volume(volume_ref)
+ else:
+ snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+ model_update = self.driver.create_volume_from_snapshot(
+ volume_ref,
+ snapshot_ref)
if model_update:
self.db.volume_update(context, volume_ref['id'], model_update)
@@ -142,6 +148,12 @@ class VolumeManager(manager.SchedulerDependentManager):
self.driver.remove_export(context, volume_ref)
LOG.debug(_("volume %s: deleting"), volume_ref['name'])
self.driver.delete_volume(volume_ref)
+ except exception.VolumeIsBusy, e:
+ LOG.debug(_("volume %s: volume is busy"), volume_ref['name'])
+ self.driver.ensure_export(context, volume_ref)
+ self.db.volume_update(context, volume_ref['id'],
+ {'status': 'available'})
+ return True
except Exception:
self.db.volume_update(context,
volume_ref['id'],
@@ -152,6 +164,49 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
return True
+ def create_snapshot(self, context, volume_id, snapshot_id):
+ """Creates and exports the snapshot."""
+ context = context.elevated()
+ snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+ LOG.info(_("snapshot %s: creating"), snapshot_ref['name'])
+
+ try:
+ snap_name = snapshot_ref['name']
+ LOG.debug(_("snapshot %(snap_name)s: creating") % locals())
+ model_update = self.driver.create_snapshot(snapshot_ref)
+ if model_update:
+ self.db.snapshot_update(context, snapshot_ref['id'],
+ model_update)
+
+ except Exception:
+ self.db.snapshot_update(context,
+ snapshot_ref['id'], {'status': 'error'})
+ raise
+
+ self.db.snapshot_update(context,
+ snapshot_ref['id'], {'status': 'available',
+ 'progress': '100%'})
+ LOG.debug(_("snapshot %s: created successfully"), snapshot_ref['name'])
+ return snapshot_id
+
+ def delete_snapshot(self, context, snapshot_id):
+ """Deletes and unexports snapshot."""
+ context = context.elevated()
+ snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+
+ try:
+ LOG.debug(_("snapshot %s: deleting"), snapshot_ref['name'])
+ self.driver.delete_snapshot(snapshot_ref)
+ except Exception:
+ self.db.snapshot_update(context,
+ snapshot_ref['id'],
+ {'status': 'error_deleting'})
+ raise
+
+ self.db.snapshot_destroy(context, snapshot_id)
+ LOG.debug(_("snapshot %s: deleted successfully"), snapshot_ref['name'])
+ return True
+
def setup_compute_volume(self, context, volume_id):
"""Setup remote volume on compute host.
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance
index 4b45671ae..0c00d168b 100644
--- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance
@@ -22,6 +22,10 @@
#
import httplib
+try:
+ import json
+except ImportError:
+ import simplejson as json
import os
import os.path
import pickle
@@ -87,8 +91,8 @@ def _download_tarball(sr_path, staging_path, image_id, glance_host,
conn.close()
-def _fixup_vhds(sr_path, staging_path, uuid_stack):
- """Fixup the downloaded VHDs before we move them into the SR.
+def _import_vhds(sr_path, staging_path, uuid_stack):
+ """Import the VHDs found in the staging path.
We cannot extract VHDs directly into the SR since they don't yet have
UUIDs, aren't properly associated with each other, and would be subject to
@@ -98,16 +102,25 @@ def _fixup_vhds(sr_path, staging_path, uuid_stack):
To avoid these we problems, we use a staging area to fixup the VHDs before
moving them into the SR. The steps involved are:
- 1. Extracting tarball into staging area
+ 1. Extracting tarball into staging area (done prior to this call)
2. Renaming VHDs to use UUIDs ('snap.vhd' -> 'ffff-aaaa-...vhd')
- 3. Linking the two VHDs together
+ 3. Linking VHDs together if there's a snap.vhd
4. Pseudo-atomically moving the images into the SR. (It's not really
- atomic because it takes place as two os.rename operations; however,
- the chances of an SR.scan occuring between the two rename()
+ atomic because it takes place as multiple os.rename operations;
+ however, the chances of an SR.scan occuring between the rename()s
invocations is so small that we can safely ignore it)
+
+ Returns: A list of VDIs. Each list element is a dictionary containing
+ information about the VHD. Dictionary keys are:
+ 1. "vdi_type" - The type of VDI. Currently they can be "os_disk" or
+ "swap"
+ 2. "vdi_uuid" - The UUID of the VDI
+
+ Example return: [{"vdi_type": "os_disk","vdi_uuid": "ffff-aaa..vhd"},
+ {"vdi_type": "swap","vdi_uuid": "ffff-bbb..vhd"}]
"""
def rename_with_uuid(orig_path):
"""Rename VHD using UUID so that it will be recognized by SR on a
@@ -158,27 +171,59 @@ def _fixup_vhds(sr_path, staging_path, uuid_stack):
"VHD %(path)s is marked as hidden without child" %
locals())
- orig_base_copy_path = os.path.join(staging_path, 'image.vhd')
- if not os.path.exists(orig_base_copy_path):
+ def prepare_if_exists(staging_path, vhd_name, parent_path=None):
+ """
+ Check for existance of a particular VHD in the staging path and
+ preparing it for moving into the SR.
+
+ Returns: Tuple of (Path to move into the SR, VDI_UUID)
+ None, if the vhd_name doesn't exist in the staging path
+
+ If the VHD exists, we will do the following:
+ 1. Rename it with a UUID.
+ 2. If parent_path exists, we'll link up the VHDs.
+ """
+ orig_path = os.path.join(staging_path, vhd_name)
+ if not os.path.exists(orig_path):
+ return None
+ new_path, vdi_uuid = rename_with_uuid(orig_path)
+ if parent_path:
+ # NOTE(sirp): this step is necessary so that an SR scan won't
+ # delete the base_copy out from under us (since it would be
+ # orphaned)
+ link_vhds(new_path, parent_path)
+ return (new_path, vdi_uuid)
+
+ vdi_return_list = []
+ paths_to_move = []
+
+ image_info = prepare_if_exists(staging_path, 'image.vhd')
+ if not image_info:
raise Exception("Invalid image: image.vhd not present")
- base_copy_path, base_copy_uuid = rename_with_uuid(orig_base_copy_path)
-
- vdi_uuid = base_copy_uuid
- orig_snap_path = os.path.join(staging_path, 'snap.vhd')
- if os.path.exists(orig_snap_path):
- snap_path, snap_uuid = rename_with_uuid(orig_snap_path)
- vdi_uuid = snap_uuid
- # NOTE(sirp): this step is necessary so that an SR scan won't
- # delete the base_copy out from under us (since it would be
- # orphaned)
- link_vhds(snap_path, base_copy_path)
- move_into_sr(snap_path)
+ paths_to_move.append(image_info[0])
+
+ snap_info = prepare_if_exists(staging_path, 'snap.vhd',
+ image_info[0])
+ if snap_info:
+ paths_to_move.append(snap_info[0])
+ # We return this snap as the VDI instead of image.vhd
+ vdi_return_list.append(dict(vdi_type="os", vdi_uuid=snap_info[1]))
else:
- assert_vhd_not_hidden(base_copy_path)
+ assert_vhd_not_hidden(image_info[0])
+ # If there's no snap, we return the image.vhd UUID
+ vdi_return_list.append(dict(vdi_type="os", vdi_uuid=image_info[1]))
+
+ swap_info = prepare_if_exists(staging_path, 'swap.vhd')
+ if swap_info:
+ assert_vhd_not_hidden(swap_info[0])
+ paths_to_move.append(swap_info[0])
+ vdi_return_list.append(dict(vdi_type="swap", vdi_uuid=swap_info[1]))
+
+ for path in paths_to_move:
+ move_into_sr(path)
- move_into_sr(base_copy_path)
- return vdi_uuid
+ return vdi_return_list
def _prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids):
@@ -324,8 +369,9 @@ def download_vhd(session, args):
try:
_download_tarball(sr_path, staging_path, image_id, glance_host,
glance_port)
- vdi_uuid = _fixup_vhds(sr_path, staging_path, uuid_stack)
- return vdi_uuid
+ # Right now, it's easier to return a single string via XenAPI,
+ # so we'll json encode the list of VHDs.
+ return json.dumps(_import_vhds(sr_path, staging_path, uuid_stack))
finally:
_cleanup_staging_area(staging_path)
diff --git a/tools/pip-requires b/tools/pip-requires
index 8f8018765..f1c5b2003 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -17,8 +17,7 @@ redis==2.0.0
routes==1.12.3
WebOb==0.9.8
wsgiref==0.1.2
-mox==0.5.0
--f http://pymox.googlecode.com/files/mox-0.5.0.tar.gz
+mox==0.5.3
greenlet==0.3.1
nose
bzr