author     Michael Gundlach <michael.gundlach@rackspace.com>   2010-08-25 10:38:25 -0400
committer  Michael Gundlach <michael.gundlach@rackspace.com>   2010-08-25 10:38:25 -0400
commit     428b3256ad7e47e9f389ac8ce0ff70cc3d720e9e (patch)
tree       de20500a4346af56a96b94348bdf4f7d3834ec65
parent     96ae5e2776218adfee2cb22a4c0d7358a498a451 (diff)
parent     e185a95ae6396e86e92b4a5e7ec92b5b422847d5 (diff)
Merge from trunk
-rw-r--r--  nova/flags.py             1
-rw-r--r--  nova/network/service.py   7
-rw-r--r--  nova/server.py            6
-rw-r--r--  nova/twistd.py           16
-rw-r--r--  nova/utils.py             8
-rw-r--r--  nova/virt/xenapi.py     167
6 files changed, 168 insertions, 37 deletions
diff --git a/nova/flags.py b/nova/flags.py
index e3feb252d..6f9f906dd 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -141,6 +141,7 @@ def _wrapper(func):
return _wrapped
+DEFINE = _wrapper(gflags.DEFINE)
DEFINE_string = _wrapper(gflags.DEFINE_string)
DEFINE_integer = _wrapper(gflags.DEFINE_integer)
DEFINE_bool = _wrapper(gflags.DEFINE_bool)
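
For reference, the newly exported flags.DEFINE passes straight through to gflags.DEFINE(parser, name, default, help, ...), which registers a flag driven by a parser object with a Parse() method instead of one of the typed helpers. A minimal hedged sketch of the calling convention (the _IntParser class and the poll_workers flag are illustrative, not part of this change):

    import gflags
    from nova import flags

    class _IntParser(object):
        """Illustrative parser object: gflags hands Parse() the raw string value."""
        def Parse(self, s):
            return int(s)

    flags.DEFINE(_IntParser(), 'poll_workers', '4', 'number of poll workers',
                 serializer=gflags.ArgumentSerializer())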
diff --git a/nova/network/service.py b/nova/network/service.py
index d3aa1c46f..3dba0a9ef 100644
--- a/nova/network/service.py
+++ b/nova/network/service.py
@@ -26,7 +26,7 @@ from nova import flags
from nova import service
from nova import utils
from nova.auth import manager
-from nova.network import exception
+from nova.network import exception as network_exception
from nova.network import model
from nova.network import vpn
@@ -64,8 +64,7 @@ def type_to_class(network_type):
def setup_compute_network(network_type, user_id, project_id, security_group):
"""Sets up the network on a compute host"""
srv = type_to_class(network_type)
- srv.setup_compute_network(network_type,
- user_id,
+ srv.setup_compute_network(user_id,
project_id,
security_group)
@@ -170,7 +169,7 @@ class FlatNetworkService(BaseNetworkService):
redis.sadd('ips', fixed_ip)
fixed_ip = redis.spop('ips')
if not fixed_ip:
- raise exception.NoMoreAddresses()
+ raise network_exception.NoMoreAddresses()
# TODO(vish): some sort of dns handling for hostname should
# probably be done here.
return {'inject_network': True,
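
The FlatNetworkService hunk above hands out fixed IPs by popping members from a Redis set and raises NoMoreAddresses once the pool is empty. A standalone sketch of that pattern with redis-py (the 'ips' key name matches the code above; the helper functions themselves are illustrative):

    import redis

    class NoMoreAddresses(Exception):
        """Raised when the fixed-IP pool is exhausted."""

    def allocate_fixed_ip(r, pool_key='ips'):
        """Pop one address from the Redis set backing the pool."""
        fixed_ip = r.spop(pool_key)
        if not fixed_ip:
            raise NoMoreAddresses()
        return fixed_ip

    def deallocate_fixed_ip(r, fixed_ip, pool_key='ips'):
        """Return an address to the pool."""
        r.sadd(pool_key, fixed_ip)

Usage would be allocate_fixed_ip(redis.Redis()) on the allocation path and deallocate_fixed_ip() when the instance is torn down.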
diff --git a/nova/server.py b/nova/server.py
index 96550f078..c6b60e090 100644
--- a/nova/server.py
+++ b/nova/server.py
@@ -44,6 +44,8 @@ flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing')
flags.DEFINE_string('logfile', None, 'log file to output to')
flags.DEFINE_string('pidfile', None, 'pid file to output to')
flags.DEFINE_string('working_directory', './', 'working directory...')
+flags.DEFINE_integer('uid', os.getuid(), 'uid under which to run')
+flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run')
def stop(pidfile):
@@ -135,6 +137,8 @@ def daemonize(args, name, main):
threaded=False),
stdin=stdin,
stdout=stdout,
- stderr=stderr
+ stderr=stderr,
+ uid=FLAGS.uid,
+ gid=FLAGS.gid
):
main(args)
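
The server.py hunk threads the new uid and gid flags into the daemon context so the service can drop privileges when it detaches. A minimal hedged sketch of the same idea written directly against the python-daemon library, which nova's daemonize() appears to wrap (the run_as helper is illustrative):

    import os
    import daemon  # python-daemon

    def run_as(main, uid=None, gid=None):
        """Run main() detached, optionally switching to the given uid/gid."""
        context = daemon.DaemonContext(
            working_directory='./',
            uid=uid if uid is not None else os.getuid(),
            gid=gid if gid is not None else os.getgid())
        with context:
            main()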
diff --git a/nova/twistd.py b/nova/twistd.py
index 8de322aa5..9511c231c 100644
--- a/nova/twistd.py
+++ b/nova/twistd.py
@@ -21,6 +21,7 @@ Twisted daemon helpers, specifically to parse out gFlags from twisted flags,
manage pid files and support syslogging.
"""
+import gflags
import logging
import os
import signal
@@ -49,6 +50,14 @@ class TwistdServerOptions(ServerOptions):
return
+class FlagParser(object):
+ def __init__(self, parser):
+ self.parser = parser
+
+ def Parse(self, s):
+ return self.parser(s)
+
+
def WrapTwistedOptions(wrapped):
class TwistedOptionsToFlags(wrapped):
subCommands = None
@@ -79,7 +88,12 @@ def WrapTwistedOptions(wrapped):
reflect.accumulateClassList(self.__class__, 'optParameters', twistd_params)
for param in twistd_params:
key = param[0].replace('-', '_')
- flags.DEFINE_string(key, param[2], str(param[-1]))
+ if len(param) > 4:
+ flags.DEFINE(FlagParser(param[4]),
+ key, param[2], str(param[3]),
+ serializer=gflags.ArgumentSerializer())
+ else:
+ flags.DEFINE_string(key, param[2], str(param[3]))
def _absorbHandlers(self):
twistd_handlers = {}
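
The change above maps each twisted optParameters entry onto a gflag; when an entry carries a fifth element (a coercion callable), it is wrapped in FlagParser so gflags parses the option the same way twistd would have. A sketch of the mapping for a single entry (the poll-interval parameter is illustrative; the DEFINE call mirrors the code above):

    import gflags
    from nova import flags
    from nova.twistd import FlagParser

    # A twisted-style optParameters entry: [long, short, default, description, coerce]
    param = ['poll-interval', 'p', '30', 'Seconds between polls', int]

    key = param[0].replace('-', '_')          # 'poll_interval'
    if len(param) > 4:
        # Custom coercion: gflags calls FlagParser.Parse(), which applies int().
        flags.DEFINE(FlagParser(param[4]),
                     key, param[2], str(param[3]),
                     serializer=gflags.ArgumentSerializer())
    else:
        flags.DEFINE_string(key, param[2], str(param[3]))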
diff --git a/nova/utils.py b/nova/utils.py
index dc3c626ec..ef8405fc0 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -29,6 +29,8 @@ import subprocess
import socket
import sys
+from twisted.internet.threads import deferToThread
+
from nova import exception
from nova import flags
@@ -148,3 +150,9 @@ def isotime(at=None):
def parse_isotime(timestr):
return datetime.datetime.strptime(timestr, TIME_FORMAT)
+
+
+def deferredToThread(f):
+ def g(*args, **kwargs):
+ return deferToThread(f, *args, **kwargs)
+ return g
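
deferredToThread is a thin decorator over twisted's deferToThread: the wrapped function runs on a reactor pool thread and the caller immediately gets back a Deferred for its eventual result. A small usage sketch (slow_lookup is illustrative and assumes a running reactor):

    import logging
    import time

    from nova import utils

    @utils.deferredToThread
    def slow_lookup(name):
        """Blocking work; executes on a pool thread, not the reactor thread."""
        time.sleep(2)
        return 'result for %s' % name

    def _log_result(result):
        logging.debug('lookup finished: %s', result)

    d = slow_lookup('instance-1')  # returns a Deferred immediately
    d.addCallback(_log_result)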
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
index 2f5994983..b44ac383a 100644
--- a/nova/virt/xenapi.py
+++ b/nova/virt/xenapi.py
@@ -16,17 +16,35 @@
"""
A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import logging
import xmlrpclib
from twisted.internet import defer
+from twisted.internet import reactor
from twisted.internet import task
-from nova import exception
from nova import flags
from nova import process
+from nova import utils
from nova.auth.manager import AuthManager
from nova.compute import power_state
from nova.virt import images
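
The concurrency model described in the new docstring combines two Twisted techniques: blocking XenAPI RPCs are pushed onto pool threads with deferToThread, and long-running operations are started through their Async.* variants and then polled with reactor.callLater, so no thread is parked waiting on the server. A generic sketch of that combination, independent of the nova classes below (wait_for_remote_task and check_status are illustrative names; the interval mirrors xenapi_task_poll_interval):

    from twisted.internet import defer, reactor
    from twisted.internet.threads import deferToThread

    POLL_INTERVAL = 0.5

    def wait_for_remote_task(check_status, task):
        """Return a Deferred that fires when the remote task leaves 'pending'.

        check_status(task) is a blocking RPC, so each poll runs on a pool
        thread; between polls no thread is held at all."""
        d = defer.Deferred()

        def poll():
            status_d = deferToThread(check_status, task)   # off the reactor thread
            status_d.addCallbacks(handle_status, d.errback)

        def handle_status(status):
            if status == 'pending':
                reactor.callLater(POLL_INTERVAL, poll)     # reschedule, hold nothing
            else:
                d.callback(status)

        poll()
        return d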
@@ -47,6 +65,11 @@ flags.DEFINE_string('xenapi_connection_password',
None,
'Password for connection to XenServer/Xen Cloud Platform.'
' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+ 0.5,
+ 'The interval used for polling of remote tasks '
+ '(Async.VM.start, etc). Used only if '
+ 'connection_type=xenapi.')
XENAPI_POWER_STATE = {
@@ -84,9 +107,8 @@ class XenAPIConnection(object):
for vm in self._conn.xenapi.VM.get_all()]
@defer.inlineCallbacks
- @exception.wrap_exception
def spawn(self, instance):
- vm = yield self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
@@ -105,21 +127,27 @@ class XenAPIConnection(object):
user = AuthManager().get_user(instance.datamodel['user_id'])
project = AuthManager().get_project(instance.datamodel['project_id'])
- vdi_uuid = yield self.fetch_image(
+ vdi_uuid = yield self._fetch_image(
instance.datamodel['image_id'], user, project, True)
- kernel = yield self.fetch_image(
+ kernel = yield self._fetch_image(
instance.datamodel['kernel_id'], user, project, False)
- ramdisk = yield self.fetch_image(
+ ramdisk = yield self._fetch_image(
instance.datamodel['ramdisk_id'], user, project, False)
- vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
+ vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
- vm_ref = yield self.create_vm(instance, kernel, ramdisk)
- yield self.create_vbd(vm_ref, vdi_ref, 0, True)
+ vm_ref = yield self._create_vm(instance, kernel, ramdisk)
+ yield self._create_vbd(vm_ref, vdi_ref, 0, True)
if network_ref:
yield self._create_vif(vm_ref, network_ref, mac_address)
- yield self._conn.xenapi.VM.start(vm_ref, False, False)
+ logging.debug('Starting VM %s...', vm_ref)
+ yield self._call_xenapi('VM.start', vm_ref, False, False)
+ logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
- def create_vm(self, instance, kernel, ramdisk):
+ @defer.inlineCallbacks
+ def _create_vm(self, instance, kernel, ramdisk):
+ """Create a VM record. Returns a Deferred that gives the new
+ VM reference."""
+
mem = str(long(instance.datamodel['memory_kb']) * 1024)
vcpus = str(instance.datamodel['vcpus'])
rec = {
@@ -152,11 +180,15 @@ class XenAPIConnection(object):
'other_config': {},
}
logging.debug('Created VM %s...', instance.name)
- vm_ref = self._conn.xenapi.VM.create(rec)
+ vm_ref = yield self._call_xenapi('VM.create', rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- return vm_ref
+ defer.returnValue(vm_ref)
- def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+ @defer.inlineCallbacks
+ def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+ """Create a VBD record. Returns a Deferred that gives the new
+ VBD reference."""
+
vbd_rec = {}
vbd_rec['VM'] = vm_ref
vbd_rec['VDI'] = vdi_ref
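
The _create_vm and _create_vbd conversions above follow the standard Twisted inlineCallbacks pattern: yield a Deferred to wait for it without blocking, then hand the result back with defer.returnValue, since a Python 2 generator cannot return a value. The pattern in isolation (make_record is an illustrative stand-in for a _call_xenapi-style helper):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def create_record(make_record, rec):
        """Yield the remote call's Deferred, then 'return' via returnValue."""
        ref = yield make_record(rec)   # resumes once the Deferred fires
        defer.returnValue(ref)         # Python 2: generators cannot return values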
@@ -171,12 +203,16 @@ class XenAPIConnection(object):
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
+ vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
- return vbd_ref
+ defer.returnValue(vbd_ref)
+ @defer.inlineCallbacks
def _create_vif(self, vm_ref, network_ref, mac_address):
+ """Create a VIF record. Returns a Deferred that gives the new
+ VIF reference."""
+
vif_rec = {}
vif_rec['device'] = '0'
vif_rec['network']= network_ref
@@ -188,25 +224,29 @@ class XenAPIConnection(object):
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
- vif_ref = self._conn.xenapi.VIF.create(vif_rec)
+ vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
- return vif_ref
+ defer.returnValue(vif_ref)
+ @defer.inlineCallbacks
def _find_network_with_bridge(self, bridge):
expr = 'field "bridge" = "%s"' % bridge
- networks = self._conn.xenapi.network.get_all_records_where(expr)
+ networks = yield self._call_xenapi('network.get_all_records_where',
+ expr)
if len(networks) == 1:
- return networks.keys()[0]
+ defer.returnValue(networks.keys()[0])
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:
raise Exception('Found no network for bridge %s' % bridge)
- def fetch_image(self, image, user, project, use_sr):
+ @defer.inlineCallbacks
+ def _fetch_image(self, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
- its kernel and ramdisk (if external kernels are being used)."""
+ its kernel and ramdisk (if external kernels are being used).
+ Returns a Deferred that gives the new VDI UUID."""
url = images.image_url(image)
access = AuthManager().get_access_key(user, project)
@@ -218,22 +258,28 @@ class XenAPIConnection(object):
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
- return self._call_plugin('objectstore', fn, args)
+ task = yield self._async_call_plugin('objectstore', fn, args)
+ uuid = yield self._wait_for_task(task)
+ defer.returnValue(uuid)
+ @defer.inlineCallbacks
def reboot(self, instance):
- vm = self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
- yield self._conn.xenapi.VM.clean_reboot(vm)
+ task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
+ yield self._wait_for_task(task)
+ @defer.inlineCallbacks
def destroy(self, instance):
- vm = self.lookup(instance.name)
+ vm = yield self._lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
- yield self._conn.xenapi.VM.destroy(vm)
+ task = yield self._call_xenapi('Async.VM.destroy', vm)
+ yield self._wait_for_task(task)
def get_info(self, instance_id):
- vm = self.lookup(instance_id)
+ vm = self._lookup_blocking(instance_id)
if vm is None:
raise Exception('instance not present %s' % instance_id)
rec = self._conn.xenapi.VM.get_record(vm)
@@ -243,7 +289,11 @@ class XenAPIConnection(object):
'num_cpu': rec['VCPUs_max'],
'cpu_time': 0}
- def lookup(self, i):
+ @utils.deferredToThread
+ def _lookup(self, i):
+ return self._lookup_blocking(i)
+
+ def _lookup_blocking(self, i):
vms = self._conn.xenapi.VM.get_by_name_label(i)
n = len(vms)
if n == 0:
@@ -253,9 +303,52 @@ class XenAPIConnection(object):
else:
return vms[0]
- def _call_plugin(self, plugin, fn, args):
+ def _wait_for_task(self, task):
+ """Return a Deferred that will give the result of the given task.
+ The task is polled until it completes."""
+ d = defer.Deferred()
+ reactor.callLater(0, self._poll_task, task, d)
+ return d
+
+ @utils.deferredToThread
+ def _poll_task(self, task, deferred):
+ """Poll the given XenAPI task, and fire the given Deferred if we
+ get a result."""
+ try:
+ #logging.debug('Polling task %s...', task)
+ status = self._conn.xenapi.task.get_status(task)
+ if status == 'pending':
+ reactor.callLater(FLAGS.xenapi_task_poll_interval,
+ self._poll_task, task, deferred)
+ elif status == 'success':
+ result = self._conn.xenapi.task.get_result(task)
+ logging.info('Task %s status: success. %s', task, result)
+ deferred.callback(_parse_xmlrpc_value(result))
+ else:
+ error_info = self._conn.xenapi.task.get_error_info(task)
+ logging.warn('Task %s status: %s. %s', task, status,
+ error_info)
+ deferred.errback(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
+ except Exception, exn:
+ logging.warn(exn)
+ deferred.errback(exn)
+
+ @utils.deferredToThread
+ def _call_xenapi(self, method, *args):
+ """Call the specified XenAPI method on a background thread. Returns
+ a Deferred for the result."""
+ f = self._conn.xenapi
+ for m in method.split('.'):
+ f = f.__getattr__(m)
+ return f(*args)
+
+ @utils.deferredToThread
+ def _async_call_plugin(self, plugin, fn, args):
+ """Call Async.host.call_plugin on a background thread. Returns a
+ Deferred with the task reference."""
return _unwrap_plugin_exceptions(
- self._conn.xenapi.host.call_plugin,
+ self._conn.xenapi.Async.host.call_plugin,
self._get_xenapi_host(), plugin, fn, args)
def _get_xenapi_host(self):
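
_call_xenapi resolves the dotted method name against self._conn.xenapi and runs the blocking call on a pool thread, so callers yield the Deferred it returns and, for Async.* calls, follow up with _wait_for_task. A hedged sketch of how a caller chains the two, mirroring reboot() and destroy() above (conn stands for a XenAPIConnection instance; Async.VM.hard_reboot is one of the standard async XenAPI calls):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def hard_reboot(conn, vm_ref):
        """Illustrative only: start the async operation, then poll it to completion."""
        task = yield conn._call_xenapi('Async.VM.hard_reboot', vm_ref)
        yield conn._wait_for_task(task)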
@@ -281,3 +374,15 @@ def _unwrap_plugin_exceptions(func, *args, **kwargs):
except xmlrpclib.ProtocolError, exn:
logging.debug("Got exception: %s", exn)
raise
+
+
+def _parse_xmlrpc_value(val):
+ """Parse the given value as if it were an XML-RPC value. This is
+ sometimes used as the format for the task.result field."""
+ if not val:
+ return val
+ x = xmlrpclib.loads(
+ '<?xml version="1.0"?><methodResponse><params><param>' +
+ val +
+ '</param></params></methodResponse>')
+ return x[0][0]
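
_parse_xmlrpc_value wraps the raw task result in a full methodResponse document so xmlrpclib.loads can decode it; loads returns a (params, methodname) pair, and params[0] is the decoded value. A small example of that round-trip (the OpaqueRef string is illustrative of a typical task.get_result payload):

    import xmlrpclib  # xmlrpc.client in Python 3

    doc = ('<?xml version="1.0"?><methodResponse><params><param>'
           '<value>OpaqueRef:1234-abcd</value>'
           '</param></params></methodResponse>')
    params, method = xmlrpclib.loads(doc)
    assert params[0] == 'OpaqueRef:1234-abcd'   # what _parse_xmlrpc_value returns
    assert method is None                       # no method name in a response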