diff options
-rw-r--r-- | rteval/modules/__init__.py | 146 |
1 files changed, 142 insertions, 4 deletions
diff --git a/rteval/modules/__init__.py b/rteval/modules/__init__.py index bde1832..2e8b25d 100644 --- a/rteval/modules/__init__.py +++ b/rteval/modules/__init__.py @@ -24,7 +24,145 @@ from Log import Log from rtevalConfig import rtevalCfgSection -import time, libxml2 +import time, libxml2, threading + +__all__ = ["rtevalModulePrototype", "ModuleContainer", "RtEvalModules"] + + +class rtevalModulePrototype(threading.Thread): + "Prototype class for rteval modules - must be inherited by the real module" + + def __init__(self, modtype, name, logger=None): + if logger and not isinstance(logger, Log): + raise TypeError("logger attribute is not a Log() object") + + threading.Thread.__init__(self) + + self._module_type = modtype + self._name = name + self.__logger = logger + self.__ready = False + self.__events = {"start": threading.Event(), + "stop": threading.Event(), + "finished": threading.Event()} + + + def _log(self, logtype, msg): + "Common log function for rteval modules" + if self.__logger: + self.__logger.log(logtype, "[%s] %s" % (self._name, msg)) + + + def isReady(self): + "Returns a boolean if the module is ready to run" + return self.__ready + + + def _setReady(self, state=True): + "Sets the ready flag for the module" + self.__ready = state + + + def setStart(self): + "Sets the start event state" + self.__events["start"].set() + + + def shouldStart(self): + "Returns the start event state - indicating the module can start" + return self.__events["start"].isSet() + + + def setStop(self): + "Sets the stop event state" + self.__events["stop"].set() + + + def shouldStop(self): + "Returns the stop event state - indicating the module should stop" + return self.__events["stop"].isSet() + + + def _setFinished(self): + "Sets the finished event state - indicating the module has completed" + self.__events["finished"].set() + + + def WaitForCompletion(self, wtime = None): + "Blocks until the module has completed its workload" + return self.__events["finished"].wait(wtime) + + + def _WorkloadSetup(self): + "Required module method, which purpose is to do the initial workload setup, preparing for _WorkloadBuild()" + raise NotImplementedError("_WorkloadSetup() method must be implemented in the %s module" % self._name) + + def _WorkloadBuild(self): + "Required module method, which purpose is to compile additional code needed for the worklaod" + raise NotImplementedError("_WorkloadBuild() method must be implemented in the %s module" % self._name) + + + def _WorkloadPrepare(self): + "Required module method, which will initialise and prepare the workload just before it is about to start" + raise NotImplementedError("_WorkloadPrepare() method must be implemented in the %s module" % self._name) + + + def _WorkloadTask(self): + "Required module method, which kicks off the workload" + raise NotImplementedError("_WorkloadTask() method must be implemented in the %s module" % self._name) + + + def _WorkloadAlive(self): + "Required module method, which should return True if the workload is still alive" + raise NotImplementedError("_WorkloadAlive() method must be implemented in the %s module" % self._name) + + + def _WorkloadCleanup(self): + "Required module method, which will be run after the _WorkloadTask() has completed or been aborted by the 'stop event flag'" + raise NotImplementedError("_WorkloadCleanup() method must be implemented in the %s module" % self._name) + + + def run(self): + "Workload thread runner - takes care of keeping the workload running as long as needed" + if self.shouldStop(): + return + + # Initial workload setups + self._WorkloadSetup() + + # Compile the workload + self._WorkloadBuild() + + # Do final preparations of workload before we're ready to start running + self._WorkloadPrepare() + + # Wait until we're released + while True: + if self.shouldStop(): + return + self.__events["start"].wait(1.0) + if self.shouldStart(): + break + + self._log(Log.DEBUG, "starting %s module" % self._module_type) + while not self.shouldStop(): + # Run the workload + self._WorkloadTask() + + if self.shouldStop(): + break + if not self._WorkloadAlive(): + self._log(Log.DEBUG, "%s workload died! bailng out..." % self._module_type) + break + time.sleep(1.0) + self._log(Log.DEBUG, "stopping %s workload" % self._module_type) + self._WorkloadCleanup() + + + def MakeReport(self): + "required module method, needs to return an libxml2.xmlNode object with the the results from running" + raise NotImplementedError("MakeReport() method must be implemented in the%s module" % self._name) + class ModuleContainer(object): @@ -204,7 +342,7 @@ start their workloads yet""" nthreads = 0 self._logger.log(Log.INFO, "sending start event to all %s modules" % self._module_type) for (modname, mod) in self.__modules: - mod.startevent.set() + mod.setStart() nthreads += 1 return nthreads @@ -218,7 +356,7 @@ start their workloads yet""" self._logger.log(Log.INFO, "Stopping %s modules" % self._module_type) for (modname, mod) in self.__modules: - mod.stopevent.set() + mod.setStop() self._logger.log(Log.DEBUG, "\t - Stopping %s" % modname) mod.join(2.0) @@ -229,7 +367,7 @@ start their workloads yet""" self._logger.log(Log.INFO, "Waiting for %s modules to complete" % self._module_type) for (modname, mod) in self.__modules: self._logger.log(Log.DEBUG, "\t - Waiting for %s" % modname) - mod.finished.wait(wtime) + mod.WaitForCompletion(wtime) self._logger.log(Log.DEBUG, "All %s modules completed" % self._module_type) |