author     Vishvananda Ishaya <vishvananda@gmail.com>    2010-12-16 22:52:08 +0000
committer  Vishvananda Ishaya <vishvananda@gmail.com>    2010-12-16 22:52:08 +0000
commit     26ebccc8d92aba23efa1663fe9949c141a4cc671 (patch)
tree       b5c5724d1f806051ffd30b9a8596c52f6a890351 /nova
parent     dc29400d104d34c6383132a43e018f7724e85ec3 (diff)
parent     cd460a1f661eea7e050891f50a8218fdf24f2c6f (diff)
merged trunk, fixed conflicts and tests
Diffstat (limited to 'nova')
-rw-r--r--  nova/api/cloud.py  58
-rw-r--r--  nova/api/ec2/cloud.py  276
-rw-r--r--  nova/api/openstack/__init__.py  19
-rw-r--r--  nova/api/openstack/auth.py  26
-rw-r--r--  nova/api/openstack/images.py  9
-rw-r--r--  nova/api/openstack/servers.py  214
-rw-r--r--  nova/auth/ldapdriver.py  138
-rw-r--r--  nova/auth/manager.py  4
-rw-r--r--  nova/auth/nova_openldap.schema  84
-rw-r--r--  nova/auth/nova_sun.schema  16
-rwxr-xr-x  nova/auth/opendj.sh  119
-rw-r--r--  nova/auth/openssh-lpk_openldap.schema  19
-rw-r--r--  nova/auth/openssh-lpk_sun.schema  10
-rwxr-xr-x  nova/auth/slap.sh  112
-rw-r--r--  nova/compute/api.py  305
-rw-r--r--  nova/compute/disk.py  72
-rw-r--r--  nova/compute/instance_types.py  20
-rw-r--r--  nova/compute/manager.py  100
-rw-r--r--  nova/db/base.py (renamed from nova/tests/validator_unittest.py)  32
-rw-r--r--  nova/db/sqlalchemy/api.py  9
-rw-r--r--  nova/db/sqlalchemy/models.py  3
-rw-r--r--  nova/flags.py  10
-rw-r--r--  nova/image/local.py  2
-rw-r--r--  nova/manager.py  14
-rw-r--r--  nova/network/linux_net.py  1
-rw-r--r--  nova/network/manager.py  4
-rw-r--r--  nova/objectstore/image.py  10
-rw-r--r--  nova/process.py  209
-rw-r--r--  nova/quota.py  5
-rw-r--r--  nova/rpc.py  83
-rw-r--r--  nova/server.py  151
-rw-r--r--  nova/service.py  95
-rw-r--r--  nova/test.py  100
-rw-r--r--  nova/tests/__init__.py  5
-rw-r--r--  nova/tests/access_unittest.py  2
-rw-r--r--  nova/tests/api/openstack/fakes.py  19
-rw-r--r--  nova/tests/api/openstack/test_adminapi.py  61
-rw-r--r--  nova/tests/api/openstack/test_auth.py  7
-rw-r--r--  nova/tests/api/openstack/test_servers.py  31
-rw-r--r--  nova/tests/auth_unittest.py  6
-rw-r--r--  nova/tests/cloud_unittest.py  19
-rw-r--r--  nova/tests/compute_unittest.py  70
-rw-r--r--  nova/tests/flags_unittest.py  2
-rw-r--r--  nova/tests/misc_unittest.py  51
-rw-r--r--  nova/tests/network_unittest.py  2
-rw-r--r--  nova/tests/objectstore_unittest.py  4
-rw-r--r--  nova/tests/process_unittest.py  132
-rw-r--r--  nova/tests/quota_unittest.py  18
-rw-r--r--  nova/tests/rpc_unittest.py  36
-rw-r--r--  nova/tests/scheduler_unittest.py  24
-rw-r--r--  nova/tests/service_unittest.py  46
-rw-r--r--  nova/tests/virt_unittest.py  11
-rw-r--r--  nova/tests/volume_unittest.py  58
-rw-r--r--  nova/utils.py  56
-rw-r--r--  nova/validate.py  94
-rw-r--r--  nova/virt/connection.py  4
-rw-r--r--  nova/virt/fake.py  10
-rw-r--r--  nova/virt/images.py  11
-rw-r--r--  nova/virt/libvirt_conn.py  177
-rw-r--r--  nova/virt/xenapi.py  444
-rw-r--r--  nova/virt/xenapi/__init__.py  15
-rw-r--r--  nova/virt/xenapi/network_utils.py  41
-rw-r--r--  nova/virt/xenapi/vm_utils.py  260
-rw-r--r--  nova/virt/xenapi/vmops.py  139
-rw-r--r--  nova/virt/xenapi/volumeops.py  32
-rw-r--r--  nova/virt/xenapi_conn.py  241
-rw-r--r--  nova/volume/driver.py  140
-rw-r--r--  nova/volume/manager.py  27
68 files changed, 2310 insertions, 2314 deletions
diff --git a/nova/api/cloud.py b/nova/api/cloud.py
deleted file mode 100644
index b8f15019f..000000000
--- a/nova/api/cloud.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Methods for API calls to control instances via AMQP.
-"""
-
-
-from nova import db
-from nova import flags
-from nova import rpc
-
-FLAGS = flags.FLAGS
-
-
-def reboot(instance_id, context=None):
- """Reboot the given instance."""
- instance_ref = db.instance_get_by_internal_id(context, instance_id)
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "reboot_instance",
- "args": {"instance_id": instance_ref['id']}})
-
-
-def rescue(instance_id, context):
- """Rescue the given instance."""
- instance_ref = db.instance_get_by_internal_id(context, instance_id)
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "rescue_instance",
- "args": {"instance_id": instance_ref['id']}})
-
-
-def unrescue(instance_id, context):
- """Unrescue the given instance."""
- instance_ref = db.instance_get_by_internal_id(context, instance_id)
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "unrescue_instance",
- "args": {"instance_id": instance_ref['id']}})
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index 9327bf0d4..ebb13aedc 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -39,9 +39,8 @@ from nova import flags
from nova import quota
from nova import rpc
from nova import utils
-from nova.compute.instance_types import INSTANCE_TYPES
-from nova.api import cloud
-from nova.image.s3 import S3ImageService
+from nova.compute import api as compute_api
+from nova.compute import instance_types
FLAGS = flags.FLAGS
@@ -50,11 +49,6 @@ flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
InvalidInputException = exception.InvalidInputException
-class QuotaError(exception.ApiError):
- """Quota Exceeeded"""
- pass
-
-
def _gen_key(context, user_id, key_name):
"""Generate a key
@@ -99,8 +93,9 @@ class CloudController(object):
"""
def __init__(self):
self.network_manager = utils.import_object(FLAGS.network_manager)
- self.compute_manager = utils.import_object(FLAGS.compute_manager)
- self.image_service = S3ImageService()
+ self.image_service = utils.import_object(FLAGS.image_service)
+ self.compute_api = compute_api.ComputeAPI(self.network_manager,
+ self.image_service)
self.setup()
def __str__(self):
@@ -124,10 +119,10 @@ class CloudController(object):
def _get_mpi_data(self, context, project_id):
result = {}
- for instance in db.instance_get_all_by_project(context, project_id):
+ for instance in self.compute_api.get_instances(context, project_id):
if instance['fixed_ip']:
line = '%s slots=%d' % (instance['fixed_ip']['address'],
- INSTANCE_TYPES[instance['instance_type']]['vcpus'])
+ instance['vcpus'])
key = str(instance['key_name'])
if key in result:
result[key].append(line)
@@ -260,7 +255,7 @@ class CloudController(object):
return True
def describe_security_groups(self, context, group_name=None, **kwargs):
- self._ensure_default_security_group(context)
+ self.compute_api.ensure_default_security_group(context)
if context.user.is_admin():
groups = db.security_group_get_all(context)
else:
@@ -358,7 +353,7 @@ class CloudController(object):
return False
def revoke_security_group_ingress(self, context, group_name, **kwargs):
- self._ensure_default_security_group(context)
+ self.compute_api.ensure_default_security_group(context)
security_group = db.security_group_get_by_name(context,
context.project_id,
group_name)
@@ -383,7 +378,7 @@ class CloudController(object):
# for these operations, so support for newer API versions
# is sketchy.
def authorize_security_group_ingress(self, context, group_name, **kwargs):
- self._ensure_default_security_group(context)
+ self.compute_api.ensure_default_security_group(context)
security_group = db.security_group_get_by_name(context,
context.project_id,
group_name)
@@ -419,7 +414,7 @@ class CloudController(object):
return source_project_id
def create_security_group(self, context, group_name, group_description):
- self._ensure_default_security_group(context)
+ self.compute_api.ensure_default_security_group(context)
if db.security_group_exists(context, context.project_id, group_name):
raise exception.ApiError('group %s already exists' % group_name)
@@ -443,7 +438,7 @@ class CloudController(object):
# instance_id is passed in as a list of instances
ec2_id = instance_id[0]
internal_id = ec2_id_to_internal_id(ec2_id)
- instance_ref = db.instance_get_by_internal_id(context, internal_id)
+ instance_ref = self.compute_api.get_instance(context, internal_id)
output = rpc.call(context,
'%s.%s' % (FLAGS.compute_topic,
instance_ref['host']),
@@ -455,13 +450,15 @@ class CloudController(object):
"Timestamp": now,
"output": base64.b64encode(output)}
- def describe_volumes(self, context, **kwargs):
+ def describe_volumes(self, context, volume_id=None, **kwargs):
if context.user.is_admin():
volumes = db.volume_get_all(context)
else:
volumes = db.volume_get_all_by_project(context, context.project_id)
- volumes = [self._format_volume(context, v) for v in volumes]
+ # NOTE(vish): volume_id is an optional list of volume ids to filter by.
+ volumes = [self._format_volume(context, v) for v in volumes
+ if volume_id is None or v['ec2_id'] in volume_id]
return {'volumeSet': volumes}
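
The new volume_id argument is an optional list of EC2-style volume ids, so membership is tested with `in`. A runnable sketch of the same filter using stand-in dicts (not real DB records):

    volumes = [{'ec2_id': 'vol-1', 'size': 1},
               {'ec2_id': 'vol-2', 'size': 2}]
    volume_id = ['vol-2']  # as passed by DescribeVolumes
    filtered = [v for v in volumes
                if volume_id is None or v['ec2_id'] in volume_id]
    assert [v['ec2_id'] for v in filtered] == ['vol-2']
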
@@ -505,9 +502,8 @@ class CloudController(object):
if quota.allowed_volumes(context, 1, size) < 1:
logging.warn("Quota exceeeded for %s, tried to create %sG volume",
context.project_id, size)
- raise QuotaError("Volume quota exceeded. You cannot "
- "create a volume of size %s" %
- size)
+ raise quota.QuotaError("Volume quota exceeded. You cannot "
+ "create a volume of size %s" % size)
vol = {}
vol['size'] = size
vol['user_id'] = context.user.id
@@ -541,7 +537,7 @@ class CloudController(object):
if volume_ref['attach_status'] == "attached":
raise exception.ApiError("Volume is already attached")
internal_id = ec2_id_to_internal_id(instance_id)
- instance_ref = db.instance_get_by_internal_id(context, internal_id)
+ instance_ref = self.compute_api.get_instance(context, internal_id)
host = instance_ref['host']
rpc.cast(context,
db.queue_get_for(context, FLAGS.compute_topic, host),
@@ -619,11 +615,7 @@ class CloudController(object):
instances = db.instance_get_all_by_reservation(context,
reservation_id)
else:
- if context.user.is_admin():
- instances = db.instance_get_all(context)
- else:
- instances = db.instance_get_all_by_project(context,
- context.project_id)
+ instances = self.compute_api.get_instances(context)
for instance in instances:
if not context.user.is_admin():
if instance['image_id'] == FLAGS.vpn_image_id:
@@ -699,8 +691,8 @@ class CloudController(object):
if quota.allowed_floating_ips(context, 1) < 1:
logging.warn("Quota exceeeded for %s, tried to allocate address",
context.project_id)
- raise QuotaError("Address quota exceeded. You cannot "
- "allocate any more addresses")
+ raise quota.QuotaError("Address quota exceeded. You cannot "
+ "allocate any more addresses")
network_topic = self._get_network_topic(context)
public_ip = rpc.call(context,
network_topic,
@@ -720,7 +712,7 @@ class CloudController(object):
def associate_address(self, context, instance_id, public_ip, **kwargs):
internal_id = ec2_id_to_internal_id(instance_id)
- instance_ref = db.instance_get_by_internal_id(context, internal_id)
+ instance_ref = self.compute_api.get_instance(context, internal_id)
fixed_address = db.instance_get_fixed_address(context,
instance_ref['id'])
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
@@ -752,218 +744,49 @@ class CloudController(object):
"args": {"network_id": network_ref['id']}})
return db.queue_get_for(context, FLAGS.network_topic, host)
- def _ensure_default_security_group(self, context):
- try:
- db.security_group_get_by_name(context,
- context.project_id,
- 'default')
- except exception.NotFound:
- values = {'name': 'default',
- 'description': 'default',
- 'user_id': context.user.id,
- 'project_id': context.project_id}
- group = db.security_group_create(context, values)
-
def run_instances(self, context, **kwargs):
- instance_type = kwargs.get('instance_type', 'm1.small')
- if instance_type not in INSTANCE_TYPES:
- raise exception.ApiError("Unknown instance type: %s",
- instance_type)
- # check quota
- max_instances = int(kwargs.get('max_count', 1))
- min_instances = int(kwargs.get('min_count', max_instances))
- num_instances = quota.allowed_instances(context,
- max_instances,
- instance_type)
- if num_instances < min_instances:
- logging.warn("Quota exceeeded for %s, tried to run %s instances",
- context.project_id, min_instances)
- raise QuotaError("Instance quota exceeded. You can only "
- "run %s more instances of this type." %
- num_instances, "InstanceLimitExceeded")
- # make sure user can access the image
- # vpn image is private so it doesn't show up on lists
- vpn = kwargs['image_id'] == FLAGS.vpn_image_id
-
- if not vpn:
- image = self.image_service.show(context, kwargs['image_id'])
-
- # FIXME(ja): if image is vpn, this breaks
- # get defaults from imagestore
- image_id = image['imageId']
- kernel_id = image.get('kernelId', FLAGS.default_kernel)
- ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
-
- # API parameters overrides of defaults
- kernel_id = kwargs.get('kernel_id', kernel_id)
- ramdisk_id = kwargs.get('ramdisk_id', ramdisk_id)
-
- # make sure we have access to kernel and ramdisk
- self.image_service.show(context, kernel_id)
- self.image_service.show(context, ramdisk_id)
-
- logging.debug("Going to run %s instances...", num_instances)
- launch_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
- key_data = None
- if 'key_name' in kwargs:
- key_pair_ref = db.key_pair_get(context,
- context.user.id,
- kwargs['key_name'])
- key_data = key_pair_ref['public_key']
-
- security_group_arg = kwargs.get('security_group', ["default"])
- if not type(security_group_arg) is list:
- security_group_arg = [security_group_arg]
-
- security_groups = []
- self._ensure_default_security_group(context)
- for security_group_name in security_group_arg:
- group = db.security_group_get_by_name(context,
- context.project_id,
- security_group_name)
- security_groups.append(group['id'])
-
- reservation_id = utils.generate_uid('r')
- base_options = {}
- base_options['state_description'] = 'scheduling'
- base_options['image_id'] = image_id
- base_options['kernel_id'] = kernel_id
- base_options['ramdisk_id'] = ramdisk_id
- base_options['reservation_id'] = reservation_id
- base_options['key_data'] = key_data
- base_options['key_name'] = kwargs.get('key_name', None)
- base_options['user_id'] = context.user.id
- base_options['project_id'] = context.project_id
- base_options['user_data'] = kwargs.get('user_data', '')
-
- base_options['display_name'] = kwargs.get('display_name')
- base_options['display_description'] = kwargs.get('display_description')
-
- type_data = INSTANCE_TYPES[instance_type]
- base_options['instance_type'] = instance_type
- base_options['memory_mb'] = type_data['memory_mb']
- base_options['vcpus'] = type_data['vcpus']
- base_options['local_gb'] = type_data['local_gb']
- elevated = context.elevated()
-
- for num in range(num_instances):
-
- instance_ref = self.compute_manager.create_instance(context,
- security_groups,
- mac_address=utils.generate_mac(),
- launch_index=num,
- **base_options)
- inst_id = instance_ref['id']
-
- internal_id = instance_ref['internal_id']
- ec2_id = internal_id_to_ec2_id(internal_id)
-
- self.compute_manager.update_instance(context,
- inst_id,
- hostname=ec2_id)
-
- # TODO(vish): This probably should be done in the scheduler
- # or in compute as a call. The network should be
- # allocated after the host is assigned and setup
- # can happen at the same time.
- address = self.network_manager.allocate_fixed_ip(context,
- inst_id,
- vpn)
- network_topic = self._get_network_topic(context)
- rpc.cast(elevated,
- network_topic,
- {"method": "setup_fixed_ip",
- "args": {"address": address}})
-
- rpc.cast(context,
- FLAGS.scheduler_topic,
- {"method": "run_instance",
- "args": {"topic": FLAGS.compute_topic,
- "instance_id": inst_id}})
- logging.debug("Casting to scheduler for %s/%s's instance %s" %
- (context.project.name, context.user.name, inst_id))
- return self._format_run_instances(context, reservation_id)
+ max_count = int(kwargs.get('max_count', 1))
+ instances = self.compute_api.create_instances(context,
+ instance_types.get_by_type(kwargs.get('instance_type', None)),
+ kwargs['image_id'],
+ min_count=int(kwargs.get('min_count', max_count)),
+ max_count=max_count,
+ kernel_id=kwargs.get('kernel_id'),
+ ramdisk_id=kwargs.get('ramdisk_id'),
+ display_name=kwargs.get('display_name'),
+ description=kwargs.get('display_description'),
+ key_name=kwargs.get('key_name'),
+ security_group=kwargs.get('security_group'),
+ generate_hostname=internal_id_to_ec2_id)
+ return self._format_run_instances(context,
+ instances[0]['reservation_id'])
def terminate_instances(self, context, instance_id, **kwargs):
"""Terminate each instance in instance_id, which is a list of ec2 ids.
-
- instance_id is a kwarg so its name cannot be modified.
- """
- ec2_id_list = instance_id
+ instance_id is a kwarg so its name cannot be modified."""
logging.debug("Going to start terminating instances")
- for id_str in ec2_id_list:
- internal_id = ec2_id_to_internal_id(id_str)
- logging.debug("Going to try and terminate %s" % id_str)
- try:
- instance_ref = db.instance_get_by_internal_id(context,
- internal_id)
- except exception.NotFound:
- logging.warning("Instance %s was not found during terminate",
- id_str)
- continue
-
- if (instance_ref['state_description'] == 'terminating'):
- logging.warning("Instance %s is already being terminated",
- id_str)
- continue
- now = datetime.datetime.utcnow()
- self.compute_manager.update_instance(context,
- instance_ref['id'],
- state_description='terminating',
- state=0,
- terminated_at=now)
-
- # FIXME(ja): where should network deallocate occur?
- address = db.instance_get_floating_address(context,
- instance_ref['id'])
- if address:
- logging.debug("Disassociating address %s" % address)
- # NOTE(vish): Right now we don't really care if the ip is
- # disassociated. We may need to worry about
- # checking this later. Perhaps in the scheduler?
- network_topic = self._get_network_topic(context)
- rpc.cast(context,
- network_topic,
- {"method": "disassociate_floating_ip",
- "args": {"floating_address": address}})
-
- address = db.instance_get_fixed_address(context,
- instance_ref['id'])
- if address:
- logging.debug("Deallocating address %s" % address)
- # NOTE(vish): Currently, nothing needs to be done on the
- # network node until release. If this changes,
- # we will need to cast here.
- self.network_manager.deallocate_fixed_ip(context.elevated(),
- address)
-
- host = instance_ref['host']
- if host:
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "terminate_instance",
- "args": {"instance_id": instance_ref['id']}})
- else:
- db.instance_destroy(context, instance_ref['id'])
+ for ec2_id in instance_id:
+ internal_id = ec2_id_to_internal_id(ec2_id)
+ self.compute_api.delete_instance(context, internal_id)
return True
def reboot_instances(self, context, instance_id, **kwargs):
"""instance_id is a list of instance ids"""
for ec2_id in instance_id:
internal_id = ec2_id_to_internal_id(ec2_id)
- cloud.reboot(internal_id, context=context)
+ self.compute_api.reboot(context, internal_id)
return True
def rescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
internal_id = ec2_id_to_internal_id(instance_id)
- cloud.rescue(internal_id, context=context)
+ self.compute_api.rescue(context, internal_id)
return True
def unrescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
internal_id = ec2_id_to_internal_id(instance_id)
- cloud.unrescue(internal_id, context=context)
+ self.compute_api.unrescue(context, internal_id)
return True
def update_instance(self, context, ec2_id, **kwargs):
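
In the rewritten run_instances above, the EC2 layer passes internal_id_to_ec2_id as the generate_hostname hook to ComputeAPI.create_instances, so EC2 instances are named by their ec2 id while the compute-layer default stays str(internal_id). A hedged sketch of the hook mechanism (the id formatting here is a simplified stand-in, not the exact helper from this module):

    def internal_id_to_ec2_id(internal_id):
        # stand-in for the module's real converter
        return 'i-%08x' % internal_id

    def generate_default_hostname(internal_id):
        # the default hook defined in nova.compute.api
        return str(internal_id)

    def name_instance(internal_id, generate_hostname=generate_default_hostname):
        return {'internal_id': internal_id,
                'hostname': generate_hostname(internal_id)}

    print(name_instance(16))                          # hostname '16'
    print(name_instance(16, internal_id_to_ec2_id))   # hostname 'i-00000010'
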
@@ -974,7 +797,7 @@ class CloudController(object):
changes[field] = kwargs[field]
if changes:
internal_id = ec2_id_to_internal_id(ec2_id)
- inst = db.instance_get_by_internal_id(context, internal_id)
+ inst = self.compute_api.get_instance(context, internal_id)
db.instance_update(context, inst['id'], kwargs)
return True
@@ -994,8 +817,11 @@ class CloudController(object):
return True
def describe_images(self, context, image_id=None, **kwargs):
- imageSet = self.image_service.index(context, image_id)
- return {'imagesSet': imageSet}
+ # Note: image_id is a list!
+ images = self.image_service.index(context)
+ if image_id:
+ images = filter(lambda x: x['imageId'] in image_id, images)
+ return {'imagesSet': images}
def deregister_image(self, context, image_id, **kwargs):
self.image_service.deregister(context, image_id)
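
describe_images now fetches the whole image index and filters client-side; as the note above says, image_id is a list. A runnable sketch of the filter (Python 2 filter() returns a list):

    images = [{'imageId': 'ami-1'}, {'imageId': 'ami-2'}, {'imageId': 'ami-3'}]
    image_id = ['ami-1', 'ami-3']
    if image_id:
        images = filter(lambda x: x['imageId'] in image_id, images)
    assert [i['imageId'] for i in images] == ['ami-1', 'ami-3']
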
diff --git a/nova/api/openstack/__init__.py b/nova/api/openstack/__init__.py
index 1dd3ba770..b9ecbd9b8 100644
--- a/nova/api/openstack/__init__.py
+++ b/nova/api/openstack/__init__.py
@@ -25,10 +25,12 @@ import time
import logging
import routes
+import traceback
import webob.dec
import webob.exc
import webob
+from nova import context
from nova import flags
from nova import utils
from nova import wsgi
@@ -47,6 +49,10 @@ flags.DEFINE_string('nova_api_auth',
'nova.api.openstack.auth.BasicApiAuthManager',
'The auth mechanism to use for the OpenStack API implemenation')
+flags.DEFINE_bool('allow_admin_api',
+ False,
+ 'When True, this API service will accept admin operations.')
+
class API(wsgi.Middleware):
"""WSGI entry point for all OpenStack API requests."""
@@ -61,6 +67,7 @@ class API(wsgi.Middleware):
return req.get_response(self.application)
except Exception as ex:
logging.warn("Caught error: %s" % str(ex))
+ logging.debug(traceback.format_exc())
exc = webob.exc.HTTPInternalServerError(explanation=str(ex))
return faults.Fault(exc)
@@ -82,9 +89,7 @@ class AuthMiddleware(wsgi.Middleware):
if not user:
return faults.Fault(webob.exc.HTTPUnauthorized())
- if 'nova.context' not in req.environ:
- req.environ['nova.context'] = {}
- req.environ['nova.context']['user'] = user
+ req.environ['nova.context'] = context.RequestContext(user, user)
return self.application
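
AuthMiddleware now stores a full nova RequestContext in the WSGI environ instead of the old bare {'user': user} dict, so downstream handlers read attributes such as user_id and can call methods such as elevated(). A minimal sketch of the producer/consumer pair (assumes a nova checkout on the path; user and the environ dict are stand-ins):

    from nova import context

    def set_context(environ, user):
        # what AuthMiddleware does after authenticating the request
        environ['nova.context'] = context.RequestContext(user, user)

    def handler(environ):
        # what RateLimitingMiddleware and the controllers do downstream
        return environ['nova.context'].user_id
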
@@ -119,12 +124,12 @@ class RateLimitingMiddleware(wsgi.Middleware):
If the request should be rate limited, return a 413 status with a
Retry-After header giving the time when the request would succeed.
"""
- user_id = req.environ['nova.context']['user']['id']
action_name = self.get_action_name(req)
if not action_name:
# Not rate limited
return self.application
- delay = self.get_delay(action_name, user_id)
+ delay = self.get_delay(action_name,
+ req.environ['nova.context'].user_id)
if delay:
# TODO(gundlach): Get the retry-after format correct.
exc = webob.exc.HTTPRequestEntityTooLarge(
@@ -181,6 +186,10 @@ class APIRouter(wsgi.Router):
mapper.resource("sharedipgroup", "sharedipgroups",
controller=sharedipgroups.Controller())
+ if FLAGS.allow_admin_api:
+ logging.debug("Including admin operations in API.")
+ # TODO: Place routes for admin operations here.
+
super(APIRouter, self).__init__(mapper)
diff --git a/nova/api/openstack/auth.py b/nova/api/openstack/auth.py
index ff428ff70..fcda97ab1 100644
--- a/nova/api/openstack/auth.py
+++ b/nova/api/openstack/auth.py
@@ -23,10 +23,7 @@ class Context(object):
class BasicApiAuthManager(object):
""" Implements a somewhat rudimentary version of OpenStack Auth"""
- def __init__(self, host=None, db_driver=None):
- if not host:
- host = FLAGS.host
- self.host = host
+ def __init__(self, db_driver=None):
if not db_driver:
db_driver = FLAGS.db_driver
self.db = utils.import_object(db_driver)
@@ -47,7 +44,7 @@ class BasicApiAuthManager(object):
except KeyError:
return faults.Fault(webob.exc.HTTPUnauthorized())
- token, user = self._authorize_user(username, key)
+ token, user = self._authorize_user(username, key, req)
if user and token:
res = webob.Response()
res.headers['X-Auth-Token'] = token.token_hash
@@ -77,13 +74,16 @@ class BasicApiAuthManager(object):
if delta.days >= 2:
self.db.auth_destroy_token(self.context, token)
else:
- #TODO(gundlach): Why not just return dict(id=token.user_id)?
- user = self.auth.get_user(token.user_id)
- return {'id': user.id}
+ return self.auth.get_user(token.user_id)
return None
- def _authorize_user(self, username, key):
- """ Generates a new token and assigns it to a user """
+ def _authorize_user(self, username, key, req):
+ """Generates a new token and assigns it to a user.
+
+ username - string
+ key - string API key
+ req - webob.Request object
+ """
user = self.auth.get_user_from_access_key(key)
if user and user.name == username:
token_hash = hashlib.sha1('%s%s%f' % (username, key,
@@ -91,12 +91,10 @@ class BasicApiAuthManager(object):
token_dict = {}
token_dict['token_hash'] = token_hash
token_dict['cdn_management_url'] = ''
- token_dict['server_management_url'] = self._get_server_mgmt_url()
+ # Same as auth url, e.g. http://foo.org:8774/baz/v1.0
+ token_dict['server_management_url'] = req.url
token_dict['storage_url'] = ''
token_dict['user_id'] = user.id
token = self.db.auth_create_token(self.context, token_dict)
return token, user
return None, None
-
- def _get_server_mgmt_url(self):
- return 'https://%s/v1.0/' % self.host
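
_authorize_user builds the token hash from the username, the API key, and the current time, and the server management URL is now simply the URL the client authenticated against. A runnable sketch of the hash construction (assuming the .hexdigest() call that the truncated context line above implies):

    import hashlib
    import time

    username, key = 'alice', 'secret-api-key'
    token_hash = hashlib.sha1('%s%s%f' % (username, key,
                                          time.time())).hexdigest()
    print(token_hash)  # 40-character hex token, unique per request time
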
diff --git a/nova/api/openstack/images.py b/nova/api/openstack/images.py
index cdbdc9bdd..4a0a8e6f1 100644
--- a/nova/api/openstack/images.py
+++ b/nova/api/openstack/images.py
@@ -17,7 +17,6 @@
from webob import exc
-from nova import context
from nova import flags
from nova import utils
from nova import wsgi
@@ -47,10 +46,8 @@ class Controller(wsgi.Controller):
def detail(self, req):
"""Return all public images in detail."""
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
try:
- images = self._service.detail(ctxt)
+ images = self._service.detail(req.environ['nova.context'])
images = nova.api.openstack.limited(images, req)
except NotImplementedError:
# Emulate detail() using repeated calls to show()
@@ -61,9 +58,7 @@ class Controller(wsgi.Controller):
def show(self, req, id):
"""Return data about the given image id."""
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- return dict(image=self._service.show(ctxt, id))
+ return dict(image=self._service.show(req.environ['nova.context'], id))
def delete(self, req, id):
# Only public images are supported for now.
diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py
index 1d8aa2fa4..7704f48f1 100644
--- a/nova/api/openstack/servers.py
+++ b/nova/api/openstack/servers.py
@@ -15,34 +15,16 @@
# License for the specific language governing permissions and limitations
# under the License.
-import time
-
-import webob
from webob import exc
-from nova import flags
-from nova import rpc
-from nova import utils
+from nova import exception
from nova import wsgi
-from nova import context
-from nova.api import cloud
from nova.api.openstack import faults
+from nova.auth import manager as auth_manager
+from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import power_state
import nova.api.openstack
-import nova.image.service
-
-FLAGS = flags.FLAGS
-
-
-def _filter_params(inst_dict):
- """ Extracts all updatable parameters for a server update request """
- keys = dict(name='name', admin_pass='adminPass')
- new_attrs = {}
- for k, v in keys.items():
- if v in inst_dict:
- new_attrs[k] = inst_dict[v]
- return new_attrs
def _entity_list(entities):
@@ -63,7 +45,7 @@ def _entity_detail(inst):
inst_dict = {}
mapped_keys = dict(status='state', imageId='image_id',
- flavorId='instance_type', name='server_name', id='id')
+ flavorId='instance_type', name='display_name', id='internal_id')
for k, v in mapped_keys.iteritems():
inst_dict[k] = inst[v]
@@ -78,7 +60,7 @@ def _entity_detail(inst):
def _entity_inst(inst):
""" Filters all model attributes save for id and name """
- return dict(server=dict(id=inst['id'], name=inst['server_name']))
+ return dict(server=dict(id=inst['internal_id'], name=inst['display_name']))
class Controller(wsgi.Controller):
@@ -88,14 +70,10 @@ class Controller(wsgi.Controller):
'application/xml': {
"attributes": {
"server": ["id", "imageId", "name", "flavorId", "hostId",
- "status", "progress", "progress"]}}}
+ "status", "progress"]}}}
- def __init__(self, db_driver=None):
- if not db_driver:
- db_driver = FLAGS.db_driver
- self.db_driver = utils.import_object(db_driver)
- self.network_manager = utils.import_object(FLAGS.network_manager)
- self.compute_manager = utils.import_object(FLAGS.compute_manager)
+ def __init__(self):
+ self.compute_api = compute_api.ComputeAPI()
super(Controller, self).__init__()
def index(self, req):
@@ -111,166 +89,80 @@ class Controller(wsgi.Controller):
entity_maker - either _entity_detail or _entity_inst
"""
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- instance_list = self.db_driver.instance_get_all_by_user(ctxt, user_id)
+ instance_list = self.compute_api.get_instances(
+ req.environ['nova.context'])
limited_list = nova.api.openstack.limited(instance_list, req)
res = [entity_maker(inst)['server'] for inst in limited_list]
return _entity_list(res)
def show(self, req, id):
""" Returns server details by server id """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- inst = self.db_driver.instance_get_by_internal_id(ctxt, int(id))
- if inst:
- if inst.user_id == user_id:
- return _entity_detail(inst)
- raise faults.Fault(exc.HTTPNotFound())
+ try:
+ instance = self.compute_api.get_instance(
+ req.environ['nova.context'], int(id))
+ return _entity_detail(instance)
+ except exception.NotFound:
+ return faults.Fault(exc.HTTPNotFound())
def delete(self, req, id):
""" Destroys a server """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- instance = self.db_driver.instance_get_by_internal_id(ctxt, int(id))
- if instance and instance['user_id'] == user_id:
- self.db_driver.instance_destroy(ctxt, id)
- return faults.Fault(exc.HTTPAccepted())
- return faults.Fault(exc.HTTPNotFound())
+ try:
+ self.compute_api.delete_instance(req.environ['nova.context'],
+ int(id))
+ except exception.NotFound:
+ return faults.Fault(exc.HTTPNotFound())
+ return exc.HTTPAccepted()
def create(self, req):
""" Creates a new server for a given user """
-
env = self._deserialize(req.body, req)
if not env:
return faults.Fault(exc.HTTPUnprocessableEntity())
- #try:
- inst = self._build_server_instance(req, env)
- #except Exception, e:
- # return faults.Fault(exc.HTTPUnprocessableEntity())
-
- user_id = req.environ['nova.context']['user']['id']
- rpc.cast(context.RequestContext(user_id, user_id),
- FLAGS.compute_topic,
- {"method": "run_instance",
- "args": {"instance_id": inst['id']}})
- return _entity_inst(inst)
+ key_pair = auth_manager.AuthManager.get_key_pairs(
+ req.environ['nova.context'])[0]
+ instances = self.compute_api.create_instances(
+ req.environ['nova.context'],
+ instance_types.get_by_flavor_id(env['server']['flavorId']),
+ env['server']['imageId'],
+ display_name=env['server']['name'],
+ description=env['server']['name'],
+ key_name=key_pair['name'],
+ key_data=key_pair['public_key'])
+ return _entity_inst(instances[0])
def update(self, req, id):
""" Updates the server name or password """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
-
inst_dict = self._deserialize(req.body, req)
-
if not inst_dict:
return faults.Fault(exc.HTTPUnprocessableEntity())
- instance = self.db_driver.instance_get_by_internal_id(ctxt, int(id))
- if not instance or instance.user_id != user_id:
- return faults.Fault(exc.HTTPNotFound())
+ update_dict = {}
+ if 'adminPass' in inst_dict['server']:
+ update_dict['admin_pass'] = inst_dict['server']['adminPass']
+ if 'name' in inst_dict['server']:
+ update_dict['display_name'] = inst_dict['server']['name']
- self.db_driver.instance_update(ctxt,
- int(id),
- _filter_params(inst_dict['server']))
- return faults.Fault(exc.HTTPNoContent())
+ try:
+ self.compute_api.update_instance(req.environ['nova.context'],
+ instance['id'],
+ **update_dict)
+ except exception.NotFound:
+ return faults.Fault(exc.HTTPNotFound())
+ return exc.HTTPNoContent()
def action(self, req, id):
- """ multi-purpose method used to reboot, rebuild, and
+ """ Multi-purpose method used to reboot, rebuild, and
resize a server """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
input_dict = self._deserialize(req.body, req)
try:
reboot_type = input_dict['reboot']['type']
except Exception:
- raise faults.Fault(webob.exc.HTTPNotImplemented())
- inst_ref = self.db.instance_get_by_internal_id(ctxt, int(id))
- if not inst_ref or (inst_ref and not inst_ref.user_id == user_id):
+ raise faults.Fault(exc.HTTPNotImplemented())
+ try:
+ # TODO(gundlach): pass reboot_type, support soft reboot in
+ # virt driver
+ self.compute_api.reboot(req.environ['nova.context'], id)
+ except:
return faults.Fault(exc.HTTPUnprocessableEntity())
- cloud.reboot(id)
-
- def _build_server_instance(self, req, env):
- """Build instance data structure and save it to the data store."""
- ltime = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
- inst = {}
-
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
-
- flavor_id = env['server']['flavorId']
-
- instance_type, flavor = [(k, v) for k, v in
- instance_types.INSTANCE_TYPES.iteritems()
- if v['flavorid'] == flavor_id][0]
-
- image_id = env['server']['imageId']
- img_service = utils.import_object(FLAGS.image_service)
-
- image = img_service.show(image_id)
-
- if not image:
- raise Exception("Image not found")
-
- inst['server_name'] = env['server']['name']
- inst['image_id'] = image_id
- inst['user_id'] = user_id
- inst['launch_time'] = ltime
- inst['mac_address'] = utils.generate_mac()
- inst['project_id'] = user_id
-
- inst['state_description'] = 'scheduling'
- inst['kernel_id'] = image.get('kernelId', FLAGS.default_kernel)
- inst['ramdisk_id'] = image.get('ramdiskId', FLAGS.default_ramdisk)
- inst['reservation_id'] = utils.generate_uid('r')
-
- inst['display_name'] = env['server']['name']
- inst['display_description'] = env['server']['name']
-
- #TODO(dietz) this may be ill advised
- key_pair_ref = self.db_driver.key_pair_get_all_by_user(
- None, user_id)[0]
-
- inst['key_data'] = key_pair_ref['public_key']
- inst['key_name'] = key_pair_ref['name']
-
- #TODO(dietz) stolen from ec2 api, see TODO there
- inst['security_group'] = 'default'
-
- # Flavor related attributes
- inst['instance_type'] = instance_type
- inst['memory_mb'] = flavor['memory_mb']
- inst['vcpus'] = flavor['vcpus']
- inst['local_gb'] = flavor['local_gb']
- inst['mac_address'] = utils.generate_mac()
- inst['launch_index'] = 0
-
- ref = self.compute_manager.create_instance(ctxt, **inst)
- inst['id'] = ref['internal_id']
-
- inst['hostname'] = str(ref['internal_id'])
- self.compute_manager.update_instance(ctxt, inst['id'], **inst)
-
- address = self.network_manager.allocate_fixed_ip(ctxt,
- inst['id'])
-
- # TODO(vish): This probably should be done in the scheduler
- # network is setup when host is assigned
- network_topic = self._get_network_topic(ctxt)
- rpc.call(ctxt,
- network_topic,
- {"method": "setup_fixed_ip",
- "args": {"address": address}})
- return inst
-
- def _get_network_topic(self, context):
- """Retrieves the network host for a project"""
- network_ref = self.network_manager.get_network(context)
- host = network_ref['host']
- if not host:
- host = rpc.call(context,
- FLAGS.network_topic,
- {"method": "set_network_host",
- "args": {"network_id": network_ref['id']}})
- return self.db_driver.queue_get_for(context, FLAGS.network_topic, host)
+ return exc.HTTPAccepted()
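
update() above now translates OpenStack API field names into internal column names before handing off to ComputeAPI.update_instance. A runnable sketch of that key mapping with a stand-in request body:

    inst_dict = {'server': {'name': 'web01', 'adminPass': 's3cret'}}
    update_dict = {}
    if 'adminPass' in inst_dict['server']:
        update_dict['admin_pass'] = inst_dict['server']['adminPass']
    if 'name' in inst_dict['server']:
        update_dict['display_name'] = inst_dict['server']['name']
    assert update_dict == {'admin_pass': 's3cret', 'display_name': 'web01'}
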
diff --git a/nova/auth/ldapdriver.py b/nova/auth/ldapdriver.py
index ceade1d65..c10939d74 100644
--- a/nova/auth/ldapdriver.py
+++ b/nova/auth/ldapdriver.py
@@ -40,6 +40,8 @@ flags.DEFINE_string('ldap_user_dn', 'cn=Manager,dc=example,dc=com',
flags.DEFINE_string('ldap_user_unit', 'Users', 'OID for Users')
flags.DEFINE_string('ldap_user_subtree', 'ou=Users,dc=example,dc=com',
'OU for Users')
+flags.DEFINE_boolean('ldap_user_modify_only', False,
+ 'Modify attributes for users instead of creating/deleting')
flags.DEFINE_string('ldap_project_subtree', 'ou=Groups,dc=example,dc=com',
'OU for Projects')
flags.DEFINE_string('role_project_subtree', 'ou=Groups,dc=example,dc=com',
@@ -89,8 +91,7 @@ class LdapDriver(object):
def get_user(self, uid):
"""Retrieve user by id"""
- attr = self.__find_object(self.__uid_to_dn(uid),
- '(objectclass=novaUser)')
+ attr = self.__get_ldap_user(uid)
return self.__to_user(attr)
def get_user_from_access_key(self, access):
@@ -110,7 +111,12 @@ class LdapDriver(object):
"""Retrieve list of users"""
attrs = self.__find_objects(FLAGS.ldap_user_subtree,
'(objectclass=novaUser)')
- return [self.__to_user(attr) for attr in attrs]
+ users = []
+ for attr in attrs:
+ user = self.__to_user(attr)
+ if user is not None:
+ users.append(user)
+ return users
def get_projects(self, uid=None):
"""Retrieve list of projects"""
@@ -125,21 +131,52 @@ class LdapDriver(object):
"""Create a user"""
if self.__user_exists(name):
raise exception.Duplicate("LDAP user %s already exists" % name)
- attr = [
- ('objectclass', ['person',
- 'organizationalPerson',
- 'inetOrgPerson',
- 'novaUser']),
- ('ou', [FLAGS.ldap_user_unit]),
- ('uid', [name]),
- ('sn', [name]),
- ('cn', [name]),
- ('secretKey', [secret_key]),
- ('accessKey', [access_key]),
- ('isAdmin', [str(is_admin).upper()]),
- ]
- self.conn.add_s(self.__uid_to_dn(name), attr)
- return self.__to_user(dict(attr))
+ if FLAGS.ldap_user_modify_only:
+ if self.__ldap_user_exists(name):
+ # Retrieve user by name
+ user = self.__get_ldap_user(name)
+ # Entry could be malformed, test for missing attrs.
+ # Malformed entries are useless, replace attributes found.
+ attr = []
+ if 'secretKey' in user.keys():
+ attr.append((self.ldap.MOD_REPLACE, 'secretKey', \
+ [secret_key]))
+ else:
+ attr.append((self.ldap.MOD_ADD, 'secretKey', \
+ [secret_key]))
+ if 'accessKey' in user.keys():
+ attr.append((self.ldap.MOD_REPLACE, 'accessKey', \
+ [access_key]))
+ else:
+ attr.append((self.ldap.MOD_ADD, 'accessKey', \
+ [access_key]))
+ if 'isAdmin' in user.keys():
+ attr.append((self.ldap.MOD_REPLACE, 'isAdmin', \
+ [str(is_admin).upper()]))
+ else:
+ attr.append((self.ldap.MOD_ADD, 'isAdmin', \
+ [str(is_admin).upper()]))
+ self.conn.modify_s(self.__uid_to_dn(name), attr)
+ return self.get_user(name)
+ else:
+ raise exception.NotFound("LDAP object for %s doesn't exist"
+ % name)
+ else:
+ attr = [
+ ('objectclass', ['person',
+ 'organizationalPerson',
+ 'inetOrgPerson',
+ 'novaUser']),
+ ('ou', [FLAGS.ldap_user_unit]),
+ ('uid', [name]),
+ ('sn', [name]),
+ ('cn', [name]),
+ ('secretKey', [secret_key]),
+ ('accessKey', [access_key]),
+ ('isAdmin', [str(is_admin).upper()]),
+ ]
+ self.conn.add_s(self.__uid_to_dn(name), attr)
+ return self.__to_user(dict(attr))
def create_project(self, name, manager_uid,
description=None, member_uids=None):
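
With --ldap_user_modify_only set, create_user above patches an existing entry with a modlist rather than adding a new one: MOD_REPLACE when the attribute is already present, MOD_ADD when it is missing. A hedged python-ldap sketch of the same idea (server URI, bind credentials, and DN are placeholders):

    import ldap

    conn = ldap.initialize('ldap://localhost')
    conn.simple_bind_s('cn=Manager,dc=example,dc=com', 'changeme')
    dn = 'uid=alice,ou=Users,dc=example,dc=com'
    existing = conn.search_s(dn, ldap.SCOPE_BASE)[0][1]
    # Replace the key if the entry already carries one, add it otherwise.
    op = ldap.MOD_REPLACE if 'accessKey' in existing else ldap.MOD_ADD
    conn.modify_s(dn, [(op, 'accessKey', ['new-access-key'])])
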
@@ -155,7 +192,7 @@ class LdapDriver(object):
if description is None:
description = name
members = []
- if member_uids != None:
+ if member_uids is not None:
for member_uid in member_uids:
if not self.__user_exists(member_uid):
raise exception.NotFound("Project can't be created "
@@ -256,7 +293,24 @@ class LdapDriver(object):
if not self.__user_exists(uid):
raise exception.NotFound("User %s doesn't exist" % uid)
self.__remove_from_all(uid)
- self.conn.delete_s(self.__uid_to_dn(uid))
+ if FLAGS.ldap_user_modify_only:
+ # Delete attributes
+ attr = []
+ # Retrieve user by name
+ user = self.__get_ldap_user(uid)
+ if 'secretKey' in user.keys():
+ attr.append((self.ldap.MOD_DELETE, 'secretKey', \
+ user['secretKey']))
+ if 'accessKey' in user.keys():
+ attr.append((self.ldap.MOD_DELETE, 'accessKey', \
+ user['accessKey']))
+ if 'isAdmin' in user.keys():
+ attr.append((self.ldap.MOD_DELETE, 'isAdmin', \
+ user['isAdmin']))
+ self.conn.modify_s(self.__uid_to_dn(uid), attr)
+ else:
+ # Delete entry
+ self.conn.delete_s(self.__uid_to_dn(uid))
def delete_project(self, project_id):
"""Delete a project"""
@@ -265,7 +319,7 @@ class LdapDriver(object):
self.__delete_group(project_dn)
def modify_user(self, uid, access_key=None, secret_key=None, admin=None):
- """Modify an existing project"""
+ """Modify an existing user"""
if not access_key and not secret_key and admin is None:
return
attr = []
@@ -279,11 +333,21 @@ class LdapDriver(object):
def __user_exists(self, uid):
"""Check if user exists"""
- return self.get_user(uid) != None
+ return self.get_user(uid) is not None
+
+ def __ldap_user_exists(self, uid):
+ """Check if the user exists in ldap"""
+ return self.__get_ldap_user(uid) is not None
def __project_exists(self, project_id):
"""Check if project exists"""
- return self.get_project(project_id) != None
+ return self.get_project(project_id) is not None
+
+ def __get_ldap_user(self, uid):
+ """Retrieve LDAP user entry by id"""
+ attr = self.__find_object(self.__uid_to_dn(uid),
+ '(objectclass=novaUser)')
+ return attr
def __find_object(self, dn, query=None, scope=None):
"""Find an object by dn and query"""
@@ -330,12 +394,12 @@ class LdapDriver(object):
def __group_exists(self, dn):
"""Check if group exists"""
- return self.__find_object(dn, '(objectclass=groupOfNames)') != None
+ return self.__find_object(dn, '(objectclass=groupOfNames)') is not None
@staticmethod
def __role_to_dn(role, project_id=None):
"""Convert role to corresponding dn"""
- if project_id == None:
+ if project_id is None:
return FLAGS.__getitem__("ldap_%s" % role).value
else:
return 'cn=%s,cn=%s,%s' % (role,
@@ -349,7 +413,7 @@ class LdapDriver(object):
raise exception.Duplicate("Group can't be created because "
"group %s already exists" % name)
members = []
- if member_uids != None:
+ if member_uids is not None:
for member_uid in member_uids:
if not self.__user_exists(member_uid):
raise exception.NotFound("Group can't be created "
@@ -375,7 +439,7 @@ class LdapDriver(object):
res = self.__find_object(group_dn,
'(member=%s)' % self.__uid_to_dn(uid),
self.ldap.SCOPE_BASE)
- return res != None
+ return res is not None
def __add_to_group(self, uid, group_dn):
"""Add user to group"""
@@ -447,18 +511,22 @@ class LdapDriver(object):
@staticmethod
def __to_user(attr):
"""Convert ldap attributes to User object"""
- if attr == None:
+ if attr is None:
+ return None
+ if ('accessKey' in attr.keys() and 'secretKey' in attr.keys() \
+ and 'isAdmin' in attr.keys()):
+ return {
+ 'id': attr['uid'][0],
+ 'name': attr['cn'][0],
+ 'access': attr['accessKey'][0],
+ 'secret': attr['secretKey'][0],
+ 'admin': (attr['isAdmin'][0] == 'TRUE')}
+ else:
return None
- return {
- 'id': attr['uid'][0],
- 'name': attr['cn'][0],
- 'access': attr['accessKey'][0],
- 'secret': attr['secretKey'][0],
- 'admin': (attr['isAdmin'][0] == 'TRUE')}
def __to_project(self, attr):
"""Convert ldap attributes to Project object"""
- if attr == None:
+ if attr is None:
return None
member_dns = attr.get('member', [])
return {
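
__to_user now returns None for malformed entries (those missing accessKey, secretKey, or isAdmin), which is why get_users above filters its results. A runnable sketch of the defensive conversion (REQUIRED here also lists uid and cn, which LDAP person entries always carry):

    REQUIRED = ('uid', 'cn', 'accessKey', 'secretKey', 'isAdmin')

    def to_user(attr):
        # Mirror of __to_user: drop entries missing the nova attributes.
        if attr is None or not all(k in attr for k in REQUIRED):
            return None
        return {'id': attr['uid'][0],
                'name': attr['cn'][0],
                'access': attr['accessKey'][0],
                'secret': attr['secretKey'][0],
                'admin': attr['isAdmin'][0] == 'TRUE'}

    malformed = {'uid': ['alice'], 'cn': ['alice']}  # no keys stored
    assert to_user(malformed) is None
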
diff --git a/nova/auth/manager.py b/nova/auth/manager.py
index 735473027..74da8e045 100644
--- a/nova/auth/manager.py
+++ b/nova/auth/manager.py
@@ -621,6 +621,10 @@ class AuthManager(object):
with self.driver() as drv:
drv.modify_user(uid, access_key, secret_key, admin)
+ @staticmethod
+ def get_key_pairs(context):
+ return db.key_pair_get_all_by_user(context.elevated(), context.user_id)
+
def get_credentials(self, user, project=None, use_dmz=True):
"""Get credential zip for user in project"""
if not isinstance(user, User):
diff --git a/nova/auth/nova_openldap.schema b/nova/auth/nova_openldap.schema
new file mode 100644
index 000000000..4047361de
--- /dev/null
+++ b/nova/auth/nova_openldap.schema
@@ -0,0 +1,84 @@
+#
+# Person object for Nova
+# inetorgperson with extra attributes
+# Author: Vishvananda Ishaya <vishvananda@yahoo.com>
+#
+#
+
+# using internet experimental oid arc as per BP64 3.1
+objectidentifier novaSchema 1.3.6.1.3.1.666.666
+objectidentifier novaAttrs novaSchema:3
+objectidentifier novaOCs novaSchema:4
+
+attributetype (
+ novaAttrs:1
+ NAME 'accessKey'
+ DESC 'Key for accessing data'
+ EQUALITY caseIgnoreMatch
+ SUBSTR caseIgnoreSubstringsMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+ SINGLE-VALUE
+ )
+
+attributetype (
+ novaAttrs:2
+ NAME 'secretKey'
+ DESC 'Secret key'
+ EQUALITY caseIgnoreMatch
+ SUBSTR caseIgnoreSubstringsMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+ SINGLE-VALUE
+ )
+
+attributetype (
+ novaAttrs:3
+ NAME 'keyFingerprint'
+ DESC 'Fingerprint of private key'
+ EQUALITY caseIgnoreMatch
+ SUBSTR caseIgnoreSubstringsMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+ SINGLE-VALUE
+ )
+
+attributetype (
+ novaAttrs:4
+ NAME 'isAdmin'
+ DESC 'Is user an administrator?'
+ EQUALITY booleanMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
+ SINGLE-VALUE
+ )
+
+attributetype (
+ novaAttrs:5
+ NAME 'projectManager'
+ DESC 'Project Managers of a project'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.12
+ )
+
+objectClass (
+ novaOCs:1
+ NAME 'novaUser'
+ DESC 'access and secret keys'
+ AUXILIARY
+ MUST ( uid )
+ MAY ( accessKey $ secretKey $ isAdmin )
+ )
+
+objectClass (
+ novaOCs:2
+ NAME 'novaKeyPair'
+ DESC 'Key pair for User'
+ SUP top
+ STRUCTURAL
+ MUST ( cn $ sshPublicKey $ keyFingerprint )
+ )
+
+objectClass (
+ novaOCs:3
+ NAME 'novaProject'
+ DESC 'Container for project'
+ SUP groupOfNames
+ STRUCTURAL
+ MUST ( cn $ projectManager )
+ )
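
novaUser is an AUXILIARY class keyed on uid, so in practice it is attached to an inetOrgPerson entry, as ldapdriver.create_user does. A hedged python-ldap sketch of adding an entry that satisfies this schema (URI, credentials, and DN are placeholders):

    import ldap
    import ldap.modlist

    conn = ldap.initialize('ldap://localhost')
    conn.simple_bind_s('cn=Manager,dc=example,dc=com', 'changeme')
    attrs = {'objectclass': ['person', 'organizationalPerson',
                             'inetOrgPerson', 'novaUser'],
             'uid': ['alice'], 'sn': ['alice'], 'cn': ['alice'],
             'accessKey': ['access'], 'secretKey': ['secret'],
             'isAdmin': ['TRUE']}
    conn.add_s('uid=alice,ou=Users,dc=example,dc=com',
               ldap.modlist.addModlist(attrs))
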
diff --git a/nova/auth/nova_sun.schema b/nova/auth/nova_sun.schema
new file mode 100644
index 000000000..e925e05e4
--- /dev/null
+++ b/nova/auth/nova_sun.schema
@@ -0,0 +1,16 @@
+#
+# Person object for Nova
+# inetorgperson with extra attributes
+# Author: Vishvananda Ishaya <vishvananda@yahoo.com>
+# Modified for strict RFC 4512 compatibility by: Ryan Lane <ryan@ryandlane.com>
+#
+# using internet experimental oid arc as per BP64 3.1
+dn: cn=schema
+attributeTypes: ( 1.3.6.1.3.1.666.666.3.1 NAME 'accessKey' DESC 'Key for accessing data' EQUALITY caseIgnoreMatch SUBSTR caseIgnoreSubstringsMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE )
+attributeTypes: ( 1.3.6.1.3.1.666.666.3.2 NAME 'secretKey' DESC 'Secret key' EQUALITY caseIgnoreMatch SUBSTR caseIgnoreSubstringsMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE )
+attributeTypes: ( 1.3.6.1.3.1.666.666.3.3 NAME 'keyFingerprint' DESC 'Fingerprint of private key' EQUALITY caseIgnoreMatch SUBSTR caseIgnoreSubstringsMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE)
+attributeTypes: ( 1.3.6.1.3.1.666.666.3.4 NAME 'isAdmin' DESC 'Is user an administrator?' EQUALITY booleanMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE )
+attributeTypes: ( 1.3.6.1.3.1.666.666.3.5 NAME 'projectManager' DESC 'Project Managers of a project' SYNTAX 1.3.6.1.4.1.1466.115.121.1.12 )
+objectClasses: ( 1.3.6.1.3.1.666.666.4.1 NAME 'novaUser' DESC 'access and secret keys' SUP top AUXILIARY MUST ( uid ) MAY ( accessKey $ secretKey $ isAdmin ) )
+objectClasses: ( 1.3.6.1.3.1.666.666.4.2 NAME 'novaKeyPair' DESC 'Key pair for User' SUP top STRUCTURAL MUST ( cn $ sshPublicKey $ keyFingerprint ) )
+objectClasses: ( 1.3.6.1.3.1.666.666.4.3 NAME 'novaProject' DESC 'Container for project' SUP groupOfNames STRUCTURAL MUST ( cn $ projectManager ) )
diff --git a/nova/auth/opendj.sh b/nova/auth/opendj.sh
new file mode 100755
index 000000000..8052c077d
--- /dev/null
+++ b/nova/auth/opendj.sh
@@ -0,0 +1,119 @@
+#!/usr/bin/env bash
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# LDAP INSTALL SCRIPT - IS IDEMPOTENT, does not scrub users
+
+apt-get install -y ldap-utils python-ldap openjdk-6-jre
+
+if [ ! -d "/usr/opendj" ]
+then
+ # TODO(rlane): Wikimedia Foundation is the current package maintainer.
+ # After the package is included in Ubuntu's channel, change this.
+ wget http://apt.wikimedia.org/wikimedia/pool/main/o/opendj/opendj_2.4.0-7_amd64.deb
+ dpkg -i opendj_2.4.0-7_amd64.deb
+fi
+
+abspath=`dirname "$(cd "${0%/*}" 2>/dev/null; echo "$PWD"/"${0##*/}")"`
+schemapath='/var/opendj/instance/config/schema'
+cp $abspath/openssh-lpk_sun.schema $schemapath/97-openssh-lpk_sun.ldif
+cp $abspath/nova_sun.schema $schemapath/98-nova_sun.ldif
+chown opendj:opendj $schemapath/97-openssh-lpk_sun.ldif
+chown opendj:opendj $schemapath/98-nova_sun.ldif
+
+cat >/etc/ldap/ldap.conf <<LDAP_CONF_EOF
+# LDAP Client Settings
+URI ldap://localhost
+BASE dc=example,dc=com
+BINDDN cn=Directory Manager
+SIZELIMIT 0
+TIMELIMIT 0
+LDAP_CONF_EOF
+
+cat >/etc/ldap/base.ldif <<BASE_LDIF_EOF
+# This is the root of the directory tree
+dn: dc=example,dc=com
+description: Example.Com, your trusted non-existent corporation.
+dc: example
+o: Example.Com
+objectClass: top
+objectClass: dcObject
+objectClass: organization
+
+# Subtree for users
+dn: ou=Users,dc=example,dc=com
+ou: Users
+description: Users
+objectClass: organizationalUnit
+
+# Subtree for groups
+dn: ou=Groups,dc=example,dc=com
+ou: Groups
+description: Groups
+objectClass: organizationalUnit
+
+# Subtree for system accounts
+dn: ou=System,dc=example,dc=com
+ou: System
+description: Special accounts used by software applications.
+objectClass: organizationalUnit
+
+# Special Account for Authentication:
+dn: uid=authenticate,ou=System,dc=example,dc=com
+uid: authenticate
+ou: System
+description: Special account for authenticating users
+userPassword: {MD5}TLnIqASP0CKUR3/LGkEZGg==
+objectClass: account
+objectClass: simpleSecurityObject
+
+# create the sysadmin entry
+
+dn: cn=developers,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: developers
+description: IT admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=sysadmins,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: sysadmins
+description: IT admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=netadmins,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: netadmins
+description: Network admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=cloudadmins,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: cloudadmins
+description: Cloud admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=itsec,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: itsec
+description: IT security users group
+member: uid=admin,ou=Users,dc=example,dc=com
+BASE_LDIF_EOF
+
+/etc/init.d/opendj stop
+su - opendj -c '/usr/opendj/setup -i -b "dc=example,dc=com" -l /etc/ldap/base.ldif -S -w changeme -O -n --noPropertiesFile'
+/etc/init.d/opendj start
diff --git a/nova/auth/openssh-lpk_openldap.schema b/nova/auth/openssh-lpk_openldap.schema
new file mode 100644
index 000000000..93351da6d
--- /dev/null
+++ b/nova/auth/openssh-lpk_openldap.schema
@@ -0,0 +1,19 @@
+#
+# LDAP Public Key Patch schema for use with openssh-ldappubkey
+# Author: Eric AUGE <eau@phear.org>
+#
+# Based on the proposal of : Mark Ruijter
+#
+
+
+# octetString SYNTAX
+attributetype ( 1.3.6.1.4.1.24552.500.1.1.1.13 NAME 'sshPublicKey'
+ DESC 'MANDATORY: OpenSSH Public key'
+ EQUALITY octetStringMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 )
+
+# printableString SYNTAX yes|no
+objectclass ( 1.3.6.1.4.1.24552.500.1.1.2.0 NAME 'ldapPublicKey' SUP top AUXILIARY
+ DESC 'MANDATORY: OpenSSH LPK objectclass'
+ MAY ( sshPublicKey $ uid )
+ )
diff --git a/nova/auth/openssh-lpk_sun.schema b/nova/auth/openssh-lpk_sun.schema
new file mode 100644
index 000000000..5f52db3b6
--- /dev/null
+++ b/nova/auth/openssh-lpk_sun.schema
@@ -0,0 +1,10 @@
+#
+# LDAP Public Key Patch schema for use with openssh-ldappubkey
+# Author: Eric AUGE <eau@phear.org>
+#
+# Schema for Sun Directory Server.
+# Based on the original schema, modified by Stefan Fischer.
+#
+dn: cn=schema
+attributeTypes: ( 1.3.6.1.4.1.24552.500.1.1.1.13 NAME 'sshPublicKey' DESC 'MANDATORY: OpenSSH Public key' EQUALITY octetStringMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 )
+objectClasses: ( 1.3.6.1.4.1.24552.500.1.1.2.0 NAME 'ldapPublicKey' SUP top AUXILIARY DESC 'MANDATORY: OpenSSH LPK objectclass' MAY ( sshPublicKey $ uid ) )
diff --git a/nova/auth/slap.sh b/nova/auth/slap.sh
index fdc0e39dc..797675d2e 100755
--- a/nova/auth/slap.sh
+++ b/nova/auth/slap.sh
@@ -20,115 +20,9 @@
apt-get install -y slapd ldap-utils python-ldap
-cat >/etc/ldap/schema/openssh-lpk_openldap.schema <<LPK_SCHEMA_EOF
-#
-# LDAP Public Key Patch schema for use with openssh-ldappubkey
-# Author: Eric AUGE <eau@phear.org>
-#
-# Based on the proposal of : Mark Ruijter
-#
-
-
-# octetString SYNTAX
-attributetype ( 1.3.6.1.4.1.24552.500.1.1.1.13 NAME 'sshPublicKey'
- DESC 'MANDATORY: OpenSSH Public key'
- EQUALITY octetStringMatch
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 )
-
-# printableString SYNTAX yes|no
-objectclass ( 1.3.6.1.4.1.24552.500.1.1.2.0 NAME 'ldapPublicKey' SUP top AUXILIARY
- DESC 'MANDATORY: OpenSSH LPK objectclass'
- MAY ( sshPublicKey $ uid )
- )
-LPK_SCHEMA_EOF
-
-cat >/etc/ldap/schema/nova.schema <<NOVA_SCHEMA_EOF
-#
-# Person object for Nova
-# inetorgperson with extra attributes
-# Author: Vishvananda Ishaya <vishvananda@yahoo.com>
-#
-#
-
-# using internet experimental oid arc as per BP64 3.1
-objectidentifier novaSchema 1.3.6.1.3.1.666.666
-objectidentifier novaAttrs novaSchema:3
-objectidentifier novaOCs novaSchema:4
-
-attributetype (
- novaAttrs:1
- NAME 'accessKey'
- DESC 'Key for accessing data'
- EQUALITY caseIgnoreMatch
- SUBSTR caseIgnoreSubstringsMatch
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
- SINGLE-VALUE
- )
-
-attributetype (
- novaAttrs:2
- NAME 'secretKey'
- DESC 'Secret key'
- EQUALITY caseIgnoreMatch
- SUBSTR caseIgnoreSubstringsMatch
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
- SINGLE-VALUE
- )
-
-attributetype (
- novaAttrs:3
- NAME 'keyFingerprint'
- DESC 'Fingerprint of private key'
- EQUALITY caseIgnoreMatch
- SUBSTR caseIgnoreSubstringsMatch
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
- SINGLE-VALUE
- )
-
-attributetype (
- novaAttrs:4
- NAME 'isAdmin'
- DESC 'Is user an administrator?'
- EQUALITY booleanMatch
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
- SINGLE-VALUE
- )
-
-attributetype (
- novaAttrs:5
- NAME 'projectManager'
- DESC 'Project Managers of a project'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.12
- )
-
-objectClass (
- novaOCs:1
- NAME 'novaUser'
- DESC 'access and secret keys'
- AUXILIARY
- MUST ( uid )
- MAY ( accessKey $ secretKey $ isAdmin )
- )
-
-objectClass (
- novaOCs:2
- NAME 'novaKeyPair'
- DESC 'Key pair for User'
- SUP top
- STRUCTURAL
- MUST ( cn $ sshPublicKey $ keyFingerprint )
- )
-
-objectClass (
- novaOCs:3
- NAME 'novaProject'
- DESC 'Container for project'
- SUP groupOfNames
- STRUCTURAL
- MUST ( cn $ projectManager )
- )
-
-NOVA_SCHEMA_EOF
+abspath=`dirname "$(cd "${0%/*}" 2>/dev/null; echo "$PWD"/"${0##*/}")"`
+cp $abspath/openssh-lpk_openldap.schema /etc/ldap/schema/openssh-lpk_openldap.schema
+cp $abspath/nova_openldap.schema /etc/ldap/schema/nova_openldap.schema
mv /etc/ldap/slapd.conf /etc/ldap/slapd.conf.orig
cat >/etc/ldap/slapd.conf <<SLAPD_CONF_EOF
diff --git a/nova/compute/api.py b/nova/compute/api.py
new file mode 100644
index 000000000..8e0efa4cc
--- /dev/null
+++ b/nova/compute/api.py
@@ -0,0 +1,305 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Handles all API requests relating to instances (guest vms).
+"""
+
+import datetime
+import logging
+import time
+
+from nova import db
+from nova import exception
+from nova import flags
+from nova import quota
+from nova import rpc
+from nova import utils
+from nova.compute import instance_types
+from nova.db import base
+
+FLAGS = flags.FLAGS
+
+
+def generate_default_hostname(internal_id):
+ """Default function to generate a hostname given an instance reference."""
+ return str(internal_id)
+
+
+class ComputeAPI(base.Base):
+ """API for interacting with the compute manager."""
+
+ def __init__(self, network_manager=None, image_service=None, **kwargs):
+ if not network_manager:
+ network_manager = utils.import_object(FLAGS.network_manager)
+ self.network_manager = network_manager
+ if not image_service:
+ image_service = utils.import_object(FLAGS.image_service)
+ self.image_service = image_service
+ super(ComputeAPI, self).__init__(**kwargs)
+
+ def create_instances(self, context, instance_type, image_id, min_count=1,
+ max_count=1, kernel_id=None, ramdisk_id=None,
+ display_name='', description='', key_name=None,
+ key_data=None, security_group='default',
+ generate_hostname=generate_default_hostname):
+ """Create the number of instances requested if quote and
+ other arguments check out ok."""
+
+ num_instances = quota.allowed_instances(context, max_count,
+ instance_type)
+ if num_instances < min_count:
+ logging.warn("Quota exceeeded for %s, tried to run %s instances",
+ context.project_id, min_count)
+ raise quota.QuotaError("Instance quota exceeded. You can only "
+ "run %s more instances of this type." %
+ num_instances, "InstanceLimitExceeded")
+
+ is_vpn = image_id == FLAGS.vpn_image_id
+ if not is_vpn:
+ image = self.image_service.show(context, image_id)
+ if kernel_id is None:
+ kernel_id = image.get('kernelId', FLAGS.default_kernel)
+ if ramdisk_id is None:
+ ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
+
+ # Make sure we have access to kernel and ramdisk
+ self.image_service.show(context, kernel_id)
+ self.image_service.show(context, ramdisk_id)
+
+ if security_group is None:
+ security_group = ['default']
+ if not type(security_group) is list:
+ security_group = [security_group]
+
+ security_groups = []
+ self.ensure_default_security_group(context)
+ for security_group_name in security_group:
+ group = db.security_group_get_by_name(context,
+ context.project_id,
+ security_group_name)
+ security_groups.append(group['id'])
+
+ if key_data is None and key_name:
+ key_pair = db.key_pair_get(context, context.user_id, key_name)
+ key_data = key_pair['public_key']
+
+ type_data = instance_types.INSTANCE_TYPES[instance_type]
+ base_options = {
+ 'reservation_id': utils.generate_uid('r'),
+ 'image_id': image_id,
+ 'kernel_id': kernel_id,
+ 'ramdisk_id': ramdisk_id,
+ 'state_description': 'scheduling',
+ 'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
+ 'instance_type': instance_type,
+ 'memory_mb': type_data['memory_mb'],
+ 'vcpus': type_data['vcpus'],
+ 'local_gb': type_data['local_gb'],
+ 'display_name': display_name,
+ 'display_description': description,
+ 'key_name': key_name,
+ 'key_data': key_data}
+
+ elevated = context.elevated()
+ instances = []
+ logging.debug("Going to run %s instances...", num_instances)
+ for num in range(num_instances):
+ instance = dict(mac_address=utils.generate_mac(),
+ launch_index=num,
+ **base_options)
+ instance = self.db.instance_create(context, instance)
+ instance_id = instance['id']
+ internal_id = instance['internal_id']
+
+ elevated = context.elevated()
+ if not security_groups:
+ security_groups = []
+ for security_group_id in security_groups:
+ self.db.instance_add_security_group(elevated,
+ instance_id,
+ security_group_id)
+
+ # Set sane defaults if not specified
+ updates = dict(hostname=generate_hostname(internal_id))
+ if 'display_name' not in instance:
+ updates['display_name'] = "Server %s" % internal_id
+
+ instance = self.update_instance(context, instance_id, **updates)
+ instances.append(instance)
+
+ # TODO(vish): This probably should be done in the scheduler
+ # or in compute as a call. The network should be
+ # allocated after the host is assigned and setup
+ # can happen at the same time.
+ address = self.network_manager.allocate_fixed_ip(context,
+ instance_id,
+ is_vpn)
+ rpc.cast(elevated,
+ self._get_network_topic(context),
+ {"method": "setup_fixed_ip",
+ "args": {"address": address}})
+
+ logging.debug("Casting to scheduler for %s/%s's instance %s",
+ context.project_id, context.user_id, instance_id)
+ rpc.cast(context,
+ FLAGS.scheduler_topic,
+ {"method": "run_instance",
+ "args": {"topic": FLAGS.compute_topic,
+ "instance_id": instance_id}})
+
+ return instances
+
+ def ensure_default_security_group(self, context):
+ """ Create security group for the security context if it
+ does not already exist
+
+ :param context: the security context
+
+ """
+ try:
+ db.security_group_get_by_name(context, context.project_id,
+ 'default')
+ except exception.NotFound:
+ values = {'name': 'default',
+ 'description': 'default',
+ 'user_id': context.user_id,
+ 'project_id': context.project_id}
+ db.security_group_create(context, values)
+
+ def update_instance(self, context, instance_id, **kwargs):
+ """Updates the instance in the datastore.
+
+ :param context: The security context
+ :param instance_id: ID of the instance to update
+ :param kwargs: All additional keyword args are treated
+ as data fields of the instance to be
+ updated
+
+ :retval None
+
+ """
+ return self.db.instance_update(context, instance_id, kwargs)
+
+ def delete_instance(self, context, instance_id):
+ logging.debug("Going to try and terminate %d" % instance_id)
+ try:
+ instance = self.db.instance_get_by_internal_id(context,
+ instance_id)
+ except exception.NotFound as e:
+ logging.warning("Instance %d was not found during terminate",
+ instance_id)
+ raise e
+
+ if (instance['state_description'] == 'terminating'):
+ logging.warning("Instance %d is already being terminated",
+ instance_id)
+ return
+
+ self.update_instance(context,
+ instance['id'],
+ state_description='terminating',
+ state=0,
+ terminated_at=datetime.datetime.utcnow())
+
+ # FIXME(ja): where should network deallocate occur?
+ address = self.db.instance_get_floating_address(context,
+ instance['id'])
+ if address:
+ logging.debug("Disassociating address %s" % address)
+ # NOTE(vish): Right now we don't really care if the ip is
+ # disassociated. We may need to worry about
+ # checking this later. Perhaps in the scheduler?
+ rpc.cast(context,
+ self._get_network_topic(context),
+ {"method": "disassociate_floating_ip",
+ "args": {"floating_address": address}})
+
+ address = self.db.instance_get_fixed_address(context, instance['id'])
+ if address:
+ logging.debug("Deallocating address %s" % address)
+ # NOTE(vish): Currently, nothing needs to be done on the
+ # network node until release. If this changes,
+ # we will need to cast here.
+ self.network_manager.deallocate_fixed_ip(context.elevated(),
+ address)
+
+ host = instance['host']
+ if host:
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "terminate_instance",
+ "args": {"instance_id": instance['id']}})
+ else:
+ self.db.instance_destroy(context, instance['id'])
+
+ def get_instances(self, context, project_id=None):
+ """Get all instances, possibly filtered by project ID or
+ user ID. If there is no filter and the context is an admin,
+ it will retrieve all instances in the system."""
+ if project_id or not context.is_admin:
+ if not context.project:
+ return self.db.instance_get_all_by_user(context,
+ context.user_id)
+ if project_id is None:
+ project_id = context.project_id
+ return self.db.instance_get_all_by_project(context, project_id)
+ return self.db.instance_get_all(context)
+
+ def get_instance(self, context, instance_id):
+ return self.db.instance_get_by_internal_id(context, instance_id)
+
+ def reboot(self, context, instance_id):
+ """Reboot the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "reboot_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def rescue(self, context, instance_id):
+ """Rescue the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "rescue_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def unrescue(self, context, instance_id):
+ """Unrescue the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "unrescue_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def _get_network_topic(self, context):
+ """Retrieves the network host for a project"""
+ network_ref = self.network_manager.get_network(context)
+ host = network_ref['host']
+ if not host:
+ host = rpc.call(context,
+ FLAGS.network_topic,
+ {"method": "set_network_host",
+ "args": {"network_id": network_ref['id']}})
+ return self.db.queue_get_for(context, FLAGS.network_topic, host)
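
With create/update moved out of the compute manager, the API layers are expected to drive this new ComputeAPI class directly. A minimal sketch of that usage; the image id, flavor name and RequestContext arguments below are illustrative assumptions, not values from this branch:

    from nova import context
    from nova.compute import api as compute_api

    compute = compute_api.ComputeAPI()
    ctxt = context.RequestContext('fake', 'fake')  # user, project (assumed)
    instances = compute.create_instances(ctxt, 'm1.small', 'ami-12345',
                                         min_count=1, max_count=2,
                                         display_name='example')
    for inst in instances:
        compute.reboot(ctxt, inst['internal_id'])
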
diff --git a/nova/compute/disk.py b/nova/compute/disk.py
index 4338d39f0..675cd0259 100644
--- a/nova/compute/disk.py
+++ b/nova/compute/disk.py
@@ -26,8 +26,6 @@ import logging
import os
import tempfile
-from twisted.internet import defer
-
from nova import exception
from nova import flags
@@ -39,7 +37,6 @@ flags.DEFINE_integer('block_size', 1024 * 1024 * 256,
'block_size to use for dd')
-@defer.inlineCallbacks
def partition(infile, outfile, local_bytes=0, resize=True,
local_type='ext2', execute=None):
"""
@@ -64,10 +61,10 @@ def partition(infile, outfile, local_bytes=0, resize=True,
file_size = os.path.getsize(infile)
if resize and file_size < FLAGS.minimum_root_size:
last_sector = FLAGS.minimum_root_size / sector_size - 1
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (infile, last_sector, sector_size))
- yield execute('e2fsck -fp %s' % infile, check_exit_code=False)
- yield execute('resize2fs %s' % infile)
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (infile, last_sector, sector_size))
+ execute('e2fsck -fp %s' % infile, check_exit_code=False)
+ execute('resize2fs %s' % infile)
file_size = FLAGS.minimum_root_size
elif file_size % sector_size != 0:
logging.warn("Input partition size not evenly divisible by"
@@ -86,30 +83,29 @@ def partition(infile, outfile, local_bytes=0, resize=True,
last_sector = local_last # e
# create an empty file
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, mbr_last, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, mbr_last, sector_size))
# make mbr partition
- yield execute('parted --script %s mklabel msdos' % outfile)
+ execute('parted --script %s mklabel msdos' % outfile)
# append primary file
- yield execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
- % (infile, outfile, FLAGS.block_size))
+ execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
+ % (infile, outfile, FLAGS.block_size))
# make primary partition
- yield execute('parted --script %s mkpart primary %ds %ds'
- % (outfile, primary_first, primary_last))
+ execute('parted --script %s mkpart primary %ds %ds'
+ % (outfile, primary_first, primary_last))
if local_bytes > 0:
# make the file bigger
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, last_sector, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, last_sector, sector_size))
# make and format local partition
- yield execute('parted --script %s mkpartfs primary %s %ds %ds'
- % (outfile, local_type, local_first, local_last))
+ execute('parted --script %s mkpartfs primary %s %ds %ds'
+ % (outfile, local_type, local_first, local_last))
-@defer.inlineCallbacks
def inject_data(image, key=None, net=None, partition=None, execute=None):
"""Injects a ssh key and optionally net data into a disk image.
@@ -119,26 +115,26 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
If partition is not specified it mounts the image as a single partition.
"""
- out, err = yield execute('sudo losetup -f --show %s' % image)
+ out, err = execute('sudo losetup -f --show %s' % image)
if err:
raise exception.Error('Could not attach image to loopback: %s' % err)
device = out.strip()
try:
if not partition is None:
# create partition
- out, err = yield execute('sudo kpartx -a %s' % device)
+ out, err = execute('sudo kpartx -a %s' % device)
if err:
raise exception.Error('Failed to load partition: %s' % err)
mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1],
partition)
else:
mapped_device = device
- out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
+ out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
tmpdir = tempfile.mkdtemp()
try:
# mount loopback to dir
- out, err = yield execute(
+ out, err = execute(
'sudo mount %s %s' % (mapped_device, tmpdir))
if err:
raise exception.Error('Failed to mount filesystem: %s' % err)
@@ -146,24 +142,23 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
try:
if key:
# inject key file
- yield _inject_key_into_fs(key, tmpdir, execute=execute)
+ _inject_key_into_fs(key, tmpdir, execute=execute)
if net:
- yield _inject_net_into_fs(net, tmpdir, execute=execute)
+ _inject_net_into_fs(net, tmpdir, execute=execute)
finally:
# unmount device
- yield execute('sudo umount %s' % mapped_device)
+ execute('sudo umount %s' % mapped_device)
finally:
# remove temporary directory
- yield execute('rmdir %s' % tmpdir)
+ execute('rmdir %s' % tmpdir)
if not partition is None:
# remove partitions
- yield execute('sudo kpartx -d %s' % device)
+ execute('sudo kpartx -d %s' % device)
finally:
# remove loopback
- yield execute('sudo losetup -d %s' % device)
+ execute('sudo losetup -d %s' % device)
-@defer.inlineCallbacks
def _inject_key_into_fs(key, fs, execute=None):
"""Add the given public ssh key to root's authorized_keys.
@@ -171,22 +166,21 @@ def _inject_key_into_fs(key, fs, execute=None):
fs is the path to the base of the filesystem into which to inject the key.
"""
sshdir = os.path.join(os.path.join(fs, 'root'), '.ssh')
- yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
- yield execute('sudo chown root %s' % sshdir)
- yield execute('sudo chmod 700 %s' % sshdir)
+ execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
+ execute('sudo chown root %s' % sshdir)
+ execute('sudo chmod 700 %s' % sshdir)
keyfile = os.path.join(sshdir, 'authorized_keys')
- yield execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
+ execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
-@defer.inlineCallbacks
def _inject_net_into_fs(net, fs, execute=None):
"""Inject /etc/network/interfaces into the filesystem rooted at fs.
net is the contents of /etc/network/interfaces.
"""
netdir = os.path.join(os.path.join(fs, 'etc'), 'network')
- yield execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
- yield execute('sudo chown root:root %s' % netdir)
- yield execute('sudo chmod 755 %s' % netdir)
+ execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
+ execute('sudo chown root:root %s' % netdir)
+ execute('sudo chmod 755 %s' % netdir)
netfile = os.path.join(netdir, 'interfaces')
- yield execute('sudo tee %s' % netfile, net)
+ execute('sudo tee %s' % netfile, net)
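
Every yield here becomes a plain blocking call: the execute callable (normally nova.utils.execute) returns an (stdout, stderr) tuple, accepts optional process input as a second argument, and honors check_exit_code. A rough sketch of that contract, stated as an assumption rather than the actual utils implementation:

    import subprocess

    def execute(cmd, process_input=None, check_exit_code=True):
        """Run cmd through the shell and return (stdout, stderr)."""
        proc = subprocess.Popen(cmd, shell=True,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = proc.communicate(process_input)
        if check_exit_code and proc.returncode != 0:
            raise RuntimeError('%r failed with exit code %d'
                               % (cmd, proc.returncode))
        return out, err
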
diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py
index 67ee8f8a8..6e47170bd 100644
--- a/nova/compute/instance_types.py
+++ b/nova/compute/instance_types.py
@@ -21,9 +21,29 @@
The built-in instance properties.
"""
+from nova import flags
+from nova import exception
+
+FLAGS = flags.FLAGS
INSTANCE_TYPES = {
'm1.tiny': dict(memory_mb=512, vcpus=1, local_gb=0, flavorid=1),
'm1.small': dict(memory_mb=2048, vcpus=1, local_gb=20, flavorid=2),
'm1.medium': dict(memory_mb=4096, vcpus=2, local_gb=40, flavorid=3),
'm1.large': dict(memory_mb=8192, vcpus=4, local_gb=80, flavorid=4),
'm1.xlarge': dict(memory_mb=16384, vcpus=8, local_gb=160, flavorid=5)}
+
+
+def get_by_type(instance_type):
+ """Build instance data structure and save it to the data store."""
+ if instance_type is None:
+ return FLAGS.default_instance_type
+ if instance_type not in INSTANCE_TYPES:
+ raise exception.ApiError("Unknown instance type: %s" % instance_type)
+ return instance_type
+
+
+def get_by_flavor_id(flavor_id):
+ for instance_type, details in INSTANCE_TYPES.iteritems():
+ if details['flavorid'] == flavor_id:
+ return instance_type
+ return FLAGS.default_instance_type
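
Both helpers resolve loose input to a canonical type name, falling back to FLAGS.default_instance_type. Illustrative results, assuming the default flag is 'm1.small':

    from nova.compute import instance_types

    instance_types.get_by_type(None)         # -> 'm1.small' (the default)
    instance_types.get_by_type('m1.large')   # -> 'm1.large'
    instance_types.get_by_type('m1.huge')    # raises exception.ApiError
    instance_types.get_by_flavor_id(3)       # -> 'm1.medium'
    instance_types.get_by_flavor_id(99)      # -> 'm1.small' (the default)
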
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 50a9d316b..7eb60e262 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -22,8 +22,8 @@ Handles all processes relating to instances (guest vms).
The :py:class:`ComputeManager` class is a :py:class:`nova.manager.Manager` that
handles RPC calls relating to creating instances. It is responsible for
building a disk image, launching it via the underlying virtualization driver,
-responding to calls to check it state, attaching persistent as well as
-termination.
+responding to calls to check its state, attaching persistent storage, and
+terminating it.
**Related Flags**
@@ -37,23 +37,21 @@ termination.
import datetime
import logging
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import manager
from nova import utils
from nova.compute import power_state
-
FLAGS = flags.FLAGS
flags.DEFINE_string('instances_path', '$state_path/instances',
'where instances are stored on disk')
flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
- 'Driver to use for volume creation')
+ 'Driver to use for controlling virtualization')
class ComputeManager(manager.Manager):
+
"""Manages the running instances from creation to destruction."""
def __init__(self, compute_driver=None, *args, **kwargs):
@@ -78,54 +76,11 @@ class ComputeManager(manager.Manager):
state = power_state.NOSTATE
self.db.instance_set_state(context, instance_id, state)
- @defer.inlineCallbacks
@exception.wrap_exception
def refresh_security_group(self, context, security_group_id, **_kwargs):
"""This call passes stright through to the virtualization driver."""
- yield self.driver.refresh_security_group(security_group_id)
-
- def create_instance(self, context, security_groups=None, **kwargs):
- """Creates the instance in the datastore and returns the
- new instance as a mapping
-
- :param context: The security context
- :param security_groups: list of security group ids to
- attach to the instance
- :param kwargs: All additional keyword args are treated
- as data fields of the instance to be
- created
-
- :retval Returns a mapping of the instance information
- that has just been created
-
- """
- instance_ref = self.db.instance_create(context, kwargs)
- inst_id = instance_ref['id']
-
- elevated = context.elevated()
- if not security_groups:
- security_groups = []
- for security_group_id in security_groups:
- self.db.instance_add_security_group(elevated,
- inst_id,
- security_group_id)
- return instance_ref
-
- def update_instance(self, context, instance_id, **kwargs):
- """Updates the instance in the datastore.
-
- :param context: The security context
- :param instance_id: ID of the instance to update
- :param kwargs: All additional keyword args are treated
- as data fields of the instance to be
- updated
-
- :retval None
-
- """
- self.db.instance_update(context, instance_id, kwargs)
-
- @defer.inlineCallbacks
+ self.driver.refresh_security_group(security_group_id)
+
@exception.wrap_exception
def run_instance(self, context, instance_id, **_kwargs):
"""Launch a new instance with specified options."""
@@ -134,7 +89,6 @@ class ComputeManager(manager.Manager):
if instance_ref['name'] in self.driver.list_instances():
raise exception.Error("Instance has already been created")
logging.debug("instance %s: starting...", instance_id)
- project_id = instance_ref['project_id']
self.network_manager.setup_compute_network(context, instance_id)
self.db.instance_update(context,
instance_id,
@@ -147,7 +101,7 @@ class ComputeManager(manager.Manager):
'spawning')
try:
- yield self.driver.spawn(instance_ref)
+ self.driver.spawn(instance_ref)
now = datetime.datetime.utcnow()
self.db.instance_update(context,
instance_id,
@@ -161,7 +115,6 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
@@ -176,13 +129,11 @@ class ComputeManager(manager.Manager):
self.db.instance_destroy(context, instance_id)
raise exception.Error('trying to destroy already destroyed'
' instance: %s' % instance_id)
-
- yield self.driver.destroy(instance_ref)
+ self.driver.destroy(instance_ref)
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
@@ -202,10 +153,9 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'rebooting')
- yield self.driver.reboot(instance_ref)
+ self.driver.reboot(instance_ref)
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def rescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
@@ -218,10 +168,9 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'rescuing')
- yield self.driver.rescue(instance_ref)
+ self.driver.rescue(instance_ref)
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def unrescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
@@ -234,7 +183,7 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'unrescuing')
- yield self.driver.unrescue(instance_ref)
+ self.driver.unrescue(instance_ref)
self._update_state(context, instance_id)
@exception.wrap_exception
@@ -246,7 +195,6 @@ class ComputeManager(manager.Manager):
return self.driver.get_console_output(instance_ref)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
@@ -254,12 +202,12 @@ class ComputeManager(manager.Manager):
logging.debug("instance %s: attaching volume %s to %s", instance_id,
volume_id, mountpoint)
instance_ref = self.db.instance_get(context, instance_id)
- dev_path = yield self.volume_manager.setup_compute_volume(context,
- volume_id)
+ dev_path = self.volume_manager.setup_compute_volume(context,
+ volume_id)
try:
- yield self.driver.attach_volume(instance_ref['name'],
- dev_path,
- mountpoint)
+ self.driver.attach_volume(instance_ref['name'],
+ dev_path,
+ mountpoint)
self.db.volume_attached(context,
volume_id,
instance_id,
@@ -270,12 +218,12 @@ class ComputeManager(manager.Manager):
# exception below.
logging.exception("instance %s: attach failed %s, removing",
instance_id, mountpoint)
- yield self.volume_manager.remove_compute_volume(context,
- volume_id)
+ self.volume_manager.remove_compute_volume(context,
+ volume_id)
raise exc
- defer.returnValue(True)
- @defer.inlineCallbacks
+ return True
+
@exception.wrap_exception
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
@@ -289,8 +237,8 @@ class ComputeManager(manager.Manager):
logging.warn("Detaching volume from unknown instance %s",
instance_ref['name'])
else:
- yield self.driver.detach_volume(instance_ref['name'],
- volume_ref['mountpoint'])
- yield self.volume_manager.remove_compute_volume(context, volume_id)
+ self.driver.detach_volume(instance_ref['name'],
+ volume_ref['mountpoint'])
+ self.volume_manager.remove_compute_volume(context, volume_id)
self.db.volume_detached(context, volume_id)
- defer.returnValue(True)
+ return True
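
The same mechanical conversion runs through the whole file: inline-callback generators become ordinary blocking methods that eventlet schedules. Schematically, using attach_volume as the example:

    # before (Twisted):
    #     @defer.inlineCallbacks
    #     def attach_volume(self, context, instance_id, volume_id, mountpoint):
    #         dev_path = yield self.volume_manager.setup_compute_volume(
    #             context, volume_id)
    #         defer.returnValue(True)

    # after (eventlet):
    #     def attach_volume(self, context, instance_id, volume_id, mountpoint):
    #         dev_path = self.volume_manager.setup_compute_volume(
    #             context, volume_id)
    #         return True
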
diff --git a/nova/tests/validator_unittest.py b/nova/db/base.py
index b5f1c0667..1d1e80866 100644
--- a/nova/tests/validator_unittest.py
+++ b/nova/db/base.py
@@ -16,27 +16,21 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
-import unittest
+"""
+Base class for classes that need modular database access.
+"""
+from nova import utils
from nova import flags
-from nova import test
-from nova import validate
+FLAGS = flags.FLAGS
+flags.DEFINE_string('db_driver', 'nova.db.api',
+ 'driver to use for database access')
-class ValidationTestCase(test.TrialTestCase):
- def setUp(self):
- super(ValidationTestCase, self).setUp()
- def tearDown(self):
- super(ValidationTestCase, self).tearDown()
-
- def test_type_validation(self):
- self.assertTrue(type_case("foo", 5, 1))
- self.assertRaises(TypeError, type_case, "bar", "5", 1)
- self.assertRaises(TypeError, type_case, None, 5, 1)
-
-
-@validate.typetest(instanceid=str, size=int, number_of_instances=int)
-def type_case(instanceid, size, number_of_instances):
- return True
+class Base(object):
+ """DB driver is injected in the init method"""
+ def __init__(self, db_driver=None):
+ if not db_driver:
+ db_driver = FLAGS.db_driver
+ self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103
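
Anything that needs the pluggable database layer (ComputeAPI above, Manager below) now simply inherits from Base. A minimal sketch with a hypothetical subclass:

    from nova.db import base

    class ExampleWorker(base.Base):
        """Hypothetical class; self.db comes from the injected driver."""

        def count_instances(self, context):
            return len(self.db.instance_get_all(context))

    # tests can swap the driver without touching FLAGS.db_driver
    # (module path below is made up for illustration):
    worker = ExampleWorker(db_driver='nova.tests.fake_db')
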
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 0a98acf7a..6ecd824e1 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -608,6 +608,12 @@ def fixed_ip_update(context, address, values):
#functions between the two of them as well.
@require_context
def instance_create(context, values):
+ """Create a new Instance record in the database.
+
+ context - request context object
+ values - dict containing column values.
+ 'internal_id' is auto-generated and should not be specified.
+ """
instance_ref = models.Instance()
instance_ref.update(values)
@@ -615,7 +621,7 @@ def instance_create(context, values):
with session.begin():
while instance_ref.internal_id == None:
# Instances have integer internal ids.
- internal_id = random.randint(0, 2 ** 32 - 1)
+ internal_id = random.randint(0, 2 ** 31 - 1)
if not instance_internal_id_exists(context, internal_id,
session=session):
instance_ref.internal_id = internal_id
@@ -816,6 +822,7 @@ def instance_update(context, instance_id, values):
instance_ref = instance_get(context, instance_id, session=session)
instance_ref.update(values)
instance_ref.save(session=session)
+ return instance_ref
def instance_add_security_group(context, instance_id, security_group_id):
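
The smaller bound for generated internal ids is presumably so every id fits a signed 32-bit integer column; the image service below gets the same change:

    import random

    # 2 ** 32 - 1 == 4294967295: overflows a signed 32-bit INTEGER column
    # 2 ** 31 - 1 == 2147483647: the signed 32-bit maximum
    internal_id = random.randint(0, 2 ** 31 - 1)
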
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index f8fc7c6c1..d754db90d 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -188,8 +188,6 @@ class Instance(BASE, NovaBase):
kernel_id = Column(String(255))
ramdisk_id = Column(String(255))
- server_name = Column(String(255))
-
# image_id = Column(Integer, ForeignKey('images.id'), nullable=True)
# kernel_id = Column(Integer, ForeignKey('images.id'), nullable=True)
# ramdisk_id = Column(Integer, ForeignKey('images.id'), nullable=True)
@@ -222,6 +220,7 @@ class Instance(BASE, NovaBase):
launched_at = Column(DateTime)
terminated_at = Column(DateTime)
+ # User editable field for display in user-facing UIs
display_name = Column(String(255))
display_description = Column(String(255))
diff --git a/nova/flags.py b/nova/flags.py
index 6ef94d36c..9e99ffb5e 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -159,6 +159,7 @@ class StrWrapper(object):
return str(val)
raise KeyError(name)
+
FLAGS = FlagValues()
gflags.FLAGS = FLAGS
gflags.DEFINE_flag(gflags.HelpFlag(), FLAGS)
@@ -183,6 +184,12 @@ DEFINE_list = _wrapper(gflags.DEFINE_list)
DEFINE_spaceseplist = _wrapper(gflags.DEFINE_spaceseplist)
DEFINE_multistring = _wrapper(gflags.DEFINE_multistring)
DEFINE_multi_int = _wrapper(gflags.DEFINE_multi_int)
+DEFINE_flag = _wrapper(gflags.DEFINE_flag)
+
+
+HelpFlag = gflags.HelpFlag
+HelpshortFlag = gflags.HelpshortFlag
+HelpXMLFlag = gflags.HelpXMLFlag
def DECLARE(name, module_string, flag_values=FLAGS):
@@ -224,7 +231,6 @@ DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval')
DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts')
DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
-DEFINE_string('cc_host', '127.0.0.1', 'ip of api server (for infrastructure')
DEFINE_string('cc_dmz', '127.0.0.1', 'ip of api server (for instances)')
DEFINE_integer('cc_port', 8773, 'cloud controller port')
DEFINE_string('ec2_url', 'http://127.0.0.1:8773/services/Cloud',
@@ -263,7 +269,7 @@ DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
'Manager for scheduler')
# The service to use for image search and retrieval
-DEFINE_string('image_service', 'nova.image.local.LocalImageService',
+DEFINE_string('image_service', 'nova.image.s3.S3ImageService',
'The service to use for retrieving and searching for images.')
DEFINE_string('host', socket.gethostname(),
diff --git a/nova/image/local.py b/nova/image/local.py
index 9b0cdcc50..b44593221 100644
--- a/nova/image/local.py
+++ b/nova/image/local.py
@@ -59,7 +59,7 @@ class LocalImageService(service.BaseImageService):
"""
Store the image data and return the new image id.
"""
- id = random.randint(0, 2 ** 32 - 1)
+ id = random.randint(0, 2 ** 31 - 1)
data['id'] = id
self.update(context, id, data)
return id
diff --git a/nova/manager.py b/nova/manager.py
index a6efb8732..3d38504bd 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -53,28 +53,22 @@ This module provides Manager, a base class for managers.
from nova import utils
from nova import flags
+from nova.db import base
-from twisted.internet import defer
FLAGS = flags.FLAGS
-flags.DEFINE_string('db_driver', 'nova.db.api',
- 'driver to use for volume creation')
-class Manager(object):
- """DB driver is injected in the init method"""
+class Manager(base.Base):
def __init__(self, host=None, db_driver=None):
if not host:
host = FLAGS.host
self.host = host
- if not db_driver:
- db_driver = FLAGS.db_driver
- self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103
+ super(Manager, self).__init__(db_driver)
- @defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval"""
- yield
+ pass
def init_host(self):
"""Do any initialization that needs to be run if this is a standalone
diff --git a/nova/network/linux_net.py b/nova/network/linux_net.py
index 7b00e65d4..cb9f01a36 100644
--- a/nova/network/linux_net.py
+++ b/nova/network/linux_net.py
@@ -46,6 +46,7 @@ flags.DEFINE_string('vlan_interface', 'eth0',
'network device for vlans')
flags.DEFINE_string('dhcpbridge', _bin_file('nova-dhcpbridge'),
'location of nova-dhcpbridge')
+flags.DEFINE_string('cc_host', utils.get_my_ip(), 'ip of api server')
flags.DEFINE_string('routing_source_ip', '127.0.0.1',
'Public IP of network host')
flags.DEFINE_bool('use_nova_chains', False,
diff --git a/nova/network/manager.py b/nova/network/manager.py
index a7298b47f..6a30f30b7 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -49,7 +49,6 @@ import logging
import math
import IPy
-from twisted.internet import defer
from nova import context
from nova import db
@@ -399,10 +398,9 @@ class VlanManager(NetworkManager):
instances in its subnet.
"""
- @defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval."""
- yield super(VlanManager, self).periodic_tasks(context)
+ super(VlanManager, self).periodic_tasks(context)
now = datetime.datetime.utcnow()
timeout = FLAGS.fixed_ip_disassociate_timeout
time = now - datetime.timedelta(seconds=timeout)
diff --git a/nova/objectstore/image.py b/nova/objectstore/image.py
index 7292dbab8..abc28182e 100644
--- a/nova/objectstore/image.py
+++ b/nova/objectstore/image.py
@@ -21,7 +21,6 @@ Take uploaded bucket contents and register them as disk images (AMIs).
Requires decryption using keys in the manifest.
"""
-# TODO(jesse): Got these from Euca2ools, will need to revisit them
import binascii
import glob
@@ -29,7 +28,6 @@ import json
import os
import shutil
import tarfile
-import tempfile
from xml.etree import ElementTree
from nova import exception
@@ -199,12 +197,17 @@ class Image(object):
except:
ramdisk_id = None
+ try:
+ arch = manifest.find("machine_configuration/architecture").text
+ except:
+ arch = 'x86_64'
+
info = {
'imageId': image_id,
'imageLocation': image_location,
'imageOwnerId': context.project_id,
'isPublic': False, # FIXME: grab public from manifest
- 'architecture': 'x86_64', # FIXME: grab architecture from manifest
+ 'architecture': arch,
'imageType': image_type}
if kernel_id:
@@ -264,6 +267,7 @@ class Image(object):
if err:
raise exception.Error("Failed to decrypt initialization "
"vector: %s" % err)
+
_out, err = utils.execute(
'openssl enc -d -aes-128-cbc -in %s -K %s -iv %s -out %s'
% (encrypted_filename, key, iv, decrypted_filename),
diff --git a/nova/process.py b/nova/process.py
deleted file mode 100644
index b33df048b..000000000
--- a/nova/process.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# Copyright 2010 FathomDB Inc.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Process pool using twisted threading
-"""
-
-import logging
-import StringIO
-
-from twisted.internet import defer
-from twisted.internet import error
-from twisted.internet import protocol
-from twisted.internet import reactor
-
-from nova import flags
-from nova.exception import ProcessExecutionError
-
-FLAGS = flags.FLAGS
-flags.DEFINE_integer('process_pool_size', 4,
- 'Number of processes to use in the process pool')
-
-
-# This is based on _BackRelay from twister.internal.utils, but modified to
-# capture both stdout and stderr, without odd stderr handling, and also to
-# handle stdin
-class BackRelayWithInput(protocol.ProcessProtocol):
- """
- Trivial protocol for communicating with a process and turning its output
- into the result of a L{Deferred}.
-
- @ivar deferred: A L{Deferred} which will be called back with all of stdout
- and all of stderr as well (as a tuple). C{terminate_on_stderr} is true
- and any bytes are received over stderr, this will fire with an
- L{_ProcessExecutionError} instance and the attribute will be set to
- C{None}.
-
- @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
- received over stderr, this attribute will refer to a L{Deferred} which
- will be called back when the process ends. This C{Deferred} is also
- associated with the L{_ProcessExecutionError} which C{deferred} fires
- with earlier in this case so that users can determine when the process
- has actually ended, in addition to knowing when bytes have been
- received via stderr.
- """
-
- def __init__(self, deferred, cmd, started_deferred=None,
- terminate_on_stderr=False, check_exit_code=True,
- process_input=None):
- self.deferred = deferred
- self.cmd = cmd
- self.stdout = StringIO.StringIO()
- self.stderr = StringIO.StringIO()
- self.started_deferred = started_deferred
- self.terminate_on_stderr = terminate_on_stderr
- self.check_exit_code = check_exit_code
- self.process_input = process_input
- self.on_process_ended = None
-
- def _build_execution_error(self, exit_code=None):
- return ProcessExecutionError(cmd=self.cmd,
- exit_code=exit_code,
- stdout=self.stdout.getvalue(),
- stderr=self.stderr.getvalue())
-
- def errReceived(self, text):
- self.stderr.write(text)
- if self.terminate_on_stderr and (self.deferred is not None):
- self.on_process_ended = defer.Deferred()
- self.deferred.errback(self._build_execution_error())
- self.deferred = None
- self.transport.loseConnection()
-
- def outReceived(self, text):
- self.stdout.write(text)
-
- def processEnded(self, reason):
- if self.deferred is not None:
- stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
- exit_code = reason.value.exitCode
- if self.check_exit_code and exit_code != 0:
- self.deferred.errback(self._build_execution_error(exit_code))
- else:
- try:
- if self.check_exit_code:
- reason.trap(error.ProcessDone)
- self.deferred.callback((stdout, stderr))
- except:
- # NOTE(justinsb): This logic is a little suspicious to me.
- # If the callback throws an exception, then errback will
- # be called also. However, this is what the unit tests
- # test for.
- exec_error = self._build_execution_error(exit_code)
- self.deferred.errback(exec_error)
- elif self.on_process_ended is not None:
- self.on_process_ended.errback(reason)
-
- def connectionMade(self):
- if self.started_deferred:
- self.started_deferred.callback(self)
- if self.process_input:
- self.transport.write(str(self.process_input))
- self.transport.closeStdin()
-
-
-def get_process_output(executable, args=None, env=None, path=None,
- process_reactor=None, check_exit_code=True,
- process_input=None, started_deferred=None,
- terminate_on_stderr=False):
- if process_reactor is None:
- process_reactor = reactor
- args = args and args or ()
- env = env and env and {}
- deferred = defer.Deferred()
- cmd = executable
- if args:
- cmd = " ".join([cmd] + args)
- logging.debug("Running cmd: %s", cmd)
- process_handler = BackRelayWithInput(
- deferred,
- cmd,
- started_deferred=started_deferred,
- check_exit_code=check_exit_code,
- process_input=process_input,
- terminate_on_stderr=terminate_on_stderr)
- # NOTE(vish): commands come in as unicode, but self.executes needs
- # strings or process.spawn raises a deprecation warning
- executable = str(executable)
- if not args is None:
- args = [str(x) for x in args]
- process_reactor.spawnProcess(process_handler, executable,
- (executable,) + tuple(args), env, path)
- return deferred
-
-
-class ProcessPool(object):
- """ A simple process pool implementation using Twisted's Process bits.
-
- This is pretty basic right now, but hopefully the API will be the correct
- one so that it can be optimized later.
- """
- def __init__(self, size=None):
- self.size = size and size or FLAGS.process_pool_size
- self._pool = defer.DeferredSemaphore(self.size)
-
- def simple_execute(self, cmd, **kw):
- """ Weak emulation of the old utils.execute() function.
-
- This only exists as a way to quickly move old execute methods to
- this new style of code.
-
- NOTE(termie): This will break on args with spaces in them.
- """
- parsed = cmd.split(' ')
- executable, args = parsed[0], parsed[1:]
- return self.execute(executable, args, **kw)
-
- def execute(self, *args, **kw):
- deferred = self._pool.acquire()
-
- def _associate_process(proto):
- deferred.process = proto.transport
- return proto.transport
-
- started = defer.Deferred()
- started.addCallback(_associate_process)
- kw.setdefault('started_deferred', started)
-
- deferred.process = None
- deferred.started = started
-
- deferred.addCallback(lambda _: get_process_output(*args, **kw))
- deferred.addBoth(self._release)
- return deferred
-
- def _release(self, retval=None):
- self._pool.release()
- return retval
-
-
-class SharedPool(object):
- _instance = None
-
- def __init__(self):
- if SharedPool._instance is None:
- self.__class__._instance = ProcessPool()
-
- def __getattr__(self, key):
- return getattr(self._instance, key)
-
-
-def simple_execute(cmd, **kwargs):
- return SharedPool().simple_execute(cmd, **kwargs)
diff --git a/nova/quota.py b/nova/quota.py
index 01dd0ecd4..f6ca9f77c 100644
--- a/nova/quota.py
+++ b/nova/quota.py
@@ -94,3 +94,8 @@ def allowed_floating_ips(context, num_floating_ips):
quota = get_quota(context, project_id)
allowed_floating_ips = quota['floating_ips'] - used_floating_ips
return min(num_floating_ips, allowed_floating_ips)
+
+
+class QuotaError(exception.ApiError):
+ """Quota Exceeeded"""
+ pass
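
Since QuotaError subclasses ApiError, callers can trap quota failures without swallowing other API errors. An illustrative handler (the compute/context arguments are assumed):

    import logging

    from nova import quota

    def create_checked(compute, ctxt):
        try:
            return compute.create_instances(ctxt, 'm1.small', 'ami-12345',
                                            min_count=1, max_count=100)
        except quota.QuotaError:
            logging.warn("instance creation denied by quota")
            return []
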
diff --git a/nova/rpc.py b/nova/rpc.py
index 86a29574f..6a3f552db 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -25,18 +25,18 @@ import json
import logging
import sys
import time
+import traceback
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenthread
-from twisted.internet import defer
-from twisted.internet import task
+from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
-from nova import context
+from nova import utils
FLAGS = flags.FLAGS
@@ -128,17 +128,9 @@ class Consumer(messaging.Consumer):
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
- def fetch_repeatedly():
- while True:
- self.fetch(enable_callbacks=True)
- greenthread.sleep(0.1)
- greenthread.spawn(fetch_repeatedly)
-
- def attach_to_twisted(self):
- """Attach a callback to twisted that fires 10 times a second"""
- loop = task.LoopingCall(self.fetch, enable_callbacks=True)
- loop.start(interval=0.1)
- return loop
+ timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
+ timer.start(0.1)
+ return timer
class Publisher(messaging.Publisher):
@@ -196,11 +188,13 @@ class AdapterConsumer(TopicConsumer):
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
- # pylint: disable-msg=W0142
- d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
- if msg_id:
- d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
- d.addErrback(lambda e: msg_reply(msg_id, None, e))
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ if msg_id:
+ msg_reply(msg_id, rval, None)
+ except Exception as e:
+ if msg_id:
+ msg_reply(msg_id, None, sys.exc_info())
return
@@ -242,13 +236,15 @@ class DirectPublisher(Publisher):
def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id
- failure should be a twisted failure object"""
+ failure should be a sys.exc_info() tuple.
+
+ """
if failure:
- message = failure.getErrorMessage()
- traceback = failure.getTraceback()
+ message = str(failure[1])
+ tb = traceback.format_exception(*failure)
logging.error("Returning exception %s to caller", message)
- logging.error(traceback)
- failure = (failure.type.__name__, str(failure.value), traceback)
+ logging.error(tb)
+ failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
@@ -313,7 +309,6 @@ def call(context, topic, msg):
_pack_context(msg, context)
class WaitMessage(object):
-
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
@@ -337,41 +332,15 @@ def call(context, topic, msg):
except StopIteration:
pass
consumer.close()
+ # NOTE(termie): this is a little bit of a change from the original
+ # non-eventlet code where returning a Failure
+ # instance from a deferred call is very similar to
+ # raising an exception
+ if isinstance(wait_msg.result, Exception):
+ raise wait_msg.result
return wait_msg.result
-def call_twisted(context, topic, msg):
- """Sends a message on a topic and wait for a response"""
- LOG.debug("Making asynchronous call...")
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug("MSG_ID is %s" % (msg_id))
- _pack_context(msg, context)
-
- conn = Connection.instance()
- d = defer.Deferred()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
-
- def deferred_receive(data, message):
- """Acks message and callbacks or errbacks"""
- message.ack()
- if data['failure']:
- return d.errback(RemoteError(*data['failure']))
- else:
- return d.callback(data['result'])
-
- consumer.register_callback(deferred_receive)
- injected = consumer.attach_to_twisted()
-
- # clean up after the injected listened and return x
- d.addCallback(lambda x: injected.stop() and x or x)
-
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
- return d
-
-
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")
diff --git a/nova/server.py b/nova/server.py
deleted file mode 100644
index a0ee54681..000000000
--- a/nova/server.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Base functionality for nova daemons - gradually being replaced with twistd.py.
-"""
-
-import daemon
-from daemon import pidlockfile
-import logging
-import logging.handlers
-import os
-import signal
-import sys
-import time
-
-from nova import flags
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_bool('daemonize', False, 'daemonize this process')
-# NOTE(termie): right now I am defaulting to using syslog when we daemonize
-# it may be better to do something else -shrug-
-# NOTE(Devin): I think we should let each process have its own log file
-# and put it in /var/logs/nova/(appname).log
-# This makes debugging much easier and cuts down on sys log
-# clutter.
-flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing')
-flags.DEFINE_string('logfile', None, 'log file to output to')
-flags.DEFINE_string('logdir', None, 'directory to keep log files in '
- '(will be prepended to $logfile)')
-flags.DEFINE_string('pidfile', None, 'pid file to output to')
-flags.DEFINE_string('working_directory', './', 'working directory...')
-flags.DEFINE_integer('uid', os.getuid(), 'uid under which to run')
-flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run')
-
-
-def stop(pidfile):
- """
- Stop the daemon
- """
- # Get the pid from the pidfile
- try:
- pid = int(open(pidfile, 'r').read().strip())
- except IOError:
- message = "pidfile %s does not exist. Daemon not running?\n"
- sys.stderr.write(message % pidfile)
- return
-
- # Try killing the daemon process
- try:
- while 1:
- os.kill(pid, signal.SIGTERM)
- time.sleep(0.1)
- except OSError, err:
- err = str(err)
- if err.find("No such process") > 0:
- if os.path.exists(pidfile):
- os.remove(pidfile)
- else:
- print str(err)
- sys.exit(1)
-
-
-def serve(name, main):
- """Controller for server"""
- argv = FLAGS(sys.argv)
-
- if not FLAGS.pidfile:
- FLAGS.pidfile = '%s.pid' % name
-
- logging.debug("Full set of FLAGS: \n\n\n")
- for flag in FLAGS:
- logging.debug("%s : %s", flag, FLAGS.get(flag, None))
-
- action = 'start'
- if len(argv) > 1:
- action = argv.pop()
-
- if action == 'stop':
- stop(FLAGS.pidfile)
- sys.exit()
- elif action == 'restart':
- stop(FLAGS.pidfile)
- elif action == 'start':
- pass
- else:
- print 'usage: %s [options] [start|stop|restart]' % argv[0]
- sys.exit(1)
- daemonize(argv, name, main)
-
-
-def daemonize(args, name, main):
- """Does the work of daemonizing the process"""
- logging.getLogger('amqplib').setLevel(logging.WARN)
- files_to_keep = []
- if FLAGS.daemonize:
- logger = logging.getLogger()
- formatter = logging.Formatter(
- name + '(%(name)s): %(levelname)s %(message)s')
- if FLAGS.use_syslog and not FLAGS.logfile:
- syslog = logging.handlers.SysLogHandler(address='/dev/log')
- syslog.setFormatter(formatter)
- logger.addHandler(syslog)
- files_to_keep.append(syslog.socket)
- else:
- if not FLAGS.logfile:
- FLAGS.logfile = '%s.log' % name
- if FLAGS.logdir:
- FLAGS.logfile = os.path.join(FLAGS.logdir, FLAGS.logfile)
- logfile = logging.FileHandler(FLAGS.logfile)
- logfile.setFormatter(formatter)
- logger.addHandler(logfile)
- files_to_keep.append(logfile.stream)
- stdin, stdout, stderr = None, None, None
- else:
- stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
-
- if FLAGS.verbose:
- logging.getLogger().setLevel(logging.DEBUG)
- else:
- logging.getLogger().setLevel(logging.WARNING)
-
- with daemon.DaemonContext(
- detach_process=FLAGS.daemonize,
- working_directory=FLAGS.working_directory,
- pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile,
- acquire_timeout=1,
- threaded=False),
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- uid=FLAGS.uid,
- gid=FLAGS.gid,
- files_preserve=files_to_keep):
- main(args)
diff --git a/nova/service.py b/nova/service.py
index 9454d4049..ac30aaceb 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -17,21 +17,17 @@
# under the License.
"""
-A service is a very thin wrapper around a Manager object. It exposes the
-manager's public methods to other components of the system via rpc. It will
-report state periodically to the database and is responsible for initiating
-any periodic tasts that need to be executed on a given host.
-
-This module contains Service, a generic baseclass for all workers.
+Generic Node baseclass for all workers that run on hosts
"""
import inspect
import logging
import os
+import sys
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.application import service
+from eventlet import event
+from eventlet import greenthread
+from eventlet import greenpool
from nova import context
from nova import db
@@ -50,8 +46,16 @@ flags.DEFINE_integer('periodic_interval', 60,
'seconds between running periodic tasks',
lower_bound=1)
+flags.DEFINE_string('pidfile', None,
+ 'pidfile to use for this service')
+
+
+flags.DEFINE_flag(flags.HelpFlag())
+flags.DEFINE_flag(flags.HelpshortFlag())
+flags.DEFINE_flag(flags.HelpXMLFlag())
+
-class Service(object, service.Service):
+class Service(object):
"""Base class for workers that run on hosts."""
def __init__(self, host, binary, topic, manager, report_interval=None,
@@ -64,8 +68,9 @@ class Service(object, service.Service):
self.periodic_interval = periodic_interval
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
+ self.timers = []
- def startService(self): # pylint: disable-msg C0103
+ def start(self):
manager_class = utils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *self.saved_args,
**self.saved_kwargs)
@@ -80,26 +85,29 @@ class Service(object, service.Service):
except exception.NotFound:
self._create_service_ref(ctxt)
- conn = rpc.Connection.instance()
+ conn1 = rpc.Connection.instance(new=True)
+ conn2 = rpc.Connection.instance(new=True)
if self.report_interval:
consumer_all = rpc.AdapterConsumer(
- connection=conn,
+ connection=conn1,
topic=self.topic,
proxy=self)
consumer_node = rpc.AdapterConsumer(
- connection=conn,
+ connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
- consumer_all.attach_to_twisted()
- consumer_node.attach_to_twisted()
+ self.timers.append(consumer_all.attach_to_eventlet())
+ self.timers.append(consumer_node.attach_to_eventlet())
- pulse = task.LoopingCall(self.report_state)
+ pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
+ self.timers.append(pulse)
if self.periodic_interval:
- pulse = task.LoopingCall(self.periodic_tasks)
- pulse.start(interval=self.periodic_interval, now=False)
+ periodic = utils.LoopingCall(self.periodic_tasks)
+ periodic.start(interval=self.periodic_interval, now=False)
+ self.timers.append(periodic)
def _create_service_ref(self, context):
service_ref = db.service_create(context,
@@ -147,25 +155,28 @@ class Service(object, service.Service):
service_obj = cls(host, binary, topic, manager,
report_interval, periodic_interval)
- # This is the parent service that twistd will be looking for when it
- # parses this file, return it so that we can get it into globals.
- application = service.Application(binary)
- service_obj.setServiceParent(application)
- return application
+ return service_obj
def kill(self):
"""Destroy the service object in the datastore"""
+ self.stop()
try:
db.service_destroy(context.get_admin_context(), self.service_id)
except exception.NotFound:
logging.warn("Service killed that has no database entry")
- @defer.inlineCallbacks
+ def stop(self):
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception:
+ pass
+ self.timers = []
+
def periodic_tasks(self):
"""Tasks to be run at a periodic interval"""
- yield self.manager.periodic_tasks(context.get_admin_context())
+ self.manager.periodic_tasks(context.get_admin_context())
- @defer.inlineCallbacks
def report_state(self):
"""Update the state of this service in the datastore."""
ctxt = context.get_admin_context()
@@ -192,4 +203,32 @@ class Service(object, service.Service):
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
- yield
+
+
+def serve(*services):
+ argv = FLAGS(sys.argv)
+
+ if not services:
+ services = [Service.create()]
+
+ name = '_'.join(x.binary for x in services)
+ logging.debug("Serving %s" % name)
+
+ logging.getLogger('amqplib').setLevel(logging.WARN)
+
+ if FLAGS.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ else:
+ logging.getLogger().setLevel(logging.WARNING)
+
+ logging.debug("Full set of FLAGS:")
+ for flag in FLAGS:
+ logging.debug("%s : %s" % (flag, FLAGS.get(flag, None)))
+
+ for x in services:
+ x.start()
+
+
+def wait():
+ while True:
+ greenthread.sleep(5)
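
The twistd Application plumbing is gone: a binary now builds Service objects, starts them with serve() (which also parses FLAGS from sys.argv), and parks the main greenthread in wait(). A minimal sketch of such a binary, assuming Service.create fills in host, topic and manager defaults from the binary name:

    from nova import service

    if __name__ == '__main__':
        compute = service.Service.create(binary='nova-compute')
        service.serve(compute)
        service.wait()
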
diff --git a/nova/test.py b/nova/test.py
index 5c2a72819..7076f1bf4 100644
--- a/nova/test.py
+++ b/nova/test.py
@@ -25,11 +25,12 @@ and some black magic for inline callbacks.
import datetime
import sys
import time
+import unittest
import mox
import stubout
from twisted.internet import defer
-from twisted.trial import unittest
+from twisted.trial import unittest as trial_unittest
from nova import context
from nova import db
@@ -55,11 +56,11 @@ def skip_if_fake(func):
return _skipper
-class TrialTestCase(unittest.TestCase):
+class TestCase(unittest.TestCase):
"""Test case base class for all unit tests"""
def setUp(self):
"""Run before each test method to initialize test environment"""
- super(TrialTestCase, self).setUp()
+ super(TestCase, self).setUp()
# NOTE(vish): We need a better method for creating fixtures for tests
# now that we have some required db setup for the system
# to work properly.
@@ -94,7 +95,87 @@ class TrialTestCase(unittest.TestCase):
db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host,
self.start)
db.network_disassociate_all(ctxt)
- rpc.Consumer.attach_to_twisted = self.originalAttach
+ rpc.Consumer.attach_to_eventlet = self.originalAttach
+ for x in self.injected:
+ try:
+ x.stop()
+ except AssertionError:
+ pass
+
+ if FLAGS.fake_rabbit:
+ fakerabbit.reset_all()
+
+ db.security_group_destroy_all(ctxt)
+ super(TestCase, self).tearDown()
+ finally:
+ self.reset_flags()
+
+ def flags(self, **kw):
+ """Override flag variables for a test"""
+ for k, v in kw.iteritems():
+ if k in self.flag_overrides:
+ self.reset_flags()
+ raise Exception(
+ 'trying to override already overridden flag: %s' % k)
+ self.flag_overrides[k] = getattr(FLAGS, k)
+ setattr(FLAGS, k, v)
+
+ def reset_flags(self):
+ """Resets all flag variables for the test. Runs after each test"""
+ FLAGS.Reset()
+ for k, v in self._original_flags.iteritems():
+ setattr(FLAGS, k, v)
+
+ def _monkey_patch_attach(self):
+ self.originalAttach = rpc.Consumer.attach_to_eventlet
+
+ def _wrapped(innerSelf):
+ rv = self.originalAttach(innerSelf)
+ self.injected.append(rv)
+ return rv
+
+ _wrapped.func_name = self.originalAttach.func_name
+ rpc.Consumer.attach_to_eventlet = _wrapped
+
+
+class TrialTestCase(trial_unittest.TestCase):
+ """Test case base class for all unit tests"""
+ def setUp(self):
+ """Run before each test method to initialize test environment"""
+ super(TrialTestCase, self).setUp()
+ # NOTE(vish): We need a better method for creating fixtures for tests
+ # now that we have some required db setup for the system
+ # to work properly.
+ self.start = datetime.datetime.utcnow()
+ ctxt = context.get_admin_context()
+ if db.network_count(ctxt) != 5:
+ network_manager.VlanManager().create_networks(ctxt,
+ FLAGS.fixed_range,
+ 5, 16,
+ FLAGS.vlan_start,
+ FLAGS.vpn_start)
+
+ # emulate some of the mox stuff, we can't use the metaclass
+ # because it screws with our generators
+ self.mox = mox.Mox()
+ self.stubs = stubout.StubOutForTesting()
+ self.flag_overrides = {}
+ self.injected = []
+ self._original_flags = FLAGS.FlagValuesDict()
+
+ def tearDown(self):
+ """Runs after each test method to finalize/tear down test
+ environment."""
+ try:
+ self.mox.UnsetStubs()
+ self.stubs.UnsetAll()
+ self.stubs.SmartUnsetAll()
+ self.mox.VerifyAll()
+ # NOTE(vish): Clean up any ips associated during the test.
+ ctxt = context.get_admin_context()
+ db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host,
+ self.start)
+ db.network_disassociate_all(ctxt)
for x in self.injected:
try:
x.stop()
@@ -147,14 +228,3 @@ class TrialTestCase(unittest.TestCase):
return d
_wrapped.func_name = func.func_name
return _wrapped
-
- def _monkey_patch_attach(self):
- self.originalAttach = rpc.Consumer.attach_to_twisted
-
- def _wrapped(innerSelf):
- rv = self.originalAttach(innerSelf)
- self.injected.append(rv)
- return rv
-
- _wrapped.func_name = self.originalAttach.func_name
- rpc.Consumer.attach_to_twisted = _wrapped
diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py
index aaf213923..8dc87d0e2 100644
--- a/nova/tests/__init__.py
+++ b/nova/tests/__init__.py
@@ -29,3 +29,8 @@
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
+
+# See http://code.google.com/p/python-nose/issues/detail?id=373
+# The code below enables nosetests to work with i18n _() blocks
+import __builtin__
+setattr(__builtin__, '_', lambda x: x)
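+
+# For example (illustrative): without this stub, merely importing a module
+# containing a line such as
+#
+#     raise exception.Error(_("instance not found"))
+#
+# would fail under nosetests with NameError: name '_' is not defined.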
diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py
index 0f66c0a26..58fdea3b5 100644
--- a/nova/tests/access_unittest.py
+++ b/nova/tests/access_unittest.py
@@ -35,7 +35,7 @@ class Context(object):
pass
-class AccessTestCase(test.TrialTestCase):
+class AccessTestCase(test.TestCase):
def setUp(self):
super(AccessTestCase, self).setUp()
um = manager.AuthManager()
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 639a2ebe4..21b8aac1c 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -24,9 +24,10 @@ import webob
import webob.dec
from nova import auth
-from nova import utils
-from nova import flags
+from nova import context
from nova import exception as exc
+from nova import flags
+from nova import utils
import nova.api.openstack.auth
from nova.image import service
from nova.image import glance
@@ -54,12 +55,11 @@ def fake_auth_init(self):
self.db = FakeAuthDatabase()
self.context = Context()
self.auth = FakeAuthManager()
- self.host = 'foo'
@webob.dec.wsgify
def fake_wsgi(self, req):
- req.environ['nova.context'] = dict(user=dict(id=1))
+ req.environ['nova.context'] = context.RequestContext(1, 1)
if req.body:
req.environ['inst_dict'] = json.loads(req.body)
return self.application
@@ -68,12 +68,11 @@ def fake_wsgi(self, req):
def stub_out_key_pair_funcs(stubs):
def key_pair(context, user_id):
return [dict(name='key', public_key='public_key')]
- stubs.Set(nova.db.api, 'key_pair_get_all_by_user',
- key_pair)
+ stubs.Set(nova.db, 'key_pair_get_all_by_user', key_pair)
def stub_out_image_service(stubs):
- def fake_image_show(meh, id):
+ def fake_image_show(meh, context, id):
return dict(kernelId=1, ramdiskId=1)
stubs.Set(nova.image.local.LocalImageService, 'show', fake_image_show)
@@ -173,6 +172,12 @@ class FakeToken(object):
setattr(self, k, v)
+class FakeRequestContext(object):
+ def __init__(self, user, project):
+ self.user_id = 1
+ self.project_id = 1
+
+
class FakeAuthDatabase(object):
data = {}
diff --git a/nova/tests/api/openstack/test_adminapi.py b/nova/tests/api/openstack/test_adminapi.py
new file mode 100644
index 000000000..1b2e1654d
--- /dev/null
+++ b/nova/tests/api/openstack/test_adminapi.py
@@ -0,0 +1,61 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import stubout
+import webob
+
+import nova.api
+from nova import flags
+from nova.tests.api.openstack import fakes
+
+FLAGS = flags.FLAGS
+
+
+class AdminAPITest(unittest.TestCase):
+ def setUp(self):
+ self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.auth_data = {}
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_networking(self.stubs)
+ fakes.stub_out_rate_limiting(self.stubs)
+ fakes.stub_out_auth(self.stubs)
+ self.allow_admin = FLAGS.allow_admin_api
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.allow_admin_api = self.allow_admin
+
+ def test_admin_enabled(self):
+ FLAGS.allow_admin_api = True
+ # We should still be able to access public operations.
+ req = webob.Request.blank('/v1.0/flavors')
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 200)
+ # TODO: Confirm admin operations are available.
+
+ def test_admin_disabled(self):
+ FLAGS.allow_admin_api = False
+ # We should still be able to access public operations.
+ req = webob.Request.blank('/v1.0/flavors')
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 200)
+ # TODO: Confirm admin operations are unavailable.
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py
index 29f4b8874..7b427c2db 100644
--- a/nova/tests/api/openstack/test_auth.py
+++ b/nova/tests/api/openstack/test_auth.py
@@ -26,6 +26,7 @@ import nova.api
import nova.api.openstack.auth
import nova.auth.manager
from nova import auth
+from nova import context
from nova.tests.api.openstack import fakes
@@ -35,6 +36,7 @@ class Test(unittest.TestCase):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
'__init__', fakes.fake_auth_init)
+ self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_rate_limiting(self.stubs)
@@ -62,14 +64,14 @@ class Test(unittest.TestCase):
f = fakes.FakeAuthManager()
f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None))
- req = webob.Request.blank('/v1.0/')
+ req = webob.Request.blank('/v1.0/', {'HTTP_HOST': 'foo'})
req.headers['X-Auth-User'] = 'herp'
req.headers['X-Auth-Key'] = 'derp'
result = req.get_response(nova.api.API('os'))
self.assertEqual(result.status, '204 No Content')
self.assertEqual(len(result.headers['X-Auth-Token']), 40)
self.assertEqual(result.headers['X-Server-Management-Url'],
- "https://foo/v1.0/")
+ "http://foo/v1.0/")
self.assertEqual(result.headers['X-CDN-Management-Url'],
"")
self.assertEqual(result.headers['X-Storage-Url'], "")
@@ -131,6 +133,7 @@ class TestLimiter(unittest.TestCase):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
'__init__', fakes.fake_auth_init)
+ self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 8cfc6c45a..8444b6fce 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -43,9 +43,21 @@ def return_servers(context, user_id=1):
return [stub_instance(i, user_id) for i in xrange(5)]
+def return_security_group(context, instance_id, security_group_id):
+ pass
+
+
+def instance_update(context, instance_id, kwargs):
+ return stub_instance(instance_id)
+
+
+def instance_address(context, instance_id):
+ return None
+
+
def stub_instance(id, user_id=1):
- return Instance(id=id, state=0, image_id=10, server_name='server%s' % id,
- user_id=user_id)
+ return Instance(id=id + 123456, state=0, image_id=10, user_id=user_id,
+ display_name='server%s' % id, internal_id=id)
class ServersTest(unittest.TestCase):
@@ -63,6 +75,13 @@ class ServersTest(unittest.TestCase):
return_server)
self.stubs.Set(nova.db.api, 'instance_get_all_by_user',
return_servers)
+ self.stubs.Set(nova.db.api, 'instance_add_security_group',
+ return_security_group)
+ self.stubs.Set(nova.db.api, 'instance_update', instance_update)
+ self.stubs.Set(nova.db.api, 'instance_get_fixed_address',
+ instance_address)
+ self.stubs.Set(nova.db.api, 'instance_get_floating_address',
+ instance_address)
def tearDown(self):
self.stubs.UnsetAll()
@@ -87,11 +106,11 @@ class ServersTest(unittest.TestCase):
i += 1
def test_create_instance(self):
- def server_update(context, id, params):
- pass
-
def instance_create(context, inst):
- return {'id': 1, 'internal_id': 1}
+ return {'id': 1, 'internal_id': 1, 'display_name': ''}
+
+ def server_update(context, id, params):
+ return instance_create(context, id)
def fake_method(*args, **kwargs):
pass
diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py
index 0d2082bdb..78ce7a0a0 100644
--- a/nova/tests/auth_unittest.py
+++ b/nova/tests/auth_unittest.py
@@ -323,12 +323,12 @@ class AuthManagerTestCase(object):
self.assertTrue(user.is_admin())
-class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
def __init__(self, *args, **kwargs):
AuthManagerTestCase.__init__(self)
- test.TrialTestCase.__init__(self, *args, **kwargs)
+ test.TestCase.__init__(self, *args, **kwargs)
import nova.auth.fakeldap as fakeldap
FLAGS.redis_db = 8
if FLAGS.flush_db:
@@ -340,7 +340,7 @@ class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
self.skip = True
-class AuthManagerDbTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 9886a2449..53a762310 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -27,8 +27,6 @@ import tempfile
import time
from eventlet import greenthread
-from twisted.internet import defer
-import unittest
from xml.etree import ElementTree
from nova import context
@@ -53,7 +51,7 @@ IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
os.makedirs(IMAGES_PATH)
-class CloudTestCase(test.TrialTestCase):
+class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
self.flags(connection_type='fake', images_path=IMAGES_PATH)
@@ -126,6 +124,19 @@ class CloudTestCase(test.TrialTestCase):
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
+ def test_describe_volumes(self):
+ """Makes sure describe_volumes works and filters results."""
+ vol1 = db.volume_create(self.context, {})
+ vol2 = db.volume_create(self.context, {})
+ result = self.cloud.describe_volumes(self.context)
+ self.assertEqual(len(result['volumeSet']), 2)
+ result = self.cloud.describe_volumes(self.context,
+ volume_id=[vol2['ec2_id']])
+ self.assertEqual(len(result['volumeSet']), 1)
+ self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id'])
+ db.volume_destroy(self.context, vol1['id'])
+ db.volume_destroy(self.context, vol2['id'])
+
def test_console_output(self):
image_id = FLAGS.default_image
instance_type = FLAGS.default_instance_type
@@ -186,7 +197,7 @@ class CloudTestCase(test.TrialTestCase):
logging.debug("Need to watch instance %s until it's running..." %
instance['instance_id'])
while True:
- rv = yield defer.succeed(time.sleep(1))
+ greenthread.sleep(1)
info = self.cloud._get_instance(instance['instance_id'])
logging.debug(info['state'])
if info['state'] == power_state.RUNNING:
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index 71a1a4457..c6353d357 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -22,8 +22,6 @@ Tests For Compute
import datetime
import logging
-from twisted.internet import defer
-
from nova import context
from nova import db
from nova import exception
@@ -31,11 +29,13 @@ from nova import flags
from nova import test
from nova import utils
from nova.auth import manager
+from nova.compute import api as compute_api
+
FLAGS = flags.FLAGS
-class ComputeTestCase(test.TrialTestCase):
+class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
@@ -43,6 +43,7 @@ class ComputeTestCase(test.TrialTestCase):
self.flags(connection_type='fake',
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
+ self.compute_api = compute_api.ComputeAPI()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake')
self.project = self.manager.create_project('fake', 'fake', 'fake')
@@ -66,45 +67,48 @@ class ComputeTestCase(test.TrialTestCase):
inst['ami_launch_index'] = 0
return db.instance_create(self.context, inst)['id']
+ def test_create_instance_defaults_display_name(self):
+ """Verify that a default display_name is set when none is given."""
+ cases = [dict(), dict(display_name=None)]
+ for instance in cases:
+ ref = self.compute_api.create_instances(self.context,
+ FLAGS.default_instance_type, None, **instance)
+ try:
+ self.assertNotEqual(ref[0].display_name, None)
+ finally:
+ db.instance_destroy(self.context, ref[0]['id'])
+
def test_create_instance_associates_security_groups(self):
- """Make sure create_instance associates security groups"""
- inst = {}
- inst['user_id'] = self.user.id
- inst['project_id'] = self.project.id
+ """Make sure create_instances associates security groups"""
values = {'name': 'default',
'description': 'default',
'user_id': self.user.id,
'project_id': self.project.id}
group = db.security_group_create(self.context, values)
- ref = self.compute.create_instance(self.context,
- security_groups=[group['id']],
- **inst)
- # reload to get groups
- instance_ref = db.instance_get(self.context, ref['id'])
+ ref = self.compute_api.create_instances(self.context,
+ FLAGS.default_instance_type, None, security_group=['default'])
try:
- self.assertEqual(len(instance_ref['security_groups']), 1)
+ self.assertEqual(len(ref[0]['security_groups']), 1)
finally:
db.security_group_destroy(self.context, group['id'])
- db.instance_destroy(self.context, instance_ref['id'])
+ db.instance_destroy(self.context, ref[0]['id'])
- @defer.inlineCallbacks
def test_run_terminate(self):
"""Make sure it is possible to run and terminate instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("Running instances: %s", instances)
self.assertEqual(len(instances), 1)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("After terminating instances: %s", instances)
self.assertEqual(len(instances), 0)
- @defer.inlineCallbacks
def test_run_terminate_timestamps(self):
"""Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
@@ -112,42 +116,40 @@ class ComputeTestCase(test.TrialTestCase):
self.assertEqual(instance_ref['launched_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
self.context = self.context.elevated(True)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
- @defer.inlineCallbacks
def test_reboot(self):
"""Ensure instance can be rebooted"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- yield self.compute.reboot_instance(self.context, instance_id)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.reboot_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_console_output(self):
"""Make sure we can get console output from instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
- console = yield self.compute.get_console_output(self.context,
+ console = self.compute.get_console_output(self.context,
instance_id)
self.assert_(console)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_run_instance_existing(self):
"""Ensure failure when running an instance that already exists"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- self.assertFailure(self.compute.run_instance(self.context,
- instance_id),
- exception.Error)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.assertRaises(exception.Error,
+ self.compute.run_instance,
+ self.context,
+ instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
diff --git a/nova/tests/flags_unittest.py b/nova/tests/flags_unittest.py
index b97df075d..707300fcf 100644
--- a/nova/tests/flags_unittest.py
+++ b/nova/tests/flags_unittest.py
@@ -24,7 +24,7 @@ FLAGS = flags.FLAGS
flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only')
-class FlagsTestCase(test.TrialTestCase):
+class FlagsTestCase(test.TestCase):
def setUp(self):
super(FlagsTestCase, self).setUp()
diff --git a/nova/tests/misc_unittest.py b/nova/tests/misc_unittest.py
index 856060afa..3d947427a 100644
--- a/nova/tests/misc_unittest.py
+++ b/nova/tests/misc_unittest.py
@@ -15,34 +15,41 @@
# under the License.
import os
-import subprocess
from nova import test
from nova.utils import parse_mailmap, str_dict_replace
-class ProjectTestCase(test.TrialTestCase):
+class ProjectTestCase(test.TestCase):
def test_authors_up_to_date(self):
if os.path.exists('../.bzr'):
- log_cmd = subprocess.Popen(["bzr", "log", "-n0"],
- stdout=subprocess.PIPE)
- changelog = log_cmd.communicate()[0]
+ contributors = set()
+
mailmap = parse_mailmap('../.mailmap')
- contributors = set()
- for l in changelog.split('\n'):
- l = l.strip()
- if (l.startswith('author:') or l.startswith('committer:')
- and not l == 'committer: Tarmac'):
- email = l.split(' ')[-1]
- contributors.add(str_dict_replace(email, mailmap))
-
- authors_file = open('../Authors', 'r').read()
-
- missing = set()
- for contributor in contributors:
- if not contributor in authors_file:
- missing.add(contributor)
-
- self.assertTrue(len(missing) == 0,
- '%r not listed in Authors' % missing)
+ import bzrlib.workingtree
+ tree = bzrlib.workingtree.WorkingTree.open('..')
+ tree.lock_read()
+ try:
+ parents = tree.get_parent_ids()
+ g = tree.branch.repository.get_graph()
+ for p in parents[1:]:
+ rev_ids = [r for r, _ in g.iter_ancestry(parents)
+ if r != "null:"]
+ revs = tree.branch.repository.get_revisions(rev_ids)
+ for r in revs:
+ for author in r.get_apparent_authors():
+ email = author.split(' ')[-1]
+ contributors.add(str_dict_replace(email, mailmap))
+
+ authors_file = open('../Authors', 'r').read()
+
+ missing = set()
+ for contributor in contributors:
+ if not contributor in authors_file:
+ missing.add(contributor)
+
+ self.assertTrue(len(missing) == 0,
+ '%r not listed in Authors' % missing)
+ finally:
+ tree.unlock()
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index 6f4705719..bcac20585 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -33,7 +33,7 @@ from nova.auth import manager
FLAGS = flags.FLAGS
-class NetworkTestCase(test.TrialTestCase):
+class NetworkTestCase(test.TestCase):
"""Test cases for network code"""
def setUp(self):
super(NetworkTestCase, self).setUp()
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index 061799923..ceac17adb 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -54,7 +54,7 @@ os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
-class ObjectStoreTestCase(test.TrialTestCase):
+class ObjectStoreTestCase(test.TestCase):
"""Test objectstore API directly."""
def setUp(self):
@@ -191,7 +191,7 @@ class TestSite(server.Site):
protocol = TestHTTPChannel
-class S3APITestCase(test.TrialTestCase):
+class S3APITestCase(test.TestCase):
"""Test objectstore through S3 API."""
def setUp(self):
diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py
deleted file mode 100644
index 67245af03..000000000
--- a/nova/tests/process_unittest.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-from twisted.internet import defer
-from twisted.internet import reactor
-from xml.etree import ElementTree
-
-from nova import exception
-from nova import flags
-from nova import process
-from nova import test
-from nova import utils
-
-FLAGS = flags.FLAGS
-
-
-class ProcessTestCase(test.TrialTestCase):
- def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
- super(ProcessTestCase, self).setUp()
-
- def test_execute_stdout(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('echo test')
-
- def _check(rv):
- self.assertEqual(rv[0], 'test\n')
- self.assertEqual(rv[1], '')
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE', check_exit_code=False)
-
- def _check(rv):
- self.assertEqual(rv[0], '')
- self.assert_('No such file' in rv[1])
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_unexpected_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE')
- d.addCallback(lambda x: self.fail('should have raised an error'))
- d.addErrback(lambda failure: failure.trap(IOError))
- return d
-
- def test_max_processes(self):
- pool = process.ProcessPool(2)
- d1 = pool.simple_execute('sleep 0.01')
- d2 = pool.simple_execute('sleep 0.01')
- d3 = pool.simple_execute('sleep 0.005')
- d4 = pool.simple_execute('sleep 0.005')
-
- called = []
-
- def _called(rv, name):
- called.append(name)
-
- d1.addCallback(_called, 'd1')
- d2.addCallback(_called, 'd2')
- d3.addCallback(_called, 'd3')
- d4.addCallback(_called, 'd4')
-
- # Make sure that d3 and d4 had to wait on the other two and were called
- # in order
- # NOTE(termie): there may be a race condition in this test if for some
- # reason one of the sleeps takes longer to complete
- # than it should
- d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
- d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
- d4.addErrback(self.fail)
- return d4
-
- def test_kill_long_process(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d2 = pool.simple_execute('sleep 0.005')
-
- timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
-
- # kill d1 and wait on it to end then cancel the timeout
- d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
- d2.addCallback(lambda _: d1)
- d2.addBoth(lambda _: timeout.active() and timeout.cancel())
- d2.addErrback(self.fail)
- return d2
-
- def test_process_exit_is_contained(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d1.addCallback(lambda x: self.fail('should have errbacked'))
- d1.addErrback(lambda fail: fail.trap(IOError))
- reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
-
- return d1
-
- def test_shared_pool_is_singleton(self):
- pool1 = process.SharedPool()
- pool2 = process.SharedPool()
- self.assertEqual(id(pool1._instance), id(pool2._instance))
-
- def test_shared_pool_works_as_singleton(self):
- d1 = process.simple_execute('sleep 1')
- d2 = process.simple_execute('sleep 0.005')
- # lp609749: would have failed with
- # exceptions.AssertionError: Someone released me too many times:
- # too many tokens!
- return d1
diff --git a/nova/tests/quota_unittest.py b/nova/tests/quota_unittest.py
index b7c1d2acc..8cf2a5e54 100644
--- a/nova/tests/quota_unittest.py
+++ b/nova/tests/quota_unittest.py
@@ -32,7 +32,7 @@ from nova.api.ec2 import cloud
FLAGS = flags.FLAGS
-class QuotaTestCase(test.TrialTestCase):
+class QuotaTestCase(test.TestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(QuotaTestCase, self).setUp()
@@ -94,11 +94,12 @@ class QuotaTestCase(test.TrialTestCase):
for i in range(FLAGS.quota_instances):
instance_id = self._create_instance()
instance_ids.append(instance_id)
- self.assertRaises(cloud.QuotaError, self.cloud.run_instances,
+ self.assertRaises(quota.QuotaError, self.cloud.run_instances,
self.context,
min_count=1,
max_count=1,
- instance_type='m1.small')
+ instance_type='m1.small',
+ image_id='fake')
for instance_id in instance_ids:
db.instance_destroy(self.context, instance_id)
@@ -106,11 +107,12 @@ class QuotaTestCase(test.TrialTestCase):
instance_ids = []
instance_id = self._create_instance(cores=4)
instance_ids.append(instance_id)
- self.assertRaises(cloud.QuotaError, self.cloud.run_instances,
+ self.assertRaises(quota.QuotaError, self.cloud.run_instances,
self.context,
min_count=1,
max_count=1,
- instance_type='m1.small')
+ instance_type='m1.small',
+ image_id='fake')
for instance_id in instance_ids:
db.instance_destroy(self.context, instance_id)
@@ -119,7 +121,7 @@ class QuotaTestCase(test.TrialTestCase):
for i in range(FLAGS.quota_volumes):
volume_id = self._create_volume()
volume_ids.append(volume_id)
- self.assertRaises(cloud.QuotaError, self.cloud.create_volume,
+ self.assertRaises(quota.QuotaError, self.cloud.create_volume,
self.context,
size=10)
for volume_id in volume_ids:
@@ -129,7 +131,7 @@ class QuotaTestCase(test.TrialTestCase):
volume_ids = []
volume_id = self._create_volume(size=20)
volume_ids.append(volume_id)
- self.assertRaises(cloud.QuotaError,
+ self.assertRaises(quota.QuotaError,
self.cloud.create_volume,
self.context,
size=10)
@@ -146,6 +148,6 @@ class QuotaTestCase(test.TrialTestCase):
# make an rpc.call, the test just finishes with OK. It
# appears to be something in the magic inline callbacks
# that is breaking.
- self.assertRaises(cloud.QuotaError, self.cloud.allocate_address,
+ self.assertRaises(quota.QuotaError, self.cloud.allocate_address,
self.context)
db.floating_ip_destroy(context.get_admin_context(), address)
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
index f35b65a39..a2495e65a 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/rpc_unittest.py
@@ -20,8 +20,6 @@ Unit Tests for remote procedure calls using queue
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import flags
from nova import rpc
@@ -31,7 +29,7 @@ from nova import test
FLAGS = flags.FLAGS
-class RpcTestCase(test.TrialTestCase):
+class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
@@ -40,23 +38,22 @@ class RpcTestCase(test.TrialTestCase):
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
- self.consumer.attach_to_twisted()
+ self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "echo",
+ result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "context",
- "args": {"value": value}})
+ result = rpc.call(self.context,
+ 'test', {"method": "context",
+ "args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
@@ -67,14 +64,17 @@ class RpcTestCase(test.TrialTestCase):
to an int in the test.
"""
value = 42
- self.assertFailure(rpc.call_twisted(self.context, 'test',
- {"method": "fail",
- "args": {"value": value}}),
- rpc.RemoteError)
+ self.assertRaises(rpc.RemoteError,
+ rpc.call,
+ self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
try:
- yield rpc.call_twisted(self.context,
- 'test', {"method": "fail",
- "args": {"value": value}})
+ rpc.call(self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
@@ -89,13 +89,13 @@ class TestReceiver(object):
def echo(context, value):
"""Simply returns whatever value is sent in"""
logging.debug("Received %s", value)
- return defer.succeed(value)
+ return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
logging.debug("Received %s", context)
- return defer.succeed(context.to_dict())
+ return context.to_dict()
@staticmethod
def fail(context, value):
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
index cb5fe6b9c..d1756b8fb 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/scheduler_unittest.py
@@ -44,7 +44,7 @@ class TestDriver(driver.Scheduler):
return 'named_host'
-class SchedulerTestCase(test.TrialTestCase):
+class SchedulerTestCase(test.TestCase):
"""Test case for scheduler"""
def setUp(self):
super(SchedulerTestCase, self).setUp()
@@ -73,7 +73,7 @@ class SchedulerTestCase(test.TrialTestCase):
scheduler.named_method(ctxt, 'topic', num=7)
-class SimpleDriverTestCase(test.TrialTestCase):
+class SimpleDriverTestCase(test.TestCase):
"""Test case for simple driver"""
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
@@ -122,12 +122,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(len(hosts), 2)
compute1.kill()
@@ -139,12 +139,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance()
@@ -162,12 +162,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_ids1 = []
instance_ids2 = []
for index in xrange(FLAGS.max_cores):
@@ -195,12 +195,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_id1 = self._create_volume()
volume1.create_volume(self.context, volume_id1)
volume_id2 = self._create_volume()
@@ -218,12 +218,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_ids1 = []
volume_ids2 = []
for index in xrange(FLAGS.max_gigabytes):
diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py
index a268bc4fe..47c092f8e 100644
--- a/nova/tests/service_unittest.py
+++ b/nova/tests/service_unittest.py
@@ -22,9 +22,6 @@ Unit Tests for remote procedure calls using queue
import mox
-from twisted.application.app import startApplication
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import rpc
@@ -48,7 +45,7 @@ class ExtendedService(service.Service):
return 'service'
-class ServiceManagerTestCase(test.TrialTestCase):
+class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
def test_attribute_error_for_no_manager(self):
@@ -63,7 +60,7 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
self.assertEqual(serv.test_method(), 'manager')
def test_override_manager_method(self):
@@ -71,11 +68,11 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
self.assertEqual(serv.test_method(), 'service')
-class ServiceTestCase(test.TrialTestCase):
+class ServiceTestCase(test.TestCase):
"""Test cases for Services"""
def setUp(self):
@@ -94,8 +91,6 @@ class ServiceTestCase(test.TrialTestCase):
self.mox.StubOutWithMock(rpc,
'AdapterConsumer',
use_mock_anything=True)
- self.mox.StubOutWithMock(
- service.task, 'LoopingCall', use_mock_anything=True)
rpc.AdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@@ -106,19 +101,8 @@ class ServiceTestCase(test.TrialTestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.AdapterConsumer)
- rpc.AdapterConsumer.attach_to_twisted()
- rpc.AdapterConsumer.attach_to_twisted()
-
- # Stub out looping call a bit needlessly since we don't have an easy
- # way to cancel it (yet) when the tests finishes
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
+ rpc.AdapterConsumer.attach_to_eventlet()
+ rpc.AdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@@ -136,14 +120,14 @@ class ServiceTestCase(test.TrialTestCase):
service_create).AndReturn(service_ref)
self.mox.ReplayAll()
- startApplication(app, False)
+ app.start()
+ app.stop()
self.assert_(app)
# We're testing sort of weird behavior in how report_state decides
# whether it is disconnected, it looks for a variable on itself called
# 'model_disconnected' and report_state doesn't really do much so this
# these are mostly just for coverage
- @defer.inlineCallbacks
def test_report_state_no_service(self):
host = 'foo'
binary = 'bar'
@@ -173,10 +157,9 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ serv.start()
+ serv.report_state()
- @defer.inlineCallbacks
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@@ -204,11 +187,10 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ serv.start()
+ serv.report_state()
self.assert_(serv.model_disconnected)
- @defer.inlineCallbacks
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
@@ -238,8 +220,8 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
serv.model_disconnected = True
- yield serv.report_state()
+ serv.report_state()
self.assert_(not serv.model_disconnected)
diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py
index d49383fb7..85e569858 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/virt_unittest.py
@@ -30,7 +30,7 @@ FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
-class LibvirtConnTestCase(test.TrialTestCase):
+class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
self.manager = manager.AuthManager()
@@ -123,7 +123,7 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.manager.delete_user(self.user)
-class NWFilterTestCase(test.TrialTestCase):
+class NWFilterTestCase(test.TestCase):
def setUp(self):
super(NWFilterTestCase, self).setUp()
@@ -235,7 +235,7 @@ class NWFilterTestCase(test.TrialTestCase):
'project_id': 'fake'})
inst_id = instance_ref['id']
- def _ensure_all_called(_):
+ def _ensure_all_called():
instance_filter = 'nova-instance-%s' % instance_ref['name']
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
@@ -253,7 +253,6 @@ class NWFilterTestCase(test.TrialTestCase):
instance = db.instance_get(self.context, inst_id)
d = self.fw.setup_nwfilters_for_instance(instance)
- d.addCallback(_ensure_all_called)
- d.addCallback(lambda _: self.teardown_security_group())
-
+ _ensure_all_called()
+ self.teardown_security_group()
return d
diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py
index 12321a96f..b13455fb0 100644
--- a/nova/tests/volume_unittest.py
+++ b/nova/tests/volume_unittest.py
@@ -21,8 +21,6 @@ Tests for Volume Code.
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import exception
from nova import db
@@ -33,7 +31,7 @@ from nova import utils
FLAGS = flags.FLAGS
-class VolumeTestCase(test.TrialTestCase):
+class VolumeTestCase(test.TestCase):
"""Test Case for volumes."""
def setUp(self):
@@ -56,51 +54,48 @@ class VolumeTestCase(test.TrialTestCase):
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)['id']
- @defer.inlineCallbacks
def test_create_delete_volume(self):
"""Test volume can be created and deleted."""
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
- @defer.inlineCallbacks
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
# volume_create
- defer.returnValue(True)
+ return True
try:
volume_id = self._create_volume('1001')
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.fail("Should have thrown TypeError")
except TypeError:
pass
- @defer.inlineCallbacks
def test_too_many_volumes(self):
"""Ensure that NoMoreTargets is raised when we run out of volumes."""
vols = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
vols.append(volume_id)
volume_id = self._create_volume()
- self.assertFailure(self.volume.create_volume(self.context,
- volume_id),
- db.NoMoreTargets)
+ self.assertRaises(db.NoMoreTargets,
+ self.volume.create_volume,
+ self.context,
+ volume_id)
db.volume_destroy(context.get_admin_context(), volume_id)
for volume_id in vols:
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
- @defer.inlineCallbacks
def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance."""
inst = {}
@@ -115,15 +110,15 @@ class VolumeTestCase(test.TrialTestCase):
instance_id = db.instance_create(self.context, inst)['id']
mountpoint = "/dev/sdf"
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_id,
mountpoint)
else:
- yield self.compute.attach_volume(self.context,
- instance_id,
- volume_id,
- mountpoint)
+ self.compute.attach_volume(self.context,
+ instance_id,
+ volume_id,
+ mountpoint)
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
@@ -131,25 +126,26 @@ class VolumeTestCase(test.TrialTestCase):
instance_ref = db.volume_get_instance(self.context, volume_id)
self.assertEqual(instance_ref['id'], instance_id)
- self.assertFailure(self.volume.delete_volume(self.context, volume_id),
- exception.Error)
+ self.assertRaises(exception.Error,
+ self.volume.delete_volume,
+ self.context,
+ volume_id)
if FLAGS.fake_tests:
db.volume_detached(self.context, volume_id)
else:
- yield self.compute.detach_volume(self.context,
- instance_id,
- volume_id)
+ self.compute.detach_volume(self.context,
+ instance_id,
+ volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.Error,
db.volume_get,
self.context,
volume_id)
db.instance_destroy(self.context, instance_id)
- @defer.inlineCallbacks
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
volume_ids = []
@@ -164,15 +160,11 @@ class VolumeTestCase(test.TrialTestCase):
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
logging.debug("Target %s allocated", iscsi_target)
- deferreds = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
d = self.volume.create_volume(self.context, volume_id)
- d.addCallback(_check)
- d.addErrback(self.fail)
- deferreds.append(d)
- yield defer.DeferredList(deferreds)
+ _check(d)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)
diff --git a/nova/utils.py b/nova/utils.py
index 2739ec3cc..11160c118 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -32,7 +32,8 @@ import struct
import sys
from xml.sax import saxutils
-from twisted.internet.threads import deferToThread
+from eventlet import event
+from eventlet import greenthread
from nova import exception
from nova import flags
@@ -121,7 +122,7 @@ def fetchfile(url, target):
def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
- logging.debug("Running cmd: %s", cmd)
+ logging.debug("Running cmd (subprocess): %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
@@ -141,6 +142,10 @@ def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
stdout=stdout,
stderr=stderr,
cmd=cmd)
+ # NOTE(termie): this appears to be necessary to let the subprocess call
+ # clean something up in between calls; without it, the second
+ # of two execute calls in a row hangs
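+ #               e.g. (illustrative, hypothetical commands):
+ #                   utils.execute('touch /tmp/a')
+ #                   utils.execute('touch /tmp/b')  # hangs without sleep(0)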
+ greenthread.sleep(0)
return result
@@ -169,13 +174,7 @@ def debug(arg):
def runthis(prompt, cmd, check_exit_code=True):
logging.debug("Running %s" % (cmd))
- exit_code = subprocess.call(cmd.split(" "))
- logging.debug(prompt % (exit_code))
- if check_exit_code and exit_code != 0:
- raise ProcessExecutionError(exit_code=exit_code,
- stdout=None,
- stderr=None,
- cmd=cmd)
+ rv, err = execute(cmd, check_exit_code=check_exit_code)
def generate_uid(topic, size=8):
@@ -270,10 +269,41 @@ class LazyPluggable(object):
return getattr(backend, key)
-def deferredToThread(f):
- def g(*args, **kwargs):
- return deferToThread(f, *args, **kwargs)
- return g
+class LoopingCall(object):
+ def __init__(self, f=None, *args, **kw):
+ self.args = args
+ self.kw = kw
+ self.f = f
+ self._running = False
+
+ def start(self, interval, now=True):
+ self._running = True
+ done = event.Event()
+
+ def _inner():
+ if not now:
+ greenthread.sleep(interval)
+ try:
+ while self._running:
+ self.f(*self.args, **self.kw)
+ greenthread.sleep(interval)
+ except Exception:
+ logging.exception('in looping call')
+ done.send_exception(*sys.exc_info())
+ return
+
+ done.send(True)
+
+ self.done = done
+
+ greenthread.spawn(_inner)
+ return self.done
+
+ def stop(self):
+ self._running = False
+
+ def wait(self):
+ return self.done.wait()
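+
+
+# Example usage (illustrative sketch; report_state stands in for any
+# periodic task):
+#
+#     pulse = LoopingCall(report_state)
+#     pulse.start(interval=10, now=False)
+#     ...
+#     pulse.stop()
+#     pulse.wait()   # True on a clean stop; re-raises if f() raised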
def xhtml_escape(value):
diff --git a/nova/validate.py b/nova/validate.py
deleted file mode 100644
index 7ea27daa6..000000000
--- a/nova/validate.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Decorators for argument validation, courtesy of
-http://rmi.net/~lutz/rangetest.html"""
-
-
-def rangetest(**argchecks):
- """Validate ranges for both + defaults"""
-
- def onDecorator(func):
- """onCall remembers func and argchecks"""
- import sys
- code = func.__code__ if sys.version_info[0] == 3 else func.func_code
- allargs = code.co_varnames[:code.co_argcount]
- funcname = func.__name__
-
- def onCall(*pargs, **kargs):
- # all pargs match first N args by position
- # the rest must be in kargs or omitted defaults
- positionals = list(allargs)
- positionals = positionals[:len(pargs)]
-
- for (argname, (low, high)) in argchecks.items():
- # for all args to be checked
- if argname in kargs:
- # was passed by name
- if float(kargs[argname]) < low or \
- float(kargs[argname]) > high:
- errmsg = '{0} argument "{1}" not in {2}..{3}'
- errmsg = errmsg.format(funcname, argname, low, high)
- raise TypeError(errmsg)
-
- elif argname in positionals:
- # was passed by position
- position = positionals.index(argname)
- if float(pargs[position]) < low or \
- float(pargs[position]) > high:
- errmsg = '{0} argument "{1}" with value of {4} ' \
- 'not in {2}..{3}'
- errmsg = errmsg.format(funcname, argname, low, high,
- pargs[position])
- raise TypeError(errmsg)
- else:
- pass
-
- return func(*pargs, **kargs) # okay: run original call
- return onCall
- return onDecorator
-
-
-def typetest(**argchecks):
- def onDecorator(func):
- import sys
- code = func.__code__ if sys.version_info[0] == 3 else func.func_code
- allargs = code.co_varnames[:code.co_argcount]
- funcname = func.__name__
-
- def onCall(*pargs, **kargs):
- positionals = list(allargs)[:len(pargs)]
- for (argname, typeof) in argchecks.items():
- if argname in kargs:
- if not isinstance(kargs[argname], typeof):
- errmsg = '{0} argument "{1}" not of type {2}'
- errmsg = errmsg.format(funcname, argname, typeof)
- raise TypeError(errmsg)
- elif argname in positionals:
- position = positionals.index(argname)
- if not isinstance(pargs[position], typeof):
- errmsg = '{0} argument "{1}" with value of {2} ' \
- 'not of type {3}'
- errmsg = errmsg.format(funcname, argname,
- pargs[position], typeof)
- raise TypeError(errmsg)
- else:
- pass
- return func(*pargs, **kargs)
- return onCall
- return onDecorator
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 11f0fa8ce..c40bb4bb4 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -25,7 +25,7 @@ import sys
from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
-from nova.virt import xenapi
+from nova.virt import xenapi_conn
FLAGS = flags.FLAGS
@@ -61,7 +61,7 @@ def get_connection(read_only=False):
elif t == 'libvirt':
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
- conn = xenapi.get_connection(read_only)
+ conn = xenapi_conn.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index f855523d3..77bc926c2 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -25,8 +25,6 @@ semantics of real hypervisor connections.
"""
-from twisted.internet import defer
-
from nova import exception
from nova.compute import power_state
@@ -107,7 +105,6 @@ class FakeConnection(object):
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
- return defer.succeed(None)
def reboot(self, instance):
"""
@@ -119,19 +116,19 @@ class FakeConnection(object):
The work will be done asynchronously. This function returns a
Deferred that allows the caller to detect when it is complete.
"""
- return defer.succeed(None)
+ pass
def rescue(self, instance):
"""
Rescue the specified instance.
"""
- return defer.succeed(None)
+ pass
def unrescue(self, instance):
"""
Unrescue the specified instance.
"""
- return defer.succeed(None)
+ pass
def destroy(self, instance):
"""
@@ -144,7 +141,6 @@ class FakeConnection(object):
Deferred that allows the caller to detect when it is complete.
"""
del self.instances[instance.name]
- return defer.succeed(None)
def attach_volume(self, instance_name, device_path, mountpoint):
"""Attach the disk at device_path to the instance at mountpoint"""
diff --git a/nova/virt/images.py b/nova/virt/images.py
index 981aa5cf3..1c9b2e093 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -26,7 +26,7 @@ import time
import urlparse
from nova import flags
-from nova import process
+from nova import utils
from nova.auth import manager
from nova.auth import signer
from nova.objectstore import image
@@ -50,7 +50,7 @@ def _fetch_s3_image(image, path, user, project):
# This should probably move somewhere else, like e.g. a download_as
# method on User objects and at the same time get rewritten to use
- # twisted web client.
+ # a web client.
headers = {}
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
@@ -63,15 +63,16 @@ def _fetch_s3_image(image, path, user, project):
cmd = ['/usr/bin/curl', '--fail', '--silent', url]
for (k, v) in headers.iteritems():
- cmd += ['-H', '%s: %s' % (k, v)]
+ cmd += ['-H', '"%s: %s"' % (k, v)]
cmd += ['-o', path]
- return process.SharedPool().execute(executable=cmd[0], args=cmd[1:])
+ cmd_out = ' '.join(cmd)
+ return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
source = _image_path('%s/image' % image)
- return process.simple_execute('cp %s %s' % (source, path))
+ return utils.execute('cp %s %s' % (source, path))
def _image_path(path):
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 18085089f..5a8c71850 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -45,16 +45,15 @@ import logging
import os
import shutil
+from eventlet import event
+from eventlet import greenthread
+from eventlet import tpool
+
import IPy
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.internet import threads
from nova import context
from nova import db
from nova import exception
from nova import flags
-from nova import process
from nova import utils
#from nova.api import context
from nova.auth import manager
@@ -184,14 +183,12 @@ class LibvirtConnection(object):
except Exception as _err:
pass
# If the instance is already terminated, we're still happy
- d = defer.Deferred()
- if cleanup:
- d.addCallback(lambda _: self._cleanup(instance))
- # FIXME: What does this comment mean?
- # TODO(termie): short-circuit me for tests
- # WE'LL save this for when we do shutdown,
+
+ done = event.Event()
+
+ # We'll save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_shutdown():
try:
@@ -200,17 +197,26 @@ class LibvirtConnection(object):
instance['id'], state)
if state == power_state.SHUTDOWN:
timer.stop()
- d.callback(None)
except Exception:
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_shutdown
- timer.start(interval=0.5, now=True)
- return d
+ timer_done = timer.start(interval=0.5, now=True)
+
+ # NOTE(termie): this is strictly superfluous (we could put the
+ # cleanup code in the timer), but this emulates the
+ # previous model so I am keeping it around until
+ # everything has been vetted a bit
+ def _wait_for_timer():
+ timer_done.wait()
+ self._cleanup(instance)
+ done.send()
+
+ greenthread.spawn(_wait_for_timer)
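+ # NOTE: (illustrative) a caller wanting the old synchronous behaviour
+ # can block on the returned eventlet event: conn.destroy(instance).wait()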
+ return done
def _cleanup(self, instance):
target = os.path.join(FLAGS.instances_path, instance['name'])
@@ -219,7 +225,6 @@ class LibvirtConnection(object):
if os.path.exists(target):
shutil.rmtree(target)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, instance_name, device_path, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@@ -230,7 +235,6 @@ class LibvirtConnection(object):
<target dev='%s' bus='virtio'/>
</disk>""" % (device_path, mount_device)
virt_dom.attachDevice(xml)
- yield
def _get_disk_xml(self, xml, device):
"""Returns the xml for the disk mounted at device"""
@@ -252,7 +256,6 @@ class LibvirtConnection(object):
if doc != None:
doc.freeDoc()
- @defer.inlineCallbacks
@exception.wrap_exception
def detach_volume(self, instance_name, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@@ -261,17 +264,13 @@ class LibvirtConnection(object):
if not xml:
raise exception.NotFound("No disk at %s" % mount_device)
virt_dom.detachDevice(xml)
- yield
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot(self, instance):
- yield self.destroy(instance, False)
+ self.destroy(instance, False)
xml = self.to_xml(instance)
- yield self._conn.createXML(xml, 0)
-
- d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ self._conn.createXML(xml, 0)
+ timer = utils.LoopingCall(f=None)
def _wait_for_reboot():
try:
@@ -281,33 +280,28 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: rebooted', instance['name'])
timer.stop()
- d.callback(None)
except Exception, exn:
logging.error('_wait_for_reboot failed: %s', exn)
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_reboot
- timer.start(interval=0.5, now=True)
- yield d
+ return timer.start(interval=0.5, now=True)
- @defer.inlineCallbacks
@exception.wrap_exception
def rescue(self, instance):
- yield self.destroy(instance, False)
+ self.destroy(instance, False)
xml = self.to_xml(instance, rescue=True)
rescue_images = {'image_id': FLAGS.rescue_image_id,
'kernel_id': FLAGS.rescue_kernel_id,
'ramdisk_id': FLAGS.rescue_ramdisk_id}
- yield self._create_image(instance, xml, 'rescue-', rescue_images)
- yield self._conn.createXML(xml, 0)
+ self._create_image(instance, xml, 'rescue-', rescue_images)
+ self._conn.createXML(xml, 0)
- d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_rescue():
try:
@@ -316,27 +310,22 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: rescued', instance['name'])
timer.stop()
- d.callback(None)
except Exception, exn:
logging.error('_wait_for_rescue failed: %s', exn)
db.instance_set_state(None,
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_rescue
- timer.start(interval=0.5, now=True)
- yield d
+ return timer.start(interval=0.5, now=True)
- @defer.inlineCallbacks
@exception.wrap_exception
def unrescue(self, instance):
# NOTE(vish): Because reboot destroys and recreates an instance using
# the normal xml file, we can just call reboot here
- yield self.reboot(instance)
+ self.reboot(instance)
- @defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
xml = self.to_xml(instance)
@@ -344,14 +333,12 @@ class LibvirtConnection(object):
instance['id'],
power_state.NOSTATE,
'launching')
- yield NWFilterFirewall(self._conn).\
- setup_nwfilters_for_instance(instance)
- yield self._create_image(instance, xml)
- yield self._conn.createXML(xml, 0)
+ NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
+ self._create_image(instance, xml)
+ self._conn.createXML(xml, 0)
logging.debug("instance %s: is running", instance['name'])
- local_d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_boot():
try:
@@ -361,7 +348,6 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: booted', instance['name'])
timer.stop()
- local_d.callback(None)
except:
logging.exception('instance %s: failed to boot',
instance['name'])
@@ -369,10 +355,9 @@ class LibvirtConnection(object):
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- local_d.callback(None)
+
timer.f = _wait_for_boot
- timer.start(interval=0.5, now=True)
- yield local_d
+ return timer.start(interval=0.5, now=True)
def _flush_xen_console(self, virsh_output):
logging.info('virsh said: %r' % (virsh_output,))
@@ -380,10 +365,9 @@ class LibvirtConnection(object):
if virsh_output.startswith('/dev/'):
logging.info('cool, it\'s a device')
- d = process.simple_execute("sudo dd if=%s iflag=nonblock" %
- virsh_output, check_exit_code=False)
- d.addCallback(lambda r: r[0])
- return d
+ out, err = utils.execute("sudo dd if=%s iflag=nonblock" %
+ virsh_output, check_exit_code=False)
+ return out
else:
return ''
@@ -403,21 +387,20 @@ class LibvirtConnection(object):
def get_console_output(self, instance):
console_log = os.path.join(FLAGS.instances_path, instance['name'],
'console.log')
- d = process.simple_execute('sudo chown %d %s' % (os.getuid(),
- console_log))
+
+ utils.execute('sudo chown %d %s' % (os.getuid(), console_log))
+
if FLAGS.libvirt_type == 'xen':
- # Xen is spethial
- d.addCallback(lambda _:
- process.simple_execute("virsh ttyconsole %s" %
- instance['name']))
- d.addCallback(self._flush_xen_console)
- d.addCallback(self._append_to_file, console_log)
+ # Xen is special
+ virsh_output = utils.execute("virsh ttyconsole %s" %
+ instance['name'])
+ data = self._flush_xen_console(virsh_output)
+ fpath = self._append_to_file(data, console_log)
else:
- d.addCallback(lambda _: defer.succeed(console_log))
- d.addCallback(self._dump_file)
- return d
+ fpath = console_log
+
+ return self._dump_file(fpath)
- @defer.inlineCallbacks
def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None):
# syntactic nicety
basepath = lambda fname = '', prefix = prefix: os.path.join(
@@ -426,8 +409,8 @@ class LibvirtConnection(object):
prefix + fname)
# ensure directories exist and are writable
- yield process.simple_execute('mkdir -p %s' % basepath(prefix=''))
- yield process.simple_execute('chmod 0777 %s' % basepath(prefix=''))
+ utils.execute('mkdir -p %s' % basepath(prefix=''))
+ utils.execute('chmod 0777 %s' % basepath(prefix=''))
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
@@ -448,19 +431,19 @@ class LibvirtConnection(object):
'kernel_id': inst['kernel_id'],
'ramdisk_id': inst['ramdisk_id']}
if not os.path.exists(basepath('disk')):
- yield images.fetch(inst.image_id, basepath('disk-raw'), user,
- project)
+ images.fetch(inst.image_id, basepath('disk-raw'), user,
+ project)
if not os.path.exists(basepath('kernel')):
- yield images.fetch(inst.kernel_id, basepath('kernel'), user,
- project)
+ images.fetch(inst.kernel_id, basepath('kernel'), user,
+ project)
if not os.path.exists(basepath('ramdisk')):
- yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
- project)
+ images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
+ project)
- execute = lambda cmd, process_input = None, check_exit_code = True: \
- process.simple_execute(cmd=cmd,
- process_input=process_input,
- check_exit_code=check_exit_code)
+ def execute(cmd, process_input=None, check_exit_code=True):
+ return utils.execute(cmd=cmd,
+ process_input=process_input,
+ check_exit_code=check_exit_code)
key = str(inst['key_data'])
net = None
@@ -482,11 +465,11 @@ class LibvirtConnection(object):
if net:
logging.info('instance %s: injecting net into image %s',
inst['name'], inst.image_id)
- yield disk.inject_data(basepath('disk-raw'), key, net,
- execute=execute)
+ disk.inject_data(basepath('disk-raw'), key, net,
+ execute=execute)
if os.path.exists(basepath('disk')):
- yield process.simple_execute('rm -f %s' % basepath('disk'))
+ utils.execute('rm -f %s' % basepath('disk'))
local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type]
['local_gb']
@@ -495,12 +478,11 @@ class LibvirtConnection(object):
resize = True
if inst['instance_type'] == 'm1.tiny' or prefix == 'rescue-':
resize = False
- yield disk.partition(basepath('disk-raw'), basepath('disk'),
- local_bytes, resize, execute=execute)
+ disk.partition(basepath('disk-raw'), basepath('disk'),
+ local_bytes, resize, execute=execute)
if FLAGS.libvirt_type == 'uml':
- yield process.simple_execute('sudo chown root %s' %
- basepath('disk'))
+ utils.execute('sudo chown root %s' % basepath('disk'))
def to_xml(self, instance, rescue=False):
# TODO(termie): cache?
@@ -758,15 +740,15 @@ class NWFilterFirewall(object):
def _define_filter(self, xml):
if callable(xml):
xml = xml()
- d = threads.deferToThread(self._conn.nwfilterDefineXML, xml)
- return d
+
+ # execute in a native thread and block current greenthread until done
+ tpool.execute(self._conn.nwfilterDefineXML, xml)
@staticmethod
def _get_net_and_mask(cidr):
net = IPy.IP(cidr)
return str(net.net()), str(net.netmask())
- @defer.inlineCallbacks
def setup_nwfilters_for_instance(self, instance):
"""
Creates an NWFilter for the given instance. In the process,
@@ -774,10 +756,10 @@ class NWFilterFirewall(object):
the base filter are all in place.
"""
- yield self._define_filter(self.nova_base_ipv4_filter)
- yield self._define_filter(self.nova_base_ipv6_filter)
- yield self._define_filter(self.nova_dhcp_filter)
- yield self._define_filter(self.nova_base_filter)
+ self._define_filter(self.nova_base_ipv4_filter)
+ self._define_filter(self.nova_base_ipv6_filter)
+ self._define_filter(self.nova_dhcp_filter)
+ self._define_filter(self.nova_base_filter)
nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \
" <filterref filter='nova-base' />\n" % \
@@ -789,20 +771,19 @@ class NWFilterFirewall(object):
net, mask = self._get_net_and_mask(network_ref['cidr'])
project_filter = self.nova_project_filter(instance['project_id'],
net, mask)
- yield self._define_filter(project_filter)
+ self._define_filter(project_filter)
nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \
instance['project_id']
for security_group in instance.security_groups:
- yield self.ensure_security_group_filter(security_group['id'])
+ self.ensure_security_group_filter(security_group['id'])
nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \
security_group['id']
nwfilter_xml += "</filter>"
- yield self._define_filter(nwfilter_xml)
- return
+ self._define_filter(nwfilter_xml)
def ensure_security_group_filter(self, security_group_id):
return self._define_filter(
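
Note on the conversions above: Twisted's task.LoopingCall paired with a
Deferred is replaced by nova's new utils.LoopingCall (added to nova/utils.py
in this branch), whose start() returns an eventlet Event that callers can
wait() on, as destroy() now does. A minimal sketch of the pattern, assuming
an eventlet-based implementation (names and details here are illustrative,
not the tree's actual code):

    import eventlet
    from eventlet import event

    class LoopingCallSketch(object):
        """Illustrative stand-in for nova.utils.LoopingCall."""
        def __init__(self, f=None):
            self.f = f
            self._running = False
            self.done = event.Event()

        def start(self, interval, now=True):
            self._running = True

            def _loop():
                if not now:
                    eventlet.sleep(interval)
                while self._running:
                    self.f()
                    eventlet.sleep(interval)
                self.done.send(None)

            eventlet.spawn(_loop)
            return self.done  # callers wait() on this Event

        def stop(self):
            self._running = False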
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
deleted file mode 100644
index 3169562a5..000000000
--- a/nova/virt/xenapi.py
+++ /dev/null
@@ -1,444 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (c) 2010 Citrix Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-A connection to XenServer or Xen Cloud Platform.
-
-The concurrency model for this class is as follows:
-
-All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
-deferredToThread). They are remote calls, and so may hang for the usual
-reasons. They should not be allowed to block the reactor thread.
-
-All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion. Polling is handled using reactor.callLater.
-
-This combination of techniques means that we don't block the reactor thread at
-all, and at the same time we don't hold lots of threads waiting for
-long-running operations.
-
-FIXME: get_info currently doesn't conform to these rules, and will block the
-reactor thread if the VM.get_by_name_label or VM.get_record calls block.
-
-**Related Flags**
-
-:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
-:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
- Platform (default: root).
-:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
- Platform.
-:xenapi_task_poll_interval: The interval (seconds) used for polling of
- remote tasks (Async.VM.start, etc)
- (default: 0.5).
-
-"""
-
-import logging
-import xmlrpclib
-
-from twisted.internet import defer
-from twisted.internet import reactor
-from twisted.internet import task
-
-from nova import db
-from nova import flags
-from nova import process
-from nova import utils
-from nova.auth.manager import AuthManager
-from nova.compute import instance_types
-from nova.compute import power_state
-from nova.virt import images
-
-XenAPI = None
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string('xenapi_connection_url',
- None,
- 'URL for connection to XenServer/Xen Cloud Platform.'
- ' Required if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_username',
- 'root',
- 'Username for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_password',
- None,
- 'Password for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_float('xenapi_task_poll_interval',
- 0.5,
- 'The interval used for polling of remote tasks '
- '(Async.VM.start, etc). Used only if '
- 'connection_type=xenapi.')
-
-
-XENAPI_POWER_STATE = {
- 'Halted': power_state.SHUTDOWN,
- 'Running': power_state.RUNNING,
- 'Paused': power_state.PAUSED,
- 'Suspended': power_state.SHUTDOWN, # FIXME
- 'Crashed': power_state.CRASHED}
-
-
-def get_connection(_):
- """Note that XenAPI doesn't have a read-only connection mode, so
- the read_only parameter is ignored."""
- # This is loaded late so that there's no need to install this
- # library when not using XenAPI.
- global XenAPI
- if XenAPI is None:
- XenAPI = __import__('XenAPI')
- url = FLAGS.xenapi_connection_url
- username = FLAGS.xenapi_connection_username
- password = FLAGS.xenapi_connection_password
- if not url or password is None:
- raise Exception('Must specify xenapi_connection_url, '
- 'xenapi_connection_username (optionally), and '
- 'xenapi_connection_password to use '
- 'connection_type=xenapi')
- return XenAPIConnection(url, username, password)
-
-
-class XenAPIConnection(object):
- def __init__(self, url, user, pw):
- self._conn = XenAPI.Session(url)
- self._conn.login_with_password(user, pw)
-
- def list_instances(self):
- return [self._conn.xenapi.VM.get_name_label(vm) \
- for vm in self._conn.xenapi.VM.get_all()]
-
- @defer.inlineCallbacks
- def spawn(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is not None:
- raise Exception('Attempted to create non-unique name %s' %
- instance.name)
-
- network = db.project_get_network(None, instance.project_id)
- network_ref = \
- yield self._find_network_with_bridge(network.bridge)
-
- user = AuthManager().get_user(instance.user_id)
- project = AuthManager().get_project(instance.project_id)
- vdi_uuid = yield self._fetch_image(
- instance.image_id, user, project, True)
- kernel = yield self._fetch_image(
- instance.kernel_id, user, project, False)
- ramdisk = yield self._fetch_image(
- instance.ramdisk_id, user, project, False)
- vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
-
- vm_ref = yield self._create_vm(instance, kernel, ramdisk)
- yield self._create_vbd(vm_ref, vdi_ref, 0, True)
- if network_ref:
- yield self._create_vif(vm_ref, network_ref, instance.mac_address)
- logging.debug('Starting VM %s...', vm_ref)
- yield self._call_xenapi('VM.start', vm_ref, False, False)
- logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
-
- @defer.inlineCallbacks
- def _create_vm(self, instance, kernel, ramdisk):
- """Create a VM record. Returns a Deferred that gives the new
- VM reference."""
-
- instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
- mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
- vcpus = str(instance_type['vcpus'])
- rec = {
- 'name_label': instance.name,
- 'name_description': '',
- 'is_a_template': False,
- 'memory_static_min': '0',
- 'memory_static_max': mem,
- 'memory_dynamic_min': mem,
- 'memory_dynamic_max': mem,
- 'VCPUs_at_startup': vcpus,
- 'VCPUs_max': vcpus,
- 'VCPUs_params': {},
- 'actions_after_shutdown': 'destroy',
- 'actions_after_reboot': 'restart',
- 'actions_after_crash': 'destroy',
- 'PV_bootloader': '',
- 'PV_kernel': kernel,
- 'PV_ramdisk': ramdisk,
- 'PV_args': 'root=/dev/xvda1',
- 'PV_bootloader_args': '',
- 'PV_legacy_args': '',
- 'HVM_boot_policy': '',
- 'HVM_boot_params': {},
- 'platform': {},
- 'PCI_bus': '',
- 'recommendations': '',
- 'affinity': '',
- 'user_version': '0',
- 'other_config': {},
- }
- logging.debug('Created VM %s...', instance.name)
- vm_ref = yield self._call_xenapi('VM.create', rec)
- logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- defer.returnValue(vm_ref)
-
- @defer.inlineCallbacks
- def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
- """Create a VBD record. Returns a Deferred that gives the new
- VBD reference."""
-
- vbd_rec = {}
- vbd_rec['VM'] = vm_ref
- vbd_rec['VDI'] = vdi_ref
- vbd_rec['userdevice'] = str(userdevice)
- vbd_rec['bootable'] = bootable
- vbd_rec['mode'] = 'RW'
- vbd_rec['type'] = 'disk'
- vbd_rec['unpluggable'] = True
- vbd_rec['empty'] = False
- vbd_rec['other_config'] = {}
- vbd_rec['qos_algorithm_type'] = ''
- vbd_rec['qos_algorithm_params'] = {}
- vbd_rec['qos_supported_algorithms'] = []
- logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
- logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
- vdi_ref)
- defer.returnValue(vbd_ref)
-
- @defer.inlineCallbacks
- def _create_vif(self, vm_ref, network_ref, mac_address):
- """Create a VIF record. Returns a Deferred that gives the new
- VIF reference."""
-
- vif_rec = {}
- vif_rec['device'] = '0'
- vif_rec['network'] = network_ref
- vif_rec['VM'] = vm_ref
- vif_rec['MAC'] = mac_address
- vif_rec['MTU'] = '1500'
- vif_rec['other_config'] = {}
- vif_rec['qos_algorithm_type'] = ''
- vif_rec['qos_algorithm_params'] = {}
- logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
- network_ref)
- vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
- logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
- vm_ref, network_ref)
- defer.returnValue(vif_ref)
-
- @defer.inlineCallbacks
- def _find_network_with_bridge(self, bridge):
- expr = 'field "bridge" = "%s"' % bridge
- networks = yield self._call_xenapi('network.get_all_records_where',
- expr)
- if len(networks) == 1:
- defer.returnValue(networks.keys()[0])
- elif len(networks) > 1:
- raise Exception('Found non-unique network for bridge %s' % bridge)
- else:
- raise Exception('Found no network for bridge %s' % bridge)
-
- @defer.inlineCallbacks
- def _fetch_image(self, image, user, project, use_sr):
- """use_sr: True to put the image as a VDI in an SR, False to place
- it on dom0's filesystem. The former is for VM disks, the latter for
- its kernel and ramdisk (if external kernels are being used).
- Returns a Deferred that gives the new VDI UUID."""
-
- url = images.image_url(image)
- access = AuthManager().get_access_key(user, project)
- logging.debug("Asking xapi to fetch %s as %s" % (url, access))
- fn = use_sr and 'get_vdi' or 'get_kernel'
- args = {}
- args['src_url'] = url
- args['username'] = access
- args['password'] = user.secret
- if use_sr:
- args['add_partition'] = 'true'
- task = yield self._async_call_plugin('objectstore', fn, args)
- uuid = yield self._wait_for_task(task)
- defer.returnValue(uuid)
-
- @defer.inlineCallbacks
- def reboot(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- raise Exception('instance not present %s' % instance.name)
- task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
- yield self._wait_for_task(task)
-
- @defer.inlineCallbacks
- def destroy(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- # Don't complain, just return. This lets us clean up instances
- # that have already disappeared from the underlying platform.
- defer.returnValue(None)
- # Get the VDIs related to the VM
- vdis = yield self._lookup_vm_vdis(vm)
- try:
- task = yield self._call_xenapi('Async.VM.hard_shutdown', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- # Disk clean-up
- if vdis:
- for vdi in vdis:
- try:
- task = yield self._call_xenapi('Async.VDI.destroy', vdi)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- try:
- task = yield self._call_xenapi('Async.VM.destroy', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
-
- def get_info(self, instance_id):
- vm = self._lookup_blocking(instance_id)
- if vm is None:
- raise Exception('instance not present %s' % instance_id)
- rec = self._conn.xenapi.VM.get_record(vm)
- return {'state': XENAPI_POWER_STATE[rec['power_state']],
- 'max_mem': long(rec['memory_static_max']) >> 10,
- 'mem': long(rec['memory_dynamic_max']) >> 10,
- 'num_cpu': rec['VCPUs_max'],
- 'cpu_time': 0}
-
- def get_console_output(self, instance):
- return 'FAKE CONSOLE OUTPUT'
-
- @utils.deferredToThread
- def _lookup(self, i):
- return self._lookup_blocking(i)
-
- def _lookup_blocking(self, i):
- vms = self._conn.xenapi.VM.get_by_name_label(i)
- n = len(vms)
- if n == 0:
- return None
- elif n > 1:
- raise Exception('duplicate name found: %s' % i)
- else:
- return vms[0]
-
- @utils.deferredToThread
- def _lookup_vm_vdis(self, vm):
- return self._lookup_vm_vdis_blocking(vm)
-
- def _lookup_vm_vdis_blocking(self, vm):
- # Firstly we get the VBDs, then the VDIs.
- # TODO: do we leave the read-only devices?
- vbds = self._conn.xenapi.VM.get_VBDs(vm)
- vdis = []
- if vbds:
- for vbd in vbds:
- try:
- vdi = self._conn.xenapi.VBD.get_VDI(vbd)
- # Test valid VDI
- record = self._conn.xenapi.VDI.get_record(vdi)
- except Exception, exc:
- logging.warn(exc)
- else:
- vdis.append(vdi)
- if len(vdis) > 0:
- return vdis
- else:
- return None
-
- def _wait_for_task(self, task):
- """Return a Deferred that will give the result of the given task.
- The task is polled until it completes."""
- d = defer.Deferred()
- reactor.callLater(0, self._poll_task, task, d)
- return d
-
- @utils.deferredToThread
- def _poll_task(self, task, deferred):
- """Poll the given XenAPI task, and fire the given Deferred if we
- get a result."""
- try:
- #logging.debug('Polling task %s...', task)
- status = self._conn.xenapi.task.get_status(task)
- if status == 'pending':
- reactor.callLater(FLAGS.xenapi_task_poll_interval,
- self._poll_task, task, deferred)
- elif status == 'success':
- result = self._conn.xenapi.task.get_result(task)
- logging.info('Task %s status: success. %s', task, result)
- deferred.callback(_parse_xmlrpc_value(result))
- else:
- error_info = self._conn.xenapi.task.get_error_info(task)
- logging.warn('Task %s status: %s. %s', task, status,
- error_info)
- deferred.errback(XenAPI.Failure(error_info))
- #logging.debug('Polling task %s done.', task)
- except Exception, exc:
- logging.warn(exc)
- deferred.errback(exc)
-
- @utils.deferredToThread
- def _call_xenapi(self, method, *args):
- """Call the specified XenAPI method on a background thread. Returns
- a Deferred for the result."""
- f = self._conn.xenapi
- for m in method.split('.'):
- f = f.__getattr__(m)
- return f(*args)
-
- @utils.deferredToThread
- def _async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread. Returns a
- Deferred with the task reference."""
- return _unwrap_plugin_exceptions(
- self._conn.xenapi.Async.host.call_plugin,
- self._get_xenapi_host(), plugin, fn, args)
-
- def _get_xenapi_host(self):
- return self._conn.xenapi.session.get_this_host(self._conn.handle)
-
-
-def _unwrap_plugin_exceptions(func, *args, **kwargs):
- try:
- return func(*args, **kwargs)
- except XenAPI.Failure, exc:
- logging.debug("Got exception: %s", exc)
- if (len(exc.details) == 4 and
- exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
- exc.details[2] == 'Failure'):
- params = None
- try:
- params = eval(exc.details[3])
- except:
- raise exc
- raise XenAPI.Failure(params)
- else:
- raise
- except xmlrpclib.ProtocolError, exc:
- logging.debug("Got exception: %s", exc)
- raise
-
-
-def _parse_xmlrpc_value(val):
- """Parse the given value as if it were an XML-RPC value. This is
- sometimes used as the format for the task.result field."""
- if not val:
- return val
- x = xmlrpclib.loads(
- '<?xml version="1.0"?><methodResponse><params><param>' +
- val +
- '</param></params></methodResponse>')
- return x[0][0]
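
The module removed above did its XenAPI calls through
twisted.internet.threads.deferToThread, which returned Deferreds. The new
xenapi package below uses eventlet's tpool instead: the blocking call runs
on a native worker thread while only the calling greenthread waits. A
minimal sketch of the substitution, assuming the wrapped function is a
blocking XML-RPC call:

    from eventlet import tpool

    def call_blocking(fn, *args):
        # Runs fn(*args) on a native thread from tpool's pool; other
        # greenthreads keep running while this greenthread waits.
        return tpool.execute(fn, *args)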
diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py
new file mode 100644
index 000000000..3d598c463
--- /dev/null
+++ b/nova/virt/xenapi/__init__.py
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py
new file mode 100644
index 000000000..012954394
--- /dev/null
+++ b/nova/virt/xenapi/network_utils.py
@@ -0,0 +1,41 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of network
+records and their attributes, such as bridges, PIFs and QoS, as well
+as their lookup functions.
+"""
+
+
+class NetworkHelper():
+ """
+ The class that wraps the helper methods together.
+ """
+ def __init__(self):
+ return
+
+ @classmethod
+    def find_network_with_bridge(cls, session, bridge):
+        """ Return the network to which the bridge is attached, if found."""
+ expr = 'field "bridge" = "%s"' % bridge
+ networks = session.call_xenapi('network.get_all_records_where', expr)
+ if len(networks) == 1:
+ return networks.keys()[0]
+ elif len(networks) > 1:
+ raise Exception('Found non-unique network for bridge %s' % bridge)
+ else:
+ raise Exception('Found no network for bridge %s' % bridge)
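
A hypothetical usage of the helper above, assuming an authenticated
XenAPISession named session (defined in nova/virt/xenapi_conn.py later in
this diff) and an illustrative bridge name:

    network_ref = NetworkHelper.find_network_with_bridge(session, 'xenbr0')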
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
new file mode 100644
index 000000000..2f5d78e75
--- /dev/null
+++ b/nova/virt/xenapi/vm_utils.py
@@ -0,0 +1,260 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of VM records and
+their attributes, such as VDIs and VIFs, as well as their lookup functions.
+"""
+
+import logging
+import urllib
+from xml.dom import minidom
+
+from nova import flags
+from nova import utils
+from nova.auth.manager import AuthManager
+from nova.compute import instance_types
+from nova.compute import power_state
+from nova.virt import images
+
+
+FLAGS = flags.FLAGS
+
+XENAPI_POWER_STATE = {
+ 'Halted': power_state.SHUTDOWN,
+ 'Running': power_state.RUNNING,
+ 'Paused': power_state.PAUSED,
+ 'Suspended': power_state.SHUTDOWN, # FIXME
+ 'Crashed': power_state.CRASHED}
+
+XenAPI = None
+
+
+class VMHelper():
+ """
+ The class that wraps the helper methods together.
+ """
+ def __init__(self):
+ return
+
+ @classmethod
+    def late_import(cls):
+        """
+        Load the XenAPI module for the helper class, if required.
+        This avoids having to install the XenAPI library when other
+        hypervisors are used.
+ """
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+
+ @classmethod
+    def create_vm(cls, session, instance, kernel, ramdisk):
+        """Create a VM record.  Returns the new
+        VM reference."""
+
+ instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
+ mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
+ vcpus = str(instance_type['vcpus'])
+ rec = {
+ 'name_label': instance.name,
+ 'name_description': '',
+ 'is_a_template': False,
+ 'memory_static_min': '0',
+ 'memory_static_max': mem,
+ 'memory_dynamic_min': mem,
+ 'memory_dynamic_max': mem,
+ 'VCPUs_at_startup': vcpus,
+ 'VCPUs_max': vcpus,
+ 'VCPUs_params': {},
+ 'actions_after_shutdown': 'destroy',
+ 'actions_after_reboot': 'restart',
+ 'actions_after_crash': 'destroy',
+ 'PV_bootloader': '',
+ 'PV_kernel': kernel,
+ 'PV_ramdisk': ramdisk,
+ 'PV_args': 'root=/dev/xvda1',
+ 'PV_bootloader_args': '',
+ 'PV_legacy_args': '',
+ 'HVM_boot_policy': '',
+ 'HVM_boot_params': {},
+ 'platform': {},
+ 'PCI_bus': '',
+ 'recommendations': '',
+ 'affinity': '',
+ 'user_version': '0',
+ 'other_config': {},
+ }
+ logging.debug('Created VM %s...', instance.name)
+ vm_ref = session.call_xenapi('VM.create', rec)
+ logging.debug('Created VM %s as %s.', instance.name, vm_ref)
+ return vm_ref
+
+ @classmethod
+    def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
+        """Create a VBD record.  Returns the new
+        VBD reference."""
+
+ vbd_rec = {}
+ vbd_rec['VM'] = vm_ref
+ vbd_rec['VDI'] = vdi_ref
+ vbd_rec['userdevice'] = str(userdevice)
+ vbd_rec['bootable'] = bootable
+ vbd_rec['mode'] = 'RW'
+ vbd_rec['type'] = 'disk'
+ vbd_rec['unpluggable'] = True
+ vbd_rec['empty'] = False
+ vbd_rec['other_config'] = {}
+ vbd_rec['qos_algorithm_type'] = ''
+ vbd_rec['qos_algorithm_params'] = {}
+ vbd_rec['qos_supported_algorithms'] = []
+ logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
+ vbd_ref = session.call_xenapi('VBD.create', vbd_rec)
+ logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
+ vdi_ref)
+ return vbd_ref
+
+ @classmethod
+    def create_vif(cls, session, vm_ref, network_ref, mac_address):
+        """Create a VIF record.  Returns the new
+        VIF reference."""
+
+ vif_rec = {}
+ vif_rec['device'] = '0'
+ vif_rec['network'] = network_ref
+ vif_rec['VM'] = vm_ref
+ vif_rec['MAC'] = mac_address
+ vif_rec['MTU'] = '1500'
+ vif_rec['other_config'] = {}
+ vif_rec['qos_algorithm_type'] = ''
+ vif_rec['qos_algorithm_params'] = {}
+ logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
+ network_ref)
+ vif_ref = session.call_xenapi('VIF.create', vif_rec)
+ logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
+ vm_ref, network_ref)
+ return vif_ref
+
+ @classmethod
+ def fetch_image(cls, session, image, user, project, use_sr):
+ """use_sr: True to put the image as a VDI in an SR, False to place
+ it on dom0's filesystem. The former is for VM disks, the latter for
+ its kernel and ramdisk (if external kernels are being used).
+        Returns the new VDI UUID."""
+
+ url = images.image_url(image)
+ access = AuthManager().get_access_key(user, project)
+ logging.debug("Asking xapi to fetch %s as %s", url, access)
+ fn = use_sr and 'get_vdi' or 'get_kernel'
+ args = {}
+ args['src_url'] = url
+ args['username'] = access
+ args['password'] = user.secret
+ if use_sr:
+ args['add_partition'] = 'true'
+ task = session.async_call_plugin('objectstore', fn, args)
+ uuid = session.wait_for_task(task)
+ return uuid
+
+ @classmethod
+    def lookup(cls, session, i):
+        """ Look up the instance i and return it if available """
+ return VMHelper.lookup_blocking(session, i)
+
+ @classmethod
+ def lookup_blocking(cls, session, i):
+ """ Synchronous lookup """
+ vms = session.get_xenapi().VM.get_by_name_label(i)
+ n = len(vms)
+ if n == 0:
+ return None
+ elif n > 1:
+ raise Exception('duplicate name found: %s' % i)
+ else:
+ return vms[0]
+
+ @classmethod
+ def lookup_vm_vdis(cls, session, vm):
+ """ Look for the VDIs that are attached to the VM """
+ return VMHelper.lookup_vm_vdis_blocking(session, vm)
+
+ @classmethod
+ def lookup_vm_vdis_blocking(cls, session, vm):
+ """ Synchronous lookup_vm_vdis """
+ # Firstly we get the VBDs, then the VDIs.
+ # TODO(Armando): do we leave the read-only devices?
+ vbds = session.get_xenapi().VM.get_VBDs(vm)
+ vdis = []
+ if vbds:
+ for vbd in vbds:
+ try:
+ vdi = session.get_xenapi().VBD.get_VDI(vbd)
+ # Test valid VDI
+ record = session.get_xenapi().VDI.get_record(vdi)
+ logging.debug('VDI %s is still available', record['uuid'])
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ else:
+ vdis.append(vdi)
+ if len(vdis) > 0:
+ return vdis
+ else:
+ return None
+
+ @classmethod
+ def compile_info(cls, record):
+ return {'state': XENAPI_POWER_STATE[record['power_state']],
+ 'max_mem': long(record['memory_static_max']) >> 10,
+ 'mem': long(record['memory_dynamic_max']) >> 10,
+ 'num_cpu': record['VCPUs_max'],
+ 'cpu_time': 0}
+
+ @classmethod
+ def compile_diagnostics(cls, session, record):
+ """Compile VM diagnostics data"""
+ try:
+ host = session.get_xenapi_host()
+ host_ip = session.get_xenapi().host.get_record(host)["address"]
+ metrics = session.get_xenapi().VM_guest_metrics.get_record(
+ record["guest_metrics"])
+ diags = {
+ "Kernel": metrics["os_version"]["uname"],
+ "Distro": metrics["os_version"]["name"]}
+ xml = get_rrd(host_ip, record["uuid"])
+ if xml:
+ rrd = minidom.parseString(xml)
+ for i, node in enumerate(rrd.firstChild.childNodes):
+ # We don't want all of the extra garbage
+ if i >= 3 and i <= 11:
+ ref = node.childNodes
+ # Name and Value
+ diags[ref[0].firstChild.data] = ref[6].firstChild.data
+ return diags
+ except XenAPI.Failure as e:
+ return {"Unable to retrieve diagnostics": e}
+
+
+def get_rrd(host, uuid):
+ """Return the VM RRD XML as a string"""
+ try:
+ xml = urllib.urlopen("http://%s:%s@%s/vm_rrd?uuid=%s" % (
+ FLAGS.xenapi_connection_username,
+ FLAGS.xenapi_connection_password,
+ host,
+ uuid))
+ return xml.read()
+ except IOError:
+ return None
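
For illustration, compile_info() applied to a pared-down record of the
shape XenAPI returns; the record below is hypothetical, not taken from a
live host:

    record = {'power_state': 'Running',
              'memory_static_max': '4294967296',   # bytes, as strings
              'memory_dynamic_max': '4294967296',
              'VCPUs_max': '2'}
    info = VMHelper.compile_info(record)
    # info['state'] == power_state.RUNNING; the >> 10 shifts report
    # the memory fields in KB.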
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
new file mode 100644
index 000000000..3034df9e1
--- /dev/null
+++ b/nova/virt/xenapi/vmops.py
@@ -0,0 +1,139 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for VM-related functions (spawn, reboot, etc).
+"""
+
+import logging
+
+from nova import db
+from nova import context
+
+from nova.auth.manager import AuthManager
+from nova.virt.xenapi.network_utils import NetworkHelper
+from nova.virt.xenapi.vm_utils import VMHelper
+
+XenAPI = None
+
+
+class VMOps(object):
+ """
+ Management class for VM-related tasks
+ """
+ def __init__(self, session):
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+ self._session = session
+ # Load XenAPI module in the helper class
+ VMHelper.late_import()
+
+ def list_instances(self):
+ """ List VM instances """
+ return [self._session.get_xenapi().VM.get_name_label(vm) \
+ for vm in self._session.get_xenapi().VM.get_all()]
+
+ def spawn(self, instance):
+ """ Create VM instance """
+ vm = VMHelper.lookup(self._session, instance.name)
+ if vm is not None:
+ raise Exception('Attempted to create non-unique name %s' %
+ instance.name)
+
+ bridge = db.project_get_network(context.get_admin_context(),
+ instance.project_id).bridge
+ network_ref = \
+ NetworkHelper.find_network_with_bridge(self._session, bridge)
+
+ user = AuthManager().get_user(instance.user_id)
+ project = AuthManager().get_project(instance.project_id)
+ vdi_uuid = VMHelper.fetch_image(
+ self._session, instance.image_id, user, project, True)
+ kernel = VMHelper.fetch_image(
+ self._session, instance.kernel_id, user, project, False)
+ ramdisk = VMHelper.fetch_image(
+ self._session, instance.ramdisk_id, user, project, False)
+ vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
+ vm_ref = VMHelper.create_vm(
+ self._session, instance, kernel, ramdisk)
+ VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
+ if network_ref:
+ VMHelper.create_vif(self._session, vm_ref,
+ network_ref, instance.mac_address)
+ logging.debug('Starting VM %s...', vm_ref)
+ self._session.call_xenapi('VM.start', vm_ref, False, False)
+ logging.info('Spawning VM %s created %s.', instance.name,
+ vm_ref)
+
+ def reboot(self, instance):
+ """ Reboot VM instance """
+ instance_name = instance.name
+ vm = VMHelper.lookup(self._session, instance_name)
+ if vm is None:
+ raise Exception('instance not present %s' % instance_name)
+ task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
+ self._session.wait_for_task(task)
+
+ def destroy(self, instance):
+ """ Destroy VM instance """
+ vm = VMHelper.lookup(self._session, instance.name)
+ if vm is None:
+ # Don't complain, just return. This lets us clean up instances
+ # that have already disappeared from the underlying platform.
+ return
+ # Get the VDIs related to the VM
+ vdis = VMHelper.lookup_vm_vdis(self._session, vm)
+ try:
+ task = self._session.call_xenapi('Async.VM.hard_shutdown',
+ vm)
+ self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ # Disk clean-up
+ if vdis:
+ for vdi in vdis:
+ try:
+ task = self._session.call_xenapi('Async.VDI.destroy', vdi)
+ self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ try:
+ task = self._session.call_xenapi('Async.VM.destroy', vm)
+ self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+
+ def get_info(self, instance_id):
+ """ Return data about VM instance """
+ vm = VMHelper.lookup_blocking(self._session, instance_id)
+ if vm is None:
+ raise Exception('instance not present %s' % instance_id)
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ return VMHelper.compile_info(rec)
+
+ def get_diagnostics(self, instance_id):
+ """Return data about VM diagnostics"""
+ vm = VMHelper.lookup(self._session, instance_id)
+ if vm is None:
+ raise Exception("instance not present %s" % instance_id)
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ return VMHelper.compile_diagnostics(self._session, rec)
+
+ def get_console_output(self, instance):
+ """ Return snapshot of console """
+ # TODO: implement this to fix pylint!
+ return 'FAKE CONSOLE OUTPUT of instance'
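
The late-import trick used by VMOps.__init__ above (and by
VMHelper.late_import) keeps the XenAPI package an optional dependency:
it is imported only when the driver is instantiated, so deployments on
other hypervisors need not install it. The pattern, standalone:

    XenAPI = None

    def _ensure_xenapi():
        # Import on first use; cached in the module-level global after that.
        global XenAPI
        if XenAPI is None:
            XenAPI = __import__('XenAPI')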
diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py
new file mode 100644
index 000000000..a4c7a3861
--- /dev/null
+++ b/nova/virt/xenapi/volumeops.py
@@ -0,0 +1,32 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for Storage-related functions (attach, detach, etc).
+"""
+
+
+class VolumeOps(object):
+ def __init__(self, session):
+ self._session = session
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ # FIXME: that's going to be sorted when iscsi-xenapi lands in branch
+ return True
+
+ def detach_volume(self, instance_name, mountpoint):
+ # FIXME: that's going to be sorted when iscsi-xenapi lands in branch
+ return True
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
new file mode 100644
index 000000000..6beb08f5e
--- /dev/null
+++ b/nova/virt/xenapi_conn.py
@@ -0,0 +1,241 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a green thread (using eventlet's "tpool"
+thread pool). They are remote calls, and so may hang for the usual
+reasons; they should not be allowed to block the service's main greenthread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using utils.LoopingCall.
+
+This combination of techniques means that we don't block the main
+greenthread at all, and at the same time we don't hold lots of threads
+waiting for long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block
+the greenthread if the VM.get_by_name_label or VM.get_record calls block.
+
+**Related Flags**
+
+:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
+:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
+ Platform (default: root).
+:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
+ Platform.
+:xenapi_task_poll_interval: The interval (seconds) used for polling of
+ remote tasks (Async.VM.start, etc)
+ (default: 0.5).
+
+"""
+
+import logging
+import sys
+import xmlrpclib
+
+from eventlet import event
+from eventlet import tpool
+
+from nova import utils
+from nova import flags
+from nova.virt.xenapi.vmops import VMOps
+from nova.virt.xenapi.volumeops import VolumeOps
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('xenapi_connection_url',
+ None,
+ 'URL for connection to XenServer/Xen Cloud Platform.'
+ ' Required if connection_type=xenapi.')
+flags.DEFINE_string('xenapi_connection_username',
+ 'root',
+ 'Username for connection to XenServer/Xen Cloud Platform.'
+ ' Used only if connection_type=xenapi.')
+flags.DEFINE_string('xenapi_connection_password',
+ None,
+ 'Password for connection to XenServer/Xen Cloud Platform.'
+ ' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+ 0.5,
+ 'The interval used for polling of remote tasks '
+ '(Async.VM.start, etc). Used only if '
+ 'connection_type=xenapi.')
+
+XenAPI = None
+
+
+def get_connection(_):
+ """Note that XenAPI doesn't have a read-only connection mode, so
+ the read_only parameter is ignored."""
+ # This is loaded late so that there's no need to install this
+ # library when not using XenAPI.
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+ url = FLAGS.xenapi_connection_url
+ username = FLAGS.xenapi_connection_username
+ password = FLAGS.xenapi_connection_password
+ if not url or password is None:
+ raise Exception('Must specify xenapi_connection_url, '
+ 'xenapi_connection_username (optionally), and '
+ 'xenapi_connection_password to use '
+ 'connection_type=xenapi')
+ return XenAPIConnection(url, username, password)
+
+
+class XenAPIConnection(object):
+ """ A connection to XenServer or Xen Cloud Platform """
+ def __init__(self, url, user, pw):
+ session = XenAPISession(url, user, pw)
+ self._vmops = VMOps(session)
+ self._volumeops = VolumeOps(session)
+
+ def list_instances(self):
+ """ List VM instances """
+ return self._vmops.list_instances()
+
+ def spawn(self, instance):
+ """ Create VM instance """
+ self._vmops.spawn(instance)
+
+ def reboot(self, instance):
+ """ Reboot VM instance """
+ self._vmops.reboot(instance)
+
+ def destroy(self, instance):
+ """ Destroy VM instance """
+ self._vmops.destroy(instance)
+
+ def get_info(self, instance_id):
+ """ Return data about VM instance """
+ return self._vmops.get_info(instance_id)
+
+ def get_diagnostics(self, instance_id):
+ """Return data about VM diagnostics"""
+ return self._vmops.get_diagnostics(instance_id)
+
+ def get_console_output(self, instance):
+ """ Return snapshot of console """
+ return self._vmops.get_console_output(instance)
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ """ Attach volume storage to VM instance """
+ return self._volumeops.attach_volume(instance_name,
+ device_path,
+ mountpoint)
+
+    def detach_volume(self, instance_name, mountpoint):
+        """ Detach volume storage from VM instance """
+ return self._volumeops.detach_volume(instance_name, mountpoint)
+
+
+class XenAPISession(object):
+ """ The session to invoke XenAPI SDK calls """
+ def __init__(self, url, user, pw):
+ self._session = XenAPI.Session(url)
+ self._session.login_with_password(user, pw)
+
+ def get_xenapi(self):
+ """ Return the xenapi object """
+ return self._session.xenapi
+
+ def get_xenapi_host(self):
+ """ Return the xenapi host """
+ return self._session.xenapi.session.get_this_host(self._session.handle)
+
+ def call_xenapi(self, method, *args):
+ """Call the specified XenAPI method on a background thread."""
+ f = self._session.xenapi
+ for m in method.split('.'):
+ f = f.__getattr__(m)
+ return tpool.execute(f, *args)
+
+ def async_call_plugin(self, plugin, fn, args):
+ """Call Async.host.call_plugin on a background thread."""
+ return tpool.execute(_unwrap_plugin_exceptions,
+ self._session.xenapi.Async.host.call_plugin,
+ self.get_xenapi_host(), plugin, fn, args)
+
+    def wait_for_task(self, task):
+        """Return the result of the given task. The task is
+        polled until it completes."""
+
+ done = event.Event()
+ loop = utils.LoopingCall(self._poll_task, task, done)
+ loop.start(FLAGS.xenapi_task_poll_interval, now=True)
+ rv = done.wait()
+ loop.stop()
+ return rv
+
+    def _poll_task(self, task, done):
+        """Poll the given XenAPI task, and fire the given event when we
+        get a result."""
+ try:
+ #logging.debug('Polling task %s...', task)
+ status = self._session.xenapi.task.get_status(task)
+ if status == 'pending':
+ return
+ elif status == 'success':
+ result = self._session.xenapi.task.get_result(task)
+ logging.info('Task %s status: success. %s', task, result)
+ done.send(_parse_xmlrpc_value(result))
+ else:
+ error_info = self._session.xenapi.task.get_error_info(task)
+ logging.warn('Task %s status: %s. %s', task, status,
+ error_info)
+ done.send_exception(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ done.send_exception(*sys.exc_info())
+
+
+def _unwrap_plugin_exceptions(func, *args, **kwargs):
+ """ Parse exception details """
+ try:
+ return func(*args, **kwargs)
+ except XenAPI.Failure, exc:
+ logging.debug("Got exception: %s", exc)
+ if (len(exc.details) == 4 and
+ exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
+ exc.details[2] == 'Failure'):
+ params = None
+ try:
+ params = eval(exc.details[3])
+ except:
+ raise exc
+ raise XenAPI.Failure(params)
+ else:
+ raise
+ except xmlrpclib.ProtocolError, exc:
+ logging.debug("Got exception: %s", exc)
+ raise
+
+
+def _parse_xmlrpc_value(val):
+ """Parse the given value as if it were an XML-RPC value. This is
+ sometimes used as the format for the task.result field."""
+ if not val:
+ return val
+ x = xmlrpclib.loads(
+ '<?xml version="1.0"?><methodResponse><params><param>' +
+ val +
+ '</param></params></methodResponse>')
+ return x[0][0]
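
The wait_for_task() pattern above, reduced to a standalone sketch: a
LoopingCall polls in its own greenthread while the caller blocks on an
eventlet Event. The names wait_for and poll_fn are hypothetical, and
poll_fn is assumed to return None while the task is still pending:

    from eventlet import event
    from nova import utils

    def wait_for(poll_fn, interval=0.5):
        done = event.Event()

        def _poll():
            result = poll_fn()
            if result is not None and not done.ready():
                # ready() guards against a double send if stop() races
                # the next poll iteration
                done.send(result)

        loop = utils.LoopingCall(_poll)
        loop.start(interval, now=True)
        rv = done.wait()  # yields to other greenthreads until send()
        loop.stop()
        return rv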
diff --git a/nova/volume/driver.py b/nova/volume/driver.py
index 156aad2a0..1cd4c1fd4 100644
--- a/nova/volume/driver.py
+++ b/nova/volume/driver.py
@@ -22,12 +22,10 @@ Drivers for volumes.
import logging
import os
-
-from twisted.internet import defer
+import time
from nova import exception
from nova import flags
-from nova import process
from nova import utils
@@ -55,14 +53,13 @@ flags.DEFINE_string('iscsi_ip_prefix', '127.0',
class VolumeDriver(object):
"""Executes commands relating to Volumes."""
- def __init__(self, execute=process.simple_execute,
+ def __init__(self, execute=utils.execute,
sync_exec=utils.execute, *args, **kwargs):
# NOTE(vish): db is set by Manager
self.db = None
self._execute = execute
self._sync_exec = sync_exec
- @defer.inlineCallbacks
def _try_execute(self, command):
# NOTE(vish): Volume commands can partially fail due to timing, but
# running them a second time on failure will usually
@@ -70,15 +67,15 @@ class VolumeDriver(object):
tries = 0
while True:
try:
- yield self._execute(command)
- defer.returnValue(True)
+ self._execute(command)
+ return True
except exception.ProcessExecutionError:
tries = tries + 1
if tries >= FLAGS.num_shell_tries:
raise
logging.exception("Recovering from a failed execute."
"Try number %s", tries)
- yield self._execute("sleep %s" % tries ** 2)
+ time.sleep(tries ** 2)
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met"""
@@ -86,53 +83,45 @@ class VolumeDriver(object):
raise exception.Error("volume group %s doesn't exist"
% FLAGS.volume_group)
- @defer.inlineCallbacks
def create_volume(self, volume):
"""Creates a logical volume."""
if int(volume['size']) == 0:
sizestr = '100M'
else:
sizestr = '%sG' % volume['size']
- yield self._try_execute("sudo lvcreate -L %s -n %s %s" %
- (sizestr,
- volume['name'],
- FLAGS.volume_group))
+ self._try_execute("sudo lvcreate -L %s -n %s %s" %
+ (sizestr,
+ volume['name'],
+ FLAGS.volume_group))
- @defer.inlineCallbacks
def delete_volume(self, volume):
"""Deletes a logical volume."""
- yield self._try_execute("sudo lvremove -f %s/%s" %
- (FLAGS.volume_group,
- volume['name']))
+ self._try_execute("sudo lvremove -f %s/%s" %
+ (FLAGS.volume_group,
+ volume['name']))
- @defer.inlineCallbacks
def local_path(self, volume):
- yield # NOTE(vish): stops deprecation warning
+ # NOTE(vish): stops deprecation warning
escaped_group = FLAGS.volume_group.replace('-', '--')
escaped_name = volume['name'].replace('-', '--')
- defer.returnValue("/dev/mapper/%s-%s" % (escaped_group,
- escaped_name))
+ return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
def ensure_export(self, context, volume):
"""Synchronously recreates an export for a logical volume."""
raise NotImplementedError()
- @defer.inlineCallbacks
def create_export(self, context, volume):
"""Exports the volume."""
raise NotImplementedError()
- @defer.inlineCallbacks
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
raise NotImplementedError()
- @defer.inlineCallbacks
def discover_volume(self, volume):
"""Discover volume on a remote host."""
raise NotImplementedError()
- @defer.inlineCallbacks
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
raise NotImplementedError()
@@ -155,14 +144,13 @@ class AOEDriver(VolumeDriver):
dev = {'shelf_id': shelf_id, 'blade_id': blade_id}
self.db.export_device_create_safe(context, dev)
- @defer.inlineCallbacks
def create_export(self, context, volume):
"""Creates an export for a logical volume."""
self._ensure_blades(context)
(shelf_id,
blade_id) = self.db.volume_allocate_shelf_and_blade(context,
volume['id'])
- yield self._try_execute(
+ self._try_execute(
"sudo vblade-persist setup %s %s %s /dev/%s/%s" %
(shelf_id,
blade_id,
@@ -176,33 +164,30 @@ class AOEDriver(VolumeDriver):
# still works for the other volumes, so we
# just wait a bit for the current volume to
# be ready and ignore any errors.
- yield self._execute("sleep 2")
- yield self._execute("sudo vblade-persist auto all",
- check_exit_code=False)
- yield self._execute("sudo vblade-persist start all",
- check_exit_code=False)
+ time.sleep(2)
+ self._execute("sudo vblade-persist auto all",
+ check_exit_code=False)
+ self._execute("sudo vblade-persist start all",
+ check_exit_code=False)
- @defer.inlineCallbacks
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
volume['id'])
- yield self._try_execute("sudo vblade-persist stop %s %s" %
- (shelf_id, blade_id))
- yield self._try_execute("sudo vblade-persist destroy %s %s" %
- (shelf_id, blade_id))
+ self._try_execute("sudo vblade-persist stop %s %s" %
+ (shelf_id, blade_id))
+ self._try_execute("sudo vblade-persist destroy %s %s" %
+ (shelf_id, blade_id))
- @defer.inlineCallbacks
def discover_volume(self, _volume):
"""Discover volume on a remote host."""
- yield self._execute("sudo aoe-discover")
- yield self._execute("sudo aoe-stat", check_exit_code=False)
+ self._execute("sudo aoe-discover")
+ self._execute("sudo aoe-stat", check_exit_code=False)
- @defer.inlineCallbacks
def undiscover_volume(self, _volume):
"""Undiscover volume on a remote host."""
- yield
+ pass
class FakeAOEDriver(AOEDriver):
@@ -252,7 +237,6 @@ class ISCSIDriver(VolumeDriver):
target = {'host': host, 'target_num': target_num}
self.db.iscsi_target_create_safe(context, target)
- @defer.inlineCallbacks
def create_export(self, context, volume):
"""Creates an export for a logical volume."""
self._ensure_iscsi_targets(context, volume['host'])
@@ -261,61 +245,55 @@ class ISCSIDriver(VolumeDriver):
volume['host'])
iscsi_name = "%s%s" % (FLAGS.iscsi_target_prefix, volume['name'])
volume_path = "/dev/%s/%s" % (FLAGS.volume_group, volume['name'])
- yield self._execute("sudo ietadm --op new "
- "--tid=%s --params Name=%s" %
- (iscsi_target, iscsi_name))
- yield self._execute("sudo ietadm --op new --tid=%s "
- "--lun=0 --params Path=%s,Type=fileio" %
- (iscsi_target, volume_path))
-
- @defer.inlineCallbacks
+ self._execute("sudo ietadm --op new "
+ "--tid=%s --params Name=%s" %
+ (iscsi_target, iscsi_name))
+ self._execute("sudo ietadm --op new --tid=%s "
+ "--lun=0 --params Path=%s,Type=fileio" %
+ (iscsi_target, volume_path))
+
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
iscsi_target = self.db.volume_get_iscsi_target_num(context,
volume['id'])
- yield self._execute("sudo ietadm --op delete --tid=%s "
- "--lun=0" % iscsi_target)
- yield self._execute("sudo ietadm --op delete --tid=%s" %
- iscsi_target)
+ self._execute("sudo ietadm --op delete --tid=%s "
+ "--lun=0" % iscsi_target)
+ self._execute("sudo ietadm --op delete --tid=%s" %
+ iscsi_target)
- @defer.inlineCallbacks
def _get_name_and_portal(self, volume_name, host):
"""Gets iscsi name and portal from volume name and host."""
- (out, _err) = yield self._execute("sudo iscsiadm -m discovery -t "
- "sendtargets -p %s" % host)
+ (out, _err) = self._execute("sudo iscsiadm -m discovery -t "
+ "sendtargets -p %s" % host)
for target in out.splitlines():
if FLAGS.iscsi_ip_prefix in target and volume_name in target:
(location, _sep, iscsi_name) = target.partition(" ")
break
iscsi_portal = location.split(",")[0]
- defer.returnValue((iscsi_name, iscsi_portal))
+ return (iscsi_name, iscsi_portal)
- @defer.inlineCallbacks
def discover_volume(self, volume):
"""Discover volume on a remote host."""
- (iscsi_name,
- iscsi_portal) = yield self._get_name_and_portal(volume['name'],
- volume['host'])
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --login" %
- (iscsi_name, iscsi_portal))
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
- "-n node.startup -v automatic" %
- (iscsi_name, iscsi_portal))
- defer.returnValue("/dev/iscsi/%s" % volume['name'])
-
- @defer.inlineCallbacks
+ iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'],
+ volume['host'])
+ self._execute("sudo iscsiadm -m node -T %s -p %s --login" %
+ (iscsi_name, iscsi_portal))
+ self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
+ "-n node.startup -v automatic" %
+ (iscsi_name, iscsi_portal))
+ return "/dev/iscsi/%s" % volume['name']
+
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
- (iscsi_name,
- iscsi_portal) = yield self._get_name_and_portal(volume['name'],
- volume['host'])
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
- "-n node.startup -v manual" %
- (iscsi_name, iscsi_portal))
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --logout " %
- (iscsi_name, iscsi_portal))
- yield self._execute("sudo iscsiadm -m node --op delete "
- "--targetname %s" % iscsi_name)
+ iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'],
+ volume['host'])
+ self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
+ "-n node.startup -v manual" %
+ (iscsi_name, iscsi_portal))
+ self._execute("sudo iscsiadm -m node -T %s -p %s --logout " %
+ (iscsi_name, iscsi_portal))
+ self._execute("sudo iscsiadm -m node --op delete "
+ "--targetname %s" % iscsi_name)
class FakeISCSIDriver(ISCSIDriver):
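
The retry loop in _try_execute() above, standalone: quadratic backoff
(1s, 4s, 9s, ...) for volume commands that can fail transiently. Here
max_tries stands in for FLAGS.num_shell_tries, and a bare Exception
replaces exception.ProcessExecutionError to keep the sketch
self-contained:

    import time

    def try_execute(execute, command, max_tries=3):
        tries = 0
        while True:
            try:
                execute(command)
                return True
            except Exception:
                tries += 1
                if tries >= max_tries:
                    raise
                time.sleep(tries ** 2)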
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index 589e7d7d9..7da125cac 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -45,7 +45,6 @@ intact.
import logging
import datetime
-from twisted.internet import defer
from nova import context
from nova import exception
@@ -86,7 +85,6 @@ class VolumeManager(manager.Manager):
for volume in volumes:
self.driver.ensure_export(ctxt, volume)
- @defer.inlineCallbacks
def create_volume(self, context, volume_id):
"""Creates and exports the volume."""
context = context.elevated()
@@ -102,19 +100,18 @@ class VolumeManager(manager.Manager):
logging.debug("volume %s: creating lv of size %sG",
volume_ref['name'], volume_ref['size'])
- yield self.driver.create_volume(volume_ref)
+ self.driver.create_volume(volume_ref)
logging.debug("volume %s: creating export", volume_ref['name'])
- yield self.driver.create_export(context, volume_ref)
+ self.driver.create_export(context, volume_ref)
now = datetime.datetime.utcnow()
self.db.volume_update(context,
volume_ref['id'], {'status': 'available',
'launched_at': now})
logging.debug("volume %s: created successfully", volume_ref['name'])
- defer.returnValue(volume_id)
+ return volume_id
- @defer.inlineCallbacks
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
context = context.elevated()
@@ -124,14 +121,13 @@ class VolumeManager(manager.Manager):
if volume_ref['host'] != self.host:
raise exception.Error("Volume is not local to this node")
logging.debug("volume %s: removing export", volume_ref['name'])
- yield self.driver.remove_export(context, volume_ref)
+ self.driver.remove_export(context, volume_ref)
logging.debug("volume %s: deleting", volume_ref['name'])
- yield self.driver.delete_volume(volume_ref)
+ self.driver.delete_volume(volume_ref)
self.db.volume_destroy(context, volume_id)
logging.debug("volume %s: deleted successfully", volume_ref['name'])
- defer.returnValue(True)
+ return True
- @defer.inlineCallbacks
def setup_compute_volume(self, context, volume_id):
"""Setup remote volume on compute host.
@@ -139,17 +135,16 @@ class VolumeManager(manager.Manager):
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
- path = yield self.driver.local_path(volume_ref)
+ path = self.driver.local_path(volume_ref)
else:
- path = yield self.driver.discover_volume(volume_ref)
- defer.returnValue(path)
+ path = self.driver.discover_volume(volume_ref)
+ return path
- @defer.inlineCallbacks
def remove_compute_volume(self, context, volume_id):
"""Remove remote volume on compute host."""
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
- defer.returnValue(True)
+ return True
else:
- yield self.driver.undiscover_volume(volume_ref)
+ self.driver.undiscover_volume(volume_ref)