diff options
author | Jan Safranek <jsafrane@redhat.com> | 2013-05-17 13:38:46 +0200 |
---|---|---|
committer | Jan Safranek <jsafrane@redhat.com> | 2013-05-17 13:38:46 +0200 |
commit | 028b8f02d9b204efebdbb97d765b2c18ca33c0a8 (patch) | |
tree | 4244750ecfe4c748f1de21c6277b3e10d34b3a1d | |
parent | 4d644654b44b975594982e083eb1656e06af7df5 (diff) | |
parent | 04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6 (diff) | |
download | openlmi-providers-028b8f02d9b204efebdbb97d765b2c18ca33c0a8.tar.gz openlmi-providers-028b8f02d9b204efebdbb97d765b2c18ca33c0a8.tar.xz openlmi-providers-028b8f02d9b204efebdbb97d765b2c18ca33c0a8.zip |
Merge branch 'devel/stop-threads'
-rw-r--r-- | src/python/openlmi/common/IndicationManager.py | 36 | ||||
-rw-r--r-- | src/python/openlmi/common/JobManager.py | 95 | ||||
-rw-r--r-- | src/python/openlmi/common/TimerManager.py | 29 |
3 files changed, 129 insertions, 31 deletions
diff --git a/src/python/openlmi/common/IndicationManager.py b/src/python/openlmi/common/IndicationManager.py index a17703f..f9a1655 100644 --- a/src/python/openlmi/common/IndicationManager.py +++ b/src/python/openlmi/common/IndicationManager.py @@ -253,6 +253,8 @@ class IndicationManager(singletonmixin.Singleton): """ SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity + COMMAND_STOP = 1 # Command to the IndicationManager thread to stop. + @cmpi_logging.trace_method def __init__(self, env, nameprefix, namespace, ns_interop=None, queue=None): @@ -295,7 +297,7 @@ class IndicationManager(singletonmixin.Singleton): new_broker = ch.PrepareAttachThread() self._indication_sender = threading.Thread( target=self._send_indications_loop, args=(new_broker,)) - self._indication_sender.daemon = True + self._indication_sender.daemon = False self._indication_sender.start() @property @@ -714,7 +716,7 @@ class IndicationManager(singletonmixin.Singleton): :param fltr_id: (``string``) ID of the filter to check. """ with self._access_lock: - return ( class_name in self._filters + return (class_name in self._filters and fltr_id in self._filters[class_name]) def _send_indications_loop(self, broker): @@ -726,9 +728,31 @@ class IndicationManager(singletonmixin.Singleton): """ broker.AttachThread() while True: - indication = self._queue.get() - cmpi_logging.logger.trace_info("Delivering indication %s" % - (str(indication.path))) - broker.DeliverIndication(self.namespace, indication) + command = self._queue.get() + + if isinstance(command, pywbem.CIMInstance) : + indication = command + cmpi_logging.logger.trace_info("Delivering indication %s" % + (str(indication.path))) + broker.DeliverIndication(self.namespace, indication) + + elif isinstance(command, int): + cmpi_logging.logger.trace_info("Received command %d", command) + if command == self.COMMAND_STOP: + if hasattr(self._queue, "task_done"): + self._queue.task_done() + break + if hasattr(self._queue, "task_done"): self._queue.task_done() + + cmpi_logging.logger.info("Stopped Indication thread.") + + @cmpi_logging.trace_method + def shutdown(self): + """ + Stop the thread. This method blocks until the thread is safely + destroyed. + """ + self._queue.put(self.COMMAND_STOP) + self._indication_sender.join() diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py index fc12d7e..9f977c9 100644 --- a/src/python/openlmi/common/JobManager.py +++ b/src/python/openlmi/common/JobManager.py @@ -650,6 +650,8 @@ class JobManager(object): timeout. """ + COMMAND_STOP = 1 + IND_JOB_PERCENT_UPDATED = "PercentUpdated" IND_JOB_SUCCEEDED = "Succeeded" IND_JOB_FAILED = "Failed" @@ -723,7 +725,7 @@ class JobManager(object): # Start the worker thread (don't forget to register it at CIMOM) self.worker = threading.Thread(target=self._worker_main) - self.worker.daemon = True + self.worker.daemon = False self.worker.start() # Various classnames for job-related classes, with correct infixes. @@ -855,30 +857,40 @@ class JobManager(object): This is the main loop of the job queue. It just processes enqueued jobs and never ends. """ + cmpi_logging.logger.info("Started Job thread.") while True: - job = self.queue.get() - # we need to protect from changes between checking state and - # setting new state - job.lock() - if job.job_state == Job.STATE_QUEUED: - # the job was not cancelled - job.change_state(Job.STATE_RUNNING) - job.unlock() - cmpi_logging.logger.info("Starting job %s: '%s'" % - (job.the_id, job.job_name)) - - job.execute() - if job.error: - cmpi_logging.logger.warn("Job %s: '%s' finished with error:" - " %s" % (job.the_id, job.job_name, str(job.error))) - else: - cmpi_logging.logger.info("Job %s: '%s' finished OK" % + command = self.queue.get() + if isinstance(command, Job): + # we need to protect from changes between checking state and + # setting new state + job = command + job.lock() + if job.job_state == Job.STATE_QUEUED: + # the job was not cancelled + job.change_state(Job.STATE_RUNNING) + job.unlock() + cmpi_logging.logger.info("Starting job %s: '%s'" % (job.the_id, job.job_name)) - else: - # just skip suspended and terminated jobs - job.unlock() + + job.execute() + if job.error: + cmpi_logging.logger.warn("Job %s: '%s' finished with error:" + " %s" % (job.the_id, job.job_name, str(job.error))) + else: + cmpi_logging.logger.info("Job %s: '%s' finished OK" % + (job.the_id, job.job_name)) + else: + # just skip suspended and terminated jobs + job.unlock() + + elif isinstance(command, int): + self.queue.task_done() + break + self.queue.task_done() + cmpi_logging.logger.info("Stopped Job thread.") + @cmpi_logging.trace_method def get_next_id(self): """ @@ -906,6 +918,47 @@ class JobManager(object): inst['InstanceID'] = job.get_instance_id() return self.job_provider.get_instance(None, inst) + @cmpi_logging.trace_method + def shutdown(self, timeout=1): + """ + Stop the thread. If a job is running, it may leave the job process + (mkfs, resize2fs, ...) and the worker thread (waiting for the process to + finish) still running. + + JobManager still needs Indication Manager and TimeManager working at + this point! + + :param timeout: Nr. of seconds to wait for the current job. Afterwards + the thread is abandoned, leaving the process still running. + """ + # Empty the queue, we don't want the worker to proceed with any other + # queued job. + while not self.queue.empty(): + queue.get(False) + queue.task_done() + + self.queue.put(self.COMMAND_STOP) + self.worker.join(timeout) + + # Cancel all running/suspended/queued jobs. + # This will send indications. + for job in self.jobs.itervalues(): + if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED, + Job.STATE_RUNNING): + job.cancel() + + if self.worker.isAlive(): + # There is no way, how to stop the thread in Python, so abandon it. + self.worker.daemon = True + self.indication_manager = None + self.timer_manager = None + + def can_shutdown(self): + """ + Return True, if there is no running Job. + """ + return self.queue.empty() + class LMI_ConcreteJob(CIMProvider2): """ diff --git a/src/python/openlmi/common/TimerManager.py b/src/python/openlmi/common/TimerManager.py index 5eb39ae..42d48cd 100644 --- a/src/python/openlmi/common/TimerManager.py +++ b/src/python/openlmi/common/TimerManager.py @@ -94,6 +94,7 @@ class Timer(object): A class representing a timer. A timer has a timeout and after the timeout, given callback is called and the timer is deleted. """ + @cmpi_logging.trace_method def __init__(self, timer_manager, name, callback=None, *args, **kwargs): """ @@ -193,6 +194,10 @@ class TimerManager(singletonmixin.Singleton): you provider initialization. """ + # Commands to the timer thread + COMMAND_STOP = 1 + COMMAND_RESCHEDULE = 2 + @cmpi_logging.trace_method def __init__(self, env=None): """ @@ -215,7 +220,7 @@ class TimerManager(singletonmixin.Singleton): self._timer_thread = threading.Thread( target=self._timer_loop, args=(new_broker,)) - self._timer_thread.daemon = True + self._timer_thread.daemon = False self._timer_thread.start() def create_timer(self, name, callback=None, *args, **kwargs): @@ -239,17 +244,22 @@ class TimerManager(singletonmixin.Singleton): """ if broker: broker.AttachThread() + cmpi_logging.logger.info("Started Timer thread.") while True: self._handle_expired() timeout = self._find_timeout() if timeout != 0: # Wait for the timeout or any change in timers. try: - self._queue.get(timeout=timeout) + command = self._queue.get(timeout=timeout) self._queue.task_done() + if command == self.COMMAND_STOP: + break # stop the thread + # process COMMAND_RESCHEDULE in next loop except Queue.Empty: # Timeout has happened, ignore the exception. pass + cmpi_logging.logger.info("Stopped Timer thread.") @cmpi_logging.trace_method def _handle_expired(self): @@ -320,7 +330,7 @@ class TimerManager(singletonmixin.Singleton): with self._lock: self._timers.append(timer) # Wake up the timer manager thread. - self._queue.put("") + self._queue.put(self.COMMAND_RESCHEDULE) cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name)) @cmpi_logging.trace_method @@ -336,7 +346,7 @@ class TimerManager(singletonmixin.Singleton): except ValueError: pass # Wake up the timer manager thread. - self._queue.put("") + self._queue.put(self.COMMAND_RESCHEDULE) cmpi_logging.logger.trace_info("Timer: Timer %s removed" % (timer._name)) @@ -350,6 +360,15 @@ class TimerManager(singletonmixin.Singleton): """ return self._clock.now() + @cmpi_logging.trace_method + def shutdown(self): + """ + Stop the thread. This method blocks until the thread is safely + destroyed. + """ + self._queue.put(self.COMMAND_STOP) + self._timer_thread.join() + if __name__ == "__main__": cmpi_logging.logger = cmpi_logging.CMPILogger("") import time @@ -398,3 +417,5 @@ if __name__ == "__main__": t15.start(1.5) time.sleep(4) + + mgr.stop_thread() |