summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-03-27 15:35:08 +0100
committerMichal Minar <miminar@redhat.com>2013-04-11 13:51:56 +0200
commit7c0c3eeb54e177499d164dc9df6b3e1368934a0d (patch)
treeeb592abbc42a8cda563f375811f7935550a9c64e /src/software/openlmi
parentd4e0eafa12c7a2b9a4748021ae7e520845dc5b8c (diff)
downloadopenlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.tar.gz
openlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.tar.xz
openlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.zip
added indication manager to openlmi.common
IndicationManager python module is useful for many providers across OpenLMI project. That's why it's landing in openlmi.common subpackage from openlmi-storage. It supports checking for static filters IndicationManager allows to check and install static filters at broker at runtime - but it should be avoided filters should work also under sfcbd
Diffstat (limited to 'src/software/openlmi')
-rw-r--r--src/software/openlmi/software/yumdb/jobmanager.py233
1 files changed, 229 insertions, 4 deletions
diff --git a/src/software/openlmi/software/yumdb/jobmanager.py b/src/software/openlmi/software/yumdb/jobmanager.py
index 872e81f..7283d05 100644
--- a/src/software/openlmi/software/yumdb/jobmanager.py
+++ b/src/software/openlmi/software/yumdb/jobmanager.py
@@ -19,9 +19,16 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-This is a module for JobManager which is a separate thread of
-YumWorker process. It keeps a cache of asynchronous jobs and handles
+This is a module for ``JobManager`` which is a separate thread of
+``YumWorker`` process. It keeps a cache of asynchronous jobs and handles
input and output queues.
+
+This module uses its own logging facilities because it runs in separeted
+process not having access to broker logging features.
+
+Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should
+be set to callable taking ``YumJob`` instance and returning
+``LMI_SoftwareInstallationJob``.
"""
import heapq
import inspect
@@ -32,14 +39,86 @@ import threading
import time
import traceback
+from openlmi.common import cmpi_logging
+from openlmi.common.IndicationManager import IndicationManager
from openlmi.software.yumdb import errors, jobs
from openlmi.software.yumdb.util import trace_function
+# This is a callable, which must be initialized before JobManager is used.
+# It should be a pointer to function, which takes a job and returns
+# corresponding CIM instance. It's used for sending indications.
+JOB_TO_MODEL = lambda job: None
+JOB_CLASSNAME = "LMI_SoftwareInstallationJob"
+
# Minimum time to keep asynchronous job in cache after completion. In seconds.
MINIMUM_TIME_BEFORE_REMOVAL = 10
+# replacement for cmpi_logging.logger
LOG = None
+IND_JOB_PERCENT_UPDATED = "PercentUpdated"
+IND_JOB_SUCCEEDED = "Succeeded"
+IND_JOB_FAILED = "Failed"
+IND_JOB_CHANGED = "Changed"
+IND_JOB_CREATED = "Created"
+
+IND_FILTERS = {
+ IND_JOB_PERCENT_UPDATED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::PercentComplete <> "
+ "PreviousInstance.CIM_ConcreteJob::PercentComplete",
+ "Description" : "Modification of Percentage Complete for a "
+ "Concrete Job.",
+ },
+ IND_JOB_SUCCEEDED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance ISA LMI_SoftwareInstallationJob AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 17",
+ # symbolic constants not supported by sfcb
+ #"CIM_ConcreteJob.JobState#'Completed'"
+ "Description": "Modification of Operational Status for a "
+ "Concrete Job to 'Complete' and 'OK'.",
+ },
+ IND_JOB_FAILED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 10",
+ # symbolic constants not supported by sfcb
+ #"CIM_ConcreteJob.JobState#'Exception'",
+ "Description": "Modification of Operational Status for a "
+ "Concrete Job to 'Complete' and 'Error'.",
+ },
+ IND_JOB_CHANGED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState <> "
+ "PreviousInstance.CIM_ConcreteJob::JobState",
+ "Description": "Modification of Job State for a ConcreteJob.",
+ },
+ IND_JOB_CREATED: {
+ "Query" : "SELECT * FROM CIM_InstCreation WHERE "
+ "SourceInstance ISA %(classname)s",
+ "Description": "Creation of a ConcreteJob.",
+ },
+}
+
+@cmpi_logging.trace_function
+def register_filters(indication_manager):
+ """
+ This function registers static indication filters at IndicationManager.
+ It should be called upon provider's initialization.
+ """
+ to_register = {}
+ for fltr_id, fltr in IND_FILTERS.items():
+ if not indication_manager.is_registered(JOB_CLASSNAME, fltr_id):
+ fltr["Query"] = fltr["Query"] % {"classname" : JOB_CLASSNAME }
+ to_register[fltr_id] = fltr
+ if to_register:
+ indication_manager.add_filters(JOB_CLASSNAME,
+ to_register, ensure_installed=False)
+
# *****************************************************************************
# Decorators
# *****************************************************************************
@@ -84,6 +163,119 @@ def job_handler(job_from_target=True):
else:
return _simple_wrapper
+class JobIndicationSender(object):
+ """
+ Makes creation and sending of indications easy. It keeps a reference
+ to job, which can be *snapshotted* for making CIM instance out of it.
+ These instances are then used to send indications via IndicationManager.
+
+ Typical usage::
+
+ sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2])
+ ... # modify job
+ sender.snapshot()
+ sender.send()
+
+ **Note** that number of kept CIM instances won't exceed 2. First one
+ is created upon instantiation and the second one be calling
+ ``snapshot()``. Any successive call to ``snapshot()`` will overwrite
+ the second instance.
+ """
+
+ def __init__(self, indication_manager, job,
+ indications=IND_JOB_CHANGED, new=None):
+ """
+ :param job (``YumJob``) Is job instance, which will be immediately
+ snapshoted as old instance and later as a new one.
+ :param indications (``list``) Can either be a list of indication ids
+ or a single indication id.
+ :param new (``YumJob``) A job instance stored as new.
+ """
+ if not isinstance(indication_manager, IndicationManager):
+ raise TypeError("indication_manager must be a subclass of"
+ " IndicationManager")
+ if not isinstance(job, jobs.YumJob):
+ raise TypeError("job must be an instance of YumJob")
+ if not new is None and not isinstance(new, jobs.YumJob):
+ raise TypeError("new must be an instance of YumJob")
+ self._indication_manager = indication_manager
+ self._job = job
+ self._old_instance = JOB_TO_MODEL(job)
+ if new is not None:
+ new = JOB_TO_MODEL(job)
+ self._new_instance = new
+ self._indications = set()
+ self.indication_ids = indications
+
+ @property
+ def job(self):
+ """
+ Return instance of ``YumJob``.
+ """
+ return self._job
+
+ @property
+ def indication_ids(self):
+ """
+ Return set of indication filter IDs.
+ """
+ return self._indications.copy()
+
+ @indication_ids.setter
+ def indication_ids(self, indication_ids):
+ """
+ Set the indication filter IDs.
+
+ :param indication_ids (``list``) Can be even single id.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications = set(indication_ids)
+
+ @trace_function
+ def add_indication_ids(self, indication_ids):
+ """
+ Add filter IDs.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications.update(indication_ids)
+
+ @trace_function
+ def snapshot(self):
+ """
+ Make a second CIM instance, overwriting previous one (not the first).
+ """
+ self._new_instance = JOB_TO_MODEL(self._job)
+
+ @trace_function
+ def send(self, make_snapshot=False):
+ """
+ Send all requested indications for given job.
+ """
+ if not self._indications:
+ raise errors.IndicationError(
+ "can not send any indication without id")
+ if make_snapshot:
+ self.snapshot()
+ if ( IND_JOB_CHANGED in self._indications
+ and self._new_instance is None):
+ raise errors.IndicationError("no snapshot made for modified job")
+ for fltr_id in self._indications:
+ if fltr_id == IND_JOB_CREATED:
+ LOG.debug("sending instance creation indication for job %s",
+ self._job)
+ self._indication_manager.send_instcreation(
+ self._new_instance if self._new_instance is not None
+ else self._old_instance,
+ fltr_id)
+ else:
+ LOG.debug("sending instance modification indication for job %s"
+ " with ID: %s", self._job, fltr_id)
+ self._indication_manager.send_instmodification(
+ self._old_instance, self._new_instance,
+ fltr_id)
+
class JobManager(threading.Thread):
"""
Separate thread for managing queue of jobs requested by client.
@@ -105,10 +297,11 @@ class JobManager(threading.Thread):
ACTION_NAMES = ['remove']
- def __init__(self, queue_in, queue_out):
+ def __init__(self, queue_in, queue_out, indication_manager):
threading.Thread.__init__(self, name="JobManager")
self._queue_in = queue_in
self._queue_out = queue_out
+ self._indication_manager = indication_manager
self._terminate = False
# (time, jobid, action)
@@ -175,7 +368,9 @@ class JobManager(threading.Thread):
LOG.debug('job %s enqued for YumWorker to handle', job)
heapq.heappush(self._job_queue, job)
if getattr(job, 'async', False) is True:
+ ind = self._prepare_indication_for(job, IND_JOB_CREATED)
self._async_jobs[job.jobid] = job
+ ind.send()
self._job_enqueued.notify()
@trace_function
@@ -214,6 +409,14 @@ class JobManager(threading.Thread):
msg = "unsupported action: %s" % action
raise ValueError(msg)
+ @trace_function
+ def _prepare_indication_for(self, job, *args, **kwargs):
+ """
+ Return instance of ``JobIndicationSender``.
+ """
+ return JobIndicationSender(self._indication_manager, job,
+ *args, **kwargs)
+
# *************************************************************************
# Job handlers
# *************************************************************************
@@ -245,9 +448,11 @@ class JobManager(threading.Thread):
if not isinstance(new_priority, (int, long)):
raise TypeError('priority must be an integer')
if job.priority != new_priority:
+ ind = self._prepare_indication_for(job)
job.update(priority=new_priority)
if job in self._job_queue:
heapq.heapify(self._job_queue)
+ ind.send(True)
return job
@job_handler()
@@ -266,6 +471,7 @@ class JobManager(threading.Thread):
del self._calendar[i]
heapq.heapify(self._calendar)
break
+ ind = self._prepare_indication_for(job)
if delete_on_completion:
schedule_at = time_before_removal
if job.finished:
@@ -273,6 +479,7 @@ class JobManager(threading.Thread):
self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
job.delete_on_completion = delete_on_completion
job.time_before_removal = time_before_removal
+ ind.send(True)
return job
@job_handler()
@@ -280,7 +487,9 @@ class JobManager(threading.Thread):
"""
Updates any job metadata.
"""
+ ind = self._prepare_indication_for(job)
job.update(**data)
+ ind.send(True)
return job
@job_handler()
@@ -311,7 +520,9 @@ class JobManager(threading.Thread):
raise errors.InvalidJobState('job "%s" already finished' % job)
self._job_queue.remove(job)
heapq.heapify(self._job_queue)
+ ind = self._prepare_indication_for(job)
job.finish(result=job.RESULT_TERMINATED)
+ ind.send(True)
LOG.info('terminated not started job "%s"', job)
return job
@@ -344,6 +555,9 @@ class JobManager(threading.Thread):
if job.state != job.RUNNING:
raise errors.InvalidJobState(
'can not finish not started job "%s"' % job)
+ if getattr(job, 'async', False):
+ ind = self._prepare_indication_for(job,
+ (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED))
job.finish(result, result_data)
if getattr(job, 'async', False):
if job.delete_on_completion:
@@ -351,6 +565,11 @@ class JobManager(threading.Thread):
, MINIMUM_TIME_BEFORE_REMOVAL)
self._schedule_event(schedule_at, job.jobid,
self.ACTION_REMOVE)
+ if result == job.RESULT_SUCCESS:
+ ind.add_indication_ids(IND_JOB_SUCCEEDED)
+ elif result == job.RESULT_ERROR:
+ ind.add_indication_ids(IND_JOB_FAILED)
+ ind.send(True)
else:
LOG.debug("sending reply for %s: (%s, %s)", job,
job.ResultNames[job.result], job.result_data)
@@ -377,7 +596,13 @@ class JobManager(threading.Thread):
raise Queue.Empty
job = heapq.heappop(self._job_queue)
if job is not None:
- job.start()
+ if getattr(job, "async", False):
+ ind = self._prepare_indication_for(job,
+ (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED))
+ job.start()
+ ind.send(True)
+ else:
+ job.start()
return job
def run(self):