diff options
author | Jan Safranek <jsafrane@redhat.com> | 2013-05-16 09:23:31 +0200 |
---|---|---|
committer | Jan Safranek <jsafrane@redhat.com> | 2013-05-16 09:23:31 +0200 |
commit | 04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6 (patch) | |
tree | 620eed4fc8442e1264656ca047bb6ce07e6fb624 /src/python/openlmi/common/JobManager.py | |
parent | 0d2ff2523333f4b93154a46d70e1c8143ac789e7 (diff) | |
download | openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.gz openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.tar.xz openlmi-providers-04bf9226c8cc2b00dee4bb07d4f7d4a1c04428d6.zip |
Add method to stop the running thread to all Managers.
We should cleanly shut down all threads when the provider is unloaded,
just to be nice to CIMOMs.
Diffstat (limited to 'src/python/openlmi/common/JobManager.py')
-rw-r--r-- | src/python/openlmi/common/JobManager.py | 95 |
1 files changed, 74 insertions, 21 deletions
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py index fc12d7e..9f977c9 100644 --- a/src/python/openlmi/common/JobManager.py +++ b/src/python/openlmi/common/JobManager.py @@ -650,6 +650,8 @@ class JobManager(object): timeout. """ + COMMAND_STOP = 1 + IND_JOB_PERCENT_UPDATED = "PercentUpdated" IND_JOB_SUCCEEDED = "Succeeded" IND_JOB_FAILED = "Failed" @@ -723,7 +725,7 @@ class JobManager(object): # Start the worker thread (don't forget to register it at CIMOM) self.worker = threading.Thread(target=self._worker_main) - self.worker.daemon = True + self.worker.daemon = False self.worker.start() # Various classnames for job-related classes, with correct infixes. @@ -855,30 +857,40 @@ class JobManager(object): This is the main loop of the job queue. It just processes enqueued jobs and never ends. """ + cmpi_logging.logger.info("Started Job thread.") while True: - job = self.queue.get() - # we need to protect from changes between checking state and - # setting new state - job.lock() - if job.job_state == Job.STATE_QUEUED: - # the job was not cancelled - job.change_state(Job.STATE_RUNNING) - job.unlock() - cmpi_logging.logger.info("Starting job %s: '%s'" % - (job.the_id, job.job_name)) - - job.execute() - if job.error: - cmpi_logging.logger.warn("Job %s: '%s' finished with error:" - " %s" % (job.the_id, job.job_name, str(job.error))) - else: - cmpi_logging.logger.info("Job %s: '%s' finished OK" % + command = self.queue.get() + if isinstance(command, Job): + # we need to protect from changes between checking state and + # setting new state + job = command + job.lock() + if job.job_state == Job.STATE_QUEUED: + # the job was not cancelled + job.change_state(Job.STATE_RUNNING) + job.unlock() + cmpi_logging.logger.info("Starting job %s: '%s'" % (job.the_id, job.job_name)) - else: - # just skip suspended and terminated jobs - job.unlock() + + job.execute() + if job.error: + cmpi_logging.logger.warn("Job %s: '%s' finished with error:" + " %s" % (job.the_id, job.job_name, str(job.error))) + else: + cmpi_logging.logger.info("Job %s: '%s' finished OK" % + (job.the_id, job.job_name)) + else: + # just skip suspended and terminated jobs + job.unlock() + + elif isinstance(command, int): + self.queue.task_done() + break + self.queue.task_done() + cmpi_logging.logger.info("Stopped Job thread.") + @cmpi_logging.trace_method def get_next_id(self): """ @@ -906,6 +918,47 @@ class JobManager(object): inst['InstanceID'] = job.get_instance_id() return self.job_provider.get_instance(None, inst) + @cmpi_logging.trace_method + def shutdown(self, timeout=1): + """ + Stop the thread. If a job is running, it may leave the job process + (mkfs, resize2fs, ...) and the worker thread (waiting for the process to + finish) still running. + + JobManager still needs Indication Manager and TimeManager working at + this point! + + :param timeout: Nr. of seconds to wait for the current job. Afterwards + the thread is abandoned, leaving the process still running. + """ + # Empty the queue, we don't want the worker to proceed with any other + # queued job. + while not self.queue.empty(): + queue.get(False) + queue.task_done() + + self.queue.put(self.COMMAND_STOP) + self.worker.join(timeout) + + # Cancel all running/suspended/queued jobs. + # This will send indications. + for job in self.jobs.itervalues(): + if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED, + Job.STATE_RUNNING): + job.cancel() + + if self.worker.isAlive(): + # There is no way, how to stop the thread in Python, so abandon it. + self.worker.daemon = True + self.indication_manager = None + self.timer_manager = None + + def can_shutdown(self): + """ + Return True, if there is no running Job. + """ + return self.queue.empty() + class LMI_ConcreteJob(CIMProvider2): """ |