summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJan Safranek <jsafrane@redhat.com>2013-05-17 13:38:46 +0200
committerJan Safranek <jsafrane@redhat.com>2013-05-17 13:38:46 +0200
commit028b8f02d9b204efebdbb97d765b2c18ca33c0a8 (patch)
tree4244750ecfe4c748f1de21c6277b3e10d34b3a1d
parent4d644654b44b975594982e083eb1656e06af7df5 (diff)
parent04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6 (diff)
downloadopenlmi-providers-028b8f02d9b204efebdbb97d765b2c18ca33c0a8.tar.gz
openlmi-providers-028b8f02d9b204efebdbb97d765b2c18ca33c0a8.tar.xz
openlmi-providers-028b8f02d9b204efebdbb97d765b2c18ca33c0a8.zip
Merge branch 'devel/stop-threads'
-rw-r--r--src/python/openlmi/common/IndicationManager.py36
-rw-r--r--src/python/openlmi/common/JobManager.py95
-rw-r--r--src/python/openlmi/common/TimerManager.py29
3 files changed, 129 insertions, 31 deletions
diff --git a/src/python/openlmi/common/IndicationManager.py b/src/python/openlmi/common/IndicationManager.py
index a17703f..f9a1655 100644
--- a/src/python/openlmi/common/IndicationManager.py
+++ b/src/python/openlmi/common/IndicationManager.py
@@ -253,6 +253,8 @@ class IndicationManager(singletonmixin.Singleton):
"""
SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity
+ COMMAND_STOP = 1 # Command to the IndicationManager thread to stop.
+
@cmpi_logging.trace_method
def __init__(self, env, nameprefix, namespace, ns_interop=None,
queue=None):
@@ -295,7 +297,7 @@ class IndicationManager(singletonmixin.Singleton):
new_broker = ch.PrepareAttachThread()
self._indication_sender = threading.Thread(
target=self._send_indications_loop, args=(new_broker,))
- self._indication_sender.daemon = True
+ self._indication_sender.daemon = False
self._indication_sender.start()
@property
@@ -714,7 +716,7 @@ class IndicationManager(singletonmixin.Singleton):
:param fltr_id: (``string``) ID of the filter to check.
"""
with self._access_lock:
- return ( class_name in self._filters
+ return (class_name in self._filters
and fltr_id in self._filters[class_name])
def _send_indications_loop(self, broker):
@@ -726,9 +728,31 @@ class IndicationManager(singletonmixin.Singleton):
"""
broker.AttachThread()
while True:
- indication = self._queue.get()
- cmpi_logging.logger.trace_info("Delivering indication %s" %
- (str(indication.path)))
- broker.DeliverIndication(self.namespace, indication)
+ command = self._queue.get()
+
+ if isinstance(command, pywbem.CIMInstance) :
+ indication = command
+ cmpi_logging.logger.trace_info("Delivering indication %s" %
+ (str(indication.path)))
+ broker.DeliverIndication(self.namespace, indication)
+
+ elif isinstance(command, int):
+ cmpi_logging.logger.trace_info("Received command %d", command)
+ if command == self.COMMAND_STOP:
+ if hasattr(self._queue, "task_done"):
+ self._queue.task_done()
+ break
+
if hasattr(self._queue, "task_done"):
self._queue.task_done()
+
+ cmpi_logging.logger.info("Stopped Indication thread.")
+
+ @cmpi_logging.trace_method
+ def shutdown(self):
+ """
+ Stop the thread. This method blocks until the thread is safely
+ destroyed.
+ """
+ self._queue.put(self.COMMAND_STOP)
+ self._indication_sender.join()
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py
index fc12d7e..9f977c9 100644
--- a/src/python/openlmi/common/JobManager.py
+++ b/src/python/openlmi/common/JobManager.py
@@ -650,6 +650,8 @@ class JobManager(object):
timeout.
"""
+ COMMAND_STOP = 1
+
IND_JOB_PERCENT_UPDATED = "PercentUpdated"
IND_JOB_SUCCEEDED = "Succeeded"
IND_JOB_FAILED = "Failed"
@@ -723,7 +725,7 @@ class JobManager(object):
# Start the worker thread (don't forget to register it at CIMOM)
self.worker = threading.Thread(target=self._worker_main)
- self.worker.daemon = True
+ self.worker.daemon = False
self.worker.start()
# Various classnames for job-related classes, with correct infixes.
@@ -855,30 +857,40 @@ class JobManager(object):
This is the main loop of the job queue. It just processes enqueued
jobs and never ends.
"""
+ cmpi_logging.logger.info("Started Job thread.")
while True:
- job = self.queue.get()
- # we need to protect from changes between checking state and
- # setting new state
- job.lock()
- if job.job_state == Job.STATE_QUEUED:
- # the job was not cancelled
- job.change_state(Job.STATE_RUNNING)
- job.unlock()
- cmpi_logging.logger.info("Starting job %s: '%s'" %
- (job.the_id, job.job_name))
-
- job.execute()
- if job.error:
- cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
- " %s" % (job.the_id, job.job_name, str(job.error)))
- else:
- cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ command = self.queue.get()
+ if isinstance(command, Job):
+ # we need to protect from changes between checking state and
+ # setting new state
+ job = command
+ job.lock()
+ if job.job_state == Job.STATE_QUEUED:
+ # the job was not cancelled
+ job.change_state(Job.STATE_RUNNING)
+ job.unlock()
+ cmpi_logging.logger.info("Starting job %s: '%s'" %
(job.the_id, job.job_name))
- else:
- # just skip suspended and terminated jobs
- job.unlock()
+
+ job.execute()
+ if job.error:
+ cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
+ " %s" % (job.the_id, job.job_name, str(job.error)))
+ else:
+ cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ (job.the_id, job.job_name))
+ else:
+ # just skip suspended and terminated jobs
+ job.unlock()
+
+ elif isinstance(command, int):
+ self.queue.task_done()
+ break
+
self.queue.task_done()
+ cmpi_logging.logger.info("Stopped Job thread.")
+
@cmpi_logging.trace_method
def get_next_id(self):
"""
@@ -906,6 +918,47 @@ class JobManager(object):
inst['InstanceID'] = job.get_instance_id()
return self.job_provider.get_instance(None, inst)
+ @cmpi_logging.trace_method
+ def shutdown(self, timeout=1):
+ """
+ Stop the thread. If a job is running, it may leave the job process
+ (mkfs, resize2fs, ...) and the worker thread (waiting for the process to
+ finish) still running.
+
+ JobManager still needs Indication Manager and TimeManager working at
+ this point!
+
+ :param timeout: Nr. of seconds to wait for the current job. Afterwards
+ the thread is abandoned, leaving the process still running.
+ """
+ # Empty the queue, we don't want the worker to proceed with any other
+ # queued job.
+ while not self.queue.empty():
+ queue.get(False)
+ queue.task_done()
+
+ self.queue.put(self.COMMAND_STOP)
+ self.worker.join(timeout)
+
+ # Cancel all running/suspended/queued jobs.
+ # This will send indications.
+ for job in self.jobs.itervalues():
+ if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED,
+ Job.STATE_RUNNING):
+ job.cancel()
+
+ if self.worker.isAlive():
+ # There is no way, how to stop the thread in Python, so abandon it.
+ self.worker.daemon = True
+ self.indication_manager = None
+ self.timer_manager = None
+
+ def can_shutdown(self):
+ """
+ Return True, if there is no running Job.
+ """
+ return self.queue.empty()
+
class LMI_ConcreteJob(CIMProvider2):
"""
diff --git a/src/python/openlmi/common/TimerManager.py b/src/python/openlmi/common/TimerManager.py
index 5eb39ae..42d48cd 100644
--- a/src/python/openlmi/common/TimerManager.py
+++ b/src/python/openlmi/common/TimerManager.py
@@ -94,6 +94,7 @@ class Timer(object):
A class representing a timer. A timer has a timeout and after the timeout,
given callback is called and the timer is deleted.
"""
+
@cmpi_logging.trace_method
def __init__(self, timer_manager, name, callback=None, *args, **kwargs):
"""
@@ -193,6 +194,10 @@ class TimerManager(singletonmixin.Singleton):
you provider initialization.
"""
+ # Commands to the timer thread
+ COMMAND_STOP = 1
+ COMMAND_RESCHEDULE = 2
+
@cmpi_logging.trace_method
def __init__(self, env=None):
"""
@@ -215,7 +220,7 @@ class TimerManager(singletonmixin.Singleton):
self._timer_thread = threading.Thread(
target=self._timer_loop, args=(new_broker,))
- self._timer_thread.daemon = True
+ self._timer_thread.daemon = False
self._timer_thread.start()
def create_timer(self, name, callback=None, *args, **kwargs):
@@ -239,17 +244,22 @@ class TimerManager(singletonmixin.Singleton):
"""
if broker:
broker.AttachThread()
+ cmpi_logging.logger.info("Started Timer thread.")
while True:
self._handle_expired()
timeout = self._find_timeout()
if timeout != 0:
# Wait for the timeout or any change in timers.
try:
- self._queue.get(timeout=timeout)
+ command = self._queue.get(timeout=timeout)
self._queue.task_done()
+ if command == self.COMMAND_STOP:
+ break # stop the thread
+ # process COMMAND_RESCHEDULE in next loop
except Queue.Empty:
# Timeout has happened, ignore the exception.
pass
+ cmpi_logging.logger.info("Stopped Timer thread.")
@cmpi_logging.trace_method
def _handle_expired(self):
@@ -320,7 +330,7 @@ class TimerManager(singletonmixin.Singleton):
with self._lock:
self._timers.append(timer)
# Wake up the timer manager thread.
- self._queue.put("")
+ self._queue.put(self.COMMAND_RESCHEDULE)
cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name))
@cmpi_logging.trace_method
@@ -336,7 +346,7 @@ class TimerManager(singletonmixin.Singleton):
except ValueError:
pass
# Wake up the timer manager thread.
- self._queue.put("")
+ self._queue.put(self.COMMAND_RESCHEDULE)
cmpi_logging.logger.trace_info("Timer: Timer %s removed"
% (timer._name))
@@ -350,6 +360,15 @@ class TimerManager(singletonmixin.Singleton):
"""
return self._clock.now()
+ @cmpi_logging.trace_method
+ def shutdown(self):
+ """
+ Stop the thread. This method blocks until the thread is safely
+ destroyed.
+ """
+ self._queue.put(self.COMMAND_STOP)
+ self._timer_thread.join()
+
if __name__ == "__main__":
cmpi_logging.logger = cmpi_logging.CMPILogger("")
import time
@@ -398,3 +417,5 @@ if __name__ == "__main__":
t15.start(1.5)
time.sleep(4)
+
+ mgr.stop_thread()