From 2a2bc8a4e9498024c8a85ce2813e7d0f9c5677a0 Mon Sep 17 00:00:00 2001 From: Michal Minar Date: Tue, 30 Jul 2013 14:24:49 +0200 Subject: openlmi-python: split python package Split the openlmi-python package to 2: * openlmi-python-base - lmi namespace - functionality for any OpenLMI related python code - contains packages 'lmi' and 'lmi.base' * openlmi-python-providers - common functionality for OpenLMI providers - contains 'lmi.providers' --- src/python/lmi/base/BaseConfiguration.py | 268 ++++ src/python/lmi/base/__init__.py | 24 + src/python/lmi/base/singletonmixin.py | 560 +++++++++ src/python/lmi/common/BaseConfiguration.py | 267 ---- src/python/lmi/common/IndicationManager.py | 758 ----------- src/python/lmi/common/JobManager.py | 1670 ------------------------- src/python/lmi/common/TimerManager.py | 421 ------- src/python/lmi/common/__init__.py | 42 - src/python/lmi/common/cmpi_logging.py | 204 --- src/python/lmi/common/singletonmixin.py | 560 --------- src/python/lmi/providers/IndicationManager.py | 758 +++++++++++ src/python/lmi/providers/JobManager.py | 1670 +++++++++++++++++++++++++ src/python/lmi/providers/TimerManager.py | 422 +++++++ src/python/lmi/providers/__init__.py | 42 + src/python/lmi/providers/cmpi_logging.py | 204 +++ src/python/setup.py | 4 +- 16 files changed, 3950 insertions(+), 3924 deletions(-) create mode 100644 src/python/lmi/base/BaseConfiguration.py create mode 100644 src/python/lmi/base/__init__.py create mode 100644 src/python/lmi/base/singletonmixin.py delete mode 100644 src/python/lmi/common/BaseConfiguration.py delete mode 100644 src/python/lmi/common/IndicationManager.py delete mode 100644 src/python/lmi/common/JobManager.py delete mode 100644 src/python/lmi/common/TimerManager.py delete mode 100644 src/python/lmi/common/__init__.py delete mode 100644 src/python/lmi/common/cmpi_logging.py delete mode 100644 src/python/lmi/common/singletonmixin.py create mode 100644 src/python/lmi/providers/IndicationManager.py create mode 100644 src/python/lmi/providers/JobManager.py create mode 100644 src/python/lmi/providers/TimerManager.py create mode 100644 src/python/lmi/providers/__init__.py create mode 100644 src/python/lmi/providers/cmpi_logging.py diff --git a/src/python/lmi/base/BaseConfiguration.py b/src/python/lmi/base/BaseConfiguration.py new file mode 100644 index 0000000..8790acf --- /dev/null +++ b/src/python/lmi/base/BaseConfiguration.py @@ -0,0 +1,268 @@ +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek +# Authors: Michal Minar +# -*- coding: utf-8 -*- +""" +Module for BaseConfiguration class. + +BaseConfiguration +-------------------- + +.. autoclass:: BaseConfiguration + :members: + +""" + +import ConfigParser +import logging +import os +import socket + +from lmi.base.singletonmixin import Singleton + +def convert_value(section, option, convert_func, value): + """ + Return result of application of ``convert_func`` on value. + If the conversion failes, error is logged and ValueError is raised. + + :param section: (``str``) Section of configuration file. Used for + error message. + :param option: (``str``) Option of configuration file. Used for + error message. + :param convert_func: (``type``) Conversion function to apply on passed + value. + :param value: (``basestring``) Value to convert. + """ + if not isinstance(value, basestring): + raise TypeError("value must be a string") + try: + if convert_func is bool: + return value.lower() in ('1', 'y', 'yes', 'on', 'true') + if convert_func is str and isinstance(value, unicode): + return value.encode('utf-8') + if convert_func is unicode and isinstance(value, str): + return value.decode('utf-8') + return convert_func(value) + except ValueError as exc: + logging.getLogger(__name__).error( + 'failed to convert value of "[%s]%s: %s', section, option, + exc) + raise + +class BaseConfiguration(Singleton): + """ + OpenLMI configuration file. By default, it resides in + /etc/openlmi/${provider_prefix}/${provider_prefix}.conf. + + There should be only one instance of this class. + """ + + CONFIG_DIRECTORY_TEMPLATE = '/etc/openlmi/%(provider_prefix)s/' + CONFIG_FILE_PATH_TEMPLATE = \ + CONFIG_DIRECTORY_TEMPLATE + '%(provider_prefix)s.conf' + + PERSISTENT_PATH_TEMPLATE = '/var/lib/openlmi-%(provider_prefix)s/' + SETTINGS_DIR = 'settings/' + + DEFAULT_OPTIONS = { + 'Namespace' : 'root/cimv2', + 'SystemClassName' : 'Linux_ComputerSystem', + # Default logging level + "Level" : "ERROR", + 'DebugBlivet' : 'false', + 'Stderr' : 'false', + } + + @classmethod + def provider_prefix(cls): + """ + This is responsibility of a subclass. + + :rtype: (``string`) Prefix of providers in lowercase. For example + configuration class for storage providers would return "storage". + + Result is used to construct configuration paths. + """ + raise NotImplementedError + + @classmethod + def default_options(cls): + """ :rtype: (``dict``) Dictionary of default values. """ + return cls.DEFAULT_OPTIONS + + @classmethod + def config_directory(cls): + """ Base directory with configuration settings. """ + return cls.CONFIG_DIRECTORY_TEMPLATE % { + 'provider_prefix' : cls.provider_prefix() } + + @classmethod + def persistent_path(cls): + """ Base directory with persistent settings. """ + return cls.PERSISTENT_PATH_TEMPLATE % { + 'provider_prefix': cls.provider_prefix() } + + @classmethod + def config_file_path(cls): + """ File path of configuration file. """ + return cls.CONFIG_FILE_PATH_TEMPLATE % { + 'provider_prefix' : cls.provider_prefix() } + + @classmethod + def mandatory_sections(cls): + """ + Return list of sections, that must be present in configuration + file. If not present, they will be created in memory. + """ + return ['Log', 'CIM'] + + def __init__(self): + """ Initialize and load a configuration file.""" + self._listeners = set() + self.config = ConfigParser.SafeConfigParser( + defaults=self.default_options()) + self.load() + + def add_listener(self, callback): + """ + Add a callback, which will be called when configuration is updated. + The callback will be called with instance of this class as + parameter: + callback(config) + """ + self._listeners.add(callback) + + def remove_listener(self, callback): + """ + Remove previously registered callback. + """ + + self._listeners.remove(callback) + + def _call_listeners(self): + """ + Call all listeners that configuration has updated. + """ + for callback in self._listeners: + callback(self) + + def load(self): + """ + Load configuration from config file path. + The file does not need to exist. + """ + self.config.read(self.config_file_path()) + for section in self.mandatory_sections(): + if not self.config.has_section(section): + self.config.add_section(section) + self._call_listeners() + + @property + def namespace(self): + """ Return namespace of OpenLMI provider. """ + return self.config.get('CIM', 'Namespace') + + @property + def system_class_name(self): + """ Return SystemClassName of OpenLMI provider. """ + return self.config.get('CIM', 'SystemClassName') + + @property + def system_name(self): + """ Return SystemName of OpenLMI provider. """ + return socket.getfqdn() + + @property + def logging_level(self): + """ Return name of logging level in lower case. """ + return self.config.get('Log', 'Level').lower() + + @property + def stderr(self): + """ Return True if logging to stderr is enabled. """ + return self.config.getboolean('Log', 'Stderr') + + def file_path(self, section, option): + """ + Return absolute file path for requested option. + Relative path is converted to absolute one with config's directory + as a prefix. + """ + path = self.config.get(section, option) + if not os.path.isabs(path): + path = os.path.join(self.config_directory(), path) + return path + + def get_safe(self, section, option, convert=str, fallback=None, + *args, **kwargs): + """ + Get the configuration option value as specified type in a safe way. + Value is searched in this order: + config_file -> defaults_dict -> fallback + + :param section: (``str``) Section name of option. + :param option: (``str``) Option name. + :param convert: (``type``) Is a conversion function for obtained + value. If the value could not be converted, error message is + generated and ``fallback`` is returned. This function is not + applied to ``fallback`` value. Supported values are: + str, unicode, int ,float, long, bool + + :param fallback: Value returned, when section or option does not + exists and no default value is given, or when the obtained value + could not be converted by supplied function. + + All the other parameters are passed to the ``SafeConfigParser.get()`` + method. + """ + if not isinstance(section, basestring): + raise TypeError('section must be a string') + if not isinstance(option, basestring): + raise TypeError("option must be a string") + if not convert in (str, unicode, int, float, long, bool): + raise ValueError("unsupported type for conversion: %s:", + getattr(convert, '__name__', 'unknown')) + if ( not self.config.has_option(section, option) + and not option.lower() in self.default_options()): + logging.getLogger(__name__).warn( + 'no option value and no default supplied for "[%s]%s"', + section, option) + return fallback + try: + value = self.config.get(section, option, *args, **kwargs) + except ConfigParser.Error as exc: + logging.getLogger(__name__).error( + 'failed to get value of "[%s]%s": %s', section, option, + exc) + return fallback + try: + # first try to convert value from config + return convert_value(section, option, convert, value) + except ValueError as exc: + logging.getLogger(__name__).error( + 'failed to convert value of "[%s]%s: %s', section, option, + exc) + # if it failes, try the value from defaults + if ( option.lower() in self.default_options() + and self.default_options()[option.lower()] != value): + try: + return convert_value(section, option, convert, + self.default_options()[option.lower()]) + except ValueError: + pass # error is already logged, no more options left + return fallback diff --git a/src/python/lmi/base/__init__.py b/src/python/lmi/base/__init__.py new file mode 100644 index 0000000..c3b443f --- /dev/null +++ b/src/python/lmi/base/__init__.py @@ -0,0 +1,24 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar +# + +""" +Common utilities for OpenLMI python projects. +""" diff --git a/src/python/lmi/base/singletonmixin.py b/src/python/lmi/base/singletonmixin.py new file mode 100644 index 0000000..c252676 --- /dev/null +++ b/src/python/lmi/base/singletonmixin.py @@ -0,0 +1,560 @@ +#pylint: disable-all +""" +A Python Singleton mixin class that makes use of some of the ideas +found at http://c2.com/cgi/wiki?PythonSingleton. Just inherit +from it and you have a singleton. No code is required in +subclasses to create singleton behavior -- inheritance from +Singleton is all that is needed. + +Singleton creation is threadsafe. + +USAGE: + +Just inherit from Singleton. If you need a constructor, include +an __init__() method in your class as you usually would. However, +if your class is S, you instantiate the singleton using S.get_instance() +instead of S(). Repeated calls to S.get_instance() return the +originally-created instance. + +For example: + +class S(Singleton): + + def __init__(self, a, b=1): + pass + +S1 = S.get_instance(1, b=3) + + +Most of the time, that's all you need to know. However, there are some +other useful behaviors. Read on for a full description: + +1) Getting the singleton: + + S.get_instance() + +returns the instance of S. If none exists, it is created. + +2) The usual idiom to construct an instance by calling the class, i.e. + + S() + +is disabled for the sake of clarity. + +For one thing, the S() syntax means instantiation, but get_instance() +usually does not cause instantiation. So the S() syntax would +be misleading. + +Because of that, if S() were allowed, a programmer who didn't +happen to notice the inheritance from Singleton (or who +wasn't fully aware of what a Singleton pattern +does) might think he was creating a new instance, +which could lead to very unexpected behavior. + +So, overall, it is felt that it is better to make things clearer +by requiring the call of a class method that is defined in +Singleton. An attempt to instantiate via S() will result +in a SingletonException being raised. + +3) Use __S.__init__() for instantiation processing, +since S.get_instance() runs S.__init__(), passing it the args it has received. + +If no data needs to be passed in at instantiation time, +you don't need S.__init__(). + +4) If S.__init__(.) requires parameters, include them ONLY in the +first call to S.get_instance(). If subsequent calls have arguments, +a SingletonException is raised by default. + +If you find it more convenient for subsequent calls to be allowed to +have arguments, but for those argumentsto be ignored, just include +'ignoreSubsequent = True' in your class definition, i.e.: + + class S(Singleton): + + ignoreSubsequent = True + + def __init__(self, a, b=1): + pass + +5) For testing, it is sometimes convenient for all existing singleton +instances to be forgotten, so that new instantiations can occur. For that +reason, a _forget_all_singletons() function is included. Just call + + _forget_all_singletons() + +and it is as if no earlier instantiations have occurred. + +6) As an implementation detail, classes that inherit +from Singleton may not have their own __new__ +methods. To make sure this requirement is followed, +an exception is raised if a Singleton subclass includ +es __new__. This happens at subclass instantiation +time (by means of the MetaSingleton metaclass. + + +By Gary Robinson, grobinson@flyfi.com. No rights reserved -- +placed in the public domain -- which is only reasonable considering +how much it owes to other people's code and ideas which are in the +public domain. The idea of using a metaclass came from +a comment on Gary's blog (see +http://www.garyrobinson.net/2004/03/python_singleto.html#comments). +Other improvements came from comments and email from other +people who saw it online. (See the blog post and comments +for further credits.) + +Not guaranteed to be fit for any particular purpose. Use at your +own risk. +""" + +import threading + +class SingletonException(Exception): + """ + Base exception related to singleton handling. + """ + pass + +_ST_SINGLETONS = set() +_LOCK_FOR_SINGLETONS = threading.RLock() +# Ensure only one instance of each Singleton class is created. This is not +# bound to the _LOCK_FOR_SINGLETON_CREATION = threading.RLock() individual +# Singleton class since we need to ensure that there is only one mutex for each +# Singleton class, which would require having a lock when setting up the +# Singleton class, which is what this is anyway. So, when any Singleton is +# created, we lock this lock and then we don't need to lock it again for that +# class. +_LOCK_FOR_SINGLETON_CREATION = threading.RLock() + +def _create_singleton_instance(cls, lst_args, dct_kw_args): + """ + Creates singleton instance and stores its class in set. + """ + _LOCK_FOR_SINGLETON_CREATION.acquire() + try: + if cls._is_instantiated(): # some other thread got here first + return + + instance = cls.__new__(cls) + try: + instance.__init__(*lst_args, **dct_kw_args) + except TypeError, exc: + if '__init__() takes' in exc.message: + raise SingletonException, ( + 'If the singleton requires __init__ args,' + ' supply them on first call to get_instance().') + else: + raise + cls.c_instance = instance + _add_singleton(cls) + finally: + _LOCK_FOR_SINGLETON_CREATION.release() + +def _add_singleton(cls): + """ + Adds class to singleton set. + """ + _LOCK_FOR_SINGLETONS.acquire() + try: + assert cls not in _ST_SINGLETONS + _ST_SINGLETONS.add(cls) + finally: + _LOCK_FOR_SINGLETONS.release() + +def _remove_singleton(cls): + """ + Removes class from singleton set. + """ + _LOCK_FOR_SINGLETONS.acquire() + try: + if cls in _ST_SINGLETONS: + _ST_SINGLETONS.remove(cls) + finally: + _LOCK_FOR_SINGLETONS.release() + +def _forget_all_singletons(): + ''' + This is useful in tests, since it is hard to know which singletons need + to be cleared to make a test work. + ''' + _LOCK_FOR_SINGLETONS.acquire() + try: + for cls in _ST_SINGLETONS.copy(): + cls._forget_class_instance_reference_for_testing() + + # Might have created some Singletons in the process of tearing down. + # Try one more time - there should be a limit to this. + i_num_singletons = len(_ST_SINGLETONS) + if len(_ST_SINGLETONS) > 0: + for cls in _ST_SINGLETONS.copy(): + cls._forget_class_instance_reference_for_testing() + i_num_singletons -= 1 + assert i_num_singletons == len(_ST_SINGLETONS), \ + 'Added a singleton while destroying ' + str(cls) + assert len(_ST_SINGLETONS) == 0, _ST_SINGLETONS + finally: + _LOCK_FOR_SINGLETONS.release() + +class MetaSingleton(type): + """ + Metaclass for Singleton base class. + """ + def __new__(mcs, str_name, tup_bases, dct): + if dct.has_key('__new__'): + raise SingletonException, 'Can not override __new__ in a Singleton' + return super(MetaSingleton, mcs).__new__( + mcs, str_name, tup_bases, dct) + + def __call__(cls, *lst_args, **dictArgs): + raise SingletonException, \ + 'Singletons may only be instantiated through get_instance()' + +class Singleton(object): + """ + Base class for all singletons. + """ + __metaclass__ = MetaSingleton + + def get_instance(cls, *lst_args, **dct_kw_args): + """ + Call this to instantiate an instance or retrieve the existing instance. + If the singleton requires args to be instantiated, include them the first + time you call get_instance. + """ + if cls._is_instantiated(): + if ( (lst_args or dct_kw_args) + and not hasattr(cls, 'ignoreSubsequent')): + raise SingletonException, ( + 'Singleton already instantiated, but get_instance()' + ' called with args.') + else: + _create_singleton_instance(cls, lst_args, dct_kw_args) + + return cls.c_instance #pylint: disable=E1101 + get_instance = classmethod(get_instance) + + def _is_instantiated(cls): + """ + Don't use hasattr(cls, 'c_instance'), because that screws things + up if there is a singleton that extends another singleton. + hasattr looks in the base class if it doesn't find in subclass. + """ + return 'c_instance' in cls.__dict__ + _is_instantiated = classmethod(_is_instantiated) + + # This can be handy for public use also + isInstantiated = _is_instantiated + + def _forget_class_instance_reference_for_testing(cls): + """ + This is designed for convenience in testing -- sometimes you + want to get rid of a singleton during test code to see what + happens when you call get_instance() under a new situation. + + To really delete the object, all external references to it + also need to be deleted. + """ + try: + if hasattr(cls.c_instance, '_prepare_to_forget_singleton'): + # tell instance to release anything it might be holding onto. + cls.c_instance._prepare_to_forget_singleton() + del cls.c_instance + _remove_singleton(cls) + except AttributeError: + # run up the chain of base classes until we find the one that has + # the instance and then delete it there + for base_class in cls.__bases__: + if issubclass(base_class, Singleton): + base_class._forget_class_instance_reference_for_testing() + _forget_class_instance_reference_for_testing = classmethod( + _forget_class_instance_reference_for_testing) + + +if __name__ == '__main__': + + import unittest + import time + + class SingletonMixinPublicTestCase(unittest.TestCase): + """ + TestCase for singleton class. + """ + def testReturnsSameObject(self): #pylint: disable=C0103 + """ + Demonstrates normal use -- just call get_instance and it returns a singleton instance + """ + + class Foo(Singleton): + """Singleton child class.""" + def __init__(self): + super(Foo, self).__init__() + + a1 = Foo.get_instance() + a2 = Foo.get_instance() + self.assertEquals(id(a1), id(a2)) + + def testInstantiateWithMultiArgConstructor(self):#pylint: disable=C0103 + """ + If the singleton needs args to construct, include them in the first + call to get instances. + """ + + class Bar(Singleton): + """Singleton child class.""" + + def __init__(self, arg1, arg2): + super(Bar, self).__init__() + self.arg1 = arg1 + self.arg2 = arg2 + + b1 = Bar.get_instance('arg1 value', 'arg2 value') + b2 = Bar.get_instance() + self.assertEquals(b1.arg1, 'arg1 value') + self.assertEquals(b1.arg2, 'arg2 value') + self.assertEquals(id(b1), id(b2)) + + def testInstantiateWithKeywordArg(self): + """ + Test instantiation with keyword arguments. + """ + + class Baz(Singleton): + """Singleton child class.""" + def __init__(self, arg1=5): + super(Baz, self).__init__() + self.arg1 = arg1 + + b1 = Baz.get_instance('arg1 value') + b2 = Baz.get_instance() + self.assertEquals(b1.arg1, 'arg1 value') + self.assertEquals(id(b1), id(b2)) + + def testTryToInstantiateWithoutNeededArgs(self): + """ + This tests, improper instantiation. + """ + + class Foo(Singleton): + """Singleton child class.""" + def __init__(self, arg1, arg2): + super(Foo, self).__init__() + self.arg1 = arg1 + self.arg2 = arg2 + + self.assertRaises(SingletonException, Foo.get_instance) + + def testPassTypeErrorIfAllArgsThere(self): + """ + Make sure the test for capturing missing args doesn't interfere + with a normal TypeError. + """ + class Bar(Singleton): + """Singleton child class.""" + def __init__(self, arg1, arg2): + super(Bar, self).__init__() + self.arg1 = arg1 + self.arg2 = arg2 + raise TypeError, 'some type error' + + self.assertRaises(TypeError, Bar.get_instance, 1, 2) + + def testTryToInstantiateWithoutGetInstance(self): + """ + Demonstrates that singletons can ONLY be instantiated through + get_instance, as long as they call Singleton.__init__ during + construction. + + If this check is not required, you don't need to call + Singleton.__init__(). + """ + + class A(Singleton): + def __init__(self): + super(A, self).__init__() + + self.assertRaises(SingletonException, A) + + def testDontAllowNew(self): + + def instantiatedAnIllegalClass(): + class A(Singleton): + def __init__(self): + super(A, self).__init__() + + def __new__(metaclass, str_name, tup_bases, dct): + return super(MetaSingleton, metaclass).__new__( + metaclass, str_name, tup_bases, dct) + + self.assertRaises(SingletonException, instantiatedAnIllegalClass) + + + def testDontAllowArgsAfterConstruction(self): + class B(Singleton): + + def __init__(self, arg1, arg2): + super(B, self).__init__() + self.arg1 = arg1 + self.arg2 = arg2 + + B.get_instance('arg1 value', 'arg2 value') + self.assertRaises(SingletonException, B, 'arg1 value', 'arg2 value') + + def test_forgetClassInstanceReferenceForTesting(self): + class A(Singleton): + def __init__(self): + super(A, self).__init__() + class B(A): + def __init__(self): + super(B, self).__init__() + + # check that changing the class after forgetting the instance + # produces an instance of the new class + a = A.get_instance() + assert a.__class__.__name__ == 'A' + A._forget_class_instance_reference_for_testing() + b = B.get_instance() + assert b.__class__.__name__ == 'B' + + # check that invoking the 'forget' on a subclass still deletes + # the instance + B._forget_class_instance_reference_for_testing() + a = A.get_instance() + B._forget_class_instance_reference_for_testing() + b = B.get_instance() + assert b.__class__.__name__ == 'B' + + def test_forgetAllSingletons(self): + # Should work if there are no singletons + _forget_all_singletons() + + class A(Singleton): + ciInitCount = 0 + def __init__(self): + super(A, self).__init__() + A.ciInitCount += 1 + + A.get_instance() + self.assertEqual(A.ciInitCount, 1) + + A.get_instance() + self.assertEqual(A.ciInitCount, 1) + + _forget_all_singletons() + A.get_instance() + self.assertEqual(A.ciInitCount, 2) + + def test_threadedCreation(self): + # Check that only one Singleton is created even if multiple threads + # try at the same time. If fails, would see assert in _add_singleton + class Test_Singleton(Singleton): + def __init__(self): + super(Test_Singleton, self).__init__() + + class Test_SingletonThread(threading.Thread): + def __init__(self, fTargetTime): + super(Test_SingletonThread, self).__init__() + self._fTargetTime = fTargetTime + self._eException = None + + def run(self): + try: + fSleepTime = self._fTargetTime - time.time() + if fSleepTime > 0: + time.sleep(fSleepTime) + Test_Singleton.get_instance() + except Exception, exc: + self._eException = exc + + fTargetTime = time.time() + 0.1 + lstThreads = [] + for _ in xrange(100): + t = Test_SingletonThread(fTargetTime) + t.start() + lstThreads.append(t) + eException = None + for t in lstThreads: + t.join() + if t._eException and not eException: + eException = t._eException + if eException: + raise eException + + def testNoInit(self): + """ + Demonstrates use with a class not defining __init__ + """ + + class A(Singleton): + pass + + #INTENTIONALLY UNDEFINED: + #def __init__(self): + # super(A, self).__init__() + + A.get_instance() #Make sure no exception is raised + + def testMultipleGetInstancesWithArgs(self): + + class A(Singleton): + + ignoreSubsequent = True + + def __init__(self, a, b=1): + pass + + a1 = A.get_instance(1) + # ignores the second call because of ignoreSubsequent + a2 = A.get_instance(2) + + class B(Singleton): + + def __init__(self, a, b=1): + pass + + b1 = B.get_instance(1) + # No ignoreSubsequent included + self.assertRaises(SingletonException, B.get_instance, 2) + + class C(Singleton): + + def __init__(self, a=1): + pass + + c1 = C.get_instance(a=1) + # No ignoreSubsequent included + self.assertRaises(SingletonException, C.get_instance, a=2) + + def testInheritance(self): + """ + It's sometimes said that you can't subclass a singleton (see, for instance, + http://steve.yegge.googlepages.com/singleton-considered-stupid point e). This + test shows that at least rudimentary subclassing works fine for us. + """ + + class A(Singleton): + + def set_x(self, x): + self.x = x + + def setZ(self, z): + raise NotImplementedError + + class B(A): + + def set_x(self, x): + self.x = -x + + def set_y(self, y): + self.y = y + + a = A.get_instance() + a.set_x(5) + b = B.get_instance() + b.set_x(5) + b.set_y(50) + self.assertEqual((a.x, b.x, b.y), (5, -5, 50)) + self.assertRaises(AttributeError, eval, 'a.set_y', {}, locals()) + self.assertRaises(NotImplementedError, b.setZ, 500) + + unittest.main() + diff --git a/src/python/lmi/common/BaseConfiguration.py b/src/python/lmi/common/BaseConfiguration.py deleted file mode 100644 index f54de03..0000000 --- a/src/python/lmi/common/BaseConfiguration.py +++ /dev/null @@ -1,267 +0,0 @@ -# Copyright (C) 2012 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Jan Safranek -# Authors: Michal Minar -# -*- coding: utf-8 -*- -""" -Module for BaseConfiguration class. - -BaseConfiguration --------------------- - -.. autoclass:: BaseConfiguration - :members: - -""" - -import ConfigParser -import logging -import os -import socket -from lmi.common.singletonmixin import Singleton - -def convert_value(section, option, convert_func, value): - """ - Return result of application of ``convert_func`` on value. - If the conversion failes, error is logged and ValueError is raised. - - :param section: (``str``) Section of configuration file. Used for - error message. - :param option: (``str``) Option of configuration file. Used for - error message. - :param convert_func: (``type``) Conversion function to apply on passed - value. - :param value: (``basestring``) Value to convert. - """ - if not isinstance(value, basestring): - raise TypeError("value must be a string") - try: - if convert_func is bool: - return value.lower() in ('1', 'y', 'yes', 'on', 'true') - if convert_func is str and isinstance(value, unicode): - return value.encode('utf-8') - if convert_func is unicode and isinstance(value, str): - return value.decode('utf-8') - return convert_func(value) - except ValueError as exc: - logging.getLogger(__name__).error( - 'failed to convert value of "[%s]%s: %s', section, option, - exc) - raise - -class BaseConfiguration(Singleton): - """ - OpenLMI configuration file. By default, it resides in - /etc/openlmi/${provider_prefix}/${provider_prefix}.conf. - - There should be only one instance of this class. - """ - - CONFIG_DIRECTORY_TEMPLATE = '/etc/openlmi/%(provider_prefix)s/' - CONFIG_FILE_PATH_TEMPLATE = \ - CONFIG_DIRECTORY_TEMPLATE + '%(provider_prefix)s.conf' - - PERSISTENT_PATH_TEMPLATE = '/var/lib/openlmi-%(provider_prefix)s/' - SETTINGS_DIR = 'settings/' - - DEFAULT_OPTIONS = { - 'Namespace' : 'root/cimv2', - 'SystemClassName' : 'Linux_ComputerSystem', - # Default logging level - "Level" : "ERROR", - 'DebugBlivet' : 'false', - 'Stderr' : 'false', - } - - @classmethod - def provider_prefix(cls): - """ - This is responsibility of a subclass. - - :rtype: (``string`) Prefix of providers in lowercase. For example - configuration class for storage providers would return "storage". - - Result is used to construct configuration paths. - """ - raise NotImplementedError - - @classmethod - def default_options(cls): - """ :rtype: (``dict``) Dictionary of default values. """ - return cls.DEFAULT_OPTIONS - - @classmethod - def config_directory(cls): - """ Base directory with configuration settings. """ - return cls.CONFIG_DIRECTORY_TEMPLATE % { - 'provider_prefix' : cls.provider_prefix() } - - @classmethod - def persistent_path(cls): - """ Base directory with persistent settings. """ - return cls.PERSISTENT_PATH_TEMPLATE % { - 'provider_prefix': cls.provider_prefix() } - - @classmethod - def config_file_path(cls): - """ File path of configuration file. """ - return cls.CONFIG_FILE_PATH_TEMPLATE % { - 'provider_prefix' : cls.provider_prefix() } - - @classmethod - def mandatory_sections(cls): - """ - Return list of sections, that must be present in configuration - file. If not present, they will be created in memory. - """ - return ['Log', 'CIM'] - - def __init__(self): - """ Initialize and load a configuration file.""" - self._listeners = set() - self.config = ConfigParser.SafeConfigParser( - defaults=self.default_options()) - self.load() - - def add_listener(self, callback): - """ - Add a callback, which will be called when configuration is updated. - The callback will be called with instance of this class as - parameter: - callback(config) - """ - self._listeners.add(callback) - - def remove_listener(self, callback): - """ - Remove previously registered callback. - """ - - self._listeners.remove(callback) - - def _call_listeners(self): - """ - Call all listeners that configuration has updated. - """ - for callback in self._listeners: - callback(self) - - def load(self): - """ - Load configuration from config file path. - The file does not need to exist. - """ - self.config.read(self.config_file_path()) - for section in self.mandatory_sections(): - if not self.config.has_section(section): - self.config.add_section(section) - self._call_listeners() - - @property - def namespace(self): - """ Return namespace of OpenLMI provider. """ - return self.config.get('CIM', 'Namespace') - - @property - def system_class_name(self): - """ Return SystemClassName of OpenLMI provider. """ - return self.config.get('CIM', 'SystemClassName') - - @property - def system_name(self): - """ Return SystemName of OpenLMI provider. """ - return socket.getfqdn() - - @property - def logging_level(self): - """ Return name of logging level in lower case. """ - return self.config.get('Log', 'Level').lower() - - @property - def stderr(self): - """ Return True if logging to stderr is enabled. """ - return self.config.getboolean('Log', 'Stderr') - - def file_path(self, section, option): - """ - Return absolute file path for requested option. - Relative path is converted to absolute one with config's directory - as a prefix. - """ - path = self.config.get(section, option) - if not os.path.isabs(path): - path = os.path.join(self.config_directory(), path) - return path - - def get_safe(self, section, option, convert=str, fallback=None, - *args, **kwargs): - """ - Get the configuration option value as specified type in a safe way. - Value is searched in this order: - config_file -> defaults_dict -> fallback - - :param section: (``str``) Section name of option. - :param option: (``str``) Option name. - :param convert: (``type``) Is a conversion function for obtained - value. If the value could not be converted, error message is - generated and ``fallback`` is returned. This function is not - applied to ``fallback`` value. Supported values are: - str, unicode, int ,float, long, bool - - :param fallback: Value returned, when section or option does not - exists and no default value is given, or when the obtained value - could not be converted by supplied function. - - All the other parameters are passed to the ``SafeConfigParser.get()`` - method. - """ - if not isinstance(section, basestring): - raise TypeError('section must be a string') - if not isinstance(option, basestring): - raise TypeError("option must be a string") - if not convert in (str, unicode, int, float, long, bool): - raise ValueError("unsupported type for conversion: %s:", - getattr(convert, '__name__', 'unknown')) - if ( not self.config.has_option(section, option) - and not option.lower() in self.default_options()): - logging.getLogger(__name__).warn( - 'no option value and no default supplied for "[%s]%s"', - section, option) - return fallback - try: - value = self.config.get(section, option, *args, **kwargs) - except ConfigParser.Error as exc: - logging.getLogger(__name__).error( - 'failed to get value of "[%s]%s": %s', section, option, - exc) - return fallback - try: - # first try to convert value from config - return convert_value(section, option, convert, value) - except ValueError as exc: - logging.getLogger(__name__).error( - 'failed to convert value of "[%s]%s: %s', section, option, - exc) - # if it failes, try the value from defaults - if ( option.lower() in self.default_options() - and self.default_options()[option.lower()] != value): - try: - return convert_value(section, option, convert, - self.default_options()[option.lower()]) - except ValueError: - pass # error is already logged, no more options left - return fallback diff --git a/src/python/lmi/common/IndicationManager.py b/src/python/lmi/common/IndicationManager.py deleted file mode 100644 index 0467227..0000000 --- a/src/python/lmi/common/IndicationManager.py +++ /dev/null @@ -1,758 +0,0 @@ -# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Jan Safranek -# Authors: Michal Minar -# -*- coding: utf-8 -*- -""" - .. autoclass:: IndicationManager - :members: -""" - -import pywbem -from Queue import Queue -import re -import socket -import threading - -import lmi.common.cmpi_logging as cmpi_logging -from lmi.common import singletonmixin - -RE_FILTER_NAME = re.compile(r'^(?Plmi:' - r'(?P[a-z0-9_]+):)(?P.*)$', re.IGNORECASE) - -FILTER_DEFAULTS = { - "SourceNamespace" : "root/cimv2", - "SourceNamespaces" : ["root/cimv2"], - "QueryLanguage" : "CIM:CQL" -} - -@cmpi_logging.trace_function -def enumerate_namespaces(ch): - """ - Return tuple ``([CIM_Namespace instance, ...], ns_interop)``. Where - first item is a list of object paths of all namespaces in broker and - the second is a name of namespace, where this information can be found. - - :param ch: CIMOM handle. - """ - nsclasses = ["CIM_Namespace", "__Namespace"] - namespaces = ['root/cimv2', 'root/PG_InterOp', 'Interop', - 'interop', 'root', 'root/interop'] - nspaths = [] - ns = None - for cls in nsclasses: - for ns in namespaces: - try: - nspaths = [nm for nm in ch.EnumerateInstanceNames(ns, cls)] - if nspaths: - break - except pywbem.CIMError as exc: - if exc[0] in ( - pywbem.CIM_ERR_INVALID_NAMESPACE, - pywbem.CIM_ERR_NOT_SUPPORTED, - pywbem.CIM_ERR_INVALID_CLASS): - pass - if exc[0] == pywbem.CIM_ERR_FAILED: - cmpi_logging.logger.error("EnumerateInstanceNames failed" - " for %s:%s: %s", ns, cls, str(exc)) - else: - raise - if nspaths: - break - if not nspaths: - cmpi_logging.logger.error("failed to enumerate namespaces") - ns = None - return (nspaths, ns) - -@cmpi_logging.trace_function -def find_ns_interop(ch): - """ - Return name of interop namespace, where ``CIM_IndicationFilter`` - class reside. - - :param ch: CIMOM handle. - """ - _, ns_interop = enumerate_namespaces(ch) - return ns_interop - -def make_filter_name(class_name, fltr_id): - """ - Return value for ``CIM_IndicationFilter.Name`` property. - """ - return "LMI:%s:%s" % (class_name, fltr_id) - -def parse_filter_name(name): - """ - Return tuple ``(class_name, filter_id)``. - - :param name: (``string``) Value of cim filter's *Name* property. - """ - match = RE_FILTER_NAME.match(name) - if not match: - raise ValueError('Could not parse filter name: "%s"' % name) - return (match.group("class_name"), match.group("filter_id")) - -@cmpi_logging.trace_function -def make_indication_filter_path(class_name, fltr_id, ns_interop): - """ - Return CIM_IndicationFilter instance path for given filter id. - - :param class_name: (``string``) *Scoped class* name. - :param fltr_id: (``string``) Filter name. - :param ns_interop: (``string``) Interop namespace. - """ - for arg in ('class_name', 'fltr_id', 'ns_interop'): - if not isinstance(locals()[arg], basestring): - raise TypeError("%s must be basestring" % arg) - cop = pywbem.CIMInstanceName("CIM_IndicationFilter", - namespace=ns_interop) - cop['CreationClassName'] = 'CIM_IndicationFilter' - cop['SystemCreationClassName'] = 'CIM_ComputerSystem' - cop['SystemName'] = socket.gethostname() - cop['Name'] = make_filter_name(class_name, fltr_id) - return cop - -@cmpi_logging.trace_function -def remove_cimom_filter(ch, fltr_path): - """ - Deletes instance of CIM_IndicationFilter installed at broker with all - referencing subscriptions. - - Returns list of subscription instace names, that were deleted. - - :param ch: CIMOM handle. - :param fltr_path: (``CIMInstanceName``) Path of ``CIM_IndicationFilter`` to - remove. - """ - if not isinstance(fltr_path, pywbem.CIMInstanceName): - raise TypeError("fltr_path must be a CIMInstanceName") - - referents = [] - for ref in ch.AssociatorNames(fltr_path, - role="Filter", - resultRole="Handler", - resultClass="CIM_IndicationSubscription"): - ch.DeleteInstance(ref) - referents.append(ref) - ch.DeleteInstance(fltr_path) - cmpi_logging.logger.debug('removed indication filter "%s" with %d' - ' referents', fltr_path["Name"], len(referents)) - return referents - -class IndicationManager(singletonmixin.Singleton): - """ - Using ``IndicationManager`` class - providers can send indications without bothering with handling of - indication subscriptions. - - Usage: - - 1. Subclass CIM_InstCreation and CIM_InstModification. - - 2. In your initialization routine, create one ``IndicationManager`` - instance. E.g. one for whole ``LMI_Storage`` may is enough. Like - this:: - - indication_manager = \ - IndicationManager.get_instance(env, "Storage", "root/cimv2") - - 3. Call ``indication_manager.add_filters()`` with all filters your - providers support for particular CIM class. This method can be called - multiple times. - For example:: - - filters = { - "JobPercentUpdated": { - "Query" : "SELECT * FROM CIM_InstModification WHERE" - " SourceInstance ISA LMI_StorageJob AND" - " SourceInstance.CIM_ConcreteJob::PercentComplete <>" - " PreviousInstance.CIM_ConcreteJob::PercentComplete", - "Description" : "Modification of Percentage Complete for" - " a Concrete Job.", - }, - "JobSucceeded": { - "Query" : "SELECT * FROM CIM_InstModification WHERE" - " SourceInstance ISA LMI_StorageJob AND" - " SourceInstance.CIM_ConcreteJob::JobState = " - " CIM_ConcreteJob.JobState#'Completed'", - "Description": "Modification of Operational Status for" - " a Concrete Job to 'Complete' and 'OK'.", - }, - #... other indications - } - instance_manager.add_filters("LMI_StorageJob", filters) - - First argument is a name of class to which indications apply. We'll call - it *Scoping class*. - - 4. In your provider module, implement indication functions like this:: - - def authorize_filter(env, fltr, ns, classes, owner): - indication_manager.authorize_filter(env, fltr, ns, classes, owner) - - def activate_filter (env, fltr, ns, classes, first_activation): - indication_manager.activate_filter(env, fltr, ns, classes, - first_activation) - - def deactivate_filter(env, fltr, ns, classes, last_activation): - indication_manager.deactivate_filter(env, fltr, ns, classes, - last_activation) - - def enable_indications(env): - indication_manager.enable_indications(env) - - def disable_indications(env): - indication_manager.disable_indications(env) - - From now on, the ``IndicationManager`` will track all subscribed filters. - You can query the ``indication_manager.is_subscribed()`` before you create - and send an indication. Use ``indication_manager.send_indication()`` - to send your indications. - - Only static (=preconfigured, read-only) indication filters are - supported. - - For user to use these preconfigured filters, they need to be installed - at broker as instances of ``CIM_IndicationFilter``. But since they can - not be guarded against removel by accident, this object provides a way - to reinstall them. But using this is not recomended, since it can upset - users. See :ref:`_update_context-label`. - - The supported filters must be passed to add_filters method. The filters - are passed as dictionary ``'filter_id' -> {dictionary 'IndicationFilter - property' -> 'value'}``. There must be at least ``Query`` property in - each filter, CQL is assumed. - - This helper automatically tracks which filters are subscribed. Provider - can query ``is_subscribed()`` to check, if filter with given - ``filter_id`` is subscribed before generating indications. - - The CMPI interface to send indications is complicated - - when an indication is send from CIMOM callback (e.g. ``get_instance``), - it must use current ``env`` parameter of the callback and it would be - tedious to pass it to ``IndicationManager`` each time. Therefore - ``IndicationManager`` creates its own thread, registers it at CIMOM - using ``PrepareAttachThread``/``AttachThread``. - - As side-effect, indication can be sent from any thread, there is no - need to call ``PrepareAttachThread``/``AttachThread``. - """ - SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity - - COMMAND_STOP = 1 # Command to the IndicationManager thread to stop. - - @cmpi_logging.trace_method - def __init__(self, env, nameprefix, namespace, ns_interop=None, - queue=None): - """ - Create new ``IndicationManager``. Usually only one instance - is necessary for one provider process. - - :param env: (``ProviderEnvironment``) Provider enviroment, taken - from CIMOM callback (e.g. ``get_providers()``). - :param nameprefix: (``string``) Prefix of your ``CIM_InstCreation`` - and ``CIM_InstModification`` subclasses, e.g. 'Storage' for - ``LMI_StorageInstCreation``. - :param namespace: (``string``) Namespace, which will be set to - outgoing indications instances. - :param ns_interop: (``string``) Namespace, where filters and - subscriptions are stored. - :param queue: Optional custom input queue with the same interface as - ``Queue.Queue``. - """ - - # { class_name : - # { filter_id : filter_properties - # , ... } - # } - self._filters = pywbem.NocaseDict() - self._enabled = False - # { (class_name, filter_id), ... } - self._subscribed_filters = set() - self._nameprefix = nameprefix - self._namespace = namespace - self._ns_interop = ns_interop - self._access_lock = threading.RLock() - self._env = env - - if queue is None: - queue = Queue() - self._queue = queue - # prepare indication thread - ch = env.get_cimom_handle() - new_broker = ch.PrepareAttachThread() - self._indication_sender = threading.Thread( - target=self._send_indications_loop, args=(new_broker,)) - self._indication_sender.daemon = False - self._indication_sender.start() - - @property - def enabled(self): - """ - Return a boolean saying, whether indication sending is enabled. - """ - with self._access_lock: - return self.enabled - - @property - def namespace(self): - """ - Return namespace of outgoing indication instances. - """ - return self._namespace - - @property - def nameprefix(self): - """ - Return prefix of indication class names. - """ - return self._nameprefix - - @property - def ns_interop(self): - """ - Return interop namespace name. - """ - with self._access_lock: - if self._ns_interop is None: - ch = self._env.get_cimom_handle() - self._ns_interop = find_ns_interop(ch) - cmpi_logging.logger.info('found interop namespace: %s', - self._ns_interop) - return self._ns_interop - - @property - def instcreation_classname(self): - """ - Return whole class name of InstCreation indication. - """ - return "LMI_" + self._nameprefix + "InstCreation" - - @property - def instmodification_classname(self): - """ - Return whole class name of InstModification indication. - """ - return "LMI_" + self._nameprefix + "InstModification" - - @property - def instdeletetion_classname(self): - """ - Return whole class name of InstDeletion indication. - """ - return "LMI_" + self._nameprefix + "InstDeletion" - - @cmpi_logging.trace_method - def _get_filter_inst(self, class_name, fltr_id): - """ - Return instance of CIM_IndicationFilter registered in CIMOM if any. - - :param class_name: (``string``) *Scoping class* name. - :param fltr_id: (``string``) Indication name. - """ - ch = self._env.get_cimom_handle() - cop = make_indication_filter_path(class_name, fltr_id, self.ns_interop) - try: - return ch.GetInstance(cop) - except pywbem.CIMError as exc: - if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND: - return None - raise - - @cmpi_logging.trace_method - def _ensure_cimom_has_filter(self, class_name, fltr_id): - """ - Ensures, that cimom has ``fltr_id`` filter registered as instance. - If it has, but the query differs it is recreated at broker. - - :param class_name: (``string``) *Scoping class* name. - :param fltr_id: (``string``) Indication name. - """ - inst = self._get_filter_inst(class_name, fltr_id) - ch = self._env.get_cimom_handle() - installed = inst is not None - referents = [] - if installed: - for prop_name, val in self._filters[class_name][fltr_id].items(): - if inst[prop_name] != val: - cmpi_logging.logger.info("filter \"%s\" is installed, but" - " its property \"%s\" has outdated value;" - " removing...", fltr_id, prop_name) - referents = remove_cimom_filter(ch, inst.path) - installed = False - if not installed: - if inst is not None: - path = inst.path - else: - path = make_indication_filter_path(class_name, fltr_id, - self.ns_interop) - inst = pywbem.CIMInstance(path.classname, path=path) - kwargs = FILTER_DEFAULTS.copy() - for key, val in path.keybindings.items(): - kwargs[key] = val - kwargs.update(self._filters[class_name][fltr_id]) - inst.update(kwargs) - try: - inst = ch.CreateInstance(inst) - cmpi_logging.logger.info("filter \"%s\" installed", fltr_id) - except pywbem.CIMError: - cmpi_logging.logger.exception( - "failed to install indication filter \"%s\"", - fltr_id) - if referents: - cmpi_logging.logger.debug('reinstalling %d filter' - ' subscriptions', len(referents)) - for ref in referents: - ch.CreateInstance(ref) - return inst - - @cmpi_logging.trace_method - def _get_matching_filter(self, query): - """ - Try to find matching filter properties in local ``_filters`` storage - and return it. ``None`` is returned if not found. - - Return a tuple ``(class_name, filter_id, filter_properties)``. - - :param query: (``string``) Is filter query. - """ - if not isinstance(query, basestring): - raise TypeError("query must be a string") - for clsname, fltrs in self._filters.iteritems(): - for fltr_id, props in fltrs.iteritems(): - if query == props["Query"]: - return (clsname, fltr_id, props) - return None - - @cmpi_logging.trace_method - def ensure_filters_installed(self, class_name=None, fltr_id=None): - """ - This function checks for existence of filters at broker. Filters - must be registered with this instance before the check can be done. - Without arguments all registered filters will be checked. - - :param class_name: (``string``) Name of *Scoped class* that reduces - searched filters. - :param fltr_id: (``string``) Indication name reducing filters that - will be checked. - """ - cls_to_check = self._filters.keys() - if class_name is not None: - cls_to_check = [class_name] - filters_to_check = list( - (c, f) - for c in cls_to_check - for f in self._filters[c].keys() - if fltr_id is None or fltr_id == f) - with self._access_lock: - try: - for clsname, fltr_id in filters_to_check: - self._ensure_cimom_has_filter(clsname, fltr_id) - cmpi_logging.logger.debug('filters installed') - return True - except pywbem.CIMError as exc: - if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED: - cmpi_logging.logger.error("filters could not be checked" - " for presence due to invalid context") - return False - raise - - @cmpi_logging.trace_method - def update_context(self, env): - """ - .. _update_context-label - - When ``IndicationManager`` is initialized upon provider initialization, - the conxet given does not contain any user credentials that are - needed for communication with broker. In order to check for filter's - existence at broker, this method needs to be called first with - context containing user's credentials. - - This needs to be called only once. - - **Note** that if you don't plan to check for filter's presence at - broker at runtime, you are not interested in this function. - """ - with self._access_lock: - self._env = env - - @cmpi_logging.trace_method - def add_filters(self, class_name, filters, ensure_installed=False): - """ - Add new filters to the helper. These filters will be allowed for - subscription. - - :param filters: (``dictionary filter_id -> filter properties``) - The filters. ``filter properties`` is dictionary - ``property_name -> value``, where at least ``Query`` property - must be set. ``Name`` property will be automatically created - as 'LMI::'. - :param ensure_installed: (``bool``) Whether to check for filter presence - at broker and install them if missing. **Note** That in order - for this to work, the context must be updated with user's - credentials. See :ref:`update_context-label`. - """ - with self._access_lock: - if not class_name in self._filters: - self._filters[class_name] = pywbem.NocaseDict() - self._filters[class_name].update(filters) - if ensure_installed: - self.ensure_filters_installed(class_name=class_name) - - @cmpi_logging.trace_method - def authorize_filter(self, _env, fltr, _class_name, _op, _owner): - """ - AuthorizeFilter callback from CIMOM. Call this method from appropriate - CIMOM callback - - It asks us to verify whether this filter is allowed. - - :param fltr: Contains the filter that must be authorized. - :param _class_name: (``String``) Contains the class name extracted - from the filter FROM clause. - :param _op: The name of the class for which monitoring is required. - Only the namespace part is set if className is a process indication. - :param _owner The owner argument is the destination owner. - """ - with self._access_lock: - res = self._get_matching_filter(fltr) - if res is not None: - self._subscribed_filters.add((res[0], res[1])) - cmpi_logging.logger.info("InstanceFilter %s: %s authorized", - make_filter_name(res[0], res[1]), fltr) - return True - return False - - @cmpi_logging.trace_method - def activate_filter(self, _env, fltr, _class_name, _class_path, - first_activation): - """ - ActivateFilter callback from CIMOM. Call this method from appropriate - CIMOM callback. - - It ask us to begin monitoring a resource. The function shall begin - monitoring the resource according to the filter express only. - - :param fltr: The filter argument contains the filter specification - for this subscription to become active. - :param _class_name: (``String``) The class name extracted from the filter - FROM clause. - :param _class_path: (``CIMInstanceName``) The name of the class for - which monitoring is required. Only the namespace part is set if - eventType is a process indication. - :param first_activation: (``bool``) Set to true if this is the first - filter for className. - """ - with self._access_lock: - if not first_activation: - return - res = self._get_matching_filter(fltr) - if res is not None: - self._subscribed_filters.add((res[0], res[1])) - cmpi_logging.logger.info("InstanceFilter %s: %s started", - make_filter_name(res[0], res[1]), fltr) - - @cmpi_logging.trace_method - def deactivate_filter(self, _env, fltr, _class_name, _class_path, - last_activation): - """ - DeactivateFilter callback from CIMOM. Call this method from appropriate - CIMOM callback. - - Informs us that monitoring using this filter should stop. - - :param fltr: The filter argument contains the filter specification for - this subscription to become active. - :param class_name: (``String``) The class name extracted from the filter - FROM clause. - :param class_path: (``CIMInstanceName``) class_path The name of the - class for which monitoring is required. Only the namespace part is - set if className is a process indication. - :last_activation: (``bool``) Set to true if this is the last filter for - className. - """ - with self._access_lock: - if not last_activation: - return - res = self._get_matching_filter(fltr) - if res is not None: - self._subscribed_filters.remove((res[0], res[1])) - cmpi_logging.logger.info("InstanceFilter %s: %s stopped", - make_filter_name(res[0], res[1]), fltr) - - @cmpi_logging.trace_method - def enable_indications(self, _env): - """ - EnableIndications callback from CIMOM. Call this method from - appropriate CIMOM callback. - - Tells us that indications can now be generated. The MB is now prepared - to process indications. The function is normally called by the MB after - having done its intialization and processing of persistent subscription - requests. - """ - with self._access_lock: - self._enabled = True - cmpi_logging.logger.info("Indications enabled") - - @cmpi_logging.trace_method - def disable_indications(self, _env): - """ - EnableIndications callback from CIMOM. Call this method from - appropriate CIMOM callback. - - Tells us that we should stop generating indications. MB will not accept - any indications until enabled again. The function is normally called - when the MB is shutting down indication services either temporarily or - permanently. - """ - with self._access_lock: - self._enabled = False - cmpi_logging.logger.info("Indications disabled") - - @cmpi_logging.trace_method - def send_indication(self, indication): - """ - Send indication to all subscribers. Call this method from appropriate - CIMOM callback. - """ - self._queue.put(indication) - - @cmpi_logging.trace_method - def send_instcreation(self, instance, filter_id): - """ - Send ``LMI_InstCreation`` indication with given instance. - - :param instance: (``CIMInstance``) The created instance. - :param filter_id: (``string``) The ID of registered filter which - corresponds to this indication. - """ - if not self.is_subscribed(instance.classname, filter_id): - return - path = pywbem.CIMInstanceName( - classname=self.instcreation_classname, - namespace=self.namespace) - ind = pywbem.CIMInstance( - self.instcreation_classname, - path=path) - ind['SourceInstance'] = instance - ind['SourceInstanceHost'] = socket.gethostname() - ind['SourceInstanceModelPath'] = str(instance.path) - ind['IndicationFilterName'] = make_filter_name( - instance.classname, filter_id) - ind['PerceivedSeverity'] = self.SEVERITY_INFO - - cmpi_logging.logger.info("Sending indication %s for %s" % - (ind["IndicationFilterName"], str(path))) - self.send_indication(ind) - - @cmpi_logging.trace_method - def send_instmodification(self, old_instance, new_instance, filter_id): - """ - Send ``LMI_InstModification`` indication with given - instance. - - :param old_instance: (``CIMInstance``) The instance before - modification. - :param new_instance: (``CIMInstance``) The instance after modification. - :param filter_id: (``string``) The ID of registered filter which - corresponds to this indication. - """ - if not self.is_subscribed(new_instance.classname, filter_id): - return - path = pywbem.CIMInstanceName( - classname=self.instmodification_classname, - namespace=self.namespace) - ind = pywbem.CIMInstance( - self.instcreation_classname, - path=path) - ind['SourceInstance'] = new_instance - ind['PreviousInstance'] = old_instance - ind['SourceInstanceHost'] = socket.gethostname() - ind['SourceInstanceModelPath'] = str(new_instance.path) - ind['IndicationFilterName'] = make_filter_name( - new_instance.classname, filter_id) - ind['PerceivedSeverity'] = self.SEVERITY_INFO - - cmpi_logging.logger.info("Sending indication %s for %s", - ind["IndicationFilterName"], str(path)) - self.send_indication(ind) - - @cmpi_logging.trace_method - def is_subscribed(self, class_name, fltr_id): - """ - Return True, if there is someone subscribed for given filter. - - :param class_name: (``string``) *Scoping class* name. - :param fltr_id: (``string``) ID of the filter to check. - """ - with self._access_lock: - if not self._enabled: - return False - if (class_name, fltr_id) in self._subscribed_filters: - return True - return False - - @cmpi_logging.trace_method - def is_registered(self, class_name, fltr_id): - """ - Return True, if filter id has been registered with current instance. - - :param class_name: (``string``) *Scoping class* name. - :param fltr_id: (``string``) ID of the filter to check. - """ - with self._access_lock: - return (class_name in self._filters - and fltr_id in self._filters[class_name]) - - def _send_indications_loop(self, broker): - """ - This method runs in its own thread. It just sends all enqueued - indications. - - :param broker: (``BrokerCIMOMHandle``) Handle of the CIMOM. - """ - broker.AttachThread() - while True: - command = self._queue.get() - - if isinstance(command, pywbem.CIMInstance) : - indication = command - cmpi_logging.logger.trace_info("Delivering indication %s" % - (str(indication.path))) - broker.DeliverIndication(self.namespace, indication) - - elif isinstance(command, int): - cmpi_logging.logger.trace_info("Received command %d", command) - if command == self.COMMAND_STOP: - if hasattr(self._queue, "task_done"): - self._queue.task_done() - break - - if hasattr(self._queue, "task_done"): - self._queue.task_done() - - cmpi_logging.logger.info("Stopped Indication thread.") - - @cmpi_logging.trace_method - def shutdown(self): - """ - Stop the thread. This method blocks until the thread is safely - destroyed. - """ - self._queue.put(self.COMMAND_STOP) - self._indication_sender.join() diff --git a/src/python/lmi/common/JobManager.py b/src/python/lmi/common/JobManager.py deleted file mode 100644 index 5ee59a9..0000000 --- a/src/python/lmi/common/JobManager.py +++ /dev/null @@ -1,1670 +0,0 @@ -# Copyright (C) 2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Jan Safranek -# -*- coding: utf-8 -*- -""" - Basic infrastructure for asynchronous jobs. All necessary CIM classes and - indications are implemented here. - - .. autoclass:: JobManager - :members: - - .. autoclass:: Job - :members: - - .. autoclass:: LMI_ConcreteJob - :members: - - .. autoclass:: LMI_OwningJobElement - :members: - - .. autoclass:: LMI_AffectedJobElement - :members: - - .. autoclass:: LMI_MethodResult - :members: - - .. autoclass:: LMI_AssociatedJobMethodResult - :members: -""" - -from datetime import datetime, timedelta -import threading -from Queue import Queue -import pywbem -import lmi.common.cmpi_logging as cmpi_logging -import lmi.common -from lmi.common.IndicationManager import IndicationManager -from pywbem.cim_provider2 import CIMProvider2 -import socket -import traceback - -@cmpi_logging.trace_function -def register_filters(job_clsname, indication_manager=None): - """ - This function registers static indication filters at IndicationManager. - It should be called upon provider's initialization. - - :param job_clsname: (``String``) CIM class name for asynchonous jobs. - Will be part of filter queries. - :param indication_manager: If not given, global instance will be obtained. - """ - if indication_manager is None: - ind_manager = IndicationManager.get_instance() - filters = {} - query_args = { - "classname" : job_clsname, - "prefix" : indication_manager.nameprefix - } - for fltr_id, fltr_props in JobManager.IND_FILTERS.items(): - filters[fltr_id] = fltr_props.copy() - filters[fltr_id]['Query'] = fltr_props['Query'] % query_args - indication_manager.add_filters(job_clsname, filters) - -# Too many instance attributes -# pylint: disable-msg=R0902 -class Job(object): - """ - Generic abstract class representing one CIM_ConcreteJob. - It remembers input and output arguments, affected ManagedElements and - owning ManagedElement (to be able to create associations to them) - and all CIM_ConcreteJob properties. - - Due to multiple threads processing the job, each job has its own - lock to guard its status changes. It is expected that number of jobs - is quite low. - """ - - DEFAULT_TIME_BEFORE_REMOVAL = 60 # in seconds - - STATE_QUEUED = 1 # Job has not started yet - STATE_RUNNING = 2 # Job is running - STATE_FINISHED_OK = 3 # Job finished OK - STATE_FAILED = 4 # Job finished with error - STATE_SUSPENDED = 5 # Job is queued and suspended - STATE_TERMINATED = 6 # Job was queued and terminated - - FINAL_STATES = [STATE_FINISHED_OK, STATE_FAILED, STATE_SUSPENDED, - STATE_TERMINATED] - - # There is no way how to suspend/terminate running job! - - @cmpi_logging.trace_method - def __init__(self, job_manager, job_name, input_arguments, - method_name, affected_elements, owning_element): - """ - Create new storage job. - - :param job_manager: (``JobManager``) Reference to ``JobManager``, which - will manage this job. - :param job_name: (``string``) User-friendly name of the job. - :param input_arguments: (``dictionary param_name -> param_value``) - Input arguments of the method, which spawned this job. - :param method_name: (``string``) Name of the CIM method, which spawned - this job. - :param affected_elements: (``array of CIMInstanceName``) List of - affected elements. ``LMI_AffectedJobElement`` association will be - created for them. - :param owning_element: (``CIMInstanceName``) Reference to service, which - spawned the job. ``LMI_OwningJobElement`` association will be - created for it. - """ - self.job_manager = job_manager - self.timer_manager = job_manager.timer_manager - - # Unique ID - self.the_id = job_manager.get_next_id() - - # User friendly name of the job - self.job_name = job_name - - # Dictionary of input arguments, 'parameter_name' -> 'parameter_value' - # The parameter value must be CIMProperty or something that can be - # assigned to it. - self.input_arguments = input_arguments - - # Dictionary of output arguments, 'parameter_name' -> 'parameter_value' - # The parameter value must be CIMProperty or something that can be - # assigned to it. - self.output_arguments = None - - # Method return value, as CIMProperty or something that can be - # assigned to it. - self.return_value = None - # Value of Job.ReturnValueType - self.return_value_type = None - - # Name of the method - self.method_name = method_name - - # Time when the job was created - self.time_submitted = datetime.utcnow() - - # Nr. of seconds before the job is removed when the job finishes - self.time_before_removal = self.DEFAULT_TIME_BEFORE_REMOVAL - - # If the job should be removed after completion - self.delete_on_completion = True - - self.percent_complete = 0 - - # State of the job - self.job_state = self.STATE_QUEUED - - # Last change of job state, wall clock time - self.clocktime_of_last_state_change = self.time_submitted - - # Duration of the job in RUNNING state (in seconds) - self.elapsed_time = None - - # When the job started (= switched to RUNNING), wall clock time - self.start_clocktime = None - # When the job started (= switched to RUNNING), monotonic clock time - self.start_monotime = None - # When the job finished (= switched from RUNNING), monotonic clock time - self.finish_monotime = None - - # Array of CIMInstanceNames of affected elements, so we can - # enumerate associations to them. - self.affected_elements = affected_elements - - # CIMInstanceName to owning element (service), so we can enumerate - # instances. - self.owning_element = owning_element - - # Timer used to delete the job after time_before_removal seconds - self.timer = None - - # CIMError with result code - self.error = None - - # internal lock to protect state changes from races - self._lock = threading.RLock() - - self._execute = None - self._execargs = None - self._execkwargs = None - self._cancel = None - self._cancelargs = None - self._cancelkwargs = None - - self._finished_event = threading.Event() - - @cmpi_logging.trace_method - def set_execute_action(self, callback, *args, **kwargs): - """ - Set callback, which will be called when the job is to be executed. It is - expected that the callback will take some time to execute. The callback - must change state of the job and set output parameters and error in a - thread-safe way, i.e. by calling ``finish_method()``. - - :param callback: (``function``) Reference to callback to call. - :param args, kwargs: All other parameters will be passed to the - callback. It is highly recommended to add reference to the job - to the callback. - """ - self._execute = callback - self._execargs = args - self._execkwargs = kwargs - - @cmpi_logging.trace_method - def set_cancel_action(self, callback, *args, **kwargs): - """ - Set callbacks, which will be called when the job is to be - cancelled. The callback must be quick, the job is already locked! - - :param callback: (``function``) Reference to callback to call. - :param args, kwargs: All other parameters will be passed to the - callback. It is highly recommended to add reference to the job - to the callback. - """ - self._cancel = callback - self._cancelargs = args - self._cancelkwargs = kwargs - - @cmpi_logging.trace_method - def finish_method(self, new_state, return_value=None, return_type=None, - output_arguments=None, error=None, affected_elements=None): - """ - Mark the job as finished, with given return value, output parameters and - error. - This method is thread-safe. - - :param new_state: (``Job.STATE_* value``) Resulting state of the job. - :param return_value: (``string``) Return value of the job, encoded - into string. Can be None when the job does not return any value. - :param return_type: (``Job.RetunValueType.* value``) Type of the return - value. Can be None when the job does not return any value. - :param output_arguments: (``dictionary param_name -> param_value``) - Output arguments of the job. Can be None when the job does not have - any output parameters. - :param error: (``CIMError``) Error raised by the job. Can be None, - when the job finished successfully. - :param affected_elements: (``array of CIMInstanceName``) New list of - affected elements to generate LMI_JobAffectedElement - association. If None, the old list, passed to constructor, remains - untouched. - """ - self.lock() - self.return_value = return_value - self.return_value_type = return_type - self.output_arguments = output_arguments - self.error = error - if affected_elements is not None: - self.affected_elements = affected_elements - self.change_state(new_state, 100) - self.unlock() - self._finished_event.set() - - @cmpi_logging.trace_method - def change_state(self, new_state, percent=None): - """ - Change state of a job. (Re-)calculate various times based on the state - change. Send indications as necessary. - This method is thread-safe. - - :param new_state: (``Job.STATE_* value``) New state of the job. - It can be the same as the previous state to indicate progress of - the job. - :param percent: (``int``)) Percent complete of the job. When None, - this valuu will be automatically calculated (in progress = 50%, - finished = 100%). - """ - self.lock() - - cmpi_logging.logger.debug("Job %s: %s changes state from %d to %d" - % (self.the_id, self.job_name, self.job_state, new_state)) - - # For sending indications - prev_instance = None - send_indication = False - indication_ids = [] - - if self.job_state != new_state: - # Remember to send indications - prev_instance = self.job_manager.get_job_instance(self) - send_indication = True - indication_ids.append(JobManager.IND_JOB_CHANGED) - - # Check if the job has just finished - if (self.job_state not in self.FINAL_STATES - and new_state in self.FINAL_STATES): - # Remember finish time - self.finish_clocktime = datetime.utcnow() - self.finish_monotime = self.timer_manager.now() - # Remember job execution time. - if self.start_monotime: - self.elapsed_time = self.finish_monotime \ - - self.start_monotime - # Send indication - if self.job_state == self.STATE_FAILED: - indication_ids.append(JobManager.IND_JOB_FAILED) - if self.job_state == self.STATE_SUSPENDED: - indication_ids.append(JobManager.IND_JOB_SUCCEEDED) - - # Check if the job has just started - if new_state == self.STATE_RUNNING: - self.start_clocktime = datetime.utcnow() - self.start_monotime = self.timer_manager.now() - - self.clocktime_of_last_state_change = datetime.utcnow() - self.job_state = new_state - - if percent is None: - # guess the percentage from status - if new_state == self.STATE_QUEUED: - percent = 0 - elif new_state == self.STATE_RUNNING: - percent = 50 - else: - percent = 100 - if self.percent_complete != percent: - # Remember to send indications - if not send_indication: - self.clocktime_of_last_state_change = datetime.utcnow() - prev_instance = self.job_manager.get_job_instance(self) - send_indication = True - indication_ids.append(JobManager.IND_JOB_PERCENT_UPDATED) - self.percent_complete = percent - - if send_indication: - current_instance = self.job_manager.get_job_instance(self) - self.job_manager.send_modify_indications( - prev_instance, current_instance, indication_ids) - - # start / update the timer if necesasry - self._restart_timer() - self.unlock() - - @cmpi_logging.trace_method - def _expire(self): - """ - Callback when a Job completes and time_before_removal second passed. - The job gets removed from its JobManager. - """ - cmpi_logging.logger.debug("Job %s: %s expired" - % (self.the_id, self.job_name)) - - self.job_manager.remove_job(self) - - @cmpi_logging.trace_method - def _restart_timer(self): - """ - Re-schedule timer for TimeBeforeRemoval because some property has - changed. - """ - if not self.job_state in self.FINAL_STATES: - return - - # Stop the old timer. - if self.timer: - self.timer.cancel() - self.timer = None - - # Start the new timer. - if self.delete_on_completion: - now = self.timer_manager.now() - passed = now - self.finish_monotime - timeout = self.time_before_removal - passed - if timeout <= 0: - # Just in case... - self._expire() - return - - cmpi_logging.logger.debug("Starting timer for job %s: '%s' for %f" - " seconds" % (self.the_id, self.job_name, timeout)) - self.timer = self.timer_manager.create_timer( - "Job " + self.job_name, - callback=self._expire) - self.timer.start(timeout) - - @cmpi_logging.trace_method - def lock(self): - """ - Lock internal mutex. Other threads will block on subsequent lock(). - The lock is recursive, i.e. can be called multiple times from - single thread. - """ - self._lock.acquire() - - @cmpi_logging.trace_method - def unlock(self): - """ Unlock internal mutex.""" - self._lock.release() - - @cmpi_logging.trace_method - def execute(self): - """ - Start executing the job. It calls the execute callback, set by - ``set_execute_action()``. - - job_state must be already set to STATE_RUNNING. - Any exception is translated to CIMError and appropriate state is set. - """ - try: - self._execute(*(self._execargs), **(self._execkwargs)) - except pywbem.CIMError, error: - cmpi_logging.logger.trace_warn("Job.execute caught an CIMError %s", - str(error)) - cmpi_logging.logger.trace_verbose("traceback: %s", - traceback.format_exc()) - self.finish_method(Job.STATE_FAILED, error=error) - except Exception, ex: - cmpi_logging.logger.trace_warn("Job.execute caught an Exception %s", - str(ex)) - cmpi_logging.logger.trace_verbose("traceback: %s", - traceback.format_exc()) - error = pywbem.CIMError(pywbem.CIM_ERR_FAILED, str(ex)) - self.finish_method(Job.STATE_FAILED, error=error) - - @cmpi_logging.trace_method - def cancel(self): - """ - Cancels queued action. The action must have not been started. - """ - self.change_state(self.STATE_TERMINATED) - if self._cancel: - self._cancel(*(self._cancelargs), **(self._cancelkwargs)) - self._finished_event.set() - - @cmpi_logging.trace_method - def get_name(self): - """ - Return CIMInstanceName of the job. - - :rtype: ``CIMInstanceName`` - """ - name = pywbem.CIMInstanceName( - classname=self.job_manager.job_classname, - namespace=self.job_manager.namespace, - keybindings={ - 'InstanceID': self.get_instance_id() - }) - return name - - @cmpi_logging.trace_method - def get_instance_id(self, classname=None): - """ - Return InstanceID. - - :param classname: (``string``) Optional classname to generate InstanceID - for different class, e.g. for LMI_MethodResult. - :rtype: ``string`` - """ - if classname is None: - classname = self.job_manager.job_classname - return 'LMI:' + classname + ':' + str(self.the_id) - - @cmpi_logging.trace_method - def get_pre_call(self): - """ - Return indication that describes the pre-execution values of the - job's invocation. - - :rtype: ``CIMInstance of CIM_InstMethodCall`` - """ - path = pywbem.CIMInstanceName( - classname="CIM_InstMethodCall", - keybindings={}, - host=socket.gethostname(), - namespace=self.job_manager.namespace) - inst = pywbem.CIMInstance( - classname="CIM_InstMethodCall", - path=path) - src_instance = self._get_cim_instance() - inst['SourceInstance'] = src_instance - inst['SourceInstanceModelPath'] = str(src_instance.path) - inst['MethodName'] = self.method_name - inst['MethodParameters'] = self.get_method_params( - '__MethodParameters', True, False) - inst['PreCall'] = True - return inst - - @cmpi_logging.trace_method - def get_cim_error(self): - """ - Return job error as CIMInstance of CIM_Error. - :returns: CIMInstance of CIM_Error - """ - path = pywbem.CIMInstanceName( - classname="CIM_Error", - host=socket.gethostname(), - namespace=self.job_manager.namespace) - err = pywbem.CIMInstance( - classname="CIM_Error", - path=path) - err['CIMStatusCode'] = pywbem.Uint32(self.error[0]) - err['Message'] = self.error[1] - return err - - @cmpi_logging.trace_method - def get_post_call(self): - """ - Return indication that describes the post-execution values of the - job's invocation. - - :rtype: ``CIMInstance of CIM_InstMethodCall`` - """ - path = pywbem.CIMInstanceName( - classname="CIM_InstMethodCall", - keybindings={}, - host=socket.gethostname(), - namespace=self.job_manager.namespace) - inst = pywbem.CIMInstance( - classname="CIM_InstMethodCall", - path=path) - - src_instance = self._get_cim_instance() - inst['SourceInstance'] = src_instance - inst['SourceInstanceModelPath'] = str(src_instance.path) - inst['MethodName'] = self.method_name - inst['MethodParameters'] = self.get_method_params( - '__MethodParameters', True, True) - inst['PreCall'] = False - - if self.return_value_type is not None: - inst['ReturnValueType'] = self.return_value_type - if self.return_value is not None: - inst['ReturnValue'] = str(self.return_value) - if self.error is not None: - err = self.get_cim_error() - inst['Error'] = [err, ] - return inst - - @cmpi_logging.trace_method - def _get_cim_instance(self): - """ - Return CIMInstance of this job. - - :rtype: CIMInstance - """ - return self.job_manager.get_job_instance(self) - - @cmpi_logging.trace_method - def get_method_params(self, class_name, include_input, include_output): - """ - Create a class of given name with all input or output parameters - of the asynchronous method. Typically used to assemble - CIM_ConcreteJob.JobInParameters or CIM_InstMethodCall.MethodParameters - values. - - :param class_name: (``string``) Name of the class to create. - :param input: (``boolean``) Whether input parameters should be - included in the returned class - :param output: (``boolean``) Whether output parameters should be - included in the returned class - :rtype: CIMInstance of the created class. - """ - # TODO: this is workaround for bug #920763, use class_name - # when it's fixed - clsname = "CIM_ManagedElement" - path = pywbem.CIMInstanceName( - classname=clsname, - namespace=self.job_manager.namespace) - inst = pywbem.CIMInstance(classname=clsname, path=path) - if include_input and self.input_arguments: - for (name, value) in self.input_arguments.iteritems(): - inst[name] = value - if include_output and self.output_arguments: - # overwrite any input parameter - for (name, value) in self.output_arguments.iteritems(): - inst[name] = value - return inst - - @cmpi_logging.trace_method - def wait_for_job(self, timeout=None): - """ - Block and wait until the job completes. - - :param timeout: (``float``) Number of seconds to wait for the job - to complete. - :rtype: ``bool`` - True, when the job is finished, False if the timeout - occurred. - """ - return self._finished_event.wait(timeout) - - # pylint: disable-msg=R0903 - class ReturnValueType(object): - """ CIM_InstMethodCall.ReturnValueType values.""" - Boolean = pywbem.Uint16(2) - String = pywbem.Uint16(3) - Char16 = pywbem.Uint16(4) - Uint8 = pywbem.Uint16(5) - Sint8 = pywbem.Uint16(6) - Uint16 = pywbem.Uint16(7) - Sint16 = pywbem.Uint16(8) - Uint32 = pywbem.Uint16(9) - Sint32 = pywbem.Uint16(10) - Uint64 = pywbem.Uint16(11) - Sint64 = pywbem.Uint16(12) - Datetime = pywbem.Uint16(13) - Real32 = pywbem.Uint16(14) - Real64 = pywbem.Uint16(15) - Reference = pywbem.Uint16(16) - -class JobManager(object): - """ - Container of all queued, running or finished ``LMI_ConcreteJobs``. - - Usage: - - 1. Create MOF file for these classes: - - * ``LMI_Job`` - - * ``LMI_MethodResult`` - - * ``LMI_AffectedJobElement`` - - * ``LMI_OwningJobElement`` - - * ``LMI_AssociatedJobMethodResult`` - - Where ```` is prefix of your classes, for example 'Storage' - - 2. During initialization, initialize ``TimerManager`` and create - ``JobManager``. - - 3. When needed. create new Job instance: - - 4. Set its execute callback using ``set_execute_action()``. This callback - will be called when the job is to be executed. It will be called in - context of ``JobManager`` worker thread! - - 5. Optionally, set cancel callback using ``set_execute_action()``. This - callback will be called when the job is still queued and is cancelled by - application. This callback will be called in context of CIMOM callback - and should be quick! - - 6. Enqueue the job using ``JobManager.add_job()`` method. - - 7. When your execute callback is called, you can optionally call - ``job.change_state()`` to update percentage of completion. - - 8. When your execute callback is finished, don't forget to set method - result using ``job.finish_method()``. - - * ``JobManager`` automatically sends all job-related indications. - * ``Job`` automatically tracks various timestamps. - * By default, the job automatically disappears after 60 seconds after it - finishes. Application may set ``DeleteOnCompletion`` and - ``TimeBeforeRemoval`` properties of ``LMI_Job`` to override this - timeout. - """ - - COMMAND_STOP = 1 - - IND_JOB_PERCENT_UPDATED = "PercentUpdated" - IND_JOB_SUCCEEDED = "Succeeded" - IND_JOB_FAILED = "Failed" - IND_JOB_CHANGED = "Changed" - IND_JOB_CREATED = "Created" - - IND_FILTERS = { - IND_JOB_PERCENT_UPDATED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::PercentComplete <> " - "PreviousInstance.CIM_ConcreteJob::PercentComplete", - "Description" : "Modification of Percentage Complete for a " - "Concrete Job.", - }, - IND_JOB_SUCCEEDED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::JobState = 17", - "Description": "Modification of Job State for a " - "Concrete Job to 'Complete'.", - }, - IND_JOB_FAILED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::JobState = 10", - "Description": "Modification of Job State for a " - "Concrete Job to 'Exception'.", - }, - IND_JOB_CHANGED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::JobState <> " - "PreviousInstance.CIM_ConcreteJob::JobState", - "Description": "Modification of Job State for a ConcreteJob.", - }, - IND_JOB_CREATED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstCreation WHERE " - "SourceInstance ISA %(classname)s", - "Description": "Creation of a ConcreteJob.", - }, - } - - @cmpi_logging.trace_method - def __init__(self, name, namespace, indication_manager, timer_manager): - """ - Initialize new Manager. It automatically registers all job-related - filters to indication_manager and starts a worker thread. - - :param name: (``string``) String with classname infix. For example - 'Storage' for ``LMI_StorageJob``, ``LMI_StorageJobMethodResult`` - etc. - :param namespace: (``string``) Namespace of all providers. - :param indication_manager: (``IndicationManager``): a manager where - indications and filters should be added. - :param timer_manager: (``TimerManager``): Timer manager instance. - """ - # List of all jobs. Dictionary job_id -> Job. - self.jobs = {} - # Queue of jobs scheduled to execute. - self.queue = Queue() - # Last created job_id. - self.last_instance_id = 0 - # Classname infix. - self.name = name - # CIMProvider2 instances for job classes. - self.providers = {} - self.namespace = namespace - self.indication_manager = indication_manager - self.timer_manager = timer_manager - - # Start the worker thread (don't forget to register it at CIMOM) - self.worker = threading.Thread(target=self._worker_main) - self.worker.daemon = False - self.worker.start() - - # Various classnames for job-related classes, with correct infixes. - self.job_classname = 'LMI_' + self.name + 'Job' - self.method_result_classname = "LMI_" + self.name + "MethodResult" - self.affected_classname = "LMI_Affected" + self.name + "JobElement" - self.owning_classname = "LMI_Owning" + self.name + "JobElement" - self.associated_result_classname = ('LMI_Associated' + self.name - + 'JobMethodResult') - self.indication_filter_classname = ('LMI_' + self.name - + 'JobIndicationFilter') - self.job_provider = None - self._add_indication_filters() - - @cmpi_logging.trace_method - def _add_indication_filters(self): - """ - Add all job-related ``IndicationFilters`` to indication manager. - """ - register_filters(self.job_classname, self.indication_manager) - - @cmpi_logging.trace_method - def get_providers(self): - """ - Get dictionary of providers for these classes: - - * ``LMI_Job`` - * ``LMI_MethodResult`` - * ``LMI_AffectedJobElement`` - * ``LMI_OwningJobElement`` - * ``LMI_AssociatedJobMethodResult`` - - :rtype: dictionary class_name -> CIMProvider2 - """ - - if not self.providers: - job_provider = LMI_ConcreteJob(self.job_classname, job_manager=self) - self.providers[self.job_classname] = job_provider - self.job_provider = job_provider - - provider = LMI_MethodResult( - self.method_result_classname, job_manager=self) - self.providers[self.method_result_classname] = provider - - provider = LMI_AffectedJobElement( - self.affected_classname, job_manager=self) - self.providers[self.affected_classname] = provider - - provider = LMI_OwningJobElement( - self.owning_classname, job_manager=self) - self.providers[self.owning_classname] = provider - - provider = LMI_AssociatedJobMethodResult( - self.owning_classname, job_manager=self) - self.providers[self.associated_result_classname] = provider - - return self.providers - - @cmpi_logging.trace_method - def add_job(self, job): - """ - Enqueue new job. Send indication when needed. - - :param job: (``Job``) A job to enqueue. - """ - cmpi_logging.logger.debug("Job %s: '%s' enqueued" - % (job.the_id, job.job_name)) - - self.jobs[job.the_id] = job - self.queue.put(job) - # send indication - if self.indication_manager.is_subscribed( - self.job_classname, self.IND_JOB_CREATED): - job_instance = self.get_job_instance(job) - self.indication_manager.send_instcreation( - job_instance, self.IND_JOB_CREATED) - - def send_modify_indications(self, prev_instance, current_instance, - indication_ids): - """ - Send InstModification. This is helper method called by ``Job`` when - needed. - - :param prev_instance: Instance of ``LMI_Job`` before it was - modified. - :param current_instance: Instance of ``LMI_Job`` after it was - modified. - """ - for _id in indication_ids: - self.indication_manager.send_instmodification(prev_instance, - current_instance, _id) - - @cmpi_logging.trace_method - def remove_job(self, job): - """ - Remove existing job. Note that jobs are removed automatically after a - timeout, providers should not call this method directly. - - :param job: (``Job``) Job to remove. - """ - cmpi_logging.logger.debug("Job %s: '%s' removed from queue." - % (job.the_id, job.job_name)) - del self.jobs[job.the_id] - # The job may still be in the queue! - # There is no way, how to remove it, it will be skipped by the - # worker thread. - - @cmpi_logging.trace_method - def get_job_for_instance_id(self, instance_id, classname=None): - """ - Return Job for given InstanceID or None when no such Job exist. - - :param instance_id: (``string``) InstanceID value to parse. - :param classname: (``string``) Optional classname to parse the - InstanceID (e.g. when parsing InstanceID of - ``LMI_MethodResult``). - :rtype: ``Job`` - """ - if classname is None: - classname = self.job_classname - the_id = lmi.common.parse_instance_id(instance_id, classname) - if not the_id.isdigit(): - return None - return self.jobs.get(the_id, None) - - @cmpi_logging.trace_method - def _worker_main(self): - """ - This is the main loop of the job queue. It just processes enqueued - jobs and never ends. - """ - cmpi_logging.logger.info("Started Job thread.") - while True: - command = self.queue.get() - if isinstance(command, Job): - # we need to protect from changes between checking state and - # setting new state - job = command - job.lock() - if job.job_state == Job.STATE_QUEUED: - # the job was not cancelled - job.change_state(Job.STATE_RUNNING) - job.unlock() - cmpi_logging.logger.info("Starting job %s: '%s'" % - (job.the_id, job.job_name)) - - job.execute() - if job.error: - cmpi_logging.logger.warn("Job %s: '%s' finished with error:" - " %s" % (job.the_id, job.job_name, str(job.error))) - else: - cmpi_logging.logger.info("Job %s: '%s' finished OK" % - (job.the_id, job.job_name)) - else: - # just skip suspended and terminated jobs - job.unlock() - - elif isinstance(command, int): - self.queue.task_done() - break - - self.queue.task_done() - - cmpi_logging.logger.info("Stopped Job thread.") - - @cmpi_logging.trace_method - def get_next_id(self): - """ - Return next unused job id. - - :rtype: string - """ - self.last_instance_id += 1 - return str(self.last_instance_id) - - @cmpi_logging.trace_method - def get_job_instance(self, job): - """ - Return CIMInstance for given job. - - :param job: (``Job``) - :rtype: ``CIMInstance`` - """ - path = pywbem.CIMInstanceName( - classname=self.job_classname, - keybindings={'InstanceID': job.get_instance_id()}, - host=socket.gethostname(), - namespace=self.namespace) - inst = pywbem.CIMInstance(classname=self.job_classname, path=path) - inst['InstanceID'] = job.get_instance_id() - return self.job_provider.get_instance(None, inst) - - @cmpi_logging.trace_method - def shutdown(self, timeout=1): - """ - Stop the thread. If a job is running, it may leave the job process - (mkfs, resize2fs, ...) and the worker thread (waiting for the process to - finish) still running. - - JobManager still needs Indication Manager and TimeManager working at - this point! - - :param timeout: Nr. of seconds to wait for the current job. Afterwards - the thread is abandoned, leaving the process still running. - """ - # Empty the queue, we don't want the worker to proceed with any other - # queued job. - while not self.queue.empty(): - queue.get(False) - queue.task_done() - - self.queue.put(self.COMMAND_STOP) - self.worker.join(timeout) - - # Cancel all running/suspended/queued jobs. - # This will send indications. - for job in self.jobs.itervalues(): - if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED, - Job.STATE_RUNNING): - job.cancel() - - if self.worker.isAlive(): - # There is no way, how to stop the thread in Python, so abandon it. - self.worker.daemon = True - self.indication_manager = None - self.timer_manager = None - - def can_shutdown(self): - """ - Return True, if there is no running Job. - """ - return self.queue.empty() - - -class LMI_ConcreteJob(CIMProvider2): - """ - Provider of LMI_ConcreteJob class or its subclass. - """ - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """ - Provider implementation of EnumerateInstances intrinsic method. - """ - model.path.update({'InstanceID': None}) - for job in self.job_manager.jobs.values(): - model['InstanceID'] = job.get_instance_id() - if keys_only: - yield model - else: - yield self.get_instance(env, model, job) - - @cmpi_logging.trace_method - def get_job_states(self, job): - """ - Return JobState and OperationalStatus property values. - - :param job: (``int``) Job.STATE_* value. - :rtype: tuple ``(JobState, OperationalStatus)`` values. - """ - if job.job_state == Job.STATE_QUEUED: - jobstate = self.Values.JobState.New - opstate = [self.Values.OperationalStatus.Dormant] - elif job.job_state == Job.STATE_RUNNING: - jobstate = self.Values.JobState.Running - opstate = [self.Values.OperationalStatus.OK] - elif job.job_state == Job.STATE_FINISHED_OK: - jobstate = self.Values.JobState.Completed - opstate = [self.Values.OperationalStatus.OK, - self.Values.OperationalStatus.Completed] - elif job.job_state == Job.STATE_SUSPENDED: - jobstate = self.Values.JobState.Suspended - opstate = [self.Values.OperationalStatus.OK] - elif job.job_state == Job.STATE_FAILED: - jobstate = self.Values.JobState.Exception - opstate = [self.Values.OperationalStatus.Error, - self.Values.OperationalStatus.Completed] - elif job.job_state == Job.STATE_TERMINATED: - jobstate = self.Values.JobState.Terminated - opstate = [self.Values.OperationalStatus.Stopped] - return jobstate, opstate - - @cmpi_logging.trace_method - # pylint: disable-msg=W0221 - def get_instance(self, env, model, job=None): - """ - Provider implementation of GetInstance intrinsic method. - """ - if not job: - instance_id = model['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - model['DeleteOnCompletion'] = job.delete_on_completion - model['Name'] = job.job_name - - # convert seconds to timedelta - seconds = job.time_before_removal - if seconds: - delta = timedelta(seconds=seconds) - model['TimeBeforeRemoval'] = pywbem.CIMDateTime(delta) - else: - model['TimeBeforeRemoval'] = pywbem.CIMProperty( - name='TimeBeforeRemoval', - value=None, - type='datetime') - - if job.clocktime_of_last_state_change: - model['TimeOfLastStateChange'] = pywbem.CIMDateTime( - job.clocktime_of_last_state_change) - else: - model['TimeOfLastStateChange'] = pywbem.CIMProperty( - name='TimeOfLastStateChange', - value=None, - type='datetime') - - if job.elapsed_time: - elapsed_time = timedelta(seconds=job.elapsed_time) - model['ElapsedTime'] = pywbem.CIMDateTime(elapsed_time) - else: - model['ElapsedTime'] = pywbem.CIMProperty( - name='ElapsedTime', - value=None, - type='datetime') - - model['Description'] = job.job_name - model['LocalOrUtcTime'] = self.Values.LocalOrUtcTime.UTC_Time - model['PercentComplete'] = pywbem.Uint16(job.percent_complete) - if job.start_clocktime: - model['StartTime'] = pywbem.CIMDateTime(job.start_clocktime) - else: - model['StartTime'] = pywbem.CIMProperty( - name='StartTime', - value=None, - type='datetime') - - if job.input_arguments: - model['JobInParameters'] = job.get_method_params( - "__JobInParameters", True, False) - - if job.job_state in Job.FINAL_STATES: - # assemble output parameters with return value - outparams = job.get_method_params("__JobOutParameters", False, True) - if job.return_value is not None: - outparams['__ReturnValue'] = job.return_value - model['JobOutParameters'] = outparams - - model['TimeSubmitted'] = pywbem.CIMDateTime(job.time_submitted) - # set correct state - jobstate, opstate = self.get_job_states(job) - model['JobState'] = jobstate - model['OperationalStatus'] = opstate - return model - - @cmpi_logging.trace_method - def set_instance(self, env, instance, modify_existing): - """Return a newly created or modified instance. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param instance: The new pywbem.CIMInstance. If modifying an existing - instance, the properties on this instance have been filtered by - the PropertyList from the request. - :param modify_existing: True if ModifyInstance, False if CreateInstance - - Return the new instance. The keys must be set on the new instance. - """ - if not modify_existing: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "Creation of Job instances is not supported.") - - job = self.job_manager.get_job_for_instance_id(instance['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - try: - job.lock() - restart_timer = False - - for (key, value) in instance.iteritems(): - if value is None: - continue - if key == 'DeleteOnCompletion': - job.delete_on_completion = value - restart_timer = True - elif key == 'TimeBeforeRemoval': - job.time_before_removal = value.total_seconds() - restart_timer = True - elif key == 'JobRunTimes': - if value != 1: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "JobRunTimes property is not supported.") - elif key == 'LocalOrUtcTime': - if value != self.Values.LocalOrUtcTime.UTC_Time: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "Setting of LocalOrUtcTime property is not" - " supported.") - else: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "Setting of %s property is not supported." % (key,)) - - if restart_timer: - job._restart_timer() - finally: - job.unlock() - return instance - - @cmpi_logging.trace_method - def delete_instance(self, env, instance_name): - """Delete an instance. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param instance_name: A pywbem.CIMInstanceName specifying the instance - to delete. - """ - job = self.job_manager.get_job_for_instance_id( - instance_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - if not job.job_status in Job.FINAL_STATES: - raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, - "Job has not finished.") - - self.job_manager.remove_job(job) - - @cmpi_logging.trace_method - def cim_method_geterrors(self, env, object_name): - """Implements LMI_StorageJob.GetErrors() - - If JobState is "Completed" and Operational Status is "Completed" - then no instance of CIM_Error is returned. - - If JobState is "Exception" then GetErrors may return intances of - CIM_Error related to the execution of the procedure or method invoked by - the job. - - If Operatational Status is not "OK" or "Completed" then - GetErrors may return CIM_Error instances related to the running of - the job. - - :param env: -- Provider Environment (pycimmb.ProviderEnvironment) - :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName - specifying the object on which the method GetErrors() - should be invoked. - - Output parameters: - - * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...)) - If the OperationalStatus on the Job is not "OK", then this - method will return one or more CIM Error instance(s). - Otherwise, when the Job is "OK", null is returned. - """ - job = self.job_manager.get_job_for_instance_id( - object_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - if job.error is None: - errors = [] - else: - err = job.get_cim_error() - errors = [err, ] - out_params = [ - pywbem.CIMParameter( - name='Errors', - value=errors, - type='instance', - is_array=True, - array_size=len(errors)) - ] - rval = self.Values.GetErrors.Success - - return (rval, out_params) - - @cmpi_logging.trace_method - def cim_method_requeststatechange(self, env, object_name, - param_requestedstate=None, - param_timeoutperiod=None): - """Implements LMI_StorageJob.RequestStateChange() - - Requests that the state of the job be changed to the value - specified in the RequestedState parameter. Invoking the - RequestStateChange method multiple times could result in earlier - requests being overwritten or lost. - - If 0 is returned, then the - task completed successfully. Any other return code indicates an - error condition. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName - specifying the object on which the method RequestStateChange() - should be invoked. - :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState) - RequestStateChange changes the state of a job. The possible - values are as follows: Start (2) changes the state to - \'Running\'. Suspend (3) stops the job temporarily. The - intention is to subsequently restart the job with \'Start\'. - It might be possible to enter the \'Service\' state while - suspended. (This is job-specific.) Terminate (4) stops the - job cleanly, saving data, preserving the state, and shutting - down all underlying processes in an orderly manner. Kill (5) - terminates the job immediately with no requirement to save - data or preserve the state. Service (6) puts the job into a - vendor-specific service state. It might be possible to restart - the job. - - :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime) - A timeout period that specifies the maximum amount of time that - the client expects the transition to the new state to take. - The interval format must be used to specify the TimeoutPeriod. - A value of 0 or a null parameter indicates that the client has - no time requirements for the transition. If this property - does not contain 0 or null and the implementation does not - support this parameter, a return code of \'Use Of Timeout - Parameter Not Supported\' must be returned. - """ - job = self.job_manager.get_job_for_instance_id( - object_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - try: - job.lock() - states = self.Values.RequestStateChange.RequestedState - retcodes = self.Values.RequestStateChange - if param_requestedstate == states.Suspend: - if job.job_state != Job.STATE_QUEUED: - # Can suspend only queued jobs - rval = retcodes.Invalid_State_Transition - else: - job.change_state(Job.STATE_SUSPENDED) - rval = retcodes.Completed_with_No_Error - - elif param_requestedstate == states.Terminate: - if job.job_state not in (Job.STATE_QUEUED, Job.STATE_SUSPENDED): - # Can terminate only queued or suspended jobs - rval = retcodes.Invalid_State_Transition - else: - job.cancel() - rval = retcodes.Completed_with_No_Error - - elif param_requestedstate == states.Start: - if job.job_state != Job.STATE_SUSPENDED: - # Can start only suspended jobs - rval = retcodes.Invalid_State_Transition - else: - job.change_state(Job.STATE_QUEUED) - # Enqueue the job again, it may be already processed - # (we might get the job in the queue twice, but - # we have only one worker thread so it won't collide). - self.job_manager.add_job(job) - rval = retcodes.Completed_with_No_Error - - else: - rval = retcodes.Invalid_State_Transition - finally: - job.unlock() - return (rval, []) - - @cmpi_logging.trace_method - def cim_method_killjob(self, env, object_name, - param_deleteonkill=None): - """Implements LMI_StorageJob.KillJob() """ - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) - - @cmpi_logging.trace_method - def cim_method_geterror(self, env, object_name): - """Implements LMI_StorageJob.GetError() - - GetError is deprecated because Error should be an array,not a - scalar. - - When the job is executing or has terminated without - error, then this method returns no CIM_Error instance. However, if - the job has failed because of some internal problem or because the - job has been terminated by a client, then a CIM_Error instance is - returned. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName - specifying the object on which the method GetError() - should be invoked. - - Output parameters: - - * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``) - If the OperationalStatus on the Job is not "OK", then this - method will return a CIM Error instance. Otherwise, when the - Job is "OK", null is returned. - """ - job = self.job_manager.get_job_for_instance_id( - object_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - if job.error is None: - error = pywbem.CIMParameter( - name='error', - value=None, - type='instance', - is_array=False) - else: - err = job.get_cim_error() - error = pywbem.CIMParameter( - name='error', - value=err, - type='instance') - rval = self.Values.GetError.Success - return (rval, [error]) - - class Values(object): - class JobState(object): - New = pywbem.Uint16(2) - Starting = pywbem.Uint16(3) - Running = pywbem.Uint16(4) - Suspended = pywbem.Uint16(5) - Shutting_Down = pywbem.Uint16(6) - Completed = pywbem.Uint16(7) - Terminated = pywbem.Uint16(8) - Killed = pywbem.Uint16(9) - Exception = pywbem.Uint16(10) - Service = pywbem.Uint16(11) - Query_Pending = pywbem.Uint16(12) - # DMTF_Reserved = 13..32767 - # Vendor_Reserved = 32768..65535 - - class LocalOrUtcTime(object): - Local_Time = pywbem.Uint16(1) - UTC_Time = pywbem.Uint16(2) - - class OperationalStatus(object): - Unknown = pywbem.Uint16(0) - Other = pywbem.Uint16(1) - OK = pywbem.Uint16(2) - Degraded = pywbem.Uint16(3) - Stressed = pywbem.Uint16(4) - Predictive_Failure = pywbem.Uint16(5) - Error = pywbem.Uint16(6) - Non_Recoverable_Error = pywbem.Uint16(7) - Starting = pywbem.Uint16(8) - Stopping = pywbem.Uint16(9) - Stopped = pywbem.Uint16(10) - In_Service = pywbem.Uint16(11) - No_Contact = pywbem.Uint16(12) - Lost_Communication = pywbem.Uint16(13) - Aborted = pywbem.Uint16(14) - Dormant = pywbem.Uint16(15) - Supporting_Entity_in_Error = pywbem.Uint16(16) - Completed = pywbem.Uint16(17) - Power_Mode = pywbem.Uint16(18) - Relocating = pywbem.Uint16(19) - # DMTF_Reserved = .. - # Vendor_Reserved = 0x8000.. - - class GetErrors(object): - Success = pywbem.Uint32(0) - Not_Supported = pywbem.Uint32(1) - Unspecified_Error = pywbem.Uint32(2) - Timeout = pywbem.Uint32(3) - Failed = pywbem.Uint32(4) - Invalid_Parameter = pywbem.Uint32(5) - Access_Denied = pywbem.Uint32(6) - # DMTF_Reserved = .. - # Vendor_Specific = 32768..65535 - - class GetError(object): - Success = pywbem.Uint32(0) - Not_Supported = pywbem.Uint32(1) - Unspecified_Error = pywbem.Uint32(2) - Timeout = pywbem.Uint32(3) - Failed = pywbem.Uint32(4) - Invalid_Parameter = pywbem.Uint32(5) - Access_Denied = pywbem.Uint32(6) - # DMTF_Reserved = .. - # Vendor_Specific = 32768..65535 - - class RequestStateChange(object): - Completed_with_No_Error = pywbem.Uint32(0) - Not_Supported = pywbem.Uint32(1) - Unknown_Unspecified_Error = pywbem.Uint32(2) - Can_NOT_complete_within_Timeout_Period = pywbem.Uint32(3) - Failed = pywbem.Uint32(4) - Invalid_Parameter = pywbem.Uint32(5) - In_Use = pywbem.Uint32(6) - # DMTF_Reserved = .. - Method_Parameters_Checked___Transition_Started = pywbem.Uint32(4096) - Invalid_State_Transition = pywbem.Uint32(4097) - Use_of_Timeout_Parameter_Not_Supported = pywbem.Uint32(4098) - Busy = pywbem.Uint32(4099) - # Method_Reserved = 4100..32767 - # Vendor_Specific = 32768..65535 - class RequestedState(object): - Start = pywbem.Uint16(2) - Suspend = pywbem.Uint16(3) - Terminate = pywbem.Uint16(4) - Kill = pywbem.Uint16(5) - Service = pywbem.Uint16(6) - # DMTF_Reserved = 7..32767 - # Vendor_Reserved = 32768..65535 - -class LMI_OwningJobElement(CIMProvider2): - """ Instrumentation of LMI_OwningJobElement class and its subclasses.""" - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def get_instance(self, env, model): - """Return an instance.""" - instance_id = model['OwnedElement']['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "OwnedElement not found.") - - if job.owning_element != model['OwningElement']: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "OwnedElement is not associated to OwningElement.") - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'OwnedElement': None, 'OwningElement': None}) - for job in self.job_manager.jobs.values(): - if job.owning_element: - model['OwnedElement'] = job.get_name() - model['OwningElement'] = job.owning_element - yield model - - @cmpi_logging.trace_method - def references(self, env, object_name, model, result_class_name, role, - result_role, keys_only): - """Instrument Associations.""" - ch = env.get_cimom_handle() - if ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super='CIM_ManagedElement') or \ - ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.job_classname): - return self.simple_refs(env, object_name, model, - result_class_name, role, result_role, keys_only) - -class LMI_AffectedJobElement(CIMProvider2): - """ Instrumentation of LMI_AffectedJobElement class and its subclasses.""" - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def get_instance(self, env, model): - """Return an instance.""" - instance_id = model['AffectingElement']['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "AffectingElement not found.") - - if job.affected_elements is None: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "The AffectingElement has no AffectedElement.") - if model['AffectedElement'] not in job.affected_elements: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "AffectedElement is not associated to AffectingElement.") - model['ElementEffects'] = [self.Values.ElementEffects.Unknown, ] - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'AffectingElement': None, 'AffectedElement': None}) - for job in self.job_manager.jobs.values(): - if job.affected_elements is None: - continue - for element in job.affected_elements: - model['AffectingElement'] = job.get_name() - model['AffectedElement'] = element - if keys_only: - yield model - else: - yield self.get_instance(env, model) - - @cmpi_logging.trace_method - def references(self, env, object_name, model, result_class_name, role, - result_role, keys_only): - """Instrument Associations.""" - ch = env.get_cimom_handle() - if ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super='CIM_ManagedElement') or \ - ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.job_classname): - return self.simple_refs(env, object_name, model, - result_class_name, role, result_role, keys_only) - - class Values(object): - class ElementEffects(object): - Unknown = pywbem.Uint16(0) - Other = pywbem.Uint16(1) - Exclusive_Use = pywbem.Uint16(2) - Performance_Impact = pywbem.Uint16(3) - Element_Integrity = pywbem.Uint16(4) - Create = pywbem.Uint16(5) - - -class LMI_MethodResult(CIMProvider2): - """Instrumentation of LMI_MethodResult class and its subclasses.""" - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - # pylint: disable-msg=W0221 - def get_instance(self, env, model, job=None): - """Return an instance.""" - if not job: - instance_id = model['InstanceID'] - job = self.job_manager.get_job_for_instance_id( - instance_id, self.classname) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - model['Description'] = job.job_name - if job.job_state in Job.FINAL_STATES: - model['PostCallIndication'] = pywbem.CIMProperty( - name='PostCallIndication', - value=job.get_post_call()) - else: - model['PostCallIndication'] = pywbem.CIMProperty( - name='PostCallIndication', - type='instance', - value=None) - model['PreCallIndication'] = pywbem.CIMProperty( - name='PreCallIndication', - value=job.get_pre_call()) - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'InstanceID': None}) - for job in self.job_manager.jobs.values(): - model['InstanceID'] = job.get_instance_id( - classname=self.classname) - if keys_only: - yield model - else: - yield self.get_instance(env, model, job) - -class LMI_AssociatedJobMethodResult(CIMProvider2): - """ - Instrumentation of LMI_AssociatedJobMethodResult class and its - subclasses. - """ - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def get_instance(self, env, model): - """Return an instance.""" - instance_id = model['Job']['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - expected_result_id = job.get_instance_id( - classname=self.job_manager.method_result_classname) - if model['JobParameters']['InstanceID'] != expected_result_id: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job is not associated to JobParameters.") - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'JobParameters': None, 'Job': None}) - for job in self.job_manager.jobs.values(): - if job.owning_element: - model['Job'] = job.get_name() - model['JobParameters'] = pywbem.CIMInstanceName( - classname=self.job_manager.method_result_classname, - namespace=self.job_manager.namespace, - keybindings={ - 'InstanceID': job.get_instance_id( - classname=self.job_manager.method_result_classname) - }) - yield model - - @cmpi_logging.trace_method - def references(self, env, object_name, model, result_class_name, role, - result_role, keys_only): - """Instrument Associations.""" - ch = env.get_cimom_handle() - if ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.method_result_classname) or \ - ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.job_classname): - return self.simple_refs(env, object_name, model, - result_class_name, role, result_role, keys_only) - diff --git a/src/python/lmi/common/TimerManager.py b/src/python/lmi/common/TimerManager.py deleted file mode 100644 index 01e53b8..0000000 --- a/src/python/lmi/common/TimerManager.py +++ /dev/null @@ -1,421 +0,0 @@ -# Copyright (C) 2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Jan Safranek -# -*- coding: utf-8 -*- -""" -Module with functionality to create timers, which can be used in CMPI providers. - -Default python threading.Timer is not suitable, because it creates thread -for each timer, which is inefficient. In addition, each such thread would need -to be registered at CIMOM to enable logging in timer callbacks. - -Usage: - -1. Initialize the TimerManager when your provider initializes! -Otherwise you may encounter weird exceptions. - -2. When any provider needs timer, create it using Time.create_timer() to create -Timer instance. - -3. Call Timer.start() to start the timer. It will call registered callback -when the timer expires. The callback is called in context of TimerManager -thread, which has enabled logging to CIMOM, i.e. the callback can log as usual. - -4. (optionally) cancel the timer before expiration using Timer.cancel(). -However, this does not guarantee that the timer callback won't be called - -it may be already being scheduled / called. - -.. autoclass:: TimerManager - :members: - -.. autoclass:: Timer - :members: - -.. autoclass:: MonotonicClock - :members: -""" - -import ctypes -import lmi.common.singletonmixin as singletonmixin -import threading -import Queue -import lmi.common.cmpi_logging as cmpi_logging - -class TimerException(Exception): - pass - -class MonotonicClock(object): - """ - Monotonic clock, represented by clock_gettime() and CLOCK_MONOTONIC. - This clock is not influenced by NTP or administrator setting time or date. - """ - CLOCK_MONOTONIC = ctypes.c_int(1) - - class timespec(ctypes.Structure): - _fields_ = [ - ("tv_sec", ctypes.c_long), - ("tv_nsec", ctypes.c_long)] - - def __init__(self): - libc = ctypes.CDLL("librt.so.1") - self._clock_gettime = libc.clock_gettime - - def now(self): - """ - Return current time, i.e. float representing seconds with precision up - to nanoseconds (depends on glibc). The actual value of current time is - meaningless, it can be used only to measure time differences. - - :returns: ``float`` with current time in seconds. - """ - t = MonotonicClock.timespec(0, 0) - ret = self._clock_gettime(self.CLOCK_MONOTONIC, ctypes.pointer(t)) - - if ret < 0: - raise TimerException("Cannot get clock time, clock_gettime() failed.") - return t.tv_sec + t.tv_nsec * 10 ** (-9) - -class Timer(object): - """ - A class representing a timer. A timer has a timeout and after the timeout, - given callback is called and the timer is deleted. - """ - - @cmpi_logging.trace_method - def __init__(self, timer_manager, name, callback=None, *args, **kwargs): - """ - Create a timer. If specified, given callback is registered. - The callback is called with *args and **kwargs. - - :param timer_manager: (``TimerManager)`` Instance of the timer manager - which will manage the timer. - :param name: (``string``) Name of the timer, used for logging. - :param callback: (``function``) Callback to call when the timer expires. - :param *args, **kwargs: Parameters of the callback. - """ - self._mgr = timer_manager - self._name = name - self._callback = callback - self._args = args - self._kwargs = kwargs - - cmpi_logging.logger.trace_info("Timer: Timer %s created" % name) - - @cmpi_logging.trace_method - def set_callback(self, callback, *args, **kwargs): - """ - Set callback to call when the timer expires. - - :param callback: (``function``) Callback to call when the timer expires. - :param *args, **kwargs: Parameters of the callback. - """ - self._callback = callback - self._args = args - self._kwargs = kwargs - - @cmpi_logging.trace_method - def start(self, timeout): - """ - Start the timer with given timeout. After the timeout, the registered - callback will be called. - - :param timeout: (``float``) Timeout in seconds. - """ - - self._timeout = timeout - now = self._mgr.now() - self._end_time = now + timeout - cmpi_logging.logger.trace_info( - "Timer: Timer %s started at %f for %f seconds" - % (self._name, now, self._timeout)) - self._mgr._add_timer(self) - - @cmpi_logging.trace_method - def cancel(self): - """ - Cancel the timer. This method does not guarantee that the callback won't - be called, the timer might be calling the callback right now, - """ - cmpi_logging.logger.trace_info("Timer: Timer %s cancelled" - % (self._name)) - self._mgr._remove_timer(self) - - @cmpi_logging.trace_method - def _expired(self, now): - """ - Returns True, if the timer is expired. - - :param now: (``float``) Current time, as returned by MonotonicClock.now(). - :returns: (``boolean``) ``True``, if the timer is expired. - """ - if self._end_time <= now: - cmpi_logging.logger.trace_info("Timer: Timer %s has expired" - % (self._name)) - return True - return False - - @cmpi_logging.trace_method - def _expire(self): - """ - Called when the timer expired. It calls the callback. - """ - cmpi_logging.logger.trace_info("Timer: Calling callback for timer %s" - % (self._name)) - self._callback(*self._args, **self._kwargs) - -class TimerManager(singletonmixin.Singleton): - """ - Manages set of timers. - - Python standard Timer class creates a thread for - - each timer, which is inefficient. This class uses only one thread, which - is registered at CIMOM, i.e. it can log as usual. - - This class is singleton, use TimerManager.get_instance() to get the - instance. - - Still, the singleton needs to be initialized with ProviderEnvironment to - enable logging in the timer thread. Use TimerManager.get_instance(env) in - you provider initialization. - """ - - # Commands to the timer thread - COMMAND_STOP = 1 - COMMAND_RESCHEDULE = 2 - - @cmpi_logging.trace_method - def __init__(self, env=None): - """ - Initialize new thread manager. - - :param env: (``ProviderEnvironment``) Environment to use for logging. - """ - self._clock = MonotonicClock() - self._lock = threading.RLock() - self._queue = Queue.Queue() - - # Array of timers. Assumption: nr. of timers is relatively small, - # i.e. hundreds at the worst. - self._timers = [] - - new_broker = None - if env: - broker = env.get_cimom_handle() - new_broker = broker.PrepareAttachThread() - - self._timer_thread = threading.Thread( - target=self._timer_loop, args=(new_broker,)) - self._timer_thread.daemon = False - self._timer_thread.start() - - def create_timer(self, name, callback=None, *args, **kwargs): - """ - Create new timer. If specified, given callback is registered. - The callback is called with *args and **kwargs. - - :param name: (``string``) Name of the timer, used for logging. - :param callback: (``function``) Callback to call when the timer expires. - :param *args, **kwargs: Parameters of the callback. - """ - return Timer(self, name, callback, *args, **kwargs) - - def _timer_loop(self, broker): - """ - TimerManager thread main loop. It waits for timeout of all timers - and calls their callbacks. - - :param broker: (``BrokerCIMOMHandle``) CIM broker handle, used for - logging. - """ - if broker: - broker.AttachThread() - cmpi_logging.logger.info("Started Timer thread.") - while True: - self._handle_expired() - timeout = self._find_timeout() - if timeout != 0: - # Wait for the timeout or any change in timers. - try: - command = self._queue.get(timeout=timeout) - self._queue.task_done() - if command == self.COMMAND_STOP: - break # stop the thread - # process COMMAND_RESCHEDULE in next loop - except Queue.Empty: - # Timeout has happened, ignore the exception. - pass - cmpi_logging.logger.info("Stopped Timer thread.") - - @cmpi_logging.trace_method - def _handle_expired(self): - """ - Finds all expired timers, calls their callback and removes them from - list of timers. - """ - - # Get list of expired timers. - with self._lock: - now = self.now() - cmpi_logging.logger.trace_info( - "Timer: Checking for expired, now=%f." % (now)) - expired = [t for t in self._timers if t._expired(now)] - - # Call the callbacks (unlocked!). - for t in expired: - t._expire() - - # Remove the timers (locked). - with self._lock: - for t in expired: - try: - cmpi_logging.logger.trace_info( - "Timer: Removing %s" % (t._name)) - self._timers.remove(t) - except ValueError: - # The timer has already been removed. - pass - - @cmpi_logging.trace_method - def _find_timeout(self): - """ - Return nearest timeout, in seconds (as float, i.e. subsecond timeout - is possible). If no timer is scheduled, None is returned. - If there are expired timers, 0 is returned. - - :returns: Positive ``float``: Nearest timeout. - :returns: ``0``: Some timer has expired. - :returns: ``None``: No timer is scheduled. - """ - with self._lock: - if not self._timers: - cmpi_logging.logger.trace_info( - "Timer: No timers scheduled, waiting forever.") - return None - closest = min(self._timers, key=lambda timer: timer._end_time) - now = self.now() - timeout = closest._end_time - now - if timeout > 0: - cmpi_logging.logger.trace_info( - "Timer: Waiting for %f seconds, now=%f." - % (timeout, now)) - return timeout - cmpi_logging.logger.trace_info( - "Timer: Some timer has already expired, no waiting.") - return 0 - - @cmpi_logging.trace_method - def _add_timer(self, timer): - """ - Adds timer to list of timers. The timer must be started, i.e. its - timeout must be nozero! - This is internal method called by Timer.start(). - - :param timer: (``Timer``) Timer to add. - """ - with self._lock: - self._timers.append(timer) - # Wake up the timer manager thread. - self._queue.put(self.COMMAND_RESCHEDULE) - cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name)) - - @cmpi_logging.trace_method - def _remove_timer(self, timer): - """ - Remove timer from list of timers. - This is internal method called by Timer.cancel(). - :param timer: (``Timer``) Timer to remove. - """ - with self._lock: - try: - self._timers.remove(timer) - except ValueError: - pass - # Wake up the timer manager thread. - self._queue.put(self.COMMAND_RESCHEDULE) - cmpi_logging.logger.trace_info("Timer: Timer %s removed" - % (timer._name)) - - def now(self): - """ - Return current time, not influenced by NTP or admin setting date or - time. The actual value of current time is meaningless, it can be used - only to measure time differences. - - :returns: ``float`` Current time, in seconds. - """ - return self._clock.now() - - @cmpi_logging.trace_method - def shutdown(self): - """ - Stop the thread. This method blocks until the thread is safely - destroyed. - """ - self._queue.put(self.COMMAND_STOP) - self._timer_thread.join() - -if __name__ == "__main__": - cmpi_logging.logger = cmpi_logging.CMPILogger("") - import time - - class Env(object): - def AttachThread(self): - pass - def PrepareAttachThread(self): - return self - def get_cimom_handle(self): - return self - - clock = MonotonicClock() - - start = clock.now() - time.sleep(0.5) - print "Clock 0.5:", clock.now() - start - - time.sleep(0.5) - print "Clock 1:", clock.now() - start - - mgr = TimerManager.get_instance(Env()) - - def callback(msg): - if callback.first: - t = mgr.create_timer("internal 0.5") - t.set_callback(callback, "internal 0.5") - t.start(0.5) - callback.first = False - - print clock.now(), msg - - callback.first = True - - t1 = mgr.create_timer("one second") - t1.set_callback(callback, "1") - t1.start(1) - t2 = mgr.create_timer("two seconds") - t2.set_callback(callback, "2") - t2.start(2) - t22 = mgr.create_timer("two seconds 2") - t22.set_callback(callback, "2 again") - t22.start(2) - t15 = mgr.create_timer("one+half seconds") - t15.set_callback(callback, "1.5") - t15.start(1.5) - - time.sleep(4) - - mgr.stop_thread() diff --git a/src/python/lmi/common/__init__.py b/src/python/lmi/common/__init__.py deleted file mode 100644 index baebcdb..0000000 --- a/src/python/lmi/common/__init__.py +++ /dev/null @@ -1,42 +0,0 @@ -# Software Management Providers -# -# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Michal Minar -# - -""" -Common utilities for OpenLMI python providers. -""" -def parse_instance_id(instance_id, classname=None): - """ - Parse InstanceID, check it has LMI:: format and return - the ID. Return None if the format is bad. - :param instance_id: (``string``) String to parse. - :param classname: (``string``) Name of class, whose InstanceID we parse. - If the classname is None, it won't be checked. - :returns: ``string`` with the ID. - """ - parts = instance_id.split(":", 2) - if len(parts) != 3: - return None - if parts[0] != "LMI": - return None - real_classname = parts[1] - if classname and real_classname.lower() != classname.lower(): - return None - return parts[2] diff --git a/src/python/lmi/common/cmpi_logging.py b/src/python/lmi/common/cmpi_logging.py deleted file mode 100644 index a97e4ab..0000000 --- a/src/python/lmi/common/cmpi_logging.py +++ /dev/null @@ -1,204 +0,0 @@ -# -*- Coding:utf-8 -*- -# -# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Jan Safranek - - -import logging -import inspect -import traceback - -TRACE_WARNING = logging.INFO - 1 -TRACE_INFO = logging.INFO - 2 -TRACE_VERBOSE = logging.DEBUG - -class CMPILogHandler(logging.Handler): - """ - A handler class, which sends log messages to CMPI log. - """ - - def __init__(self, cmpi_logger, *args, **kwargs): - self.cmpi_logger = cmpi_logger - super(CMPILogHandler, self).__init__(*args, **kwargs) - - def emit(self, record): - msg = self.format(record) - if record.levelno >= logging.ERROR: - self.cmpi_logger.log_error(msg) - elif record.levelno >= logging.WARNING: - self.cmpi_logger.log_warn(msg) - elif record.levelno >= logging.INFO: - self.cmpi_logger.log_info(msg) - elif record.levelno >= TRACE_WARNING: - self.cmpi_logger.trace_warn(record.filename, msg) - elif record.levelno >= TRACE_INFO: - self.cmpi_logger.trace_info(record.filename, msg) - elif record.levelno >= logging.DEBUG: - self.cmpi_logger.trace_verbose(record.filename, msg) - -class CMPILogger(logging.getLoggerClass()): - """ - A logger class, which adds trace_method level log methods. - """ - def trace_warn(self, msg, *args, **kwargs): - """ Log message with TRACE_WARNING severity. """ - self.log(TRACE_WARNING, msg, *args, **kwargs) - - def trace_info(self, msg, *args, **kwargs): - """ Log message with TRACE_INFO severity. """ - self.log(TRACE_INFO, msg, *args, **kwargs) - - def trace_verbose(self, msg, *args, **kwargs): - """ Log message with TRACE_VERBOSE severity. """ - self.log(TRACE_VERBOSE, msg, *args, **kwargs) - -logging.setLoggerClass(CMPILogger) - -def trace_method(func): - """ Decorator, trace entry and exit for a class method. """ - classname = inspect.getouterframes(inspect.currentframe())[1][3] - def helper_func(*args, **kwargs): - """ - Helper function, wrapping real function by trace_method decorator. - """ - logger.log(TRACE_VERBOSE, "Entering %s.%s", classname, func.__name__) - try: - ret = func(*args, **kwargs) - except Exception as exc: - if getattr(exc, "tb_printed", False) is False: - logger.exception("full traceback") - logger.log(TRACE_VERBOSE, "traceback: %s", - traceback.format_exc()) - exc.tb_printed = True - logger.log(TRACE_WARNING, "%s.%s threw exception %s", - classname, func.__name__, str(exc)) - raise - logger.log(TRACE_VERBOSE, "Exiting %s.%s", classname, func.__name__) - return ret - helper_func.__name__ = func.__name__ - helper_func.__doc__ = func.__doc__ - helper_func.__module__ = func.__module__ - return helper_func - -def trace_function(func): - """ Decorator, trace entry and exit for a function outside any class. """ - def helper_func(*args, **kwargs): - """ - Helper function, wrapping real function by trace_method decorator. - """ - logger.log(TRACE_VERBOSE, "Entering %s.%s", - func.__module__, func.__name__) - try: - ret = func(*args, **kwargs) - except Exception as exc: - if getattr(exc, "tb_printed", False) is False: - logger.exception("full traceback") - logger.log(TRACE_VERBOSE, "traceback: %s", - traceback.format_exc()) - exc.tb_printed = True - logger.log(TRACE_WARNING, "%s.%s threw exception %s", - func.__module__, func.__name__, str(exc)) - raise - logger.log(TRACE_VERBOSE, "Exiting %s.%s", - func.__module__, func.__name__) - return ret - helper_func.__name__ = func.__name__ - helper_func.__doc__ = func.__doc__ - helper_func.__module__ = func.__module__ - return helper_func - -class LogManager(object): - """ - Class, which takes care of CMPI logging. - There should be only one instance of this class and it should be - instantiated as soon as possible, even before reading a config. - The config file can be provided later by set_config call. - """ - FORMAT_STDERR = '%(levelname)s: %(message)s' - FORMAT_CMPI = '%(levelname)s: %(message)s' - - LOGGER_NAME = "lmi.storage" - - def __init__(self, env): - """ - Initialize logging. - """ - formatter = logging.Formatter(self.FORMAT_CMPI) - - self.cmpi_handler = CMPILogHandler(env.get_logger()) - self.cmpi_handler.setLevel(logging.DEBUG) - self.cmpi_handler.setFormatter(formatter) - - self.logger = logging.getLogger(self.LOGGER_NAME) - self.logger.addHandler(self.cmpi_handler) - self.logger.setLevel(logging.INFO) - - self.stderr_handler = None - self.config = None - - global logger # IGNORE:W0603 - logger = self.logger - logger.info("CMPI log started") - - @trace_method - def set_config(self, config): - """ - Set a configuration of logging. It applies its setting immediately - and also subscribes for configuration changes. - """ - self.config = config - config.add_listener(self._config_changed) - # apply the config - self._config_changed(config) - - @trace_method - def _config_changed(self, config): - """ - Apply changed configuration, i.e. start/stop sending to stderr - and set appropriate log level. - """ - if config.tracing: - self.logger.setLevel(logging.DEBUG) - else: - self.logger.setLevel(logging.INFO) - if config.stderr: - # start sending to stderr - if not self.stderr_handler: - # create stderr handler - formatter = logging.Formatter(self.FORMAT_STDERR) - self.stderr_handler = logging.StreamHandler() - self.stderr_handler.setLevel(logging.DEBUG) - self.stderr_handler.setFormatter(formatter) - self.logger.addHandler(self.stderr_handler) - self.logger.info("Started logging to stderr.") - else: - # stop sending to stderr - if self.stderr_handler: - self.logger.info("Stopped logging to stderr.") - self.logger.removeHandler(self.stderr_handler) - self.stderr_handler = None - - def destroy(self): - if self.stderr_handler: - self.logger.removeHandler(self.stderr_handler) - self.stderr_handler = None - self.logger.removeHandler(self.cmpi_handler) - self.cmpi_handler = None - self.config.remove_listener(self._config_changed) - -logger = None diff --git a/src/python/lmi/common/singletonmixin.py b/src/python/lmi/common/singletonmixin.py deleted file mode 100644 index c252676..0000000 --- a/src/python/lmi/common/singletonmixin.py +++ /dev/null @@ -1,560 +0,0 @@ -#pylint: disable-all -""" -A Python Singleton mixin class that makes use of some of the ideas -found at http://c2.com/cgi/wiki?PythonSingleton. Just inherit -from it and you have a singleton. No code is required in -subclasses to create singleton behavior -- inheritance from -Singleton is all that is needed. - -Singleton creation is threadsafe. - -USAGE: - -Just inherit from Singleton. If you need a constructor, include -an __init__() method in your class as you usually would. However, -if your class is S, you instantiate the singleton using S.get_instance() -instead of S(). Repeated calls to S.get_instance() return the -originally-created instance. - -For example: - -class S(Singleton): - - def __init__(self, a, b=1): - pass - -S1 = S.get_instance(1, b=3) - - -Most of the time, that's all you need to know. However, there are some -other useful behaviors. Read on for a full description: - -1) Getting the singleton: - - S.get_instance() - -returns the instance of S. If none exists, it is created. - -2) The usual idiom to construct an instance by calling the class, i.e. - - S() - -is disabled for the sake of clarity. - -For one thing, the S() syntax means instantiation, but get_instance() -usually does not cause instantiation. So the S() syntax would -be misleading. - -Because of that, if S() were allowed, a programmer who didn't -happen to notice the inheritance from Singleton (or who -wasn't fully aware of what a Singleton pattern -does) might think he was creating a new instance, -which could lead to very unexpected behavior. - -So, overall, it is felt that it is better to make things clearer -by requiring the call of a class method that is defined in -Singleton. An attempt to instantiate via S() will result -in a SingletonException being raised. - -3) Use __S.__init__() for instantiation processing, -since S.get_instance() runs S.__init__(), passing it the args it has received. - -If no data needs to be passed in at instantiation time, -you don't need S.__init__(). - -4) If S.__init__(.) requires parameters, include them ONLY in the -first call to S.get_instance(). If subsequent calls have arguments, -a SingletonException is raised by default. - -If you find it more convenient for subsequent calls to be allowed to -have arguments, but for those argumentsto be ignored, just include -'ignoreSubsequent = True' in your class definition, i.e.: - - class S(Singleton): - - ignoreSubsequent = True - - def __init__(self, a, b=1): - pass - -5) For testing, it is sometimes convenient for all existing singleton -instances to be forgotten, so that new instantiations can occur. For that -reason, a _forget_all_singletons() function is included. Just call - - _forget_all_singletons() - -and it is as if no earlier instantiations have occurred. - -6) As an implementation detail, classes that inherit -from Singleton may not have their own __new__ -methods. To make sure this requirement is followed, -an exception is raised if a Singleton subclass includ -es __new__. This happens at subclass instantiation -time (by means of the MetaSingleton metaclass. - - -By Gary Robinson, grobinson@flyfi.com. No rights reserved -- -placed in the public domain -- which is only reasonable considering -how much it owes to other people's code and ideas which are in the -public domain. The idea of using a metaclass came from -a comment on Gary's blog (see -http://www.garyrobinson.net/2004/03/python_singleto.html#comments). -Other improvements came from comments and email from other -people who saw it online. (See the blog post and comments -for further credits.) - -Not guaranteed to be fit for any particular purpose. Use at your -own risk. -""" - -import threading - -class SingletonException(Exception): - """ - Base exception related to singleton handling. - """ - pass - -_ST_SINGLETONS = set() -_LOCK_FOR_SINGLETONS = threading.RLock() -# Ensure only one instance of each Singleton class is created. This is not -# bound to the _LOCK_FOR_SINGLETON_CREATION = threading.RLock() individual -# Singleton class since we need to ensure that there is only one mutex for each -# Singleton class, which would require having a lock when setting up the -# Singleton class, which is what this is anyway. So, when any Singleton is -# created, we lock this lock and then we don't need to lock it again for that -# class. -_LOCK_FOR_SINGLETON_CREATION = threading.RLock() - -def _create_singleton_instance(cls, lst_args, dct_kw_args): - """ - Creates singleton instance and stores its class in set. - """ - _LOCK_FOR_SINGLETON_CREATION.acquire() - try: - if cls._is_instantiated(): # some other thread got here first - return - - instance = cls.__new__(cls) - try: - instance.__init__(*lst_args, **dct_kw_args) - except TypeError, exc: - if '__init__() takes' in exc.message: - raise SingletonException, ( - 'If the singleton requires __init__ args,' - ' supply them on first call to get_instance().') - else: - raise - cls.c_instance = instance - _add_singleton(cls) - finally: - _LOCK_FOR_SINGLETON_CREATION.release() - -def _add_singleton(cls): - """ - Adds class to singleton set. - """ - _LOCK_FOR_SINGLETONS.acquire() - try: - assert cls not in _ST_SINGLETONS - _ST_SINGLETONS.add(cls) - finally: - _LOCK_FOR_SINGLETONS.release() - -def _remove_singleton(cls): - """ - Removes class from singleton set. - """ - _LOCK_FOR_SINGLETONS.acquire() - try: - if cls in _ST_SINGLETONS: - _ST_SINGLETONS.remove(cls) - finally: - _LOCK_FOR_SINGLETONS.release() - -def _forget_all_singletons(): - ''' - This is useful in tests, since it is hard to know which singletons need - to be cleared to make a test work. - ''' - _LOCK_FOR_SINGLETONS.acquire() - try: - for cls in _ST_SINGLETONS.copy(): - cls._forget_class_instance_reference_for_testing() - - # Might have created some Singletons in the process of tearing down. - # Try one more time - there should be a limit to this. - i_num_singletons = len(_ST_SINGLETONS) - if len(_ST_SINGLETONS) > 0: - for cls in _ST_SINGLETONS.copy(): - cls._forget_class_instance_reference_for_testing() - i_num_singletons -= 1 - assert i_num_singletons == len(_ST_SINGLETONS), \ - 'Added a singleton while destroying ' + str(cls) - assert len(_ST_SINGLETONS) == 0, _ST_SINGLETONS - finally: - _LOCK_FOR_SINGLETONS.release() - -class MetaSingleton(type): - """ - Metaclass for Singleton base class. - """ - def __new__(mcs, str_name, tup_bases, dct): - if dct.has_key('__new__'): - raise SingletonException, 'Can not override __new__ in a Singleton' - return super(MetaSingleton, mcs).__new__( - mcs, str_name, tup_bases, dct) - - def __call__(cls, *lst_args, **dictArgs): - raise SingletonException, \ - 'Singletons may only be instantiated through get_instance()' - -class Singleton(object): - """ - Base class for all singletons. - """ - __metaclass__ = MetaSingleton - - def get_instance(cls, *lst_args, **dct_kw_args): - """ - Call this to instantiate an instance or retrieve the existing instance. - If the singleton requires args to be instantiated, include them the first - time you call get_instance. - """ - if cls._is_instantiated(): - if ( (lst_args or dct_kw_args) - and not hasattr(cls, 'ignoreSubsequent')): - raise SingletonException, ( - 'Singleton already instantiated, but get_instance()' - ' called with args.') - else: - _create_singleton_instance(cls, lst_args, dct_kw_args) - - return cls.c_instance #pylint: disable=E1101 - get_instance = classmethod(get_instance) - - def _is_instantiated(cls): - """ - Don't use hasattr(cls, 'c_instance'), because that screws things - up if there is a singleton that extends another singleton. - hasattr looks in the base class if it doesn't find in subclass. - """ - return 'c_instance' in cls.__dict__ - _is_instantiated = classmethod(_is_instantiated) - - # This can be handy for public use also - isInstantiated = _is_instantiated - - def _forget_class_instance_reference_for_testing(cls): - """ - This is designed for convenience in testing -- sometimes you - want to get rid of a singleton during test code to see what - happens when you call get_instance() under a new situation. - - To really delete the object, all external references to it - also need to be deleted. - """ - try: - if hasattr(cls.c_instance, '_prepare_to_forget_singleton'): - # tell instance to release anything it might be holding onto. - cls.c_instance._prepare_to_forget_singleton() - del cls.c_instance - _remove_singleton(cls) - except AttributeError: - # run up the chain of base classes until we find the one that has - # the instance and then delete it there - for base_class in cls.__bases__: - if issubclass(base_class, Singleton): - base_class._forget_class_instance_reference_for_testing() - _forget_class_instance_reference_for_testing = classmethod( - _forget_class_instance_reference_for_testing) - - -if __name__ == '__main__': - - import unittest - import time - - class SingletonMixinPublicTestCase(unittest.TestCase): - """ - TestCase for singleton class. - """ - def testReturnsSameObject(self): #pylint: disable=C0103 - """ - Demonstrates normal use -- just call get_instance and it returns a singleton instance - """ - - class Foo(Singleton): - """Singleton child class.""" - def __init__(self): - super(Foo, self).__init__() - - a1 = Foo.get_instance() - a2 = Foo.get_instance() - self.assertEquals(id(a1), id(a2)) - - def testInstantiateWithMultiArgConstructor(self):#pylint: disable=C0103 - """ - If the singleton needs args to construct, include them in the first - call to get instances. - """ - - class Bar(Singleton): - """Singleton child class.""" - - def __init__(self, arg1, arg2): - super(Bar, self).__init__() - self.arg1 = arg1 - self.arg2 = arg2 - - b1 = Bar.get_instance('arg1 value', 'arg2 value') - b2 = Bar.get_instance() - self.assertEquals(b1.arg1, 'arg1 value') - self.assertEquals(b1.arg2, 'arg2 value') - self.assertEquals(id(b1), id(b2)) - - def testInstantiateWithKeywordArg(self): - """ - Test instantiation with keyword arguments. - """ - - class Baz(Singleton): - """Singleton child class.""" - def __init__(self, arg1=5): - super(Baz, self).__init__() - self.arg1 = arg1 - - b1 = Baz.get_instance('arg1 value') - b2 = Baz.get_instance() - self.assertEquals(b1.arg1, 'arg1 value') - self.assertEquals(id(b1), id(b2)) - - def testTryToInstantiateWithoutNeededArgs(self): - """ - This tests, improper instantiation. - """ - - class Foo(Singleton): - """Singleton child class.""" - def __init__(self, arg1, arg2): - super(Foo, self).__init__() - self.arg1 = arg1 - self.arg2 = arg2 - - self.assertRaises(SingletonException, Foo.get_instance) - - def testPassTypeErrorIfAllArgsThere(self): - """ - Make sure the test for capturing missing args doesn't interfere - with a normal TypeError. - """ - class Bar(Singleton): - """Singleton child class.""" - def __init__(self, arg1, arg2): - super(Bar, self).__init__() - self.arg1 = arg1 - self.arg2 = arg2 - raise TypeError, 'some type error' - - self.assertRaises(TypeError, Bar.get_instance, 1, 2) - - def testTryToInstantiateWithoutGetInstance(self): - """ - Demonstrates that singletons can ONLY be instantiated through - get_instance, as long as they call Singleton.__init__ during - construction. - - If this check is not required, you don't need to call - Singleton.__init__(). - """ - - class A(Singleton): - def __init__(self): - super(A, self).__init__() - - self.assertRaises(SingletonException, A) - - def testDontAllowNew(self): - - def instantiatedAnIllegalClass(): - class A(Singleton): - def __init__(self): - super(A, self).__init__() - - def __new__(metaclass, str_name, tup_bases, dct): - return super(MetaSingleton, metaclass).__new__( - metaclass, str_name, tup_bases, dct) - - self.assertRaises(SingletonException, instantiatedAnIllegalClass) - - - def testDontAllowArgsAfterConstruction(self): - class B(Singleton): - - def __init__(self, arg1, arg2): - super(B, self).__init__() - self.arg1 = arg1 - self.arg2 = arg2 - - B.get_instance('arg1 value', 'arg2 value') - self.assertRaises(SingletonException, B, 'arg1 value', 'arg2 value') - - def test_forgetClassInstanceReferenceForTesting(self): - class A(Singleton): - def __init__(self): - super(A, self).__init__() - class B(A): - def __init__(self): - super(B, self).__init__() - - # check that changing the class after forgetting the instance - # produces an instance of the new class - a = A.get_instance() - assert a.__class__.__name__ == 'A' - A._forget_class_instance_reference_for_testing() - b = B.get_instance() - assert b.__class__.__name__ == 'B' - - # check that invoking the 'forget' on a subclass still deletes - # the instance - B._forget_class_instance_reference_for_testing() - a = A.get_instance() - B._forget_class_instance_reference_for_testing() - b = B.get_instance() - assert b.__class__.__name__ == 'B' - - def test_forgetAllSingletons(self): - # Should work if there are no singletons - _forget_all_singletons() - - class A(Singleton): - ciInitCount = 0 - def __init__(self): - super(A, self).__init__() - A.ciInitCount += 1 - - A.get_instance() - self.assertEqual(A.ciInitCount, 1) - - A.get_instance() - self.assertEqual(A.ciInitCount, 1) - - _forget_all_singletons() - A.get_instance() - self.assertEqual(A.ciInitCount, 2) - - def test_threadedCreation(self): - # Check that only one Singleton is created even if multiple threads - # try at the same time. If fails, would see assert in _add_singleton - class Test_Singleton(Singleton): - def __init__(self): - super(Test_Singleton, self).__init__() - - class Test_SingletonThread(threading.Thread): - def __init__(self, fTargetTime): - super(Test_SingletonThread, self).__init__() - self._fTargetTime = fTargetTime - self._eException = None - - def run(self): - try: - fSleepTime = self._fTargetTime - time.time() - if fSleepTime > 0: - time.sleep(fSleepTime) - Test_Singleton.get_instance() - except Exception, exc: - self._eException = exc - - fTargetTime = time.time() + 0.1 - lstThreads = [] - for _ in xrange(100): - t = Test_SingletonThread(fTargetTime) - t.start() - lstThreads.append(t) - eException = None - for t in lstThreads: - t.join() - if t._eException and not eException: - eException = t._eException - if eException: - raise eException - - def testNoInit(self): - """ - Demonstrates use with a class not defining __init__ - """ - - class A(Singleton): - pass - - #INTENTIONALLY UNDEFINED: - #def __init__(self): - # super(A, self).__init__() - - A.get_instance() #Make sure no exception is raised - - def testMultipleGetInstancesWithArgs(self): - - class A(Singleton): - - ignoreSubsequent = True - - def __init__(self, a, b=1): - pass - - a1 = A.get_instance(1) - # ignores the second call because of ignoreSubsequent - a2 = A.get_instance(2) - - class B(Singleton): - - def __init__(self, a, b=1): - pass - - b1 = B.get_instance(1) - # No ignoreSubsequent included - self.assertRaises(SingletonException, B.get_instance, 2) - - class C(Singleton): - - def __init__(self, a=1): - pass - - c1 = C.get_instance(a=1) - # No ignoreSubsequent included - self.assertRaises(SingletonException, C.get_instance, a=2) - - def testInheritance(self): - """ - It's sometimes said that you can't subclass a singleton (see, for instance, - http://steve.yegge.googlepages.com/singleton-considered-stupid point e). This - test shows that at least rudimentary subclassing works fine for us. - """ - - class A(Singleton): - - def set_x(self, x): - self.x = x - - def setZ(self, z): - raise NotImplementedError - - class B(A): - - def set_x(self, x): - self.x = -x - - def set_y(self, y): - self.y = y - - a = A.get_instance() - a.set_x(5) - b = B.get_instance() - b.set_x(5) - b.set_y(50) - self.assertEqual((a.x, b.x, b.y), (5, -5, 50)) - self.assertRaises(AttributeError, eval, 'a.set_y', {}, locals()) - self.assertRaises(NotImplementedError, b.setZ, 500) - - unittest.main() - diff --git a/src/python/lmi/providers/IndicationManager.py b/src/python/lmi/providers/IndicationManager.py new file mode 100644 index 0000000..0b0e132 --- /dev/null +++ b/src/python/lmi/providers/IndicationManager.py @@ -0,0 +1,758 @@ +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek +# Authors: Michal Minar +# -*- coding: utf-8 -*- +""" + .. autoclass:: IndicationManager + :members: +""" + +import pywbem +from Queue import Queue +import re +import socket +import threading + +from lmi.base import singletonmixin +from lmi.providers import cmpi_logging + +RE_FILTER_NAME = re.compile(r'^(?Plmi:' + r'(?P[a-z0-9_]+):)(?P.*)$', re.IGNORECASE) + +FILTER_DEFAULTS = { + "SourceNamespace" : "root/cimv2", + "SourceNamespaces" : ["root/cimv2"], + "QueryLanguage" : "CIM:CQL" +} + +@cmpi_logging.trace_function +def enumerate_namespaces(ch): + """ + Return tuple ``([CIM_Namespace instance, ...], ns_interop)``. Where + first item is a list of object paths of all namespaces in broker and + the second is a name of namespace, where this information can be found. + + :param ch: CIMOM handle. + """ + nsclasses = ["CIM_Namespace", "__Namespace"] + namespaces = ['root/cimv2', 'root/PG_InterOp', 'Interop', + 'interop', 'root', 'root/interop'] + nspaths = [] + ns = None + for cls in nsclasses: + for ns in namespaces: + try: + nspaths = [nm for nm in ch.EnumerateInstanceNames(ns, cls)] + if nspaths: + break + except pywbem.CIMError as exc: + if exc[0] in ( + pywbem.CIM_ERR_INVALID_NAMESPACE, + pywbem.CIM_ERR_NOT_SUPPORTED, + pywbem.CIM_ERR_INVALID_CLASS): + pass + if exc[0] == pywbem.CIM_ERR_FAILED: + cmpi_logging.logger.error("EnumerateInstanceNames failed" + " for %s:%s: %s", ns, cls, str(exc)) + else: + raise + if nspaths: + break + if not nspaths: + cmpi_logging.logger.error("failed to enumerate namespaces") + ns = None + return (nspaths, ns) + +@cmpi_logging.trace_function +def find_ns_interop(ch): + """ + Return name of interop namespace, where ``CIM_IndicationFilter`` + class reside. + + :param ch: CIMOM handle. + """ + _, ns_interop = enumerate_namespaces(ch) + return ns_interop + +def make_filter_name(class_name, fltr_id): + """ + Return value for ``CIM_IndicationFilter.Name`` property. + """ + return "LMI:%s:%s" % (class_name, fltr_id) + +def parse_filter_name(name): + """ + Return tuple ``(class_name, filter_id)``. + + :param name: (``string``) Value of cim filter's *Name* property. + """ + match = RE_FILTER_NAME.match(name) + if not match: + raise ValueError('Could not parse filter name: "%s"' % name) + return (match.group("class_name"), match.group("filter_id")) + +@cmpi_logging.trace_function +def make_indication_filter_path(class_name, fltr_id, ns_interop): + """ + Return CIM_IndicationFilter instance path for given filter id. + + :param class_name: (``string``) *Scoped class* name. + :param fltr_id: (``string``) Filter name. + :param ns_interop: (``string``) Interop namespace. + """ + for arg in ('class_name', 'fltr_id', 'ns_interop'): + if not isinstance(locals()[arg], basestring): + raise TypeError("%s must be basestring" % arg) + cop = pywbem.CIMInstanceName("CIM_IndicationFilter", + namespace=ns_interop) + cop['CreationClassName'] = 'CIM_IndicationFilter' + cop['SystemCreationClassName'] = 'CIM_ComputerSystem' + cop['SystemName'] = socket.gethostname() + cop['Name'] = make_filter_name(class_name, fltr_id) + return cop + +@cmpi_logging.trace_function +def remove_cimom_filter(ch, fltr_path): + """ + Deletes instance of CIM_IndicationFilter installed at broker with all + referencing subscriptions. + + Returns list of subscription instace names, that were deleted. + + :param ch: CIMOM handle. + :param fltr_path: (``CIMInstanceName``) Path of ``CIM_IndicationFilter`` to + remove. + """ + if not isinstance(fltr_path, pywbem.CIMInstanceName): + raise TypeError("fltr_path must be a CIMInstanceName") + + referents = [] + for ref in ch.AssociatorNames(fltr_path, + role="Filter", + resultRole="Handler", + resultClass="CIM_IndicationSubscription"): + ch.DeleteInstance(ref) + referents.append(ref) + ch.DeleteInstance(fltr_path) + cmpi_logging.logger.debug('removed indication filter "%s" with %d' + ' referents', fltr_path["Name"], len(referents)) + return referents + +class IndicationManager(singletonmixin.Singleton): + """ + Using ``IndicationManager`` class + providers can send indications without bothering with handling of + indication subscriptions. + + Usage: + + 1. Subclass CIM_InstCreation and CIM_InstModification. + + 2. In your initialization routine, create one ``IndicationManager`` + instance. E.g. one for whole ``LMI_Storage`` may is enough. Like + this:: + + indication_manager = \ + IndicationManager.get_instance(env, "Storage", "root/cimv2") + + 3. Call ``indication_manager.add_filters()`` with all filters your + providers support for particular CIM class. This method can be called + multiple times. + For example:: + + filters = { + "JobPercentUpdated": { + "Query" : "SELECT * FROM CIM_InstModification WHERE" + " SourceInstance ISA LMI_StorageJob AND" + " SourceInstance.CIM_ConcreteJob::PercentComplete <>" + " PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for" + " a Concrete Job.", + }, + "JobSucceeded": { + "Query" : "SELECT * FROM CIM_InstModification WHERE" + " SourceInstance ISA LMI_StorageJob AND" + " SourceInstance.CIM_ConcreteJob::JobState = " + " CIM_ConcreteJob.JobState#'Completed'", + "Description": "Modification of Operational Status for" + " a Concrete Job to 'Complete' and 'OK'.", + }, + #... other indications + } + instance_manager.add_filters("LMI_StorageJob", filters) + + First argument is a name of class to which indications apply. We'll call + it *Scoping class*. + + 4. In your provider module, implement indication functions like this:: + + def authorize_filter(env, fltr, ns, classes, owner): + indication_manager.authorize_filter(env, fltr, ns, classes, owner) + + def activate_filter (env, fltr, ns, classes, first_activation): + indication_manager.activate_filter(env, fltr, ns, classes, + first_activation) + + def deactivate_filter(env, fltr, ns, classes, last_activation): + indication_manager.deactivate_filter(env, fltr, ns, classes, + last_activation) + + def enable_indications(env): + indication_manager.enable_indications(env) + + def disable_indications(env): + indication_manager.disable_indications(env) + + From now on, the ``IndicationManager`` will track all subscribed filters. + You can query the ``indication_manager.is_subscribed()`` before you create + and send an indication. Use ``indication_manager.send_indication()`` + to send your indications. + + Only static (=preconfigured, read-only) indication filters are + supported. + + For user to use these preconfigured filters, they need to be installed + at broker as instances of ``CIM_IndicationFilter``. But since they can + not be guarded against removel by accident, this object provides a way + to reinstall them. But using this is not recomended, since it can upset + users. See :ref:`_update_context-label`. + + The supported filters must be passed to add_filters method. The filters + are passed as dictionary ``'filter_id' -> {dictionary 'IndicationFilter + property' -> 'value'}``. There must be at least ``Query`` property in + each filter, CQL is assumed. + + This helper automatically tracks which filters are subscribed. Provider + can query ``is_subscribed()`` to check, if filter with given + ``filter_id`` is subscribed before generating indications. + + The CMPI interface to send indications is complicated - + when an indication is send from CIMOM callback (e.g. ``get_instance``), + it must use current ``env`` parameter of the callback and it would be + tedious to pass it to ``IndicationManager`` each time. Therefore + ``IndicationManager`` creates its own thread, registers it at CIMOM + using ``PrepareAttachThread``/``AttachThread``. + + As side-effect, indication can be sent from any thread, there is no + need to call ``PrepareAttachThread``/``AttachThread``. + """ + SEVERITY_INFO = pywbem.Uint16(2) # CIM_Indication.PerceivedSeverity + + COMMAND_STOP = 1 # Command to the IndicationManager thread to stop. + + @cmpi_logging.trace_method + def __init__(self, env, nameprefix, namespace, ns_interop=None, + queue=None): + """ + Create new ``IndicationManager``. Usually only one instance + is necessary for one provider process. + + :param env: (``ProviderEnvironment``) Provider enviroment, taken + from CIMOM callback (e.g. ``get_providers()``). + :param nameprefix: (``string``) Prefix of your ``CIM_InstCreation`` + and ``CIM_InstModification`` subclasses, e.g. 'Storage' for + ``LMI_StorageInstCreation``. + :param namespace: (``string``) Namespace, which will be set to + outgoing indications instances. + :param ns_interop: (``string``) Namespace, where filters and + subscriptions are stored. + :param queue: Optional custom input queue with the same interface as + ``Queue.Queue``. + """ + + # { class_name : + # { filter_id : filter_properties + # , ... } + # } + self._filters = pywbem.NocaseDict() + self._enabled = False + # { (class_name, filter_id), ... } + self._subscribed_filters = set() + self._nameprefix = nameprefix + self._namespace = namespace + self._ns_interop = ns_interop + self._access_lock = threading.RLock() + self._env = env + + if queue is None: + queue = Queue() + self._queue = queue + # prepare indication thread + ch = env.get_cimom_handle() + new_broker = ch.PrepareAttachThread() + self._indication_sender = threading.Thread( + target=self._send_indications_loop, args=(new_broker,)) + self._indication_sender.daemon = False + self._indication_sender.start() + + @property + def enabled(self): + """ + Return a boolean saying, whether indication sending is enabled. + """ + with self._access_lock: + return self.enabled + + @property + def namespace(self): + """ + Return namespace of outgoing indication instances. + """ + return self._namespace + + @property + def nameprefix(self): + """ + Return prefix of indication class names. + """ + return self._nameprefix + + @property + def ns_interop(self): + """ + Return interop namespace name. + """ + with self._access_lock: + if self._ns_interop is None: + ch = self._env.get_cimom_handle() + self._ns_interop = find_ns_interop(ch) + cmpi_logging.logger.info('found interop namespace: %s', + self._ns_interop) + return self._ns_interop + + @property + def instcreation_classname(self): + """ + Return whole class name of InstCreation indication. + """ + return "LMI_" + self._nameprefix + "InstCreation" + + @property + def instmodification_classname(self): + """ + Return whole class name of InstModification indication. + """ + return "LMI_" + self._nameprefix + "InstModification" + + @property + def instdeletetion_classname(self): + """ + Return whole class name of InstDeletion indication. + """ + return "LMI_" + self._nameprefix + "InstDeletion" + + @cmpi_logging.trace_method + def _get_filter_inst(self, class_name, fltr_id): + """ + Return instance of CIM_IndicationFilter registered in CIMOM if any. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) Indication name. + """ + ch = self._env.get_cimom_handle() + cop = make_indication_filter_path(class_name, fltr_id, self.ns_interop) + try: + return ch.GetInstance(cop) + except pywbem.CIMError as exc: + if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND: + return None + raise + + @cmpi_logging.trace_method + def _ensure_cimom_has_filter(self, class_name, fltr_id): + """ + Ensures, that cimom has ``fltr_id`` filter registered as instance. + If it has, but the query differs it is recreated at broker. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) Indication name. + """ + inst = self._get_filter_inst(class_name, fltr_id) + ch = self._env.get_cimom_handle() + installed = inst is not None + referents = [] + if installed: + for prop_name, val in self._filters[class_name][fltr_id].items(): + if inst[prop_name] != val: + cmpi_logging.logger.info("filter \"%s\" is installed, but" + " its property \"%s\" has outdated value;" + " removing...", fltr_id, prop_name) + referents = remove_cimom_filter(ch, inst.path) + installed = False + if not installed: + if inst is not None: + path = inst.path + else: + path = make_indication_filter_path(class_name, fltr_id, + self.ns_interop) + inst = pywbem.CIMInstance(path.classname, path=path) + kwargs = FILTER_DEFAULTS.copy() + for key, val in path.keybindings.items(): + kwargs[key] = val + kwargs.update(self._filters[class_name][fltr_id]) + inst.update(kwargs) + try: + inst = ch.CreateInstance(inst) + cmpi_logging.logger.info("filter \"%s\" installed", fltr_id) + except pywbem.CIMError: + cmpi_logging.logger.exception( + "failed to install indication filter \"%s\"", + fltr_id) + if referents: + cmpi_logging.logger.debug('reinstalling %d filter' + ' subscriptions', len(referents)) + for ref in referents: + ch.CreateInstance(ref) + return inst + + @cmpi_logging.trace_method + def _get_matching_filter(self, query): + """ + Try to find matching filter properties in local ``_filters`` storage + and return it. ``None`` is returned if not found. + + Return a tuple ``(class_name, filter_id, filter_properties)``. + + :param query: (``string``) Is filter query. + """ + if not isinstance(query, basestring): + raise TypeError("query must be a string") + for clsname, fltrs in self._filters.iteritems(): + for fltr_id, props in fltrs.iteritems(): + if query == props["Query"]: + return (clsname, fltr_id, props) + return None + + @cmpi_logging.trace_method + def ensure_filters_installed(self, class_name=None, fltr_id=None): + """ + This function checks for existence of filters at broker. Filters + must be registered with this instance before the check can be done. + Without arguments all registered filters will be checked. + + :param class_name: (``string``) Name of *Scoped class* that reduces + searched filters. + :param fltr_id: (``string``) Indication name reducing filters that + will be checked. + """ + cls_to_check = self._filters.keys() + if class_name is not None: + cls_to_check = [class_name] + filters_to_check = list( + (c, f) + for c in cls_to_check + for f in self._filters[c].keys() + if fltr_id is None or fltr_id == f) + with self._access_lock: + try: + for clsname, fltr_id in filters_to_check: + self._ensure_cimom_has_filter(clsname, fltr_id) + cmpi_logging.logger.debug('filters installed') + return True + except pywbem.CIMError as exc: + if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED: + cmpi_logging.logger.error("filters could not be checked" + " for presence due to invalid context") + return False + raise + + @cmpi_logging.trace_method + def update_context(self, env): + """ + .. _update_context-label + + When ``IndicationManager`` is initialized upon provider initialization, + the conxet given does not contain any user credentials that are + needed for communication with broker. In order to check for filter's + existence at broker, this method needs to be called first with + context containing user's credentials. + + This needs to be called only once. + + **Note** that if you don't plan to check for filter's presence at + broker at runtime, you are not interested in this function. + """ + with self._access_lock: + self._env = env + + @cmpi_logging.trace_method + def add_filters(self, class_name, filters, ensure_installed=False): + """ + Add new filters to the helper. These filters will be allowed for + subscription. + + :param filters: (``dictionary filter_id -> filter properties``) + The filters. ``filter properties`` is dictionary + ``property_name -> value``, where at least ``Query`` property + must be set. ``Name`` property will be automatically created + as 'LMI::'. + :param ensure_installed: (``bool``) Whether to check for filter presence + at broker and install them if missing. **Note** That in order + for this to work, the context must be updated with user's + credentials. See :ref:`update_context-label`. + """ + with self._access_lock: + if not class_name in self._filters: + self._filters[class_name] = pywbem.NocaseDict() + self._filters[class_name].update(filters) + if ensure_installed: + self.ensure_filters_installed(class_name=class_name) + + @cmpi_logging.trace_method + def authorize_filter(self, _env, fltr, _class_name, _op, _owner): + """ + AuthorizeFilter callback from CIMOM. Call this method from appropriate + CIMOM callback + + It asks us to verify whether this filter is allowed. + + :param fltr: Contains the filter that must be authorized. + :param _class_name: (``String``) Contains the class name extracted + from the filter FROM clause. + :param _op: The name of the class for which monitoring is required. + Only the namespace part is set if className is a process indication. + :param _owner The owner argument is the destination owner. + """ + with self._access_lock: + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.add((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s authorized", + make_filter_name(res[0], res[1]), fltr) + return True + return False + + @cmpi_logging.trace_method + def activate_filter(self, _env, fltr, _class_name, _class_path, + first_activation): + """ + ActivateFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + + It ask us to begin monitoring a resource. The function shall begin + monitoring the resource according to the filter express only. + + :param fltr: The filter argument contains the filter specification + for this subscription to become active. + :param _class_name: (``String``) The class name extracted from the filter + FROM clause. + :param _class_path: (``CIMInstanceName``) The name of the class for + which monitoring is required. Only the namespace part is set if + eventType is a process indication. + :param first_activation: (``bool``) Set to true if this is the first + filter for className. + """ + with self._access_lock: + if not first_activation: + return + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.add((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s started", + make_filter_name(res[0], res[1]), fltr) + + @cmpi_logging.trace_method + def deactivate_filter(self, _env, fltr, _class_name, _class_path, + last_activation): + """ + DeactivateFilter callback from CIMOM. Call this method from appropriate + CIMOM callback. + + Informs us that monitoring using this filter should stop. + + :param fltr: The filter argument contains the filter specification for + this subscription to become active. + :param class_name: (``String``) The class name extracted from the filter + FROM clause. + :param class_path: (``CIMInstanceName``) class_path The name of the + class for which monitoring is required. Only the namespace part is + set if className is a process indication. + :last_activation: (``bool``) Set to true if this is the last filter for + className. + """ + with self._access_lock: + if not last_activation: + return + res = self._get_matching_filter(fltr) + if res is not None: + self._subscribed_filters.remove((res[0], res[1])) + cmpi_logging.logger.info("InstanceFilter %s: %s stopped", + make_filter_name(res[0], res[1]), fltr) + + @cmpi_logging.trace_method + def enable_indications(self, _env): + """ + EnableIndications callback from CIMOM. Call this method from + appropriate CIMOM callback. + + Tells us that indications can now be generated. The MB is now prepared + to process indications. The function is normally called by the MB after + having done its intialization and processing of persistent subscription + requests. + """ + with self._access_lock: + self._enabled = True + cmpi_logging.logger.info("Indications enabled") + + @cmpi_logging.trace_method + def disable_indications(self, _env): + """ + EnableIndications callback from CIMOM. Call this method from + appropriate CIMOM callback. + + Tells us that we should stop generating indications. MB will not accept + any indications until enabled again. The function is normally called + when the MB is shutting down indication services either temporarily or + permanently. + """ + with self._access_lock: + self._enabled = False + cmpi_logging.logger.info("Indications disabled") + + @cmpi_logging.trace_method + def send_indication(self, indication): + """ + Send indication to all subscribers. Call this method from appropriate + CIMOM callback. + """ + self._queue.put(indication) + + @cmpi_logging.trace_method + def send_instcreation(self, instance, filter_id): + """ + Send ``LMI_InstCreation`` indication with given instance. + + :param instance: (``CIMInstance``) The created instance. + :param filter_id: (``string``) The ID of registered filter which + corresponds to this indication. + """ + if not self.is_subscribed(instance.classname, filter_id): + return + path = pywbem.CIMInstanceName( + classname=self.instcreation_classname, + namespace=self.namespace) + ind = pywbem.CIMInstance( + self.instcreation_classname, + path=path) + ind['SourceInstance'] = instance + ind['SourceInstanceHost'] = socket.gethostname() + ind['SourceInstanceModelPath'] = str(instance.path) + ind['IndicationFilterName'] = make_filter_name( + instance.classname, filter_id) + ind['PerceivedSeverity'] = self.SEVERITY_INFO + + cmpi_logging.logger.info("Sending indication %s for %s" % + (ind["IndicationFilterName"], str(path))) + self.send_indication(ind) + + @cmpi_logging.trace_method + def send_instmodification(self, old_instance, new_instance, filter_id): + """ + Send ``LMI_InstModification`` indication with given + instance. + + :param old_instance: (``CIMInstance``) The instance before + modification. + :param new_instance: (``CIMInstance``) The instance after modification. + :param filter_id: (``string``) The ID of registered filter which + corresponds to this indication. + """ + if not self.is_subscribed(new_instance.classname, filter_id): + return + path = pywbem.CIMInstanceName( + classname=self.instmodification_classname, + namespace=self.namespace) + ind = pywbem.CIMInstance( + self.instcreation_classname, + path=path) + ind['SourceInstance'] = new_instance + ind['PreviousInstance'] = old_instance + ind['SourceInstanceHost'] = socket.gethostname() + ind['SourceInstanceModelPath'] = str(new_instance.path) + ind['IndicationFilterName'] = make_filter_name( + new_instance.classname, filter_id) + ind['PerceivedSeverity'] = self.SEVERITY_INFO + + cmpi_logging.logger.info("Sending indication %s for %s", + ind["IndicationFilterName"], str(path)) + self.send_indication(ind) + + @cmpi_logging.trace_method + def is_subscribed(self, class_name, fltr_id): + """ + Return True, if there is someone subscribed for given filter. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) ID of the filter to check. + """ + with self._access_lock: + if not self._enabled: + return False + if (class_name, fltr_id) in self._subscribed_filters: + return True + return False + + @cmpi_logging.trace_method + def is_registered(self, class_name, fltr_id): + """ + Return True, if filter id has been registered with current instance. + + :param class_name: (``string``) *Scoping class* name. + :param fltr_id: (``string``) ID of the filter to check. + """ + with self._access_lock: + return (class_name in self._filters + and fltr_id in self._filters[class_name]) + + def _send_indications_loop(self, broker): + """ + This method runs in its own thread. It just sends all enqueued + indications. + + :param broker: (``BrokerCIMOMHandle``) Handle of the CIMOM. + """ + broker.AttachThread() + while True: + command = self._queue.get() + + if isinstance(command, pywbem.CIMInstance) : + indication = command + cmpi_logging.logger.trace_info("Delivering indication %s" % + (str(indication.path))) + broker.DeliverIndication(self.namespace, indication) + + elif isinstance(command, int): + cmpi_logging.logger.trace_info("Received command %d", command) + if command == self.COMMAND_STOP: + if hasattr(self._queue, "task_done"): + self._queue.task_done() + break + + if hasattr(self._queue, "task_done"): + self._queue.task_done() + + cmpi_logging.logger.info("Stopped Indication thread.") + + @cmpi_logging.trace_method + def shutdown(self): + """ + Stop the thread. This method blocks until the thread is safely + destroyed. + """ + self._queue.put(self.COMMAND_STOP) + self._indication_sender.join() diff --git a/src/python/lmi/providers/JobManager.py b/src/python/lmi/providers/JobManager.py new file mode 100644 index 0000000..24c909c --- /dev/null +++ b/src/python/lmi/providers/JobManager.py @@ -0,0 +1,1670 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek +# -*- coding: utf-8 -*- +""" + Basic infrastructure for asynchronous jobs. All necessary CIM classes and + indications are implemented here. + + .. autoclass:: JobManager + :members: + + .. autoclass:: Job + :members: + + .. autoclass:: LMI_ConcreteJob + :members: + + .. autoclass:: LMI_OwningJobElement + :members: + + .. autoclass:: LMI_AffectedJobElement + :members: + + .. autoclass:: LMI_MethodResult + :members: + + .. autoclass:: LMI_AssociatedJobMethodResult + :members: +""" + +from datetime import datetime, timedelta +import threading +from Queue import Queue +import pywbem +from pywbem.cim_provider2 import CIMProvider2 +import socket +import traceback + +from lmi.providers import cmpi_logging, parse_instance_id +from lmi.providers.IndicationManager import IndicationManager + +@cmpi_logging.trace_function +def register_filters(job_clsname, indication_manager=None): + """ + This function registers static indication filters at IndicationManager. + It should be called upon provider's initialization. + + :param job_clsname: (``String``) CIM class name for asynchonous jobs. + Will be part of filter queries. + :param indication_manager: If not given, global instance will be obtained. + """ + if indication_manager is None: + ind_manager = IndicationManager.get_instance() + filters = {} + query_args = { + "classname" : job_clsname, + "prefix" : indication_manager.nameprefix + } + for fltr_id, fltr_props in JobManager.IND_FILTERS.items(): + filters[fltr_id] = fltr_props.copy() + filters[fltr_id]['Query'] = fltr_props['Query'] % query_args + indication_manager.add_filters(job_clsname, filters) + +# Too many instance attributes +# pylint: disable-msg=R0902 +class Job(object): + """ + Generic abstract class representing one CIM_ConcreteJob. + It remembers input and output arguments, affected ManagedElements and + owning ManagedElement (to be able to create associations to them) + and all CIM_ConcreteJob properties. + + Due to multiple threads processing the job, each job has its own + lock to guard its status changes. It is expected that number of jobs + is quite low. + """ + + DEFAULT_TIME_BEFORE_REMOVAL = 60 # in seconds + + STATE_QUEUED = 1 # Job has not started yet + STATE_RUNNING = 2 # Job is running + STATE_FINISHED_OK = 3 # Job finished OK + STATE_FAILED = 4 # Job finished with error + STATE_SUSPENDED = 5 # Job is queued and suspended + STATE_TERMINATED = 6 # Job was queued and terminated + + FINAL_STATES = [STATE_FINISHED_OK, STATE_FAILED, STATE_SUSPENDED, + STATE_TERMINATED] + + # There is no way how to suspend/terminate running job! + + @cmpi_logging.trace_method + def __init__(self, job_manager, job_name, input_arguments, + method_name, affected_elements, owning_element): + """ + Create new storage job. + + :param job_manager: (``JobManager``) Reference to ``JobManager``, which + will manage this job. + :param job_name: (``string``) User-friendly name of the job. + :param input_arguments: (``dictionary param_name -> param_value``) + Input arguments of the method, which spawned this job. + :param method_name: (``string``) Name of the CIM method, which spawned + this job. + :param affected_elements: (``array of CIMInstanceName``) List of + affected elements. ``LMI_AffectedJobElement`` association will be + created for them. + :param owning_element: (``CIMInstanceName``) Reference to service, which + spawned the job. ``LMI_OwningJobElement`` association will be + created for it. + """ + self.job_manager = job_manager + self.timer_manager = job_manager.timer_manager + + # Unique ID + self.the_id = job_manager.get_next_id() + + # User friendly name of the job + self.job_name = job_name + + # Dictionary of input arguments, 'parameter_name' -> 'parameter_value' + # The parameter value must be CIMProperty or something that can be + # assigned to it. + self.input_arguments = input_arguments + + # Dictionary of output arguments, 'parameter_name' -> 'parameter_value' + # The parameter value must be CIMProperty or something that can be + # assigned to it. + self.output_arguments = None + + # Method return value, as CIMProperty or something that can be + # assigned to it. + self.return_value = None + # Value of Job.ReturnValueType + self.return_value_type = None + + # Name of the method + self.method_name = method_name + + # Time when the job was created + self.time_submitted = datetime.utcnow() + + # Nr. of seconds before the job is removed when the job finishes + self.time_before_removal = self.DEFAULT_TIME_BEFORE_REMOVAL + + # If the job should be removed after completion + self.delete_on_completion = True + + self.percent_complete = 0 + + # State of the job + self.job_state = self.STATE_QUEUED + + # Last change of job state, wall clock time + self.clocktime_of_last_state_change = self.time_submitted + + # Duration of the job in RUNNING state (in seconds) + self.elapsed_time = None + + # When the job started (= switched to RUNNING), wall clock time + self.start_clocktime = None + # When the job started (= switched to RUNNING), monotonic clock time + self.start_monotime = None + # When the job finished (= switched from RUNNING), monotonic clock time + self.finish_monotime = None + + # Array of CIMInstanceNames of affected elements, so we can + # enumerate associations to them. + self.affected_elements = affected_elements + + # CIMInstanceName to owning element (service), so we can enumerate + # instances. + self.owning_element = owning_element + + # Timer used to delete the job after time_before_removal seconds + self.timer = None + + # CIMError with result code + self.error = None + + # internal lock to protect state changes from races + self._lock = threading.RLock() + + self._execute = None + self._execargs = None + self._execkwargs = None + self._cancel = None + self._cancelargs = None + self._cancelkwargs = None + + self._finished_event = threading.Event() + + @cmpi_logging.trace_method + def set_execute_action(self, callback, *args, **kwargs): + """ + Set callback, which will be called when the job is to be executed. It is + expected that the callback will take some time to execute. The callback + must change state of the job and set output parameters and error in a + thread-safe way, i.e. by calling ``finish_method()``. + + :param callback: (``function``) Reference to callback to call. + :param args, kwargs: All other parameters will be passed to the + callback. It is highly recommended to add reference to the job + to the callback. + """ + self._execute = callback + self._execargs = args + self._execkwargs = kwargs + + @cmpi_logging.trace_method + def set_cancel_action(self, callback, *args, **kwargs): + """ + Set callbacks, which will be called when the job is to be + cancelled. The callback must be quick, the job is already locked! + + :param callback: (``function``) Reference to callback to call. + :param args, kwargs: All other parameters will be passed to the + callback. It is highly recommended to add reference to the job + to the callback. + """ + self._cancel = callback + self._cancelargs = args + self._cancelkwargs = kwargs + + @cmpi_logging.trace_method + def finish_method(self, new_state, return_value=None, return_type=None, + output_arguments=None, error=None, affected_elements=None): + """ + Mark the job as finished, with given return value, output parameters and + error. + This method is thread-safe. + + :param new_state: (``Job.STATE_* value``) Resulting state of the job. + :param return_value: (``string``) Return value of the job, encoded + into string. Can be None when the job does not return any value. + :param return_type: (``Job.RetunValueType.* value``) Type of the return + value. Can be None when the job does not return any value. + :param output_arguments: (``dictionary param_name -> param_value``) + Output arguments of the job. Can be None when the job does not have + any output parameters. + :param error: (``CIMError``) Error raised by the job. Can be None, + when the job finished successfully. + :param affected_elements: (``array of CIMInstanceName``) New list of + affected elements to generate LMI_JobAffectedElement + association. If None, the old list, passed to constructor, remains + untouched. + """ + self.lock() + self.return_value = return_value + self.return_value_type = return_type + self.output_arguments = output_arguments + self.error = error + if affected_elements is not None: + self.affected_elements = affected_elements + self.change_state(new_state, 100) + self.unlock() + self._finished_event.set() + + @cmpi_logging.trace_method + def change_state(self, new_state, percent=None): + """ + Change state of a job. (Re-)calculate various times based on the state + change. Send indications as necessary. + This method is thread-safe. + + :param new_state: (``Job.STATE_* value``) New state of the job. + It can be the same as the previous state to indicate progress of + the job. + :param percent: (``int``)) Percent complete of the job. When None, + this valuu will be automatically calculated (in progress = 50%, + finished = 100%). + """ + self.lock() + + cmpi_logging.logger.debug("Job %s: %s changes state from %d to %d" + % (self.the_id, self.job_name, self.job_state, new_state)) + + # For sending indications + prev_instance = None + send_indication = False + indication_ids = [] + + if self.job_state != new_state: + # Remember to send indications + prev_instance = self.job_manager.get_job_instance(self) + send_indication = True + indication_ids.append(JobManager.IND_JOB_CHANGED) + + # Check if the job has just finished + if (self.job_state not in self.FINAL_STATES + and new_state in self.FINAL_STATES): + # Remember finish time + self.finish_clocktime = datetime.utcnow() + self.finish_monotime = self.timer_manager.now() + # Remember job execution time. + if self.start_monotime: + self.elapsed_time = self.finish_monotime \ + - self.start_monotime + # Send indication + if self.job_state == self.STATE_FAILED: + indication_ids.append(JobManager.IND_JOB_FAILED) + if self.job_state == self.STATE_SUSPENDED: + indication_ids.append(JobManager.IND_JOB_SUCCEEDED) + + # Check if the job has just started + if new_state == self.STATE_RUNNING: + self.start_clocktime = datetime.utcnow() + self.start_monotime = self.timer_manager.now() + + self.clocktime_of_last_state_change = datetime.utcnow() + self.job_state = new_state + + if percent is None: + # guess the percentage from status + if new_state == self.STATE_QUEUED: + percent = 0 + elif new_state == self.STATE_RUNNING: + percent = 50 + else: + percent = 100 + if self.percent_complete != percent: + # Remember to send indications + if not send_indication: + self.clocktime_of_last_state_change = datetime.utcnow() + prev_instance = self.job_manager.get_job_instance(self) + send_indication = True + indication_ids.append(JobManager.IND_JOB_PERCENT_UPDATED) + self.percent_complete = percent + + if send_indication: + current_instance = self.job_manager.get_job_instance(self) + self.job_manager.send_modify_indications( + prev_instance, current_instance, indication_ids) + + # start / update the timer if necesasry + self._restart_timer() + self.unlock() + + @cmpi_logging.trace_method + def _expire(self): + """ + Callback when a Job completes and time_before_removal second passed. + The job gets removed from its JobManager. + """ + cmpi_logging.logger.debug("Job %s: %s expired" + % (self.the_id, self.job_name)) + + self.job_manager.remove_job(self) + + @cmpi_logging.trace_method + def _restart_timer(self): + """ + Re-schedule timer for TimeBeforeRemoval because some property has + changed. + """ + if not self.job_state in self.FINAL_STATES: + return + + # Stop the old timer. + if self.timer: + self.timer.cancel() + self.timer = None + + # Start the new timer. + if self.delete_on_completion: + now = self.timer_manager.now() + passed = now - self.finish_monotime + timeout = self.time_before_removal - passed + if timeout <= 0: + # Just in case... + self._expire() + return + + cmpi_logging.logger.debug("Starting timer for job %s: '%s' for %f" + " seconds" % (self.the_id, self.job_name, timeout)) + self.timer = self.timer_manager.create_timer( + "Job " + self.job_name, + callback=self._expire) + self.timer.start(timeout) + + @cmpi_logging.trace_method + def lock(self): + """ + Lock internal mutex. Other threads will block on subsequent lock(). + The lock is recursive, i.e. can be called multiple times from + single thread. + """ + self._lock.acquire() + + @cmpi_logging.trace_method + def unlock(self): + """ Unlock internal mutex.""" + self._lock.release() + + @cmpi_logging.trace_method + def execute(self): + """ + Start executing the job. It calls the execute callback, set by + ``set_execute_action()``. + + job_state must be already set to STATE_RUNNING. + Any exception is translated to CIMError and appropriate state is set. + """ + try: + self._execute(*(self._execargs), **(self._execkwargs)) + except pywbem.CIMError, error: + cmpi_logging.logger.trace_warn("Job.execute caught an CIMError %s", + str(error)) + cmpi_logging.logger.trace_verbose("traceback: %s", + traceback.format_exc()) + self.finish_method(Job.STATE_FAILED, error=error) + except Exception, ex: + cmpi_logging.logger.trace_warn("Job.execute caught an Exception %s", + str(ex)) + cmpi_logging.logger.trace_verbose("traceback: %s", + traceback.format_exc()) + error = pywbem.CIMError(pywbem.CIM_ERR_FAILED, str(ex)) + self.finish_method(Job.STATE_FAILED, error=error) + + @cmpi_logging.trace_method + def cancel(self): + """ + Cancels queued action. The action must have not been started. + """ + self.change_state(self.STATE_TERMINATED) + if self._cancel: + self._cancel(*(self._cancelargs), **(self._cancelkwargs)) + self._finished_event.set() + + @cmpi_logging.trace_method + def get_name(self): + """ + Return CIMInstanceName of the job. + + :rtype: ``CIMInstanceName`` + """ + name = pywbem.CIMInstanceName( + classname=self.job_manager.job_classname, + namespace=self.job_manager.namespace, + keybindings={ + 'InstanceID': self.get_instance_id() + }) + return name + + @cmpi_logging.trace_method + def get_instance_id(self, classname=None): + """ + Return InstanceID. + + :param classname: (``string``) Optional classname to generate InstanceID + for different class, e.g. for LMI_MethodResult. + :rtype: ``string`` + """ + if classname is None: + classname = self.job_manager.job_classname + return 'LMI:' + classname + ':' + str(self.the_id) + + @cmpi_logging.trace_method + def get_pre_call(self): + """ + Return indication that describes the pre-execution values of the + job's invocation. + + :rtype: ``CIMInstance of CIM_InstMethodCall`` + """ + path = pywbem.CIMInstanceName( + classname="CIM_InstMethodCall", + keybindings={}, + host=socket.gethostname(), + namespace=self.job_manager.namespace) + inst = pywbem.CIMInstance( + classname="CIM_InstMethodCall", + path=path) + src_instance = self._get_cim_instance() + inst['SourceInstance'] = src_instance + inst['SourceInstanceModelPath'] = str(src_instance.path) + inst['MethodName'] = self.method_name + inst['MethodParameters'] = self.get_method_params( + '__MethodParameters', True, False) + inst['PreCall'] = True + return inst + + @cmpi_logging.trace_method + def get_cim_error(self): + """ + Return job error as CIMInstance of CIM_Error. + :returns: CIMInstance of CIM_Error + """ + path = pywbem.CIMInstanceName( + classname="CIM_Error", + host=socket.gethostname(), + namespace=self.job_manager.namespace) + err = pywbem.CIMInstance( + classname="CIM_Error", + path=path) + err['CIMStatusCode'] = pywbem.Uint32(self.error[0]) + err['Message'] = self.error[1] + return err + + @cmpi_logging.trace_method + def get_post_call(self): + """ + Return indication that describes the post-execution values of the + job's invocation. + + :rtype: ``CIMInstance of CIM_InstMethodCall`` + """ + path = pywbem.CIMInstanceName( + classname="CIM_InstMethodCall", + keybindings={}, + host=socket.gethostname(), + namespace=self.job_manager.namespace) + inst = pywbem.CIMInstance( + classname="CIM_InstMethodCall", + path=path) + + src_instance = self._get_cim_instance() + inst['SourceInstance'] = src_instance + inst['SourceInstanceModelPath'] = str(src_instance.path) + inst['MethodName'] = self.method_name + inst['MethodParameters'] = self.get_method_params( + '__MethodParameters', True, True) + inst['PreCall'] = False + + if self.return_value_type is not None: + inst['ReturnValueType'] = self.return_value_type + if self.return_value is not None: + inst['ReturnValue'] = str(self.return_value) + if self.error is not None: + err = self.get_cim_error() + inst['Error'] = [err, ] + return inst + + @cmpi_logging.trace_method + def _get_cim_instance(self): + """ + Return CIMInstance of this job. + + :rtype: CIMInstance + """ + return self.job_manager.get_job_instance(self) + + @cmpi_logging.trace_method + def get_method_params(self, class_name, include_input, include_output): + """ + Create a class of given name with all input or output parameters + of the asynchronous method. Typically used to assemble + CIM_ConcreteJob.JobInParameters or CIM_InstMethodCall.MethodParameters + values. + + :param class_name: (``string``) Name of the class to create. + :param input: (``boolean``) Whether input parameters should be + included in the returned class + :param output: (``boolean``) Whether output parameters should be + included in the returned class + :rtype: CIMInstance of the created class. + """ + # TODO: this is workaround for bug #920763, use class_name + # when it's fixed + clsname = "CIM_ManagedElement" + path = pywbem.CIMInstanceName( + classname=clsname, + namespace=self.job_manager.namespace) + inst = pywbem.CIMInstance(classname=clsname, path=path) + if include_input and self.input_arguments: + for (name, value) in self.input_arguments.iteritems(): + inst[name] = value + if include_output and self.output_arguments: + # overwrite any input parameter + for (name, value) in self.output_arguments.iteritems(): + inst[name] = value + return inst + + @cmpi_logging.trace_method + def wait_for_job(self, timeout=None): + """ + Block and wait until the job completes. + + :param timeout: (``float``) Number of seconds to wait for the job + to complete. + :rtype: ``bool`` - True, when the job is finished, False if the timeout + occurred. + """ + return self._finished_event.wait(timeout) + + # pylint: disable-msg=R0903 + class ReturnValueType(object): + """ CIM_InstMethodCall.ReturnValueType values.""" + Boolean = pywbem.Uint16(2) + String = pywbem.Uint16(3) + Char16 = pywbem.Uint16(4) + Uint8 = pywbem.Uint16(5) + Sint8 = pywbem.Uint16(6) + Uint16 = pywbem.Uint16(7) + Sint16 = pywbem.Uint16(8) + Uint32 = pywbem.Uint16(9) + Sint32 = pywbem.Uint16(10) + Uint64 = pywbem.Uint16(11) + Sint64 = pywbem.Uint16(12) + Datetime = pywbem.Uint16(13) + Real32 = pywbem.Uint16(14) + Real64 = pywbem.Uint16(15) + Reference = pywbem.Uint16(16) + +class JobManager(object): + """ + Container of all queued, running or finished ``LMI_ConcreteJobs``. + + Usage: + + 1. Create MOF file for these classes: + + * ``LMI_Job`` + + * ``LMI_MethodResult`` + + * ``LMI_AffectedJobElement`` + + * ``LMI_OwningJobElement`` + + * ``LMI_AssociatedJobMethodResult`` + + Where ```` is prefix of your classes, for example 'Storage' + + 2. During initialization, initialize ``TimerManager`` and create + ``JobManager``. + + 3. When needed. create new Job instance: + + 4. Set its execute callback using ``set_execute_action()``. This callback + will be called when the job is to be executed. It will be called in + context of ``JobManager`` worker thread! + + 5. Optionally, set cancel callback using ``set_execute_action()``. This + callback will be called when the job is still queued and is cancelled by + application. This callback will be called in context of CIMOM callback + and should be quick! + + 6. Enqueue the job using ``JobManager.add_job()`` method. + + 7. When your execute callback is called, you can optionally call + ``job.change_state()`` to update percentage of completion. + + 8. When your execute callback is finished, don't forget to set method + result using ``job.finish_method()``. + + * ``JobManager`` automatically sends all job-related indications. + * ``Job`` automatically tracks various timestamps. + * By default, the job automatically disappears after 60 seconds after it + finishes. Application may set ``DeleteOnCompletion`` and + ``TimeBeforeRemoval`` properties of ``LMI_Job`` to override this + timeout. + """ + + COMMAND_STOP = 1 + + IND_JOB_PERCENT_UPDATED = "PercentUpdated" + IND_JOB_SUCCEEDED = "Succeeded" + IND_JOB_FAILED = "Failed" + IND_JOB_CHANGED = "Changed" + IND_JOB_CREATED = "Created" + + IND_FILTERS = { + IND_JOB_PERCENT_UPDATED: { + "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::PercentComplete <> " + "PreviousInstance.CIM_ConcreteJob::PercentComplete", + "Description" : "Modification of Percentage Complete for a " + "Concrete Job.", + }, + IND_JOB_SUCCEEDED: { + "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState = 17", + "Description": "Modification of Job State for a " + "Concrete Job to 'Complete'.", + }, + IND_JOB_FAILED: { + "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState = 10", + "Description": "Modification of Job State for a " + "Concrete Job to 'Exception'.", + }, + IND_JOB_CHANGED: { + "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " + "SourceInstance ISA %(classname)s AND " + "SourceInstance.CIM_ConcreteJob::JobState <> " + "PreviousInstance.CIM_ConcreteJob::JobState", + "Description": "Modification of Job State for a ConcreteJob.", + }, + IND_JOB_CREATED: { + "Query" : "SELECT * FROM LMI_%(prefix)sInstCreation WHERE " + "SourceInstance ISA %(classname)s", + "Description": "Creation of a ConcreteJob.", + }, + } + + @cmpi_logging.trace_method + def __init__(self, name, namespace, indication_manager, timer_manager): + """ + Initialize new Manager. It automatically registers all job-related + filters to indication_manager and starts a worker thread. + + :param name: (``string``) String with classname infix. For example + 'Storage' for ``LMI_StorageJob``, ``LMI_StorageJobMethodResult`` + etc. + :param namespace: (``string``) Namespace of all providers. + :param indication_manager: (``IndicationManager``): a manager where + indications and filters should be added. + :param timer_manager: (``TimerManager``): Timer manager instance. + """ + # List of all jobs. Dictionary job_id -> Job. + self.jobs = {} + # Queue of jobs scheduled to execute. + self.queue = Queue() + # Last created job_id. + self.last_instance_id = 0 + # Classname infix. + self.name = name + # CIMProvider2 instances for job classes. + self.providers = {} + self.namespace = namespace + self.indication_manager = indication_manager + self.timer_manager = timer_manager + + # Start the worker thread (don't forget to register it at CIMOM) + self.worker = threading.Thread(target=self._worker_main) + self.worker.daemon = False + self.worker.start() + + # Various classnames for job-related classes, with correct infixes. + self.job_classname = 'LMI_' + self.name + 'Job' + self.method_result_classname = "LMI_" + self.name + "MethodResult" + self.affected_classname = "LMI_Affected" + self.name + "JobElement" + self.owning_classname = "LMI_Owning" + self.name + "JobElement" + self.associated_result_classname = ('LMI_Associated' + self.name + + 'JobMethodResult') + self.indication_filter_classname = ('LMI_' + self.name + + 'JobIndicationFilter') + self.job_provider = None + self._add_indication_filters() + + @cmpi_logging.trace_method + def _add_indication_filters(self): + """ + Add all job-related ``IndicationFilters`` to indication manager. + """ + register_filters(self.job_classname, self.indication_manager) + + @cmpi_logging.trace_method + def get_providers(self): + """ + Get dictionary of providers for these classes: + + * ``LMI_Job`` + * ``LMI_MethodResult`` + * ``LMI_AffectedJobElement`` + * ``LMI_OwningJobElement`` + * ``LMI_AssociatedJobMethodResult`` + + :rtype: dictionary class_name -> CIMProvider2 + """ + + if not self.providers: + job_provider = LMI_ConcreteJob(self.job_classname, job_manager=self) + self.providers[self.job_classname] = job_provider + self.job_provider = job_provider + + provider = LMI_MethodResult( + self.method_result_classname, job_manager=self) + self.providers[self.method_result_classname] = provider + + provider = LMI_AffectedJobElement( + self.affected_classname, job_manager=self) + self.providers[self.affected_classname] = provider + + provider = LMI_OwningJobElement( + self.owning_classname, job_manager=self) + self.providers[self.owning_classname] = provider + + provider = LMI_AssociatedJobMethodResult( + self.owning_classname, job_manager=self) + self.providers[self.associated_result_classname] = provider + + return self.providers + + @cmpi_logging.trace_method + def add_job(self, job): + """ + Enqueue new job. Send indication when needed. + + :param job: (``Job``) A job to enqueue. + """ + cmpi_logging.logger.debug("Job %s: '%s' enqueued" + % (job.the_id, job.job_name)) + + self.jobs[job.the_id] = job + self.queue.put(job) + # send indication + if self.indication_manager.is_subscribed( + self.job_classname, self.IND_JOB_CREATED): + job_instance = self.get_job_instance(job) + self.indication_manager.send_instcreation( + job_instance, self.IND_JOB_CREATED) + + def send_modify_indications(self, prev_instance, current_instance, + indication_ids): + """ + Send InstModification. This is helper method called by ``Job`` when + needed. + + :param prev_instance: Instance of ``LMI_Job`` before it was + modified. + :param current_instance: Instance of ``LMI_Job`` after it was + modified. + """ + for _id in indication_ids: + self.indication_manager.send_instmodification(prev_instance, + current_instance, _id) + + @cmpi_logging.trace_method + def remove_job(self, job): + """ + Remove existing job. Note that jobs are removed automatically after a + timeout, providers should not call this method directly. + + :param job: (``Job``) Job to remove. + """ + cmpi_logging.logger.debug("Job %s: '%s' removed from queue." + % (job.the_id, job.job_name)) + del self.jobs[job.the_id] + # The job may still be in the queue! + # There is no way, how to remove it, it will be skipped by the + # worker thread. + + @cmpi_logging.trace_method + def get_job_for_instance_id(self, instance_id, classname=None): + """ + Return Job for given InstanceID or None when no such Job exist. + + :param instance_id: (``string``) InstanceID value to parse. + :param classname: (``string``) Optional classname to parse the + InstanceID (e.g. when parsing InstanceID of + ``LMI_MethodResult``). + :rtype: ``Job`` + """ + if classname is None: + classname = self.job_classname + the_id = parse_instance_id(instance_id, classname) + if not the_id.isdigit(): + return None + return self.jobs.get(the_id, None) + + @cmpi_logging.trace_method + def _worker_main(self): + """ + This is the main loop of the job queue. It just processes enqueued + jobs and never ends. + """ + cmpi_logging.logger.info("Started Job thread.") + while True: + command = self.queue.get() + if isinstance(command, Job): + # we need to protect from changes between checking state and + # setting new state + job = command + job.lock() + if job.job_state == Job.STATE_QUEUED: + # the job was not cancelled + job.change_state(Job.STATE_RUNNING) + job.unlock() + cmpi_logging.logger.info("Starting job %s: '%s'" % + (job.the_id, job.job_name)) + + job.execute() + if job.error: + cmpi_logging.logger.warn("Job %s: '%s' finished with error:" + " %s" % (job.the_id, job.job_name, str(job.error))) + else: + cmpi_logging.logger.info("Job %s: '%s' finished OK" % + (job.the_id, job.job_name)) + else: + # just skip suspended and terminated jobs + job.unlock() + + elif isinstance(command, int): + self.queue.task_done() + break + + self.queue.task_done() + + cmpi_logging.logger.info("Stopped Job thread.") + + @cmpi_logging.trace_method + def get_next_id(self): + """ + Return next unused job id. + + :rtype: string + """ + self.last_instance_id += 1 + return str(self.last_instance_id) + + @cmpi_logging.trace_method + def get_job_instance(self, job): + """ + Return CIMInstance for given job. + + :param job: (``Job``) + :rtype: ``CIMInstance`` + """ + path = pywbem.CIMInstanceName( + classname=self.job_classname, + keybindings={'InstanceID': job.get_instance_id()}, + host=socket.gethostname(), + namespace=self.namespace) + inst = pywbem.CIMInstance(classname=self.job_classname, path=path) + inst['InstanceID'] = job.get_instance_id() + return self.job_provider.get_instance(None, inst) + + @cmpi_logging.trace_method + def shutdown(self, timeout=1): + """ + Stop the thread. If a job is running, it may leave the job process + (mkfs, resize2fs, ...) and the worker thread (waiting for the process to + finish) still running. + + JobManager still needs Indication Manager and TimeManager working at + this point! + + :param timeout: Nr. of seconds to wait for the current job. Afterwards + the thread is abandoned, leaving the process still running. + """ + # Empty the queue, we don't want the worker to proceed with any other + # queued job. + while not self.queue.empty(): + queue.get(False) + queue.task_done() + + self.queue.put(self.COMMAND_STOP) + self.worker.join(timeout) + + # Cancel all running/suspended/queued jobs. + # This will send indications. + for job in self.jobs.itervalues(): + if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED, + Job.STATE_RUNNING): + job.cancel() + + if self.worker.isAlive(): + # There is no way, how to stop the thread in Python, so abandon it. + self.worker.daemon = True + self.indication_manager = None + self.timer_manager = None + + def can_shutdown(self): + """ + Return True, if there is no running Job. + """ + return self.queue.empty() + + +class LMI_ConcreteJob(CIMProvider2): + """ + Provider of LMI_ConcreteJob class or its subclass. + """ + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """ + Provider implementation of EnumerateInstances intrinsic method. + """ + model.path.update({'InstanceID': None}) + for job in self.job_manager.jobs.values(): + model['InstanceID'] = job.get_instance_id() + if keys_only: + yield model + else: + yield self.get_instance(env, model, job) + + @cmpi_logging.trace_method + def get_job_states(self, job): + """ + Return JobState and OperationalStatus property values. + + :param job: (``int``) Job.STATE_* value. + :rtype: tuple ``(JobState, OperationalStatus)`` values. + """ + if job.job_state == Job.STATE_QUEUED: + jobstate = self.Values.JobState.New + opstate = [self.Values.OperationalStatus.Dormant] + elif job.job_state == Job.STATE_RUNNING: + jobstate = self.Values.JobState.Running + opstate = [self.Values.OperationalStatus.OK] + elif job.job_state == Job.STATE_FINISHED_OK: + jobstate = self.Values.JobState.Completed + opstate = [self.Values.OperationalStatus.OK, + self.Values.OperationalStatus.Completed] + elif job.job_state == Job.STATE_SUSPENDED: + jobstate = self.Values.JobState.Suspended + opstate = [self.Values.OperationalStatus.OK] + elif job.job_state == Job.STATE_FAILED: + jobstate = self.Values.JobState.Exception + opstate = [self.Values.OperationalStatus.Error, + self.Values.OperationalStatus.Completed] + elif job.job_state == Job.STATE_TERMINATED: + jobstate = self.Values.JobState.Terminated + opstate = [self.Values.OperationalStatus.Stopped] + return jobstate, opstate + + @cmpi_logging.trace_method + # pylint: disable-msg=W0221 + def get_instance(self, env, model, job=None): + """ + Provider implementation of GetInstance intrinsic method. + """ + if not job: + instance_id = model['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + model['DeleteOnCompletion'] = job.delete_on_completion + model['Name'] = job.job_name + + # convert seconds to timedelta + seconds = job.time_before_removal + if seconds: + delta = timedelta(seconds=seconds) + model['TimeBeforeRemoval'] = pywbem.CIMDateTime(delta) + else: + model['TimeBeforeRemoval'] = pywbem.CIMProperty( + name='TimeBeforeRemoval', + value=None, + type='datetime') + + if job.clocktime_of_last_state_change: + model['TimeOfLastStateChange'] = pywbem.CIMDateTime( + job.clocktime_of_last_state_change) + else: + model['TimeOfLastStateChange'] = pywbem.CIMProperty( + name='TimeOfLastStateChange', + value=None, + type='datetime') + + if job.elapsed_time: + elapsed_time = timedelta(seconds=job.elapsed_time) + model['ElapsedTime'] = pywbem.CIMDateTime(elapsed_time) + else: + model['ElapsedTime'] = pywbem.CIMProperty( + name='ElapsedTime', + value=None, + type='datetime') + + model['Description'] = job.job_name + model['LocalOrUtcTime'] = self.Values.LocalOrUtcTime.UTC_Time + model['PercentComplete'] = pywbem.Uint16(job.percent_complete) + if job.start_clocktime: + model['StartTime'] = pywbem.CIMDateTime(job.start_clocktime) + else: + model['StartTime'] = pywbem.CIMProperty( + name='StartTime', + value=None, + type='datetime') + + if job.input_arguments: + model['JobInParameters'] = job.get_method_params( + "__JobInParameters", True, False) + + if job.job_state in Job.FINAL_STATES: + # assemble output parameters with return value + outparams = job.get_method_params("__JobOutParameters", False, True) + if job.return_value is not None: + outparams['__ReturnValue'] = job.return_value + model['JobOutParameters'] = outparams + + model['TimeSubmitted'] = pywbem.CIMDateTime(job.time_submitted) + # set correct state + jobstate, opstate = self.get_job_states(job) + model['JobState'] = jobstate + model['OperationalStatus'] = opstate + return model + + @cmpi_logging.trace_method + def set_instance(self, env, instance, modify_existing): + """Return a newly created or modified instance. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param instance: The new pywbem.CIMInstance. If modifying an existing + instance, the properties on this instance have been filtered by + the PropertyList from the request. + :param modify_existing: True if ModifyInstance, False if CreateInstance + + Return the new instance. The keys must be set on the new instance. + """ + if not modify_existing: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "Creation of Job instances is not supported.") + + job = self.job_manager.get_job_for_instance_id(instance['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + try: + job.lock() + restart_timer = False + + for (key, value) in instance.iteritems(): + if value is None: + continue + if key == 'DeleteOnCompletion': + job.delete_on_completion = value + restart_timer = True + elif key == 'TimeBeforeRemoval': + job.time_before_removal = value.total_seconds() + restart_timer = True + elif key == 'JobRunTimes': + if value != 1: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "JobRunTimes property is not supported.") + elif key == 'LocalOrUtcTime': + if value != self.Values.LocalOrUtcTime.UTC_Time: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "Setting of LocalOrUtcTime property is not" + " supported.") + else: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "Setting of %s property is not supported." % (key,)) + + if restart_timer: + job._restart_timer() + finally: + job.unlock() + return instance + + @cmpi_logging.trace_method + def delete_instance(self, env, instance_name): + """Delete an instance. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param instance_name: A pywbem.CIMInstanceName specifying the instance + to delete. + """ + job = self.job_manager.get_job_for_instance_id( + instance_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + if not job.job_status in Job.FINAL_STATES: + raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, + "Job has not finished.") + + self.job_manager.remove_job(job) + + @cmpi_logging.trace_method + def cim_method_geterrors(self, env, object_name): + """Implements LMI_StorageJob.GetErrors() + + If JobState is "Completed" and Operational Status is "Completed" + then no instance of CIM_Error is returned. + + If JobState is "Exception" then GetErrors may return intances of + CIM_Error related to the execution of the procedure or method invoked by + the job. + + If Operatational Status is not "OK" or "Completed" then + GetErrors may return CIM_Error instances related to the running of + the job. + + :param env: -- Provider Environment (pycimmb.ProviderEnvironment) + :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName + specifying the object on which the method GetErrors() + should be invoked. + + Output parameters: + + * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...)) + If the OperationalStatus on the Job is not "OK", then this + method will return one or more CIM Error instance(s). + Otherwise, when the Job is "OK", null is returned. + """ + job = self.job_manager.get_job_for_instance_id( + object_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + if job.error is None: + errors = [] + else: + err = job.get_cim_error() + errors = [err, ] + out_params = [ + pywbem.CIMParameter( + name='Errors', + value=errors, + type='instance', + is_array=True, + array_size=len(errors)) + ] + rval = self.Values.GetErrors.Success + + return (rval, out_params) + + @cmpi_logging.trace_method + def cim_method_requeststatechange(self, env, object_name, + param_requestedstate=None, + param_timeoutperiod=None): + """Implements LMI_StorageJob.RequestStateChange() + + Requests that the state of the job be changed to the value + specified in the RequestedState parameter. Invoking the + RequestStateChange method multiple times could result in earlier + requests being overwritten or lost. + + If 0 is returned, then the + task completed successfully. Any other return code indicates an + error condition. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName + specifying the object on which the method RequestStateChange() + should be invoked. + :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState) + RequestStateChange changes the state of a job. The possible + values are as follows: Start (2) changes the state to + \'Running\'. Suspend (3) stops the job temporarily. The + intention is to subsequently restart the job with \'Start\'. + It might be possible to enter the \'Service\' state while + suspended. (This is job-specific.) Terminate (4) stops the + job cleanly, saving data, preserving the state, and shutting + down all underlying processes in an orderly manner. Kill (5) + terminates the job immediately with no requirement to save + data or preserve the state. Service (6) puts the job into a + vendor-specific service state. It might be possible to restart + the job. + + :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime) + A timeout period that specifies the maximum amount of time that + the client expects the transition to the new state to take. + The interval format must be used to specify the TimeoutPeriod. + A value of 0 or a null parameter indicates that the client has + no time requirements for the transition. If this property + does not contain 0 or null and the implementation does not + support this parameter, a return code of \'Use Of Timeout + Parameter Not Supported\' must be returned. + """ + job = self.job_manager.get_job_for_instance_id( + object_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + try: + job.lock() + states = self.Values.RequestStateChange.RequestedState + retcodes = self.Values.RequestStateChange + if param_requestedstate == states.Suspend: + if job.job_state != Job.STATE_QUEUED: + # Can suspend only queued jobs + rval = retcodes.Invalid_State_Transition + else: + job.change_state(Job.STATE_SUSPENDED) + rval = retcodes.Completed_with_No_Error + + elif param_requestedstate == states.Terminate: + if job.job_state not in (Job.STATE_QUEUED, Job.STATE_SUSPENDED): + # Can terminate only queued or suspended jobs + rval = retcodes.Invalid_State_Transition + else: + job.cancel() + rval = retcodes.Completed_with_No_Error + + elif param_requestedstate == states.Start: + if job.job_state != Job.STATE_SUSPENDED: + # Can start only suspended jobs + rval = retcodes.Invalid_State_Transition + else: + job.change_state(Job.STATE_QUEUED) + # Enqueue the job again, it may be already processed + # (we might get the job in the queue twice, but + # we have only one worker thread so it won't collide). + self.job_manager.add_job(job) + rval = retcodes.Completed_with_No_Error + + else: + rval = retcodes.Invalid_State_Transition + finally: + job.unlock() + return (rval, []) + + @cmpi_logging.trace_method + def cim_method_killjob(self, env, object_name, + param_deleteonkill=None): + """Implements LMI_StorageJob.KillJob() """ + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) + + @cmpi_logging.trace_method + def cim_method_geterror(self, env, object_name): + """Implements LMI_StorageJob.GetError() + + GetError is deprecated because Error should be an array,not a + scalar. + + When the job is executing or has terminated without + error, then this method returns no CIM_Error instance. However, if + the job has failed because of some internal problem or because the + job has been terminated by a client, then a CIM_Error instance is + returned. + + :param env: Provider Environment (pycimmb.ProviderEnvironment) + :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName + specifying the object on which the method GetError() + should be invoked. + + Output parameters: + + * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``) + If the OperationalStatus on the Job is not "OK", then this + method will return a CIM Error instance. Otherwise, when the + Job is "OK", null is returned. + """ + job = self.job_manager.get_job_for_instance_id( + object_name['InstanceID']) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + if job.error is None: + error = pywbem.CIMParameter( + name='error', + value=None, + type='instance', + is_array=False) + else: + err = job.get_cim_error() + error = pywbem.CIMParameter( + name='error', + value=err, + type='instance') + rval = self.Values.GetError.Success + return (rval, [error]) + + class Values(object): + class JobState(object): + New = pywbem.Uint16(2) + Starting = pywbem.Uint16(3) + Running = pywbem.Uint16(4) + Suspended = pywbem.Uint16(5) + Shutting_Down = pywbem.Uint16(6) + Completed = pywbem.Uint16(7) + Terminated = pywbem.Uint16(8) + Killed = pywbem.Uint16(9) + Exception = pywbem.Uint16(10) + Service = pywbem.Uint16(11) + Query_Pending = pywbem.Uint16(12) + # DMTF_Reserved = 13..32767 + # Vendor_Reserved = 32768..65535 + + class LocalOrUtcTime(object): + Local_Time = pywbem.Uint16(1) + UTC_Time = pywbem.Uint16(2) + + class OperationalStatus(object): + Unknown = pywbem.Uint16(0) + Other = pywbem.Uint16(1) + OK = pywbem.Uint16(2) + Degraded = pywbem.Uint16(3) + Stressed = pywbem.Uint16(4) + Predictive_Failure = pywbem.Uint16(5) + Error = pywbem.Uint16(6) + Non_Recoverable_Error = pywbem.Uint16(7) + Starting = pywbem.Uint16(8) + Stopping = pywbem.Uint16(9) + Stopped = pywbem.Uint16(10) + In_Service = pywbem.Uint16(11) + No_Contact = pywbem.Uint16(12) + Lost_Communication = pywbem.Uint16(13) + Aborted = pywbem.Uint16(14) + Dormant = pywbem.Uint16(15) + Supporting_Entity_in_Error = pywbem.Uint16(16) + Completed = pywbem.Uint16(17) + Power_Mode = pywbem.Uint16(18) + Relocating = pywbem.Uint16(19) + # DMTF_Reserved = .. + # Vendor_Reserved = 0x8000.. + + class GetErrors(object): + Success = pywbem.Uint32(0) + Not_Supported = pywbem.Uint32(1) + Unspecified_Error = pywbem.Uint32(2) + Timeout = pywbem.Uint32(3) + Failed = pywbem.Uint32(4) + Invalid_Parameter = pywbem.Uint32(5) + Access_Denied = pywbem.Uint32(6) + # DMTF_Reserved = .. + # Vendor_Specific = 32768..65535 + + class GetError(object): + Success = pywbem.Uint32(0) + Not_Supported = pywbem.Uint32(1) + Unspecified_Error = pywbem.Uint32(2) + Timeout = pywbem.Uint32(3) + Failed = pywbem.Uint32(4) + Invalid_Parameter = pywbem.Uint32(5) + Access_Denied = pywbem.Uint32(6) + # DMTF_Reserved = .. + # Vendor_Specific = 32768..65535 + + class RequestStateChange(object): + Completed_with_No_Error = pywbem.Uint32(0) + Not_Supported = pywbem.Uint32(1) + Unknown_Unspecified_Error = pywbem.Uint32(2) + Can_NOT_complete_within_Timeout_Period = pywbem.Uint32(3) + Failed = pywbem.Uint32(4) + Invalid_Parameter = pywbem.Uint32(5) + In_Use = pywbem.Uint32(6) + # DMTF_Reserved = .. + Method_Parameters_Checked___Transition_Started = pywbem.Uint32(4096) + Invalid_State_Transition = pywbem.Uint32(4097) + Use_of_Timeout_Parameter_Not_Supported = pywbem.Uint32(4098) + Busy = pywbem.Uint32(4099) + # Method_Reserved = 4100..32767 + # Vendor_Specific = 32768..65535 + class RequestedState(object): + Start = pywbem.Uint16(2) + Suspend = pywbem.Uint16(3) + Terminate = pywbem.Uint16(4) + Kill = pywbem.Uint16(5) + Service = pywbem.Uint16(6) + # DMTF_Reserved = 7..32767 + # Vendor_Reserved = 32768..65535 + +class LMI_OwningJobElement(CIMProvider2): + """ Instrumentation of LMI_OwningJobElement class and its subclasses.""" + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def get_instance(self, env, model): + """Return an instance.""" + instance_id = model['OwnedElement']['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "OwnedElement not found.") + + if job.owning_element != model['OwningElement']: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "OwnedElement is not associated to OwningElement.") + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'OwnedElement': None, 'OwningElement': None}) + for job in self.job_manager.jobs.values(): + if job.owning_element: + model['OwnedElement'] = job.get_name() + model['OwningElement'] = job.owning_element + yield model + + @cmpi_logging.trace_method + def references(self, env, object_name, model, result_class_name, role, + result_role, keys_only): + """Instrument Associations.""" + ch = env.get_cimom_handle() + if ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super='CIM_ManagedElement') or \ + ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.job_classname): + return self.simple_refs(env, object_name, model, + result_class_name, role, result_role, keys_only) + +class LMI_AffectedJobElement(CIMProvider2): + """ Instrumentation of LMI_AffectedJobElement class and its subclasses.""" + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def get_instance(self, env, model): + """Return an instance.""" + instance_id = model['AffectingElement']['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "AffectingElement not found.") + + if job.affected_elements is None: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "The AffectingElement has no AffectedElement.") + if model['AffectedElement'] not in job.affected_elements: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "AffectedElement is not associated to AffectingElement.") + model['ElementEffects'] = [self.Values.ElementEffects.Unknown, ] + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'AffectingElement': None, 'AffectedElement': None}) + for job in self.job_manager.jobs.values(): + if job.affected_elements is None: + continue + for element in job.affected_elements: + model['AffectingElement'] = job.get_name() + model['AffectedElement'] = element + if keys_only: + yield model + else: + yield self.get_instance(env, model) + + @cmpi_logging.trace_method + def references(self, env, object_name, model, result_class_name, role, + result_role, keys_only): + """Instrument Associations.""" + ch = env.get_cimom_handle() + if ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super='CIM_ManagedElement') or \ + ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.job_classname): + return self.simple_refs(env, object_name, model, + result_class_name, role, result_role, keys_only) + + class Values(object): + class ElementEffects(object): + Unknown = pywbem.Uint16(0) + Other = pywbem.Uint16(1) + Exclusive_Use = pywbem.Uint16(2) + Performance_Impact = pywbem.Uint16(3) + Element_Integrity = pywbem.Uint16(4) + Create = pywbem.Uint16(5) + + +class LMI_MethodResult(CIMProvider2): + """Instrumentation of LMI_MethodResult class and its subclasses.""" + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + # pylint: disable-msg=W0221 + def get_instance(self, env, model, job=None): + """Return an instance.""" + if not job: + instance_id = model['InstanceID'] + job = self.job_manager.get_job_for_instance_id( + instance_id, self.classname) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + model['Description'] = job.job_name + if job.job_state in Job.FINAL_STATES: + model['PostCallIndication'] = pywbem.CIMProperty( + name='PostCallIndication', + value=job.get_post_call()) + else: + model['PostCallIndication'] = pywbem.CIMProperty( + name='PostCallIndication', + type='instance', + value=None) + model['PreCallIndication'] = pywbem.CIMProperty( + name='PreCallIndication', + value=job.get_pre_call()) + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'InstanceID': None}) + for job in self.job_manager.jobs.values(): + model['InstanceID'] = job.get_instance_id( + classname=self.classname) + if keys_only: + yield model + else: + yield self.get_instance(env, model, job) + +class LMI_AssociatedJobMethodResult(CIMProvider2): + """ + Instrumentation of LMI_AssociatedJobMethodResult class and its + subclasses. + """ + + @cmpi_logging.trace_method + def __init__(self, classname, job_manager): + self.classname = classname + self.job_manager = job_manager + + @cmpi_logging.trace_method + def get_instance(self, env, model): + """Return an instance.""" + instance_id = model['Job']['InstanceID'] + job = self.job_manager.get_job_for_instance_id(instance_id) + if not job: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job not found.") + + expected_result_id = job.get_instance_id( + classname=self.job_manager.method_result_classname) + if model['JobParameters']['InstanceID'] != expected_result_id: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Job is not associated to JobParameters.") + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances.""" + model.path.update({'JobParameters': None, 'Job': None}) + for job in self.job_manager.jobs.values(): + if job.owning_element: + model['Job'] = job.get_name() + model['JobParameters'] = pywbem.CIMInstanceName( + classname=self.job_manager.method_result_classname, + namespace=self.job_manager.namespace, + keybindings={ + 'InstanceID': job.get_instance_id( + classname=self.job_manager.method_result_classname) + }) + yield model + + @cmpi_logging.trace_method + def references(self, env, object_name, model, result_class_name, role, + result_role, keys_only): + """Instrument Associations.""" + ch = env.get_cimom_handle() + if ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.method_result_classname) or \ + ch.is_subclass(object_name.namespace, + sub=object_name.classname, + super=self.job_manager.job_classname): + return self.simple_refs(env, object_name, model, + result_class_name, role, result_role, keys_only) + diff --git a/src/python/lmi/providers/TimerManager.py b/src/python/lmi/providers/TimerManager.py new file mode 100644 index 0000000..76b2f5b --- /dev/null +++ b/src/python/lmi/providers/TimerManager.py @@ -0,0 +1,422 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek +# -*- coding: utf-8 -*- +""" +Module with functionality to create timers, which can be used in CMPI providers. + +Default python threading.Timer is not suitable, because it creates thread +for each timer, which is inefficient. In addition, each such thread would need +to be registered at CIMOM to enable logging in timer callbacks. + +Usage: + +1. Initialize the TimerManager when your provider initializes! +Otherwise you may encounter weird exceptions. + +2. When any provider needs timer, create it using Time.create_timer() to create +Timer instance. + +3. Call Timer.start() to start the timer. It will call registered callback +when the timer expires. The callback is called in context of TimerManager +thread, which has enabled logging to CIMOM, i.e. the callback can log as usual. + +4. (optionally) cancel the timer before expiration using Timer.cancel(). +However, this does not guarantee that the timer callback won't be called - +it may be already being scheduled / called. + +.. autoclass:: TimerManager + :members: + +.. autoclass:: Timer + :members: + +.. autoclass:: MonotonicClock + :members: +""" + +import ctypes +import threading +import Queue + +from lmi.base import singletonmixin +from lmi.providers import cmpi_logging + +class TimerException(Exception): + pass + +class MonotonicClock(object): + """ + Monotonic clock, represented by clock_gettime() and CLOCK_MONOTONIC. + This clock is not influenced by NTP or administrator setting time or date. + """ + CLOCK_MONOTONIC = ctypes.c_int(1) + + class timespec(ctypes.Structure): + _fields_ = [ + ("tv_sec", ctypes.c_long), + ("tv_nsec", ctypes.c_long)] + + def __init__(self): + libc = ctypes.CDLL("librt.so.1") + self._clock_gettime = libc.clock_gettime + + def now(self): + """ + Return current time, i.e. float representing seconds with precision up + to nanoseconds (depends on glibc). The actual value of current time is + meaningless, it can be used only to measure time differences. + + :returns: ``float`` with current time in seconds. + """ + t = MonotonicClock.timespec(0, 0) + ret = self._clock_gettime(self.CLOCK_MONOTONIC, ctypes.pointer(t)) + + if ret < 0: + raise TimerException("Cannot get clock time, clock_gettime() failed.") + return t.tv_sec + t.tv_nsec * 10 ** (-9) + +class Timer(object): + """ + A class representing a timer. A timer has a timeout and after the timeout, + given callback is called and the timer is deleted. + """ + + @cmpi_logging.trace_method + def __init__(self, timer_manager, name, callback=None, *args, **kwargs): + """ + Create a timer. If specified, given callback is registered. + The callback is called with *args and **kwargs. + + :param timer_manager: (``TimerManager)`` Instance of the timer manager + which will manage the timer. + :param name: (``string``) Name of the timer, used for logging. + :param callback: (``function``) Callback to call when the timer expires. + :param *args, **kwargs: Parameters of the callback. + """ + self._mgr = timer_manager + self._name = name + self._callback = callback + self._args = args + self._kwargs = kwargs + + cmpi_logging.logger.trace_info("Timer: Timer %s created" % name) + + @cmpi_logging.trace_method + def set_callback(self, callback, *args, **kwargs): + """ + Set callback to call when the timer expires. + + :param callback: (``function``) Callback to call when the timer expires. + :param *args, **kwargs: Parameters of the callback. + """ + self._callback = callback + self._args = args + self._kwargs = kwargs + + @cmpi_logging.trace_method + def start(self, timeout): + """ + Start the timer with given timeout. After the timeout, the registered + callback will be called. + + :param timeout: (``float``) Timeout in seconds. + """ + + self._timeout = timeout + now = self._mgr.now() + self._end_time = now + timeout + cmpi_logging.logger.trace_info( + "Timer: Timer %s started at %f for %f seconds" + % (self._name, now, self._timeout)) + self._mgr._add_timer(self) + + @cmpi_logging.trace_method + def cancel(self): + """ + Cancel the timer. This method does not guarantee that the callback won't + be called, the timer might be calling the callback right now, + """ + cmpi_logging.logger.trace_info("Timer: Timer %s cancelled" + % (self._name)) + self._mgr._remove_timer(self) + + @cmpi_logging.trace_method + def _expired(self, now): + """ + Returns True, if the timer is expired. + + :param now: (``float``) Current time, as returned by MonotonicClock.now(). + :returns: (``boolean``) ``True``, if the timer is expired. + """ + if self._end_time <= now: + cmpi_logging.logger.trace_info("Timer: Timer %s has expired" + % (self._name)) + return True + return False + + @cmpi_logging.trace_method + def _expire(self): + """ + Called when the timer expired. It calls the callback. + """ + cmpi_logging.logger.trace_info("Timer: Calling callback for timer %s" + % (self._name)) + self._callback(*self._args, **self._kwargs) + +class TimerManager(singletonmixin.Singleton): + """ + Manages set of timers. + + Python standard Timer class creates a thread for + + each timer, which is inefficient. This class uses only one thread, which + is registered at CIMOM, i.e. it can log as usual. + + This class is singleton, use TimerManager.get_instance() to get the + instance. + + Still, the singleton needs to be initialized with ProviderEnvironment to + enable logging in the timer thread. Use TimerManager.get_instance(env) in + you provider initialization. + """ + + # Commands to the timer thread + COMMAND_STOP = 1 + COMMAND_RESCHEDULE = 2 + + @cmpi_logging.trace_method + def __init__(self, env=None): + """ + Initialize new thread manager. + + :param env: (``ProviderEnvironment``) Environment to use for logging. + """ + self._clock = MonotonicClock() + self._lock = threading.RLock() + self._queue = Queue.Queue() + + # Array of timers. Assumption: nr. of timers is relatively small, + # i.e. hundreds at the worst. + self._timers = [] + + new_broker = None + if env: + broker = env.get_cimom_handle() + new_broker = broker.PrepareAttachThread() + + self._timer_thread = threading.Thread( + target=self._timer_loop, args=(new_broker,)) + self._timer_thread.daemon = False + self._timer_thread.start() + + def create_timer(self, name, callback=None, *args, **kwargs): + """ + Create new timer. If specified, given callback is registered. + The callback is called with *args and **kwargs. + + :param name: (``string``) Name of the timer, used for logging. + :param callback: (``function``) Callback to call when the timer expires. + :param *args, **kwargs: Parameters of the callback. + """ + return Timer(self, name, callback, *args, **kwargs) + + def _timer_loop(self, broker): + """ + TimerManager thread main loop. It waits for timeout of all timers + and calls their callbacks. + + :param broker: (``BrokerCIMOMHandle``) CIM broker handle, used for + logging. + """ + if broker: + broker.AttachThread() + cmpi_logging.logger.info("Started Timer thread.") + while True: + self._handle_expired() + timeout = self._find_timeout() + if timeout != 0: + # Wait for the timeout or any change in timers. + try: + command = self._queue.get(timeout=timeout) + self._queue.task_done() + if command == self.COMMAND_STOP: + break # stop the thread + # process COMMAND_RESCHEDULE in next loop + except Queue.Empty: + # Timeout has happened, ignore the exception. + pass + cmpi_logging.logger.info("Stopped Timer thread.") + + @cmpi_logging.trace_method + def _handle_expired(self): + """ + Finds all expired timers, calls their callback and removes them from + list of timers. + """ + + # Get list of expired timers. + with self._lock: + now = self.now() + cmpi_logging.logger.trace_info( + "Timer: Checking for expired, now=%f." % (now)) + expired = [t for t in self._timers if t._expired(now)] + + # Call the callbacks (unlocked!). + for t in expired: + t._expire() + + # Remove the timers (locked). + with self._lock: + for t in expired: + try: + cmpi_logging.logger.trace_info( + "Timer: Removing %s" % (t._name)) + self._timers.remove(t) + except ValueError: + # The timer has already been removed. + pass + + @cmpi_logging.trace_method + def _find_timeout(self): + """ + Return nearest timeout, in seconds (as float, i.e. subsecond timeout + is possible). If no timer is scheduled, None is returned. + If there are expired timers, 0 is returned. + + :returns: Positive ``float``: Nearest timeout. + :returns: ``0``: Some timer has expired. + :returns: ``None``: No timer is scheduled. + """ + with self._lock: + if not self._timers: + cmpi_logging.logger.trace_info( + "Timer: No timers scheduled, waiting forever.") + return None + closest = min(self._timers, key=lambda timer: timer._end_time) + now = self.now() + timeout = closest._end_time - now + if timeout > 0: + cmpi_logging.logger.trace_info( + "Timer: Waiting for %f seconds, now=%f." + % (timeout, now)) + return timeout + cmpi_logging.logger.trace_info( + "Timer: Some timer has already expired, no waiting.") + return 0 + + @cmpi_logging.trace_method + def _add_timer(self, timer): + """ + Adds timer to list of timers. The timer must be started, i.e. its + timeout must be nozero! + This is internal method called by Timer.start(). + + :param timer: (``Timer``) Timer to add. + """ + with self._lock: + self._timers.append(timer) + # Wake up the timer manager thread. + self._queue.put(self.COMMAND_RESCHEDULE) + cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name)) + + @cmpi_logging.trace_method + def _remove_timer(self, timer): + """ + Remove timer from list of timers. + This is internal method called by Timer.cancel(). + :param timer: (``Timer``) Timer to remove. + """ + with self._lock: + try: + self._timers.remove(timer) + except ValueError: + pass + # Wake up the timer manager thread. + self._queue.put(self.COMMAND_RESCHEDULE) + cmpi_logging.logger.trace_info("Timer: Timer %s removed" + % (timer._name)) + + def now(self): + """ + Return current time, not influenced by NTP or admin setting date or + time. The actual value of current time is meaningless, it can be used + only to measure time differences. + + :returns: ``float`` Current time, in seconds. + """ + return self._clock.now() + + @cmpi_logging.trace_method + def shutdown(self): + """ + Stop the thread. This method blocks until the thread is safely + destroyed. + """ + self._queue.put(self.COMMAND_STOP) + self._timer_thread.join() + +if __name__ == "__main__": + cmpi_logging.logger = cmpi_logging.CMPILogger("") + import time + + class Env(object): + def AttachThread(self): + pass + def PrepareAttachThread(self): + return self + def get_cimom_handle(self): + return self + + clock = MonotonicClock() + + start = clock.now() + time.sleep(0.5) + print "Clock 0.5:", clock.now() - start + + time.sleep(0.5) + print "Clock 1:", clock.now() - start + + mgr = TimerManager.get_instance(Env()) + + def callback(msg): + if callback.first: + t = mgr.create_timer("internal 0.5") + t.set_callback(callback, "internal 0.5") + t.start(0.5) + callback.first = False + + print clock.now(), msg + + callback.first = True + + t1 = mgr.create_timer("one second") + t1.set_callback(callback, "1") + t1.start(1) + t2 = mgr.create_timer("two seconds") + t2.set_callback(callback, "2") + t2.start(2) + t22 = mgr.create_timer("two seconds 2") + t22.set_callback(callback, "2 again") + t22.start(2) + t15 = mgr.create_timer("one+half seconds") + t15.set_callback(callback, "1.5") + t15.start(1.5) + + time.sleep(4) + + mgr.stop_thread() diff --git a/src/python/lmi/providers/__init__.py b/src/python/lmi/providers/__init__.py new file mode 100644 index 0000000..baebcdb --- /dev/null +++ b/src/python/lmi/providers/__init__.py @@ -0,0 +1,42 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar +# + +""" +Common utilities for OpenLMI python providers. +""" +def parse_instance_id(instance_id, classname=None): + """ + Parse InstanceID, check it has LMI:: format and return + the ID. Return None if the format is bad. + :param instance_id: (``string``) String to parse. + :param classname: (``string``) Name of class, whose InstanceID we parse. + If the classname is None, it won't be checked. + :returns: ``string`` with the ID. + """ + parts = instance_id.split(":", 2) + if len(parts) != 3: + return None + if parts[0] != "LMI": + return None + real_classname = parts[1] + if classname and real_classname.lower() != classname.lower(): + return None + return parts[2] diff --git a/src/python/lmi/providers/cmpi_logging.py b/src/python/lmi/providers/cmpi_logging.py new file mode 100644 index 0000000..a97e4ab --- /dev/null +++ b/src/python/lmi/providers/cmpi_logging.py @@ -0,0 +1,204 @@ +# -*- Coding:utf-8 -*- +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek + + +import logging +import inspect +import traceback + +TRACE_WARNING = logging.INFO - 1 +TRACE_INFO = logging.INFO - 2 +TRACE_VERBOSE = logging.DEBUG + +class CMPILogHandler(logging.Handler): + """ + A handler class, which sends log messages to CMPI log. + """ + + def __init__(self, cmpi_logger, *args, **kwargs): + self.cmpi_logger = cmpi_logger + super(CMPILogHandler, self).__init__(*args, **kwargs) + + def emit(self, record): + msg = self.format(record) + if record.levelno >= logging.ERROR: + self.cmpi_logger.log_error(msg) + elif record.levelno >= logging.WARNING: + self.cmpi_logger.log_warn(msg) + elif record.levelno >= logging.INFO: + self.cmpi_logger.log_info(msg) + elif record.levelno >= TRACE_WARNING: + self.cmpi_logger.trace_warn(record.filename, msg) + elif record.levelno >= TRACE_INFO: + self.cmpi_logger.trace_info(record.filename, msg) + elif record.levelno >= logging.DEBUG: + self.cmpi_logger.trace_verbose(record.filename, msg) + +class CMPILogger(logging.getLoggerClass()): + """ + A logger class, which adds trace_method level log methods. + """ + def trace_warn(self, msg, *args, **kwargs): + """ Log message with TRACE_WARNING severity. """ + self.log(TRACE_WARNING, msg, *args, **kwargs) + + def trace_info(self, msg, *args, **kwargs): + """ Log message with TRACE_INFO severity. """ + self.log(TRACE_INFO, msg, *args, **kwargs) + + def trace_verbose(self, msg, *args, **kwargs): + """ Log message with TRACE_VERBOSE severity. """ + self.log(TRACE_VERBOSE, msg, *args, **kwargs) + +logging.setLoggerClass(CMPILogger) + +def trace_method(func): + """ Decorator, trace entry and exit for a class method. """ + classname = inspect.getouterframes(inspect.currentframe())[1][3] + def helper_func(*args, **kwargs): + """ + Helper function, wrapping real function by trace_method decorator. + """ + logger.log(TRACE_VERBOSE, "Entering %s.%s", classname, func.__name__) + try: + ret = func(*args, **kwargs) + except Exception as exc: + if getattr(exc, "tb_printed", False) is False: + logger.exception("full traceback") + logger.log(TRACE_VERBOSE, "traceback: %s", + traceback.format_exc()) + exc.tb_printed = True + logger.log(TRACE_WARNING, "%s.%s threw exception %s", + classname, func.__name__, str(exc)) + raise + logger.log(TRACE_VERBOSE, "Exiting %s.%s", classname, func.__name__) + return ret + helper_func.__name__ = func.__name__ + helper_func.__doc__ = func.__doc__ + helper_func.__module__ = func.__module__ + return helper_func + +def trace_function(func): + """ Decorator, trace entry and exit for a function outside any class. """ + def helper_func(*args, **kwargs): + """ + Helper function, wrapping real function by trace_method decorator. + """ + logger.log(TRACE_VERBOSE, "Entering %s.%s", + func.__module__, func.__name__) + try: + ret = func(*args, **kwargs) + except Exception as exc: + if getattr(exc, "tb_printed", False) is False: + logger.exception("full traceback") + logger.log(TRACE_VERBOSE, "traceback: %s", + traceback.format_exc()) + exc.tb_printed = True + logger.log(TRACE_WARNING, "%s.%s threw exception %s", + func.__module__, func.__name__, str(exc)) + raise + logger.log(TRACE_VERBOSE, "Exiting %s.%s", + func.__module__, func.__name__) + return ret + helper_func.__name__ = func.__name__ + helper_func.__doc__ = func.__doc__ + helper_func.__module__ = func.__module__ + return helper_func + +class LogManager(object): + """ + Class, which takes care of CMPI logging. + There should be only one instance of this class and it should be + instantiated as soon as possible, even before reading a config. + The config file can be provided later by set_config call. + """ + FORMAT_STDERR = '%(levelname)s: %(message)s' + FORMAT_CMPI = '%(levelname)s: %(message)s' + + LOGGER_NAME = "lmi.storage" + + def __init__(self, env): + """ + Initialize logging. + """ + formatter = logging.Formatter(self.FORMAT_CMPI) + + self.cmpi_handler = CMPILogHandler(env.get_logger()) + self.cmpi_handler.setLevel(logging.DEBUG) + self.cmpi_handler.setFormatter(formatter) + + self.logger = logging.getLogger(self.LOGGER_NAME) + self.logger.addHandler(self.cmpi_handler) + self.logger.setLevel(logging.INFO) + + self.stderr_handler = None + self.config = None + + global logger # IGNORE:W0603 + logger = self.logger + logger.info("CMPI log started") + + @trace_method + def set_config(self, config): + """ + Set a configuration of logging. It applies its setting immediately + and also subscribes for configuration changes. + """ + self.config = config + config.add_listener(self._config_changed) + # apply the config + self._config_changed(config) + + @trace_method + def _config_changed(self, config): + """ + Apply changed configuration, i.e. start/stop sending to stderr + and set appropriate log level. + """ + if config.tracing: + self.logger.setLevel(logging.DEBUG) + else: + self.logger.setLevel(logging.INFO) + if config.stderr: + # start sending to stderr + if not self.stderr_handler: + # create stderr handler + formatter = logging.Formatter(self.FORMAT_STDERR) + self.stderr_handler = logging.StreamHandler() + self.stderr_handler.setLevel(logging.DEBUG) + self.stderr_handler.setFormatter(formatter) + self.logger.addHandler(self.stderr_handler) + self.logger.info("Started logging to stderr.") + else: + # stop sending to stderr + if self.stderr_handler: + self.logger.info("Stopped logging to stderr.") + self.logger.removeHandler(self.stderr_handler) + self.stderr_handler = None + + def destroy(self): + if self.stderr_handler: + self.logger.removeHandler(self.stderr_handler) + self.stderr_handler = None + self.logger.removeHandler(self.cmpi_handler) + self.cmpi_handler = None + self.config.remove_listener(self._config_changed) + +logger = None diff --git a/src/python/setup.py b/src/python/setup.py index b36d455..89c7461 100644 --- a/src/python/setup.py +++ b/src/python/setup.py @@ -5,9 +5,9 @@ setup( author='Michal Minar', author_email='miminar@redhat.com', url='https://fedorahosted.org/openlmi/', - version='0.3', + version='0.4', namespace_packages = ['lmi'], - packages = ['lmi', 'lmi.common'], + packages = ['lmi', 'lmi.base', 'lmi.providers'], classifiers=[ 'license :: osi approved :: gnu lesser general public license v2 or later (lgplv2+)', 'operating system :: posix :: linux', -- cgit