summaryrefslogtreecommitdiffstats
path: root/src/python/openlmi
diff options
context:
space:
mode:
authorJan Safranek <jsafrane@redhat.com>2013-05-16 09:23:31 +0200
committerJan Safranek <jsafrane@redhat.com>2013-05-16 09:23:31 +0200
commit04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6 (patch)
tree620eed4fc8442e1264656ca047bb6ce07e6fb624 /src/python/openlmi
parent0d2ff2523333f4b93154a46d70e1c8143ac789e7 (diff)
downloadopenlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.gz
openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.xz
openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.zip
Add method to stop the running thread to all Managers.
We should cleanly shut down all threads when the provider is unloaded, just to be nice to CIMOMs.
Diffstat (limited to 'src/python/openlmi')
-rw-r--r--src/python/openlmi/common/IndicationManager.py36
-rw-r--r--src/python/openlmi/common/JobManager.py95
-rw-r--r--src/python/openlmi/common/TimerManager.py29
3 files changed, 129 insertions, 31 deletions
diff --git a/src/python/openlmi/common/IndicationManager.py b/src/python/openlmi/common/IndicationManager.py
index a17703f..f9a1655 100644
--- a/src/python/openlmi/common/IndicationManager.py
+++ b/src/python/openlmi/common/IndicationManager.py
@@ -253,6 +253,8 @@ class IndicationManager(singletonmixin.Singleton):
"""
SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity
+ COMMAND_STOP = 1 # Command to the IndicationManager thread to stop.
+
@cmpi_logging.trace_method
def __init__(self, env, nameprefix, namespace, ns_interop=None,
queue=None):
@@ -295,7 +297,7 @@ class IndicationManager(singletonmixin.Singleton):
new_broker = ch.PrepareAttachThread()
self._indication_sender = threading.Thread(
target=self._send_indications_loop, args=(new_broker,))
- self._indication_sender.daemon = True
+ self._indication_sender.daemon = False
self._indication_sender.start()
@property
@@ -714,7 +716,7 @@ class IndicationManager(singletonmixin.Singleton):
:param fltr_id: (``string``) ID of the filter to check.
"""
with self._access_lock:
- return ( class_name in self._filters
+ return (class_name in self._filters
and fltr_id in self._filters[class_name])
def _send_indications_loop(self, broker):
@@ -726,9 +728,31 @@ class IndicationManager(singletonmixin.Singleton):
"""
broker.AttachThread()
while True:
- indication = self._queue.get()
- cmpi_logging.logger.trace_info("Delivering indication %s" %
- (str(indication.path)))
- broker.DeliverIndication(self.namespace, indication)
+ command = self._queue.get()
+
+ if isinstance(command, pywbem.CIMInstance) :
+ indication = command
+ cmpi_logging.logger.trace_info("Delivering indication %s" %
+ (str(indication.path)))
+ broker.DeliverIndication(self.namespace, indication)
+
+ elif isinstance(command, int):
+ cmpi_logging.logger.trace_info("Received command %d", command)
+ if command == self.COMMAND_STOP:
+ if hasattr(self._queue, "task_done"):
+ self._queue.task_done()
+ break
+
if hasattr(self._queue, "task_done"):
self._queue.task_done()
+
+ cmpi_logging.logger.info("Stopped Indication thread.")
+
+ @cmpi_logging.trace_method
+ def shutdown(self):
+ """
+ Stop the thread. This method blocks until the thread is safely
+ destroyed.
+ """
+ self._queue.put(self.COMMAND_STOP)
+ self._indication_sender.join()
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py
index fc12d7e..9f977c9 100644
--- a/src/python/openlmi/common/JobManager.py
+++ b/src/python/openlmi/common/JobManager.py
@@ -650,6 +650,8 @@ class JobManager(object):
timeout.
"""
+ COMMAND_STOP = 1
+
IND_JOB_PERCENT_UPDATED = "PercentUpdated"
IND_JOB_SUCCEEDED = "Succeeded"
IND_JOB_FAILED = "Failed"
@@ -723,7 +725,7 @@ class JobManager(object):
# Start the worker thread (don't forget to register it at CIMOM)
self.worker = threading.Thread(target=self._worker_main)
- self.worker.daemon = True
+ self.worker.daemon = False
self.worker.start()
# Various classnames for job-related classes, with correct infixes.
@@ -855,30 +857,40 @@ class JobManager(object):
This is the main loop of the job queue. It just processes enqueued
jobs and never ends.
"""
+ cmpi_logging.logger.info("Started Job thread.")
while True:
- job = self.queue.get()
- # we need to protect from changes between checking state and
- # setting new state
- job.lock()
- if job.job_state == Job.STATE_QUEUED:
- # the job was not cancelled
- job.change_state(Job.STATE_RUNNING)
- job.unlock()
- cmpi_logging.logger.info("Starting job %s: '%s'" %
- (job.the_id, job.job_name))
-
- job.execute()
- if job.error:
- cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
- " %s" % (job.the_id, job.job_name, str(job.error)))
- else:
- cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ command = self.queue.get()
+ if isinstance(command, Job):
+ # we need to protect from changes between checking state and
+ # setting new state
+ job = command
+ job.lock()
+ if job.job_state == Job.STATE_QUEUED:
+ # the job was not cancelled
+ job.change_state(Job.STATE_RUNNING)
+ job.unlock()
+ cmpi_logging.logger.info("Starting job %s: '%s'" %
(job.the_id, job.job_name))
- else:
- # just skip suspended and terminated jobs
- job.unlock()
+
+ job.execute()
+ if job.error:
+ cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
+ " %s" % (job.the_id, job.job_name, str(job.error)))
+ else:
+ cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ (job.the_id, job.job_name))
+ else:
+ # just skip suspended and terminated jobs
+ job.unlock()
+
+ elif isinstance(command, int):
+ self.queue.task_done()
+ break
+
self.queue.task_done()
+ cmpi_logging.logger.info("Stopped Job thread.")
+
@cmpi_logging.trace_method
def get_next_id(self):
"""
@@ -906,6 +918,47 @@ class JobManager(object):
inst['InstanceID'] = job.get_instance_id()
return self.job_provider.get_instance(None, inst)
+ @cmpi_logging.trace_method
+ def shutdown(self, timeout=1):
+ """
+ Stop the thread. If a job is running, it may leave the job process
+ (mkfs, resize2fs, ...) and the worker thread (waiting for the process to
+ finish) still running.
+
+ JobManager still needs Indication Manager and TimeManager working at
+ this point!
+
+ :param timeout: Nr. of seconds to wait for the current job. Afterwards
+ the thread is abandoned, leaving the process still running.
+ """
+ # Empty the queue, we don't want the worker to proceed with any other
+ # queued job.
+ while not self.queue.empty():
+ queue.get(False)
+ queue.task_done()
+
+ self.queue.put(self.COMMAND_STOP)
+ self.worker.join(timeout)
+
+ # Cancel all running/suspended/queued jobs.
+ # This will send indications.
+ for job in self.jobs.itervalues():
+ if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED,
+ Job.STATE_RUNNING):
+ job.cancel()
+
+ if self.worker.isAlive():
+ # There is no way, how to stop the thread in Python, so abandon it.
+ self.worker.daemon = True
+ self.indication_manager = None
+ self.timer_manager = None
+
+ def can_shutdown(self):
+ """
+ Return True, if there is no running Job.
+ """
+ return self.queue.empty()
+
class LMI_ConcreteJob(CIMProvider2):
"""
diff --git a/src/python/openlmi/common/TimerManager.py b/src/python/openlmi/common/TimerManager.py
index 5eb39ae..42d48cd 100644
--- a/src/python/openlmi/common/TimerManager.py
+++ b/src/python/openlmi/common/TimerManager.py
@@ -94,6 +94,7 @@ class Timer(object):
A class representing a timer. A timer has a timeout and after the timeout,
given callback is called and the timer is deleted.
"""
+
@cmpi_logging.trace_method
def __init__(self, timer_manager, name, callback=None, *args, **kwargs):
"""
@@ -193,6 +194,10 @@ class TimerManager(singletonmixin.Singleton):
you provider initialization.
"""
+ # Commands to the timer thread
+ COMMAND_STOP = 1
+ COMMAND_RESCHEDULE = 2
+
@cmpi_logging.trace_method
def __init__(self, env=None):
"""
@@ -215,7 +220,7 @@ class TimerManager(singletonmixin.Singleton):
self._timer_thread = threading.Thread(
target=self._timer_loop, args=(new_broker,))
- self._timer_thread.daemon = True
+ self._timer_thread.daemon = False
self._timer_thread.start()
def create_timer(self, name, callback=None, *args, **kwargs):
@@ -239,17 +244,22 @@ class TimerManager(singletonmixin.Singleton):
"""
if broker:
broker.AttachThread()
+ cmpi_logging.logger.info("Started Timer thread.")
while True:
self._handle_expired()
timeout = self._find_timeout()
if timeout != 0:
# Wait for the timeout or any change in timers.
try:
- self._queue.get(timeout=timeout)
+ command = self._queue.get(timeout=timeout)
self._queue.task_done()
+ if command == self.COMMAND_STOP:
+ break # stop the thread
+ # process COMMAND_RESCHEDULE in next loop
except Queue.Empty:
# Timeout has happened, ignore the exception.
pass
+ cmpi_logging.logger.info("Stopped Timer thread.")
@cmpi_logging.trace_method
def _handle_expired(self):
@@ -320,7 +330,7 @@ class TimerManager(singletonmixin.Singleton):
with self._lock:
self._timers.append(timer)
# Wake up the timer manager thread.
- self._queue.put("")
+ self._queue.put(self.COMMAND_RESCHEDULE)
cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name))
@cmpi_logging.trace_method
@@ -336,7 +346,7 @@ class TimerManager(singletonmixin.Singleton):
except ValueError:
pass
# Wake up the timer manager thread.
- self._queue.put("")
+ self._queue.put(self.COMMAND_RESCHEDULE)
cmpi_logging.logger.trace_info("Timer: Timer %s removed"
% (timer._name))
@@ -350,6 +360,15 @@ class TimerManager(singletonmixin.Singleton):
"""
return self._clock.now()
+ @cmpi_logging.trace_method
+ def shutdown(self):
+ """
+ Stop the thread. This method blocks until the thread is safely
+ destroyed.
+ """
+ self._queue.put(self.COMMAND_STOP)
+ self._timer_thread.join()
+
if __name__ == "__main__":
cmpi_logging.logger = cmpi_logging.CMPILogger("")
import time
@@ -398,3 +417,5 @@ if __name__ == "__main__":
t15.start(1.5)
time.sleep(4)
+
+ mgr.stop_thread()