summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rteval/modules/__init__.py146
1 files changed, 142 insertions, 4 deletions
diff --git a/rteval/modules/__init__.py b/rteval/modules/__init__.py
index bde1832..2e8b25d 100644
--- a/rteval/modules/__init__.py
+++ b/rteval/modules/__init__.py
@@ -24,7 +24,145 @@
from Log import Log
from rtevalConfig import rtevalCfgSection
-import time, libxml2
+import time, libxml2, threading
+
+__all__ = ["rtevalModulePrototype", "ModuleContainer", "RtEvalModules"]
+
+
+class rtevalModulePrototype(threading.Thread):
+ "Prototype class for rteval modules - must be inherited by the real module"
+
+ def __init__(self, modtype, name, logger=None):
+ if logger and not isinstance(logger, Log):
+ raise TypeError("logger attribute is not a Log() object")
+
+ threading.Thread.__init__(self)
+
+ self._module_type = modtype
+ self._name = name
+ self.__logger = logger
+ self.__ready = False
+ self.__events = {"start": threading.Event(),
+ "stop": threading.Event(),
+ "finished": threading.Event()}
+
+
+ def _log(self, logtype, msg):
+ "Common log function for rteval modules"
+ if self.__logger:
+ self.__logger.log(logtype, "[%s] %s" % (self._name, msg))
+
+
+ def isReady(self):
+ "Returns a boolean if the module is ready to run"
+ return self.__ready
+
+
+ def _setReady(self, state=True):
+ "Sets the ready flag for the module"
+ self.__ready = state
+
+
+ def setStart(self):
+ "Sets the start event state"
+ self.__events["start"].set()
+
+
+ def shouldStart(self):
+ "Returns the start event state - indicating the module can start"
+ return self.__events["start"].isSet()
+
+
+ def setStop(self):
+ "Sets the stop event state"
+ self.__events["stop"].set()
+
+
+ def shouldStop(self):
+ "Returns the stop event state - indicating the module should stop"
+ return self.__events["stop"].isSet()
+
+
+ def _setFinished(self):
+ "Sets the finished event state - indicating the module has completed"
+ self.__events["finished"].set()
+
+
+ def WaitForCompletion(self, wtime = None):
+ "Blocks until the module has completed its workload"
+ return self.__events["finished"].wait(wtime)
+
+
+ def _WorkloadSetup(self):
+ "Required module method, which purpose is to do the initial workload setup, preparing for _WorkloadBuild()"
+ raise NotImplementedError("_WorkloadSetup() method must be implemented in the %s module" % self._name)
+
+ def _WorkloadBuild(self):
+ "Required module method, which purpose is to compile additional code needed for the worklaod"
+ raise NotImplementedError("_WorkloadBuild() method must be implemented in the %s module" % self._name)
+
+
+ def _WorkloadPrepare(self):
+ "Required module method, which will initialise and prepare the workload just before it is about to start"
+ raise NotImplementedError("_WorkloadPrepare() method must be implemented in the %s module" % self._name)
+
+
+ def _WorkloadTask(self):
+ "Required module method, which kicks off the workload"
+ raise NotImplementedError("_WorkloadTask() method must be implemented in the %s module" % self._name)
+
+
+ def _WorkloadAlive(self):
+ "Required module method, which should return True if the workload is still alive"
+ raise NotImplementedError("_WorkloadAlive() method must be implemented in the %s module" % self._name)
+
+
+ def _WorkloadCleanup(self):
+ "Required module method, which will be run after the _WorkloadTask() has completed or been aborted by the 'stop event flag'"
+ raise NotImplementedError("_WorkloadCleanup() method must be implemented in the %s module" % self._name)
+
+
+ def run(self):
+ "Workload thread runner - takes care of keeping the workload running as long as needed"
+ if self.shouldStop():
+ return
+
+ # Initial workload setups
+ self._WorkloadSetup()
+
+ # Compile the workload
+ self._WorkloadBuild()
+
+ # Do final preparations of workload before we're ready to start running
+ self._WorkloadPrepare()
+
+ # Wait until we're released
+ while True:
+ if self.shouldStop():
+ return
+ self.__events["start"].wait(1.0)
+ if self.shouldStart():
+ break
+
+ self._log(Log.DEBUG, "starting %s module" % self._module_type)
+ while not self.shouldStop():
+ # Run the workload
+ self._WorkloadTask()
+
+ if self.shouldStop():
+ break
+ if not self._WorkloadAlive():
+ self._log(Log.DEBUG, "%s workload died! bailng out..." % self._module_type)
+ break
+ time.sleep(1.0)
+ self._log(Log.DEBUG, "stopping %s workload" % self._module_type)
+ self._WorkloadCleanup()
+
+
+ def MakeReport(self):
+ "required module method, needs to return an libxml2.xmlNode object with the the results from running"
+ raise NotImplementedError("MakeReport() method must be implemented in the%s module" % self._name)
+
class ModuleContainer(object):
@@ -204,7 +342,7 @@ start their workloads yet"""
nthreads = 0
self._logger.log(Log.INFO, "sending start event to all %s modules" % self._module_type)
for (modname, mod) in self.__modules:
- mod.startevent.set()
+ mod.setStart()
nthreads += 1
return nthreads
@@ -218,7 +356,7 @@ start their workloads yet"""
self._logger.log(Log.INFO, "Stopping %s modules" % self._module_type)
for (modname, mod) in self.__modules:
- mod.stopevent.set()
+ mod.setStop()
self._logger.log(Log.DEBUG, "\t - Stopping %s" % modname)
mod.join(2.0)
@@ -229,7 +367,7 @@ start their workloads yet"""
self._logger.log(Log.INFO, "Waiting for %s modules to complete" % self._module_type)
for (modname, mod) in self.__modules:
self._logger.log(Log.DEBUG, "\t - Waiting for %s" % modname)
- mod.finished.wait(wtime)
+ mod.WaitForCompletion(wtime)
self._logger.log(Log.DEBUG, "All %s modules completed" % self._module_type)