summaryrefslogtreecommitdiffstats
path: root/src/python/openlmi/common/JobManager.py
diff options
context:
space:
mode:
authorJan Safranek <jsafrane@redhat.com>2013-05-16 09:23:31 +0200
committerJan Safranek <jsafrane@redhat.com>2013-05-16 09:23:31 +0200
commit04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6 (patch)
tree620eed4fc8442e1264656ca047bb6ce07e6fb624 /src/python/openlmi/common/JobManager.py
parent0d2ff2523333f4b93154a46d70e1c8143ac789e7 (diff)
downloadopenlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.gz
openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.xz
openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.zip
Add method to stop the running thread to all Managers.
We should cleanly shut down all threads when the provider is unloaded, just to be nice to CIMOMs.
Diffstat (limited to 'src/python/openlmi/common/JobManager.py')
-rw-r--r--src/python/openlmi/common/JobManager.py95
1 files changed, 74 insertions, 21 deletions
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py
index fc12d7e..9f977c9 100644
--- a/src/python/openlmi/common/JobManager.py
+++ b/src/python/openlmi/common/JobManager.py
@@ -650,6 +650,8 @@ class JobManager(object):
timeout.
"""
+ COMMAND_STOP = 1
+
IND_JOB_PERCENT_UPDATED = "PercentUpdated"
IND_JOB_SUCCEEDED = "Succeeded"
IND_JOB_FAILED = "Failed"
@@ -723,7 +725,7 @@ class JobManager(object):
# Start the worker thread (don't forget to register it at CIMOM)
self.worker = threading.Thread(target=self._worker_main)
- self.worker.daemon = True
+ self.worker.daemon = False
self.worker.start()
# Various classnames for job-related classes, with correct infixes.
@@ -855,30 +857,40 @@ class JobManager(object):
This is the main loop of the job queue. It just processes enqueued
jobs and never ends.
"""
+ cmpi_logging.logger.info("Started Job thread.")
while True:
- job = self.queue.get()
- # we need to protect from changes between checking state and
- # setting new state
- job.lock()
- if job.job_state == Job.STATE_QUEUED:
- # the job was not cancelled
- job.change_state(Job.STATE_RUNNING)
- job.unlock()
- cmpi_logging.logger.info("Starting job %s: '%s'" %
- (job.the_id, job.job_name))
-
- job.execute()
- if job.error:
- cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
- " %s" % (job.the_id, job.job_name, str(job.error)))
- else:
- cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ command = self.queue.get()
+ if isinstance(command, Job):
+ # we need to protect from changes between checking state and
+ # setting new state
+ job = command
+ job.lock()
+ if job.job_state == Job.STATE_QUEUED:
+ # the job was not cancelled
+ job.change_state(Job.STATE_RUNNING)
+ job.unlock()
+ cmpi_logging.logger.info("Starting job %s: '%s'" %
(job.the_id, job.job_name))
- else:
- # just skip suspended and terminated jobs
- job.unlock()
+
+ job.execute()
+ if job.error:
+ cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
+ " %s" % (job.the_id, job.job_name, str(job.error)))
+ else:
+ cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ (job.the_id, job.job_name))
+ else:
+ # just skip suspended and terminated jobs
+ job.unlock()
+
+ elif isinstance(command, int):
+ self.queue.task_done()
+ break
+
self.queue.task_done()
+ cmpi_logging.logger.info("Stopped Job thread.")
+
@cmpi_logging.trace_method
def get_next_id(self):
"""
@@ -906,6 +918,47 @@ class JobManager(object):
inst['InstanceID'] = job.get_instance_id()
return self.job_provider.get_instance(None, inst)
+ @cmpi_logging.trace_method
+ def shutdown(self, timeout=1):
+ """
+ Stop the thread. If a job is running, it may leave the job process
+ (mkfs, resize2fs, ...) and the worker thread (waiting for the process to
+ finish) still running.
+
+ JobManager still needs Indication Manager and TimeManager working at
+ this point!
+
+ :param timeout: Nr. of seconds to wait for the current job. Afterwards
+ the thread is abandoned, leaving the process still running.
+ """
+ # Empty the queue, we don't want the worker to proceed with any other
+ # queued job.
+ while not self.queue.empty():
+ queue.get(False)
+ queue.task_done()
+
+ self.queue.put(self.COMMAND_STOP)
+ self.worker.join(timeout)
+
+ # Cancel all running/suspended/queued jobs.
+ # This will send indications.
+ for job in self.jobs.itervalues():
+ if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED,
+ Job.STATE_RUNNING):
+ job.cancel()
+
+ if self.worker.isAlive():
+ # There is no way, how to stop the thread in Python, so abandon it.
+ self.worker.daemon = True
+ self.indication_manager = None
+ self.timer_manager = None
+
+ def can_shutdown(self):
+ """
+ Return True, if there is no running Job.
+ """
+ return self.queue.empty()
+
class LMI_ConcreteJob(CIMProvider2):
"""