author     Andy Smith <code@term.ie>   2010-10-25 03:45:19 +0900
committer  Andy Smith <code@term.ie>   2010-10-25 03:45:19 +0900
commit     2337fab0979b72bbc7e7730e94518a0e835a2751 (patch)
tree       38b6526f52095d34f321fe68b6ad40757fcb8196
parent     81e8c5256c1e52326b6b64cf237128364d1bcb22 (diff)
Part way through porting the codebase off of Twisted.

This provides a very basic eventlet-based replacement for the
twistd-based services, plus a replacement for Twisted's
task.LoopingCall.

Also adds nova-combined, with the goal of running all of the services
in a single process when doing local testing and dev.
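The pattern repeated across every file in this diff is the same: once `eventlet.monkey_patch()` is in effect, code that previously threaded Deferreds through `@defer.inlineCallbacks` and `yield` becomes plain synchronous Python, with concurrency supplied by greenthreads. A minimal standalone sketch of that idea (illustrative names, not code from this commit):

    import eventlet
    eventlet.monkey_patch()  # make socket/time/thread cooperative

    import time


    def run_instance(name):
        # Before the port this would be decorated with
        # @defer.inlineCallbacks and every blocking call would be
        # yielded; now a blocking call simply yields to the eventlet hub.
        time.sleep(0.1)  # stands in for a blocking driver call
        return '%s: running' % name


    if __name__ == '__main__':
        pool = eventlet.GreenPool()
        # Several "instances" launch concurrently in a single OS thread.
        for result in pool.imap(run_instance, ['i-1', 'i-2', 'i-3']):
            print(result)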
-rwxr-xr-x  bin/nova-combined          |  61
-rwxr-xr-x  bin/nova-compute           |  12
-rwxr-xr-x  bin/nova-network           |  14
-rw-r--r--  nova/compute/disk.py       |  66
-rw-r--r--  nova/compute/manager.py    |  28
-rw-r--r--  nova/flags.py              |   7
-rw-r--r--  nova/manager.py            |   1
-rw-r--r--  nova/network/manager.py    |   4
-rw-r--r--  nova/server.py             |   6
-rw-r--r--  nova/service_eventlet.py   | 288
-rw-r--r--  nova/utils.py              |  36
-rw-r--r--  nova/virt/fake.py          |   6
-rw-r--r--  nova/virt/libvirt_conn.py  | 172
-rw-r--r--  nova/virt/xenapi.py        | 103
14 files changed, 574 insertions(+), 230 deletions(-)
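Two of the new pieces are worth previewing before the diff. `nova/utils.py` gains a `LoopingCall` class that stands in for Twisted's `task.LoopingCall` on top of greenthreads, and `Service.start()` in `nova/service_eventlet.py` drives it for the heartbeat and periodic tasks. A sketch of the intended usage, assuming this tree is on `sys.path` (the `report_state` body here is a stand-in):

    import eventlet
    eventlet.monkey_patch()

    from nova import utils  # the LoopingCall added by this commit


    def report_state():
        # Stand-in for Service.report_state(); runs every `interval`
        # seconds in its own greenthread until stop() is called.
        print('heartbeat')


    pulse = utils.LoopingCall(report_state)
    done = pulse.start(interval=10, now=False)  # returns an eventlet Event

    # ... later, from any greenthread:
    pulse.stop()
    done.wait()  # returns once the loop exits (or re-raises its exception)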
diff --git a/bin/nova-combined b/bin/nova-combined new file mode 100755 index 000000000..65865acd9 --- /dev/null +++ b/bin/nova-combined @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" + Twistd daemon for the nova compute nodes. +""" + +import eventlet +eventlet.monkey_patch() + +import os +import sys + +from eventlet import greenthread + +# If ../nova/__init__.py exists, add ../ to Python search path, so that +# it will override what happens to be installed in /usr/(local/)lib/python... +possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, + os.pardir)) +if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): + sys.path.insert(0, possible_topdir) + +from nova import api +from nova import flags +from nova import service_eventlet +from nova import wsgi + + +FLAGS = flags.FLAGS +flags.DEFINE_integer('api_port', 8773, 'API port') + + +if __name__ == '__main__': + FLAGS(sys.argv) + + compute = service_eventlet.Service.create(binary='nova-compute') + network = service_eventlet.Service.create(binary='nova-network') + volume = service_eventlet.Service.create(binary='nova-volume') + scheduler = service_eventlet.Service.create(binary='nova-scheduler') + #objectstore = service_eventlet.Service.create(binary='nova-objectstore') + + service_eventlet.serve(compute, network, volume, scheduler) + wsgi.run_server(api.API(), FLAGS.api_port) + diff --git a/bin/nova-compute b/bin/nova-compute index 1724e9659..600fbb897 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -21,6 +21,9 @@ Twistd daemon for the nova compute nodes. """ +import eventlet +eventlet.monkey_patch() + import os import sys @@ -32,12 +35,7 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): sys.path.insert(0, possible_topdir) -from nova import service -from nova import twistd - +from nova import service_eventlet if __name__ == '__main__': - twistd.serve(__file__) - -if __name__ == '__builtin__': - application = service.Service.create() # pylint: disable=C0103 + service_eventlet.serve() diff --git a/bin/nova-network b/bin/nova-network index fa88aeb47..600fbb897 100755 --- a/bin/nova-network +++ b/bin/nova-network @@ -18,9 +18,12 @@ # under the License. """ - Twistd daemon for the nova network nodes. + Twistd daemon for the nova compute nodes. 
""" +import eventlet +eventlet.monkey_patch() + import os import sys @@ -32,12 +35,7 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): sys.path.insert(0, possible_topdir) -from nova import service -from nova import twistd - +from nova import service_eventlet if __name__ == '__main__': - twistd.serve(__file__) - -if __name__ == '__builtin__': - application = service.Service.create() # pylint: disable-msg=C0103 + service_eventlet.serve() diff --git a/nova/compute/disk.py b/nova/compute/disk.py index e362b4507..ad4c2c092 100644 --- a/nova/compute/disk.py +++ b/nova/compute/disk.py @@ -25,8 +25,6 @@ import logging import os import tempfile -from twisted.internet import defer - from nova import exception from nova import flags @@ -38,7 +36,6 @@ flags.DEFINE_integer('block_size', 1024 * 1024 * 256, 'block_size to use for dd') -@defer.inlineCallbacks def partition(infile, outfile, local_bytes=0, resize=True, local_type='ext2', execute=None): """Takes a single partition represented by infile and writes a bootable @@ -60,10 +57,10 @@ def partition(infile, outfile, local_bytes=0, resize=True, file_size = os.path.getsize(infile) if resize and file_size < FLAGS.minimum_root_size: last_sector = FLAGS.minimum_root_size / sector_size - 1 - yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' - % (infile, last_sector, sector_size)) - yield execute('e2fsck -fp %s' % infile, check_exit_code=False) - yield execute('resize2fs %s' % infile) + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (infile, last_sector, sector_size)) + execute('e2fsck -fp %s' % infile, check_exit_code=False) + execute('resize2fs %s' % infile) file_size = FLAGS.minimum_root_size elif file_size % sector_size != 0: logging.warn("Input partition size not evenly divisible by" @@ -82,30 +79,29 @@ def partition(infile, outfile, local_bytes=0, resize=True, last_sector = local_last # e # create an empty file - yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' - % (outfile, mbr_last, sector_size)) + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (outfile, mbr_last, sector_size)) # make mbr partition - yield execute('parted --script %s mklabel msdos' % outfile) + execute('parted --script %s mklabel msdos' % outfile) # append primary file - yield execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append' - % (infile, outfile, FLAGS.block_size)) + execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append' + % (infile, outfile, FLAGS.block_size)) # make primary partition - yield execute('parted --script %s mkpart primary %ds %ds' - % (outfile, primary_first, primary_last)) + execute('parted --script %s mkpart primary %ds %ds' + % (outfile, primary_first, primary_last)) if local_bytes > 0: # make the file bigger - yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' - % (outfile, last_sector, sector_size)) + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (outfile, last_sector, sector_size)) # make and format local partition - yield execute('parted --script %s mkpartfs primary %s %ds %ds' - % (outfile, local_type, local_first, local_last)) + execute('parted --script %s mkpartfs primary %s %ds %ds' + % (outfile, local_type, local_first, local_last)) -@defer.inlineCallbacks def inject_data(image, key=None, net=None, partition=None, execute=None): """Injects a ssh key and optionally net data into a disk image. 
@@ -115,26 +111,26 @@ def inject_data(image, key=None, net=None, partition=None, execute=None): If partition is not specified it mounts the image as a single partition. """ - out, err = yield execute('sudo losetup -f --show %s' % image) + out, err = execute('sudo losetup -f --show %s' % image) if err: raise exception.Error('Could not attach image to loopback: %s' % err) device = out.strip() try: if not partition is None: # create partition - out, err = yield execute('sudo kpartx -a %s' % device) + out, err = execute('sudo kpartx -a %s' % device) if err: raise exception.Error('Failed to load partition: %s' % err) mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1], partition) else: mapped_device = device - out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device) + out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device) tmpdir = tempfile.mkdtemp() try: # mount loopback to dir - out, err = yield execute( + out, err = execute( 'sudo mount %s %s' % (mapped_device, tmpdir)) if err: raise exception.Error('Failed to mount filesystem: %s' % err) @@ -142,35 +138,33 @@ def inject_data(image, key=None, net=None, partition=None, execute=None): try: if key: # inject key file - yield _inject_key_into_fs(key, tmpdir, execute=execute) + _inject_key_into_fs(key, tmpdir, execute=execute) if net: - yield _inject_net_into_fs(net, tmpdir, execute=execute) + _inject_net_into_fs(net, tmpdir, execute=execute) finally: # unmount device - yield execute('sudo umount %s' % mapped_device) + execute('sudo umount %s' % mapped_device) finally: # remove temporary directory - yield execute('rmdir %s' % tmpdir) + execute('rmdir %s' % tmpdir) if not partition is None: # remove partitions - yield execute('sudo kpartx -d %s' % device) + execute('sudo kpartx -d %s' % device) finally: # remove loopback - yield execute('sudo losetup -d %s' % device) + execute('sudo losetup -d %s' % device) -@defer.inlineCallbacks def _inject_key_into_fs(key, fs, execute=None): sshdir = os.path.join(os.path.join(fs, 'root'), '.ssh') - yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter - yield execute('sudo chown root %s' % sshdir) - yield execute('sudo chmod 700 %s' % sshdir) + execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter + execute('sudo chown root %s' % sshdir) + execute('sudo chmod 700 %s' % sshdir) keyfile = os.path.join(sshdir, 'authorized_keys') - yield execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n') + execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n') -@defer.inlineCallbacks def _inject_net_into_fs(net, fs, execute=None): netfile = os.path.join(os.path.join(os.path.join( fs, 'etc'), 'network'), 'interfaces') - yield execute('sudo tee %s' % netfile, net) + execute('sudo tee %s' % netfile, net) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 523bb8893..a105a1dd0 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -25,8 +25,6 @@ import datetime import logging import os -from twisted.internet import defer - from nova import exception from nova import flags from nova import manager @@ -54,7 +52,7 @@ class ComputeManager(manager.Manager): self.network_manager = utils.import_object(FLAGS.network_manager) self.volume_manager = utils.import_object(FLAGS.volume_manager) super(ComputeManager, self).__init__(*args, **kwargs) - + def _update_state(self, context, instance_id): """Update the state of an instance from the driver info""" # FIXME(ja): include other fields from state? 
@@ -62,12 +60,10 @@ class ComputeManager(manager.Manager): state = self.driver.get_info(instance_ref.name)['state'] self.db.instance_set_state(context, instance_id, state) - @defer.inlineCallbacks @exception.wrap_exception def refresh_security_group(self, context, security_group_id, **_kwargs): - yield self.driver.refresh_security_group(security_group_id) + self.driver.refresh_security_group(security_group_id) - @defer.inlineCallbacks @exception.wrap_exception def run_instance(self, context, instance_id, **_kwargs): """Launch a new instance with specified options.""" @@ -89,7 +85,7 @@ class ComputeManager(manager.Manager): 'spawning') try: - yield self.driver.spawn(instance_ref) + self.driver.spawn(instance_ref) now = datetime.datetime.utcnow() self.db.instance_update(context, instance_id, @@ -103,7 +99,6 @@ class ComputeManager(manager.Manager): self._update_state(context, instance_id) - @defer.inlineCallbacks @exception.wrap_exception def terminate_instance(self, context, instance_id): """Terminate an instance on this machine.""" @@ -116,12 +111,11 @@ class ComputeManager(manager.Manager): raise exception.Error('trying to destroy already destroyed' ' instance: %s' % instance_id) - yield self.driver.destroy(instance_ref) + self.driver.destroy(instance_ref) # TODO(ja): should we keep it in a terminated state for a bit? self.db.instance_destroy(context, instance_id) - @defer.inlineCallbacks @exception.wrap_exception def reboot_instance(self, context, instance_id): """Reboot an instance on this server.""" @@ -142,7 +136,7 @@ class ComputeManager(manager.Manager): instance_id, power_state.NOSTATE, 'rebooting') - yield self.driver.reboot(instance_ref) + self.driver.reboot(instance_ref) self._update_state(context, instance_id) @exception.wrap_exception @@ -154,7 +148,6 @@ class ComputeManager(manager.Manager): return self.driver.get_console_output(instance_ref) - @defer.inlineCallbacks @exception.wrap_exception def attach_volume(self, context, instance_id, volume_id, mountpoint): """Attach a volume to an instance.""" @@ -164,13 +157,12 @@ class ComputeManager(manager.Manager): instance_ref = self.db.instance_get(context, instance_id) dev_path = yield self.volume_manager.setup_compute_volume(context, volume_id) - yield self.driver.attach_volume(instance_ref['ec2_id'], - dev_path, - mountpoint) + self.driver.attach_volume(instance_ref['ec2_id'], + dev_path, + mountpoint) self.db.volume_attached(context, volume_id, instance_id, mountpoint) defer.returnValue(True) - @defer.inlineCallbacks @exception.wrap_exception def detach_volume(self, context, instance_id, volume_id): """Detach a volume from an instance.""" @@ -180,7 +172,7 @@ class ComputeManager(manager.Manager): volume_id) instance_ref = self.db.instance_get(context, instance_id) volume_ref = self.db.volume_get(context, volume_id) - yield self.driver.detach_volume(instance_ref['ec2_id'], - volume_ref['mountpoint']) + self.driver.detach_volume(instance_ref['ec2_id'], + volume_ref['mountpoint']) self.db.volume_detached(context, volume_id) defer.returnValue(True) diff --git a/nova/flags.py b/nova/flags.py index f3b0384ad..da9987700 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -139,6 +139,8 @@ class FlagValues(gflags.FlagValues): FLAGS = FlagValues() +gflags.FLAGS = FLAGS + def _wrapper(func): def _wrapped(*args, **kw): @@ -159,6 +161,11 @@ DEFINE_list = _wrapper(gflags.DEFINE_list) DEFINE_spaceseplist = _wrapper(gflags.DEFINE_spaceseplist) DEFINE_multistring = _wrapper(gflags.DEFINE_multistring) DEFINE_multi_int = 
_wrapper(gflags.DEFINE_multi_int) +DEFINE_flag = _wrapper(gflags.DEFINE_flag) + +HelpFlag = gflags.HelpFlag +HelpshortFlag = gflags.HelpshortFlag +HelpXMLFlag = gflags.HelpXMLFlag def DECLARE(name, module_string, flag_values=FLAGS): diff --git a/nova/manager.py b/nova/manager.py index 4244b2db4..994d6e7af 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -39,7 +39,6 @@ class Manager(object): db_driver = FLAGS.db_driver self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103 - @defer.inlineCallbacks def periodic_tasks(self, context=None): """Tasks to be run at a periodic interval""" yield diff --git a/nova/network/manager.py b/nova/network/manager.py index fddb77663..37a15ae05 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -25,7 +25,6 @@ import logging import math import IPy -from twisted.internet import defer from nova import context from nova import db @@ -315,10 +314,9 @@ class FlatDHCPManager(NetworkManager): class VlanManager(NetworkManager): """Vlan network with dhcp""" - @defer.inlineCallbacks def periodic_tasks(self, context=None): """Tasks to be run at a periodic interval""" - yield super(VlanManager, self).periodic_tasks(context) + super(VlanManager, self).periodic_tasks(context) now = datetime.datetime.utcnow() timeout = FLAGS.fixed_ip_disassociate_timeout time = now - datetime.timedelta(seconds=timeout) diff --git a/nova/server.py b/nova/server.py index cb424caa1..fba340a54 100644 --- a/nova/server.py +++ b/nova/server.py @@ -135,9 +135,9 @@ def daemonize(args, name, main): with daemon.DaemonContext( detach_process=FLAGS.daemonize, working_directory=FLAGS.working_directory, - pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile, - acquire_timeout=1, - threaded=False), + #pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile, + # acquire_timeout=1, + # threaded=False), stdin=stdin, stdout=stdout, stderr=stderr, diff --git a/nova/service_eventlet.py b/nova/service_eventlet.py new file mode 100644 index 000000000..eac45a981 --- /dev/null +++ b/nova/service_eventlet.py @@ -0,0 +1,288 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +""" +Generic Node baseclass for all workers that run on hosts +""" + +import inspect +import logging +import os +import sys + +from eventlet import event +from eventlet import greenthread +from eventlet import greenpool + +from nova import context +from nova import db +from nova import exception +from nova import flags +from nova import rpc +from nova import utils + + +FLAGS = flags.FLAGS +flags.DEFINE_integer('report_interval', 10, + 'seconds between nodes reporting state to datastore', + lower_bound=1) + +flags.DEFINE_integer('periodic_interval', 60, + 'seconds between running periodic tasks', + lower_bound=1) + +flags.DEFINE_string('pidfile', None, + 'pidfile to use for this service') + + +flags.DEFINE_flag(flags.HelpFlag()) +flags.DEFINE_flag(flags.HelpshortFlag()) +flags.DEFINE_flag(flags.HelpXMLFlag()) + + +class Service(object): + """Base class for workers that run on hosts.""" + + def __init__(self, host, binary, topic, manager, report_interval=None, + periodic_interval=None, *args, **kwargs): + self.host = host + self.binary = binary + self.topic = topic + self.manager_class_name = manager + self.report_interval = report_interval + self.periodic_interval = periodic_interval + super(Service, self).__init__(*args, **kwargs) + self.saved_args, self.saved_kwargs = args, kwargs + + def start(self): + manager_class = utils.import_class(self.manager_class_name) + self.manager = manager_class(host=self.host, *self.saved_args, + **self.saved_kwargs) + self.manager.init_host() + self.model_disconnected = False + ctxt = context.get_admin_context() + try: + service_ref = db.service_get_by_args(ctxt, + self.host, + self.binary) + self.service_id = service_ref['id'] + except exception.NotFound: + self._create_service_ref(ctxt) + + conn1 = rpc.Connection.instance(new=True) + conn2 = rpc.Connection.instance(new=True) + if self.report_interval: + consumer_all = rpc.AdapterConsumer( + connection=conn1, + topic=self.topic, + proxy=self) + consumer_node = rpc.AdapterConsumer( + connection=conn2, + topic='%s.%s' % (self.topic, self.host), + proxy=self) + + consumer_all.attach_to_eventlet() + consumer_node.attach_to_eventlet() + + pulse = utils.LoopingCall(self.report_state) + pulse.start(interval=self.report_interval, now=False) + + if self.periodic_interval: + pulse = utils.LoopingCall(self.periodic_tasks) + pulse.start(interval=self.periodic_interval, now=False) + + def _create_service_ref(self, context): + service_ref = db.service_create(context, + {'host': self.host, + 'binary': self.binary, + 'topic': self.topic, + 'report_count': 0}) + self.service_id = service_ref['id'] + + def __getattr__(self, key): + manager = self.__dict__.get('manager', None) + return getattr(manager, key) + + @classmethod + def create(cls, + host=None, + binary=None, + topic=None, + manager=None, + report_interval=None, + periodic_interval=None): + """Instantiates class and passes back application object. 
+ + Args: + host, defaults to FLAGS.host + binary, defaults to basename of executable + topic, defaults to bin_name - "nova-" part + manager, defaults to FLAGS.<topic>_manager + report_interval, defaults to FLAGS.report_interval + periodic_interval, defaults to FLAGS.periodic_interval + """ + if not host: + host = FLAGS.host + if not binary: + binary = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = binary.rpartition("nova-")[2] + if not manager: + manager = FLAGS.get('%s_manager' % topic, None) + if not report_interval: + report_interval = FLAGS.report_interval + if not periodic_interval: + periodic_interval = FLAGS.periodic_interval + logging.warn("Starting %s node", topic) + service_obj = cls(host, binary, topic, manager, + report_interval, periodic_interval) + + return service_obj + + def kill(self): + """Destroy the service object in the datastore""" + try: + db.service_destroy(context.get_admin_context(), self.service_id) + except exception.NotFound: + logging.warn("Service killed that has no database entry") + + def periodic_tasks(self): + """Tasks to be run at a periodic interval""" + self.manager.periodic_tasks(context.get_admin_context()) + + def report_state(self): + """Update the state of this service in the datastore.""" + ctxt = context.get_admin_context() + try: + try: + service_ref = db.service_get(ctxt, self.service_id) + except exception.NotFound: + logging.debug("The service database object disappeared, " + "Recreating it.") + self._create_service_ref(ctxt) + service_ref = db.service_get(ctxt, self.service_id) + + db.service_update(ctxt, + self.service_id, + {'report_count': service_ref['report_count'] + 1}) + + # TODO(termie): make this pattern be more elegant. + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + # TODO(vish): this should probably only catch connection errors + except Exception: # pylint: disable-msg=W0702 + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + + +def stop(pidfile): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = file(pidfile, 'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. 
Daemon not running?\n" + sys.stderr.write(message % pidfile) + # Not an error in a restart + return + + # Try killing the daemon process + try: + while 1: + os.kill(pid, signal.SIGKILL) + time.sleep(0.1) + except OSError, err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(pidfile): + os.remove(pidfile) + else: + print str(err) + sys.exit(1) + + +def serve(*services): + argv = FLAGS(sys.argv) + + if not services: + services = [Service.create()] + + name = '_'.join(x.binary for x in services) + logging.debug("Serving %s" % name) + + logging.getLogger('amqplib').setLevel(logging.DEBUG) + + if not FLAGS.pidfile: + FLAGS.pidfile = '%s.pid' % name + # NOTE(vish): if we're running nodaemon, redirect the log to stdout + #if FLAGS.nodaemon and not FLAGS.logfile: + # FLAGS.logfile = "-" + #if not FLAGS.logfile: + # FLAGS.logfile = '%s.log' % name + #if not FLAGS.prefix: + # FLAGS.prefix = name + #elif FLAGS.prefix.endswith('twisted'): + # FLAGS.prefix = FLAGS.prefix.replace('twisted', name) + + action = 'start' + if len(argv) > 1: + action = argv.pop() + + if action == 'stop': + stop(FLAGS.pidfile) + sys.exit() + elif action == 'restart': + stop(FLAGS.pidfile) + elif action == 'start': + pass + else: + print 'usage: %s [options] [start|stop|restart]' % argv[0] + sys.exit(1) + + #formatter = logging.Formatter( + # '(%(name)s): %(levelname)s %(message)s') + #handler = logging.StreamHandler() + #handler.setFormatter(formatter) + #logging.getLogger().addHandler(handler) + + if FLAGS.verbose: + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.getLogger().setLevel(logging.WARNING) + + logging.debug("Full set of FLAGS:") + for flag in FLAGS: + logging.debug("%s : %s" % (flag, FLAGS.get(flag, None))) + + for x in services: + x.start() + + #while True: + # greenthread.sleep(5) + + diff --git a/nova/utils.py b/nova/utils.py index 7683fc9f4..a219e47bf 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -29,6 +29,9 @@ import subprocess import socket import sys +from eventlet import event +from eventlet import greenthread + from twisted.internet.threads import deferToThread from nova import exception @@ -212,3 +215,36 @@ def deferredToThread(f): def g(*args, **kwargs): return deferToThread(f, *args, **kwargs) return g + + +class LoopingCall(object): + def __init__(self, f=None, *args, **kw): + self.args = args + self.kw = kw + self.f = f + self._running = False + + def start(self, interval, now=True): + self._running = True + done = event.Event() + def _inner(): + if not now: + greenthread.sleep(interval) + try: + while self._running: + self.f(*self.args, **self.kw) + greenthread.sleep(interval) + except Exception: + logging.exception('hhmm') + done.send_exception(*sys.exc_info()) + return + + done.send(True) + + greenthread.spawn(_inner) + return done + + def stop(self): + self._running = False + + diff --git a/nova/virt/fake.py b/nova/virt/fake.py index eaa2261f5..0684a0877 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -24,8 +24,6 @@ This module also documents the semantics of real hypervisor connections. import logging -from twisted.internet import defer - from nova.compute import power_state @@ -105,7 +103,6 @@ class FakeConnection(object): fake_instance = FakeInstance() self.instances[instance.name] = fake_instance fake_instance._state = power_state.RUNNING - return defer.succeed(None) def reboot(self, instance): """ @@ -117,7 +114,7 @@ class FakeConnection(object): The work will be done asynchronously. 
This function returns a Deferred that allows the caller to detect when it is complete. """ - return defer.succeed(None) + pass def destroy(self, instance): """ @@ -130,7 +127,6 @@ class FakeConnection(object): Deferred that allows the caller to detect when it is complete. """ del self.instances[instance.name] - return defer.succeed(None) def attach_volume(self, instance_name, device_path, mountpoint): """Attach the disk at device_path to the instance at mountpoint""" diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index 509ed97a0..9ca97bd1b 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -25,10 +25,10 @@ import logging import os import shutil +from eventlet import event +from eventlet import greenthread, tpool + import IPy -from twisted.internet import defer -from twisted.internet import task -from twisted.internet import threads from nova import context from nova import db @@ -145,13 +145,12 @@ class LibvirtConnection(object): except Exception as _err: pass # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda _: self._cleanup(instance)) - # FIXME: What does this comment mean? - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, + + done = event.Event() + + # We'll save this for when we do shutdown, # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_shutdown(): try: @@ -160,17 +159,26 @@ class LibvirtConnection(object): instance['id'], state) if state == power_state.SHUTDOWN: timer.stop() - d.callback(None) except Exception: db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d + timer_done = timer.start(interval=0.5, now=True) + + # NOTE(termie): this is strictly superfluous (we could put the + # cleanup code in the timer), but this emulates the + # previous model so I am keeping it around until + # everything has been vetted a bit + def _wait_for_timer(): + timer_done.wait() + self._cleanup(instance) + done.send() + + greenthread.spawn(_wait_for_timer) + return done def _cleanup(self, instance): target = os.path.join(FLAGS.instances_path, instance['name']) @@ -179,32 +187,28 @@ class LibvirtConnection(object): if os.path.exists(target): shutil.rmtree(target) - @defer.inlineCallbacks @exception.wrap_exception def attach_volume(self, instance_name, device_path, mountpoint): - yield process.simple_execute("sudo virsh attach-disk %s %s %s" % - (instance_name, - device_path, - mountpoint.rpartition('/dev/')[2])) + process.simple_execute("sudo virsh attach-disk %s %s %s" % + (instance_name, + device_path, + mountpoint.rpartition('/dev/')[2])) - @defer.inlineCallbacks @exception.wrap_exception def detach_volume(self, instance_name, mountpoint): # NOTE(vish): despite the documentation, virsh detach-disk just # wants the device name without the leading /dev/ - yield process.simple_execute("sudo virsh detach-disk %s %s" % - (instance_name, - mountpoint.rpartition('/dev/')[2])) + process.simple_execute("sudo virsh detach-disk %s %s" % + (instance_name, + mountpoint.rpartition('/dev/')[2])) - @defer.inlineCallbacks @exception.wrap_exception def reboot(self, instance): xml = self.to_xml(instance) - yield self._conn.lookupByName(instance['name']).destroy() - yield self._conn.createXML(xml, 0) + self._conn.lookupByName(instance['name']).destroy() + 
self._conn.createXML(xml, 0) - d = defer.Deferred() - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_reboot(): try: @@ -214,20 +218,16 @@ class LibvirtConnection(object): if state == power_state.RUNNING: logging.debug('instance %s: rebooted', instance['name']) timer.stop() - d.callback(None) except Exception, exn: logging.error('_wait_for_reboot failed: %s', exn) db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d + return timer.start(interval=0.5, now=True) - @defer.inlineCallbacks @exception.wrap_exception def spawn(self, instance): xml = self.to_xml(instance) @@ -235,16 +235,12 @@ class LibvirtConnection(object): instance['id'], power_state.NOSTATE, 'launching') - yield NWFilterFirewall(self._conn).\ - setup_nwfilters_for_instance(instance) - yield self._create_image(instance, xml) - yield self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot + NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance) + self._create_image(instance, xml) + self._conn.createXML(xml, 0) logging.debug("instance %s: is running", instance['name']) - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_boot(): try: @@ -254,7 +250,6 @@ class LibvirtConnection(object): if state == power_state.RUNNING: logging.debug('instance %s: booted', instance['name']) timer.stop() - local_d.callback(None) except: logging.exception('instance %s: failed to boot', instance['name']) @@ -262,10 +257,9 @@ class LibvirtConnection(object): instance['id'], power_state.SHUTDOWN) timer.stop() - local_d.callback(None) + timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - yield local_d + return timer.start(interval=0.5, now=True) def _flush_xen_console(self, virsh_output): logging.info('virsh said: %r' % (virsh_output,)) @@ -273,10 +267,9 @@ class LibvirtConnection(object): if virsh_output.startswith('/dev/'): logging.info('cool, it\'s a device') - d = process.simple_execute("sudo dd if=%s iflag=nonblock" % + r = process.simple_execute("sudo dd if=%s iflag=nonblock" % virsh_output, check_exit_code=False) - d.addCallback(lambda r: r[0]) - return d + return r[0] else: return '' @@ -296,21 +289,21 @@ class LibvirtConnection(object): def get_console_output(self, instance): console_log = os.path.join(FLAGS.instances_path, instance['name'], 'console.log') - d = process.simple_execute('sudo chown %d %s' % (os.getuid(), - console_log)) + + process.simple_execute('sudo chown %d %s' % (os.getuid(), + console_log)) + if FLAGS.libvirt_type == 'xen': - # Xen is spethial - d.addCallback(lambda _: - process.simple_execute("virsh ttyconsole %s" % - instance['name'])) - d.addCallback(self._flush_xen_console) - d.addCallback(self._append_to_file, console_log) + # Xen is special + virsh_output = process.simple_execute("virsh ttyconsole %s" % + instance['name']) + data = self._flush_xen_console(virsh_output) + fpath = self._append_to_file(data, console_log) else: - d.addCallback(lambda _: defer.succeed(console_log)) - d.addCallback(self._dump_file) - return d + fpath = console_log + + return self._dump_file(fpath) - @defer.inlineCallbacks def _create_image(self, inst, libvirt_xml): # syntactic nicety basepath = lambda fname='': os.path.join(FLAGS.instances_path, @@ -318,8 +311,8 @@ class LibvirtConnection(object): fname) # ensure 
directories exist and are writable - yield process.simple_execute('mkdir -p %s' % basepath()) - yield process.simple_execute('chmod 0777 %s' % basepath()) + process.simple_execute('mkdir -p %s' % basepath()) + process.simple_execute('chmod 0777 %s' % basepath()) # TODO(termie): these are blocking calls, it would be great # if they weren't. @@ -335,19 +328,19 @@ class LibvirtConnection(object): project = manager.AuthManager().get_project(inst['project_id']) if not os.path.exists(basepath('disk')): - yield images.fetch(inst.image_id, basepath('disk-raw'), user, - project) + images.fetch(inst.image_id, basepath('disk-raw'), user, + project) if not os.path.exists(basepath('kernel')): - yield images.fetch(inst.kernel_id, basepath('kernel'), user, - project) + images.fetch(inst.kernel_id, basepath('kernel'), user, + project) if not os.path.exists(basepath('ramdisk')): - yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user, - project) - - execute = lambda cmd, process_input=None, check_exit_code=True: \ - process.simple_execute(cmd=cmd, - process_input=process_input, - check_exit_code=check_exit_code) + images.fetch(inst.ramdisk_id, basepath('ramdisk'), user, + project) + + def execute(cmd, process_input=None, check_exit_code=True): + return process.simple_execute(cmd=cmd, + process_input=process_input, + check_exit_code=check_exit_code) key = str(inst['key_data']) net = None @@ -369,23 +362,23 @@ class LibvirtConnection(object): if net: logging.info('instance %s: injecting net into image %s', inst['name'], inst.image_id) - yield disk.inject_data(basepath('disk-raw'), key, net, - execute=execute) + disk.inject_data(basepath('disk-raw'), key, net, + execute=execute) if os.path.exists(basepath('disk')): - yield process.simple_execute('rm -f %s' % basepath('disk')) + process.simple_execute('rm -f %s' % basepath('disk')) local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type] ['local_gb'] * 1024 * 1024 * 1024) resize = inst['instance_type'] != 'm1.tiny' - yield disk.partition(basepath('disk-raw'), basepath('disk'), - local_bytes, resize, execute=execute) + disk.partition(basepath('disk-raw'), basepath('disk'), + local_bytes, resize, execute=execute) if FLAGS.libvirt_type == 'uml': - yield process.simple_execute('sudo chown root %s' % - basepath('disk')) + process.simple_execute('sudo chown root %s' % + basepath('disk')) def to_xml(self, instance): # TODO(termie): cache? @@ -637,15 +630,15 @@ class NWFilterFirewall(object): def _define_filter(self, xml): if callable(xml): xml = xml() - d = threads.deferToThread(self._conn.nwfilterDefineXML, xml) - return d + + # execute in a native thread and block until done + tpool.execute(self._conn.nwfilterDefineXML, xml) @staticmethod def _get_net_and_mask(cidr): net = IPy.IP(cidr) return str(net.net()), str(net.netmask()) - @defer.inlineCallbacks def setup_nwfilters_for_instance(self, instance): """ Creates an NWFilter for the given instance. In the process, @@ -653,10 +646,10 @@ class NWFilterFirewall(object): the base filter are all in place. 
""" - yield self._define_filter(self.nova_base_ipv4_filter) - yield self._define_filter(self.nova_base_ipv6_filter) - yield self._define_filter(self.nova_dhcp_filter) - yield self._define_filter(self.nova_base_filter) + self._define_filter(self.nova_base_ipv4_filter) + self._define_filter(self.nova_base_ipv6_filter) + self._define_filter(self.nova_dhcp_filter) + self._define_filter(self.nova_base_filter) nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \ " <filterref filter='nova-base' />\n" % \ @@ -668,20 +661,19 @@ class NWFilterFirewall(object): net, mask = self._get_net_and_mask(network_ref['cidr']) project_filter = self.nova_project_filter(instance['project_id'], net, mask) - yield self._define_filter(project_filter) + self._define_filter(project_filter) nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \ instance['project_id'] for security_group in instance.security_groups: - yield self.ensure_security_group_filter(security_group['id']) + self.ensure_security_group_filter(security_group['id']) nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \ security_group['id'] nwfilter_xml += "</filter>" - yield self._define_filter(nwfilter_xml) - return + self._define_filter(nwfilter_xml) def ensure_security_group_filter(self, security_group_id): return self._define_filter( diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py index a17e405ab..f997d01d7 100644 --- a/nova/virt/xenapi.py +++ b/nova/virt/xenapi.py @@ -36,11 +36,10 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block. """ import logging +import sys import xmlrpclib -from twisted.internet import defer -from twisted.internet import reactor -from twisted.internet import task +from eventlet import tpool from nova import db from nova import flags @@ -110,36 +109,33 @@ class XenAPIConnection(object): return [self._conn.xenapi.VM.get_name_label(vm) \ for vm in self._conn.xenapi.VM.get_all()] - @defer.inlineCallbacks def spawn(self, instance): - vm = yield self._lookup(instance.name) + vm = self._lookup(instance.name) if vm is not None: raise Exception('Attempted to create non-unique name %s' % instance.name) network = db.project_get_network(None, instance.project_id) - network_ref = \ - yield self._find_network_with_bridge(network.bridge) + network_ref = self._find_network_with_bridge(network.bridge) user = AuthManager().get_user(instance.user_id) project = AuthManager().get_project(instance.project_id) - vdi_uuid = yield self._fetch_image( + vdi_uuid = self._fetch_image( instance.image_id, user, project, True) - kernel = yield self._fetch_image( + kernel = self._fetch_image( instance.kernel_id, user, project, False) - ramdisk = yield self._fetch_image( + ramdisk = self._fetch_image( instance.ramdisk_id, user, project, False) - vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid) + vdi_ref = self._call_xenapi('VDI.get_by_uuid', vdi_uuid) - vm_ref = yield self._create_vm(instance, kernel, ramdisk) - yield self._create_vbd(vm_ref, vdi_ref, 0, True) + vm_ref = self._create_vm(instance, kernel, ramdisk) + self._create_vbd(vm_ref, vdi_ref, 0, True) if network_ref: - yield self._create_vif(vm_ref, network_ref, instance.mac_address) + self._create_vif(vm_ref, network_ref, instance.mac_address) logging.debug('Starting VM %s...', vm_ref) - yield self._call_xenapi('VM.start', vm_ref, False, False) + self._call_xenapi('VM.start', vm_ref, False, False) logging.info('Spawning VM %s created %s.', instance.name, vm_ref) - @defer.inlineCallbacks def _create_vm(self, instance, 
kernel, ramdisk): """Create a VM record. Returns a Deferred that gives the new VM reference.""" @@ -177,11 +173,10 @@ class XenAPIConnection(object): 'other_config': {}, } logging.debug('Created VM %s...', instance.name) - vm_ref = yield self._call_xenapi('VM.create', rec) + vm_ref = self._call_xenapi('VM.create', rec) logging.debug('Created VM %s as %s.', instance.name, vm_ref) - defer.returnValue(vm_ref) + return vm_ref - @defer.inlineCallbacks def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): """Create a VBD record. Returns a Deferred that gives the new VBD reference.""" @@ -200,12 +195,11 @@ class XenAPIConnection(object): vbd_rec['qos_algorithm_params'] = {} vbd_rec['qos_supported_algorithms'] = [] logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) - vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec) + vbd_ref = self._call_xenapi('VBD.create', vbd_rec) logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, vdi_ref) - defer.returnValue(vbd_ref) + return vbd_ref - @defer.inlineCallbacks def _create_vif(self, vm_ref, network_ref, mac_address): """Create a VIF record. Returns a Deferred that gives the new VIF reference.""" @@ -221,24 +215,22 @@ class XenAPIConnection(object): vif_rec['qos_algorithm_params'] = {} logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, network_ref) - vif_ref = yield self._call_xenapi('VIF.create', vif_rec) + vif_ref = self._call_xenapi('VIF.create', vif_rec) logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, vm_ref, network_ref) - defer.returnValue(vif_ref) + return vif_ref - @defer.inlineCallbacks def _find_network_with_bridge(self, bridge): expr = 'field "bridge" = "%s"' % bridge - networks = yield self._call_xenapi('network.get_all_records_where', - expr) + networks = self._call_xenapi('network.get_all_records_where', + expr) if len(networks) == 1: - defer.returnValue(networks.keys()[0]) + return networks.keys()[0] elif len(networks) > 1: raise Exception('Found non-unique network for bridge %s' % bridge) else: raise Exception('Found no network for bridge %s' % bridge) - @defer.inlineCallbacks def _fetch_image(self, image, user, project, use_sr): """use_sr: True to put the image as a VDI in an SR, False to place it on dom0's filesystem. The former is for VM disks, the latter for @@ -255,33 +247,31 @@ class XenAPIConnection(object): args['password'] = user.secret if use_sr: args['add_partition'] = 'true' - task = yield self._async_call_plugin('objectstore', fn, args) - uuid = yield self._wait_for_task(task) - defer.returnValue(uuid) + task = self._async_call_plugin('objectstore', fn, args) + uuid = self._wait_for_task(task) + return uuid - @defer.inlineCallbacks def reboot(self, instance): - vm = yield self._lookup(instance.name) + vm = self._lookup(instance.name) if vm is None: raise Exception('instance not present %s' % instance.name) - task = yield self._call_xenapi('Async.VM.clean_reboot', vm) - yield self._wait_for_task(task) + task = self._call_xenapi('Async.VM.clean_reboot', vm) + self._wait_for_task(task) - @defer.inlineCallbacks def destroy(self, instance): - vm = yield self._lookup(instance.name) + vm = self._lookup(instance.name) if vm is None: # Don't complain, just return. This lets us clean up instances # that have already disappeared from the underlying platform. 
- defer.returnValue(None) + return try: - task = yield self._call_xenapi('Async.VM.hard_shutdown', vm) - yield self._wait_for_task(task) + task = self._call_xenapi('Async.VM.hard_shutdown', vm) + self._wait_for_task(task) except Exception, exc: logging.warn(exc) try: - task = yield self._call_xenapi('Async.VM.destroy', vm) - yield self._wait_for_task(task) + task = self._call_xenapi('Async.VM.destroy', vm) + self._wait_for_task(task) except Exception, exc: logging.warn(exc) @@ -299,7 +289,6 @@ class XenAPIConnection(object): def get_console_output(self, instance): return 'FAKE CONSOLE OUTPUT' - @utils.deferredToThread def _lookup(self, i): return self._lookup_blocking(i) @@ -316,35 +305,32 @@ class XenAPIConnection(object): def _wait_for_task(self, task): """Return a Deferred that will give the result of the given task. The task is polled until it completes.""" - d = defer.Deferred() - reactor.callLater(0, self._poll_task, task, d) - return d - @utils.deferredToThread - def _poll_task(self, task, deferred): + done = event.Event() + loop = utils.LoopingCall(self._poll_task, task, done) + loop.start(FLAGS.xenapi_task_poll_interval, now=True) + return done.wait() + + def _poll_task(self, task, done): """Poll the given XenAPI task, and fire the given Deferred if we get a result.""" try: - #logging.debug('Polling task %s...', task) status = self._conn.xenapi.task.get_status(task) if status == 'pending': - reactor.callLater(FLAGS.xenapi_task_poll_interval, - self._poll_task, task, deferred) + return elif status == 'success': result = self._conn.xenapi.task.get_result(task) logging.info('Task %s status: success. %s', task, result) - deferred.callback(_parse_xmlrpc_value(result)) + done.send(_parse_xmlrpc_value(result)) else: error_info = self._conn.xenapi.task.get_error_info(task) logging.warn('Task %s status: %s. %s', task, status, error_info) - deferred.errback(XenAPI.Failure(error_info)) - #logging.debug('Polling task %s done.', task) + done.send_exception(XenAPI.Failure(error_info)) except Exception, exc: logging.warn(exc) - deferred.errback(exc) + done.send_exception(*sys.exc_info()) - @utils.deferredToThread def _call_xenapi(self, method, *args): """Call the specified XenAPI method on a background thread. Returns a Deferred for the result.""" @@ -353,11 +339,10 @@ class XenAPIConnection(object): f = f.__getattr__(m) return f(*args) - @utils.deferredToThread def _async_call_plugin(self, plugin, fn, args): """Call Async.host.call_plugin on a background thread. Returns a Deferred with the task reference.""" - return _unwrap_plugin_exceptions( + return tpool.execute(_unwrap_plugin_exceptions, self._conn.xenapi.Async.host.call_plugin, self._get_xenapi_host(), plugin, fn, args)
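A final pattern worth calling out: where the Twisted code kept blocking C-library calls (libvirt's `nwfilterDefineXML`, the XenAPI proxy) off the reactor with `threads.deferToThread`, the port uses `eventlet.tpool.execute`, which runs the callable in a native worker thread and suspends only the calling greenthread. A self-contained sketch (`define_filter` here is a stand-in, not the libvirt API):

    from eventlet import tpool


    def define_filter(xml):
        # Stand-in for a blocking call into a C library such as
        # conn.nwfilterDefineXML(xml); run directly, such a call would
        # stall every greenthread in the process.
        return len(xml)


    # tpool.execute() ships the call to a native worker thread; only the
    # calling greenthread sleeps until the result (or exception) returns.
    result = tpool.execute(define_filter, "<filter name='nova-base'/>")
    print(result)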