diff options
-rw-r--r-- | mof/LMI_SoftwareIndicationFilters.mof | 104 | ||||
-rw-r--r-- | src/python/openlmi/common/IndicationManager.py | 681 | ||||
-rw-r--r-- | src/software/openlmi/software/yumdb/jobmanager.py | 233 |
3 files changed, 1014 insertions, 4 deletions
diff --git a/mof/LMI_SoftwareIndicationFilters.mof b/mof/LMI_SoftwareIndicationFilters.mof new file mode 100644 index 0000000..e3af599 --- /dev/null +++ b/mof/LMI_SoftwareIndicationFilters.mof @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: Michal Minar <miminar@redhat.com> + * + * These are static CIM_IndicationFilter instances used by software managent + * providers. They should be installed upon providers installation. + * They should go to root/interop namespace for sfcb and root/PG_Interop + * for pegasus. + */ + +instance of CIM_IndicationFilter { + CreationClassName = "CIM_IndicationFilter"; + SystemCreationClassName = "CIM_ComputerSystem"; + SystemName = "kvm-fedora18"; + Name = "LMI:LMI_SoftwareInstallationJob:PercentUpdated"; + Query = "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA LMI_SoftwareInstallationJob AND " + "SourceInstance.CIM_ConcreteJob::PercentComplete <> " + "PreviousInstance.CIM_ConcreteJob::PercentComplete"; + /* CIM: prefix required for pegasus */ + QueryLanguage = "CIM:CQL"; + Description = "Modification of Percentage Complete for a " + "Concrete Job."; + /* required for sfcb - but deprecated */ + SourceNamespace = "root/cimv2"; + SourceNamespaces = {"root/cimv2"}; +}; + +instance of CIM_IndicationFilter { + CreationClassName = "CIM_IndicationFilter"; + SystemCreationClassName = "CIM_ComputerSystem"; + SystemName = "kvm-fedora18"; + Name = "LMI:LMI_SoftwareInstallationJob:Succeeded"; + QueryLanguage = "CIM:CQL"; + Query = "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA LMI_SoftwareInstallationJob AND " + "SourceInstance.CIM_ConcreteJob::JobState" + " = 17" + /* This is not supported by sfcb: + * " = CIM_ConcreteJob.JobState#'Completed'" */; + SourceNamespace = "root/cimv2"; + SourceNamespaces = {"root/cimv2"}; +}; + +instance of CIM_IndicationFilter { + CreationClassName = "CIM_IndicationFilter"; + SystemCreationClassName = "CIM_ComputerSystem"; + SystemName = "kvm-fedora18"; + Name = "LMI:LMI_SoftwareInstallationJob:Failed"; + Query = "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA LMI_SoftwareInstallationJob AND " + "SourceInstance.CIM_ConcreteJob::JobState" + " = 10" + /* This is not supported by sfcb: + * "CIM_ConcreteJob.JobState#'Exception'" */; + QueryLanguage = "CIM:CQL"; + Description = "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'OK'."; + SourceNamespace = "root/cimv2"; + SourceNamespaces = {"root/cimv2"}; +}; + +instance of CIM_IndicationFilter { + CreationClassName = "CIM_IndicationFilter"; + SystemCreationClassName = "CIM_ComputerSystem"; + SystemName = "kvm-fedora18"; + Name = "LMI:LMI_SoftwareInstallationJob:Changed"; + Query = "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA LMI_SoftwareInstallationJob AND " + "SourceInstance.CIM_ConcreteJob::JobState <> " + "PreviousInstance.CIM_ConcreteJob::JobState"; + QueryLanguage = "CIM:CQL"; + Description = "Modification of Job State for a ConcreteJob."; + SourceNamespace = "root/cimv2"; + SourceNamespaces = {"root/cimv2"}; +}; + +instance of CIM_IndicationFilter { + CreationClassName = "CIM_IndicationFilter"; + SystemCreationClassName = "CIM_ComputerSystem"; + SystemName = "kvm-fedora18"; + Name = "LMI:LMI_SoftwareInstallationJob:Created"; + Query = "SELECT * FROM CIM_InstCreation WHERE" + " SourceInstance ISA LMI_SoftwareInstallationJob"; + QueryLanguage = "CIM:CQL"; + Description = "Creation of a ConcreteJob."; + SourceNamespace = "root/cimv2"; + SourceNamespaces = {"root/cimv2"}; +}; diff --git a/src/python/openlmi/common/IndicationManager.py b/src/python/openlmi/common/IndicationManager.py new file mode 100644 index 0000000..fe01b70 --- /dev/null +++ b/src/python/openlmi/common/IndicationManager.py @@ -0,0 +1,681 @@ +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek <jsafrane@redhat.com> +# Authors: Michal Minar <miminar@redhat.com> +# -*- coding: utf-8 -*- +""" + .. autoclass:: IndicationManager + :members: +""" + +import pywbem +from Queue import Queue +import re +import socket +import threading + +import openlmi.common.cmpi_logging as cmpi_logging +from openlmi.common import singletonmixin + +RE_FILTER_NAME = re.compile(r'^(?P<prefix>lmi:' + r'(?P<class_name>[a-z0-9_]+):)(?P<filter_id>.*)$', re.IGNORECASE) + +FILTER_DEFAULTS = { + "SourceNamespaces" : ["root/cimv2"], + "QueryLanguage" : "CIM:CQL" +} + +@cmpi_logging.trace_function +def enumerate_namespaces(ch): + """ + Return tuple ``([CIM_Namespace instance, ...], ns_interop)``. Where + first item is a list of object paths of all namespaces in broker and + the second is a name of namespace, where this information can be found. + + :param ch CIMOM handle. + """ + nsclasses = ["CIM_Namespace", "__Namespace"] + namespaces = ['root/cimv2', 'root/PG_InterOp', 'Interop', + 'interop', 'root', 'root/interop'] + nspaths = [] + ns = None + for cls in nsclasses: + for ns in namespaces: + try: + nspaths = [nm for nm in ch.EnumerateInstanceNames(ns, cls)] + if nspaths: + break + except pywbem.CIMError as exc: + if exc[0] in ( + pywbem.CIM_ERR_INVALID_NAMESPACE, + pywbem.CIM_ERR_NOT_SUPPORTED, + pywbem.CIM_ERR_INVALID_CLASS): + pass + if exc[0] == pywbem.CIM_ERR_FAILED: + cmpi_logging.logger.error("EnumerateInstanceNames failed" + " for %s:%s: %s", ns, cls, str(exc)) + else: + raise + if nspaths: + break + if not nspaths: + cmpi_logging.logger.error("failed to enumerate namespaces") + ns = None + return (nspaths, ns) + +@cmpi_logging.trace_function +def find_ns_interop(ch): + """ + Return name of interop namespace, where ``CIM_IndicationFilter`` + class reside. + + :param ch CIMOM handle. + """ + _, ns_interop = enumerate_namespaces(ch) + return ns_interop + +def make_filter_name(class_name, fltr_id): + """ + Return value for ``CIM_IndicationFilter.Name`` property. + """ + return "LMI:%s:%s" % (class_name, fltr_id) + +def parse_filter_name(name): + """ + Return tuple ``(class_name, filter_id)``. + + :param name (``string``) Value of cim filter's *Name* property. + """ + match = RE_FILTER_NAME.match(name) + if not match: + raise ValueError('Could not parse filter name: "%s"' % name) + return (match.group("class_name"), match.group("filter_id")) + +@cmpi_logging.trace_function +def make_indication_filter_path(class_name, fltr_id, ns_interop): + """ + Return CIM_IndicationFilter instance path for given filter id. + + :param fltr_id (``string``) Filter name. + """ + for arg in ('class_name', 'fltr_id', 'ns_interop'): + if not isinstance(locals()[arg], basestring): + raise TypeError("%s must be basestring" % arg) + cop = pywbem.CIMInstanceName("CIM_IndicationFilter", + namespace=ns_interop) + cop['CreationClassName'] = 'CIM_IndicationFilter' + cop['SystemCreationClassName'] = 'CIM_ComputerSystem' + cop['SystemName'] = socket.gethostname() + cop['Name'] = make_filter_name(class_name, fltr_id) + return cop + +@cmpi_logging.trace_function +def remove_cimom_filter(ch, fltr_path): + """ + Deletes instance of CIM_IndicationFilter installed at broker with all + referencing subscriptions. + + Returns list of subscription instace names, that were deleted. + + :param fltr_path (``CIMInstanceName``) Path of ``CIM_IndicationFilter`` to + remove. + """ + if not isinstance(fltr_path, pywbem.CIMInstanceName): + raise TypeError("fltr_path must be a CIMInstanceName") + + referents = [] + for ref in ch.AssociatorNames(fltr_path, + role="Filter", + resultRole="Handler", + resultClass="CIM_IndicationSubscription"): + ch.DeleteInstance(ref) + referents.append(ref) + ch.DeleteInstance(fltr_path) + cmpi_logging.logger.debug('removed indication filter "%s" with %d' + ' referents', fltr_path["Name"], len(referents)) + return referents + +class IndicationManager(singletonmixin.Singleton): + """ + Using ``IndicationManager`` class + providers can send indications without bothering with handling of + indication subscriptions. + + Usage: + + 1. Subclass CIM_InstCreation and CIM_InstModification. + + 2. In your initialization routine, create one ``IndicationManager`` + instance. E.g. one for whole ``LMI_Storage`` may is enough. Like + this:: + + indication_manager = \ + IndicationManager.get_instance(env, "Storage", "root/cimv2") + + 3. Call ``indication_manager.add_filters()`` with all filters your + providers support for particular CIM class. This method can be called + multiple times. + For example:: + + filters = { + "JobPercentUpdated": { + "Query" : "SELECT * FROM CIM_InstModification WHERE" + " SourceInstance ISA LMI_StorageJob AND" + " SourceInstance.CIM_ConcreteJob::PercentComplete <>" + " PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for" + " a Concrete Job.", + }, + "JobSucceeded": { + "Query" : "SELECT * FROM CIM_InstModification WHERE" + " SourceInstance ISA LMI_StorageJob AND" + " SourceInstance.CIM_ConcreteJob::JobState = " + " CIM_ConcreteJob.JobState#'Completed'", + "Description": "Modification of Operational Status for" + " a Concrete Job to 'Complete' and 'OK'.", + }, + #... other indications + } + instance_manager.add_filters("LMI_StorageJob", filters) + + First argument is a name of class to which indications apply. We'll call + it *Scoping class*. + + 4. In your provider module, implement indication functions like this:: + + def authorize_filter(env, fltr, ns, classes, owner): + indication_manager.authorize_filter(env, fltr, ns, classes, owner) + + def activate_filter (env, fltr, ns, classes, first_activation): + indication_manager.activate_filter(env, fltr, ns, classes, + first_activation) + + def deactivate_filter(env, fltr, ns, classes, last_activation): + indication_manager.deactivate_filter(env, fltr, ns, classes, + last_activation) + + def enable_indications(env): + indication_manager.enable_indications(env) + + def disable_indications(env): + indication_manager.disable_indications(env) + + From now on, the ``IndicationManager`` will track all subscribed filters. + You can query the ``indication_manager.is_subscribed()`` before you create + and send an indication. Use ``indication_manager.send_indication()`` + to send your indications. + + Only static (=preconfigured, read-only) indication filters are + supported. + + For user to use these preconfigured filters, they need to be installed + at broker as instances of ``CIM_IndicationFilter``. But since they can + not be guarded against removel by accident, this object provides a way + to reinstall them. But using this is not recomended, since it can upset + users. See :ref:`_update_context-label`. + + The supported filters must be passed to add_filters method. The filters + are passed as dictionary ``'filter_id' -> {dictionary 'IndicationFilter + property' -> 'value'}``. There must be at least ``Query`` property in + each filter, CQL is assumed. + + This helper automatically tracks which filters are subscribed. Provider + can query ``is_subscribed()`` to check, if filter with given + ``filter_id`` is subscribed before generating indications. + + The CMPI interface to send indications is complicated - + when an indication is send from CIMOM callback (e.g. ``get_instance``), + it must use current ``env`` parameter of the callback and it would be + tedious to pass it to ``IndicationManager`` each time. Therefore + ``IndicationManager`` creates its own thread, registers it at CIMOM + using ``PrepareAttachThread``/``AttachThread``. + + As side-effect, indication can be sent from any thread, there is no + need to call ``PrepareAttachThread``/``AttachThread``. + """ + SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity + + def __init__(self, env, nameprefix, namespace, ns_interop=None, + queue=None): + """ + Create new ``IndicationManager``. Usually only one instance + is necessary for one provider process. + + :param env: (``ProviderEnvironment``) Provider enviroment, taken + from CIMOM callback (e.g. ``get_providers()``). + :param nameprefix: (``string``) Prefix of your ``CIM_InstCreation`` + and ``CIM_InstModification`` subclasses, e.g. 'Storage' for + ``LMI_StorageInstCreation``. + :param namespace: (``string``) Namespace, which will be set to + outgoing indications instances. + :param ns_interop: (``string``) Namespace, where filters and + subscriptions are stored. + :param queue: Optional custom input queue with the same interface as + ``Queue.Queue``. + """ + + # { class_name : + # { filter_id : filter_properties + # , ... } + # } + self._filters = pywbem.NocaseDict() + self._enabled = False + # { (class_name, filter_id), ... } + self._subscribed_filters = set() + self._nameprefix = nameprefix + self._namespace = namespace + self._ns_interop = ns_interop + self._access_lock = threading.RLock() + self._env = env + + if queue is None: + queue = Queue() + self._queue = queue + # prepare indication thread + ch = env.get_cimom_handle() + new_broker = ch.PrepareAttachThread() + self._indication_sender = threading.Thread( + target=self._send_indications_loop, args=(new_broker,)) + self._indication_sender.daemon = True + self._indication_sender.start() + + @property + def enabled(self): + """ + Return a boolean saying, whether indication sending is enabled. + """ + with self._access_lock: + return self.enabled + + @property + def namespace(self): + """ + Return namespace of outgoing indication instances. + """ + return self._namespace + + @property + def nameprefix(self): + """ + Return prefix of indication class names. + """ + return self._nameprefix + + @property + def ns_interop(self): + """ + Return interop namespace name. + """ + with self._access_lock: + if self._ns_interop is None: + ch = self._env.get_cimom_handle() + self._ns_interop = find_ns_interop(ch) + cmpi_logging.logger.info('found interop namespace: %s', + self._ns_interop) + return self._ns_interop + + @property + def instcreation_classname(self): + """ + Return whole class name of InstCreation indication. + """ + return "LMI_" + self._nameprefix + "InstCreation" + + @property + def instmodification_classname(self): + """ + Return whole class name of InstModification indication. + """ + return "LMI_" + self._nameprefix + "InstModification" + + @property + def instdeletetion_classname(self): + """ + Return whole class name of InstDeletion indication. + """ + return "LMI_" + self._nameprefix + "InstDeletion" + + @cmpi_logging.trace_method + def _get_filter_inst(self, class_name, fltr_id): + """ + Return instance of CIM_IndicationFilter registered in CIMOM if any. + + :param class_name (``string``) *Scoping class* name. + :param fltr_id (``string``) Indication name. + """ + ch = self._env.get_cimom_handle() + cop = make_indication_filter_path(class_name, fltr_id, self.ns_interop) + try: + return ch.GetInstance(cop) + except pywbem.CIMError as exc: + if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND: + return None + raise + + @cmpi_logging.trace_method + def _ensure_cimom_has_filter(self, class_name, fltr_id): + """ + Ensures, that cimom has ``fltr_id`` filter registered as instance. + If it has, but the query differs it is recreated at broker. + + :param class_name (``string``) *Scoping class* name. + :param fltr_id (``string``) Indication name. + """ + inst = self._get_filter_inst(class_name, fltr_id) + ch = self._env.get_cimom_handle() + installed = inst is not None + referents = [] + if installed: + for prop_name, val in self._filters[class_name][fltr_id].items(): + if inst[prop_name] != val: + cmpi_logging.logger.info("filter \"%s\" is installed, but" + " its property \"%s\" has outdated value;" + " removing...", fltr_id, prop_name) + referents = remove_cimom_filter(ch, inst.path) + installed = False + if not installed: + if inst is not None: + path = inst.path + else: + path = make_indication_filter_path(class_name, fltr_id, + self.ns_interop) + inst = pywbem.CIMInstance(path.classname, path=path) + kwargs = FILTER_DEFAULTS.copy() + for key, val in path.keybindings.items(): + kwargs[key] = val + kwargs.update(self._filters[class_name][fltr_id]) + inst.update(kwargs) + try: + inst = ch.CreateInstance(inst) + cmpi_logging.logger.info("filter \"%s\" installed", fltr_id) + except pywbem.CIMError: + cmpi_logging.logger.exception( + "failed to install indication filter \"%s\"", + fltr_id) + if referents: + cmpi_logging.logger.debug('reinstalling %d filter' + ' subscriptions', len(referents)) + for ref in referents: + ch.CreateInstance(ref) + return inst + + @cmpi_logging.trace_method + def _get_matching_filter(self, query): + """ + Try to find matching filter properties in local ``_filters`` storage + and return it. ``None`` is returned if not found. + + Return a tuple ``(class_name, filter_id, filter_properties)``. + + :param query (``string``) Is filter query. + """ + if not isinstance(query, basestring): + raise TypeError("query must be a string") + for clsname, fltrs in self._filters.iteritems(): + for fltr_id, props in fltrs.iteritems(): + if query == props["Query"]: + return (clsname, fltr_id, props) + return None + + @cmpi_logging.trace_method + def ensure_filters_installed(self, class_name=None, fltr_id=None): + """ + This function checks for existence of filters at broker. Filters + must be registered with this instance before the check can be done. + Without arguments all registered filters will be checked. + + :param class_name (``string``) Name of *Scoped class* that reduces + searched filters. + :param fltr_id (``string``) Indication name reducing filters that + will be checked. + """ + cls_to_check = self._filters.keys() + if class_name is not None: + cls_to_check = [class_name] + filters_to_check = list( + (c, f) + for c in cls_to_check + for f in self._filters[c].keys() + if fltr_id is None or fltr_id == f) + with self._access_lock: + try: + for clsname, fltr_id in filters_to_check: + self._ensure_cimom_has_filter(clsname, fltr_id) + cmpi_logging.logger.debug('filters installed') + return True + except pywbem.CIMError as exc: + if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED: + cmpi_logging.logger.error("filters could not be checked" + " for presence due to invalid context") + return False + raise + + @cmpi_logging.trace_method + def update_context(self, env): + """ + .. _update_context-label + + When ``IndicationManager`` is initialized upon provider initialization, + the conxet given does not contain any user credentials that are + needed for communication with broker. In order to check for filter's + existence at broker, this method needs to be called first with + context containing user's credentials. + + This needs to be called only once. + + **Note** that if you don't plan to check for filter's presence at + broker at runtime, you are not interested in this function. + """ + with self._access_lock: + self._env = env + + @cmpi_logging.trace_method + def add_filters(self, class_name, filters, ensure_installed=False): + """ + Add new filters to the helper. These filters will be allowed for + subscription. + + :param filters: (``dictionary filter_id -> filter properties``) + The filters. ``filter properties`` is dictionary + ``property_name -> value``, where at least ``Query`` property + must be set. ``Name`` property will be automatically created + as 'LMI:<class_name>:<filter_id>'. + :param ensure_installed (``bool``) Whether to check for filter presence + at broker and install them if missing. **Note** That in order + for this to work, the context must be updated with user's + credentials. See :ref:`update_context-label`. + """ + with self._access_lock: + if not class_name in self._filters: + self._filters[class_name] = pywbem.NocaseDict() + self._filters[class_name].update(filters) + if ensure_installed: + self.ensure_filters_installed(class_name=class_name) + + @cmpi_logging.trace_method + def authorize_filter(self, _env, fltr, _ns, _classes, _owner): + """ + AuthorizeFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + """ + with self._access_lock: + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.add((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s authorized", + make_filter_name(res[0], res[1]), fltr) + return True + return False + + @cmpi_logging.trace_method + def activate_filter(self, _env, fltr, _ns, _classes, first_activation): + """ + ActivateFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + """ + with self._access_lock: + if not first_activation: + return + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.add((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s started", + make_filter_name(res[0], res[1]), fltr) + + @cmpi_logging.trace_method + def deactivate_filter(self, _env, fltr, _ns, _classes, last_activation): + """ + DeactivateFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + """ + with self._access_lock: + if not last_activation: + return + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.remove((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s stopped", + make_filter_name(res[0], res[1]), fltr) + + @cmpi_logging.trace_method + def enable_indications(self, _env): + """ + EnableIndications callback from CIMOM. Call this method from + appropriate CIMOM callback. + """ + with self._access_lock: + self._enabled = True + cmpi_logging.logger.info("Indications enabled") + + @cmpi_logging.trace_method + def disable_indications(self, _env): + """ + EnableIndications callback from CIMOM. Call this method from + appropriate CIMOM callback. + """ + with self._access_lock: + self._enabled = False + cmpi_logging.logger.info("Indications disabled") + + @cmpi_logging.trace_method + def send_indication(self, indication): + """ + Send indication to all subscribers. Call this method from appropriate + CIMOM callback. + """ + self._queue.put(indication) + + @cmpi_logging.trace_method + def send_instcreation(self, instance, filter_id): + """ + Send ``LMI_<nameprefix>InstCreation`` indication with given instance. + + :param instance: (``CIMInstance``) The created instance. + :param filter_id: (``string``) The ID of registered filter which + corresponds to this indication. + """ + if not self.is_subscribed(instance.classname, filter_id): + return + path = pywbem.CIMInstanceName( + classname=self.instcreation_classname, + namespace=self.namespace) + ind = pywbem.CIMInstance( + self.instcreation_classname, + path=path) + ind['SourceInstance'] = instance + ind['SourceInstanceHost'] = socket.gethostname() + ind['SourceInstanceModelPath'] = str(instance.path) + ind['IndicationFilterName'] = make_filter_name(instance.classname, filter_id) + ind['PerceivedSeverity'] = self.SEVERITY_INFO + + cmpi_logging.logger.info("Sending indication %s for %s" % + (ind["IndicationFilterName"], str(path))) + self.send_indication(ind) + + @cmpi_logging.trace_method + def send_instmodification(self, old_instance, new_instance, filter_id): + """ + Send ``LMI_<nameprefix>InstModification`` indication with given + instance. + + :param old_instance: (``CIMInstance``) The instance before + modification. + :param new_instance: (``CIMInstance``) The instance after modification. + :param filter_id: (``string``) The ID of registered filter which + corresponds to this indication. + """ + if not self.is_subscribed(new_instance.classname, filter_id): + return + path = pywbem.CIMInstanceName( + classname=self.instmodification_classname, + namespace=self.namespace) + ind = pywbem.CIMInstance( + self.instcreation_classname, + path=path) + ind['SourceInstance'] = new_instance + ind['PreviousInstance'] = old_instance + ind['SourceInstanceHost'] = socket.gethostname() + ind['SourceInstanceModelPath'] = str(new_instance.path) + ind['IndicationFilterName'] = make_filter_name( + new_instance.classname, filter_id) + ind['PerceivedSeverity'] = self.SEVERITY_INFO + + cmpi_logging.logger.info("Sending indication %s for %s" % + (ind["IndicationFilterName"], str(path))) + self.send_indication(ind) + + @cmpi_logging.trace_method + def is_subscribed(self, class_name, fltr_id): + """ + Return True, if there is someone subscribed for given filter. + + :param fltr_id: (``string``) ID of the filter to check. + """ + with self._access_lock: + if not self._enabled: + return False + if (class_name, fltr_id) in self._subscribed_filters: + return True + return False + + @cmpi_logging.trace_method + def is_registered(self, class_name, fltr_id): + """ + Return True, if filter id has been registered with current instance. + + :param fltr_id: (``string``) ID of the filter to check. + """ + with self._access_lock: + return ( class_name in self._filters + and fltr_id in self._filters[class_name]) + + @cmpi_logging.trace_method + def _send_indications_loop(self, broker): + """ + This method runs in its own thread. It just sends all enqueued + indications. + + :param broker: (``BrokerCIMOMHandle``) Handle of the CIMOM. + """ + broker.AttachThread() + while True: + indication = self._queue.get() + cmpi_logging.logger.trace_info("Delivering indication %s" % + (str(indication.path))) + broker.DeliverIndication(self.namespace, indication) + if hasattr(self._queue, "task_done"): + self._queue.task_done() diff --git a/src/software/openlmi/software/yumdb/jobmanager.py b/src/software/openlmi/software/yumdb/jobmanager.py index 872e81f..7283d05 100644 --- a/src/software/openlmi/software/yumdb/jobmanager.py +++ b/src/software/openlmi/software/yumdb/jobmanager.py @@ -19,9 +19,16 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -This is a module for JobManager which is a separate thread of -YumWorker process. It keeps a cache of asynchronous jobs and handles +This is a module for ``JobManager`` which is a separate thread of +``YumWorker`` process. It keeps a cache of asynchronous jobs and handles input and output queues. + +This module uses its own logging facilities because it runs in separeted +process not having access to broker logging features. + +Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should +be set to callable taking ``YumJob`` instance and returning +``LMI_SoftwareInstallationJob``. """ import heapq import inspect @@ -32,14 +39,86 @@ import threading import time import traceback +from openlmi.common import cmpi_logging +from openlmi.common.IndicationManager import IndicationManager from openlmi.software.yumdb import errors, jobs from openlmi.software.yumdb.util import trace_function +# This is a callable, which must be initialized before JobManager is used. +# It should be a pointer to function, which takes a job and returns +# corresponding CIM instance. It's used for sending indications. +JOB_TO_MODEL = lambda job: None +JOB_CLASSNAME = "LMI_SoftwareInstallationJob" + # Minimum time to keep asynchronous job in cache after completion. In seconds. MINIMUM_TIME_BEFORE_REMOVAL = 10 +# replacement for cmpi_logging.logger LOG = None +IND_JOB_PERCENT_UPDATED = "PercentUpdated" +IND_JOB_SUCCEEDED = "Succeeded" +IND_JOB_FAILED = "Failed" +IND_JOB_CHANGED = "Changed" +IND_JOB_CREATED = "Created" + +IND_FILTERS = { + IND_JOB_PERCENT_UPDATED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::PercentComplete <> " + "PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for a " + "Concrete Job.", + }, + IND_JOB_SUCCEEDED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance ISA LMI_SoftwareInstallationJob AND " + "SourceInstance.CIM_ConcreteJob::JobState = 17", + # symbolic constants not supported by sfcb + #"CIM_ConcreteJob.JobState#'Completed'" + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'OK'.", + }, + IND_JOB_FAILED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState = 10", + # symbolic constants not supported by sfcb + #"CIM_ConcreteJob.JobState#'Exception'", + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'Error'.", + }, + IND_JOB_CHANGED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState <> " + "PreviousInstance.CIM_ConcreteJob::JobState", + "Description": "Modification of Job State for a ConcreteJob.", + }, + IND_JOB_CREATED: { + "Query" : "SELECT * FROM CIM_InstCreation WHERE " + "SourceInstance ISA %(classname)s", + "Description": "Creation of a ConcreteJob.", + }, +} + +@cmpi_logging.trace_function +def register_filters(indication_manager): + """ + This function registers static indication filters at IndicationManager. + It should be called upon provider's initialization. + """ + to_register = {} + for fltr_id, fltr in IND_FILTERS.items(): + if not indication_manager.is_registered(JOB_CLASSNAME, fltr_id): + fltr["Query"] = fltr["Query"] % {"classname" : JOB_CLASSNAME } + to_register[fltr_id] = fltr + if to_register: + indication_manager.add_filters(JOB_CLASSNAME, + to_register, ensure_installed=False) + # ***************************************************************************** # Decorators # ***************************************************************************** @@ -84,6 +163,119 @@ def job_handler(job_from_target=True): else: return _simple_wrapper +class JobIndicationSender(object): + """ + Makes creation and sending of indications easy. It keeps a reference + to job, which can be *snapshotted* for making CIM instance out of it. + These instances are then used to send indications via IndicationManager. + + Typical usage:: + + sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2]) + ... # modify job + sender.snapshot() + sender.send() + + **Note** that number of kept CIM instances won't exceed 2. First one + is created upon instantiation and the second one be calling + ``snapshot()``. Any successive call to ``snapshot()`` will overwrite + the second instance. + """ + + def __init__(self, indication_manager, job, + indications=IND_JOB_CHANGED, new=None): + """ + :param job (``YumJob``) Is job instance, which will be immediately + snapshoted as old instance and later as a new one. + :param indications (``list``) Can either be a list of indication ids + or a single indication id. + :param new (``YumJob``) A job instance stored as new. + """ + if not isinstance(indication_manager, IndicationManager): + raise TypeError("indication_manager must be a subclass of" + " IndicationManager") + if not isinstance(job, jobs.YumJob): + raise TypeError("job must be an instance of YumJob") + if not new is None and not isinstance(new, jobs.YumJob): + raise TypeError("new must be an instance of YumJob") + self._indication_manager = indication_manager + self._job = job + self._old_instance = JOB_TO_MODEL(job) + if new is not None: + new = JOB_TO_MODEL(job) + self._new_instance = new + self._indications = set() + self.indication_ids = indications + + @property + def job(self): + """ + Return instance of ``YumJob``. + """ + return self._job + + @property + def indication_ids(self): + """ + Return set of indication filter IDs. + """ + return self._indications.copy() + + @indication_ids.setter + def indication_ids(self, indication_ids): + """ + Set the indication filter IDs. + + :param indication_ids (``list``) Can be even single id. + """ + if isinstance(indication_ids, basestring): + indication_ids = set([indication_ids]) + self._indications = set(indication_ids) + + @trace_function + def add_indication_ids(self, indication_ids): + """ + Add filter IDs. + """ + if isinstance(indication_ids, basestring): + indication_ids = set([indication_ids]) + self._indications.update(indication_ids) + + @trace_function + def snapshot(self): + """ + Make a second CIM instance, overwriting previous one (not the first). + """ + self._new_instance = JOB_TO_MODEL(self._job) + + @trace_function + def send(self, make_snapshot=False): + """ + Send all requested indications for given job. + """ + if not self._indications: + raise errors.IndicationError( + "can not send any indication without id") + if make_snapshot: + self.snapshot() + if ( IND_JOB_CHANGED in self._indications + and self._new_instance is None): + raise errors.IndicationError("no snapshot made for modified job") + for fltr_id in self._indications: + if fltr_id == IND_JOB_CREATED: + LOG.debug("sending instance creation indication for job %s", + self._job) + self._indication_manager.send_instcreation( + self._new_instance if self._new_instance is not None + else self._old_instance, + fltr_id) + else: + LOG.debug("sending instance modification indication for job %s" + " with ID: %s", self._job, fltr_id) + self._indication_manager.send_instmodification( + self._old_instance, self._new_instance, + fltr_id) + class JobManager(threading.Thread): """ Separate thread for managing queue of jobs requested by client. @@ -105,10 +297,11 @@ class JobManager(threading.Thread): ACTION_NAMES = ['remove'] - def __init__(self, queue_in, queue_out): + def __init__(self, queue_in, queue_out, indication_manager): threading.Thread.__init__(self, name="JobManager") self._queue_in = queue_in self._queue_out = queue_out + self._indication_manager = indication_manager self._terminate = False # (time, jobid, action) @@ -175,7 +368,9 @@ class JobManager(threading.Thread): LOG.debug('job %s enqued for YumWorker to handle', job) heapq.heappush(self._job_queue, job) if getattr(job, 'async', False) is True: + ind = self._prepare_indication_for(job, IND_JOB_CREATED) self._async_jobs[job.jobid] = job + ind.send() self._job_enqueued.notify() @trace_function @@ -214,6 +409,14 @@ class JobManager(threading.Thread): msg = "unsupported action: %s" % action raise ValueError(msg) + @trace_function + def _prepare_indication_for(self, job, *args, **kwargs): + """ + Return instance of ``JobIndicationSender``. + """ + return JobIndicationSender(self._indication_manager, job, + *args, **kwargs) + # ************************************************************************* # Job handlers # ************************************************************************* @@ -245,9 +448,11 @@ class JobManager(threading.Thread): if not isinstance(new_priority, (int, long)): raise TypeError('priority must be an integer') if job.priority != new_priority: + ind = self._prepare_indication_for(job) job.update(priority=new_priority) if job in self._job_queue: heapq.heapify(self._job_queue) + ind.send(True) return job @job_handler() @@ -266,6 +471,7 @@ class JobManager(threading.Thread): del self._calendar[i] heapq.heapify(self._calendar) break + ind = self._prepare_indication_for(job) if delete_on_completion: schedule_at = time_before_removal if job.finished: @@ -273,6 +479,7 @@ class JobManager(threading.Thread): self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE) job.delete_on_completion = delete_on_completion job.time_before_removal = time_before_removal + ind.send(True) return job @job_handler() @@ -280,7 +487,9 @@ class JobManager(threading.Thread): """ Updates any job metadata. """ + ind = self._prepare_indication_for(job) job.update(**data) + ind.send(True) return job @job_handler() @@ -311,7 +520,9 @@ class JobManager(threading.Thread): raise errors.InvalidJobState('job "%s" already finished' % job) self._job_queue.remove(job) heapq.heapify(self._job_queue) + ind = self._prepare_indication_for(job) job.finish(result=job.RESULT_TERMINATED) + ind.send(True) LOG.info('terminated not started job "%s"', job) return job @@ -344,6 +555,9 @@ class JobManager(threading.Thread): if job.state != job.RUNNING: raise errors.InvalidJobState( 'can not finish not started job "%s"' % job) + if getattr(job, 'async', False): + ind = self._prepare_indication_for(job, + (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED)) job.finish(result, result_data) if getattr(job, 'async', False): if job.delete_on_completion: @@ -351,6 +565,11 @@ class JobManager(threading.Thread): , MINIMUM_TIME_BEFORE_REMOVAL) self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE) + if result == job.RESULT_SUCCESS: + ind.add_indication_ids(IND_JOB_SUCCEEDED) + elif result == job.RESULT_ERROR: + ind.add_indication_ids(IND_JOB_FAILED) + ind.send(True) else: LOG.debug("sending reply for %s: (%s, %s)", job, job.ResultNames[job.result], job.result_data) @@ -377,7 +596,13 @@ class JobManager(threading.Thread): raise Queue.Empty job = heapq.heappop(self._job_queue) if job is not None: - job.start() + if getattr(job, "async", False): + ind = self._prepare_indication_for(job, + (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED)) + job.start() + ind.send(True) + else: + job.start() return job def run(self): |