diff options
author | Anne Gentle <anne@openstack.org> | 2011-01-07 10:57:53 -0600 |
---|---|---|
committer | Anne Gentle <anne@openstack.org> | 2011-01-07 10:57:53 -0600 |
commit | 5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1 (patch) | |
tree | e75bc3a452904d8e28db0d2a69b44d726835b0f0 | |
parent | 80d10d960d0d472bf9a1d35bce081fb350c98a35 (diff) | |
parent | e33102d23ec8f357c08e2583f8d9e3c1753bab4d (diff) | |
download | nova-5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1.tar.gz nova-5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1.tar.xz nova-5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1.zip |
Merged with trunk
38 files changed, 1707 insertions, 550 deletions
@@ -30,3 +30,4 @@ <rconradharris@gmail.com> <rick.harris@rackspace.com> <corywright@gmail.com> <cory.wright@rackspace.com> <ant@openstack.org> <amesserl@rackspace.com> +<chiradeep@cloud.com> <chiradeep@chiradeep-lt2> @@ -3,6 +3,7 @@ Anne Gentle <anne@openstack.org> Anthony Young <sleepsonthefloor@gmail.com> Antony Messerli <ant@openstack.org> Armando Migliaccio <Armando.Migliaccio@eu.citrix.com> +Chiradeep Vittal <chiradeep@cloud.com> Chris Behrens <cbehrens@codestud.com> Chmouel Boudjnah <chmouel@chmouel.com> Cory Wright <corywright@gmail.com> diff --git a/doc/source/community.rst b/doc/source/community.rst index 402f9dfe4..4ae32f1eb 100644 --- a/doc/source/community.rst +++ b/doc/source/community.rst @@ -35,7 +35,8 @@ Contributing Code To contribute code, sign up for a Launchpad account and sign a contributor license agreement, available on the `OpenStack Wiki <http://wiki.openstack.org/CLA>`_. Once the CLA is signed you -can contribute code through the Bazaar version control system which is related to your Launchpad account. +can contribute code through the Bazaar version control system which is related to your Launchpad +account. See the :doc:`devref/development.environment` page to get started. #openstack on Freenode IRC Network ---------------------------------- diff --git a/doc/source/devref/development.environment.rst b/doc/source/devref/development.environment.rst index d5f10c42a..f3c454d64 100644 --- a/doc/source/devref/development.environment.rst +++ b/doc/source/devref/development.environment.rst @@ -88,7 +88,12 @@ Here's how to get the latest code:: source .nova_venv/bin/activate ./run_tests.sh -And then you can do cleaning work or hack hack hack with a branched named cleaning:: +Then you can do cleaning work or hack hack hack with a branched named cleaning. + +Contributing Your Work +---------------------- + +Once your work is complete you may wish to contribute it to the project. Add your name and email address to the `Authors` file, and also to the `.mailmap` file if you use multiple email addresses. Your contributions can not be merged into trunk unless you are listed in the Authors file. Now, push the branch to Launchpad:: bzr push lp:~launchpaduserid/nova/cleaning diff --git a/nova/adminclient.py b/nova/adminclient.py index 6ae9f0c0f..b2609c8c4 100644 --- a/nova/adminclient.py +++ b/nova/adminclient.py @@ -23,12 +23,9 @@ import base64 import boto import httplib -from nova import flags from boto.ec2.regioninfo import RegionInfo -FLAGS = flags.FLAGS - DEFAULT_CLC_URL = 'http://127.0.0.1:8773' DEFAULT_REGION = 'nova' @@ -199,8 +196,8 @@ class NovaAdminClient(object): self, clc_url=DEFAULT_CLC_URL, region=DEFAULT_REGION, - access_key=FLAGS.aws_access_key_id, - secret_key=FLAGS.aws_secret_access_key, + access_key=None, + secret_key=None, **kwargs): parts = self.split_clc_url(clc_url) diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index 9fb6307a8..6619b5452 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -31,19 +31,19 @@ import os from nova import context import IPy +from nova import compute from nova import crypto from nova import db from nova import exception from nova import flags -from nova import quota +from nova import network from nova import rpc from nova import utils -from nova.compute import api as compute_api +from nova import volume from nova.compute import instance_types FLAGS = flags.FLAGS -flags.DECLARE('storage_availability_zone', 'nova.volume.manager') InvalidInputException = exception.InvalidInputException @@ -71,16 +71,16 @@ def _gen_key(context, user_id, key_name): return {'private_key': private_key, 'fingerprint': fingerprint} -def ec2_id_to_internal_id(ec2_id): - """Convert an ec2 ID (i-[base 36 number]) to an internal id (int)""" +def ec2_id_to_id(ec2_id): + """Convert an ec2 ID (i-[base 36 number]) to an instance id (int)""" return int(ec2_id[2:], 36) -def internal_id_to_ec2_id(internal_id): - """Convert an internal ID (int) to an ec2 ID (i-[base 36 number])""" +def id_to_ec2_id(instance_id): + """Convert an instance ID (int) to an ec2 ID (i-[base 36 number])""" digits = [] - while internal_id != 0: - internal_id, remainder = divmod(internal_id, 36) + while instance_id != 0: + instance_id, remainder = divmod(instance_id, 36) digits.append('0123456789abcdefghijklmnopqrstuvwxyz'[remainder]) return "i-%s" % ''.join(reversed(digits)) @@ -91,10 +91,11 @@ class CloudController(object): sent to the other nodes. """ def __init__(self): - self.network_manager = utils.import_object(FLAGS.network_manager) self.image_service = utils.import_object(FLAGS.image_service) - self.compute_api = compute_api.ComputeAPI(self.network_manager, - self.image_service) + self.network_api = network.API() + self.volume_api = volume.API() + self.compute_api = compute.API(self.image_service, self.network_api, + self.volume_api) self.setup() def __str__(self): @@ -118,7 +119,8 @@ class CloudController(object): def _get_mpi_data(self, context, project_id): result = {} - for instance in self.compute_api.get_instances(context, project_id): + for instance in self.compute_api.get_all(context, + project_id=project_id): if instance['fixed_ip']: line = '%s slots=%d' % (instance['fixed_ip']['address'], instance['vcpus']) @@ -140,7 +142,7 @@ class CloudController(object): def get_metadata(self, address): ctxt = context.get_admin_context() - instance_ref = db.fixed_ip_get_instance(ctxt, address) + instance_ref = self.compute_api.get_all(ctxt, fixed_ip=address) if instance_ref is None: return None mpi = self._get_mpi_data(ctxt, instance_ref['project_id']) @@ -152,7 +154,7 @@ class CloudController(object): hostname = instance_ref['hostname'] floating_ip = db.instance_get_floating_address(ctxt, instance_ref['id']) - ec2_id = internal_id_to_ec2_id(instance_ref['internal_id']) + ec2_id = id_to_ec2_id(instance_ref['id']) data = { 'user-data': base64.b64decode(instance_ref['user_data']), 'meta-data': { @@ -478,8 +480,8 @@ class CloudController(object): def get_console_output(self, context, instance_id, **kwargs): # instance_id is passed in as a list of instances ec2_id = instance_id[0] - internal_id = ec2_id_to_internal_id(ec2_id) - instance_ref = self.compute_api.get_instance(context, internal_id) + instance_id = ec2_id_to_id(ec2_id) + instance_ref = self.compute_api.get(context, instance_id) output = rpc.call(context, '%s.%s' % (FLAGS.compute_topic, instance_ref['host']), @@ -492,27 +494,22 @@ class CloudController(object): "output": base64.b64encode(output)} def describe_volumes(self, context, volume_id=None, **kwargs): - if context.user.is_admin(): - volumes = db.volume_get_all(context) - else: - volumes = db.volume_get_all_by_project(context, context.project_id) - + volumes = self.volume_api.get_all(context) # NOTE(vish): volume_id is an optional list of volume ids to filter by. volumes = [self._format_volume(context, v) for v in volumes - if volume_id is None or v['ec2_id'] in volume_id] - + if volume_id is None or v['id'] in volume_id] return {'volumeSet': volumes} def _format_volume(self, context, volume): instance_ec2_id = None instance_data = None if volume.get('instance', None): - internal_id = volume['instance']['internal_id'] - instance_ec2_id = internal_id_to_ec2_id(internal_id) + instance_id = volume['instance']['id'] + instance_ec2_id = id_to_ec2_id(instance_id) instance_data = '%s[%s]' % (instance_ec2_id, volume['instance']['host']) v = {} - v['volumeId'] = volume['ec2_id'] + v['volumeId'] = volume['id'] v['status'] = volume['status'] v['size'] = volume['size'] v['availabilityZone'] = volume['availability_zone'] @@ -539,88 +536,47 @@ class CloudController(object): return v def create_volume(self, context, size, **kwargs): - # check quota - if quota.allowed_volumes(context, 1, size) < 1: - logging.warn("Quota exceeeded for %s, tried to create %sG volume", - context.project_id, size) - raise quota.QuotaError("Volume quota exceeded. You cannot " - "create a volume of size %s" % size) - vol = {} - vol['size'] = size - vol['user_id'] = context.user.id - vol['project_id'] = context.project_id - vol['availability_zone'] = FLAGS.storage_availability_zone - vol['status'] = "creating" - vol['attach_status'] = "detached" - vol['display_name'] = kwargs.get('display_name') - vol['display_description'] = kwargs.get('display_description') - volume_ref = db.volume_create(context, vol) - - rpc.cast(context, - FLAGS.scheduler_topic, - {"method": "create_volume", - "args": {"topic": FLAGS.volume_topic, - "volume_id": volume_ref['id']}}) - + volume = self.volume_api.create(context, size, + kwargs.get('display_name'), + kwargs.get('display_description')) # TODO(vish): Instance should be None at db layer instead of # trying to lazy load, but for now we turn it into # a dict to avoid an error. return {'volumeSet': [self._format_volume(context, dict(volume_ref))]} + def delete_volume(self, context, volume_id, **kwargs): + self.volume_api.delete(context, volume_id) + return True + + def update_volume(self, context, volume_id, **kwargs): + updatable_fields = ['display_name', 'display_description'] + changes = {} + for field in updatable_fields: + if field in kwargs: + changes[field] = kwargs[field] + if changes: + self.volume_api.update(context, volume_id, kwargs) + return True + def attach_volume(self, context, volume_id, instance_id, device, **kwargs): - volume_ref = db.volume_get_by_ec2_id(context, volume_id) - if not re.match("^/dev/[a-z]d[a-z]+$", device): - raise exception.ApiError(_("Invalid device specified: %s. " - "Example device: /dev/vdb") % device) - # TODO(vish): abstract status checking? - if volume_ref['status'] != "available": - raise exception.ApiError(_("Volume status must be available")) - if volume_ref['attach_status'] == "attached": - raise exception.ApiError(_("Volume is already attached")) - internal_id = ec2_id_to_internal_id(instance_id) - instance_ref = self.compute_api.get_instance(context, internal_id) - host = instance_ref['host'] - rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "attach_volume", - "args": {"volume_id": volume_ref['id'], - "instance_id": instance_ref['id'], - "mountpoint": device}}) - return {'attachTime': volume_ref['attach_time'], - 'device': volume_ref['mountpoint'], - 'instanceId': instance_ref['id'], + self.compute_api.attach_volume(context, instance_id, volume_id, device) + volume = self.volume_api.get(context, volume_id) + return {'attachTime': volume['attach_time'], + 'device': volume['mountpoint'], + 'instanceId': instance_id, 'requestId': context.request_id, - 'status': volume_ref['attach_status'], - 'volumeId': volume_ref['id']} + 'status': volume['attach_status'], + 'volumeId': volume_id} def detach_volume(self, context, volume_id, **kwargs): - volume_ref = db.volume_get_by_ec2_id(context, volume_id) - instance_ref = db.volume_get_instance(context.elevated(), - volume_ref['id']) - if not instance_ref: - raise exception.ApiError(_("Volume isn't attached to anything!")) - # TODO(vish): abstract status checking? - if volume_ref['status'] == "available": - raise exception.ApiError(_("Volume is already detached")) - try: - host = instance_ref['host'] - rpc.cast(context, - db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "detach_volume", - "args": {"instance_id": instance_ref['id'], - "volume_id": volume_ref['id']}}) - except exception.NotFound: - # If the instance doesn't exist anymore, - # then we need to call detach blind - db.volume_detached(context) - internal_id = instance_ref['internal_id'] - ec2_id = internal_id_to_ec2_id(internal_id) - return {'attachTime': volume_ref['attach_time'], - 'device': volume_ref['mountpoint'], - 'instanceId': internal_id, + volume = self.volume_api.get(context, volume_id) + instance = self.compute_api.detach_volume(context, volume_id) + return {'attachTime': volume['attach_time'], + 'device': volume['mountpoint'], + 'instanceId': id_to_ec2_id(instance['id']), 'requestId': context.request_id, - 'status': volume_ref['attach_status'], - 'volumeId': volume_ref['id']} + 'status': volume['attach_status'], + 'volumeId': volume_id} def _convert_to_set(self, lst, label): if lst == None or lst == []: @@ -629,16 +585,6 @@ class CloudController(object): lst = [lst] return [{label: x} for x in lst] - def update_volume(self, context, volume_id, **kwargs): - updatable_fields = ['display_name', 'display_description'] - changes = {} - for field in updatable_fields: - if field in kwargs: - changes[field] = kwargs[field] - if changes: - db.volume_update(context, volume_id, kwargs) - return True - def describe_instances(self, context, **kwargs): return self._format_describe_instances(context) @@ -646,24 +592,20 @@ class CloudController(object): return {'reservationSet': self._format_instances(context)} def _format_run_instances(self, context, reservation_id): - i = self._format_instances(context, reservation_id) + i = self._format_instances(context, reservation_id=reservation_id) assert len(i) == 1 return i[0] - def _format_instances(self, context, reservation_id=None): + def _format_instances(self, context, **kwargs): reservations = {} - if reservation_id: - instances = db.instance_get_all_by_reservation(context, - reservation_id) - else: - instances = self.compute_api.get_instances(context) + instances = self.compute_api.get_all(context, **kwargs) for instance in instances: if not context.user.is_admin(): if instance['image_id'] == FLAGS.vpn_image_id: continue i = {} - internal_id = instance['internal_id'] - ec2_id = internal_id_to_ec2_id(internal_id) + instance_id = instance['id'] + ec2_id = id_to_ec2_id(instance_id) i['instanceId'] = ec2_id i['imageId'] = instance['image_id'] i['instanceState'] = { @@ -716,8 +658,8 @@ class CloudController(object): ec2_id = None if (floating_ip_ref['fixed_ip'] and floating_ip_ref['fixed_ip']['instance']): - internal_id = floating_ip_ref['fixed_ip']['instance']['ec2_id'] - ec2_id = internal_id_to_ec2_id(internal_id) + instance_id = floating_ip_ref['fixed_ip']['instance']['ec2_id'] + ec2_id = id_to_ec2_id(instance_id) address_rv = {'public_ip': address, 'instance_id': ec2_id} if context.user.is_admin(): @@ -728,69 +670,25 @@ class CloudController(object): return {'addressesSet': addresses} def allocate_address(self, context, **kwargs): - # check quota - if quota.allowed_floating_ips(context, 1) < 1: - logging.warn(_("Quota exceeeded for %s, tried to allocate " - "address"), - context.project_id) - raise quota.QuotaError(_("Address quota exceeded. You cannot " - "allocate any more addresses")) - # NOTE(vish): We don't know which network host should get the ip - # when we allocate, so just send it to any one. This - # will probably need to move into a network supervisor - # at some point. - public_ip = rpc.call(context, - FLAGS.network_topic, - {"method": "allocate_floating_ip", - "args": {"project_id": context.project_id}}) + public_ip = self.network_api.allocate_floating_ip(context) return {'addressSet': [{'publicIp': public_ip}]} def release_address(self, context, public_ip, **kwargs): - floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) - # NOTE(vish): We don't know which network host should get the ip - # when we deallocate, so just send it to any one. This - # will probably need to move into a network supervisor - # at some point. - rpc.cast(context, - FLAGS.network_topic, - {"method": "deallocate_floating_ip", - "args": {"floating_address": floating_ip_ref['address']}}) + self.network_api.release_floating_ip(context, public_ip) return {'releaseResponse': ["Address released."]} def associate_address(self, context, instance_id, public_ip, **kwargs): - internal_id = ec2_id_to_internal_id(instance_id) - instance_ref = self.compute_api.get_instance(context, internal_id) - fixed_address = db.instance_get_fixed_address(context, - instance_ref['id']) - floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) - # NOTE(vish): Perhaps we should just pass this on to compute and - # let compute communicate with network. - network_topic = self.compute_api.get_network_topic(context, - internal_id) - rpc.cast(context, - network_topic, - {"method": "associate_floating_ip", - "args": {"floating_address": floating_ip_ref['address'], - "fixed_address": fixed_address}}) + instance_id = ec2_id_to_id(instance_id) + self.compute_api.associate_floating_ip(context, instance_id, public_ip) return {'associateResponse': ["Address associated."]} def disassociate_address(self, context, public_ip, **kwargs): - floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) - # NOTE(vish): Get the topic from the host name of the network of - # the associated fixed ip. - if not floating_ip_ref.get('fixed_ip'): - raise exception.ApiError('Address is not associated.') - host = floating_ip_ref['fixed_ip']['network']['host'] - topic = db.queue_get_for(context, FLAGS.network_topic, host) - rpc.cast(context, - topic, - {"method": "disassociate_floating_ip", - "args": {"floating_address": floating_ip_ref['address']}}) + self.network_api.disassociate_floating_ip(context, public_ip) return {'disassociateResponse': ["Address disassociated."]} def run_instances(self, context, **kwargs): max_count = int(kwargs.get('max_count', 1)) - instances = self.compute_api.create_instances(context, + instances = self.compute_api.create(context, instance_types.get_by_type(kwargs.get('instance_type', None)), kwargs['image_id'], min_count=int(kwargs.get('min_count', max_count)), @@ -798,13 +696,13 @@ class CloudController(object): kernel_id=kwargs.get('kernel_id', None), ramdisk_id=kwargs.get('ramdisk_id'), display_name=kwargs.get('display_name'), - description=kwargs.get('display_description'), + display_description=kwargs.get('display_description'), key_name=kwargs.get('key_name'), user_data=kwargs.get('user_data'), security_group=kwargs.get('security_group'), availability_zone=kwargs.get('placement', {}).get( 'AvailabilityZone'), - generate_hostname=internal_id_to_ec2_id) + generate_hostname=id_to_ec2_id) return self._format_run_instances(context, instances[0]['reservation_id']) @@ -813,27 +711,27 @@ class CloudController(object): instance_id is a kwarg so its name cannot be modified.""" logging.debug("Going to start terminating instances") for ec2_id in instance_id: - internal_id = ec2_id_to_internal_id(ec2_id) - self.compute_api.delete_instance(context, internal_id) + instance_id = ec2_id_to_id(ec2_id) + self.compute_api.delete(context, instance_id) return True def reboot_instances(self, context, instance_id, **kwargs): """instance_id is a list of instance ids""" for ec2_id in instance_id: - internal_id = ec2_id_to_internal_id(ec2_id) - self.compute_api.reboot(context, internal_id) + instance_id = ec2_id_to_id(ec2_id) + self.compute_api.reboot(context, instance_id) return True def rescue_instance(self, context, instance_id, **kwargs): """This is an extension to the normal ec2_api""" - internal_id = ec2_id_to_internal_id(instance_id) - self.compute_api.rescue(context, internal_id) + instance_id = ec2_id_to_id(instance_id) + self.compute_api.rescue(context, instance_id) return True def unrescue_instance(self, context, instance_id, **kwargs): """This is an extension to the normal ec2_api""" - internal_id = ec2_id_to_internal_id(instance_id) - self.compute_api.unrescue(context, internal_id) + instance_id = ec2_id_to_id(instance_id) + self.compute_api.unrescue(context, instance_id) return True def update_instance(self, context, ec2_id, **kwargs): @@ -843,24 +741,8 @@ class CloudController(object): if field in kwargs: changes[field] = kwargs[field] if changes: - internal_id = ec2_id_to_internal_id(ec2_id) - inst = self.compute_api.get_instance(context, internal_id) - db.instance_update(context, inst['id'], kwargs) - return True - - def delete_volume(self, context, volume_id, **kwargs): - # TODO: return error if not authorized - volume_ref = db.volume_get_by_ec2_id(context, volume_id) - if volume_ref['status'] != "available": - raise exception.ApiError(_("Volume status must be available")) - now = datetime.datetime.utcnow() - db.volume_update(context, volume_ref['id'], {'status': 'deleting', - 'terminated_at': now}) - host = volume_ref['host'] - rpc.cast(context, - db.queue_get_for(context, FLAGS.volume_topic, host), - {"method": "delete_volume", - "args": {"volume_id": volume_ref['id']}}) + instance_id = ec2_id_to_id(ec2_id) + self.compute_api.update(context, instance_id, **kwargs) return True def describe_images(self, context, image_id=None, **kwargs): diff --git a/nova/api/openstack/images.py b/nova/api/openstack/images.py index 867ee5a7e..0b239aab8 100644 --- a/nova/api/openstack/images.py +++ b/nova/api/openstack/images.py @@ -17,15 +17,14 @@ from webob import exc +from nova import compute from nova import flags from nova import utils from nova import wsgi import nova.api.openstack -import nova.image.service - from nova.api.openstack import common from nova.api.openstack import faults -from nova.compute import api as compute_api +import nova.image.service FLAGS = flags.FLAGS @@ -131,7 +130,7 @@ class Controller(wsgi.Controller): env = self._deserialize(req.body, req) instance_id = env["image"]["serverId"] name = env["image"]["name"] - return compute_api.ComputeAPI().snapshot(context, instance_id, name) + return compute.API().snapshot(context, instance_id, name) def update(self, req, id): # Users may not modify public images, and that's all that diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py index c5cbe21ef..10679ccb6 100644 --- a/nova/api/openstack/servers.py +++ b/nova/api/openstack/servers.py @@ -20,12 +20,12 @@ import traceback from webob import exc +from nova import compute from nova import exception from nova import wsgi from nova.api.openstack import common from nova.api.openstack import faults from nova.auth import manager as auth_manager -from nova.compute import api as compute_api from nova.compute import instance_types from nova.compute import power_state import nova.api.openstack @@ -51,7 +51,7 @@ def _translate_detail_keys(inst): inst_dict = {} mapped_keys = dict(status='state', imageId='image_id', - flavorId='instance_type', name='display_name', id='internal_id') + flavorId='instance_type', name='display_name', id='id') for k, v in mapped_keys.iteritems(): inst_dict[k] = inst[v] @@ -67,7 +67,7 @@ def _translate_detail_keys(inst): def _translate_keys(inst): """ Coerces into dictionary format, excluding all model attributes save for id and name """ - return dict(server=dict(id=inst['internal_id'], name=inst['display_name'])) + return dict(server=dict(id=inst['id'], name=inst['display_name'])) class Controller(wsgi.Controller): @@ -80,7 +80,7 @@ class Controller(wsgi.Controller): "status", "progress"]}}} def __init__(self): - self.compute_api = compute_api.ComputeAPI() + self.compute_api = compute.API() super(Controller, self).__init__() def index(self, req): @@ -96,8 +96,7 @@ class Controller(wsgi.Controller): entity_maker - either _translate_detail_keys or _translate_keys """ - instance_list = self.compute_api.get_instances( - req.environ['nova.context']) + instance_list = self.compute_api.get_all(req.environ['nova.context']) limited_list = common.limited(instance_list, req) res = [entity_maker(inst)['server'] for inst in limited_list] return dict(servers=res) @@ -105,8 +104,7 @@ class Controller(wsgi.Controller): def show(self, req, id): """ Returns server details by server id """ try: - instance = self.compute_api.get_instance( - req.environ['nova.context'], int(id)) + instance = self.compute_api.get(req.environ['nova.context'], id) return _translate_detail_keys(instance) except exception.NotFound: return faults.Fault(exc.HTTPNotFound()) @@ -114,8 +112,7 @@ class Controller(wsgi.Controller): def delete(self, req, id): """ Destroys a server """ try: - self.compute_api.delete_instance(req.environ['nova.context'], - int(id)) + self.compute_api.delete(req.environ['nova.context'], id) except exception.NotFound: return faults.Fault(exc.HTTPNotFound()) return exc.HTTPAccepted() @@ -128,12 +125,12 @@ class Controller(wsgi.Controller): key_pair = auth_manager.AuthManager.get_key_pairs( req.environ['nova.context'])[0] - instances = self.compute_api.create_instances( + instances = self.compute_api.create( req.environ['nova.context'], instance_types.get_by_flavor_id(env['server']['flavorId']), env['server']['imageId'], display_name=env['server']['name'], - description=env['server']['name'], + display_description=env['server']['name'], key_name=key_pair['name'], key_data=key_pair['public_key']) return _translate_keys(instances[0]) @@ -151,10 +148,8 @@ class Controller(wsgi.Controller): update_dict['display_name'] = inst_dict['server']['name'] try: - ctxt = req.environ['nova.context'] - self.compute_api.update_instance(ctxt, - id, - **update_dict) + self.compute_api.update(req.environ['nova.context'], id, + **update_dict) except exception.NotFound: return faults.Fault(exc.HTTPNotFound()) return exc.HTTPNoContent() @@ -175,6 +170,50 @@ class Controller(wsgi.Controller): return faults.Fault(exc.HTTPUnprocessableEntity()) return exc.HTTPAccepted() + def lock(self, req, id): + """ + lock the instance with id + admin only operation + + """ + context = req.environ['nova.context'] + try: + self.compute_api.lock(context, id) + except: + readable = traceback.format_exc() + logging.error(_("Compute.api::lock %s"), readable) + return faults.Fault(exc.HTTPUnprocessableEntity()) + return exc.HTTPAccepted() + + def unlock(self, req, id): + """ + unlock the instance with id + admin only operation + + """ + context = req.environ['nova.context'] + try: + self.compute_api.unlock(context, id) + except: + readable = traceback.format_exc() + logging.error(_("Compute.api::unlock %s"), readable) + return faults.Fault(exc.HTTPUnprocessableEntity()) + return exc.HTTPAccepted() + + def get_lock(self, req, id): + """ + return the boolean state of (instance with id)'s lock + + """ + context = req.environ['nova.context'] + try: + self.compute_api.get_lock(context, id) + except: + readable = traceback.format_exc() + logging.error(_("Compute.api::get_lock %s"), readable) + return faults.Fault(exc.HTTPUnprocessableEntity()) + return exc.HTTPAccepted() + def pause(self, req, id): """ Permit Admins to Pause the server. """ ctxt = req.environ['nova.context'] @@ -227,4 +266,13 @@ class Controller(wsgi.Controller): def actions(self, req, id): """Permit Admins to retrieve server actions.""" ctxt = req.environ["nova.context"] - return self.compute_api.get_actions(ctxt, id) + items = self.compute_api.get_actions(ctxt, id) + actions = [] + # TODO(jk0): Do not do pre-serialization here once the default + # serializer is updated + for item in items: + actions.append(dict( + created_at=str(item.created_at), + action=item.action, + error=item.error)) + return dict(actions=actions) diff --git a/nova/compute/__init__.py b/nova/compute/__init__.py index a5df2ec1a..b94f971d1 100644 --- a/nova/compute/__init__.py +++ b/nova/compute/__init__.py @@ -16,17 +16,4 @@ # License for the specific language governing permissions and limitations # under the License. -""" -:mod:`nova.compute` -- Compute Nodes using LibVirt -===================================================== - -.. automodule:: nova.compute - :platform: Unix - :synopsis: Thin wrapper around libvirt for VM mgmt. -.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> -.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> -.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> -.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> -.. moduleauthor:: Manish Singh <yosh@gimp.org> -.. moduleauthor:: Andy Smith <andy@anarkystic.com> -""" +from nova.compute.api import API diff --git a/nova/compute/api.py b/nova/compute/api.py index 07c69bd31..015934ee7 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -17,7 +17,7 @@ # under the License. """ -Handles all API requests relating to instances (guest vms). +Handles all requests relating to instances (guest vms). """ import datetime @@ -27,36 +27,41 @@ import time from nova import db from nova import exception from nova import flags +from nova import network from nova import quota from nova import rpc from nova import utils +from nova import volume from nova.compute import instance_types from nova.db import base FLAGS = flags.FLAGS -def generate_default_hostname(internal_id): +def generate_default_hostname(instance_id): """Default function to generate a hostname given an instance reference.""" - return str(internal_id) + return str(instance_id) -class ComputeAPI(base.Base): +class API(base.Base): """API for interacting with the compute manager.""" - def __init__(self, network_manager=None, image_service=None, **kwargs): - if not network_manager: - network_manager = utils.import_object(FLAGS.network_manager) - self.network_manager = network_manager + def __init__(self, image_service=None, network_api=None, volume_api=None, + **kwargs): if not image_service: image_service = utils.import_object(FLAGS.image_service) self.image_service = image_service - super(ComputeAPI, self).__init__(**kwargs) + if not network_api: + network_api = network.API() + self.network_api = network_api + if not volume_api: + volume_api = volume.API() + self.volume_api = volume_api + super(API, self).__init__(**kwargs) def get_network_topic(self, context, instance_id): try: - instance = self.db.instance_get_by_internal_id(context, - instance_id) + instance = self.get(context, instance_id) except exception.NotFound as e: logging.warning("Instance %d was not found in get_network_topic", instance_id) @@ -70,18 +75,18 @@ class ComputeAPI(base.Base): topic, {"method": "get_network_topic", "args": {'fake': 1}}) - def create_instances(self, context, instance_type, image_id, min_count=1, - max_count=1, kernel_id=None, ramdisk_id=None, - display_name='', description='', key_name=None, - key_data=None, security_group='default', - availability_zone=None, - user_data=None, - generate_hostname=generate_default_hostname): - """Create the number of instances requested if quote and + def create(self, context, instance_type, + image_id, kernel_id=None, ramdisk_id=None, + min_count=1, max_count=1, + display_name='', display_description='', + key_name=None, key_data=None, security_group='default', + availability_zone=None, user_data=None, + generate_hostname=generate_default_hostname): + """Create the number of instances requested if quota and other arguments check out ok.""" - num_instances = quota.allowed_instances(context, max_count, - instance_type) + type_data = instance_types.INSTANCE_TYPES[instance_type] + num_instances = quota.allowed_instances(context, max_count, type_data) if num_instances < min_count: logging.warn("Quota exceeeded for %s, tried to run %s instances", context.project_id, min_count) @@ -96,7 +101,7 @@ class ComputeAPI(base.Base): kernel_id = image.get('kernelId', None) if ramdisk_id is None: ramdisk_id = image.get('ramdiskId', None) - #No kernel and ramdisk for raw images + # No kernel and ramdisk for raw images if kernel_id == str(FLAGS.null_kernel): kernel_id = None ramdisk_id = None @@ -124,7 +129,6 @@ class ComputeAPI(base.Base): key_pair = db.key_pair_get(context, context.user_id, key_name) key_data = key_pair['public_key'] - type_data = instance_types.INSTANCE_TYPES[instance_type] base_options = { 'reservation_id': utils.generate_uid('r'), 'image_id': image_id, @@ -139,10 +143,11 @@ class ComputeAPI(base.Base): 'vcpus': type_data['vcpus'], 'local_gb': type_data['local_gb'], 'display_name': display_name, - 'display_description': description, + 'display_description': display_description, 'user_data': user_data or '', 'key_name': key_name, 'key_data': key_data, + 'locked': False, 'availability_zone': availability_zone} elevated = context.elevated() @@ -154,7 +159,6 @@ class ComputeAPI(base.Base): **base_options) instance = self.db.instance_create(context, instance) instance_id = instance['id'] - internal_id = instance['internal_id'] elevated = context.elevated() if not security_groups: @@ -165,11 +169,11 @@ class ComputeAPI(base.Base): security_group_id) # Set sane defaults if not specified - updates = dict(hostname=generate_hostname(internal_id)) + updates = dict(hostname=generate_hostname(instance_id)) if 'display_name' not in instance: - updates['display_name'] = "Server %s" % internal_id + updates['display_name'] = "Server %s" % instance_id - instance = self.update_instance(context, instance_id, **updates) + instance = self.update(context, instance_id, **updates) instances.append(instance) logging.debug(_("Casting to scheduler for %s/%s's instance %s"), @@ -199,7 +203,7 @@ class ComputeAPI(base.Base): 'project_id': context.project_id} db.security_group_create(context, values) - def update_instance(self, context, instance_id, **kwargs): + def update(self, context, instance_id, **kwargs): """Updates the instance in the datastore. :param context: The security context @@ -213,134 +217,204 @@ class ComputeAPI(base.Base): """ return self.db.instance_update(context, instance_id, kwargs) - def delete_instance(self, context, instance_id): - logging.debug("Going to try and terminate %d" % instance_id) + def delete(self, context, instance_id): + logging.debug("Going to try and terminate %s" % instance_id) try: - instance = self.db.instance_get_by_internal_id(context, - instance_id) + instance = self.get(context, instance_id) except exception.NotFound as e: - logging.warning(_("Instance %d was not found during terminate"), + logging.warning(_("Instance %s was not found during terminate"), instance_id) raise e if (instance['state_description'] == 'terminating'): - logging.warning(_("Instance %d is already being terminated"), + logging.warning(_("Instance %s is already being terminated"), instance_id) return - self.update_instance(context, - instance['id'], - state_description='terminating', - state=0, - terminated_at=datetime.datetime.utcnow()) + self.update(context, + instance['id'], + state_description='terminating', + state=0, + terminated_at=datetime.datetime.utcnow()) host = instance['host'] if host: rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "terminate_instance", - "args": {"instance_id": instance['id']}}) + "args": {"instance_id": instance_id}}) else: - self.db.instance_destroy(context, instance['id']) - - def get_instances(self, context, project_id=None): - """Get all instances, possibly filtered by project ID or - user ID. If there is no filter and the context is an admin, - it will retreive all instances in the system.""" + self.db.instance_destroy(context, instance_id) + + def get(self, context, instance_id): + """Get a single instance with the given ID.""" + return self.db.instance_get_by_id(context, instance_id) + + def get_all(self, context, project_id=None, reservation_id=None, + fixed_ip=None): + """Get all instances, possibly filtered by one of the + given parameters. If there is no filter and the context is + an admin, it will retreive all instances in the system.""" + if reservation_id is not None: + return self.db.instance_get_all_by_reservation(context, + reservation_id) + if fixed_ip is not None: + return self.db.fixed_ip_get_instance(context, fixed_ip) if project_id or not context.is_admin: if not context.project: return self.db.instance_get_all_by_user(context, context.user_id) if project_id is None: project_id = context.project_id - return self.db.instance_get_all_by_project(context, project_id) + return self.db.instance_get_all_by_project(context, + project_id) return self.db.instance_get_all(context) - def get_instance(self, context, instance_id): - return self.db.instance_get_by_internal_id(context, instance_id) - def snapshot(self, context, instance_id, name): """Snapshot the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "snapshot_instance", - "args": {"instance_id": instance['id'], "name": name}}) + "args": {"instance_id": instance_id, "name": name}}) def reboot(self, context, instance_id): """Reboot the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "reboot_instance", - "args": {"instance_id": instance['id']}}) + "args": {"instance_id": instance_id}}) def pause(self, context, instance_id): """Pause the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "pause_instance", - "args": {"instance_id": instance['id']}}) + "args": {"instance_id": instance_id}}) def unpause(self, context, instance_id): """Unpause the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "unpause_instance", - "args": {"instance_id": instance['id']}}) + "args": {"instance_id": instance_id}}) def get_diagnostics(self, context, instance_id): """Retrieve diagnostics for the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance["host"] return rpc.call(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "get_diagnostics", - "args": {"instance_id": instance["id"]}}) + "args": {"instance_id": instance_id}}) def get_actions(self, context, instance_id): """Retrieve actions for the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) - return self.db.instance_get_actions(context, instance["id"]) + return self.db.instance_get_actions(context, instance_id) def suspend(self, context, instance_id): """suspend the instance with instance_id""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "suspend_instance", - "args": {"instance_id": instance['id']}}) + "args": {"instance_id": instance_id}}) def resume(self, context, instance_id): """resume the instance with instance_id""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "resume_instance", - "args": {"instance_id": instance['id']}}) + "args": {"instance_id": instance_id}}) def rescue(self, context, instance_id): """Rescue the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "rescue_instance", - "args": {"instance_id": instance['id']}}) + "args": {"instance_id": instance_id}}) def unrescue(self, context, instance_id): """Unrescue the given instance.""" - instance = self.db.instance_get_by_internal_id(context, instance_id) + instance = self.get(context, instance_id) host = instance['host'] rpc.cast(context, self.db.queue_get_for(context, FLAGS.compute_topic, host), {"method": "unrescue_instance", + "args": {"instance_id": instance_id}}) + + def lock(self, context, instance_id): + """ + lock the instance with instance_id + + """ + instance = self.get_instance(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "lock_instance", "args": {"instance_id": instance['id']}}) + + def unlock(self, context, instance_id): + """ + unlock the instance with instance_id + + """ + instance = self.get_instance(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "unlock_instance", + "args": {"instance_id": instance['id']}}) + + def get_lock(self, context, instance_id): + """ + return the boolean state of (instance with instance_id)'s lock + + """ + instance = self.get_instance(context, instance_id) + return instance['locked'] + + def attach_volume(self, context, instance_id, volume_id, device): + if not re.match("^/dev/[a-z]d[a-z]+$", device): + raise exception.ApiError(_("Invalid device specified: %s. " + "Example device: /dev/vdb") % device) + self.volume_api.check_attach(context, volume_id) + instance = self.get(context, instance_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "attach_volume", + "args": {"volume_id": volume_id, + "instance_id": instance_id, + "mountpoint": device}}) + + def detach_volume(self, context, volume_id): + instance = self.db.volume_get_instance(context.elevated(), volume_id) + if not instance: + raise exception.ApiError(_("Volume isn't attached to anything!")) + self.volume_api.check_detach(context, volume_id) + host = instance['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "detach_volume", + "args": {"instance_id": instance['id'], + "volume_id": volume_id}}) + return instance + + def associate_floating_ip(self, context, instance_id, address): + instance = self.get(context, instance_id) + self.network_api.associate_floating_ip(context, address, + instance['fixed_ip']) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index c9aff75ac..d5136eb26 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -36,6 +36,7 @@ terminating it. import datetime import logging +import functools from nova import exception from nova import flags @@ -53,6 +54,38 @@ flags.DEFINE_string('stub_network', False, 'Stub network related code') +def checks_instance_lock(function): + """ + decorator used for preventing action against locked instances + unless, of course, you happen to be admin + + """ + + @functools.wraps(function) + def decorated_function(self, context, instance_id, *args, **kwargs): + + logging.info(_("check_instance_lock: decorating: |%s|"), function) + logging.info(_("check_instance_lock: arguments: |%s| |%s| |%s|"), + self, + context, + instance_id) + locked = self.get_lock(context, instance_id) + admin = context.is_admin + logging.info(_("check_instance_lock: locked: |%s|"), locked) + logging.info(_("check_instance_lock: admin: |%s|"), admin) + + # if admin or unlocked call function otherwise log error + if admin or not locked: + logging.info(_("check_instance_lock: executing: |%s|"), function) + function(self, context, instance_id, *args, **kwargs) + else: + logging.error(_("check_instance_lock: not executing |%s|"), + function) + return False + + return decorated_function + + class ComputeManager(manager.Manager): """Manages the running instances from creation to destruction.""" @@ -158,6 +191,7 @@ class ComputeManager(manager.Manager): self._update_state(context, instance_id) @exception.wrap_exception + @checks_instance_lock def terminate_instance(self, context, instance_id): """Terminate an instance on this machine.""" context = context.elevated() @@ -202,6 +236,7 @@ class ComputeManager(manager.Manager): self.db.instance_destroy(context, instance_id) @exception.wrap_exception + @checks_instance_lock def reboot_instance(self, context, instance_id): """Reboot an instance on this server.""" context = context.elevated() @@ -211,7 +246,7 @@ class ComputeManager(manager.Manager): if instance_ref['state'] != power_state.RUNNING: logging.warn(_('trying to reboot a non-running ' 'instance: %s (state: %s excepted: %s)'), - instance_ref['internal_id'], + instance_id, instance_ref['state'], power_state.RUNNING) @@ -239,20 +274,20 @@ class ComputeManager(manager.Manager): if instance_ref['state'] != power_state.RUNNING: logging.warn(_('trying to snapshot a non-running ' 'instance: %s (state: %s excepted: %s)'), - instance_ref['internal_id'], + instance_id, instance_ref['state'], power_state.RUNNING) self.driver.snapshot(instance_ref, name) @exception.wrap_exception + @checks_instance_lock def rescue_instance(self, context, instance_id): """Rescue an instance on this server.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug(_('instance %s: rescuing'), - instance_ref['internal_id']) + logging.debug(_('instance %s: rescuing'), instance_id) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, @@ -262,13 +297,13 @@ class ComputeManager(manager.Manager): self._update_state(context, instance_id) @exception.wrap_exception + @checks_instance_lock def unrescue_instance(self, context, instance_id): """Rescue an instance on this server.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug(_('instance %s: unrescuing'), - instance_ref['internal_id']) + logging.debug(_('instance %s: unrescuing'), instance_id) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, @@ -282,13 +317,13 @@ class ComputeManager(manager.Manager): self._update_state(context, instance_id) @exception.wrap_exception + @checks_instance_lock def pause_instance(self, context, instance_id): """Pause an instance on this server.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug('instance %s: pausing', - instance_ref['internal_id']) + logging.debug('instance %s: pausing', instance_id) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, @@ -300,13 +335,13 @@ class ComputeManager(manager.Manager): result)) @exception.wrap_exception + @checks_instance_lock def unpause_instance(self, context, instance_id): """Unpause a paused instance on this server.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug('instance %s: unpausing', - instance_ref['internal_id']) + logging.debug('instance %s: unpausing', instance_id) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, @@ -324,16 +359,20 @@ class ComputeManager(manager.Manager): if instance_ref["state"] == power_state.RUNNING: logging.debug(_("instance %s: retrieving diagnostics"), - instance_ref["internal_id"]) + instance_id) return self.driver.get_diagnostics(instance_ref) + @exception.wrap_exception + @checks_instance_lock def suspend_instance(self, context, instance_id): - """suspend the instance with instance_id""" + """ + suspend the instance with instance_id + + """ context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug(_('instance %s: suspending'), - instance_ref['internal_id']) + logging.debug(_('instance %s: suspending'), instance_id) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, 'suspending') @@ -344,12 +383,16 @@ class ComputeManager(manager.Manager): result)) @exception.wrap_exception + @checks_instance_lock def resume_instance(self, context, instance_id): - """resume the suspended instance with instance_id""" + """ + resume the suspended instance with instance_id + + """ context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - logging.debug(_('instance %s: resuming'), instance_ref['internal_id']) + logging.debug(_('instance %s: resuming'), instance_id) self.db.instance_set_state(context, instance_id, power_state.NOSTATE, 'resuming') @@ -360,6 +403,41 @@ class ComputeManager(manager.Manager): result)) @exception.wrap_exception + def lock_instance(self, context, instance_id): + """ + lock the instance with instance_id + + """ + context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) + + logging.debug(_('instance %s: locking'), instance_id) + self.db.instance_update(context, instance_id, {'locked': True}) + + @exception.wrap_exception + def unlock_instance(self, context, instance_id): + """ + unlock the instance with instance_id + + """ + context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) + + logging.debug(_('instance %s: unlocking'), instance_id) + self.db.instance_update(context, instance_id, {'locked': False}) + + @exception.wrap_exception + def get_lock(self, context, instance_id): + """ + return the boolean state of (instance with instance_id)'s lock + + """ + context = context.elevated() + logging.debug(_('instance %s: getting locked state'), instance_id) + instance_ref = self.db.instance_get(context, instance_id) + return instance_ref['locked'] + + @exception.wrap_exception def get_console_output(self, context, instance_id): """Send the console output for an instance.""" context = context.elevated() @@ -369,6 +447,7 @@ class ComputeManager(manager.Manager): return self.driver.get_console_output(instance_ref) @exception.wrap_exception + @checks_instance_lock def attach_volume(self, context, instance_id, volume_id, mountpoint): """Attach a volume to an instance.""" context = context.elevated() @@ -398,6 +477,7 @@ class ComputeManager(manager.Manager): return True @exception.wrap_exception + @checks_instance_lock def detach_volume(self, context, instance_id, volume_id): """Detach a volume from an instance.""" context = context.elevated() diff --git a/nova/db/api.py b/nova/db/api.py index bab698d9d..0fa5eb1e8 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -353,9 +353,9 @@ def instance_get_project_vpn(context, project_id): return IMPL.instance_get_project_vpn(context, project_id) -def instance_get_by_internal_id(context, internal_id): - """Get an instance by internal id.""" - return IMPL.instance_get_by_internal_id(context, internal_id) +def instance_get_by_id(context, instance_id): + """Get an instance by id.""" + return IMPL.instance_get_by_id(context, instance_id) def instance_is_vpn(context, instance_id): @@ -719,7 +719,7 @@ def security_group_get_all(context): def security_group_get(context, security_group_id): - """Get security group by its internal id.""" + """Get security group by its id.""" return IMPL.security_group_get(context, security_group_id) diff --git a/nova/db/sqlalchemy/__init__.py b/nova/db/sqlalchemy/__init__.py index 3288ebd20..22aa1cfe6 100644 --- a/nova/db/sqlalchemy/__init__.py +++ b/nova/db/sqlalchemy/__init__.py @@ -19,6 +19,25 @@ """ SQLAlchemy database backend """ +import logging +import time + +from sqlalchemy.exc import OperationalError + +from nova import flags from nova.db.sqlalchemy import models -models.register_models() + +FLAGS = flags.FLAGS + + +for i in xrange(FLAGS.sql_max_retries): + if i > 0: + time.sleep(FLAGS.sql_retry_interval) + + try: + models.register_models() + break + except OperationalError: + logging.exception(_("Data store is unreachable." + " Trying again in %d seconds.") % FLAGS.sql_retry_interval) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 72369d187..45427597a 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -19,7 +19,6 @@ Implementation of SQLAlchemy backend. """ -import random import warnings from nova import db @@ -606,30 +605,18 @@ def fixed_ip_update(context, address, values): ################### -#TODO(gundlach): instance_create and volume_create are nearly identical -#and should be refactored. I expect there are other copy-and-paste -#functions between the two of them as well. - - @require_context def instance_create(context, values): """Create a new Instance record in the database. context - request context object values - dict containing column values. - 'internal_id' is auto-generated and should not be specified. """ instance_ref = models.Instance() instance_ref.update(values) session = get_session() with session.begin(): - while instance_ref.internal_id == None: - # Instances have integer internal ids. - internal_id = random.randint(0, 2 ** 31 - 1) - if not instance_internal_id_exists(context, internal_id, - session=session): - instance_ref.internal_id = internal_id instance_ref.save(session=session) return instance_ref @@ -751,38 +738,29 @@ def instance_get_project_vpn(context, project_id): @require_context -def instance_get_by_internal_id(context, internal_id): +def instance_get_by_id(context, instance_id): session = get_session() if is_admin_context(context): result = session.query(models.Instance).\ options(joinedload('security_groups')).\ - filter_by(internal_id=internal_id).\ + filter_by(id=instance_id).\ filter_by(deleted=can_read_deleted(context)).\ first() elif is_user_context(context): result = session.query(models.Instance).\ options(joinedload('security_groups')).\ filter_by(project_id=context.project_id).\ - filter_by(internal_id=internal_id).\ + filter_by(id=instance_id).\ filter_by(deleted=False).\ first() if not result: - raise exception.NotFound(_('Instance %s not found') % (internal_id)) + raise exception.NotFound(_('Instance %s not found') % (instance_id)) return result @require_context -def instance_internal_id_exists(context, internal_id, session=None): - if not session: - session = get_session() - return session.query(exists().\ - where(models.Instance.internal_id == internal_id)).\ - one()[0] - - -@require_context def instance_get_fixed_address(context, instance_id): session = get_session() with session.begin(): @@ -862,12 +840,9 @@ def instance_action_create(context, values): def instance_get_actions(context, instance_id): """Return the actions associated to the given instance id""" session = get_session() - actions = {} - for action in session.query(models.InstanceActions).\ + return session.query(models.InstanceActions).\ filter_by(instance_id=instance_id).\ - all(): - actions[action.action] = action.error - return actions + all() ################### @@ -1317,10 +1292,6 @@ def volume_create(context, values): session = get_session() with session.begin(): - while volume_ref.ec2_id == None: - ec2_id = utils.generate_uid('vol') - if not volume_ec2_id_exists(context, ec2_id, session=session): - volume_ref.ec2_id = ec2_id volume_ref.save(session=session) return volume_ref @@ -1418,41 +1389,6 @@ def volume_get_all_by_project(context, project_id): all() -@require_context -def volume_get_by_ec2_id(context, ec2_id): - session = get_session() - result = None - - if is_admin_context(context): - result = session.query(models.Volume).\ - filter_by(ec2_id=ec2_id).\ - filter_by(deleted=can_read_deleted(context)).\ - first() - elif is_user_context(context): - result = session.query(models.Volume).\ - filter_by(project_id=context.project_id).\ - filter_by(ec2_id=ec2_id).\ - filter_by(deleted=False).\ - first() - else: - raise exception.NotAuthorized() - - if not result: - raise exception.NotFound(_('Volume %s not found') % ec2_id) - - return result - - -@require_context -def volume_ec2_id_exists(context, ec2_id, session=None): - if not session: - session = get_session() - - return session.query(exists().\ - where(models.Volume.id == ec2_id)).\ - one()[0] - - @require_admin_context def volume_get_instance(context, volume_id): session = get_session() diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 0c9c387fc..1ed366127 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -164,11 +164,13 @@ class Certificate(BASE, NovaBase): class Instance(BASE, NovaBase): """Represents a guest vm.""" __tablename__ = 'instances' - id = Column(Integer, primary_key=True) - internal_id = Column(Integer, unique=True) + id = Column(Integer, primary_key=True, autoincrement=True) - admin_pass = Column(String(255)) + @property + def name(self): + return "instance-%08x" % self.id + admin_pass = Column(String(255)) user_id = Column(String(255)) project_id = Column(String(255)) @@ -180,10 +182,6 @@ class Instance(BASE, NovaBase): def project(self): return auth.manager.AuthManager().get_project(self.project_id) - @property - def name(self): - return "instance-%d" % self.internal_id - image_id = Column(String(255)) kernel_id = Column(String(255)) ramdisk_id = Column(String(255)) @@ -226,6 +224,8 @@ class Instance(BASE, NovaBase): display_name = Column(String(255)) display_description = Column(String(255)) + locked = Column(Boolean) + # TODO(vish): see Ewan's email about state improvements, probably # should be in a driver base class or some such # vmstate_state = running, halted, suspended, paused @@ -251,8 +251,11 @@ class InstanceActions(BASE, NovaBase): class Volume(BASE, NovaBase): """Represents a block storage device that can be attached to a vm.""" __tablename__ = 'volumes' - id = Column(Integer, primary_key=True) - ec2_id = Column(String(12), unique=True) + id = Column(Integer, primary_key=True, autoincrement=True) + + @property + def name(self): + return "volume-%08x" % self.id user_id = Column(String(255)) project_id = Column(String(255)) @@ -278,10 +281,6 @@ class Volume(BASE, NovaBase): display_name = Column(String(255)) display_description = Column(String(255)) - @property - def name(self): - return self.ec2_id - class Quota(BASE, NovaBase): """Represents quota overrides for a project.""" @@ -545,7 +544,8 @@ def register_models(): """Register Models and create metadata. Called from nova.db.sqlalchemy.__init__ as part of loading the driver, - it will never need to be called explicitly elsewhere. + it will never need to be called explicitly elsewhere unless the + connection is lost and needs to be reestablished. """ from sqlalchemy import create_engine models = (Service, Instance, InstanceActions, diff --git a/nova/flags.py b/nova/flags.py index e872ba217..4b7334927 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -266,6 +266,8 @@ DEFINE_string('sql_connection', DEFINE_string('sql_idle_timeout', '3600', 'timeout for idle sql database connections') +DEFINE_integer('sql_max_retries', 12, 'sql connection attempts') +DEFINE_integer('sql_retry_interval', 10, 'sql connection retry interval') DEFINE_string('compute_manager', 'nova.compute.manager.ComputeManager', 'Manager for compute') diff --git a/nova/image/glance.py b/nova/image/glance.py index cc3192e7c..cb3936df1 100644 --- a/nova/image/glance.py +++ b/nova/image/glance.py @@ -24,7 +24,6 @@ import urlparse import webob.exc -from nova.compute import api as compute_api from nova import utils from nova import flags from nova import exception diff --git a/nova/network/__init__.py b/nova/network/__init__.py index dcc54db09..6eb3e3ef6 100644 --- a/nova/network/__init__.py +++ b/nova/network/__init__.py @@ -16,17 +16,4 @@ # License for the specific language governing permissions and limitations # under the License. -""" -:mod:`nova.network` -- Network Nodes -===================================================== - -.. automodule:: nova.network - :platform: Unix - :synopsis: Network is responsible for managing networking -.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> -.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> -.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> -.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> -.. moduleauthor:: Manish Singh <yosh@gimp.org> -.. moduleauthor:: Andy Smith <andy@anarkystic.com> -""" +from nova.network.api import API diff --git a/nova/network/api.py b/nova/network/api.py new file mode 100644 index 000000000..cbd912047 --- /dev/null +++ b/nova/network/api.py @@ -0,0 +1,87 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Handles all requests relating to instances (guest vms). +""" + +import logging + +from nova import db +from nova import flags +from nova import quota +from nova import rpc +from nova.db import base + +FLAGS = flags.FLAGS + + +class API(base.Base): + """API for interacting with the network manager.""" + + def allocate_floating_ip(self, context): + if quota.allowed_floating_ips(context, 1) < 1: + logging.warn(_("Quota exceeeded for %s, tried to allocate " + "address"), + context.project_id) + raise quota.QuotaError(_("Address quota exceeded. You cannot " + "allocate any more addresses")) + # NOTE(vish): We don't know which network host should get the ip + # when we allocate, so just send it to any one. This + # will probably need to move into a network supervisor + # at some point. + return rpc.call(context, + FLAGS.network_topic, + {"method": "allocate_floating_ip", + "args": {"project_id": context.project_id}}) + + def release_floating_ip(self, context, address): + floating_ip = self.db.floating_ip_get_by_address(context, address) + # NOTE(vish): We don't know which network host should get the ip + # when we deallocate, so just send it to any one. This + # will probably need to move into a network supervisor + # at some point. + rpc.cast(context, + FLAGS.network_topic, + {"method": "deallocate_floating_ip", + "args": {"floating_address": floating_ip['address']}}) + + def associate_floating_ip(self, context, floating_ip, fixed_ip): + if isinstance(fixed_ip, str) or isinstance(fixed_ip, unicode): + fixed_ip = self.db.fixed_ip_get_by_address(context, fixed_ip) + floating_ip = self.db.floating_ip_get_by_address(context, floating_ip) + # NOTE(vish): Perhaps we should just pass this on to compute and + # let compute communicate with network. + host = fixed_ip['network']['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.network_topic, host), + {"method": "associate_floating_ip", + "args": {"floating_address": floating_ip['address'], + "fixed_address": fixed_ip['address']}}) + + def disassociate_floating_ip(self, context, address): + floating_ip = self.db.floating_ip_get_by_address(context, address) + if not floating_ip.get('fixed_ip'): + raise exception.ApiError('Address is not associated.') + # NOTE(vish): Get the topic from the host name of the network of + # the associated fixed ip. + host = floating_ip['fixed_ip']['network']['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.network_topic, host), + {"method": "disassociate_floating_ip", + "args": {"floating_address": floating_ip['address']}}) diff --git a/nova/quota.py b/nova/quota.py index f6ca9f77c..3884eb308 100644 --- a/nova/quota.py +++ b/nova/quota.py @@ -22,7 +22,6 @@ Quotas for instances, volumes, and floating ips from nova import db from nova import exception from nova import flags -from nova.compute import instance_types FLAGS = flags.FLAGS @@ -63,10 +62,9 @@ def allowed_instances(context, num_instances, instance_type): quota = get_quota(context, project_id) allowed_instances = quota['instances'] - used_instances allowed_cores = quota['cores'] - used_cores - type_cores = instance_types.INSTANCE_TYPES[instance_type]['vcpus'] - num_cores = num_instances * type_cores + num_cores = num_instances * instance_type['vcpus'] allowed_instances = min(allowed_instances, - int(allowed_cores // type_cores)) + int(allowed_cores // instance_type['vcpus'])) return min(num_instances, allowed_instances) diff --git a/nova/service.py b/nova/service.py index f1f90742f..7203430c6 100644 --- a/nova/service.py +++ b/nova/service.py @@ -24,17 +24,21 @@ import inspect import logging import os import sys +import time from eventlet import event from eventlet import greenthread from eventlet import greenpool +from sqlalchemy.exc import OperationalError + from nova import context from nova import db from nova import exception from nova import flags from nova import rpc from nova import utils +from nova.db.sqlalchemy import models FLAGS = flags.FLAGS @@ -204,6 +208,14 @@ class Service(object): self.model_disconnected = True logging.exception(_("model server went away")) + try: + models.register_models() + except OperationalError: + logging.exception(_("Data store is unreachable." + " Trying again in %d seconds.") % + FLAGS.sql_retry_interval) + time.sleep(FLAGS.sql_retry_interval) + def serve(*services): argv = FLAGS(sys.argv) diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index 961431154..291a0e468 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -113,7 +113,7 @@ def stub_out_networking(stubs): def stub_out_compute_api_snapshot(stubs): def snapshot(self, context, instance_id, name): return 123 - stubs.Set(nova.compute.api.ComputeAPI, 'snapshot', snapshot) + stubs.Set(nova.compute.API, 'snapshot', snapshot) def stub_out_glance(stubs, initial_fixtures=[]): diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py index 70ff714e6..6e611a55d 100644 --- a/nova/tests/api/openstack/test_servers.py +++ b/nova/tests/api/openstack/test_servers.py @@ -56,8 +56,8 @@ def instance_address(context, instance_id): def stub_instance(id, user_id=1): - return Instance(id=int(id) + 123456, state=0, image_id=10, user_id=user_id, - display_name='server%s' % id, internal_id=id) + return Instance(id=id, state=0, image_id=10, user_id=user_id, + display_name='server%s' % id) def fake_compute_api(cls, req, id): @@ -76,8 +76,7 @@ class ServersTest(unittest.TestCase): fakes.stub_out_key_pair_funcs(self.stubs) fakes.stub_out_image_service(self.stubs) self.stubs.Set(nova.db.api, 'instance_get_all', return_servers) - self.stubs.Set(nova.db.api, 'instance_get_by_internal_id', - return_server) + self.stubs.Set(nova.db.api, 'instance_get_by_id', return_server) self.stubs.Set(nova.db.api, 'instance_get_all_by_user', return_servers) self.stubs.Set(nova.db.api, 'instance_add_security_group', @@ -87,18 +86,12 @@ class ServersTest(unittest.TestCase): instance_address) self.stubs.Set(nova.db.api, 'instance_get_floating_address', instance_address) - self.stubs.Set(nova.compute.api.ComputeAPI, 'pause', - fake_compute_api) - self.stubs.Set(nova.compute.api.ComputeAPI, 'unpause', - fake_compute_api) - self.stubs.Set(nova.compute.api.ComputeAPI, 'suspend', - fake_compute_api) - self.stubs.Set(nova.compute.api.ComputeAPI, 'resume', - fake_compute_api) - self.stubs.Set(nova.compute.api.ComputeAPI, "get_diagnostics", - fake_compute_api) - self.stubs.Set(nova.compute.api.ComputeAPI, "get_actions", - fake_compute_api) + self.stubs.Set(nova.compute.API, 'pause', fake_compute_api) + self.stubs.Set(nova.compute.API, 'unpause', fake_compute_api) + self.stubs.Set(nova.compute.API, 'suspend', fake_compute_api) + self.stubs.Set(nova.compute.API, 'resume', fake_compute_api) + self.stubs.Set(nova.compute.API, "get_diagnostics", fake_compute_api) + self.stubs.Set(nova.compute.API, "get_actions", fake_compute_api) self.allow_admin = FLAGS.allow_admin_api def tearDown(self): @@ -109,7 +102,7 @@ class ServersTest(unittest.TestCase): req = webob.Request.blank('/v1.0/servers/1') res = req.get_response(nova.api.API('os')) res_dict = json.loads(res.body) - self.assertEqual(res_dict['server']['id'], 1) + self.assertEqual(res_dict['server']['id'], '1') self.assertEqual(res_dict['server']['name'], 'server1') def test_get_server_list(self): @@ -126,7 +119,7 @@ class ServersTest(unittest.TestCase): def test_create_instance(self): def instance_create(context, inst): - return {'id': 1, 'internal_id': 1, 'display_name': ''} + return {'id': '1', 'display_name': ''} def server_update(context, id, params): return instance_create(context, id) diff --git a/nova/tests/hyperv_unittest.py b/nova/tests/hyperv_unittest.py new file mode 100644 index 000000000..3980ae3cb --- /dev/null +++ b/nova/tests/hyperv_unittest.py @@ -0,0 +1,71 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2010 Cloud.com, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Tests For Hyper-V driver +""" + +import random + +from nova import context +from nova import db +from nova import flags +from nova import test +from nova.auth import manager +from nova.virt import hyperv + +FLAGS = flags.FLAGS +FLAGS.connection_type = 'hyperv' + + +class HyperVTestCase(test.TestCase): + """Test cases for the Hyper-V driver""" + def setUp(self): + super(HyperVTestCase, self).setUp() + self.manager = manager.AuthManager() + self.user = self.manager.create_user('fake', 'fake', 'fake', + admin=True) + self.project = self.manager.create_project('fake', 'fake', 'fake') + self.context = context.RequestContext(self.user, self.project) + + def test_create_destroy(self): + """Create a VM and destroy it""" + instance = {'internal_id': random.randint(1, 1000000), + 'memory_mb': '1024', + 'mac_address': '02:12:34:46:56:67', + 'vcpus': 2, + 'project_id': 'fake', + 'instance_type': 'm1.small'} + instance_ref = db.instance_create(self.context, instance) + + conn = hyperv.get_connection(False) + conn._create_vm(instance_ref) # pylint: disable-msg=W0212 + found = [n for n in conn.list_instances() + if n == instance_ref['name']] + self.assertTrue(len(found) == 1) + info = conn.get_info(instance_ref['name']) + #Unfortunately since the vm is not running at this point, + #we cannot obtain memory information from get_info + self.assertEquals(info['num_cpu'], instance_ref['vcpus']) + + conn.destroy(instance_ref) + found = [n for n in conn.list_instances() + if n == instance_ref['name']] + self.assertTrue(len(found) == 0) + + def tearDown(self): + super(HyperVTestCase, self).tearDown() + self.manager.delete_project(self.project) + self.manager.delete_user(self.user) diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index 70d2c44da..ba58fab59 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -106,7 +106,7 @@ class CloudTestCase(test.TestCase): self.cloud.allocate_address(self.context) inst = db.instance_create(self.context, {'host': FLAGS.host}) fixed = self.network.allocate_fixed_ip(self.context, inst['id']) - ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id']) + ec2_id = cloud.id_to_ec2_id(inst['id']) self.cloud.associate_address(self.context, instance_id=ec2_id, public_ip=address) @@ -127,9 +127,9 @@ class CloudTestCase(test.TestCase): result = self.cloud.describe_volumes(self.context) self.assertEqual(len(result['volumeSet']), 2) result = self.cloud.describe_volumes(self.context, - volume_id=[vol2['ec2_id']]) + volume_id=[vol2['id']]) self.assertEqual(len(result['volumeSet']), 1) - self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id']) + self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['id']) db.volume_destroy(self.context, vol1['id']) db.volume_destroy(self.context, vol2['id']) @@ -140,15 +140,16 @@ class CloudTestCase(test.TestCase): kwargs = {'image_id': image_id, 'instance_type': instance_type, 'max_count': max_count} - rv = yield self.cloud.run_instances(self.context, **kwargs) + rv = self.cloud.run_instances(self.context, **kwargs) + print rv instance_id = rv['instancesSet'][0]['instanceId'] - output = yield self.cloud.get_console_output(context=self.context, + output = self.cloud.get_console_output(context=self.context, instance_id=[instance_id]) self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT') # TODO(soren): We need this until we can stop polling in the rpc code # for unit tests. greenthread.sleep(0.3) - rv = yield self.cloud.terminate_instances(self.context, [instance_id]) + rv = self.cloud.terminate_instances(self.context, [instance_id]) def test_key_generation(self): result = self._create_key('test') @@ -186,7 +187,7 @@ class CloudTestCase(test.TestCase): kwargs = {'image_id': image_id, 'instance_type': instance_type, 'max_count': max_count} - rv = yield self.cloud.run_instances(self.context, **kwargs) + rv = self.cloud.run_instances(self.context, **kwargs) # TODO: check for proper response instance_id = rv['reservationSet'][0].keys()[0] instance = rv['reservationSet'][0][instance_id][0] @@ -209,7 +210,7 @@ class CloudTestCase(test.TestCase): for instance in reservations[reservations.keys()[0]]: instance_id = instance['instance_id'] logging.debug("Terminating instance %s" % instance_id) - rv = yield self.compute.terminate_instance(instance_id) + rv = self.compute.terminate_instance(instance_id) def test_instance_update_state(self): def instance(num): @@ -296,7 +297,7 @@ class CloudTestCase(test.TestCase): def test_update_of_instance_display_fields(self): inst = db.instance_create(self.context, {}) - ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id']) + ec2_id = cloud.id_to_ec2_id(inst['id']) self.cloud.update_instance(self.context, ec2_id, display_name='c00l 1m4g3') inst = db.instance_get(self.context, inst['id']) diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index 1fb9143f1..534493dfe 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -22,6 +22,7 @@ Tests For Compute import datetime import logging +from nova import compute from nova import context from nova import db from nova import exception @@ -29,7 +30,6 @@ from nova import flags from nova import test from nova import utils from nova.auth import manager -from nova.compute import api as compute_api FLAGS = flags.FLAGS @@ -44,7 +44,7 @@ class ComputeTestCase(test.TestCase): stub_network=True, network_manager='nova.network.manager.FlatManager') self.compute = utils.import_object(FLAGS.compute_manager) - self.compute_api = compute_api.ComputeAPI() + self.compute_api = compute.API() self.manager = manager.AuthManager() self.user = self.manager.create_user('fake', 'fake', 'fake') self.project = self.manager.create_project('fake', 'fake', 'fake') @@ -72,7 +72,7 @@ class ComputeTestCase(test.TestCase): """Verify that an instance cannot be created without a display_name.""" cases = [dict(), dict(display_name=None)] for instance in cases: - ref = self.compute_api.create_instances(self.context, + ref = self.compute_api.create(self.context, FLAGS.default_instance_type, None, **instance) try: self.assertNotEqual(ref[0].display_name, None) @@ -80,13 +80,13 @@ class ComputeTestCase(test.TestCase): db.instance_destroy(self.context, ref[0]['id']) def test_create_instance_associates_security_groups(self): - """Make sure create_instances associates security groups""" + """Make sure create associates security groups""" values = {'name': 'default', 'description': 'default', 'user_id': self.user.id, 'project_id': self.project.id} group = db.security_group_create(self.context, values) - ref = self.compute_api.create_instances(self.context, + ref = self.compute_api.create(self.context, FLAGS.default_instance_type, None, security_group=['default']) try: self.assertEqual(len(ref[0]['security_groups']), 1) @@ -178,3 +178,22 @@ class ComputeTestCase(test.TestCase): self.context, instance_id) self.compute.terminate_instance(self.context, instance_id) + + def test_lock(self): + """ensure locked instance cannot be changed""" + instance_id = self._create_instance() + self.compute.run_instance(self.context, instance_id) + + non_admin_context = context.RequestContext(None, None, False, False) + + # decorator should return False (fail) with locked nonadmin context + self.compute.lock_instance(self.context, instance_id) + ret_val = self.compute.reboot_instance(non_admin_context, instance_id) + self.assertEqual(ret_val, False) + + # decorator should return None (success) with unlocked nonadmin context + self.compute.unlock_instance(self.context, instance_id) + ret_val = self.compute.reboot_instance(non_admin_context, instance_id) + self.assertEqual(ret_val, None) + + self.compute.terminate_instance(self.context, instance_id) diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py index 8cf2a5e54..b5f9f30ef 100644 --- a/nova/tests/test_quota.py +++ b/nova/tests/test_quota.py @@ -27,6 +27,7 @@ from nova import test from nova import utils from nova.auth import manager from nova.api.ec2 import cloud +from nova.compute import instance_types FLAGS = flags.FLAGS @@ -78,14 +79,17 @@ class QuotaTestCase(test.TestCase): def test_quota_overrides(self): """Make sure overriding a projects quotas works""" - num_instances = quota.allowed_instances(self.context, 100, 'm1.small') + num_instances = quota.allowed_instances(self.context, 100, + instance_types.INSTANCE_TYPES['m1.small']) self.assertEqual(num_instances, 2) db.quota_create(self.context, {'project_id': self.project.id, 'instances': 10}) - num_instances = quota.allowed_instances(self.context, 100, 'm1.small') + num_instances = quota.allowed_instances(self.context, 100, + instance_types.INSTANCE_TYPES['m1.small']) self.assertEqual(num_instances, 4) db.quota_update(self.context, self.project.id, {'cores': 100}) - num_instances = quota.allowed_instances(self.context, 100, 'm1.small') + num_instances = quota.allowed_instances(self.context, 100, + instance_types.INSTANCE_TYPES['m1.small']) self.assertEqual(num_instances, 10) db.quota_destroy(self.context, self.project.id) diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index c95a53af3..ec9462ada 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -79,8 +79,8 @@ class XenAPIVolumeTestCase(test.TestCase): helper = volume_utils.VolumeHelper helper.XenAPI = session.get_imported_xenapi() vol = self._create_volume() - info = helper.parse_volume_info(vol['ec2_id'], '/dev/sdc') - label = 'SR-%s' % vol['ec2_id'] + info = helper.parse_volume_info(vol['id'], '/dev/sdc') + label = 'SR-%s' % vol['id'] description = 'Test-SR' sr_ref = helper.create_iscsi_storage(session, info, label, description) srs = xenapi_fake.get_all('SR') @@ -97,7 +97,7 @@ class XenAPIVolumeTestCase(test.TestCase): # oops, wrong mount point! self.assertRaises(volume_utils.StorageError, helper.parse_volume_info, - vol['ec2_id'], + vol['id'], '/dev/sd') db.volume_destroy(context.get_admin_context(), vol['id']) @@ -108,8 +108,7 @@ class XenAPIVolumeTestCase(test.TestCase): volume = self._create_volume() instance = db.instance_create(self.values) xenapi_fake.create_vm(instance.name, 'Running') - result = conn.attach_volume(instance.name, volume['ec2_id'], - '/dev/sdc') + result = conn.attach_volume(instance.name, volume['id'], '/dev/sdc') def check(): # check that the VM has a VBD attached to it @@ -134,7 +133,7 @@ class XenAPIVolumeTestCase(test.TestCase): self.assertRaises(Exception, conn.attach_volume, instance.name, - volume['ec2_id'], + volume['id'], '/dev/sdc') def tearDown(self): @@ -250,15 +249,16 @@ class XenAPIVMTestCase(test.TestCase): def _create_instance(self): """Creates and spawns a test instance""" - values = {'name': 1, 'id': 1, - 'project_id': self.project.id, - 'user_id': self.user.id, - 'image_id': 1, - 'kernel_id': 2, - 'ramdisk_id': 3, - 'instance_type': 'm1.large', - 'mac_address': 'aa:bb:cc:dd:ee:ff' - } + values = { + 'name': 1, + 'id': 1, + 'project_id': self.project.id, + 'user_id': self.user.id, + 'image_id': 1, + 'kernel_id': 2, + 'ramdisk_id': 3, + 'instance_type': 'm1.large', + 'mac_address': 'aa:bb:cc:dd:ee:ff'} instance = db.instance_create(values) self.conn.spawn(instance) return instance diff --git a/nova/virt/connection.py b/nova/virt/connection.py index 61e99944e..846423afe 100644 --- a/nova/virt/connection.py +++ b/nova/virt/connection.py @@ -26,6 +26,7 @@ from nova import flags from nova.virt import fake from nova.virt import libvirt_conn from nova.virt import xenapi_conn +from nova.virt import hyperv FLAGS = flags.FLAGS @@ -62,6 +63,8 @@ def get_connection(read_only=False): conn = libvirt_conn.get_connection(read_only) elif t == 'xenapi': conn = xenapi_conn.get_connection(read_only) + elif t == 'hyperv': + conn = hyperv.get_connection(read_only) else: raise Exception('Unknown connection type "%s"' % t) diff --git a/nova/virt/hyperv.py b/nova/virt/hyperv.py new file mode 100644 index 000000000..4b9f6f946 --- /dev/null +++ b/nova/virt/hyperv.py @@ -0,0 +1,459 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Cloud.com, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A connection to Hyper-V . +Uses Windows Management Instrumentation (WMI) calls to interact with Hyper-V +Hyper-V WMI usage: + http://msdn.microsoft.com/en-us/library/cc723875%28v=VS.85%29.aspx +The Hyper-V object model briefly: + The physical computer and its hosted virtual machines are each represented + by the Msvm_ComputerSystem class. + + Each virtual machine is associated with a + Msvm_VirtualSystemGlobalSettingData (vs_gs_data) instance and one or more + Msvm_VirtualSystemSettingData (vmsetting) instances. For each vmsetting + there is a series of Msvm_ResourceAllocationSettingData (rasd) objects. + The rasd objects describe the settings for each device in a VM. + Together, the vs_gs_data, vmsettings and rasds describe the configuration + of the virtual machine. + + Creating new resources such as disks and nics involves cloning a default + rasd object and appropriately modifying the clone and calling the + AddVirtualSystemResources WMI method + Changing resources such as memory uses the ModifyVirtualSystemResources + WMI method + +Using the Python WMI library: + Tutorial: + http://timgolden.me.uk/python/wmi/tutorial.html + Hyper-V WMI objects can be retrieved simply by using the class name + of the WMI object and optionally specifying a column to filter the + result set. More complex filters can be formed using WQL (sql-like) + queries. + The parameters and return tuples of WMI method calls can gleaned by + examining the doc string. For example: + >>> vs_man_svc.ModifyVirtualSystemResources.__doc__ + ModifyVirtualSystemResources (ComputerSystem, ResourceSettingData[]) + => (Job, ReturnValue)' + When passing setting data (ResourceSettingData) to the WMI method, + an XML representation of the data is passed in using GetText_(1). + Available methods on a service can be determined using method.keys(): + >>> vs_man_svc.methods.keys() + vmsettings and rasds for a vm can be retrieved using the 'associators' + method with the appropriate return class. + Long running WMI commands generally return a Job (an instance of + Msvm_ConcreteJob) whose state can be polled to determine when it finishes + +""" + +import os +import logging +import time + +from nova import exception +from nova import flags +from nova.auth import manager +from nova.compute import power_state +from nova.virt import images + +wmi = None + + +FLAGS = flags.FLAGS + + +HYPERV_POWER_STATE = { + 3: power_state.SHUTDOWN, + 2: power_state.RUNNING, + 32768: power_state.PAUSED, +} + + +REQ_POWER_STATE = { + 'Enabled': 2, + 'Disabled': 3, + 'Reboot': 10, + 'Reset': 11, + 'Paused': 32768, + 'Suspended': 32769 +} + + +WMI_JOB_STATUS_STARTED = 4096 +WMI_JOB_STATE_RUNNING = 4 +WMI_JOB_STATE_COMPLETED = 7 + + +def get_connection(_): + global wmi + if wmi is None: + wmi = __import__('wmi') + return HyperVConnection() + + +class HyperVConnection(object): + def __init__(self): + self._conn = wmi.WMI(moniker='//./root/virtualization') + self._cim_conn = wmi.WMI(moniker='//./root/cimv2') + + def init_host(self): + #FIXME(chiradeep): implement this + logging.debug(_('In init host')) + pass + + def list_instances(self): + """ Return the names of all the instances known to Hyper-V. """ + vms = [v.ElementName \ + for v in self._conn.Msvm_ComputerSystem(['ElementName'])] + return vms + + def spawn(self, instance): + """ Create a new VM and start it.""" + vm = self._lookup(instance.name) + if vm is not None: + raise exception.Duplicate(_('Attempt to create duplicate vm %s') % + instance.name) + + user = manager.AuthManager().get_user(instance['user_id']) + project = manager.AuthManager().get_project(instance['project_id']) + #Fetch the file, assume it is a VHD file. + base_vhd_filename = os.path.join(FLAGS.instances_path, + instance.name) + vhdfile = "%s.vhd" % (base_vhd_filename) + images.fetch(instance['image_id'], vhdfile, user, project) + + try: + self._create_vm(instance) + + self._create_disk(instance['name'], vhdfile) + self._create_nic(instance['name'], instance['mac_address']) + + logging.debug(_('Starting VM %s '), instance.name) + self._set_vm_state(instance['name'], 'Enabled') + logging.info(_('Started VM %s '), instance.name) + except Exception as exn: + logging.error(_('spawn vm failed: %s'), exn) + self.destroy(instance) + + def _create_vm(self, instance): + """Create a VM but don't start it. """ + vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0] + + vs_gs_data = self._conn.Msvm_VirtualSystemGlobalSettingData.new() + vs_gs_data.ElementName = instance['name'] + (job, ret_val) = vs_man_svc.DefineVirtualSystem( + [], None, vs_gs_data.GetText_(1))[1:] + if ret_val == WMI_JOB_STATUS_STARTED: + success = self._check_job_status(job) + else: + success = (ret_val == 0) + + if not success: + raise Exception(_('Failed to create VM %s'), instance.name) + + logging.debug(_('Created VM %s...'), instance.name) + vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0] + + vmsettings = vm.associators( + wmi_result_class='Msvm_VirtualSystemSettingData') + vmsetting = [s for s in vmsettings + if s.SettingType == 3][0] # avoid snapshots + memsetting = vmsetting.associators( + wmi_result_class='Msvm_MemorySettingData')[0] + #No Dynamic Memory, so reservation, limit and quantity are identical. + mem = long(str(instance['memory_mb'])) + memsetting.VirtualQuantity = mem + memsetting.Reservation = mem + memsetting.Limit = mem + + (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources( + vm.path_(), [memsetting.GetText_(1)]) + logging.debug(_('Set memory for vm %s...'), instance.name) + procsetting = vmsetting.associators( + wmi_result_class='Msvm_ProcessorSettingData')[0] + vcpus = long(instance['vcpus']) + procsetting.VirtualQuantity = vcpus + procsetting.Reservation = vcpus + procsetting.Limit = vcpus + + (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources( + vm.path_(), [procsetting.GetText_(1)]) + logging.debug(_('Set vcpus for vm %s...'), instance.name) + + def _create_disk(self, vm_name, vhdfile): + """Create a disk and attach it to the vm""" + logging.debug(_('Creating disk for %s by attaching disk file %s'), + vm_name, vhdfile) + #Find the IDE controller for the vm. + vms = self._conn.MSVM_ComputerSystem(ElementName=vm_name) + vm = vms[0] + vmsettings = vm.associators( + wmi_result_class='Msvm_VirtualSystemSettingData') + rasds = vmsettings[0].associators( + wmi_result_class='MSVM_ResourceAllocationSettingData') + ctrller = [r for r in rasds + if r.ResourceSubType == 'Microsoft Emulated IDE Controller'\ + and r.Address == "0"] + #Find the default disk drive object for the vm and clone it. + diskdflt = self._conn.query( + "SELECT * FROM Msvm_ResourceAllocationSettingData \ + WHERE ResourceSubType LIKE 'Microsoft Synthetic Disk Drive'\ + AND InstanceID LIKE '%Default%'")[0] + diskdrive = self._clone_wmi_obj( + 'Msvm_ResourceAllocationSettingData', diskdflt) + #Set the IDE ctrller as parent. + diskdrive.Parent = ctrller[0].path_() + diskdrive.Address = 0 + #Add the cloned disk drive object to the vm. + new_resources = self._add_virt_resource(diskdrive, vm) + if new_resources is None: + raise Exception(_('Failed to add diskdrive to VM %s'), + vm_name) + diskdrive_path = new_resources[0] + logging.debug(_('New disk drive path is %s'), diskdrive_path) + #Find the default VHD disk object. + vhddefault = self._conn.query( + "SELECT * FROM Msvm_ResourceAllocationSettingData \ + WHERE ResourceSubType LIKE 'Microsoft Virtual Hard Disk' AND \ + InstanceID LIKE '%Default%' ")[0] + + #Clone the default and point it to the image file. + vhddisk = self._clone_wmi_obj( + 'Msvm_ResourceAllocationSettingData', vhddefault) + #Set the new drive as the parent. + vhddisk.Parent = diskdrive_path + vhddisk.Connection = [vhdfile] + + #Add the new vhd object as a virtual hard disk to the vm. + new_resources = self._add_virt_resource(vhddisk, vm) + if new_resources is None: + raise Exception(_('Failed to add vhd file to VM %s'), + vm_name) + logging.info(_('Created disk for %s'), vm_name) + + def _create_nic(self, vm_name, mac): + """Create a (emulated) nic and attach it to the vm""" + logging.debug(_('Creating nic for %s '), vm_name) + #Find the vswitch that is connected to the physical nic. + vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name) + extswitch = self._find_external_network() + vm = vms[0] + switch_svc = self._conn.Msvm_VirtualSwitchManagementService()[0] + #Find the default nic and clone it to create a new nic for the vm. + #Use Msvm_SyntheticEthernetPortSettingData for Windows or Linux with + #Linux Integration Components installed. + emulatednics_data = self._conn.Msvm_EmulatedEthernetPortSettingData() + default_nic_data = [n for n in emulatednics_data + if n.InstanceID.rfind('Default') > 0] + new_nic_data = self._clone_wmi_obj( + 'Msvm_EmulatedEthernetPortSettingData', + default_nic_data[0]) + #Create a port on the vswitch. + (new_port, ret_val) = switch_svc.CreateSwitchPort(vm_name, vm_name, + "", extswitch.path_()) + if ret_val != 0: + logging.error(_('Failed creating a port on the external vswitch')) + raise Exception(_('Failed creating port for %s'), + vm_name) + logging.debug(_("Created switch port %s on switch %s"), + vm_name, extswitch.path_()) + #Connect the new nic to the new port. + new_nic_data.Connection = [new_port] + new_nic_data.ElementName = vm_name + ' nic' + new_nic_data.Address = ''.join(mac.split(':')) + new_nic_data.StaticMacAddress = 'TRUE' + #Add the new nic to the vm. + new_resources = self._add_virt_resource(new_nic_data, vm) + if new_resources is None: + raise Exception(_('Failed to add nic to VM %s'), + vm_name) + logging.info(_("Created nic for %s "), vm_name) + + def _add_virt_resource(self, res_setting_data, target_vm): + """Add a new resource (disk/nic) to the VM""" + vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0] + (job, new_resources, ret_val) = vs_man_svc.\ + AddVirtualSystemResources([res_setting_data.GetText_(1)], + target_vm.path_()) + success = True + if ret_val == WMI_JOB_STATUS_STARTED: + success = self._check_job_status(job) + else: + success = (ret_val == 0) + if success: + return new_resources + else: + return None + + #TODO: use the reactor to poll instead of sleep + def _check_job_status(self, jobpath): + """Poll WMI job state for completion""" + #Jobs have a path of the form: + #\\WIN-P5IG7367DAG\root\virtualization:Msvm_ConcreteJob.InstanceID= + #"8A496B9C-AF4D-4E98-BD3C-1128CD85320D" + inst_id = jobpath.split('=')[1].strip('"') + jobs = self._conn.Msvm_ConcreteJob(InstanceID=inst_id) + if len(jobs) == 0: + return False + job = jobs[0] + while job.JobState == WMI_JOB_STATE_RUNNING: + time.sleep(0.1) + job = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)[0] + if job.JobState != WMI_JOB_STATE_COMPLETED: + logging.debug(_("WMI job failed: %s"), job.ErrorSummaryDescription) + return False + logging.debug(_("WMI job succeeded: %s, Elapsed=%s "), job.Description, + job.ElapsedTime) + return True + + def _find_external_network(self): + """Find the vswitch that is connected to the physical nic. + Assumes only one physical nic on the host + """ + #If there are no physical nics connected to networks, return. + bound = self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE') + if len(bound) == 0: + return None + return self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')[0]\ + .associators(wmi_result_class='Msvm_SwitchLANEndpoint')[0]\ + .associators(wmi_result_class='Msvm_SwitchPort')[0]\ + .associators(wmi_result_class='Msvm_VirtualSwitch')[0] + + def _clone_wmi_obj(self, wmi_class, wmi_obj): + """Clone a WMI object""" + cl = self._conn.__getattr__(wmi_class) # get the class + newinst = cl.new() + #Copy the properties from the original. + for prop in wmi_obj._properties: + newinst.Properties_.Item(prop).Value =\ + wmi_obj.Properties_.Item(prop).Value + return newinst + + def reboot(self, instance): + """Reboot the specified instance.""" + vm = self._lookup(instance.name) + if vm is None: + raise exception.NotFound('instance not present %s' % instance.name) + self._set_vm_state(instance.name, 'Reboot') + + def destroy(self, instance): + """Destroy the VM. Also destroy the associated VHD disk files""" + logging.debug(_("Got request to destroy vm %s"), instance.name) + vm = self._lookup(instance.name) + if vm is None: + return + vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0] + vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0] + #Stop the VM first. + self._set_vm_state(instance.name, 'Disabled') + vmsettings = vm.associators( + wmi_result_class='Msvm_VirtualSystemSettingData') + rasds = vmsettings[0].associators( + wmi_result_class='MSVM_ResourceAllocationSettingData') + disks = [r for r in rasds \ + if r.ResourceSubType == 'Microsoft Virtual Hard Disk'] + diskfiles = [] + #Collect disk file information before destroying the VM. + for disk in disks: + diskfiles.extend([c for c in disk.Connection]) + #Nuke the VM. Does not destroy disks. + (job, ret_val) = vs_man_svc.DestroyVirtualSystem(vm.path_()) + if ret_val == WMI_JOB_STATUS_STARTED: + success = self._check_job_status(job) + elif ret_val == 0: + success = True + if not success: + raise Exception(_('Failed to destroy vm %s') % instance.name) + #Delete associated vhd disk files. + for disk in diskfiles: + vhdfile = self._cim_conn.CIM_DataFile(Name=disk) + for vf in vhdfile: + vf.Delete() + logging.debug(_("Del: disk %s vm %s"), vhdfile, instance.name) + + def get_info(self, instance_id): + """Get information about the VM""" + vm = self._lookup(instance_id) + if vm is None: + raise exception.NotFound('instance not present %s' % instance_id) + vm = self._conn.Msvm_ComputerSystem(ElementName=instance_id)[0] + vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0] + vmsettings = vm.associators( + wmi_result_class='Msvm_VirtualSystemSettingData') + settings_paths = [v.path_() for v in vmsettings] + #See http://msdn.microsoft.com/en-us/library/cc160706%28VS.85%29.aspx + summary_info = vs_man_svc.GetSummaryInformation( + [4, 100, 103, 105], settings_paths)[1] + info = summary_info[0] + logging.debug(_("Got Info for vm %s: state=%s, mem=%s, num_cpu=%s, \ + cpu_time=%s"), instance_id, + str(HYPERV_POWER_STATE[info.EnabledState]), + str(info.MemoryUsage), + str(info.NumberOfProcessors), + str(info.UpTime)) + + return {'state': HYPERV_POWER_STATE[info.EnabledState], + 'max_mem': info.MemoryUsage, + 'mem': info.MemoryUsage, + 'num_cpu': info.NumberOfProcessors, + 'cpu_time': info.UpTime} + + def _lookup(self, i): + vms = self._conn.Msvm_ComputerSystem(ElementName=i) + n = len(vms) + if n == 0: + return None + elif n > 1: + raise Exception(_('duplicate name found: %s') % i) + else: + return vms[0].ElementName + + def _set_vm_state(self, vm_name, req_state): + """Set the desired state of the VM""" + vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name) + if len(vms) == 0: + return False + (job, ret_val) = vms[0].RequestStateChange(REQ_POWER_STATE[req_state]) + success = False + if ret_val == WMI_JOB_STATUS_STARTED: + success = self._check_job_status(job) + elif ret_val == 0: + success = True + elif ret_val == 32775: + #Invalid state for current operation. Typically means it is + #already in the state requested + success = True + if success: + logging.info(_("Successfully changed vm state of %s to %s"), + vm_name, req_state) + else: + logging.error(_("Failed to change vm state of %s to %s"), + vm_name, req_state) + raise Exception(_("Failed to change vm state of %s to %s"), + vm_name, req_state) + + def attach_volume(self, instance_name, device_path, mountpoint): + vm = self._lookup(instance_name) + if vm is None: + raise exception.NotFound('Cannot attach volume to missing %s vm' % + instance_name) + + def detach_volume(self, instance_name, mountpoint): + vm = self._lookup(instance_name) + if vm is None: + raise exception.NotFound('Cannot detach volume from missing %s ' % + instance_name) diff --git a/nova/virt/images.py b/nova/virt/images.py index 1c9b2e093..2d03da4b4 100644 --- a/nova/virt/images.py +++ b/nova/virt/images.py @@ -21,8 +21,12 @@ Handling of VM disk images. """ +import logging import os.path +import shutil +import sys import time +import urllib2 import urlparse from nova import flags @@ -45,6 +49,25 @@ def fetch(image, path, user, project): return f(image, path, user, project) +def _fetch_image_no_curl(url, path, headers): + request = urllib2.Request(url) + for (k, v) in headers.iteritems(): + request.add_header(k, v) + + def urlretrieve(urlfile, fpath): + chunk = 1 * 1024 * 1024 + f = open(fpath, "wb") + while 1: + data = urlfile.read(chunk) + if not data: + break + f.write(data) + + urlopened = urllib2.urlopen(request) + urlretrieve(urlopened, path) + logging.debug(_("Finished retreving %s -- placed in %s"), url, path) + + def _fetch_s3_image(image, path, user, project): url = image_url(image) @@ -61,18 +84,24 @@ def _fetch_s3_image(image, path, user, project): url_path) headers['Authorization'] = 'AWS %s:%s' % (access, signature) - cmd = ['/usr/bin/curl', '--fail', '--silent', url] - for (k, v) in headers.iteritems(): - cmd += ['-H', '"%s: %s"' % (k, v)] + if sys.platform.startswith('win'): + return _fetch_image_no_curl(url, path, headers) + else: + cmd = ['/usr/bin/curl', '--fail', '--silent', url] + for (k, v) in headers.iteritems(): + cmd += ['-H', '%s: %s' % (k, v)] - cmd += ['-o', path] - cmd_out = ' '.join(cmd) - return utils.execute(cmd_out) + cmd += ['-o', path] + cmd_out = ' '.join(cmd) + return utils.execute(cmd_out) def _fetch_local_image(image, path, user, project): - source = _image_path('%s/image' % image) - return utils.execute('cp %s %s' % (source, path)) + source = _image_path(os.path.join(image, 'image')) + if sys.platform.startswith('win'): + return shutil.copy(source, path) + else: + return utils.execute('cp %s %s' % (source, path)) def _image_path(path): diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 76f31635a..b6d620782 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -18,6 +19,7 @@ Management class for VM-related functions (spawn, reboot, etc). """ +import json import logging from nova import db @@ -36,7 +38,6 @@ class VMOps(object): """ Management class for VM-related tasks """ - def __init__(self, session): self.XenAPI = session.get_imported_xenapi() self._session = session @@ -120,6 +121,20 @@ class VMOps(object): timer.f = _wait_for_boot return timer.start(interval=0.5, now=True) + def _get_vm_opaque_ref(self, instance_or_vm): + """Refactored out the common code of many methods that receive either + a vm name or a vm instance, and want a vm instance in return. + """ + try: + instance_name = instance_or_vm.name + vm = VMHelper.lookup(self._session, instance_name) + except AttributeError: + # A vm opaque ref was passed + vm = instance_or_vm + if vm is None: + raise Exception(_('Instance not present %s') % instance_name) + return vm + def snapshot(self, instance, name): """ Create snapshot from a running VM instance @@ -168,11 +183,7 @@ class VMOps(object): def reboot(self, instance): """Reboot VM instance""" - instance_name = instance.name - vm = VMHelper.lookup(self._session, instance_name) - if vm is None: - raise exception.NotFound(_('instance not' - ' found %s') % instance_name) + vm = self._get_vm_opaque_ref(instance) task = self._session.call_xenapi('Async.VM.clean_reboot', vm) self._session.wait_for_task(instance.id, task) @@ -215,27 +226,19 @@ class VMOps(object): ret = None try: ret = self._session.wait_for_task(instance_id, task) - except XenAPI.Failure, exc: + except self.XenAPI.Failure, exc: logging.warn(exc) callback(ret) def pause(self, instance, callback): """Pause VM instance""" - instance_name = instance.name - vm = VMHelper.lookup(self._session, instance_name) - if vm is None: - raise exception.NotFound(_('Instance not' - ' found %s') % instance_name) + vm = self._get_vm_opaque_ref(instance) task = self._session.call_xenapi('Async.VM.pause', vm) self._wait_with_callback(instance.id, task, callback) def unpause(self, instance, callback): """Unpause VM instance""" - instance_name = instance.name - vm = VMHelper.lookup(self._session, instance_name) - if vm is None: - raise exception.NotFound(_('Instance not' - ' found %s') % instance_name) + vm = self._get_vm_opaque_ref(instance) task = self._session.call_xenapi('Async.VM.unpause', vm) self._wait_with_callback(instance.id, task, callback) @@ -270,10 +273,7 @@ class VMOps(object): def get_diagnostics(self, instance): """Return data about VM diagnostics""" - vm = VMHelper.lookup(self._session, instance.name) - if vm is None: - raise exception.NotFound(_("Instance not found %s") % - instance.name) + vm = self._get_vm_opaque_ref(instance) rec = self._session.get_xenapi().VM.get_record(vm) return VMHelper.compile_diagnostics(self._session, rec) @@ -281,3 +281,175 @@ class VMOps(object): """Return snapshot of console""" # TODO: implement this to fix pylint! return 'FAKE CONSOLE OUTPUT of instance' + + def list_from_xenstore(self, vm, path): + """Runs the xenstore-ls command to get a listing of all records + from 'path' downward. Returns a dict with the sub-paths as keys, + and the value stored in those paths as values. If nothing is + found at that path, returns None. + """ + ret = self._make_xenstore_call('list_records', vm, path) + return json.loads(ret) + + def read_from_xenstore(self, vm, path): + """Returns the value stored in the xenstore record for the given VM + at the specified location. A XenAPIPlugin.PluginError will be raised + if any error is encountered in the read process. + """ + try: + ret = self._make_xenstore_call('read_record', vm, path, + {'ignore_missing_path': 'True'}) + except self.XenAPI.Failure, e: + return None + ret = json.loads(ret) + if ret == "None": + # Can't marshall None over RPC calls. + return None + return ret + + def write_to_xenstore(self, vm, path, value): + """Writes the passed value to the xenstore record for the given VM + at the specified location. A XenAPIPlugin.PluginError will be raised + if any error is encountered in the write process. + """ + return self._make_xenstore_call('write_record', vm, path, + {'value': json.dumps(value)}) + + def clear_xenstore(self, vm, path): + """Deletes the VM's xenstore record for the specified path. + If there is no such record, the request is ignored. + """ + self._make_xenstore_call('delete_record', vm, path) + + def _make_xenstore_call(self, method, vm, path, addl_args={}): + """Handles calls to the xenstore xenapi plugin.""" + return self._make_plugin_call('xenstore.py', method=method, vm=vm, + path=path, addl_args=addl_args) + + def _make_plugin_call(self, plugin, method, vm, path, addl_args={}): + """Abstracts out the process of calling a method of a xenapi plugin. + Any errors raised by the plugin will in turn raise a RuntimeError here. + """ + vm = self._get_vm_opaque_ref(vm) + rec = self._session.get_xenapi().VM.get_record(vm) + args = {'dom_id': rec['domid'], 'path': path} + args.update(addl_args) + # If the 'testing_mode' attribute is set, add that to the args. + if getattr(self, 'testing_mode', False): + args['testing_mode'] = 'true' + try: + task = self._session.async_call_plugin(plugin, method, args) + ret = self._session.wait_for_task(0, task) + except self.XenAPI.Failure, e: + raise RuntimeError("%s" % e.details[-1]) + return ret + + def add_to_xenstore(self, vm, path, key, value): + """Adds the passed key/value pair to the xenstore record for + the given VM at the specified location. A XenAPIPlugin.PluginError + will be raised if any error is encountered in the write process. + """ + current = self.read_from_xenstore(vm, path) + if not current: + # Nothing at that location + current = {key: value} + else: + current[key] = value + self.write_to_xenstore(vm, path, current) + + def remove_from_xenstore(self, vm, path, key_or_keys): + """Takes either a single key or a list of keys and removes + them from the xenstoreirecord data for the given VM. + If the key doesn't exist, the request is ignored. + """ + current = self.list_from_xenstore(vm, path) + if not current: + return + if isinstance(key_or_keys, basestring): + keys = [key_or_keys] + else: + keys = key_or_keys + keys.sort(lambda x, y: cmp(y.count('/'), x.count('/'))) + for key in keys: + if path: + keypath = "%s/%s" % (path, key) + else: + keypath = key + self._make_xenstore_call('delete_record', vm, keypath) + + ######################################################################## + ###### The following methods interact with the xenstore parameter + ###### record, not the live xenstore. They were created before I + ###### knew the difference, and are left in here in case they prove + ###### to be useful. They all have '_param' added to their method + ###### names to distinguish them. (dabo) + ######################################################################## + def read_partial_from_param_xenstore(self, instance_or_vm, key_prefix): + """Returns a dict of all the keys in the xenstore parameter record + for the given instance that begin with the key_prefix. + """ + data = self.read_from_param_xenstore(instance_or_vm) + badkeys = [k for k in data.keys() + if not k.startswith(key_prefix)] + for badkey in badkeys: + del data[badkey] + return data + + def read_from_param_xenstore(self, instance_or_vm, keys=None): + """Returns the xenstore parameter record data for the specified VM + instance as a dict. Accepts an optional key or list of keys; if a + value for 'keys' is passed, the returned dict is filtered to only + return the values for those keys. + """ + vm = self._get_vm_opaque_ref(instance_or_vm) + data = self._session.call_xenapi_request('VM.get_xenstore_data', + (vm, )) + ret = {} + if keys is None: + keys = data.keys() + elif isinstance(keys, basestring): + keys = [keys] + for key in keys: + raw = data.get(key) + if raw: + ret[key] = json.loads(raw) + else: + ret[key] = raw + return ret + + def add_to_param_xenstore(self, instance_or_vm, key, val): + """Takes a key/value pair and adds it to the xenstore parameter + record for the given vm instance. If the key exists in xenstore, + it is overwritten""" + vm = self._get_vm_opaque_ref(instance_or_vm) + self.remove_from_param_xenstore(instance_or_vm, key) + jsonval = json.dumps(val) + self._session.call_xenapi_request('VM.add_to_xenstore_data', + (vm, key, jsonval)) + + def write_to_param_xenstore(self, instance_or_vm, mapping): + """Takes a dict and writes each key/value pair to the xenstore + parameter record for the given vm instance. Any existing data for + those keys is overwritten. + """ + for k, v in mapping.iteritems(): + self.add_to_param_xenstore(instance_or_vm, k, v) + + def remove_from_param_xenstore(self, instance_or_vm, key_or_keys): + """Takes either a single key or a list of keys and removes + them from the xenstore parameter record data for the given VM. + If the key doesn't exist, the request is ignored. + """ + vm = self._get_vm_opaque_ref(instance_or_vm) + if isinstance(key_or_keys, basestring): + keys = [key_or_keys] + else: + keys = key_or_keys + for key in keys: + self._session.call_xenapi_request('VM.remove_from_xenstore_data', + (vm, key)) + + def clear_param_xenstore(self, instance_or_vm): + """Removes all data from the xenstore parameter record for this VM.""" + self.write_to_param_xenstore(instance_or_vm, {}) + ######################################################################## diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py index 1ca813bcf..4bbc41b03 100644 --- a/nova/virt/xenapi/volume_utils.py +++ b/nova/virt/xenapi/volume_utils.py @@ -200,15 +200,19 @@ class VolumeHelper(HelperBase): return -1 -def _get_volume_id(path): +def _get_volume_id(path_or_id): """Retrieve the volume id from device_path""" + # If we have the ID and not a path, just return it. + if isinstance(path_or_id, int): + return path_or_id # n must contain at least the volume_id # /vol- is for remote volumes # -vol- is for local volumes # see compute/manager->setup_compute_volume - volume_id = path[path.find('/vol-') + 1:] - if volume_id == path: - volume_id = path[path.find('-vol-') + 1:].replace('--', '-') + volume_id = path_or_id[path_or_id.find('/vol-') + 1:] + if volume_id == path_or_id: + volume_id = path_or_id[path_or_id.find('-vol-') + 1:] + volume_id = volume_id.replace('--', '-') return volume_id diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index f17c8f39d..c48f5b7cb 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -19,15 +20,15 @@ A connection to XenServer or Xen Cloud Platform. The concurrency model for this class is as follows: -All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator -deferredToThread). They are remote calls, and so may hang for the usual -reasons. They should not be allowed to block the reactor thread. +All XenAPI calls are on a green thread (using eventlet's "tpool" +thread pool). They are remote calls, and so may hang for the usual +reasons. All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async -(using XenAPI.VM.async_start etc). These return a task, which can then be -polled for completion. Polling is handled using reactor.callLater. +(using XenAPI.VM.async_start etc). These return a task, which can then be +polled for completion. -This combination of techniques means that we don't block the reactor thread at +This combination of techniques means that we don't block the main thread at all, and at the same time we don't hold lots of threads waiting for long-running operations. @@ -81,7 +82,7 @@ flags.DEFINE_string('xenapi_connection_password', flags.DEFINE_float('xenapi_task_poll_interval', 0.5, 'The interval used for polling of remote tasks ' - '(Async.VM.start, etc). Used only if ' + '(Async.VM.start, etc). Used only if ' 'connection_type=xenapi.') flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval', 5.0, @@ -213,6 +214,14 @@ class XenAPISession(object): f = f.__getattr__(m) return tpool.execute(f, *args) + def call_xenapi_request(self, method, *args): + """Some interactions with dom0, such as interacting with xenstore's + param record, require using the xenapi_request method of the session + object. This wraps that call on a background thread. + """ + f = self._session.xenapi_request + return tpool.execute(f, method, *args) + def async_call_plugin(self, plugin, fn, args): """Call Async.host.call_plugin on a background thread.""" return tpool.execute(self._unwrap_plugin_exceptions, @@ -222,7 +231,6 @@ class XenAPISession(object): def wait_for_task(self, id, task): """Return the result of the given task. The task is polled until it completes.""" - done = event.Event() loop = utils.LoopingCall(self._poll_task, id, task, done) loop.start(FLAGS.xenapi_task_poll_interval, now=True) @@ -235,7 +243,7 @@ class XenAPISession(object): return self.XenAPI.Session(url) def _poll_task(self, id, task, done): - """Poll the given XenAPI task, and fire the given Deferred if we + """Poll the given XenAPI task, and fire the given action if we get a result.""" try: name = self._session.xenapi.task.get_name_label(task) @@ -290,7 +298,7 @@ class XenAPISession(object): def _parse_xmlrpc_value(val): - """Parse the given value as if it were an XML-RPC value. This is + """Parse the given value as if it were an XML-RPC value. This is sometimes used as the format for the task.result field.""" if not val: return val diff --git a/nova/volume/__init__.py b/nova/volume/__init__.py index d6e944fc0..56ef9332e 100644 --- a/nova/volume/__init__.py +++ b/nova/volume/__init__.py @@ -16,16 +16,4 @@ # License for the specific language governing permissions and limitations # under the License. -""" -:mod:`nova.volume` -- Nova Block Storage -===================================================== - -.. automodule:: nova.volume - :platform: Unix -.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> -.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> -.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> -.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> -.. moduleauthor:: Manish Singh <yosh@gimp.org> -.. moduleauthor:: Andy Smith <andy@anarkystic.com> -""" +from nova.volume.api import API diff --git a/nova/volume/api.py b/nova/volume/api.py new file mode 100644 index 000000000..2d7fe3762 --- /dev/null +++ b/nova/volume/api.py @@ -0,0 +1,101 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Handles all requests relating to volumes. +""" + +import datetime +import logging + +from nova import db +from nova import exception +from nova import flags +from nova import quota +from nova import rpc +from nova.db import base + +FLAGS = flags.FLAGS +flags.DECLARE('storage_availability_zone', 'nova.volume.manager') + + +class API(base.Base): + """API for interacting with the volume manager.""" + + def create(self, context, size, name, description): + if quota.allowed_volumes(context, 1, size) < 1: + logging.warn("Quota exceeeded for %s, tried to create %sG volume", + context.project_id, size) + raise quota.QuotaError("Volume quota exceeded. You cannot " + "create a volume of size %s" % size) + + options = { + 'size': size, + 'user_id': context.user.id, + 'project_id': context.project_id, + 'availability_zone': FLAGS.storage_availability_zone, + 'status': "creating", + 'attach_status': "detached", + 'display_name': name, + 'display_description': description} + + volume = self.db.volume_create(context, options) + rpc.cast(context, + FLAGS.scheduler_topic, + {"method": "create_volume", + "args": {"topic": FLAGS.volume_topic, + "volume_id": volume['id']}}) + return volume + + def delete(self, context, volume_id): + volume = self.get(context, volume_id) + if volume['status'] != "available": + raise exception.ApiError(_("Volume status must be available")) + now = datetime.datetime.utcnow() + self.db.volume_update(context, volume_id, {'status': 'deleting', + 'terminated_at': now}) + host = volume['host'] + rpc.cast(context, + self.db.queue_get_for(context, FLAGS.volume_topic, host), + {"method": "delete_volume", + "args": {"volume_id": volume_id}}) + + def update(self, context, volume_id, fields): + self.db.volume_update(context, volume_id, fields) + + def get(self, context, volume_id): + return self.db.volume_get(context, volume_id) + + def get_all(self, context): + if context.user.is_admin(): + return self.db.volume_get_all(context) + return self.db.volume_get_all_by_project(context, context.project_id) + + def check_attach(self, context, volume_id): + volume = self.get(context, volume_id) + # TODO(vish): abstract status checking? + if volume['status'] != "available": + raise exception.ApiError(_("Volume status must be available")) + if volume['attach_status'] == "attached": + raise exception.ApiError(_("Volume is already attached")) + + def check_detach(self, context, volume_id): + volume = self.get(context, volume_id) + # TODO(vish): abstract status checking? + if volume['status'] == "available": + raise exception.ApiError(_("Volume is already detached")) diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py index 2d323a016..8e7a829d5 100755 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py @@ -45,6 +45,7 @@ class PluginError(Exception): def __init__(self, *args): Exception.__init__(self, *args) + class ArgumentError(PluginError): """Raised when required arguments are missing, argument values are invalid, or incompatible arguments are given. @@ -67,6 +68,7 @@ def ignore_failure(func, *args, **kwargs): ARGUMENT_PATTERN = re.compile(r'^[a-zA-Z0-9_:\.\-,]+$') + def validate_exists(args, key, default=None): """Validates that a string argument to a RPC method call is given, and matches the shell-safe regex, with an optional default value in case it @@ -76,20 +78,24 @@ def validate_exists(args, key, default=None): """ if key in args: if len(args[key]) == 0: - raise ArgumentError('Argument %r value %r is too short.' % (key, args[key])) + raise ArgumentError('Argument %r value %r is too short.' % + (key, args[key])) if not ARGUMENT_PATTERN.match(args[key]): - raise ArgumentError('Argument %r value %r contains invalid characters.' % (key, args[key])) + raise ArgumentError('Argument %r value %r contains invalid ' + 'characters.' % (key, args[key])) if args[key][0] == '-': - raise ArgumentError('Argument %r value %r starts with a hyphen.' % (key, args[key])) + raise ArgumentError('Argument %r value %r starts with a hyphen.' + % (key, args[key])) return args[key] elif default is not None: return default else: raise ArgumentError('Argument %s is required.' % key) + def validate_bool(args, key, default=None): - """Validates that a string argument to a RPC method call is a boolean string, - with an optional default value in case it does not exist. + """Validates that a string argument to a RPC method call is a boolean + string, with an optional default value in case it does not exist. Returns the python boolean value. """ @@ -99,7 +105,9 @@ def validate_bool(args, key, default=None): elif value.lower() == 'false': return False else: - raise ArgumentError("Argument %s may not take value %r. Valid values are ['true', 'false']." % (key, value)) + raise ArgumentError("Argument %s may not take value %r. " + "Valid values are ['true', 'false']." % (key, value)) + def exists(args, key): """Validates that a freeform string argument to a RPC method call is given. @@ -110,6 +118,7 @@ def exists(args, key): else: raise ArgumentError('Argument %s is required.' % key) + def optional(args, key): """If the given key is in args, return the corresponding value, otherwise return None""" @@ -122,13 +131,14 @@ def get_this_host(session): def get_domain_0(session): this_host_ref = get_this_host(session) - expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' % this_host_ref + expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' + expr = expr % this_host_ref return session.xenapi.VM.get_all_records_where(expr).keys()[0] def create_vdi(session, sr_ref, name_label, virtual_size, read_only): vdi_ref = session.xenapi.VDI.create( - { 'name_label': name_label, + {'name_label': name_label, 'name_description': '', 'SR': sr_ref, 'virtual_size': str(virtual_size), @@ -138,7 +148,7 @@ def create_vdi(session, sr_ref, name_label, virtual_size, read_only): 'xenstore_data': {}, 'other_config': {}, 'sm_config': {}, - 'tags': [] }) + 'tags': []}) logging.debug('Created VDI %s (%s, %s, %s) on %s.', vdi_ref, name_label, virtual_size, read_only, sr_ref) return vdi_ref diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py new file mode 100755 index 000000000..695bf3448 --- /dev/null +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python + +# Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# +# XenAPI plugin for reading/writing information to xenstore +# + +try: + import json +except ImportError: + import simplejson as json +import subprocess + +import XenAPIPlugin + +import pluginlib_nova as pluginlib +pluginlib.configure_logging("xenstore") + + +def jsonify(fnc): + def wrapper(*args, **kwargs): + return json.dumps(fnc(*args, **kwargs)) + return wrapper + + +@jsonify +def read_record(self, arg_dict): + """Returns the value stored at the given path for the given dom_id. + These must be encoded as key/value pairs in arg_dict. You can + optinally include a key 'ignore_missing_path'; if this is present + and boolean True, attempting to read a non-existent path will return + the string 'None' instead of raising an exception. + """ + cmd = "xenstore-read /local/domain/%(dom_id)s/%(path)s" % arg_dict + try: + return _run_command(cmd).rstrip("\n") + except pluginlib.PluginError, e: + if arg_dict.get("ignore_missing_path", False): + cmd = "xenstore-exists /local/domain/%(dom_id)s/%(path)s; echo $?" + cmd = cmd % arg_dict + ret = _run_command(cmd).strip() + # If the path exists, the cmd should return "0" + if ret != "0": + # No such path, so ignore the error and return the + # string 'None', since None can't be marshalled + # over RPC. + return "None" + # Either we shouldn't ignore path errors, or another + # error was hit. Re-raise. + raise + + +@jsonify +def write_record(self, arg_dict): + """Writes to xenstore at the specified path. If there is information + already stored in that location, it is overwritten. As in read_record, + the dom_id and path must be specified in the arg_dict; additionally, + you must specify a 'value' key, whose value must be a string. Typically, + you can json-ify more complex values and store the json output. + """ + cmd = "xenstore-write /local/domain/%(dom_id)s/%(path)s '%(value)s'" + cmd = cmd % arg_dict + _run_command(cmd) + return arg_dict["value"] + + +@jsonify +def list_records(self, arg_dict): + """Returns all the stored data at or below the given path for the + given dom_id. The data is returned as a json-ified dict, with the + path as the key and the stored value as the value. If the path + doesn't exist, an empty dict is returned. + """ + cmd = "xenstore-ls /local/domain/%(dom_id)s/%(path)s" % arg_dict + cmd = cmd.rstrip("/") + try: + recs = _run_command(cmd) + except pluginlib.PluginError, e: + if "No such file or directory" in "%s" % e: + # Path doesn't exist. + return {} + return str(e) + raise + base_path = arg_dict["path"] + paths = _paths_from_ls(recs) + ret = {} + for path in paths: + if base_path: + arg_dict["path"] = "%s/%s" % (base_path, path) + else: + arg_dict["path"] = path + rec = read_record(self, arg_dict) + try: + val = json.loads(rec) + except ValueError: + val = rec + ret[path] = val + return ret + + +@jsonify +def delete_record(self, arg_dict): + """Just like it sounds: it removes the record for the specified + VM and the specified path from xenstore. + """ + cmd = "xenstore-rm /local/domain/%(dom_id)s/%(path)s" % arg_dict + return _run_command(cmd) + + +def _paths_from_ls(recs): + """The xenstore-ls command returns a listing that isn't terribly + useful. This method cleans that up into a dict with each path + as the key, and the associated string as the value. + """ + ret = {} + last_nm = "" + level = 0 + path = [] + ret = [] + for ln in recs.splitlines(): + nm, val = ln.rstrip().split(" = ") + barename = nm.lstrip() + this_level = len(nm) - len(barename) + if this_level == 0: + ret.append(barename) + level = 0 + path = [] + elif this_level == level: + # child of same parent + ret.append("%s/%s" % ("/".join(path), barename)) + elif this_level > level: + path.append(last_nm) + ret.append("%s/%s" % ("/".join(path), barename)) + level = this_level + elif this_level < level: + path = path[:this_level] + ret.append("%s/%s" % ("/".join(path), barename)) + level = this_level + last_nm = barename + return ret + + +def _run_command(cmd): + """Abstracts out the basics of issuing system commands. If the command + returns anything in stderr, a PluginError is raised with that information. + Otherwise, the output from stdout is returned. + """ + pipe = subprocess.PIPE + proc = subprocess.Popen([cmd], shell=True, stdin=pipe, stdout=pipe, + stderr=pipe, close_fds=True) + proc.wait() + err = proc.stderr.read() + if err: + raise pluginlib.PluginError(err) + return proc.stdout.read() + + +if __name__ == "__main__": + XenAPIPlugin.dispatch( + {"read_record": read_record, + "write_record": write_record, + "list_records": list_records, + "delete_record": delete_record}) |