diff options
| author | Michael Gundlach <michael.gundlach@rackspace.com> | 2010-08-25 16:49:32 -0400 |
|---|---|---|
| committer | Michael Gundlach <michael.gundlach@rackspace.com> | 2010-08-25 16:49:32 -0400 |
| commit | 0aa2eceba81e569dcde8bca318b696d121fe9f81 (patch) | |
| tree | ecd58921e5373857634aff0f227feaa01fba3795 | |
| parent | cf0b5de1f78fd81ada2bada8c84e26b3238b8596 (diff) | |
| parent | 686ad09fe4416bb578661a42f6f083528d4a7ca9 (diff) | |
| download | nova-0aa2eceba81e569dcde8bca318b696d121fe9f81.tar.gz nova-0aa2eceba81e569dcde8bca318b696d121fe9f81.tar.xz nova-0aa2eceba81e569dcde8bca318b696d121fe9f81.zip | |
Merge from trunk
| -rw-r--r-- | nova/api/rackspace/images.py | 2 | ||||
| -rw-r--r-- | nova/api/services/__init__.py | 0 | ||||
| -rw-r--r-- | nova/flags.py | 1 | ||||
| -rw-r--r-- | nova/image/service.py (renamed from nova/api/services/image.py) | 0 | ||||
| -rw-r--r-- | nova/network/service.py | 7 | ||||
| -rw-r--r-- | nova/server.py | 6 | ||||
| -rw-r--r-- | nova/twistd.py | 16 | ||||
| -rw-r--r-- | nova/utils.py | 8 | ||||
| -rw-r--r-- | nova/virt/xenapi.py | 167 | ||||
| -rw-r--r-- | nova/wsgi.py | 5 | ||||
| -rw-r--r-- | tools/install_venv.py | 4 |
11 files changed, 175 insertions, 41 deletions
diff --git a/nova/api/rackspace/images.py b/nova/api/rackspace/images.py index 09f55ea96..06fb0d38f 100644 --- a/nova/api/rackspace/images.py +++ b/nova/api/rackspace/images.py @@ -15,9 +15,9 @@ # License for the specific language governing permissions and limitations # under the License. +from nova import image from nova.api.rackspace import base from nova.api.rackspace import _id_translator -from nova.api.services import image from webob import exc class Controller(base.Controller): diff --git a/nova/api/services/__init__.py b/nova/api/services/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/nova/api/services/__init__.py +++ /dev/null diff --git a/nova/flags.py b/nova/flags.py index e3feb252d..6f9f906dd 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -141,6 +141,7 @@ def _wrapper(func): return _wrapped +DEFINE = _wrapper(gflags.DEFINE) DEFINE_string = _wrapper(gflags.DEFINE_string) DEFINE_integer = _wrapper(gflags.DEFINE_integer) DEFINE_bool = _wrapper(gflags.DEFINE_bool) diff --git a/nova/api/services/image.py b/nova/image/service.py index 1a7a258b7..1a7a258b7 100644 --- a/nova/api/services/image.py +++ b/nova/image/service.py diff --git a/nova/network/service.py b/nova/network/service.py index d3aa1c46f..3dba0a9ef 100644 --- a/nova/network/service.py +++ b/nova/network/service.py @@ -26,7 +26,7 @@ from nova import flags from nova import service from nova import utils from nova.auth import manager -from nova.network import exception +from nova.network import exception as network_exception from nova.network import model from nova.network import vpn @@ -64,8 +64,7 @@ def type_to_class(network_type): def setup_compute_network(network_type, user_id, project_id, security_group): """Sets up the network on a compute host""" srv = type_to_class(network_type) - srv.setup_compute_network(network_type, - user_id, + srv.setup_compute_network(user_id, project_id, security_group) @@ -170,7 +169,7 @@ class FlatNetworkService(BaseNetworkService): redis.sadd('ips', fixed_ip) fixed_ip = redis.spop('ips') if not fixed_ip: - raise exception.NoMoreAddresses() + raise network_exception.NoMoreAddresses() # TODO(vish): some sort of dns handling for hostname should # probably be done here. return {'inject_network': True, diff --git a/nova/server.py b/nova/server.py index 96550f078..c6b60e090 100644 --- a/nova/server.py +++ b/nova/server.py @@ -44,6 +44,8 @@ flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing') flags.DEFINE_string('logfile', None, 'log file to output to') flags.DEFINE_string('pidfile', None, 'pid file to output to') flags.DEFINE_string('working_directory', './', 'working directory...') +flags.DEFINE_integer('uid', os.getuid(), 'uid under which to run') +flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run') def stop(pidfile): @@ -135,6 +137,8 @@ def daemonize(args, name, main): threaded=False), stdin=stdin, stdout=stdout, - stderr=stderr + stderr=stderr, + uid=FLAGS.uid, + gid=FLAGS.gid ): main(args) diff --git a/nova/twistd.py b/nova/twistd.py index 8de322aa5..9511c231c 100644 --- a/nova/twistd.py +++ b/nova/twistd.py @@ -21,6 +21,7 @@ Twisted daemon helpers, specifically to parse out gFlags from twisted flags, manage pid files and support syslogging. """ +import gflags import logging import os import signal @@ -49,6 +50,14 @@ class TwistdServerOptions(ServerOptions): return +class FlagParser(object): + def __init__(self, parser): + self.parser = parser + + def Parse(self, s): + return self.parser(s) + + def WrapTwistedOptions(wrapped): class TwistedOptionsToFlags(wrapped): subCommands = None @@ -79,7 +88,12 @@ def WrapTwistedOptions(wrapped): reflect.accumulateClassList(self.__class__, 'optParameters', twistd_params) for param in twistd_params: key = param[0].replace('-', '_') - flags.DEFINE_string(key, param[2], str(param[-1])) + if len(param) > 4: + flags.DEFINE(FlagParser(param[4]), + key, param[2], str(param[3]), + serializer=gflags.ArgumentSerializer()) + else: + flags.DEFINE_string(key, param[2], str(param[3])) def _absorbHandlers(self): twistd_handlers = {} diff --git a/nova/utils.py b/nova/utils.py index dc3c626ec..ef8405fc0 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -29,6 +29,8 @@ import subprocess import socket import sys +from twisted.internet.threads import deferToThread + from nova import exception from nova import flags @@ -148,3 +150,9 @@ def isotime(at=None): def parse_isotime(timestr): return datetime.datetime.strptime(timestr, TIME_FORMAT) + + +def deferredToThread(f): + def g(*args, **kwargs): + return deferToThread(f, *args, **kwargs) + return g diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py index 2f5994983..b44ac383a 100644 --- a/nova/virt/xenapi.py +++ b/nova/virt/xenapi.py @@ -16,17 +16,35 @@ """ A connection to XenServer or Xen Cloud Platform. + +The concurrency model for this class is as follows: + +All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator +deferredToThread). They are remote calls, and so may hang for the usual +reasons. They should not be allowed to block the reactor thread. + +All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async +(using XenAPI.VM.async_start etc). These return a task, which can then be +polled for completion. Polling is handled using reactor.callLater. + +This combination of techniques means that we don't block the reactor thread at +all, and at the same time we don't hold lots of threads waiting for +long-running operations. + +FIXME: get_info currently doesn't conform to these rules, and will block the +reactor thread if the VM.get_by_name_label or VM.get_record calls block. """ import logging import xmlrpclib from twisted.internet import defer +from twisted.internet import reactor from twisted.internet import task -from nova import exception from nova import flags from nova import process +from nova import utils from nova.auth.manager import AuthManager from nova.compute import power_state from nova.virt import images @@ -47,6 +65,11 @@ flags.DEFINE_string('xenapi_connection_password', None, 'Password for connection to XenServer/Xen Cloud Platform.' ' Used only if connection_type=xenapi.') +flags.DEFINE_float('xenapi_task_poll_interval', + 0.5, + 'The interval used for polling of remote tasks ' + '(Async.VM.start, etc). Used only if ' + 'connection_type=xenapi.') XENAPI_POWER_STATE = { @@ -84,9 +107,8 @@ class XenAPIConnection(object): for vm in self._conn.xenapi.VM.get_all()] @defer.inlineCallbacks - @exception.wrap_exception def spawn(self, instance): - vm = yield self.lookup(instance.name) + vm = yield self._lookup(instance.name) if vm is not None: raise Exception('Attempted to create non-unique name %s' % instance.name) @@ -105,21 +127,27 @@ class XenAPIConnection(object): user = AuthManager().get_user(instance.datamodel['user_id']) project = AuthManager().get_project(instance.datamodel['project_id']) - vdi_uuid = yield self.fetch_image( + vdi_uuid = yield self._fetch_image( instance.datamodel['image_id'], user, project, True) - kernel = yield self.fetch_image( + kernel = yield self._fetch_image( instance.datamodel['kernel_id'], user, project, False) - ramdisk = yield self.fetch_image( + ramdisk = yield self._fetch_image( instance.datamodel['ramdisk_id'], user, project, False) - vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid) + vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid) - vm_ref = yield self.create_vm(instance, kernel, ramdisk) - yield self.create_vbd(vm_ref, vdi_ref, 0, True) + vm_ref = yield self._create_vm(instance, kernel, ramdisk) + yield self._create_vbd(vm_ref, vdi_ref, 0, True) if network_ref: yield self._create_vif(vm_ref, network_ref, mac_address) - yield self._conn.xenapi.VM.start(vm_ref, False, False) + logging.debug('Starting VM %s...', vm_ref) + yield self._call_xenapi('VM.start', vm_ref, False, False) + logging.info('Spawning VM %s created %s.', instance.name, vm_ref) - def create_vm(self, instance, kernel, ramdisk): + @defer.inlineCallbacks + def _create_vm(self, instance, kernel, ramdisk): + """Create a VM record. Returns a Deferred that gives the new + VM reference.""" + mem = str(long(instance.datamodel['memory_kb']) * 1024) vcpus = str(instance.datamodel['vcpus']) rec = { @@ -152,11 +180,15 @@ class XenAPIConnection(object): 'other_config': {}, } logging.debug('Created VM %s...', instance.name) - vm_ref = self._conn.xenapi.VM.create(rec) + vm_ref = yield self._call_xenapi('VM.create', rec) logging.debug('Created VM %s as %s.', instance.name, vm_ref) - return vm_ref + defer.returnValue(vm_ref) - def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): + @defer.inlineCallbacks + def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): + """Create a VBD record. Returns a Deferred that gives the new + VBD reference.""" + vbd_rec = {} vbd_rec['VM'] = vm_ref vbd_rec['VDI'] = vdi_ref @@ -171,12 +203,16 @@ class XenAPIConnection(object): vbd_rec['qos_algorithm_params'] = {} vbd_rec['qos_supported_algorithms'] = [] logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) - vbd_ref = self._conn.xenapi.VBD.create(vbd_rec) + vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec) logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, vdi_ref) - return vbd_ref + defer.returnValue(vbd_ref) + @defer.inlineCallbacks def _create_vif(self, vm_ref, network_ref, mac_address): + """Create a VIF record. Returns a Deferred that gives the new + VIF reference.""" + vif_rec = {} vif_rec['device'] = '0' vif_rec['network']= network_ref @@ -188,25 +224,29 @@ class XenAPIConnection(object): vif_rec['qos_algorithm_params'] = {} logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, network_ref) - vif_ref = self._conn.xenapi.VIF.create(vif_rec) + vif_ref = yield self._call_xenapi('VIF.create', vif_rec) logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, vm_ref, network_ref) - return vif_ref + defer.returnValue(vif_ref) + @defer.inlineCallbacks def _find_network_with_bridge(self, bridge): expr = 'field "bridge" = "%s"' % bridge - networks = self._conn.xenapi.network.get_all_records_where(expr) + networks = yield self._call_xenapi('network.get_all_records_where', + expr) if len(networks) == 1: - return networks.keys()[0] + defer.returnValue(networks.keys()[0]) elif len(networks) > 1: raise Exception('Found non-unique network for bridge %s' % bridge) else: raise Exception('Found no network for bridge %s' % bridge) - def fetch_image(self, image, user, project, use_sr): + @defer.inlineCallbacks + def _fetch_image(self, image, user, project, use_sr): """use_sr: True to put the image as a VDI in an SR, False to place it on dom0's filesystem. The former is for VM disks, the latter for - its kernel and ramdisk (if external kernels are being used).""" + its kernel and ramdisk (if external kernels are being used). + Returns a Deferred that gives the new VDI UUID.""" url = images.image_url(image) access = AuthManager().get_access_key(user, project) @@ -218,22 +258,28 @@ class XenAPIConnection(object): args['password'] = user.secret if use_sr: args['add_partition'] = 'true' - return self._call_plugin('objectstore', fn, args) + task = yield self._async_call_plugin('objectstore', fn, args) + uuid = yield self._wait_for_task(task) + defer.returnValue(uuid) + @defer.inlineCallbacks def reboot(self, instance): - vm = self.lookup(instance.name) + vm = yield self._lookup(instance.name) if vm is None: raise Exception('instance not present %s' % instance.name) - yield self._conn.xenapi.VM.clean_reboot(vm) + task = yield self._call_xenapi('Async.VM.clean_reboot', vm) + yield self._wait_for_task(task) + @defer.inlineCallbacks def destroy(self, instance): - vm = self.lookup(instance.name) + vm = yield self._lookup(instance.name) if vm is None: raise Exception('instance not present %s' % instance.name) - yield self._conn.xenapi.VM.destroy(vm) + task = yield self._call_xenapi('Async.VM.destroy', vm) + yield self._wait_for_task(task) def get_info(self, instance_id): - vm = self.lookup(instance_id) + vm = self._lookup_blocking(instance_id) if vm is None: raise Exception('instance not present %s' % instance_id) rec = self._conn.xenapi.VM.get_record(vm) @@ -243,7 +289,11 @@ class XenAPIConnection(object): 'num_cpu': rec['VCPUs_max'], 'cpu_time': 0} - def lookup(self, i): + @utils.deferredToThread + def _lookup(self, i): + return self._lookup_blocking(i) + + def _lookup_blocking(self, i): vms = self._conn.xenapi.VM.get_by_name_label(i) n = len(vms) if n == 0: @@ -253,9 +303,52 @@ class XenAPIConnection(object): else: return vms[0] - def _call_plugin(self, plugin, fn, args): + def _wait_for_task(self, task): + """Return a Deferred that will give the result of the given task. + The task is polled until it completes.""" + d = defer.Deferred() + reactor.callLater(0, self._poll_task, task, d) + return d + + @utils.deferredToThread + def _poll_task(self, task, deferred): + """Poll the given XenAPI task, and fire the given Deferred if we + get a result.""" + try: + #logging.debug('Polling task %s...', task) + status = self._conn.xenapi.task.get_status(task) + if status == 'pending': + reactor.callLater(FLAGS.xenapi_task_poll_interval, + self._poll_task, task, deferred) + elif status == 'success': + result = self._conn.xenapi.task.get_result(task) + logging.info('Task %s status: success. %s', task, result) + deferred.callback(_parse_xmlrpc_value(result)) + else: + error_info = self._conn.xenapi.task.get_error_info(task) + logging.warn('Task %s status: %s. %s', task, status, + error_info) + deferred.errback(XenAPI.Failure(error_info)) + #logging.debug('Polling task %s done.', task) + except Exception, exn: + logging.warn(exn) + deferred.errback(exn) + + @utils.deferredToThread + def _call_xenapi(self, method, *args): + """Call the specified XenAPI method on a background thread. Returns + a Deferred for the result.""" + f = self._conn.xenapi + for m in method.split('.'): + f = f.__getattr__(m) + return f(*args) + + @utils.deferredToThread + def _async_call_plugin(self, plugin, fn, args): + """Call Async.host.call_plugin on a background thread. Returns a + Deferred with the task reference.""" return _unwrap_plugin_exceptions( - self._conn.xenapi.host.call_plugin, + self._conn.xenapi.Async.host.call_plugin, self._get_xenapi_host(), plugin, fn, args) def _get_xenapi_host(self): @@ -281,3 +374,15 @@ def _unwrap_plugin_exceptions(func, *args, **kwargs): except xmlrpclib.ProtocolError, exn: logging.debug("Got exception: %s", exn) raise + + +def _parse_xmlrpc_value(val): + """Parse the given value as if it were an XML-RPC value. This is + sometimes used as the format for the task.result field.""" + if not val: + return val + x = xmlrpclib.loads( + '<?xml version="1.0"?><methodResponse><params><param>' + + val + + '</param></params></methodResponse>') + return x[0][0] diff --git a/nova/wsgi.py b/nova/wsgi.py index 096d5843f..bec0a7b1c 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -212,7 +212,10 @@ class Controller(object): del arg_dict['action'] arg_dict['req'] = req result = method(**arg_dict) - return self._serialize(result, req) if type(result) is dict else result + if type(result) is dict: + return self._serialize(result, req) + else: + return result def _serialize(self, data, request): """ diff --git a/tools/install_venv.py b/tools/install_venv.py index 1f0fa3cc7..5d2369a96 100644 --- a/tools/install_venv.py +++ b/tools/install_venv.py @@ -95,8 +95,8 @@ def install_dependencies(venv=VENV): # Tell the virtual env how to "import nova" - pathfile=os.path.join(venv, "lib", "python2.6", "site-packages", "nova.pth") - f=open(pathfile, 'w') + pthfile = os.path.join(venv, "lib", "python2.6", "site-packages", "nova.pth") + f = open(pthfile, 'w') f.write("%s\n" % ROOT) |
