diff options
Diffstat (limited to 'src')
27 files changed, 2673 insertions, 70 deletions
diff --git a/src/python/openlmi/common/IndicationManager.py b/src/python/openlmi/common/IndicationManager.py new file mode 100644 index 0000000..dbe25f3 --- /dev/null +++ b/src/python/openlmi/common/IndicationManager.py @@ -0,0 +1,686 @@ +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek <jsafrane@redhat.com> +# Authors: Michal Minar <miminar@redhat.com> +# -*- coding: utf-8 -*- +""" + .. autoclass:: IndicationManager + :members: +""" + +import pywbem +from Queue import Queue +import re +import socket +import threading + +import openlmi.common.cmpi_logging as cmpi_logging +from openlmi.common import singletonmixin + +RE_FILTER_NAME = re.compile(r'^(?P<prefix>lmi:' + r'(?P<class_name>[a-z0-9_]+):)(?P<filter_id>.*)$', re.IGNORECASE) + +FILTER_DEFAULTS = { + "SourceNamespaces" : ["root/cimv2"], + "QueryLanguage" : "CIM:CQL" +} + +@cmpi_logging.trace_function +def enumerate_namespaces(ch): + """ + Return tuple ``([CIM_Namespace instance, ...], ns_interop)``. Where + first item is a list of object paths of all namespaces in broker and + the second is a name of namespace, where this information can be found. + + :param ch: CIMOM handle. + """ + nsclasses = ["CIM_Namespace", "__Namespace"] + namespaces = ['root/cimv2', 'root/PG_InterOp', 'Interop', + 'interop', 'root', 'root/interop'] + nspaths = [] + ns = None + for cls in nsclasses: + for ns in namespaces: + try: + nspaths = [nm for nm in ch.EnumerateInstanceNames(ns, cls)] + if nspaths: + break + except pywbem.CIMError as exc: + if exc[0] in ( + pywbem.CIM_ERR_INVALID_NAMESPACE, + pywbem.CIM_ERR_NOT_SUPPORTED, + pywbem.CIM_ERR_INVALID_CLASS): + pass + if exc[0] == pywbem.CIM_ERR_FAILED: + cmpi_logging.logger.error("EnumerateInstanceNames failed" + " for %s:%s: %s", ns, cls, str(exc)) + else: + raise + if nspaths: + break + if not nspaths: + cmpi_logging.logger.error("failed to enumerate namespaces") + ns = None + return (nspaths, ns) + +@cmpi_logging.trace_function +def find_ns_interop(ch): + """ + Return name of interop namespace, where ``CIM_IndicationFilter`` + class reside. + + :param ch: CIMOM handle. + """ + _, ns_interop = enumerate_namespaces(ch) + return ns_interop + +def make_filter_name(class_name, fltr_id): + """ + Return value for ``CIM_IndicationFilter.Name`` property. + """ + return "LMI:%s:%s" % (class_name, fltr_id) + +def parse_filter_name(name): + """ + Return tuple ``(class_name, filter_id)``. + + :param name: (``string``) Value of cim filter's *Name* property. + """ + match = RE_FILTER_NAME.match(name) + if not match: + raise ValueError('Could not parse filter name: "%s"' % name) + return (match.group("class_name"), match.group("filter_id")) + +@cmpi_logging.trace_function +def make_indication_filter_path(class_name, fltr_id, ns_interop): + """ + Return CIM_IndicationFilter instance path for given filter id. + + :param class_name: (``string``) *Scoped class* name. + :param fltr_id: (``string``) Filter name. + :param ns_interop: (``string``) Interop namespace. + """ + for arg in ('class_name', 'fltr_id', 'ns_interop'): + if not isinstance(locals()[arg], basestring): + raise TypeError("%s must be basestring" % arg) + cop = pywbem.CIMInstanceName("CIM_IndicationFilter", + namespace=ns_interop) + cop['CreationClassName'] = 'CIM_IndicationFilter' + cop['SystemCreationClassName'] = 'CIM_ComputerSystem' + cop['SystemName'] = socket.gethostname() + cop['Name'] = make_filter_name(class_name, fltr_id) + return cop + +@cmpi_logging.trace_function +def remove_cimom_filter(ch, fltr_path): + """ + Deletes instance of CIM_IndicationFilter installed at broker with all + referencing subscriptions. + + Returns list of subscription instace names, that were deleted. + + :param ch: CIMOM handle. + :param fltr_path: (``CIMInstanceName``) Path of ``CIM_IndicationFilter`` to + remove. + """ + if not isinstance(fltr_path, pywbem.CIMInstanceName): + raise TypeError("fltr_path must be a CIMInstanceName") + + referents = [] + for ref in ch.AssociatorNames(fltr_path, + role="Filter", + resultRole="Handler", + resultClass="CIM_IndicationSubscription"): + ch.DeleteInstance(ref) + referents.append(ref) + ch.DeleteInstance(fltr_path) + cmpi_logging.logger.debug('removed indication filter "%s" with %d' + ' referents', fltr_path["Name"], len(referents)) + return referents + +class IndicationManager(singletonmixin.Singleton): + """ + Using ``IndicationManager`` class + providers can send indications without bothering with handling of + indication subscriptions. + + Usage: + + 1. Subclass CIM_InstCreation and CIM_InstModification. + + 2. In your initialization routine, create one ``IndicationManager`` + instance. E.g. one for whole ``LMI_Storage`` may is enough. Like + this:: + + indication_manager = \ + IndicationManager.get_instance(env, "Storage", "root/cimv2") + + 3. Call ``indication_manager.add_filters()`` with all filters your + providers support for particular CIM class. This method can be called + multiple times. + For example:: + + filters = { + "JobPercentUpdated": { + "Query" : "SELECT * FROM CIM_InstModification WHERE" + " SourceInstance ISA LMI_StorageJob AND" + " SourceInstance.CIM_ConcreteJob::PercentComplete <>" + " PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for" + " a Concrete Job.", + }, + "JobSucceeded": { + "Query" : "SELECT * FROM CIM_InstModification WHERE" + " SourceInstance ISA LMI_StorageJob AND" + " SourceInstance.CIM_ConcreteJob::JobState = " + " CIM_ConcreteJob.JobState#'Completed'", + "Description": "Modification of Operational Status for" + " a Concrete Job to 'Complete' and 'OK'.", + }, + #... other indications + } + instance_manager.add_filters("LMI_StorageJob", filters) + + First argument is a name of class to which indications apply. We'll call + it *Scoping class*. + + 4. In your provider module, implement indication functions like this:: + + def authorize_filter(env, fltr, ns, classes, owner): + indication_manager.authorize_filter(env, fltr, ns, classes, owner) + + def activate_filter (env, fltr, ns, classes, first_activation): + indication_manager.activate_filter(env, fltr, ns, classes, + first_activation) + + def deactivate_filter(env, fltr, ns, classes, last_activation): + indication_manager.deactivate_filter(env, fltr, ns, classes, + last_activation) + + def enable_indications(env): + indication_manager.enable_indications(env) + + def disable_indications(env): + indication_manager.disable_indications(env) + + From now on, the ``IndicationManager`` will track all subscribed filters. + You can query the ``indication_manager.is_subscribed()`` before you create + and send an indication. Use ``indication_manager.send_indication()`` + to send your indications. + + Only static (=preconfigured, read-only) indication filters are + supported. + + For user to use these preconfigured filters, they need to be installed + at broker as instances of ``CIM_IndicationFilter``. But since they can + not be guarded against removel by accident, this object provides a way + to reinstall them. But using this is not recomended, since it can upset + users. See :ref:`_update_context-label`. + + The supported filters must be passed to add_filters method. The filters + are passed as dictionary ``'filter_id' -> {dictionary 'IndicationFilter + property' -> 'value'}``. There must be at least ``Query`` property in + each filter, CQL is assumed. + + This helper automatically tracks which filters are subscribed. Provider + can query ``is_subscribed()`` to check, if filter with given + ``filter_id`` is subscribed before generating indications. + + The CMPI interface to send indications is complicated - + when an indication is send from CIMOM callback (e.g. ``get_instance``), + it must use current ``env`` parameter of the callback and it would be + tedious to pass it to ``IndicationManager`` each time. Therefore + ``IndicationManager`` creates its own thread, registers it at CIMOM + using ``PrepareAttachThread``/``AttachThread``. + + As side-effect, indication can be sent from any thread, there is no + need to call ``PrepareAttachThread``/``AttachThread``. + """ + SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity + + def __init__(self, env, nameprefix, namespace, ns_interop=None, + queue=None): + """ + Create new ``IndicationManager``. Usually only one instance + is necessary for one provider process. + + :param env: (``ProviderEnvironment``) Provider enviroment, taken + from CIMOM callback (e.g. ``get_providers()``). + :param nameprefix: (``string``) Prefix of your ``CIM_InstCreation`` + and ``CIM_InstModification`` subclasses, e.g. 'Storage' for + ``LMI_StorageInstCreation``. + :param namespace: (``string``) Namespace, which will be set to + outgoing indications instances. + :param ns_interop: (``string``) Namespace, where filters and + subscriptions are stored. + :param queue: Optional custom input queue with the same interface as + ``Queue.Queue``. + """ + + # { class_name : + # { filter_id : filter_properties + # , ... } + # } + self._filters = pywbem.NocaseDict() + self._enabled = False + # { (class_name, filter_id), ... } + self._subscribed_filters = set() + self._nameprefix = nameprefix + self._namespace = namespace + self._ns_interop = ns_interop + self._access_lock = threading.RLock() + self._env = env + + if queue is None: + queue = Queue() + self._queue = queue + # prepare indication thread + ch = env.get_cimom_handle() + new_broker = ch.PrepareAttachThread() + self._indication_sender = threading.Thread( + target=self._send_indications_loop, args=(new_broker,)) + self._indication_sender.daemon = True + self._indication_sender.start() + + @property + def enabled(self): + """ + Return a boolean saying, whether indication sending is enabled. + """ + with self._access_lock: + return self.enabled + + @property + def namespace(self): + """ + Return namespace of outgoing indication instances. + """ + return self._namespace + + @property + def nameprefix(self): + """ + Return prefix of indication class names. + """ + return self._nameprefix + + @property + def ns_interop(self): + """ + Return interop namespace name. + """ + with self._access_lock: + if self._ns_interop is None: + ch = self._env.get_cimom_handle() + self._ns_interop = find_ns_interop(ch) + cmpi_logging.logger.info('found interop namespace: %s', + self._ns_interop) + return self._ns_interop + + @property + def instcreation_classname(self): + """ + Return whole class name of InstCreation indication. + """ + return "LMI_" + self._nameprefix + "InstCreation" + + @property + def instmodification_classname(self): + """ + Return whole class name of InstModification indication. + """ + return "LMI_" + self._nameprefix + "InstModification" + + @property + def instdeletetion_classname(self): + """ + Return whole class name of InstDeletion indication. + """ + return "LMI_" + self._nameprefix + "InstDeletion" + + @cmpi_logging.trace_method + def _get_filter_inst(self, class_name, fltr_id): + """ + Return instance of CIM_IndicationFilter registered in CIMOM if any. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) Indication name. + """ + ch = self._env.get_cimom_handle() + cop = make_indication_filter_path(class_name, fltr_id, self.ns_interop) + try: + return ch.GetInstance(cop) + except pywbem.CIMError as exc: + if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND: + return None + raise + + @cmpi_logging.trace_method + def _ensure_cimom_has_filter(self, class_name, fltr_id): + """ + Ensures, that cimom has ``fltr_id`` filter registered as instance. + If it has, but the query differs it is recreated at broker. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) Indication name. + """ + inst = self._get_filter_inst(class_name, fltr_id) + ch = self._env.get_cimom_handle() + installed = inst is not None + referents = [] + if installed: + for prop_name, val in self._filters[class_name][fltr_id].items(): + if inst[prop_name] != val: + cmpi_logging.logger.info("filter \"%s\" is installed, but" + " its property \"%s\" has outdated value;" + " removing...", fltr_id, prop_name) + referents = remove_cimom_filter(ch, inst.path) + installed = False + if not installed: + if inst is not None: + path = inst.path + else: + path = make_indication_filter_path(class_name, fltr_id, + self.ns_interop) + inst = pywbem.CIMInstance(path.classname, path=path) + kwargs = FILTER_DEFAULTS.copy() + for key, val in path.keybindings.items(): + kwargs[key] = val + kwargs.update(self._filters[class_name][fltr_id]) + inst.update(kwargs) + try: + inst = ch.CreateInstance(inst) + cmpi_logging.logger.info("filter \"%s\" installed", fltr_id) + except pywbem.CIMError: + cmpi_logging.logger.exception( + "failed to install indication filter \"%s\"", + fltr_id) + if referents: + cmpi_logging.logger.debug('reinstalling %d filter' + ' subscriptions', len(referents)) + for ref in referents: + ch.CreateInstance(ref) + return inst + + @cmpi_logging.trace_method + def _get_matching_filter(self, query): + """ + Try to find matching filter properties in local ``_filters`` storage + and return it. ``None`` is returned if not found. + + Return a tuple ``(class_name, filter_id, filter_properties)``. + + :param query: (``string``) Is filter query. + """ + if not isinstance(query, basestring): + raise TypeError("query must be a string") + for clsname, fltrs in self._filters.iteritems(): + for fltr_id, props in fltrs.iteritems(): + if query == props["Query"]: + return (clsname, fltr_id, props) + return None + + @cmpi_logging.trace_method + def ensure_filters_installed(self, class_name=None, fltr_id=None): + """ + This function checks for existence of filters at broker. Filters + must be registered with this instance before the check can be done. + Without arguments all registered filters will be checked. + + :param class_name: (``string``) Name of *Scoped class* that reduces + searched filters. + :param fltr_id: (``string``) Indication name reducing filters that + will be checked. + """ + cls_to_check = self._filters.keys() + if class_name is not None: + cls_to_check = [class_name] + filters_to_check = list( + (c, f) + for c in cls_to_check + for f in self._filters[c].keys() + if fltr_id is None or fltr_id == f) + with self._access_lock: + try: + for clsname, fltr_id in filters_to_check: + self._ensure_cimom_has_filter(clsname, fltr_id) + cmpi_logging.logger.debug('filters installed') + return True + except pywbem.CIMError as exc: + if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED: + cmpi_logging.logger.error("filters could not be checked" + " for presence due to invalid context") + return False + raise + + @cmpi_logging.trace_method + def update_context(self, env): + """ + .. _update_context-label + + When ``IndicationManager`` is initialized upon provider initialization, + the conxet given does not contain any user credentials that are + needed for communication with broker. In order to check for filter's + existence at broker, this method needs to be called first with + context containing user's credentials. + + This needs to be called only once. + + **Note** that if you don't plan to check for filter's presence at + broker at runtime, you are not interested in this function. + """ + with self._access_lock: + self._env = env + + @cmpi_logging.trace_method + def add_filters(self, class_name, filters, ensure_installed=False): + """ + Add new filters to the helper. These filters will be allowed for + subscription. + + :param filters: (``dictionary filter_id -> filter properties``) + The filters. ``filter properties`` is dictionary + ``property_name -> value``, where at least ``Query`` property + must be set. ``Name`` property will be automatically created + as 'LMI:<class_name>:<filter_id>'. + :param ensure_installed: (``bool``) Whether to check for filter presence + at broker and install them if missing. **Note** That in order + for this to work, the context must be updated with user's + credentials. See :ref:`update_context-label`. + """ + with self._access_lock: + if not class_name in self._filters: + self._filters[class_name] = pywbem.NocaseDict() + self._filters[class_name].update(filters) + if ensure_installed: + self.ensure_filters_installed(class_name=class_name) + + @cmpi_logging.trace_method + def authorize_filter(self, _env, fltr, _ns, _classes, _owner): + """ + AuthorizeFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + """ + with self._access_lock: + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.add((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s authorized", + make_filter_name(res[0], res[1]), fltr) + return True + return False + + @cmpi_logging.trace_method + def activate_filter(self, _env, fltr, _ns, _classes, first_activation): + """ + ActivateFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + """ + with self._access_lock: + if not first_activation: + return + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.add((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s started", + make_filter_name(res[0], res[1]), fltr) + + @cmpi_logging.trace_method + def deactivate_filter(self, _env, fltr, _ns, _classes, last_activation): + """ + DeactivateFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + """ + with self._access_lock: + if not last_activation: + return + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.remove((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s stopped", + make_filter_name(res[0], res[1]), fltr) + + @cmpi_logging.trace_method + def enable_indications(self, _env): + """ + EnableIndications callback from CIMOM. Call this method from + appropriate CIMOM callback. + """ + with self._access_lock: + self._enabled = True + cmpi_logging.logger.info("Indications enabled") + + @cmpi_logging.trace_method + def disable_indications(self, _env): + """ + EnableIndications callback from CIMOM. Call this method from + appropriate CIMOM callback. + """ + with self._access_lock: + self._enabled = False + cmpi_logging.logger.info("Indications disabled") + + @cmpi_logging.trace_method + def send_indication(self, indication): + """ + Send indication to all subscribers. Call this method from appropriate + CIMOM callback. + """ + self._queue.put(indication) + + @cmpi_logging.trace_method + def send_instcreation(self, instance, filter_id): + """ + Send ``LMI_<nameprefix>InstCreation`` indication with given instance. + + :param instance: (``CIMInstance``) The created instance. + :param filter_id: (``string``) The ID of registered filter which + corresponds to this indication. + """ + if not self.is_subscribed(instance.classname, filter_id): + return + path = pywbem.CIMInstanceName( + classname=self.instcreation_classname, + namespace=self.namespace) + ind = pywbem.CIMInstance( + self.instcreation_classname, + path=path) + ind['SourceInstance'] = instance + ind['SourceInstanceHost'] = socket.gethostname() + ind['SourceInstanceModelPath'] = str(instance.path) + ind['IndicationFilterName'] = make_filter_name(instance.classname, filter_id) + ind['PerceivedSeverity'] = self.SEVERITY_INFO + + cmpi_logging.logger.info("Sending indication %s for %s" % + (ind["IndicationFilterName"], str(path))) + self.send_indication(ind) + + @cmpi_logging.trace_method + def send_instmodification(self, old_instance, new_instance, filter_id): + """ + Send ``LMI_<nameprefix>InstModification`` indication with given + instance. + + :param old_instance: (``CIMInstance``) The instance before + modification. + :param new_instance: (``CIMInstance``) The instance after modification. + :param filter_id: (``string``) The ID of registered filter which + corresponds to this indication. + """ + if not self.is_subscribed(new_instance.classname, filter_id): + return + path = pywbem.CIMInstanceName( + classname=self.instmodification_classname, + namespace=self.namespace) + ind = pywbem.CIMInstance( + self.instcreation_classname, + path=path) + ind['SourceInstance'] = new_instance + ind['PreviousInstance'] = old_instance + ind['SourceInstanceHost'] = socket.gethostname() + ind['SourceInstanceModelPath'] = str(new_instance.path) + ind['IndicationFilterName'] = make_filter_name( + new_instance.classname, filter_id) + ind['PerceivedSeverity'] = self.SEVERITY_INFO + + cmpi_logging.logger.info("Sending indication %s for %s" % + (ind["IndicationFilterName"], str(path))) + self.send_indication(ind) + + @cmpi_logging.trace_method + def is_subscribed(self, class_name, fltr_id): + """ + Return True, if there is someone subscribed for given filter. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) ID of the filter to check. + """ + with self._access_lock: + if not self._enabled: + return False + if (class_name, fltr_id) in self._subscribed_filters: + return True + return False + + @cmpi_logging.trace_method + def is_registered(self, class_name, fltr_id): + """ + Return True, if filter id has been registered with current instance. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) ID of the filter to check. + """ + with self._access_lock: + return ( class_name in self._filters + and fltr_id in self._filters[class_name]) + + @cmpi_logging.trace_method + def _send_indications_loop(self, broker): + """ + This method runs in its own thread. It just sends all enqueued + indications. + + :param broker: (``BrokerCIMOMHandle``) Handle of the CIMOM. + """ + broker.AttachThread() + while True: + indication = self._queue.get() + cmpi_logging.logger.trace_info("Delivering indication %s" % + (str(indication.path))) + broker.DeliverIndication(self.namespace, indication) + if hasattr(self._queue, "task_done"): + self._queue.task_done() diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py new file mode 100644 index 0000000..a923f29 --- /dev/null +++ b/src/python/openlmi/common/JobManager.py @@ -0,0 +1,1532 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek <jsafrane@redhat.com> +# -*- coding: utf-8 -*- +""" + Basic infrastructure for asynchronous jobs. All necessary CIM classes and + indications are implemented here. + + .. autoclass:: JobManager + :members: + + .. autoclass:: Job + :members: + + .. autoclass:: LMI_ConcreteJob + :members: + + .. autoclass:: LMI_OwningJobElement + :members: + + .. autoclass:: LMI_AffectedJobElement + :members: + + .. autoclass:: LMI_MethodResult + :members: + + .. autoclass:: LMI_AssociatedJobMethodResult + :members: +""" + +from datetime import datetime, timedelta +import threading +from Queue import Queue +import pywbem +import openlmi.common.cmpi_logging as cmpi_logging +from pywbem.cim_provider2 import CIMProvider2 +import socket + +# Too many instance attributes +# pylint: disable-msg=R0902 +class Job(object): + """ + Generic abstract class representing one CIM_ConcreteJob. + It remembers input and output arguments, affected ManagedElements and + owning ManagedElement (to be able to create associations to them) + and all CIM_ConcreteJob properties. + + Due to multiple threads processing the job, each job has its own + lock to guard its status changes. It is expected that number of jobs + is quite low. + """ + + DEFAULT_TIME_BEFORE_REMOVAL = 60 # in seconds + + STATE_QUEUED = 1 # Job has not started yet + STATE_RUNNING = 2 # Job is running + STATE_FINISHED_OK = 3 # Job finished OK + STATE_FAILED = 4 # Job finished with error + STATE_SUSPENDED = 5 # Job is queued and suspended + STATE_TERMINATED = 6 # Job was queued and terminated + + FINAL_STATES = [STATE_FINISHED_OK, STATE_FAILED, STATE_SUSPENDED, + STATE_TERMINATED] + + # There is no way how to suspend/terminate running job! + + @cmpi_logging.trace_method + def __init__(self, job_manager, job_name, input_arguments, + method_name, affected_elements, owning_element): + """ + Create new storage job. + + :param job_manager: (``JobManager``) Reference to ``JobManager``, which + will manage this job. + :param job_name: (``string``) User-friendly name of the job. + :param input_arguments: (``dictionary param_name -> param_value``) + Input arguments of the method, which spawned this job. + :param method_name: (``string``) Name of the CIM method, which spawned + this job. + :param affected_elements: (``array of CIMInstanceName``) List of + affected elements. ``LMI_AffectedJobElement`` association will be + created for them. + :param owning_element: (``CIMInstanceName``) Reference to service, which + spawned the job. ``LMI_OwningJobElement`` association will be + created for it. + """ + self.job_manager = job_manager + + # Unique ID + self.the_id = job_manager.get_next_id() + + # User friendly name of the job + self.job_name = job_name + + # Dictionary of input arguments, 'parameter_name' -> 'parameter_value' + # The parameter value must be CIMProperty or something that can be + # assigned to it. + self.input_arguments = input_arguments + + # Dictionary of output arguments, 'parameter_name' -> 'parameter_value' + # The parameter value must be CIMProperty or something that can be + # assigned to it. + self.output_arguments = None + + # Method return value, as CIMProperty or something that can be + # assigned to it. + self.return_value = None + # Value of Job.ReturnValueType + self.return_value_type = None + + # Name of the method + self.method_name = method_name + + # Time when the job was created + self.time_submitted = datetime.utcnow() + + # Nr. of seconds before the job is removed when the job finishes + self.time_before_removal = self.DEFAULT_TIME_BEFORE_REMOVAL + + # If the job should be removed after completion + self.delete_on_completion = True + + self.percent_complete = 0 + + # State of the job + self.job_state = self.STATE_QUEUED + + # Last change of job state + self.time_of_last_state_change = self.time_submitted + + # Duration of the job in RUNNING state + self.elapsed_time = None + + # When the job started (= switched to RUNNING) + self.start_time = None + # When the job finished (= switched from RUNNING) + self.finish_time = None + + # Array of CIMInstanceNames of affected elements, so we can + # enumerate associations to them. + self.affected_elements = affected_elements + + # CIMInstanceName to owning element (service), so we can enumerate + # instances. + self.owning_element = owning_element + + # Timer used to delete the job after time_before_removal seconds + self.timer = None + + # CIMError with result code + self.error = None + + # internal lock to protect state changes from races + self._lock = threading.RLock() + + self._execute = None + self._execargs = None + self._execkwargs = None + self._cancel = None + self._cancelargs = None + self._cancelkwargs = None + + @cmpi_logging.trace_method + def set_execute_action(self, callback, *args, **kwargs): + """ + Set callback, which will be called when the job is to be executed. It is + expected that the callback will take some time to execute. The callback + must change state of the job and set output parameters and error in a + thread-safe way, i.e. by calling ``finish_method()``. + + :param callback: (``function``) Reference to callback to call. + :param args, kwargs: All other parameters will be passed to the + callback. It is highly recommended to add reference to the job + to the callback. + """ + self._execute = callback + self._execargs = args + self._execkwargs = kwargs + + @cmpi_logging.trace_method + def set_cancel_action(self, callback, *args, **kwargs): + """ + Set callbacks, which will be called when the job is to be + cancelled. The callback must be quick, the job is already locked! + + :param callback: (``function``) Reference to callback to call. + :param args, kwargs: All other parameters will be passed to the + callback. It is highly recommended to add reference to the job + to the callback. + """ + self._cancel = callback + self._cancelargs = args + self._cancelkwargs = kwargs + + @cmpi_logging.trace_method + def finish_method(self, new_state, return_value=None, return_type=None, + output_arguments=None, error=None): + """ + Mark the job as finished, with given return value, output parameters and + error. + This method is thread-safe. + + :param new_state: (``Job.STATE_* value``) Resulting state of the job. + :param return_value: (``string``) Return value of the job, encoded + into string. Can be None when the job does not return any value. + :param return_type: (``Job.RetunValueType.* value``) Type of the return + value. Can be None when the job does not return any value. + :param output_arguments: (``dictionary param_name -> param_value``) + Output arguments of the job. Can be None when the job does not have + any output parameters. + :param error: (``CIMError``) Error raised by the job. Can be None, + when the job finished successfully. + """ + self.lock() + self.return_value = return_value + self.return_value_type = return_type + self.output_arguments = output_arguments + self.error = error + self.change_state(new_state, 100) + self.unlock() + + @cmpi_logging.trace_method + def change_state(self, new_state, percent=None): + """ + Change state of a job. (Re-)calculate various times based on the state + change. Send indications as necessary. + This method is thread-safe. + + :param new_state: (``Job.STATE_* value``) New state of the job. + It can be the same as the previous state to indicate progress of + the job. + :param percent: (``int``)) Percent complete of the job. When None, + this valuu will be automatically calculated (in progress = 50%, + finished = 100%). + """ + self.lock() + + cmpi_logging.logger.debug("Job %s: %s changes state from %d to %d" + % (self.the_id, self.job_name, self.job_state, new_state)) + + # For sending indications + prev_instance = None + send_indication = False + indication_ids = [] + + if self.job_state != new_state: + # Remember to send indications + prev_instance = self.job_manager.get_job_instance(self) + send_indication = True + indication_ids.append(JobManager.IND_JOB_CHANGED) + + # Check if the job has just finished + if (self.job_state not in self.FINAL_STATES + and new_state in self.FINAL_STATES): + # Remember finish time + self.finish_time = datetime.utcnow() + # Remember job execution time. + if self.start_time: + self.elapsed_time = self.finish_time - self.start_time + # Send indication + if self.job_state == self.STATE_FAILED: + indication_ids.append(JobManager.IND_JOB_FAILED) + if self.job_state == self.STATE_SUSPENDED: + indication_ids.append(JobManager.IND_JOB_SUCCEEDED) + + # Check if the job has just started + if new_state == self.STATE_RUNNING: + self.start_time = datetime.utcnow() + + self.time_of_last_state_change = datetime.now() + self.job_state = new_state + + if percent is None: + # guess the percentage from status + if new_state == self.STATE_QUEUED: + percent = 0 + elif new_state == self.STATE_RUNNING: + percent = 50 + else: + percent = 100 + if self.percent_complete != percent: + # Remember to send indications + if not send_indication: + self.time_of_last_state_change = datetime.now() + prev_instance = self.job_manager.get_job_instance(self) + send_indication = True + indication_ids.append(JobManager.IND_JOB_PERCENT_UPDATED) + self.percent_complete = percent + + if send_indication: + current_instance = self.job_manager.get_job_instance(self) + self.job_manager.send_modify_indications( + prev_instance, current_instance, indication_ids) + + # start / update the timer if necesasry + self._restart_timer() + self.unlock() + + @cmpi_logging.trace_method + def _expire(self): + """ + Callback when a Job completes and time_before_removal second passed. + The job gets removed from its JobManager. + """ + cmpi_logging.logger.debug("Got timeout for job %s: '%s', removing" + " the job" % (self.the_id, self.job_name)) + self.job_manager.remove_job(self) + + @cmpi_logging.trace_method + def _restart_timer(self): + """ + Re-schedule timer for TimeBeforeRemoval because some property has + changed. + """ + if not self.job_state in self.FINAL_STATES: + return + + # Stop the old timer. + if self.timer: + self.timer.cancel() + self.timer = None + + # Start the new timer. + if self.delete_on_completion: + now = datetime.utcnow() + passed = now - self.finish_time + timeout = self.time_before_removal - passed.total_seconds() + cmpi_logging.logger.debug("Starting timer for job %s: '%s' for %f" + " seconds" % (self.the_id, self.job_name, timeout)) + self.timer = threading.Timer(timeout, self._expire) + self.timer.start() + + @cmpi_logging.trace_method + def lock(self): + """ + Lock internal mutex. Other threads will block on subsequent lock(). + The lock is recursive, i.e. can be called multiple times from + single thread. + """ + self._lock.acquire() + + @cmpi_logging.trace_method + def unlock(self): + """ Unlock internal mutex.""" + self._lock.release() + + @cmpi_logging.trace_method + def execute(self): + """ + Start executing the job. It calls the execute callback, set by + ``set_execute_action()``. + + job_state must be already set to STATE_RUNNING. + Any exception is translated to CIMError and appropriate state is set. + """ + try: + self._execute(*(self._execargs), **(self._execkwargs)) + except pywbem.CIMError, error: + self.finish_method(Job.STATE_FAILED, error=error) + except Exception, ex: + error = pywbem.CIMError(pywbem.CIM_ERR_FAILED, str(ex)) + self.finish_method(Job.STATE_FAILED, error=error) + + @cmpi_logging.trace_method + def cancel(self): + """ + Cancels queued action. The action must have not been started. + """ + self.change_state(self.STATE_TERMINATED) + if self._cancel: + self._cancel(*(self._cancelargs), **(self._cancelkwargs)) + + @cmpi_logging.trace_method + def get_name(self): + """ + Return CIMInstanceName of the job. + + :rtype: ``CIMInstanceName`` + """ + name = pywbem.CIMInstanceName( + classname=self.job_manager.job_classname, + namespace=self.job_manager.namespace, + keybindings={ + 'InstanceID': self.get_instance_id() + }) + return name + + @cmpi_logging.trace_method + def get_instance_id(self, classname=None): + """ + Return InstanceID. + + :param classname: (``string``) Optional classname to generate InstanceID + for different class, e.g. for LMI_<name>MethodResult. + :rtype: ``string`` + """ + if classname is None: + classname = self.job_manager.job_classname + return 'LMI:' + classname + ':' + str(self.the_id) + + @staticmethod + def parse_instance_id(instance_id, job_manager, classname=None): + """ + Return the last part of instance_id. + + :param instance_id: (``string``) InstanceID to parse. + :param job_manager: (``JobManager``) JobManager to query for Job's + classname. + :param classname: (``string``) Optional classname. If not given, + JobManager's job_classname will be used for parsing. Other + classnames may be used to parse e.g. LMI_<name>MethodResult + InstanceIDs. + + :rtype: ``string`` or None if the ``instance_id`` has wrong format. + """ + if classname is None: + classname = job_manager.job_classname + parts = instance_id.split(":") + if len(parts) != 3: + return None + if parts[0] != 'LMI': + return None + if parts[1] != classname: + return None + if not parts[2].isdigit(): + return None + return parts[2] + + @cmpi_logging.trace_method + def get_pre_call(self): + """ + Return indication that describes the pre-execution values of the + job's invocation. + + :rtype: ``CIMInstance of CIM_InstMethodCall`` + """ + path = pywbem.CIMInstanceName( + classname="CIM_InstMethodCall", + keybindings={}, + host=socket.gethostname(), + namespace=self.job_manager.namespace) + inst = pywbem.CIMInstance( + classname="CIM_InstMethodCall", + path=path, + properties={ + 'MethodName' : self.method_name, + 'MethodParameters' : pywbem.CIMProperty( + name="MethodParameters", + type='instance', + value=self._get_method_params(False)), + 'PreCall' : True, + }) + src_instance = self._get_cim_instance() + inst['SourceInstance'] = src_instance + inst['SourceInstanceModelPath'] = str(src_instance.path) + return inst + + @cmpi_logging.trace_method + def get_post_call(self): + """ + Return indication that describes the post-execution values of the + job's invocation. + + :rtype: ``CIMInstance of CIM_InstMethodCall`` + """ + path = pywbem.CIMInstanceName( + classname="CIM_InstMethodCall", + keybindings={}, + host=socket.gethostname(), + namespace=self.job_manager.namespace) + inst = pywbem.CIMInstance( + classname="CIM_InstMethodCall", + path=path, + properties={ + 'MethodName' : self.method_name, + 'MethodParameters' : self._get_method_params(True), + 'PreCall' : False + }) + src_instance = self._get_cim_instance() + inst['SourceInstance'] = src_instance + inst['SourceInstanceModelPath'] = str(src_instance.path) + + if self.return_value_type is not None: + inst['ReturnValueType'] = self.return_value_type + if self.return_value is not None: + inst['ReturnValue'] = self.return_value + if self.error is not None: + inst['Error'] = self.error + return inst + + @cmpi_logging.trace_method + def _get_cim_instance(self): + """ + Return CIMInstance of this job. + + :rtype: CIMInstance + """ + return self.job_manager.get_job_instance(self) + + @cmpi_logging.trace_method + def _get_method_params(self, output=True): + """ + Assemble __MethodParameters for CIM_InstMethodCall indication. + + :rtype: CIMInstance of __MethodParameters. + """ + path = pywbem.CIMInstanceName( + classname="__MethodParameters", + namespace=self.job_manager.namespace, + keybindings={}) + inst = pywbem.CIMInstance(classname="__MethodParameters", path=path) + for (name, value) in self.input_arguments.iteritems(): + inst[name] = value + if output: + # overwrite any input parameter + for (name, value) in self.output_arguments.iteritems(): + inst[name] = value + return inst + + # pylint: disable-msg=R0903 + class ReturnValueType(object): + """ CIM_InstMethodCall.ReturnValueType values.""" + Boolean = pywbem.Uint16(2) + String = pywbem.Uint16(3) + Char16 = pywbem.Uint16(4) + Uint8 = pywbem.Uint16(5) + Sint8 = pywbem.Uint16(6) + Uint16 = pywbem.Uint16(7) + Sint16 = pywbem.Uint16(8) + Uint32 = pywbem.Uint16(9) + Sint32 = pywbem.Uint16(10) + Uint64 = pywbem.Uint16(11) + Sint64 = pywbem.Uint16(12) + Datetime = pywbem.Uint16(13) + Real32 = pywbem.Uint16(14) + Real64 = pywbem.Uint16(15) + Reference = pywbem.Uint16(16) + +class JobManager(object): + """ + Container of all queued, running or finished ``LMI_ConcreteJobs``. + + Usage: + + 1. Create MOF file for these classes: + + * ``LMI_<name>Job`` + + * ``LMI_<name>MethodResult`` + + * ``LMI_Affected<name>JobElement`` + + * ``LMI_Owning<name>JobElement`` + + * ``LMI_Associated<name>JobMethodResult`` + + Where ``<name>`` is prefix of your classes, for example 'Storage' + + 2. During initialization, create ``JobManager``. + + 3. When needed. create new Job instance: + + 4. Set its execute callback using ``set_execute_action()``. This callback + will be called when the job is to be executed. It will be called in + context of ``JobManager`` worker thread! + + 5. Optionally, set cancel callback using ``set_execute_action()``. This + callback will be called when the job is still queued and is cancelled by + application. This callback will be called in context of CIMOM callback + and should be quick! + + 6. Enqueue the job using ``JobManager.add_job()`` method. + + 7. When your execute callback is called, you can optionally call + ``job.change_state()`` to update percentage of completion. + + 8. When your execute callback is finished, don't forget to set method + result using ``job.finish_method()``. + + * ``JobManager`` automatically sends all job-related indications. + * ``Job`` automatically tracks various timestamps. + * By default, the job automatically disappears after 60 seconds after it + finishes. Application may set ``DeleteOnCompletion`` and + ``TimeBeforeRemoval`` properties of ``LMI_<name>Job`` to override this + timeout. + """ + + IND_JOB_PERCENT_UPDATED = "JobPercentUpdated" + IND_JOB_SUCCEEDED = "JobSucceeded" + IND_JOB_FAILED = "JobFailed" + IND_JOB_CHANGED = "JobChanged" + IND_JOB_CREATED = "JobCreated" + + @cmpi_logging.trace_method + def __init__(self, name, namespace, indication_manager): + """ + Initialize new Manager. It automatically registers all job-related + filters to indication_manager and starts a worker thread. + + :param name: (``string``) String with classname infix. For example + 'Storage' for ``LMI_StorageJob``, ``LMI_StorageJobMethodResult`` + etc. + :param namespace: (``string``) Namespace of all providers. + :param indication_manager: (``IndicationManager``): a manager where + indications and filters should be added. + """ + # List of all jobs. Dictionary job_id -> Job. + self.jobs = {} + # Queue of jobs scheduled to execute. + self.queue = Queue() + # Last created job_id. + self.last_instance_id = 0 + # Classname infix. + self.name = name + # CIMProvider2 instances for job classes. + self.providers = {} + self.namespace = namespace + self.indication_manager = indication_manager + + # Start the worker thread (don't forget to register it at CIMOM) + self.worker = threading.Thread(target=self._worker_main) + self.worker.daemon=True + self.worker.start() + + # Various classnames for job-related classes, with correct infixes. + self.job_classname = 'LMI_' + self.name + 'Job' + self.method_result_classname = "LMI_" + self.name + "MethodResult" + self.affected_classname = "LMI_Affected" + self.name + "JobElement" + self.owning_classname = "LMI_Owning" + self.name + "JobElement" + self.associated_result_classname = ('LMI_Associated' + self.name + + 'JobMethodResult') + self.indication_filter_classname = ('LMI_' + self.name + + 'JobIndicationFilter') + self.job_provider = None + self._add_indication_filters() + + @cmpi_logging.trace_method + def _add_indication_filters(self): + """ + Add all job-related ``IndicationFilters`` to indication manager. + """ + filters = { + self.IND_JOB_PERCENT_UPDATED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA CIM_ConcreteJob AND " + "SourceInstance.CIM_ConcreteJob::PercentComplete <> " + "PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for a " + "Concrete Job.", + }, + self.IND_JOB_SUCCEEDED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA CIM_ConcreteJob AND ANY " + "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 17 " + "AND ANY " + "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 2", + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'OK'.", + }, + self.IND_JOB_FAILED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA CIM_ConcreteJob AND ANY " + "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 17 " + "AND ANY " + "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 6", + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'Error'.", + }, + self.IND_JOB_CHANGED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA CIM_ConcreteJob AND " + "SourceInstance.CIM_ConcreteJob::JobState <> " + "PreviousInstance.CIM_ConcreteJob::JobState", + "Description": "Modification of Job State for a ConcreteJob.", + }, + self.IND_JOB_CREATED: { + "Query" : "SELECT * FROM CIM_InstCreation WHERE " + "SourceInstance ISA CIM_ConcreteJob", + "Description": "Creation of a ConcreteJob.", + }, + } + self.indication_manager.add_filters(filters) + + @cmpi_logging.trace_method + def get_providers(self): + """ + Get dictionary of providers for these classes: + + * ``LMI_<name>Job`` + * ``LMI_<name>MethodResult`` + * ``LMI_Affected<name>JobElement`` + * ``LMI_Owning<name>JobElement`` + * ``LMI_Associated<name>JobMethodResult`` + + :rtype: dictionary class_name -> CIMProvider2 + """ + + if not self.providers: + job_provider = LMI_ConcreteJob(self.job_classname, job_manager=self) + self.providers[self.job_classname] = job_provider + self.job_provider = job_provider + + provider = LMI_MethodResult( + self.method_result_classname, job_manager=self) + self.providers[self.method_result_classname] = provider + + provider = LMI_AffectedJobElement( + self.affected_classname, job_manager=self) + self.providers[self.affected_classname] = provider + + provider = LMI_OwningJobElement( + self.owning_classname, job_manager=self) + self.providers[self.owning_classname] = provider + + provider = LMI_AssociatedJobMethodResult( + self.owning_classname, job_manager=self) + self.providers[self.associated_result_classname] = provider + + return self.providers + + @cmpi_logging.trace_method + def add_job(self, job): + """ + Enqueue new job. Send indication when needed. + + :param job: (``Job``) A job to enqueue. + """ + cmpi_logging.logger.debug("Job %s: '%s' enqueued" + % (job.the_id, job.job_name)) + + self.jobs[job.the_id] = job + self.queue.put(job) + # send indication + if self.indication_manager.is_subscribed(self.IND_JOB_CREATED): + job_instance = self.get_job_instance(job) + self.indication_manager.send_instcreation( + job_instance, self.IND_JOB_CREATED) + + def send_modify_indications(self, prev_instance, current_instance, + indication_ids): + """ + Send InstModification. This is helper method called by ``Job`` when + needed. + + :param prev_instance: Instance of ``LMI_<name>Job`` before it was + modified. + :param current_instance: Instance of ``LMI_<name>Job`` after it was + modified. + """ + for _id in indication_ids: + self.indication_manager.send_instmodification(prev_instance, + current_instance, _id) + + @cmpi_logging.trace_method + def remove_job(self, job): + """ + Remove existing job. Note that jobs are removed automatically after a + timeout, providers should not call this method directly. + + :param job: (``Job``) Job to remove. + """ + cmpi_logging.logger.debug("Removing job %s: '%s'" + % (job.the_id, job.job_name)) + del self.jobs[job.the_id] + # The job may still be in the queue! + # There is no way, how to remove it, it will be skipped by the + # worker thread. + + @cmpi_logging.trace_method + def get_job_for_instance_id(self, instance_id, classname=None): + """ + Return Job for given InstanceID or None when no such Job exist. + + :param instance_id: (``string``) InstanceID value to parse. + :param classname: (``string``) Optional classname to parse the + InstanceID (e.g. when parsing InstanceID of + ``LMI_<name>MethodResult``). + :rtype: ``Job`` + """ + if classname is None: + classname = self.job_classname + the_id = Job.parse_instance_id(instance_id, self, classname) + if the_id: + return self.jobs.get(the_id, None) + else: + return None + + @cmpi_logging.trace_method + def _worker_main(self): + """ + This is the main loop of the job queue. It just processes enqueued + jobs and never ends. + """ + while True: + job = self.queue.get() + # we need to protect from changes between checking state and + # setting new state + job.lock() + if job.job_state == Job.STATE_QUEUED: + # the job was not cancelled + job.change_state(Job.STATE_RUNNING) + job.unlock() + cmpi_logging.logger.info("Starting job %s: '%s'" % + (job.the_id, job.job_name)) + + job.execute() + if job.error: + cmpi_logging.logger.warn("Job %s: '%s' finished with error:" + " %s" % (job.the_id, job.job_name, str(job.error))) + else: + cmpi_logging.logger.info("Job %s: '%s' finished OK" % + (job.the_id, job.job_name)) + else: + # just skip suspended and terminated jobs + job.unlock() + self.queue.task_done() + + @cmpi_logging.trace_method + def get_next_id(self): + """ + Return next unused job id. + + :rtype: string + """ + self.last_instance_id += 1 + return str(self.last_instance_id) + + @cmpi_logging.trace_method + def get_job_instance(self, job): + """ + Return CIMInstance for given job. + + :param job: (``Job``) + :rtype: ``CIMInstance`` + """ + path = pywbem.CIMInstanceName( + classname=self.job_classname, + keybindings={'InstanceID': job.get_instance_id()}, + host=socket.gethostname(), + namespace=self.namespace) + inst = pywbem.CIMInstance(classname=self.job_classname, path=path) + inst['InstanceID'] = job.get_instance_id() + return self.job_provider.get_instance(None, inst) + + +class LMI_ConcreteJob(CIMProvider2): + """ + Provider of LMI_ConcreteJob class or its subclass. + """ + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """ + Provider implementation of EnumerateInstances intrinsic method. + """ + model.path.update({'InstanceID': None}) + for job in self.job_manager.jobs.values(): + model['InstanceID'] = job.get_instance_id() + if keys_only: + yield model + else: + yield self.get_instance(env, model, job) + + @cmpi_logging.trace_method + def get_job_states(self, job): + """ + Return JobState and OperationalStatus property values. + + :param job: (``int``) Job.STATE_* value. + :rtype: tuple ``(JobState, OperationalStatus)`` values. + """ + if job.job_state == Job.STATE_QUEUED: + jobstate = self.Values.JobState.New + opstate = [self.Values.OperationalStatus.Dormant] + elif job.job_state == Job.STATE_RUNNING: + jobstate = self.Values.JobState.Running + opstate = [self.Values.OperationalStatus.OK] + elif job.job_state == Job.STATE_FINISHED_OK: + jobstate = self.Values.JobState.Completed + opstate = [self.Values.OperationalStatus.OK, + self.Values.OperationalStatus.Completed] + elif job.job_state == Job.STATE_SUSPENDED: + jobstate = self.Values.JobState.Suspended + opstate = [self.Values.OperationalStatus.OK] + elif job.job_state == Job.STATE_FAILED: + jobstate = self.Values.JobState.Exception + opstate = [self.Values.OperationalStatus.Error, + self.Values.OperationalStatus.Completed] + elif job.job_state == Job.STATE_TERMINATED: + jobstate = self.Values.JobState.Terminated + opstate = [self.Values.OperationalStatus.Stopped] + return jobstate, opstate + + @cmpi_logging.trace_method + # pylint: disable-msg=W0221 + def get_instance(self, env, model, job=None): + """ + Provider implementation of GetInstance intrinsic method. + """ + if not job: + instance_id = model['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + model['DeleteOnCompletion'] = job.delete_on_completion + model['Name'] = job.job_name + + # convert seconds to timedelta + seconds = job.time_before_removal + if seconds: + delta = timedelta(seconds=seconds) + model['TimeBeforeRemoval'] = pywbem.CIMDateTime(delta) + else: + model['TimeBeforeRemoval'] = pywbem.CIMProperty( + name='TimeBeforeRemoval', + value=None, + type='datetime') + + if job.time_of_last_state_change: + model['TimeOfLastStateChange'] = pywbem.CIMDateTime( + job.time_of_last_state_change) + else: + model['TimeOfLastStateChange'] = pywbem.CIMProperty( + name='TimeOfLastStateChange', + value=None, + type='datetime') + + if job.elapsed_time: + model['ElapsedTime'] = pywbem.CIMDateTime(job.elapsed_time) + else: + model['ElapsedTime'] = pywbem.CIMProperty( + name='ElapsedTime', + value=None, + type='datetime') + + model['Description'] = job.job_name + model['LocalOrUtcTime'] = self.Values.LocalOrUtcTime.UTC_Time + model['PercentComplete'] = pywbem.Uint16(job.percent_complete) + if job.start_time: + model['StartTime'] = pywbem.CIMDateTime(job.start_time) + else: + model['StartTime'] = pywbem.CIMProperty( + name='StartTime', + value=None, + type='datetime') + + model['TimeSubmitted'] = pywbem.CIMDateTime(job.time_submitted) + # set correct state + jobstate, opstate = self.get_job_states(job) + model['JobState'] = jobstate + model['OperationalStatus'] = opstate + return model + + @cmpi_logging.trace_method + def set_instance(self, env, instance, modify_existing): + """Return a newly created or modified instance. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param instance: The new pywbem.CIMInstance. If modifying an existing + instance, the properties on this instance have been filtered by + the PropertyList from the request. + :param modify_existing: True if ModifyInstance, False if CreateInstance + + Return the new instance. The keys must be set on the new instance. + """ + if not modify_existing: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "Creation of Job instances is not supported.") + + job = self.job_manager.get_job_for_instance_id(instance['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + try: + job.lock() + restart_timer = False + + for (key, value) in instance.iteritems(): + if value is None: + continue + if key == 'DeleteOnCompletion': + job.delete_on_completion = value + restart_timer = True + elif key == 'TimeBeforeRemoval': + job.time_before_removal = value.total_seconds() + restart_timer = True + elif key == 'JobRunTimes': + if value != 1: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "JobRunTimes property is not supported.") + elif key == 'LocalOrUtcTime': + if value != self.Values.LocalOrUtcTime.UTC_Time: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "Setting of LocalOrUtcTime property is not" + " supported.") + else: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "Setting of %s property is not supported." % (key,)) + + if restart_timer: + job._restart_timer() + finally: + job.unlock() + return instance + + @cmpi_logging.trace_method + def delete_instance(self, env, instance_name): + """Delete an instance. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param instance_name: A pywbem.CIMInstanceName specifying the instance + to delete. + """ + job = self.job_manager.get_job_for_instance_id( + instance_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + if not job.job_status in Job.FINAL_STATES: + raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, + "Job has not finished.") + + self.job_manager.remove_job(job) + + @cmpi_logging.trace_method + def cim_method_geterrors(self, env, object_name): + """Implements LMI_StorageJob.GetErrors() + + If JobState is "Completed" and Operational Status is "Completed" + then no instance of CIM_Error is returned. + + If JobState is "Exception" then GetErrors may return intances of + CIM_Error related to the execution of the procedure or method invoked by + the job. + + If Operatational Status is not "OK" or "Completed" then + GetErrors may return CIM_Error instances related to the running of + the job. + + :param env: -- Provider Environment (pycimmb.ProviderEnvironment) + :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName + specifying the object on which the method GetErrors() + should be invoked. + + Output parameters: + + * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...)) + If the OperationalStatus on the Job is not "OK", then this + method will return one or more CIM Error instance(s). + Otherwise, when the Job is "OK", null is returned. + """ + job = self.job_manager.get_job_for_instance_id( + object_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + if job.error is None: + errors = [] + else: + errors = [job.error, ] + out_params = [ + pywbem.CIMParameter( + name='errors', + value=errors, + type='instance', + is_array=True, + array_size=len(errors)) + ] + rval = self.Values.GetErrors.Success + + return (rval, out_params) + + @cmpi_logging.trace_method + def cim_method_requeststatechange(self, env, object_name, + param_requestedstate=None, + param_timeoutperiod=None): + """Implements LMI_StorageJob.RequestStateChange() + + Requests that the state of the job be changed to the value + specified in the RequestedState parameter. Invoking the + RequestStateChange method multiple times could result in earlier + requests being overwritten or lost. + + If 0 is returned, then the + task completed successfully. Any other return code indicates an + error condition. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName + specifying the object on which the method RequestStateChange() + should be invoked. + :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState) + RequestStateChange changes the state of a job. The possible + values are as follows: Start (2) changes the state to + \'Running\'. Suspend (3) stops the job temporarily. The + intention is to subsequently restart the job with \'Start\'. + It might be possible to enter the \'Service\' state while + suspended. (This is job-specific.) Terminate (4) stops the + job cleanly, saving data, preserving the state, and shutting + down all underlying processes in an orderly manner. Kill (5) + terminates the job immediately with no requirement to save + data or preserve the state. Service (6) puts the job into a + vendor-specific service state. It might be possible to restart + the job. + + :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime) + A timeout period that specifies the maximum amount of time that + the client expects the transition to the new state to take. + The interval format must be used to specify the TimeoutPeriod. + A value of 0 or a null parameter indicates that the client has + no time requirements for the transition. If this property + does not contain 0 or null and the implementation does not + support this parameter, a return code of \'Use Of Timeout + Parameter Not Supported\' must be returned. + """ + job = self.job_manager.get_job_for_instance_id( + object_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + try: + job.lock() + states = self.Values.RequestStateChange.RequestedState + retcodes = self.Values.RequestStateChange + if param_requestedstate == states.Suspend: + if job.job_state != Job.STATE_QUEUED: + # Can suspend only queued jobs + rval = retcodes.Invalid_State_Transition + else: + job.change_state(Job.STATE_SUSPENDED) + rval = retcodes.Completed_with_No_Error + + elif param_requestedstate == states.Terminate: + if job.job_state not in (Job.STATE_QUEUED, Job.STATE_SUSPENDED): + # Can terminate only queued or suspended jobs + rval = retcodes.Invalid_State_Transition + else: + job.cancel() + rval = retcodes.Completed_with_No_Error + + elif param_requestedstate == states.Start: + if job.job_state != Job.STATE_SUSPENDED: + # Can start only suspended jobs + rval = retcodes.Invalid_State_Transition + else: + job.change_state(Job.STATE_QUEUED) + # Enqueue the job again, it may be already processed + # (we might get the job in the queue twice, but + # we have only one worker thread so it won't collide). + self.job_manager.add_job(job) + rval = retcodes.Completed_with_No_Error + + else: + rval = retcodes.Invalid_State_Transition + finally: + job.unlock() + return (rval, []) + + @cmpi_logging.trace_method + def cim_method_killjob(self, env, object_name, + param_deleteonkill=None): + """Implements LMI_StorageJob.KillJob() """ + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) + + @cmpi_logging.trace_method + def cim_method_geterror(self, env, object_name): + """Implements LMI_StorageJob.GetError() + + GetError is deprecated because Error should be an array,not a + scalar. + + When the job is executing or has terminated without + error, then this method returns no CIM_Error instance. However, if + the job has failed because of some internal problem or because the + job has been terminated by a client, then a CIM_Error instance is + returned. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName + specifying the object on which the method GetError() + should be invoked. + + Output parameters: + + * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``) + If the OperationalStatus on the Job is not "OK", then this + method will return a CIM Error instance. Otherwise, when the + Job is "OK", null is returned. + """ + job = self.job_manager.get_job_for_instance_id( + object_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + if job.error is None: + error = pywbem.CIMParameter( + name='error', + value=None, + type='instance', + is_array=False) + else: + error = pywbem.CIMParameter( + name='error', + value=job.error, + type='instance') + rval = self.Values.GetError.Success + return (rval, [error]) + + class Values(object): + class JobState(object): + New = pywbem.Uint16(2) + Starting = pywbem.Uint16(3) + Running = pywbem.Uint16(4) + Suspended = pywbem.Uint16(5) + Shutting_Down = pywbem.Uint16(6) + Completed = pywbem.Uint16(7) + Terminated = pywbem.Uint16(8) + Killed = pywbem.Uint16(9) + Exception = pywbem.Uint16(10) + Service = pywbem.Uint16(11) + Query_Pending = pywbem.Uint16(12) + # DMTF_Reserved = 13..32767 + # Vendor_Reserved = 32768..65535 + + class LocalOrUtcTime(object): + Local_Time = pywbem.Uint16(1) + UTC_Time = pywbem.Uint16(2) + + class OperationalStatus(object): + Unknown = pywbem.Uint16(0) + Other = pywbem.Uint16(1) + OK = pywbem.Uint16(2) + Degraded = pywbem.Uint16(3) + Stressed = pywbem.Uint16(4) + Predictive_Failure = pywbem.Uint16(5) + Error = pywbem.Uint16(6) + Non_Recoverable_Error = pywbem.Uint16(7) + Starting = pywbem.Uint16(8) + Stopping = pywbem.Uint16(9) + Stopped = pywbem.Uint16(10) + In_Service = pywbem.Uint16(11) + No_Contact = pywbem.Uint16(12) + Lost_Communication = pywbem.Uint16(13) + Aborted = pywbem.Uint16(14) + Dormant = pywbem.Uint16(15) + Supporting_Entity_in_Error = pywbem.Uint16(16) + Completed = pywbem.Uint16(17) + Power_Mode = pywbem.Uint16(18) + Relocating = pywbem.Uint16(19) + # DMTF_Reserved = .. + # Vendor_Reserved = 0x8000.. + + class GetErrors(object): + Success = pywbem.Uint32(0) + Not_Supported = pywbem.Uint32(1) + Unspecified_Error = pywbem.Uint32(2) + Timeout = pywbem.Uint32(3) + Failed = pywbem.Uint32(4) + Invalid_Parameter = pywbem.Uint32(5) + Access_Denied = pywbem.Uint32(6) + # DMTF_Reserved = .. + # Vendor_Specific = 32768..65535 + + class GetError(object): + Success = pywbem.Uint32(0) + Not_Supported = pywbem.Uint32(1) + Unspecified_Error = pywbem.Uint32(2) + Timeout = pywbem.Uint32(3) + Failed = pywbem.Uint32(4) + Invalid_Parameter = pywbem.Uint32(5) + Access_Denied = pywbem.Uint32(6) + # DMTF_Reserved = .. + # Vendor_Specific = 32768..65535 + + class RequestStateChange(object): + Completed_with_No_Error = pywbem.Uint32(0) + Not_Supported = pywbem.Uint32(1) + Unknown_Unspecified_Error = pywbem.Uint32(2) + Can_NOT_complete_within_Timeout_Period = pywbem.Uint32(3) + Failed = pywbem.Uint32(4) + Invalid_Parameter = pywbem.Uint32(5) + In_Use = pywbem.Uint32(6) + # DMTF_Reserved = .. + Method_Parameters_Checked___Transition_Started = pywbem.Uint32(4096) + Invalid_State_Transition = pywbem.Uint32(4097) + Use_of_Timeout_Parameter_Not_Supported = pywbem.Uint32(4098) + Busy = pywbem.Uint32(4099) + # Method_Reserved = 4100..32767 + # Vendor_Specific = 32768..65535 + class RequestedState(object): + Start = pywbem.Uint16(2) + Suspend = pywbem.Uint16(3) + Terminate = pywbem.Uint16(4) + Kill = pywbem.Uint16(5) + Service = pywbem.Uint16(6) + # DMTF_Reserved = 7..32767 + # Vendor_Reserved = 32768..65535 + +class LMI_OwningJobElement(CIMProvider2): + """ Instrumentation of LMI_OwningJobElement class and its subclasses.""" + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def get_instance(self, env, model): + """Return an instance.""" + instance_id = model['OwnedElement']['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "OwnedElement not found.") + + if job.owning_element != model['OwningElement']: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "OwnedElement is not associated to OwningElement.") + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'OwnedElement': None, 'OwningElement': None}) + for job in self.job_manager.jobs.values(): + if job.owning_element: + model['OwnedElement'] = job.get_name() + model['OwningElement'] = job.owning_element + yield model + + @cmpi_logging.trace_method + def references(self, env, object_name, model, result_class_name, role, + result_role, keys_only): + """Instrument Associations.""" + ch = env.get_cimom_handle() + if ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super='CIM_ManagedElement') or \ + ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.job_classname): + return self.simple_refs(env, object_name, model, + result_class_name, role, result_role, keys_only) + +class LMI_AffectedJobElement(CIMProvider2): + """ Instrumentation of LMI_AffectedJobElement class and its subclasses.""" + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def get_instance(self, env, model): + """Return an instance.""" + instance_id = model['AffectingElement']['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "AffectingElement not found.") + + if model['AffectedElement'] not in job.affected_elements: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "AffectedElement is not associated to AffectingElement.") + model['ElementEffects'] = [self.Values.ElementEffects.Unknown, ] + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'AffectingElement': None, 'AffectedElement': None}) + for job in self.job_manager.jobs.values(): + for element in job.affected_elements: + model['AffectingElement'] = job.get_name() + model['AffectedElement'] = element + if keys_only: + yield model + else: + yield self.get_instance(env, model) + + @cmpi_logging.trace_method + def references(self, env, object_name, model, result_class_name, role, + result_role, keys_only): + """Instrument Associations.""" + ch = env.get_cimom_handle() + if ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super='CIM_ManagedElement') or \ + ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.job_classname): + return self.simple_refs(env, object_name, model, + result_class_name, role, result_role, keys_only) + + class Values(object): + class ElementEffects(object): + Unknown = pywbem.Uint16(0) + Other = pywbem.Uint16(1) + Exclusive_Use = pywbem.Uint16(2) + Performance_Impact = pywbem.Uint16(3) + Element_Integrity = pywbem.Uint16(4) + Create = pywbem.Uint16(5) + + +class LMI_MethodResult(CIMProvider2): + """Instrumentation of LMI_MethodResult class and its subclasses.""" + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + # pylint: disable-msg=W0221 + def get_instance(self, env, model, job=None): + """Return an instance.""" + if not job: + instance_id = model['InstanceID'] + job = self.job_manager.get_job_for_instance_id( + instance_id, self.classname) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + model['Description'] = job.job_name + if job.job_state in Job.FINAL_STATES: + model['PostCallIndication'] = pywbem.CIMProperty( + name='PostCallIndication', + value=job.get_post_call()) + else: + model['PostCallIndication'] = pywbem.CIMProperty( + name='PostCallIndication', + type='instance', + value=None) + model['PreCallIndication'] = pywbem.CIMProperty( + name='PreCallIndication', + value=job.get_pre_call()) + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'InstanceID': None}) + for job in self.job_manager.jobs.values(): + model['InstanceID'] = job.get_instance_id( + classname=self.classname) + if keys_only: + yield model + else: + yield self.get_instance(env, model, job) + +class LMI_AssociatedJobMethodResult(CIMProvider2): + """ + Instrumentation of LMI_AssociatedJobMethodResult class and its + subclasses. + """ + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def get_instance(self, env, model): + """Return an instance.""" + instance_id = model['Job']['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + expected_result_id = job.get_instance_id( + classname=self.job_manager.method_result_classname) + if model['JobParameters']['InstanceID'] != expected_result_id: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job is not associated to JobParameters.") + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'JobParameters': None, 'Job': None}) + for job in self.job_manager.jobs.values(): + if job.owning_element: + model['Job'] = job.get_name() + model['JobParameters'] = pywbem.CIMInstanceName( + classname=self.job_manager.method_result_classname, + namespace=self.job_manager.namespace, + keybindings={ + 'InstanceID': job.get_instance_id( + classname=self.job_manager.method_result_classname) + }) + yield model + + @cmpi_logging.trace_method + def references(self, env, object_name, model, result_class_name, role, + result_role, keys_only): + """Instrument Associations.""" + ch = env.get_cimom_handle() + if ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.method_result_classname) or \ + ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.job_classname): + return self.simple_refs(env, object_name, model, + result_class_name, role, result_role, keys_only) + diff --git a/src/software/openlmi/software/util/singletonmixin.py b/src/python/openlmi/common/singletonmixin.py index c252676..c252676 100644 --- a/src/software/openlmi/software/util/singletonmixin.py +++ b/src/python/openlmi/common/singletonmixin.py diff --git a/src/software/cli/software.py b/src/software/cli/software.py index 55c3592..effc0c4 100755 --- a/src/software/cli/software.py +++ b/src/software/cli/software.py @@ -106,7 +106,7 @@ def list_available(conn): Role="Collection", ResultRole="Member") for nevra in (i['InstanceID'] for i in inames): - print nevra[len("LMI:SoftwareIdentity:"):] + print nevra[len("LMI:LMI_SoftwareIdentity:"):] def list_installed(_conn): """List nevra strings of installed packages.""" diff --git a/src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py b/src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py index 811298c..82c2049 100644 --- a/src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py +++ b/src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py @@ -107,10 +107,10 @@ class LMI_InstalledSoftwareIdentity(CIMProvider2): "Expected object path for InstalledSoftware!") model["System"] = model.path["System"] = ComputerSystem.get_path() - pkg_info = Identity.object_path2pkg( - model['InstalledSoftware'], kind='installed') - model['InstalledSoftware'] = Identity.pkg2model(pkg_info) - return model + pkg_info = Identity.object_path2pkg( + model['InstalledSoftware'], kind='installed') + model['InstalledSoftware'] = Identity.pkg2model(pkg_info) + return model @cmpi_logging.trace_method def enum_instances(self, env, model, keys_only): diff --git a/src/software/openlmi/software/LMI_SoftwareInstallationService.py b/src/software/openlmi/software/LMI_SoftwareInstallationService.py index 8d29b3d..f1e0ea7 100644 --- a/src/software/openlmi/software/LMI_SoftwareInstallationService.py +++ b/src/software/openlmi/software/LMI_SoftwareInstallationService.py @@ -80,7 +80,7 @@ class LMI_SoftwareInstallationService(CIMProvider2): model['EnabledDefault'] = self.values.EnabledDefault.Not_Applicable model['EnabledState'] = self.values.EnabledState.Not_Applicable model['HealthState'] = self.values.HealthState.OK - model['InstanceID'] = 'LMI:InstallationService' + model['InstanceID'] = 'LMI:LMI_InstallationService' model['OperatingStatus'] = self.values.OperatingStatus.Servicing model['OperationalStatus'] = [self.values.OperationalStatus.OK] model['PrimaryStatus'] = self.values.PrimaryStatus.OK @@ -372,7 +372,8 @@ class LMI_SoftwareInstallationService(CIMProvider2): env, "uri", param_uri, param_target, None, param_installoptions, param_installoptionsvalues) - rval = self.values.InstallFromURI.Job_Completed_with_No_Error + rval = self.values.InstallFromURI. \ + Method_Parameters_Checked___Job_Started out_params[0].value = InstallationJob.job2model(jobid) except InstallationService.InstallationError as exc: cmpi_logging.logger.error( @@ -658,7 +659,7 @@ class LMI_SoftwareInstallationService(CIMProvider2): param_target, param_collection, param_installoptions, param_installoptionsvalues) rval = self.values.InstallFromSoftwareIdentity. \ - Job_Completed_with_No_Error + Method_Parameters_Checked___Job_Started out_params[0].value = InstallationJob.job2model(jobid) except InstallationService.InstallationError as exc: cmpi_logging.logger.error( diff --git a/src/software/openlmi/software/cimom_entry.py b/src/software/openlmi/software/cimom_entry.py index 4aa1d44..ad944c8 100644 --- a/src/software/openlmi/software/cimom_entry.py +++ b/src/software/openlmi/software/cimom_entry.py @@ -21,10 +21,13 @@ # """ -Entry module for OpenLMI Software proviers. +Entry module for OpenLMI Software providers. """ +from multiprocessing import Queue from openlmi.common import cmpi_logging +from openlmi.common.IndicationManager import IndicationManager +from openlmi.software.core import InstallationJob from openlmi.software.LMI_SoftwareIdentity import LMI_SoftwareIdentity from openlmi.software.LMI_SystemSoftwareCollection import \ LMI_SystemSoftwareCollection @@ -61,14 +64,20 @@ from openlmi.software.LMI_AssociatedSoftwareJobMethodResult import \ LMI_AssociatedSoftwareJobMethodResult from openlmi.software.LMI_OwningSoftwareJobElement import \ LMI_OwningSoftwareJobElement -from openlmi.software.yumdb import YumDB +from openlmi.software.yumdb import jobmanager, YumDB def get_providers(env): """ @return mapping of provider names to corresponding provider instances. """ + cmpi_logging.LogManager.LOGGER_NAME = 'openlmi.software' cmpi_logging.LogManager(env) + # jobmanager does not understand CIM models, give it a way to transform + # job to CIMIndication instance + jobmanager.JOB_TO_MODEL = lambda job: \ + InstallationJob.job2model(job, keys_only=False) + providers = { "LMI_SoftwareIdentity" : LMI_SoftwareIdentity(env), "LMI_SystemSoftwareCollection" : LMI_SystemSoftwareCollection(env), @@ -91,6 +100,8 @@ def get_providers(env): "LMI_SoftwareInstallationServiceAffectsElement" : \ LMI_SoftwareInstallationServiceAffectsElement(env), "LMI_SoftwareInstallationJob" : LMI_SoftwareInstallationJob(env), + "LMI_SoftwareInstCreation" : LMI_SoftwareInstallationJob(env), + "LMI_SoftwareInstModification" : LMI_SoftwareInstallationJob(env), "LMI_SoftwareMethodResult" : LMI_SoftwareMethodResult(env), "LMI_AffectedSoftwareJobElement" : LMI_AffectedSoftwareJobElement(env), "LMI_AssociatedSoftwareJobMethodResult" : \ @@ -98,12 +109,40 @@ def get_providers(env): "LMI_OwningSoftwareJobElement" : LMI_OwningSoftwareJobElement(env) } + # Initialization of indication manager -- running in separate thread as + # daemon. That means it does not have to be cleaned up. + im = IndicationManager.get_instance( + env, "Software", "root/cimv2", queue=Queue()) + jobmanager.register_filters(im) + return providers +def authorize_filter(env, fltr, ns, classes, owner): + IndicationManager.get_instance().authorize_filter( + env, fltr, ns, classes, owner) + +def activate_filter (env, fltr, ns, classes, first_activation): + IndicationManager.get_instance().activate_filter( + env, fltr, ns, classes, first_activation) + +def deactivate_filter(env, fltr, ns, classes, last_activation): + IndicationManager.get_instance().deactivate_filter( + env, fltr, ns, classes, last_activation) + +def enable_indications(env): + IndicationManager.get_instance().enable_indications(env) + +def disable_indications(env): + IndicationManager.get_instance().disable_indications(env) + def can_unload(_env): - """ Says, whether providers can be unlouded. """ + """ + Says, whether providers can be unloaded. + """ return True def shutdown(_env): - """ Release resources upon cleanup. """ + """ + Release resources upon cleanup. + """ YumDB.get_instance().clean_up() diff --git a/src/software/openlmi/software/core/Identity.py b/src/software/openlmi/software/core/Identity.py index eda0ab9..9208dcb 100644 --- a/src/software/openlmi/software/core/Identity.py +++ b/src/software/openlmi/software/core/Identity.py @@ -170,10 +170,10 @@ def object_path2nevra(op, with_epoch='NOT_ZERO'): if (not "InstanceID" in op or not op['InstanceID']): raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, "Wrong keys.") instid = op['InstanceID'] - if not instid.lower().startswith("lmi:softwareidentity:"): + if not instid.lower().startswith("lmi:lmi_softwareidentity:"): raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, - "InstanceID must start with LMI:SoftwareIdentity: prefix.") - instid = instid[len("LMI:SoftwareIdentity:"):] + "InstanceID must start with LMI:LMI_SoftwareIdentity: prefix.") + instid = instid[len("LMI:LMI_SoftwareIdentity:"):] match = util.RE_NEVRA_OPT_EPOCH.match(instid) if not match: raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, @@ -241,7 +241,7 @@ def pkg2model(pkg, keys_only=True, model=None): if not keys_only: model = pywbem.CIMInstance("LMI_SoftwareIdentity", path=model) nevra = pkg if isinstance(pkg, basestring) else pkg.nevra - model['InstanceID'] = 'LMI:SoftwareIdentity:'+nevra + model['InstanceID'] = 'LMI:LMI_SoftwareIdentity:'+nevra if not keys_only: model.path['InstanceID'] = model['InstanceID'] #pylint: disable=E1103 model['Caption'] = pkg.summary diff --git a/src/software/openlmi/software/core/IdentityResource.py b/src/software/openlmi/software/core/IdentityResource.py index 579b0f1..cae81df 100644 --- a/src/software/openlmi/software/core/IdentityResource.py +++ b/src/software/openlmi/software/core/IdentityResource.py @@ -587,7 +587,7 @@ def _fill_non_keys(repo, model): model['Generation'] = pywbem.CIMProperty('Generation', None, type='uint64') model['InfoFormat'] = Values.InfoFormat.URL - model['InstanceID'] = 'LMI:SoftwareIdentityResource:' + repo.repoid + model['InstanceID'] = 'LMI:LMI_SoftwareIdentityResource:' + repo.repoid if repo.mirror_list: model["MirrorList"] = repo.mirror_list else: diff --git a/src/software/openlmi/software/core/InstallationJob.py b/src/software/openlmi/software/core/InstallationJob.py index 2ac1f10..7b06be2 100644 --- a/src/software/openlmi/software/core/InstallationJob.py +++ b/src/software/openlmi/software/core/InstallationJob.py @@ -481,7 +481,7 @@ def job2model(job, keys_only=True, model=None): path=model) jobid = job.jobid if isinstance(job, jobs.YumAsyncJob) else job - model['InstanceID'] = 'LMI:SoftwareInstallationJob:%d' % jobid + model['InstanceID'] = 'LMI:LMI_SoftwareInstallationJob:%d' % jobid if isinstance(model, pywbem.CIMInstance): model.path['InstanceID'] = model['InstanceID'] #pylint: disable=E1103 if not keys_only: @@ -500,11 +500,11 @@ def object_path2job(op): if (not "InstanceID" in op or not op['InstanceID']): raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, "Wrong keys.") instid = op['InstanceID'] - if not instid.lower().startswith("lmi:softwareinstallationjob:"): + if not instid.lower().startswith("lmi:lmi_softwareinstallationjob:"): raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, - "InstanceID must start with LMI:SoftwareInstallationJob: prefix.") + "InstanceID must start with LMI:LMI_SoftwareInstallationJob: prefix.") try: - instid = int(instid[len("LMI:SoftwareInstallationJob:"):]) + instid = int(instid[len("LMI:LMI_SoftwareInstallationJob:"):]) except ValueError: raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, 'Invalid InstanceID "%s"' % instid) diff --git a/src/software/openlmi/software/core/InstallationService.py b/src/software/openlmi/software/core/InstallationService.py index beb4351..acc7ca6 100644 --- a/src/software/openlmi/software/core/InstallationService.py +++ b/src/software/openlmi/software/core/InstallationService.py @@ -547,7 +547,7 @@ def get_path(): systemop = ComputerSystem.get_path() op["SystemCreationClassName"] = systemop.classname op['SystemName'] = systemop["Name"] - op["Name"] = "LMI:SoftwareInstallationService" + op["Name"] = "LMI:LMI_SoftwareInstallationService" return op @cmpi_logging.trace_function diff --git a/src/software/openlmi/software/core/InstallationServiceCapabilities.py b/src/software/openlmi/software/core/InstallationServiceCapabilities.py index 4fbc5ec..f801790 100644 --- a/src/software/openlmi/software/core/InstallationServiceCapabilities.py +++ b/src/software/openlmi/software/core/InstallationServiceCapabilities.py @@ -147,7 +147,7 @@ def get_path(): op = pywbem.CIMInstanceName( classname="LMI_SoftwareInstallationServiceCapabilities", namespace="root/cimv2") - op['InstanceID'] = "LMI:SoftwareInstallationServiceCapabilities" + op['InstanceID'] = "LMI:LMI_SoftwareInstallationServiceCapabilities" return op @cmpi_logging.trace_function @@ -200,9 +200,9 @@ def get_instance(model=None): model['InstanceID'] = path['InstanceID'] model['CanAddToCollection'] = True - model['Caption'] = 'Capabilities of LMI:SoftwareInstallationService' + model['Caption'] = 'Capabilities of LMI:LMI_SoftwareInstallationService' model['Description'] = ('This instance provides information' - ' about LMI:SoftwareInstallationService\'s capabilities.') + ' about LMI:LMI_SoftwareInstallationService\'s capabilities.') model['SupportedAsynchronousActions'] = [ Values.SupportedAsynchronousActions.Install_From_Software_Identity, Values.SupportedAsynchronousActions.Install_from_URI] diff --git a/src/software/openlmi/software/core/MethodResult.py b/src/software/openlmi/software/core/MethodResult.py index 7bfdac5..723edbb 100644 --- a/src/software/openlmi/software/core/MethodResult.py +++ b/src/software/openlmi/software/core/MethodResult.py @@ -32,10 +32,10 @@ def object_path2jobid(op): """ if not isinstance(op, pywbem.CIMInstanceName): raise TypeError("op must be a CIMInstanceName") - if not op["InstanceID"].lower().startswith('lmi:softwaremethodresult:'): + if not op["InstanceID"].lower().startswith('lmi:lmi_softwaremethodresult:'): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Missing 'LMI:SoftwareMethodResult:' prefix in InstanceID.") - instid = op['InstanceID'][len('LMI:SoftwareMethodResult:'):] + "Missing 'LMI:LMI_SoftwareMethodResult:' prefix in InstanceID.") + instid = op['InstanceID'][len('LMI:LMI_SoftwareMethodResult:'):] try: instid = int(instid) except ValueError: @@ -66,7 +66,7 @@ def job2model(job, keys_only=True, model=None): namespace="root/cimv2") if not keys_only: model = pywbem.CIMInstance("LMI_SoftwareMethodResult", path=model) - model['InstanceID'] = "LMI:SoftwareMethodResult:"+str(job.jobid) + model['InstanceID'] = "LMI:LMI_SoftwareMethodResult:"+str(job.jobid) if not keys_only: model.path['InstanceID'] = model['InstanceID'] #pylint: disable=E1103 model['Caption'] = 'Result of method %s' % job.metadata['method_name'] diff --git a/src/software/openlmi/software/core/SystemCollection.py b/src/software/openlmi/software/core/SystemCollection.py index 2d07130..d07ecdd 100644 --- a/src/software/openlmi/software/core/SystemCollection.py +++ b/src/software/openlmi/software/core/SystemCollection.py @@ -28,7 +28,7 @@ def get_path(): op = pywbem.CIMInstanceName( classname="LMI_SystemSoftwareCollection", namespace="root/cimv2") - op['InstanceID'] = "LMI:SystemSoftwareCollection" + op['InstanceID'] = "LMI:LMI_SystemSoftwareCollection" return op @cmpi_logging.trace_function diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py index d64c0f4..04f2191 100644 --- a/src/software/openlmi/software/yumdb/__init__.py +++ b/src/software/openlmi/software/yumdb/__init__.py @@ -43,7 +43,8 @@ import Queue as TQueue # T as threaded import threading import yum -from openlmi.common import cmpi_logging +from openlmi.common import cmpi_logging, singletonmixin +from openlmi.common.IndicationManager import IndicationManager from openlmi.software.yumdb import jobs from openlmi.software.yumdb import errors from openlmi.software.yumdb.packageinfo import PackageInfo @@ -52,16 +53,17 @@ from openlmi.software.yumdb.packagecheck import PackageCheck from openlmi.software.yumdb.process import YumWorker from openlmi.software.yumdb.repository import Repository from openlmi.software.yumdb.util import DispatchingFormatter -from openlmi.software.util import get_signal_name, singletonmixin +from openlmi.software.util import get_signal_name # Maximum time in seconds to wait for a job to accomplish. # If timeout expires, spawned process is checked (it might # be possibly killed) and is respawned in case it's dead. MAX_JOB_WAIT_TIME = 30 -# this may be used as an argument to YumWorker +# this may be used as an argument to YumWorker to setup logging YUM_WORKER_DEBUG_LOGGING_CONFIG = { "version" : 1, + 'disable_existing_loggers' : True, "formatters": { # this is a message format for logging function/method calls # it's manually set up in YumWorker's init method @@ -85,7 +87,7 @@ YUM_WORKER_DEBUG_LOGGING_CONFIG = { }, "loggers" : { "root": { - "level": "ERROR", + "level": "DEBUG", "handlers" : ["file"] }, "openlmi.software.yumdb": { @@ -401,6 +403,7 @@ class YumDB(singletonmixin.Singleton): uplink = Queue() downlink = Queue() self._process = YumWorker(uplink, downlink, + indication_manager=IndicationManager.get_instance(), yum_kwargs=self._yum_kwargs) #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) self._process.start() @@ -590,35 +593,75 @@ class YumDB(singletonmixin.Singleton): # ************************************************************************* @job_request() def get_job(self, jobid): + """ + Return instance of ``YumJob`` with given ``jobid``. + """ return self._do_job(jobs.YumJobGet(jobid)) @job_request() def get_job_list(self): + """ + Return list of all asynchronous jobs. + """ return self._do_job(jobs.YumJobGetList()) @job_request() def get_job_by_name(self, name): + """ + Return asynchronous job filtered by its name. + """ return self._do_job(jobs.YumJobGetByName(name)) @job_request() def set_job_priority(self, jobid, priority): + """ + Change priority of asynchronous job. This will change + its order in queue, if it is still enqeueued. + + Return object of job. + """ return self._do_job(jobs.YumJobSetPriority(jobid, priority)) @job_request() def update_job(self, jobid, **kwargs): + """ + Update metadata of job. + + :param kwargs: (``dict``) Is a dictionary of job's property names + with mapped new values. Only keys given will be changed in + desired job. + + **Note** that only keys, that do not affect job's priority or its + scheduling for deletion can be changed. See :ref:`YumJobUpdate`. + """ return self._do_job(jobs.YumJobUpdate(jobid, **kwargs)) @job_request() def reschedule_job(self, jobid, delete_on_completion, time_before_removal): + """ + Change the scheduling of job for deletion. + + :param delete_on_completion: (``bool``) Says, whether the job will + be scheduled for deletion at ``finished + time_before_removal`` + time. + :param time_before_removal: (``int``) Number of seconds, after the job + is finished, it will be kept alive. + """ return self._do_job(jobs.YumJobReschedule(jobid, delete_on_completion, time_before_removal)) @job_request() def delete_job(self, jobid): + """ + Delete job object. This can be called only on finished job. + """ return self._do_job(jobs.YumJobDelete(jobid)) @job_request() def terminate_job(self, jobid): + """ + Terminate job. This can be called only on *NEW* job. + """ return self._do_job(jobs.YumJobTerminate(jobid)) diff --git a/src/software/openlmi/software/yumdb/errors.py b/src/software/openlmi/software/yumdb/errors.py index 101c40b..a6824b9 100644 --- a/src/software/openlmi/software/yumdb/errors.py +++ b/src/software/openlmi/software/yumdb/errors.py @@ -89,3 +89,6 @@ class JobNotFound(JobControlError): class InvalidJobState(JobControlError): pass +class IndicationError(YumDBError): + pass + diff --git a/src/software/openlmi/software/yumdb/jobmanager.py b/src/software/openlmi/software/yumdb/jobmanager.py index 872e81f..7283d05 100644 --- a/src/software/openlmi/software/yumdb/jobmanager.py +++ b/src/software/openlmi/software/yumdb/jobmanager.py @@ -19,9 +19,16 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -This is a module for JobManager which is a separate thread of -YumWorker process. It keeps a cache of asynchronous jobs and handles +This is a module for ``JobManager`` which is a separate thread of +``YumWorker`` process. It keeps a cache of asynchronous jobs and handles input and output queues. + +This module uses its own logging facilities because it runs in separeted +process not having access to broker logging features. + +Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should +be set to callable taking ``YumJob`` instance and returning +``LMI_SoftwareInstallationJob``. """ import heapq import inspect @@ -32,14 +39,86 @@ import threading import time import traceback +from openlmi.common import cmpi_logging +from openlmi.common.IndicationManager import IndicationManager from openlmi.software.yumdb import errors, jobs from openlmi.software.yumdb.util import trace_function +# This is a callable, which must be initialized before JobManager is used. +# It should be a pointer to function, which takes a job and returns +# corresponding CIM instance. It's used for sending indications. +JOB_TO_MODEL = lambda job: None +JOB_CLASSNAME = "LMI_SoftwareInstallationJob" + # Minimum time to keep asynchronous job in cache after completion. In seconds. MINIMUM_TIME_BEFORE_REMOVAL = 10 +# replacement for cmpi_logging.logger LOG = None +IND_JOB_PERCENT_UPDATED = "PercentUpdated" +IND_JOB_SUCCEEDED = "Succeeded" +IND_JOB_FAILED = "Failed" +IND_JOB_CHANGED = "Changed" +IND_JOB_CREATED = "Created" + +IND_FILTERS = { + IND_JOB_PERCENT_UPDATED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::PercentComplete <> " + "PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for a " + "Concrete Job.", + }, + IND_JOB_SUCCEEDED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance ISA LMI_SoftwareInstallationJob AND " + "SourceInstance.CIM_ConcreteJob::JobState = 17", + # symbolic constants not supported by sfcb + #"CIM_ConcreteJob.JobState#'Completed'" + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'OK'.", + }, + IND_JOB_FAILED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState = 10", + # symbolic constants not supported by sfcb + #"CIM_ConcreteJob.JobState#'Exception'", + "Description": "Modification of Operational Status for a " + "Concrete Job to 'Complete' and 'Error'.", + }, + IND_JOB_CHANGED: { + "Query" : "SELECT * FROM CIM_InstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState <> " + "PreviousInstance.CIM_ConcreteJob::JobState", + "Description": "Modification of Job State for a ConcreteJob.", + }, + IND_JOB_CREATED: { + "Query" : "SELECT * FROM CIM_InstCreation WHERE " + "SourceInstance ISA %(classname)s", + "Description": "Creation of a ConcreteJob.", + }, +} + +@cmpi_logging.trace_function +def register_filters(indication_manager): + """ + This function registers static indication filters at IndicationManager. + It should be called upon provider's initialization. + """ + to_register = {} + for fltr_id, fltr in IND_FILTERS.items(): + if not indication_manager.is_registered(JOB_CLASSNAME, fltr_id): + fltr["Query"] = fltr["Query"] % {"classname" : JOB_CLASSNAME } + to_register[fltr_id] = fltr + if to_register: + indication_manager.add_filters(JOB_CLASSNAME, + to_register, ensure_installed=False) + # ***************************************************************************** # Decorators # ***************************************************************************** @@ -84,6 +163,119 @@ def job_handler(job_from_target=True): else: return _simple_wrapper +class JobIndicationSender(object): + """ + Makes creation and sending of indications easy. It keeps a reference + to job, which can be *snapshotted* for making CIM instance out of it. + These instances are then used to send indications via IndicationManager. + + Typical usage:: + + sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2]) + ... # modify job + sender.snapshot() + sender.send() + + **Note** that number of kept CIM instances won't exceed 2. First one + is created upon instantiation and the second one be calling + ``snapshot()``. Any successive call to ``snapshot()`` will overwrite + the second instance. + """ + + def __init__(self, indication_manager, job, + indications=IND_JOB_CHANGED, new=None): + """ + :param job (``YumJob``) Is job instance, which will be immediately + snapshoted as old instance and later as a new one. + :param indications (``list``) Can either be a list of indication ids + or a single indication id. + :param new (``YumJob``) A job instance stored as new. + """ + if not isinstance(indication_manager, IndicationManager): + raise TypeError("indication_manager must be a subclass of" + " IndicationManager") + if not isinstance(job, jobs.YumJob): + raise TypeError("job must be an instance of YumJob") + if not new is None and not isinstance(new, jobs.YumJob): + raise TypeError("new must be an instance of YumJob") + self._indication_manager = indication_manager + self._job = job + self._old_instance = JOB_TO_MODEL(job) + if new is not None: + new = JOB_TO_MODEL(job) + self._new_instance = new + self._indications = set() + self.indication_ids = indications + + @property + def job(self): + """ + Return instance of ``YumJob``. + """ + return self._job + + @property + def indication_ids(self): + """ + Return set of indication filter IDs. + """ + return self._indications.copy() + + @indication_ids.setter + def indication_ids(self, indication_ids): + """ + Set the indication filter IDs. + + :param indication_ids (``list``) Can be even single id. + """ + if isinstance(indication_ids, basestring): + indication_ids = set([indication_ids]) + self._indications = set(indication_ids) + + @trace_function + def add_indication_ids(self, indication_ids): + """ + Add filter IDs. + """ + if isinstance(indication_ids, basestring): + indication_ids = set([indication_ids]) + self._indications.update(indication_ids) + + @trace_function + def snapshot(self): + """ + Make a second CIM instance, overwriting previous one (not the first). + """ + self._new_instance = JOB_TO_MODEL(self._job) + + @trace_function + def send(self, make_snapshot=False): + """ + Send all requested indications for given job. + """ + if not self._indications: + raise errors.IndicationError( + "can not send any indication without id") + if make_snapshot: + self.snapshot() + if ( IND_JOB_CHANGED in self._indications + and self._new_instance is None): + raise errors.IndicationError("no snapshot made for modified job") + for fltr_id in self._indications: + if fltr_id == IND_JOB_CREATED: + LOG.debug("sending instance creation indication for job %s", + self._job) + self._indication_manager.send_instcreation( + self._new_instance if self._new_instance is not None + else self._old_instance, + fltr_id) + else: + LOG.debug("sending instance modification indication for job %s" + " with ID: %s", self._job, fltr_id) + self._indication_manager.send_instmodification( + self._old_instance, self._new_instance, + fltr_id) + class JobManager(threading.Thread): """ Separate thread for managing queue of jobs requested by client. @@ -105,10 +297,11 @@ class JobManager(threading.Thread): ACTION_NAMES = ['remove'] - def __init__(self, queue_in, queue_out): + def __init__(self, queue_in, queue_out, indication_manager): threading.Thread.__init__(self, name="JobManager") self._queue_in = queue_in self._queue_out = queue_out + self._indication_manager = indication_manager self._terminate = False # (time, jobid, action) @@ -175,7 +368,9 @@ class JobManager(threading.Thread): LOG.debug('job %s enqued for YumWorker to handle', job) heapq.heappush(self._job_queue, job) if getattr(job, 'async', False) is True: + ind = self._prepare_indication_for(job, IND_JOB_CREATED) self._async_jobs[job.jobid] = job + ind.send() self._job_enqueued.notify() @trace_function @@ -214,6 +409,14 @@ class JobManager(threading.Thread): msg = "unsupported action: %s" % action raise ValueError(msg) + @trace_function + def _prepare_indication_for(self, job, *args, **kwargs): + """ + Return instance of ``JobIndicationSender``. + """ + return JobIndicationSender(self._indication_manager, job, + *args, **kwargs) + # ************************************************************************* # Job handlers # ************************************************************************* @@ -245,9 +448,11 @@ class JobManager(threading.Thread): if not isinstance(new_priority, (int, long)): raise TypeError('priority must be an integer') if job.priority != new_priority: + ind = self._prepare_indication_for(job) job.update(priority=new_priority) if job in self._job_queue: heapq.heapify(self._job_queue) + ind.send(True) return job @job_handler() @@ -266,6 +471,7 @@ class JobManager(threading.Thread): del self._calendar[i] heapq.heapify(self._calendar) break + ind = self._prepare_indication_for(job) if delete_on_completion: schedule_at = time_before_removal if job.finished: @@ -273,6 +479,7 @@ class JobManager(threading.Thread): self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE) job.delete_on_completion = delete_on_completion job.time_before_removal = time_before_removal + ind.send(True) return job @job_handler() @@ -280,7 +487,9 @@ class JobManager(threading.Thread): """ Updates any job metadata. """ + ind = self._prepare_indication_for(job) job.update(**data) + ind.send(True) return job @job_handler() @@ -311,7 +520,9 @@ class JobManager(threading.Thread): raise errors.InvalidJobState('job "%s" already finished' % job) self._job_queue.remove(job) heapq.heapify(self._job_queue) + ind = self._prepare_indication_for(job) job.finish(result=job.RESULT_TERMINATED) + ind.send(True) LOG.info('terminated not started job "%s"', job) return job @@ -344,6 +555,9 @@ class JobManager(threading.Thread): if job.state != job.RUNNING: raise errors.InvalidJobState( 'can not finish not started job "%s"' % job) + if getattr(job, 'async', False): + ind = self._prepare_indication_for(job, + (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED)) job.finish(result, result_data) if getattr(job, 'async', False): if job.delete_on_completion: @@ -351,6 +565,11 @@ class JobManager(threading.Thread): , MINIMUM_TIME_BEFORE_REMOVAL) self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE) + if result == job.RESULT_SUCCESS: + ind.add_indication_ids(IND_JOB_SUCCEEDED) + elif result == job.RESULT_ERROR: + ind.add_indication_ids(IND_JOB_FAILED) + ind.send(True) else: LOG.debug("sending reply for %s: (%s, %s)", job, job.ResultNames[job.result], job.result_data) @@ -377,7 +596,13 @@ class JobManager(threading.Thread): raise Queue.Empty job = heapq.heappop(self._job_queue) if job is not None: - job.start() + if getattr(job, "async", False): + ind = self._prepare_indication_for(job, + (IND_JOB_CHANGED, IND_JOB_PERCENT_UPDATED)) + job.start() + ind.send(True) + else: + job.start() return job def run(self): diff --git a/src/software/openlmi/software/yumdb/jobs.py b/src/software/openlmi/software/yumdb/jobs.py index a46a2ca..84fd3ed 100644 --- a/src/software/openlmi/software/yumdb/jobs.py +++ b/src/software/openlmi/software/yumdb/jobs.py @@ -293,7 +293,14 @@ class YumJobSetPriority(YumJobOnJob): #pylint: disable=R0903 self.new_priority = priority class YumJobUpdate(YumJobOnJob): #pylint: disable=R0903 - """Update job's metadata.""" + """ + .. _YumJobUpdate: + + Update job's metadata. There are some forbidden properties, that + can not be changed in this way. Those are all affecting job's priority + and its scheduling for deletion. Plus any that store job's state. + All forbidden properties are listed in ``FORBIDDEN_PROPERTIES``. + """ __slots__ = ('data', ) FORBIDDEN_PROPERTIES = ( 'async', 'jobid', 'created', 'started', 'priority', 'finished', diff --git a/src/software/openlmi/software/yumdb/process.py b/src/software/openlmi/software/yumdb/process.py index 0b0c4c1..8ae6e46 100644 --- a/src/software/openlmi/software/yumdb/process.py +++ b/src/software/openlmi/software/yumdb/process.py @@ -210,10 +210,11 @@ class YumWorker(Process): def __init__(self, queue_in, queue_out, + indication_manager, yum_kwargs=None, logging_config=None): Process.__init__(self, name="YumWorker") - self._jobmgr = JobManager(queue_in, queue_out) + self._jobmgr = JobManager(queue_in, queue_out, indication_manager) self._session_level = 0 self._session_ended = False diff --git a/src/software/openlmi/software/yumdb/util.py b/src/software/openlmi/software/yumdb/util.py index f2af151..a1ab338 100644 --- a/src/software/openlmi/software/yumdb/util.py +++ b/src/software/openlmi/software/yumdb/util.py @@ -1,12 +1,48 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# + +""" +Common utilities meant to be used only be ``yumdb`` subpackage. +""" + from itertools import chain import inspect import logging import os -from openlmi.software.yumdb import errors +from openlmi.common import cmpi_logging class DispatchingFormatter: + """ + Formatter class for logging module. It allows to predefine different + format string for paricular module names. + """ def __init__(self, formatters, default): + """ + *format* in parameters description can be either ``string`` or + another formatter object. + + :param formatters (``dict``) Mapping of module names to *format*. + :param default Default *format*. + """ for k, formatter in formatters.items(): if isinstance(formatter, basestring): formatters[k] = logging.Formatter(formatter) @@ -16,6 +52,9 @@ class DispatchingFormatter: self._default_formatter = default def format(self, record): + """ + Interface for logging module. + """ formatter = self._formatters.get(record.name, self._default_formatter) return formatter.format(record) @@ -78,20 +117,47 @@ def trace_function(func): logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s" " %(module)s:%(func)s:%(lineno)d", logargs) except Exception as exc: - logargs['action'] = 'exiting' - logargs['error'] = str(exc) - logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s" - " %(module)s:%(func)s:%(lineno)d with error: %(error)s", - logargs) + if logger.isEnabledFor(logging.DEBUG): + logargs['action'] = 'exiting' + logargs['error'] = str(exc) + logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s" + " %(module)s:%(func)s:%(lineno)d with error: %(error)s", + logargs) raise return result return _wrapper def setup_logging(config): + """ + This is meant to be used by ``YumWorker`` process to setup logging + independent of what providers are using. Unfortunately ``YumWorker`` + can not use the same facilities as the rest of program, because + logging is done through *broker*. + """ try: logging.config.dictConfig(config) + cmpi_logging.logger = logging.getLogger('openlmi.software.yumdb') except Exception: #pylint: disable=W0703 # logging is not set up but client expects us to work - pass + # all messages are dumped to /dev/null + logging.config.dictConfig({ + 'version' : 1, + 'disable_existing_loggers' : True, + 'handlers': { + 'null' : { + 'class': 'logging.handlers.FileHandler', + 'level': 'CRITICAL', + 'filename': '/dev/null' + } + }, + 'loggers' : { + 'root' : { + 'level': 'CRITICAL', + 'handlers' : ['null'], + 'propagate' : False + } + } + }) + diff --git a/src/software/test/test_hosted_software_collection.py b/src/software/test/test_hosted_software_collection.py index ccd00b8..c260210 100755 --- a/src/software/test/test_hosted_software_collection.py +++ b/src/software/test/test_hosted_software_collection.py @@ -57,7 +57,7 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase): classname="LMI_SystemSoftwareCollection", namespace="root/cimv2", keybindings=pywbem.NocaseDict({ - "InstanceID" : "LMI:SystemSoftwareCollection" + "InstanceID" : "LMI:LMI_SystemSoftwareCollection" })) return objpath diff --git a/src/software/test/test_installed_software_identity.py b/src/software/test/test_installed_software_identity.py index ade841b..72bb7cd 100755 --- a/src/software/test/test_installed_software_identity.py +++ b/src/software/test/test_installed_software_identity.py @@ -53,7 +53,7 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): classname="LMI_SoftwareIdentity", namespace="root/cimv2", keybindings=pywbem.NocaseDict({ - "InstanceID" : 'LMI:SoftwareIdentity:' + pkg.get_nevra(newer=newer, + "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.get_nevra(newer=newer, with_epoch="ALWAYS") })) return objpath @@ -104,7 +104,7 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): self.assertEqual(objpath["System"], iname["System"]) nevra_set = set(i["InstalledSoftware"]["InstanceID"] for i in inames) for pkg in self.safe_pkgs: - nevra = 'LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") + nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") self.assertTrue(nevra in nevra_set, 'Missing nevra "%s".' % nevra) @@ -126,7 +126,7 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): self.assertEqual(inst[key], inst.path[key]) nevra_set = set(i["InstalledSoftware"]["InstanceID"] for i in insts) for pkg in self.safe_pkgs: - nevra = 'LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") + nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") self.assertTrue(nevra in nevra_set, "Missing pkg %s in nevra_set." % nevra) @@ -143,21 +143,21 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): inames1 = self.conn.EnumerateInstanceNames( ClassName=self.CLASS_NAME) self.assertGreater(len(inames1), 1) - self.assertIn('LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), + self.assertIn('LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), set(i["InstalledSoftware"]["InstanceID"] for i in inames1)) rpmcache.remove_pkg(pkg.name) inames2 = self.conn.EnumerateInstanceNames( ClassName=self.CLASS_NAME) self.assertEqual(len(inames1), len(inames2) + 1) - self.assertNotIn('LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), + self.assertNotIn('LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), set(i["InstalledSoftware"]["InstanceID"] for i in inames2)) rpmcache.install_pkg(pkg) inames3 = self.conn.EnumerateInstanceNames( ClassName=self.CLASS_NAME) self.assertEqual(len(inames1), len(inames3)) - self.assertIn('LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), + self.assertIn('LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), set(i["InstalledSoftware"]["InstanceID"] for i in inames3)) @base.mark_dangerous @@ -221,11 +221,11 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): self.assertEqual(ref.namespace, 'root/cimv2') self.assertEqual(ref.classname, "LMI_SoftwareIdentity") self.assertEqual(sorted(ref.keys()), ["InstanceID"]) - self.assertTrue(ref["InstanceID"].startswith("LMI:SoftwareIdentity:")) + self.assertTrue(ref["InstanceID"].startswith("LMI:LMI_SoftwareIdentity:")) nevra_set = set(i["InstanceID"] for i in refs) for pkg in self.safe_pkgs: - nevra = 'LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") + nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") self.assertTrue(nevra in nevra_set, 'Missing nevra "%s".' % nevra) diff --git a/src/software/test/test_member_of_software_collection.py b/src/software/test/test_member_of_software_collection.py index 0f6bac8..50c3b86 100755 --- a/src/software/test/test_member_of_software_collection.py +++ b/src/software/test/test_member_of_software_collection.py @@ -44,13 +44,13 @@ class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase): classname="LMI_SystemSoftwareCollection", namespace="root/cimv2", keybindings=pywbem.NocaseDict({ - "InstanceID" : "LMI:SystemSoftwareCollection" + "InstanceID" : "LMI:LMI_SystemSoftwareCollection" })) objpath["Member"] = pywbem.CIMInstanceName( classname="LMI_SoftwareIdentity", namespace="root/cimv2", keybindings=pywbem.NocaseDict({ - "InstanceID" : 'LMI:SoftwareIdentity:' + pkg.get_nevra(newer=newer, + "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.get_nevra(newer=newer, with_epoch="ALWAYS") })) return objpath @@ -85,7 +85,7 @@ class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase): # self.assertEqual(objpath["Collection"], iname["Collection"]) # nevra_set = set(i["Member"]["InstanceID"] for i in inames) # for pkg in self.safe_pkgs: -# nevra = 'LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") +# nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") # self.assertTrue(nevra in nevra_set, # 'Missing nevra "%s".' % nevra) @@ -123,11 +123,11 @@ class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase): # self.assertEqual(ref.namespace, 'root/cimv2') # self.assertEqual(ref.classname, "LMI_SoftwareIdentity") # self.assertEqual(sorted(ref.keys()), ["InstanceID"]) -# self.assertTrue(ref["InstanceID"].startswith("LMI:SoftwareIdentity:")) +# self.assertTrue(ref["InstanceID"].startswith("LMI:LMI_SoftwareIdentity:")) # nevra_set = set(i["InstanceID"] for i in refs) # # NOTE: installed packages might not be available # for pkg in self.dangerous_pkgs: -# nevra = 'LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") +# nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS") # self.assertTrue(nevra in nevra_set, # 'Missing nevra "%s".' % nevra) # diff --git a/src/software/test/test_resource_for_software_identity.py b/src/software/test/test_resource_for_software_identity.py index 0a6afbb..d38da46 100755 --- a/src/software/test/test_resource_for_software_identity.py +++ b/src/software/test/test_resource_for_software_identity.py @@ -63,7 +63,7 @@ class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase): namespace="root/cimv2") if pkg is not None: objpath["ManagedElement"]["InstanceID"] = \ - 'LMI:SoftwareIdentity:' + pkg.get_nevra(newer=newer, with_epoch="ALWAYS") + 'LMI:LMI_SoftwareIdentity:' + pkg.get_nevra(newer=newer, with_epoch="ALWAYS") return objpath @base.mark_dangerous @@ -105,13 +105,13 @@ class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase): self.assertEqual(ref.namespace, 'root/cimv2') self.assertEqual(ref.classname, "LMI_SoftwareIdentity") self.assertEqual(sorted(ref.keys()), ["InstanceID"]) - self.assertTrue(ref["InstanceID"].startswith("LMI:SoftwareIdentity:")) + self.assertTrue(ref["InstanceID"].startswith("LMI:LMI_SoftwareIdentity:")) nevra_set = set(i["InstanceID"] for i in refs) # NOTE: installed packages might not be available for pkg, up in ((pkg, up) for pkg in self.dangerous_pkgs for up in (True, False)): - nevra = 'LMI:SoftwareIdentity:'+pkg.get_nevra( + nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra( newer=up, with_epoch="ALWAYS") reponame = getattr(pkg, 'up_repo' if up else 'repo') if reponame == repo.repoid: @@ -139,7 +139,7 @@ class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase): self.assertEqual(ref.namespace, 'root/cimv2') self.assertEqual(ref.classname, "LMI_SoftwareIdentity") self.assertEqual(sorted(ref.keys()), ["InstanceID"]) - self.assertTrue(ref["InstanceID"].startswith("LMI:SoftwareIdentity:")) + self.assertTrue(ref["InstanceID"].startswith("LMI:LMI_SoftwareIdentity:")) @base.mark_dangerous def test_get_managed_element_referents(self): diff --git a/src/software/test/test_software_identity.py b/src/software/test/test_software_identity.py index cf92bab..3b94e01 100755 --- a/src/software/test/test_software_identity.py +++ b/src/software/test/test_software_identity.py @@ -45,7 +45,7 @@ class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904 @return object path of SoftwareIdentity """ objpath = self.objpath.copy() - objpath["InstanceID"] = 'LMI:SoftwareIdentity:'+pkg.get_nevra(newer, "ALWAYS") + objpath["InstanceID"] = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(newer, "ALWAYS") return objpath @base.mark_dangerous @@ -120,7 +120,7 @@ class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904 self.assertEqual(sorted(iname.keys()), sorted(self.KEYS)) nevra_set = set(i["InstanceID"] for i in inames) for pkg in self.safe_pkgs: - self.assertIn('LMI:SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), + self.assertIn('LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(with_epoch="ALWAYS"), nevra_set) # @base.mark_tedious @@ -141,7 +141,7 @@ class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904 # nevra_set.add(inst["InstanceID"]) # name_set.add(inst["Name"]) # for pkg in self.safe_pkgs: -# self.assertIn("LMI:SoftwareIdentity:"+pkg.get_nevra(with_epoch="ALWAYS"), +# self.assertIn("LMI:LMI_SoftwareIdentity:"+pkg.get_nevra(with_epoch="ALWAYS"), # nevra_set) # self.assertIn(pkg.name, name_set) diff --git a/src/software/test/test_software_identity_resource.py b/src/software/test/test_software_identity_resource.py index 34e5008..5c3c329 100755 --- a/src/software/test/test_software_identity_resource.py +++ b/src/software/test/test_software_identity_resource.py @@ -106,7 +106,7 @@ class TestSoftwareIdentityResource( self.assertEqual(5, inst["HealthState"]) self.assertIsInstance(inst["GPGCheck"], bool) self.assertEqual(200, inst["InfoFormat"]) - self.assertEqual("LMI:SoftwareIdentityResource:"+repo.repoid, + self.assertEqual("LMI:LMI_SoftwareIdentityResource:"+repo.repoid, inst["InstanceID"]) if repo.mirror_list is None and repo.metalink is None: self.assertIsNone(inst["MirrorList"]) diff --git a/src/software/test/test_system_software_collection.py b/src/software/test/test_system_software_collection.py index d68da47..bf33c56 100755 --- a/src/software/test/test_system_software_collection.py +++ b/src/software/test/test_system_software_collection.py @@ -41,7 +41,7 @@ class TestSystemSoftwareCollection( @return object path of SoftwareIdentity """ objpath = self.objpath.copy() - objpath["InstanceID"] = "LMI:SystemSoftwareCollection" + objpath["InstanceID"] = "LMI:LMI_SystemSoftwareCollection" return objpath def test_get_instance(self): |