author     Andy Smith <code@term.ie>  2010-10-25 03:45:19 +0900
committer  Andy Smith <code@term.ie>  2010-10-25 03:45:19 +0900
commit     2337fab0979b72bbc7e7730e94518a0e835a2751 (patch)
tree       38b6526f52095d34f321fe68b6ad40757fcb8196
parent     81e8c5256c1e52326b6b64cf237128364d1bcb22 (diff)
download   nova-2337fab0979b72bbc7e7730e94518a0e835a2751.tar.gz
           nova-2337fab0979b72bbc7e7730e94518a0e835a2751.tar.xz
           nova-2337fab0979b72bbc7e7730e94518a0e835a2751.zip
part way through porting the codebase off of twisted
This provides a very basic eventlet-based replacement for the twistd-based services and a replacement for task.LoopingCall. It also adds nova-combined, with the goal of running a single service when doing local testing and dev.
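
An illustrative sketch (not part of the patch; the names are hypothetical): once eventlet.monkey_patch() greens the standard library, blocking calls yield to the hub, so twisted's inlineCallbacks coroutines can become plain sequential functions. This is the mechanical transformation applied to most of the files below.

    import eventlet
    eventlet.monkey_patch()

    # Before (twisted): each blocking step was yielded as a Deferred.
    #
    #     @defer.inlineCallbacks
    #     def resize(infile, execute):
    #         yield execute('e2fsck -fp %s' % infile)
    #         yield execute('resize2fs %s' % infile)

    # After (eventlet): the same steps are ordinary calls; the hub
    # switches to another green thread whenever one blocks on I/O.
    def resize(infile, execute):
        execute('e2fsck -fp %s' % infile)
        execute('resize2fs %s' % infile)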
-rwxr-xr-x  bin/nova-combined           61
-rwxr-xr-x  bin/nova-compute            12
-rwxr-xr-x  bin/nova-network            14
-rw-r--r--  nova/compute/disk.py        66
-rw-r--r--  nova/compute/manager.py     28
-rw-r--r--  nova/flags.py                7
-rw-r--r--  nova/manager.py              1
-rw-r--r--  nova/network/manager.py      4
-rw-r--r--  nova/server.py               6
-rw-r--r--  nova/service_eventlet.py   288
-rw-r--r--  nova/utils.py               36
-rw-r--r--  nova/virt/fake.py            6
-rw-r--r--  nova/virt/libvirt_conn.py  172
-rw-r--r--  nova/virt/xenapi.py        103
14 files changed, 574 insertions, 230 deletions
diff --git a/bin/nova-combined b/bin/nova-combined
new file mode 100755
index 000000000..65865acd9
--- /dev/null
+++ b/bin/nova-combined
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+  Combined daemon that runs all of the nova services in a single process.
+"""
+
+import eventlet
+eventlet.monkey_patch()
+
+import os
+import sys
+
+from eventlet import greenthread
+
+# If ../nova/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+ os.pardir,
+ os.pardir))
+if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
+ sys.path.insert(0, possible_topdir)
+
+from nova import api
+from nova import flags
+from nova import service_eventlet
+from nova import wsgi
+
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer('api_port', 8773, 'API port')
+
+
+if __name__ == '__main__':
+ FLAGS(sys.argv)
+
+ compute = service_eventlet.Service.create(binary='nova-compute')
+ network = service_eventlet.Service.create(binary='nova-network')
+ volume = service_eventlet.Service.create(binary='nova-volume')
+ scheduler = service_eventlet.Service.create(binary='nova-scheduler')
+ #objectstore = service_eventlet.Service.create(binary='nova-objectstore')
+
+ service_eventlet.serve(compute, network, volume, scheduler)
+ wsgi.run_server(api.API(), FLAGS.api_port)
+
diff --git a/bin/nova-compute b/bin/nova-compute
index 1724e9659..600fbb897 100755
--- a/bin/nova-compute
+++ b/bin/nova-compute
@@ -21,6 +21,9 @@
Twistd daemon for the nova compute nodes.
"""
+import eventlet
+eventlet.monkey_patch()
+
import os
import sys
@@ -32,12 +35,7 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
-from nova import service
-from nova import twistd
-
+from nova import service_eventlet
if __name__ == '__main__':
- twistd.serve(__file__)
-
-if __name__ == '__builtin__':
- application = service.Service.create() # pylint: disable=C0103
+ service_eventlet.serve()
diff --git a/bin/nova-network b/bin/nova-network
index fa88aeb47..600fbb897 100755
--- a/bin/nova-network
+++ b/bin/nova-network
@@ -18,9 +18,12 @@
# under the License.
"""
- Twistd daemon for the nova network nodes.
+  Eventlet daemon for the nova network nodes.
"""
+import eventlet
+eventlet.monkey_patch()
+
import os
import sys
@@ -32,12 +35,7 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
-from nova import service
-from nova import twistd
-
+from nova import service_eventlet
if __name__ == '__main__':
- twistd.serve(__file__)
-
-if __name__ == '__builtin__':
- application = service.Service.create() # pylint: disable-msg=C0103
+ service_eventlet.serve()
diff --git a/nova/compute/disk.py b/nova/compute/disk.py
index e362b4507..ad4c2c092 100644
--- a/nova/compute/disk.py
+++ b/nova/compute/disk.py
@@ -25,8 +25,6 @@ import logging
import os
import tempfile
-from twisted.internet import defer
-
from nova import exception
from nova import flags
@@ -38,7 +36,6 @@ flags.DEFINE_integer('block_size', 1024 * 1024 * 256,
'block_size to use for dd')
-@defer.inlineCallbacks
def partition(infile, outfile, local_bytes=0, resize=True,
local_type='ext2', execute=None):
"""Takes a single partition represented by infile and writes a bootable
@@ -60,10 +57,10 @@ def partition(infile, outfile, local_bytes=0, resize=True,
file_size = os.path.getsize(infile)
if resize and file_size < FLAGS.minimum_root_size:
last_sector = FLAGS.minimum_root_size / sector_size - 1
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (infile, last_sector, sector_size))
- yield execute('e2fsck -fp %s' % infile, check_exit_code=False)
- yield execute('resize2fs %s' % infile)
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (infile, last_sector, sector_size))
+ execute('e2fsck -fp %s' % infile, check_exit_code=False)
+ execute('resize2fs %s' % infile)
file_size = FLAGS.minimum_root_size
elif file_size % sector_size != 0:
logging.warn("Input partition size not evenly divisible by"
@@ -82,30 +79,29 @@ def partition(infile, outfile, local_bytes=0, resize=True,
last_sector = local_last # e
# create an empty file
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, mbr_last, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, mbr_last, sector_size))
# make mbr partition
- yield execute('parted --script %s mklabel msdos' % outfile)
+ execute('parted --script %s mklabel msdos' % outfile)
# append primary file
- yield execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
- % (infile, outfile, FLAGS.block_size))
+ execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
+ % (infile, outfile, FLAGS.block_size))
# make primary partition
- yield execute('parted --script %s mkpart primary %ds %ds'
- % (outfile, primary_first, primary_last))
+ execute('parted --script %s mkpart primary %ds %ds'
+ % (outfile, primary_first, primary_last))
if local_bytes > 0:
# make the file bigger
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, last_sector, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, last_sector, sector_size))
# make and format local partition
- yield execute('parted --script %s mkpartfs primary %s %ds %ds'
- % (outfile, local_type, local_first, local_last))
+ execute('parted --script %s mkpartfs primary %s %ds %ds'
+ % (outfile, local_type, local_first, local_last))
-@defer.inlineCallbacks
def inject_data(image, key=None, net=None, partition=None, execute=None):
"""Injects a ssh key and optionally net data into a disk image.
@@ -115,26 +111,26 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
If partition is not specified it mounts the image as a single partition.
"""
- out, err = yield execute('sudo losetup -f --show %s' % image)
+ out, err = execute('sudo losetup -f --show %s' % image)
if err:
raise exception.Error('Could not attach image to loopback: %s' % err)
device = out.strip()
try:
if not partition is None:
# create partition
- out, err = yield execute('sudo kpartx -a %s' % device)
+ out, err = execute('sudo kpartx -a %s' % device)
if err:
raise exception.Error('Failed to load partition: %s' % err)
mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1],
partition)
else:
mapped_device = device
- out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
+ out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
tmpdir = tempfile.mkdtemp()
try:
# mount loopback to dir
- out, err = yield execute(
+ out, err = execute(
'sudo mount %s %s' % (mapped_device, tmpdir))
if err:
raise exception.Error('Failed to mount filesystem: %s' % err)
@@ -142,35 +138,33 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
try:
if key:
# inject key file
- yield _inject_key_into_fs(key, tmpdir, execute=execute)
+ _inject_key_into_fs(key, tmpdir, execute=execute)
if net:
- yield _inject_net_into_fs(net, tmpdir, execute=execute)
+ _inject_net_into_fs(net, tmpdir, execute=execute)
finally:
# unmount device
- yield execute('sudo umount %s' % mapped_device)
+ execute('sudo umount %s' % mapped_device)
finally:
# remove temporary directory
- yield execute('rmdir %s' % tmpdir)
+ execute('rmdir %s' % tmpdir)
if not partition is None:
# remove partitions
- yield execute('sudo kpartx -d %s' % device)
+ execute('sudo kpartx -d %s' % device)
finally:
# remove loopback
- yield execute('sudo losetup -d %s' % device)
+ execute('sudo losetup -d %s' % device)
-@defer.inlineCallbacks
def _inject_key_into_fs(key, fs, execute=None):
sshdir = os.path.join(os.path.join(fs, 'root'), '.ssh')
- yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
- yield execute('sudo chown root %s' % sshdir)
- yield execute('sudo chmod 700 %s' % sshdir)
+ execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
+ execute('sudo chown root %s' % sshdir)
+ execute('sudo chmod 700 %s' % sshdir)
keyfile = os.path.join(sshdir, 'authorized_keys')
- yield execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
+ execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
-@defer.inlineCallbacks
def _inject_net_into_fs(net, fs, execute=None):
netfile = os.path.join(os.path.join(os.path.join(
fs, 'etc'), 'network'), 'interfaces')
- yield execute('sudo tee %s' % netfile, net)
+ execute('sudo tee %s' % netfile, net)
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 523bb8893..a105a1dd0 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -25,8 +25,6 @@ import datetime
import logging
import os
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import manager
@@ -54,7 +52,7 @@ class ComputeManager(manager.Manager):
self.network_manager = utils.import_object(FLAGS.network_manager)
self.volume_manager = utils.import_object(FLAGS.volume_manager)
super(ComputeManager, self).__init__(*args, **kwargs)
-
+
def _update_state(self, context, instance_id):
"""Update the state of an instance from the driver info"""
# FIXME(ja): include other fields from state?
@@ -62,12 +60,10 @@ class ComputeManager(manager.Manager):
state = self.driver.get_info(instance_ref.name)['state']
self.db.instance_set_state(context, instance_id, state)
- @defer.inlineCallbacks
@exception.wrap_exception
def refresh_security_group(self, context, security_group_id, **_kwargs):
- yield self.driver.refresh_security_group(security_group_id)
+ self.driver.refresh_security_group(security_group_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def run_instance(self, context, instance_id, **_kwargs):
"""Launch a new instance with specified options."""
@@ -89,7 +85,7 @@ class ComputeManager(manager.Manager):
'spawning')
try:
- yield self.driver.spawn(instance_ref)
+ self.driver.spawn(instance_ref)
now = datetime.datetime.utcnow()
self.db.instance_update(context,
instance_id,
@@ -103,7 +99,6 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
@@ -116,12 +111,11 @@ class ComputeManager(manager.Manager):
raise exception.Error('trying to destroy already destroyed'
' instance: %s' % instance_id)
- yield self.driver.destroy(instance_ref)
+ self.driver.destroy(instance_ref)
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
@@ -142,7 +136,7 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'rebooting')
- yield self.driver.reboot(instance_ref)
+ self.driver.reboot(instance_ref)
self._update_state(context, instance_id)
@exception.wrap_exception
@@ -154,7 +148,6 @@ class ComputeManager(manager.Manager):
return self.driver.get_console_output(instance_ref)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
@@ -164,13 +157,12 @@ class ComputeManager(manager.Manager):
instance_ref = self.db.instance_get(context, instance_id)
-        dev_path = yield self.volume_manager.setup_compute_volume(context,
-                                                                  volume_id)
+        dev_path = self.volume_manager.setup_compute_volume(context,
+                                                            volume_id)
- yield self.driver.attach_volume(instance_ref['ec2_id'],
- dev_path,
- mountpoint)
+ self.driver.attach_volume(instance_ref['ec2_id'],
+ dev_path,
+ mountpoint)
self.db.volume_attached(context, volume_id, instance_id, mountpoint)
-        defer.returnValue(True)
+        return True
- @defer.inlineCallbacks
@exception.wrap_exception
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
@@ -180,7 +172,7 @@ class ComputeManager(manager.Manager):
volume_id)
instance_ref = self.db.instance_get(context, instance_id)
volume_ref = self.db.volume_get(context, volume_id)
- yield self.driver.detach_volume(instance_ref['ec2_id'],
- volume_ref['mountpoint'])
+ self.driver.detach_volume(instance_ref['ec2_id'],
+ volume_ref['mountpoint'])
self.db.volume_detached(context, volume_id)
-        defer.returnValue(True)
+        return True
diff --git a/nova/flags.py b/nova/flags.py
index f3b0384ad..da9987700 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -139,6 +139,8 @@ class FlagValues(gflags.FlagValues):
FLAGS = FlagValues()
+gflags.FLAGS = FLAGS
+
def _wrapper(func):
def _wrapped(*args, **kw):
@@ -159,6 +161,11 @@ DEFINE_list = _wrapper(gflags.DEFINE_list)
DEFINE_spaceseplist = _wrapper(gflags.DEFINE_spaceseplist)
DEFINE_multistring = _wrapper(gflags.DEFINE_multistring)
DEFINE_multi_int = _wrapper(gflags.DEFINE_multi_int)
+DEFINE_flag = _wrapper(gflags.DEFINE_flag)
+
+HelpFlag = gflags.HelpFlag
+HelpshortFlag = gflags.HelpshortFlag
+HelpXMLFlag = gflags.HelpXMLFlag
def DECLARE(name, module_string, flag_values=FLAGS):
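
A hedged note on the flags change: re-exporting gflags' help flags lets services register them once at import time, as nova/service_eventlet.py does below:

    flags.DEFINE_flag(flags.HelpFlag())
    flags.DEFINE_flag(flags.HelpshortFlag())
    flags.DEFINE_flag(flags.HelpXMLFlag())

With these registered, passing --help, --helpshort, or --helpxml to a binary prints usage and exits, handled by gflags during FLAGS(sys.argv).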
diff --git a/nova/manager.py b/nova/manager.py
index 4244b2db4..994d6e7af 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -39,7 +39,6 @@ class Manager(object):
db_driver = FLAGS.db_driver
self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103
- @defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval"""
-        yield
+        pass
diff --git a/nova/network/manager.py b/nova/network/manager.py
index fddb77663..37a15ae05 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -25,7 +25,6 @@ import logging
import math
import IPy
-from twisted.internet import defer
from nova import context
from nova import db
@@ -315,10 +314,9 @@ class FlatDHCPManager(NetworkManager):
class VlanManager(NetworkManager):
"""Vlan network with dhcp"""
- @defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval"""
- yield super(VlanManager, self).periodic_tasks(context)
+ super(VlanManager, self).periodic_tasks(context)
now = datetime.datetime.utcnow()
timeout = FLAGS.fixed_ip_disassociate_timeout
time = now - datetime.timedelta(seconds=timeout)
diff --git a/nova/server.py b/nova/server.py
index cb424caa1..fba340a54 100644
--- a/nova/server.py
+++ b/nova/server.py
@@ -135,9 +135,9 @@ def daemonize(args, name, main):
with daemon.DaemonContext(
detach_process=FLAGS.daemonize,
working_directory=FLAGS.working_directory,
- pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile,
- acquire_timeout=1,
- threaded=False),
+ #pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile,
+ # acquire_timeout=1,
+ # threaded=False),
stdin=stdin,
stdout=stdout,
stderr=stderr,
diff --git a/nova/service_eventlet.py b/nova/service_eventlet.py
new file mode 100644
index 000000000..eac45a981
--- /dev/null
+++ b/nova/service_eventlet.py
@@ -0,0 +1,288 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Generic Node baseclass for all workers that run on hosts
+"""
+
+import inspect
+import logging
+import os
+import signal
+import sys
+import time
+
+from eventlet import event
+from eventlet import greenthread
+from eventlet import greenpool
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova import rpc
+from nova import utils
+
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer('report_interval', 10,
+ 'seconds between nodes reporting state to datastore',
+ lower_bound=1)
+
+flags.DEFINE_integer('periodic_interval', 60,
+ 'seconds between running periodic tasks',
+ lower_bound=1)
+
+flags.DEFINE_string('pidfile', None,
+ 'pidfile to use for this service')
+
+
+flags.DEFINE_flag(flags.HelpFlag())
+flags.DEFINE_flag(flags.HelpshortFlag())
+flags.DEFINE_flag(flags.HelpXMLFlag())
+
+
+class Service(object):
+ """Base class for workers that run on hosts."""
+
+ def __init__(self, host, binary, topic, manager, report_interval=None,
+ periodic_interval=None, *args, **kwargs):
+ self.host = host
+ self.binary = binary
+ self.topic = topic
+ self.manager_class_name = manager
+ self.report_interval = report_interval
+ self.periodic_interval = periodic_interval
+ super(Service, self).__init__(*args, **kwargs)
+ self.saved_args, self.saved_kwargs = args, kwargs
+
+ def start(self):
+ manager_class = utils.import_class(self.manager_class_name)
+ self.manager = manager_class(host=self.host, *self.saved_args,
+ **self.saved_kwargs)
+ self.manager.init_host()
+ self.model_disconnected = False
+ ctxt = context.get_admin_context()
+ try:
+ service_ref = db.service_get_by_args(ctxt,
+ self.host,
+ self.binary)
+ self.service_id = service_ref['id']
+ except exception.NotFound:
+ self._create_service_ref(ctxt)
+
+ conn1 = rpc.Connection.instance(new=True)
+ conn2 = rpc.Connection.instance(new=True)
+ if self.report_interval:
+ consumer_all = rpc.AdapterConsumer(
+ connection=conn1,
+ topic=self.topic,
+ proxy=self)
+ consumer_node = rpc.AdapterConsumer(
+ connection=conn2,
+ topic='%s.%s' % (self.topic, self.host),
+ proxy=self)
+
+ consumer_all.attach_to_eventlet()
+ consumer_node.attach_to_eventlet()
+
+ pulse = utils.LoopingCall(self.report_state)
+ pulse.start(interval=self.report_interval, now=False)
+
+ if self.periodic_interval:
+ pulse = utils.LoopingCall(self.periodic_tasks)
+ pulse.start(interval=self.periodic_interval, now=False)
+
+ def _create_service_ref(self, context):
+ service_ref = db.service_create(context,
+ {'host': self.host,
+ 'binary': self.binary,
+ 'topic': self.topic,
+ 'report_count': 0})
+ self.service_id = service_ref['id']
+
+ def __getattr__(self, key):
+ manager = self.__dict__.get('manager', None)
+ return getattr(manager, key)
+
+ @classmethod
+ def create(cls,
+ host=None,
+ binary=None,
+ topic=None,
+ manager=None,
+ report_interval=None,
+ periodic_interval=None):
+ """Instantiates class and passes back application object.
+
+ Args:
+ host, defaults to FLAGS.host
+ binary, defaults to basename of executable
+ topic, defaults to bin_name - "nova-" part
+ manager, defaults to FLAGS.<topic>_manager
+ report_interval, defaults to FLAGS.report_interval
+ periodic_interval, defaults to FLAGS.periodic_interval
+ """
+ if not host:
+ host = FLAGS.host
+ if not binary:
+ binary = os.path.basename(inspect.stack()[-1][1])
+ if not topic:
+ topic = binary.rpartition("nova-")[2]
+ if not manager:
+ manager = FLAGS.get('%s_manager' % topic, None)
+ if not report_interval:
+ report_interval = FLAGS.report_interval
+ if not periodic_interval:
+ periodic_interval = FLAGS.periodic_interval
+ logging.warn("Starting %s node", topic)
+ service_obj = cls(host, binary, topic, manager,
+ report_interval, periodic_interval)
+
+ return service_obj
+
+ def kill(self):
+ """Destroy the service object in the datastore"""
+ try:
+ db.service_destroy(context.get_admin_context(), self.service_id)
+ except exception.NotFound:
+ logging.warn("Service killed that has no database entry")
+
+ def periodic_tasks(self):
+ """Tasks to be run at a periodic interval"""
+ self.manager.periodic_tasks(context.get_admin_context())
+
+ def report_state(self):
+ """Update the state of this service in the datastore."""
+ ctxt = context.get_admin_context()
+ try:
+ try:
+ service_ref = db.service_get(ctxt, self.service_id)
+ except exception.NotFound:
+                logging.debug("The service database object disappeared; "
+                              "recreating it.")
+ self._create_service_ref(ctxt)
+ service_ref = db.service_get(ctxt, self.service_id)
+
+ db.service_update(ctxt,
+ self.service_id,
+ {'report_count': service_ref['report_count'] + 1})
+
+ # TODO(termie): make this pattern be more elegant.
+ if getattr(self, "model_disconnected", False):
+ self.model_disconnected = False
+ logging.error("Recovered model server connection!")
+
+ # TODO(vish): this should probably only catch connection errors
+ except Exception: # pylint: disable-msg=W0702
+ if not getattr(self, "model_disconnected", False):
+ self.model_disconnected = True
+ logging.exception("model server went away")
+
+
+def stop(pidfile):
+ """
+ Stop the daemon
+ """
+ # Get the pid from the pidfile
+ try:
+ pf = file(pidfile, 'r')
+ pid = int(pf.read().strip())
+ pf.close()
+ except IOError:
+ pid = None
+
+ if not pid:
+ message = "pidfile %s does not exist. Daemon not running?\n"
+ sys.stderr.write(message % pidfile)
+ # Not an error in a restart
+ return
+
+ # Try killing the daemon process
+ try:
+ while 1:
+ os.kill(pid, signal.SIGKILL)
+ time.sleep(0.1)
+ except OSError, err:
+ err = str(err)
+ if err.find("No such process") > 0:
+ if os.path.exists(pidfile):
+ os.remove(pidfile)
+ else:
+ print str(err)
+ sys.exit(1)
+
+
+def serve(*services):
+ argv = FLAGS(sys.argv)
+
+ if not services:
+ services = [Service.create()]
+
+ name = '_'.join(x.binary for x in services)
+ logging.debug("Serving %s" % name)
+
+ logging.getLogger('amqplib').setLevel(logging.DEBUG)
+
+ if not FLAGS.pidfile:
+ FLAGS.pidfile = '%s.pid' % name
+ # NOTE(vish): if we're running nodaemon, redirect the log to stdout
+ #if FLAGS.nodaemon and not FLAGS.logfile:
+ # FLAGS.logfile = "-"
+ #if not FLAGS.logfile:
+ # FLAGS.logfile = '%s.log' % name
+ #if not FLAGS.prefix:
+ # FLAGS.prefix = name
+ #elif FLAGS.prefix.endswith('twisted'):
+ # FLAGS.prefix = FLAGS.prefix.replace('twisted', name)
+
+ action = 'start'
+ if len(argv) > 1:
+ action = argv.pop()
+
+ if action == 'stop':
+ stop(FLAGS.pidfile)
+ sys.exit()
+ elif action == 'restart':
+ stop(FLAGS.pidfile)
+ elif action == 'start':
+ pass
+ else:
+ print 'usage: %s [options] [start|stop|restart]' % argv[0]
+ sys.exit(1)
+
+ #formatter = logging.Formatter(
+ # '(%(name)s): %(levelname)s %(message)s')
+ #handler = logging.StreamHandler()
+ #handler.setFormatter(formatter)
+ #logging.getLogger().addHandler(handler)
+
+ if FLAGS.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ else:
+ logging.getLogger().setLevel(logging.WARNING)
+
+ logging.debug("Full set of FLAGS:")
+ for flag in FLAGS:
+ logging.debug("%s : %s" % (flag, FLAGS.get(flag, None)))
+
+ for x in services:
+ x.start()
+
+ #while True:
+ # greenthread.sleep(5)
+
+
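
A hedged usage sketch for the module above: serve() starts each service and then returns, so a binary with nothing else to run must keep the main green thread alive itself (nova-combined instead blocks in wsgi.run_server; the commented-out loop in serve() hints at the same idea):

    from eventlet import greenthread

    from nova import service_eventlet

    if __name__ == '__main__':
        service_eventlet.serve()
        while True:
            greenthread.sleep(5)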
diff --git a/nova/utils.py b/nova/utils.py
index 7683fc9f4..a219e47bf 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -29,6 +29,9 @@ import subprocess
import socket
import sys
+from eventlet import event
+from eventlet import greenthread
+
from twisted.internet.threads import deferToThread
from nova import exception
@@ -212,3 +215,36 @@ def deferredToThread(f):
def g(*args, **kwargs):
return deferToThread(f, *args, **kwargs)
return g
+
+
+class LoopingCall(object):
+ def __init__(self, f=None, *args, **kw):
+ self.args = args
+ self.kw = kw
+ self.f = f
+ self._running = False
+
+ def start(self, interval, now=True):
+ self._running = True
+ done = event.Event()
+ def _inner():
+ if not now:
+ greenthread.sleep(interval)
+ try:
+ while self._running:
+ self.f(*self.args, **self.kw)
+ greenthread.sleep(interval)
+ except Exception:
+                logging.exception('in looping call')
+ done.send_exception(*sys.exc_info())
+ return
+
+ done.send(True)
+
+ greenthread.spawn(_inner)
+ return done
+
+ def stop(self):
+ self._running = False
+
+
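
A minimal usage sketch for the LoopingCall above (heartbeat is a hypothetical callable): start() returns an eventlet Event that is sent when the loop exits cleanly, or fired with the exception if the wrapped function raises.

    import logging

    def heartbeat():
        logging.debug('pulse')

    pulse = LoopingCall(heartbeat)
    done = pulse.start(interval=10, now=False)
    # ... later ...
    pulse.stop()   # the loop exits after its current sleep
    done.wait()    # returns True, or re-raises heartbeat's exception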
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index eaa2261f5..0684a0877 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -24,8 +24,6 @@ This module also documents the semantics of real hypervisor connections.
import logging
-from twisted.internet import defer
-
from nova.compute import power_state
@@ -105,7 +103,6 @@ class FakeConnection(object):
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
- return defer.succeed(None)
def reboot(self, instance):
"""
@@ -117,7 +114,7 @@ class FakeConnection(object):
-        The work will be done asynchronously. This function returns a
-        Deferred that allows the caller to detect when it is complete.
+        The work will be done synchronously; this function returns when
+        it is complete.
"""
- return defer.succeed(None)
+ pass
def destroy(self, instance):
"""
@@ -130,7 +127,6 @@ class FakeConnection(object):
Deferred that allows the caller to detect when it is complete.
"""
del self.instances[instance.name]
- return defer.succeed(None)
def attach_volume(self, instance_name, device_path, mountpoint):
"""Attach the disk at device_path to the instance at mountpoint"""
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 509ed97a0..9ca97bd1b 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -25,10 +25,10 @@ import logging
import os
import shutil
+from eventlet import event
+from eventlet import greenthread
+from eventlet import tpool
+
import IPy
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.internet import threads
from nova import context
from nova import db
@@ -145,13 +145,12 @@ class LibvirtConnection(object):
except Exception as _err:
pass
# If the instance is already terminated, we're still happy
- d = defer.Deferred()
- d.addCallback(lambda _: self._cleanup(instance))
- # FIXME: What does this comment mean?
- # TODO(termie): short-circuit me for tests
- # WE'LL save this for when we do shutdown,
+
+ done = event.Event()
+
+ # We'll save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_shutdown():
try:
@@ -160,17 +159,26 @@ class LibvirtConnection(object):
instance['id'], state)
if state == power_state.SHUTDOWN:
timer.stop()
- d.callback(None)
except Exception:
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_shutdown
- timer.start(interval=0.5, now=True)
- return d
+ timer_done = timer.start(interval=0.5, now=True)
+
+ # NOTE(termie): this is strictly superfluous (we could put the
+ # cleanup code in the timer), but this emulates the
+ # previous model so I am keeping it around until
+ # everything has been vetted a bit
+ def _wait_for_timer():
+ timer_done.wait()
+ self._cleanup(instance)
+ done.send()
+
+        greenthread.spawn(_wait_for_timer)
+ return done
def _cleanup(self, instance):
target = os.path.join(FLAGS.instances_path, instance['name'])
@@ -179,32 +187,28 @@ class LibvirtConnection(object):
if os.path.exists(target):
shutil.rmtree(target)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, instance_name, device_path, mountpoint):
- yield process.simple_execute("sudo virsh attach-disk %s %s %s" %
- (instance_name,
- device_path,
- mountpoint.rpartition('/dev/')[2]))
+ process.simple_execute("sudo virsh attach-disk %s %s %s" %
+ (instance_name,
+ device_path,
+ mountpoint.rpartition('/dev/')[2]))
- @defer.inlineCallbacks
@exception.wrap_exception
def detach_volume(self, instance_name, mountpoint):
# NOTE(vish): despite the documentation, virsh detach-disk just
# wants the device name without the leading /dev/
- yield process.simple_execute("sudo virsh detach-disk %s %s" %
- (instance_name,
- mountpoint.rpartition('/dev/')[2]))
+ process.simple_execute("sudo virsh detach-disk %s %s" %
+ (instance_name,
+ mountpoint.rpartition('/dev/')[2]))
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot(self, instance):
xml = self.to_xml(instance)
- yield self._conn.lookupByName(instance['name']).destroy()
- yield self._conn.createXML(xml, 0)
+ self._conn.lookupByName(instance['name']).destroy()
+ self._conn.createXML(xml, 0)
- d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_reboot():
try:
@@ -214,20 +218,16 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: rebooted', instance['name'])
timer.stop()
- d.callback(None)
except Exception, exn:
logging.error('_wait_for_reboot failed: %s', exn)
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_reboot
- timer.start(interval=0.5, now=True)
- yield d
+ return timer.start(interval=0.5, now=True)
- @defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
xml = self.to_xml(instance)
@@ -235,16 +235,12 @@ class LibvirtConnection(object):
instance['id'],
power_state.NOSTATE,
'launching')
- yield NWFilterFirewall(self._conn).\
- setup_nwfilters_for_instance(instance)
- yield self._create_image(instance, xml)
- yield self._conn.createXML(xml, 0)
- # TODO(termie): this should actually register
- # a callback to check for successful boot
+ NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
+ self._create_image(instance, xml)
+ self._conn.createXML(xml, 0)
logging.debug("instance %s: is running", instance['name'])
- local_d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_boot():
try:
@@ -254,7 +250,6 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: booted', instance['name'])
timer.stop()
- local_d.callback(None)
except:
logging.exception('instance %s: failed to boot',
instance['name'])
@@ -262,10 +257,9 @@ class LibvirtConnection(object):
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- local_d.callback(None)
+
timer.f = _wait_for_boot
- timer.start(interval=0.5, now=True)
- yield local_d
+ return timer.start(interval=0.5, now=True)
def _flush_xen_console(self, virsh_output):
logging.info('virsh said: %r' % (virsh_output,))
@@ -273,10 +267,9 @@ class LibvirtConnection(object):
if virsh_output.startswith('/dev/'):
logging.info('cool, it\'s a device')
- d = process.simple_execute("sudo dd if=%s iflag=nonblock" %
+ r = process.simple_execute("sudo dd if=%s iflag=nonblock" %
virsh_output, check_exit_code=False)
- d.addCallback(lambda r: r[0])
- return d
+ return r[0]
else:
return ''
@@ -296,21 +289,21 @@ class LibvirtConnection(object):
def get_console_output(self, instance):
console_log = os.path.join(FLAGS.instances_path, instance['name'],
'console.log')
- d = process.simple_execute('sudo chown %d %s' % (os.getuid(),
- console_log))
+
+ process.simple_execute('sudo chown %d %s' % (os.getuid(),
+ console_log))
+
if FLAGS.libvirt_type == 'xen':
- # Xen is spethial
- d.addCallback(lambda _:
- process.simple_execute("virsh ttyconsole %s" %
- instance['name']))
- d.addCallback(self._flush_xen_console)
- d.addCallback(self._append_to_file, console_log)
+ # Xen is special
+ virsh_output = process.simple_execute("virsh ttyconsole %s" %
+ instance['name'])
+ data = self._flush_xen_console(virsh_output)
+ fpath = self._append_to_file(data, console_log)
else:
- d.addCallback(lambda _: defer.succeed(console_log))
- d.addCallback(self._dump_file)
- return d
+ fpath = console_log
+
+ return self._dump_file(fpath)
- @defer.inlineCallbacks
def _create_image(self, inst, libvirt_xml):
# syntactic nicety
basepath = lambda fname='': os.path.join(FLAGS.instances_path,
@@ -318,8 +311,8 @@ class LibvirtConnection(object):
fname)
# ensure directories exist and are writable
- yield process.simple_execute('mkdir -p %s' % basepath())
- yield process.simple_execute('chmod 0777 %s' % basepath())
+ process.simple_execute('mkdir -p %s' % basepath())
+ process.simple_execute('chmod 0777 %s' % basepath())
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
@@ -335,19 +328,19 @@ class LibvirtConnection(object):
project = manager.AuthManager().get_project(inst['project_id'])
if not os.path.exists(basepath('disk')):
- yield images.fetch(inst.image_id, basepath('disk-raw'), user,
- project)
+ images.fetch(inst.image_id, basepath('disk-raw'), user,
+ project)
if not os.path.exists(basepath('kernel')):
- yield images.fetch(inst.kernel_id, basepath('kernel'), user,
- project)
+ images.fetch(inst.kernel_id, basepath('kernel'), user,
+ project)
if not os.path.exists(basepath('ramdisk')):
- yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
- project)
-
- execute = lambda cmd, process_input=None, check_exit_code=True: \
- process.simple_execute(cmd=cmd,
- process_input=process_input,
- check_exit_code=check_exit_code)
+ images.fetch(inst.ramdisk_id, basepath('ramdisk'), user,
+ project)
+
+ def execute(cmd, process_input=None, check_exit_code=True):
+ return process.simple_execute(cmd=cmd,
+ process_input=process_input,
+ check_exit_code=check_exit_code)
key = str(inst['key_data'])
net = None
@@ -369,23 +362,23 @@ class LibvirtConnection(object):
if net:
logging.info('instance %s: injecting net into image %s',
inst['name'], inst.image_id)
- yield disk.inject_data(basepath('disk-raw'), key, net,
- execute=execute)
+ disk.inject_data(basepath('disk-raw'), key, net,
+ execute=execute)
if os.path.exists(basepath('disk')):
- yield process.simple_execute('rm -f %s' % basepath('disk'))
+ process.simple_execute('rm -f %s' % basepath('disk'))
local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type]
['local_gb']
* 1024 * 1024 * 1024)
resize = inst['instance_type'] != 'm1.tiny'
- yield disk.partition(basepath('disk-raw'), basepath('disk'),
- local_bytes, resize, execute=execute)
+ disk.partition(basepath('disk-raw'), basepath('disk'),
+ local_bytes, resize, execute=execute)
if FLAGS.libvirt_type == 'uml':
- yield process.simple_execute('sudo chown root %s' %
- basepath('disk'))
+ process.simple_execute('sudo chown root %s' %
+ basepath('disk'))
def to_xml(self, instance):
# TODO(termie): cache?
@@ -637,15 +630,15 @@ class NWFilterFirewall(object):
def _define_filter(self, xml):
if callable(xml):
xml = xml()
- d = threads.deferToThread(self._conn.nwfilterDefineXML, xml)
- return d
+
+ # execute in a native thread and block until done
+ tpool.execute(self._conn.nwfilterDefineXML, xml)
@staticmethod
def _get_net_and_mask(cidr):
net = IPy.IP(cidr)
return str(net.net()), str(net.netmask())
- @defer.inlineCallbacks
def setup_nwfilters_for_instance(self, instance):
"""
Creates an NWFilter for the given instance. In the process,
@@ -653,10 +646,10 @@ class NWFilterFirewall(object):
the base filter are all in place.
"""
- yield self._define_filter(self.nova_base_ipv4_filter)
- yield self._define_filter(self.nova_base_ipv6_filter)
- yield self._define_filter(self.nova_dhcp_filter)
- yield self._define_filter(self.nova_base_filter)
+ self._define_filter(self.nova_base_ipv4_filter)
+ self._define_filter(self.nova_base_ipv6_filter)
+ self._define_filter(self.nova_dhcp_filter)
+ self._define_filter(self.nova_base_filter)
nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \
" <filterref filter='nova-base' />\n" % \
@@ -668,20 +661,19 @@ class NWFilterFirewall(object):
net, mask = self._get_net_and_mask(network_ref['cidr'])
project_filter = self.nova_project_filter(instance['project_id'],
net, mask)
- yield self._define_filter(project_filter)
+ self._define_filter(project_filter)
nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \
instance['project_id']
for security_group in instance.security_groups:
- yield self.ensure_security_group_filter(security_group['id'])
+ self.ensure_security_group_filter(security_group['id'])
nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \
security_group['id']
nwfilter_xml += "</filter>"
- yield self._define_filter(nwfilter_xml)
- return
+ self._define_filter(nwfilter_xml)
def ensure_security_group_filter(self, security_group_id):
return self._define_filter(
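
A note on the tpool switch in _define_filter above: monkey-patching makes socket I/O cooperative, but nwfilterDefineXML is a blocking call into the libvirt C library that eventlet cannot green. tpool.execute runs it on a native OS thread and suspends only the calling green thread. A minimal sketch of the same pattern:

    from eventlet import tpool

    def define_filter(conn, xml):
        # The blocking libvirt call runs in a real thread; other green
        # threads continue while this one waits for the result.
        return tpool.execute(conn.nwfilterDefineXML, xml)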
diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py
index a17e405ab..f997d01d7 100644
--- a/nova/virt/xenapi.py
+++ b/nova/virt/xenapi.py
@@ -36,11 +36,10 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import logging
+import sys
import xmlrpclib
-from twisted.internet import defer
-from twisted.internet import reactor
-from twisted.internet import task
+from eventlet import event
+from eventlet import tpool
from nova import db
from nova import flags
@@ -110,36 +109,33 @@ class XenAPIConnection(object):
return [self._conn.xenapi.VM.get_name_label(vm) \
for vm in self._conn.xenapi.VM.get_all()]
- @defer.inlineCallbacks
def spawn(self, instance):
- vm = yield self._lookup(instance.name)
+ vm = self._lookup(instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
network = db.project_get_network(None, instance.project_id)
- network_ref = \
- yield self._find_network_with_bridge(network.bridge)
+ network_ref = self._find_network_with_bridge(network.bridge)
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
- vdi_uuid = yield self._fetch_image(
+ vdi_uuid = self._fetch_image(
instance.image_id, user, project, True)
- kernel = yield self._fetch_image(
+ kernel = self._fetch_image(
instance.kernel_id, user, project, False)
- ramdisk = yield self._fetch_image(
+ ramdisk = self._fetch_image(
instance.ramdisk_id, user, project, False)
- vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
+ vdi_ref = self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
- vm_ref = yield self._create_vm(instance, kernel, ramdisk)
- yield self._create_vbd(vm_ref, vdi_ref, 0, True)
+ vm_ref = self._create_vm(instance, kernel, ramdisk)
+ self._create_vbd(vm_ref, vdi_ref, 0, True)
if network_ref:
- yield self._create_vif(vm_ref, network_ref, instance.mac_address)
+ self._create_vif(vm_ref, network_ref, instance.mac_address)
logging.debug('Starting VM %s...', vm_ref)
- yield self._call_xenapi('VM.start', vm_ref, False, False)
+ self._call_xenapi('VM.start', vm_ref, False, False)
logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
- @defer.inlineCallbacks
def _create_vm(self, instance, kernel, ramdisk):
"""Create a VM record. Returns a Deferred that gives the new
VM reference."""
@@ -177,11 +173,10 @@ class XenAPIConnection(object):
'other_config': {},
}
logging.debug('Created VM %s...', instance.name)
- vm_ref = yield self._call_xenapi('VM.create', rec)
+ vm_ref = self._call_xenapi('VM.create', rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- defer.returnValue(vm_ref)
+ return vm_ref
- @defer.inlineCallbacks
def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
"""Create a VBD record. Returns a Deferred that gives the new
VBD reference."""
@@ -200,12 +195,11 @@ class XenAPIConnection(object):
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
+ vbd_ref = self._call_xenapi('VBD.create', vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
- defer.returnValue(vbd_ref)
+ return vbd_ref
- @defer.inlineCallbacks
def _create_vif(self, vm_ref, network_ref, mac_address):
"""Create a VIF record. Returns a Deferred that gives the new
VIF reference."""
@@ -221,24 +215,22 @@ class XenAPIConnection(object):
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
- vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
+ vif_ref = self._call_xenapi('VIF.create', vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
- defer.returnValue(vif_ref)
+ return vif_ref
- @defer.inlineCallbacks
def _find_network_with_bridge(self, bridge):
expr = 'field "bridge" = "%s"' % bridge
- networks = yield self._call_xenapi('network.get_all_records_where',
- expr)
+ networks = self._call_xenapi('network.get_all_records_where',
+ expr)
if len(networks) == 1:
- defer.returnValue(networks.keys()[0])
+ return networks.keys()[0]
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:
raise Exception('Found no network for bridge %s' % bridge)
- @defer.inlineCallbacks
def _fetch_image(self, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
@@ -255,33 +247,31 @@ class XenAPIConnection(object):
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
- task = yield self._async_call_plugin('objectstore', fn, args)
- uuid = yield self._wait_for_task(task)
- defer.returnValue(uuid)
+ task = self._async_call_plugin('objectstore', fn, args)
+ uuid = self._wait_for_task(task)
+ return uuid
- @defer.inlineCallbacks
def reboot(self, instance):
- vm = yield self._lookup(instance.name)
+ vm = self._lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
- task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
- yield self._wait_for_task(task)
+ task = self._call_xenapi('Async.VM.clean_reboot', vm)
+ self._wait_for_task(task)
- @defer.inlineCallbacks
def destroy(self, instance):
- vm = yield self._lookup(instance.name)
+ vm = self._lookup(instance.name)
if vm is None:
# Don't complain, just return. This lets us clean up instances
# that have already disappeared from the underlying platform.
- defer.returnValue(None)
+ return
try:
- task = yield self._call_xenapi('Async.VM.hard_shutdown', vm)
- yield self._wait_for_task(task)
+ task = self._call_xenapi('Async.VM.hard_shutdown', vm)
+ self._wait_for_task(task)
except Exception, exc:
logging.warn(exc)
try:
- task = yield self._call_xenapi('Async.VM.destroy', vm)
- yield self._wait_for_task(task)
+ task = self._call_xenapi('Async.VM.destroy', vm)
+ self._wait_for_task(task)
except Exception, exc:
logging.warn(exc)
@@ -299,7 +289,6 @@ class XenAPIConnection(object):
def get_console_output(self, instance):
return 'FAKE CONSOLE OUTPUT'
- @utils.deferredToThread
def _lookup(self, i):
return self._lookup_blocking(i)
@@ -316,35 +305,32 @@ class XenAPIConnection(object):
def _wait_for_task(self, task):
"""Return a Deferred that will give the result of the given task.
The task is polled until it completes."""
- d = defer.Deferred()
- reactor.callLater(0, self._poll_task, task, d)
- return d
- @utils.deferredToThread
- def _poll_task(self, task, deferred):
+ done = event.Event()
+        loop = utils.LoopingCall(self._poll_task, task, done)
+        loop.start(FLAGS.xenapi_task_poll_interval, now=True)
+        rv = done.wait()
+        loop.stop()
+        return rv
+
+ def _poll_task(self, task, done):
"""Poll the given XenAPI task, and fire the given Deferred if we
get a result."""
try:
- #logging.debug('Polling task %s...', task)
status = self._conn.xenapi.task.get_status(task)
if status == 'pending':
- reactor.callLater(FLAGS.xenapi_task_poll_interval,
- self._poll_task, task, deferred)
+ return
elif status == 'success':
result = self._conn.xenapi.task.get_result(task)
logging.info('Task %s status: success. %s', task, result)
- deferred.callback(_parse_xmlrpc_value(result))
+ done.send(_parse_xmlrpc_value(result))
else:
error_info = self._conn.xenapi.task.get_error_info(task)
logging.warn('Task %s status: %s. %s', task, status,
error_info)
- deferred.errback(XenAPI.Failure(error_info))
- #logging.debug('Polling task %s done.', task)
+ done.send_exception(XenAPI.Failure(error_info))
except Exception, exc:
logging.warn(exc)
- deferred.errback(exc)
+ done.send_exception(*sys.exc_info())
- @utils.deferredToThread
def _call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread. Returns
a Deferred for the result."""
@@ -353,11 +339,10 @@ class XenAPIConnection(object):
f = f.__getattr__(m)
return f(*args)
- @utils.deferredToThread
def _async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread. Returns a
Deferred with the task reference."""
- return _unwrap_plugin_exceptions(
+ return tpool.execute(_unwrap_plugin_exceptions,
self._conn.xenapi.Async.host.call_plugin,
self._get_xenapi_host(), plugin, fn, args)
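
A closing sketch of the Event-based wait pattern that replaces Deferreds in _wait_for_task/_poll_task above: one green thread blocks on wait() while another delivers the outcome with send() or send_exception() (which makes the pending wait() raise).

    from eventlet import event, greenthread

    done = event.Event()

    def finish():
        greenthread.sleep(0.1)
        done.send('result')       # a failure path would call
                                  # done.send_exception(SomeError())

    greenthread.spawn(finish)
    print(done.wait())            # blocks this green thread, then 'result'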