summaryrefslogtreecommitdiffstats
path: root/src/python/lmi
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/lmi')
-rw-r--r--src/python/lmi/__init__.py1
-rw-r--r--src/python/lmi/common/IndicationManager.py758
-rw-r--r--src/python/lmi/common/JobManager.py1654
-rw-r--r--src/python/lmi/common/TimerManager.py421
-rw-r--r--src/python/lmi/common/__init__.py42
-rw-r--r--src/python/lmi/common/cmpi_logging.py204
-rw-r--r--src/python/lmi/common/singletonmixin.py560
7 files changed, 3640 insertions, 0 deletions
diff --git a/src/python/lmi/__init__.py b/src/python/lmi/__init__.py
new file mode 100644
index 0000000..de40ea7
--- /dev/null
+++ b/src/python/lmi/__init__.py
@@ -0,0 +1 @@
+__import__('pkg_resources').declare_namespace(__name__)
diff --git a/src/python/lmi/common/IndicationManager.py b/src/python/lmi/common/IndicationManager.py
new file mode 100644
index 0000000..0467227
--- /dev/null
+++ b/src/python/lmi/common/IndicationManager.py
@@ -0,0 +1,758 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+# -*- coding: utf-8 -*-
+"""
+ .. autoclass:: IndicationManager
+ :members:
+"""
+
+import pywbem
+from Queue import Queue
+import re
+import socket
+import threading
+
+import lmi.common.cmpi_logging as cmpi_logging
+from lmi.common import singletonmixin
+
+RE_FILTER_NAME = re.compile(r'^(?P<prefix>lmi:'
+ r'(?P<class_name>[a-z0-9_]+):)(?P<filter_id>.*)$', re.IGNORECASE)
+
+FILTER_DEFAULTS = {
+ "SourceNamespace" : "root/cimv2",
+ "SourceNamespaces" : ["root/cimv2"],
+ "QueryLanguage" : "CIM:CQL"
+}
+
+@cmpi_logging.trace_function
+def enumerate_namespaces(ch):
+ """
+ Return tuple ``([CIM_Namespace instance, ...], ns_interop)``. Where
+ first item is a list of object paths of all namespaces in broker and
+ the second is a name of namespace, where this information can be found.
+
+ :param ch: CIMOM handle.
+ """
+ nsclasses = ["CIM_Namespace", "__Namespace"]
+ namespaces = ['root/cimv2', 'root/PG_InterOp', 'Interop',
+ 'interop', 'root', 'root/interop']
+ nspaths = []
+ ns = None
+ for cls in nsclasses:
+ for ns in namespaces:
+ try:
+ nspaths = [nm for nm in ch.EnumerateInstanceNames(ns, cls)]
+ if nspaths:
+ break
+ except pywbem.CIMError as exc:
+ if exc[0] in (
+ pywbem.CIM_ERR_INVALID_NAMESPACE,
+ pywbem.CIM_ERR_NOT_SUPPORTED,
+ pywbem.CIM_ERR_INVALID_CLASS):
+ pass
+ if exc[0] == pywbem.CIM_ERR_FAILED:
+ cmpi_logging.logger.error("EnumerateInstanceNames failed"
+ " for %s:%s: %s", ns, cls, str(exc))
+ else:
+ raise
+ if nspaths:
+ break
+ if not nspaths:
+ cmpi_logging.logger.error("failed to enumerate namespaces")
+ ns = None
+ return (nspaths, ns)
+
+@cmpi_logging.trace_function
+def find_ns_interop(ch):
+ """
+ Return name of interop namespace, where ``CIM_IndicationFilter``
+ class reside.
+
+ :param ch: CIMOM handle.
+ """
+ _, ns_interop = enumerate_namespaces(ch)
+ return ns_interop
+
+def make_filter_name(class_name, fltr_id):
+ """
+ Return value for ``CIM_IndicationFilter.Name`` property.
+ """
+ return "LMI:%s:%s" % (class_name, fltr_id)
+
+def parse_filter_name(name):
+ """
+ Return tuple ``(class_name, filter_id)``.
+
+ :param name: (``string``) Value of cim filter's *Name* property.
+ """
+ match = RE_FILTER_NAME.match(name)
+ if not match:
+ raise ValueError('Could not parse filter name: "%s"' % name)
+ return (match.group("class_name"), match.group("filter_id"))
+
+@cmpi_logging.trace_function
+def make_indication_filter_path(class_name, fltr_id, ns_interop):
+ """
+ Return CIM_IndicationFilter instance path for given filter id.
+
+ :param class_name: (``string``) *Scoped class* name.
+ :param fltr_id: (``string``) Filter name.
+ :param ns_interop: (``string``) Interop namespace.
+ """
+ for arg in ('class_name', 'fltr_id', 'ns_interop'):
+ if not isinstance(locals()[arg], basestring):
+ raise TypeError("%s must be basestring" % arg)
+ cop = pywbem.CIMInstanceName("CIM_IndicationFilter",
+ namespace=ns_interop)
+ cop['CreationClassName'] = 'CIM_IndicationFilter'
+ cop['SystemCreationClassName'] = 'CIM_ComputerSystem'
+ cop['SystemName'] = socket.gethostname()
+ cop['Name'] = make_filter_name(class_name, fltr_id)
+ return cop
+
+@cmpi_logging.trace_function
+def remove_cimom_filter(ch, fltr_path):
+ """
+ Deletes instance of CIM_IndicationFilter installed at broker with all
+ referencing subscriptions.
+
+ Returns list of subscription instace names, that were deleted.
+
+ :param ch: CIMOM handle.
+ :param fltr_path: (``CIMInstanceName``) Path of ``CIM_IndicationFilter`` to
+ remove.
+ """
+ if not isinstance(fltr_path, pywbem.CIMInstanceName):
+ raise TypeError("fltr_path must be a CIMInstanceName")
+
+ referents = []
+ for ref in ch.AssociatorNames(fltr_path,
+ role="Filter",
+ resultRole="Handler",
+ resultClass="CIM_IndicationSubscription"):
+ ch.DeleteInstance(ref)
+ referents.append(ref)
+ ch.DeleteInstance(fltr_path)
+ cmpi_logging.logger.debug('removed indication filter "%s" with %d'
+ ' referents', fltr_path["Name"], len(referents))
+ return referents
+
+class IndicationManager(singletonmixin.Singleton):
+ """
+ Using ``IndicationManager`` class
+ providers can send indications without bothering with handling of
+ indication subscriptions.
+
+ Usage:
+
+ 1. Subclass CIM_InstCreation and CIM_InstModification.
+
+ 2. In your initialization routine, create one ``IndicationManager``
+ instance. E.g. one for whole ``LMI_Storage`` may is enough. Like
+ this::
+
+ indication_manager = \
+ IndicationManager.get_instance(env, "Storage", "root/cimv2")
+
+ 3. Call ``indication_manager.add_filters()`` with all filters your
+ providers support for particular CIM class. This method can be called
+ multiple times.
+ For example::
+
+ filters = {
+ "JobPercentUpdated": {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE"
+ " SourceInstance ISA LMI_StorageJob AND"
+ " SourceInstance.CIM_ConcreteJob::PercentComplete <>"
+ " PreviousInstance.CIM_ConcreteJob::PercentComplete",
+ "Description" : "Modification of Percentage Complete for"
+ " a Concrete Job.",
+ },
+ "JobSucceeded": {
+ "Query" : "SELECT * FROM CIM_InstModification WHERE"
+ " SourceInstance ISA LMI_StorageJob AND"
+ " SourceInstance.CIM_ConcreteJob::JobState = "
+ " CIM_ConcreteJob.JobState#'Completed'",
+ "Description": "Modification of Operational Status for"
+ " a Concrete Job to 'Complete' and 'OK'.",
+ },
+ #... other indications
+ }
+ instance_manager.add_filters("LMI_StorageJob", filters)
+
+ First argument is a name of class to which indications apply. We'll call
+ it *Scoping class*.
+
+ 4. In your provider module, implement indication functions like this::
+
+ def authorize_filter(env, fltr, ns, classes, owner):
+ indication_manager.authorize_filter(env, fltr, ns, classes, owner)
+
+ def activate_filter (env, fltr, ns, classes, first_activation):
+ indication_manager.activate_filter(env, fltr, ns, classes,
+ first_activation)
+
+ def deactivate_filter(env, fltr, ns, classes, last_activation):
+ indication_manager.deactivate_filter(env, fltr, ns, classes,
+ last_activation)
+
+ def enable_indications(env):
+ indication_manager.enable_indications(env)
+
+ def disable_indications(env):
+ indication_manager.disable_indications(env)
+
+ From now on, the ``IndicationManager`` will track all subscribed filters.
+ You can query the ``indication_manager.is_subscribed()`` before you create
+ and send an indication. Use ``indication_manager.send_indication()``
+ to send your indications.
+
+ Only static (=preconfigured, read-only) indication filters are
+ supported.
+
+ For user to use these preconfigured filters, they need to be installed
+ at broker as instances of ``CIM_IndicationFilter``. But since they can
+ not be guarded against removel by accident, this object provides a way
+ to reinstall them. But using this is not recomended, since it can upset
+ users. See :ref:`_update_context-label`.
+
+ The supported filters must be passed to add_filters method. The filters
+ are passed as dictionary ``'filter_id' -> {dictionary 'IndicationFilter
+ property' -> 'value'}``. There must be at least ``Query`` property in
+ each filter, CQL is assumed.
+
+ This helper automatically tracks which filters are subscribed. Provider
+ can query ``is_subscribed()`` to check, if filter with given
+ ``filter_id`` is subscribed before generating indications.
+
+ The CMPI interface to send indications is complicated -
+ when an indication is send from CIMOM callback (e.g. ``get_instance``),
+ it must use current ``env`` parameter of the callback and it would be
+ tedious to pass it to ``IndicationManager`` each time. Therefore
+ ``IndicationManager`` creates its own thread, registers it at CIMOM
+ using ``PrepareAttachThread``/``AttachThread``.
+
+ As side-effect, indication can be sent from any thread, there is no
+ need to call ``PrepareAttachThread``/``AttachThread``.
+ """
+ SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity
+
+ COMMAND_STOP = 1 # Command to the IndicationManager thread to stop.
+
+ @cmpi_logging.trace_method
+ def __init__(self, env, nameprefix, namespace, ns_interop=None,
+ queue=None):
+ """
+ Create new ``IndicationManager``. Usually only one instance
+ is necessary for one provider process.
+
+ :param env: (``ProviderEnvironment``) Provider enviroment, taken
+ from CIMOM callback (e.g. ``get_providers()``).
+ :param nameprefix: (``string``) Prefix of your ``CIM_InstCreation``
+ and ``CIM_InstModification`` subclasses, e.g. 'Storage' for
+ ``LMI_StorageInstCreation``.
+ :param namespace: (``string``) Namespace, which will be set to
+ outgoing indications instances.
+ :param ns_interop: (``string``) Namespace, where filters and
+ subscriptions are stored.
+ :param queue: Optional custom input queue with the same interface as
+ ``Queue.Queue``.
+ """
+
+ # { class_name :
+ # { filter_id : filter_properties
+ # , ... }
+ # }
+ self._filters = pywbem.NocaseDict()
+ self._enabled = False
+ # { (class_name, filter_id), ... }
+ self._subscribed_filters = set()
+ self._nameprefix = nameprefix
+ self._namespace = namespace
+ self._ns_interop = ns_interop
+ self._access_lock = threading.RLock()
+ self._env = env
+
+ if queue is None:
+ queue = Queue()
+ self._queue = queue
+ # prepare indication thread
+ ch = env.get_cimom_handle()
+ new_broker = ch.PrepareAttachThread()
+ self._indication_sender = threading.Thread(
+ target=self._send_indications_loop, args=(new_broker,))
+ self._indication_sender.daemon = False
+ self._indication_sender.start()
+
+ @property
+ def enabled(self):
+ """
+ Return a boolean saying, whether indication sending is enabled.
+ """
+ with self._access_lock:
+ return self.enabled
+
+ @property
+ def namespace(self):
+ """
+ Return namespace of outgoing indication instances.
+ """
+ return self._namespace
+
+ @property
+ def nameprefix(self):
+ """
+ Return prefix of indication class names.
+ """
+ return self._nameprefix
+
+ @property
+ def ns_interop(self):
+ """
+ Return interop namespace name.
+ """
+ with self._access_lock:
+ if self._ns_interop is None:
+ ch = self._env.get_cimom_handle()
+ self._ns_interop = find_ns_interop(ch)
+ cmpi_logging.logger.info('found interop namespace: %s',
+ self._ns_interop)
+ return self._ns_interop
+
+ @property
+ def instcreation_classname(self):
+ """
+ Return whole class name of InstCreation indication.
+ """
+ return "LMI_" + self._nameprefix + "InstCreation"
+
+ @property
+ def instmodification_classname(self):
+ """
+ Return whole class name of InstModification indication.
+ """
+ return "LMI_" + self._nameprefix + "InstModification"
+
+ @property
+ def instdeletetion_classname(self):
+ """
+ Return whole class name of InstDeletion indication.
+ """
+ return "LMI_" + self._nameprefix + "InstDeletion"
+
+ @cmpi_logging.trace_method
+ def _get_filter_inst(self, class_name, fltr_id):
+ """
+ Return instance of CIM_IndicationFilter registered in CIMOM if any.
+
+ :param class_name: (``string``) *Scoping class* name.
+ :param fltr_id: (``string``) Indication name.
+ """
+ ch = self._env.get_cimom_handle()
+ cop = make_indication_filter_path(class_name, fltr_id, self.ns_interop)
+ try:
+ return ch.GetInstance(cop)
+ except pywbem.CIMError as exc:
+ if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND:
+ return None
+ raise
+
+ @cmpi_logging.trace_method
+ def _ensure_cimom_has_filter(self, class_name, fltr_id):
+ """
+ Ensures, that cimom has ``fltr_id`` filter registered as instance.
+ If it has, but the query differs it is recreated at broker.
+
+ :param class_name: (``string``) *Scoping class* name.
+ :param fltr_id: (``string``) Indication name.
+ """
+ inst = self._get_filter_inst(class_name, fltr_id)
+ ch = self._env.get_cimom_handle()
+ installed = inst is not None
+ referents = []
+ if installed:
+ for prop_name, val in self._filters[class_name][fltr_id].items():
+ if inst[prop_name] != val:
+ cmpi_logging.logger.info("filter \"%s\" is installed, but"
+ " its property \"%s\" has outdated value;"
+ " removing...", fltr_id, prop_name)
+ referents = remove_cimom_filter(ch, inst.path)
+ installed = False
+ if not installed:
+ if inst is not None:
+ path = inst.path
+ else:
+ path = make_indication_filter_path(class_name, fltr_id,
+ self.ns_interop)
+ inst = pywbem.CIMInstance(path.classname, path=path)
+ kwargs = FILTER_DEFAULTS.copy()
+ for key, val in path.keybindings.items():
+ kwargs[key] = val
+ kwargs.update(self._filters[class_name][fltr_id])
+ inst.update(kwargs)
+ try:
+ inst = ch.CreateInstance(inst)
+ cmpi_logging.logger.info("filter \"%s\" installed", fltr_id)
+ except pywbem.CIMError:
+ cmpi_logging.logger.exception(
+ "failed to install indication filter \"%s\"",
+ fltr_id)
+ if referents:
+ cmpi_logging.logger.debug('reinstalling %d filter'
+ ' subscriptions', len(referents))
+ for ref in referents:
+ ch.CreateInstance(ref)
+ return inst
+
+ @cmpi_logging.trace_method
+ def _get_matching_filter(self, query):
+ """
+ Try to find matching filter properties in local ``_filters`` storage
+ and return it. ``None`` is returned if not found.
+
+ Return a tuple ``(class_name, filter_id, filter_properties)``.
+
+ :param query: (``string``) Is filter query.
+ """
+ if not isinstance(query, basestring):
+ raise TypeError("query must be a string")
+ for clsname, fltrs in self._filters.iteritems():
+ for fltr_id, props in fltrs.iteritems():
+ if query == props["Query"]:
+ return (clsname, fltr_id, props)
+ return None
+
+ @cmpi_logging.trace_method
+ def ensure_filters_installed(self, class_name=None, fltr_id=None):
+ """
+ This function checks for existence of filters at broker. Filters
+ must be registered with this instance before the check can be done.
+ Without arguments all registered filters will be checked.
+
+ :param class_name: (``string``) Name of *Scoped class* that reduces
+ searched filters.
+ :param fltr_id: (``string``) Indication name reducing filters that
+ will be checked.
+ """
+ cls_to_check = self._filters.keys()
+ if class_name is not None:
+ cls_to_check = [class_name]
+ filters_to_check = list(
+ (c, f)
+ for c in cls_to_check
+ for f in self._filters[c].keys()
+ if fltr_id is None or fltr_id == f)
+ with self._access_lock:
+ try:
+ for clsname, fltr_id in filters_to_check:
+ self._ensure_cimom_has_filter(clsname, fltr_id)
+ cmpi_logging.logger.debug('filters installed')
+ return True
+ except pywbem.CIMError as exc:
+ if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED:
+ cmpi_logging.logger.error("filters could not be checked"
+ " for presence due to invalid context")
+ return False
+ raise
+
+ @cmpi_logging.trace_method
+ def update_context(self, env):
+ """
+ .. _update_context-label
+
+ When ``IndicationManager`` is initialized upon provider initialization,
+ the conxet given does not contain any user credentials that are
+ needed for communication with broker. In order to check for filter's
+ existence at broker, this method needs to be called first with
+ context containing user's credentials.
+
+ This needs to be called only once.
+
+ **Note** that if you don't plan to check for filter's presence at
+ broker at runtime, you are not interested in this function.
+ """
+ with self._access_lock:
+ self._env = env
+
+ @cmpi_logging.trace_method
+ def add_filters(self, class_name, filters, ensure_installed=False):
+ """
+ Add new filters to the helper. These filters will be allowed for
+ subscription.
+
+ :param filters: (``dictionary filter_id -> filter properties``)
+ The filters. ``filter properties`` is dictionary
+ ``property_name -> value``, where at least ``Query`` property
+ must be set. ``Name`` property will be automatically created
+ as 'LMI:<class_name>:<filter_id>'.
+ :param ensure_installed: (``bool``) Whether to check for filter presence
+ at broker and install them if missing. **Note** That in order
+ for this to work, the context must be updated with user's
+ credentials. See :ref:`update_context-label`.
+ """
+ with self._access_lock:
+ if not class_name in self._filters:
+ self._filters[class_name] = pywbem.NocaseDict()
+ self._filters[class_name].update(filters)
+ if ensure_installed:
+ self.ensure_filters_installed(class_name=class_name)
+
+ @cmpi_logging.trace_method
+ def authorize_filter(self, _env, fltr, _class_name, _op, _owner):
+ """
+ AuthorizeFilter callback from CIMOM. Call this method from appropriate
+ CIMOM callback
+
+ It asks us to verify whether this filter is allowed.
+
+ :param fltr: Contains the filter that must be authorized.
+ :param _class_name: (``String``) Contains the class name extracted
+ from the filter FROM clause.
+ :param _op: The name of the class for which monitoring is required.
+ Only the namespace part is set if className is a process indication.
+ :param _owner The owner argument is the destination owner.
+ """
+ with self._access_lock:
+ res = self._get_matching_filter(fltr)
+ if res is not None:
+ self._subscribed_filters.add((res[0], res[1]))
+ cmpi_logging.logger.info("InstanceFilter %s: %s authorized",
+ make_filter_name(res[0], res[1]), fltr)
+ return True
+ return False
+
+ @cmpi_logging.trace_method
+ def activate_filter(self, _env, fltr, _class_name, _class_path,
+ first_activation):
+ """
+ ActivateFilter callback from CIMOM. Call this method from appropriate
+ CIMOM callback.
+
+ It ask us to begin monitoring a resource. The function shall begin
+ monitoring the resource according to the filter express only.
+
+ :param fltr: The filter argument contains the filter specification
+ for this subscription to become active.
+ :param _class_name: (``String``) The class name extracted from the filter
+ FROM clause.
+ :param _class_path: (``CIMInstanceName``) The name of the class for
+ which monitoring is required. Only the namespace part is set if
+ eventType is a process indication.
+ :param first_activation: (``bool``) Set to true if this is the first
+ filter for className.
+ """
+ with self._access_lock:
+ if not first_activation:
+ return
+ res = self._get_matching_filter(fltr)
+ if res is not None:
+ self._subscribed_filters.add((res[0], res[1]))
+ cmpi_logging.logger.info("InstanceFilter %s: %s started",
+ make_filter_name(res[0], res[1]), fltr)
+
+ @cmpi_logging.trace_method
+ def deactivate_filter(self, _env, fltr, _class_name, _class_path,
+ last_activation):
+ """
+ DeactivateFilter callback from CIMOM. Call this method from appropriate
+ CIMOM callback.
+
+ Informs us that monitoring using this filter should stop.
+
+ :param fltr: The filter argument contains the filter specification for
+ this subscription to become active.
+ :param class_name: (``String``) The class name extracted from the filter
+ FROM clause.
+ :param class_path: (``CIMInstanceName``) class_path The name of the
+ class for which monitoring is required. Only the namespace part is
+ set if className is a process indication.
+ :last_activation: (``bool``) Set to true if this is the last filter for
+ className.
+ """
+ with self._access_lock:
+ if not last_activation:
+ return
+ res = self._get_matching_filter(fltr)
+ if res is not None:
+ self._subscribed_filters.remove((res[0], res[1]))
+ cmpi_logging.logger.info("InstanceFilter %s: %s stopped",
+ make_filter_name(res[0], res[1]), fltr)
+
+ @cmpi_logging.trace_method
+ def enable_indications(self, _env):
+ """
+ EnableIndications callback from CIMOM. Call this method from
+ appropriate CIMOM callback.
+
+ Tells us that indications can now be generated. The MB is now prepared
+ to process indications. The function is normally called by the MB after
+ having done its intialization and processing of persistent subscription
+ requests.
+ """
+ with self._access_lock:
+ self._enabled = True
+ cmpi_logging.logger.info("Indications enabled")
+
+ @cmpi_logging.trace_method
+ def disable_indications(self, _env):
+ """
+ EnableIndications callback from CIMOM. Call this method from
+ appropriate CIMOM callback.
+
+ Tells us that we should stop generating indications. MB will not accept
+ any indications until enabled again. The function is normally called
+ when the MB is shutting down indication services either temporarily or
+ permanently.
+ """
+ with self._access_lock:
+ self._enabled = False
+ cmpi_logging.logger.info("Indications disabled")
+
+ @cmpi_logging.trace_method
+ def send_indication(self, indication):
+ """
+ Send indication to all subscribers. Call this method from appropriate
+ CIMOM callback.
+ """
+ self._queue.put(indication)
+
+ @cmpi_logging.trace_method
+ def send_instcreation(self, instance, filter_id):
+ """
+ Send ``LMI_<nameprefix>InstCreation`` indication with given instance.
+
+ :param instance: (``CIMInstance``) The created instance.
+ :param filter_id: (``string``) The ID of registered filter which
+ corresponds to this indication.
+ """
+ if not self.is_subscribed(instance.classname, filter_id):
+ return
+ path = pywbem.CIMInstanceName(
+ classname=self.instcreation_classname,
+ namespace=self.namespace)
+ ind = pywbem.CIMInstance(
+ self.instcreation_classname,
+ path=path)
+ ind['SourceInstance'] = instance
+ ind['SourceInstanceHost'] = socket.gethostname()
+ ind['SourceInstanceModelPath'] = str(instance.path)
+ ind['IndicationFilterName'] = make_filter_name(
+ instance.classname, filter_id)
+ ind['PerceivedSeverity'] = self.SEVERITY_INFO
+
+ cmpi_logging.logger.info("Sending indication %s for %s" %
+ (ind["IndicationFilterName"], str(path)))
+ self.send_indication(ind)
+
+ @cmpi_logging.trace_method
+ def send_instmodification(self, old_instance, new_instance, filter_id):
+ """
+ Send ``LMI_<nameprefix>InstModification`` indication with given
+ instance.
+
+ :param old_instance: (``CIMInstance``) The instance before
+ modification.
+ :param new_instance: (``CIMInstance``) The instance after modification.
+ :param filter_id: (``string``) The ID of registered filter which
+ corresponds to this indication.
+ """
+ if not self.is_subscribed(new_instance.classname, filter_id):
+ return
+ path = pywbem.CIMInstanceName(
+ classname=self.instmodification_classname,
+ namespace=self.namespace)
+ ind = pywbem.CIMInstance(
+ self.instcreation_classname,
+ path=path)
+ ind['SourceInstance'] = new_instance
+ ind['PreviousInstance'] = old_instance
+ ind['SourceInstanceHost'] = socket.gethostname()
+ ind['SourceInstanceModelPath'] = str(new_instance.path)
+ ind['IndicationFilterName'] = make_filter_name(
+ new_instance.classname, filter_id)
+ ind['PerceivedSeverity'] = self.SEVERITY_INFO
+
+ cmpi_logging.logger.info("Sending indication %s for %s",
+ ind["IndicationFilterName"], str(path))
+ self.send_indication(ind)
+
+ @cmpi_logging.trace_method
+ def is_subscribed(self, class_name, fltr_id):
+ """
+ Return True, if there is someone subscribed for given filter.
+
+ :param class_name: (``string``) *Scoping class* name.
+ :param fltr_id: (``string``) ID of the filter to check.
+ """
+ with self._access_lock:
+ if not self._enabled:
+ return False
+ if (class_name, fltr_id) in self._subscribed_filters:
+ return True
+ return False
+
+ @cmpi_logging.trace_method
+ def is_registered(self, class_name, fltr_id):
+ """
+ Return True, if filter id has been registered with current instance.
+
+ :param class_name: (``string``) *Scoping class* name.
+ :param fltr_id: (``string``) ID of the filter to check.
+ """
+ with self._access_lock:
+ return (class_name in self._filters
+ and fltr_id in self._filters[class_name])
+
+ def _send_indications_loop(self, broker):
+ """
+ This method runs in its own thread. It just sends all enqueued
+ indications.
+
+ :param broker: (``BrokerCIMOMHandle``) Handle of the CIMOM.
+ """
+ broker.AttachThread()
+ while True:
+ command = self._queue.get()
+
+ if isinstance(command, pywbem.CIMInstance) :
+ indication = command
+ cmpi_logging.logger.trace_info("Delivering indication %s" %
+ (str(indication.path)))
+ broker.DeliverIndication(self.namespace, indication)
+
+ elif isinstance(command, int):
+ cmpi_logging.logger.trace_info("Received command %d", command)
+ if command == self.COMMAND_STOP:
+ if hasattr(self._queue, "task_done"):
+ self._queue.task_done()
+ break
+
+ if hasattr(self._queue, "task_done"):
+ self._queue.task_done()
+
+ cmpi_logging.logger.info("Stopped Indication thread.")
+
+ @cmpi_logging.trace_method
+ def shutdown(self):
+ """
+ Stop the thread. This method blocks until the thread is safely
+ destroyed.
+ """
+ self._queue.put(self.COMMAND_STOP)
+ self._indication_sender.join()
diff --git a/src/python/lmi/common/JobManager.py b/src/python/lmi/common/JobManager.py
new file mode 100644
index 0000000..59e80e3
--- /dev/null
+++ b/src/python/lmi/common/JobManager.py
@@ -0,0 +1,1654 @@
+# Copyright (C) 2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# -*- coding: utf-8 -*-
+"""
+ Basic infrastructure for asynchronous jobs. All necessary CIM classes and
+ indications are implemented here.
+
+ .. autoclass:: JobManager
+ :members:
+
+ .. autoclass:: Job
+ :members:
+
+ .. autoclass:: LMI_ConcreteJob
+ :members:
+
+ .. autoclass:: LMI_OwningJobElement
+ :members:
+
+ .. autoclass:: LMI_AffectedJobElement
+ :members:
+
+ .. autoclass:: LMI_MethodResult
+ :members:
+
+ .. autoclass:: LMI_AssociatedJobMethodResult
+ :members:
+"""
+
+from datetime import datetime, timedelta
+import threading
+from Queue import Queue
+import pywbem
+import lmi.common.cmpi_logging as cmpi_logging
+import lmi.common
+from lmi.common.IndicationManager import IndicationManager
+from pywbem.cim_provider2 import CIMProvider2
+import socket
+import traceback
+
+@cmpi_logging.trace_function
+def register_filters(job_clsname, indication_manager=None):
+ """
+ This function registers static indication filters at IndicationManager.
+ It should be called upon provider's initialization.
+
+ :param job_clsname: (``String``) CIM class name for asynchonous jobs.
+ Will be part of filter queries.
+ :param indication_manager: If not given, global instance will be obtained.
+ """
+ if indication_manager is None:
+ ind_manager = IndicationManager.get_instance()
+ filters = {}
+ query_args = {
+ "classname" : job_clsname,
+ "prefix" : indication_manager.nameprefix
+ }
+ for fltr_id, fltr_props in JobManager.IND_FILTERS.items():
+ filters[fltr_id] = fltr_props.copy()
+ filters[fltr_id]['Query'] = fltr_props['Query'] % query_args
+ indication_manager.add_filters(job_clsname, filters)
+
+# Too many instance attributes
+# pylint: disable-msg=R0902
+class Job(object):
+ """
+ Generic abstract class representing one CIM_ConcreteJob.
+ It remembers input and output arguments, affected ManagedElements and
+ owning ManagedElement (to be able to create associations to them)
+ and all CIM_ConcreteJob properties.
+
+ Due to multiple threads processing the job, each job has its own
+ lock to guard its status changes. It is expected that number of jobs
+ is quite low.
+ """
+
+ DEFAULT_TIME_BEFORE_REMOVAL = 60 # in seconds
+
+ STATE_QUEUED = 1 # Job has not started yet
+ STATE_RUNNING = 2 # Job is running
+ STATE_FINISHED_OK = 3 # Job finished OK
+ STATE_FAILED = 4 # Job finished with error
+ STATE_SUSPENDED = 5 # Job is queued and suspended
+ STATE_TERMINATED = 6 # Job was queued and terminated
+
+ FINAL_STATES = [STATE_FINISHED_OK, STATE_FAILED, STATE_SUSPENDED,
+ STATE_TERMINATED]
+
+ # There is no way how to suspend/terminate running job!
+
+ @cmpi_logging.trace_method
+ def __init__(self, job_manager, job_name, input_arguments,
+ method_name, affected_elements, owning_element):
+ """
+ Create new storage job.
+
+ :param job_manager: (``JobManager``) Reference to ``JobManager``, which
+ will manage this job.
+ :param job_name: (``string``) User-friendly name of the job.
+ :param input_arguments: (``dictionary param_name -> param_value``)
+ Input arguments of the method, which spawned this job.
+ :param method_name: (``string``) Name of the CIM method, which spawned
+ this job.
+ :param affected_elements: (``array of CIMInstanceName``) List of
+ affected elements. ``LMI_AffectedJobElement`` association will be
+ created for them.
+ :param owning_element: (``CIMInstanceName``) Reference to service, which
+ spawned the job. ``LMI_OwningJobElement`` association will be
+ created for it.
+ """
+ self.job_manager = job_manager
+ self.timer_manager = job_manager.timer_manager
+
+ # Unique ID
+ self.the_id = job_manager.get_next_id()
+
+ # User friendly name of the job
+ self.job_name = job_name
+
+ # Dictionary of input arguments, 'parameter_name' -> 'parameter_value'
+ # The parameter value must be CIMProperty or something that can be
+ # assigned to it.
+ self.input_arguments = input_arguments
+
+ # Dictionary of output arguments, 'parameter_name' -> 'parameter_value'
+ # The parameter value must be CIMProperty or something that can be
+ # assigned to it.
+ self.output_arguments = None
+
+ # Method return value, as CIMProperty or something that can be
+ # assigned to it.
+ self.return_value = None
+ # Value of Job.ReturnValueType
+ self.return_value_type = None
+
+ # Name of the method
+ self.method_name = method_name
+
+ # Time when the job was created
+ self.time_submitted = datetime.utcnow()
+
+ # Nr. of seconds before the job is removed when the job finishes
+ self.time_before_removal = self.DEFAULT_TIME_BEFORE_REMOVAL
+
+ # If the job should be removed after completion
+ self.delete_on_completion = True
+
+ self.percent_complete = 0
+
+ # State of the job
+ self.job_state = self.STATE_QUEUED
+
+ # Last change of job state, wall clock time
+ self.clocktime_of_last_state_change = self.time_submitted
+
+ # Duration of the job in RUNNING state (in seconds)
+ self.elapsed_time = None
+
+ # When the job started (= switched to RUNNING), wall clock time
+ self.start_clocktime = None
+ # When the job started (= switched to RUNNING), monotonic clock time
+ self.start_monotime = None
+ # When the job finished (= switched from RUNNING), monotonic clock time
+ self.finish_monotime = None
+
+ # Array of CIMInstanceNames of affected elements, so we can
+ # enumerate associations to them.
+ self.affected_elements = affected_elements
+
+ # CIMInstanceName to owning element (service), so we can enumerate
+ # instances.
+ self.owning_element = owning_element
+
+ # Timer used to delete the job after time_before_removal seconds
+ self.timer = None
+
+ # CIMError with result code
+ self.error = None
+
+ # internal lock to protect state changes from races
+ self._lock = threading.RLock()
+
+ self._execute = None
+ self._execargs = None
+ self._execkwargs = None
+ self._cancel = None
+ self._cancelargs = None
+ self._cancelkwargs = None
+
+ @cmpi_logging.trace_method
+ def set_execute_action(self, callback, *args, **kwargs):
+ """
+ Set callback, which will be called when the job is to be executed. It is
+ expected that the callback will take some time to execute. The callback
+ must change state of the job and set output parameters and error in a
+ thread-safe way, i.e. by calling ``finish_method()``.
+
+ :param callback: (``function``) Reference to callback to call.
+ :param args, kwargs: All other parameters will be passed to the
+ callback. It is highly recommended to add reference to the job
+ to the callback.
+ """
+ self._execute = callback
+ self._execargs = args
+ self._execkwargs = kwargs
+
+ @cmpi_logging.trace_method
+ def set_cancel_action(self, callback, *args, **kwargs):
+ """
+ Set callbacks, which will be called when the job is to be
+ cancelled. The callback must be quick, the job is already locked!
+
+ :param callback: (``function``) Reference to callback to call.
+ :param args, kwargs: All other parameters will be passed to the
+ callback. It is highly recommended to add reference to the job
+ to the callback.
+ """
+ self._cancel = callback
+ self._cancelargs = args
+ self._cancelkwargs = kwargs
+
+ @cmpi_logging.trace_method
+ def finish_method(self, new_state, return_value=None, return_type=None,
+ output_arguments=None, error=None, affected_elements=None):
+ """
+ Mark the job as finished, with given return value, output parameters and
+ error.
+ This method is thread-safe.
+
+ :param new_state: (``Job.STATE_* value``) Resulting state of the job.
+ :param return_value: (``string``) Return value of the job, encoded
+ into string. Can be None when the job does not return any value.
+ :param return_type: (``Job.RetunValueType.* value``) Type of the return
+ value. Can be None when the job does not return any value.
+ :param output_arguments: (``dictionary param_name -> param_value``)
+ Output arguments of the job. Can be None when the job does not have
+ any output parameters.
+ :param error: (``CIMError``) Error raised by the job. Can be None,
+ when the job finished successfully.
+ :param affected_elements: (``array of CIMInstanceName``) New list of
+ affected elements to generate LMI_<name>JobAffectedElement
+ association. If None, the old list, passed to constructor, remains
+ untouched.
+ """
+ self.lock()
+ self.return_value = return_value
+ self.return_value_type = return_type
+ self.output_arguments = output_arguments
+ self.error = error
+ if affected_elements is not None:
+ self.affected_elements = affected_elements
+ self.change_state(new_state, 100)
+ self.unlock()
+
+ @cmpi_logging.trace_method
+ def change_state(self, new_state, percent=None):
+ """
+ Change state of a job. (Re-)calculate various times based on the state
+ change. Send indications as necessary.
+ This method is thread-safe.
+
+ :param new_state: (``Job.STATE_* value``) New state of the job.
+ It can be the same as the previous state to indicate progress of
+ the job.
+ :param percent: (``int``)) Percent complete of the job. When None,
+ this valuu will be automatically calculated (in progress = 50%,
+ finished = 100%).
+ """
+ self.lock()
+
+ cmpi_logging.logger.debug("Job %s: %s changes state from %d to %d"
+ % (self.the_id, self.job_name, self.job_state, new_state))
+
+ # For sending indications
+ prev_instance = None
+ send_indication = False
+ indication_ids = []
+
+ if self.job_state != new_state:
+ # Remember to send indications
+ prev_instance = self.job_manager.get_job_instance(self)
+ send_indication = True
+ indication_ids.append(JobManager.IND_JOB_CHANGED)
+
+ # Check if the job has just finished
+ if (self.job_state not in self.FINAL_STATES
+ and new_state in self.FINAL_STATES):
+ # Remember finish time
+ self.finish_clocktime = datetime.utcnow()
+ self.finish_monotime = self.timer_manager.now()
+ # Remember job execution time.
+ if self.start_monotime:
+ self.elapsed_time = self.finish_monotime \
+ - self.start_monotime
+ # Send indication
+ if self.job_state == self.STATE_FAILED:
+ indication_ids.append(JobManager.IND_JOB_FAILED)
+ if self.job_state == self.STATE_SUSPENDED:
+ indication_ids.append(JobManager.IND_JOB_SUCCEEDED)
+
+ # Check if the job has just started
+ if new_state == self.STATE_RUNNING:
+ self.start_clocktime = datetime.utcnow()
+ self.start_monotime = self.timer_manager.now()
+
+ self.clocktime_of_last_state_change = datetime.utcnow()
+ self.job_state = new_state
+
+ if percent is None:
+ # guess the percentage from status
+ if new_state == self.STATE_QUEUED:
+ percent = 0
+ elif new_state == self.STATE_RUNNING:
+ percent = 50
+ else:
+ percent = 100
+ if self.percent_complete != percent:
+ # Remember to send indications
+ if not send_indication:
+ self.clocktime_of_last_state_change = datetime.utcnow()
+ prev_instance = self.job_manager.get_job_instance(self)
+ send_indication = True
+ indication_ids.append(JobManager.IND_JOB_PERCENT_UPDATED)
+ self.percent_complete = percent
+
+ if send_indication:
+ current_instance = self.job_manager.get_job_instance(self)
+ self.job_manager.send_modify_indications(
+ prev_instance, current_instance, indication_ids)
+
+ # start / update the timer if necesasry
+ self._restart_timer()
+ self.unlock()
+
+ @cmpi_logging.trace_method
+ def _expire(self):
+ """
+ Callback when a Job completes and time_before_removal second passed.
+ The job gets removed from its JobManager.
+ """
+ cmpi_logging.logger.debug("Job %s: %s expired"
+ % (self.the_id, self.job_name))
+
+ self.job_manager.remove_job(self)
+
+ @cmpi_logging.trace_method
+ def _restart_timer(self):
+ """
+ Re-schedule timer for TimeBeforeRemoval because some property has
+ changed.
+ """
+ if not self.job_state in self.FINAL_STATES:
+ return
+
+ # Stop the old timer.
+ if self.timer:
+ self.timer.cancel()
+ self.timer = None
+
+ # Start the new timer.
+ if self.delete_on_completion:
+ now = self.timer_manager.now()
+ passed = now - self.finish_monotime
+ timeout = self.time_before_removal - passed
+ if timeout <= 0:
+ # Just in case...
+ self._expire()
+ return
+
+ cmpi_logging.logger.debug("Starting timer for job %s: '%s' for %f"
+ " seconds" % (self.the_id, self.job_name, timeout))
+ self.timer = self.timer_manager.create_timer(
+ "Job " + self.job_name,
+ callback=self._expire)
+ self.timer.start(timeout)
+
+ @cmpi_logging.trace_method
+ def lock(self):
+ """
+ Lock internal mutex. Other threads will block on subsequent lock().
+ The lock is recursive, i.e. can be called multiple times from
+ single thread.
+ """
+ self._lock.acquire()
+
+ @cmpi_logging.trace_method
+ def unlock(self):
+ """ Unlock internal mutex."""
+ self._lock.release()
+
+ @cmpi_logging.trace_method
+ def execute(self):
+ """
+ Start executing the job. It calls the execute callback, set by
+ ``set_execute_action()``.
+
+ job_state must be already set to STATE_RUNNING.
+ Any exception is translated to CIMError and appropriate state is set.
+ """
+ try:
+ self._execute(*(self._execargs), **(self._execkwargs))
+ except pywbem.CIMError, error:
+ cmpi_logging.logger.trace_warn("Job.execute caught an CIMError %s",
+ str(error))
+ cmpi_logging.logger.trace_verbose("traceback: %s",
+ traceback.format_exc())
+ self.finish_method(Job.STATE_FAILED, error=error)
+ except Exception, ex:
+ cmpi_logging.logger.trace_warn("Job.execute caught an Exception %s",
+ str(ex))
+ cmpi_logging.logger.trace_verbose("traceback: %s",
+ traceback.format_exc())
+ error = pywbem.CIMError(pywbem.CIM_ERR_FAILED, str(ex))
+ self.finish_method(Job.STATE_FAILED, error=error)
+
+ @cmpi_logging.trace_method
+ def cancel(self):
+ """
+ Cancels queued action. The action must have not been started.
+ """
+ self.change_state(self.STATE_TERMINATED)
+ if self._cancel:
+ self._cancel(*(self._cancelargs), **(self._cancelkwargs))
+
+ @cmpi_logging.trace_method
+ def get_name(self):
+ """
+ Return CIMInstanceName of the job.
+
+ :rtype: ``CIMInstanceName``
+ """
+ name = pywbem.CIMInstanceName(
+ classname=self.job_manager.job_classname,
+ namespace=self.job_manager.namespace,
+ keybindings={
+ 'InstanceID': self.get_instance_id()
+ })
+ return name
+
+ @cmpi_logging.trace_method
+ def get_instance_id(self, classname=None):
+ """
+ Return InstanceID.
+
+ :param classname: (``string``) Optional classname to generate InstanceID
+ for different class, e.g. for LMI_<name>MethodResult.
+ :rtype: ``string``
+ """
+ if classname is None:
+ classname = self.job_manager.job_classname
+ return 'LMI:' + classname + ':' + str(self.the_id)
+
+ @cmpi_logging.trace_method
+ def get_pre_call(self):
+ """
+ Return indication that describes the pre-execution values of the
+ job's invocation.
+
+ :rtype: ``CIMInstance of CIM_InstMethodCall``
+ """
+ path = pywbem.CIMInstanceName(
+ classname="CIM_InstMethodCall",
+ keybindings={},
+ host=socket.gethostname(),
+ namespace=self.job_manager.namespace)
+ inst = pywbem.CIMInstance(
+ classname="CIM_InstMethodCall",
+ path=path)
+ src_instance = self._get_cim_instance()
+ inst['SourceInstance'] = src_instance
+ inst['SourceInstanceModelPath'] = str(src_instance.path)
+ inst['MethodName'] = self.method_name
+ inst['MethodParameters'] = self.get_method_params(
+ '__MethodParameters', True, False)
+ inst['PreCall'] = True
+ return inst
+
+ @cmpi_logging.trace_method
+ def get_cim_error(self):
+ """
+ Return job error as CIMInstance of CIM_Error.
+ :returns: CIMInstance of CIM_Error
+ """
+ path = pywbem.CIMInstanceName(
+ classname="CIM_Error",
+ host=socket.gethostname(),
+ namespace=self.job_manager.namespace)
+ err = pywbem.CIMInstance(
+ classname="CIM_Error",
+ path=path)
+ err['CIMStatusCode'] = pywbem.Uint32(self.error[0])
+ err['Message'] = self.error[1]
+ return err
+
+ @cmpi_logging.trace_method
+ def get_post_call(self):
+ """
+ Return indication that describes the post-execution values of the
+ job's invocation.
+
+ :rtype: ``CIMInstance of CIM_InstMethodCall``
+ """
+ path = pywbem.CIMInstanceName(
+ classname="CIM_InstMethodCall",
+ keybindings={},
+ host=socket.gethostname(),
+ namespace=self.job_manager.namespace)
+ inst = pywbem.CIMInstance(
+ classname="CIM_InstMethodCall",
+ path=path)
+
+ src_instance = self._get_cim_instance()
+ inst['SourceInstance'] = src_instance
+ inst['SourceInstanceModelPath'] = str(src_instance.path)
+ inst['MethodName'] = self.method_name
+ inst['MethodParameters'] = self.get_method_params(
+ '__MethodParameters', True, True)
+ inst['PreCall'] = False
+
+ if self.return_value_type is not None:
+ inst['ReturnValueType'] = self.return_value_type
+ if self.return_value is not None:
+ inst['ReturnValue'] = str(self.return_value)
+ if self.error is not None:
+ err = self.get_cim_error()
+ inst['Error'] = [err, ]
+ return inst
+
+ @cmpi_logging.trace_method
+ def _get_cim_instance(self):
+ """
+ Return CIMInstance of this job.
+
+ :rtype: CIMInstance
+ """
+ return self.job_manager.get_job_instance(self)
+
+ @cmpi_logging.trace_method
+ def get_method_params(self, class_name, include_input, include_output):
+ """
+ Create a class of given name with all input or output parameters
+ of the asynchronous method. Typically used to assemble
+ CIM_ConcreteJob.JobInParameters or CIM_InstMethodCall.MethodParameters
+ values.
+
+ :param class_name: (``string``) Name of the class to create.
+ :param input: (``boolean``) Whether input parameters should be
+ included in the returned class
+ :param output: (``boolean``) Whether output parameters should be
+ included in the returned class
+ :rtype: CIMInstance of the created class.
+ """
+ # TODO: this is workaround for bug #920763, use class_name
+ # when it's fixed
+ clsname = "CIM_ManagedElement"
+ path = pywbem.CIMInstanceName(
+ classname=clsname,
+ namespace=self.job_manager.namespace)
+ inst = pywbem.CIMInstance(classname=clsname, path=path)
+ if include_input and self.input_arguments:
+ for (name, value) in self.input_arguments.iteritems():
+ inst[name] = value
+ if include_output and self.output_arguments:
+ # overwrite any input parameter
+ for (name, value) in self.output_arguments.iteritems():
+ inst[name] = value
+ return inst
+
+ # pylint: disable-msg=R0903
+ class ReturnValueType(object):
+ """ CIM_InstMethodCall.ReturnValueType values."""
+ Boolean = pywbem.Uint16(2)
+ String = pywbem.Uint16(3)
+ Char16 = pywbem.Uint16(4)
+ Uint8 = pywbem.Uint16(5)
+ Sint8 = pywbem.Uint16(6)
+ Uint16 = pywbem.Uint16(7)
+ Sint16 = pywbem.Uint16(8)
+ Uint32 = pywbem.Uint16(9)
+ Sint32 = pywbem.Uint16(10)
+ Uint64 = pywbem.Uint16(11)
+ Sint64 = pywbem.Uint16(12)
+ Datetime = pywbem.Uint16(13)
+ Real32 = pywbem.Uint16(14)
+ Real64 = pywbem.Uint16(15)
+ Reference = pywbem.Uint16(16)
+
+class JobManager(object):
+ """
+ Container of all queued, running or finished ``LMI_ConcreteJobs``.
+
+ Usage:
+
+ 1. Create MOF file for these classes:
+
+ * ``LMI_<name>Job``
+
+ * ``LMI_<name>MethodResult``
+
+ * ``LMI_Affected<name>JobElement``
+
+ * ``LMI_Owning<name>JobElement``
+
+ * ``LMI_Associated<name>JobMethodResult``
+
+ Where ``<name>`` is prefix of your classes, for example 'Storage'
+
+ 2. During initialization, initialize ``TimerManager`` and create
+ ``JobManager``.
+
+ 3. When needed. create new Job instance:
+
+ 4. Set its execute callback using ``set_execute_action()``. This callback
+ will be called when the job is to be executed. It will be called in
+ context of ``JobManager`` worker thread!
+
+ 5. Optionally, set cancel callback using ``set_execute_action()``. This
+ callback will be called when the job is still queued and is cancelled by
+ application. This callback will be called in context of CIMOM callback
+ and should be quick!
+
+ 6. Enqueue the job using ``JobManager.add_job()`` method.
+
+ 7. When your execute callback is called, you can optionally call
+ ``job.change_state()`` to update percentage of completion.
+
+ 8. When your execute callback is finished, don't forget to set method
+ result using ``job.finish_method()``.
+
+ * ``JobManager`` automatically sends all job-related indications.
+ * ``Job`` automatically tracks various timestamps.
+ * By default, the job automatically disappears after 60 seconds after it
+ finishes. Application may set ``DeleteOnCompletion`` and
+ ``TimeBeforeRemoval`` properties of ``LMI_<name>Job`` to override this
+ timeout.
+ """
+
+ COMMAND_STOP = 1
+
+ IND_JOB_PERCENT_UPDATED = "PercentUpdated"
+ IND_JOB_SUCCEEDED = "Succeeded"
+ IND_JOB_FAILED = "Failed"
+ IND_JOB_CHANGED = "Changed"
+ IND_JOB_CREATED = "Created"
+
+ IND_FILTERS = {
+ IND_JOB_PERCENT_UPDATED: {
+ "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::PercentComplete <> "
+ "PreviousInstance.CIM_ConcreteJob::PercentComplete",
+ "Description" : "Modification of Percentage Complete for a "
+ "Concrete Job.",
+ },
+ IND_JOB_SUCCEEDED: {
+ "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 17",
+ "Description": "Modification of Job State for a "
+ "Concrete Job to 'Complete'.",
+ },
+ IND_JOB_FAILED: {
+ "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 10",
+ "Description": "Modification of Job State for a "
+ "Concrete Job to 'Exception'.",
+ },
+ IND_JOB_CHANGED: {
+ "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState <> "
+ "PreviousInstance.CIM_ConcreteJob::JobState",
+ "Description": "Modification of Job State for a ConcreteJob.",
+ },
+ IND_JOB_CREATED: {
+ "Query" : "SELECT * FROM LMI_%(prefix)sInstCreation WHERE "
+ "SourceInstance ISA %(classname)s",
+ "Description": "Creation of a ConcreteJob.",
+ },
+ }
+
+ @cmpi_logging.trace_method
+ def __init__(self, name, namespace, indication_manager, timer_manager):
+ """
+ Initialize new Manager. It automatically registers all job-related
+ filters to indication_manager and starts a worker thread.
+
+ :param name: (``string``) String with classname infix. For example
+ 'Storage' for ``LMI_StorageJob``, ``LMI_StorageJobMethodResult``
+ etc.
+ :param namespace: (``string``) Namespace of all providers.
+ :param indication_manager: (``IndicationManager``): a manager where
+ indications and filters should be added.
+ :param timer_manager: (``TimerManager``): Timer manager instance.
+ """
+ # List of all jobs. Dictionary job_id -> Job.
+ self.jobs = {}
+ # Queue of jobs scheduled to execute.
+ self.queue = Queue()
+ # Last created job_id.
+ self.last_instance_id = 0
+ # Classname infix.
+ self.name = name
+ # CIMProvider2 instances for job classes.
+ self.providers = {}
+ self.namespace = namespace
+ self.indication_manager = indication_manager
+ self.timer_manager = timer_manager
+
+ # Start the worker thread (don't forget to register it at CIMOM)
+ self.worker = threading.Thread(target=self._worker_main)
+ self.worker.daemon = False
+ self.worker.start()
+
+ # Various classnames for job-related classes, with correct infixes.
+ self.job_classname = 'LMI_' + self.name + 'Job'
+ self.method_result_classname = "LMI_" + self.name + "MethodResult"
+ self.affected_classname = "LMI_Affected" + self.name + "JobElement"
+ self.owning_classname = "LMI_Owning" + self.name + "JobElement"
+ self.associated_result_classname = ('LMI_Associated' + self.name
+ + 'JobMethodResult')
+ self.indication_filter_classname = ('LMI_' + self.name
+ + 'JobIndicationFilter')
+ self.job_provider = None
+ self._add_indication_filters()
+
+ @cmpi_logging.trace_method
+ def _add_indication_filters(self):
+ """
+ Add all job-related ``IndicationFilters`` to indication manager.
+ """
+ register_filters(self.job_classname, self.indication_manager)
+
+ @cmpi_logging.trace_method
+ def get_providers(self):
+ """
+ Get dictionary of providers for these classes:
+
+ * ``LMI_<name>Job``
+ * ``LMI_<name>MethodResult``
+ * ``LMI_Affected<name>JobElement``
+ * ``LMI_Owning<name>JobElement``
+ * ``LMI_Associated<name>JobMethodResult``
+
+ :rtype: dictionary class_name -> CIMProvider2
+ """
+
+ if not self.providers:
+ job_provider = LMI_ConcreteJob(self.job_classname, job_manager=self)
+ self.providers[self.job_classname] = job_provider
+ self.job_provider = job_provider
+
+ provider = LMI_MethodResult(
+ self.method_result_classname, job_manager=self)
+ self.providers[self.method_result_classname] = provider
+
+ provider = LMI_AffectedJobElement(
+ self.affected_classname, job_manager=self)
+ self.providers[self.affected_classname] = provider
+
+ provider = LMI_OwningJobElement(
+ self.owning_classname, job_manager=self)
+ self.providers[self.owning_classname] = provider
+
+ provider = LMI_AssociatedJobMethodResult(
+ self.owning_classname, job_manager=self)
+ self.providers[self.associated_result_classname] = provider
+
+ return self.providers
+
+ @cmpi_logging.trace_method
+ def add_job(self, job):
+ """
+ Enqueue new job. Send indication when needed.
+
+ :param job: (``Job``) A job to enqueue.
+ """
+ cmpi_logging.logger.debug("Job %s: '%s' enqueued"
+ % (job.the_id, job.job_name))
+
+ self.jobs[job.the_id] = job
+ self.queue.put(job)
+ # send indication
+ if self.indication_manager.is_subscribed(
+ self.job_classname, self.IND_JOB_CREATED):
+ job_instance = self.get_job_instance(job)
+ self.indication_manager.send_instcreation(
+ job_instance, self.IND_JOB_CREATED)
+
+ def send_modify_indications(self, prev_instance, current_instance,
+ indication_ids):
+ """
+ Send InstModification. This is helper method called by ``Job`` when
+ needed.
+
+ :param prev_instance: Instance of ``LMI_<name>Job`` before it was
+ modified.
+ :param current_instance: Instance of ``LMI_<name>Job`` after it was
+ modified.
+ """
+ for _id in indication_ids:
+ self.indication_manager.send_instmodification(prev_instance,
+ current_instance, _id)
+
+ @cmpi_logging.trace_method
+ def remove_job(self, job):
+ """
+ Remove existing job. Note that jobs are removed automatically after a
+ timeout, providers should not call this method directly.
+
+ :param job: (``Job``) Job to remove.
+ """
+ cmpi_logging.logger.debug("Job %s: '%s' removed from queue."
+ % (job.the_id, job.job_name))
+ del self.jobs[job.the_id]
+ # The job may still be in the queue!
+ # There is no way, how to remove it, it will be skipped by the
+ # worker thread.
+
+ @cmpi_logging.trace_method
+ def get_job_for_instance_id(self, instance_id, classname=None):
+ """
+ Return Job for given InstanceID or None when no such Job exist.
+
+ :param instance_id: (``string``) InstanceID value to parse.
+ :param classname: (``string``) Optional classname to parse the
+ InstanceID (e.g. when parsing InstanceID of
+ ``LMI_<name>MethodResult``).
+ :rtype: ``Job``
+ """
+ if classname is None:
+ classname = self.job_classname
+ the_id = lmi.common.parse_instance_id(instance_id, classname)
+ if not the_id.isdigit():
+ return None
+ return self.jobs.get(the_id, None)
+
+ @cmpi_logging.trace_method
+ def _worker_main(self):
+ """
+ This is the main loop of the job queue. It just processes enqueued
+ jobs and never ends.
+ """
+ cmpi_logging.logger.info("Started Job thread.")
+ while True:
+ command = self.queue.get()
+ if isinstance(command, Job):
+ # we need to protect from changes between checking state and
+ # setting new state
+ job = command
+ job.lock()
+ if job.job_state == Job.STATE_QUEUED:
+ # the job was not cancelled
+ job.change_state(Job.STATE_RUNNING)
+ job.unlock()
+ cmpi_logging.logger.info("Starting job %s: '%s'" %
+ (job.the_id, job.job_name))
+
+ job.execute()
+ if job.error:
+ cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
+ " %s" % (job.the_id, job.job_name, str(job.error)))
+ else:
+ cmpi_logging.logger.info("Job %s: '%s' finished OK" %
+ (job.the_id, job.job_name))
+ else:
+ # just skip suspended and terminated jobs
+ job.unlock()
+
+ elif isinstance(command, int):
+ self.queue.task_done()
+ break
+
+ self.queue.task_done()
+
+ cmpi_logging.logger.info("Stopped Job thread.")
+
+ @cmpi_logging.trace_method
+ def get_next_id(self):
+ """
+ Return next unused job id.
+
+ :rtype: string
+ """
+ self.last_instance_id += 1
+ return str(self.last_instance_id)
+
+ @cmpi_logging.trace_method
+ def get_job_instance(self, job):
+ """
+ Return CIMInstance for given job.
+
+ :param job: (``Job``)
+ :rtype: ``CIMInstance``
+ """
+ path = pywbem.CIMInstanceName(
+ classname=self.job_classname,
+ keybindings={'InstanceID': job.get_instance_id()},
+ host=socket.gethostname(),
+ namespace=self.namespace)
+ inst = pywbem.CIMInstance(classname=self.job_classname, path=path)
+ inst['InstanceID'] = job.get_instance_id()
+ return self.job_provider.get_instance(None, inst)
+
+ @cmpi_logging.trace_method
+ def shutdown(self, timeout=1):
+ """
+ Stop the thread. If a job is running, it may leave the job process
+ (mkfs, resize2fs, ...) and the worker thread (waiting for the process to
+ finish) still running.
+
+ JobManager still needs Indication Manager and TimeManager working at
+ this point!
+
+ :param timeout: Nr. of seconds to wait for the current job. Afterwards
+ the thread is abandoned, leaving the process still running.
+ """
+ # Empty the queue, we don't want the worker to proceed with any other
+ # queued job.
+ while not self.queue.empty():
+ queue.get(False)
+ queue.task_done()
+
+ self.queue.put(self.COMMAND_STOP)
+ self.worker.join(timeout)
+
+ # Cancel all running/suspended/queued jobs.
+ # This will send indications.
+ for job in self.jobs.itervalues():
+ if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED,
+ Job.STATE_RUNNING):
+ job.cancel()
+
+ if self.worker.isAlive():
+ # There is no way, how to stop the thread in Python, so abandon it.
+ self.worker.daemon = True
+ self.indication_manager = None
+ self.timer_manager = None
+
+ def can_shutdown(self):
+ """
+ Return True, if there is no running Job.
+ """
+ return self.queue.empty()
+
+
+class LMI_ConcreteJob(CIMProvider2):
+ """
+ Provider of LMI_ConcreteJob class or its subclass.
+ """
+ @cmpi_logging.trace_method
+ def __init__(self, classname, job_manager):
+ self.classname = classname
+ self.job_manager = job_manager
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """
+ Provider implementation of EnumerateInstances intrinsic method.
+ """
+ model.path.update({'InstanceID': None})
+ for job in self.job_manager.jobs.values():
+ model['InstanceID'] = job.get_instance_id()
+ if keys_only:
+ yield model
+ else:
+ yield self.get_instance(env, model, job)
+
+ @cmpi_logging.trace_method
+ def get_job_states(self, job):
+ """
+ Return JobState and OperationalStatus property values.
+
+ :param job: (``int``) Job.STATE_* value.
+ :rtype: tuple ``(JobState, OperationalStatus)`` values.
+ """
+ if job.job_state == Job.STATE_QUEUED:
+ jobstate = self.Values.JobState.New
+ opstate = [self.Values.OperationalStatus.Dormant]
+ elif job.job_state == Job.STATE_RUNNING:
+ jobstate = self.Values.JobState.Running
+ opstate = [self.Values.OperationalStatus.OK]
+ elif job.job_state == Job.STATE_FINISHED_OK:
+ jobstate = self.Values.JobState.Completed
+ opstate = [self.Values.OperationalStatus.OK,
+ self.Values.OperationalStatus.Completed]
+ elif job.job_state == Job.STATE_SUSPENDED:
+ jobstate = self.Values.JobState.Suspended
+ opstate = [self.Values.OperationalStatus.OK]
+ elif job.job_state == Job.STATE_FAILED:
+ jobstate = self.Values.JobState.Exception
+ opstate = [self.Values.OperationalStatus.Error,
+ self.Values.OperationalStatus.Completed]
+ elif job.job_state == Job.STATE_TERMINATED:
+ jobstate = self.Values.JobState.Terminated
+ opstate = [self.Values.OperationalStatus.Stopped]
+ return jobstate, opstate
+
+ @cmpi_logging.trace_method
+ # pylint: disable-msg=W0221
+ def get_instance(self, env, model, job=None):
+ """
+ Provider implementation of GetInstance intrinsic method.
+ """
+ if not job:
+ instance_id = model['InstanceID']
+ job = self.job_manager.get_job_for_instance_id(instance_id)
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+
+ model['DeleteOnCompletion'] = job.delete_on_completion
+ model['Name'] = job.job_name
+
+ # convert seconds to timedelta
+ seconds = job.time_before_removal
+ if seconds:
+ delta = timedelta(seconds=seconds)
+ model['TimeBeforeRemoval'] = pywbem.CIMDateTime(delta)
+ else:
+ model['TimeBeforeRemoval'] = pywbem.CIMProperty(
+ name='TimeBeforeRemoval',
+ value=None,
+ type='datetime')
+
+ if job.clocktime_of_last_state_change:
+ model['TimeOfLastStateChange'] = pywbem.CIMDateTime(
+ job.clocktime_of_last_state_change)
+ else:
+ model['TimeOfLastStateChange'] = pywbem.CIMProperty(
+ name='TimeOfLastStateChange',
+ value=None,
+ type='datetime')
+
+ if job.elapsed_time:
+ elapsed_time = timedelta(seconds=job.elapsed_time)
+ model['ElapsedTime'] = pywbem.CIMDateTime(elapsed_time)
+ else:
+ model['ElapsedTime'] = pywbem.CIMProperty(
+ name='ElapsedTime',
+ value=None,
+ type='datetime')
+
+ model['Description'] = job.job_name
+ model['LocalOrUtcTime'] = self.Values.LocalOrUtcTime.UTC_Time
+ model['PercentComplete'] = pywbem.Uint16(job.percent_complete)
+ if job.start_clocktime:
+ model['StartTime'] = pywbem.CIMDateTime(job.start_clocktime)
+ else:
+ model['StartTime'] = pywbem.CIMProperty(
+ name='StartTime',
+ value=None,
+ type='datetime')
+
+ if job.input_arguments:
+ model['JobInParameters'] = job.get_method_params(
+ "__JobInParameters", True, False)
+
+ if job.job_state in Job.FINAL_STATES:
+ # assemble output parameters with return value
+ outparams = job.get_method_params("__JobOutParameters", False, True)
+ if job.return_value is not None:
+ outparams['__ReturnValue'] = job.return_value
+ model['JobOutParameters'] = outparams
+
+ model['TimeSubmitted'] = pywbem.CIMDateTime(job.time_submitted)
+ # set correct state
+ jobstate, opstate = self.get_job_states(job)
+ model['JobState'] = jobstate
+ model['OperationalStatus'] = opstate
+ return model
+
+ @cmpi_logging.trace_method
+ def set_instance(self, env, instance, modify_existing):
+ """Return a newly created or modified instance.
+
+ :param env: Provider Environment (pycimmb.ProviderEnvironment)
+ :param instance: The new pywbem.CIMInstance. If modifying an existing
+ instance, the properties on this instance have been filtered by
+ the PropertyList from the request.
+ :param modify_existing: True if ModifyInstance, False if CreateInstance
+
+ Return the new instance. The keys must be set on the new instance.
+ """
+ if not modify_existing:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
+ "Creation of Job instances is not supported.")
+
+ job = self.job_manager.get_job_for_instance_id(instance['InstanceID'])
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+
+ try:
+ job.lock()
+ restart_timer = False
+
+ for (key, value) in instance.iteritems():
+ if value is None:
+ continue
+ if key == 'DeleteOnCompletion':
+ job.delete_on_completion = value
+ restart_timer = True
+ elif key == 'TimeBeforeRemoval':
+ job.time_before_removal = value.total_seconds()
+ restart_timer = True
+ elif key == 'JobRunTimes':
+ if value != 1:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
+ "JobRunTimes property is not supported.")
+ elif key == 'LocalOrUtcTime':
+ if value != self.Values.LocalOrUtcTime.UTC_Time:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
+ "Setting of LocalOrUtcTime property is not"
+ " supported.")
+ else:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
+ "Setting of %s property is not supported." % (key,))
+
+ if restart_timer:
+ job._restart_timer()
+ finally:
+ job.unlock()
+ return instance
+
+ @cmpi_logging.trace_method
+ def delete_instance(self, env, instance_name):
+ """Delete an instance.
+
+ :param env: Provider Environment (pycimmb.ProviderEnvironment)
+ :param instance_name: A pywbem.CIMInstanceName specifying the instance
+ to delete.
+ """
+ job = self.job_manager.get_job_for_instance_id(
+ instance_name['InstanceID'])
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+ if not job.job_status in Job.FINAL_STATES:
+ raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
+ "Job has not finished.")
+
+ self.job_manager.remove_job(job)
+
+ @cmpi_logging.trace_method
+ def cim_method_geterrors(self, env, object_name):
+ """Implements LMI_StorageJob.GetErrors()
+
+ If JobState is "Completed" and Operational Status is "Completed"
+ then no instance of CIM_Error is returned.
+
+ If JobState is "Exception" then GetErrors may return intances of
+ CIM_Error related to the execution of the procedure or method invoked by
+ the job.
+
+ If Operatational Status is not "OK" or "Completed" then
+ GetErrors may return CIM_Error instances related to the running of
+ the job.
+
+ :param env: -- Provider Environment (pycimmb.ProviderEnvironment)
+ :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
+ specifying the object on which the method GetErrors()
+ should be invoked.
+
+ Output parameters:
+
+ * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...))
+ If the OperationalStatus on the Job is not "OK", then this
+ method will return one or more CIM Error instance(s).
+ Otherwise, when the Job is "OK", null is returned.
+ """
+ job = self.job_manager.get_job_for_instance_id(
+ object_name['InstanceID'])
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+
+ if job.error is None:
+ errors = []
+ else:
+ err = job.get_cim_error()
+ errors = [err, ]
+ out_params = [
+ pywbem.CIMParameter(
+ name='Errors',
+ value=errors,
+ type='instance',
+ is_array=True,
+ array_size=len(errors))
+ ]
+ rval = self.Values.GetErrors.Success
+
+ return (rval, out_params)
+
+ @cmpi_logging.trace_method
+ def cim_method_requeststatechange(self, env, object_name,
+ param_requestedstate=None,
+ param_timeoutperiod=None):
+ """Implements LMI_StorageJob.RequestStateChange()
+
+ Requests that the state of the job be changed to the value
+ specified in the RequestedState parameter. Invoking the
+ RequestStateChange method multiple times could result in earlier
+ requests being overwritten or lost.
+
+ If 0 is returned, then the
+ task completed successfully. Any other return code indicates an
+ error condition.
+
+ :param env: Provider Environment (pycimmb.ProviderEnvironment)
+ :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
+ specifying the object on which the method RequestStateChange()
+ should be invoked.
+ :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState)
+ RequestStateChange changes the state of a job. The possible
+ values are as follows: Start (2) changes the state to
+ \'Running\'. Suspend (3) stops the job temporarily. The
+ intention is to subsequently restart the job with \'Start\'.
+ It might be possible to enter the \'Service\' state while
+ suspended. (This is job-specific.) Terminate (4) stops the
+ job cleanly, saving data, preserving the state, and shutting
+ down all underlying processes in an orderly manner. Kill (5)
+ terminates the job immediately with no requirement to save
+ data or preserve the state. Service (6) puts the job into a
+ vendor-specific service state. It might be possible to restart
+ the job.
+
+ :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime)
+ A timeout period that specifies the maximum amount of time that
+ the client expects the transition to the new state to take.
+ The interval format must be used to specify the TimeoutPeriod.
+ A value of 0 or a null parameter indicates that the client has
+ no time requirements for the transition. If this property
+ does not contain 0 or null and the implementation does not
+ support this parameter, a return code of \'Use Of Timeout
+ Parameter Not Supported\' must be returned.
+ """
+ job = self.job_manager.get_job_for_instance_id(
+ object_name['InstanceID'])
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+
+ try:
+ job.lock()
+ states = self.Values.RequestStateChange.RequestedState
+ retcodes = self.Values.RequestStateChange
+ if param_requestedstate == states.Suspend:
+ if job.job_state != Job.STATE_QUEUED:
+ # Can suspend only queued jobs
+ rval = retcodes.Invalid_State_Transition
+ else:
+ job.change_state(Job.STATE_SUSPENDED)
+ rval = retcodes.Completed_with_No_Error
+
+ elif param_requestedstate == states.Terminate:
+ if job.job_state not in (Job.STATE_QUEUED, Job.STATE_SUSPENDED):
+ # Can terminate only queued or suspended jobs
+ rval = retcodes.Invalid_State_Transition
+ else:
+ job.cancel()
+ rval = retcodes.Completed_with_No_Error
+
+ elif param_requestedstate == states.Start:
+ if job.job_state != Job.STATE_SUSPENDED:
+ # Can start only suspended jobs
+ rval = retcodes.Invalid_State_Transition
+ else:
+ job.change_state(Job.STATE_QUEUED)
+ # Enqueue the job again, it may be already processed
+ # (we might get the job in the queue twice, but
+ # we have only one worker thread so it won't collide).
+ self.job_manager.add_job(job)
+ rval = retcodes.Completed_with_No_Error
+
+ else:
+ rval = retcodes.Invalid_State_Transition
+ finally:
+ job.unlock()
+ return (rval, [])
+
+ @cmpi_logging.trace_method
+ def cim_method_killjob(self, env, object_name,
+ param_deleteonkill=None):
+ """Implements LMI_StorageJob.KillJob() """
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
+
+ @cmpi_logging.trace_method
+ def cim_method_geterror(self, env, object_name):
+ """Implements LMI_StorageJob.GetError()
+
+ GetError is deprecated because Error should be an array,not a
+ scalar.
+
+ When the job is executing or has terminated without
+ error, then this method returns no CIM_Error instance. However, if
+ the job has failed because of some internal problem or because the
+ job has been terminated by a client, then a CIM_Error instance is
+ returned.
+
+ :param env: Provider Environment (pycimmb.ProviderEnvironment)
+ :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
+ specifying the object on which the method GetError()
+ should be invoked.
+
+ Output parameters:
+
+ * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``)
+ If the OperationalStatus on the Job is not "OK", then this
+ method will return a CIM Error instance. Otherwise, when the
+ Job is "OK", null is returned.
+ """
+ job = self.job_manager.get_job_for_instance_id(
+ object_name['InstanceID'])
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+
+ if job.error is None:
+ error = pywbem.CIMParameter(
+ name='error',
+ value=None,
+ type='instance',
+ is_array=False)
+ else:
+ err = job.get_cim_error()
+ error = pywbem.CIMParameter(
+ name='error',
+ value=err,
+ type='instance')
+ rval = self.Values.GetError.Success
+ return (rval, [error])
+
+ class Values(object):
+ class JobState(object):
+ New = pywbem.Uint16(2)
+ Starting = pywbem.Uint16(3)
+ Running = pywbem.Uint16(4)
+ Suspended = pywbem.Uint16(5)
+ Shutting_Down = pywbem.Uint16(6)
+ Completed = pywbem.Uint16(7)
+ Terminated = pywbem.Uint16(8)
+ Killed = pywbem.Uint16(9)
+ Exception = pywbem.Uint16(10)
+ Service = pywbem.Uint16(11)
+ Query_Pending = pywbem.Uint16(12)
+ # DMTF_Reserved = 13..32767
+ # Vendor_Reserved = 32768..65535
+
+ class LocalOrUtcTime(object):
+ Local_Time = pywbem.Uint16(1)
+ UTC_Time = pywbem.Uint16(2)
+
+ class OperationalStatus(object):
+ Unknown = pywbem.Uint16(0)
+ Other = pywbem.Uint16(1)
+ OK = pywbem.Uint16(2)
+ Degraded = pywbem.Uint16(3)
+ Stressed = pywbem.Uint16(4)
+ Predictive_Failure = pywbem.Uint16(5)
+ Error = pywbem.Uint16(6)
+ Non_Recoverable_Error = pywbem.Uint16(7)
+ Starting = pywbem.Uint16(8)
+ Stopping = pywbem.Uint16(9)
+ Stopped = pywbem.Uint16(10)
+ In_Service = pywbem.Uint16(11)
+ No_Contact = pywbem.Uint16(12)
+ Lost_Communication = pywbem.Uint16(13)
+ Aborted = pywbem.Uint16(14)
+ Dormant = pywbem.Uint16(15)
+ Supporting_Entity_in_Error = pywbem.Uint16(16)
+ Completed = pywbem.Uint16(17)
+ Power_Mode = pywbem.Uint16(18)
+ Relocating = pywbem.Uint16(19)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..
+
+ class GetErrors(object):
+ Success = pywbem.Uint32(0)
+ Not_Supported = pywbem.Uint32(1)
+ Unspecified_Error = pywbem.Uint32(2)
+ Timeout = pywbem.Uint32(3)
+ Failed = pywbem.Uint32(4)
+ Invalid_Parameter = pywbem.Uint32(5)
+ Access_Denied = pywbem.Uint32(6)
+ # DMTF_Reserved = ..
+ # Vendor_Specific = 32768..65535
+
+ class GetError(object):
+ Success = pywbem.Uint32(0)
+ Not_Supported = pywbem.Uint32(1)
+ Unspecified_Error = pywbem.Uint32(2)
+ Timeout = pywbem.Uint32(3)
+ Failed = pywbem.Uint32(4)
+ Invalid_Parameter = pywbem.Uint32(5)
+ Access_Denied = pywbem.Uint32(6)
+ # DMTF_Reserved = ..
+ # Vendor_Specific = 32768..65535
+
+ class RequestStateChange(object):
+ Completed_with_No_Error = pywbem.Uint32(0)
+ Not_Supported = pywbem.Uint32(1)
+ Unknown_Unspecified_Error = pywbem.Uint32(2)
+ Can_NOT_complete_within_Timeout_Period = pywbem.Uint32(3)
+ Failed = pywbem.Uint32(4)
+ Invalid_Parameter = pywbem.Uint32(5)
+ In_Use = pywbem.Uint32(6)
+ # DMTF_Reserved = ..
+ Method_Parameters_Checked___Transition_Started = pywbem.Uint32(4096)
+ Invalid_State_Transition = pywbem.Uint32(4097)
+ Use_of_Timeout_Parameter_Not_Supported = pywbem.Uint32(4098)
+ Busy = pywbem.Uint32(4099)
+ # Method_Reserved = 4100..32767
+ # Vendor_Specific = 32768..65535
+ class RequestedState(object):
+ Start = pywbem.Uint16(2)
+ Suspend = pywbem.Uint16(3)
+ Terminate = pywbem.Uint16(4)
+ Kill = pywbem.Uint16(5)
+ Service = pywbem.Uint16(6)
+ # DMTF_Reserved = 7..32767
+ # Vendor_Reserved = 32768..65535
+
+class LMI_OwningJobElement(CIMProvider2):
+ """ Instrumentation of LMI_OwningJobElement class and its subclasses."""
+
+ @cmpi_logging.trace_method
+ def __init__(self, classname, job_manager):
+ self.classname = classname
+ self.job_manager = job_manager
+
+ @cmpi_logging.trace_method
+ def get_instance(self, env, model):
+ """Return an instance."""
+ instance_id = model['OwnedElement']['InstanceID']
+ job = self.job_manager.get_job_for_instance_id(instance_id)
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "OwnedElement not found.")
+
+ if job.owning_element != model['OwningElement']:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "OwnedElement is not associated to OwningElement.")
+ return model
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """Enumerate instances."""
+ model.path.update({'OwnedElement': None, 'OwningElement': None})
+ for job in self.job_manager.jobs.values():
+ if job.owning_element:
+ model['OwnedElement'] = job.get_name()
+ model['OwningElement'] = job.owning_element
+ yield model
+
+ @cmpi_logging.trace_method
+ def references(self, env, object_name, model, result_class_name, role,
+ result_role, keys_only):
+ """Instrument Associations."""
+ ch = env.get_cimom_handle()
+ if ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super='CIM_ManagedElement') or \
+ ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super=self.job_manager.job_classname):
+ return self.simple_refs(env, object_name, model,
+ result_class_name, role, result_role, keys_only)
+
+class LMI_AffectedJobElement(CIMProvider2):
+ """ Instrumentation of LMI_AffectedJobElement class and its subclasses."""
+
+ @cmpi_logging.trace_method
+ def __init__(self, classname, job_manager):
+ self.classname = classname
+ self.job_manager = job_manager
+
+ @cmpi_logging.trace_method
+ def get_instance(self, env, model):
+ """Return an instance."""
+ instance_id = model['AffectingElement']['InstanceID']
+ job = self.job_manager.get_job_for_instance_id(instance_id)
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "AffectingElement not found.")
+
+ if job.affected_elements is None:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "The AffectingElement has no AffectedElement.")
+ if model['AffectedElement'] not in job.affected_elements:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "AffectedElement is not associated to AffectingElement.")
+ model['ElementEffects'] = [self.Values.ElementEffects.Unknown, ]
+ return model
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """Enumerate instances."""
+ model.path.update({'AffectingElement': None, 'AffectedElement': None})
+ for job in self.job_manager.jobs.values():
+ if job.affected_elements is None:
+ continue
+ for element in job.affected_elements:
+ model['AffectingElement'] = job.get_name()
+ model['AffectedElement'] = element
+ if keys_only:
+ yield model
+ else:
+ yield self.get_instance(env, model)
+
+ @cmpi_logging.trace_method
+ def references(self, env, object_name, model, result_class_name, role,
+ result_role, keys_only):
+ """Instrument Associations."""
+ ch = env.get_cimom_handle()
+ if ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super='CIM_ManagedElement') or \
+ ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super=self.job_manager.job_classname):
+ return self.simple_refs(env, object_name, model,
+ result_class_name, role, result_role, keys_only)
+
+ class Values(object):
+ class ElementEffects(object):
+ Unknown = pywbem.Uint16(0)
+ Other = pywbem.Uint16(1)
+ Exclusive_Use = pywbem.Uint16(2)
+ Performance_Impact = pywbem.Uint16(3)
+ Element_Integrity = pywbem.Uint16(4)
+ Create = pywbem.Uint16(5)
+
+
+class LMI_MethodResult(CIMProvider2):
+ """Instrumentation of LMI_MethodResult class and its subclasses."""
+
+ @cmpi_logging.trace_method
+ def __init__(self, classname, job_manager):
+ self.classname = classname
+ self.job_manager = job_manager
+
+ @cmpi_logging.trace_method
+ # pylint: disable-msg=W0221
+ def get_instance(self, env, model, job=None):
+ """Return an instance."""
+ if not job:
+ instance_id = model['InstanceID']
+ job = self.job_manager.get_job_for_instance_id(
+ instance_id, self.classname)
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+
+ model['Description'] = job.job_name
+ if job.job_state in Job.FINAL_STATES:
+ model['PostCallIndication'] = pywbem.CIMProperty(
+ name='PostCallIndication',
+ value=job.get_post_call())
+ else:
+ model['PostCallIndication'] = pywbem.CIMProperty(
+ name='PostCallIndication',
+ type='instance',
+ value=None)
+ model['PreCallIndication'] = pywbem.CIMProperty(
+ name='PreCallIndication',
+ value=job.get_pre_call())
+ return model
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """Enumerate instances."""
+ model.path.update({'InstanceID': None})
+ for job in self.job_manager.jobs.values():
+ model['InstanceID'] = job.get_instance_id(
+ classname=self.classname)
+ if keys_only:
+ yield model
+ else:
+ yield self.get_instance(env, model, job)
+
+class LMI_AssociatedJobMethodResult(CIMProvider2):
+ """
+ Instrumentation of LMI_AssociatedJobMethodResult class and its
+ subclasses.
+ """
+
+ @cmpi_logging.trace_method
+ def __init__(self, classname, job_manager):
+ self.classname = classname
+ self.job_manager = job_manager
+
+ @cmpi_logging.trace_method
+ def get_instance(self, env, model):
+ """Return an instance."""
+ instance_id = model['Job']['InstanceID']
+ job = self.job_manager.get_job_for_instance_id(instance_id)
+ if not job:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job not found.")
+
+ expected_result_id = job.get_instance_id(
+ classname=self.job_manager.method_result_classname)
+ if model['JobParameters']['InstanceID'] != expected_result_id:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Job is not associated to JobParameters.")
+ return model
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """Enumerate instances."""
+ model.path.update({'JobParameters': None, 'Job': None})
+ for job in self.job_manager.jobs.values():
+ if job.owning_element:
+ model['Job'] = job.get_name()
+ model['JobParameters'] = pywbem.CIMInstanceName(
+ classname=self.job_manager.method_result_classname,
+ namespace=self.job_manager.namespace,
+ keybindings={
+ 'InstanceID': job.get_instance_id(
+ classname=self.job_manager.method_result_classname)
+ })
+ yield model
+
+ @cmpi_logging.trace_method
+ def references(self, env, object_name, model, result_class_name, role,
+ result_role, keys_only):
+ """Instrument Associations."""
+ ch = env.get_cimom_handle()
+ if ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super=self.job_manager.method_result_classname) or \
+ ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super=self.job_manager.job_classname):
+ return self.simple_refs(env, object_name, model,
+ result_class_name, role, result_role, keys_only)
+
diff --git a/src/python/lmi/common/TimerManager.py b/src/python/lmi/common/TimerManager.py
new file mode 100644
index 0000000..01e53b8
--- /dev/null
+++ b/src/python/lmi/common/TimerManager.py
@@ -0,0 +1,421 @@
+# Copyright (C) 2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# -*- coding: utf-8 -*-
+"""
+Module with functionality to create timers, which can be used in CMPI providers.
+
+Default python threading.Timer is not suitable, because it creates thread
+for each timer, which is inefficient. In addition, each such thread would need
+to be registered at CIMOM to enable logging in timer callbacks.
+
+Usage:
+
+1. Initialize the TimerManager when your provider initializes!
+Otherwise you may encounter weird exceptions.
+
+2. When any provider needs timer, create it using Time.create_timer() to create
+Timer instance.
+
+3. Call Timer.start() to start the timer. It will call registered callback
+when the timer expires. The callback is called in context of TimerManager
+thread, which has enabled logging to CIMOM, i.e. the callback can log as usual.
+
+4. (optionally) cancel the timer before expiration using Timer.cancel().
+However, this does not guarantee that the timer callback won't be called -
+it may be already being scheduled / called.
+
+.. autoclass:: TimerManager
+ :members:
+
+.. autoclass:: Timer
+ :members:
+
+.. autoclass:: MonotonicClock
+ :members:
+"""
+
+import ctypes
+import lmi.common.singletonmixin as singletonmixin
+import threading
+import Queue
+import lmi.common.cmpi_logging as cmpi_logging
+
+class TimerException(Exception):
+ pass
+
+class MonotonicClock(object):
+ """
+ Monotonic clock, represented by clock_gettime() and CLOCK_MONOTONIC.
+ This clock is not influenced by NTP or administrator setting time or date.
+ """
+ CLOCK_MONOTONIC = ctypes.c_int(1)
+
+ class timespec(ctypes.Structure):
+ _fields_ = [
+ ("tv_sec", ctypes.c_long),
+ ("tv_nsec", ctypes.c_long)]
+
+ def __init__(self):
+ libc = ctypes.CDLL("librt.so.1")
+ self._clock_gettime = libc.clock_gettime
+
+ def now(self):
+ """
+ Return current time, i.e. float representing seconds with precision up
+ to nanoseconds (depends on glibc). The actual value of current time is
+ meaningless, it can be used only to measure time differences.
+
+ :returns: ``float`` with current time in seconds.
+ """
+ t = MonotonicClock.timespec(0, 0)
+ ret = self._clock_gettime(self.CLOCK_MONOTONIC, ctypes.pointer(t))
+
+ if ret < 0:
+ raise TimerException("Cannot get clock time, clock_gettime() failed.")
+ return t.tv_sec + t.tv_nsec * 10 ** (-9)
+
+class Timer(object):
+ """
+ A class representing a timer. A timer has a timeout and after the timeout,
+ given callback is called and the timer is deleted.
+ """
+
+ @cmpi_logging.trace_method
+ def __init__(self, timer_manager, name, callback=None, *args, **kwargs):
+ """
+ Create a timer. If specified, given callback is registered.
+ The callback is called with *args and **kwargs.
+
+ :param timer_manager: (``TimerManager)`` Instance of the timer manager
+ which will manage the timer.
+ :param name: (``string``) Name of the timer, used for logging.
+ :param callback: (``function``) Callback to call when the timer expires.
+ :param *args, **kwargs: Parameters of the callback.
+ """
+ self._mgr = timer_manager
+ self._name = name
+ self._callback = callback
+ self._args = args
+ self._kwargs = kwargs
+
+ cmpi_logging.logger.trace_info("Timer: Timer %s created" % name)
+
+ @cmpi_logging.trace_method
+ def set_callback(self, callback, *args, **kwargs):
+ """
+ Set callback to call when the timer expires.
+
+ :param callback: (``function``) Callback to call when the timer expires.
+ :param *args, **kwargs: Parameters of the callback.
+ """
+ self._callback = callback
+ self._args = args
+ self._kwargs = kwargs
+
+ @cmpi_logging.trace_method
+ def start(self, timeout):
+ """
+ Start the timer with given timeout. After the timeout, the registered
+ callback will be called.
+
+ :param timeout: (``float``) Timeout in seconds.
+ """
+
+ self._timeout = timeout
+ now = self._mgr.now()
+ self._end_time = now + timeout
+ cmpi_logging.logger.trace_info(
+ "Timer: Timer %s started at %f for %f seconds"
+ % (self._name, now, self._timeout))
+ self._mgr._add_timer(self)
+
+ @cmpi_logging.trace_method
+ def cancel(self):
+ """
+ Cancel the timer. This method does not guarantee that the callback won't
+ be called, the timer might be calling the callback right now,
+ """
+ cmpi_logging.logger.trace_info("Timer: Timer %s cancelled"
+ % (self._name))
+ self._mgr._remove_timer(self)
+
+ @cmpi_logging.trace_method
+ def _expired(self, now):
+ """
+ Returns True, if the timer is expired.
+
+ :param now: (``float``) Current time, as returned by MonotonicClock.now().
+ :returns: (``boolean``) ``True``, if the timer is expired.
+ """
+ if self._end_time <= now:
+ cmpi_logging.logger.trace_info("Timer: Timer %s has expired"
+ % (self._name))
+ return True
+ return False
+
+ @cmpi_logging.trace_method
+ def _expire(self):
+ """
+ Called when the timer expired. It calls the callback.
+ """
+ cmpi_logging.logger.trace_info("Timer: Calling callback for timer %s"
+ % (self._name))
+ self._callback(*self._args, **self._kwargs)
+
+class TimerManager(singletonmixin.Singleton):
+ """
+ Manages set of timers.
+
+ Python standard Timer class creates a thread for
+
+ each timer, which is inefficient. This class uses only one thread, which
+ is registered at CIMOM, i.e. it can log as usual.
+
+ This class is singleton, use TimerManager.get_instance() to get the
+ instance.
+
+ Still, the singleton needs to be initialized with ProviderEnvironment to
+ enable logging in the timer thread. Use TimerManager.get_instance(env) in
+ you provider initialization.
+ """
+
+ # Commands to the timer thread
+ COMMAND_STOP = 1
+ COMMAND_RESCHEDULE = 2
+
+ @cmpi_logging.trace_method
+ def __init__(self, env=None):
+ """
+ Initialize new thread manager.
+
+ :param env: (``ProviderEnvironment``) Environment to use for logging.
+ """
+ self._clock = MonotonicClock()
+ self._lock = threading.RLock()
+ self._queue = Queue.Queue()
+
+ # Array of timers. Assumption: nr. of timers is relatively small,
+ # i.e. hundreds at the worst.
+ self._timers = []
+
+ new_broker = None
+ if env:
+ broker = env.get_cimom_handle()
+ new_broker = broker.PrepareAttachThread()
+
+ self._timer_thread = threading.Thread(
+ target=self._timer_loop, args=(new_broker,))
+ self._timer_thread.daemon = False
+ self._timer_thread.start()
+
+ def create_timer(self, name, callback=None, *args, **kwargs):
+ """
+ Create new timer. If specified, given callback is registered.
+ The callback is called with *args and **kwargs.
+
+ :param name: (``string``) Name of the timer, used for logging.
+ :param callback: (``function``) Callback to call when the timer expires.
+ :param *args, **kwargs: Parameters of the callback.
+ """
+ return Timer(self, name, callback, *args, **kwargs)
+
+ def _timer_loop(self, broker):
+ """
+ TimerManager thread main loop. It waits for timeout of all timers
+ and calls their callbacks.
+
+ :param broker: (``BrokerCIMOMHandle``) CIM broker handle, used for
+ logging.
+ """
+ if broker:
+ broker.AttachThread()
+ cmpi_logging.logger.info("Started Timer thread.")
+ while True:
+ self._handle_expired()
+ timeout = self._find_timeout()
+ if timeout != 0:
+ # Wait for the timeout or any change in timers.
+ try:
+ command = self._queue.get(timeout=timeout)
+ self._queue.task_done()
+ if command == self.COMMAND_STOP:
+ break # stop the thread
+ # process COMMAND_RESCHEDULE in next loop
+ except Queue.Empty:
+ # Timeout has happened, ignore the exception.
+ pass
+ cmpi_logging.logger.info("Stopped Timer thread.")
+
+ @cmpi_logging.trace_method
+ def _handle_expired(self):
+ """
+ Finds all expired timers, calls their callback and removes them from
+ list of timers.
+ """
+
+ # Get list of expired timers.
+ with self._lock:
+ now = self.now()
+ cmpi_logging.logger.trace_info(
+ "Timer: Checking for expired, now=%f." % (now))
+ expired = [t for t in self._timers if t._expired(now)]
+
+ # Call the callbacks (unlocked!).
+ for t in expired:
+ t._expire()
+
+ # Remove the timers (locked).
+ with self._lock:
+ for t in expired:
+ try:
+ cmpi_logging.logger.trace_info(
+ "Timer: Removing %s" % (t._name))
+ self._timers.remove(t)
+ except ValueError:
+ # The timer has already been removed.
+ pass
+
+ @cmpi_logging.trace_method
+ def _find_timeout(self):
+ """
+ Return nearest timeout, in seconds (as float, i.e. subsecond timeout
+ is possible). If no timer is scheduled, None is returned.
+ If there are expired timers, 0 is returned.
+
+ :returns: Positive ``float``: Nearest timeout.
+ :returns: ``0``: Some timer has expired.
+ :returns: ``None``: No timer is scheduled.
+ """
+ with self._lock:
+ if not self._timers:
+ cmpi_logging.logger.trace_info(
+ "Timer: No timers scheduled, waiting forever.")
+ return None
+ closest = min(self._timers, key=lambda timer: timer._end_time)
+ now = self.now()
+ timeout = closest._end_time - now
+ if timeout > 0:
+ cmpi_logging.logger.trace_info(
+ "Timer: Waiting for %f seconds, now=%f."
+ % (timeout, now))
+ return timeout
+ cmpi_logging.logger.trace_info(
+ "Timer: Some timer has already expired, no waiting.")
+ return 0
+
+ @cmpi_logging.trace_method
+ def _add_timer(self, timer):
+ """
+ Adds timer to list of timers. The timer must be started, i.e. its
+ timeout must be nozero!
+ This is internal method called by Timer.start().
+
+ :param timer: (``Timer``) Timer to add.
+ """
+ with self._lock:
+ self._timers.append(timer)
+ # Wake up the timer manager thread.
+ self._queue.put(self.COMMAND_RESCHEDULE)
+ cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name))
+
+ @cmpi_logging.trace_method
+ def _remove_timer(self, timer):
+ """
+ Remove timer from list of timers.
+ This is internal method called by Timer.cancel().
+ :param timer: (``Timer``) Timer to remove.
+ """
+ with self._lock:
+ try:
+ self._timers.remove(timer)
+ except ValueError:
+ pass
+ # Wake up the timer manager thread.
+ self._queue.put(self.COMMAND_RESCHEDULE)
+ cmpi_logging.logger.trace_info("Timer: Timer %s removed"
+ % (timer._name))
+
+ def now(self):
+ """
+ Return current time, not influenced by NTP or admin setting date or
+ time. The actual value of current time is meaningless, it can be used
+ only to measure time differences.
+
+ :returns: ``float`` Current time, in seconds.
+ """
+ return self._clock.now()
+
+ @cmpi_logging.trace_method
+ def shutdown(self):
+ """
+ Stop the thread. This method blocks until the thread is safely
+ destroyed.
+ """
+ self._queue.put(self.COMMAND_STOP)
+ self._timer_thread.join()
+
+if __name__ == "__main__":
+ cmpi_logging.logger = cmpi_logging.CMPILogger("")
+ import time
+
+ class Env(object):
+ def AttachThread(self):
+ pass
+ def PrepareAttachThread(self):
+ return self
+ def get_cimom_handle(self):
+ return self
+
+ clock = MonotonicClock()
+
+ start = clock.now()
+ time.sleep(0.5)
+ print "Clock 0.5:", clock.now() - start
+
+ time.sleep(0.5)
+ print "Clock 1:", clock.now() - start
+
+ mgr = TimerManager.get_instance(Env())
+
+ def callback(msg):
+ if callback.first:
+ t = mgr.create_timer("internal 0.5")
+ t.set_callback(callback, "internal 0.5")
+ t.start(0.5)
+ callback.first = False
+
+ print clock.now(), msg
+
+ callback.first = True
+
+ t1 = mgr.create_timer("one second")
+ t1.set_callback(callback, "1")
+ t1.start(1)
+ t2 = mgr.create_timer("two seconds")
+ t2.set_callback(callback, "2")
+ t2.start(2)
+ t22 = mgr.create_timer("two seconds 2")
+ t22.set_callback(callback, "2 again")
+ t22.start(2)
+ t15 = mgr.create_timer("one+half seconds")
+ t15.set_callback(callback, "1.5")
+ t15.start(1.5)
+
+ time.sleep(4)
+
+ mgr.stop_thread()
diff --git a/src/python/lmi/common/__init__.py b/src/python/lmi/common/__init__.py
new file mode 100644
index 0000000..baebcdb
--- /dev/null
+++ b/src/python/lmi/common/__init__.py
@@ -0,0 +1,42 @@
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+
+"""
+Common utilities for OpenLMI python providers.
+"""
+def parse_instance_id(instance_id, classname=None):
+ """
+ Parse InstanceID, check it has LMI:<classname>:<ID> format and return
+ the ID. Return None if the format is bad.
+ :param instance_id: (``string``) String to parse.
+ :param classname: (``string``) Name of class, whose InstanceID we parse.
+ If the classname is None, it won't be checked.
+ :returns: ``string`` with the ID.
+ """
+ parts = instance_id.split(":", 2)
+ if len(parts) != 3:
+ return None
+ if parts[0] != "LMI":
+ return None
+ real_classname = parts[1]
+ if classname and real_classname.lower() != classname.lower():
+ return None
+ return parts[2]
diff --git a/src/python/lmi/common/cmpi_logging.py b/src/python/lmi/common/cmpi_logging.py
new file mode 100644
index 0000000..a97e4ab
--- /dev/null
+++ b/src/python/lmi/common/cmpi_logging.py
@@ -0,0 +1,204 @@
+# -*- Coding:utf-8 -*-
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+
+
+import logging
+import inspect
+import traceback
+
+TRACE_WARNING = logging.INFO - 1
+TRACE_INFO = logging.INFO - 2
+TRACE_VERBOSE = logging.DEBUG
+
+class CMPILogHandler(logging.Handler):
+ """
+ A handler class, which sends log messages to CMPI log.
+ """
+
+ def __init__(self, cmpi_logger, *args, **kwargs):
+ self.cmpi_logger = cmpi_logger
+ super(CMPILogHandler, self).__init__(*args, **kwargs)
+
+ def emit(self, record):
+ msg = self.format(record)
+ if record.levelno >= logging.ERROR:
+ self.cmpi_logger.log_error(msg)
+ elif record.levelno >= logging.WARNING:
+ self.cmpi_logger.log_warn(msg)
+ elif record.levelno >= logging.INFO:
+ self.cmpi_logger.log_info(msg)
+ elif record.levelno >= TRACE_WARNING:
+ self.cmpi_logger.trace_warn(record.filename, msg)
+ elif record.levelno >= TRACE_INFO:
+ self.cmpi_logger.trace_info(record.filename, msg)
+ elif record.levelno >= logging.DEBUG:
+ self.cmpi_logger.trace_verbose(record.filename, msg)
+
+class CMPILogger(logging.getLoggerClass()):
+ """
+ A logger class, which adds trace_method level log methods.
+ """
+ def trace_warn(self, msg, *args, **kwargs):
+ """ Log message with TRACE_WARNING severity. """
+ self.log(TRACE_WARNING, msg, *args, **kwargs)
+
+ def trace_info(self, msg, *args, **kwargs):
+ """ Log message with TRACE_INFO severity. """
+ self.log(TRACE_INFO, msg, *args, **kwargs)
+
+ def trace_verbose(self, msg, *args, **kwargs):
+ """ Log message with TRACE_VERBOSE severity. """
+ self.log(TRACE_VERBOSE, msg, *args, **kwargs)
+
+logging.setLoggerClass(CMPILogger)
+
+def trace_method(func):
+ """ Decorator, trace entry and exit for a class method. """
+ classname = inspect.getouterframes(inspect.currentframe())[1][3]
+ def helper_func(*args, **kwargs):
+ """
+ Helper function, wrapping real function by trace_method decorator.
+ """
+ logger.log(TRACE_VERBOSE, "Entering %s.%s", classname, func.__name__)
+ try:
+ ret = func(*args, **kwargs)
+ except Exception as exc:
+ if getattr(exc, "tb_printed", False) is False:
+ logger.exception("full traceback")
+ logger.log(TRACE_VERBOSE, "traceback: %s",
+ traceback.format_exc())
+ exc.tb_printed = True
+ logger.log(TRACE_WARNING, "%s.%s threw exception %s",
+ classname, func.__name__, str(exc))
+ raise
+ logger.log(TRACE_VERBOSE, "Exiting %s.%s", classname, func.__name__)
+ return ret
+ helper_func.__name__ = func.__name__
+ helper_func.__doc__ = func.__doc__
+ helper_func.__module__ = func.__module__
+ return helper_func
+
+def trace_function(func):
+ """ Decorator, trace entry and exit for a function outside any class. """
+ def helper_func(*args, **kwargs):
+ """
+ Helper function, wrapping real function by trace_method decorator.
+ """
+ logger.log(TRACE_VERBOSE, "Entering %s.%s",
+ func.__module__, func.__name__)
+ try:
+ ret = func(*args, **kwargs)
+ except Exception as exc:
+ if getattr(exc, "tb_printed", False) is False:
+ logger.exception("full traceback")
+ logger.log(TRACE_VERBOSE, "traceback: %s",
+ traceback.format_exc())
+ exc.tb_printed = True
+ logger.log(TRACE_WARNING, "%s.%s threw exception %s",
+ func.__module__, func.__name__, str(exc))
+ raise
+ logger.log(TRACE_VERBOSE, "Exiting %s.%s",
+ func.__module__, func.__name__)
+ return ret
+ helper_func.__name__ = func.__name__
+ helper_func.__doc__ = func.__doc__
+ helper_func.__module__ = func.__module__
+ return helper_func
+
+class LogManager(object):
+ """
+ Class, which takes care of CMPI logging.
+ There should be only one instance of this class and it should be
+ instantiated as soon as possible, even before reading a config.
+ The config file can be provided later by set_config call.
+ """
+ FORMAT_STDERR = '%(levelname)s: %(message)s'
+ FORMAT_CMPI = '%(levelname)s: %(message)s'
+
+ LOGGER_NAME = "lmi.storage"
+
+ def __init__(self, env):
+ """
+ Initialize logging.
+ """
+ formatter = logging.Formatter(self.FORMAT_CMPI)
+
+ self.cmpi_handler = CMPILogHandler(env.get_logger())
+ self.cmpi_handler.setLevel(logging.DEBUG)
+ self.cmpi_handler.setFormatter(formatter)
+
+ self.logger = logging.getLogger(self.LOGGER_NAME)
+ self.logger.addHandler(self.cmpi_handler)
+ self.logger.setLevel(logging.INFO)
+
+ self.stderr_handler = None
+ self.config = None
+
+ global logger # IGNORE:W0603
+ logger = self.logger
+ logger.info("CMPI log started")
+
+ @trace_method
+ def set_config(self, config):
+ """
+ Set a configuration of logging. It applies its setting immediately
+ and also subscribes for configuration changes.
+ """
+ self.config = config
+ config.add_listener(self._config_changed)
+ # apply the config
+ self._config_changed(config)
+
+ @trace_method
+ def _config_changed(self, config):
+ """
+ Apply changed configuration, i.e. start/stop sending to stderr
+ and set appropriate log level.
+ """
+ if config.tracing:
+ self.logger.setLevel(logging.DEBUG)
+ else:
+ self.logger.setLevel(logging.INFO)
+ if config.stderr:
+ # start sending to stderr
+ if not self.stderr_handler:
+ # create stderr handler
+ formatter = logging.Formatter(self.FORMAT_STDERR)
+ self.stderr_handler = logging.StreamHandler()
+ self.stderr_handler.setLevel(logging.DEBUG)
+ self.stderr_handler.setFormatter(formatter)
+ self.logger.addHandler(self.stderr_handler)
+ self.logger.info("Started logging to stderr.")
+ else:
+ # stop sending to stderr
+ if self.stderr_handler:
+ self.logger.info("Stopped logging to stderr.")
+ self.logger.removeHandler(self.stderr_handler)
+ self.stderr_handler = None
+
+ def destroy(self):
+ if self.stderr_handler:
+ self.logger.removeHandler(self.stderr_handler)
+ self.stderr_handler = None
+ self.logger.removeHandler(self.cmpi_handler)
+ self.cmpi_handler = None
+ self.config.remove_listener(self._config_changed)
+
+logger = None
diff --git a/src/python/lmi/common/singletonmixin.py b/src/python/lmi/common/singletonmixin.py
new file mode 100644
index 0000000..c252676
--- /dev/null
+++ b/src/python/lmi/common/singletonmixin.py
@@ -0,0 +1,560 @@
+#pylint: disable-all
+"""
+A Python Singleton mixin class that makes use of some of the ideas
+found at http://c2.com/cgi/wiki?PythonSingleton. Just inherit
+from it and you have a singleton. No code is required in
+subclasses to create singleton behavior -- inheritance from
+Singleton is all that is needed.
+
+Singleton creation is threadsafe.
+
+USAGE:
+
+Just inherit from Singleton. If you need a constructor, include
+an __init__() method in your class as you usually would. However,
+if your class is S, you instantiate the singleton using S.get_instance()
+instead of S(). Repeated calls to S.get_instance() return the
+originally-created instance.
+
+For example:
+
+class S(Singleton):
+
+ def __init__(self, a, b=1):
+ pass
+
+S1 = S.get_instance(1, b=3)
+
+
+Most of the time, that's all you need to know. However, there are some
+other useful behaviors. Read on for a full description:
+
+1) Getting the singleton:
+
+ S.get_instance()
+
+returns the instance of S. If none exists, it is created.
+
+2) The usual idiom to construct an instance by calling the class, i.e.
+
+ S()
+
+is disabled for the sake of clarity.
+
+For one thing, the S() syntax means instantiation, but get_instance()
+usually does not cause instantiation. So the S() syntax would
+be misleading.
+
+Because of that, if S() were allowed, a programmer who didn't
+happen to notice the inheritance from Singleton (or who
+wasn't fully aware of what a Singleton pattern
+does) might think he was creating a new instance,
+which could lead to very unexpected behavior.
+
+So, overall, it is felt that it is better to make things clearer
+by requiring the call of a class method that is defined in
+Singleton. An attempt to instantiate via S() will result
+in a SingletonException being raised.
+
+3) Use __S.__init__() for instantiation processing,
+since S.get_instance() runs S.__init__(), passing it the args it has received.
+
+If no data needs to be passed in at instantiation time,
+you don't need S.__init__().
+
+4) If S.__init__(.) requires parameters, include them ONLY in the
+first call to S.get_instance(). If subsequent calls have arguments,
+a SingletonException is raised by default.
+
+If you find it more convenient for subsequent calls to be allowed to
+have arguments, but for those argumentsto be ignored, just include
+'ignoreSubsequent = True' in your class definition, i.e.:
+
+ class S(Singleton):
+
+ ignoreSubsequent = True
+
+ def __init__(self, a, b=1):
+ pass
+
+5) For testing, it is sometimes convenient for all existing singleton
+instances to be forgotten, so that new instantiations can occur. For that
+reason, a _forget_all_singletons() function is included. Just call
+
+ _forget_all_singletons()
+
+and it is as if no earlier instantiations have occurred.
+
+6) As an implementation detail, classes that inherit
+from Singleton may not have their own __new__
+methods. To make sure this requirement is followed,
+an exception is raised if a Singleton subclass includ
+es __new__. This happens at subclass instantiation
+time (by means of the MetaSingleton metaclass.
+
+
+By Gary Robinson, grobinson@flyfi.com. No rights reserved --
+placed in the public domain -- which is only reasonable considering
+how much it owes to other people's code and ideas which are in the
+public domain. The idea of using a metaclass came from
+a comment on Gary's blog (see
+http://www.garyrobinson.net/2004/03/python_singleto.html#comments).
+Other improvements came from comments and email from other
+people who saw it online. (See the blog post and comments
+for further credits.)
+
+Not guaranteed to be fit for any particular purpose. Use at your
+own risk.
+"""
+
+import threading
+
+class SingletonException(Exception):
+ """
+ Base exception related to singleton handling.
+ """
+ pass
+
+_ST_SINGLETONS = set()
+_LOCK_FOR_SINGLETONS = threading.RLock()
+# Ensure only one instance of each Singleton class is created. This is not
+# bound to the _LOCK_FOR_SINGLETON_CREATION = threading.RLock() individual
+# Singleton class since we need to ensure that there is only one mutex for each
+# Singleton class, which would require having a lock when setting up the
+# Singleton class, which is what this is anyway. So, when any Singleton is
+# created, we lock this lock and then we don't need to lock it again for that
+# class.
+_LOCK_FOR_SINGLETON_CREATION = threading.RLock()
+
+def _create_singleton_instance(cls, lst_args, dct_kw_args):
+ """
+ Creates singleton instance and stores its class in set.
+ """
+ _LOCK_FOR_SINGLETON_CREATION.acquire()
+ try:
+ if cls._is_instantiated(): # some other thread got here first
+ return
+
+ instance = cls.__new__(cls)
+ try:
+ instance.__init__(*lst_args, **dct_kw_args)
+ except TypeError, exc:
+ if '__init__() takes' in exc.message:
+ raise SingletonException, (
+ 'If the singleton requires __init__ args,'
+ ' supply them on first call to get_instance().')
+ else:
+ raise
+ cls.c_instance = instance
+ _add_singleton(cls)
+ finally:
+ _LOCK_FOR_SINGLETON_CREATION.release()
+
+def _add_singleton(cls):
+ """
+ Adds class to singleton set.
+ """
+ _LOCK_FOR_SINGLETONS.acquire()
+ try:
+ assert cls not in _ST_SINGLETONS
+ _ST_SINGLETONS.add(cls)
+ finally:
+ _LOCK_FOR_SINGLETONS.release()
+
+def _remove_singleton(cls):
+ """
+ Removes class from singleton set.
+ """
+ _LOCK_FOR_SINGLETONS.acquire()
+ try:
+ if cls in _ST_SINGLETONS:
+ _ST_SINGLETONS.remove(cls)
+ finally:
+ _LOCK_FOR_SINGLETONS.release()
+
+def _forget_all_singletons():
+ '''
+ This is useful in tests, since it is hard to know which singletons need
+ to be cleared to make a test work.
+ '''
+ _LOCK_FOR_SINGLETONS.acquire()
+ try:
+ for cls in _ST_SINGLETONS.copy():
+ cls._forget_class_instance_reference_for_testing()
+
+ # Might have created some Singletons in the process of tearing down.
+ # Try one more time - there should be a limit to this.
+ i_num_singletons = len(_ST_SINGLETONS)
+ if len(_ST_SINGLETONS) > 0:
+ for cls in _ST_SINGLETONS.copy():
+ cls._forget_class_instance_reference_for_testing()
+ i_num_singletons -= 1
+ assert i_num_singletons == len(_ST_SINGLETONS), \
+ 'Added a singleton while destroying ' + str(cls)
+ assert len(_ST_SINGLETONS) == 0, _ST_SINGLETONS
+ finally:
+ _LOCK_FOR_SINGLETONS.release()
+
+class MetaSingleton(type):
+ """
+ Metaclass for Singleton base class.
+ """
+ def __new__(mcs, str_name, tup_bases, dct):
+ if dct.has_key('__new__'):
+ raise SingletonException, 'Can not override __new__ in a Singleton'
+ return super(MetaSingleton, mcs).__new__(
+ mcs, str_name, tup_bases, dct)
+
+ def __call__(cls, *lst_args, **dictArgs):
+ raise SingletonException, \
+ 'Singletons may only be instantiated through get_instance()'
+
+class Singleton(object):
+ """
+ Base class for all singletons.
+ """
+ __metaclass__ = MetaSingleton
+
+ def get_instance(cls, *lst_args, **dct_kw_args):
+ """
+ Call this to instantiate an instance or retrieve the existing instance.
+ If the singleton requires args to be instantiated, include them the first
+ time you call get_instance.
+ """
+ if cls._is_instantiated():
+ if ( (lst_args or dct_kw_args)
+ and not hasattr(cls, 'ignoreSubsequent')):
+ raise SingletonException, (
+ 'Singleton already instantiated, but get_instance()'
+ ' called with args.')
+ else:
+ _create_singleton_instance(cls, lst_args, dct_kw_args)
+
+ return cls.c_instance #pylint: disable=E1101
+ get_instance = classmethod(get_instance)
+
+ def _is_instantiated(cls):
+ """
+ Don't use hasattr(cls, 'c_instance'), because that screws things
+ up if there is a singleton that extends another singleton.
+ hasattr looks in the base class if it doesn't find in subclass.
+ """
+ return 'c_instance' in cls.__dict__
+ _is_instantiated = classmethod(_is_instantiated)
+
+ # This can be handy for public use also
+ isInstantiated = _is_instantiated
+
+ def _forget_class_instance_reference_for_testing(cls):
+ """
+ This is designed for convenience in testing -- sometimes you
+ want to get rid of a singleton during test code to see what
+ happens when you call get_instance() under a new situation.
+
+ To really delete the object, all external references to it
+ also need to be deleted.
+ """
+ try:
+ if hasattr(cls.c_instance, '_prepare_to_forget_singleton'):
+ # tell instance to release anything it might be holding onto.
+ cls.c_instance._prepare_to_forget_singleton()
+ del cls.c_instance
+ _remove_singleton(cls)
+ except AttributeError:
+ # run up the chain of base classes until we find the one that has
+ # the instance and then delete it there
+ for base_class in cls.__bases__:
+ if issubclass(base_class, Singleton):
+ base_class._forget_class_instance_reference_for_testing()
+ _forget_class_instance_reference_for_testing = classmethod(
+ _forget_class_instance_reference_for_testing)
+
+
+if __name__ == '__main__':
+
+ import unittest
+ import time
+
+ class SingletonMixinPublicTestCase(unittest.TestCase):
+ """
+ TestCase for singleton class.
+ """
+ def testReturnsSameObject(self): #pylint: disable=C0103
+ """
+ Demonstrates normal use -- just call get_instance and it returns a singleton instance
+ """
+
+ class Foo(Singleton):
+ """Singleton child class."""
+ def __init__(self):
+ super(Foo, self).__init__()
+
+ a1 = Foo.get_instance()
+ a2 = Foo.get_instance()
+ self.assertEquals(id(a1), id(a2))
+
+ def testInstantiateWithMultiArgConstructor(self):#pylint: disable=C0103
+ """
+ If the singleton needs args to construct, include them in the first
+ call to get instances.
+ """
+
+ class Bar(Singleton):
+ """Singleton child class."""
+
+ def __init__(self, arg1, arg2):
+ super(Bar, self).__init__()
+ self.arg1 = arg1
+ self.arg2 = arg2
+
+ b1 = Bar.get_instance('arg1 value', 'arg2 value')
+ b2 = Bar.get_instance()
+ self.assertEquals(b1.arg1, 'arg1 value')
+ self.assertEquals(b1.arg2, 'arg2 value')
+ self.assertEquals(id(b1), id(b2))
+
+ def testInstantiateWithKeywordArg(self):
+ """
+ Test instantiation with keyword arguments.
+ """
+
+ class Baz(Singleton):
+ """Singleton child class."""
+ def __init__(self, arg1=5):
+ super(Baz, self).__init__()
+ self.arg1 = arg1
+
+ b1 = Baz.get_instance('arg1 value')
+ b2 = Baz.get_instance()
+ self.assertEquals(b1.arg1, 'arg1 value')
+ self.assertEquals(id(b1), id(b2))
+
+ def testTryToInstantiateWithoutNeededArgs(self):
+ """
+ This tests, improper instantiation.
+ """
+
+ class Foo(Singleton):
+ """Singleton child class."""
+ def __init__(self, arg1, arg2):
+ super(Foo, self).__init__()
+ self.arg1 = arg1
+ self.arg2 = arg2
+
+ self.assertRaises(SingletonException, Foo.get_instance)
+
+ def testPassTypeErrorIfAllArgsThere(self):
+ """
+ Make sure the test for capturing missing args doesn't interfere
+ with a normal TypeError.
+ """
+ class Bar(Singleton):
+ """Singleton child class."""
+ def __init__(self, arg1, arg2):
+ super(Bar, self).__init__()
+ self.arg1 = arg1
+ self.arg2 = arg2
+ raise TypeError, 'some type error'
+
+ self.assertRaises(TypeError, Bar.get_instance, 1, 2)
+
+ def testTryToInstantiateWithoutGetInstance(self):
+ """
+ Demonstrates that singletons can ONLY be instantiated through
+ get_instance, as long as they call Singleton.__init__ during
+ construction.
+
+ If this check is not required, you don't need to call
+ Singleton.__init__().
+ """
+
+ class A(Singleton):
+ def __init__(self):
+ super(A, self).__init__()
+
+ self.assertRaises(SingletonException, A)
+
+ def testDontAllowNew(self):
+
+ def instantiatedAnIllegalClass():
+ class A(Singleton):
+ def __init__(self):
+ super(A, self).__init__()
+
+ def __new__(metaclass, str_name, tup_bases, dct):
+ return super(MetaSingleton, metaclass).__new__(
+ metaclass, str_name, tup_bases, dct)
+
+ self.assertRaises(SingletonException, instantiatedAnIllegalClass)
+
+
+ def testDontAllowArgsAfterConstruction(self):
+ class B(Singleton):
+
+ def __init__(self, arg1, arg2):
+ super(B, self).__init__()
+ self.arg1 = arg1
+ self.arg2 = arg2
+
+ B.get_instance('arg1 value', 'arg2 value')
+ self.assertRaises(SingletonException, B, 'arg1 value', 'arg2 value')
+
+ def test_forgetClassInstanceReferenceForTesting(self):
+ class A(Singleton):
+ def __init__(self):
+ super(A, self).__init__()
+ class B(A):
+ def __init__(self):
+ super(B, self).__init__()
+
+ # check that changing the class after forgetting the instance
+ # produces an instance of the new class
+ a = A.get_instance()
+ assert a.__class__.__name__ == 'A'
+ A._forget_class_instance_reference_for_testing()
+ b = B.get_instance()
+ assert b.__class__.__name__ == 'B'
+
+ # check that invoking the 'forget' on a subclass still deletes
+ # the instance
+ B._forget_class_instance_reference_for_testing()
+ a = A.get_instance()
+ B._forget_class_instance_reference_for_testing()
+ b = B.get_instance()
+ assert b.__class__.__name__ == 'B'
+
+ def test_forgetAllSingletons(self):
+ # Should work if there are no singletons
+ _forget_all_singletons()
+
+ class A(Singleton):
+ ciInitCount = 0
+ def __init__(self):
+ super(A, self).__init__()
+ A.ciInitCount += 1
+
+ A.get_instance()
+ self.assertEqual(A.ciInitCount, 1)
+
+ A.get_instance()
+ self.assertEqual(A.ciInitCount, 1)
+
+ _forget_all_singletons()
+ A.get_instance()
+ self.assertEqual(A.ciInitCount, 2)
+
+ def test_threadedCreation(self):
+ # Check that only one Singleton is created even if multiple threads
+ # try at the same time. If fails, would see assert in _add_singleton
+ class Test_Singleton(Singleton):
+ def __init__(self):
+ super(Test_Singleton, self).__init__()
+
+ class Test_SingletonThread(threading.Thread):
+ def __init__(self, fTargetTime):
+ super(Test_SingletonThread, self).__init__()
+ self._fTargetTime = fTargetTime
+ self._eException = None
+
+ def run(self):
+ try:
+ fSleepTime = self._fTargetTime - time.time()
+ if fSleepTime > 0:
+ time.sleep(fSleepTime)
+ Test_Singleton.get_instance()
+ except Exception, exc:
+ self._eException = exc
+
+ fTargetTime = time.time() + 0.1
+ lstThreads = []
+ for _ in xrange(100):
+ t = Test_SingletonThread(fTargetTime)
+ t.start()
+ lstThreads.append(t)
+ eException = None
+ for t in lstThreads:
+ t.join()
+ if t._eException and not eException:
+ eException = t._eException
+ if eException:
+ raise eException
+
+ def testNoInit(self):
+ """
+ Demonstrates use with a class not defining __init__
+ """
+
+ class A(Singleton):
+ pass
+
+ #INTENTIONALLY UNDEFINED:
+ #def __init__(self):
+ # super(A, self).__init__()
+
+ A.get_instance() #Make sure no exception is raised
+
+ def testMultipleGetInstancesWithArgs(self):
+
+ class A(Singleton):
+
+ ignoreSubsequent = True
+
+ def __init__(self, a, b=1):
+ pass
+
+ a1 = A.get_instance(1)
+ # ignores the second call because of ignoreSubsequent
+ a2 = A.get_instance(2)
+
+ class B(Singleton):
+
+ def __init__(self, a, b=1):
+ pass
+
+ b1 = B.get_instance(1)
+ # No ignoreSubsequent included
+ self.assertRaises(SingletonException, B.get_instance, 2)
+
+ class C(Singleton):
+
+ def __init__(self, a=1):
+ pass
+
+ c1 = C.get_instance(a=1)
+ # No ignoreSubsequent included
+ self.assertRaises(SingletonException, C.get_instance, a=2)
+
+ def testInheritance(self):
+ """
+ It's sometimes said that you can't subclass a singleton (see, for instance,
+ http://steve.yegge.googlepages.com/singleton-considered-stupid point e). This
+ test shows that at least rudimentary subclassing works fine for us.
+ """
+
+ class A(Singleton):
+
+ def set_x(self, x):
+ self.x = x
+
+ def setZ(self, z):
+ raise NotImplementedError
+
+ class B(A):
+
+ def set_x(self, x):
+ self.x = -x
+
+ def set_y(self, y):
+ self.y = y
+
+ a = A.get_instance()
+ a.set_x(5)
+ b = B.get_instance()
+ b.set_x(5)
+ b.set_y(50)
+ self.assertEqual((a.x, b.x, b.y), (5, -5, 50))
+ self.assertRaises(AttributeError, eval, 'a.set_y', {}, locals())
+ self.assertRaises(NotImplementedError, b.setZ, 500)
+
+ unittest.main()
+