diff options
| author | Todd Willey <todd@ansolabs.com> | 2011-05-31 16:20:35 -0400 |
|---|---|---|
| committer | Todd Willey <todd@ansolabs.com> | 2011-05-31 16:20:35 -0400 |
| commit | eca6c3098144c1bf917725d906a50141a5aaef4e (patch) | |
| tree | 2ee52158c2c6ca49f6675359b1977bc53ca72621 /nova | |
| parent | 4d7dbdc96e30afbd19ab525e9667f6e3aaaafbe9 (diff) | |
| parent | 0b7104a8c6b2b2e3fed4a09b239439964aeb2774 (diff) | |
Merge Trunk.
Diffstat (limited to 'nova')
46 files changed, 1611 insertions, 279 deletions
diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py index c13993dd3..1915d007d 100644 --- a/nova/api/ec2/__init__.py +++ b/nova/api/ec2/__init__.py @@ -327,6 +327,12 @@ class Executor(wsgi.Application): ec2_id = ec2utils.id_to_ec2_id(ex.volume_id, 'vol-%08x') message = _('Volume %s not found') % ec2_id return self._error(req, context, type(ex).__name__, message) + except exception.SnapshotNotFound as ex: + LOG.info(_('SnapshotNotFound raised: %s'), unicode(ex), + context=context) + ec2_id = ec2utils.id_to_ec2_id(ex.snapshot_id, 'snap-%08x') + message = _('Snapshot %s not found') % ec2_id + return self._error(req, context, type(ex).__name__, message) except exception.NotFound as ex: LOG.info(_('NotFound raised: %s'), unicode(ex), context=context) return self._error(req, context, type(ex).__name__, unicode(ex)) diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index c35b6024e..79cc3b3bf 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -283,14 +283,50 @@ class CloudController(object): owner=None, restorable_by=None, **kwargs): - return {'snapshotSet': [{'snapshotId': 'fixme', - 'volumeId': 'fixme', - 'status': 'fixme', - 'startTime': 'fixme', - 'progress': 'fixme', - 'ownerId': 'fixme', - 'volumeSize': 0, - 'description': 'fixme'}]} + if snapshot_id: + snapshots = [] + for ec2_id in snapshot_id: + internal_id = ec2utils.ec2_id_to_id(ec2_id) + snapshot = self.volume_api.get_snapshot( + context, + snapshot_id=internal_id) + snapshots.append(snapshot) + else: + snapshots = self.volume_api.get_all_snapshots(context) + snapshots = [self._format_snapshot(context, s) for s in snapshots] + return {'snapshotSet': snapshots} + + def _format_snapshot(self, context, snapshot): + s = {} + s['snapshotId'] = ec2utils.id_to_ec2_id(snapshot['id'], 'snap-%08x') + s['volumeId'] = ec2utils.id_to_ec2_id(snapshot['volume_id'], + 'vol-%08x') + s['status'] = snapshot['status'] + s['startTime'] = snapshot['created_at'] + s['progress'] = snapshot['progress'] + s['ownerId'] = snapshot['project_id'] + s['volumeSize'] = snapshot['volume_size'] + s['description'] = snapshot['display_description'] + + s['display_name'] = snapshot['display_name'] + s['display_description'] = snapshot['display_description'] + return s + + def create_snapshot(self, context, volume_id, **kwargs): + LOG.audit(_("Create snapshot of volume %s"), volume_id, + context=context) + volume_id = ec2utils.ec2_id_to_id(volume_id) + snapshot = self.volume_api.create_snapshot( + context, + volume_id=volume_id, + name=kwargs.get('display_name'), + description=kwargs.get('display_description')) + return self._format_snapshot(context, snapshot) + + def delete_snapshot(self, context, snapshot_id, **kwargs): + snapshot_id = ec2utils.ec2_id_to_id(snapshot_id) + self.volume_api.delete_snapshot(context, snapshot_id=snapshot_id) + return True def describe_key_pairs(self, context, key_name=None, **kwargs): key_pairs = db.key_pair_get_all_by_user(context, context.user_id) @@ -619,16 +655,30 @@ class CloudController(object): 'volumeId': v['volumeId']}] else: v['attachmentSet'] = [{}] + if volume.get('snapshot_id') != None: + v['snapshotId'] = ec2utils.id_to_ec2_id(volume['snapshot_id'], + 'snap-%08x') + else: + v['snapshotId'] = None v['display_name'] = volume['display_name'] v['display_description'] = volume['display_description'] return v - def create_volume(self, context, size, **kwargs): - LOG.audit(_("Create volume of %s GB"), size, context=context) + def create_volume(self, context, **kwargs): + size = kwargs.get('size') + if kwargs.get('snapshot_id') != None: + snapshot_id = ec2utils.ec2_id_to_id(kwargs['snapshot_id']) + LOG.audit(_("Create volume from snapshot %s"), snapshot_id, + context=context) + else: + snapshot_id = None + LOG.audit(_("Create volume of %s GB"), size, context=context) + volume = self.volume_api.create( context, size=size, + snapshot_id=snapshot_id, name=kwargs.get('display_name'), description=kwargs.get('display_description')) # TODO(vish): Instance should be None at db layer instead of diff --git a/nova/api/openstack/contrib/volumes.py b/nova/api/openstack/contrib/volumes.py index 18de2ec71..b22bd2846 100644 --- a/nova/api/openstack/contrib/volumes.py +++ b/nova/api/openstack/contrib/volumes.py @@ -135,7 +135,7 @@ class VolumeController(wsgi.Controller): vol = env['volume'] size = vol['size'] LOG.audit(_("Create volume of %s GB"), size, context=context) - new_volume = self.volume_api.create(context, size, + new_volume = self.volume_api.create(context, size, None, vol.get('display_name'), vol.get('display_description')) diff --git a/nova/api/openstack/images.py b/nova/api/openstack/images.py index 34d4c27fc..2e779da79 100644 --- a/nova/api/openstack/images.py +++ b/nova/api/openstack/images.py @@ -28,6 +28,8 @@ from nova.api.openstack.views import images as images_view LOG = log.getLogger('nova.api.openstack.images') FLAGS = flags.FLAGS +SUPPORTED_FILTERS = ['name', 'status'] + class Controller(common.OpenstackController): """Base `wsgi.Controller` for retrieving/displaying images.""" @@ -59,7 +61,8 @@ class Controller(common.OpenstackController): :param req: `wsgi.Request` object """ context = req.environ['nova.context'] - images = self._image_service.index(context) + filters = self._get_filters(req) + images = self._image_service.index(context, filters) images = common.limited(images, req) builder = self.get_builder(req).build return dict(images=[builder(image, detail=False) for image in images]) @@ -70,11 +73,26 @@ class Controller(common.OpenstackController): :param req: `wsgi.Request` object. """ context = req.environ['nova.context'] - images = self._image_service.detail(context) + filters = self._get_filters(req) + images = self._image_service.detail(context, filters) images = common.limited(images, req) builder = self.get_builder(req).build return dict(images=[builder(image, detail=True) for image in images]) + def _get_filters(self, req): + """ + Return a dictionary of query param filters from the request + + :param req: the Request object coming from the wsgi layer + :retval a dict of key/value filters + """ + filters = {} + for param in req.str_params: + if param in SUPPORTED_FILTERS or param.startswith('property-'): + filters[param] = req.str_params.get(param) + + return filters + def show(self, req, id): """Return detailed information about a specific image. diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py index 5c10fc916..8e191c232 100644 --- a/nova/api/openstack/servers.py +++ b/nova/api/openstack/servers.py @@ -708,14 +708,16 @@ class ControllerV11(Controller): image_id = common.get_id_from_href(image_ref) personalities = info["rebuild"].get("personality", []) - metadata = info["rebuild"].get("metadata", {}) + metadata = info["rebuild"].get("metadata") + name = info["rebuild"].get("name") - self._validate_metadata(metadata) + if metadata: + self._validate_metadata(metadata) self._decode_personalities(personalities) try: - self.compute_api.rebuild(context, instance_id, image_id, metadata, - personalities) + self.compute_api.rebuild(context, instance_id, image_id, name, + metadata, personalities) except exception.BuildInProgress: msg = _("Instance %d is currently being rebuilt.") % instance_id LOG.debug(msg) diff --git a/nova/auth/novarc.template b/nova/auth/novarc.template index cda2ecc28..8170fcafe 100644 --- a/nova/auth/novarc.template +++ b/nova/auth/novarc.template @@ -1,4 +1,4 @@ -NOVA_KEY_DIR=$(pushd $(dirname $BASH_SOURCE)>/dev/null; pwd; popd>/dev/null) +NOVA_KEY_DIR=$(dirname $(readlink -f ${BASH_SOURCE})) export EC2_ACCESS_KEY="%(access)s:%(project)s" export EC2_SECRET_KEY="%(secret)s" export EC2_URL="%(ec2)s" diff --git a/nova/compute/api.py b/nova/compute/api.py index 660c7666f..1455e2de9 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -540,7 +540,7 @@ class API(base.Base): """Reboot the given instance.""" self._cast_compute_message('reboot_instance', context, instance_id) - def rebuild(self, context, instance_id, image_id, metadata=None, + def rebuild(self, context, instance_id, image_id, name=None, metadata=None, files_to_inject=None): """Rebuild the given instance with the provided metadata.""" instance = db.api.instance_get(context, instance_id) @@ -549,13 +549,16 @@ class API(base.Base): msg = _("Instance already building") raise exception.BuildInProgress(msg) - metadata = metadata or {} - self._check_metadata_properties_quota(context, metadata) - files_to_inject = files_to_inject or [] self._check_injected_file_quota(context, files_to_inject) - self.db.instance_update(context, instance_id, {"metadata": metadata}) + values = {} + if metadata is not None: + self._check_metadata_properties_quota(context, metadata) + values['metadata'] = metadata + if name is not None: + values['display_name'] = name + self.db.instance_update(context, instance_id, values) rebuild_params = { "image_id": image_id, diff --git a/nova/compute/manager.py b/nova/compute/manager.py index ff7aeb053..a2c66a957 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -336,7 +336,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock - def rebuild_instance(self, context, instance_id, image_id): + def rebuild_instance(self, context, instance_id, **kwargs): """Destroy and re-make this instance. A 'rebuild' effectively purges all existing data from the system and @@ -354,7 +354,8 @@ class ComputeManager(manager.SchedulerDependentManager): self._update_state(context, instance_id, power_state.BUILDING) self.driver.destroy(instance_ref) - instance_ref.image_id = image_id + instance_ref.image_id = kwargs.get('image_id') + instance_ref.injected_files = kwargs.get('injected_files', []) self.driver.spawn(instance_ref) self._update_image_id(context, instance_id, image_id) diff --git a/nova/db/api.py b/nova/db/api.py index f5c71019a..798b2881c 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -47,6 +47,8 @@ flags.DEFINE_string('instance_name_template', 'instance-%08x', 'Template string to be used to generate instance names') flags.DEFINE_string('volume_name_template', 'volume-%08x', 'Template string to be used to generate instance names') +flags.DEFINE_string('snapshot_name_template', 'snapshot-%08x', + 'Template string to be used to generate snapshot names') IMPL = utils.LazyPluggable(FLAGS['db_backend'], @@ -881,6 +883,43 @@ def volume_update(context, volume_id, values): #################### +def snapshot_create(context, values): + """Create a snapshot from the values dictionary.""" + return IMPL.snapshot_create(context, values) + + +def snapshot_destroy(context, snapshot_id): + """Destroy the snapshot or raise if it does not exist.""" + return IMPL.snapshot_destroy(context, snapshot_id) + + +def snapshot_get(context, snapshot_id): + """Get a snapshot or raise if it does not exist.""" + return IMPL.snapshot_get(context, snapshot_id) + + +def snapshot_get_all(context): + """Get all snapshots.""" + return IMPL.snapshot_get_all(context) + + +def snapshot_get_all_by_project(context, project_id): + """Get all snapshots belonging to a project.""" + return IMPL.snapshot_get_all_by_project(context, project_id) + + +def snapshot_update(context, snapshot_id, values): + """Set the given properties on an snapshot and update it. + + Raises NotFound if snapshot does not exist. + + """ + return IMPL.snapshot_update(context, snapshot_id, values) + + +#################### + + def security_group_get_all(context): """Get all security groups.""" return IMPL.security_group_get_all(context) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 5f5d46b86..886dc2a43 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -771,6 +771,15 @@ def fixed_ip_update(context, address, values): ################### +def _metadata_refs(metadata_dict): + metadata_refs = [] + if metadata_dict: + for k, v in metadata_dict.iteritems(): + metadata_ref = models.InstanceMetadata() + metadata_ref['key'] = k + metadata_ref['value'] = v + metadata_refs.append(metadata_ref) + return metadata_refs @require_context @@ -780,15 +789,7 @@ def instance_create(context, values): context - request context object values - dict containing column values. """ - metadata = values.get('metadata') - metadata_refs = [] - if metadata: - for k, v in metadata.iteritems(): - metadata_ref = models.InstanceMetadata() - metadata_ref['key'] = k - metadata_ref['value'] = v - metadata_refs.append(metadata_ref) - values['metadata'] = metadata_refs + values['metadata'] = _metadata_refs(values.get('metadata')) instance_ref = models.Instance() instance_ref.update(values) @@ -1010,6 +1011,11 @@ def instance_set_state(context, instance_id, state, description=None): @require_context def instance_update(context, instance_id, values): session = get_session() + metadata = values.get('metadata') + if metadata is not None: + instance_metadata_delete_all(context, instance_id) + instance_metadata_update_or_create(context, instance_id, + values.pop('metadata')) with session.begin(): instance_ref = instance_get(context, instance_id, session=session) instance_ref.update(values) @@ -1790,6 +1796,82 @@ def volume_update(context, volume_id, values): @require_context +def snapshot_create(context, values): + snapshot_ref = models.Snapshot() + snapshot_ref.update(values) + + session = get_session() + with session.begin(): + snapshot_ref.save(session=session) + return snapshot_ref + + +@require_admin_context +def snapshot_destroy(context, snapshot_id): + session = get_session() + with session.begin(): + session.query(models.Snapshot).\ + filter_by(id=snapshot_id).\ + update({'deleted': 1, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) + + +@require_context +def snapshot_get(context, snapshot_id, session=None): + if not session: + session = get_session() + result = None + + if is_admin_context(context): + result = session.query(models.Snapshot).\ + filter_by(id=snapshot_id).\ + filter_by(deleted=can_read_deleted(context)).\ + first() + elif is_user_context(context): + result = session.query(models.Snapshot).\ + filter_by(project_id=context.project_id).\ + filter_by(id=snapshot_id).\ + filter_by(deleted=False).\ + first() + if not result: + raise exception.SnapshotNotFound(snapshot_id=snapshot_id) + + return result + + +@require_admin_context +def snapshot_get_all(context): + session = get_session() + return session.query(models.Snapshot).\ + filter_by(deleted=can_read_deleted(context)).\ + all() + + +@require_context +def snapshot_get_all_by_project(context, project_id): + authorize_project_context(context, project_id) + + session = get_session() + return session.query(models.Snapshot).\ + filter_by(project_id=project_id).\ + filter_by(deleted=can_read_deleted(context)).\ + all() + + +@require_context +def snapshot_update(context, snapshot_id, values): + session = get_session() + with session.begin(): + snapshot_ref = snapshot_get(context, snapshot_id, session=session) + snapshot_ref.update(values) + snapshot_ref.save(session=session) + + +################### + + +@require_context def security_group_get_all(context): session = get_session() return session.query(models.SecurityGroup).\ @@ -2569,6 +2651,17 @@ def instance_metadata_delete(context, instance_id, key): @require_context +def instance_metadata_delete_all(context, instance_id): + session = get_session() + session.query(models.InstanceMetadata).\ + filter_by(instance_id=instance_id).\ + filter_by(deleted=False).\ + update({'deleted': True, + 'deleted_at': datetime.datetime.utcnow(), + 'updated_at': literal_column('updated_at')}) + + +@require_context def instance_metadata_get_item(context, instance_id, key): session = get_session() @@ -2587,6 +2680,9 @@ def instance_metadata_get_item(context, instance_id, key): @require_context def instance_metadata_update_or_create(context, instance_id, metadata): session = get_session() + + original_metadata = instance_metadata_get(context, instance_id) + meta_ref = None for key, value in metadata.iteritems(): try: @@ -2598,4 +2694,5 @@ def instance_metadata_update_or_create(context, instance_id, metadata): "instance_id": instance_id, "deleted": 0}) meta_ref.save(session=session) + return metadata diff --git a/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py b/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py new file mode 100644 index 000000000..f16d6db56 --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py @@ -0,0 +1,70 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 MORITA Kazutaka. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Column, Table, MetaData +from sqlalchemy import Integer, DateTime, Boolean, String + +from nova import log as logging + +meta = MetaData() + +snapshots = Table('snapshots', meta, + Column('created_at', DateTime(timezone=False)), + Column('updated_at', DateTime(timezone=False)), + Column('deleted_at', DateTime(timezone=False)), + Column('deleted', Boolean(create_constraint=True, name=None)), + Column('id', Integer(), primary_key=True, nullable=False), + Column('volume_id', Integer(), nullable=False), + Column('user_id', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False)), + Column('project_id', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False)), + Column('status', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False)), + Column('progress', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False)), + Column('volume_size', Integer()), + Column('scheduled_at', DateTime(timezone=False)), + Column('display_name', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False)), + Column('display_description', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False))) + + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; + # bind migrate_engine to your metadata + meta.bind = migrate_engine + + try: + snapshots.create() + except Exception: + logging.info(repr(snapshots)) + logging.exception('Exception while creating table') + meta.drop_all(tables=[snapshots]) + raise + + +def downgrade(migrate_engine): + # Operations to reverse the above upgrade go here. + snapshots.drop() diff --git a/nova/db/sqlalchemy/migrate_repo/versions/020_add_snapshot_id_to_volumes.py b/nova/db/sqlalchemy/migrate_repo/versions/020_add_snapshot_id_to_volumes.py new file mode 100644 index 000000000..10bd9d5c9 --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/020_add_snapshot_id_to_volumes.py @@ -0,0 +1,47 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 MORITA Kazutaka. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Column, Table, MetaData, Integer + +from nova import log as logging + + +meta = MetaData() + + +# Table stub-definitions +# Just for the ForeignKey and column creation to succeed, these are not the +# actual definitions of instances or services. +# +volumes = Table('volumes', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + +# +# New Column +# + +snapshot_id = Column('snapshot_id', Integer()) + + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; + # bind migrate_engine to your metadata + meta.bind = migrate_engine + + # Add columns to existing tables + volumes.create_column(snapshot_id) diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index dc10bcf00..e619eec6b 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -287,6 +287,8 @@ class Volume(BASE, NovaBase): user_id = Column(String(255)) project_id = Column(String(255)) + snapshot_id = Column(String(255)) + host = Column(String(255)) # , ForeignKey('hosts.id')) size = Column(Integer) availability_zone = Column(String(255)) # TODO(vish): foreign key? @@ -329,6 +331,31 @@ class Quota(BASE, NovaBase): hard_limit = Column(Integer, nullable=True) +class Snapshot(BASE, NovaBase): + """Represents a block storage device that can be attached to a vm.""" + __tablename__ = 'snapshots' + id = Column(Integer, primary_key=True, autoincrement=True) + + @property + def name(self): + return FLAGS.snapshot_name_template % self.id + + @property + def volume_name(self): + return FLAGS.volume_name_template % self.volume_id + + user_id = Column(String(255)) + project_id = Column(String(255)) + + volume_id = Column(Integer) + status = Column(String(255)) + progress = Column(String(255)) + volume_size = Column(Integer) + + display_name = Column(String(255)) + display_description = Column(String(255)) + + class ExportDevice(BASE, NovaBase): """Represates a shelf and blade that a volume can be exported on.""" __tablename__ = 'export_devices' diff --git a/nova/exception.py b/nova/exception.py index 56c20d111..02c65fd64 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -271,6 +271,14 @@ class VolumeNotFoundForInstance(VolumeNotFound): message = _("Volume not found for instance %(instance_id)s.") +class SnapshotNotFound(NotFound): + message = _("Snapshot %(snapshot_id)s could not be found.") + + +class VolumeIsBusy(Error): + message = _("deleting volume %(volume_name)s that has snapshot") + + class ExportDeviceNotFoundForVolume(NotFound): message = _("No export device found for volume %(volume_id)s.") diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index a7dee8caf..e7e9dab77 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit") EXCHANGES = {} QUEUES = {} +CONSUMERS = {} class Message(base.BaseMessage): @@ -96,17 +97,29 @@ class Backend(base.BaseBackend): ' key %(routing_key)s') % locals()) EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) - def declare_consumer(self, queue, callback, *args, **kwargs): - self.current_queue = queue - self.current_callback = callback + def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): + global CONSUMERS + LOG.debug("Adding consumer %s", consumer_tag) + CONSUMERS[consumer_tag] = (queue, callback) + + def cancel(self, consumer_tag): + global CONSUMERS + LOG.debug("Removing consumer %s", consumer_tag) + del CONSUMERS[consumer_tag] def consume(self, limit=None): + global CONSUMERS + num = 0 while True: - item = self.get(self.current_queue) - if item: - self.current_callback(item) - raise StopIteration() - greenthread.sleep(0) + for (queue, callback) in CONSUMERS.itervalues(): + item = self.get(queue) + if item: + callback(item) + num += 1 + yield + if limit and num == limit: + raise StopIteration() + greenthread.sleep(0.1) def get(self, queue, no_ack=False): global QUEUES @@ -134,5 +147,7 @@ class Backend(base.BaseBackend): def reset_all(): global EXCHANGES global QUEUES + global CONSUMERS EXCHANGES = {} QUEUES = {} + CONSUMERS = {} diff --git a/nova/image/fake.py b/nova/image/fake.py index b400b2adb..8e84c8597 100644 --- a/nova/image/fake.py +++ b/nova/image/fake.py @@ -52,11 +52,11 @@ class FakeImageService(service.BaseImageService): self.create(None, image) super(FakeImageService, self).__init__() - def index(self, context): + def index(self, context, filters=None): """Returns list of images.""" return copy.deepcopy(self.images.values()) - def detail(self, context): + def detail(self, context, filters=None): """Return list of detailed image information.""" return copy.deepcopy(self.images.values()) diff --git a/nova/image/glance.py b/nova/image/glance.py index 193e37273..dec797619 100644 --- a/nova/image/glance.py +++ b/nova/image/glance.py @@ -58,23 +58,23 @@ class GlanceImageService(service.BaseImageService): else: self.client = client - def index(self, context): + def index(self, context, filters=None): """Calls out to Glance for a list of images available.""" # NOTE(sirp): We need to use `get_images_detailed` and not # `get_images` here because we need `is_public` and `properties` # included so we can filter by user filtered = [] - image_metas = self.client.get_images_detailed() + image_metas = self.client.get_images_detailed(filters=filters) for image_meta in image_metas: if self._is_image_available(context, image_meta): meta_subset = utils.subset_dict(image_meta, ('id', 'name')) filtered.append(meta_subset) return filtered - def detail(self, context): + def detail(self, context, filters=None): """Calls out to Glance for a list of detailed image information.""" filtered = [] - image_metas = self.client.get_images_detailed() + image_metas = self.client.get_images_detailed(filters=filters) for image_meta in image_metas: if self._is_image_available(context, image_meta): base_image_meta = self._translate_to_base(image_meta) diff --git a/nova/image/local.py b/nova/image/local.py index 918180bae..677d5302b 100644 --- a/nova/image/local.py +++ b/nova/image/local.py @@ -63,7 +63,7 @@ class LocalImageService(service.BaseImageService): images.append(unhexed_image_id) return images - def index(self, context): + def index(self, context, *args, **kwargs): filtered = [] image_metas = self.detail(context) for image_meta in image_metas: @@ -71,7 +71,7 @@ class LocalImageService(service.BaseImageService): filtered.append(meta) return filtered - def detail(self, context): + def detail(self, context, *args, **kwargs): images = [] for image_id in self._ids(): try: diff --git a/nova/image/service.py b/nova/image/service.py index ab6749049..5361cfc89 100644 --- a/nova/image/service.py +++ b/nova/image/service.py @@ -46,7 +46,7 @@ class BaseImageService(object): # the ImageService subclass SERVICE_IMAGE_ATTRS = [] - def index(self, context): + def index(self, context, *args, **kwargs): """List images. :returns: a sequence of mappings with the following signature @@ -55,7 +55,7 @@ class BaseImageService(object): """ raise NotImplementedError - def detail(self, context): + def detail(self, context, *args, **kwargs): """Detailed information about an images. :returns: a sequence of mappings with the following signature diff --git a/nova/network/vmwareapi_net.py b/nova/network/vmwareapi_net.py index 373060add..04210c011 100644 --- a/nova/network/vmwareapi_net.py +++ b/nova/network/vmwareapi_net.py @@ -30,9 +30,7 @@ LOG = logging.getLogger("nova.network.vmwareapi_net") FLAGS = flags.FLAGS -flags.DEFINE_string('vlan_interface', 'vmnic0', - 'Physical network adapter name in VMware ESX host for ' - 'vlan networking') +FLAGS['vlan_interface'].SetDefault('vmnic0') def ensure_vlan_bridge(vlan_num, bridge, net_attrs=None): diff --git a/nova/rpc.py b/nova/rpc.py index 2116f22c3..c5277c6a9 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -28,12 +28,15 @@ import json import sys import time import traceback +import types import uuid from carrot import connection as carrot_connection from carrot import messaging from eventlet import greenpool -from eventlet import greenthread +from eventlet import pools +from eventlet import queue +import greenlet from nova import context from nova import exception @@ -47,7 +50,10 @@ LOG = logging.getLogger('nova.rpc') FLAGS = flags.FLAGS -flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_thread_pool_size', 1024, + 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_conn_pool_size', 30, + 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): @@ -90,6 +96,22 @@ class Connection(carrot_connection.BrokerConnection): return cls.instance() +class Pool(pools.Pool): + """Class that implements a Pool of Connections.""" + + # TODO(comstud): Timeout connections not used in a while + def create(self): + LOG.debug('Creating new connection') + return Connection.instance(new=True) + +# Create a ConnectionPool to use for RPC calls. We'll order the +# pool as a stack (LIFO), so that we can potentially loop through and +# timeout old unused connections at some point +ConnectionPool = Pool( + max_size=FLAGS.rpc_conn_pool_size, + order_as_stack=True) + + class Consumer(messaging.Consumer): """Consumer base class. @@ -131,7 +153,9 @@ class Consumer(messaging.Consumer): self.connection = Connection.recreate() self.backend = self.connection.create_backend() self.declare() - super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) + return super(Consumer, self).fetch(no_ack, + auto_ack, + enable_callbacks) if self.failed_connection: LOG.error(_('Reconnected to queue')) self.failed_connection = False @@ -159,13 +183,13 @@ class AdapterConsumer(Consumer): self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) super(AdapterConsumer, self).__init__(connection=connection, topic=topic) + self.register_callback(self.process_data) - def receive(self, *args, **kwargs): - self.pool.spawn_n(self._receive, *args, **kwargs) + def process_data(self, message_data, message): + """Consumer callback to call a method on a proxy object. - @exception.wrap_exception - def _receive(self, message_data, message): - """Magically looks for a method on the proxy object and calls it. + Parses the message for validity and fires off a thread to call the + proxy object method. Message data should be a dictionary with two keys: method: string representing the method to call @@ -175,8 +199,8 @@ class AdapterConsumer(Consumer): """ LOG.debug(_('received %s') % message_data) - msg_id = message_data.pop('_msg_id', None) - + # This will be popped off in _unpack_context + msg_id = message_data.get('_msg_id', None) ctxt = _unpack_context(message_data) method = message_data.get('method') @@ -188,8 +212,17 @@ class AdapterConsumer(Consumer): # we just log the message and send an error string # back to the caller LOG.warn(_('no method for message: %s') % message_data) - msg_reply(msg_id, _('No method for message: %s') % message_data) + if msg_id: + msg_reply(msg_id, + _('No method for message: %s') % message_data) return + self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args) + + @exception.wrap_exception + def _process_data(self, msg_id, ctxt, method, args): + """Thread that maigcally looks for a method on the proxy + object and calls it. + """ node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) @@ -197,7 +230,18 @@ class AdapterConsumer(Consumer): try: rval = node_func(context=ctxt, **node_args) if msg_id: - msg_reply(msg_id, rval, None) + # Check if the result was a generator + if isinstance(rval, types.GeneratorType): + for x in rval: + msg_reply(msg_id, x, None) + else: + msg_reply(msg_id, rval, None) + + # This final None tells multicall that it is done. + msg_reply(msg_id, None, None) + elif isinstance(rval, types.GeneratorType): + # NOTE(vish): this iterates through the generator + list(rval) except Exception as e: logging.exception('Exception during message handling') if msg_id: @@ -205,11 +249,6 @@ class AdapterConsumer(Consumer): return -class Publisher(messaging.Publisher): - """Publisher base class.""" - pass - - class TopicAdapterConsumer(AdapterConsumer): """Consumes messages on a specific topic.""" @@ -242,6 +281,58 @@ class FanoutAdapterConsumer(AdapterConsumer): topic=topic, proxy=proxy) +class ConsumerSet(object): + """Groups consumers to listen on together on a single connection.""" + + def __init__(self, connection, consumer_list): + self.consumer_list = set(consumer_list) + self.consumer_set = None + self.enabled = True + self.init(connection) + + def init(self, conn): + if not conn: + conn = Connection.instance(new=True) + if self.consumer_set: + self.consumer_set.close() + self.consumer_set = messaging.ConsumerSet(conn) + for consumer in self.consumer_list: + consumer.connection = conn + # consumer.backend is set for us + self.consumer_set.add_consumer(consumer) + + def reconnect(self): + self.init(None) + + def wait(self, limit=None): + running = True + while running: + it = self.consumer_set.iterconsume(limit=limit) + if not it: + break + while True: + try: + it.next() + except StopIteration: + return + except greenlet.GreenletExit: + running = False + break + except Exception as e: + LOG.exception(_("Exception while processing consumer")) + self.reconnect() + # Break to outer loop + break + + def close(self): + self.consumer_set.close() + + +class Publisher(messaging.Publisher): + """Publisher base class.""" + pass + + class TopicPublisher(Publisher): """Publishes messages on a specific topic.""" @@ -306,16 +397,18 @@ def msg_reply(msg_id, reply=None, failure=None): LOG.error(_("Returning exception %s to caller"), message) LOG.error(tb) failure = (failure[0].__name__, str(failure[1]), tb) - conn = Connection.instance() - publisher = DirectPublisher(connection=conn, msg_id=msg_id) - try: - publisher.send({'result': reply, 'failure': failure}) - except TypeError: - publisher.send( - {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure}) - publisher.close() + + with ConnectionPool.item() as conn: + publisher = DirectPublisher(connection=conn, msg_id=msg_id) + try: + publisher.send({'result': reply, 'failure': failure}) + except TypeError: + publisher.send( + {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure}) + + publisher.close() class RemoteError(exception.Error): @@ -347,8 +440,9 @@ def _unpack_context(msg): if key.startswith('_context_'): value = msg.pop(key) context_dict[key[9:]] = value + context_dict['msg_id'] = msg.pop('_msg_id', None) LOG.debug(_('unpacked context: %s'), context_dict) - return context.RequestContext.from_dict(context_dict) + return RpcContext.from_dict(context_dict) def _pack_context(msg, context): @@ -360,70 +454,112 @@ def _pack_context(msg, context): for args at some point. """ - context = dict([('_context_%s' % key, value) - for (key, value) in context.to_dict().iteritems()]) - msg.update(context) + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.to_dict().iteritems()]) + msg.update(context_d) -def call(context, topic, msg): - """Sends a message on a topic and wait for a response.""" +class RpcContext(context.RequestContext): + def __init__(self, *args, **kwargs): + msg_id = kwargs.pop('msg_id', None) + self.msg_id = msg_id + super(RpcContext, self).__init__(*args, **kwargs) + + def reply(self, *args, **kwargs): + msg_reply(self.msg_id, *args, **kwargs) + + +def multicall(context, topic, msg): + """Make a call that returns multiple times.""" LOG.debug(_('Making asynchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) _pack_context(msg, context) - class WaitMessage(object): - def __call__(self, data, message): - """Acks message and sets result.""" - message.ack() - if data['failure']: - self.result = RemoteError(*data['failure']) - else: - self.result = data['result'] - - wait_msg = WaitMessage() - conn = Connection.instance() - consumer = DirectConsumer(connection=conn, msg_id=msg_id) + con_conn = ConnectionPool.get() + consumer = DirectConsumer(connection=con_conn, msg_id=msg_id) + wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) - conn = Connection.instance() - publisher = TopicPublisher(connection=conn, topic=topic) + publisher = TopicPublisher(connection=con_conn, topic=topic) publisher.send(msg) publisher.close() - try: - consumer.wait(limit=1) - except StopIteration: - pass - consumer.close() - # NOTE(termie): this is a little bit of a change from the original - # non-eventlet code where returning a Failure - # instance from a deferred call is very similar to - # raising an exception - if isinstance(wait_msg.result, Exception): - raise wait_msg.result - return wait_msg.result + return wait_msg + + +class MulticallWaiter(object): + def __init__(self, consumer): + self._consumer = consumer + self._results = queue.Queue() + self._closed = False + + def close(self): + self._closed = True + self._consumer.close() + ConnectionPool.put(self._consumer.connection) + + def __call__(self, data, message): + """Acks message and sets result.""" + message.ack() + if data['failure']: + self._results.put(RemoteError(*data['failure'])) + else: + self._results.put(data['result']) + + def __iter__(self): + return self.wait() + + def wait(self): + while True: + rv = None + while rv is None and not self._closed: + try: + rv = self._consumer.fetch(enable_callbacks=True) + except Exception: + self.close() + raise + time.sleep(0.01) + + result = self._results.get() + if isinstance(result, Exception): + self.close() + raise result + if result == None: + self.close() + raise StopIteration + yield result + + +def call(context, topic, msg): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) _pack_context(msg, context) - conn = Connection.instance() - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() + with ConnectionPool.item() as conn: + publisher = TopicPublisher(connection=conn, topic=topic) + publisher.send(msg) + publisher.close() def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) _pack_context(msg, context) - conn = Connection.instance() - publisher = FanoutPublisher(topic, connection=conn) - publisher.send(msg) - publisher.close() + with ConnectionPool.item() as conn: + publisher = FanoutPublisher(topic, connection=conn) + publisher.send(msg) + publisher.close() def generic_response(message_data, message): @@ -459,6 +595,7 @@ def send_message(topic, message, wait=True): if wait: consumer.wait() + consumer.close() if __name__ == '__main__': diff --git a/nova/service.py b/nova/service.py index ab1238c3b..74f9f04d8 100644 --- a/nova/service.py +++ b/nova/service.py @@ -19,14 +19,11 @@ """Generic Node baseclass for all workers that run on hosts.""" +import greenlet import inspect import os -import sys -import time -from eventlet import event from eventlet import greenthread -from eventlet import greenpool from nova import context from nova import db @@ -91,27 +88,37 @@ class Service(object): if 'nova-compute' == self.binary: self.manager.update_available_resource(ctxt) - conn1 = rpc.Connection.instance(new=True) - conn2 = rpc.Connection.instance(new=True) - conn3 = rpc.Connection.instance(new=True) - if self.report_interval: - consumer_all = rpc.TopicAdapterConsumer( - connection=conn1, - topic=self.topic, - proxy=self) - consumer_node = rpc.TopicAdapterConsumer( - connection=conn2, - topic='%s.%s' % (self.topic, self.host), - proxy=self) - fanout = rpc.FanoutAdapterConsumer( - connection=conn3, - topic=self.topic, - proxy=self) - - self.timers.append(consumer_all.attach_to_eventlet()) - self.timers.append(consumer_node.attach_to_eventlet()) - self.timers.append(fanout.attach_to_eventlet()) + self.conn = rpc.Connection.instance(new=True) + logging.debug("Creating Consumer connection for Service %s" % + self.topic) + + # Share this same connection for these Consumers + consumer_all = rpc.TopicAdapterConsumer( + connection=self.conn, + topic=self.topic, + proxy=self) + consumer_node = rpc.TopicAdapterConsumer( + connection=self.conn, + topic='%s.%s' % (self.topic, self.host), + proxy=self) + fanout = rpc.FanoutAdapterConsumer( + connection=self.conn, + topic=self.topic, + proxy=self) + consumer_set = rpc.ConsumerSet( + connection=self.conn, + consumer_list=[consumer_all, consumer_node, fanout]) + + # Wait forever, processing these consumers + def _wait(): + try: + consumer_set.wait() + finally: + consumer_set.close() + + self.consumer_set_thread = greenthread.spawn(_wait) + if self.report_interval: pulse = utils.LoopingCall(self.report_state) pulse.start(interval=self.report_interval, now=False) self.timers.append(pulse) @@ -174,6 +181,11 @@ class Service(object): logging.warn(_('Service killed that has no database entry')) def stop(self): + self.consumer_set_thread.kill() + try: + self.consumer_set_thread.wait() + except greenlet.GreenletExit: + pass for x in self.timers: try: x.stop() diff --git a/nova/test.py b/nova/test.py index 4deb2a175..80b2d0a74 100644 --- a/nova/test.py +++ b/nova/test.py @@ -31,17 +31,15 @@ import uuid import unittest import mox -import shutil import stubout from eventlet import greenthread -from nova import context -from nova import db from nova import fakerabbit from nova import flags from nova import rpc from nova import service from nova import wsgi +from nova.virt import fake FLAGS = flags.FLAGS @@ -85,6 +83,7 @@ class TestCase(unittest.TestCase): self._monkey_patch_attach() self._monkey_patch_wsgi() self._original_flags = FLAGS.FlagValuesDict() + rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size) def tearDown(self): """Runs after each test method to tear down test environment.""" @@ -99,6 +98,10 @@ class TestCase(unittest.TestCase): if FLAGS.fake_rabbit: fakerabbit.reset_all() + if FLAGS.connection_type == 'fake': + if hasattr(fake.FakeConnection, '_instance'): + del fake.FakeConnection._instance + # Reset any overriden flags self.reset_flags() diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index bf51239e6..8e0156afa 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -166,11 +166,11 @@ def stub_out_glance(stubs, initial_fixtures=None): def __init__(self, initial_fixtures): self.fixtures = initial_fixtures or [] - def fake_get_images(self): + def fake_get_images(self, filters=None): return [dict(id=f['id'], name=f['name']) for f in self.fixtures] - def fake_get_images_detailed(self): + def fake_get_images_detailed(self, filters=None): return copy.deepcopy(self.fixtures) def fake_get_image_meta(self, image_id): diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py index 2c329f920..9f1f28611 100644 --- a/nova/tests/api/openstack/test_images.py +++ b/nova/tests/api/openstack/test_images.py @@ -28,6 +28,7 @@ import shutil import tempfile import xml.dom.minidom as minidom +import mox import stubout import webob @@ -708,6 +709,146 @@ class ImageControllerWithGlanceServiceTest(test.TestCase): self.assertDictListMatch(expected, response_list) + def test_image_filter_with_name(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'name': 'testname'} + image_service.index(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images?name=testname') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.index(request) + mocker.VerifyAll() + + def test_image_filter_with_status(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'status': 'ACTIVE'} + image_service.index(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images?status=ACTIVE') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.index(request) + mocker.VerifyAll() + + def test_image_filter_with_property(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'property-test': '3'} + image_service.index(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images?property-test=3') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.index(request) + mocker.VerifyAll() + + def test_image_filter_not_supported(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'status': 'ACTIVE'} + image_service.index(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images?status=ACTIVE&UNSUPPORTEDFILTER=testname') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.index(request) + mocker.VerifyAll() + + def test_image_no_filters(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {} + image_service.index(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.index(request) + mocker.VerifyAll() + + def test_image_detail_filter_with_name(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'name': 'testname'} + image_service.detail(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images/detail?name=testname') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.detail(request) + mocker.VerifyAll() + + def test_image_detail_filter_with_status(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'status': 'ACTIVE'} + image_service.detail(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images/detail?status=ACTIVE') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.detail(request) + mocker.VerifyAll() + + def test_image_detail_filter_with_property(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'property-test': '3'} + image_service.detail(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images/detail?property-test=3') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.detail(request) + mocker.VerifyAll() + + def test_image_detail_filter_not_supported(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {'status': 'ACTIVE'} + image_service.detail(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images/detail?status=ACTIVE&UNSUPPORTEDFILTER=testname') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.detail(request) + mocker.VerifyAll() + + def test_image_detail_no_filters(self): + mocker = mox.Mox() + image_service = mocker.CreateMockAnything() + context = object() + filters = {} + image_service.detail(context, filters).AndReturn([]) + mocker.ReplayAll() + request = webob.Request.blank( + '/v1.1/images/detail') + request.environ['nova.context'] = context + controller = images.ControllerV11(image_service=image_service) + controller.detail(request) + mocker.VerifyAll() + def test_get_image_found(self): req = webob.Request.blank('/v1.0/images/123') res = req.get_response(fakes.wsgi_app()) diff --git a/nova/tests/image/test_glance.py b/nova/tests/image/test_glance.py index 109905ded..6d108d494 100644 --- a/nova/tests/image/test_glance.py +++ b/nova/tests/image/test_glance.py @@ -34,7 +34,7 @@ class StubGlanceClient(object): def get_image_meta(self, image_id): return self.images[image_id] - def get_images_detailed(self): + def get_images_detailed(self, filters=None): return self.images.itervalues() def get_image(self, image_id): diff --git a/nova/tests/integrated/api/client.py b/nova/tests/integrated/api/client.py index 7e20c9b00..eb9a3056e 100644 --- a/nova/tests/integrated/api/client.py +++ b/nova/tests/integrated/api/client.py @@ -152,7 +152,10 @@ class TestOpenStackClient(object): def _decode_json(self, response): body = response.read() LOG.debug(_("Decoding JSON: %s") % (body)) - return json.loads(body) + if body: + return json.loads(body) + else: + return "" def api_get(self, relative_uri, **kwargs): kwargs.setdefault('check_response_status', [200]) @@ -166,7 +169,7 @@ class TestOpenStackClient(object): headers['Content-Type'] = 'application/json' kwargs['body'] = json.dumps(body) - kwargs.setdefault('check_response_status', [200]) + kwargs.setdefault('check_response_status', [200, 202]) response = self.api_request(relative_uri, **kwargs) return self._decode_json(response) @@ -185,6 +188,9 @@ class TestOpenStackClient(object): def post_server(self, server): return self.api_post('/servers', server)['server'] + def post_server_action(self, server_id, data): + return self.api_post('/servers/%s/action' % server_id, data) + def delete_server(self, server_id): return self.api_delete('/servers/%s' % server_id) diff --git a/nova/tests/integrated/integrated_helpers.py b/nova/tests/integrated/integrated_helpers.py index bc98921f0..7f590441e 100644 --- a/nova/tests/integrated/integrated_helpers.py +++ b/nova/tests/integrated/integrated_helpers.py @@ -154,10 +154,7 @@ class _IntegratedTestBase(test.TestCase): # set up services self.start_service('compute') self.start_service('volume') - # NOTE(justinsb): There's a bug here which is eluding me... - # If we start the network_service, all is good, but then subsequent - # tests fail: CloudTestCase.test_ajax_console in particular. - #self.start_service('network') + self.start_service('network') self.start_service('scheduler') self._start_api_service() diff --git a/nova/tests/integrated/test_servers.py b/nova/tests/integrated/test_servers.py index e89d0100a..a67fa1bb5 100644 --- a/nova/tests/integrated/test_servers.py +++ b/nova/tests/integrated/test_servers.py @@ -179,6 +179,112 @@ class ServersTest(integrated_helpers._IntegratedTestBase): # Cleanup self._delete_server(created_server_id) + def test_create_and_rebuild_server(self): + """Rebuild a server.""" + + # create a server with initially has no metadata + server = self._build_minimal_create_server_request() + server_post = {'server': server} + created_server = self.api.post_server(server_post) + LOG.debug("created_server: %s" % created_server) + self.assertTrue(created_server['id']) + created_server_id = created_server['id'] + + # rebuild the server with metadata + post = {} + post['rebuild'] = { + "imageRef": "https://localhost/v1.1/32278/images/2", + "name": "blah" + } + + self.api.post_server_action(created_server_id, post) + LOG.debug("rebuilt server: %s" % created_server) + self.assertTrue(created_server['id']) + + found_server = self.api.get_server(created_server_id) + self.assertEqual(created_server_id, found_server['id']) + self.assertEqual({}, found_server.get('metadata')) + self.assertEqual('blah', found_server.get('name')) + + # Cleanup + self._delete_server(created_server_id) + + def test_create_and_rebuild_server_with_metadata(self): + """Rebuild a server with metadata.""" + + # create a server with initially has no metadata + server = self._build_minimal_create_server_request() + server_post = {'server': server} + created_server = self.api.post_server(server_post) + LOG.debug("created_server: %s" % created_server) + self.assertTrue(created_server['id']) + created_server_id = created_server['id'] + + # rebuild the server with metadata + post = {} + post['rebuild'] = { + "imageRef": "https://localhost/v1.1/32278/images/2", + "name": "blah" + } + + metadata = {} + for i in range(30): + metadata['key_%s' % i] = 'value_%s' % i + + post['rebuild']['metadata'] = metadata + + self.api.post_server_action(created_server_id, post) + LOG.debug("rebuilt server: %s" % created_server) + self.assertTrue(created_server['id']) + + found_server = self.api.get_server(created_server_id) + self.assertEqual(created_server_id, found_server['id']) + self.assertEqual(metadata, found_server.get('metadata')) + self.assertEqual('blah', found_server.get('name')) + + # Cleanup + self._delete_server(created_server_id) + + def test_create_and_rebuild_server_with_metadata_removal(self): + """Rebuild a server with metadata.""" + + # create a server with initially has no metadata + server = self._build_minimal_create_server_request() + server_post = {'server': server} + + metadata = {} + for i in range(30): + metadata['key_%s' % i] = 'value_%s' % i + + server_post['server']['metadata'] = metadata + + created_server = self.api.post_server(server_post) + LOG.debug("created_server: %s" % created_server) + self.assertTrue(created_server['id']) + created_server_id = created_server['id'] + + # rebuild the server with metadata + post = {} + post['rebuild'] = { + "imageRef": "https://localhost/v1.1/32278/images/2", + "name": "blah" + } + + metadata = {} + post['rebuild']['metadata'] = metadata + + self.api.post_server_action(created_server_id, post) + LOG.debug("rebuilt server: %s" % created_server) + self.assertTrue(created_server['id']) + + found_server = self.api.get_server(created_server_id) + self.assertEqual(created_server_id, found_server['id']) + self.assertEqual(metadata, found_server.get('metadata')) + self.assertEqual('blah', found_server.get('name')) + + # Cleanup + self._delete_server(created_server_id) + if __name__ == "__main__": unittest.main() diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index 54c0454de..55ea6be02 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -17,13 +17,9 @@ # under the License. from base64 import b64decode -import json from M2Crypto import BIO from M2Crypto import RSA import os -import shutil -import tempfile -import time from eventlet import greenthread @@ -33,12 +29,10 @@ from nova import db from nova import flags from nova import log as logging from nova import rpc -from nova import service from nova import test from nova import utils from nova import exception from nova.auth import manager -from nova.compute import power_state from nova.api.ec2 import cloud from nova.api.ec2 import ec2utils from nova.image import local @@ -79,14 +73,21 @@ class CloudTestCase(test.TestCase): self.stubs.Set(local.LocalImageService, 'show', fake_show) self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show) + # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish + rpc_cast = rpc.cast + + def finish_cast(*args, **kwargs): + rpc_cast(*args, **kwargs) + greenthread.sleep(0.2) + + self.stubs.Set(rpc, 'cast', finish_cast) + def tearDown(self): network_ref = db.project_get_network(self.context, self.project.id) db.network_disassociate(self.context, network_ref['id']) self.manager.delete_project(self.project) self.manager.delete_user(self.user) - self.compute.kill() - self.network.kill() super(CloudTestCase, self).tearDown() def _create_key(self, name): @@ -113,7 +114,6 @@ class CloudTestCase(test.TestCase): self.cloud.describe_addresses(self.context) self.cloud.release_address(self.context, public_ip=address) - greenthread.sleep(0.3) db.floating_ip_destroy(self.context, address) def test_associate_disassociate_address(self): @@ -129,12 +129,10 @@ class CloudTestCase(test.TestCase): self.cloud.associate_address(self.context, instance_id=ec2_id, public_ip=address) - greenthread.sleep(0.3) self.cloud.disassociate_address(self.context, public_ip=address) self.cloud.release_address(self.context, public_ip=address) - greenthread.sleep(0.3) self.network.deallocate_fixed_ip(self.context, fixed) db.instance_destroy(self.context, inst['id']) db.floating_ip_destroy(self.context, address) @@ -171,6 +169,25 @@ class CloudTestCase(test.TestCase): db.volume_destroy(self.context, vol1['id']) db.volume_destroy(self.context, vol2['id']) + def test_create_volume_from_snapshot(self): + """Makes sure create_volume works when we specify a snapshot.""" + vol = db.volume_create(self.context, {'size': 1}) + snap = db.snapshot_create(self.context, {'volume_id': vol['id'], + 'volume_size': vol['size'], + 'status': "available"}) + snapshot_id = ec2utils.id_to_ec2_id(snap['id'], 'snap-%08x') + + result = self.cloud.create_volume(self.context, + snapshot_id=snapshot_id) + volume_id = result['volumeId'] + result = self.cloud.describe_volumes(self.context) + self.assertEqual(len(result['volumeSet']), 2) + self.assertEqual(result['volumeSet'][1]['volumeId'], volume_id) + + db.volume_destroy(self.context, ec2utils.ec2_id_to_id(volume_id)) + db.snapshot_destroy(self.context, snap['id']) + db.volume_destroy(self.context, vol['id']) + def test_describe_availability_zones(self): """Makes sure describe_availability_zones works and filters results.""" service1 = db.service_create(self.context, {'host': 'host1_zones', @@ -188,6 +205,52 @@ class CloudTestCase(test.TestCase): db.service_destroy(self.context, service1['id']) db.service_destroy(self.context, service2['id']) + def test_describe_snapshots(self): + """Makes sure describe_snapshots works and filters results.""" + vol = db.volume_create(self.context, {}) + snap1 = db.snapshot_create(self.context, {'volume_id': vol['id']}) + snap2 = db.snapshot_create(self.context, {'volume_id': vol['id']}) + result = self.cloud.describe_snapshots(self.context) + self.assertEqual(len(result['snapshotSet']), 2) + snapshot_id = ec2utils.id_to_ec2_id(snap2['id'], 'snap-%08x') + result = self.cloud.describe_snapshots(self.context, + snapshot_id=[snapshot_id]) + self.assertEqual(len(result['snapshotSet']), 1) + self.assertEqual( + ec2utils.ec2_id_to_id(result['snapshotSet'][0]['snapshotId']), + snap2['id']) + db.snapshot_destroy(self.context, snap1['id']) + db.snapshot_destroy(self.context, snap2['id']) + db.volume_destroy(self.context, vol['id']) + + def test_create_snapshot(self): + """Makes sure create_snapshot works.""" + vol = db.volume_create(self.context, {'status': "available"}) + volume_id = ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x') + + result = self.cloud.create_snapshot(self.context, + volume_id=volume_id) + snapshot_id = result['snapshotId'] + result = self.cloud.describe_snapshots(self.context) + self.assertEqual(len(result['snapshotSet']), 1) + self.assertEqual(result['snapshotSet'][0]['snapshotId'], snapshot_id) + + db.snapshot_destroy(self.context, ec2utils.ec2_id_to_id(snapshot_id)) + db.volume_destroy(self.context, vol['id']) + + def test_delete_snapshot(self): + """Makes sure delete_snapshot works.""" + vol = db.volume_create(self.context, {'status': "available"}) + snap = db.snapshot_create(self.context, {'volume_id': vol['id'], + 'status': "available"}) + snapshot_id = ec2utils.id_to_ec2_id(snap['id'], 'snap-%08x') + + result = self.cloud.delete_snapshot(self.context, + snapshot_id=snapshot_id) + self.assertTrue(result) + + db.volume_destroy(self.context, vol['id']) + def test_describe_instances(self): """Makes sure describe_instances works and filters results.""" inst1 = db.instance_create(self.context, {'reservation_id': 'a', @@ -306,31 +369,25 @@ class CloudTestCase(test.TestCase): 'instance_type': instance_type, 'max_count': max_count} rv = self.cloud.run_instances(self.context, **kwargs) - greenthread.sleep(0.3) instance_id = rv['instancesSet'][0]['instanceId'] output = self.cloud.get_console_output(context=self.context, instance_id=[instance_id]) self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT') # TODO(soren): We need this until we can stop polling in the rpc code # for unit tests. - greenthread.sleep(0.3) rv = self.cloud.terminate_instances(self.context, [instance_id]) - greenthread.sleep(0.3) def test_ajax_console(self): kwargs = {'image_id': 'ami-1'} rv = self.cloud.run_instances(self.context, **kwargs) instance_id = rv['instancesSet'][0]['instanceId'] - greenthread.sleep(0.3) output = self.cloud.get_ajax_console(context=self.context, instance_id=[instance_id]) self.assertEquals(output['url'], '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url) # TODO(soren): We need this until we can stop polling in the rpc code # for unit tests. - greenthread.sleep(0.3) rv = self.cloud.terminate_instances(self.context, [instance_id]) - greenthread.sleep(0.3) def test_key_generation(self): result = self._create_key('test') diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py index 916fca55e..ad73a3a69 100644 --- a/nova/tests/test_quota.py +++ b/nova/tests/test_quota.py @@ -250,6 +250,7 @@ class QuotaTestCase(test.TestCase): volume.API().create, self.context, size=10, + snapshot_id=None, name='', description='') for volume_id in volume_ids: @@ -263,6 +264,7 @@ class QuotaTestCase(test.TestCase): volume.API().create, self.context, size=10, + snapshot_id=None, name='', description='') for volume_id in volume_ids: diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 44d7c91eb..ffd748efe 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -31,7 +31,6 @@ LOG = logging.getLogger('nova.tests.rpc') class RpcTestCase(test.TestCase): - """Test cases for rpc""" def setUp(self): super(RpcTestCase, self).setUp() self.conn = rpc.Connection.instance(True) @@ -43,14 +42,55 @@ class RpcTestCase(test.TestCase): self.context = context.get_admin_context() def test_call_succeed(self): - """Get a value through rpc call""" value = 42 result = rpc.call(self.context, 'test', {"method": "echo", "args": {"value": value}}) self.assertEqual(value, result) + def test_call_succeed_despite_multiple_returns(self): + value = 42 + result = rpc.call(self.context, 'test', {"method": "echo_three_times", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_call_succeed_despite_multiple_returns_yield(self): + value = 42 + result = rpc.call(self.context, 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_multicall_succeed_once(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo", + "args": {"value": value}}) + for i, x in enumerate(result): + if i > 0: + self.fail('should only receive one response') + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times_yield(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + def test_context_passed(self): - """Makes sure a context is passed through rpc call""" + """Makes sure a context is passed through rpc call.""" value = 42 result = rpc.call(self.context, 'test', {"method": "context", @@ -58,11 +98,12 @@ class RpcTestCase(test.TestCase): self.assertEqual(self.context.to_dict(), result) def test_call_exception(self): - """Test that exception gets passed back properly + """Test that exception gets passed back properly. rpc.call returns a RemoteError object. The value of the exception is converted to a string, so we convert it back to an int in the test. + """ value = 42 self.assertRaises(rpc.RemoteError, @@ -81,7 +122,7 @@ class RpcTestCase(test.TestCase): self.assertEqual(int(exc.value), value) def test_nested_calls(self): - """Test that we can do an rpc.call inside another call""" + """Test that we can do an rpc.call inside another call.""" class Nested(object): @staticmethod def echo(context, queue, value): @@ -108,25 +149,80 @@ class RpcTestCase(test.TestCase): "value": value}}) self.assertEqual(value, result) + def test_connectionpool_single(self): + """Test that ConnectionPool recycles a single connection.""" + conn1 = rpc.ConnectionPool.get() + rpc.ConnectionPool.put(conn1) + conn2 = rpc.ConnectionPool.get() + rpc.ConnectionPool.put(conn2) + self.assertEqual(conn1, conn2) + + def test_connectionpool_double(self): + """Test that ConnectionPool returns and reuses separate connections. + + When called consecutively we should get separate connections and upon + returning them those connections should be reused for future calls + before generating a new connection. + + """ + conn1 = rpc.ConnectionPool.get() + conn2 = rpc.ConnectionPool.get() + + self.assertNotEqual(conn1, conn2) + rpc.ConnectionPool.put(conn1) + rpc.ConnectionPool.put(conn2) + + conn3 = rpc.ConnectionPool.get() + conn4 = rpc.ConnectionPool.get() + self.assertEqual(conn1, conn3) + self.assertEqual(conn2, conn4) + + def test_connectionpool_limit(self): + """Test connection pool limit and connection uniqueness.""" + max_size = FLAGS.rpc_conn_pool_size + conns = [] + + for i in xrange(max_size): + conns.append(rpc.ConnectionPool.get()) + + self.assertFalse(rpc.ConnectionPool.free_items) + self.assertEqual(rpc.ConnectionPool.current_size, + rpc.ConnectionPool.max_size) + self.assertEqual(len(set(conns)), max_size) + class TestReceiver(object): - """Simple Proxy class so the consumer has methods to call + """Simple Proxy class so the consumer has methods to call. + + Uses static methods because we aren't actually storing any state. - Uses static methods because we aren't actually storing any state""" + """ @staticmethod def echo(context, value): - """Simply returns whatever value is sent in""" + """Simply returns whatever value is sent in.""" LOG.debug(_("Received %s"), value) return value @staticmethod def context(context, value): - """Returns dictionary version of context""" + """Returns dictionary version of context.""" LOG.debug(_("Received %s"), context) return context.to_dict() @staticmethod + def echo_three_times(context, value): + context.reply(value) + context.reply(value + 1) + context.reply(value + 2) + + @staticmethod + def echo_three_times_yield(context, value): + yield value + yield value + 1 + yield value + 2 + + @staticmethod def fail(context, value): - """Raises an exception with the value sent in""" + """Raises an exception with the value sent in.""" raise Exception(value) diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py index d48de2057..d1cc8bd61 100644 --- a/nova/tests/test_service.py +++ b/nova/tests/test_service.py @@ -106,7 +106,10 @@ class ServiceTestCase(test.TestCase): # NOTE(vish): Create was moved out of mox replay to make sure that # the looping calls are created in StartService. - app = service.Service.create(host=host, binary=binary) + app = service.Service.create(host=host, binary=binary, topic=topic) + + self.mox.StubOutWithMock(service.rpc.Connection, 'instance') + service.rpc.Connection.instance(new=mox.IgnoreArg()) self.mox.StubOutWithMock(rpc, 'TopicAdapterConsumer', @@ -114,6 +117,11 @@ class ServiceTestCase(test.TestCase): self.mox.StubOutWithMock(rpc, 'FanoutAdapterConsumer', use_mock_anything=True) + + self.mox.StubOutWithMock(rpc, + 'ConsumerSet', + use_mock_anything=True) + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), topic=topic, proxy=mox.IsA(service.Service)).AndReturn( @@ -129,9 +137,14 @@ class ServiceTestCase(test.TestCase): proxy=mox.IsA(service.Service)).AndReturn( rpc.FanoutAdapterConsumer) - rpc.TopicAdapterConsumer.attach_to_eventlet() - rpc.TopicAdapterConsumer.attach_to_eventlet() - rpc.FanoutAdapterConsumer.attach_to_eventlet() + def wait_func(self, limit=None): + return None + + mock_cset = self.mox.CreateMock(rpc.ConsumerSet, + {'wait': wait_func}) + rpc.ConsumerSet(connection=mox.IgnoreArg(), + consumer_list=mox.IsA(list)).AndReturn(mock_cset) + wait_func(mox.IgnoreArg()) service_create = {'host': host, 'binary': binary, @@ -287,8 +300,42 @@ class ServiceTestCase(test.TestCase): # Creating mocks self.mox.StubOutWithMock(service.rpc.Connection, 'instance') service.rpc.Connection.instance(new=mox.IgnoreArg()) - service.rpc.Connection.instance(new=mox.IgnoreArg()) - service.rpc.Connection.instance(new=mox.IgnoreArg()) + + self.mox.StubOutWithMock(rpc, + 'TopicAdapterConsumer', + use_mock_anything=True) + self.mox.StubOutWithMock(rpc, + 'FanoutAdapterConsumer', + use_mock_anything=True) + + self.mox.StubOutWithMock(rpc, + 'ConsumerSet', + use_mock_anything=True) + + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), + topic=topic, + proxy=mox.IsA(service.Service)).AndReturn( + rpc.TopicAdapterConsumer) + + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), + topic='%s.%s' % (topic, host), + proxy=mox.IsA(service.Service)).AndReturn( + rpc.TopicAdapterConsumer) + + rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(), + topic=topic, + proxy=mox.IsA(service.Service)).AndReturn( + rpc.FanoutAdapterConsumer) + + def wait_func(self, limit=None): + return None + + mock_cset = self.mox.CreateMock(rpc.ConsumerSet, + {'wait': wait_func}) + rpc.ConsumerSet(connection=mox.IgnoreArg(), + consumer_list=mox.IsA(list)).AndReturn(mock_cset) + wait_func(mox.IgnoreArg()) + self.mox.StubOutWithMock(serv.manager.driver, 'update_available_resource') serv.manager.driver.update_available_resource(mox.IgnoreArg(), host) diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py index 236d12434..4f10ee6af 100644 --- a/nova/tests/test_volume.py +++ b/nova/tests/test_volume.py @@ -45,10 +45,11 @@ class VolumeTestCase(test.TestCase): self.context = context.get_admin_context() @staticmethod - def _create_volume(size='0'): + def _create_volume(size='0', snapshot_id=None): """Create a volume object.""" vol = {} vol['size'] = size + vol['snapshot_id'] = snapshot_id vol['user_id'] = 'fake' vol['project_id'] = 'fake' vol['availability_zone'] = FLAGS.storage_availability_zone @@ -69,6 +70,25 @@ class VolumeTestCase(test.TestCase): self.context, volume_id) + def test_create_volume_from_snapshot(self): + """Test volume can be created from a snapshot.""" + volume_src_id = self._create_volume() + self.volume.create_volume(self.context, volume_src_id) + snapshot_id = self._create_snapshot(volume_src_id) + self.volume.create_snapshot(self.context, volume_src_id, snapshot_id) + volume_dst_id = self._create_volume(0, snapshot_id) + self.volume.create_volume(self.context, volume_dst_id, snapshot_id) + self.assertEqual(volume_dst_id, db.volume_get( + context.get_admin_context(), + volume_dst_id).id) + self.assertEqual(snapshot_id, db.volume_get( + context.get_admin_context(), + volume_dst_id).snapshot_id) + + self.volume.delete_volume(self.context, volume_dst_id) + self.volume.delete_snapshot(self.context, snapshot_id) + self.volume.delete_volume(self.context, volume_src_id) + def test_too_big_volume(self): """Ensure failure if a too large of a volume is requested.""" # FIXME(vish): validation needs to move into the data layer in @@ -176,6 +196,34 @@ class VolumeTestCase(test.TestCase): # This will allow us to test cross-node interactions pass + @staticmethod + def _create_snapshot(volume_id, size='0'): + """Create a snapshot object.""" + snap = {} + snap['volume_size'] = size + snap['user_id'] = 'fake' + snap['project_id'] = 'fake' + snap['volume_id'] = volume_id + snap['status'] = "creating" + return db.snapshot_create(context.get_admin_context(), snap)['id'] + + def test_create_delete_snapshot(self): + """Test snapshot can be created and deleted.""" + volume_id = self._create_volume() + self.volume.create_volume(self.context, volume_id) + snapshot_id = self._create_snapshot(volume_id) + self.volume.create_snapshot(self.context, volume_id, snapshot_id) + self.assertEqual(snapshot_id, + db.snapshot_get(context.get_admin_context(), + snapshot_id).id) + + self.volume.delete_snapshot(self.context, snapshot_id) + self.assertRaises(exception.NotFound, + db.snapshot_get, + self.context, + snapshot_id) + self.volume.delete_volume(self.context, volume_id) + class DriverTestCase(test.TestCase): """Base Test class for Drivers.""" diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index be1e35697..9d56c1644 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -395,6 +395,29 @@ class XenAPIVMTestCase(test.TestCase): os_type="linux") self.check_vm_params_for_linux() + def test_spawn_vhd_glance_swapdisk(self): + # Change the default host_call_plugin to one that'll return + # a swap disk + orig_func = stubs.FakeSessionForVMTests.host_call_plugin + + stubs.FakeSessionForVMTests.host_call_plugin = \ + stubs.FakeSessionForVMTests.host_call_plugin_swap + + try: + # We'll steal the above glance linux test + self.test_spawn_vhd_glance_linux() + finally: + # Make sure to put this back + stubs.FakeSessionForVMTests.host_call_plugin = orig_func + + # We should have 2 VBDs. + self.assertEqual(len(self.vm['VBDs']), 2) + # Now test that we have 1. + self.tearDown() + self.setUp() + self.test_spawn_vhd_glance_linux() + self.assertEqual(len(self.vm['VBDs']), 1) + def test_spawn_vhd_glance_windows(self): FLAGS.xenapi_image_service = 'glance' self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None, @@ -569,11 +592,29 @@ class XenAPIDiffieHellmanTestCase(test.TestCase): bob_shared = self.bob.compute_shared(alice_pub) self.assertEquals(alice_shared, bob_shared) - def test_encryption(self): - msg = "This is a top-secret message" - enc = self.alice.encrypt(msg) + def _test_encryption(self, message): + enc = self.alice.encrypt(message) + self.assertFalse(enc.endswith('\n')) dec = self.bob.decrypt(enc) - self.assertEquals(dec, msg) + self.assertEquals(dec, message) + + def test_encrypt_simple_message(self): + self._test_encryption('This is a simple message.') + + def test_encrypt_message_with_newlines_at_end(self): + self._test_encryption('This message has a newline at the end.\n') + + def test_encrypt_many_newlines_at_end(self): + self._test_encryption('Message with lotsa newlines.\n\n\n') + + def test_encrypt_newlines_inside_message(self): + self._test_encryption('Message\nwith\ninterior\nnewlines.') + + def test_encrypt_with_leading_newlines(self): + self._test_encryption('\n\nMessage with leading newlines.') + + def test_encrypt_really_long_message(self): + self._test_encryption(''.join(['abcd' for i in xrange(1024)])) def tearDown(self): super(XenAPIDiffieHellmanTestCase, self).tearDown() diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 4833ccb07..35308d95f 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -17,6 +17,7 @@ """Stubouts, mocks and fixtures for the test suite""" import eventlet +import json from nova.virt import xenapi_conn from nova.virt.xenapi import fake from nova.virt.xenapi import volume_utils @@ -37,7 +38,7 @@ def stubout_instance_snapshot(stubs): sr_ref=sr_ref, sharable=False) vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_uuid = vdi_rec['uuid'] - return vdi_uuid + return [dict(vdi_type='os', vdi_uuid=vdi_uuid)] stubs.Set(vm_utils.VMHelper, 'fetch_image', fake_fetch_image) @@ -132,11 +133,30 @@ class FakeSessionForVMTests(fake.SessionBase): def __init__(self, uri): super(FakeSessionForVMTests, self).__init__(uri) - def host_call_plugin(self, _1, _2, _3, _4, _5): + def host_call_plugin(self, _1, _2, plugin, method, _5): + sr_ref = fake.get_all('SR')[0] + vdi_ref = fake.create_vdi('', False, sr_ref, False) + vdi_rec = fake.get_record('VDI', vdi_ref) + if plugin == "glance" and method == "download_vhd": + ret_str = json.dumps([dict(vdi_type='os', + vdi_uuid=vdi_rec['uuid'])]) + else: + ret_str = vdi_rec['uuid'] + return '<string>%s</string>' % ret_str + + def host_call_plugin_swap(self, _1, _2, plugin, method, _5): sr_ref = fake.get_all('SR')[0] vdi_ref = fake.create_vdi('', False, sr_ref, False) vdi_rec = fake.get_record('VDI', vdi_ref) - return '<string>%s</string>' % vdi_rec['uuid'] + if plugin == "glance" and method == "download_vhd": + swap_vdi_ref = fake.create_vdi('', False, sr_ref, False) + swap_vdi_rec = fake.get_record('VDI', swap_vdi_ref) + ret_str = json.dumps( + [dict(vdi_type='os', vdi_uuid=vdi_rec['uuid']), + dict(vdi_type='swap', vdi_uuid=swap_vdi_rec['uuid'])]) + else: + ret_str = vdi_rec['uuid'] + return '<string>%s</string>' % ret_str def VM_start(self, _1, ref, _2, _3): vm = fake.get_record('VM', ref) diff --git a/nova/virt/libvirt.xml.template b/nova/virt/libvirt.xml.template index de2497a76..20986d4d5 100644 --- a/nova/virt/libvirt.xml.template +++ b/nova/virt/libvirt.xml.template @@ -116,7 +116,7 @@ </serial> #if $getVar('vncserver_host', False) - <graphics type='vnc' port='-1' autoport='yes' keymap='en-us' listen='${vncserver_host}'/> + <graphics type='vnc' port='-1' autoport='yes' keymap='${vnc_keymap}' listen='${vncserver_host}'/> #end if </devices> </domain> diff --git a/nova/virt/libvirt/connection.py b/nova/virt/libvirt/connection.py index dfeda3814..178ea0048 100644 --- a/nova/virt/libvirt/connection.py +++ b/nova/virt/libvirt/connection.py @@ -962,6 +962,7 @@ class LibvirtConnection(driver.ComputeDriver): if FLAGS.vnc_enabled: if FLAGS.libvirt_type != 'lxc': xml_info['vncserver_host'] = FLAGS.vncserver_host + xml_info['vnc_keymap'] = FLAGS.vnc_keymap if not rescue: if instance['kernel_id']: xml_info['kernel'] = xml_info['basepath'] + "/kernel" diff --git a/nova/virt/vmwareapi/vmops.py b/nova/virt/vmwareapi/vmops.py index c3e79a92f..6d7149841 100644 --- a/nova/virt/vmwareapi/vmops.py +++ b/nova/virt/vmwareapi/vmops.py @@ -590,11 +590,11 @@ class VMWareVMOps(object): def pause(self, instance, callback):
"""Pause a VM instance."""
- raise exception.APIError("pause not supported for vmwareapi")
+ raise exception.ApiError("pause not supported for vmwareapi")
def unpause(self, instance, callback):
"""Un-Pause a VM instance."""
- raise exception.APIError("unpause not supported for vmwareapi")
+ raise exception.ApiError("unpause not supported for vmwareapi")
def suspend(self, instance, callback):
"""Suspend the specified instance."""
@@ -673,7 +673,7 @@ class VMWareVMOps(object): def get_diagnostics(self, instance):
"""Return data about VM diagnostics."""
- raise exception.APIError("get_diagnostics not implemented for "
+ raise exception.ApiError("get_diagnostics not implemented for "
"vmwareapi")
def get_console_output(self, instance):
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py index e36ef3288..76988b172 100644 --- a/nova/virt/xenapi/fake.py +++ b/nova/virt/xenapi/fake.py @@ -159,7 +159,10 @@ def after_VBD_create(vbd_ref, vbd_rec): vbd_rec['device'] = '' vm_ref = vbd_rec['VM'] vm_rec = _db_content['VM'][vm_ref] - vm_rec['VBDs'] = [vbd_ref] + if vm_rec.get('VBDs', None): + vm_rec['VBDs'].append(vbd_ref) + else: + vm_rec['VBDs'] = [vbd_ref] vm_name_label = _db_content['VM'][vm_ref]['name_label'] vbd_rec['vm_name_label'] = vm_name_label diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index 9f6cd608c..06ee8ee9b 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -19,6 +19,7 @@ Helper methods for operations related to the management of VM records and their attributes like VDIs, VIFs, as well as their lookup functions. """ +import json import os import pickle import re @@ -376,6 +377,9 @@ class VMHelper(HelperBase): xenapi_image_service = ['glance', 'objectstore'] glance_address = 'address for glance services' glance_port = 'port for glance services' + + Returns: A single filename if image_type is KERNEL_RAMDISK + A list of dictionaries that describe VDIs, otherwise """ access = AuthManager().get_access_key(user, project) @@ -390,6 +394,10 @@ class VMHelper(HelperBase): @classmethod def _fetch_image_glance_vhd(cls, session, instance_id, image, access, image_type): + """Tell glance to download an image and put the VHDs into the SR + + Returns: A list of dictionaries that describe VDIs + """ LOG.debug(_("Asking xapi to fetch vhd image %(image)s") % locals()) @@ -408,18 +416,26 @@ class VMHelper(HelperBase): kwargs = {'params': pickle.dumps(params)} task = session.async_call_plugin('glance', 'download_vhd', kwargs) - vdi_uuid = session.wait_for_task(task, instance_id) + result = session.wait_for_task(task, instance_id) + # 'download_vhd' will return a json encoded string containing + # a list of dictionaries describing VDIs. The dictionary will + # contain 'vdi_type' and 'vdi_uuid' keys. 'vdi_type' can be + # 'os' or 'swap' right now. + vdis = json.loads(result) + for vdi in vdis: + LOG.debug(_("xapi 'download_vhd' returned VDI of " + "type '%(vdi_type)s' with UUID '%(vdi_uuid)s'" % vdi)) cls.scan_sr(session, instance_id, sr_ref) + # Pull out the UUID of the first VDI + vdi_uuid = vdis[0]['vdi_uuid'] # Set the name-label to ease debugging vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid) - name_label = get_name_label_for_image(image) - session.get_xenapi().VDI.set_name_label(vdi_ref, name_label) + primary_name_label = get_name_label_for_image(image) + session.get_xenapi().VDI.set_name_label(vdi_ref, primary_name_label) - LOG.debug(_("xapi 'download_vhd' returned VDI UUID %(vdi_uuid)s") - % locals()) - return vdi_uuid + return vdis @classmethod def _fetch_image_glance_disk(cls, session, instance_id, image, access, @@ -431,6 +447,8 @@ class VMHelper(HelperBase): plugin; instead, it streams the disks through domU to the VDI directly. + Returns: A single filename if image_type is KERNEL_RAMDISK + A list of dictionaries that describe VDIs, otherwise """ # FIXME(sirp): Since the Glance plugin seems to be required for the # VHD disk, it may be worth using the plugin for both VHD and RAW and @@ -476,7 +494,8 @@ class VMHelper(HelperBase): LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref) return filename else: - return session.get_xenapi().VDI.get_uuid(vdi_ref) + vdi_uuid = session.get_xenapi().VDI.get_uuid(vdi_ref) + return [dict(vdi_type='os', vdi_uuid=vdi_uuid)] @classmethod def determine_disk_image_type(cls, instance): @@ -535,6 +554,11 @@ class VMHelper(HelperBase): @classmethod def _fetch_image_glance(cls, session, instance_id, image, access, image_type): + """Fetch image from glance based on image type. + + Returns: A single filename if image_type is KERNEL_RAMDISK + A list of dictionaries that describe VDIs, otherwise + """ if image_type == ImageType.DISK_VHD: return cls._fetch_image_glance_vhd( session, instance_id, image, access, image_type) @@ -545,6 +569,11 @@ class VMHelper(HelperBase): @classmethod def _fetch_image_objectstore(cls, session, instance_id, image, access, secret, image_type): + """Fetch an image from objectstore. + + Returns: A single filename if image_type is KERNEL_RAMDISK + A list of dictionaries that describe VDIs, otherwise + """ url = images.image_url(image) LOG.debug(_("Asking xapi to fetch %(url)s as %(access)s") % locals()) if image_type == ImageType.KERNEL_RAMDISK: @@ -562,8 +591,10 @@ class VMHelper(HelperBase): if image_type == ImageType.DISK_RAW: args['raw'] = 'true' task = session.async_call_plugin('objectstore', fn, args) - uuid = session.wait_for_task(task, instance_id) - return uuid + uuid_or_fn = session.wait_for_task(task, instance_id) + if image_type != ImageType.KERNEL_RAMDISK: + return [dict(vdi_type='os', vdi_uuid=uuid_or_fn)] + return uuid_or_fn @classmethod def determine_is_pv(cls, session, instance_id, vdi_ref, disk_image_type, diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index be6ef48ea..2b3fb6a39 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -91,7 +91,8 @@ class VMOps(object): def finish_resize(self, instance, disk_info): vdi_uuid = self.link_disks(instance, disk_info['base_copy'], disk_info['cow']) - vm_ref = self._create_vm(instance, vdi_uuid) + vm_ref = self._create_vm(instance, + [dict(vdi_type='os', vdi_uuid=vdi_uuid)]) self.resize_instance(instance, vdi_uuid) self._spawn(instance, vm_ref) @@ -105,24 +106,25 @@ class VMOps(object): LOG.debug(_("Starting instance %s"), instance.name) self._session.call_xenapi('VM.start', vm_ref, False, False) - def _create_disk(self, instance): + def _create_disks(self, instance): user = AuthManager().get_user(instance.user_id) project = AuthManager().get_project(instance.project_id) disk_image_type = VMHelper.determine_disk_image_type(instance) - vdi_uuid = VMHelper.fetch_image(self._session, instance.id, - instance.image_id, user, project, disk_image_type) - return vdi_uuid + vdis = VMHelper.fetch_image(self._session, + instance.id, instance.image_id, user, project, + disk_image_type) + return vdis def spawn(self, instance, network_info=None): - vdi_uuid = self._create_disk(instance) - vm_ref = self._create_vm(instance, vdi_uuid, network_info) + vdis = self._create_disks(instance) + vm_ref = self._create_vm(instance, vdis, network_info) self._spawn(instance, vm_ref) def spawn_rescue(self, instance): """Spawn a rescue instance.""" self.spawn(instance) - def _create_vm(self, instance, vdi_uuid, network_info=None): + def _create_vm(self, instance, vdis, network_info=None): """Create VM instance.""" instance_name = instance.name vm_ref = VMHelper.lookup(self._session, instance_name) @@ -141,28 +143,43 @@ class VMOps(object): user = AuthManager().get_user(instance.user_id) project = AuthManager().get_project(instance.project_id) - # Are we building from a pre-existing disk? - vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid) - disk_image_type = VMHelper.determine_disk_image_type(instance) kernel = None if instance.kernel_id: kernel = VMHelper.fetch_image(self._session, instance.id, - instance.kernel_id, user, project, ImageType.KERNEL_RAMDISK) + instance.kernel_id, user, project, + ImageType.KERNEL_RAMDISK) ramdisk = None if instance.ramdisk_id: ramdisk = VMHelper.fetch_image(self._session, instance.id, - instance.ramdisk_id, user, project, ImageType.KERNEL_RAMDISK) - - use_pv_kernel = VMHelper.determine_is_pv(self._session, instance.id, - vdi_ref, disk_image_type, instance.os_type) - vm_ref = VMHelper.create_vm(self._session, instance, kernel, ramdisk, - use_pv_kernel) - + instance.ramdisk_id, user, project, + ImageType.KERNEL_RAMDISK) + + # Create the VM ref and attach the first disk + first_vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', + vdis[0]['vdi_uuid']) + use_pv_kernel = VMHelper.determine_is_pv(self._session, + instance.id, first_vdi_ref, disk_image_type, + instance.os_type) + vm_ref = VMHelper.create_vm(self._session, instance, + kernel, ramdisk, use_pv_kernel) VMHelper.create_vbd(session=self._session, vm_ref=vm_ref, - vdi_ref=vdi_ref, userdevice=0, bootable=True) + vdi_ref=first_vdi_ref, userdevice=0, bootable=True) + + # Attach any other disks + # userdevice 1 is reserved for rescue + userdevice = 2 + for vdi in vdis[1:]: + # vdi['vdi_type'] is either 'os' or 'swap', but we don't + # really care what it is right here. + vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', + vdi['vdi_uuid']) + VMHelper.create_vbd(session=self._session, vm_ref=vm_ref, + vdi_ref=vdi_ref, userdevice=userdevice, + bootable=False) + userdevice += 1 # TODO(tr3buchet) - check to make sure we have network info, otherwise # create it now. This goes away once nova-multi-nic hits. @@ -172,7 +189,7 @@ class VMOps(object): # Alter the image before VM start for, e.g. network injection if FLAGS.xenapi_inject_image: VMHelper.preconfigure_instance(self._session, instance, - vdi_ref, network_info) + first_vdi_ref, network_info) self.create_vifs(vm_ref, network_info) self.inject_network_info(instance, network_info, vm_ref) @@ -1173,26 +1190,22 @@ class SimpleDH(object): mpi = M2Crypto.m2.bn_to_mpi(bn) return mpi - def _run_ssl(self, text, which): - base_cmd = ('openssl enc -aes-128-cbc -a -pass pass:%(shared)s ' - '-nosalt %(dec_flag)s') - if which.lower()[0] == 'd': - dec_flag = ' -d' - else: - dec_flag = '' - shared = self._shared - cmd = base_cmd % locals() - proc = _runproc(cmd) - proc.stdin.write(text + '\n') + def _run_ssl(self, text, extra_args=None): + if not extra_args: + extra_args = '' + cmd = 'enc -aes-128-cbc -A -a -pass pass:%s -nosalt %s' % ( + self._shared, extra_args) + proc = _runproc('openssl %s' % cmd) + proc.stdin.write(text) proc.stdin.close() proc.wait() err = proc.stderr.read() if err: raise RuntimeError(_('OpenSSL error: %s') % err) - return proc.stdout.read().strip('\n') + return proc.stdout.read() def encrypt(self, text): - return self._run_ssl(text, 'enc') + return self._run_ssl(text).strip('\n') def decrypt(self, text): - return self._run_ssl(text, 'dec') + return self._run_ssl(text, '-d') diff --git a/nova/vnc/__init__.py b/nova/vnc/__init__.py index b5b00e44e..859bfd65f 100644 --- a/nova/vnc/__init__.py +++ b/nova/vnc/__init__.py @@ -32,3 +32,5 @@ flags.DEFINE_string('vncserver_host', '0.0.0.0', 'the host interface on which vnc server should listen') flags.DEFINE_bool('vnc_enabled', True, 'enable vnc related features') +flags.DEFINE_string('vnc_keymap', 'en-us', + 'keymap for vnc') diff --git a/nova/volume/api.py b/nova/volume/api.py index 09befb647..5804955f7 100644 --- a/nova/volume/api.py +++ b/nova/volume/api.py @@ -39,7 +39,14 @@ LOG = logging.getLogger('nova.volume') class API(base.Base): """API for interacting with the volume manager.""" - def create(self, context, size, name, description): + def create(self, context, size, snapshot_id, name, description): + if snapshot_id != None: + snapshot = self.get_snapshot(context, snapshot_id) + if snapshot['status'] != "available": + raise exception.ApiError( + _("Snapshot status must be available")) + size = snapshot['volume_size'] + if quota.allowed_volumes(context, 1, size) < 1: pid = context.project_id LOG.warn(_("Quota exceeeded for %(pid)s, tried to create" @@ -51,6 +58,7 @@ class API(base.Base): 'size': size, 'user_id': context.user_id, 'project_id': context.project_id, + 'snapshot_id': snapshot_id, 'availability_zone': FLAGS.storage_availability_zone, 'status': "creating", 'attach_status': "detached", @@ -62,7 +70,8 @@ class API(base.Base): FLAGS.scheduler_topic, {"method": "create_volume", "args": {"topic": FLAGS.volume_topic, - "volume_id": volume['id']}}) + "volume_id": volume['id'], + "snapshot_id": snapshot_id}}) return volume def delete(self, context, volume_id): @@ -90,6 +99,15 @@ class API(base.Base): return self.db.volume_get_all(context) return self.db.volume_get_all_by_project(context, context.project_id) + def get_snapshot(self, context, snapshot_id): + rv = self.db.snapshot_get(context, snapshot_id) + return dict(rv.iteritems()) + + def get_all_snapshots(self, context): + if context.is_admin: + return self.db.snapshot_get_all(context) + return self.db.snapshot_get_all_by_project(context, context.project_id) + def check_attach(self, context, volume_id): volume = self.get(context, volume_id) # TODO(vish): abstract status checking? @@ -110,3 +128,38 @@ class API(base.Base): self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "remove_volume", "args": {'volume_id': volume_id}}) + + def create_snapshot(self, context, volume_id, name, description): + volume = self.get(context, volume_id) + if volume['status'] != "available": + raise exception.ApiError(_("Volume status must be available")) + + options = { + 'volume_id': volume_id, + 'user_id': context.user_id, + 'project_id': context.project_id, + 'status': "creating", + 'progress': '0%', + 'volume_size': volume['size'], + 'display_name': name, + 'display_description': description} + + snapshot = self.db.snapshot_create(context, options) + rpc.cast(context, + FLAGS.scheduler_topic, + {"method": "create_snapshot", + "args": {"topic": FLAGS.volume_topic, + "volume_id": volume_id, + "snapshot_id": snapshot['id']}}) + return snapshot + + def delete_snapshot(self, context, snapshot_id): + snapshot = self.get_snapshot(context, snapshot_id) + if snapshot['status'] != "available": + raise exception.ApiError(_("Snapshot status must be available")) + self.db.snapshot_update(context, snapshot_id, {'status': 'deleting'}) + rpc.cast(context, + FLAGS.scheduler_topic, + {"method": "delete_snapshot", + "args": {"topic": FLAGS.volume_topic, + "snapshot_id": snapshot_id}}) diff --git a/nova/volume/driver.py b/nova/volume/driver.py index 55307ad9b..87e13277f 100644 --- a/nova/volume/driver.py +++ b/nova/volume/driver.py @@ -90,42 +90,97 @@ class VolumeDriver(object): raise exception.Error(_("volume group %s doesn't exist") % FLAGS.volume_group) - def create_volume(self, volume): - """Creates a logical volume. Can optionally return a Dictionary of - changes to the volume object to be persisted.""" - if int(volume['size']) == 0: - sizestr = '100M' - else: - sizestr = '%sG' % volume['size'] + def _create_volume(self, volume_name, sizestr): self._try_execute('sudo', 'lvcreate', '-L', sizestr, '-n', - volume['name'], - FLAGS.volume_group) + volume_name, FLAGS.volume_group) - def delete_volume(self, volume): - """Deletes a logical volume.""" + def _copy_volume(self, srcstr, deststr, size_in_g): + self._execute('sudo', 'dd', 'if=%s' % srcstr, 'of=%s' % deststr, + 'count=%d' % (size_in_g * 1024), 'bs=1M') + + def _volume_not_present(self, volume_name): + path_name = '%s/%s' % (FLAGS.volume_group, volume_name) try: - self._try_execute('sudo', 'lvdisplay', - '%s/%s' % - (FLAGS.volume_group, - volume['name'])) + self._try_execute('sudo', 'lvdisplay', path_name) except Exception as e: - # If the volume isn't present, then don't attempt to delete + # If the volume isn't present return True + return False + def _delete_volume(self, volume, size_in_g): + """Deletes a logical volume.""" # zero out old volumes to prevent data leaking between users # TODO(ja): reclaiming space should be done lazy and low priority - self._execute('sudo', 'dd', 'if=/dev/zero', - 'of=%s' % self.local_path(volume), - 'count=%d' % (volume['size'] * 1024), - 'bs=1M') + self._copy_volume('/dev/zero', self.local_path(volume), size_in_g) self._try_execute('sudo', 'lvremove', '-f', "%s/%s" % (FLAGS.volume_group, - volume['name'])) + self._escape_snapshot(volume['name']))) + + def _sizestr(self, size_in_g): + if int(size_in_g) == 0: + return '100M' + return '%sG' % size_in_g + + # Linux LVM reserves name that starts with snapshot, so that + # such volume name can't be created. Mangle it. + def _escape_snapshot(self, snapshot_name): + if not snapshot_name.startswith('snapshot'): + return snapshot_name + return '_' + snapshot_name + + def create_volume(self, volume): + """Creates a logical volume. Can optionally return a Dictionary of + changes to the volume object to be persisted.""" + self._create_volume(volume['name'], self._sizestr(volume['size'])) + + def create_volume_from_snapshot(self, volume, snapshot): + """Creates a volume from a snapshot.""" + self._create_volume(volume['name'], self._sizestr(volume['size'])) + self._copy_volume(self.local_path(snapshot), self.local_path(volume), + snapshot['volume_size']) + + def delete_volume(self, volume): + """Deletes a logical volume.""" + if self._volume_not_present(volume['name']): + # If the volume isn't present, then don't attempt to delete + return True + + # TODO(yamahata): lvm can't delete origin volume only without + # deleting derived snapshots. Can we do something fancy? + out, err = self._execute('sudo', 'lvdisplay', '--noheading', + '-C', '-o', 'Attr', + '%s/%s' % (FLAGS.volume_group, + volume['name'])) + # fake_execute returns None resulting unit test error + if out: + out = out.strip() + if (out[0] == 'o') or (out[0] == 'O'): + raise exception.VolumeIsBusy(volume_name=volume['name']) + + self._delete_volume(volume, volume['size']) + + def create_snapshot(self, snapshot): + """Creates a snapshot.""" + orig_lv_name = "%s/%s" % (FLAGS.volume_group, snapshot['volume_name']) + self._try_execute('sudo', 'lvcreate', '-L', + self._sizestr(snapshot['volume_size']), + '--name', self._escape_snapshot(snapshot['name']), + '--snapshot', orig_lv_name) + + def delete_snapshot(self, snapshot): + """Deletes a snapshot.""" + if self._volume_not_present(self._escape_snapshot(snapshot['name'])): + # If the snapshot isn't present, then don't attempt to delete + return True + + # TODO(yamahata): zeroing out the whole snapshot triggers COW. + # it's quite slow. + self._delete_volume(snapshot, snapshot['volume_size']) def local_path(self, volume): # NOTE(vish): stops deprecation warning escaped_group = FLAGS.volume_group.replace('-', '--') - escaped_name = volume['name'].replace('-', '--') + escaped_name = self._escape_snapshot(volume['name']).replace('-', '--') return "/dev/mapper/%s-%s" % (escaped_group, escaped_name) def ensure_export(self, context, volume): @@ -559,6 +614,18 @@ class RBDDriver(VolumeDriver): self._try_execute('rbd', '--pool', FLAGS.rbd_pool, 'rm', volume['name']) + def create_snapshot(self, snapshot): + """Creates an rbd snapshot""" + self._try_execute('rbd', '--pool', FLAGS.rbd_pool, + 'snap', 'create', '--snap', snapshot['name'], + snapshot['volume_name']) + + def delete_snapshot(self, snapshot): + """Deletes an rbd snapshot""" + self._try_execute('rbd', '--pool', FLAGS.rbd_pool, + 'snap', 'rm', '--snap', snapshot['name'], + snapshot['volume_name']) + def local_path(self, volume): """Returns the path of the rbd volume.""" # This is the same as the remote path @@ -600,18 +667,31 @@ class SheepdogDriver(VolumeDriver): def create_volume(self, volume): """Creates a sheepdog volume""" - if int(volume['size']) == 0: - sizestr = '100M' - else: - sizestr = '%sG' % volume['size'] self._try_execute('qemu-img', 'create', "sheepdog:%s" % volume['name'], - sizestr) + self._sizestr(volume['size'])) + + def create_volume_from_snapshot(self, volume, snapshot): + """Creates a sheepdog volume from a snapshot.""" + self._try_execute('qemu-img', 'create', '-b', + "sheepdog:%s:%s" % (snapshot['volume_name'], + snapshot['name']), + "sheepdog:%s" % volume['name']) def delete_volume(self, volume): """Deletes a logical volume""" self._try_execute('collie', 'vdi', 'delete', volume['name']) + def create_snapshot(self, snapshot): + """Creates a sheepdog snapshot""" + self._try_execute('qemu-img', 'snapshot', '-c', snapshot['name'], + "sheepdog:%s" % snapshot['volume_name']) + + def delete_snapshot(self, snapshot): + """Deletes a sheepdog snapshot""" + self._try_execute('collie', 'vdi', 'delete', snapshot['volume_name'], + '-s', snapshot['name']) + def local_path(self, volume): return "sheepdog:%s" % volume['name'] diff --git a/nova/volume/manager.py b/nova/volume/manager.py index 2178389ce..ff53f0701 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -90,7 +90,7 @@ class VolumeManager(manager.SchedulerDependentManager): else: LOG.info(_("volume %s: skipping export"), volume['name']) - def create_volume(self, context, volume_id): + def create_volume(self, context, volume_id, snapshot_id=None): """Creates and exports the volume.""" context = context.elevated() volume_ref = self.db.volume_get(context, volume_id) @@ -108,7 +108,13 @@ class VolumeManager(manager.SchedulerDependentManager): vol_size = volume_ref['size'] LOG.debug(_("volume %(vol_name)s: creating lv of" " size %(vol_size)sG") % locals()) - model_update = self.driver.create_volume(volume_ref) + if snapshot_id == None: + model_update = self.driver.create_volume(volume_ref) + else: + snapshot_ref = self.db.snapshot_get(context, snapshot_id) + model_update = self.driver.create_volume_from_snapshot( + volume_ref, + snapshot_ref) if model_update: self.db.volume_update(context, volume_ref['id'], model_update) @@ -142,6 +148,12 @@ class VolumeManager(manager.SchedulerDependentManager): self.driver.remove_export(context, volume_ref) LOG.debug(_("volume %s: deleting"), volume_ref['name']) self.driver.delete_volume(volume_ref) + except exception.VolumeIsBusy, e: + LOG.debug(_("volume %s: volume is busy"), volume_ref['name']) + self.driver.ensure_export(context, volume_ref) + self.db.volume_update(context, volume_ref['id'], + {'status': 'available'}) + return True except Exception: self.db.volume_update(context, volume_ref['id'], @@ -152,6 +164,49 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.debug(_("volume %s: deleted successfully"), volume_ref['name']) return True + def create_snapshot(self, context, volume_id, snapshot_id): + """Creates and exports the snapshot.""" + context = context.elevated() + snapshot_ref = self.db.snapshot_get(context, snapshot_id) + LOG.info(_("snapshot %s: creating"), snapshot_ref['name']) + + try: + snap_name = snapshot_ref['name'] + LOG.debug(_("snapshot %(snap_name)s: creating") % locals()) + model_update = self.driver.create_snapshot(snapshot_ref) + if model_update: + self.db.snapshot_update(context, snapshot_ref['id'], + model_update) + + except Exception: + self.db.snapshot_update(context, + snapshot_ref['id'], {'status': 'error'}) + raise + + self.db.snapshot_update(context, + snapshot_ref['id'], {'status': 'available', + 'progress': '100%'}) + LOG.debug(_("snapshot %s: created successfully"), snapshot_ref['name']) + return snapshot_id + + def delete_snapshot(self, context, snapshot_id): + """Deletes and unexports snapshot.""" + context = context.elevated() + snapshot_ref = self.db.snapshot_get(context, snapshot_id) + + try: + LOG.debug(_("snapshot %s: deleting"), snapshot_ref['name']) + self.driver.delete_snapshot(snapshot_ref) + except Exception: + self.db.snapshot_update(context, + snapshot_ref['id'], + {'status': 'error_deleting'}) + raise + + self.db.snapshot_destroy(context, snapshot_id) + LOG.debug(_("snapshot %s: deleted successfully"), snapshot_ref['name']) + return True + def setup_compute_volume(self, context, volume_id): """Setup remote volume on compute host. |
