summaryrefslogtreecommitdiffstats
path: root/src/python/lmi/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/lmi/common')
-rw-r--r--src/python/lmi/common/BaseConfiguration.py267
-rw-r--r--src/python/lmi/common/IndicationManager.py758
-rw-r--r--src/python/lmi/common/JobManager.py1670
-rw-r--r--src/python/lmi/common/TimerManager.py421
-rw-r--r--src/python/lmi/common/__init__.py42
-rw-r--r--src/python/lmi/common/cmpi_logging.py204
-rw-r--r--src/python/lmi/common/singletonmixin.py560
7 files changed, 0 insertions, 3922 deletions
diff --git a/src/python/lmi/common/BaseConfiguration.py b/src/python/lmi/common/BaseConfiguration.py
deleted file mode 100644
index f54de03..0000000
--- a/src/python/lmi/common/BaseConfiguration.py
+++ /dev/null
@@ -1,267 +0,0 @@
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Jan Safranek <jsafrane@redhat.com>
-# Authors: Michal Minar <miminar@redhat.com>
-# -*- coding: utf-8 -*-
-"""
-Module for BaseConfiguration class.
-
-BaseConfiguration
---------------------
-
-.. autoclass:: BaseConfiguration
- :members:
-
-"""
-
-import ConfigParser
-import logging
-import os
-import socket
-from lmi.common.singletonmixin import Singleton
-
-def convert_value(section, option, convert_func, value):
- """
- Return result of application of ``convert_func`` on value.
- If the conversion failes, error is logged and ValueError is raised.
-
- :param section: (``str``) Section of configuration file. Used for
- error message.
- :param option: (``str``) Option of configuration file. Used for
- error message.
- :param convert_func: (``type``) Conversion function to apply on passed
- value.
- :param value: (``basestring``) Value to convert.
- """
- if not isinstance(value, basestring):
- raise TypeError("value must be a string")
- try:
- if convert_func is bool:
- return value.lower() in ('1', 'y', 'yes', 'on', 'true')
- if convert_func is str and isinstance(value, unicode):
- return value.encode('utf-8')
- if convert_func is unicode and isinstance(value, str):
- return value.decode('utf-8')
- return convert_func(value)
- except ValueError as exc:
- logging.getLogger(__name__).error(
- 'failed to convert value of "[%s]%s: %s', section, option,
- exc)
- raise
-
-class BaseConfiguration(Singleton):
- """
- OpenLMI configuration file. By default, it resides in
- /etc/openlmi/${provider_prefix}/${provider_prefix}.conf.
-
- There should be only one instance of this class.
- """
-
- CONFIG_DIRECTORY_TEMPLATE = '/etc/openlmi/%(provider_prefix)s/'
- CONFIG_FILE_PATH_TEMPLATE = \
- CONFIG_DIRECTORY_TEMPLATE + '%(provider_prefix)s.conf'
-
- PERSISTENT_PATH_TEMPLATE = '/var/lib/openlmi-%(provider_prefix)s/'
- SETTINGS_DIR = 'settings/'
-
- DEFAULT_OPTIONS = {
- 'Namespace' : 'root/cimv2',
- 'SystemClassName' : 'Linux_ComputerSystem',
- # Default logging level
- "Level" : "ERROR",
- 'DebugBlivet' : 'false',
- 'Stderr' : 'false',
- }
-
- @classmethod
- def provider_prefix(cls):
- """
- This is responsibility of a subclass.
-
- :rtype: (``string`) Prefix of providers in lowercase. For example
- configuration class for storage providers would return "storage".
-
- Result is used to construct configuration paths.
- """
- raise NotImplementedError
-
- @classmethod
- def default_options(cls):
- """ :rtype: (``dict``) Dictionary of default values. """
- return cls.DEFAULT_OPTIONS
-
- @classmethod
- def config_directory(cls):
- """ Base directory with configuration settings. """
- return cls.CONFIG_DIRECTORY_TEMPLATE % {
- 'provider_prefix' : cls.provider_prefix() }
-
- @classmethod
- def persistent_path(cls):
- """ Base directory with persistent settings. """
- return cls.PERSISTENT_PATH_TEMPLATE % {
- 'provider_prefix': cls.provider_prefix() }
-
- @classmethod
- def config_file_path(cls):
- """ File path of configuration file. """
- return cls.CONFIG_FILE_PATH_TEMPLATE % {
- 'provider_prefix' : cls.provider_prefix() }
-
- @classmethod
- def mandatory_sections(cls):
- """
- Return list of sections, that must be present in configuration
- file. If not present, they will be created in memory.
- """
- return ['Log', 'CIM']
-
- def __init__(self):
- """ Initialize and load a configuration file."""
- self._listeners = set()
- self.config = ConfigParser.SafeConfigParser(
- defaults=self.default_options())
- self.load()
-
- def add_listener(self, callback):
- """
- Add a callback, which will be called when configuration is updated.
- The callback will be called with instance of this class as
- parameter:
- callback(config)
- """
- self._listeners.add(callback)
-
- def remove_listener(self, callback):
- """
- Remove previously registered callback.
- """
-
- self._listeners.remove(callback)
-
- def _call_listeners(self):
- """
- Call all listeners that configuration has updated.
- """
- for callback in self._listeners:
- callback(self)
-
- def load(self):
- """
- Load configuration from config file path.
- The file does not need to exist.
- """
- self.config.read(self.config_file_path())
- for section in self.mandatory_sections():
- if not self.config.has_section(section):
- self.config.add_section(section)
- self._call_listeners()
-
- @property
- def namespace(self):
- """ Return namespace of OpenLMI provider. """
- return self.config.get('CIM', 'Namespace')
-
- @property
- def system_class_name(self):
- """ Return SystemClassName of OpenLMI provider. """
- return self.config.get('CIM', 'SystemClassName')
-
- @property
- def system_name(self):
- """ Return SystemName of OpenLMI provider. """
- return socket.getfqdn()
-
- @property
- def logging_level(self):
- """ Return name of logging level in lower case. """
- return self.config.get('Log', 'Level').lower()
-
- @property
- def stderr(self):
- """ Return True if logging to stderr is enabled. """
- return self.config.getboolean('Log', 'Stderr')
-
- def file_path(self, section, option):
- """
- Return absolute file path for requested option.
- Relative path is converted to absolute one with config's directory
- as a prefix.
- """
- path = self.config.get(section, option)
- if not os.path.isabs(path):
- path = os.path.join(self.config_directory(), path)
- return path
-
- def get_safe(self, section, option, convert=str, fallback=None,
- *args, **kwargs):
- """
- Get the configuration option value as specified type in a safe way.
- Value is searched in this order:
- config_file -> defaults_dict -> fallback
-
- :param section: (``str``) Section name of option.
- :param option: (``str``) Option name.
- :param convert: (``type``) Is a conversion function for obtained
- value. If the value could not be converted, error message is
- generated and ``fallback`` is returned. This function is not
- applied to ``fallback`` value. Supported values are:
- str, unicode, int ,float, long, bool
-
- :param fallback: Value returned, when section or option does not
- exists and no default value is given, or when the obtained value
- could not be converted by supplied function.
-
- All the other parameters are passed to the ``SafeConfigParser.get()``
- method.
- """
- if not isinstance(section, basestring):
- raise TypeError('section must be a string')
- if not isinstance(option, basestring):
- raise TypeError("option must be a string")
- if not convert in (str, unicode, int, float, long, bool):
- raise ValueError("unsupported type for conversion: %s:",
- getattr(convert, '__name__', 'unknown'))
- if ( not self.config.has_option(section, option)
- and not option.lower() in self.default_options()):
- logging.getLogger(__name__).warn(
- 'no option value and no default supplied for "[%s]%s"',
- section, option)
- return fallback
- try:
- value = self.config.get(section, option, *args, **kwargs)
- except ConfigParser.Error as exc:
- logging.getLogger(__name__).error(
- 'failed to get value of "[%s]%s": %s', section, option,
- exc)
- return fallback
- try:
- # first try to convert value from config
- return convert_value(section, option, convert, value)
- except ValueError as exc:
- logging.getLogger(__name__).error(
- 'failed to convert value of "[%s]%s: %s', section, option,
- exc)
- # if it failes, try the value from defaults
- if ( option.lower() in self.default_options()
- and self.default_options()[option.lower()] != value):
- try:
- return convert_value(section, option, convert,
- self.default_options()[option.lower()])
- except ValueError:
- pass # error is already logged, no more options left
- return fallback
diff --git a/src/python/lmi/common/IndicationManager.py b/src/python/lmi/common/IndicationManager.py
deleted file mode 100644
index 0467227..0000000
--- a/src/python/lmi/common/IndicationManager.py
+++ /dev/null
@@ -1,758 +0,0 @@
-# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Jan Safranek <jsafrane@redhat.com>
-# Authors: Michal Minar <miminar@redhat.com>
-# -*- coding: utf-8 -*-
-"""
- .. autoclass:: IndicationManager
- :members:
-"""
-
-import pywbem
-from Queue import Queue
-import re
-import socket
-import threading
-
-import lmi.common.cmpi_logging as cmpi_logging
-from lmi.common import singletonmixin
-
-RE_FILTER_NAME = re.compile(r'^(?P<prefix>lmi:'
- r'(?P<class_name>[a-z0-9_]+):)(?P<filter_id>.*)$', re.IGNORECASE)
-
-FILTER_DEFAULTS = {
- "SourceNamespace" : "root/cimv2",
- "SourceNamespaces" : ["root/cimv2"],
- "QueryLanguage" : "CIM:CQL"
-}
-
-@cmpi_logging.trace_function
-def enumerate_namespaces(ch):
- """
- Return tuple ``([CIM_Namespace instance, ...], ns_interop)``. Where
- first item is a list of object paths of all namespaces in broker and
- the second is a name of namespace, where this information can be found.
-
- :param ch: CIMOM handle.
- """
- nsclasses = ["CIM_Namespace", "__Namespace"]
- namespaces = ['root/cimv2', 'root/PG_InterOp', 'Interop',
- 'interop', 'root', 'root/interop']
- nspaths = []
- ns = None
- for cls in nsclasses:
- for ns in namespaces:
- try:
- nspaths = [nm for nm in ch.EnumerateInstanceNames(ns, cls)]
- if nspaths:
- break
- except pywbem.CIMError as exc:
- if exc[0] in (
- pywbem.CIM_ERR_INVALID_NAMESPACE,
- pywbem.CIM_ERR_NOT_SUPPORTED,
- pywbem.CIM_ERR_INVALID_CLASS):
- pass
- if exc[0] == pywbem.CIM_ERR_FAILED:
- cmpi_logging.logger.error("EnumerateInstanceNames failed"
- " for %s:%s: %s", ns, cls, str(exc))
- else:
- raise
- if nspaths:
- break
- if not nspaths:
- cmpi_logging.logger.error("failed to enumerate namespaces")
- ns = None
- return (nspaths, ns)
-
-@cmpi_logging.trace_function
-def find_ns_interop(ch):
- """
- Return name of interop namespace, where ``CIM_IndicationFilter``
- class reside.
-
- :param ch: CIMOM handle.
- """
- _, ns_interop = enumerate_namespaces(ch)
- return ns_interop
-
-def make_filter_name(class_name, fltr_id):
- """
- Return value for ``CIM_IndicationFilter.Name`` property.
- """
- return "LMI:%s:%s" % (class_name, fltr_id)
-
-def parse_filter_name(name):
- """
- Return tuple ``(class_name, filter_id)``.
-
- :param name: (``string``) Value of cim filter's *Name* property.
- """
- match = RE_FILTER_NAME.match(name)
- if not match:
- raise ValueError('Could not parse filter name: "%s"' % name)
- return (match.group("class_name"), match.group("filter_id"))
-
-@cmpi_logging.trace_function
-def make_indication_filter_path(class_name, fltr_id, ns_interop):
- """
- Return CIM_IndicationFilter instance path for given filter id.
-
- :param class_name: (``string``) *Scoped class* name.
- :param fltr_id: (``string``) Filter name.
- :param ns_interop: (``string``) Interop namespace.
- """
- for arg in ('class_name', 'fltr_id', 'ns_interop'):
- if not isinstance(locals()[arg], basestring):
- raise TypeError("%s must be basestring" % arg)
- cop = pywbem.CIMInstanceName("CIM_IndicationFilter",
- namespace=ns_interop)
- cop['CreationClassName'] = 'CIM_IndicationFilter'
- cop['SystemCreationClassName'] = 'CIM_ComputerSystem'
- cop['SystemName'] = socket.gethostname()
- cop['Name'] = make_filter_name(class_name, fltr_id)
- return cop
-
-@cmpi_logging.trace_function
-def remove_cimom_filter(ch, fltr_path):
- """
- Deletes instance of CIM_IndicationFilter installed at broker with all
- referencing subscriptions.
-
- Returns list of subscription instace names, that were deleted.
-
- :param ch: CIMOM handle.
- :param fltr_path: (``CIMInstanceName``) Path of ``CIM_IndicationFilter`` to
- remove.
- """
- if not isinstance(fltr_path, pywbem.CIMInstanceName):
- raise TypeError("fltr_path must be a CIMInstanceName")
-
- referents = []
- for ref in ch.AssociatorNames(fltr_path,
- role="Filter",
- resultRole="Handler",
- resultClass="CIM_IndicationSubscription"):
- ch.DeleteInstance(ref)
- referents.append(ref)
- ch.DeleteInstance(fltr_path)
- cmpi_logging.logger.debug('removed indication filter "%s" with %d'
- ' referents', fltr_path["Name"], len(referents))
- return referents
-
-class IndicationManager(singletonmixin.Singleton):
- """
- Using ``IndicationManager`` class
- providers can send indications without bothering with handling of
- indication subscriptions.
-
- Usage:
-
- 1. Subclass CIM_InstCreation and CIM_InstModification.
-
- 2. In your initialization routine, create one ``IndicationManager``
- instance. E.g. one for whole ``LMI_Storage`` may is enough. Like
- this::
-
- indication_manager = \
- IndicationManager.get_instance(env, "Storage", "root/cimv2")
-
- 3. Call ``indication_manager.add_filters()`` with all filters your
- providers support for particular CIM class. This method can be called
- multiple times.
- For example::
-
- filters = {
- "JobPercentUpdated": {
- "Query" : "SELECT * FROM CIM_InstModification WHERE"
- " SourceInstance ISA LMI_StorageJob AND"
- " SourceInstance.CIM_ConcreteJob::PercentComplete <>"
- " PreviousInstance.CIM_ConcreteJob::PercentComplete",
- "Description" : "Modification of Percentage Complete for"
- " a Concrete Job.",
- },
- "JobSucceeded": {
- "Query" : "SELECT * FROM CIM_InstModification WHERE"
- " SourceInstance ISA LMI_StorageJob AND"
- " SourceInstance.CIM_ConcreteJob::JobState = "
- " CIM_ConcreteJob.JobState#'Completed'",
- "Description": "Modification of Operational Status for"
- " a Concrete Job to 'Complete' and 'OK'.",
- },
- #... other indications
- }
- instance_manager.add_filters("LMI_StorageJob", filters)
-
- First argument is a name of class to which indications apply. We'll call
- it *Scoping class*.
-
- 4. In your provider module, implement indication functions like this::
-
- def authorize_filter(env, fltr, ns, classes, owner):
- indication_manager.authorize_filter(env, fltr, ns, classes, owner)
-
- def activate_filter (env, fltr, ns, classes, first_activation):
- indication_manager.activate_filter(env, fltr, ns, classes,
- first_activation)
-
- def deactivate_filter(env, fltr, ns, classes, last_activation):
- indication_manager.deactivate_filter(env, fltr, ns, classes,
- last_activation)
-
- def enable_indications(env):
- indication_manager.enable_indications(env)
-
- def disable_indications(env):
- indication_manager.disable_indications(env)
-
- From now on, the ``IndicationManager`` will track all subscribed filters.
- You can query the ``indication_manager.is_subscribed()`` before you create
- and send an indication. Use ``indication_manager.send_indication()``
- to send your indications.
-
- Only static (=preconfigured, read-only) indication filters are
- supported.
-
- For user to use these preconfigured filters, they need to be installed
- at broker as instances of ``CIM_IndicationFilter``. But since they can
- not be guarded against removel by accident, this object provides a way
- to reinstall them. But using this is not recomended, since it can upset
- users. See :ref:`_update_context-label`.
-
- The supported filters must be passed to add_filters method. The filters
- are passed as dictionary ``'filter_id' -> {dictionary 'IndicationFilter
- property' -> 'value'}``. There must be at least ``Query`` property in
- each filter, CQL is assumed.
-
- This helper automatically tracks which filters are subscribed. Provider
- can query ``is_subscribed()`` to check, if filter with given
- ``filter_id`` is subscribed before generating indications.
-
- The CMPI interface to send indications is complicated -
- when an indication is send from CIMOM callback (e.g. ``get_instance``),
- it must use current ``env`` parameter of the callback and it would be
- tedious to pass it to ``IndicationManager`` each time. Therefore
- ``IndicationManager`` creates its own thread, registers it at CIMOM
- using ``PrepareAttachThread``/``AttachThread``.
-
- As side-effect, indication can be sent from any thread, there is no
- need to call ``PrepareAttachThread``/``AttachThread``.
- """
- SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity
-
- COMMAND_STOP = 1 # Command to the IndicationManager thread to stop.
-
- @cmpi_logging.trace_method
- def __init__(self, env, nameprefix, namespace, ns_interop=None,
- queue=None):
- """
- Create new ``IndicationManager``. Usually only one instance
- is necessary for one provider process.
-
- :param env: (``ProviderEnvironment``) Provider enviroment, taken
- from CIMOM callback (e.g. ``get_providers()``).
- :param nameprefix: (``string``) Prefix of your ``CIM_InstCreation``
- and ``CIM_InstModification`` subclasses, e.g. 'Storage' for
- ``LMI_StorageInstCreation``.
- :param namespace: (``string``) Namespace, which will be set to
- outgoing indications instances.
- :param ns_interop: (``string``) Namespace, where filters and
- subscriptions are stored.
- :param queue: Optional custom input queue with the same interface as
- ``Queue.Queue``.
- """
-
- # { class_name :
- # { filter_id : filter_properties
- # , ... }
- # }
- self._filters = pywbem.NocaseDict()
- self._enabled = False
- # { (class_name, filter_id), ... }
- self._subscribed_filters = set()
- self._nameprefix = nameprefix
- self._namespace = namespace
- self._ns_interop = ns_interop
- self._access_lock = threading.RLock()
- self._env = env
-
- if queue is None:
- queue = Queue()
- self._queue = queue
- # prepare indication thread
- ch = env.get_cimom_handle()
- new_broker = ch.PrepareAttachThread()
- self._indication_sender = threading.Thread(
- target=self._send_indications_loop, args=(new_broker,))
- self._indication_sender.daemon = False
- self._indication_sender.start()
-
- @property
- def enabled(self):
- """
- Return a boolean saying, whether indication sending is enabled.
- """
- with self._access_lock:
- return self.enabled
-
- @property
- def namespace(self):
- """
- Return namespace of outgoing indication instances.
- """
- return self._namespace
-
- @property
- def nameprefix(self):
- """
- Return prefix of indication class names.
- """
- return self._nameprefix
-
- @property
- def ns_interop(self):
- """
- Return interop namespace name.
- """
- with self._access_lock:
- if self._ns_interop is None:
- ch = self._env.get_cimom_handle()
- self._ns_interop = find_ns_interop(ch)
- cmpi_logging.logger.info('found interop namespace: %s',
- self._ns_interop)
- return self._ns_interop
-
- @property
- def instcreation_classname(self):
- """
- Return whole class name of InstCreation indication.
- """
- return "LMI_" + self._nameprefix + "InstCreation"
-
- @property
- def instmodification_classname(self):
- """
- Return whole class name of InstModification indication.
- """
- return "LMI_" + self._nameprefix + "InstModification"
-
- @property
- def instdeletetion_classname(self):
- """
- Return whole class name of InstDeletion indication.
- """
- return "LMI_" + self._nameprefix + "InstDeletion"
-
- @cmpi_logging.trace_method
- def _get_filter_inst(self, class_name, fltr_id):
- """
- Return instance of CIM_IndicationFilter registered in CIMOM if any.
-
- :param class_name: (``string``) *Scoping class* name.
- :param fltr_id: (``string``) Indication name.
- """
- ch = self._env.get_cimom_handle()
- cop = make_indication_filter_path(class_name, fltr_id, self.ns_interop)
- try:
- return ch.GetInstance(cop)
- except pywbem.CIMError as exc:
- if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND:
- return None
- raise
-
- @cmpi_logging.trace_method
- def _ensure_cimom_has_filter(self, class_name, fltr_id):
- """
- Ensures, that cimom has ``fltr_id`` filter registered as instance.
- If it has, but the query differs it is recreated at broker.
-
- :param class_name: (``string``) *Scoping class* name.
- :param fltr_id: (``string``) Indication name.
- """
- inst = self._get_filter_inst(class_name, fltr_id)
- ch = self._env.get_cimom_handle()
- installed = inst is not None
- referents = []
- if installed:
- for prop_name, val in self._filters[class_name][fltr_id].items():
- if inst[prop_name] != val:
- cmpi_logging.logger.info("filter \"%s\" is installed, but"
- " its property \"%s\" has outdated value;"
- " removing...", fltr_id, prop_name)
- referents = remove_cimom_filter(ch, inst.path)
- installed = False
- if not installed:
- if inst is not None:
- path = inst.path
- else:
- path = make_indication_filter_path(class_name, fltr_id,
- self.ns_interop)
- inst = pywbem.CIMInstance(path.classname, path=path)
- kwargs = FILTER_DEFAULTS.copy()
- for key, val in path.keybindings.items():
- kwargs[key] = val
- kwargs.update(self._filters[class_name][fltr_id])
- inst.update(kwargs)
- try:
- inst = ch.CreateInstance(inst)
- cmpi_logging.logger.info("filter \"%s\" installed", fltr_id)
- except pywbem.CIMError:
- cmpi_logging.logger.exception(
- "failed to install indication filter \"%s\"",
- fltr_id)
- if referents:
- cmpi_logging.logger.debug('reinstalling %d filter'
- ' subscriptions', len(referents))
- for ref in referents:
- ch.CreateInstance(ref)
- return inst
-
- @cmpi_logging.trace_method
- def _get_matching_filter(self, query):
- """
- Try to find matching filter properties in local ``_filters`` storage
- and return it. ``None`` is returned if not found.
-
- Return a tuple ``(class_name, filter_id, filter_properties)``.
-
- :param query: (``string``) Is filter query.
- """
- if not isinstance(query, basestring):
- raise TypeError("query must be a string")
- for clsname, fltrs in self._filters.iteritems():
- for fltr_id, props in fltrs.iteritems():
- if query == props["Query"]:
- return (clsname, fltr_id, props)
- return None
-
- @cmpi_logging.trace_method
- def ensure_filters_installed(self, class_name=None, fltr_id=None):
- """
- This function checks for existence of filters at broker. Filters
- must be registered with this instance before the check can be done.
- Without arguments all registered filters will be checked.
-
- :param class_name: (``string``) Name of *Scoped class* that reduces
- searched filters.
- :param fltr_id: (``string``) Indication name reducing filters that
- will be checked.
- """
- cls_to_check = self._filters.keys()
- if class_name is not None:
- cls_to_check = [class_name]
- filters_to_check = list(
- (c, f)
- for c in cls_to_check
- for f in self._filters[c].keys()
- if fltr_id is None or fltr_id == f)
- with self._access_lock:
- try:
- for clsname, fltr_id in filters_to_check:
- self._ensure_cimom_has_filter(clsname, fltr_id)
- cmpi_logging.logger.debug('filters installed')
- return True
- except pywbem.CIMError as exc:
- if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED:
- cmpi_logging.logger.error("filters could not be checked"
- " for presence due to invalid context")
- return False
- raise
-
- @cmpi_logging.trace_method
- def update_context(self, env):
- """
- .. _update_context-label
-
- When ``IndicationManager`` is initialized upon provider initialization,
- the conxet given does not contain any user credentials that are
- needed for communication with broker. In order to check for filter's
- existence at broker, this method needs to be called first with
- context containing user's credentials.
-
- This needs to be called only once.
-
- **Note** that if you don't plan to check for filter's presence at
- broker at runtime, you are not interested in this function.
- """
- with self._access_lock:
- self._env = env
-
- @cmpi_logging.trace_method
- def add_filters(self, class_name, filters, ensure_installed=False):
- """
- Add new filters to the helper. These filters will be allowed for
- subscription.
-
- :param filters: (``dictionary filter_id -> filter properties``)
- The filters. ``filter properties`` is dictionary
- ``property_name -> value``, where at least ``Query`` property
- must be set. ``Name`` property will be automatically created
- as 'LMI:<class_name>:<filter_id>'.
- :param ensure_installed: (``bool``) Whether to check for filter presence
- at broker and install them if missing. **Note** That in order
- for this to work, the context must be updated with user's
- credentials. See :ref:`update_context-label`.
- """
- with self._access_lock:
- if not class_name in self._filters:
- self._filters[class_name] = pywbem.NocaseDict()
- self._filters[class_name].update(filters)
- if ensure_installed:
- self.ensure_filters_installed(class_name=class_name)
-
- @cmpi_logging.trace_method
- def authorize_filter(self, _env, fltr, _class_name, _op, _owner):
- """
- AuthorizeFilter callback from CIMOM. Call this method from appropriate
- CIMOM callback
-
- It asks us to verify whether this filter is allowed.
-
- :param fltr: Contains the filter that must be authorized.
- :param _class_name: (``String``) Contains the class name extracted
- from the filter FROM clause.
- :param _op: The name of the class for which monitoring is required.
- Only the namespace part is set if className is a process indication.
- :param _owner The owner argument is the destination owner.
- """
- with self._access_lock:
- res = self._get_matching_filter(fltr)
- if res is not None:
- self._subscribed_filters.add((res[0], res[1]))
- cmpi_logging.logger.info("InstanceFilter %s: %s authorized",
- make_filter_name(res[0], res[1]), fltr)
- return True
- return False
-
- @cmpi_logging.trace_method
- def activate_filter(self, _env, fltr, _class_name, _class_path,
- first_activation):
- """
- ActivateFilter callback from CIMOM. Call this method from appropriate
- CIMOM callback.
-
- It ask us to begin monitoring a resource. The function shall begin
- monitoring the resource according to the filter express only.
-
- :param fltr: The filter argument contains the filter specification
- for this subscription to become active.
- :param _class_name: (``String``) The class name extracted from the filter
- FROM clause.
- :param _class_path: (``CIMInstanceName``) The name of the class for
- which monitoring is required. Only the namespace part is set if
- eventType is a process indication.
- :param first_activation: (``bool``) Set to true if this is the first
- filter for className.
- """
- with self._access_lock:
- if not first_activation:
- return
- res = self._get_matching_filter(fltr)
- if res is not None:
- self._subscribed_filters.add((res[0], res[1]))
- cmpi_logging.logger.info("InstanceFilter %s: %s started",
- make_filter_name(res[0], res[1]), fltr)
-
- @cmpi_logging.trace_method
- def deactivate_filter(self, _env, fltr, _class_name, _class_path,
- last_activation):
- """
- DeactivateFilter callback from CIMOM. Call this method from appropriate
- CIMOM callback.
-
- Informs us that monitoring using this filter should stop.
-
- :param fltr: The filter argument contains the filter specification for
- this subscription to become active.
- :param class_name: (``String``) The class name extracted from the filter
- FROM clause.
- :param class_path: (``CIMInstanceName``) class_path The name of the
- class for which monitoring is required. Only the namespace part is
- set if className is a process indication.
- :last_activation: (``bool``) Set to true if this is the last filter for
- className.
- """
- with self._access_lock:
- if not last_activation:
- return
- res = self._get_matching_filter(fltr)
- if res is not None:
- self._subscribed_filters.remove((res[0], res[1]))
- cmpi_logging.logger.info("InstanceFilter %s: %s stopped",
- make_filter_name(res[0], res[1]), fltr)
-
- @cmpi_logging.trace_method
- def enable_indications(self, _env):
- """
- EnableIndications callback from CIMOM. Call this method from
- appropriate CIMOM callback.
-
- Tells us that indications can now be generated. The MB is now prepared
- to process indications. The function is normally called by the MB after
- having done its intialization and processing of persistent subscription
- requests.
- """
- with self._access_lock:
- self._enabled = True
- cmpi_logging.logger.info("Indications enabled")
-
- @cmpi_logging.trace_method
- def disable_indications(self, _env):
- """
- EnableIndications callback from CIMOM. Call this method from
- appropriate CIMOM callback.
-
- Tells us that we should stop generating indications. MB will not accept
- any indications until enabled again. The function is normally called
- when the MB is shutting down indication services either temporarily or
- permanently.
- """
- with self._access_lock:
- self._enabled = False
- cmpi_logging.logger.info("Indications disabled")
-
- @cmpi_logging.trace_method
- def send_indication(self, indication):
- """
- Send indication to all subscribers. Call this method from appropriate
- CIMOM callback.
- """
- self._queue.put(indication)
-
- @cmpi_logging.trace_method
- def send_instcreation(self, instance, filter_id):
- """
- Send ``LMI_<nameprefix>InstCreation`` indication with given instance.
-
- :param instance: (``CIMInstance``) The created instance.
- :param filter_id: (``string``) The ID of registered filter which
- corresponds to this indication.
- """
- if not self.is_subscribed(instance.classname, filter_id):
- return
- path = pywbem.CIMInstanceName(
- classname=self.instcreation_classname,
- namespace=self.namespace)
- ind = pywbem.CIMInstance(
- self.instcreation_classname,
- path=path)
- ind['SourceInstance'] = instance
- ind['SourceInstanceHost'] = socket.gethostname()
- ind['SourceInstanceModelPath'] = str(instance.path)
- ind['IndicationFilterName'] = make_filter_name(
- instance.classname, filter_id)
- ind['PerceivedSeverity'] = self.SEVERITY_INFO
-
- cmpi_logging.logger.info("Sending indication %s for %s" %
- (ind["IndicationFilterName"], str(path)))
- self.send_indication(ind)
-
- @cmpi_logging.trace_method
- def send_instmodification(self, old_instance, new_instance, filter_id):
- """
- Send ``LMI_<nameprefix>InstModification`` indication with given
- instance.
-
- :param old_instance: (``CIMInstance``) The instance before
- modification.
- :param new_instance: (``CIMInstance``) The instance after modification.
- :param filter_id: (``string``) The ID of registered filter which
- corresponds to this indication.
- """
- if not self.is_subscribed(new_instance.classname, filter_id):
- return
- path = pywbem.CIMInstanceName(
- classname=self.instmodification_classname,
- namespace=self.namespace)
- ind = pywbem.CIMInstance(
- self.instcreation_classname,
- path=path)
- ind['SourceInstance'] = new_instance
- ind['PreviousInstance'] = old_instance
- ind['SourceInstanceHost'] = socket.gethostname()
- ind['SourceInstanceModelPath'] = str(new_instance.path)
- ind['IndicationFilterName'] = make_filter_name(
- new_instance.classname, filter_id)
- ind['PerceivedSeverity'] = self.SEVERITY_INFO
-
- cmpi_logging.logger.info("Sending indication %s for %s",
- ind["IndicationFilterName"], str(path))
- self.send_indication(ind)
-
- @cmpi_logging.trace_method
- def is_subscribed(self, class_name, fltr_id):
- """
- Return True, if there is someone subscribed for given filter.
-
- :param class_name: (``string``) *Scoping class* name.
- :param fltr_id: (``string``) ID of the filter to check.
- """
- with self._access_lock:
- if not self._enabled:
- return False
- if (class_name, fltr_id) in self._subscribed_filters:
- return True
- return False
-
- @cmpi_logging.trace_method
- def is_registered(self, class_name, fltr_id):
- """
- Return True, if filter id has been registered with current instance.
-
- :param class_name: (``string``) *Scoping class* name.
- :param fltr_id: (``string``) ID of the filter to check.
- """
- with self._access_lock:
- return (class_name in self._filters
- and fltr_id in self._filters[class_name])
-
- def _send_indications_loop(self, broker):
- """
- This method runs in its own thread. It just sends all enqueued
- indications.
-
- :param broker: (``BrokerCIMOMHandle``) Handle of the CIMOM.
- """
- broker.AttachThread()
- while True:
- command = self._queue.get()
-
- if isinstance(command, pywbem.CIMInstance) :
- indication = command
- cmpi_logging.logger.trace_info("Delivering indication %s" %
- (str(indication.path)))
- broker.DeliverIndication(self.namespace, indication)
-
- elif isinstance(command, int):
- cmpi_logging.logger.trace_info("Received command %d", command)
- if command == self.COMMAND_STOP:
- if hasattr(self._queue, "task_done"):
- self._queue.task_done()
- break
-
- if hasattr(self._queue, "task_done"):
- self._queue.task_done()
-
- cmpi_logging.logger.info("Stopped Indication thread.")
-
- @cmpi_logging.trace_method
- def shutdown(self):
- """
- Stop the thread. This method blocks until the thread is safely
- destroyed.
- """
- self._queue.put(self.COMMAND_STOP)
- self._indication_sender.join()
diff --git a/src/python/lmi/common/JobManager.py b/src/python/lmi/common/JobManager.py
deleted file mode 100644
index 5ee59a9..0000000
--- a/src/python/lmi/common/JobManager.py
+++ /dev/null
@@ -1,1670 +0,0 @@
-# Copyright (C) 2013 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Jan Safranek <jsafrane@redhat.com>
-# -*- coding: utf-8 -*-
-"""
- Basic infrastructure for asynchronous jobs. All necessary CIM classes and
- indications are implemented here.
-
- .. autoclass:: JobManager
- :members:
-
- .. autoclass:: Job
- :members:
-
- .. autoclass:: LMI_ConcreteJob
- :members:
-
- .. autoclass:: LMI_OwningJobElement
- :members:
-
- .. autoclass:: LMI_AffectedJobElement
- :members:
-
- .. autoclass:: LMI_MethodResult
- :members:
-
- .. autoclass:: LMI_AssociatedJobMethodResult
- :members:
-"""
-
-from datetime import datetime, timedelta
-import threading
-from Queue import Queue
-import pywbem
-import lmi.common.cmpi_logging as cmpi_logging
-import lmi.common
-from lmi.common.IndicationManager import IndicationManager
-from pywbem.cim_provider2 import CIMProvider2
-import socket
-import traceback
-
-@cmpi_logging.trace_function
-def register_filters(job_clsname, indication_manager=None):
- """
- This function registers static indication filters at IndicationManager.
- It should be called upon provider's initialization.
-
- :param job_clsname: (``String``) CIM class name for asynchonous jobs.
- Will be part of filter queries.
- :param indication_manager: If not given, global instance will be obtained.
- """
- if indication_manager is None:
- ind_manager = IndicationManager.get_instance()
- filters = {}
- query_args = {
- "classname" : job_clsname,
- "prefix" : indication_manager.nameprefix
- }
- for fltr_id, fltr_props in JobManager.IND_FILTERS.items():
- filters[fltr_id] = fltr_props.copy()
- filters[fltr_id]['Query'] = fltr_props['Query'] % query_args
- indication_manager.add_filters(job_clsname, filters)
-
-# Too many instance attributes
-# pylint: disable-msg=R0902
-class Job(object):
- """
- Generic abstract class representing one CIM_ConcreteJob.
- It remembers input and output arguments, affected ManagedElements and
- owning ManagedElement (to be able to create associations to them)
- and all CIM_ConcreteJob properties.
-
- Due to multiple threads processing the job, each job has its own
- lock to guard its status changes. It is expected that number of jobs
- is quite low.
- """
-
- DEFAULT_TIME_BEFORE_REMOVAL = 60 # in seconds
-
- STATE_QUEUED = 1 # Job has not started yet
- STATE_RUNNING = 2 # Job is running
- STATE_FINISHED_OK = 3 # Job finished OK
- STATE_FAILED = 4 # Job finished with error
- STATE_SUSPENDED = 5 # Job is queued and suspended
- STATE_TERMINATED = 6 # Job was queued and terminated
-
- FINAL_STATES = [STATE_FINISHED_OK, STATE_FAILED, STATE_SUSPENDED,
- STATE_TERMINATED]
-
- # There is no way how to suspend/terminate running job!
-
- @cmpi_logging.trace_method
- def __init__(self, job_manager, job_name, input_arguments,
- method_name, affected_elements, owning_element):
- """
- Create new storage job.
-
- :param job_manager: (``JobManager``) Reference to ``JobManager``, which
- will manage this job.
- :param job_name: (``string``) User-friendly name of the job.
- :param input_arguments: (``dictionary param_name -> param_value``)
- Input arguments of the method, which spawned this job.
- :param method_name: (``string``) Name of the CIM method, which spawned
- this job.
- :param affected_elements: (``array of CIMInstanceName``) List of
- affected elements. ``LMI_AffectedJobElement`` association will be
- created for them.
- :param owning_element: (``CIMInstanceName``) Reference to service, which
- spawned the job. ``LMI_OwningJobElement`` association will be
- created for it.
- """
- self.job_manager = job_manager
- self.timer_manager = job_manager.timer_manager
-
- # Unique ID
- self.the_id = job_manager.get_next_id()
-
- # User friendly name of the job
- self.job_name = job_name
-
- # Dictionary of input arguments, 'parameter_name' -> 'parameter_value'
- # The parameter value must be CIMProperty or something that can be
- # assigned to it.
- self.input_arguments = input_arguments
-
- # Dictionary of output arguments, 'parameter_name' -> 'parameter_value'
- # The parameter value must be CIMProperty or something that can be
- # assigned to it.
- self.output_arguments = None
-
- # Method return value, as CIMProperty or something that can be
- # assigned to it.
- self.return_value = None
- # Value of Job.ReturnValueType
- self.return_value_type = None
-
- # Name of the method
- self.method_name = method_name
-
- # Time when the job was created
- self.time_submitted = datetime.utcnow()
-
- # Nr. of seconds before the job is removed when the job finishes
- self.time_before_removal = self.DEFAULT_TIME_BEFORE_REMOVAL
-
- # If the job should be removed after completion
- self.delete_on_completion = True
-
- self.percent_complete = 0
-
- # State of the job
- self.job_state = self.STATE_QUEUED
-
- # Last change of job state, wall clock time
- self.clocktime_of_last_state_change = self.time_submitted
-
- # Duration of the job in RUNNING state (in seconds)
- self.elapsed_time = None
-
- # When the job started (= switched to RUNNING), wall clock time
- self.start_clocktime = None
- # When the job started (= switched to RUNNING), monotonic clock time
- self.start_monotime = None
- # When the job finished (= switched from RUNNING), monotonic clock time
- self.finish_monotime = None
-
- # Array of CIMInstanceNames of affected elements, so we can
- # enumerate associations to them.
- self.affected_elements = affected_elements
-
- # CIMInstanceName to owning element (service), so we can enumerate
- # instances.
- self.owning_element = owning_element
-
- # Timer used to delete the job after time_before_removal seconds
- self.timer = None
-
- # CIMError with result code
- self.error = None
-
- # internal lock to protect state changes from races
- self._lock = threading.RLock()
-
- self._execute = None
- self._execargs = None
- self._execkwargs = None
- self._cancel = None
- self._cancelargs = None
- self._cancelkwargs = None
-
- self._finished_event = threading.Event()
-
- @cmpi_logging.trace_method
- def set_execute_action(self, callback, *args, **kwargs):
- """
- Set callback, which will be called when the job is to be executed. It is
- expected that the callback will take some time to execute. The callback
- must change state of the job and set output parameters and error in a
- thread-safe way, i.e. by calling ``finish_method()``.
-
- :param callback: (``function``) Reference to callback to call.
- :param args, kwargs: All other parameters will be passed to the
- callback. It is highly recommended to add reference to the job
- to the callback.
- """
- self._execute = callback
- self._execargs = args
- self._execkwargs = kwargs
-
- @cmpi_logging.trace_method
- def set_cancel_action(self, callback, *args, **kwargs):
- """
- Set callbacks, which will be called when the job is to be
- cancelled. The callback must be quick, the job is already locked!
-
- :param callback: (``function``) Reference to callback to call.
- :param args, kwargs: All other parameters will be passed to the
- callback. It is highly recommended to add reference to the job
- to the callback.
- """
- self._cancel = callback
- self._cancelargs = args
- self._cancelkwargs = kwargs
-
- @cmpi_logging.trace_method
- def finish_method(self, new_state, return_value=None, return_type=None,
- output_arguments=None, error=None, affected_elements=None):
- """
- Mark the job as finished, with given return value, output parameters and
- error.
- This method is thread-safe.
-
- :param new_state: (``Job.STATE_* value``) Resulting state of the job.
- :param return_value: (``string``) Return value of the job, encoded
- into string. Can be None when the job does not return any value.
- :param return_type: (``Job.RetunValueType.* value``) Type of the return
- value. Can be None when the job does not return any value.
- :param output_arguments: (``dictionary param_name -> param_value``)
- Output arguments of the job. Can be None when the job does not have
- any output parameters.
- :param error: (``CIMError``) Error raised by the job. Can be None,
- when the job finished successfully.
- :param affected_elements: (``array of CIMInstanceName``) New list of
- affected elements to generate LMI_<name>JobAffectedElement
- association. If None, the old list, passed to constructor, remains
- untouched.
- """
- self.lock()
- self.return_value = return_value
- self.return_value_type = return_type
- self.output_arguments = output_arguments
- self.error = error
- if affected_elements is not None:
- self.affected_elements = affected_elements
- self.change_state(new_state, 100)
- self.unlock()
- self._finished_event.set()
-
- @cmpi_logging.trace_method
- def change_state(self, new_state, percent=None):
- """
- Change state of a job. (Re-)calculate various times based on the state
- change. Send indications as necessary.
- This method is thread-safe.
-
- :param new_state: (``Job.STATE_* value``) New state of the job.
- It can be the same as the previous state to indicate progress of
- the job.
- :param percent: (``int``)) Percent complete of the job. When None,
- this valuu will be automatically calculated (in progress = 50%,
- finished = 100%).
- """
- self.lock()
-
- cmpi_logging.logger.debug("Job %s: %s changes state from %d to %d"
- % (self.the_id, self.job_name, self.job_state, new_state))
-
- # For sending indications
- prev_instance = None
- send_indication = False
- indication_ids = []
-
- if self.job_state != new_state:
- # Remember to send indications
- prev_instance = self.job_manager.get_job_instance(self)
- send_indication = True
- indication_ids.append(JobManager.IND_JOB_CHANGED)
-
- # Check if the job has just finished
- if (self.job_state not in self.FINAL_STATES
- and new_state in self.FINAL_STATES):
- # Remember finish time
- self.finish_clocktime = datetime.utcnow()
- self.finish_monotime = self.timer_manager.now()
- # Remember job execution time.
- if self.start_monotime:
- self.elapsed_time = self.finish_monotime \
- - self.start_monotime
- # Send indication
- if self.job_state == self.STATE_FAILED:
- indication_ids.append(JobManager.IND_JOB_FAILED)
- if self.job_state == self.STATE_SUSPENDED:
- indication_ids.append(JobManager.IND_JOB_SUCCEEDED)
-
- # Check if the job has just started
- if new_state == self.STATE_RUNNING:
- self.start_clocktime = datetime.utcnow()
- self.start_monotime = self.timer_manager.now()
-
- self.clocktime_of_last_state_change = datetime.utcnow()
- self.job_state = new_state
-
- if percent is None:
- # guess the percentage from status
- if new_state == self.STATE_QUEUED:
- percent = 0
- elif new_state == self.STATE_RUNNING:
- percent = 50
- else:
- percent = 100
- if self.percent_complete != percent:
- # Remember to send indications
- if not send_indication:
- self.clocktime_of_last_state_change = datetime.utcnow()
- prev_instance = self.job_manager.get_job_instance(self)
- send_indication = True
- indication_ids.append(JobManager.IND_JOB_PERCENT_UPDATED)
- self.percent_complete = percent
-
- if send_indication:
- current_instance = self.job_manager.get_job_instance(self)
- self.job_manager.send_modify_indications(
- prev_instance, current_instance, indication_ids)
-
- # start / update the timer if necesasry
- self._restart_timer()
- self.unlock()
-
- @cmpi_logging.trace_method
- def _expire(self):
- """
- Callback when a Job completes and time_before_removal second passed.
- The job gets removed from its JobManager.
- """
- cmpi_logging.logger.debug("Job %s: %s expired"
- % (self.the_id, self.job_name))
-
- self.job_manager.remove_job(self)
-
- @cmpi_logging.trace_method
- def _restart_timer(self):
- """
- Re-schedule timer for TimeBeforeRemoval because some property has
- changed.
- """
- if not self.job_state in self.FINAL_STATES:
- return
-
- # Stop the old timer.
- if self.timer:
- self.timer.cancel()
- self.timer = None
-
- # Start the new timer.
- if self.delete_on_completion:
- now = self.timer_manager.now()
- passed = now - self.finish_monotime
- timeout = self.time_before_removal - passed
- if timeout <= 0:
- # Just in case...
- self._expire()
- return
-
- cmpi_logging.logger.debug("Starting timer for job %s: '%s' for %f"
- " seconds" % (self.the_id, self.job_name, timeout))
- self.timer = self.timer_manager.create_timer(
- "Job " + self.job_name,
- callback=self._expire)
- self.timer.start(timeout)
-
- @cmpi_logging.trace_method
- def lock(self):
- """
- Lock internal mutex. Other threads will block on subsequent lock().
- The lock is recursive, i.e. can be called multiple times from
- single thread.
- """
- self._lock.acquire()
-
- @cmpi_logging.trace_method
- def unlock(self):
- """ Unlock internal mutex."""
- self._lock.release()
-
- @cmpi_logging.trace_method
- def execute(self):
- """
- Start executing the job. It calls the execute callback, set by
- ``set_execute_action()``.
-
- job_state must be already set to STATE_RUNNING.
- Any exception is translated to CIMError and appropriate state is set.
- """
- try:
- self._execute(*(self._execargs), **(self._execkwargs))
- except pywbem.CIMError, error:
- cmpi_logging.logger.trace_warn("Job.execute caught an CIMError %s",
- str(error))
- cmpi_logging.logger.trace_verbose("traceback: %s",
- traceback.format_exc())
- self.finish_method(Job.STATE_FAILED, error=error)
- except Exception, ex:
- cmpi_logging.logger.trace_warn("Job.execute caught an Exception %s",
- str(ex))
- cmpi_logging.logger.trace_verbose("traceback: %s",
- traceback.format_exc())
- error = pywbem.CIMError(pywbem.CIM_ERR_FAILED, str(ex))
- self.finish_method(Job.STATE_FAILED, error=error)
-
- @cmpi_logging.trace_method
- def cancel(self):
- """
- Cancels queued action. The action must have not been started.
- """
- self.change_state(self.STATE_TERMINATED)
- if self._cancel:
- self._cancel(*(self._cancelargs), **(self._cancelkwargs))
- self._finished_event.set()
-
- @cmpi_logging.trace_method
- def get_name(self):
- """
- Return CIMInstanceName of the job.
-
- :rtype: ``CIMInstanceName``
- """
- name = pywbem.CIMInstanceName(
- classname=self.job_manager.job_classname,
- namespace=self.job_manager.namespace,
- keybindings={
- 'InstanceID': self.get_instance_id()
- })
- return name
-
- @cmpi_logging.trace_method
- def get_instance_id(self, classname=None):
- """
- Return InstanceID.
-
- :param classname: (``string``) Optional classname to generate InstanceID
- for different class, e.g. for LMI_<name>MethodResult.
- :rtype: ``string``
- """
- if classname is None:
- classname = self.job_manager.job_classname
- return 'LMI:' + classname + ':' + str(self.the_id)
-
- @cmpi_logging.trace_method
- def get_pre_call(self):
- """
- Return indication that describes the pre-execution values of the
- job's invocation.
-
- :rtype: ``CIMInstance of CIM_InstMethodCall``
- """
- path = pywbem.CIMInstanceName(
- classname="CIM_InstMethodCall",
- keybindings={},
- host=socket.gethostname(),
- namespace=self.job_manager.namespace)
- inst = pywbem.CIMInstance(
- classname="CIM_InstMethodCall",
- path=path)
- src_instance = self._get_cim_instance()
- inst['SourceInstance'] = src_instance
- inst['SourceInstanceModelPath'] = str(src_instance.path)
- inst['MethodName'] = self.method_name
- inst['MethodParameters'] = self.get_method_params(
- '__MethodParameters', True, False)
- inst['PreCall'] = True
- return inst
-
- @cmpi_logging.trace_method
- def get_cim_error(self):
- """
- Return job error as CIMInstance of CIM_Error.
- :returns: CIMInstance of CIM_Error
- """
- path = pywbem.CIMInstanceName(
- classname="CIM_Error",
- host=socket.gethostname(),
- namespace=self.job_manager.namespace)
- err = pywbem.CIMInstance(
- classname="CIM_Error",
- path=path)
- err['CIMStatusCode'] = pywbem.Uint32(self.error[0])
- err['Message'] = self.error[1]
- return err
-
- @cmpi_logging.trace_method
- def get_post_call(self):
- """
- Return indication that describes the post-execution values of the
- job's invocation.
-
- :rtype: ``CIMInstance of CIM_InstMethodCall``
- """
- path = pywbem.CIMInstanceName(
- classname="CIM_InstMethodCall",
- keybindings={},
- host=socket.gethostname(),
- namespace=self.job_manager.namespace)
- inst = pywbem.CIMInstance(
- classname="CIM_InstMethodCall",
- path=path)
-
- src_instance = self._get_cim_instance()
- inst['SourceInstance'] = src_instance
- inst['SourceInstanceModelPath'] = str(src_instance.path)
- inst['MethodName'] = self.method_name
- inst['MethodParameters'] = self.get_method_params(
- '__MethodParameters', True, True)
- inst['PreCall'] = False
-
- if self.return_value_type is not None:
- inst['ReturnValueType'] = self.return_value_type
- if self.return_value is not None:
- inst['ReturnValue'] = str(self.return_value)
- if self.error is not None:
- err = self.get_cim_error()
- inst['Error'] = [err, ]
- return inst
-
- @cmpi_logging.trace_method
- def _get_cim_instance(self):
- """
- Return CIMInstance of this job.
-
- :rtype: CIMInstance
- """
- return self.job_manager.get_job_instance(self)
-
- @cmpi_logging.trace_method
- def get_method_params(self, class_name, include_input, include_output):
- """
- Create a class of given name with all input or output parameters
- of the asynchronous method. Typically used to assemble
- CIM_ConcreteJob.JobInParameters or CIM_InstMethodCall.MethodParameters
- values.
-
- :param class_name: (``string``) Name of the class to create.
- :param input: (``boolean``) Whether input parameters should be
- included in the returned class
- :param output: (``boolean``) Whether output parameters should be
- included in the returned class
- :rtype: CIMInstance of the created class.
- """
- # TODO: this is workaround for bug #920763, use class_name
- # when it's fixed
- clsname = "CIM_ManagedElement"
- path = pywbem.CIMInstanceName(
- classname=clsname,
- namespace=self.job_manager.namespace)
- inst = pywbem.CIMInstance(classname=clsname, path=path)
- if include_input and self.input_arguments:
- for (name, value) in self.input_arguments.iteritems():
- inst[name] = value
- if include_output and self.output_arguments:
- # overwrite any input parameter
- for (name, value) in self.output_arguments.iteritems():
- inst[name] = value
- return inst
-
- @cmpi_logging.trace_method
- def wait_for_job(self, timeout=None):
- """
- Block and wait until the job completes.
-
- :param timeout: (``float``) Number of seconds to wait for the job
- to complete.
- :rtype: ``bool`` - True, when the job is finished, False if the timeout
- occurred.
- """
- return self._finished_event.wait(timeout)
-
- # pylint: disable-msg=R0903
- class ReturnValueType(object):
- """ CIM_InstMethodCall.ReturnValueType values."""
- Boolean = pywbem.Uint16(2)
- String = pywbem.Uint16(3)
- Char16 = pywbem.Uint16(4)
- Uint8 = pywbem.Uint16(5)
- Sint8 = pywbem.Uint16(6)
- Uint16 = pywbem.Uint16(7)
- Sint16 = pywbem.Uint16(8)
- Uint32 = pywbem.Uint16(9)
- Sint32 = pywbem.Uint16(10)
- Uint64 = pywbem.Uint16(11)
- Sint64 = pywbem.Uint16(12)
- Datetime = pywbem.Uint16(13)
- Real32 = pywbem.Uint16(14)
- Real64 = pywbem.Uint16(15)
- Reference = pywbem.Uint16(16)
-
-class JobManager(object):
- """
- Container of all queued, running or finished ``LMI_ConcreteJobs``.
-
- Usage:
-
- 1. Create MOF file for these classes:
-
- * ``LMI_<name>Job``
-
- * ``LMI_<name>MethodResult``
-
- * ``LMI_Affected<name>JobElement``
-
- * ``LMI_Owning<name>JobElement``
-
- * ``LMI_Associated<name>JobMethodResult``
-
- Where ``<name>`` is prefix of your classes, for example 'Storage'
-
- 2. During initialization, initialize ``TimerManager`` and create
- ``JobManager``.
-
- 3. When needed. create new Job instance:
-
- 4. Set its execute callback using ``set_execute_action()``. This callback
- will be called when the job is to be executed. It will be called in
- context of ``JobManager`` worker thread!
-
- 5. Optionally, set cancel callback using ``set_execute_action()``. This
- callback will be called when the job is still queued and is cancelled by
- application. This callback will be called in context of CIMOM callback
- and should be quick!
-
- 6. Enqueue the job using ``JobManager.add_job()`` method.
-
- 7. When your execute callback is called, you can optionally call
- ``job.change_state()`` to update percentage of completion.
-
- 8. When your execute callback is finished, don't forget to set method
- result using ``job.finish_method()``.
-
- * ``JobManager`` automatically sends all job-related indications.
- * ``Job`` automatically tracks various timestamps.
- * By default, the job automatically disappears after 60 seconds after it
- finishes. Application may set ``DeleteOnCompletion`` and
- ``TimeBeforeRemoval`` properties of ``LMI_<name>Job`` to override this
- timeout.
- """
-
- COMMAND_STOP = 1
-
- IND_JOB_PERCENT_UPDATED = "PercentUpdated"
- IND_JOB_SUCCEEDED = "Succeeded"
- IND_JOB_FAILED = "Failed"
- IND_JOB_CHANGED = "Changed"
- IND_JOB_CREATED = "Created"
-
- IND_FILTERS = {
- IND_JOB_PERCENT_UPDATED: {
- "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
- "SourceInstance ISA %(classname)s AND "
- "SourceInstance.CIM_ConcreteJob::PercentComplete <> "
- "PreviousInstance.CIM_ConcreteJob::PercentComplete",
- "Description" : "Modification of Percentage Complete for a "
- "Concrete Job.",
- },
- IND_JOB_SUCCEEDED: {
- "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
- "SourceInstance ISA %(classname)s AND "
- "SourceInstance.CIM_ConcreteJob::JobState = 17",
- "Description": "Modification of Job State for a "
- "Concrete Job to 'Complete'.",
- },
- IND_JOB_FAILED: {
- "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
- "SourceInstance ISA %(classname)s AND "
- "SourceInstance.CIM_ConcreteJob::JobState = 10",
- "Description": "Modification of Job State for a "
- "Concrete Job to 'Exception'.",
- },
- IND_JOB_CHANGED: {
- "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE "
- "SourceInstance ISA %(classname)s AND "
- "SourceInstance.CIM_ConcreteJob::JobState <> "
- "PreviousInstance.CIM_ConcreteJob::JobState",
- "Description": "Modification of Job State for a ConcreteJob.",
- },
- IND_JOB_CREATED: {
- "Query" : "SELECT * FROM LMI_%(prefix)sInstCreation WHERE "
- "SourceInstance ISA %(classname)s",
- "Description": "Creation of a ConcreteJob.",
- },
- }
-
- @cmpi_logging.trace_method
- def __init__(self, name, namespace, indication_manager, timer_manager):
- """
- Initialize new Manager. It automatically registers all job-related
- filters to indication_manager and starts a worker thread.
-
- :param name: (``string``) String with classname infix. For example
- 'Storage' for ``LMI_StorageJob``, ``LMI_StorageJobMethodResult``
- etc.
- :param namespace: (``string``) Namespace of all providers.
- :param indication_manager: (``IndicationManager``): a manager where
- indications and filters should be added.
- :param timer_manager: (``TimerManager``): Timer manager instance.
- """
- # List of all jobs. Dictionary job_id -> Job.
- self.jobs = {}
- # Queue of jobs scheduled to execute.
- self.queue = Queue()
- # Last created job_id.
- self.last_instance_id = 0
- # Classname infix.
- self.name = name
- # CIMProvider2 instances for job classes.
- self.providers = {}
- self.namespace = namespace
- self.indication_manager = indication_manager
- self.timer_manager = timer_manager
-
- # Start the worker thread (don't forget to register it at CIMOM)
- self.worker = threading.Thread(target=self._worker_main)
- self.worker.daemon = False
- self.worker.start()
-
- # Various classnames for job-related classes, with correct infixes.
- self.job_classname = 'LMI_' + self.name + 'Job'
- self.method_result_classname = "LMI_" + self.name + "MethodResult"
- self.affected_classname = "LMI_Affected" + self.name + "JobElement"
- self.owning_classname = "LMI_Owning" + self.name + "JobElement"
- self.associated_result_classname = ('LMI_Associated' + self.name
- + 'JobMethodResult')
- self.indication_filter_classname = ('LMI_' + self.name
- + 'JobIndicationFilter')
- self.job_provider = None
- self._add_indication_filters()
-
- @cmpi_logging.trace_method
- def _add_indication_filters(self):
- """
- Add all job-related ``IndicationFilters`` to indication manager.
- """
- register_filters(self.job_classname, self.indication_manager)
-
- @cmpi_logging.trace_method
- def get_providers(self):
- """
- Get dictionary of providers for these classes:
-
- * ``LMI_<name>Job``
- * ``LMI_<name>MethodResult``
- * ``LMI_Affected<name>JobElement``
- * ``LMI_Owning<name>JobElement``
- * ``LMI_Associated<name>JobMethodResult``
-
- :rtype: dictionary class_name -> CIMProvider2
- """
-
- if not self.providers:
- job_provider = LMI_ConcreteJob(self.job_classname, job_manager=self)
- self.providers[self.job_classname] = job_provider
- self.job_provider = job_provider
-
- provider = LMI_MethodResult(
- self.method_result_classname, job_manager=self)
- self.providers[self.method_result_classname] = provider
-
- provider = LMI_AffectedJobElement(
- self.affected_classname, job_manager=self)
- self.providers[self.affected_classname] = provider
-
- provider = LMI_OwningJobElement(
- self.owning_classname, job_manager=self)
- self.providers[self.owning_classname] = provider
-
- provider = LMI_AssociatedJobMethodResult(
- self.owning_classname, job_manager=self)
- self.providers[self.associated_result_classname] = provider
-
- return self.providers
-
- @cmpi_logging.trace_method
- def add_job(self, job):
- """
- Enqueue new job. Send indication when needed.
-
- :param job: (``Job``) A job to enqueue.
- """
- cmpi_logging.logger.debug("Job %s: '%s' enqueued"
- % (job.the_id, job.job_name))
-
- self.jobs[job.the_id] = job
- self.queue.put(job)
- # send indication
- if self.indication_manager.is_subscribed(
- self.job_classname, self.IND_JOB_CREATED):
- job_instance = self.get_job_instance(job)
- self.indication_manager.send_instcreation(
- job_instance, self.IND_JOB_CREATED)
-
- def send_modify_indications(self, prev_instance, current_instance,
- indication_ids):
- """
- Send InstModification. This is helper method called by ``Job`` when
- needed.
-
- :param prev_instance: Instance of ``LMI_<name>Job`` before it was
- modified.
- :param current_instance: Instance of ``LMI_<name>Job`` after it was
- modified.
- """
- for _id in indication_ids:
- self.indication_manager.send_instmodification(prev_instance,
- current_instance, _id)
-
- @cmpi_logging.trace_method
- def remove_job(self, job):
- """
- Remove existing job. Note that jobs are removed automatically after a
- timeout, providers should not call this method directly.
-
- :param job: (``Job``) Job to remove.
- """
- cmpi_logging.logger.debug("Job %s: '%s' removed from queue."
- % (job.the_id, job.job_name))
- del self.jobs[job.the_id]
- # The job may still be in the queue!
- # There is no way, how to remove it, it will be skipped by the
- # worker thread.
-
- @cmpi_logging.trace_method
- def get_job_for_instance_id(self, instance_id, classname=None):
- """
- Return Job for given InstanceID or None when no such Job exist.
-
- :param instance_id: (``string``) InstanceID value to parse.
- :param classname: (``string``) Optional classname to parse the
- InstanceID (e.g. when parsing InstanceID of
- ``LMI_<name>MethodResult``).
- :rtype: ``Job``
- """
- if classname is None:
- classname = self.job_classname
- the_id = lmi.common.parse_instance_id(instance_id, classname)
- if not the_id.isdigit():
- return None
- return self.jobs.get(the_id, None)
-
- @cmpi_logging.trace_method
- def _worker_main(self):
- """
- This is the main loop of the job queue. It just processes enqueued
- jobs and never ends.
- """
- cmpi_logging.logger.info("Started Job thread.")
- while True:
- command = self.queue.get()
- if isinstance(command, Job):
- # we need to protect from changes between checking state and
- # setting new state
- job = command
- job.lock()
- if job.job_state == Job.STATE_QUEUED:
- # the job was not cancelled
- job.change_state(Job.STATE_RUNNING)
- job.unlock()
- cmpi_logging.logger.info("Starting job %s: '%s'" %
- (job.the_id, job.job_name))
-
- job.execute()
- if job.error:
- cmpi_logging.logger.warn("Job %s: '%s' finished with error:"
- " %s" % (job.the_id, job.job_name, str(job.error)))
- else:
- cmpi_logging.logger.info("Job %s: '%s' finished OK" %
- (job.the_id, job.job_name))
- else:
- # just skip suspended and terminated jobs
- job.unlock()
-
- elif isinstance(command, int):
- self.queue.task_done()
- break
-
- self.queue.task_done()
-
- cmpi_logging.logger.info("Stopped Job thread.")
-
- @cmpi_logging.trace_method
- def get_next_id(self):
- """
- Return next unused job id.
-
- :rtype: string
- """
- self.last_instance_id += 1
- return str(self.last_instance_id)
-
- @cmpi_logging.trace_method
- def get_job_instance(self, job):
- """
- Return CIMInstance for given job.
-
- :param job: (``Job``)
- :rtype: ``CIMInstance``
- """
- path = pywbem.CIMInstanceName(
- classname=self.job_classname,
- keybindings={'InstanceID': job.get_instance_id()},
- host=socket.gethostname(),
- namespace=self.namespace)
- inst = pywbem.CIMInstance(classname=self.job_classname, path=path)
- inst['InstanceID'] = job.get_instance_id()
- return self.job_provider.get_instance(None, inst)
-
- @cmpi_logging.trace_method
- def shutdown(self, timeout=1):
- """
- Stop the thread. If a job is running, it may leave the job process
- (mkfs, resize2fs, ...) and the worker thread (waiting for the process to
- finish) still running.
-
- JobManager still needs Indication Manager and TimeManager working at
- this point!
-
- :param timeout: Nr. of seconds to wait for the current job. Afterwards
- the thread is abandoned, leaving the process still running.
- """
- # Empty the queue, we don't want the worker to proceed with any other
- # queued job.
- while not self.queue.empty():
- queue.get(False)
- queue.task_done()
-
- self.queue.put(self.COMMAND_STOP)
- self.worker.join(timeout)
-
- # Cancel all running/suspended/queued jobs.
- # This will send indications.
- for job in self.jobs.itervalues():
- if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED,
- Job.STATE_RUNNING):
- job.cancel()
-
- if self.worker.isAlive():
- # There is no way, how to stop the thread in Python, so abandon it.
- self.worker.daemon = True
- self.indication_manager = None
- self.timer_manager = None
-
- def can_shutdown(self):
- """
- Return True, if there is no running Job.
- """
- return self.queue.empty()
-
-
-class LMI_ConcreteJob(CIMProvider2):
- """
- Provider of LMI_ConcreteJob class or its subclass.
- """
- @cmpi_logging.trace_method
- def __init__(self, classname, job_manager):
- self.classname = classname
- self.job_manager = job_manager
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """
- Provider implementation of EnumerateInstances intrinsic method.
- """
- model.path.update({'InstanceID': None})
- for job in self.job_manager.jobs.values():
- model['InstanceID'] = job.get_instance_id()
- if keys_only:
- yield model
- else:
- yield self.get_instance(env, model, job)
-
- @cmpi_logging.trace_method
- def get_job_states(self, job):
- """
- Return JobState and OperationalStatus property values.
-
- :param job: (``int``) Job.STATE_* value.
- :rtype: tuple ``(JobState, OperationalStatus)`` values.
- """
- if job.job_state == Job.STATE_QUEUED:
- jobstate = self.Values.JobState.New
- opstate = [self.Values.OperationalStatus.Dormant]
- elif job.job_state == Job.STATE_RUNNING:
- jobstate = self.Values.JobState.Running
- opstate = [self.Values.OperationalStatus.OK]
- elif job.job_state == Job.STATE_FINISHED_OK:
- jobstate = self.Values.JobState.Completed
- opstate = [self.Values.OperationalStatus.OK,
- self.Values.OperationalStatus.Completed]
- elif job.job_state == Job.STATE_SUSPENDED:
- jobstate = self.Values.JobState.Suspended
- opstate = [self.Values.OperationalStatus.OK]
- elif job.job_state == Job.STATE_FAILED:
- jobstate = self.Values.JobState.Exception
- opstate = [self.Values.OperationalStatus.Error,
- self.Values.OperationalStatus.Completed]
- elif job.job_state == Job.STATE_TERMINATED:
- jobstate = self.Values.JobState.Terminated
- opstate = [self.Values.OperationalStatus.Stopped]
- return jobstate, opstate
-
- @cmpi_logging.trace_method
- # pylint: disable-msg=W0221
- def get_instance(self, env, model, job=None):
- """
- Provider implementation of GetInstance intrinsic method.
- """
- if not job:
- instance_id = model['InstanceID']
- job = self.job_manager.get_job_for_instance_id(instance_id)
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
-
- model['DeleteOnCompletion'] = job.delete_on_completion
- model['Name'] = job.job_name
-
- # convert seconds to timedelta
- seconds = job.time_before_removal
- if seconds:
- delta = timedelta(seconds=seconds)
- model['TimeBeforeRemoval'] = pywbem.CIMDateTime(delta)
- else:
- model['TimeBeforeRemoval'] = pywbem.CIMProperty(
- name='TimeBeforeRemoval',
- value=None,
- type='datetime')
-
- if job.clocktime_of_last_state_change:
- model['TimeOfLastStateChange'] = pywbem.CIMDateTime(
- job.clocktime_of_last_state_change)
- else:
- model['TimeOfLastStateChange'] = pywbem.CIMProperty(
- name='TimeOfLastStateChange',
- value=None,
- type='datetime')
-
- if job.elapsed_time:
- elapsed_time = timedelta(seconds=job.elapsed_time)
- model['ElapsedTime'] = pywbem.CIMDateTime(elapsed_time)
- else:
- model['ElapsedTime'] = pywbem.CIMProperty(
- name='ElapsedTime',
- value=None,
- type='datetime')
-
- model['Description'] = job.job_name
- model['LocalOrUtcTime'] = self.Values.LocalOrUtcTime.UTC_Time
- model['PercentComplete'] = pywbem.Uint16(job.percent_complete)
- if job.start_clocktime:
- model['StartTime'] = pywbem.CIMDateTime(job.start_clocktime)
- else:
- model['StartTime'] = pywbem.CIMProperty(
- name='StartTime',
- value=None,
- type='datetime')
-
- if job.input_arguments:
- model['JobInParameters'] = job.get_method_params(
- "__JobInParameters", True, False)
-
- if job.job_state in Job.FINAL_STATES:
- # assemble output parameters with return value
- outparams = job.get_method_params("__JobOutParameters", False, True)
- if job.return_value is not None:
- outparams['__ReturnValue'] = job.return_value
- model['JobOutParameters'] = outparams
-
- model['TimeSubmitted'] = pywbem.CIMDateTime(job.time_submitted)
- # set correct state
- jobstate, opstate = self.get_job_states(job)
- model['JobState'] = jobstate
- model['OperationalStatus'] = opstate
- return model
-
- @cmpi_logging.trace_method
- def set_instance(self, env, instance, modify_existing):
- """Return a newly created or modified instance.
-
- :param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param instance: The new pywbem.CIMInstance. If modifying an existing
- instance, the properties on this instance have been filtered by
- the PropertyList from the request.
- :param modify_existing: True if ModifyInstance, False if CreateInstance
-
- Return the new instance. The keys must be set on the new instance.
- """
- if not modify_existing:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
- "Creation of Job instances is not supported.")
-
- job = self.job_manager.get_job_for_instance_id(instance['InstanceID'])
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
-
- try:
- job.lock()
- restart_timer = False
-
- for (key, value) in instance.iteritems():
- if value is None:
- continue
- if key == 'DeleteOnCompletion':
- job.delete_on_completion = value
- restart_timer = True
- elif key == 'TimeBeforeRemoval':
- job.time_before_removal = value.total_seconds()
- restart_timer = True
- elif key == 'JobRunTimes':
- if value != 1:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
- "JobRunTimes property is not supported.")
- elif key == 'LocalOrUtcTime':
- if value != self.Values.LocalOrUtcTime.UTC_Time:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
- "Setting of LocalOrUtcTime property is not"
- " supported.")
- else:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
- "Setting of %s property is not supported." % (key,))
-
- if restart_timer:
- job._restart_timer()
- finally:
- job.unlock()
- return instance
-
- @cmpi_logging.trace_method
- def delete_instance(self, env, instance_name):
- """Delete an instance.
-
- :param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param instance_name: A pywbem.CIMInstanceName specifying the instance
- to delete.
- """
- job = self.job_manager.get_job_for_instance_id(
- instance_name['InstanceID'])
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
- if not job.job_status in Job.FINAL_STATES:
- raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
- "Job has not finished.")
-
- self.job_manager.remove_job(job)
-
- @cmpi_logging.trace_method
- def cim_method_geterrors(self, env, object_name):
- """Implements LMI_StorageJob.GetErrors()
-
- If JobState is "Completed" and Operational Status is "Completed"
- then no instance of CIM_Error is returned.
-
- If JobState is "Exception" then GetErrors may return intances of
- CIM_Error related to the execution of the procedure or method invoked by
- the job.
-
- If Operatational Status is not "OK" or "Completed" then
- GetErrors may return CIM_Error instances related to the running of
- the job.
-
- :param env: -- Provider Environment (pycimmb.ProviderEnvironment)
- :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method GetErrors()
- should be invoked.
-
- Output parameters:
-
- * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...))
- If the OperationalStatus on the Job is not "OK", then this
- method will return one or more CIM Error instance(s).
- Otherwise, when the Job is "OK", null is returned.
- """
- job = self.job_manager.get_job_for_instance_id(
- object_name['InstanceID'])
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
-
- if job.error is None:
- errors = []
- else:
- err = job.get_cim_error()
- errors = [err, ]
- out_params = [
- pywbem.CIMParameter(
- name='Errors',
- value=errors,
- type='instance',
- is_array=True,
- array_size=len(errors))
- ]
- rval = self.Values.GetErrors.Success
-
- return (rval, out_params)
-
- @cmpi_logging.trace_method
- def cim_method_requeststatechange(self, env, object_name,
- param_requestedstate=None,
- param_timeoutperiod=None):
- """Implements LMI_StorageJob.RequestStateChange()
-
- Requests that the state of the job be changed to the value
- specified in the RequestedState parameter. Invoking the
- RequestStateChange method multiple times could result in earlier
- requests being overwritten or lost.
-
- If 0 is returned, then the
- task completed successfully. Any other return code indicates an
- error condition.
-
- :param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method RequestStateChange()
- should be invoked.
- :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState)
- RequestStateChange changes the state of a job. The possible
- values are as follows: Start (2) changes the state to
- \'Running\'. Suspend (3) stops the job temporarily. The
- intention is to subsequently restart the job with \'Start\'.
- It might be possible to enter the \'Service\' state while
- suspended. (This is job-specific.) Terminate (4) stops the
- job cleanly, saving data, preserving the state, and shutting
- down all underlying processes in an orderly manner. Kill (5)
- terminates the job immediately with no requirement to save
- data or preserve the state. Service (6) puts the job into a
- vendor-specific service state. It might be possible to restart
- the job.
-
- :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime)
- A timeout period that specifies the maximum amount of time that
- the client expects the transition to the new state to take.
- The interval format must be used to specify the TimeoutPeriod.
- A value of 0 or a null parameter indicates that the client has
- no time requirements for the transition. If this property
- does not contain 0 or null and the implementation does not
- support this parameter, a return code of \'Use Of Timeout
- Parameter Not Supported\' must be returned.
- """
- job = self.job_manager.get_job_for_instance_id(
- object_name['InstanceID'])
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
-
- try:
- job.lock()
- states = self.Values.RequestStateChange.RequestedState
- retcodes = self.Values.RequestStateChange
- if param_requestedstate == states.Suspend:
- if job.job_state != Job.STATE_QUEUED:
- # Can suspend only queued jobs
- rval = retcodes.Invalid_State_Transition
- else:
- job.change_state(Job.STATE_SUSPENDED)
- rval = retcodes.Completed_with_No_Error
-
- elif param_requestedstate == states.Terminate:
- if job.job_state not in (Job.STATE_QUEUED, Job.STATE_SUSPENDED):
- # Can terminate only queued or suspended jobs
- rval = retcodes.Invalid_State_Transition
- else:
- job.cancel()
- rval = retcodes.Completed_with_No_Error
-
- elif param_requestedstate == states.Start:
- if job.job_state != Job.STATE_SUSPENDED:
- # Can start only suspended jobs
- rval = retcodes.Invalid_State_Transition
- else:
- job.change_state(Job.STATE_QUEUED)
- # Enqueue the job again, it may be already processed
- # (we might get the job in the queue twice, but
- # we have only one worker thread so it won't collide).
- self.job_manager.add_job(job)
- rval = retcodes.Completed_with_No_Error
-
- else:
- rval = retcodes.Invalid_State_Transition
- finally:
- job.unlock()
- return (rval, [])
-
- @cmpi_logging.trace_method
- def cim_method_killjob(self, env, object_name,
- param_deleteonkill=None):
- """Implements LMI_StorageJob.KillJob() """
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
-
- @cmpi_logging.trace_method
- def cim_method_geterror(self, env, object_name):
- """Implements LMI_StorageJob.GetError()
-
- GetError is deprecated because Error should be an array,not a
- scalar.
-
- When the job is executing or has terminated without
- error, then this method returns no CIM_Error instance. However, if
- the job has failed because of some internal problem or because the
- job has been terminated by a client, then a CIM_Error instance is
- returned.
-
- :param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method GetError()
- should be invoked.
-
- Output parameters:
-
- * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``)
- If the OperationalStatus on the Job is not "OK", then this
- method will return a CIM Error instance. Otherwise, when the
- Job is "OK", null is returned.
- """
- job = self.job_manager.get_job_for_instance_id(
- object_name['InstanceID'])
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
-
- if job.error is None:
- error = pywbem.CIMParameter(
- name='error',
- value=None,
- type='instance',
- is_array=False)
- else:
- err = job.get_cim_error()
- error = pywbem.CIMParameter(
- name='error',
- value=err,
- type='instance')
- rval = self.Values.GetError.Success
- return (rval, [error])
-
- class Values(object):
- class JobState(object):
- New = pywbem.Uint16(2)
- Starting = pywbem.Uint16(3)
- Running = pywbem.Uint16(4)
- Suspended = pywbem.Uint16(5)
- Shutting_Down = pywbem.Uint16(6)
- Completed = pywbem.Uint16(7)
- Terminated = pywbem.Uint16(8)
- Killed = pywbem.Uint16(9)
- Exception = pywbem.Uint16(10)
- Service = pywbem.Uint16(11)
- Query_Pending = pywbem.Uint16(12)
- # DMTF_Reserved = 13..32767
- # Vendor_Reserved = 32768..65535
-
- class LocalOrUtcTime(object):
- Local_Time = pywbem.Uint16(1)
- UTC_Time = pywbem.Uint16(2)
-
- class OperationalStatus(object):
- Unknown = pywbem.Uint16(0)
- Other = pywbem.Uint16(1)
- OK = pywbem.Uint16(2)
- Degraded = pywbem.Uint16(3)
- Stressed = pywbem.Uint16(4)
- Predictive_Failure = pywbem.Uint16(5)
- Error = pywbem.Uint16(6)
- Non_Recoverable_Error = pywbem.Uint16(7)
- Starting = pywbem.Uint16(8)
- Stopping = pywbem.Uint16(9)
- Stopped = pywbem.Uint16(10)
- In_Service = pywbem.Uint16(11)
- No_Contact = pywbem.Uint16(12)
- Lost_Communication = pywbem.Uint16(13)
- Aborted = pywbem.Uint16(14)
- Dormant = pywbem.Uint16(15)
- Supporting_Entity_in_Error = pywbem.Uint16(16)
- Completed = pywbem.Uint16(17)
- Power_Mode = pywbem.Uint16(18)
- Relocating = pywbem.Uint16(19)
- # DMTF_Reserved = ..
- # Vendor_Reserved = 0x8000..
-
- class GetErrors(object):
- Success = pywbem.Uint32(0)
- Not_Supported = pywbem.Uint32(1)
- Unspecified_Error = pywbem.Uint32(2)
- Timeout = pywbem.Uint32(3)
- Failed = pywbem.Uint32(4)
- Invalid_Parameter = pywbem.Uint32(5)
- Access_Denied = pywbem.Uint32(6)
- # DMTF_Reserved = ..
- # Vendor_Specific = 32768..65535
-
- class GetError(object):
- Success = pywbem.Uint32(0)
- Not_Supported = pywbem.Uint32(1)
- Unspecified_Error = pywbem.Uint32(2)
- Timeout = pywbem.Uint32(3)
- Failed = pywbem.Uint32(4)
- Invalid_Parameter = pywbem.Uint32(5)
- Access_Denied = pywbem.Uint32(6)
- # DMTF_Reserved = ..
- # Vendor_Specific = 32768..65535
-
- class RequestStateChange(object):
- Completed_with_No_Error = pywbem.Uint32(0)
- Not_Supported = pywbem.Uint32(1)
- Unknown_Unspecified_Error = pywbem.Uint32(2)
- Can_NOT_complete_within_Timeout_Period = pywbem.Uint32(3)
- Failed = pywbem.Uint32(4)
- Invalid_Parameter = pywbem.Uint32(5)
- In_Use = pywbem.Uint32(6)
- # DMTF_Reserved = ..
- Method_Parameters_Checked___Transition_Started = pywbem.Uint32(4096)
- Invalid_State_Transition = pywbem.Uint32(4097)
- Use_of_Timeout_Parameter_Not_Supported = pywbem.Uint32(4098)
- Busy = pywbem.Uint32(4099)
- # Method_Reserved = 4100..32767
- # Vendor_Specific = 32768..65535
- class RequestedState(object):
- Start = pywbem.Uint16(2)
- Suspend = pywbem.Uint16(3)
- Terminate = pywbem.Uint16(4)
- Kill = pywbem.Uint16(5)
- Service = pywbem.Uint16(6)
- # DMTF_Reserved = 7..32767
- # Vendor_Reserved = 32768..65535
-
-class LMI_OwningJobElement(CIMProvider2):
- """ Instrumentation of LMI_OwningJobElement class and its subclasses."""
-
- @cmpi_logging.trace_method
- def __init__(self, classname, job_manager):
- self.classname = classname
- self.job_manager = job_manager
-
- @cmpi_logging.trace_method
- def get_instance(self, env, model):
- """Return an instance."""
- instance_id = model['OwnedElement']['InstanceID']
- job = self.job_manager.get_job_for_instance_id(instance_id)
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "OwnedElement not found.")
-
- if job.owning_element != model['OwningElement']:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "OwnedElement is not associated to OwningElement.")
- return model
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """Enumerate instances."""
- model.path.update({'OwnedElement': None, 'OwningElement': None})
- for job in self.job_manager.jobs.values():
- if job.owning_element:
- model['OwnedElement'] = job.get_name()
- model['OwningElement'] = job.owning_element
- yield model
-
- @cmpi_logging.trace_method
- def references(self, env, object_name, model, result_class_name, role,
- result_role, keys_only):
- """Instrument Associations."""
- ch = env.get_cimom_handle()
- if ch.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super='CIM_ManagedElement') or \
- ch.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super=self.job_manager.job_classname):
- return self.simple_refs(env, object_name, model,
- result_class_name, role, result_role, keys_only)
-
-class LMI_AffectedJobElement(CIMProvider2):
- """ Instrumentation of LMI_AffectedJobElement class and its subclasses."""
-
- @cmpi_logging.trace_method
- def __init__(self, classname, job_manager):
- self.classname = classname
- self.job_manager = job_manager
-
- @cmpi_logging.trace_method
- def get_instance(self, env, model):
- """Return an instance."""
- instance_id = model['AffectingElement']['InstanceID']
- job = self.job_manager.get_job_for_instance_id(instance_id)
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "AffectingElement not found.")
-
- if job.affected_elements is None:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "The AffectingElement has no AffectedElement.")
- if model['AffectedElement'] not in job.affected_elements:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "AffectedElement is not associated to AffectingElement.")
- model['ElementEffects'] = [self.Values.ElementEffects.Unknown, ]
- return model
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """Enumerate instances."""
- model.path.update({'AffectingElement': None, 'AffectedElement': None})
- for job in self.job_manager.jobs.values():
- if job.affected_elements is None:
- continue
- for element in job.affected_elements:
- model['AffectingElement'] = job.get_name()
- model['AffectedElement'] = element
- if keys_only:
- yield model
- else:
- yield self.get_instance(env, model)
-
- @cmpi_logging.trace_method
- def references(self, env, object_name, model, result_class_name, role,
- result_role, keys_only):
- """Instrument Associations."""
- ch = env.get_cimom_handle()
- if ch.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super='CIM_ManagedElement') or \
- ch.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super=self.job_manager.job_classname):
- return self.simple_refs(env, object_name, model,
- result_class_name, role, result_role, keys_only)
-
- class Values(object):
- class ElementEffects(object):
- Unknown = pywbem.Uint16(0)
- Other = pywbem.Uint16(1)
- Exclusive_Use = pywbem.Uint16(2)
- Performance_Impact = pywbem.Uint16(3)
- Element_Integrity = pywbem.Uint16(4)
- Create = pywbem.Uint16(5)
-
-
-class LMI_MethodResult(CIMProvider2):
- """Instrumentation of LMI_MethodResult class and its subclasses."""
-
- @cmpi_logging.trace_method
- def __init__(self, classname, job_manager):
- self.classname = classname
- self.job_manager = job_manager
-
- @cmpi_logging.trace_method
- # pylint: disable-msg=W0221
- def get_instance(self, env, model, job=None):
- """Return an instance."""
- if not job:
- instance_id = model['InstanceID']
- job = self.job_manager.get_job_for_instance_id(
- instance_id, self.classname)
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
-
- model['Description'] = job.job_name
- if job.job_state in Job.FINAL_STATES:
- model['PostCallIndication'] = pywbem.CIMProperty(
- name='PostCallIndication',
- value=job.get_post_call())
- else:
- model['PostCallIndication'] = pywbem.CIMProperty(
- name='PostCallIndication',
- type='instance',
- value=None)
- model['PreCallIndication'] = pywbem.CIMProperty(
- name='PreCallIndication',
- value=job.get_pre_call())
- return model
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """Enumerate instances."""
- model.path.update({'InstanceID': None})
- for job in self.job_manager.jobs.values():
- model['InstanceID'] = job.get_instance_id(
- classname=self.classname)
- if keys_only:
- yield model
- else:
- yield self.get_instance(env, model, job)
-
-class LMI_AssociatedJobMethodResult(CIMProvider2):
- """
- Instrumentation of LMI_AssociatedJobMethodResult class and its
- subclasses.
- """
-
- @cmpi_logging.trace_method
- def __init__(self, classname, job_manager):
- self.classname = classname
- self.job_manager = job_manager
-
- @cmpi_logging.trace_method
- def get_instance(self, env, model):
- """Return an instance."""
- instance_id = model['Job']['InstanceID']
- job = self.job_manager.get_job_for_instance_id(instance_id)
- if not job:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job not found.")
-
- expected_result_id = job.get_instance_id(
- classname=self.job_manager.method_result_classname)
- if model['JobParameters']['InstanceID'] != expected_result_id:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Job is not associated to JobParameters.")
- return model
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """Enumerate instances."""
- model.path.update({'JobParameters': None, 'Job': None})
- for job in self.job_manager.jobs.values():
- if job.owning_element:
- model['Job'] = job.get_name()
- model['JobParameters'] = pywbem.CIMInstanceName(
- classname=self.job_manager.method_result_classname,
- namespace=self.job_manager.namespace,
- keybindings={
- 'InstanceID': job.get_instance_id(
- classname=self.job_manager.method_result_classname)
- })
- yield model
-
- @cmpi_logging.trace_method
- def references(self, env, object_name, model, result_class_name, role,
- result_role, keys_only):
- """Instrument Associations."""
- ch = env.get_cimom_handle()
- if ch.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super=self.job_manager.method_result_classname) or \
- ch.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super=self.job_manager.job_classname):
- return self.simple_refs(env, object_name, model,
- result_class_name, role, result_role, keys_only)
-
diff --git a/src/python/lmi/common/TimerManager.py b/src/python/lmi/common/TimerManager.py
deleted file mode 100644
index 01e53b8..0000000
--- a/src/python/lmi/common/TimerManager.py
+++ /dev/null
@@ -1,421 +0,0 @@
-# Copyright (C) 2013 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Jan Safranek <jsafrane@redhat.com>
-# -*- coding: utf-8 -*-
-"""
-Module with functionality to create timers, which can be used in CMPI providers.
-
-Default python threading.Timer is not suitable, because it creates thread
-for each timer, which is inefficient. In addition, each such thread would need
-to be registered at CIMOM to enable logging in timer callbacks.
-
-Usage:
-
-1. Initialize the TimerManager when your provider initializes!
-Otherwise you may encounter weird exceptions.
-
-2. When any provider needs timer, create it using Time.create_timer() to create
-Timer instance.
-
-3. Call Timer.start() to start the timer. It will call registered callback
-when the timer expires. The callback is called in context of TimerManager
-thread, which has enabled logging to CIMOM, i.e. the callback can log as usual.
-
-4. (optionally) cancel the timer before expiration using Timer.cancel().
-However, this does not guarantee that the timer callback won't be called -
-it may be already being scheduled / called.
-
-.. autoclass:: TimerManager
- :members:
-
-.. autoclass:: Timer
- :members:
-
-.. autoclass:: MonotonicClock
- :members:
-"""
-
-import ctypes
-import lmi.common.singletonmixin as singletonmixin
-import threading
-import Queue
-import lmi.common.cmpi_logging as cmpi_logging
-
-class TimerException(Exception):
- pass
-
-class MonotonicClock(object):
- """
- Monotonic clock, represented by clock_gettime() and CLOCK_MONOTONIC.
- This clock is not influenced by NTP or administrator setting time or date.
- """
- CLOCK_MONOTONIC = ctypes.c_int(1)
-
- class timespec(ctypes.Structure):
- _fields_ = [
- ("tv_sec", ctypes.c_long),
- ("tv_nsec", ctypes.c_long)]
-
- def __init__(self):
- libc = ctypes.CDLL("librt.so.1")
- self._clock_gettime = libc.clock_gettime
-
- def now(self):
- """
- Return current time, i.e. float representing seconds with precision up
- to nanoseconds (depends on glibc). The actual value of current time is
- meaningless, it can be used only to measure time differences.
-
- :returns: ``float`` with current time in seconds.
- """
- t = MonotonicClock.timespec(0, 0)
- ret = self._clock_gettime(self.CLOCK_MONOTONIC, ctypes.pointer(t))
-
- if ret < 0:
- raise TimerException("Cannot get clock time, clock_gettime() failed.")
- return t.tv_sec + t.tv_nsec * 10 ** (-9)
-
-class Timer(object):
- """
- A class representing a timer. A timer has a timeout and after the timeout,
- given callback is called and the timer is deleted.
- """
-
- @cmpi_logging.trace_method
- def __init__(self, timer_manager, name, callback=None, *args, **kwargs):
- """
- Create a timer. If specified, given callback is registered.
- The callback is called with *args and **kwargs.
-
- :param timer_manager: (``TimerManager)`` Instance of the timer manager
- which will manage the timer.
- :param name: (``string``) Name of the timer, used for logging.
- :param callback: (``function``) Callback to call when the timer expires.
- :param *args, **kwargs: Parameters of the callback.
- """
- self._mgr = timer_manager
- self._name = name
- self._callback = callback
- self._args = args
- self._kwargs = kwargs
-
- cmpi_logging.logger.trace_info("Timer: Timer %s created" % name)
-
- @cmpi_logging.trace_method
- def set_callback(self, callback, *args, **kwargs):
- """
- Set callback to call when the timer expires.
-
- :param callback: (``function``) Callback to call when the timer expires.
- :param *args, **kwargs: Parameters of the callback.
- """
- self._callback = callback
- self._args = args
- self._kwargs = kwargs
-
- @cmpi_logging.trace_method
- def start(self, timeout):
- """
- Start the timer with given timeout. After the timeout, the registered
- callback will be called.
-
- :param timeout: (``float``) Timeout in seconds.
- """
-
- self._timeout = timeout
- now = self._mgr.now()
- self._end_time = now + timeout
- cmpi_logging.logger.trace_info(
- "Timer: Timer %s started at %f for %f seconds"
- % (self._name, now, self._timeout))
- self._mgr._add_timer(self)
-
- @cmpi_logging.trace_method
- def cancel(self):
- """
- Cancel the timer. This method does not guarantee that the callback won't
- be called, the timer might be calling the callback right now,
- """
- cmpi_logging.logger.trace_info("Timer: Timer %s cancelled"
- % (self._name))
- self._mgr._remove_timer(self)
-
- @cmpi_logging.trace_method
- def _expired(self, now):
- """
- Returns True, if the timer is expired.
-
- :param now: (``float``) Current time, as returned by MonotonicClock.now().
- :returns: (``boolean``) ``True``, if the timer is expired.
- """
- if self._end_time <= now:
- cmpi_logging.logger.trace_info("Timer: Timer %s has expired"
- % (self._name))
- return True
- return False
-
- @cmpi_logging.trace_method
- def _expire(self):
- """
- Called when the timer expired. It calls the callback.
- """
- cmpi_logging.logger.trace_info("Timer: Calling callback for timer %s"
- % (self._name))
- self._callback(*self._args, **self._kwargs)
-
-class TimerManager(singletonmixin.Singleton):
- """
- Manages set of timers.
-
- Python standard Timer class creates a thread for
-
- each timer, which is inefficient. This class uses only one thread, which
- is registered at CIMOM, i.e. it can log as usual.
-
- This class is singleton, use TimerManager.get_instance() to get the
- instance.
-
- Still, the singleton needs to be initialized with ProviderEnvironment to
- enable logging in the timer thread. Use TimerManager.get_instance(env) in
- you provider initialization.
- """
-
- # Commands to the timer thread
- COMMAND_STOP = 1
- COMMAND_RESCHEDULE = 2
-
- @cmpi_logging.trace_method
- def __init__(self, env=None):
- """
- Initialize new thread manager.
-
- :param env: (``ProviderEnvironment``) Environment to use for logging.
- """
- self._clock = MonotonicClock()
- self._lock = threading.RLock()
- self._queue = Queue.Queue()
-
- # Array of timers. Assumption: nr. of timers is relatively small,
- # i.e. hundreds at the worst.
- self._timers = []
-
- new_broker = None
- if env:
- broker = env.get_cimom_handle()
- new_broker = broker.PrepareAttachThread()
-
- self._timer_thread = threading.Thread(
- target=self._timer_loop, args=(new_broker,))
- self._timer_thread.daemon = False
- self._timer_thread.start()
-
- def create_timer(self, name, callback=None, *args, **kwargs):
- """
- Create new timer. If specified, given callback is registered.
- The callback is called with *args and **kwargs.
-
- :param name: (``string``) Name of the timer, used for logging.
- :param callback: (``function``) Callback to call when the timer expires.
- :param *args, **kwargs: Parameters of the callback.
- """
- return Timer(self, name, callback, *args, **kwargs)
-
- def _timer_loop(self, broker):
- """
- TimerManager thread main loop. It waits for timeout of all timers
- and calls their callbacks.
-
- :param broker: (``BrokerCIMOMHandle``) CIM broker handle, used for
- logging.
- """
- if broker:
- broker.AttachThread()
- cmpi_logging.logger.info("Started Timer thread.")
- while True:
- self._handle_expired()
- timeout = self._find_timeout()
- if timeout != 0:
- # Wait for the timeout or any change in timers.
- try:
- command = self._queue.get(timeout=timeout)
- self._queue.task_done()
- if command == self.COMMAND_STOP:
- break # stop the thread
- # process COMMAND_RESCHEDULE in next loop
- except Queue.Empty:
- # Timeout has happened, ignore the exception.
- pass
- cmpi_logging.logger.info("Stopped Timer thread.")
-
- @cmpi_logging.trace_method
- def _handle_expired(self):
- """
- Finds all expired timers, calls their callback and removes them from
- list of timers.
- """
-
- # Get list of expired timers.
- with self._lock:
- now = self.now()
- cmpi_logging.logger.trace_info(
- "Timer: Checking for expired, now=%f." % (now))
- expired = [t for t in self._timers if t._expired(now)]
-
- # Call the callbacks (unlocked!).
- for t in expired:
- t._expire()
-
- # Remove the timers (locked).
- with self._lock:
- for t in expired:
- try:
- cmpi_logging.logger.trace_info(
- "Timer: Removing %s" % (t._name))
- self._timers.remove(t)
- except ValueError:
- # The timer has already been removed.
- pass
-
- @cmpi_logging.trace_method
- def _find_timeout(self):
- """
- Return nearest timeout, in seconds (as float, i.e. subsecond timeout
- is possible). If no timer is scheduled, None is returned.
- If there are expired timers, 0 is returned.
-
- :returns: Positive ``float``: Nearest timeout.
- :returns: ``0``: Some timer has expired.
- :returns: ``None``: No timer is scheduled.
- """
- with self._lock:
- if not self._timers:
- cmpi_logging.logger.trace_info(
- "Timer: No timers scheduled, waiting forever.")
- return None
- closest = min(self._timers, key=lambda timer: timer._end_time)
- now = self.now()
- timeout = closest._end_time - now
- if timeout > 0:
- cmpi_logging.logger.trace_info(
- "Timer: Waiting for %f seconds, now=%f."
- % (timeout, now))
- return timeout
- cmpi_logging.logger.trace_info(
- "Timer: Some timer has already expired, no waiting.")
- return 0
-
- @cmpi_logging.trace_method
- def _add_timer(self, timer):
- """
- Adds timer to list of timers. The timer must be started, i.e. its
- timeout must be nozero!
- This is internal method called by Timer.start().
-
- :param timer: (``Timer``) Timer to add.
- """
- with self._lock:
- self._timers.append(timer)
- # Wake up the timer manager thread.
- self._queue.put(self.COMMAND_RESCHEDULE)
- cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name))
-
- @cmpi_logging.trace_method
- def _remove_timer(self, timer):
- """
- Remove timer from list of timers.
- This is internal method called by Timer.cancel().
- :param timer: (``Timer``) Timer to remove.
- """
- with self._lock:
- try:
- self._timers.remove(timer)
- except ValueError:
- pass
- # Wake up the timer manager thread.
- self._queue.put(self.COMMAND_RESCHEDULE)
- cmpi_logging.logger.trace_info("Timer: Timer %s removed"
- % (timer._name))
-
- def now(self):
- """
- Return current time, not influenced by NTP or admin setting date or
- time. The actual value of current time is meaningless, it can be used
- only to measure time differences.
-
- :returns: ``float`` Current time, in seconds.
- """
- return self._clock.now()
-
- @cmpi_logging.trace_method
- def shutdown(self):
- """
- Stop the thread. This method blocks until the thread is safely
- destroyed.
- """
- self._queue.put(self.COMMAND_STOP)
- self._timer_thread.join()
-
-if __name__ == "__main__":
- cmpi_logging.logger = cmpi_logging.CMPILogger("")
- import time
-
- class Env(object):
- def AttachThread(self):
- pass
- def PrepareAttachThread(self):
- return self
- def get_cimom_handle(self):
- return self
-
- clock = MonotonicClock()
-
- start = clock.now()
- time.sleep(0.5)
- print "Clock 0.5:", clock.now() - start
-
- time.sleep(0.5)
- print "Clock 1:", clock.now() - start
-
- mgr = TimerManager.get_instance(Env())
-
- def callback(msg):
- if callback.first:
- t = mgr.create_timer("internal 0.5")
- t.set_callback(callback, "internal 0.5")
- t.start(0.5)
- callback.first = False
-
- print clock.now(), msg
-
- callback.first = True
-
- t1 = mgr.create_timer("one second")
- t1.set_callback(callback, "1")
- t1.start(1)
- t2 = mgr.create_timer("two seconds")
- t2.set_callback(callback, "2")
- t2.start(2)
- t22 = mgr.create_timer("two seconds 2")
- t22.set_callback(callback, "2 again")
- t22.start(2)
- t15 = mgr.create_timer("one+half seconds")
- t15.set_callback(callback, "1.5")
- t15.start(1.5)
-
- time.sleep(4)
-
- mgr.stop_thread()
diff --git a/src/python/lmi/common/__init__.py b/src/python/lmi/common/__init__.py
deleted file mode 100644
index baebcdb..0000000
--- a/src/python/lmi/common/__init__.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Software Management Providers
-#
-# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""
-Common utilities for OpenLMI python providers.
-"""
-def parse_instance_id(instance_id, classname=None):
- """
- Parse InstanceID, check it has LMI:<classname>:<ID> format and return
- the ID. Return None if the format is bad.
- :param instance_id: (``string``) String to parse.
- :param classname: (``string``) Name of class, whose InstanceID we parse.
- If the classname is None, it won't be checked.
- :returns: ``string`` with the ID.
- """
- parts = instance_id.split(":", 2)
- if len(parts) != 3:
- return None
- if parts[0] != "LMI":
- return None
- real_classname = parts[1]
- if classname and real_classname.lower() != classname.lower():
- return None
- return parts[2]
diff --git a/src/python/lmi/common/cmpi_logging.py b/src/python/lmi/common/cmpi_logging.py
deleted file mode 100644
index a97e4ab..0000000
--- a/src/python/lmi/common/cmpi_logging.py
+++ /dev/null
@@ -1,204 +0,0 @@
-# -*- Coding:utf-8 -*-
-#
-# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Jan Safranek <jsafrane@redhat.com>
-
-
-import logging
-import inspect
-import traceback
-
-TRACE_WARNING = logging.INFO - 1
-TRACE_INFO = logging.INFO - 2
-TRACE_VERBOSE = logging.DEBUG
-
-class CMPILogHandler(logging.Handler):
- """
- A handler class, which sends log messages to CMPI log.
- """
-
- def __init__(self, cmpi_logger, *args, **kwargs):
- self.cmpi_logger = cmpi_logger
- super(CMPILogHandler, self).__init__(*args, **kwargs)
-
- def emit(self, record):
- msg = self.format(record)
- if record.levelno >= logging.ERROR:
- self.cmpi_logger.log_error(msg)
- elif record.levelno >= logging.WARNING:
- self.cmpi_logger.log_warn(msg)
- elif record.levelno >= logging.INFO:
- self.cmpi_logger.log_info(msg)
- elif record.levelno >= TRACE_WARNING:
- self.cmpi_logger.trace_warn(record.filename, msg)
- elif record.levelno >= TRACE_INFO:
- self.cmpi_logger.trace_info(record.filename, msg)
- elif record.levelno >= logging.DEBUG:
- self.cmpi_logger.trace_verbose(record.filename, msg)
-
-class CMPILogger(logging.getLoggerClass()):
- """
- A logger class, which adds trace_method level log methods.
- """
- def trace_warn(self, msg, *args, **kwargs):
- """ Log message with TRACE_WARNING severity. """
- self.log(TRACE_WARNING, msg, *args, **kwargs)
-
- def trace_info(self, msg, *args, **kwargs):
- """ Log message with TRACE_INFO severity. """
- self.log(TRACE_INFO, msg, *args, **kwargs)
-
- def trace_verbose(self, msg, *args, **kwargs):
- """ Log message with TRACE_VERBOSE severity. """
- self.log(TRACE_VERBOSE, msg, *args, **kwargs)
-
-logging.setLoggerClass(CMPILogger)
-
-def trace_method(func):
- """ Decorator, trace entry and exit for a class method. """
- classname = inspect.getouterframes(inspect.currentframe())[1][3]
- def helper_func(*args, **kwargs):
- """
- Helper function, wrapping real function by trace_method decorator.
- """
- logger.log(TRACE_VERBOSE, "Entering %s.%s", classname, func.__name__)
- try:
- ret = func(*args, **kwargs)
- except Exception as exc:
- if getattr(exc, "tb_printed", False) is False:
- logger.exception("full traceback")
- logger.log(TRACE_VERBOSE, "traceback: %s",
- traceback.format_exc())
- exc.tb_printed = True
- logger.log(TRACE_WARNING, "%s.%s threw exception %s",
- classname, func.__name__, str(exc))
- raise
- logger.log(TRACE_VERBOSE, "Exiting %s.%s", classname, func.__name__)
- return ret
- helper_func.__name__ = func.__name__
- helper_func.__doc__ = func.__doc__
- helper_func.__module__ = func.__module__
- return helper_func
-
-def trace_function(func):
- """ Decorator, trace entry and exit for a function outside any class. """
- def helper_func(*args, **kwargs):
- """
- Helper function, wrapping real function by trace_method decorator.
- """
- logger.log(TRACE_VERBOSE, "Entering %s.%s",
- func.__module__, func.__name__)
- try:
- ret = func(*args, **kwargs)
- except Exception as exc:
- if getattr(exc, "tb_printed", False) is False:
- logger.exception("full traceback")
- logger.log(TRACE_VERBOSE, "traceback: %s",
- traceback.format_exc())
- exc.tb_printed = True
- logger.log(TRACE_WARNING, "%s.%s threw exception %s",
- func.__module__, func.__name__, str(exc))
- raise
- logger.log(TRACE_VERBOSE, "Exiting %s.%s",
- func.__module__, func.__name__)
- return ret
- helper_func.__name__ = func.__name__
- helper_func.__doc__ = func.__doc__
- helper_func.__module__ = func.__module__
- return helper_func
-
-class LogManager(object):
- """
- Class, which takes care of CMPI logging.
- There should be only one instance of this class and it should be
- instantiated as soon as possible, even before reading a config.
- The config file can be provided later by set_config call.
- """
- FORMAT_STDERR = '%(levelname)s: %(message)s'
- FORMAT_CMPI = '%(levelname)s: %(message)s'
-
- LOGGER_NAME = "lmi.storage"
-
- def __init__(self, env):
- """
- Initialize logging.
- """
- formatter = logging.Formatter(self.FORMAT_CMPI)
-
- self.cmpi_handler = CMPILogHandler(env.get_logger())
- self.cmpi_handler.setLevel(logging.DEBUG)
- self.cmpi_handler.setFormatter(formatter)
-
- self.logger = logging.getLogger(self.LOGGER_NAME)
- self.logger.addHandler(self.cmpi_handler)
- self.logger.setLevel(logging.INFO)
-
- self.stderr_handler = None
- self.config = None
-
- global logger # IGNORE:W0603
- logger = self.logger
- logger.info("CMPI log started")
-
- @trace_method
- def set_config(self, config):
- """
- Set a configuration of logging. It applies its setting immediately
- and also subscribes for configuration changes.
- """
- self.config = config
- config.add_listener(self._config_changed)
- # apply the config
- self._config_changed(config)
-
- @trace_method
- def _config_changed(self, config):
- """
- Apply changed configuration, i.e. start/stop sending to stderr
- and set appropriate log level.
- """
- if config.tracing:
- self.logger.setLevel(logging.DEBUG)
- else:
- self.logger.setLevel(logging.INFO)
- if config.stderr:
- # start sending to stderr
- if not self.stderr_handler:
- # create stderr handler
- formatter = logging.Formatter(self.FORMAT_STDERR)
- self.stderr_handler = logging.StreamHandler()
- self.stderr_handler.setLevel(logging.DEBUG)
- self.stderr_handler.setFormatter(formatter)
- self.logger.addHandler(self.stderr_handler)
- self.logger.info("Started logging to stderr.")
- else:
- # stop sending to stderr
- if self.stderr_handler:
- self.logger.info("Stopped logging to stderr.")
- self.logger.removeHandler(self.stderr_handler)
- self.stderr_handler = None
-
- def destroy(self):
- if self.stderr_handler:
- self.logger.removeHandler(self.stderr_handler)
- self.stderr_handler = None
- self.logger.removeHandler(self.cmpi_handler)
- self.cmpi_handler = None
- self.config.remove_listener(self._config_changed)
-
-logger = None
diff --git a/src/python/lmi/common/singletonmixin.py b/src/python/lmi/common/singletonmixin.py
deleted file mode 100644
index c252676..0000000
--- a/src/python/lmi/common/singletonmixin.py
+++ /dev/null
@@ -1,560 +0,0 @@
-#pylint: disable-all
-"""
-A Python Singleton mixin class that makes use of some of the ideas
-found at http://c2.com/cgi/wiki?PythonSingleton. Just inherit
-from it and you have a singleton. No code is required in
-subclasses to create singleton behavior -- inheritance from
-Singleton is all that is needed.
-
-Singleton creation is threadsafe.
-
-USAGE:
-
-Just inherit from Singleton. If you need a constructor, include
-an __init__() method in your class as you usually would. However,
-if your class is S, you instantiate the singleton using S.get_instance()
-instead of S(). Repeated calls to S.get_instance() return the
-originally-created instance.
-
-For example:
-
-class S(Singleton):
-
- def __init__(self, a, b=1):
- pass
-
-S1 = S.get_instance(1, b=3)
-
-
-Most of the time, that's all you need to know. However, there are some
-other useful behaviors. Read on for a full description:
-
-1) Getting the singleton:
-
- S.get_instance()
-
-returns the instance of S. If none exists, it is created.
-
-2) The usual idiom to construct an instance by calling the class, i.e.
-
- S()
-
-is disabled for the sake of clarity.
-
-For one thing, the S() syntax means instantiation, but get_instance()
-usually does not cause instantiation. So the S() syntax would
-be misleading.
-
-Because of that, if S() were allowed, a programmer who didn't
-happen to notice the inheritance from Singleton (or who
-wasn't fully aware of what a Singleton pattern
-does) might think he was creating a new instance,
-which could lead to very unexpected behavior.
-
-So, overall, it is felt that it is better to make things clearer
-by requiring the call of a class method that is defined in
-Singleton. An attempt to instantiate via S() will result
-in a SingletonException being raised.
-
-3) Use __S.__init__() for instantiation processing,
-since S.get_instance() runs S.__init__(), passing it the args it has received.
-
-If no data needs to be passed in at instantiation time,
-you don't need S.__init__().
-
-4) If S.__init__(.) requires parameters, include them ONLY in the
-first call to S.get_instance(). If subsequent calls have arguments,
-a SingletonException is raised by default.
-
-If you find it more convenient for subsequent calls to be allowed to
-have arguments, but for those argumentsto be ignored, just include
-'ignoreSubsequent = True' in your class definition, i.e.:
-
- class S(Singleton):
-
- ignoreSubsequent = True
-
- def __init__(self, a, b=1):
- pass
-
-5) For testing, it is sometimes convenient for all existing singleton
-instances to be forgotten, so that new instantiations can occur. For that
-reason, a _forget_all_singletons() function is included. Just call
-
- _forget_all_singletons()
-
-and it is as if no earlier instantiations have occurred.
-
-6) As an implementation detail, classes that inherit
-from Singleton may not have their own __new__
-methods. To make sure this requirement is followed,
-an exception is raised if a Singleton subclass includ
-es __new__. This happens at subclass instantiation
-time (by means of the MetaSingleton metaclass.
-
-
-By Gary Robinson, grobinson@flyfi.com. No rights reserved --
-placed in the public domain -- which is only reasonable considering
-how much it owes to other people's code and ideas which are in the
-public domain. The idea of using a metaclass came from
-a comment on Gary's blog (see
-http://www.garyrobinson.net/2004/03/python_singleto.html#comments).
-Other improvements came from comments and email from other
-people who saw it online. (See the blog post and comments
-for further credits.)
-
-Not guaranteed to be fit for any particular purpose. Use at your
-own risk.
-"""
-
-import threading
-
-class SingletonException(Exception):
- """
- Base exception related to singleton handling.
- """
- pass
-
-_ST_SINGLETONS = set()
-_LOCK_FOR_SINGLETONS = threading.RLock()
-# Ensure only one instance of each Singleton class is created. This is not
-# bound to the _LOCK_FOR_SINGLETON_CREATION = threading.RLock() individual
-# Singleton class since we need to ensure that there is only one mutex for each
-# Singleton class, which would require having a lock when setting up the
-# Singleton class, which is what this is anyway. So, when any Singleton is
-# created, we lock this lock and then we don't need to lock it again for that
-# class.
-_LOCK_FOR_SINGLETON_CREATION = threading.RLock()
-
-def _create_singleton_instance(cls, lst_args, dct_kw_args):
- """
- Creates singleton instance and stores its class in set.
- """
- _LOCK_FOR_SINGLETON_CREATION.acquire()
- try:
- if cls._is_instantiated(): # some other thread got here first
- return
-
- instance = cls.__new__(cls)
- try:
- instance.__init__(*lst_args, **dct_kw_args)
- except TypeError, exc:
- if '__init__() takes' in exc.message:
- raise SingletonException, (
- 'If the singleton requires __init__ args,'
- ' supply them on first call to get_instance().')
- else:
- raise
- cls.c_instance = instance
- _add_singleton(cls)
- finally:
- _LOCK_FOR_SINGLETON_CREATION.release()
-
-def _add_singleton(cls):
- """
- Adds class to singleton set.
- """
- _LOCK_FOR_SINGLETONS.acquire()
- try:
- assert cls not in _ST_SINGLETONS
- _ST_SINGLETONS.add(cls)
- finally:
- _LOCK_FOR_SINGLETONS.release()
-
-def _remove_singleton(cls):
- """
- Removes class from singleton set.
- """
- _LOCK_FOR_SINGLETONS.acquire()
- try:
- if cls in _ST_SINGLETONS:
- _ST_SINGLETONS.remove(cls)
- finally:
- _LOCK_FOR_SINGLETONS.release()
-
-def _forget_all_singletons():
- '''
- This is useful in tests, since it is hard to know which singletons need
- to be cleared to make a test work.
- '''
- _LOCK_FOR_SINGLETONS.acquire()
- try:
- for cls in _ST_SINGLETONS.copy():
- cls._forget_class_instance_reference_for_testing()
-
- # Might have created some Singletons in the process of tearing down.
- # Try one more time - there should be a limit to this.
- i_num_singletons = len(_ST_SINGLETONS)
- if len(_ST_SINGLETONS) > 0:
- for cls in _ST_SINGLETONS.copy():
- cls._forget_class_instance_reference_for_testing()
- i_num_singletons -= 1
- assert i_num_singletons == len(_ST_SINGLETONS), \
- 'Added a singleton while destroying ' + str(cls)
- assert len(_ST_SINGLETONS) == 0, _ST_SINGLETONS
- finally:
- _LOCK_FOR_SINGLETONS.release()
-
-class MetaSingleton(type):
- """
- Metaclass for Singleton base class.
- """
- def __new__(mcs, str_name, tup_bases, dct):
- if dct.has_key('__new__'):
- raise SingletonException, 'Can not override __new__ in a Singleton'
- return super(MetaSingleton, mcs).__new__(
- mcs, str_name, tup_bases, dct)
-
- def __call__(cls, *lst_args, **dictArgs):
- raise SingletonException, \
- 'Singletons may only be instantiated through get_instance()'
-
-class Singleton(object):
- """
- Base class for all singletons.
- """
- __metaclass__ = MetaSingleton
-
- def get_instance(cls, *lst_args, **dct_kw_args):
- """
- Call this to instantiate an instance or retrieve the existing instance.
- If the singleton requires args to be instantiated, include them the first
- time you call get_instance.
- """
- if cls._is_instantiated():
- if ( (lst_args or dct_kw_args)
- and not hasattr(cls, 'ignoreSubsequent')):
- raise SingletonException, (
- 'Singleton already instantiated, but get_instance()'
- ' called with args.')
- else:
- _create_singleton_instance(cls, lst_args, dct_kw_args)
-
- return cls.c_instance #pylint: disable=E1101
- get_instance = classmethod(get_instance)
-
- def _is_instantiated(cls):
- """
- Don't use hasattr(cls, 'c_instance'), because that screws things
- up if there is a singleton that extends another singleton.
- hasattr looks in the base class if it doesn't find in subclass.
- """
- return 'c_instance' in cls.__dict__
- _is_instantiated = classmethod(_is_instantiated)
-
- # This can be handy for public use also
- isInstantiated = _is_instantiated
-
- def _forget_class_instance_reference_for_testing(cls):
- """
- This is designed for convenience in testing -- sometimes you
- want to get rid of a singleton during test code to see what
- happens when you call get_instance() under a new situation.
-
- To really delete the object, all external references to it
- also need to be deleted.
- """
- try:
- if hasattr(cls.c_instance, '_prepare_to_forget_singleton'):
- # tell instance to release anything it might be holding onto.
- cls.c_instance._prepare_to_forget_singleton()
- del cls.c_instance
- _remove_singleton(cls)
- except AttributeError:
- # run up the chain of base classes until we find the one that has
- # the instance and then delete it there
- for base_class in cls.__bases__:
- if issubclass(base_class, Singleton):
- base_class._forget_class_instance_reference_for_testing()
- _forget_class_instance_reference_for_testing = classmethod(
- _forget_class_instance_reference_for_testing)
-
-
-if __name__ == '__main__':
-
- import unittest
- import time
-
- class SingletonMixinPublicTestCase(unittest.TestCase):
- """
- TestCase for singleton class.
- """
- def testReturnsSameObject(self): #pylint: disable=C0103
- """
- Demonstrates normal use -- just call get_instance and it returns a singleton instance
- """
-
- class Foo(Singleton):
- """Singleton child class."""
- def __init__(self):
- super(Foo, self).__init__()
-
- a1 = Foo.get_instance()
- a2 = Foo.get_instance()
- self.assertEquals(id(a1), id(a2))
-
- def testInstantiateWithMultiArgConstructor(self):#pylint: disable=C0103
- """
- If the singleton needs args to construct, include them in the first
- call to get instances.
- """
-
- class Bar(Singleton):
- """Singleton child class."""
-
- def __init__(self, arg1, arg2):
- super(Bar, self).__init__()
- self.arg1 = arg1
- self.arg2 = arg2
-
- b1 = Bar.get_instance('arg1 value', 'arg2 value')
- b2 = Bar.get_instance()
- self.assertEquals(b1.arg1, 'arg1 value')
- self.assertEquals(b1.arg2, 'arg2 value')
- self.assertEquals(id(b1), id(b2))
-
- def testInstantiateWithKeywordArg(self):
- """
- Test instantiation with keyword arguments.
- """
-
- class Baz(Singleton):
- """Singleton child class."""
- def __init__(self, arg1=5):
- super(Baz, self).__init__()
- self.arg1 = arg1
-
- b1 = Baz.get_instance('arg1 value')
- b2 = Baz.get_instance()
- self.assertEquals(b1.arg1, 'arg1 value')
- self.assertEquals(id(b1), id(b2))
-
- def testTryToInstantiateWithoutNeededArgs(self):
- """
- This tests, improper instantiation.
- """
-
- class Foo(Singleton):
- """Singleton child class."""
- def __init__(self, arg1, arg2):
- super(Foo, self).__init__()
- self.arg1 = arg1
- self.arg2 = arg2
-
- self.assertRaises(SingletonException, Foo.get_instance)
-
- def testPassTypeErrorIfAllArgsThere(self):
- """
- Make sure the test for capturing missing args doesn't interfere
- with a normal TypeError.
- """
- class Bar(Singleton):
- """Singleton child class."""
- def __init__(self, arg1, arg2):
- super(Bar, self).__init__()
- self.arg1 = arg1
- self.arg2 = arg2
- raise TypeError, 'some type error'
-
- self.assertRaises(TypeError, Bar.get_instance, 1, 2)
-
- def testTryToInstantiateWithoutGetInstance(self):
- """
- Demonstrates that singletons can ONLY be instantiated through
- get_instance, as long as they call Singleton.__init__ during
- construction.
-
- If this check is not required, you don't need to call
- Singleton.__init__().
- """
-
- class A(Singleton):
- def __init__(self):
- super(A, self).__init__()
-
- self.assertRaises(SingletonException, A)
-
- def testDontAllowNew(self):
-
- def instantiatedAnIllegalClass():
- class A(Singleton):
- def __init__(self):
- super(A, self).__init__()
-
- def __new__(metaclass, str_name, tup_bases, dct):
- return super(MetaSingleton, metaclass).__new__(
- metaclass, str_name, tup_bases, dct)
-
- self.assertRaises(SingletonException, instantiatedAnIllegalClass)
-
-
- def testDontAllowArgsAfterConstruction(self):
- class B(Singleton):
-
- def __init__(self, arg1, arg2):
- super(B, self).__init__()
- self.arg1 = arg1
- self.arg2 = arg2
-
- B.get_instance('arg1 value', 'arg2 value')
- self.assertRaises(SingletonException, B, 'arg1 value', 'arg2 value')
-
- def test_forgetClassInstanceReferenceForTesting(self):
- class A(Singleton):
- def __init__(self):
- super(A, self).__init__()
- class B(A):
- def __init__(self):
- super(B, self).__init__()
-
- # check that changing the class after forgetting the instance
- # produces an instance of the new class
- a = A.get_instance()
- assert a.__class__.__name__ == 'A'
- A._forget_class_instance_reference_for_testing()
- b = B.get_instance()
- assert b.__class__.__name__ == 'B'
-
- # check that invoking the 'forget' on a subclass still deletes
- # the instance
- B._forget_class_instance_reference_for_testing()
- a = A.get_instance()
- B._forget_class_instance_reference_for_testing()
- b = B.get_instance()
- assert b.__class__.__name__ == 'B'
-
- def test_forgetAllSingletons(self):
- # Should work if there are no singletons
- _forget_all_singletons()
-
- class A(Singleton):
- ciInitCount = 0
- def __init__(self):
- super(A, self).__init__()
- A.ciInitCount += 1
-
- A.get_instance()
- self.assertEqual(A.ciInitCount, 1)
-
- A.get_instance()
- self.assertEqual(A.ciInitCount, 1)
-
- _forget_all_singletons()
- A.get_instance()
- self.assertEqual(A.ciInitCount, 2)
-
- def test_threadedCreation(self):
- # Check that only one Singleton is created even if multiple threads
- # try at the same time. If fails, would see assert in _add_singleton
- class Test_Singleton(Singleton):
- def __init__(self):
- super(Test_Singleton, self).__init__()
-
- class Test_SingletonThread(threading.Thread):
- def __init__(self, fTargetTime):
- super(Test_SingletonThread, self).__init__()
- self._fTargetTime = fTargetTime
- self._eException = None
-
- def run(self):
- try:
- fSleepTime = self._fTargetTime - time.time()
- if fSleepTime > 0:
- time.sleep(fSleepTime)
- Test_Singleton.get_instance()
- except Exception, exc:
- self._eException = exc
-
- fTargetTime = time.time() + 0.1
- lstThreads = []
- for _ in xrange(100):
- t = Test_SingletonThread(fTargetTime)
- t.start()
- lstThreads.append(t)
- eException = None
- for t in lstThreads:
- t.join()
- if t._eException and not eException:
- eException = t._eException
- if eException:
- raise eException
-
- def testNoInit(self):
- """
- Demonstrates use with a class not defining __init__
- """
-
- class A(Singleton):
- pass
-
- #INTENTIONALLY UNDEFINED:
- #def __init__(self):
- # super(A, self).__init__()
-
- A.get_instance() #Make sure no exception is raised
-
- def testMultipleGetInstancesWithArgs(self):
-
- class A(Singleton):
-
- ignoreSubsequent = True
-
- def __init__(self, a, b=1):
- pass
-
- a1 = A.get_instance(1)
- # ignores the second call because of ignoreSubsequent
- a2 = A.get_instance(2)
-
- class B(Singleton):
-
- def __init__(self, a, b=1):
- pass
-
- b1 = B.get_instance(1)
- # No ignoreSubsequent included
- self.assertRaises(SingletonException, B.get_instance, 2)
-
- class C(Singleton):
-
- def __init__(self, a=1):
- pass
-
- c1 = C.get_instance(a=1)
- # No ignoreSubsequent included
- self.assertRaises(SingletonException, C.get_instance, a=2)
-
- def testInheritance(self):
- """
- It's sometimes said that you can't subclass a singleton (see, for instance,
- http://steve.yegge.googlepages.com/singleton-considered-stupid point e). This
- test shows that at least rudimentary subclassing works fine for us.
- """
-
- class A(Singleton):
-
- def set_x(self, x):
- self.x = x
-
- def setZ(self, z):
- raise NotImplementedError
-
- class B(A):
-
- def set_x(self, x):
- self.x = -x
-
- def set_y(self, y):
- self.y = y
-
- a = A.get_instance()
- a.set_x(5)
- b = B.get_instance()
- b.set_x(5)
- b.set_y(50)
- self.assertEqual((a.x, b.x, b.y), (5, -5, 50))
- self.assertRaises(AttributeError, eval, 'a.set_y', {}, locals())
- self.assertRaises(NotImplementedError, b.setZ, 500)
-
- unittest.main()
-