summaryrefslogtreecommitdiffstats
path: root/nova
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@gmail.com>2010-08-25 16:22:49 -0700
committerVishvananda Ishaya <vishvananda@gmail.com>2010-08-25 16:22:49 -0700
commitaf959df4076d41d950baa27905d15fbda017e0cb (patch)
tree10fe2897471a00fb424c8e25fad3c86326b27019 /nova
parent70112ea9941b92aa98e32c0c37f0208877953557 (diff)
parent686ad09fe4416bb578661a42f6f083528d4a7ca9 (diff)
merged trunk and fixed tests
Diffstat (limited to 'nova')
-rw-r--r--nova/api/rackspace/images.py101
-rw-r--r--nova/api/rackspace/notes.txt23
-rw-r--r--nova/flags.py1
-rw-r--r--nova/image/service.py90
-rw-r--r--nova/network/model.py5
-rw-r--r--nova/network/service.py7
-rw-r--r--nova/server.py6
-rw-r--r--nova/tests/network_unittest.py33
-rw-r--r--nova/twistd.py16
-rw-r--r--nova/utils.py8
-rw-r--r--nova/virt/xenapi.py167
-rw-r--r--nova/wsgi.py19
12 files changed, 426 insertions, 50 deletions
diff --git a/nova/api/rackspace/images.py b/nova/api/rackspace/images.py
index 986f11434..370980fe9 100644
--- a/nova/api/rackspace/images.py
+++ b/nova/api/rackspace/images.py
@@ -15,4 +15,103 @@
# License for the specific language governing permissions and limitations
# under the License.
-class Controller(object): pass
+from nova import datastore
+from nova import image
+from nova.api.rackspace import base
+from webob import exc
+
+class Controller(base.Controller):
+
+ _serialization_metadata = {
+ 'application/xml': {
+ "attributes": {
+ "image": [ "id", "name", "updated", "created", "status",
+ "serverId", "progress" ]
+ }
+ }
+ }
+
+ def __init__(self):
+ self._service = image.service.ImageService.load()
+ self._id_translator = RackspaceAPIImageIdTranslator()
+
+ def _to_rs_id(self, image_id):
+ """
+ Convert an image id from the format of our ImageService strategy
+ to the Rackspace API format (an int).
+ """
+ strategy = self._service.__class__.__name__
+ return self._id_translator.to_rs_id(strategy, image_id)
+
+ def _from_rs_id(self, rs_image_id):
+ """
+ Convert an image id from the Rackspace API format (an int) to the
+ format of our ImageService strategy.
+ """
+ strategy = self._service.__class__.__name__
+ return self._id_translator.from_rs_id(strategy, rs_image_id)
+
+ def index(self, req):
+ """Return all public images."""
+ data = self._service.index()
+ for img in data:
+ img['id'] = self._to_rs_id(img['id'])
+ return dict(images=data)
+
+ def show(self, req, id):
+ """Return data about the given image id."""
+ opaque_id = self._from_rs_id(id)
+ img = self._service.show(opaque_id)
+ img['id'] = id
+ return dict(image=img)
+
+ def delete(self, req, id):
+ # Only public images are supported for now.
+ raise exc.HTTPNotFound()
+
+ def create(self, req):
+ # Only public images are supported for now, so a request to
+        # make a backup of a server cannot be supported.
+ raise exc.HTTPNotFound()
+
+ def update(self, req, id):
+ # Users may not modify public images, and that's all that
+ # we support for now.
+ raise exc.HTTPNotFound()
+
+
+class RackspaceAPIImageIdTranslator(object):
+ """
+ Converts Rackspace API image ids to and from the id format for a given
+ strategy.
+ """
+
+ def __init__(self):
+ self._store = datastore.Redis.instance()
+ self._key_template = "rsapi.idstrategies.image.%s.%s"
+
+ def to_rs_id(self, strategy_name, opaque_id):
+ """Convert an id from a strategy-specific one to a Rackspace one."""
+ key = self._key_template % (strategy_name, "fwd")
+ result = self._store.hget(key, str(opaque_id))
+ if result: # we have a mapping from opaque to RS for this strategy
+ return int(result)
+ else:
+ # Store the mapping.
+ nextid = self._store.incr("%s.lastid" % key)
+ if self._store.hsetnx(key, str(opaque_id), nextid):
+ # If someone else didn't beat us to it, store the reverse
+ # mapping as well.
+ key = self._key_template % (strategy_name, "rev")
+ self._store.hset(key, nextid, str(opaque_id))
+ return nextid
+ else:
+ # Someone beat us to it; use their number instead, and
+ # discard nextid (which is OK -- we don't require that
+ # every int id be used.)
+ return int(self._store.hget(key, str(opaque_id)))
+
+ def from_rs_id(self, strategy_name, rs_id):
+ """Convert a Rackspace id to a strategy-specific one."""
+ key = self._key_template % (strategy_name, "rev")
+ return self._store.hget(key, rs_id)
diff --git a/nova/api/rackspace/notes.txt b/nova/api/rackspace/notes.txt
new file mode 100644
index 000000000..e133bf5ea
--- /dev/null
+++ b/nova/api/rackspace/notes.txt
@@ -0,0 +1,23 @@
+We will need:
+
+ImageService
+a service that can do CRUD on image information. Not user-specific. Opaque
+image ids.
+
+GlanceImageService(ImageService):
+image ids are URIs.
+
+LocalImageService(ImageService):
+image ids are random strings.
+
+RackspaceAPITranslationStore:
+translates RS server/images/flavor/etc ids into formats required
+by a given ImageService strategy.
+
+api.rackspace.images.Controller:
+uses an ImageService strategy behind the scenes to do its fetching; it just
+converts int image id into a strategy-specific image id.
+
+who maintains the mapping from user to [images he owns]? nobody, because
+we have no way of enforcing access to his images, without kryptex which
+won't be in Austin.
diff --git a/nova/flags.py b/nova/flags.py
index e3feb252d..6f9f906dd 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -141,6 +141,7 @@ def _wrapper(func):
return _wrapped
+DEFINE = _wrapper(gflags.DEFINE)
DEFINE_string = _wrapper(gflags.DEFINE_string)
DEFINE_integer = _wrapper(gflags.DEFINE_integer)
DEFINE_bool = _wrapper(gflags.DEFINE_bool)
diff --git a/nova/image/service.py b/nova/image/service.py
new file mode 100644
index 000000000..1a7a258b7
--- /dev/null
+++ b/nova/image/service.py
@@ -0,0 +1,90 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import cPickle as pickle
+import os.path
+import random
+import string
+
+class ImageService(object):
+ """Provides storage and retrieval of disk image objects."""
+
+ @staticmethod
+ def load():
+ """Factory method to return image service."""
+ #TODO(gundlach): read from config.
+ class_ = LocalImageService
+ return class_()
+
+ def index(self):
+ """
+ Return a dict from opaque image id to image data.
+ """
+
+ def show(self, id):
+ """
+ Returns a dict containing image data for the given opaque image id.
+ """
+
+
+class GlanceImageService(ImageService):
+ """Provides storage and retrieval of disk image objects within Glance."""
+ # TODO(gundlach): once Glance has an API, build this.
+ pass
+
+
+class LocalImageService(ImageService):
+ """Image service storing images to local disk."""
+
+ def __init__(self):
+ self._path = "/tmp/nova/images"
+ try:
+ os.makedirs(self._path)
+ except OSError: # exists
+ pass
+
+ def _path_to(self, image_id=''):
+ return os.path.join(self._path, image_id)
+
+ def _ids(self):
+ """The list of all image ids."""
+ return os.listdir(self._path)
+
+ def index(self):
+ return [ self.show(id) for id in self._ids() ]
+
+ def show(self, id):
+ return pickle.load(open(self._path_to(id)))
+
+ def create(self, data):
+ """
+ Store the image data and return the new image id.
+ """
+ id = ''.join(random.choice(string.letters) for _ in range(20))
+ data['id'] = id
+ self.update(id, data)
+ return id
+
+ def update(self, image_id, data):
+ """Replace the contents of the given image with the new data."""
+ pickle.dump(data, open(self._path_to(image_id), 'w'))
+
+ def delete(self, image_id):
+ """
+ Delete the given image. Raises OSError if the image does not exist.
+ """
+ os.unlink(self._path_to(image_id))
diff --git a/nova/network/model.py b/nova/network/model.py
index 6c12836b7..557fc92a6 100644
--- a/nova/network/model.py
+++ b/nova/network/model.py
@@ -290,6 +290,11 @@ class BaseNetwork(datastore.BasicModel):
return 'available:%s' % self.identifier
@property
+ def num_available_ips(self):
+ redis = datastore.Redis.instance()
+ return redis.scard(self._available_key)
+
+ @property
def assigned(self):
"""Returns a list of all assigned addresses"""
return self.address_class.associated_keys('network', self.identifier)
diff --git a/nova/network/service.py b/nova/network/service.py
index d3aa1c46f..3dba0a9ef 100644
--- a/nova/network/service.py
+++ b/nova/network/service.py
@@ -26,7 +26,7 @@ from nova import flags
from nova import service
from nova import utils
from nova.auth import manager
-from nova.network import exception
+from nova.network import exception as network_exception
from nova.network import model
from nova.network import vpn
@@ -64,8 +64,7 @@ def type_to_class(network_type):
def setup_compute_network(network_type, user_id, project_id, security_group):
"""Sets up the network on a compute host"""
srv = type_to_class(network_type)
- srv.setup_compute_network(network_type,
- user_id,
+ srv.setup_compute_network(user_id,
project_id,
security_group)
@@ -170,7 +169,7 @@ class FlatNetworkService(BaseNetworkService):
redis.sadd('ips', fixed_ip)
fixed_ip = redis.spop('ips')
if not fixed_ip:
- raise exception.NoMoreAddresses()
+ raise network_exception.NoMoreAddresses()
# TODO(vish): some sort of dns handling for hostname should
# probably be done here.
return {'inject_network': True,
diff --git a/nova/server.py b/nova/server.py
index 96550f078..c6b60e090 100644
--- a/nova/server.py
+++ b/nova/server.py
@@ -44,6 +44,8 @@ flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing')
flags.DEFINE_string('logfile', None, 'log file to output to')
flags.DEFINE_string('pidfile', None, 'pid file to output to')
flags.DEFINE_string('working_directory', './', 'working directory...')
+flags.DEFINE_integer('uid', os.getuid(), 'uid under which to run')
+flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run')
def stop(pidfile):
@@ -135,6 +137,8 @@ def daemonize(args, name, main):
threaded=False),
stdin=stdin,
stdout=stdout,
- stderr=stderr
+ stderr=stderr,
+ uid=FLAGS.uid,
+ gid=FLAGS.gid
):
main(args)
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index 34b68f1ed..00b0b97e7 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -184,13 +184,23 @@ class NetworkTestCase(test.TrialTestCase):
def test_ips_are_reused(self):
"""Makes sure that ip addresses that are deallocated get reused"""
- result = self.service.allocate_fixed_ip(
- self.user.id, self.projects[0].id)
- mac = result['mac_address']
- address = result['private_dns_name']
+ net = model.get_project_network(self.projects[0].id, "default")
hostname = "reuse-host"
- net = model.get_project_network(self.projects[0].id, "default")
+ macs = {}
+ addresses = {}
+ num_available_ips = net.num_available_ips
+ for i in range(num_available_ips - 1):
+ result = self.service.allocate_fixed_ip(self.user.id,
+ self.projects[0].id)
+ macs[i] = result['mac_address']
+ addresses[i] = result['private_dns_name']
+ issue_ip(macs[i], addresses[i], hostname, net.bridge_name)
+
+ result = self.service.allocate_fixed_ip(self.user.id,
+ self.projects[0].id)
+ mac = result['mac_address']
+ address = result['private_dns_name']
issue_ip(mac, address, hostname, net.bridge_name)
self.service.deallocate_fixed_ip(address)
@@ -205,6 +215,10 @@ class NetworkTestCase(test.TrialTestCase):
self.service.deallocate_fixed_ip(secondaddress)
release_ip(secondmac, secondaddress, hostname, net.bridge_name)
+ for i in range(len(addresses)):
+ self.service.deallocate_fixed_ip(addresses[i])
+ release_ip(macs[i], addresses[i], hostname, net.bridge_name)
+
def test_available_ips(self):
"""Make sure the number of available ips for the network is correct
@@ -222,7 +236,7 @@ class NetworkTestCase(test.TrialTestCase):
num_available_ips = net_size - (net.num_bottom_reserved_ips +
num_preallocated_ips +
net.num_top_reserved_ips)
- self.assertEqual(num_available_ips, len(list(net.available)))
+ self.assertEqual(num_available_ips, net.num_available_ips)
def test_too_many_addresses(self):
"""Test for a NoMoreAddresses exception when all fixed ips are used.
@@ -232,8 +246,7 @@ class NetworkTestCase(test.TrialTestCase):
hostname = "toomany-hosts"
macs = {}
addresses = {}
- # Number of availaible ips is len of the available list
- num_available_ips = len(list(net.available))
+ num_available_ips = net.num_available_ips
for i in range(num_available_ips):
result = self.service.allocate_fixed_ip(self.user.id,
self.projects[0].id)
@@ -241,14 +254,14 @@ class NetworkTestCase(test.TrialTestCase):
addresses[i] = result['private_dns_name']
issue_ip(macs[i], addresses[i], hostname, net.bridge_name)
- self.assertEqual(len(list(net.available)), 0)
+ self.assertEqual(net.num_available_ips, 0)
self.assertRaises(NoMoreAddresses, self.service.allocate_fixed_ip,
self.user.id, self.projects[0].id)
for i in range(len(addresses)):
self.service.deallocate_fixed_ip(addresses[i])
release_ip(macs[i], addresses[i], hostname, net.bridge_name)
- self.assertEqual(len(list(net.available)), num_available_ips)
+ self.assertEqual(net.num_available_ips, num_available_ips)
def is_in_project(address, project_id):
diff --git a/nova/twistd.py b/nova/twistd.py
index 8de322aa5..9511c231c 100644
--- a/nova/twistd.py
+++ b/nova/twistd.py
@@ -21,6 +21,7 @@ Twisted daemon helpers, specifically to parse out gFlags from twisted flags,
manage pid files and support syslogging.
"""
+import gflags
import logging
import os
import signal
@@ -49,6 +50,14 @@ class TwistdServerOptions(ServerOptions):
return
+class FlagParser(object):
+ def __init__(self, parser):
+ self.parser = parser
+
+ def Parse(self, s):
+ return self.parser(s)
+
+
def WrapTwistedOptions(wrapped):
class TwistedOptionsToFlags(wrapped):
subCommands = None
@@ -79,7 +88,12 @@ def WrapTwistedOptions(wrapped):
reflect.accumulateClassList(self.__class__, 'optParameters', twistd_params)
for param in twistd_params:
key = param[0].replace('-', '_')
- flags.DEFINE_string(key, param[2], str(param[-1]))
+ if len(param) > 4:
+ flags.DEFINE(FlagParser(param[4]),
+ key, param[2], str(param[3]),
+ serializer=gflags.ArgumentSerializer())
+ else:
+ flags.DEFINE_string(key, param[2], str(param[3]))
def _absorbHandlers(self):
twistd_handlers = {}
diff --git a/nova/utils.py b/nova/utils.py
index dc3c626ec..ef8405fc0 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -29,6 +29,8 @@ import subprocess
import socket
import sys
+from twisted.internet.threads import deferToThread
+
from nova import exception
from nova import flags
@@ -148,3 +150,9 @@ def isotime(at=None):
def parse_isotime(timestr):
return datetime.datetime.strptime(timestr, TIME_FORMAT)
+
+
+def deferredToThread(f):
+ def g(*args, **kwargs):
+ return deferToThread(f, *args, **kwargs)
+ return g
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
index 2f5994983..b44ac383a 100644
--- a/nova/virt/xenapi.py
+++ b/nova/virt/xenapi.py
@@ -16,17 +16,35 @@
"""
A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import logging
import xmlrpclib
from twisted.internet import defer
+from twisted.internet import reactor
from twisted.internet import task
-from nova import exception
from nova import flags
from nova import process
+from nova import utils
from nova.auth.manager import AuthManager
from nova.compute import power_state
from nova.virt import images
@@ -47,6 +65,11 @@ flags.DEFINE_string('xenapi_connection_password',
None,
'Password for connection to XenServer/Xen Cloud Platform.'
' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+ 0.5,
+ 'The interval used for polling of remote tasks '
+ '(Async.VM.start, etc). Used only if '
+ 'connection_type=xenapi.')
XENAPI_POWER_STATE = {
@@ -84,9 +107,8 @@ class XenAPIConnection(object):
for vm in self._conn.xenapi.VM.get_all()]
@defer.inlineCallbacks
- @exception.wrap_exception
def spawn(self, instance):
- vm = yield self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
@@ -105,21 +127,27 @@ class XenAPIConnection(object):
user = AuthManager().get_user(instance.datamodel['user_id'])
project = AuthManager().get_project(instance.datamodel['project_id'])
- vdi_uuid = yield self.fetch_image(
+ vdi_uuid = yield self._fetch_image(
instance.datamodel['image_id'], user, project, True)
- kernel = yield self.fetch_image(
+ kernel = yield self._fetch_image(
instance.datamodel['kernel_id'], user, project, False)
- ramdisk = yield self.fetch_image(
+ ramdisk = yield self._fetch_image(
instance.datamodel['ramdisk_id'], user, project, False)
- vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
+ vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
- vm_ref = yield self.create_vm(instance, kernel, ramdisk)
- yield self.create_vbd(vm_ref, vdi_ref, 0, True)
+ vm_ref = yield self._create_vm(instance, kernel, ramdisk)
+ yield self._create_vbd(vm_ref, vdi_ref, 0, True)
if network_ref:
yield self._create_vif(vm_ref, network_ref, mac_address)
- yield self._conn.xenapi.VM.start(vm_ref, False, False)
+ logging.debug('Starting VM %s...', vm_ref)
+ yield self._call_xenapi('VM.start', vm_ref, False, False)
+ logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
- def create_vm(self, instance, kernel, ramdisk):
+ @defer.inlineCallbacks
+ def _create_vm(self, instance, kernel, ramdisk):
+ """Create a VM record. Returns a Deferred that gives the new
+ VM reference."""
+
mem = str(long(instance.datamodel['memory_kb']) * 1024)
vcpus = str(instance.datamodel['vcpus'])
rec = {
@@ -152,11 +180,15 @@ class XenAPIConnection(object):
'other_config': {},
}
logging.debug('Created VM %s...', instance.name)
- vm_ref = self._conn.xenapi.VM.create(rec)
+ vm_ref = yield self._call_xenapi('VM.create', rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- return vm_ref
+ defer.returnValue(vm_ref)
- def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+ @defer.inlineCallbacks
+ def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+ """Create a VBD record. Returns a Deferred that gives the new
+ VBD reference."""
+
vbd_rec = {}
vbd_rec['VM'] = vm_ref
vbd_rec['VDI'] = vdi_ref
@@ -171,12 +203,16 @@ class XenAPIConnection(object):
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
+ vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
- return vbd_ref
+ defer.returnValue(vbd_ref)
+ @defer.inlineCallbacks
def _create_vif(self, vm_ref, network_ref, mac_address):
+ """Create a VIF record. Returns a Deferred that gives the new
+ VIF reference."""
+
vif_rec = {}
vif_rec['device'] = '0'
vif_rec['network']= network_ref
@@ -188,25 +224,29 @@ class XenAPIConnection(object):
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
- vif_ref = self._conn.xenapi.VIF.create(vif_rec)
+ vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
- return vif_ref
+ defer.returnValue(vif_ref)
+ @defer.inlineCallbacks
def _find_network_with_bridge(self, bridge):
expr = 'field "bridge" = "%s"' % bridge
- networks = self._conn.xenapi.network.get_all_records_where(expr)
+ networks = yield self._call_xenapi('network.get_all_records_where',
+ expr)
if len(networks) == 1:
- return networks.keys()[0]
+ defer.returnValue(networks.keys()[0])
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:
raise Exception('Found no network for bridge %s' % bridge)
- def fetch_image(self, image, user, project, use_sr):
+ @defer.inlineCallbacks
+ def _fetch_image(self, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
- its kernel and ramdisk (if external kernels are being used)."""
+ its kernel and ramdisk (if external kernels are being used).
+ Returns a Deferred that gives the new VDI UUID."""
url = images.image_url(image)
access = AuthManager().get_access_key(user, project)
@@ -218,22 +258,28 @@ class XenAPIConnection(object):
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
- return self._call_plugin('objectstore', fn, args)
+ task = yield self._async_call_plugin('objectstore', fn, args)
+ uuid = yield self._wait_for_task(task)
+ defer.returnValue(uuid)
+ @defer.inlineCallbacks
def reboot(self, instance):
- vm = self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
- yield self._conn.xenapi.VM.clean_reboot(vm)
+ task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
+ yield self._wait_for_task(task)
+ @defer.inlineCallbacks
def destroy(self, instance):
- vm = self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
- yield self._conn.xenapi.VM.destroy(vm)
+ task = yield self._call_xenapi('Async.VM.destroy', vm)
+ yield self._wait_for_task(task)
def get_info(self, instance_id):
- vm = self.lookup(instance_id)
+ vm = self._lookup_blocking(instance_id)
if vm is None:
raise Exception('instance not present %s' % instance_id)
rec = self._conn.xenapi.VM.get_record(vm)
@@ -243,7 +289,11 @@ class XenAPIConnection(object):
'num_cpu': rec['VCPUs_max'],
'cpu_time': 0}
- def lookup(self, i):
+ @utils.deferredToThread
+ def _lookup(self, i):
+ return self._lookup_blocking(i)
+
+ def _lookup_blocking(self, i):
vms = self._conn.xenapi.VM.get_by_name_label(i)
n = len(vms)
if n == 0:
@@ -253,9 +303,52 @@ class XenAPIConnection(object):
else:
return vms[0]
- def _call_plugin(self, plugin, fn, args):
+ def _wait_for_task(self, task):
+ """Return a Deferred that will give the result of the given task.
+ The task is polled until it completes."""
+ d = defer.Deferred()
+ reactor.callLater(0, self._poll_task, task, d)
+ return d
+
+ @utils.deferredToThread
+ def _poll_task(self, task, deferred):
+ """Poll the given XenAPI task, and fire the given Deferred if we
+ get a result."""
+ try:
+ #logging.debug('Polling task %s...', task)
+ status = self._conn.xenapi.task.get_status(task)
+ if status == 'pending':
+ reactor.callLater(FLAGS.xenapi_task_poll_interval,
+ self._poll_task, task, deferred)
+ elif status == 'success':
+ result = self._conn.xenapi.task.get_result(task)
+ logging.info('Task %s status: success. %s', task, result)
+ deferred.callback(_parse_xmlrpc_value(result))
+ else:
+ error_info = self._conn.xenapi.task.get_error_info(task)
+ logging.warn('Task %s status: %s. %s', task, status,
+ error_info)
+ deferred.errback(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
+ except Exception, exn:
+ logging.warn(exn)
+ deferred.errback(exn)
+
+ @utils.deferredToThread
+ def _call_xenapi(self, method, *args):
+ """Call the specified XenAPI method on a background thread. Returns
+ a Deferred for the result."""
+ f = self._conn.xenapi
+ for m in method.split('.'):
+ f = f.__getattr__(m)
+ return f(*args)
+
+ @utils.deferredToThread
+ def _async_call_plugin(self, plugin, fn, args):
+ """Call Async.host.call_plugin on a background thread. Returns a
+ Deferred with the task reference."""
return _unwrap_plugin_exceptions(
- self._conn.xenapi.host.call_plugin,
+ self._conn.xenapi.Async.host.call_plugin,
self._get_xenapi_host(), plugin, fn, args)
def _get_xenapi_host(self):
@@ -281,3 +374,15 @@ def _unwrap_plugin_exceptions(func, *args, **kwargs):
except xmlrpclib.ProtocolError, exn:
logging.debug("Got exception: %s", exn)
raise
+
+
+def _parse_xmlrpc_value(val):
+ """Parse the given value as if it were an XML-RPC value. This is
+ sometimes used as the format for the task.result field."""
+ if not val:
+ return val
+ x = xmlrpclib.loads(
+ '<?xml version="1.0"?><methodResponse><params><param>' +
+ val +
+ '</param></params></methodResponse>')
+ return x[0][0]
diff --git a/nova/wsgi.py b/nova/wsgi.py
index baf6cccd9..bec0a7b1c 100644
--- a/nova/wsgi.py
+++ b/nova/wsgi.py
@@ -196,7 +196,8 @@ class Controller(object):
WSGI app that reads routing information supplied by RoutesMiddleware
and calls the requested action method upon itself. All action methods
must, in addition to their normal parameters, accept a 'req' argument
- which is the incoming webob.Request.
+ which is the incoming webob.Request. They raise a webob.exc exception,
+ or return a dict which will be serialized by requested content type.
"""
@webob.dec.wsgify
@@ -210,7 +211,21 @@ class Controller(object):
del arg_dict['controller']
del arg_dict['action']
arg_dict['req'] = req
- return method(**arg_dict)
+ result = method(**arg_dict)
+ if type(result) is dict:
+ return self._serialize(result, req)
+ else:
+ return result
+
+ def _serialize(self, data, request):
+ """
+ Serialize the given dict to the response type requested in request.
+ Uses self._serialization_metadata if it exists, which is a dict mapping
+ MIME types to information needed to serialize to that type.
+ """
+ _metadata = getattr(type(self), "_serialization_metadata", {})
+ serializer = Serializer(request.environ, _metadata)
+ return serializer.to_content_type(data)
class Serializer(object):