diff options
author | Michal Minar <miminar@redhat.com> | 2013-03-27 15:35:08 +0100 |
---|---|---|
committer | Michal Minar <miminar@redhat.com> | 2013-04-11 13:51:56 +0200 |
commit | 7c0c3eeb54e177499d164dc9df6b3e1368934a0d (patch) | |
tree | eb592abbc42a8cda563f375811f7935550a9c64e /src/software/openlmi | |
parent | d4e0eafa12c7a2b9a4748021ae7e520845dc5b8c (diff) | |
download | openlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.tar.gz openlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.tar.xz openlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.zip |
added indication manager to openlmi.common
IndicationManager python module is useful for many providers across
OpenLMI project. That's why it's landing in openlmi.common subpackage
from openlmi-storage. It supports checking for static filters
IndicationManager allows to check and install static filters at broker
at runtime
- but it should be avoided
filters should work also under sfcbd
Diffstat (limited to 'src/software/openlmi')
-rw-r--r-- | src/software/openlmi/software/yumdb/jobmanager.py | 233 |
1 files changed, 229 insertions, 4 deletions
diff --git a/src/software/openlmi/software/yumdb/jobmanager.py b/src/software/openlmi/software/yumdb/jobmanager.py index 872e81f..7283d05 100644 --- a/src/software/openlmi/software/yumdb/jobmanager.py +++ b/src/software/openlmi/software/yumdb/jobmanager.py @@ -19,9 +19,16 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -This is a module for JobManager which is a separate thread of -YumWorker process. It keeps a cache of asynchronous jobs and handles +This is a module for ``JobManager`` which is a separate thread of +``YumWorker`` process. It keeps a cache of asynchronous jobs and handles input and output queues. + +This module uses its own logging facilities because it runs in separeted +process not having access to broker logging features. + +Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should +be set to callable taking ``YumJob`` instance and returning +``LMI_SoftwareInstallationJob``. """ import heapq import inspect @@ -32,14 +39,86 @@ import threading import time import traceback +from openlmi.common import cmpi_logging +from openlmi.common.IndicationManager import IndicationManager from openlmi.software.yumdb import errors, jobs from openlmi.software.yumdb.util import trace_function +# This is a callable, which must be initialized before JobManager is used. +# It should be a pointer to function, which takes a job and returns +# corresponding CIM instance. It's used for sending indications. +JOB_TO_MODEL = lambda job: None +JOB_CLASSNAME = "LMI_SoftwareInstallationJob" + # Minimum time to keep asynchronous job in cache after completion. In seconds. MINIMUM_TIME_BEFORE_REMOVAL = 10 +# replacement for cmpi_logging.logger LOG = None +IND_JOB_PERCENT_UPDATED = "PercentUpdated" +IND_JOB_SUCCEEDED = "Succeeded" +IND_JOB_FAILED = "Failed" +IND_JOB_CHANGED = "Changed" +IND_JOB_CREATED = "Created" + +IND_FILTERS = { + IND_JOB_PERCENT_UPDATED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::PercentComplete <> " + "PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for a " + "Concrete Job.", + }, + IND_JOB_SUCCEEDED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance ISA LMI_SoftwareInstallationJob AND " + "SourceInstance.CIM_ConcreteJob::JobState = 17", + # symbolic constants not supported by sfcb + #"CIM_ConcreteJob.JobState#'Completed'" + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'OK'.", + }, + IND_JOB_FAILED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState = 10", + # symbolic constants not supported by sfcb + #"CIM_ConcreteJob.JobState#'Exception'", + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'Error'.", + }, + IND_JOB_CHANGED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState <> " + "PreviousInstance.CIM_ConcreteJob::JobState", + "Description": "Modification of Job State for a ConcreteJob.", + }, + IND_JOB_CREATED: { + "Query" : "SELECT * FROM CIM_InstCreation WHERE " + "SourceInstance ISA %(classname)s", + "Description": "Creation of a ConcreteJob.", + }, +} + +@cmpi_logging.trace_function +def register_filters(indication_manager): + """ + This function registers static indication filters at IndicationManager. + It should be called upon provider's initialization. + """ + to_register = {} + for fltr_id, fltr in IND_FILTERS.items(): + if not indication_manager.is_registered(JOB_CLASSNAME, fltr_id): + fltr["Query"] = fltr["Query"] % {"classname" : JOB_CLASSNAME } + to_register[fltr_id] = fltr + if to_register: + indication_manager.add_filters(JOB_CLASSNAME, + to_register, ensure_installed=False) + # ***************************************************************************** # Decorators # ***************************************************************************** @@ -84,6 +163,119 @@ def job_handler(job_from_target=True): else: return _simple_wrapper +class JobIndicationSender(object): + """ + Makes creation and sending of indications easy. It keeps a reference + to job, which can be *snapshotted* for making CIM instance out of it. + These instances are then used to send indications via IndicationManager. + + Typical usage:: + + sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2]) + ... # modify job + sender.snapshot() + sender.send() + + **Note** that number of kept CIM instances won't exceed 2. First one + is created upon instantiation and the second one be calling + ``snapshot()``. Any successive call to ``snapshot()`` will overwrite + the second instance. + """ + + def __init__(self, indication_manager, job, + indications=IND_JOB_CHANGED, new=None): + """ + :param job (``YumJob``) Is job instance, which will be immediately + snapshoted as old instance and later as a new one. + :param indications (``list``) Can either be a list of indication ids + or a single indication id. + :param new (``YumJob``) A job instance stored as new. + """ + if not isinstance(indication_manager, IndicationManager): + raise TypeError("indication_manager must be a subclass of" + " IndicationManager") + if not isinstance(job, jobs.YumJob): + raise TypeError("job must be an instance of YumJob") + if not new is None and not isinstance(new, jobs.YumJob): + raise TypeError("new must be an instance of YumJob") + self._indication_manager = indication_manager + self._job = job + self._old_instance = JOB_TO_MODEL(job) + if new is not None: + new = JOB_TO_MODEL(job) + self._new_instance = new + self._indications = set() + self.indication_ids = indications + + @property + def job(self): + """ + Return instance of ``YumJob``. + """ + return self._job + + @property + def indication_ids(self): + """ + Return set of indication filter IDs. + """ + return self._indications.copy() + + @indication_ids.setter + def indication_ids(self, indication_ids): + """ + Set the indication filter IDs. + + :param indication_ids (``list``) Can be even single id. + """ + if isinstance(indication_ids, basestring): + indication_ids = set([indication_ids]) + self._indications = set(indication_ids) + + @trace_function + def add_indication_ids(self, indication_ids): + """ + Add filter IDs. + """ + if isinstance(indication_ids, basestring): + indication_ids = set([indication_ids]) + self._indications.update(indication_ids) + + @trace_function + def snapshot(self): + """ + Make a second CIM instance, overwriting previous one (not the first). + """ + self._new_instance = JOB_TO_MODEL(self._job) + + @trace_function + def send(self, make_snapshot=False): + """ + Send all requested indications for given job. + """ + if not self._indications: + raise errors.IndicationError( + "can not send any indication without id") + if make_snapshot: + self.snapshot() + if ( IND_JOB_CHANGED in self._indications + and self._new_instance is None): + raise errors.IndicationError("no snapshot made for modified job") + for fltr_id in self._indications: + if fltr_id == IND_JOB_CREATED: + LOG.debug("sending instance creation indication for job %s", + self._job) + self._indication_manager.send_instcreation( + self._new_instance if self._new_instance is not None + else self._old_instance, + fltr_id) + else: + LOG.debug("sending instance modification indication for job %s" + " with ID: %s", self._job, fltr_id) + self._indication_manager.send_instmodification( + self._old_instance, self._new_instance, + fltr_id) + class JobManager(threading.Thread): """ Separate thread for managing queue of jobs requested by client. @@ -105,10 +297,11 @@ class JobManager(threading.Thread): ACTION_NAMES = ['remove'] - def __init__(self, queue_in, queue_out): + def __init__(self, queue_in, queue_out, indication_manager): threading.Thread.__init__(self, name="JobManager") self._queue_in = queue_in self._queue_out = queue_out + self._indication_manager = indication_manager self._terminate = False # (time, jobid, action) @@ -175,7 +368,9 @@ class JobManager(threading.Thread): LOG.debug('job %s enqued for YumWorker to handle', job) heapq.heappush(self._job_queue, job) if getattr(job, 'async', False) is True: + ind = self._prepare_indication_for(job, IND_JOB_CREATED) self._async_jobs[job.jobid] = job + ind.send() self._job_enqueued.notify() @trace_function @@ -214,6 +409,14 @@ class JobManager(threading.Thread): msg = "unsupported action: %s" % action raise ValueError(msg) + @trace_function + def _prepare_indication_for(self, job, *args, **kwargs): + """ + Return instance of ``JobIndicationSender``. + """ + return JobIndicationSender(self._indication_manager, job, + *args, **kwargs) + # ************************************************************************* # Job handlers # ************************************************************************* @@ -245,9 +448,11 @@ class JobManager(threading.Thread): if not isinstance(new_priority, (int, long)): raise TypeError('priority must be an integer') if job.priority != new_priority: + ind = self._prepare_indication_for(job) job.update(priority=new_priority) if job in self._job_queue: heapq.heapify(self._job_queue) + ind.send(True) return job @job_handler() @@ -266,6 +471,7 @@ class JobManager(threading.Thread): del self._calendar[i] heapq.heapify(self._calendar) break + ind = self._prepare_indication_for(job) if delete_on_completion: schedule_at = time_before_removal if job.finished: @@ -273,6 +479,7 @@ class JobManager(threading.Thread): self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE) job.delete_on_completion = delete_on_completion job.time_before_removal = time_before_removal + ind.send(True) return job @job_handler() @@ -280,7 +487,9 @@ class JobManager(threading.Thread): """ Updates any job metadata. """ + ind = self._prepare_indication_for(job) job.update(**data) + ind.send(True) return job @job_handler() @@ -311,7 +520,9 @@ class JobManager(threading.Thread): raise errors.InvalidJobState('job "%s" already finished' % job) self._job_queue.remove(job) heapq.heapify(self._job_queue) + ind = self._prepare_indication_for(job) job.finish(result=job.RESULT_TERMINATED) + ind.send(True) LOG.info('terminated not started job "%s"', job) return job @@ -344,6 +555,9 @@ class JobManager(threading.Thread): if job.state != job.RUNNING: raise errors.InvalidJobState( 'can not finish not started job "%s"' % job) + if getattr(job, 'async', False): + ind = self._prepare_indication_for(job, + (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED)) job.finish(result, result_data) if getattr(job, 'async', False): if job.delete_on_completion: @@ -351,6 +565,11 @@ class JobManager(threading.Thread): , MINIMUM_TIME_BEFORE_REMOVAL) self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE) + if result == job.RESULT_SUCCESS: + ind.add_indication_ids(IND_JOB_SUCCEEDED) + elif result == job.RESULT_ERROR: + ind.add_indication_ids(IND_JOB_FAILED) + ind.send(True) else: LOG.debug("sending reply for %s: (%s, %s)", job, job.ResultNames[job.result], job.result_data) @@ -377,7 +596,13 @@ class JobManager(threading.Thread): raise Queue.Empty job = heapq.heappop(self._job_queue) if job is not None: - job.start() + if getattr(job, "async", False): + ind = self._prepare_indication_for(job, + (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED)) + job.start() + ind.send(True) + else: + job.start() return job def run(self): |