summaryrefslogtreecommitdiffstats
path: root/src/python/openlmi/common/JobManager.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/openlmi/common/JobManager.py')
-rw-r--r--src/python/openlmi/common/JobManager.py95
1 files changed, 74 insertions, 21 deletions
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py
index fc12d7e..9f977c9 100644
--- a/src/python/openlmi/common/JobManager.py
+++ b/src/python/openlmi/common/JobManager.py
@@ -650,6 +650,8 @@ class JobManager(object):
timeout.
"""
+ COMMAND_STOP = 1
+
IND_JOB_PERCENT_UPDATED = "PercentUpdated"
IND_JOB_SUCCEEDED = "Succeeded"
IND_JOB_FAILED = "Failed"
@@ -723,7 +725,7 @@ class JobManager(object):
# Start the worker thread (don't forget to register it at CIMOM)
self.worker = threading.Thread(target=self._worker_main)
- self.worker.daemon = True
+ self.worker.daemon = False
self.worker.start()
# Various classnames for job-related classes, with correct infixes.
@@ -855,30 +857,40 @@ class JobManager(object):
This is the main loop of the job queue. It just processes enqueued
jobs and never ends.
"""
+ cmpi_logging.logger.info("Started Job thread.")
while True:
- job = self.queue.get()
- # we need to protect from changes between checking state and
- # setting new state
- job.lock()
- if job.job_state == Job.STATE_QUEUED:
- # the job was not cancelled
- job.change_state(Job.STATE_RUNNING)
- job.unlock()
- cmpi_logging.logger.info("Starting job %s: '%s'" %
- (job.the_id, job.job_name))
-
- job.execute()
- if job.error:
- cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
- " %s" % (job.the_id, job.job_name, str(job.error)))
- else:
- cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ command = self.queue.get()
+ if isinstance(command, Job):
+ # we need to protect from changes between checking state and
+ # setting new state
+ job = command
+ job.lock()
+ if job.job_state == Job.STATE_QUEUED:
+ # the job was not cancelled
+ job.change_state(Job.STATE_RUNNING)
+ job.unlock()
+ cmpi_logging.logger.info("Starting job %s: '%s'" %
(job.the_id, job.job_name))
- else:
- # just skip suspended and terminated jobs
- job.unlock()
+
+ job.execute()
+ if job.error:
+ cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
+ " %s" % (job.the_id, job.job_name, str(job.error)))
+ else:
+ cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ (job.the_id, job.job_name))
+ else:
+ # just skip suspended and terminated jobs
+ job.unlock()
+
+ elif isinstance(command, int):
+ self.queue.task_done()
+ break
+
self.queue.task_done()
+ cmpi_logging.logger.info("Stopped Job thread.")
+
@cmpi_logging.trace_method
def get_next_id(self):
"""
@@ -906,6 +918,47 @@ class JobManager(object):
inst['InstanceID'] = job.get_instance_id()
return self.job_provider.get_instance(None, inst)
+ @cmpi_logging.trace_method
+ def shutdown(self, timeout=1):
+ """
+ Stop the thread. If a job is running, it may leave the job process
+ (mkfs, resize2fs, ...) and the worker thread (waiting for the process to
+ finish) still running.
+
+ JobManager still needs Indication Manager and TimeManager working at
+ this point!
+
+ :param timeout: Nr. of seconds to wait for the current job. Afterwards
+ the thread is abandoned, leaving the process still running.
+ """
+ # Empty the queue, we don't want the worker to proceed with any other
+ # queued job.
+ while not self.queue.empty():
+ queue.get(False)
+ queue.task_done()
+
+ self.queue.put(self.COMMAND_STOP)
+ self.worker.join(timeout)
+
+ # Cancel all running/suspended/queued jobs.
+ # This will send indications.
+ for job in self.jobs.itervalues():
+ if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED,
+ Job.STATE_RUNNING):
+ job.cancel()
+
+ if self.worker.isAlive():
+ # There is no way, how to stop the thread in Python, so abandon it.
+ self.worker.daemon = True
+ self.indication_manager = None
+ self.timer_manager = None
+
+ def can_shutdown(self):
+ """
+ Return True, if there is no running Job.
+ """
+ return self.queue.empty()
+
class LMI_ConcreteJob(CIMProvider2):
"""