diff options
Diffstat (limited to 'src/python/openlmi/common/JobManager.py')
-rw-r--r-- | src/python/openlmi/common/JobManager.py | 95 |
1 files changed, 74 insertions, 21 deletions
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py index fc12d7e..9f977c9 100644 --- a/src/python/openlmi/common/JobManager.py +++ b/src/python/openlmi/common/JobManager.py @@ -650,6 +650,8 @@ class JobManager(object): timeout. """ + COMMAND_STOP = 1 + IND_JOB_PERCENT_UPDATED = "PercentUpdated" IND_JOB_SUCCEEDED = "Succeeded" IND_JOB_FAILED = "Failed" @@ -723,7 +725,7 @@ class JobManager(object): # Start the worker thread (don't forget to register it at CIMOM) self.worker = threading.Thread(target=self._worker_main) - self.worker.daemon = True + self.worker.daemon = False self.worker.start() # Various classnames for job-related classes, with correct infixes. @@ -855,30 +857,40 @@ class JobManager(object): This is the main loop of the job queue. It just processes enqueued jobs and never ends. """ + cmpi_logging.logger.info("Started Job thread.") while True: - job = self.queue.get() - # we need to protect from changes between checking state and - # setting new state - job.lock() - if job.job_state == Job.STATE_QUEUED: - # the job was not cancelled - job.change_state(Job.STATE_RUNNING) - job.unlock() - cmpi_logging.logger.info("Starting job %s: '%s'" % - (job.the_id, job.job_name)) - - job.execute() - if job.error: - cmpi_logging.logger.warn("Job %s: '%s' finished with error:" - " %s" % (job.the_id, job.job_name, str(job.error))) - else: - cmpi_logging.logger.info("Job %s: '%s' finished OK" % + command = self.queue.get() + if isinstance(command, Job): + # we need to protect from changes between checking state and + # setting new state + job = command + job.lock() + if job.job_state == Job.STATE_QUEUED: + # the job was not cancelled + job.change_state(Job.STATE_RUNNING) + job.unlock() + cmpi_logging.logger.info("Starting job %s: '%s'" % (job.the_id, job.job_name)) - else: - # just skip suspended and terminated jobs - job.unlock() + + job.execute() + if job.error: + cmpi_logging.logger.warn("Job %s: '%s' finished with error:" + " %s" % (job.the_id, job.job_name, str(job.error))) + else: + cmpi_logging.logger.info("Job %s: '%s' finished OK" % + (job.the_id, job.job_name)) + else: + # just skip suspended and terminated jobs + job.unlock() + + elif isinstance(command, int): + self.queue.task_done() + break + self.queue.task_done() + cmpi_logging.logger.info("Stopped Job thread.") + @cmpi_logging.trace_method def get_next_id(self): """ @@ -906,6 +918,47 @@ class JobManager(object): inst['InstanceID'] = job.get_instance_id() return self.job_provider.get_instance(None, inst) + @cmpi_logging.trace_method + def shutdown(self, timeout=1): + """ + Stop the thread. If a job is running, it may leave the job process + (mkfs, resize2fs, ...) and the worker thread (waiting for the process to + finish) still running. + + JobManager still needs Indication Manager and TimeManager working at + this point! + + :param timeout: Nr. of seconds to wait for the current job. Afterwards + the thread is abandoned, leaving the process still running. + """ + # Empty the queue, we don't want the worker to proceed with any other + # queued job. + while not self.queue.empty(): + queue.get(False) + queue.task_done() + + self.queue.put(self.COMMAND_STOP) + self.worker.join(timeout) + + # Cancel all running/suspended/queued jobs. + # This will send indications. + for job in self.jobs.itervalues(): + if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED, + Job.STATE_RUNNING): + job.cancel() + + if self.worker.isAlive(): + # There is no way, how to stop the thread in Python, so abandon it. + self.worker.daemon = True + self.indication_manager = None + self.timer_manager = None + + def can_shutdown(self): + """ + Return True, if there is no running Job. + """ + return self.queue.empty() + class LMI_ConcreteJob(CIMProvider2): """ |