summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-03-27 15:35:08 +0100
committerMichal Minar <miminar@redhat.com>2013-04-11 13:51:56 +0200
commit7c0c3eeb54e177499d164dc9df6b3e1368934a0d (patch)
treeeb592abbc42a8cda563f375811f7935550a9c64e /src
parentd4e0eafa12c7a2b9a4748021ae7e520845dc5b8c (diff)
downloadopenlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.tar.gz
openlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.tar.xz
openlmi-providers-7c0c3eeb54e177499d164dc9df6b3e1368934a0d.zip
added indication manager to openlmi.common
IndicationManager python module is useful for many providers across OpenLMI project. That's why it's landing in openlmi.common subpackage from openlmi-storage. It supports checking for static filters IndicationManager allows to check and install static filters at broker at runtime - but it should be avoided filters should work also under sfcbd
Diffstat (limited to 'src')
-rw-r--r--src/python/openlmi/common/IndicationManager.py681
-rw-r--r--src/software/openlmi/software/yumdb/jobmanager.py233
2 files changed, 910 insertions, 4 deletions
diff --git a/src/python/openlmi/common/IndicationManager.py b/src/python/openlmi/common/IndicationManager.py
new file mode 100644
index 0000000..fe01b70
--- /dev/null
+++ b/src/python/openlmi/common/IndicationManager.py
@@ -0,0 +1,681 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+# -*- coding: utf-8 -*-
+"""
+ .. autoclass:: IndicationManager
+ :members:
+"""
+
+import pywbem
+from Queue import Queue
+import re
+import socket
+import threading
+
+import openlmi.common.cmpi_logging as cmpi_logging
+from openlmi.common import singletonmixin
+
+RE_FILTER_NAME = re.compile(r'^(?P<prefix>lmi:'
+ r'(?P<class_name>[a-z0-9_]+):)(?P<filter_id>.*)$', re.IGNORECASE)
+
+FILTER_DEFAULTS = {
+ "SourceNamespaces" : ["root/cimv2"],
+ "QueryLanguage" : "CIM:CQL"
+}
+
+@cmpi_logging.trace_function
+def enumerate_namespaces(ch):
+ """
+ Return tuple ``([CIM_Namespace instance, ...], ns_interop)``. Where
+ first item is a list of object paths of all namespaces in broker and
+ the second is a name of namespace, where this information can be found.
+
+ :param ch CIMOM handle.
+ """
+ nsclasses = ["CIM_Namespace", "__Namespace"]
+ namespaces = ['root/cimv2', 'root/PG_InterOp', 'Interop',
+ 'interop', 'root', 'root/interop']
+ nspaths = []
+ ns = None
+ for cls in nsclasses:
+ for ns in namespaces:
+ try:
+ nspaths = [nm for nm in ch.EnumerateInstanceNames(ns, cls)]
+ if nspaths:
+ break
+ except pywbem.CIMError as exc:
+ if exc[0] in (
+ pywbem.CIM_ERR_INVALID_NAMESPACE,
+ pywbem.CIM_ERR_NOT_SUPPORTED,
+ pywbem.CIM_ERR_INVALID_CLASS):
+ pass
+ if exc[0] == pywbem.CIM_ERR_FAILED:
+ cmpi_logging.logger.error("EnumerateInstanceNames failed"
+ " for %s:%s: %s", ns, cls, str(exc))
+ else:
+ raise
+ if nspaths:
+ break
+ if not nspaths:
+ cmpi_logging.logger.error("failed to enumerate namespaces")
+ ns = None
+ return (nspaths, ns)
+
+@cmpi_logging.trace_function
+def find_ns_interop(ch):
+ """
+ Return name of interop namespace, where ``CIM_IndicationFilter``
+ class reside.
+
+ :param ch CIMOM handle.
+ """
+ _, ns_interop = enumerate_namespaces(ch)
+ return ns_interop
+
+def make_filter_name(class_name, fltr_id):
+ """
+ Return value for ``CIM_IndicationFilter.Name`` property.
+ """
+ return "LMI:%s:%s" % (class_name, fltr_id)
+
+def parse_filter_name(name):
+ """
+ Return tuple ``(class_name, filter_id)``.
+
+ :param name (``string``) Value of cim filter's *Name* property.
+ """
+ match = RE_FILTER_NAME.match(name)
+ if not match:
+ raise ValueError('Could not parse filter name: "%s"' % name)
+ return (match.group("class_name"), match.group("filter_id"))
+
+@cmpi_logging.trace_function
+def make_indication_filter_path(class_name, fltr_id, ns_interop):
+ """
+ Return CIM_IndicationFilter instance path for given filter id.
+
+ :param fltr_id (``string``) Filter name.
+ """
+ for arg in ('class_name', 'fltr_id', 'ns_interop'):
+ if not isinstance(locals()[arg], basestring):
+ raise TypeError("%s must be basestring" % arg)
+ cop = pywbem.CIMInstanceName("CIM_IndicationFilter",
+ namespace=ns_interop)
+ cop['CreationClassName'] = 'CIM_IndicationFilter'
+ cop['SystemCreationClassName'] = 'CIM_ComputerSystem'
+ cop['SystemName'] = socket.gethostname()
+ cop['Name'] = make_filter_name(class_name, fltr_id)
+ return cop
+
+@cmpi_logging.trace_function
+def remove_cimom_filter(ch, fltr_path):
+ """
+ Deletes instance of CIM_IndicationFilter installed at broker with all
+ referencing subscriptions.
+
+ Returns list of subscription instace names, that were deleted.
+
+ :param fltr_path (``CIMInstanceName``) Path of ``CIM_IndicationFilter`` to
+ remove.
+ """
+ if not isinstance(fltr_path, pywbem.CIMInstanceName):
+ raise TypeError("fltr_path must be a CIMInstanceName")
+
+ referents = []
+ for ref in ch.AssociatorNames(fltr_path,
+ role="Filter",
+ resultRole="Handler",
+ resultClass="CIM_IndicationSubscription"):
+ ch.DeleteInstance(ref)
+ referents.append(ref)
+ ch.DeleteInstance(fltr_path)
+ cmpi_logging.logger.debug('removed indication filter "%s" with %d'
+ ' referents', fltr_path["Name"], len(referents))
+ return referents
+
+class IndicationManager(singletonmixin.Singleton):
+ """
+ Using ``IndicationManager`` class
+ providers can send indications without bothering with handling of
+ indication subscriptions.
+
+ Usage:
+
+ 1. Subclass CIM_InstCreation and CIM_InstModification.
+
+ 2. In your initialization routine, create one ``IndicationManager``
+ instance. E.g. one for whole ``LMI_Storage`` may is enough. Like
+ this::
+
+ indication_manager = \
+ IndicationManager.get_instance(env, "Storage", "root/cimv2")
+
+ 3. Call ``indication_manager.add_filters()`` with all filters your
+ providers support for particular CIM class. This method can be called
+ multiple times.
+ For example::
+
+ filters = {
+ "JobPercentUpdated": {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE"
+ " SourceInstance ISA LMI_StorageJob AND"
+ " SourceInstance.CIM_ConcreteJob::PercentComplete <>"
+ " PreviousInstance.CIM_ConcreteJob::PercentComplete",
+ "Description" : "Modification of Percentage Complete for"
+ " a Concrete Job.",
+ },
+ "JobSucceeded": {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE"
+ " SourceInstance ISA LMI_StorageJob AND"
+ " SourceInstance.CIM_ConcreteJob::JobState = "
+ " CIM_ConcreteJob.JobState#'Completed'",
+ "Description": "Modification of Operational Status for"
+ " a Concrete Job to 'Complete' and 'OK'.",
+ },
+ #... other indications
+ }
+ instance_manager.add_filters("LMI_StorageJob", filters)
+
+ First argument is a name of class to which indications apply. We'll call
+ it *Scoping class*.
+
+ 4. In your provider module, implement indication functions like this::
+
+ def authorize_filter(env, fltr, ns, classes, owner):
+ indication_manager.authorize_filter(env, fltr, ns, classes, owner)
+
+ def activate_filter (env, fltr, ns, classes, first_activation):
+ indication_manager.activate_filter(env, fltr, ns, classes,
+ first_activation)
+
+ def deactivate_filter(env, fltr, ns, classes, last_activation):
+ indication_manager.deactivate_filter(env, fltr, ns, classes,
+ last_activation)
+
+ def enable_indications(env):
+ indication_manager.enable_indications(env)
+
+ def disable_indications(env):
+ indication_manager.disable_indications(env)
+
+ From now on, the ``IndicationManager`` will track all subscribed filters.
+ You can query the ``indication_manager.is_subscribed()`` before you create
+ and send an indication. Use ``indication_manager.send_indication()``
+ to send your indications.
+
+ Only static (=preconfigured, read-only) indication filters are
+ supported.
+
+ For user to use these preconfigured filters, they need to be installed
+ at broker as instances of ``CIM_IndicationFilter``. But since they can
+ not be guarded against removel by accident, this object provides a way
+ to reinstall them. But using this is not recomended, since it can upset
+ users. See :ref:`_update_context-label`.
+
+ The supported filters must be passed to add_filters method. The filters
+ are passed as dictionary ``'filter_id' -> {dictionary 'IndicationFilter
+ property' -> 'value'}``. There must be at least ``Query`` property in
+ each filter, CQL is assumed.
+
+ This helper automatically tracks which filters are subscribed. Provider
+ can query ``is_subscribed()`` to check, if filter with given
+ ``filter_id`` is subscribed before generating indications.
+
+ The CMPI interface to send indications is complicated -
+ when an indication is send from CIMOM callback (e.g. ``get_instance``),
+ it must use current ``env`` parameter of the callback and it would be
+ tedious to pass it to ``IndicationManager`` each time. Therefore
+ ``IndicationManager`` creates its own thread, registers it at CIMOM
+ using ``PrepareAttachThread``/``AttachThread``.
+
+ As side-effect, indication can be sent from any thread, there is no
+ need to call ``PrepareAttachThread``/``AttachThread``.
+ """
+ SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity
+
+ def __init__(self, env, nameprefix, namespace, ns_interop=None,
+ queue=None):
+ """
+ Create new ``IndicationManager``. Usually only one instance
+ is necessary for one provider process.
+
+ :param env: (``ProviderEnvironment``) Provider enviroment, taken
+ from CIMOM callback (e.g. ``get_providers()``).
+ :param nameprefix: (``string``) Prefix of your ``CIM_InstCreation``
+ and ``CIM_InstModification`` subclasses, e.g. 'Storage' for
+ ``LMI_StorageInstCreation``.
+ :param namespace: (``string``) Namespace, which will be set to
+ outgoing indications instances.
+ :param ns_interop: (``string``) Namespace, where filters and
+ subscriptions are stored.
+ :param queue: Optional custom input queue with the same interface as
+ ``Queue.Queue``.
+ """
+
+ # { class_name :
+ # { filter_id : filter_properties
+ # , ... }
+ # }
+ self._filters = pywbem.NocaseDict()
+ self._enabled = False
+ # { (class_name, filter_id), ... }
+ self._subscribed_filters = set()
+ self._nameprefix = nameprefix
+ self._namespace = namespace
+ self._ns_interop = ns_interop
+ self._access_lock = threading.RLock()
+ self._env = env
+
+ if queue is None:
+ queue = Queue()
+ self._queue = queue
+ # prepare indication thread
+ ch = env.get_cimom_handle()
+ new_broker = ch.PrepareAttachThread()
+ self._indication_sender = threading.Thread(
+ target=self._send_indications_loop, args=(new_broker,))
+ self._indication_sender.daemon = True
+ self._indication_sender.start()
+
+ @property
+ def enabled(self):
+ """
+ Return a boolean saying, whether indication sending is enabled.
+ """
+ with self._access_lock:
+ return self.enabled
+
+ @property
+ def namespace(self):
+ """
+ Return namespace of outgoing indication instances.
+ """
+ return self._namespace
+
+ @property
+ def nameprefix(self):
+ """
+ Return prefix of indication class names.
+ """
+ return self._nameprefix
+
+ @property
+ def ns_interop(self):
+ """
+ Return interop namespace name.
+ """
+ with self._access_lock:
+ if self._ns_interop is None:
+ ch = self._env.get_cimom_handle()
+ self._ns_interop = find_ns_interop(ch)
+ cmpi_logging.logger.info('found interop namespace: %s',
+ self._ns_interop)
+ return self._ns_interop
+
+ @property
+ def instcreation_classname(self):
+ """
+ Return whole class name of InstCreation indication.
+ """
+ return "LMI_" + self._nameprefix + "InstCreation"
+
+ @property
+ def instmodification_classname(self):
+ """
+ Return whole class name of InstModification indication.
+ """
+ return "LMI_" + self._nameprefix + "InstModification"
+
+ @property
+ def instdeletetion_classname(self):
+ """
+ Return whole class name of InstDeletion indication.
+ """
+ return "LMI_" + self._nameprefix + "InstDeletion"
+
+ @cmpi_logging.trace_method
+ def _get_filter_inst(self, class_name, fltr_id):
+ """
+ Return instance of CIM_IndicationFilter registered in CIMOM if any.
+
+ :param class_name (``string``) *Scoping class* name.
+ :param fltr_id (``string``) Indication name.
+ """
+ ch = self._env.get_cimom_handle()
+ cop = make_indication_filter_path(class_name, fltr_id, self.ns_interop)
+ try:
+ return ch.GetInstance(cop)
+ except pywbem.CIMError as exc:
+ if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND:
+ return None
+ raise
+
+ @cmpi_logging.trace_method
+ def _ensure_cimom_has_filter(self, class_name, fltr_id):
+ """
+ Ensures, that cimom has ``fltr_id`` filter registered as instance.
+ If it has, but the query differs it is recreated at broker.
+
+ :param class_name (``string``) *Scoping class* name.
+ :param fltr_id (``string``) Indication name.
+ """
+ inst = self._get_filter_inst(class_name, fltr_id)
+ ch = self._env.get_cimom_handle()
+ installed = inst is not None
+ referents = []
+ if installed:
+ for prop_name, val in self._filters[class_name][fltr_id].items():
+ if inst[prop_name] != val:
+ cmpi_logging.logger.info("filter \"%s\" is installed, but"
+ " its property \"%s\" has outdated value;"
+ " removing...", fltr_id, prop_name)
+ referents = remove_cimom_filter(ch, inst.path)
+ installed = False
+ if not installed:
+ if inst is not None:
+ path = inst.path
+ else:
+ path = make_indication_filter_path(class_name, fltr_id,
+ self.ns_interop)
+ inst = pywbem.CIMInstance(path.classname, path=path)
+ kwargs = FILTER_DEFAULTS.copy()
+ for key, val in path.keybindings.items():
+ kwargs[key] = val
+ kwargs.update(self._filters[class_name][fltr_id])
+ inst.update(kwargs)
+ try:
+ inst = ch.CreateInstance(inst)
+ cmpi_logging.logger.info("filter \"%s\" installed", fltr_id)
+ except pywbem.CIMError:
+ cmpi_logging.logger.exception(
+ "failed to install indication filter \"%s\"",
+ fltr_id)
+ if referents:
+ cmpi_logging.logger.debug('reinstalling %d filter'
+ ' subscriptions', len(referents))
+ for ref in referents:
+ ch.CreateInstance(ref)
+ return inst
+
+ @cmpi_logging.trace_method
+ def _get_matching_filter(self, query):
+ """
+ Try to find matching filter properties in local ``_filters`` storage
+ and return it. ``None`` is returned if not found.
+
+ Return a tuple ``(class_name, filter_id, filter_properties)``.
+
+ :param query (``string``) Is filter query.
+ """
+ if not isinstance(query, basestring):
+ raise TypeError("query must be a string")
+ for clsname, fltrs in self._filters.iteritems():
+ for fltr_id, props in fltrs.iteritems():
+ if query == props["Query"]:
+ return (clsname, fltr_id, props)
+ return None
+
+ @cmpi_logging.trace_method
+ def ensure_filters_installed(self, class_name=None, fltr_id=None):
+ """
+ This function checks for existence of filters at broker. Filters
+ must be registered with this instance before the check can be done.
+ Without arguments all registered filters will be checked.
+
+ :param class_name (``string``) Name of *Scoped class* that reduces
+ searched filters.
+ :param fltr_id (``string``) Indication name reducing filters that
+ will be checked.
+ """
+ cls_to_check = self._filters.keys()
+ if class_name is not None:
+ cls_to_check = [class_name]
+ filters_to_check = list(
+ (c, f)
+ for c in cls_to_check
+ for f in self._filters[c].keys()
+ if fltr_id is None or fltr_id == f)
+ with self._access_lock:
+ try:
+ for clsname, fltr_id in filters_to_check:
+ self._ensure_cimom_has_filter(clsname, fltr_id)
+ cmpi_logging.logger.debug('filters installed')
+ return True
+ except pywbem.CIMError as exc:
+ if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED:
+ cmpi_logging.logger.error("filters could not be checked"
+ " for presence due to invalid context")
+ return False
+ raise
+
+ @cmpi_logging.trace_method
+ def update_context(self, env):
+ """
+ .. _update_context-label
+
+ When ``IndicationManager`` is initialized upon provider initialization,
+ the conxet given does not contain any user credentials that are
+ needed for communication with broker. In order to check for filter's
+ existence at broker, this method needs to be called first with
+ context containing user's credentials.
+
+ This needs to be called only once.
+
+ **Note** that if you don't plan to check for filter's presence at
+ broker at runtime, you are not interested in this function.
+ """
+ with self._access_lock:
+ self._env = env
+
+ @cmpi_logging.trace_method
+ def add_filters(self, class_name, filters, ensure_installed=False):
+ """
+ Add new filters to the helper. These filters will be allowed for
+ subscription.
+
+ :param filters: (``dictionary filter_id -> filter properties``)
+ The filters. ``filter properties`` is dictionary
+ ``property_name -> value``, where at least ``Query`` property
+ must be set. ``Name`` property will be automatically created
+ as 'LMI:<class_name>:<filter_id>'.
+ :param ensure_installed (``bool``) Whether to check for filter presence
+ at broker and install them if missing. **Note** That in order
+ for this to work, the context must be updated with user's
+ credentials. See :ref:`update_context-label`.
+ """
+ with self._access_lock:
+ if not class_name in self._filters:
+ self._filters[class_name] = pywbem.NocaseDict()
+ self._filters[class_name].update(filters)
+ if ensure_installed:
+ self.ensure_filters_installed(class_name=class_name)
+
+ @cmpi_logging.trace_method
+ def authorize_filter(self, _env, fltr, _ns, _classes, _owner):
+ """
+ AuthorizeFilter callback from CIMOM. Call this method from appropriate
+ CIMOM callback.
+ """
+ with self._access_lock:
+ res = self._get_matching_filter(fltr)
+ if res is not None:
+ self._subscribed_filters.add((res[0], res[1]))
+ cmpi_logging.logger.info("InstanceFilter %s: %s authorized",
+ make_filter_name(res[0], res[1]), fltr)
+ return True
+ return False
+
+ @cmpi_logging.trace_method
+ def activate_filter(self, _env, fltr, _ns, _classes, first_activation):
+ """
+ ActivateFilter callback from CIMOM. Call this method from appropriate
+ CIMOM callback.
+ """
+ with self._access_lock:
+ if not first_activation:
+ return
+ res = self._get_matching_filter(fltr)
+ if res is not None:
+ self._subscribed_filters.add((res[0], res[1]))
+ cmpi_logging.logger.info("InstanceFilter %s: %s started",
+ make_filter_name(res[0], res[1]), fltr)
+
+ @cmpi_logging.trace_method
+ def deactivate_filter(self, _env, fltr, _ns, _classes, last_activation):
+ """
+ DeactivateFilter callback from CIMOM. Call this method from appropriate
+ CIMOM callback.
+ """
+ with self._access_lock:
+ if not last_activation:
+ return
+ res = self._get_matching_filter(fltr)
+ if res is not None:
+ self._subscribed_filters.remove((res[0], res[1]))
+ cmpi_logging.logger.info("InstanceFilter %s: %s stopped",
+ make_filter_name(res[0], res[1]), fltr)
+
+ @cmpi_logging.trace_method
+ def enable_indications(self, _env):
+ """
+ EnableIndications callback from CIMOM. Call this method from
+ appropriate CIMOM callback.
+ """
+ with self._access_lock:
+ self._enabled = True
+ cmpi_logging.logger.info("Indications enabled")
+
+ @cmpi_logging.trace_method
+ def disable_indications(self, _env):
+ """
+ EnableIndications callback from CIMOM. Call this method from
+ appropriate CIMOM callback.
+ """
+ with self._access_lock:
+ self._enabled = False
+ cmpi_logging.logger.info("Indications disabled")
+
+ @cmpi_logging.trace_method
+ def send_indication(self, indication):
+ """
+ Send indication to all subscribers. Call this method from appropriate
+ CIMOM callback.
+ """
+ self._queue.put(indication)
+
+ @cmpi_logging.trace_method
+ def send_instcreation(self, instance, filter_id):
+ """
+ Send ``LMI_<nameprefix>InstCreation`` indication with given instance.
+
+ :param instance: (``CIMInstance``) The created instance.
+ :param filter_id: (``string``) The ID of registered filter which
+ corresponds to this indication.
+ """
+ if not self.is_subscribed(instance.classname, filter_id):
+ return
+ path = pywbem.CIMInstanceName(
+ classname=self.instcreation_classname,
+ namespace=self.namespace)
+ ind = pywbem.CIMInstance(
+ self.instcreation_classname,
+ path=path)
+ ind['SourceInstance'] = instance
+ ind['SourceInstanceHost'] = socket.gethostname()
+ ind['SourceInstanceModelPath'] = str(instance.path)
+ ind['IndicationFilterName'] = make_filter_name(instance.classname, filter_id)
+ ind['PerceivedSeverity'] = self.SEVERITY_INFO
+
+ cmpi_logging.logger.info("Sending indication %s for %s" %
+ (ind["IndicationFilterName"], str(path)))
+ self.send_indication(ind)
+
+ @cmpi_logging.trace_method
+ def send_instmodification(self, old_instance, new_instance, filter_id):
+ """
+ Send ``LMI_<nameprefix>InstModification`` indication with given
+ instance.
+
+ :param old_instance: (``CIMInstance``) The instance before
+ modification.
+ :param new_instance: (``CIMInstance``) The instance after modification.
+ :param filter_id: (``string``) The ID of registered filter which
+ corresponds to this indication.
+ """
+ if not self.is_subscribed(new_instance.classname, filter_id):
+ return
+ path = pywbem.CIMInstanceName(
+ classname=self.instmodification_classname,
+ namespace=self.namespace)
+ ind = pywbem.CIMInstance(
+ self.instcreation_classname,
+ path=path)
+ ind['SourceInstance'] = new_instance
+ ind['PreviousInstance'] = old_instance
+ ind['SourceInstanceHost'] = socket.gethostname()
+ ind['SourceInstanceModelPath'] = str(new_instance.path)
+ ind['IndicationFilterName'] = make_filter_name(
+ new_instance.classname, filter_id)
+ ind['PerceivedSeverity'] = self.SEVERITY_INFO
+
+ cmpi_logging.logger.info("Sending indication %s for %s" %
+ (ind["IndicationFilterName"], str(path)))
+ self.send_indication(ind)
+
+ @cmpi_logging.trace_method
+ def is_subscribed(self, class_name, fltr_id):
+ """
+ Return True, if there is someone subscribed for given filter.
+
+ :param fltr_id: (``string``) ID of the filter to check.
+ """
+ with self._access_lock:
+ if not self._enabled:
+ return False
+ if (class_name, fltr_id) in self._subscribed_filters:
+ return True
+ return False
+
+ @cmpi_logging.trace_method
+ def is_registered(self, class_name, fltr_id):
+ """
+ Return True, if filter id has been registered with current instance.
+
+ :param fltr_id: (``string``) ID of the filter to check.
+ """
+ with self._access_lock:
+ return ( class_name in self._filters
+ and fltr_id in self._filters[class_name])
+
+ @cmpi_logging.trace_method
+ def _send_indications_loop(self, broker):
+ """
+ This method runs in its own thread. It just sends all enqueued
+ indications.
+
+ :param broker: (``BrokerCIMOMHandle``) Handle of the CIMOM.
+ """
+ broker.AttachThread()
+ while True:
+ indication = self._queue.get()
+ cmpi_logging.logger.trace_info("Delivering indication %s" %
+ (str(indication.path)))
+ broker.DeliverIndication(self.namespace, indication)
+ if hasattr(self._queue, "task_done"):
+ self._queue.task_done()
diff --git a/src/software/openlmi/software/yumdb/jobmanager.py b/src/software/openlmi/software/yumdb/jobmanager.py
index 872e81f..7283d05 100644
--- a/src/software/openlmi/software/yumdb/jobmanager.py
+++ b/src/software/openlmi/software/yumdb/jobmanager.py
@@ -19,9 +19,16 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-This is a module for JobManager which is a separate thread of
-YumWorker process. It keeps a cache of asynchronous jobs and handles
+This is a module for ``JobManager`` which is a separate thread of
+``YumWorker`` process. It keeps a cache of asynchronous jobs and handles
input and output queues.
+
+This module uses its own logging facilities because it runs in separeted
+process not having access to broker logging features.
+
+Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should
+be set to callable taking ``YumJob`` instance and returning
+``LMI_SoftwareInstallationJob``.
"""
import heapq
import inspect
@@ -32,14 +39,86 @@ import threading
import time
import traceback
+from openlmi.common import cmpi_logging
+from openlmi.common.IndicationManager import IndicationManager
from openlmi.software.yumdb import errors, jobs
from openlmi.software.yumdb.util import trace_function
+# This is a callable, which must be initialized before JobManager is used.
+# It should be a pointer to function, which takes a job and returns
+# corresponding CIM instance. It's used for sending indications.
+JOB_TO_MODEL = lambda job: None
+JOB_CLASSNAME = "LMI_SoftwareInstallationJob"
+
# Minimum time to keep asynchronous job in cache after completion. In seconds.
MINIMUM_TIME_BEFORE_REMOVAL = 10
+# replacement for cmpi_logging.logger
LOG = None
+IND_JOB_PERCENT_UPDATED = "PercentUpdated"
+IND_JOB_SUCCEEDED = "Succeeded"
+IND_JOB_FAILED = "Failed"
+IND_JOB_CHANGED = "Changed"
+IND_JOB_CREATED = "Created"
+
+IND_FILTERS = {
+ IND_JOB_PERCENT_UPDATED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::PercentComplete <> "
+ "PreviousInstance.CIM_ConcreteJob::PercentComplete",
+ "Description" : "Modification of Percentage Complete for a "
+ "Concrete Job.",
+ },
+ IND_JOB_SUCCEEDED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance ISA LMI_SoftwareInstallationJob AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 17",
+ # symbolic constants not supported by sfcb
+ #"CIM_ConcreteJob.JobState#'Completed'"
+ "Description": "Modification of Operational Status for a "
+ "Concrete Job to 'Complete' and 'OK'.",
+ },
+ IND_JOB_FAILED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 10",
+ # symbolic constants not supported by sfcb
+ #"CIM_ConcreteJob.JobState#'Exception'",
+ "Description": "Modification of Operational Status for a "
+ "Concrete Job to 'Complete' and 'Error'.",
+ },
+ IND_JOB_CHANGED: {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState <> "
+ "PreviousInstance.CIM_ConcreteJob::JobState",
+ "Description": "Modification of Job State for a ConcreteJob.",
+ },
+ IND_JOB_CREATED: {
+ "Query" : "SELECT * FROM CIM_InstCreation WHERE "
+ "SourceInstance ISA %(classname)s",
+ "Description": "Creation of a ConcreteJob.",
+ },
+}
+
+@cmpi_logging.trace_function
+def register_filters(indication_manager):
+ """
+ This function registers static indication filters at IndicationManager.
+ It should be called upon provider's initialization.
+ """
+ to_register = {}
+ for fltr_id, fltr in IND_FILTERS.items():
+ if not indication_manager.is_registered(JOB_CLASSNAME, fltr_id):
+ fltr["Query"] = fltr["Query"] % {"classname" : JOB_CLASSNAME }
+ to_register[fltr_id] = fltr
+ if to_register:
+ indication_manager.add_filters(JOB_CLASSNAME,
+ to_register, ensure_installed=False)
+
# *****************************************************************************
# Decorators
# *****************************************************************************
@@ -84,6 +163,119 @@ def job_handler(job_from_target=True):
else:
return _simple_wrapper
+class JobIndicationSender(object):
+ """
+ Makes creation and sending of indications easy. It keeps a reference
+ to job, which can be *snapshotted* for making CIM instance out of it.
+ These instances are then used to send indications via IndicationManager.
+
+ Typical usage::
+
+ sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2])
+ ... # modify job
+ sender.snapshot()
+ sender.send()
+
+ **Note** that number of kept CIM instances won't exceed 2. First one
+ is created upon instantiation and the second one be calling
+ ``snapshot()``. Any successive call to ``snapshot()`` will overwrite
+ the second instance.
+ """
+
+ def __init__(self, indication_manager, job,
+ indications=IND_JOB_CHANGED, new=None):
+ """
+ :param job (``YumJob``) Is job instance, which will be immediately
+ snapshoted as old instance and later as a new one.
+ :param indications (``list``) Can either be a list of indication ids
+ or a single indication id.
+ :param new (``YumJob``) A job instance stored as new.
+ """
+ if not isinstance(indication_manager, IndicationManager):
+ raise TypeError("indication_manager must be a subclass of"
+ " IndicationManager")
+ if not isinstance(job, jobs.YumJob):
+ raise TypeError("job must be an instance of YumJob")
+ if not new is None and not isinstance(new, jobs.YumJob):
+ raise TypeError("new must be an instance of YumJob")
+ self._indication_manager = indication_manager
+ self._job = job
+ self._old_instance = JOB_TO_MODEL(job)
+ if new is not None:
+ new = JOB_TO_MODEL(job)
+ self._new_instance = new
+ self._indications = set()
+ self.indication_ids = indications
+
+ @property
+ def job(self):
+ """
+ Return instance of ``YumJob``.
+ """
+ return self._job
+
+ @property
+ def indication_ids(self):
+ """
+ Return set of indication filter IDs.
+ """
+ return self._indications.copy()
+
+ @indication_ids.setter
+ def indication_ids(self, indication_ids):
+ """
+ Set the indication filter IDs.
+
+ :param indication_ids (``list``) Can be even single id.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications = set(indication_ids)
+
+ @trace_function
+ def add_indication_ids(self, indication_ids):
+ """
+ Add filter IDs.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications.update(indication_ids)
+
+ @trace_function
+ def snapshot(self):
+ """
+ Make a second CIM instance, overwriting previous one (not the first).
+ """
+ self._new_instance = JOB_TO_MODEL(self._job)
+
+ @trace_function
+ def send(self, make_snapshot=False):
+ """
+ Send all requested indications for given job.
+ """
+ if not self._indications:
+ raise errors.IndicationError(
+ "can not send any indication without id")
+ if make_snapshot:
+ self.snapshot()
+ if ( IND_JOB_CHANGED in self._indications
+ and self._new_instance is None):
+ raise errors.IndicationError("no snapshot made for modified job")
+ for fltr_id in self._indications:
+ if fltr_id == IND_JOB_CREATED:
+ LOG.debug("sending instance creation indication for job %s",
+ self._job)
+ self._indication_manager.send_instcreation(
+ self._new_instance if self._new_instance is not None
+ else self._old_instance,
+ fltr_id)
+ else:
+ LOG.debug("sending instance modification indication for job %s"
+ " with ID: %s", self._job, fltr_id)
+ self._indication_manager.send_instmodification(
+ self._old_instance, self._new_instance,
+ fltr_id)
+
class JobManager(threading.Thread):
"""
Separate thread for managing queue of jobs requested by client.
@@ -105,10 +297,11 @@ class JobManager(threading.Thread):
ACTION_NAMES = ['remove']
- def __init__(self, queue_in, queue_out):
+ def __init__(self, queue_in, queue_out, indication_manager):
threading.Thread.__init__(self, name="JobManager")
self._queue_in = queue_in
self._queue_out = queue_out
+ self._indication_manager = indication_manager
self._terminate = False
# (time, jobid, action)
@@ -175,7 +368,9 @@ class JobManager(threading.Thread):
LOG.debug('job %s enqued for YumWorker to handle', job)
heapq.heappush(self._job_queue, job)
if getattr(job, 'async', False) is True:
+ ind = self._prepare_indication_for(job, IND_JOB_CREATED)
self._async_jobs[job.jobid] = job
+ ind.send()
self._job_enqueued.notify()
@trace_function
@@ -214,6 +409,14 @@ class JobManager(threading.Thread):
msg = "unsupported action: %s" % action
raise ValueError(msg)
+ @trace_function
+ def _prepare_indication_for(self, job, *args, **kwargs):
+ """
+ Return instance of ``JobIndicationSender``.
+ """
+ return JobIndicationSender(self._indication_manager, job,
+ *args, **kwargs)
+
# *************************************************************************
# Job handlers
# *************************************************************************
@@ -245,9 +448,11 @@ class JobManager(threading.Thread):
if not isinstance(new_priority, (int, long)):
raise TypeError('priority must be an integer')
if job.priority != new_priority:
+ ind = self._prepare_indication_for(job)
job.update(priority=new_priority)
if job in self._job_queue:
heapq.heapify(self._job_queue)
+ ind.send(True)
return job
@job_handler()
@@ -266,6 +471,7 @@ class JobManager(threading.Thread):
del self._calendar[i]
heapq.heapify(self._calendar)
break
+ ind = self._prepare_indication_for(job)
if delete_on_completion:
schedule_at = time_before_removal
if job.finished:
@@ -273,6 +479,7 @@ class JobManager(threading.Thread):
self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
job.delete_on_completion = delete_on_completion
job.time_before_removal = time_before_removal
+ ind.send(True)
return job
@job_handler()
@@ -280,7 +487,9 @@ class JobManager(threading.Thread):
"""
Updates any job metadata.
"""
+ ind = self._prepare_indication_for(job)
job.update(**data)
+ ind.send(True)
return job
@job_handler()
@@ -311,7 +520,9 @@ class JobManager(threading.Thread):
raise errors.InvalidJobState('job "%s" already finished' % job)
self._job_queue.remove(job)
heapq.heapify(self._job_queue)
+ ind = self._prepare_indication_for(job)
job.finish(result=job.RESULT_TERMINATED)
+ ind.send(True)
LOG.info('terminated not started job "%s"', job)
return job
@@ -344,6 +555,9 @@ class JobManager(threading.Thread):
if job.state != job.RUNNING:
raise errors.InvalidJobState(
'can not finish not started job "%s"' % job)
+ if getattr(job, 'async', False):
+ ind = self._prepare_indication_for(job,
+ (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED))
job.finish(result, result_data)
if getattr(job, 'async', False):
if job.delete_on_completion:
@@ -351,6 +565,11 @@ class JobManager(threading.Thread):
, MINIMUM_TIME_BEFORE_REMOVAL)
self._schedule_event(schedule_at, job.jobid,
self.ACTION_REMOVE)
+ if result == job.RESULT_SUCCESS:
+ ind.add_indication_ids(IND_JOB_SUCCEEDED)
+ elif result == job.RESULT_ERROR:
+ ind.add_indication_ids(IND_JOB_FAILED)
+ ind.send(True)
else:
LOG.debug("sending reply for %s: (%s, %s)", job,
job.ResultNames[job.result], job.result_data)
@@ -377,7 +596,13 @@ class JobManager(threading.Thread):
raise Queue.Empty
job = heapq.heappop(self._job_queue)
if job is not None:
- job.start()
+ if getattr(job, "async", False):
+ ind = self._prepare_indication_for(job,
+ (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED))
+ job.start()
+ ind.send(True)
+ else:
+ job.start()
return job
def run(self):