-rwxr-xr-x  bin/nova-combined                  14
-rwxr-xr-x  bin/nova-compute                    6
-rwxr-xr-x  bin/nova-network                    6
-rwxr-xr-x  bin/nova-scheduler                  6
-rwxr-xr-x  bin/nova-volume                     6
-rw-r--r--  nova/service.py                   169
-rw-r--r--  nova/service_eventlet.py          303
-rw-r--r--  nova/tests/scheduler_unittest.py    2
-rw-r--r--  nova/tests/service_unittest.py      2
9 files changed, 160 insertions, 354 deletions
diff --git a/bin/nova-combined b/bin/nova-combined
index c86584328..c70d1d9ef 100755
--- a/bin/nova-combined
+++ b/bin/nova-combined
@@ -37,7 +37,7 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
from nova import api
from nova import flags
-from nova import service_eventlet
+from nova import service
from nova import wsgi
@@ -54,13 +54,13 @@ flags.DEFINE_string('ec2api_host', '0.0.0.0', 'EC2 API host')
if __name__ == '__main__':
FLAGS(sys.argv)
- compute = service_eventlet.Service.create(binary='nova-compute')
- network = service_eventlet.Service.create(binary='nova-network')
- volume = service_eventlet.Service.create(binary='nova-volume')
- scheduler = service_eventlet.Service.create(binary='nova-scheduler')
- #objectstore = service_eventlet.Service.create(binary='nova-objectstore')
+ compute = service.Service.create(binary='nova-compute')
+ network = service.Service.create(binary='nova-network')
+ volume = service.Service.create(binary='nova-volume')
+ scheduler = service.Service.create(binary='nova-scheduler')
+ #objectstore = service.Service.create(binary='nova-objectstore')
- service_eventlet.serve(compute, network, volume, scheduler)
+ service.serve(compute, network, volume, scheduler)
server = wsgi.Server()
server.start(api.API('os'), FLAGS.osapi_port, host=FLAGS.osapi_host)
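For reference, a rough sketch of the pattern nova-combined now follows with the eventlet-based module: build whichever workers the process should host via Service.create(), hand them all to serve(), and keep the process alive (nova-combined itself goes on to start its WSGI API servers instead of calling wait()). The worker subset below is illustrative, and flag parsing and sys.path setup are omitted:

    from nova import service

    if __name__ == '__main__':
        # Hypothetical subset of workers for a single combined process;
        # create() infers topic and manager from the binary name passed in.
        workers = [service.Service.create(binary='nova-compute'),
                   service.Service.create(binary='nova-scheduler')]

        service.serve(*workers)  # parses FLAGS and calls start() on each worker
        service.wait()           # keeps the main greenthread alive for the timers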
diff --git a/bin/nova-compute b/bin/nova-compute
index 307f7cb59..4baf47e29 100755
--- a/bin/nova-compute
+++ b/bin/nova-compute
@@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
-from nova import service_eventlet
+from nova import service
if __name__ == '__main__':
- service_eventlet.serve()
- service_eventlet.wait()
+ service.serve()
+ service.wait()
diff --git a/bin/nova-network b/bin/nova-network
index 307f7cb59..4baf47e29 100755
--- a/bin/nova-network
+++ b/bin/nova-network
@@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
-from nova import service_eventlet
+from nova import service
if __name__ == '__main__':
- service_eventlet.serve()
- service_eventlet.wait()
+ service.serve()
+ service.wait()
diff --git a/bin/nova-scheduler b/bin/nova-scheduler
index e4aa0dcae..a1b7ddf60 100755
--- a/bin/nova-scheduler
+++ b/bin/nova-scheduler
@@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
-from nova import service_eventlet
+from nova import service
if __name__ == '__main__':
- service_eventlet.serve()
- service_eventlet.wait()
+ service.serve()
+ service.wait()
diff --git a/bin/nova-volume b/bin/nova-volume
index 395d16041..ba4a3a502 100755
--- a/bin/nova-volume
+++ b/bin/nova-volume
@@ -35,8 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
-from nova import service_eventlet
+from nova import service
if __name__ == '__main__':
- service_eventlet.serve()
- service_eventlet.wait()
+ service.serve()
+ service.wait()
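Each of the single-purpose binaries calls serve() with no arguments, so Service.create() derives its settings from context: the binary name from the executing script, the topic by stripping the 'nova-' prefix, and the manager class from the matching flag. A hedged illustration of those defaults for nova-volume (the path is hypothetical and the flag value is not shown in this change):

    import os

    binary = os.path.basename('/usr/local/bin/nova-volume')  # 'nova-volume' (path hypothetical)
    topic = binary.rpartition('nova-')[2]                     # 'volume'
    manager_flag = '%s_manager' % topic                       # create() looks up FLAGS.volume_manager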
diff --git a/nova/service.py b/nova/service.py
index 55a0bb212..576cf8b85 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -17,21 +17,19 @@
# under the License.
"""
-A service is a very thin wrapper around a Manager object. It exposes the
-manager's public methods to other components of the system via rpc. It will
-report state periodically to the database and is responsible for initiating
-any periodic tasts that need to be executed on a given host.
-
-This module contains Service, a generic baseclass for all workers.
+Generic Node baseclass for all workers that run on hosts
"""
import inspect
import logging
import os
+import signal
+import sys
+import time
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.application import service
+from eventlet import event
+from eventlet import greenthread
+from eventlet import greenpool
from nova import context
from nova import db
@@ -50,8 +46,16 @@ flags.DEFINE_integer('periodic_interval', 60,
'seconds between running periodic tasks',
lower_bound=1)
+flags.DEFINE_string('pidfile', None,
+ 'pidfile to use for this service')
+
+
+flags.DEFINE_flag(flags.HelpFlag())
+flags.DEFINE_flag(flags.HelpshortFlag())
+flags.DEFINE_flag(flags.HelpXMLFlag())
+
-class Service(object, service.Service):
+class Service(object):
"""Base class for workers that run on hosts."""
def __init__(self, host, binary, topic, manager, report_interval=None,
@@ -64,8 +68,9 @@ class Service(object, service.Service):
self.periodic_interval = periodic_interval
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
+ self.timers = []
- def startService(self): # pylint: disable-msg C0103
+ def start(self):
manager_class = utils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *self.saved_args,
**self.saved_kwargs)
@@ -80,26 +85,29 @@ class Service(object, service.Service):
except exception.NotFound:
self._create_service_ref(ctxt)
- conn = rpc.Connection.instance()
+ conn1 = rpc.Connection.instance(new=True)
+ conn2 = rpc.Connection.instance(new=True)
if self.report_interval:
consumer_all = rpc.AdapterConsumer(
- connection=conn,
+ connection=conn1,
topic=self.topic,
proxy=self)
consumer_node = rpc.AdapterConsumer(
- connection=conn,
+ connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
- consumer_all.attach_to_twisted()
- consumer_node.attach_to_twisted()
-
- pulse = task.LoopingCall(self.report_state)
+ self.timers.append(consumer_all.attach_to_eventlet())
+ self.timers.append(consumer_node.attach_to_eventlet())
+
+ pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
+ self.timers.append(pulse)
if self.periodic_interval:
- pulse = task.LoopingCall(self.periodic_tasks)
- pulse.start(interval=self.periodic_interval, now=False)
+ periodic = utils.LoopingCall(self.periodic_tasks)
+ periodic.start(interval=self.periodic_interval, now=False)
+ self.timers.append(periodic)
def _create_service_ref(self, context):
service_ref = db.service_create(context,
@@ -114,7 +122,7 @@ class Service(object, service.Service):
return getattr(manager, key)
@classmethod
- def create(cls,
+ def create(cls,
host=None,
binary=None,
topic=None,
@@ -147,24 +155,28 @@ class Service(object, service.Service):
service_obj = cls(host, binary, topic, manager,
report_interval, periodic_interval)
- # This is the parent service that twistd will be looking for when it
- # parses this file, return it so that we can get it into globals.
- application = service.Application(binary)
- service_obj.setServiceParent(application)
- return application
+ return service_obj
def kill(self):
"""Destroy the service object in the datastore"""
+ self.stop()
try:
db.service_destroy(context.get_admin_context(), self.service_id)
except exception.NotFound:
logging.warn("Service killed that has no database entry")
+ def stop(self):
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception:
+ pass
+ self.timers = []
+
def periodic_tasks(self):
"""Tasks to be run at a periodic interval"""
- yield self.manager.periodic_tasks(context.get_admin_context())
+ self.manager.periodic_tasks(context.get_admin_context())
- @defer.inlineCallbacks
def report_state(self):
"""Update the state of this service in the datastore."""
ctxt = context.get_admin_context()
@@ -180,7 +192,7 @@ class Service(object, service.Service):
db.service_update(ctxt,
self.service_id,
{'report_count': service_ref['report_count'] + 1})
-
+
# TODO(termie): make this pattern be more elegant.
if getattr(self, "model_disconnected", False):
self.model_disconnected = False
@@ -191,4 +203,101 @@ class Service(object, service.Service):
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
- yield
+
+
+def stop(pidfile):
+ """
+ Stop the daemon
+ """
+ # Get the pid from the pidfile
+ try:
+ pf = file(pidfile, 'r')
+ pid = int(pf.read().strip())
+ pf.close()
+ except IOError:
+ pid = None
+
+ if not pid:
+ message = "pidfile %s does not exist. Daemon not running?\n"
+ sys.stderr.write(message % pidfile)
+ # Not an error in a restart
+ return
+
+ # Try killing the daemon process
+ try:
+ while 1:
+ os.kill(pid, signal.SIGKILL)
+ time.sleep(0.1)
+ except OSError, err:
+ err = str(err)
+ if err.find("No such process") > 0:
+ if os.path.exists(pidfile):
+ os.remove(pidfile)
+ else:
+ print str(err)
+ sys.exit(1)
+
+
+def serve(*services):
+ argv = FLAGS(sys.argv)
+
+ if not services:
+ services = [Service.create()]
+
+ name = '_'.join(x.binary for x in services)
+ logging.debug("Serving %s" % name)
+
+ logging.getLogger('amqplib').setLevel(logging.DEBUG)
+
+ if not FLAGS.pidfile:
+ FLAGS.pidfile = '%s.pid' % name
+ # NOTE(vish): if we're running nodaemon, redirect the log to stdout
+ #if FLAGS.nodaemon and not FLAGS.logfile:
+ # FLAGS.logfile = "-"
+ #if not FLAGS.logfile:
+ # FLAGS.logfile = '%s.log' % name
+ #if not FLAGS.prefix:
+ # FLAGS.prefix = name
+ #elif FLAGS.prefix.endswith('twisted'):
+ # FLAGS.prefix = FLAGS.prefix.replace('twisted', name)
+
+ action = 'start'
+ if len(argv) > 1:
+ action = argv.pop()
+
+ if action == 'stop':
+ stop(FLAGS.pidfile)
+ sys.exit()
+ elif action == 'restart':
+ stop(FLAGS.pidfile)
+ elif action == 'start':
+ pass
+ else:
+ print 'usage: %s [options] [start|stop|restart]' % argv[0]
+ sys.exit(1)
+
+ #formatter = logging.Formatter(
+ # '(%(name)s): %(levelname)s %(message)s')
+ #handler = logging.StreamHandler()
+ #handler.setFormatter(formatter)
+ #logging.getLogger().addHandler(handler)
+
+ if FLAGS.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ else:
+ logging.getLogger().setLevel(logging.WARNING)
+
+ logging.debug("Full set of FLAGS:")
+ for flag in FLAGS:
+ logging.debug("%s : %s" % (flag, FLAGS.get(flag, None)))
+
+ for x in services:
+ x.start()
+
+ #while True:
+ # greenthread.sleep(5)
+
+
+def wait():
+ while True:
+ greenthread.sleep(5)
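The twisted task.LoopingCall is replaced here by utils.LoopingCall, which this module uses only through start(interval=..., now=...) and stop(); the objects returned by attach_to_eventlet() are treated the same way, so anything appended to self.timers just needs a stop() method. nova/utils.py is not part of this change, so the following is only a sketch of what a greenthread-based equivalent of that interface could look like, not the actual nova.utils implementation:

    from eventlet import greenthread

    class LoopingCallSketch(object):
        """Illustrative only: call func every `interval` seconds until stopped."""

        def __init__(self, func, *args, **kwargs):
            self.func, self.args, self.kwargs = func, args, kwargs
            self._running = False

        def start(self, interval, now=True):
            self._running = True

            def _inner():
                if not now:
                    greenthread.sleep(interval)
                while self._running:
                    self.func(*self.args, **self.kwargs)
                    greenthread.sleep(interval)

            self._thread = greenthread.spawn(_inner)
            return self

        def stop(self):
            # Service.stop() relies only on this method existing on each timer.
            self._running = False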
diff --git a/nova/service_eventlet.py b/nova/service_eventlet.py
deleted file mode 100644
index 576cf8b85..000000000
--- a/nova/service_eventlet.py
+++ /dev/null
@@ -1,303 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Generic Node baseclass for all workers that run on hosts
-"""
-
-import inspect
-import logging
-import os
-import sys
-
-from eventlet import event
-from eventlet import greenthread
-from eventlet import greenpool
-
-from nova import context
-from nova import db
-from nova import exception
-from nova import flags
-from nova import rpc
-from nova import utils
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_integer('report_interval', 10,
- 'seconds between nodes reporting state to datastore',
- lower_bound=1)
-
-flags.DEFINE_integer('periodic_interval', 60,
- 'seconds between running periodic tasks',
- lower_bound=1)
-
-flags.DEFINE_string('pidfile', None,
- 'pidfile to use for this service')
-
-
-flags.DEFINE_flag(flags.HelpFlag())
-flags.DEFINE_flag(flags.HelpshortFlag())
-flags.DEFINE_flag(flags.HelpXMLFlag())
-
-
-class Service(object):
- """Base class for workers that run on hosts."""
-
- def __init__(self, host, binary, topic, manager, report_interval=None,
- periodic_interval=None, *args, **kwargs):
- self.host = host
- self.binary = binary
- self.topic = topic
- self.manager_class_name = manager
- self.report_interval = report_interval
- self.periodic_interval = periodic_interval
- super(Service, self).__init__(*args, **kwargs)
- self.saved_args, self.saved_kwargs = args, kwargs
- self.timers = []
-
- def start(self):
- manager_class = utils.import_class(self.manager_class_name)
- self.manager = manager_class(host=self.host, *self.saved_args,
- **self.saved_kwargs)
- self.manager.init_host()
- self.model_disconnected = False
- ctxt = context.get_admin_context()
- try:
- service_ref = db.service_get_by_args(ctxt,
- self.host,
- self.binary)
- self.service_id = service_ref['id']
- except exception.NotFound:
- self._create_service_ref(ctxt)
-
- conn1 = rpc.Connection.instance(new=True)
- conn2 = rpc.Connection.instance(new=True)
- if self.report_interval:
- consumer_all = rpc.AdapterConsumer(
- connection=conn1,
- topic=self.topic,
- proxy=self)
- consumer_node = rpc.AdapterConsumer(
- connection=conn2,
- topic='%s.%s' % (self.topic, self.host),
- proxy=self)
-
- self.timers.append(consumer_all.attach_to_eventlet())
- self.timers.append(consumer_node.attach_to_eventlet())
-
- pulse = utils.LoopingCall(self.report_state)
- pulse.start(interval=self.report_interval, now=False)
- self.timers.append(pulse)
-
- if self.periodic_interval:
- periodic = utils.LoopingCall(self.periodic_tasks)
- periodic.start(interval=self.periodic_interval, now=False)
- self.timers.append(periodic)
-
- def _create_service_ref(self, context):
- service_ref = db.service_create(context,
- {'host': self.host,
- 'binary': self.binary,
- 'topic': self.topic,
- 'report_count': 0})
- self.service_id = service_ref['id']
-
- def __getattr__(self, key):
- manager = self.__dict__.get('manager', None)
- return getattr(manager, key)
-
- @classmethod
- def create(cls,
- host=None,
- binary=None,
- topic=None,
- manager=None,
- report_interval=None,
- periodic_interval=None):
- """Instantiates class and passes back application object.
-
- Args:
- host, defaults to FLAGS.host
- binary, defaults to basename of executable
- topic, defaults to bin_name - "nova-" part
- manager, defaults to FLAGS.<topic>_manager
- report_interval, defaults to FLAGS.report_interval
- periodic_interval, defaults to FLAGS.periodic_interval
- """
- if not host:
- host = FLAGS.host
- if not binary:
- binary = os.path.basename(inspect.stack()[-1][1])
- if not topic:
- topic = binary.rpartition("nova-")[2]
- if not manager:
- manager = FLAGS.get('%s_manager' % topic, None)
- if not report_interval:
- report_interval = FLAGS.report_interval
- if not periodic_interval:
- periodic_interval = FLAGS.periodic_interval
- logging.warn("Starting %s node", topic)
- service_obj = cls(host, binary, topic, manager,
- report_interval, periodic_interval)
-
- return service_obj
-
- def kill(self):
- """Destroy the service object in the datastore"""
- self.stop()
- try:
- db.service_destroy(context.get_admin_context(), self.service_id)
- except exception.NotFound:
- logging.warn("Service killed that has no database entry")
-
- def stop(self):
- for x in self.timers:
- try:
- x.stop()
- except Exception:
- pass
- self.timers = []
-
- def periodic_tasks(self):
- """Tasks to be run at a periodic interval"""
- self.manager.periodic_tasks(context.get_admin_context())
-
- def report_state(self):
- """Update the state of this service in the datastore."""
- ctxt = context.get_admin_context()
- try:
- try:
- service_ref = db.service_get(ctxt, self.service_id)
- except exception.NotFound:
- logging.debug("The service database object disappeared, "
- "Recreating it.")
- self._create_service_ref(ctxt)
- service_ref = db.service_get(ctxt, self.service_id)
-
- db.service_update(ctxt,
- self.service_id,
- {'report_count': service_ref['report_count'] + 1})
-
- # TODO(termie): make this pattern be more elegant.
- if getattr(self, "model_disconnected", False):
- self.model_disconnected = False
- logging.error("Recovered model server connection!")
-
- # TODO(vish): this should probably only catch connection errors
- except Exception: # pylint: disable-msg=W0702
- if not getattr(self, "model_disconnected", False):
- self.model_disconnected = True
- logging.exception("model server went away")
-
-
-def stop(pidfile):
- """
- Stop the daemon
- """
- # Get the pid from the pidfile
- try:
- pf = file(pidfile, 'r')
- pid = int(pf.read().strip())
- pf.close()
- except IOError:
- pid = None
-
- if not pid:
- message = "pidfile %s does not exist. Daemon not running?\n"
- sys.stderr.write(message % pidfile)
- # Not an error in a restart
- return
-
- # Try killing the daemon process
- try:
- while 1:
- os.kill(pid, signal.SIGKILL)
- time.sleep(0.1)
- except OSError, err:
- err = str(err)
- if err.find("No such process") > 0:
- if os.path.exists(pidfile):
- os.remove(pidfile)
- else:
- print str(err)
- sys.exit(1)
-
-
-def serve(*services):
- argv = FLAGS(sys.argv)
-
- if not services:
- services = [Service.create()]
-
- name = '_'.join(x.binary for x in services)
- logging.debug("Serving %s" % name)
-
- logging.getLogger('amqplib').setLevel(logging.DEBUG)
-
- if not FLAGS.pidfile:
- FLAGS.pidfile = '%s.pid' % name
- # NOTE(vish): if we're running nodaemon, redirect the log to stdout
- #if FLAGS.nodaemon and not FLAGS.logfile:
- # FLAGS.logfile = "-"
- #if not FLAGS.logfile:
- # FLAGS.logfile = '%s.log' % name
- #if not FLAGS.prefix:
- # FLAGS.prefix = name
- #elif FLAGS.prefix.endswith('twisted'):
- # FLAGS.prefix = FLAGS.prefix.replace('twisted', name)
-
- action = 'start'
- if len(argv) > 1:
- action = argv.pop()
-
- if action == 'stop':
- stop(FLAGS.pidfile)
- sys.exit()
- elif action == 'restart':
- stop(FLAGS.pidfile)
- elif action == 'start':
- pass
- else:
- print 'usage: %s [options] [start|stop|restart]' % argv[0]
- sys.exit(1)
-
- #formatter = logging.Formatter(
- # '(%(name)s): %(levelname)s %(message)s')
- #handler = logging.StreamHandler()
- #handler.setFormatter(formatter)
- #logging.getLogger().addHandler(handler)
-
- if FLAGS.verbose:
- logging.getLogger().setLevel(logging.DEBUG)
- else:
- logging.getLogger().setLevel(logging.WARNING)
-
- logging.debug("Full set of FLAGS:")
- for flag in FLAGS:
- logging.debug("%s : %s" % (flag, FLAGS.get(flag, None)))
-
- for x in services:
- x.start()
-
- #while True:
- # greenthread.sleep(5)
-
-
-def wait():
- while True:
- greenthread.sleep(5)
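One detail that carries over unchanged through the rename: the rpc consumers are constructed with proxy=self, and Service.__getattr__ forwards unknown attribute lookups to the manager, so incoming RPC method names resolve to manager methods. A stripped-down illustration of that delegation (names hypothetical, not nova code):

    class FakeManager(object):
        def run_task(self, context, task_id):
            return 'ran %s' % task_id

    class ServiceLikeProxy(object):
        def __init__(self, manager):
            self.manager = manager

        def __getattr__(self, key):
            # Only invoked for attributes not found on the proxy itself,
            # mirroring Service.__getattr__ above.
            return getattr(self.__dict__.get('manager'), key)

    proxy = ServiceLikeProxy(FakeManager())
    assert proxy.run_task(None, 'abc') == 'ran abc'  # delegated to FakeManager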
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
index 37d15567f..f442a4bc2 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/scheduler_unittest.py
@@ -22,7 +22,7 @@ Tests For Scheduler
from nova import context
from nova import db
from nova import flags
-from nova import service_eventlet as service
+from nova import service
from nova import test
from nova import rpc
from nova import utils
diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py
index a2bac9af0..9f6d1af7d 100644
--- a/nova/tests/service_unittest.py
+++ b/nova/tests/service_unittest.py
@@ -26,7 +26,7 @@ from nova import exception
from nova import flags
from nova import rpc
from nova import test
-from nova import service_eventlet as service
+from nova import service
from nova import manager
FLAGS = flags.FLAGS