diff options
author | Jan Safranek <jsafrane@redhat.com> | 2013-05-16 09:23:31 +0200 |
---|---|---|
committer | Jan Safranek <jsafrane@redhat.com> | 2013-05-16 09:23:31 +0200 |
commit | 04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6 (patch) | |
tree | 620eed4fc8442e1264656ca047bb6ce07e6fb624 /src | |
parent | 0d2ff2523333f4b93154a46d70e1c8143ac789e7 (diff) | |
download | openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.gz openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.xz openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.zip |
Add method to stop the running thread to all Managers.
We should cleanly shut down all threads when the provider is unloaded,
just to be nice to CIMOMs.
Diffstat (limited to 'src')
-rw-r--r-- | src/python/openlmi/common/IndicationManager.py | 36 | ||||
-rw-r--r-- | src/python/openlmi/common/JobManager.py | 95 | ||||
-rw-r--r-- | src/python/openlmi/common/TimerManager.py | 29 |
3 files changed, 129 insertions, 31 deletions
diff --git a/src/python/openlmi/common/IndicationManager.py b/src/python/openlmi/common/IndicationManager.py index a17703f..f9a1655 100644 --- a/src/python/openlmi/common/IndicationManager.py +++ b/src/python/openlmi/common/IndicationManager.py @@ -253,6 +253,8 @@ class IndicationManager(singletonmixin.Singleton): """ SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity + COMMAND_STOP = 1 # Command to the IndicationManager thread to stop. + @cmpi_logging.trace_method def __init__(self, env, nameprefix, namespace, ns_interop=None, queue=None): @@ -295,7 +297,7 @@ class IndicationManager(singletonmixin.Singleton): new_broker = ch.PrepareAttachThread() self._indication_sender = threading.Thread( target=self._send_indications_loop, args=(new_broker,)) - self._indication_sender.daemon = True + self._indication_sender.daemon = False self._indication_sender.start() @property @@ -714,7 +716,7 @@ class IndicationManager(singletonmixin.Singleton): :param fltr_id: (``string``) ID of the filter to check. """ with self._access_lock: - return ( class_name in self._filters + return (class_name in self._filters and fltr_id in self._filters[class_name]) def _send_indications_loop(self, broker): @@ -726,9 +728,31 @@ class IndicationManager(singletonmixin.Singleton): """ broker.AttachThread() while True: - indication = self._queue.get() - cmpi_logging.logger.trace_info("Delivering indication %s" % - (str(indication.path))) - broker.DeliverIndication(self.namespace, indication) + command = self._queue.get() + + if isinstance(command, pywbem.CIMInstance) : + indication = command + cmpi_logging.logger.trace_info("Delivering indication %s" % + (str(indication.path))) + broker.DeliverIndication(self.namespace, indication) + + elif isinstance(command, int): + cmpi_logging.logger.trace_info("Received command %d", command) + if command == self.COMMAND_STOP: + if hasattr(self._queue, "task_done"): + self._queue.task_done() + break + if hasattr(self._queue, "task_done"): self._queue.task_done() + + cmpi_logging.logger.info("Stopped Indication thread.") + + @cmpi_logging.trace_method + def shutdown(self): + """ + Stop the thread. This method blocks until the thread is safely + destroyed. + """ + self._queue.put(self.COMMAND_STOP) + self._indication_sender.join() diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py index fc12d7e..9f977c9 100644 --- a/src/python/openlmi/common/JobManager.py +++ b/src/python/openlmi/common/JobManager.py @@ -650,6 +650,8 @@ class JobManager(object): timeout. """ + COMMAND_STOP = 1 + IND_JOB_PERCENT_UPDATED = "PercentUpdated" IND_JOB_SUCCEEDED = "Succeeded" IND_JOB_FAILED = "Failed" @@ -723,7 +725,7 @@ class JobManager(object): # Start the worker thread (don't forget to register it at CIMOM) self.worker = threading.Thread(target=self._worker_main) - self.worker.daemon = True + self.worker.daemon = False self.worker.start() # Various classnames for job-related classes, with correct infixes. @@ -855,30 +857,40 @@ class JobManager(object): This is the main loop of the job queue. It just processes enqueued jobs and never ends. """ + cmpi_logging.logger.info("Started Job thread.") while True: - job = self.queue.get() - # we need to protect from changes between checking state and - # setting new state - job.lock() - if job.job_state == Job.STATE_QUEUED: - # the job was not cancelled - job.change_state(Job.STATE_RUNNING) - job.unlock() - cmpi_logging.logger.info("Starting job %s: '%s'" % - (job.the_id, job.job_name)) - - job.execute() - if job.error: - cmpi_logging.logger.warn("Job %s: '%s' finished with error:" - " %s" % (job.the_id, job.job_name, str(job.error))) - else: - cmpi_logging.logger.info("Job %s: '%s' finished OK" % + command = self.queue.get() + if isinstance(command, Job): + # we need to protect from changes between checking state and + # setting new state + job = command + job.lock() + if job.job_state == Job.STATE_QUEUED: + # the job was not cancelled + job.change_state(Job.STATE_RUNNING) + job.unlock() + cmpi_logging.logger.info("Starting job %s: '%s'" % (job.the_id, job.job_name)) - else: - # just skip suspended and terminated jobs - job.unlock() + + job.execute() + if job.error: + cmpi_logging.logger.warn("Job %s: '%s' finished with error:" + " %s" % (job.the_id, job.job_name, str(job.error))) + else: + cmpi_logging.logger.info("Job %s: '%s' finished OK" % + (job.the_id, job.job_name)) + else: + # just skip suspended and terminated jobs + job.unlock() + + elif isinstance(command, int): + self.queue.task_done() + break + self.queue.task_done() + cmpi_logging.logger.info("Stopped Job thread.") + @cmpi_logging.trace_method def get_next_id(self): """ @@ -906,6 +918,47 @@ class JobManager(object): inst['InstanceID'] = job.get_instance_id() return self.job_provider.get_instance(None, inst) + @cmpi_logging.trace_method + def shutdown(self, timeout=1): + """ + Stop the thread. If a job is running, it may leave the job process + (mkfs, resize2fs, ...) and the worker thread (waiting for the process to + finish) still running. + + JobManager still needs Indication Manager and TimeManager working at + this point! + + :param timeout: Nr. of seconds to wait for the current job. Afterwards + the thread is abandoned, leaving the process still running. + """ + # Empty the queue, we don't want the worker to proceed with any other + # queued job. + while not self.queue.empty(): + queue.get(False) + queue.task_done() + + self.queue.put(self.COMMAND_STOP) + self.worker.join(timeout) + + # Cancel all running/suspended/queued jobs. + # This will send indications. + for job in self.jobs.itervalues(): + if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED, + Job.STATE_RUNNING): + job.cancel() + + if self.worker.isAlive(): + # There is no way, how to stop the thread in Python, so abandon it. + self.worker.daemon = True + self.indication_manager = None + self.timer_manager = None + + def can_shutdown(self): + """ + Return True, if there is no running Job. + """ + return self.queue.empty() + class LMI_ConcreteJob(CIMProvider2): """ diff --git a/src/python/openlmi/common/TimerManager.py b/src/python/openlmi/common/TimerManager.py index 5eb39ae..42d48cd 100644 --- a/src/python/openlmi/common/TimerManager.py +++ b/src/python/openlmi/common/TimerManager.py @@ -94,6 +94,7 @@ class Timer(object): A class representing a timer. A timer has a timeout and after the timeout, given callback is called and the timer is deleted. """ + @cmpi_logging.trace_method def __init__(self, timer_manager, name, callback=None, *args, **kwargs): """ @@ -193,6 +194,10 @@ class TimerManager(singletonmixin.Singleton): you provider initialization. """ + # Commands to the timer thread + COMMAND_STOP = 1 + COMMAND_RESCHEDULE = 2 + @cmpi_logging.trace_method def __init__(self, env=None): """ @@ -215,7 +220,7 @@ class TimerManager(singletonmixin.Singleton): self._timer_thread = threading.Thread( target=self._timer_loop, args=(new_broker,)) - self._timer_thread.daemon = True + self._timer_thread.daemon = False self._timer_thread.start() def create_timer(self, name, callback=None, *args, **kwargs): @@ -239,17 +244,22 @@ class TimerManager(singletonmixin.Singleton): """ if broker: broker.AttachThread() + cmpi_logging.logger.info("Started Timer thread.") while True: self._handle_expired() timeout = self._find_timeout() if timeout != 0: # Wait for the timeout or any change in timers. try: - self._queue.get(timeout=timeout) + command = self._queue.get(timeout=timeout) self._queue.task_done() + if command == self.COMMAND_STOP: + break # stop the thread + # process COMMAND_RESCHEDULE in next loop except Queue.Empty: # Timeout has happened, ignore the exception. pass + cmpi_logging.logger.info("Stopped Timer thread.") @cmpi_logging.trace_method def _handle_expired(self): @@ -320,7 +330,7 @@ class TimerManager(singletonmixin.Singleton): with self._lock: self._timers.append(timer) # Wake up the timer manager thread. - self._queue.put("") + self._queue.put(self.COMMAND_RESCHEDULE) cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name)) @cmpi_logging.trace_method @@ -336,7 +346,7 @@ class TimerManager(singletonmixin.Singleton): except ValueError: pass # Wake up the timer manager thread. - self._queue.put("") + self._queue.put(self.COMMAND_RESCHEDULE) cmpi_logging.logger.trace_info("Timer: Timer %s removed" % (timer._name)) @@ -350,6 +360,15 @@ class TimerManager(singletonmixin.Singleton): """ return self._clock.now() + @cmpi_logging.trace_method + def shutdown(self): + """ + Stop the thread. This method blocks until the thread is safely + destroyed. + """ + self._queue.put(self.COMMAND_STOP) + self._timer_thread.join() + if __name__ == "__main__": cmpi_logging.logger = cmpi_logging.CMPILogger("") import time @@ -398,3 +417,5 @@ if __name__ == "__main__": t15.start(1.5) time.sleep(4) + + mgr.stop_thread() |