diff options
| author | Soren Hansen <soren.hansen@rackspace.com> | 2010-12-15 11:54:00 +0100 |
|---|---|---|
| committer | Soren Hansen <soren.hansen@rackspace.com> | 2010-12-15 11:54:00 +0100 |
| commit | bb0adb5f3309e9502f1afc34224d7194485f0124 (patch) | |
| tree | 549d7383e62ad2fdbd7c0222f44aa8d3c25b48bb /nova | |
| parent | bf34529e75022451f3833552df0e807139d0e498 (diff) | |
| parent | 0a93a9298dda075b5519e71289d0bac6fb461404 (diff) | |
Merge trunk
Diffstat (limited to 'nova')
29 files changed, 1328 insertions, 788 deletions
diff --git a/nova/api/cloud.py b/nova/api/cloud.py deleted file mode 100644 index b8f15019f..000000000 --- a/nova/api/cloud.py +++ /dev/null @@ -1,58 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Methods for API calls to control instances via AMQP. -""" - - -from nova import db -from nova import flags -from nova import rpc - -FLAGS = flags.FLAGS - - -def reboot(instance_id, context=None): - """Reboot the given instance.""" - instance_ref = db.instance_get_by_internal_id(context, instance_id) - host = instance_ref['host'] - rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "reboot_instance", - "args": {"instance_id": instance_ref['id']}}) - - -def rescue(instance_id, context): - """Rescue the given instance.""" - instance_ref = db.instance_get_by_internal_id(context, instance_id) - host = instance_ref['host'] - rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "rescue_instance", - "args": {"instance_id": instance_ref['id']}}) - - -def unrescue(instance_id, context): - """Unrescue the given instance.""" - instance_ref = db.instance_get_by_internal_id(context, instance_id) - host = instance_ref['host'] - rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "unrescue_instance", - "args": {"instance_id": instance_ref['id']}}) diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index e50906ae1..ebb13aedc 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -41,8 +41,6 @@ from nova import rpc from nova import utils from nova.compute import api as compute_api from nova.compute import instance_types -from nova.api import cloud -from nova.image.s3 import S3ImageService FLAGS = flags.FLAGS @@ -95,8 +93,9 @@ class CloudController(object): """ def __init__(self): self.network_manager = utils.import_object(FLAGS.network_manager) - self.compute_api = compute_api.ComputeAPI() - self.image_service = S3ImageService() + self.image_service = utils.import_object(FLAGS.image_service) + self.compute_api = compute_api.ComputeAPI(self.network_manager, + self.image_service) self.setup() def __str__(self): @@ -120,7 +119,7 @@ class CloudController(object): def _get_mpi_data(self, context, project_id): result = {} - for instance in db.instance_get_all_by_project(context, project_id): + for instance in self.compute_api.get_instances(context, project_id): if instance['fixed_ip']: line = '%s slots=%d' % (instance['fixed_ip']['address'], instance['vcpus']) @@ -439,7 +438,7 @@ class CloudController(object): # instance_id is passed in as a list of instances ec2_id = instance_id[0] internal_id = ec2_id_to_internal_id(ec2_id) - instance_ref = db.instance_get_by_internal_id(context, internal_id) + instance_ref = self.compute_api.get_instance(context, internal_id) output = rpc.call(context, '%s.%s' % (FLAGS.compute_topic, instance_ref['host']), @@ -451,13 +450,15 @@ class CloudController(object): "Timestamp": now, "output": base64.b64encode(output)} - def describe_volumes(self, context, **kwargs): + def describe_volumes(self, context, volume_id=None, **kwargs): if context.user.is_admin(): volumes = db.volume_get_all(context) else: volumes = db.volume_get_all_by_project(context, context.project_id) - volumes = [self._format_volume(context, v) for v in volumes] + # NOTE(vish): volume_id is an optional list of volume ids to filter by. + volumes = [self._format_volume(context, v) for v in volumes + if volume_id is None or v['ec2_id'] in volume_id] return {'volumeSet': volumes} @@ -536,7 +537,7 @@ class CloudController(object): if volume_ref['attach_status'] == "attached": raise exception.ApiError("Volume is already attached") internal_id = ec2_id_to_internal_id(instance_id) - instance_ref = db.instance_get_by_internal_id(context, internal_id) + instance_ref = self.compute_api.get_instance(context, internal_id) host = instance_ref['host'] rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, host), @@ -614,11 +615,7 @@ class CloudController(object): instances = db.instance_get_all_by_reservation(context, reservation_id) else: - if context.user.is_admin(): - instances = db.instance_get_all(context) - else: - instances = db.instance_get_all_by_project(context, - context.project_id) + instances = self.compute_api.get_instances(context) for instance in instances: if not context.user.is_admin(): if instance['image_id'] == FLAGS.vpn_image_id: @@ -715,7 +712,7 @@ class CloudController(object): def associate_address(self, context, instance_id, public_ip, **kwargs): internal_id = ec2_id_to_internal_id(instance_id) - instance_ref = db.instance_get_by_internal_id(context, internal_id) + instance_ref = self.compute_api.get_instance(context, internal_id) fixed_address = db.instance_get_fixed_address(context, instance_ref['id']) floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) @@ -751,16 +748,13 @@ class CloudController(object): max_count = int(kwargs.get('max_count', 1)) instances = self.compute_api.create_instances(context, instance_types.get_by_type(kwargs.get('instance_type', None)), - self.image_service, kwargs['image_id'], - self._get_network_topic(context), min_count=int(kwargs.get('min_count', max_count)), max_count=max_count, kernel_id=kwargs.get('kernel_id'), ramdisk_id=kwargs.get('ramdisk_id'), - name=kwargs.get('display_name'), + display_name=kwargs.get('display_name'), description=kwargs.get('display_description'), - user_data=kwargs.get('user_data', ''), key_name=kwargs.get('key_name'), security_group=kwargs.get('security_group'), generate_hostname=internal_id_to_ec2_id) @@ -769,84 +763,30 @@ class CloudController(object): def terminate_instances(self, context, instance_id, **kwargs): """Terminate each instance in instance_id, which is a list of ec2 ids. - - instance_id is a kwarg so its name cannot be modified. - """ - ec2_id_list = instance_id + instance_id is a kwarg so its name cannot be modified.""" logging.debug("Going to start terminating instances") - for id_str in ec2_id_list: - internal_id = ec2_id_to_internal_id(id_str) - logging.debug("Going to try and terminate %s" % id_str) - try: - instance_ref = db.instance_get_by_internal_id(context, - internal_id) - except exception.NotFound: - logging.warning("Instance %s was not found during terminate", - id_str) - continue - - if (instance_ref['state_description'] == 'terminating'): - logging.warning("Instance %s is already being terminated", - id_str) - continue - now = datetime.datetime.utcnow() - self.compute_api.update_instance(context, - instance_ref['id'], - state_description='terminating', - state=0, - terminated_at=now) - - # FIXME(ja): where should network deallocate occur? - address = db.instance_get_floating_address(context, - instance_ref['id']) - if address: - logging.debug("Disassociating address %s" % address) - # NOTE(vish): Right now we don't really care if the ip is - # disassociated. We may need to worry about - # checking this later. Perhaps in the scheduler? - network_topic = self._get_network_topic(context) - rpc.cast(context, - network_topic, - {"method": "disassociate_floating_ip", - "args": {"floating_address": address}}) - - address = db.instance_get_fixed_address(context, - instance_ref['id']) - if address: - logging.debug("Deallocating address %s" % address) - # NOTE(vish): Currently, nothing needs to be done on the - # network node until release. If this changes, - # we will need to cast here. - self.network_manager.deallocate_fixed_ip(context.elevated(), - address) - - host = instance_ref['host'] - if host: - rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "terminate_instance", - "args": {"instance_id": instance_ref['id']}}) - else: - db.instance_destroy(context, instance_ref['id']) + for ec2_id in instance_id: + internal_id = ec2_id_to_internal_id(ec2_id) + self.compute_api.delete_instance(context, internal_id) return True def reboot_instances(self, context, instance_id, **kwargs): """instance_id is a list of instance ids""" for ec2_id in instance_id: internal_id = ec2_id_to_internal_id(ec2_id) - cloud.reboot(internal_id, context=context) + self.compute_api.reboot(context, internal_id) return True def rescue_instance(self, context, instance_id, **kwargs): """This is an extension to the normal ec2_api""" internal_id = ec2_id_to_internal_id(instance_id) - cloud.rescue(internal_id, context=context) + self.compute_api.rescue(context, internal_id) return True def unrescue_instance(self, context, instance_id, **kwargs): """This is an extension to the normal ec2_api""" internal_id = ec2_id_to_internal_id(instance_id) - cloud.unrescue(internal_id, context=context) + self.compute_api.unrescue(context, internal_id) return True def update_instance(self, context, ec2_id, **kwargs): @@ -857,7 +797,7 @@ class CloudController(object): changes[field] = kwargs[field] if changes: internal_id = ec2_id_to_internal_id(ec2_id) - inst = db.instance_get_by_internal_id(context, internal_id) + inst = self.compute_api.get_instance(context, internal_id) db.instance_update(context, inst['id'], kwargs) return True diff --git a/nova/api/openstack/__init__.py b/nova/api/openstack/__init__.py index 4ca108c4e..b9ecbd9b8 100644 --- a/nova/api/openstack/__init__.py +++ b/nova/api/openstack/__init__.py @@ -30,6 +30,7 @@ import webob.dec import webob.exc import webob +from nova import context from nova import flags from nova import utils from nova import wsgi @@ -48,6 +49,10 @@ flags.DEFINE_string('nova_api_auth', 'nova.api.openstack.auth.BasicApiAuthManager', 'The auth mechanism to use for the OpenStack API implemenation') +flags.DEFINE_bool('allow_admin_api', + False, + 'When True, this API service will accept admin operations.') + class API(wsgi.Middleware): """WSGI entry point for all OpenStack API requests.""" @@ -84,9 +89,7 @@ class AuthMiddleware(wsgi.Middleware): if not user: return faults.Fault(webob.exc.HTTPUnauthorized()) - if 'nova.context' not in req.environ: - req.environ['nova.context'] = {} - req.environ['nova.context']['user'] = user + req.environ['nova.context'] = context.RequestContext(user, user) return self.application @@ -121,12 +124,12 @@ class RateLimitingMiddleware(wsgi.Middleware): If the request should be rate limited, return a 413 status with a Retry-After header giving the time when the request would succeed. """ - user_id = req.environ['nova.context']['user']['id'] action_name = self.get_action_name(req) if not action_name: # Not rate limited return self.application - delay = self.get_delay(action_name, user_id) + delay = self.get_delay(action_name, + req.environ['nova.context'].user_id) if delay: # TODO(gundlach): Get the retry-after format correct. exc = webob.exc.HTTPRequestEntityTooLarge( @@ -183,6 +186,10 @@ class APIRouter(wsgi.Router): mapper.resource("sharedipgroup", "sharedipgroups", controller=sharedipgroups.Controller()) + if FLAGS.allow_admin_api: + logging.debug("Including admin operations in API.") + # TODO: Place routes for admin operations here. + super(APIRouter, self).__init__(mapper) diff --git a/nova/api/openstack/auth.py b/nova/api/openstack/auth.py index 205035915..fcda97ab1 100644 --- a/nova/api/openstack/auth.py +++ b/nova/api/openstack/auth.py @@ -74,9 +74,7 @@ class BasicApiAuthManager(object): if delta.days >= 2: self.db.auth_destroy_token(self.context, token) else: - #TODO(gundlach): Why not just return dict(id=token.user_id)? - user = self.auth.get_user(token.user_id) - return {'id': user.id} + return self.auth.get_user(token.user_id) return None def _authorize_user(self, username, key, req): diff --git a/nova/api/openstack/images.py b/nova/api/openstack/images.py index cdbdc9bdd..4a0a8e6f1 100644 --- a/nova/api/openstack/images.py +++ b/nova/api/openstack/images.py @@ -17,7 +17,6 @@ from webob import exc -from nova import context from nova import flags from nova import utils from nova import wsgi @@ -47,10 +46,8 @@ class Controller(wsgi.Controller): def detail(self, req): """Return all public images in detail.""" - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) try: - images = self._service.detail(ctxt) + images = self._service.detail(req.environ['nova.context']) images = nova.api.openstack.limited(images, req) except NotImplementedError: # Emulate detail() using repeated calls to show() @@ -61,9 +58,7 @@ class Controller(wsgi.Controller): def show(self, req, id): """Return data about the given image id.""" - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) - return dict(image=self._service.show(ctxt, id)) + return dict(image=self._service.show(req.environ['nova.context'], id)) def delete(self, req, id): # Only public images are supported for now. diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py index e7f765c02..7704f48f1 100644 --- a/nova/api/openstack/servers.py +++ b/nova/api/openstack/servers.py @@ -15,23 +15,16 @@ # License for the specific language governing permissions and limitations # under the License. -import webob from webob import exc -from nova import flags -from nova import rpc -from nova import utils +from nova import exception from nova import wsgi -from nova import context -from nova.api import cloud from nova.api.openstack import faults +from nova.auth import manager as auth_manager from nova.compute import api as compute_api from nova.compute import instance_types from nova.compute import power_state import nova.api.openstack -import nova.image.service - -FLAGS = flags.FLAGS def _entity_list(entities): @@ -79,11 +72,7 @@ class Controller(wsgi.Controller): "server": ["id", "imageId", "name", "flavorId", "hostId", "status", "progress"]}}} - def __init__(self, db_driver=None): - if not db_driver: - db_driver = FLAGS.db_driver - self.db_driver = utils.import_object(db_driver) - self.network_manager = utils.import_object(FLAGS.network_manager) + def __init__(self): self.compute_api = compute_api.ComputeAPI() super(Controller, self).__init__() @@ -100,32 +89,29 @@ class Controller(wsgi.Controller): entity_maker - either _entity_detail or _entity_inst """ - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) - instance_list = self.db_driver.instance_get_all_by_user(ctxt, user_id) + instance_list = self.compute_api.get_instances( + req.environ['nova.context']) limited_list = nova.api.openstack.limited(instance_list, req) res = [entity_maker(inst)['server'] for inst in limited_list] return _entity_list(res) def show(self, req, id): """ Returns server details by server id """ - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) - inst = self.db_driver.instance_get_by_internal_id(ctxt, int(id)) - if inst: - if inst.user_id == user_id: - return _entity_detail(inst) - raise faults.Fault(exc.HTTPNotFound()) + try: + instance = self.compute_api.get_instance( + req.environ['nova.context'], int(id)) + return _entity_detail(instance) + except exception.NotFound: + return faults.Fault(exc.HTTPNotFound()) def delete(self, req, id): """ Destroys a server """ - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) - instance = self.db_driver.instance_get_by_internal_id(ctxt, int(id)) - if instance and instance['user_id'] == user_id: - self.db_driver.instance_destroy(ctxt, id) - return faults.Fault(exc.HTTPAccepted()) - return faults.Fault(exc.HTTPNotFound()) + try: + self.compute_api.delete_instance(req.environ['nova.context'], + int(id)) + except exception.NotFound: + return faults.Fault(exc.HTTPNotFound()) + return exc.HTTPAccepted() def create(self, req): """ Creates a new server for a given user """ @@ -133,15 +119,13 @@ class Controller(wsgi.Controller): if not env: return faults.Fault(exc.HTTPUnprocessableEntity()) - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) - key_pair = self.db_driver.key_pair_get_all_by_user(None, user_id)[0] - instances = self.compute_api.create_instances(ctxt, + key_pair = auth_manager.AuthManager.get_key_pairs( + req.environ['nova.context'])[0] + instances = self.compute_api.create_instances( + req.environ['nova.context'], instance_types.get_by_flavor_id(env['server']['flavorId']), - utils.import_object(FLAGS.image_service), env['server']['imageId'], - self._get_network_topic(ctxt), - name=env['server']['name'], + display_name=env['server']['name'], description=env['server']['name'], key_name=key_pair['name'], key_data=key_pair['public_key']) @@ -149,51 +133,36 @@ class Controller(wsgi.Controller): def update(self, req, id): """ Updates the server name or password """ - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) - inst_dict = self._deserialize(req.body, req) - if not inst_dict: return faults.Fault(exc.HTTPUnprocessableEntity()) - instance = self.db_driver.instance_get_by_internal_id(ctxt, int(id)) - if not instance or instance.user_id != user_id: - return faults.Fault(exc.HTTPNotFound()) - update_dict = {} if 'adminPass' in inst_dict['server']: update_dict['admin_pass'] = inst_dict['server']['adminPass'] if 'name' in inst_dict['server']: update_dict['display_name'] = inst_dict['server']['name'] - self.compute_api.update_instance(ctxt, instance['id'], update_dict) + try: + self.compute_api.update_instance(req.environ['nova.context'], + instance['id'], + **update_dict) + except exception.NotFound: + return faults.Fault(exc.HTTPNotFound()) return exc.HTTPNoContent() def action(self, req, id): - """ multi-purpose method used to reboot, rebuild, and + """ Multi-purpose method used to reboot, rebuild, and resize a server """ - user_id = req.environ['nova.context']['user']['id'] - ctxt = context.RequestContext(user_id, user_id) input_dict = self._deserialize(req.body, req) try: reboot_type = input_dict['reboot']['type'] except Exception: - raise faults.Fault(webob.exc.HTTPNotImplemented()) - inst_ref = self.db.instance_get_by_internal_id(ctxt, int(id)) - if not inst_ref or (inst_ref and not inst_ref.user_id == user_id): + raise faults.Fault(exc.HTTPNotImplemented()) + try: + # TODO(gundlach): pass reboot_type, support soft reboot in + # virt driver + self.compute_api.reboot(req.environ['nova.context'], id) + except: return faults.Fault(exc.HTTPUnprocessableEntity()) - #TODO(gundlach): pass reboot_type, support soft reboot in - #virt driver - cloud.reboot(id) - - def _get_network_topic(self, context): - """Retrieves the network host for a project""" - network_ref = self.network_manager.get_network(context) - host = network_ref['host'] - if not host: - host = rpc.call(context, - FLAGS.network_topic, - {"method": "set_network_host", - "args": {"network_id": network_ref['id']}}) - return self.db_driver.queue_get_for(context, FLAGS.network_topic, host) + return exc.HTTPAccepted() diff --git a/nova/auth/ldapdriver.py b/nova/auth/ldapdriver.py index ceade1d65..c10939d74 100644 --- a/nova/auth/ldapdriver.py +++ b/nova/auth/ldapdriver.py @@ -40,6 +40,8 @@ flags.DEFINE_string('ldap_user_dn', 'cn=Manager,dc=example,dc=com', flags.DEFINE_string('ldap_user_unit', 'Users', 'OID for Users') flags.DEFINE_string('ldap_user_subtree', 'ou=Users,dc=example,dc=com', 'OU for Users') +flags.DEFINE_boolean('ldap_user_modify_only', False, + 'Modify attributes for users instead of creating/deleting') flags.DEFINE_string('ldap_project_subtree', 'ou=Groups,dc=example,dc=com', 'OU for Projects') flags.DEFINE_string('role_project_subtree', 'ou=Groups,dc=example,dc=com', @@ -89,8 +91,7 @@ class LdapDriver(object): def get_user(self, uid): """Retrieve user by id""" - attr = self.__find_object(self.__uid_to_dn(uid), - '(objectclass=novaUser)') + attr = self.__get_ldap_user(uid) return self.__to_user(attr) def get_user_from_access_key(self, access): @@ -110,7 +111,12 @@ class LdapDriver(object): """Retrieve list of users""" attrs = self.__find_objects(FLAGS.ldap_user_subtree, '(objectclass=novaUser)') - return [self.__to_user(attr) for attr in attrs] + users = [] + for attr in attrs: + user = self.__to_user(attr) + if user is not None: + users.append(user) + return users def get_projects(self, uid=None): """Retrieve list of projects""" @@ -125,21 +131,52 @@ class LdapDriver(object): """Create a user""" if self.__user_exists(name): raise exception.Duplicate("LDAP user %s already exists" % name) - attr = [ - ('objectclass', ['person', - 'organizationalPerson', - 'inetOrgPerson', - 'novaUser']), - ('ou', [FLAGS.ldap_user_unit]), - ('uid', [name]), - ('sn', [name]), - ('cn', [name]), - ('secretKey', [secret_key]), - ('accessKey', [access_key]), - ('isAdmin', [str(is_admin).upper()]), - ] - self.conn.add_s(self.__uid_to_dn(name), attr) - return self.__to_user(dict(attr)) + if FLAGS.ldap_user_modify_only: + if self.__ldap_user_exists(name): + # Retrieve user by name + user = self.__get_ldap_user(name) + # Entry could be malformed, test for missing attrs. + # Malformed entries are useless, replace attributes found. + attr = [] + if 'secretKey' in user.keys(): + attr.append((self.ldap.MOD_REPLACE, 'secretKey', \ + [secret_key])) + else: + attr.append((self.ldap.MOD_ADD, 'secretKey', \ + [secret_key])) + if 'accessKey' in user.keys(): + attr.append((self.ldap.MOD_REPLACE, 'accessKey', \ + [access_key])) + else: + attr.append((self.ldap.MOD_ADD, 'accessKey', \ + [access_key])) + if 'isAdmin' in user.keys(): + attr.append((self.ldap.MOD_REPLACE, 'isAdmin', \ + [str(is_admin).upper()])) + else: + attr.append((self.ldap.MOD_ADD, 'isAdmin', \ + [str(is_admin).upper()])) + self.conn.modify_s(self.__uid_to_dn(name), attr) + return self.get_user(name) + else: + raise exception.NotFound("LDAP object for %s doesn't exist" + % name) + else: + attr = [ + ('objectclass', ['person', + 'organizationalPerson', + 'inetOrgPerson', + 'novaUser']), + ('ou', [FLAGS.ldap_user_unit]), + ('uid', [name]), + ('sn', [name]), + ('cn', [name]), + ('secretKey', [secret_key]), + ('accessKey', [access_key]), + ('isAdmin', [str(is_admin).upper()]), + ] + self.conn.add_s(self.__uid_to_dn(name), attr) + return self.__to_user(dict(attr)) def create_project(self, name, manager_uid, description=None, member_uids=None): @@ -155,7 +192,7 @@ class LdapDriver(object): if description is None: description = name members = [] - if member_uids != None: + if member_uids is not None: for member_uid in member_uids: if not self.__user_exists(member_uid): raise exception.NotFound("Project can't be created " @@ -256,7 +293,24 @@ class LdapDriver(object): if not self.__user_exists(uid): raise exception.NotFound("User %s doesn't exist" % uid) self.__remove_from_all(uid) - self.conn.delete_s(self.__uid_to_dn(uid)) + if FLAGS.ldap_user_modify_only: + # Delete attributes + attr = [] + # Retrieve user by name + user = self.__get_ldap_user(uid) + if 'secretKey' in user.keys(): + attr.append((self.ldap.MOD_DELETE, 'secretKey', \ + user['secretKey'])) + if 'accessKey' in user.keys(): + attr.append((self.ldap.MOD_DELETE, 'accessKey', \ + user['accessKey'])) + if 'isAdmin' in user.keys(): + attr.append((self.ldap.MOD_DELETE, 'isAdmin', \ + user['isAdmin'])) + self.conn.modify_s(self.__uid_to_dn(uid), attr) + else: + # Delete entry + self.conn.delete_s(self.__uid_to_dn(uid)) def delete_project(self, project_id): """Delete a project""" @@ -265,7 +319,7 @@ class LdapDriver(object): self.__delete_group(project_dn) def modify_user(self, uid, access_key=None, secret_key=None, admin=None): - """Modify an existing project""" + """Modify an existing user""" if not access_key and not secret_key and admin is None: return attr = [] @@ -279,11 +333,21 @@ class LdapDriver(object): def __user_exists(self, uid): """Check if user exists""" - return self.get_user(uid) != None + return self.get_user(uid) is not None + + def __ldap_user_exists(self, uid): + """Check if the user exists in ldap""" + return self.__get_ldap_user(uid) is not None def __project_exists(self, project_id): """Check if project exists""" - return self.get_project(project_id) != None + return self.get_project(project_id) is not None + + def __get_ldap_user(self, uid): + """Retrieve LDAP user entry by id""" + attr = self.__find_object(self.__uid_to_dn(uid), + '(objectclass=novaUser)') + return attr def __find_object(self, dn, query=None, scope=None): """Find an object by dn and query""" @@ -330,12 +394,12 @@ class LdapDriver(object): def __group_exists(self, dn): """Check if group exists""" - return self.__find_object(dn, '(objectclass=groupOfNames)') != None + return self.__find_object(dn, '(objectclass=groupOfNames)') is not None @staticmethod def __role_to_dn(role, project_id=None): """Convert role to corresponding dn""" - if project_id == None: + if project_id is None: return FLAGS.__getitem__("ldap_%s" % role).value else: return 'cn=%s,cn=%s,%s' % (role, @@ -349,7 +413,7 @@ class LdapDriver(object): raise exception.Duplicate("Group can't be created because " "group %s already exists" % name) members = [] - if member_uids != None: + if member_uids is not None: for member_uid in member_uids: if not self.__user_exists(member_uid): raise exception.NotFound("Group can't be created " @@ -375,7 +439,7 @@ class LdapDriver(object): res = self.__find_object(group_dn, '(member=%s)' % self.__uid_to_dn(uid), self.ldap.SCOPE_BASE) - return res != None + return res is not None def __add_to_group(self, uid, group_dn): """Add user to group""" @@ -447,18 +511,22 @@ class LdapDriver(object): @staticmethod def __to_user(attr): """Convert ldap attributes to User object""" - if attr == None: + if attr is None: + return None + if ('accessKey' in attr.keys() and 'secretKey' in attr.keys() \ + and 'isAdmin' in attr.keys()): + return { + 'id': attr['uid'][0], + 'name': attr['cn'][0], + 'access': attr['accessKey'][0], + 'secret': attr['secretKey'][0], + 'admin': (attr['isAdmin'][0] == 'TRUE')} + else: return None - return { - 'id': attr['uid'][0], - 'name': attr['cn'][0], - 'access': attr['accessKey'][0], - 'secret': attr['secretKey'][0], - 'admin': (attr['isAdmin'][0] == 'TRUE')} def __to_project(self, attr): """Convert ldap attributes to Project object""" - if attr == None: + if attr is None: return None member_dns = attr.get('member', []) return { diff --git a/nova/auth/manager.py b/nova/auth/manager.py index 7b2b68161..11c3bd6df 100644 --- a/nova/auth/manager.py +++ b/nova/auth/manager.py @@ -624,6 +624,10 @@ class AuthManager(object): with self.driver() as drv: drv.modify_user(uid, access_key, secret_key, admin) + @staticmethod + def get_key_pairs(context): + return db.key_pair_get_all_by_user(context.elevated(), context.user_id) + def get_credentials(self, user, project=None): """Get credential zip for user in project""" if not isinstance(user, User): diff --git a/nova/auth/opendj.sh b/nova/auth/opendj.sh new file mode 100755 index 000000000..8052c077d --- /dev/null +++ b/nova/auth/opendj.sh @@ -0,0 +1,119 @@ +#!/usr/bin/env bash +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# LDAP INSTALL SCRIPT - IS IDEMPOTENT, does not scrub users + +apt-get install -y ldap-utils python-ldap openjdk-6-jre + +if [ ! -d "/usr/opendj" ] +then + # TODO(rlane): Wikimedia Foundation is the current package maintainer. + # After the package is included in Ubuntu's channel, change this. + wget http://apt.wikimedia.org/wikimedia/pool/main/o/opendj/opendj_2.4.0-7_amd64.deb + dpkg -i opendj_2.4.0-7_amd64.deb +fi + +abspath=`dirname "$(cd "${0%/*}" 2>/dev/null; echo "$PWD"/"${0##*/}")"` +schemapath='/var/opendj/instance/config/schema' +cp $abspath/openssh-lpk_sun.schema $schemapath/97-openssh-lpk_sun.ldif +cp $abspath/nova_sun.schema $schemapath/98-nova_sun.ldif +chown opendj:opendj $schemapath/97-openssh-lpk_sun.ldif +chown opendj:opendj $schemapath/98-nova_sun.ldif + +cat >/etc/ldap/ldap.conf <<LDAP_CONF_EOF +# LDAP Client Settings +URI ldap://localhost +BASE dc=example,dc=com +BINDDN cn=Directory Manager +SIZELIMIT 0 +TIMELIMIT 0 +LDAP_CONF_EOF + +cat >/etc/ldap/base.ldif <<BASE_LDIF_EOF +# This is the root of the directory tree +dn: dc=example,dc=com +description: Example.Com, your trusted non-existent corporation. +dc: example +o: Example.Com +objectClass: top +objectClass: dcObject +objectClass: organization + +# Subtree for users +dn: ou=Users,dc=example,dc=com +ou: Users +description: Users +objectClass: organizationalUnit + +# Subtree for groups +dn: ou=Groups,dc=example,dc=com +ou: Groups +description: Groups +objectClass: organizationalUnit + +# Subtree for system accounts +dn: ou=System,dc=example,dc=com +ou: System +description: Special accounts used by software applications. +objectClass: organizationalUnit + +# Special Account for Authentication: +dn: uid=authenticate,ou=System,dc=example,dc=com +uid: authenticate +ou: System +description: Special account for authenticating users +userPassword: {MD5}TLnIqASP0CKUR3/LGkEZGg== +objectClass: account +objectClass: simpleSecurityObject + +# create the sysadmin entry + +dn: cn=developers,ou=Groups,dc=example,dc=com +objectclass: groupOfNames +cn: developers +description: IT admin group +member: uid=admin,ou=Users,dc=example,dc=com + +dn: cn=sysadmins,ou=Groups,dc=example,dc=com +objectclass: groupOfNames +cn: sysadmins +description: IT admin group +member: uid=admin,ou=Users,dc=example,dc=com + +dn: cn=netadmins,ou=Groups,dc=example,dc=com +objectclass: groupOfNames +cn: netadmins +description: Network admin group +member: uid=admin,ou=Users,dc=example,dc=com + +dn: cn=cloudadmins,ou=Groups,dc=example,dc=com +objectclass: groupOfNames +cn: cloudadmins +description: Cloud admin group +member: uid=admin,ou=Users,dc=example,dc=com + +dn: cn=itsec,ou=Groups,dc=example,dc=com +objectclass: groupOfNames +cn: itsec +description: IT security users group +member: uid=admin,ou=Users,dc=example,dc=com +BASE_LDIF_EOF + +/etc/init.d/opendj stop +su - opendj -c '/usr/opendj/setup -i -b "dc=example,dc=com" -l /etc/ldap/base.ldif -S -w changeme -O -n --noPropertiesFile' +/etc/init.d/opendj start diff --git a/nova/compute/api.py b/nova/compute/api.py index 929342a1e..8e0efa4cc 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -20,6 +20,7 @@ Handles all API requests relating to instances (guest vms). """ +import datetime import logging import time @@ -43,16 +44,18 @@ def generate_default_hostname(internal_id): class ComputeAPI(base.Base): """API for interacting with the compute manager.""" - def __init__(self, **kwargs): - self.network_manager = utils.import_object(FLAGS.network_manager) + def __init__(self, network_manager=None, image_service=None, **kwargs): + if not network_manager: + network_manager = utils.import_object(FLAGS.network_manager) + self.network_manager = network_manager + if not image_service: + image_service = utils.import_object(FLAGS.image_service) + self.image_service = image_service super(ComputeAPI, self).__init__(**kwargs) - # TODO(eday): network_topic arg should go away once we push network - # allocation into the scheduler or compute worker. - def create_instances(self, context, instance_type, image_service, image_id, - network_topic, min_count=1, max_count=1, - kernel_id=None, ramdisk_id=None, name='', - description='', user_data='', key_name=None, + def create_instances(self, context, instance_type, image_id, min_count=1, + max_count=1, kernel_id=None, ramdisk_id=None, + display_name='', description='', key_name=None, key_data=None, security_group='default', generate_hostname=generate_default_hostname): """Create the number of instances requested if quote and @@ -69,15 +72,15 @@ class ComputeAPI(base.Base): is_vpn = image_id == FLAGS.vpn_image_id if not is_vpn: - image = image_service.show(context, image_id) + image = self.image_service.show(context, image_id) if kernel_id is None: kernel_id = image.get('kernelId', FLAGS.default_kernel) if ramdisk_id is None: ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk) # Make sure we have access to kernel and ramdisk - image_service.show(context, kernel_id) - image_service.show(context, ramdisk_id) + self.image_service.show(context, kernel_id) + self.image_service.show(context, ramdisk_id) if security_group is None: security_group = ['default'] @@ -110,7 +113,7 @@ class ComputeAPI(base.Base): 'memory_mb': type_data['memory_mb'], 'vcpus': type_data['vcpus'], 'local_gb': type_data['local_gb'], - 'display_name': name, + 'display_name': display_name, 'display_description': description, 'key_name': key_name, 'key_data': key_data} @@ -122,14 +125,25 @@ class ComputeAPI(base.Base): instance = dict(mac_address=utils.generate_mac(), launch_index=num, **base_options) - instance_ref = self.create_instance(context, security_groups, - **instance) - instance_id = instance_ref['id'] - internal_id = instance_ref['internal_id'] - hostname = generate_hostname(internal_id) - self.update_instance(context, instance_id, hostname=hostname) - instances.append(dict(id=instance_id, internal_id=internal_id, - hostname=hostname, **instance)) + instance = self.db.instance_create(context, instance) + instance_id = instance['id'] + internal_id = instance['internal_id'] + + elevated = context.elevated() + if not security_groups: + security_groups = [] + for security_group_id in security_groups: + self.db.instance_add_security_group(elevated, + instance_id, + security_group_id) + + # Set sane defaults if not specified + updates = dict(hostname=generate_hostname(internal_id)) + if 'display_name' not in instance: + updates['display_name'] = "Server %s" % internal_id + + instance = self.update_instance(context, instance_id, **updates) + instances.append(instance) # TODO(vish): This probably should be done in the scheduler # or in compute as a call. The network should be @@ -139,12 +153,12 @@ class ComputeAPI(base.Base): instance_id, is_vpn) rpc.cast(elevated, - network_topic, + self._get_network_topic(context), {"method": "setup_fixed_ip", "args": {"address": address}}) - logging.debug("Casting to scheduler for %s/%s's instance %s" % - (context.project_id, context.user_id, instance_id)) + logging.debug("Casting to scheduler for %s/%s's instance %s", + context.project_id, context.user_id, instance_id) rpc.cast(context, FLAGS.scheduler_topic, {"method": "run_instance", @@ -154,6 +168,12 @@ class ComputeAPI(base.Base): return instances def ensure_default_security_group(self, context): + """ Create security group for the security context if it + does not already exist + + :param context: the security context + + """ try: db.security_group_get_by_name(context, context.project_id, 'default') @@ -162,40 +182,7 @@ class ComputeAPI(base.Base): 'description': 'default', 'user_id': context.user_id, 'project_id': context.project_id} - group = db.security_group_create(context, values) - - def create_instance(self, context, security_groups=None, **kwargs): - """Creates the instance in the datastore and returns the - new instance as a mapping - - :param context: The security context - :param security_groups: list of security group ids to - attach to the instance - :param kwargs: All additional keyword args are treated - as data fields of the instance to be - created - - :retval Returns a mapping of the instance information - that has just been created - - """ - instance_ref = self.db.instance_create(context, kwargs) - inst_id = instance_ref['id'] - # Set sane defaults if not specified - if kwargs.get('display_name') is None: - display_name = "Server %s" % instance_ref['internal_id'] - instance_ref['display_name'] = display_name - self.db.instance_update(context, inst_id, - {'display_name': display_name}) - - elevated = context.elevated() - if not security_groups: - security_groups = [] - for security_group_id in security_groups: - self.db.instance_add_security_group(elevated, - inst_id, - security_group_id) - return instance_ref + db.security_group_create(context, values) def update_instance(self, context, instance_id, **kwargs): """Updates the instance in the datastore. @@ -209,4 +196,110 @@ class ComputeAPI(base.Base): :retval None """ - self.db.instance_update(context, instance_id, kwargs) + return self.db.instance_update(context, instance_id, kwargs) + + def delete_instance(self, context, instance_id): + logging.debug("Going to try and terminate %d" % instance_id) + try: + instance = self.db.instance_get_by_internal_id(context, + instance_id) + except exception.NotFound as e: + logging.warning("Instance %d was not found during terminate", + instance_id) + raise e + + if (instance['state_description'] == 'terminating'): + logging.warning("Instance %d is already being terminated", + instance_id) + return + + self.update_instance(context, + instance['id'], + state_description='terminating', + state=0, + terminated_at=datetime.datetime.utcnow()) + + # FIXME(ja): where should network deallocate occur? + address = self.db.instance_get_floating_address(context, + instance['id']) + if address: + logging.debug("Disassociating address %s" % address) + # NOTE(vish): Right now we don't really care if the ip is + # disassociated. We may need to worry about + # checking this later. Perhaps in the scheduler? + rpc.cast(context, + self._get_network_topic(context), + {"method": "disassociate_floating_ip", + "args": {"floating_address": address}}) + + address = self.db.instance_get_fixed_address(context, instance['id']) + if address: + logging.debug("Deallocating address %s" % address) + # NOTE(vish): Currently, nothing needs to be done on the + # network node until release. If this changes, + # we will need to cast here. + self.network_manager.deallocate_fixed_ip(context.elevated(), + address) + + host = instance['host'] + if host: + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "terminate_instance", + "args": {"instance_id": instance['id']}}) + else: + self.db.instance_destroy(context, instance['id']) + + def get_instances(self, context, project_id=None): + """Get all instances, possibly filtered by project ID or + user ID. If there is no filter and the context is an admin, + it will retreive all instances in the system.""" + if project_id or not context.is_admin: + if not context.project: + return self.db.instance_get_all_by_user(context, + context.user_id) + if project_id is None: + project_id = context.project_id + return self.db.instance_get_all_by_project(context, project_id) + return self.db.instance_get_all(context) + + def get_instance(self, context, instance_id): + return self.db.instance_get_by_internal_id(context, instance_id) + + def reboot(self, context, instance_id): + """Reboot the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "reboot_instance", + "args": {"instance_id": instance['id']}}) + + def rescue(self, context, instance_id): + """Rescue the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "rescue_instance", + "args": {"instance_id": instance['id']}}) + + def unrescue(self, context, instance_id): + """Unrescue the given instance.""" + instance = self.db.instance_get_by_internal_id(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "unrescue_instance", + "args": {"instance_id": instance['id']}}) + + def _get_network_topic(self, context): + """Retrieves the network host for a project""" + network_ref = self.network_manager.get_network(context) + host = network_ref['host'] + if not host: + host = rpc.call(context, + FLAGS.network_topic, + {"method": "set_network_host", + "args": {"network_id": network_ref['id']}}) + return self.db.queue_get_for(context, FLAGS.network_topic, host) diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py index a2679e0fc..6e47170bd 100644 --- a/nova/compute/instance_types.py +++ b/nova/compute/instance_types.py @@ -22,6 +22,7 @@ The built-in instance properties. """ from nova import flags +from nova import exception FLAGS = flags.FLAGS INSTANCE_TYPES = { @@ -37,8 +38,7 @@ def get_by_type(instance_type): if instance_type is None: return FLAGS.default_instance_type if instance_type not in INSTANCE_TYPES: - raise exception.ApiError("Unknown instance type: %s", - instance_type) + raise exception.ApiError("Unknown instance type: %s" % instance_type) return instance_type diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 2dc140274..55036d1d1 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -732,6 +732,7 @@ def instance_update(context, instance_id, values): instance_ref = instance_get(context, instance_id, session=session) instance_ref.update(values) instance_ref.save(session=session) + return instance_ref def instance_add_security_group(context, instance_id, security_group_id): diff --git a/nova/flags.py b/nova/flags.py index 1f94feb08..c6578023d 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -259,7 +259,7 @@ DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager', 'Manager for scheduler') # The service to use for image search and retrieval -DEFINE_string('image_service', 'nova.image.local.LocalImageService', +DEFINE_string('image_service', 'nova.image.s3.S3ImageService', 'The service to use for retrieving and searching for images.') DEFINE_string('host', socket.gethostname(), diff --git a/nova/objectstore/image.py b/nova/objectstore/image.py index 7292dbab8..34a90b0a2 100644 --- a/nova/objectstore/image.py +++ b/nova/objectstore/image.py @@ -21,7 +21,6 @@ Take uploaded bucket contents and register them as disk images (AMIs). Requires decryption using keys in the manifest. """ -# TODO(jesse): Got these from Euca2ools, will need to revisit them import binascii import glob @@ -29,7 +28,6 @@ import json import os import shutil import tarfile -import tempfile from xml.etree import ElementTree from nova import exception @@ -199,12 +197,17 @@ class Image(object): except: ramdisk_id = None + try: + arch = manifest.find("machine_configuration/architecture").text + except: + arch = 'x86_64' + info = { 'imageId': image_id, 'imageLocation': image_location, 'imageOwnerId': context.project_id, 'isPublic': False, # FIXME: grab public from manifest - 'architecture': 'x86_64', # FIXME: grab architecture from manifest + 'architecture': arch, 'imageType': image_type} if kernel_id: diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py index aaf213923..8dc87d0e2 100644 --- a/nova/tests/__init__.py +++ b/nova/tests/__init__.py @@ -29,3 +29,8 @@ .. moduleauthor:: Manish Singh <yosh@gimp.org> .. moduleauthor:: Andy Smith <andy@anarkystic.com> """ + +# See http://code.google.com/p/python-nose/issues/detail?id=373 +# The code below enables nosetests to work with i18n _() blocks +import __builtin__ +setattr(__builtin__, '_', lambda x: x) diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index 7c0343942..21b8aac1c 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -24,9 +24,10 @@ import webob import webob.dec from nova import auth -from nova import utils -from nova import flags +from nova import context from nova import exception as exc +from nova import flags +from nova import utils import nova.api.openstack.auth from nova.image import service from nova.image import glance @@ -58,7 +59,7 @@ def fake_auth_init(self): @webob.dec.wsgify def fake_wsgi(self, req): - req.environ['nova.context'] = dict(user=dict(id=1)) + req.environ['nova.context'] = context.RequestContext(1, 1) if req.body: req.environ['inst_dict'] = json.loads(req.body) return self.application @@ -67,8 +68,7 @@ def fake_wsgi(self, req): def stub_out_key_pair_funcs(stubs): def key_pair(context, user_id): return [dict(name='key', public_key='public_key')] - stubs.Set(nova.db.api, 'key_pair_get_all_by_user', - key_pair) + stubs.Set(nova.db, 'key_pair_get_all_by_user', key_pair) def stub_out_image_service(stubs): @@ -172,6 +172,12 @@ class FakeToken(object): setattr(self, k, v) +class FakeRequestContext(object): + def __init__(self, user, project): + self.user_id = 1 + self.project_id = 1 + + class FakeAuthDatabase(object): data = {} diff --git a/nova/tests/api/openstack/test_adminapi.py b/nova/tests/api/openstack/test_adminapi.py new file mode 100644 index 000000000..1b2e1654d --- /dev/null +++ b/nova/tests/api/openstack/test_adminapi.py @@ -0,0 +1,61 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +import stubout +import webob + +import nova.api +from nova import flags +from nova.tests.api.openstack import fakes + +FLAGS = flags.FLAGS + + +class AdminAPITest(unittest.TestCase): + def setUp(self): + self.stubs = stubout.StubOutForTesting() + fakes.FakeAuthManager.auth_data = {} + fakes.FakeAuthDatabase.data = {} + fakes.stub_out_networking(self.stubs) + fakes.stub_out_rate_limiting(self.stubs) + fakes.stub_out_auth(self.stubs) + self.allow_admin = FLAGS.allow_admin_api + + def tearDown(self): + self.stubs.UnsetAll() + FLAGS.allow_admin_api = self.allow_admin + + def test_admin_enabled(self): + FLAGS.allow_admin_api = True + # We should still be able to access public operations. + req = webob.Request.blank('/v1.0/flavors') + res = req.get_response(nova.api.API('os')) + self.assertEqual(res.status_int, 200) + # TODO: Confirm admin operations are available. + + def test_admin_disabled(self): + FLAGS.allow_admin_api = False + # We should still be able to access public operations. + req = webob.Request.blank('/v1.0/flavors') + res = req.get_response(nova.api.API('os')) + self.assertEqual(res.status_int, 200) + # TODO: Confirm admin operations are unavailable. + +if __name__ == '__main__': + unittest.main() diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py index 14e720be4..7b427c2db 100644 --- a/nova/tests/api/openstack/test_auth.py +++ b/nova/tests/api/openstack/test_auth.py @@ -26,6 +26,7 @@ import nova.api import nova.api.openstack.auth import nova.auth.manager from nova import auth +from nova import context from nova.tests.api.openstack import fakes @@ -35,6 +36,7 @@ class Test(unittest.TestCase): self.stubs = stubout.StubOutForTesting() self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager, '__init__', fakes.fake_auth_init) + self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext) fakes.FakeAuthManager.auth_data = {} fakes.FakeAuthDatabase.data = {} fakes.stub_out_rate_limiting(self.stubs) @@ -131,6 +133,7 @@ class TestLimiter(unittest.TestCase): self.stubs = stubout.StubOutForTesting() self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager, '__init__', fakes.fake_auth_init) + self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext) fakes.FakeAuthManager.auth_data = {} fakes.FakeAuthDatabase.data = {} fakes.stub_out_networking(self.stubs) diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py index 44ac8f342..8444b6fce 100644 --- a/nova/tests/api/openstack/test_servers.py +++ b/nova/tests/api/openstack/test_servers.py @@ -47,6 +47,14 @@ def return_security_group(context, instance_id, security_group_id): pass +def instance_update(context, instance_id, kwargs): + return stub_instance(instance_id) + + +def instance_address(context, instance_id): + return None + + def stub_instance(id, user_id=1): return Instance(id=id + 123456, state=0, image_id=10, user_id=user_id, display_name='server%s' % id, internal_id=id) @@ -69,6 +77,11 @@ class ServersTest(unittest.TestCase): return_servers) self.stubs.Set(nova.db.api, 'instance_add_security_group', return_security_group) + self.stubs.Set(nova.db.api, 'instance_update', instance_update) + self.stubs.Set(nova.db.api, 'instance_get_fixed_address', + instance_address) + self.stubs.Set(nova.db.api, 'instance_get_floating_address', + instance_address) def tearDown(self): self.stubs.UnsetAll() @@ -93,11 +106,11 @@ class ServersTest(unittest.TestCase): i += 1 def test_create_instance(self): - def server_update(context, id, params): - pass - def instance_create(context, inst): - return {'id': 1, 'internal_id': 1} + return {'id': 1, 'internal_id': 1, 'display_name': ''} + + def server_update(context, id, params): + return instance_create(context, id) def fake_method(*args, **kwargs): pass diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 9886a2449..770c94219 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -126,6 +126,19 @@ class CloudTestCase(test.TrialTestCase): db.instance_destroy(self.context, inst['id']) db.floating_ip_destroy(self.context, address) + def test_describe_volumes(self): + """Makes sure describe_volumes works and filters results.""" + vol1 = db.volume_create(self.context, {}) + vol2 = db.volume_create(self.context, {}) + result = self.cloud.describe_volumes(self.context) + self.assertEqual(len(result['volumeSet']), 2) + result = self.cloud.describe_volumes(self.context, + volume_id=[vol2['ec2_id']]) + self.assertEqual(len(result['volumeSet']), 1) + self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id']) + db.volume_destroy(self.context, vol1['id']) + db.volume_destroy(self.context, vol2['id']) + def test_console_output(self): image_id = FLAGS.default_image instance_type = FLAGS.default_instance_type diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py index a55449739..6f3ef96cb 100644 --- a/nova/tests/compute_unittest.py +++ b/nova/tests/compute_unittest.py @@ -72,33 +72,27 @@ class ComputeTestCase(test.TrialTestCase): """Verify that an instance cannot be created without a display_name.""" cases = [dict(), dict(display_name=None)] for instance in cases: - ref = self.compute_api.create_instance(self.context, None, - **instance) + ref = self.compute_api.create_instances(self.context, + FLAGS.default_instance_type, None, **instance) try: - self.assertNotEqual(ref.display_name, None) + self.assertNotEqual(ref[0].display_name, None) finally: - db.instance_destroy(self.context, ref['id']) + db.instance_destroy(self.context, ref[0]['id']) def test_create_instance_associates_security_groups(self): - """Make sure create_instance associates security groups""" - inst = {} - inst['user_id'] = self.user.id - inst['project_id'] = self.project.id + """Make sure create_instances associates security groups""" values = {'name': 'default', 'description': 'default', 'user_id': self.user.id, 'project_id': self.project.id} group = db.security_group_create(self.context, values) - ref = self.compute_api.create_instance(self.context, - security_groups=[group['id']], - **inst) - # reload to get groups - instance_ref = db.instance_get(self.context, ref['id']) + ref = self.compute_api.create_instances(self.context, + FLAGS.default_instance_type, None, security_group=['default']) try: - self.assertEqual(len(instance_ref['security_groups']), 1) + self.assertEqual(len(ref[0]['security_groups']), 1) finally: db.security_group_destroy(self.context, group['id']) - db.instance_destroy(self.context, instance_ref['id']) + db.instance_destroy(self.context, ref[0]['id']) @defer.inlineCallbacks def test_run_terminate(self): diff --git a/nova/virt/connection.py b/nova/virt/connection.py index 11f0fa8ce..c40bb4bb4 100644 --- a/nova/virt/connection.py +++ b/nova/virt/connection.py @@ -25,7 +25,7 @@ import sys from nova import flags from nova.virt import fake from nova.virt import libvirt_conn -from nova.virt import xenapi +from nova.virt import xenapi_conn FLAGS = flags.FLAGS @@ -61,7 +61,7 @@ def get_connection(read_only=False): elif t == 'libvirt': conn = libvirt_conn.get_connection(read_only) elif t == 'xenapi': - conn = xenapi.get_connection(read_only) + conn = xenapi_conn.get_connection(read_only) else: raise Exception('Unknown connection type "%s"' % t) diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py deleted file mode 100644 index de3d68582..000000000 --- a/nova/virt/xenapi.py +++ /dev/null @@ -1,442 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (c) 2010 Citrix Systems, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -A connection to XenServer or Xen Cloud Platform. - -The concurrency model for this class is as follows: - -All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator -deferredToThread). They are remote calls, and so may hang for the usual -reasons. They should not be allowed to block the reactor thread. - -All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async -(using XenAPI.VM.async_start etc). These return a task, which can then be -polled for completion. Polling is handled using reactor.callLater. - -This combination of techniques means that we don't block the reactor thread at -all, and at the same time we don't hold lots of threads waiting for -long-running operations. - -FIXME: get_info currently doesn't conform to these rules, and will block the -reactor thread if the VM.get_by_name_label or VM.get_record calls block. - -**Related Flags** - -:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. -:xenapi_connection_username: Username for connection to XenServer/Xen Cloud - Platform (default: root). -:xenapi_connection_password: Password for connection to XenServer/Xen Cloud - Platform. -:xenapi_task_poll_interval: The interval (seconds) used for polling of - remote tasks (Async.VM.start, etc) - (default: 0.5). - -""" - -import logging -import xmlrpclib - -from twisted.internet import defer -from twisted.internet import reactor - -from nova import db -from nova import flags -from nova import utils -from nova.auth.manager import AuthManager -from nova.compute import instance_types -from nova.compute import power_state -from nova.virt import images - -XenAPI = None - - -FLAGS = flags.FLAGS -flags.DEFINE_string('xenapi_connection_url', - None, - 'URL for connection to XenServer/Xen Cloud Platform.' - ' Required if connection_type=xenapi.') -flags.DEFINE_string('xenapi_connection_username', - 'root', - 'Username for connection to XenServer/Xen Cloud Platform.' - ' Used only if connection_type=xenapi.') -flags.DEFINE_string('xenapi_connection_password', - None, - 'Password for connection to XenServer/Xen Cloud Platform.' - ' Used only if connection_type=xenapi.') -flags.DEFINE_float('xenapi_task_poll_interval', - 0.5, - 'The interval used for polling of remote tasks ' - '(Async.VM.start, etc). Used only if ' - 'connection_type=xenapi.') - - -XENAPI_POWER_STATE = { - 'Halted': power_state.SHUTDOWN, - 'Running': power_state.RUNNING, - 'Paused': power_state.PAUSED, - 'Suspended': power_state.SHUTDOWN, # FIXME - 'Crashed': power_state.CRASHED} - - -def get_connection(_): - """Note that XenAPI doesn't have a read-only connection mode, so - the read_only parameter is ignored.""" - # This is loaded late so that there's no need to install this - # library when not using XenAPI. - global XenAPI - if XenAPI is None: - XenAPI = __import__('XenAPI') - url = FLAGS.xenapi_connection_url - username = FLAGS.xenapi_connection_username - password = FLAGS.xenapi_connection_password - if not url or password is None: - raise Exception('Must specify xenapi_connection_url, ' - 'xenapi_connection_username (optionally), and ' - 'xenapi_connection_password to use ' - 'connection_type=xenapi') - return XenAPIConnection(url, username, password) - - -class XenAPIConnection(object): - def __init__(self, url, user, pw): - self._conn = XenAPI.Session(url) - self._conn.login_with_password(user, pw) - - def list_instances(self): - return [self._conn.xenapi.VM.get_name_label(vm) \ - for vm in self._conn.xenapi.VM.get_all()] - - @defer.inlineCallbacks - def spawn(self, instance): - vm = yield self._lookup(instance.name) - if vm is not None: - raise Exception('Attempted to create non-unique name %s' % - instance.name) - - network = db.project_get_network(None, instance.project_id) - network_ref = \ - yield self._find_network_with_bridge(network.bridge) - - user = AuthManager().get_user(instance.user_id) - project = AuthManager().get_project(instance.project_id) - vdi_uuid = yield self._fetch_image( - instance.image_id, user, project, True) - kernel = yield self._fetch_image( - instance.kernel_id, user, project, False) - ramdisk = yield self._fetch_image( - instance.ramdisk_id, user, project, False) - vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid) - - vm_ref = yield self._create_vm(instance, kernel, ramdisk) - yield self._create_vbd(vm_ref, vdi_ref, 0, True) - if network_ref: - yield self._create_vif(vm_ref, network_ref, instance.mac_address) - logging.debug('Starting VM %s...', vm_ref) - yield self._call_xenapi('VM.start', vm_ref, False, False) - logging.info('Spawning VM %s created %s.', instance.name, vm_ref) - - @defer.inlineCallbacks - def _create_vm(self, instance, kernel, ramdisk): - """Create a VM record. Returns a Deferred that gives the new - VM reference.""" - - instance_type = instance_types.INSTANCE_TYPES[instance.instance_type] - mem = str(long(instance_type['memory_mb']) * 1024 * 1024) - vcpus = str(instance_type['vcpus']) - rec = { - 'name_label': instance.name, - 'name_description': '', - 'is_a_template': False, - 'memory_static_min': '0', - 'memory_static_max': mem, - 'memory_dynamic_min': mem, - 'memory_dynamic_max': mem, - 'VCPUs_at_startup': vcpus, - 'VCPUs_max': vcpus, - 'VCPUs_params': {}, - 'actions_after_shutdown': 'destroy', - 'actions_after_reboot': 'restart', - 'actions_after_crash': 'destroy', - 'PV_bootloader': '', - 'PV_kernel': kernel, - 'PV_ramdisk': ramdisk, - 'PV_args': 'root=/dev/xvda1', - 'PV_bootloader_args': '', - 'PV_legacy_args': '', - 'HVM_boot_policy': '', - 'HVM_boot_params': {}, - 'platform': {}, - 'PCI_bus': '', - 'recommendations': '', - 'affinity': '', - 'user_version': '0', - 'other_config': {}, - } - logging.debug('Created VM %s...', instance.name) - vm_ref = yield self._call_xenapi('VM.create', rec) - logging.debug('Created VM %s as %s.', instance.name, vm_ref) - defer.returnValue(vm_ref) - - @defer.inlineCallbacks - def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): - """Create a VBD record. Returns a Deferred that gives the new - VBD reference.""" - - vbd_rec = {} - vbd_rec['VM'] = vm_ref - vbd_rec['VDI'] = vdi_ref - vbd_rec['userdevice'] = str(userdevice) - vbd_rec['bootable'] = bootable - vbd_rec['mode'] = 'RW' - vbd_rec['type'] = 'disk' - vbd_rec['unpluggable'] = True - vbd_rec['empty'] = False - vbd_rec['other_config'] = {} - vbd_rec['qos_algorithm_type'] = '' - vbd_rec['qos_algorithm_params'] = {} - vbd_rec['qos_supported_algorithms'] = [] - logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) - vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec) - logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, - vdi_ref) - defer.returnValue(vbd_ref) - - @defer.inlineCallbacks - def _create_vif(self, vm_ref, network_ref, mac_address): - """Create a VIF record. Returns a Deferred that gives the new - VIF reference.""" - - vif_rec = {} - vif_rec['device'] = '0' - vif_rec['network'] = network_ref - vif_rec['VM'] = vm_ref - vif_rec['MAC'] = mac_address - vif_rec['MTU'] = '1500' - vif_rec['other_config'] = {} - vif_rec['qos_algorithm_type'] = '' - vif_rec['qos_algorithm_params'] = {} - logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, - network_ref) - vif_ref = yield self._call_xenapi('VIF.create', vif_rec) - logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, - vm_ref, network_ref) - defer.returnValue(vif_ref) - - @defer.inlineCallbacks - def _find_network_with_bridge(self, bridge): - expr = 'field "bridge" = "%s"' % bridge - networks = yield self._call_xenapi('network.get_all_records_where', - expr) - if len(networks) == 1: - defer.returnValue(networks.keys()[0]) - elif len(networks) > 1: - raise Exception('Found non-unique network for bridge %s' % bridge) - else: - raise Exception('Found no network for bridge %s' % bridge) - - @defer.inlineCallbacks - def _fetch_image(self, image, user, project, use_sr): - """use_sr: True to put the image as a VDI in an SR, False to place - it on dom0's filesystem. The former is for VM disks, the latter for - its kernel and ramdisk (if external kernels are being used). - Returns a Deferred that gives the new VDI UUID.""" - - url = images.image_url(image) - access = AuthManager().get_access_key(user, project) - logging.debug("Asking xapi to fetch %s as %s" % (url, access)) - fn = use_sr and 'get_vdi' or 'get_kernel' - args = {} - args['src_url'] = url - args['username'] = access - args['password'] = user.secret - if use_sr: - args['add_partition'] = 'true' - task = yield self._async_call_plugin('objectstore', fn, args) - uuid = yield self._wait_for_task(task) - defer.returnValue(uuid) - - @defer.inlineCallbacks - def reboot(self, instance): - vm = yield self._lookup(instance.name) - if vm is None: - raise Exception('instance not present %s' % instance.name) - task = yield self._call_xenapi('Async.VM.clean_reboot', vm) - yield self._wait_for_task(task) - - @defer.inlineCallbacks - def destroy(self, instance): - vm = yield self._lookup(instance.name) - if vm is None: - # Don't complain, just return. This lets us clean up instances - # that have already disappeared from the underlying platform. - defer.returnValue(None) - # Get the VDIs related to the VM - vdis = yield self._lookup_vm_vdis(vm) - try: - task = yield self._call_xenapi('Async.VM.hard_shutdown', vm) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - # Disk clean-up - if vdis: - for vdi in vdis: - try: - task = yield self._call_xenapi('Async.VDI.destroy', vdi) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - try: - task = yield self._call_xenapi('Async.VM.destroy', vm) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - - def get_info(self, instance_id): - vm = self._lookup_blocking(instance_id) - if vm is None: - raise Exception('instance not present %s' % instance_id) - rec = self._conn.xenapi.VM.get_record(vm) - return {'state': XENAPI_POWER_STATE[rec['power_state']], - 'max_mem': long(rec['memory_static_max']) >> 10, - 'mem': long(rec['memory_dynamic_max']) >> 10, - 'num_cpu': rec['VCPUs_max'], - 'cpu_time': 0} - - def get_console_output(self, instance): - return 'FAKE CONSOLE OUTPUT' - - @utils.deferredToThread - def _lookup(self, i): - return self._lookup_blocking(i) - - def _lookup_blocking(self, i): - vms = self._conn.xenapi.VM.get_by_name_label(i) - n = len(vms) - if n == 0: - return None - elif n > 1: - raise Exception('duplicate name found: %s' % i) - else: - return vms[0] - - @utils.deferredToThread - def _lookup_vm_vdis(self, vm): - return self._lookup_vm_vdis_blocking(vm) - - def _lookup_vm_vdis_blocking(self, vm): - # Firstly we get the VBDs, then the VDIs. - # TODO: do we leave the read-only devices? - vbds = self._conn.xenapi.VM.get_VBDs(vm) - vdis = [] - if vbds: - for vbd in vbds: - try: - vdi = self._conn.xenapi.VBD.get_VDI(vbd) - # Test valid VDI - record = self._conn.xenapi.VDI.get_record(vdi) - except Exception, exc: - logging.warn(exc) - else: - vdis.append(vdi) - if len(vdis) > 0: - return vdis - else: - return None - - def _wait_for_task(self, task): - """Return a Deferred that will give the result of the given task. - The task is polled until it completes.""" - d = defer.Deferred() - reactor.callLater(0, self._poll_task, task, d) - return d - - @utils.deferredToThread - def _poll_task(self, task, deferred): - """Poll the given XenAPI task, and fire the given Deferred if we - get a result.""" - try: - #logging.debug('Polling task %s...', task) - status = self._conn.xenapi.task.get_status(task) - if status == 'pending': - reactor.callLater(FLAGS.xenapi_task_poll_interval, - self._poll_task, task, deferred) - elif status == 'success': - result = self._conn.xenapi.task.get_result(task) - logging.info('Task %s status: success. %s', task, result) - deferred.callback(_parse_xmlrpc_value(result)) - else: - error_info = self._conn.xenapi.task.get_error_info(task) - logging.warn('Task %s status: %s. %s', task, status, - error_info) - deferred.errback(XenAPI.Failure(error_info)) - #logging.debug('Polling task %s done.', task) - except Exception, exc: - logging.warn(exc) - deferred.errback(exc) - - @utils.deferredToThread - def _call_xenapi(self, method, *args): - """Call the specified XenAPI method on a background thread. Returns - a Deferred for the result.""" - f = self._conn.xenapi - for m in method.split('.'): - f = f.__getattr__(m) - return f(*args) - - @utils.deferredToThread - def _async_call_plugin(self, plugin, fn, args): - """Call Async.host.call_plugin on a background thread. Returns a - Deferred with the task reference.""" - return _unwrap_plugin_exceptions( - self._conn.xenapi.Async.host.call_plugin, - self._get_xenapi_host(), plugin, fn, args) - - def _get_xenapi_host(self): - return self._conn.xenapi.session.get_this_host(self._conn.handle) - - -def _unwrap_plugin_exceptions(func, *args, **kwargs): - try: - return func(*args, **kwargs) - except XenAPI.Failure, exc: - logging.debug("Got exception: %s", exc) - if (len(exc.details) == 4 and - exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and - exc.details[2] == 'Failure'): - params = None - try: - params = eval(exc.details[3]) - except: - raise exc - raise XenAPI.Failure(params) - else: - raise - except xmlrpclib.ProtocolError, exc: - logging.debug("Got exception: %s", exc) - raise - - -def _parse_xmlrpc_value(val): - """Parse the given value as if it were an XML-RPC value. This is - sometimes used as the format for the task.result field.""" - if not val: - return val - x = xmlrpclib.loads( - '<?xml version="1.0"?><methodResponse><params><param>' + - val + - '</param></params></methodResponse>') - return x[0][0] diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py new file mode 100644 index 000000000..3d598c463 --- /dev/null +++ b/nova/virt/xenapi/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py new file mode 100644 index 000000000..8cb4cce3a --- /dev/null +++ b/nova/virt/xenapi/network_utils.py @@ -0,0 +1,45 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of network +records and their attributes like bridges, PIFs, QoS, as well as +their lookup functions. +""" + +from twisted.internet import defer + + +class NetworkHelper(): + """ + The class that wraps the helper methods together. + """ + def __init__(self): + return + + @classmethod + @defer.inlineCallbacks + def find_network_with_bridge(cls, session, bridge): + """ Return the network on which the bridge is attached, if found """ + expr = 'field "bridge" = "%s"' % bridge + networks = yield session.call_xenapi('network.get_all_records_where', + expr) + if len(networks) == 1: + defer.returnValue(networks.keys()[0]) + elif len(networks) > 1: + raise Exception('Found non-unique network for bridge %s' % bridge) + else: + raise Exception('Found no network for bridge %s' % bridge) diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py new file mode 100644 index 000000000..77edb576e --- /dev/null +++ b/nova/virt/xenapi/vm_utils.py @@ -0,0 +1,268 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of VM records and +their attributes like VDIs, VIFs, as well as their lookup functions. +""" + +import logging +import urllib + +from twisted.internet import defer +from xml.dom import minidom + +from nova import flags +from nova import utils + +from nova.auth.manager import AuthManager +from nova.compute import instance_types +from nova.compute import power_state +from nova.virt import images + +FLAGS = flags.FLAGS + +XENAPI_POWER_STATE = { + 'Halted': power_state.SHUTDOWN, + 'Running': power_state.RUNNING, + 'Paused': power_state.PAUSED, + 'Suspended': power_state.SHUTDOWN, # FIXME + 'Crashed': power_state.CRASHED} + +XenAPI = None + + +class VMHelper(): + """ + The class that wraps the helper methods together. + """ + def __init__(self): + return + + @classmethod + def late_import(cls): + """ + Load the XenAPI module in for helper class, if required. + This is to avoid to install the XenAPI library when other + hypervisors are used + """ + global XenAPI + if XenAPI is None: + XenAPI = __import__('XenAPI') + + @classmethod + @defer.inlineCallbacks + def create_vm(cls, session, instance, kernel, ramdisk): + """Create a VM record. Returns a Deferred that gives the new + VM reference.""" + + instance_type = instance_types.INSTANCE_TYPES[instance.instance_type] + mem = str(long(instance_type['memory_mb']) * 1024 * 1024) + vcpus = str(instance_type['vcpus']) + rec = { + 'name_label': instance.name, + 'name_description': '', + 'is_a_template': False, + 'memory_static_min': '0', + 'memory_static_max': mem, + 'memory_dynamic_min': mem, + 'memory_dynamic_max': mem, + 'VCPUs_at_startup': vcpus, + 'VCPUs_max': vcpus, + 'VCPUs_params': {}, + 'actions_after_shutdown': 'destroy', + 'actions_after_reboot': 'restart', + 'actions_after_crash': 'destroy', + 'PV_bootloader': '', + 'PV_kernel': kernel, + 'PV_ramdisk': ramdisk, + 'PV_args': 'root=/dev/xvda1', + 'PV_bootloader_args': '', + 'PV_legacy_args': '', + 'HVM_boot_policy': '', + 'HVM_boot_params': {}, + 'platform': {}, + 'PCI_bus': '', + 'recommendations': '', + 'affinity': '', + 'user_version': '0', + 'other_config': {}, + } + logging.debug('Created VM %s...', instance.name) + vm_ref = yield session.call_xenapi('VM.create', rec) + logging.debug('Created VM %s as %s.', instance.name, vm_ref) + defer.returnValue(vm_ref) + + @classmethod + @defer.inlineCallbacks + def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable): + """Create a VBD record. Returns a Deferred that gives the new + VBD reference.""" + + vbd_rec = {} + vbd_rec['VM'] = vm_ref + vbd_rec['VDI'] = vdi_ref + vbd_rec['userdevice'] = str(userdevice) + vbd_rec['bootable'] = bootable + vbd_rec['mode'] = 'RW' + vbd_rec['type'] = 'disk' + vbd_rec['unpluggable'] = True + vbd_rec['empty'] = False + vbd_rec['other_config'] = {} + vbd_rec['qos_algorithm_type'] = '' + vbd_rec['qos_algorithm_params'] = {} + vbd_rec['qos_supported_algorithms'] = [] + logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) + vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec) + logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, + vdi_ref) + defer.returnValue(vbd_ref) + + @classmethod + @defer.inlineCallbacks + def create_vif(cls, session, vm_ref, network_ref, mac_address): + """Create a VIF record. Returns a Deferred that gives the new + VIF reference.""" + + vif_rec = {} + vif_rec['device'] = '0' + vif_rec['network'] = network_ref + vif_rec['VM'] = vm_ref + vif_rec['MAC'] = mac_address + vif_rec['MTU'] = '1500' + vif_rec['other_config'] = {} + vif_rec['qos_algorithm_type'] = '' + vif_rec['qos_algorithm_params'] = {} + logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, + network_ref) + vif_ref = yield session.call_xenapi('VIF.create', vif_rec) + logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, + vm_ref, network_ref) + defer.returnValue(vif_ref) + + @classmethod + @defer.inlineCallbacks + def fetch_image(cls, session, image, user, project, use_sr): + """use_sr: True to put the image as a VDI in an SR, False to place + it on dom0's filesystem. The former is for VM disks, the latter for + its kernel and ramdisk (if external kernels are being used). + Returns a Deferred that gives the new VDI UUID.""" + + url = images.image_url(image) + access = AuthManager().get_access_key(user, project) + logging.debug("Asking xapi to fetch %s as %s", url, access) + fn = use_sr and 'get_vdi' or 'get_kernel' + args = {} + args['src_url'] = url + args['username'] = access + args['password'] = user.secret + if use_sr: + args['add_partition'] = 'true' + task = yield session.async_call_plugin('objectstore', fn, args) + uuid = yield session.wait_for_task(task) + defer.returnValue(uuid) + + @classmethod + @utils.deferredToThread + def lookup(cls, session, i): + """ Look the instance i up, and returns it if available """ + return VMHelper.lookup_blocking(session, i) + + @classmethod + def lookup_blocking(cls, session, i): + """ Synchronous lookup """ + vms = session.get_xenapi().VM.get_by_name_label(i) + n = len(vms) + if n == 0: + return None + elif n > 1: + raise Exception('duplicate name found: %s' % i) + else: + return vms[0] + + @classmethod + @utils.deferredToThread + def lookup_vm_vdis(cls, session, vm): + """ Look for the VDIs that are attached to the VM """ + return VMHelper.lookup_vm_vdis_blocking(session, vm) + + @classmethod + def lookup_vm_vdis_blocking(cls, session, vm): + """ Synchronous lookup_vm_vdis """ + # Firstly we get the VBDs, then the VDIs. + # TODO(Armando): do we leave the read-only devices? + vbds = session.get_xenapi().VM.get_VBDs(vm) + vdis = [] + if vbds: + for vbd in vbds: + try: + vdi = session.get_xenapi().VBD.get_VDI(vbd) + # Test valid VDI + record = session.get_xenapi().VDI.get_record(vdi) + logging.debug('VDI %s is still available', record['uuid']) + except XenAPI.Failure, exc: + logging.warn(exc) + else: + vdis.append(vdi) + if len(vdis) > 0: + return vdis + else: + return None + + @classmethod + def compile_info(cls, record): + return {'state': XENAPI_POWER_STATE[record['power_state']], + 'max_mem': long(record['memory_static_max']) >> 10, + 'mem': long(record['memory_dynamic_max']) >> 10, + 'num_cpu': record['VCPUs_max'], + 'cpu_time': 0} + + @classmethod + def compile_diagnostics(cls, session, record): + """Compile VM diagnostics data""" + try: + host = session.get_xenapi_host() + host_ip = session.get_xenapi().host.get_record(host)["address"] + metrics = session.get_xenapi().VM_guest_metrics.get_record( + record["guest_metrics"]) + diags = { + "Kernel": metrics["os_version"]["uname"], + "Distro": metrics["os_version"]["name"]} + xml = get_rrd(host_ip, record["uuid"]) + if xml: + rrd = minidom.parseString(xml) + for i, node in enumerate(rrd.firstChild.childNodes): + # We don't want all of the extra garbage + if i >= 3 and i <= 11: + ref = node.childNodes + # Name and Value + diags[ref[0].firstChild.data] = ref[6].firstChild.data + return diags + except XenAPI.Failure as e: + return {"Unable to retrieve diagnostics": e} + + +def get_rrd(host, uuid): + """Return the VM RRD XML as a string""" + try: + xml = urllib.urlopen("http://%s:%s@%s/vm_rrd?uuid=%s" % ( + FLAGS.xenapi_connection_username, + FLAGS.xenapi_connection_password, + host, + uuid)) + return xml.read() + except IOError: + return None diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py new file mode 100644 index 000000000..9bfd07267 --- /dev/null +++ b/nova/virt/xenapi/vmops.py @@ -0,0 +1,146 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for VM-related functions (spawn, reboot, etc). +""" + +import logging + +from twisted.internet import defer + +from nova import db +from nova import context + +from nova.auth.manager import AuthManager +from nova.virt.xenapi.network_utils import NetworkHelper +from nova.virt.xenapi.vm_utils import VMHelper + +XenAPI = None + + +class VMOps(object): + """ + Management class for VM-related tasks + """ + def __init__(self, session): + global XenAPI + if XenAPI is None: + XenAPI = __import__('XenAPI') + self._session = session + # Load XenAPI module in the helper class + VMHelper.late_import() + + def list_instances(self): + """ List VM instances """ + return [self._session.get_xenapi().VM.get_name_label(vm) \ + for vm in self._session.get_xenapi().VM.get_all()] + + @defer.inlineCallbacks + def spawn(self, instance): + """ Create VM instance """ + vm = yield VMHelper.lookup(self._session, instance.name) + if vm is not None: + raise Exception('Attempted to create non-unique name %s' % + instance.name) + + bridge = db.project_get_network(context.get_admin_context(), + instance.project_id).bridge + network_ref = \ + yield NetworkHelper.find_network_with_bridge(self._session, bridge) + + user = AuthManager().get_user(instance.user_id) + project = AuthManager().get_project(instance.project_id) + vdi_uuid = yield VMHelper.fetch_image(self._session, + instance.image_id, user, project, True) + kernel = yield VMHelper.fetch_image(self._session, + instance.kernel_id, user, project, False) + ramdisk = yield VMHelper.fetch_image(self._session, + instance.ramdisk_id, user, project, False) + vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid) + vm_ref = yield VMHelper.create_vm(self._session, + instance, kernel, ramdisk) + yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True) + if network_ref: + yield VMHelper.create_vif(self._session, vm_ref, + network_ref, instance.mac_address) + logging.debug('Starting VM %s...', vm_ref) + yield self._session.call_xenapi('VM.start', vm_ref, False, False) + logging.info('Spawning VM %s created %s.', instance.name, + vm_ref) + + @defer.inlineCallbacks + def reboot(self, instance): + """ Reboot VM instance """ + instance_name = instance.name + vm = yield VMHelper.lookup(self._session, instance_name) + if vm is None: + raise Exception('instance not present %s' % instance_name) + task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm) + yield self._session.wait_for_task(task) + + @defer.inlineCallbacks + def destroy(self, instance): + """ Destroy VM instance """ + vm = yield VMHelper.lookup(self._session, instance.name) + if vm is None: + # Don't complain, just return. This lets us clean up instances + # that have already disappeared from the underlying platform. + defer.returnValue(None) + # Get the VDIs related to the VM + vdis = yield VMHelper.lookup_vm_vdis(self._session, vm) + try: + task = yield self._session.call_xenapi('Async.VM.hard_shutdown', + vm) + yield self._session.wait_for_task(task) + except XenAPI.Failure, exc: + logging.warn(exc) + # Disk clean-up + if vdis: + for vdi in vdis: + try: + task = yield self._session.call_xenapi('Async.VDI.destroy', + vdi) + yield self._session.wait_for_task(task) + except XenAPI.Failure, exc: + logging.warn(exc) + try: + task = yield self._session.call_xenapi('Async.VM.destroy', vm) + yield self._session.wait_for_task(task) + except XenAPI.Failure, exc: + logging.warn(exc) + + def get_info(self, instance_id): + """ Return data about VM instance """ + vm = VMHelper.lookup_blocking(self._session, instance_id) + if vm is None: + raise Exception('instance not present %s' % instance_id) + rec = self._session.get_xenapi().VM.get_record(vm) + return VMHelper.compile_info(rec) + + @defer.inlineCallbacks + def get_diagnostics(self, instance_id): + """Return data about VM diagnostics""" + vm = yield VMHelper.lookup(self._session, instance_id) + if vm is None: + raise Exception("instance not present %s" % instance_id) + rec = yield self._session.get_xenapi().VM.get_record(vm) + defer.returnValue(VMHelper.compile_diagnostics(self._session, rec)) + + def get_console_output(self, instance): + """ Return snapshot of console """ + # TODO: implement this to fix pylint! + return 'FAKE CONSOLE OUTPUT of instance' diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py new file mode 100644 index 000000000..a4c7a3861 --- /dev/null +++ b/nova/virt/xenapi/volumeops.py @@ -0,0 +1,32 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for Storage-related functions (attach, detach, etc). +""" + + +class VolumeOps(object): + def __init__(self, session): + self._session = session + + def attach_volume(self, instance_name, device_path, mountpoint): + # FIXME: that's going to be sorted when iscsi-xenapi lands in branch + return True + + def detach_volume(self, instance_name, mountpoint): + # FIXME: that's going to be sorted when iscsi-xenapi lands in branch + return True diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py new file mode 100644 index 000000000..2153810c8 --- /dev/null +++ b/nova/virt/xenapi_conn.py @@ -0,0 +1,242 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A connection to XenServer or Xen Cloud Platform. + +The concurrency model for this class is as follows: + +All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator +deferredToThread). They are remote calls, and so may hang for the usual +reasons. They should not be allowed to block the reactor thread. + +All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async +(using XenAPI.VM.async_start etc). These return a task, which can then be +polled for completion. Polling is handled using reactor.callLater. + +This combination of techniques means that we don't block the reactor thread at +all, and at the same time we don't hold lots of threads waiting for +long-running operations. + +FIXME: get_info currently doesn't conform to these rules, and will block the +reactor thread if the VM.get_by_name_label or VM.get_record calls block. + +**Related Flags** + +:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. +:xenapi_connection_username: Username for connection to XenServer/Xen Cloud + Platform (default: root). +:xenapi_connection_password: Password for connection to XenServer/Xen Cloud + Platform. +:xenapi_task_poll_interval: The interval (seconds) used for polling of + remote tasks (Async.VM.start, etc) + (default: 0.5). + +""" + +import logging +import xmlrpclib + +from twisted.internet import defer +from twisted.internet import reactor + +from nova import utils +from nova import flags +from nova.virt.xenapi.vmops import VMOps +from nova.virt.xenapi.volumeops import VolumeOps + +FLAGS = flags.FLAGS +flags.DEFINE_string('xenapi_connection_url', + None, + 'URL for connection to XenServer/Xen Cloud Platform.' + ' Required if connection_type=xenapi.') +flags.DEFINE_string('xenapi_connection_username', + 'root', + 'Username for connection to XenServer/Xen Cloud Platform.' + ' Used only if connection_type=xenapi.') +flags.DEFINE_string('xenapi_connection_password', + None, + 'Password for connection to XenServer/Xen Cloud Platform.' + ' Used only if connection_type=xenapi.') +flags.DEFINE_float('xenapi_task_poll_interval', + 0.5, + 'The interval used for polling of remote tasks ' + '(Async.VM.start, etc). Used only if ' + 'connection_type=xenapi.') + +XenAPI = None + + +def get_connection(_): + """Note that XenAPI doesn't have a read-only connection mode, so + the read_only parameter is ignored.""" + # This is loaded late so that there's no need to install this + # library when not using XenAPI. + global XenAPI + if XenAPI is None: + XenAPI = __import__('XenAPI') + url = FLAGS.xenapi_connection_url + username = FLAGS.xenapi_connection_username + password = FLAGS.xenapi_connection_password + if not url or password is None: + raise Exception('Must specify xenapi_connection_url, ' + 'xenapi_connection_username (optionally), and ' + 'xenapi_connection_password to use ' + 'connection_type=xenapi') + return XenAPIConnection(url, username, password) + + +class XenAPIConnection(object): + """ A connection to XenServer or Xen Cloud Platform """ + def __init__(self, url, user, pw): + session = XenAPISession(url, user, pw) + self._vmops = VMOps(session) + self._volumeops = VolumeOps(session) + + def list_instances(self): + """ List VM instances """ + return self._vmops.list_instances() + + def spawn(self, instance): + """ Create VM instance """ + self._vmops.spawn(instance) + + def reboot(self, instance): + """ Reboot VM instance """ + self._vmops.reboot(instance) + + def destroy(self, instance): + """ Destroy VM instance """ + self._vmops.destroy(instance) + + def get_info(self, instance_id): + """ Return data about VM instance """ + return self._vmops.get_info(instance_id) + + def get_diagnostics(self, instance_id): + """Return data about VM diagnostics""" + return self._vmops.get_diagnostics(instance_id) + + def get_console_output(self, instance): + """ Return snapshot of console """ + return self._vmops.get_console_output(instance) + + def attach_volume(self, instance_name, device_path, mountpoint): + """ Attach volume storage to VM instance """ + return self._volumeops.attach_volume(instance_name, + device_path, + mountpoint) + + def detach_volume(self, instance_name, mountpoint): + """ Detach volume storage to VM instance """ + return self._volumeops.detach_volume(instance_name, mountpoint) + + +class XenAPISession(object): + """ The session to invoke XenAPI SDK calls """ + def __init__(self, url, user, pw): + self._session = XenAPI.Session(url) + self._session.login_with_password(user, pw) + + def get_xenapi(self): + """ Return the xenapi object """ + return self._session.xenapi + + def get_xenapi_host(self): + """ Return the xenapi host """ + return self._session.xenapi.session.get_this_host(self._session.handle) + + @utils.deferredToThread + def call_xenapi(self, method, *args): + """Call the specified XenAPI method on a background thread. Returns + a Deferred for the result.""" + f = self._session.xenapi + for m in method.split('.'): + f = f.__getattr__(m) + return f(*args) + + @utils.deferredToThread + def async_call_plugin(self, plugin, fn, args): + """Call Async.host.call_plugin on a background thread. Returns a + Deferred with the task reference.""" + return _unwrap_plugin_exceptions( + self._session.xenapi.Async.host.call_plugin, + self.get_xenapi_host(), plugin, fn, args) + + def wait_for_task(self, task): + """Return a Deferred that will give the result of the given task. + The task is polled until it completes.""" + d = defer.Deferred() + reactor.callLater(0, self._poll_task, task, d) + return d + + @utils.deferredToThread + def _poll_task(self, task, deferred): + """Poll the given XenAPI task, and fire the given Deferred if we + get a result.""" + try: + #logging.debug('Polling task %s...', task) + status = self._session.xenapi.task.get_status(task) + if status == 'pending': + reactor.callLater(FLAGS.xenapi_task_poll_interval, + self._poll_task, task, deferred) + elif status == 'success': + result = self._session.xenapi.task.get_result(task) + logging.info('Task %s status: success. %s', task, result) + deferred.callback(_parse_xmlrpc_value(result)) + else: + error_info = self._session.xenapi.task.get_error_info(task) + logging.warn('Task %s status: %s. %s', task, status, + error_info) + deferred.errback(XenAPI.Failure(error_info)) + #logging.debug('Polling task %s done.', task) + except XenAPI.Failure, exc: + logging.warn(exc) + deferred.errback(exc) + + +def _unwrap_plugin_exceptions(func, *args, **kwargs): + """ Parse exception details """ + try: + return func(*args, **kwargs) + except XenAPI.Failure, exc: + logging.debug("Got exception: %s", exc) + if (len(exc.details) == 4 and + exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and + exc.details[2] == 'Failure'): + params = None + try: + params = eval(exc.details[3]) + except: + raise exc + raise XenAPI.Failure(params) + else: + raise + except xmlrpclib.ProtocolError, exc: + logging.debug("Got exception: %s", exc) + raise + + +def _parse_xmlrpc_value(val): + """Parse the given value as if it were an XML-RPC value. This is + sometimes used as the format for the task.result field.""" + if not val: + return val + x = xmlrpclib.loads( + '<?xml version="1.0"?><methodResponse><params><param>' + + val + + '</param></params></methodResponse>') + return x[0][0] |
