diff options
| author | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-08-25 16:22:49 -0700 |
|---|---|---|
| committer | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-08-25 16:22:49 -0700 |
| commit | af959df4076d41d950baa27905d15fbda017e0cb (patch) | |
| tree | 10fe2897471a00fb424c8e25fad3c86326b27019 /nova | |
| parent | 70112ea9941b92aa98e32c0c37f0208877953557 (diff) | |
| parent | 686ad09fe4416bb578661a42f6f083528d4a7ca9 (diff) | |
merged trunk and fixed tests
Diffstat (limited to 'nova')
| -rw-r--r-- | nova/api/rackspace/images.py | 101 | ||||
| -rw-r--r-- | nova/api/rackspace/notes.txt | 23 | ||||
| -rw-r--r-- | nova/flags.py | 1 | ||||
| -rw-r--r-- | nova/image/service.py | 90 | ||||
| -rw-r--r-- | nova/network/model.py | 5 | ||||
| -rw-r--r-- | nova/network/service.py | 7 | ||||
| -rw-r--r-- | nova/server.py | 6 | ||||
| -rw-r--r-- | nova/tests/network_unittest.py | 33 | ||||
| -rw-r--r-- | nova/twistd.py | 16 | ||||
| -rw-r--r-- | nova/utils.py | 8 | ||||
| -rw-r--r-- | nova/virt/xenapi.py | 167 | ||||
| -rw-r--r-- | nova/wsgi.py | 19 |
12 files changed, 426 insertions, 50 deletions
diff --git a/nova/api/rackspace/images.py b/nova/api/rackspace/images.py index 986f11434..370980fe9 100644 --- a/nova/api/rackspace/images.py +++ b/nova/api/rackspace/images.py @@ -15,4 +15,103 @@ # License for the specific language governing permissions and limitations # under the License. -class Controller(object): pass +from nova import datastore +from nova import image +from nova.api.rackspace import base +from webob import exc + +class Controller(base.Controller): + + _serialization_metadata = { + 'application/xml': { + "attributes": { + "image": [ "id", "name", "updated", "created", "status", + "serverId", "progress" ] + } + } + } + + def __init__(self): + self._service = image.service.ImageService.load() + self._id_translator = RackspaceAPIImageIdTranslator() + + def _to_rs_id(self, image_id): + """ + Convert an image id from the format of our ImageService strategy + to the Rackspace API format (an int). + """ + strategy = self._service.__class__.__name__ + return self._id_translator.to_rs_id(strategy, image_id) + + def _from_rs_id(self, rs_image_id): + """ + Convert an image id from the Rackspace API format (an int) to the + format of our ImageService strategy. + """ + strategy = self._service.__class__.__name__ + return self._id_translator.from_rs_id(strategy, rs_image_id) + + def index(self, req): + """Return all public images.""" + data = self._service.index() + for img in data: + img['id'] = self._to_rs_id(img['id']) + return dict(images=data) + + def show(self, req, id): + """Return data about the given image id.""" + opaque_id = self._from_rs_id(id) + img = self._service.show(opaque_id) + img['id'] = id + return dict(image=img) + + def delete(self, req, id): + # Only public images are supported for now. + raise exc.HTTPNotFound() + + def create(self, req): + # Only public images are supported for now, so a request to + # make a backup of a server cannot be supproted. + raise exc.HTTPNotFound() + + def update(self, req, id): + # Users may not modify public images, and that's all that + # we support for now. + raise exc.HTTPNotFound() + + +class RackspaceAPIImageIdTranslator(object): + """ + Converts Rackspace API image ids to and from the id format for a given + strategy. + """ + + def __init__(self): + self._store = datastore.Redis.instance() + self._key_template = "rsapi.idstrategies.image.%s.%s" + + def to_rs_id(self, strategy_name, opaque_id): + """Convert an id from a strategy-specific one to a Rackspace one.""" + key = self._key_template % (strategy_name, "fwd") + result = self._store.hget(key, str(opaque_id)) + if result: # we have a mapping from opaque to RS for this strategy + return int(result) + else: + # Store the mapping. + nextid = self._store.incr("%s.lastid" % key) + if self._store.hsetnx(key, str(opaque_id), nextid): + # If someone else didn't beat us to it, store the reverse + # mapping as well. + key = self._key_template % (strategy_name, "rev") + self._store.hset(key, nextid, str(opaque_id)) + return nextid + else: + # Someone beat us to it; use their number instead, and + # discard nextid (which is OK -- we don't require that + # every int id be used.) + return int(self._store.hget(key, str(opaque_id))) + + def from_rs_id(self, strategy_name, rs_id): + """Convert a Rackspace id to a strategy-specific one.""" + key = self._key_template % (strategy_name, "rev") + return self._store.hget(key, rs_id) diff --git a/nova/api/rackspace/notes.txt b/nova/api/rackspace/notes.txt new file mode 100644 index 000000000..e133bf5ea --- /dev/null +++ b/nova/api/rackspace/notes.txt @@ -0,0 +1,23 @@ +We will need: + +ImageService +a service that can do crud on image information. not user-specific. opaque +image ids. + +GlanceImageService(ImageService): +image ids are URIs. + +LocalImageService(ImageService): +image ids are random strings. + +RackspaceAPITranslationStore: +translates RS server/images/flavor/etc ids into formats required +by a given ImageService strategy. + +api.rackspace.images.Controller: +uses an ImageService strategy behind the scenes to do its fetching; it just +converts int image id into a strategy-specific image id. + +who maintains the mapping from user to [images he owns]? nobody, because +we have no way of enforcing access to his images, without kryptex which +won't be in Austin. diff --git a/nova/flags.py b/nova/flags.py index e3feb252d..6f9f906dd 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -141,6 +141,7 @@ def _wrapper(func): return _wrapped +DEFINE = _wrapper(gflags.DEFINE) DEFINE_string = _wrapper(gflags.DEFINE_string) DEFINE_integer = _wrapper(gflags.DEFINE_integer) DEFINE_bool = _wrapper(gflags.DEFINE_bool) diff --git a/nova/image/service.py b/nova/image/service.py new file mode 100644 index 000000000..1a7a258b7 --- /dev/null +++ b/nova/image/service.py @@ -0,0 +1,90 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import cPickle as pickle +import os.path +import random +import string + +class ImageService(object): + """Provides storage and retrieval of disk image objects.""" + + @staticmethod + def load(): + """Factory method to return image service.""" + #TODO(gundlach): read from config. + class_ = LocalImageService + return class_() + + def index(self): + """ + Return a dict from opaque image id to image data. + """ + + def show(self, id): + """ + Returns a dict containing image data for the given opaque image id. + """ + + +class GlanceImageService(ImageService): + """Provides storage and retrieval of disk image objects within Glance.""" + # TODO(gundlach): once Glance has an API, build this. + pass + + +class LocalImageService(ImageService): + """Image service storing images to local disk.""" + + def __init__(self): + self._path = "/tmp/nova/images" + try: + os.makedirs(self._path) + except OSError: # exists + pass + + def _path_to(self, image_id=''): + return os.path.join(self._path, image_id) + + def _ids(self): + """The list of all image ids.""" + return os.listdir(self._path) + + def index(self): + return [ self.show(id) for id in self._ids() ] + + def show(self, id): + return pickle.load(open(self._path_to(id))) + + def create(self, data): + """ + Store the image data and return the new image id. + """ + id = ''.join(random.choice(string.letters) for _ in range(20)) + data['id'] = id + self.update(id, data) + return id + + def update(self, image_id, data): + """Replace the contents of the given image with the new data.""" + pickle.dump(data, open(self._path_to(image_id), 'w')) + + def delete(self, image_id): + """ + Delete the given image. Raises OSError if the image does not exist. + """ + os.unlink(self._path_to(image_id)) diff --git a/nova/network/model.py b/nova/network/model.py index 6c12836b7..557fc92a6 100644 --- a/nova/network/model.py +++ b/nova/network/model.py @@ -290,6 +290,11 @@ class BaseNetwork(datastore.BasicModel): return 'available:%s' % self.identifier @property + def num_available_ips(self): + redis = datastore.Redis.instance() + return redis.scard(self._available_key) + + @property def assigned(self): """Returns a list of all assigned addresses""" return self.address_class.associated_keys('network', self.identifier) diff --git a/nova/network/service.py b/nova/network/service.py index d3aa1c46f..3dba0a9ef 100644 --- a/nova/network/service.py +++ b/nova/network/service.py @@ -26,7 +26,7 @@ from nova import flags from nova import service from nova import utils from nova.auth import manager -from nova.network import exception +from nova.network import exception as network_exception from nova.network import model from nova.network import vpn @@ -64,8 +64,7 @@ def type_to_class(network_type): def setup_compute_network(network_type, user_id, project_id, security_group): """Sets up the network on a compute host""" srv = type_to_class(network_type) - srv.setup_compute_network(network_type, - user_id, + srv.setup_compute_network(user_id, project_id, security_group) @@ -170,7 +169,7 @@ class FlatNetworkService(BaseNetworkService): redis.sadd('ips', fixed_ip) fixed_ip = redis.spop('ips') if not fixed_ip: - raise exception.NoMoreAddresses() + raise network_exception.NoMoreAddresses() # TODO(vish): some sort of dns handling for hostname should # probably be done here. return {'inject_network': True, diff --git a/nova/server.py b/nova/server.py index 96550f078..c6b60e090 100644 --- a/nova/server.py +++ b/nova/server.py @@ -44,6 +44,8 @@ flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing') flags.DEFINE_string('logfile', None, 'log file to output to') flags.DEFINE_string('pidfile', None, 'pid file to output to') flags.DEFINE_string('working_directory', './', 'working directory...') +flags.DEFINE_integer('uid', os.getuid(), 'uid under which to run') +flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run') def stop(pidfile): @@ -135,6 +137,8 @@ def daemonize(args, name, main): threaded=False), stdin=stdin, stdout=stdout, - stderr=stderr + stderr=stderr, + uid=FLAGS.uid, + gid=FLAGS.gid ): main(args) diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index 34b68f1ed..00b0b97e7 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -184,13 +184,23 @@ class NetworkTestCase(test.TrialTestCase): def test_ips_are_reused(self): """Makes sure that ip addresses that are deallocated get reused""" - result = self.service.allocate_fixed_ip( - self.user.id, self.projects[0].id) - mac = result['mac_address'] - address = result['private_dns_name'] + net = model.get_project_network(self.projects[0].id, "default") hostname = "reuse-host" - net = model.get_project_network(self.projects[0].id, "default") + macs = {} + addresses = {} + num_available_ips = net.num_available_ips + for i in range(num_available_ips - 1): + result = self.service.allocate_fixed_ip(self.user.id, + self.projects[0].id) + macs[i] = result['mac_address'] + addresses[i] = result['private_dns_name'] + issue_ip(macs[i], addresses[i], hostname, net.bridge_name) + + result = self.service.allocate_fixed_ip(self.user.id, + self.projects[0].id) + mac = result['mac_address'] + address = result['private_dns_name'] issue_ip(mac, address, hostname, net.bridge_name) self.service.deallocate_fixed_ip(address) @@ -205,6 +215,10 @@ class NetworkTestCase(test.TrialTestCase): self.service.deallocate_fixed_ip(secondaddress) release_ip(secondmac, secondaddress, hostname, net.bridge_name) + for i in range(len(addresses)): + self.service.deallocate_fixed_ip(addresses[i]) + release_ip(macs[i], addresses[i], hostname, net.bridge_name) + def test_available_ips(self): """Make sure the number of available ips for the network is correct @@ -222,7 +236,7 @@ class NetworkTestCase(test.TrialTestCase): num_available_ips = net_size - (net.num_bottom_reserved_ips + num_preallocated_ips + net.num_top_reserved_ips) - self.assertEqual(num_available_ips, len(list(net.available))) + self.assertEqual(num_available_ips, net.num_available_ips) def test_too_many_addresses(self): """Test for a NoMoreAddresses exception when all fixed ips are used. @@ -232,8 +246,7 @@ class NetworkTestCase(test.TrialTestCase): hostname = "toomany-hosts" macs = {} addresses = {} - # Number of availaible ips is len of the available list - num_available_ips = len(list(net.available)) + num_available_ips = net.num_available_ips for i in range(num_available_ips): result = self.service.allocate_fixed_ip(self.user.id, self.projects[0].id) @@ -241,14 +254,14 @@ class NetworkTestCase(test.TrialTestCase): addresses[i] = result['private_dns_name'] issue_ip(macs[i], addresses[i], hostname, net.bridge_name) - self.assertEqual(len(list(net.available)), 0) + self.assertEqual(net.num_available_ips, 0) self.assertRaises(NoMoreAddresses, self.service.allocate_fixed_ip, self.user.id, self.projects[0].id) for i in range(len(addresses)): self.service.deallocate_fixed_ip(addresses[i]) release_ip(macs[i], addresses[i], hostname, net.bridge_name) - self.assertEqual(len(list(net.available)), num_available_ips) + self.assertEqual(net.num_available_ips, num_available_ips) def is_in_project(address, project_id): diff --git a/nova/twistd.py b/nova/twistd.py index 8de322aa5..9511c231c 100644 --- a/nova/twistd.py +++ b/nova/twistd.py @@ -21,6 +21,7 @@ Twisted daemon helpers, specifically to parse out gFlags from twisted flags, manage pid files and support syslogging. """ +import gflags import logging import os import signal @@ -49,6 +50,14 @@ class TwistdServerOptions(ServerOptions): return +class FlagParser(object): + def __init__(self, parser): + self.parser = parser + + def Parse(self, s): + return self.parser(s) + + def WrapTwistedOptions(wrapped): class TwistedOptionsToFlags(wrapped): subCommands = None @@ -79,7 +88,12 @@ def WrapTwistedOptions(wrapped): reflect.accumulateClassList(self.__class__, 'optParameters', twistd_params) for param in twistd_params: key = param[0].replace('-', '_') - flags.DEFINE_string(key, param[2], str(param[-1])) + if len(param) > 4: + flags.DEFINE(FlagParser(param[4]), + key, param[2], str(param[3]), + serializer=gflags.ArgumentSerializer()) + else: + flags.DEFINE_string(key, param[2], str(param[3])) def _absorbHandlers(self): twistd_handlers = {} diff --git a/nova/utils.py b/nova/utils.py index dc3c626ec..ef8405fc0 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -29,6 +29,8 @@ import subprocess import socket import sys +from twisted.internet.threads import deferToThread + from nova import exception from nova import flags @@ -148,3 +150,9 @@ def isotime(at=None): def parse_isotime(timestr): return datetime.datetime.strptime(timestr, TIME_FORMAT) + + +def deferredToThread(f): + def g(*args, **kwargs): + return deferToThread(f, *args, **kwargs) + return g diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py index 2f5994983..b44ac383a 100644 --- a/nova/virt/xenapi.py +++ b/nova/virt/xenapi.py @@ -16,17 +16,35 @@ """ A connection to XenServer or Xen Cloud Platform. + +The concurrency model for this class is as follows: + +All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator +deferredToThread). They are remote calls, and so may hang for the usual +reasons. They should not be allowed to block the reactor thread. + +All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async +(using XenAPI.VM.async_start etc). These return a task, which can then be +polled for completion. Polling is handled using reactor.callLater. + +This combination of techniques means that we don't block the reactor thread at +all, and at the same time we don't hold lots of threads waiting for +long-running operations. + +FIXME: get_info currently doesn't conform to these rules, and will block the +reactor thread if the VM.get_by_name_label or VM.get_record calls block. """ import logging import xmlrpclib from twisted.internet import defer +from twisted.internet import reactor from twisted.internet import task -from nova import exception from nova import flags from nova import process +from nova import utils from nova.auth.manager import AuthManager from nova.compute import power_state from nova.virt import images @@ -47,6 +65,11 @@ flags.DEFINE_string('xenapi_connection_password', None, 'Password for connection to XenServer/Xen Cloud Platform.' ' Used only if connection_type=xenapi.') +flags.DEFINE_float('xenapi_task_poll_interval', + 0.5, + 'The interval used for polling of remote tasks ' + '(Async.VM.start, etc). Used only if ' + 'connection_type=xenapi.') XENAPI_POWER_STATE = { @@ -84,9 +107,8 @@ class XenAPIConnection(object): for vm in self._conn.xenapi.VM.get_all()] @defer.inlineCallbacks - @exception.wrap_exception def spawn(self, instance): - vm = yield self.lookup(instance.name) + vm = yield self._lookup(instance.name) if vm is not None: raise Exception('Attempted to create non-unique name %s' % instance.name) @@ -105,21 +127,27 @@ class XenAPIConnection(object): user = AuthManager().get_user(instance.datamodel['user_id']) project = AuthManager().get_project(instance.datamodel['project_id']) - vdi_uuid = yield self.fetch_image( + vdi_uuid = yield self._fetch_image( instance.datamodel['image_id'], user, project, True) - kernel = yield self.fetch_image( + kernel = yield self._fetch_image( instance.datamodel['kernel_id'], user, project, False) - ramdisk = yield self.fetch_image( + ramdisk = yield self._fetch_image( instance.datamodel['ramdisk_id'], user, project, False) - vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid) + vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid) - vm_ref = yield self.create_vm(instance, kernel, ramdisk) - yield self.create_vbd(vm_ref, vdi_ref, 0, True) + vm_ref = yield self._create_vm(instance, kernel, ramdisk) + yield self._create_vbd(vm_ref, vdi_ref, 0, True) if network_ref: yield self._create_vif(vm_ref, network_ref, mac_address) - yield self._conn.xenapi.VM.start(vm_ref, False, False) + logging.debug('Starting VM %s...', vm_ref) + yield self._call_xenapi('VM.start', vm_ref, False, False) + logging.info('Spawning VM %s created %s.', instance.name, vm_ref) - def create_vm(self, instance, kernel, ramdisk): + @defer.inlineCallbacks + def _create_vm(self, instance, kernel, ramdisk): + """Create a VM record. Returns a Deferred that gives the new + VM reference.""" + mem = str(long(instance.datamodel['memory_kb']) * 1024) vcpus = str(instance.datamodel['vcpus']) rec = { @@ -152,11 +180,15 @@ class XenAPIConnection(object): 'other_config': {}, } logging.debug('Created VM %s...', instance.name) - vm_ref = self._conn.xenapi.VM.create(rec) + vm_ref = yield self._call_xenapi('VM.create', rec) logging.debug('Created VM %s as %s.', instance.name, vm_ref) - return vm_ref + defer.returnValue(vm_ref) - def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): + @defer.inlineCallbacks + def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): + """Create a VBD record. Returns a Deferred that gives the new + VBD reference.""" + vbd_rec = {} vbd_rec['VM'] = vm_ref vbd_rec['VDI'] = vdi_ref @@ -171,12 +203,16 @@ class XenAPIConnection(object): vbd_rec['qos_algorithm_params'] = {} vbd_rec['qos_supported_algorithms'] = [] logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) - vbd_ref = self._conn.xenapi.VBD.create(vbd_rec) + vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec) logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, vdi_ref) - return vbd_ref + defer.returnValue(vbd_ref) + @defer.inlineCallbacks def _create_vif(self, vm_ref, network_ref, mac_address): + """Create a VIF record. Returns a Deferred that gives the new + VIF reference.""" + vif_rec = {} vif_rec['device'] = '0' vif_rec['network']= network_ref @@ -188,25 +224,29 @@ class XenAPIConnection(object): vif_rec['qos_algorithm_params'] = {} logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, network_ref) - vif_ref = self._conn.xenapi.VIF.create(vif_rec) + vif_ref = yield self._call_xenapi('VIF.create', vif_rec) logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, vm_ref, network_ref) - return vif_ref + defer.returnValue(vif_ref) + @defer.inlineCallbacks def _find_network_with_bridge(self, bridge): expr = 'field "bridge" = "%s"' % bridge - networks = self._conn.xenapi.network.get_all_records_where(expr) + networks = yield self._call_xenapi('network.get_all_records_where', + expr) if len(networks) == 1: - return networks.keys()[0] + defer.returnValue(networks.keys()[0]) elif len(networks) > 1: raise Exception('Found non-unique network for bridge %s' % bridge) else: raise Exception('Found no network for bridge %s' % bridge) - def fetch_image(self, image, user, project, use_sr): + @defer.inlineCallbacks + def _fetch_image(self, image, user, project, use_sr): """use_sr: True to put the image as a VDI in an SR, False to place it on dom0's filesystem. The former is for VM disks, the latter for - its kernel and ramdisk (if external kernels are being used).""" + its kernel and ramdisk (if external kernels are being used). + Returns a Deferred that gives the new VDI UUID.""" url = images.image_url(image) access = AuthManager().get_access_key(user, project) @@ -218,22 +258,28 @@ class XenAPIConnection(object): args['password'] = user.secret if use_sr: args['add_partition'] = 'true' - return self._call_plugin('objectstore', fn, args) + task = yield self._async_call_plugin('objectstore', fn, args) + uuid = yield self._wait_for_task(task) + defer.returnValue(uuid) + @defer.inlineCallbacks def reboot(self, instance): - vm = self.lookup(instance.name) + vm = yield self._lookup(instance.name) if vm is None: raise Exception('instance not present %s' % instance.name) - yield self._conn.xenapi.VM.clean_reboot(vm) + task = yield self._call_xenapi('Async.VM.clean_reboot', vm) + yield self._wait_for_task(task) + @defer.inlineCallbacks def destroy(self, instance): - vm = self.lookup(instance.name) + vm = yield self._lookup(instance.name) if vm is None: raise Exception('instance not present %s' % instance.name) - yield self._conn.xenapi.VM.destroy(vm) + task = yield self._call_xenapi('Async.VM.destroy', vm) + yield self._wait_for_task(task) def get_info(self, instance_id): - vm = self.lookup(instance_id) + vm = self._lookup_blocking(instance_id) if vm is None: raise Exception('instance not present %s' % instance_id) rec = self._conn.xenapi.VM.get_record(vm) @@ -243,7 +289,11 @@ class XenAPIConnection(object): 'num_cpu': rec['VCPUs_max'], 'cpu_time': 0} - def lookup(self, i): + @utils.deferredToThread + def _lookup(self, i): + return self._lookup_blocking(i) + + def _lookup_blocking(self, i): vms = self._conn.xenapi.VM.get_by_name_label(i) n = len(vms) if n == 0: @@ -253,9 +303,52 @@ class XenAPIConnection(object): else: return vms[0] - def _call_plugin(self, plugin, fn, args): + def _wait_for_task(self, task): + """Return a Deferred that will give the result of the given task. + The task is polled until it completes.""" + d = defer.Deferred() + reactor.callLater(0, self._poll_task, task, d) + return d + + @utils.deferredToThread + def _poll_task(self, task, deferred): + """Poll the given XenAPI task, and fire the given Deferred if we + get a result.""" + try: + #logging.debug('Polling task %s...', task) + status = self._conn.xenapi.task.get_status(task) + if status == 'pending': + reactor.callLater(FLAGS.xenapi_task_poll_interval, + self._poll_task, task, deferred) + elif status == 'success': + result = self._conn.xenapi.task.get_result(task) + logging.info('Task %s status: success. %s', task, result) + deferred.callback(_parse_xmlrpc_value(result)) + else: + error_info = self._conn.xenapi.task.get_error_info(task) + logging.warn('Task %s status: %s. %s', task, status, + error_info) + deferred.errback(XenAPI.Failure(error_info)) + #logging.debug('Polling task %s done.', task) + except Exception, exn: + logging.warn(exn) + deferred.errback(exn) + + @utils.deferredToThread + def _call_xenapi(self, method, *args): + """Call the specified XenAPI method on a background thread. Returns + a Deferred for the result.""" + f = self._conn.xenapi + for m in method.split('.'): + f = f.__getattr__(m) + return f(*args) + + @utils.deferredToThread + def _async_call_plugin(self, plugin, fn, args): + """Call Async.host.call_plugin on a background thread. Returns a + Deferred with the task reference.""" return _unwrap_plugin_exceptions( - self._conn.xenapi.host.call_plugin, + self._conn.xenapi.Async.host.call_plugin, self._get_xenapi_host(), plugin, fn, args) def _get_xenapi_host(self): @@ -281,3 +374,15 @@ def _unwrap_plugin_exceptions(func, *args, **kwargs): except xmlrpclib.ProtocolError, exn: logging.debug("Got exception: %s", exn) raise + + +def _parse_xmlrpc_value(val): + """Parse the given value as if it were an XML-RPC value. This is + sometimes used as the format for the task.result field.""" + if not val: + return val + x = xmlrpclib.loads( + '<?xml version="1.0"?><methodResponse><params><param>' + + val + + '</param></params></methodResponse>') + return x[0][0] diff --git a/nova/wsgi.py b/nova/wsgi.py index baf6cccd9..bec0a7b1c 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -196,7 +196,8 @@ class Controller(object): WSGI app that reads routing information supplied by RoutesMiddleware and calls the requested action method upon itself. All action methods must, in addition to their normal parameters, accept a 'req' argument - which is the incoming webob.Request. + which is the incoming webob.Request. They raise a webob.exc exception, + or return a dict which will be serialized by requested content type. """ @webob.dec.wsgify @@ -210,7 +211,21 @@ class Controller(object): del arg_dict['controller'] del arg_dict['action'] arg_dict['req'] = req - return method(**arg_dict) + result = method(**arg_dict) + if type(result) is dict: + return self._serialize(result, req) + else: + return result + + def _serialize(self, data, request): + """ + Serialize the given dict to the response type requested in request. + Uses self._serialization_metadata if it exists, which is a dict mapping + MIME types to information needed to serialize to that type. + """ + _metadata = getattr(type(self), "_serialization_metadata", {}) + serializer = Serializer(request.environ, _metadata) + return serializer.to_content_type(data) class Serializer(object): |
