summaryrefslogtreecommitdiffstats
path: root/nova
diff options
context:
space:
mode:
authorSoren Hansen <soren.hansen@rackspace.com>2010-12-15 11:54:00 +0100
committerSoren Hansen <soren.hansen@rackspace.com>2010-12-15 11:54:00 +0100
commitbb0adb5f3309e9502f1afc34224d7194485f0124 (patch)
tree549d7383e62ad2fdbd7c0222f44aa8d3c25b48bb /nova
parentbf34529e75022451f3833552df0e807139d0e498 (diff)
parent0a93a9298dda075b5519e71289d0bac6fb461404 (diff)
Merge trunk
Diffstat (limited to 'nova')
-rw-r--r--nova/api/cloud.py58
-rw-r--r--nova/api/ec2/cloud.py102
-rw-r--r--nova/api/openstack/__init__.py17
-rw-r--r--nova/api/openstack/auth.py4
-rw-r--r--nova/api/openstack/images.py9
-rw-r--r--nova/api/openstack/servers.py103
-rw-r--r--nova/auth/ldapdriver.py138
-rw-r--r--nova/auth/manager.py4
-rwxr-xr-xnova/auth/opendj.sh119
-rw-r--r--nova/compute/api.py209
-rw-r--r--nova/compute/instance_types.py4
-rw-r--r--nova/db/sqlalchemy/api.py1
-rw-r--r--nova/flags.py2
-rw-r--r--nova/objectstore/image.py9
-rw-r--r--nova/tests/__init__.py5
-rw-r--r--nova/tests/api/openstack/fakes.py16
-rw-r--r--nova/tests/api/openstack/test_adminapi.py61
-rw-r--r--nova/tests/api/openstack/test_auth.py3
-rw-r--r--nova/tests/api/openstack/test_servers.py21
-rw-r--r--nova/tests/cloud_unittest.py13
-rw-r--r--nova/tests/compute_unittest.py24
-rw-r--r--nova/virt/connection.py4
-rw-r--r--nova/virt/xenapi.py442
-rw-r--r--nova/virt/xenapi/__init__.py15
-rw-r--r--nova/virt/xenapi/network_utils.py45
-rw-r--r--nova/virt/xenapi/vm_utils.py268
-rw-r--r--nova/virt/xenapi/vmops.py146
-rw-r--r--nova/virt/xenapi/volumeops.py32
-rw-r--r--nova/virt/xenapi_conn.py242
29 files changed, 1328 insertions, 788 deletions
diff --git a/nova/api/cloud.py b/nova/api/cloud.py
deleted file mode 100644
index b8f15019f..000000000
--- a/nova/api/cloud.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Methods for API calls to control instances via AMQP.
-"""
-
-
-from nova import db
-from nova import flags
-from nova import rpc
-
-FLAGS = flags.FLAGS
-
-
-def reboot(instance_id, context=None):
- """Reboot the given instance."""
- instance_ref = db.instance_get_by_internal_id(context, instance_id)
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "reboot_instance",
- "args": {"instance_id": instance_ref['id']}})
-
-
-def rescue(instance_id, context):
- """Rescue the given instance."""
- instance_ref = db.instance_get_by_internal_id(context, instance_id)
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "rescue_instance",
- "args": {"instance_id": instance_ref['id']}})
-
-
-def unrescue(instance_id, context):
- """Unrescue the given instance."""
- instance_ref = db.instance_get_by_internal_id(context, instance_id)
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "unrescue_instance",
- "args": {"instance_id": instance_ref['id']}})
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index e50906ae1..ebb13aedc 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -41,8 +41,6 @@ from nova import rpc
from nova import utils
from nova.compute import api as compute_api
from nova.compute import instance_types
-from nova.api import cloud
-from nova.image.s3 import S3ImageService
FLAGS = flags.FLAGS
@@ -95,8 +93,9 @@ class CloudController(object):
"""
def __init__(self):
self.network_manager = utils.import_object(FLAGS.network_manager)
- self.compute_api = compute_api.ComputeAPI()
- self.image_service = S3ImageService()
+ self.image_service = utils.import_object(FLAGS.image_service)
+ self.compute_api = compute_api.ComputeAPI(self.network_manager,
+ self.image_service)
self.setup()
def __str__(self):
@@ -120,7 +119,7 @@ class CloudController(object):
def _get_mpi_data(self, context, project_id):
result = {}
- for instance in db.instance_get_all_by_project(context, project_id):
+ for instance in self.compute_api.get_instances(context, project_id):
if instance['fixed_ip']:
line = '%s slots=%d' % (instance['fixed_ip']['address'],
instance['vcpus'])
@@ -439,7 +438,7 @@ class CloudController(object):
# instance_id is passed in as a list of instances
ec2_id = instance_id[0]
internal_id = ec2_id_to_internal_id(ec2_id)
- instance_ref = db.instance_get_by_internal_id(context, internal_id)
+ instance_ref = self.compute_api.get_instance(context, internal_id)
output = rpc.call(context,
'%s.%s' % (FLAGS.compute_topic,
instance_ref['host']),
@@ -451,13 +450,15 @@ class CloudController(object):
"Timestamp": now,
"output": base64.b64encode(output)}
- def describe_volumes(self, context, **kwargs):
+ def describe_volumes(self, context, volume_id=None, **kwargs):
if context.user.is_admin():
volumes = db.volume_get_all(context)
else:
volumes = db.volume_get_all_by_project(context, context.project_id)
- volumes = [self._format_volume(context, v) for v in volumes]
+ # NOTE(vish): volume_id is an optional list of volume ids to filter by.
+ volumes = [self._format_volume(context, v) for v in volumes
+ if volume_id is None or v['ec2_id'] in volume_id]
return {'volumeSet': volumes}
@@ -536,7 +537,7 @@ class CloudController(object):
if volume_ref['attach_status'] == "attached":
raise exception.ApiError("Volume is already attached")
internal_id = ec2_id_to_internal_id(instance_id)
- instance_ref = db.instance_get_by_internal_id(context, internal_id)
+ instance_ref = self.compute_api.get_instance(context, internal_id)
host = instance_ref['host']
rpc.cast(context,
db.queue_get_for(context, FLAGS.compute_topic, host),
@@ -614,11 +615,7 @@ class CloudController(object):
instances = db.instance_get_all_by_reservation(context,
reservation_id)
else:
- if context.user.is_admin():
- instances = db.instance_get_all(context)
- else:
- instances = db.instance_get_all_by_project(context,
- context.project_id)
+ instances = self.compute_api.get_instances(context)
for instance in instances:
if not context.user.is_admin():
if instance['image_id'] == FLAGS.vpn_image_id:
@@ -715,7 +712,7 @@ class CloudController(object):
def associate_address(self, context, instance_id, public_ip, **kwargs):
internal_id = ec2_id_to_internal_id(instance_id)
- instance_ref = db.instance_get_by_internal_id(context, internal_id)
+ instance_ref = self.compute_api.get_instance(context, internal_id)
fixed_address = db.instance_get_fixed_address(context,
instance_ref['id'])
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
@@ -751,16 +748,13 @@ class CloudController(object):
max_count = int(kwargs.get('max_count', 1))
instances = self.compute_api.create_instances(context,
instance_types.get_by_type(kwargs.get('instance_type', None)),
- self.image_service,
kwargs['image_id'],
- self._get_network_topic(context),
min_count=int(kwargs.get('min_count', max_count)),
max_count=max_count,
kernel_id=kwargs.get('kernel_id'),
ramdisk_id=kwargs.get('ramdisk_id'),
- name=kwargs.get('display_name'),
+ display_name=kwargs.get('display_name'),
description=kwargs.get('display_description'),
- user_data=kwargs.get('user_data', ''),
key_name=kwargs.get('key_name'),
security_group=kwargs.get('security_group'),
generate_hostname=internal_id_to_ec2_id)
@@ -769,84 +763,30 @@ class CloudController(object):
def terminate_instances(self, context, instance_id, **kwargs):
"""Terminate each instance in instance_id, which is a list of ec2 ids.
-
- instance_id is a kwarg so its name cannot be modified.
- """
- ec2_id_list = instance_id
+ instance_id is a kwarg so its name cannot be modified."""
logging.debug("Going to start terminating instances")
- for id_str in ec2_id_list:
- internal_id = ec2_id_to_internal_id(id_str)
- logging.debug("Going to try and terminate %s" % id_str)
- try:
- instance_ref = db.instance_get_by_internal_id(context,
- internal_id)
- except exception.NotFound:
- logging.warning("Instance %s was not found during terminate",
- id_str)
- continue
-
- if (instance_ref['state_description'] == 'terminating'):
- logging.warning("Instance %s is already being terminated",
- id_str)
- continue
- now = datetime.datetime.utcnow()
- self.compute_api.update_instance(context,
- instance_ref['id'],
- state_description='terminating',
- state=0,
- terminated_at=now)
-
- # FIXME(ja): where should network deallocate occur?
- address = db.instance_get_floating_address(context,
- instance_ref['id'])
- if address:
- logging.debug("Disassociating address %s" % address)
- # NOTE(vish): Right now we don't really care if the ip is
- # disassociated. We may need to worry about
- # checking this later. Perhaps in the scheduler?
- network_topic = self._get_network_topic(context)
- rpc.cast(context,
- network_topic,
- {"method": "disassociate_floating_ip",
- "args": {"floating_address": address}})
-
- address = db.instance_get_fixed_address(context,
- instance_ref['id'])
- if address:
- logging.debug("Deallocating address %s" % address)
- # NOTE(vish): Currently, nothing needs to be done on the
- # network node until release. If this changes,
- # we will need to cast here.
- self.network_manager.deallocate_fixed_ip(context.elevated(),
- address)
-
- host = instance_ref['host']
- if host:
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "terminate_instance",
- "args": {"instance_id": instance_ref['id']}})
- else:
- db.instance_destroy(context, instance_ref['id'])
+ for ec2_id in instance_id:
+ internal_id = ec2_id_to_internal_id(ec2_id)
+ self.compute_api.delete_instance(context, internal_id)
return True
def reboot_instances(self, context, instance_id, **kwargs):
"""instance_id is a list of instance ids"""
for ec2_id in instance_id:
internal_id = ec2_id_to_internal_id(ec2_id)
- cloud.reboot(internal_id, context=context)
+ self.compute_api.reboot(context, internal_id)
return True
def rescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
internal_id = ec2_id_to_internal_id(instance_id)
- cloud.rescue(internal_id, context=context)
+ self.compute_api.rescue(context, internal_id)
return True
def unrescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
internal_id = ec2_id_to_internal_id(instance_id)
- cloud.unrescue(internal_id, context=context)
+ self.compute_api.unrescue(context, internal_id)
return True
def update_instance(self, context, ec2_id, **kwargs):
@@ -857,7 +797,7 @@ class CloudController(object):
changes[field] = kwargs[field]
if changes:
internal_id = ec2_id_to_internal_id(ec2_id)
- inst = db.instance_get_by_internal_id(context, internal_id)
+ inst = self.compute_api.get_instance(context, internal_id)
db.instance_update(context, inst['id'], kwargs)
return True
diff --git a/nova/api/openstack/__init__.py b/nova/api/openstack/__init__.py
index 4ca108c4e..b9ecbd9b8 100644
--- a/nova/api/openstack/__init__.py
+++ b/nova/api/openstack/__init__.py
@@ -30,6 +30,7 @@ import webob.dec
import webob.exc
import webob
+from nova import context
from nova import flags
from nova import utils
from nova import wsgi
@@ -48,6 +49,10 @@ flags.DEFINE_string('nova_api_auth',
'nova.api.openstack.auth.BasicApiAuthManager',
'The auth mechanism to use for the OpenStack API implemenation')
+flags.DEFINE_bool('allow_admin_api',
+ False,
+ 'When True, this API service will accept admin operations.')
+
class API(wsgi.Middleware):
"""WSGI entry point for all OpenStack API requests."""
@@ -84,9 +89,7 @@ class AuthMiddleware(wsgi.Middleware):
if not user:
return faults.Fault(webob.exc.HTTPUnauthorized())
- if 'nova.context' not in req.environ:
- req.environ['nova.context'] = {}
- req.environ['nova.context']['user'] = user
+ req.environ['nova.context'] = context.RequestContext(user, user)
return self.application
@@ -121,12 +124,12 @@ class RateLimitingMiddleware(wsgi.Middleware):
If the request should be rate limited, return a 413 status with a
Retry-After header giving the time when the request would succeed.
"""
- user_id = req.environ['nova.context']['user']['id']
action_name = self.get_action_name(req)
if not action_name:
# Not rate limited
return self.application
- delay = self.get_delay(action_name, user_id)
+ delay = self.get_delay(action_name,
+ req.environ['nova.context'].user_id)
if delay:
# TODO(gundlach): Get the retry-after format correct.
exc = webob.exc.HTTPRequestEntityTooLarge(
@@ -183,6 +186,10 @@ class APIRouter(wsgi.Router):
mapper.resource("sharedipgroup", "sharedipgroups",
controller=sharedipgroups.Controller())
+ if FLAGS.allow_admin_api:
+ logging.debug("Including admin operations in API.")
+ # TODO: Place routes for admin operations here.
+
super(APIRouter, self).__init__(mapper)
diff --git a/nova/api/openstack/auth.py b/nova/api/openstack/auth.py
index 205035915..fcda97ab1 100644
--- a/nova/api/openstack/auth.py
+++ b/nova/api/openstack/auth.py
@@ -74,9 +74,7 @@ class BasicApiAuthManager(object):
if delta.days >= 2:
self.db.auth_destroy_token(self.context, token)
else:
- #TODO(gundlach): Why not just return dict(id=token.user_id)?
- user = self.auth.get_user(token.user_id)
- return {'id': user.id}
+ return self.auth.get_user(token.user_id)
return None
def _authorize_user(self, username, key, req):
diff --git a/nova/api/openstack/images.py b/nova/api/openstack/images.py
index cdbdc9bdd..4a0a8e6f1 100644
--- a/nova/api/openstack/images.py
+++ b/nova/api/openstack/images.py
@@ -17,7 +17,6 @@
from webob import exc
-from nova import context
from nova import flags
from nova import utils
from nova import wsgi
@@ -47,10 +46,8 @@ class Controller(wsgi.Controller):
def detail(self, req):
"""Return all public images in detail."""
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
try:
- images = self._service.detail(ctxt)
+ images = self._service.detail(req.environ['nova.context'])
images = nova.api.openstack.limited(images, req)
except NotImplementedError:
# Emulate detail() using repeated calls to show()
@@ -61,9 +58,7 @@ class Controller(wsgi.Controller):
def show(self, req, id):
"""Return data about the given image id."""
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- return dict(image=self._service.show(ctxt, id))
+ return dict(image=self._service.show(req.environ['nova.context'], id))
def delete(self, req, id):
# Only public images are supported for now.
diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py
index e7f765c02..7704f48f1 100644
--- a/nova/api/openstack/servers.py
+++ b/nova/api/openstack/servers.py
@@ -15,23 +15,16 @@
# License for the specific language governing permissions and limitations
# under the License.
-import webob
from webob import exc
-from nova import flags
-from nova import rpc
-from nova import utils
+from nova import exception
from nova import wsgi
-from nova import context
-from nova.api import cloud
from nova.api.openstack import faults
+from nova.auth import manager as auth_manager
from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import power_state
import nova.api.openstack
-import nova.image.service
-
-FLAGS = flags.FLAGS
def _entity_list(entities):
@@ -79,11 +72,7 @@ class Controller(wsgi.Controller):
"server": ["id", "imageId", "name", "flavorId", "hostId",
"status", "progress"]}}}
- def __init__(self, db_driver=None):
- if not db_driver:
- db_driver = FLAGS.db_driver
- self.db_driver = utils.import_object(db_driver)
- self.network_manager = utils.import_object(FLAGS.network_manager)
+ def __init__(self):
self.compute_api = compute_api.ComputeAPI()
super(Controller, self).__init__()
@@ -100,32 +89,29 @@ class Controller(wsgi.Controller):
entity_maker - either _entity_detail or _entity_inst
"""
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- instance_list = self.db_driver.instance_get_all_by_user(ctxt, user_id)
+ instance_list = self.compute_api.get_instances(
+ req.environ['nova.context'])
limited_list = nova.api.openstack.limited(instance_list, req)
res = [entity_maker(inst)['server'] for inst in limited_list]
return _entity_list(res)
def show(self, req, id):
""" Returns server details by server id """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- inst = self.db_driver.instance_get_by_internal_id(ctxt, int(id))
- if inst:
- if inst.user_id == user_id:
- return _entity_detail(inst)
- raise faults.Fault(exc.HTTPNotFound())
+ try:
+ instance = self.compute_api.get_instance(
+ req.environ['nova.context'], int(id))
+ return _entity_detail(instance)
+ except exception.NotFound:
+ return faults.Fault(exc.HTTPNotFound())
def delete(self, req, id):
""" Destroys a server """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- instance = self.db_driver.instance_get_by_internal_id(ctxt, int(id))
- if instance and instance['user_id'] == user_id:
- self.db_driver.instance_destroy(ctxt, id)
- return faults.Fault(exc.HTTPAccepted())
- return faults.Fault(exc.HTTPNotFound())
+ try:
+ self.compute_api.delete_instance(req.environ['nova.context'],
+ int(id))
+ except exception.NotFound:
+ return faults.Fault(exc.HTTPNotFound())
+ return exc.HTTPAccepted()
def create(self, req):
""" Creates a new server for a given user """
@@ -133,15 +119,13 @@ class Controller(wsgi.Controller):
if not env:
return faults.Fault(exc.HTTPUnprocessableEntity())
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
- key_pair = self.db_driver.key_pair_get_all_by_user(None, user_id)[0]
- instances = self.compute_api.create_instances(ctxt,
+ key_pair = auth_manager.AuthManager.get_key_pairs(
+ req.environ['nova.context'])[0]
+ instances = self.compute_api.create_instances(
+ req.environ['nova.context'],
instance_types.get_by_flavor_id(env['server']['flavorId']),
- utils.import_object(FLAGS.image_service),
env['server']['imageId'],
- self._get_network_topic(ctxt),
- name=env['server']['name'],
+ display_name=env['server']['name'],
description=env['server']['name'],
key_name=key_pair['name'],
key_data=key_pair['public_key'])
@@ -149,51 +133,36 @@ class Controller(wsgi.Controller):
def update(self, req, id):
""" Updates the server name or password """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
-
inst_dict = self._deserialize(req.body, req)
-
if not inst_dict:
return faults.Fault(exc.HTTPUnprocessableEntity())
- instance = self.db_driver.instance_get_by_internal_id(ctxt, int(id))
- if not instance or instance.user_id != user_id:
- return faults.Fault(exc.HTTPNotFound())
-
update_dict = {}
if 'adminPass' in inst_dict['server']:
update_dict['admin_pass'] = inst_dict['server']['adminPass']
if 'name' in inst_dict['server']:
update_dict['display_name'] = inst_dict['server']['name']
- self.compute_api.update_instance(ctxt, instance['id'], update_dict)
+ try:
+ self.compute_api.update_instance(req.environ['nova.context'],
+ instance['id'],
+ **update_dict)
+ except exception.NotFound:
+ return faults.Fault(exc.HTTPNotFound())
return exc.HTTPNoContent()
def action(self, req, id):
- """ multi-purpose method used to reboot, rebuild, and
+ """ Multi-purpose method used to reboot, rebuild, and
resize a server """
- user_id = req.environ['nova.context']['user']['id']
- ctxt = context.RequestContext(user_id, user_id)
input_dict = self._deserialize(req.body, req)
try:
reboot_type = input_dict['reboot']['type']
except Exception:
- raise faults.Fault(webob.exc.HTTPNotImplemented())
- inst_ref = self.db.instance_get_by_internal_id(ctxt, int(id))
- if not inst_ref or (inst_ref and not inst_ref.user_id == user_id):
+ raise faults.Fault(exc.HTTPNotImplemented())
+ try:
+ # TODO(gundlach): pass reboot_type, support soft reboot in
+ # virt driver
+ self.compute_api.reboot(req.environ['nova.context'], id)
+ except:
return faults.Fault(exc.HTTPUnprocessableEntity())
- #TODO(gundlach): pass reboot_type, support soft reboot in
- #virt driver
- cloud.reboot(id)
-
- def _get_network_topic(self, context):
- """Retrieves the network host for a project"""
- network_ref = self.network_manager.get_network(context)
- host = network_ref['host']
- if not host:
- host = rpc.call(context,
- FLAGS.network_topic,
- {"method": "set_network_host",
- "args": {"network_id": network_ref['id']}})
- return self.db_driver.queue_get_for(context, FLAGS.network_topic, host)
+ return exc.HTTPAccepted()
diff --git a/nova/auth/ldapdriver.py b/nova/auth/ldapdriver.py
index ceade1d65..c10939d74 100644
--- a/nova/auth/ldapdriver.py
+++ b/nova/auth/ldapdriver.py
@@ -40,6 +40,8 @@ flags.DEFINE_string('ldap_user_dn', 'cn=Manager,dc=example,dc=com',
flags.DEFINE_string('ldap_user_unit', 'Users', 'OID for Users')
flags.DEFINE_string('ldap_user_subtree', 'ou=Users,dc=example,dc=com',
'OU for Users')
+flags.DEFINE_boolean('ldap_user_modify_only', False,
+ 'Modify attributes for users instead of creating/deleting')
flags.DEFINE_string('ldap_project_subtree', 'ou=Groups,dc=example,dc=com',
'OU for Projects')
flags.DEFINE_string('role_project_subtree', 'ou=Groups,dc=example,dc=com',
@@ -89,8 +91,7 @@ class LdapDriver(object):
def get_user(self, uid):
"""Retrieve user by id"""
- attr = self.__find_object(self.__uid_to_dn(uid),
- '(objectclass=novaUser)')
+ attr = self.__get_ldap_user(uid)
return self.__to_user(attr)
def get_user_from_access_key(self, access):
@@ -110,7 +111,12 @@ class LdapDriver(object):
"""Retrieve list of users"""
attrs = self.__find_objects(FLAGS.ldap_user_subtree,
'(objectclass=novaUser)')
- return [self.__to_user(attr) for attr in attrs]
+ users = []
+ for attr in attrs:
+ user = self.__to_user(attr)
+ if user is not None:
+ users.append(user)
+ return users
def get_projects(self, uid=None):
"""Retrieve list of projects"""
@@ -125,21 +131,52 @@ class LdapDriver(object):
"""Create a user"""
if self.__user_exists(name):
raise exception.Duplicate("LDAP user %s already exists" % name)
- attr = [
- ('objectclass', ['person',
- 'organizationalPerson',
- 'inetOrgPerson',
- 'novaUser']),
- ('ou', [FLAGS.ldap_user_unit]),
- ('uid', [name]),
- ('sn', [name]),
- ('cn', [name]),
- ('secretKey', [secret_key]),
- ('accessKey', [access_key]),
- ('isAdmin', [str(is_admin).upper()]),
- ]
- self.conn.add_s(self.__uid_to_dn(name), attr)
- return self.__to_user(dict(attr))
+ if FLAGS.ldap_user_modify_only:
+ if self.__ldap_user_exists(name):
+ # Retrieve user by name
+ user = self.__get_ldap_user(name)
+ # Entry could be malformed, test for missing attrs.
+ # Malformed entries are useless, replace attributes found.
+ attr = []
+ if 'secretKey' in user.keys():
+ attr.append((self.ldap.MOD_REPLACE, 'secretKey', \
+ [secret_key]))
+ else:
+ attr.append((self.ldap.MOD_ADD, 'secretKey', \
+ [secret_key]))
+ if 'accessKey' in user.keys():
+ attr.append((self.ldap.MOD_REPLACE, 'accessKey', \
+ [access_key]))
+ else:
+ attr.append((self.ldap.MOD_ADD, 'accessKey', \
+ [access_key]))
+ if 'isAdmin' in user.keys():
+ attr.append((self.ldap.MOD_REPLACE, 'isAdmin', \
+ [str(is_admin).upper()]))
+ else:
+ attr.append((self.ldap.MOD_ADD, 'isAdmin', \
+ [str(is_admin).upper()]))
+ self.conn.modify_s(self.__uid_to_dn(name), attr)
+ return self.get_user(name)
+ else:
+ raise exception.NotFound("LDAP object for %s doesn't exist"
+ % name)
+ else:
+ attr = [
+ ('objectclass', ['person',
+ 'organizationalPerson',
+ 'inetOrgPerson',
+ 'novaUser']),
+ ('ou', [FLAGS.ldap_user_unit]),
+ ('uid', [name]),
+ ('sn', [name]),
+ ('cn', [name]),
+ ('secretKey', [secret_key]),
+ ('accessKey', [access_key]),
+ ('isAdmin', [str(is_admin).upper()]),
+ ]
+ self.conn.add_s(self.__uid_to_dn(name), attr)
+ return self.__to_user(dict(attr))
def create_project(self, name, manager_uid,
description=None, member_uids=None):
@@ -155,7 +192,7 @@ class LdapDriver(object):
if description is None:
description = name
members = []
- if member_uids != None:
+ if member_uids is not None:
for member_uid in member_uids:
if not self.__user_exists(member_uid):
raise exception.NotFound("Project can't be created "
@@ -256,7 +293,24 @@ class LdapDriver(object):
if not self.__user_exists(uid):
raise exception.NotFound("User %s doesn't exist" % uid)
self.__remove_from_all(uid)
- self.conn.delete_s(self.__uid_to_dn(uid))
+ if FLAGS.ldap_user_modify_only:
+ # Delete attributes
+ attr = []
+ # Retrieve user by name
+ user = self.__get_ldap_user(uid)
+ if 'secretKey' in user.keys():
+ attr.append((self.ldap.MOD_DELETE, 'secretKey', \
+ user['secretKey']))
+ if 'accessKey' in user.keys():
+ attr.append((self.ldap.MOD_DELETE, 'accessKey', \
+ user['accessKey']))
+ if 'isAdmin' in user.keys():
+ attr.append((self.ldap.MOD_DELETE, 'isAdmin', \
+ user['isAdmin']))
+ self.conn.modify_s(self.__uid_to_dn(uid), attr)
+ else:
+ # Delete entry
+ self.conn.delete_s(self.__uid_to_dn(uid))
def delete_project(self, project_id):
"""Delete a project"""
@@ -265,7 +319,7 @@ class LdapDriver(object):
self.__delete_group(project_dn)
def modify_user(self, uid, access_key=None, secret_key=None, admin=None):
- """Modify an existing project"""
+ """Modify an existing user"""
if not access_key and not secret_key and admin is None:
return
attr = []
@@ -279,11 +333,21 @@ class LdapDriver(object):
def __user_exists(self, uid):
"""Check if user exists"""
- return self.get_user(uid) != None
+ return self.get_user(uid) is not None
+
+ def __ldap_user_exists(self, uid):
+ """Check if the user exists in ldap"""
+ return self.__get_ldap_user(uid) is not None
def __project_exists(self, project_id):
"""Check if project exists"""
- return self.get_project(project_id) != None
+ return self.get_project(project_id) is not None
+
+ def __get_ldap_user(self, uid):
+ """Retrieve LDAP user entry by id"""
+ attr = self.__find_object(self.__uid_to_dn(uid),
+ '(objectclass=novaUser)')
+ return attr
def __find_object(self, dn, query=None, scope=None):
"""Find an object by dn and query"""
@@ -330,12 +394,12 @@ class LdapDriver(object):
def __group_exists(self, dn):
"""Check if group exists"""
- return self.__find_object(dn, '(objectclass=groupOfNames)') != None
+ return self.__find_object(dn, '(objectclass=groupOfNames)') is not None
@staticmethod
def __role_to_dn(role, project_id=None):
"""Convert role to corresponding dn"""
- if project_id == None:
+ if project_id is None:
return FLAGS.__getitem__("ldap_%s" % role).value
else:
return 'cn=%s,cn=%s,%s' % (role,
@@ -349,7 +413,7 @@ class LdapDriver(object):
raise exception.Duplicate("Group can't be created because "
"group %s already exists" % name)
members = []
- if member_uids != None:
+ if member_uids is not None:
for member_uid in member_uids:
if not self.__user_exists(member_uid):
raise exception.NotFound("Group can't be created "
@@ -375,7 +439,7 @@ class LdapDriver(object):
res = self.__find_object(group_dn,
'(member=%s)' % self.__uid_to_dn(uid),
self.ldap.SCOPE_BASE)
- return res != None
+ return res is not None
def __add_to_group(self, uid, group_dn):
"""Add user to group"""
@@ -447,18 +511,22 @@ class LdapDriver(object):
@staticmethod
def __to_user(attr):
"""Convert ldap attributes to User object"""
- if attr == None:
+ if attr is None:
+ return None
+ if ('accessKey' in attr.keys() and 'secretKey' in attr.keys() \
+ and 'isAdmin' in attr.keys()):
+ return {
+ 'id': attr['uid'][0],
+ 'name': attr['cn'][0],
+ 'access': attr['accessKey'][0],
+ 'secret': attr['secretKey'][0],
+ 'admin': (attr['isAdmin'][0] == 'TRUE')}
+ else:
return None
- return {
- 'id': attr['uid'][0],
- 'name': attr['cn'][0],
- 'access': attr['accessKey'][0],
- 'secret': attr['secretKey'][0],
- 'admin': (attr['isAdmin'][0] == 'TRUE')}
def __to_project(self, attr):
"""Convert ldap attributes to Project object"""
- if attr == None:
+ if attr is None:
return None
member_dns = attr.get('member', [])
return {
diff --git a/nova/auth/manager.py b/nova/auth/manager.py
index 7b2b68161..11c3bd6df 100644
--- a/nova/auth/manager.py
+++ b/nova/auth/manager.py
@@ -624,6 +624,10 @@ class AuthManager(object):
with self.driver() as drv:
drv.modify_user(uid, access_key, secret_key, admin)
+ @staticmethod
+ def get_key_pairs(context):
+ return db.key_pair_get_all_by_user(context.elevated(), context.user_id)
+
def get_credentials(self, user, project=None):
"""Get credential zip for user in project"""
if not isinstance(user, User):
diff --git a/nova/auth/opendj.sh b/nova/auth/opendj.sh
new file mode 100755
index 000000000..8052c077d
--- /dev/null
+++ b/nova/auth/opendj.sh
@@ -0,0 +1,119 @@
+#!/usr/bin/env bash
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# LDAP INSTALL SCRIPT - IS IDEMPOTENT, does not scrub users
+
+apt-get install -y ldap-utils python-ldap openjdk-6-jre
+
+if [ ! -d "/usr/opendj" ]
+then
+ # TODO(rlane): Wikimedia Foundation is the current package maintainer.
+ # After the package is included in Ubuntu's channel, change this.
+ wget http://apt.wikimedia.org/wikimedia/pool/main/o/opendj/opendj_2.4.0-7_amd64.deb
+ dpkg -i opendj_2.4.0-7_amd64.deb
+fi
+
+abspath=`dirname "$(cd "${0%/*}" 2>/dev/null; echo "$PWD"/"${0##*/}")"`
+schemapath='/var/opendj/instance/config/schema'
+cp $abspath/openssh-lpk_sun.schema $schemapath/97-openssh-lpk_sun.ldif
+cp $abspath/nova_sun.schema $schemapath/98-nova_sun.ldif
+chown opendj:opendj $schemapath/97-openssh-lpk_sun.ldif
+chown opendj:opendj $schemapath/98-nova_sun.ldif
+
+cat >/etc/ldap/ldap.conf <<LDAP_CONF_EOF
+# LDAP Client Settings
+URI ldap://localhost
+BASE dc=example,dc=com
+BINDDN cn=Directory Manager
+SIZELIMIT 0
+TIMELIMIT 0
+LDAP_CONF_EOF
+
+cat >/etc/ldap/base.ldif <<BASE_LDIF_EOF
+# This is the root of the directory tree
+dn: dc=example,dc=com
+description: Example.Com, your trusted non-existent corporation.
+dc: example
+o: Example.Com
+objectClass: top
+objectClass: dcObject
+objectClass: organization
+
+# Subtree for users
+dn: ou=Users,dc=example,dc=com
+ou: Users
+description: Users
+objectClass: organizationalUnit
+
+# Subtree for groups
+dn: ou=Groups,dc=example,dc=com
+ou: Groups
+description: Groups
+objectClass: organizationalUnit
+
+# Subtree for system accounts
+dn: ou=System,dc=example,dc=com
+ou: System
+description: Special accounts used by software applications.
+objectClass: organizationalUnit
+
+# Special Account for Authentication:
+dn: uid=authenticate,ou=System,dc=example,dc=com
+uid: authenticate
+ou: System
+description: Special account for authenticating users
+userPassword: {MD5}TLnIqASP0CKUR3/LGkEZGg==
+objectClass: account
+objectClass: simpleSecurityObject
+
+# create the sysadmin entry
+
+dn: cn=developers,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: developers
+description: IT admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=sysadmins,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: sysadmins
+description: IT admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=netadmins,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: netadmins
+description: Network admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=cloudadmins,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: cloudadmins
+description: Cloud admin group
+member: uid=admin,ou=Users,dc=example,dc=com
+
+dn: cn=itsec,ou=Groups,dc=example,dc=com
+objectclass: groupOfNames
+cn: itsec
+description: IT security users group
+member: uid=admin,ou=Users,dc=example,dc=com
+BASE_LDIF_EOF
+
+/etc/init.d/opendj stop
+su - opendj -c '/usr/opendj/setup -i -b "dc=example,dc=com" -l /etc/ldap/base.ldif -S -w changeme -O -n --noPropertiesFile'
+/etc/init.d/opendj start
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 929342a1e..8e0efa4cc 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -20,6 +20,7 @@
Handles all API requests relating to instances (guest vms).
"""
+import datetime
import logging
import time
@@ -43,16 +44,18 @@ def generate_default_hostname(internal_id):
class ComputeAPI(base.Base):
"""API for interacting with the compute manager."""
- def __init__(self, **kwargs):
- self.network_manager = utils.import_object(FLAGS.network_manager)
+ def __init__(self, network_manager=None, image_service=None, **kwargs):
+ if not network_manager:
+ network_manager = utils.import_object(FLAGS.network_manager)
+ self.network_manager = network_manager
+ if not image_service:
+ image_service = utils.import_object(FLAGS.image_service)
+ self.image_service = image_service
super(ComputeAPI, self).__init__(**kwargs)
- # TODO(eday): network_topic arg should go away once we push network
- # allocation into the scheduler or compute worker.
- def create_instances(self, context, instance_type, image_service, image_id,
- network_topic, min_count=1, max_count=1,
- kernel_id=None, ramdisk_id=None, name='',
- description='', user_data='', key_name=None,
+ def create_instances(self, context, instance_type, image_id, min_count=1,
+ max_count=1, kernel_id=None, ramdisk_id=None,
+ display_name='', description='', key_name=None,
key_data=None, security_group='default',
generate_hostname=generate_default_hostname):
"""Create the number of instances requested if quote and
@@ -69,15 +72,15 @@ class ComputeAPI(base.Base):
is_vpn = image_id == FLAGS.vpn_image_id
if not is_vpn:
- image = image_service.show(context, image_id)
+ image = self.image_service.show(context, image_id)
if kernel_id is None:
kernel_id = image.get('kernelId', FLAGS.default_kernel)
if ramdisk_id is None:
ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
# Make sure we have access to kernel and ramdisk
- image_service.show(context, kernel_id)
- image_service.show(context, ramdisk_id)
+ self.image_service.show(context, kernel_id)
+ self.image_service.show(context, ramdisk_id)
if security_group is None:
security_group = ['default']
@@ -110,7 +113,7 @@ class ComputeAPI(base.Base):
'memory_mb': type_data['memory_mb'],
'vcpus': type_data['vcpus'],
'local_gb': type_data['local_gb'],
- 'display_name': name,
+ 'display_name': display_name,
'display_description': description,
'key_name': key_name,
'key_data': key_data}
@@ -122,14 +125,25 @@ class ComputeAPI(base.Base):
instance = dict(mac_address=utils.generate_mac(),
launch_index=num,
**base_options)
- instance_ref = self.create_instance(context, security_groups,
- **instance)
- instance_id = instance_ref['id']
- internal_id = instance_ref['internal_id']
- hostname = generate_hostname(internal_id)
- self.update_instance(context, instance_id, hostname=hostname)
- instances.append(dict(id=instance_id, internal_id=internal_id,
- hostname=hostname, **instance))
+ instance = self.db.instance_create(context, instance)
+ instance_id = instance['id']
+ internal_id = instance['internal_id']
+
+ elevated = context.elevated()
+ if not security_groups:
+ security_groups = []
+ for security_group_id in security_groups:
+ self.db.instance_add_security_group(elevated,
+ instance_id,
+ security_group_id)
+
+ # Set sane defaults if not specified
+ updates = dict(hostname=generate_hostname(internal_id))
+ if 'display_name' not in instance:
+ updates['display_name'] = "Server %s" % internal_id
+
+ instance = self.update_instance(context, instance_id, **updates)
+ instances.append(instance)
# TODO(vish): This probably should be done in the scheduler
# or in compute as a call. The network should be
@@ -139,12 +153,12 @@ class ComputeAPI(base.Base):
instance_id,
is_vpn)
rpc.cast(elevated,
- network_topic,
+ self._get_network_topic(context),
{"method": "setup_fixed_ip",
"args": {"address": address}})
- logging.debug("Casting to scheduler for %s/%s's instance %s" %
- (context.project_id, context.user_id, instance_id))
+ logging.debug("Casting to scheduler for %s/%s's instance %s",
+ context.project_id, context.user_id, instance_id)
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "run_instance",
@@ -154,6 +168,12 @@ class ComputeAPI(base.Base):
return instances
def ensure_default_security_group(self, context):
+ """ Create security group for the security context if it
+ does not already exist
+
+ :param context: the security context
+
+ """
try:
db.security_group_get_by_name(context, context.project_id,
'default')
@@ -162,40 +182,7 @@ class ComputeAPI(base.Base):
'description': 'default',
'user_id': context.user_id,
'project_id': context.project_id}
- group = db.security_group_create(context, values)
-
- def create_instance(self, context, security_groups=None, **kwargs):
- """Creates the instance in the datastore and returns the
- new instance as a mapping
-
- :param context: The security context
- :param security_groups: list of security group ids to
- attach to the instance
- :param kwargs: All additional keyword args are treated
- as data fields of the instance to be
- created
-
- :retval Returns a mapping of the instance information
- that has just been created
-
- """
- instance_ref = self.db.instance_create(context, kwargs)
- inst_id = instance_ref['id']
- # Set sane defaults if not specified
- if kwargs.get('display_name') is None:
- display_name = "Server %s" % instance_ref['internal_id']
- instance_ref['display_name'] = display_name
- self.db.instance_update(context, inst_id,
- {'display_name': display_name})
-
- elevated = context.elevated()
- if not security_groups:
- security_groups = []
- for security_group_id in security_groups:
- self.db.instance_add_security_group(elevated,
- inst_id,
- security_group_id)
- return instance_ref
+ db.security_group_create(context, values)
def update_instance(self, context, instance_id, **kwargs):
"""Updates the instance in the datastore.
@@ -209,4 +196,110 @@ class ComputeAPI(base.Base):
:retval None
"""
- self.db.instance_update(context, instance_id, kwargs)
+ return self.db.instance_update(context, instance_id, kwargs)
+
+ def delete_instance(self, context, instance_id):
+ logging.debug("Going to try and terminate %d" % instance_id)
+ try:
+ instance = self.db.instance_get_by_internal_id(context,
+ instance_id)
+ except exception.NotFound as e:
+ logging.warning("Instance %d was not found during terminate",
+ instance_id)
+ raise e
+
+ if (instance['state_description'] == 'terminating'):
+ logging.warning("Instance %d is already being terminated",
+ instance_id)
+ return
+
+ self.update_instance(context,
+ instance['id'],
+ state_description='terminating',
+ state=0,
+ terminated_at=datetime.datetime.utcnow())
+
+ # FIXME(ja): where should network deallocate occur?
+ address = self.db.instance_get_floating_address(context,
+ instance['id'])
+ if address:
+ logging.debug("Disassociating address %s" % address)
+ # NOTE(vish): Right now we don't really care if the ip is
+ # disassociated. We may need to worry about
+ # checking this later. Perhaps in the scheduler?
+ rpc.cast(context,
+ self._get_network_topic(context),
+ {"method": "disassociate_floating_ip",
+ "args": {"floating_address": address}})
+
+ address = self.db.instance_get_fixed_address(context, instance['id'])
+ if address:
+ logging.debug("Deallocating address %s" % address)
+ # NOTE(vish): Currently, nothing needs to be done on the
+ # network node until release. If this changes,
+ # we will need to cast here.
+ self.network_manager.deallocate_fixed_ip(context.elevated(),
+ address)
+
+ host = instance['host']
+ if host:
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "terminate_instance",
+ "args": {"instance_id": instance['id']}})
+ else:
+ self.db.instance_destroy(context, instance['id'])
+
+ def get_instances(self, context, project_id=None):
+ """Get all instances, possibly filtered by project ID or
+ user ID. If there is no filter and the context is an admin,
+ it will retreive all instances in the system."""
+ if project_id or not context.is_admin:
+ if not context.project:
+ return self.db.instance_get_all_by_user(context,
+ context.user_id)
+ if project_id is None:
+ project_id = context.project_id
+ return self.db.instance_get_all_by_project(context, project_id)
+ return self.db.instance_get_all(context)
+
+ def get_instance(self, context, instance_id):
+ return self.db.instance_get_by_internal_id(context, instance_id)
+
+ def reboot(self, context, instance_id):
+ """Reboot the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "reboot_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def rescue(self, context, instance_id):
+ """Rescue the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "rescue_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def unrescue(self, context, instance_id):
+ """Unrescue the given instance."""
+ instance = self.db.instance_get_by_internal_id(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "unrescue_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def _get_network_topic(self, context):
+ """Retrieves the network host for a project"""
+ network_ref = self.network_manager.get_network(context)
+ host = network_ref['host']
+ if not host:
+ host = rpc.call(context,
+ FLAGS.network_topic,
+ {"method": "set_network_host",
+ "args": {"network_id": network_ref['id']}})
+ return self.db.queue_get_for(context, FLAGS.network_topic, host)
diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py
index a2679e0fc..6e47170bd 100644
--- a/nova/compute/instance_types.py
+++ b/nova/compute/instance_types.py
@@ -22,6 +22,7 @@ The built-in instance properties.
"""
from nova import flags
+from nova import exception
FLAGS = flags.FLAGS
INSTANCE_TYPES = {
@@ -37,8 +38,7 @@ def get_by_type(instance_type):
if instance_type is None:
return FLAGS.default_instance_type
if instance_type not in INSTANCE_TYPES:
- raise exception.ApiError("Unknown instance type: %s",
- instance_type)
+ raise exception.ApiError("Unknown instance type: %s" % instance_type)
return instance_type
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 2dc140274..55036d1d1 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -732,6 +732,7 @@ def instance_update(context, instance_id, values):
instance_ref = instance_get(context, instance_id, session=session)
instance_ref.update(values)
instance_ref.save(session=session)
+ return instance_ref
def instance_add_security_group(context, instance_id, security_group_id):
diff --git a/nova/flags.py b/nova/flags.py
index 1f94feb08..c6578023d 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -259,7 +259,7 @@ DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
'Manager for scheduler')
# The service to use for image search and retrieval
-DEFINE_string('image_service', 'nova.image.local.LocalImageService',
+DEFINE_string('image_service', 'nova.image.s3.S3ImageService',
'The service to use for retrieving and searching for images.')
DEFINE_string('host', socket.gethostname(),
diff --git a/nova/objectstore/image.py b/nova/objectstore/image.py
index 7292dbab8..34a90b0a2 100644
--- a/nova/objectstore/image.py
+++ b/nova/objectstore/image.py
@@ -21,7 +21,6 @@ Take uploaded bucket contents and register them as disk images (AMIs).
Requires decryption using keys in the manifest.
"""
-# TODO(jesse): Got these from Euca2ools, will need to revisit them
import binascii
import glob
@@ -29,7 +28,6 @@ import json
import os
import shutil
import tarfile
-import tempfile
from xml.etree import ElementTree
from nova import exception
@@ -199,12 +197,17 @@ class Image(object):
except:
ramdisk_id = None
+ try:
+ arch = manifest.find("machine_configuration/architecture").text
+ except:
+ arch = 'x86_64'
+
info = {
'imageId': image_id,
'imageLocation': image_location,
'imageOwnerId': context.project_id,
'isPublic': False, # FIXME: grab public from manifest
- 'architecture': 'x86_64', # FIXME: grab architecture from manifest
+ 'architecture': arch,
'imageType': image_type}
if kernel_id:
diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py
index aaf213923..8dc87d0e2 100644
--- a/nova/tests/__init__.py
+++ b/nova/tests/__init__.py
@@ -29,3 +29,8 @@
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
+
+# See http://code.google.com/p/python-nose/issues/detail?id=373
+# The code below enables nosetests to work with i18n _() blocks
+import __builtin__
+setattr(__builtin__, '_', lambda x: x)
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 7c0343942..21b8aac1c 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -24,9 +24,10 @@ import webob
import webob.dec
from nova import auth
-from nova import utils
-from nova import flags
+from nova import context
from nova import exception as exc
+from nova import flags
+from nova import utils
import nova.api.openstack.auth
from nova.image import service
from nova.image import glance
@@ -58,7 +59,7 @@ def fake_auth_init(self):
@webob.dec.wsgify
def fake_wsgi(self, req):
- req.environ['nova.context'] = dict(user=dict(id=1))
+ req.environ['nova.context'] = context.RequestContext(1, 1)
if req.body:
req.environ['inst_dict'] = json.loads(req.body)
return self.application
@@ -67,8 +68,7 @@ def fake_wsgi(self, req):
def stub_out_key_pair_funcs(stubs):
def key_pair(context, user_id):
return [dict(name='key', public_key='public_key')]
- stubs.Set(nova.db.api, 'key_pair_get_all_by_user',
- key_pair)
+ stubs.Set(nova.db, 'key_pair_get_all_by_user', key_pair)
def stub_out_image_service(stubs):
@@ -172,6 +172,12 @@ class FakeToken(object):
setattr(self, k, v)
+class FakeRequestContext(object):
+ def __init__(self, user, project):
+ self.user_id = 1
+ self.project_id = 1
+
+
class FakeAuthDatabase(object):
data = {}
diff --git a/nova/tests/api/openstack/test_adminapi.py b/nova/tests/api/openstack/test_adminapi.py
new file mode 100644
index 000000000..1b2e1654d
--- /dev/null
+++ b/nova/tests/api/openstack/test_adminapi.py
@@ -0,0 +1,61 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import stubout
+import webob
+
+import nova.api
+from nova import flags
+from nova.tests.api.openstack import fakes
+
+FLAGS = flags.FLAGS
+
+
+class AdminAPITest(unittest.TestCase):
+ def setUp(self):
+ self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.auth_data = {}
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_networking(self.stubs)
+ fakes.stub_out_rate_limiting(self.stubs)
+ fakes.stub_out_auth(self.stubs)
+ self.allow_admin = FLAGS.allow_admin_api
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.allow_admin_api = self.allow_admin
+
+ def test_admin_enabled(self):
+ FLAGS.allow_admin_api = True
+ # We should still be able to access public operations.
+ req = webob.Request.blank('/v1.0/flavors')
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 200)
+ # TODO: Confirm admin operations are available.
+
+ def test_admin_disabled(self):
+ FLAGS.allow_admin_api = False
+ # We should still be able to access public operations.
+ req = webob.Request.blank('/v1.0/flavors')
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 200)
+ # TODO: Confirm admin operations are unavailable.
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py
index 14e720be4..7b427c2db 100644
--- a/nova/tests/api/openstack/test_auth.py
+++ b/nova/tests/api/openstack/test_auth.py
@@ -26,6 +26,7 @@ import nova.api
import nova.api.openstack.auth
import nova.auth.manager
from nova import auth
+from nova import context
from nova.tests.api.openstack import fakes
@@ -35,6 +36,7 @@ class Test(unittest.TestCase):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
'__init__', fakes.fake_auth_init)
+ self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_rate_limiting(self.stubs)
@@ -131,6 +133,7 @@ class TestLimiter(unittest.TestCase):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
'__init__', fakes.fake_auth_init)
+ self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 44ac8f342..8444b6fce 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -47,6 +47,14 @@ def return_security_group(context, instance_id, security_group_id):
pass
+def instance_update(context, instance_id, kwargs):
+ return stub_instance(instance_id)
+
+
+def instance_address(context, instance_id):
+ return None
+
+
def stub_instance(id, user_id=1):
return Instance(id=id + 123456, state=0, image_id=10, user_id=user_id,
display_name='server%s' % id, internal_id=id)
@@ -69,6 +77,11 @@ class ServersTest(unittest.TestCase):
return_servers)
self.stubs.Set(nova.db.api, 'instance_add_security_group',
return_security_group)
+ self.stubs.Set(nova.db.api, 'instance_update', instance_update)
+ self.stubs.Set(nova.db.api, 'instance_get_fixed_address',
+ instance_address)
+ self.stubs.Set(nova.db.api, 'instance_get_floating_address',
+ instance_address)
def tearDown(self):
self.stubs.UnsetAll()
@@ -93,11 +106,11 @@ class ServersTest(unittest.TestCase):
i += 1
def test_create_instance(self):
- def server_update(context, id, params):
- pass
-
def instance_create(context, inst):
- return {'id': 1, 'internal_id': 1}
+ return {'id': 1, 'internal_id': 1, 'display_name': ''}
+
+ def server_update(context, id, params):
+ return instance_create(context, id)
def fake_method(*args, **kwargs):
pass
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 9886a2449..770c94219 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -126,6 +126,19 @@ class CloudTestCase(test.TrialTestCase):
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
+ def test_describe_volumes(self):
+ """Makes sure describe_volumes works and filters results."""
+ vol1 = db.volume_create(self.context, {})
+ vol2 = db.volume_create(self.context, {})
+ result = self.cloud.describe_volumes(self.context)
+ self.assertEqual(len(result['volumeSet']), 2)
+ result = self.cloud.describe_volumes(self.context,
+ volume_id=[vol2['ec2_id']])
+ self.assertEqual(len(result['volumeSet']), 1)
+ self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id'])
+ db.volume_destroy(self.context, vol1['id'])
+ db.volume_destroy(self.context, vol2['id'])
+
def test_console_output(self):
image_id = FLAGS.default_image
instance_type = FLAGS.default_instance_type
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index a55449739..6f3ef96cb 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -72,33 +72,27 @@ class ComputeTestCase(test.TrialTestCase):
"""Verify that an instance cannot be created without a display_name."""
cases = [dict(), dict(display_name=None)]
for instance in cases:
- ref = self.compute_api.create_instance(self.context, None,
- **instance)
+ ref = self.compute_api.create_instances(self.context,
+ FLAGS.default_instance_type, None, **instance)
try:
- self.assertNotEqual(ref.display_name, None)
+ self.assertNotEqual(ref[0].display_name, None)
finally:
- db.instance_destroy(self.context, ref['id'])
+ db.instance_destroy(self.context, ref[0]['id'])
def test_create_instance_associates_security_groups(self):
- """Make sure create_instance associates security groups"""
- inst = {}
- inst['user_id'] = self.user.id
- inst['project_id'] = self.project.id
+ """Make sure create_instances associates security groups"""
values = {'name': 'default',
'description': 'default',
'user_id': self.user.id,
'project_id': self.project.id}
group = db.security_group_create(self.context, values)
- ref = self.compute_api.create_instance(self.context,
- security_groups=[group['id']],
- **inst)
- # reload to get groups
- instance_ref = db.instance_get(self.context, ref['id'])
+ ref = self.compute_api.create_instances(self.context,
+ FLAGS.default_instance_type, None, security_group=['default'])
try:
- self.assertEqual(len(instance_ref['security_groups']), 1)
+ self.assertEqual(len(ref[0]['security_groups']), 1)
finally:
db.security_group_destroy(self.context, group['id'])
- db.instance_destroy(self.context, instance_ref['id'])
+ db.instance_destroy(self.context, ref[0]['id'])
@defer.inlineCallbacks
def test_run_terminate(self):
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 11f0fa8ce..c40bb4bb4 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -25,7 +25,7 @@ import sys
from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
-from nova.virt import xenapi
+from nova.virt import xenapi_conn
FLAGS = flags.FLAGS
@@ -61,7 +61,7 @@ def get_connection(read_only=False):
elif t == 'libvirt':
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
- conn = xenapi.get_connection(read_only)
+ conn = xenapi_conn.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
deleted file mode 100644
index de3d68582..000000000
--- a/nova/virt/xenapi.py
+++ /dev/null
@@ -1,442 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (c) 2010 Citrix Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-A connection to XenServer or Xen Cloud Platform.
-
-The concurrency model for this class is as follows:
-
-All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
-deferredToThread). They are remote calls, and so may hang for the usual
-reasons. They should not be allowed to block the reactor thread.
-
-All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion. Polling is handled using reactor.callLater.
-
-This combination of techniques means that we don't block the reactor thread at
-all, and at the same time we don't hold lots of threads waiting for
-long-running operations.
-
-FIXME: get_info currently doesn't conform to these rules, and will block the
-reactor thread if the VM.get_by_name_label or VM.get_record calls block.
-
-**Related Flags**
-
-:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
-:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
- Platform (default: root).
-:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
- Platform.
-:xenapi_task_poll_interval: The interval (seconds) used for polling of
- remote tasks (Async.VM.start, etc)
- (default: 0.5).
-
-"""
-
-import logging
-import xmlrpclib
-
-from twisted.internet import defer
-from twisted.internet import reactor
-
-from nova import db
-from nova import flags
-from nova import utils
-from nova.auth.manager import AuthManager
-from nova.compute import instance_types
-from nova.compute import power_state
-from nova.virt import images
-
-XenAPI = None
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string('xenapi_connection_url',
- None,
- 'URL for connection to XenServer/Xen Cloud Platform.'
- ' Required if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_username',
- 'root',
- 'Username for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_string('xenapi_connection_password',
- None,
- 'Password for connection to XenServer/Xen Cloud Platform.'
- ' Used only if connection_type=xenapi.')
-flags.DEFINE_float('xenapi_task_poll_interval',
- 0.5,
- 'The interval used for polling of remote tasks '
- '(Async.VM.start, etc). Used only if '
- 'connection_type=xenapi.')
-
-
-XENAPI_POWER_STATE = {
- 'Halted': power_state.SHUTDOWN,
- 'Running': power_state.RUNNING,
- 'Paused': power_state.PAUSED,
- 'Suspended': power_state.SHUTDOWN, # FIXME
- 'Crashed': power_state.CRASHED}
-
-
-def get_connection(_):
- """Note that XenAPI doesn't have a read-only connection mode, so
- the read_only parameter is ignored."""
- # This is loaded late so that there's no need to install this
- # library when not using XenAPI.
- global XenAPI
- if XenAPI is None:
- XenAPI = __import__('XenAPI')
- url = FLAGS.xenapi_connection_url
- username = FLAGS.xenapi_connection_username
- password = FLAGS.xenapi_connection_password
- if not url or password is None:
- raise Exception('Must specify xenapi_connection_url, '
- 'xenapi_connection_username (optionally), and '
- 'xenapi_connection_password to use '
- 'connection_type=xenapi')
- return XenAPIConnection(url, username, password)
-
-
-class XenAPIConnection(object):
- def __init__(self, url, user, pw):
- self._conn = XenAPI.Session(url)
- self._conn.login_with_password(user, pw)
-
- def list_instances(self):
- return [self._conn.xenapi.VM.get_name_label(vm) \
- for vm in self._conn.xenapi.VM.get_all()]
-
- @defer.inlineCallbacks
- def spawn(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is not None:
- raise Exception('Attempted to create non-unique name %s' %
- instance.name)
-
- network = db.project_get_network(None, instance.project_id)
- network_ref = \
- yield self._find_network_with_bridge(network.bridge)
-
- user = AuthManager().get_user(instance.user_id)
- project = AuthManager().get_project(instance.project_id)
- vdi_uuid = yield self._fetch_image(
- instance.image_id, user, project, True)
- kernel = yield self._fetch_image(
- instance.kernel_id, user, project, False)
- ramdisk = yield self._fetch_image(
- instance.ramdisk_id, user, project, False)
- vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
-
- vm_ref = yield self._create_vm(instance, kernel, ramdisk)
- yield self._create_vbd(vm_ref, vdi_ref, 0, True)
- if network_ref:
- yield self._create_vif(vm_ref, network_ref, instance.mac_address)
- logging.debug('Starting VM %s...', vm_ref)
- yield self._call_xenapi('VM.start', vm_ref, False, False)
- logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
-
- @defer.inlineCallbacks
- def _create_vm(self, instance, kernel, ramdisk):
- """Create a VM record. Returns a Deferred that gives the new
- VM reference."""
-
- instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
- mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
- vcpus = str(instance_type['vcpus'])
- rec = {
- 'name_label': instance.name,
- 'name_description': '',
- 'is_a_template': False,
- 'memory_static_min': '0',
- 'memory_static_max': mem,
- 'memory_dynamic_min': mem,
- 'memory_dynamic_max': mem,
- 'VCPUs_at_startup': vcpus,
- 'VCPUs_max': vcpus,
- 'VCPUs_params': {},
- 'actions_after_shutdown': 'destroy',
- 'actions_after_reboot': 'restart',
- 'actions_after_crash': 'destroy',
- 'PV_bootloader': '',
- 'PV_kernel': kernel,
- 'PV_ramdisk': ramdisk,
- 'PV_args': 'root=/dev/xvda1',
- 'PV_bootloader_args': '',
- 'PV_legacy_args': '',
- 'HVM_boot_policy': '',
- 'HVM_boot_params': {},
- 'platform': {},
- 'PCI_bus': '',
- 'recommendations': '',
- 'affinity': '',
- 'user_version': '0',
- 'other_config': {},
- }
- logging.debug('Created VM %s...', instance.name)
- vm_ref = yield self._call_xenapi('VM.create', rec)
- logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- defer.returnValue(vm_ref)
-
- @defer.inlineCallbacks
- def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
- """Create a VBD record. Returns a Deferred that gives the new
- VBD reference."""
-
- vbd_rec = {}
- vbd_rec['VM'] = vm_ref
- vbd_rec['VDI'] = vdi_ref
- vbd_rec['userdevice'] = str(userdevice)
- vbd_rec['bootable'] = bootable
- vbd_rec['mode'] = 'RW'
- vbd_rec['type'] = 'disk'
- vbd_rec['unpluggable'] = True
- vbd_rec['empty'] = False
- vbd_rec['other_config'] = {}
- vbd_rec['qos_algorithm_type'] = ''
- vbd_rec['qos_algorithm_params'] = {}
- vbd_rec['qos_supported_algorithms'] = []
- logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
- logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
- vdi_ref)
- defer.returnValue(vbd_ref)
-
- @defer.inlineCallbacks
- def _create_vif(self, vm_ref, network_ref, mac_address):
- """Create a VIF record. Returns a Deferred that gives the new
- VIF reference."""
-
- vif_rec = {}
- vif_rec['device'] = '0'
- vif_rec['network'] = network_ref
- vif_rec['VM'] = vm_ref
- vif_rec['MAC'] = mac_address
- vif_rec['MTU'] = '1500'
- vif_rec['other_config'] = {}
- vif_rec['qos_algorithm_type'] = ''
- vif_rec['qos_algorithm_params'] = {}
- logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
- network_ref)
- vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
- logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
- vm_ref, network_ref)
- defer.returnValue(vif_ref)
-
- @defer.inlineCallbacks
- def _find_network_with_bridge(self, bridge):
- expr = 'field "bridge" = "%s"' % bridge
- networks = yield self._call_xenapi('network.get_all_records_where',
- expr)
- if len(networks) == 1:
- defer.returnValue(networks.keys()[0])
- elif len(networks) > 1:
- raise Exception('Found non-unique network for bridge %s' % bridge)
- else:
- raise Exception('Found no network for bridge %s' % bridge)
-
- @defer.inlineCallbacks
- def _fetch_image(self, image, user, project, use_sr):
- """use_sr: True to put the image as a VDI in an SR, False to place
- it on dom0's filesystem. The former is for VM disks, the latter for
- its kernel and ramdisk (if external kernels are being used).
- Returns a Deferred that gives the new VDI UUID."""
-
- url = images.image_url(image)
- access = AuthManager().get_access_key(user, project)
- logging.debug("Asking xapi to fetch %s as %s" % (url, access))
- fn = use_sr and 'get_vdi' or 'get_kernel'
- args = {}
- args['src_url'] = url
- args['username'] = access
- args['password'] = user.secret
- if use_sr:
- args['add_partition'] = 'true'
- task = yield self._async_call_plugin('objectstore', fn, args)
- uuid = yield self._wait_for_task(task)
- defer.returnValue(uuid)
-
- @defer.inlineCallbacks
- def reboot(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- raise Exception('instance not present %s' % instance.name)
- task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
- yield self._wait_for_task(task)
-
- @defer.inlineCallbacks
- def destroy(self, instance):
- vm = yield self._lookup(instance.name)
- if vm is None:
- # Don't complain, just return. This lets us clean up instances
- # that have already disappeared from the underlying platform.
- defer.returnValue(None)
- # Get the VDIs related to the VM
- vdis = yield self._lookup_vm_vdis(vm)
- try:
- task = yield self._call_xenapi('Async.VM.hard_shutdown', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- # Disk clean-up
- if vdis:
- for vdi in vdis:
- try:
- task = yield self._call_xenapi('Async.VDI.destroy', vdi)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
- try:
- task = yield self._call_xenapi('Async.VM.destroy', vm)
- yield self._wait_for_task(task)
- except Exception, exc:
- logging.warn(exc)
-
- def get_info(self, instance_id):
- vm = self._lookup_blocking(instance_id)
- if vm is None:
- raise Exception('instance not present %s' % instance_id)
- rec = self._conn.xenapi.VM.get_record(vm)
- return {'state': XENAPI_POWER_STATE[rec['power_state']],
- 'max_mem': long(rec['memory_static_max']) >> 10,
- 'mem': long(rec['memory_dynamic_max']) >> 10,
- 'num_cpu': rec['VCPUs_max'],
- 'cpu_time': 0}
-
- def get_console_output(self, instance):
- return 'FAKE CONSOLE OUTPUT'
-
- @utils.deferredToThread
- def _lookup(self, i):
- return self._lookup_blocking(i)
-
- def _lookup_blocking(self, i):
- vms = self._conn.xenapi.VM.get_by_name_label(i)
- n = len(vms)
- if n == 0:
- return None
- elif n > 1:
- raise Exception('duplicate name found: %s' % i)
- else:
- return vms[0]
-
- @utils.deferredToThread
- def _lookup_vm_vdis(self, vm):
- return self._lookup_vm_vdis_blocking(vm)
-
- def _lookup_vm_vdis_blocking(self, vm):
- # Firstly we get the VBDs, then the VDIs.
- # TODO: do we leave the read-only devices?
- vbds = self._conn.xenapi.VM.get_VBDs(vm)
- vdis = []
- if vbds:
- for vbd in vbds:
- try:
- vdi = self._conn.xenapi.VBD.get_VDI(vbd)
- # Test valid VDI
- record = self._conn.xenapi.VDI.get_record(vdi)
- except Exception, exc:
- logging.warn(exc)
- else:
- vdis.append(vdi)
- if len(vdis) > 0:
- return vdis
- else:
- return None
-
- def _wait_for_task(self, task):
- """Return a Deferred that will give the result of the given task.
- The task is polled until it completes."""
- d = defer.Deferred()
- reactor.callLater(0, self._poll_task, task, d)
- return d
-
- @utils.deferredToThread
- def _poll_task(self, task, deferred):
- """Poll the given XenAPI task, and fire the given Deferred if we
- get a result."""
- try:
- #logging.debug('Polling task %s...', task)
- status = self._conn.xenapi.task.get_status(task)
- if status == 'pending':
- reactor.callLater(FLAGS.xenapi_task_poll_interval,
- self._poll_task, task, deferred)
- elif status == 'success':
- result = self._conn.xenapi.task.get_result(task)
- logging.info('Task %s status: success. %s', task, result)
- deferred.callback(_parse_xmlrpc_value(result))
- else:
- error_info = self._conn.xenapi.task.get_error_info(task)
- logging.warn('Task %s status: %s. %s', task, status,
- error_info)
- deferred.errback(XenAPI.Failure(error_info))
- #logging.debug('Polling task %s done.', task)
- except Exception, exc:
- logging.warn(exc)
- deferred.errback(exc)
-
- @utils.deferredToThread
- def _call_xenapi(self, method, *args):
- """Call the specified XenAPI method on a background thread. Returns
- a Deferred for the result."""
- f = self._conn.xenapi
- for m in method.split('.'):
- f = f.__getattr__(m)
- return f(*args)
-
- @utils.deferredToThread
- def _async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread. Returns a
- Deferred with the task reference."""
- return _unwrap_plugin_exceptions(
- self._conn.xenapi.Async.host.call_plugin,
- self._get_xenapi_host(), plugin, fn, args)
-
- def _get_xenapi_host(self):
- return self._conn.xenapi.session.get_this_host(self._conn.handle)
-
-
-def _unwrap_plugin_exceptions(func, *args, **kwargs):
- try:
- return func(*args, **kwargs)
- except XenAPI.Failure, exc:
- logging.debug("Got exception: %s", exc)
- if (len(exc.details) == 4 and
- exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
- exc.details[2] == 'Failure'):
- params = None
- try:
- params = eval(exc.details[3])
- except:
- raise exc
- raise XenAPI.Failure(params)
- else:
- raise
- except xmlrpclib.ProtocolError, exc:
- logging.debug("Got exception: %s", exc)
- raise
-
-
-def _parse_xmlrpc_value(val):
- """Parse the given value as if it were an XML-RPC value. This is
- sometimes used as the format for the task.result field."""
- if not val:
- return val
- x = xmlrpclib.loads(
- '<?xml version="1.0"?><methodResponse><params><param>' +
- val +
- '</param></params></methodResponse>')
- return x[0][0]
diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py
new file mode 100644
index 000000000..3d598c463
--- /dev/null
+++ b/nova/virt/xenapi/__init__.py
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py
new file mode 100644
index 000000000..8cb4cce3a
--- /dev/null
+++ b/nova/virt/xenapi/network_utils.py
@@ -0,0 +1,45 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of network
+records and their attributes like bridges, PIFs, QoS, as well as
+their lookup functions.
+"""
+
+from twisted.internet import defer
+
+
+class NetworkHelper():
+ """
+ The class that wraps the helper methods together.
+ """
+ def __init__(self):
+ return
+
+ @classmethod
+ @defer.inlineCallbacks
+ def find_network_with_bridge(cls, session, bridge):
+ """ Return the network on which the bridge is attached, if found """
+ expr = 'field "bridge" = "%s"' % bridge
+ networks = yield session.call_xenapi('network.get_all_records_where',
+ expr)
+ if len(networks) == 1:
+ defer.returnValue(networks.keys()[0])
+ elif len(networks) > 1:
+ raise Exception('Found non-unique network for bridge %s' % bridge)
+ else:
+ raise Exception('Found no network for bridge %s' % bridge)
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
new file mode 100644
index 000000000..77edb576e
--- /dev/null
+++ b/nova/virt/xenapi/vm_utils.py
@@ -0,0 +1,268 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Helper methods for operations related to the management of VM records and
+their attributes like VDIs, VIFs, as well as their lookup functions.
+"""
+
+import logging
+import urllib
+
+from twisted.internet import defer
+from xml.dom import minidom
+
+from nova import flags
+from nova import utils
+
+from nova.auth.manager import AuthManager
+from nova.compute import instance_types
+from nova.compute import power_state
+from nova.virt import images
+
+FLAGS = flags.FLAGS
+
+XENAPI_POWER_STATE = {
+ 'Halted': power_state.SHUTDOWN,
+ 'Running': power_state.RUNNING,
+ 'Paused': power_state.PAUSED,
+ 'Suspended': power_state.SHUTDOWN, # FIXME
+ 'Crashed': power_state.CRASHED}
+
+XenAPI = None
+
+
+class VMHelper():
+ """
+ The class that wraps the helper methods together.
+ """
+ def __init__(self):
+ return
+
+ @classmethod
+ def late_import(cls):
+ """
+ Load the XenAPI module in for helper class, if required.
+ This is to avoid to install the XenAPI library when other
+ hypervisors are used
+ """
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vm(cls, session, instance, kernel, ramdisk):
+ """Create a VM record. Returns a Deferred that gives the new
+ VM reference."""
+
+ instance_type = instance_types.INSTANCE_TYPES[instance.instance_type]
+ mem = str(long(instance_type['memory_mb']) * 1024 * 1024)
+ vcpus = str(instance_type['vcpus'])
+ rec = {
+ 'name_label': instance.name,
+ 'name_description': '',
+ 'is_a_template': False,
+ 'memory_static_min': '0',
+ 'memory_static_max': mem,
+ 'memory_dynamic_min': mem,
+ 'memory_dynamic_max': mem,
+ 'VCPUs_at_startup': vcpus,
+ 'VCPUs_max': vcpus,
+ 'VCPUs_params': {},
+ 'actions_after_shutdown': 'destroy',
+ 'actions_after_reboot': 'restart',
+ 'actions_after_crash': 'destroy',
+ 'PV_bootloader': '',
+ 'PV_kernel': kernel,
+ 'PV_ramdisk': ramdisk,
+ 'PV_args': 'root=/dev/xvda1',
+ 'PV_bootloader_args': '',
+ 'PV_legacy_args': '',
+ 'HVM_boot_policy': '',
+ 'HVM_boot_params': {},
+ 'platform': {},
+ 'PCI_bus': '',
+ 'recommendations': '',
+ 'affinity': '',
+ 'user_version': '0',
+ 'other_config': {},
+ }
+ logging.debug('Created VM %s...', instance.name)
+ vm_ref = yield session.call_xenapi('VM.create', rec)
+ logging.debug('Created VM %s as %s.', instance.name, vm_ref)
+ defer.returnValue(vm_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
+ """Create a VBD record. Returns a Deferred that gives the new
+ VBD reference."""
+
+ vbd_rec = {}
+ vbd_rec['VM'] = vm_ref
+ vbd_rec['VDI'] = vdi_ref
+ vbd_rec['userdevice'] = str(userdevice)
+ vbd_rec['bootable'] = bootable
+ vbd_rec['mode'] = 'RW'
+ vbd_rec['type'] = 'disk'
+ vbd_rec['unpluggable'] = True
+ vbd_rec['empty'] = False
+ vbd_rec['other_config'] = {}
+ vbd_rec['qos_algorithm_type'] = ''
+ vbd_rec['qos_algorithm_params'] = {}
+ vbd_rec['qos_supported_algorithms'] = []
+ logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
+ vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec)
+ logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
+ vdi_ref)
+ defer.returnValue(vbd_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def create_vif(cls, session, vm_ref, network_ref, mac_address):
+ """Create a VIF record. Returns a Deferred that gives the new
+ VIF reference."""
+
+ vif_rec = {}
+ vif_rec['device'] = '0'
+ vif_rec['network'] = network_ref
+ vif_rec['VM'] = vm_ref
+ vif_rec['MAC'] = mac_address
+ vif_rec['MTU'] = '1500'
+ vif_rec['other_config'] = {}
+ vif_rec['qos_algorithm_type'] = ''
+ vif_rec['qos_algorithm_params'] = {}
+ logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
+ network_ref)
+ vif_ref = yield session.call_xenapi('VIF.create', vif_rec)
+ logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
+ vm_ref, network_ref)
+ defer.returnValue(vif_ref)
+
+ @classmethod
+ @defer.inlineCallbacks
+ def fetch_image(cls, session, image, user, project, use_sr):
+ """use_sr: True to put the image as a VDI in an SR, False to place
+ it on dom0's filesystem. The former is for VM disks, the latter for
+ its kernel and ramdisk (if external kernels are being used).
+ Returns a Deferred that gives the new VDI UUID."""
+
+ url = images.image_url(image)
+ access = AuthManager().get_access_key(user, project)
+ logging.debug("Asking xapi to fetch %s as %s", url, access)
+ fn = use_sr and 'get_vdi' or 'get_kernel'
+ args = {}
+ args['src_url'] = url
+ args['username'] = access
+ args['password'] = user.secret
+ if use_sr:
+ args['add_partition'] = 'true'
+ task = yield session.async_call_plugin('objectstore', fn, args)
+ uuid = yield session.wait_for_task(task)
+ defer.returnValue(uuid)
+
+ @classmethod
+ @utils.deferredToThread
+ def lookup(cls, session, i):
+ """ Look the instance i up, and returns it if available """
+ return VMHelper.lookup_blocking(session, i)
+
+ @classmethod
+ def lookup_blocking(cls, session, i):
+ """ Synchronous lookup """
+ vms = session.get_xenapi().VM.get_by_name_label(i)
+ n = len(vms)
+ if n == 0:
+ return None
+ elif n > 1:
+ raise Exception('duplicate name found: %s' % i)
+ else:
+ return vms[0]
+
+ @classmethod
+ @utils.deferredToThread
+ def lookup_vm_vdis(cls, session, vm):
+ """ Look for the VDIs that are attached to the VM """
+ return VMHelper.lookup_vm_vdis_blocking(session, vm)
+
+ @classmethod
+ def lookup_vm_vdis_blocking(cls, session, vm):
+ """ Synchronous lookup_vm_vdis """
+ # Firstly we get the VBDs, then the VDIs.
+ # TODO(Armando): do we leave the read-only devices?
+ vbds = session.get_xenapi().VM.get_VBDs(vm)
+ vdis = []
+ if vbds:
+ for vbd in vbds:
+ try:
+ vdi = session.get_xenapi().VBD.get_VDI(vbd)
+ # Test valid VDI
+ record = session.get_xenapi().VDI.get_record(vdi)
+ logging.debug('VDI %s is still available', record['uuid'])
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ else:
+ vdis.append(vdi)
+ if len(vdis) > 0:
+ return vdis
+ else:
+ return None
+
+ @classmethod
+ def compile_info(cls, record):
+ return {'state': XENAPI_POWER_STATE[record['power_state']],
+ 'max_mem': long(record['memory_static_max']) >> 10,
+ 'mem': long(record['memory_dynamic_max']) >> 10,
+ 'num_cpu': record['VCPUs_max'],
+ 'cpu_time': 0}
+
+ @classmethod
+ def compile_diagnostics(cls, session, record):
+ """Compile VM diagnostics data"""
+ try:
+ host = session.get_xenapi_host()
+ host_ip = session.get_xenapi().host.get_record(host)["address"]
+ metrics = session.get_xenapi().VM_guest_metrics.get_record(
+ record["guest_metrics"])
+ diags = {
+ "Kernel": metrics["os_version"]["uname"],
+ "Distro": metrics["os_version"]["name"]}
+ xml = get_rrd(host_ip, record["uuid"])
+ if xml:
+ rrd = minidom.parseString(xml)
+ for i, node in enumerate(rrd.firstChild.childNodes):
+ # We don't want all of the extra garbage
+ if i >= 3 and i <= 11:
+ ref = node.childNodes
+ # Name and Value
+ diags[ref[0].firstChild.data] = ref[6].firstChild.data
+ return diags
+ except XenAPI.Failure as e:
+ return {"Unable to retrieve diagnostics": e}
+
+
+def get_rrd(host, uuid):
+ """Return the VM RRD XML as a string"""
+ try:
+ xml = urllib.urlopen("http://%s:%s@%s/vm_rrd?uuid=%s" % (
+ FLAGS.xenapi_connection_username,
+ FLAGS.xenapi_connection_password,
+ host,
+ uuid))
+ return xml.read()
+ except IOError:
+ return None
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
new file mode 100644
index 000000000..9bfd07267
--- /dev/null
+++ b/nova/virt/xenapi/vmops.py
@@ -0,0 +1,146 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for VM-related functions (spawn, reboot, etc).
+"""
+
+import logging
+
+from twisted.internet import defer
+
+from nova import db
+from nova import context
+
+from nova.auth.manager import AuthManager
+from nova.virt.xenapi.network_utils import NetworkHelper
+from nova.virt.xenapi.vm_utils import VMHelper
+
+XenAPI = None
+
+
+class VMOps(object):
+ """
+ Management class for VM-related tasks
+ """
+ def __init__(self, session):
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+ self._session = session
+ # Load XenAPI module in the helper class
+ VMHelper.late_import()
+
+ def list_instances(self):
+ """ List VM instances """
+ return [self._session.get_xenapi().VM.get_name_label(vm) \
+ for vm in self._session.get_xenapi().VM.get_all()]
+
+ @defer.inlineCallbacks
+ def spawn(self, instance):
+ """ Create VM instance """
+ vm = yield VMHelper.lookup(self._session, instance.name)
+ if vm is not None:
+ raise Exception('Attempted to create non-unique name %s' %
+ instance.name)
+
+ bridge = db.project_get_network(context.get_admin_context(),
+ instance.project_id).bridge
+ network_ref = \
+ yield NetworkHelper.find_network_with_bridge(self._session, bridge)
+
+ user = AuthManager().get_user(instance.user_id)
+ project = AuthManager().get_project(instance.project_id)
+ vdi_uuid = yield VMHelper.fetch_image(self._session,
+ instance.image_id, user, project, True)
+ kernel = yield VMHelper.fetch_image(self._session,
+ instance.kernel_id, user, project, False)
+ ramdisk = yield VMHelper.fetch_image(self._session,
+ instance.ramdisk_id, user, project, False)
+ vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
+ vm_ref = yield VMHelper.create_vm(self._session,
+ instance, kernel, ramdisk)
+ yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
+ if network_ref:
+ yield VMHelper.create_vif(self._session, vm_ref,
+ network_ref, instance.mac_address)
+ logging.debug('Starting VM %s...', vm_ref)
+ yield self._session.call_xenapi('VM.start', vm_ref, False, False)
+ logging.info('Spawning VM %s created %s.', instance.name,
+ vm_ref)
+
+ @defer.inlineCallbacks
+ def reboot(self, instance):
+ """ Reboot VM instance """
+ instance_name = instance.name
+ vm = yield VMHelper.lookup(self._session, instance_name)
+ if vm is None:
+ raise Exception('instance not present %s' % instance_name)
+ task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm)
+ yield self._session.wait_for_task(task)
+
+ @defer.inlineCallbacks
+ def destroy(self, instance):
+ """ Destroy VM instance """
+ vm = yield VMHelper.lookup(self._session, instance.name)
+ if vm is None:
+ # Don't complain, just return. This lets us clean up instances
+ # that have already disappeared from the underlying platform.
+ defer.returnValue(None)
+ # Get the VDIs related to the VM
+ vdis = yield VMHelper.lookup_vm_vdis(self._session, vm)
+ try:
+ task = yield self._session.call_xenapi('Async.VM.hard_shutdown',
+ vm)
+ yield self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ # Disk clean-up
+ if vdis:
+ for vdi in vdis:
+ try:
+ task = yield self._session.call_xenapi('Async.VDI.destroy',
+ vdi)
+ yield self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ try:
+ task = yield self._session.call_xenapi('Async.VM.destroy', vm)
+ yield self._session.wait_for_task(task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+
+ def get_info(self, instance_id):
+ """ Return data about VM instance """
+ vm = VMHelper.lookup_blocking(self._session, instance_id)
+ if vm is None:
+ raise Exception('instance not present %s' % instance_id)
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ return VMHelper.compile_info(rec)
+
+ @defer.inlineCallbacks
+ def get_diagnostics(self, instance_id):
+ """Return data about VM diagnostics"""
+ vm = yield VMHelper.lookup(self._session, instance_id)
+ if vm is None:
+ raise Exception("instance not present %s" % instance_id)
+ rec = yield self._session.get_xenapi().VM.get_record(vm)
+ defer.returnValue(VMHelper.compile_diagnostics(self._session, rec))
+
+ def get_console_output(self, instance):
+ """ Return snapshot of console """
+ # TODO: implement this to fix pylint!
+ return 'FAKE CONSOLE OUTPUT of instance'
diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py
new file mode 100644
index 000000000..a4c7a3861
--- /dev/null
+++ b/nova/virt/xenapi/volumeops.py
@@ -0,0 +1,32 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for Storage-related functions (attach, detach, etc).
+"""
+
+
+class VolumeOps(object):
+ def __init__(self, session):
+ self._session = session
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ # FIXME: that's going to be sorted when iscsi-xenapi lands in branch
+ return True
+
+ def detach_volume(self, instance_name, mountpoint):
+ # FIXME: that's going to be sorted when iscsi-xenapi lands in branch
+ return True
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
new file mode 100644
index 000000000..2153810c8
--- /dev/null
+++ b/nova/virt/xenapi_conn.py
@@ -0,0 +1,242 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
+
+**Related Flags**
+
+:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
+:xenapi_connection_username: Username for connection to XenServer/Xen Cloud
+ Platform (default: root).
+:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
+ Platform.
+:xenapi_task_poll_interval: The interval (seconds) used for polling of
+ remote tasks (Async.VM.start, etc)
+ (default: 0.5).
+
+"""
+
+import logging
+import xmlrpclib
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from nova import utils
+from nova import flags
+from nova.virt.xenapi.vmops import VMOps
+from nova.virt.xenapi.volumeops import VolumeOps
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('xenapi_connection_url',
+ None,
+ 'URL for connection to XenServer/Xen Cloud Platform.'
+ ' Required if connection_type=xenapi.')
+flags.DEFINE_string('xenapi_connection_username',
+ 'root',
+ 'Username for connection to XenServer/Xen Cloud Platform.'
+ ' Used only if connection_type=xenapi.')
+flags.DEFINE_string('xenapi_connection_password',
+ None,
+ 'Password for connection to XenServer/Xen Cloud Platform.'
+ ' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+ 0.5,
+ 'The interval used for polling of remote tasks '
+ '(Async.VM.start, etc). Used only if '
+ 'connection_type=xenapi.')
+
+XenAPI = None
+
+
+def get_connection(_):
+ """Note that XenAPI doesn't have a read-only connection mode, so
+ the read_only parameter is ignored."""
+ # This is loaded late so that there's no need to install this
+ # library when not using XenAPI.
+ global XenAPI
+ if XenAPI is None:
+ XenAPI = __import__('XenAPI')
+ url = FLAGS.xenapi_connection_url
+ username = FLAGS.xenapi_connection_username
+ password = FLAGS.xenapi_connection_password
+ if not url or password is None:
+ raise Exception('Must specify xenapi_connection_url, '
+ 'xenapi_connection_username (optionally), and '
+ 'xenapi_connection_password to use '
+ 'connection_type=xenapi')
+ return XenAPIConnection(url, username, password)
+
+
+class XenAPIConnection(object):
+ """ A connection to XenServer or Xen Cloud Platform """
+ def __init__(self, url, user, pw):
+ session = XenAPISession(url, user, pw)
+ self._vmops = VMOps(session)
+ self._volumeops = VolumeOps(session)
+
+ def list_instances(self):
+ """ List VM instances """
+ return self._vmops.list_instances()
+
+ def spawn(self, instance):
+ """ Create VM instance """
+ self._vmops.spawn(instance)
+
+ def reboot(self, instance):
+ """ Reboot VM instance """
+ self._vmops.reboot(instance)
+
+ def destroy(self, instance):
+ """ Destroy VM instance """
+ self._vmops.destroy(instance)
+
+ def get_info(self, instance_id):
+ """ Return data about VM instance """
+ return self._vmops.get_info(instance_id)
+
+ def get_diagnostics(self, instance_id):
+ """Return data about VM diagnostics"""
+ return self._vmops.get_diagnostics(instance_id)
+
+ def get_console_output(self, instance):
+ """ Return snapshot of console """
+ return self._vmops.get_console_output(instance)
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ """ Attach volume storage to VM instance """
+ return self._volumeops.attach_volume(instance_name,
+ device_path,
+ mountpoint)
+
+ def detach_volume(self, instance_name, mountpoint):
+ """ Detach volume storage to VM instance """
+ return self._volumeops.detach_volume(instance_name, mountpoint)
+
+
+class XenAPISession(object):
+ """ The session to invoke XenAPI SDK calls """
+ def __init__(self, url, user, pw):
+ self._session = XenAPI.Session(url)
+ self._session.login_with_password(user, pw)
+
+ def get_xenapi(self):
+ """ Return the xenapi object """
+ return self._session.xenapi
+
+ def get_xenapi_host(self):
+ """ Return the xenapi host """
+ return self._session.xenapi.session.get_this_host(self._session.handle)
+
+ @utils.deferredToThread
+ def call_xenapi(self, method, *args):
+ """Call the specified XenAPI method on a background thread. Returns
+ a Deferred for the result."""
+ f = self._session.xenapi
+ for m in method.split('.'):
+ f = f.__getattr__(m)
+ return f(*args)
+
+ @utils.deferredToThread
+ def async_call_plugin(self, plugin, fn, args):
+ """Call Async.host.call_plugin on a background thread. Returns a
+ Deferred with the task reference."""
+ return _unwrap_plugin_exceptions(
+ self._session.xenapi.Async.host.call_plugin,
+ self.get_xenapi_host(), plugin, fn, args)
+
+ def wait_for_task(self, task):
+ """Return a Deferred that will give the result of the given task.
+ The task is polled until it completes."""
+ d = defer.Deferred()
+ reactor.callLater(0, self._poll_task, task, d)
+ return d
+
+ @utils.deferredToThread
+ def _poll_task(self, task, deferred):
+ """Poll the given XenAPI task, and fire the given Deferred if we
+ get a result."""
+ try:
+ #logging.debug('Polling task %s...', task)
+ status = self._session.xenapi.task.get_status(task)
+ if status == 'pending':
+ reactor.callLater(FLAGS.xenapi_task_poll_interval,
+ self._poll_task, task, deferred)
+ elif status == 'success':
+ result = self._session.xenapi.task.get_result(task)
+ logging.info('Task %s status: success. %s', task, result)
+ deferred.callback(_parse_xmlrpc_value(result))
+ else:
+ error_info = self._session.xenapi.task.get_error_info(task)
+ logging.warn('Task %s status: %s. %s', task, status,
+ error_info)
+ deferred.errback(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
+ except XenAPI.Failure, exc:
+ logging.warn(exc)
+ deferred.errback(exc)
+
+
+def _unwrap_plugin_exceptions(func, *args, **kwargs):
+ """ Parse exception details """
+ try:
+ return func(*args, **kwargs)
+ except XenAPI.Failure, exc:
+ logging.debug("Got exception: %s", exc)
+ if (len(exc.details) == 4 and
+ exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
+ exc.details[2] == 'Failure'):
+ params = None
+ try:
+ params = eval(exc.details[3])
+ except:
+ raise exc
+ raise XenAPI.Failure(params)
+ else:
+ raise
+ except xmlrpclib.ProtocolError, exc:
+ logging.debug("Got exception: %s", exc)
+ raise
+
+
+def _parse_xmlrpc_value(val):
+ """Parse the given value as if it were an XML-RPC value. This is
+ sometimes used as the format for the task.result field."""
+ if not val:
+ return val
+ x = xmlrpclib.loads(
+ '<?xml version="1.0"?><methodResponse><params><param>' +
+ val +
+ '</param></params></methodResponse>')
+ return x[0][0]