diff options
-rwxr-xr-x | bin/nova-combined | 14 | ||||
-rwxr-xr-x | bin/nova-compute | 6 | ||||
-rwxr-xr-x | bin/nova-network | 6 | ||||
-rwxr-xr-x | bin/nova-scheduler | 6 | ||||
-rwxr-xr-x | bin/nova-volume | 6 | ||||
-rw-r--r-- | nova/service.py | 169 | ||||
-rw-r--r-- | nova/service_eventlet.py | 303 | ||||
-rw-r--r-- | nova/tests/scheduler_unittest.py | 2 | ||||
-rw-r--r-- | nova/tests/service_unittest.py | 2 |
9 files changed, 160 insertions, 354 deletions
diff --git a/bin/nova-combined b/bin/nova-combined index c86584328..c70d1d9ef 100755 --- a/bin/nova-combined +++ b/bin/nova-combined @@ -37,7 +37,7 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): from nova import api from nova import flags -from nova import service_eventlet +from nova import service from nova import wsgi @@ -54,13 +54,13 @@ flags.DEFINE_string('ec2api_host', '0.0.0.0', 'EC2 API host') if __name__ == '__main__': FLAGS(sys.argv) - compute = service_eventlet.Service.create(binary='nova-compute') - network = service_eventlet.Service.create(binary='nova-network') - volume = service_eventlet.Service.create(binary='nova-volume') - scheduler = service_eventlet.Service.create(binary='nova-scheduler') - #objectstore = service_eventlet.Service.create(binary='nova-objectstore') + compute = service.Service.create(binary='nova-compute') + network = service.Service.create(binary='nova-network') + volume = service.Service.create(binary='nova-volume') + scheduler = service.Service.create(binary='nova-scheduler') + #objectstore = service.Service.create(binary='nova-objectstore') - service_eventlet.serve(compute, network, volume, scheduler) + service.serve(compute, network, volume, scheduler) server = wsgi.Server() server.start(api.API('os'), FLAGS.osapi_port, host=FLAGS.osapi_host) diff --git a/bin/nova-compute b/bin/nova-compute index 307f7cb59..4baf47e29 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): sys.path.insert(0, possible_topdir) -from nova import service_eventlet +from nova import service if __name__ == '__main__': - service_eventlet.serve() - service_eventlet.wait() + service.serve() + service.wait() diff --git a/bin/nova-network b/bin/nova-network index 307f7cb59..4baf47e29 100755 --- a/bin/nova-network +++ b/bin/nova-network @@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): sys.path.insert(0, possible_topdir) -from nova import service_eventlet +from nova import service if __name__ == '__main__': - service_eventlet.serve() - service_eventlet.wait() + service.serve() + service.wait() diff --git a/bin/nova-scheduler b/bin/nova-scheduler index e4aa0dcae..a1b7ddf60 100755 --- a/bin/nova-scheduler +++ b/bin/nova-scheduler @@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): sys.path.insert(0, possible_topdir) -from nova import service_eventlet +from nova import service if __name__ == '__main__': - service_eventlet.serve() - service_eventlet.wait() + service.serve() + service.wait() diff --git a/bin/nova-volume b/bin/nova-volume index 395d16041..ba4a3a502 100755 --- a/bin/nova-volume +++ b/bin/nova-volume @@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')): sys.path.insert(0, possible_topdir) -from nova import service_eventlet +from nova import service if __name__ == '__main__': - service_eventlet.serve() - service_eventlet.wait() + service.serve() + service.wait() diff --git a/nova/service.py b/nova/service.py index 55a0bb212..576cf8b85 100644 --- a/nova/service.py +++ b/nova/service.py @@ -17,21 +17,17 @@ # under the License. """ -A service is a very thin wrapper around a Manager object. It exposes the -manager's public methods to other components of the system via rpc. It will -report state periodically to the database and is responsible for initiating -any periodic tasts that need to be executed on a given host. - -This module contains Service, a generic baseclass for all workers. +Generic Node baseclass for all workers that run on hosts """ import inspect import logging import os +import sys -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service +from eventlet import event +from eventlet import greenthread +from eventlet import greenpool from nova import context from nova import db @@ -50,8 +46,16 @@ flags.DEFINE_integer('periodic_interval', 60, 'seconds between running periodic tasks', lower_bound=1) +flags.DEFINE_string('pidfile', None, + 'pidfile to use for this service') + + +flags.DEFINE_flag(flags.HelpFlag()) +flags.DEFINE_flag(flags.HelpshortFlag()) +flags.DEFINE_flag(flags.HelpXMLFlag()) + -class Service(object, service.Service): +class Service(object): """Base class for workers that run on hosts.""" def __init__(self, host, binary, topic, manager, report_interval=None, @@ -64,8 +68,9 @@ class Service(object, service.Service): self.periodic_interval = periodic_interval super(Service, self).__init__(*args, **kwargs) self.saved_args, self.saved_kwargs = args, kwargs + self.timers = [] - def startService(self): # pylint: disable-msg C0103 + def start(self): manager_class = utils.import_class(self.manager_class_name) self.manager = manager_class(host=self.host, *self.saved_args, **self.saved_kwargs) @@ -80,26 +85,29 @@ class Service(object, service.Service): except exception.NotFound: self._create_service_ref(ctxt) - conn = rpc.Connection.instance() + conn1 = rpc.Connection.instance(new=True) + conn2 = rpc.Connection.instance(new=True) if self.report_interval: consumer_all = rpc.AdapterConsumer( - connection=conn, + connection=conn1, topic=self.topic, proxy=self) consumer_node = rpc.AdapterConsumer( - connection=conn, + connection=conn2, topic='%s.%s' % (self.topic, self.host), proxy=self) - consumer_all.attach_to_twisted() - consumer_node.attach_to_twisted() - - pulse = task.LoopingCall(self.report_state) + self.timers.append(consumer_all.attach_to_eventlet()) + self.timers.append(consumer_node.attach_to_eventlet()) + + pulse = utils.LoopingCall(self.report_state) pulse.start(interval=self.report_interval, now=False) + self.timers.append(pulse) if self.periodic_interval: - pulse = task.LoopingCall(self.periodic_tasks) - pulse.start(interval=self.periodic_interval, now=False) + periodic = utils.LoopingCall(self.periodic_tasks) + periodic.start(interval=self.periodic_interval, now=False) + self.timers.append(periodic) def _create_service_ref(self, context): service_ref = db.service_create(context, @@ -114,7 +122,7 @@ class Service(object, service.Service): return getattr(manager, key) @classmethod - def create(cls, + def create(cls, host=None, binary=None, topic=None, @@ -147,24 +155,28 @@ class Service(object, service.Service): service_obj = cls(host, binary, topic, manager, report_interval, periodic_interval) - # This is the parent service that twistd will be looking for when it - # parses this file, return it so that we can get it into globals. - application = service.Application(binary) - service_obj.setServiceParent(application) - return application + return service_obj def kill(self): """Destroy the service object in the datastore""" + self.stop() try: db.service_destroy(context.get_admin_context(), self.service_id) except exception.NotFound: logging.warn("Service killed that has no database entry") + def stop(self): + for x in self.timers: + try: + x.stop() + except Exception: + pass + self.timers = [] + def periodic_tasks(self): """Tasks to be run at a periodic interval""" - yield self.manager.periodic_tasks(context.get_admin_context()) + self.manager.periodic_tasks(context.get_admin_context()) - @defer.inlineCallbacks def report_state(self): """Update the state of this service in the datastore.""" ctxt = context.get_admin_context() @@ -180,7 +192,7 @@ class Service(object, service.Service): db.service_update(ctxt, self.service_id, {'report_count': service_ref['report_count'] + 1}) - + # TODO(termie): make this pattern be more elegant. if getattr(self, "model_disconnected", False): self.model_disconnected = False @@ -191,4 +203,101 @@ class Service(object, service.Service): if not getattr(self, "model_disconnected", False): self.model_disconnected = True logging.exception("model server went away") - yield + + +def stop(pidfile): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = file(pidfile, 'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. Daemon not running?\n" + sys.stderr.write(message % pidfile) + # Not an error in a restart + return + + # Try killing the daemon process + try: + while 1: + os.kill(pid, signal.SIGKILL) + time.sleep(0.1) + except OSError, err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(pidfile): + os.remove(pidfile) + else: + print str(err) + sys.exit(1) + + +def serve(*services): + argv = FLAGS(sys.argv) + + if not services: + services = [Service.create()] + + name = '_'.join(x.binary for x in services) + logging.debug("Serving %s" % name) + + logging.getLogger('amqplib').setLevel(logging.DEBUG) + + if not FLAGS.pidfile: + FLAGS.pidfile = '%s.pid' % name + # NOTE(vish): if we're running nodaemon, redirect the log to stdout + #if FLAGS.nodaemon and not FLAGS.logfile: + # FLAGS.logfile = "-" + #if not FLAGS.logfile: + # FLAGS.logfile = '%s.log' % name + #if not FLAGS.prefix: + # FLAGS.prefix = name + #elif FLAGS.prefix.endswith('twisted'): + # FLAGS.prefix = FLAGS.prefix.replace('twisted', name) + + action = 'start' + if len(argv) > 1: + action = argv.pop() + + if action == 'stop': + stop(FLAGS.pidfile) + sys.exit() + elif action == 'restart': + stop(FLAGS.pidfile) + elif action == 'start': + pass + else: + print 'usage: %s [options] [start|stop|restart]' % argv[0] + sys.exit(1) + + #formatter = logging.Formatter( + # '(%(name)s): %(levelname)s %(message)s') + #handler = logging.StreamHandler() + #handler.setFormatter(formatter) + #logging.getLogger().addHandler(handler) + + if FLAGS.verbose: + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.getLogger().setLevel(logging.WARNING) + + logging.debug("Full set of FLAGS:") + for flag in FLAGS: + logging.debug("%s : %s" % (flag, FLAGS.get(flag, None))) + + for x in services: + x.start() + + #while True: + # greenthread.sleep(5) + + +def wait(): + while True: + greenthread.sleep(5) diff --git a/nova/service_eventlet.py b/nova/service_eventlet.py deleted file mode 100644 index 576cf8b85..000000000 --- a/nova/service_eventlet.py +++ /dev/null @@ -1,303 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Generic Node baseclass for all workers that run on hosts -""" - -import inspect -import logging -import os -import sys - -from eventlet import event -from eventlet import greenthread -from eventlet import greenpool - -from nova import context -from nova import db -from nova import exception -from nova import flags -from nova import rpc -from nova import utils - - -FLAGS = flags.FLAGS -flags.DEFINE_integer('report_interval', 10, - 'seconds between nodes reporting state to datastore', - lower_bound=1) - -flags.DEFINE_integer('periodic_interval', 60, - 'seconds between running periodic tasks', - lower_bound=1) - -flags.DEFINE_string('pidfile', None, - 'pidfile to use for this service') - - -flags.DEFINE_flag(flags.HelpFlag()) -flags.DEFINE_flag(flags.HelpshortFlag()) -flags.DEFINE_flag(flags.HelpXMLFlag()) - - -class Service(object): - """Base class for workers that run on hosts.""" - - def __init__(self, host, binary, topic, manager, report_interval=None, - periodic_interval=None, *args, **kwargs): - self.host = host - self.binary = binary - self.topic = topic - self.manager_class_name = manager - self.report_interval = report_interval - self.periodic_interval = periodic_interval - super(Service, self).__init__(*args, **kwargs) - self.saved_args, self.saved_kwargs = args, kwargs - self.timers = [] - - def start(self): - manager_class = utils.import_class(self.manager_class_name) - self.manager = manager_class(host=self.host, *self.saved_args, - **self.saved_kwargs) - self.manager.init_host() - self.model_disconnected = False - ctxt = context.get_admin_context() - try: - service_ref = db.service_get_by_args(ctxt, - self.host, - self.binary) - self.service_id = service_ref['id'] - except exception.NotFound: - self._create_service_ref(ctxt) - - conn1 = rpc.Connection.instance(new=True) - conn2 = rpc.Connection.instance(new=True) - if self.report_interval: - consumer_all = rpc.AdapterConsumer( - connection=conn1, - topic=self.topic, - proxy=self) - consumer_node = rpc.AdapterConsumer( - connection=conn2, - topic='%s.%s' % (self.topic, self.host), - proxy=self) - - self.timers.append(consumer_all.attach_to_eventlet()) - self.timers.append(consumer_node.attach_to_eventlet()) - - pulse = utils.LoopingCall(self.report_state) - pulse.start(interval=self.report_interval, now=False) - self.timers.append(pulse) - - if self.periodic_interval: - periodic = utils.LoopingCall(self.periodic_tasks) - periodic.start(interval=self.periodic_interval, now=False) - self.timers.append(periodic) - - def _create_service_ref(self, context): - service_ref = db.service_create(context, - {'host': self.host, - 'binary': self.binary, - 'topic': self.topic, - 'report_count': 0}) - self.service_id = service_ref['id'] - - def __getattr__(self, key): - manager = self.__dict__.get('manager', None) - return getattr(manager, key) - - @classmethod - def create(cls, - host=None, - binary=None, - topic=None, - manager=None, - report_interval=None, - periodic_interval=None): - """Instantiates class and passes back application object. - - Args: - host, defaults to FLAGS.host - binary, defaults to basename of executable - topic, defaults to bin_name - "nova-" part - manager, defaults to FLAGS.<topic>_manager - report_interval, defaults to FLAGS.report_interval - periodic_interval, defaults to FLAGS.periodic_interval - """ - if not host: - host = FLAGS.host - if not binary: - binary = os.path.basename(inspect.stack()[-1][1]) - if not topic: - topic = binary.rpartition("nova-")[2] - if not manager: - manager = FLAGS.get('%s_manager' % topic, None) - if not report_interval: - report_interval = FLAGS.report_interval - if not periodic_interval: - periodic_interval = FLAGS.periodic_interval - logging.warn("Starting %s node", topic) - service_obj = cls(host, binary, topic, manager, - report_interval, periodic_interval) - - return service_obj - - def kill(self): - """Destroy the service object in the datastore""" - self.stop() - try: - db.service_destroy(context.get_admin_context(), self.service_id) - except exception.NotFound: - logging.warn("Service killed that has no database entry") - - def stop(self): - for x in self.timers: - try: - x.stop() - except Exception: - pass - self.timers = [] - - def periodic_tasks(self): - """Tasks to be run at a periodic interval""" - self.manager.periodic_tasks(context.get_admin_context()) - - def report_state(self): - """Update the state of this service in the datastore.""" - ctxt = context.get_admin_context() - try: - try: - service_ref = db.service_get(ctxt, self.service_id) - except exception.NotFound: - logging.debug("The service database object disappeared, " - "Recreating it.") - self._create_service_ref(ctxt) - service_ref = db.service_get(ctxt, self.service_id) - - db.service_update(ctxt, - self.service_id, - {'report_count': service_ref['report_count'] + 1}) - - # TODO(termie): make this pattern be more elegant. - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - # TODO(vish): this should probably only catch connection errors - except Exception: # pylint: disable-msg=W0702 - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - - -def stop(pidfile): - """ - Stop the daemon - """ - # Get the pid from the pidfile - try: - pf = file(pidfile, 'r') - pid = int(pf.read().strip()) - pf.close() - except IOError: - pid = None - - if not pid: - message = "pidfile %s does not exist. Daemon not running?\n" - sys.stderr.write(message % pidfile) - # Not an error in a restart - return - - # Try killing the daemon process - try: - while 1: - os.kill(pid, signal.SIGKILL) - time.sleep(0.1) - except OSError, err: - err = str(err) - if err.find("No such process") > 0: - if os.path.exists(pidfile): - os.remove(pidfile) - else: - print str(err) - sys.exit(1) - - -def serve(*services): - argv = FLAGS(sys.argv) - - if not services: - services = [Service.create()] - - name = '_'.join(x.binary for x in services) - logging.debug("Serving %s" % name) - - logging.getLogger('amqplib').setLevel(logging.DEBUG) - - if not FLAGS.pidfile: - FLAGS.pidfile = '%s.pid' % name - # NOTE(vish): if we're running nodaemon, redirect the log to stdout - #if FLAGS.nodaemon and not FLAGS.logfile: - # FLAGS.logfile = "-" - #if not FLAGS.logfile: - # FLAGS.logfile = '%s.log' % name - #if not FLAGS.prefix: - # FLAGS.prefix = name - #elif FLAGS.prefix.endswith('twisted'): - # FLAGS.prefix = FLAGS.prefix.replace('twisted', name) - - action = 'start' - if len(argv) > 1: - action = argv.pop() - - if action == 'stop': - stop(FLAGS.pidfile) - sys.exit() - elif action == 'restart': - stop(FLAGS.pidfile) - elif action == 'start': - pass - else: - print 'usage: %s [options] [start|stop|restart]' % argv[0] - sys.exit(1) - - #formatter = logging.Formatter( - # '(%(name)s): %(levelname)s %(message)s') - #handler = logging.StreamHandler() - #handler.setFormatter(formatter) - #logging.getLogger().addHandler(handler) - - if FLAGS.verbose: - logging.getLogger().setLevel(logging.DEBUG) - else: - logging.getLogger().setLevel(logging.WARNING) - - logging.debug("Full set of FLAGS:") - for flag in FLAGS: - logging.debug("%s : %s" % (flag, FLAGS.get(flag, None))) - - for x in services: - x.start() - - #while True: - # greenthread.sleep(5) - - -def wait(): - while True: - greenthread.sleep(5) diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py index 37d15567f..f442a4bc2 100644 --- a/nova/tests/scheduler_unittest.py +++ b/nova/tests/scheduler_unittest.py @@ -22,7 +22,7 @@ Tests For Scheduler from nova import context from nova import db from nova import flags -from nova import service_eventlet as service +from nova import service from nova import test from nova import rpc from nova import utils diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py index a2bac9af0..9f6d1af7d 100644 --- a/nova/tests/service_unittest.py +++ b/nova/tests/service_unittest.py @@ -26,7 +26,7 @@ from nova import exception from nova import flags from nova import rpc from nova import test -from nova import service_eventlet as service +from nova import service from nova import manager FLAGS = flags.FLAGS |