diff options
Diffstat (limited to 'src/python')
| -rw-r--r-- | src/python/lmi/test/base.py | 384 | ||||
| -rw-r--r-- | src/python/lmi/test/cimbase.py | 362 | ||||
| -rw-r--r-- | src/python/lmi/test/lmibase.py | 169 | ||||
| -rw-r--r-- | src/python/lmi/test/util.py | 131 |
4 files changed, 696 insertions, 350 deletions
diff --git a/src/python/lmi/test/base.py b/src/python/lmi/test/base.py index eb00914..1522979 100644 --- a/src/python/lmi/test/base.py +++ b/src/python/lmi/test/base.py @@ -22,122 +22,36 @@ Base classes for *OpenLMI Provider* test cases. """ -import BaseHTTPServer import os import pywbem -import Queue -import random import socket -import threading -import time import unittest -JOB_CREATED = 4096 +from lmi.test import util -def get_environvar(variable, default='', convert=str): +def render_iname(iname, indent=2): """ - Get the value of environment variable. - - :param string variable: Name of environment variable. - :param default: Any value that should be returned when the variable is not - set. If None, the conversion won't be done. - :param callable convert: Function transforming value to something else. - :returns: Converted value of the environment variable. - """ - val = os.environ.get(variable, default) - if convert is bool: - return val.lower() in ('true', 'yes', 'on', '1') - if val is None: - return None - return convert(val) - -def mark_dangerous(method): - """ - Decorator for methods of :py:class:`unittest.TestCase` subclasses that - skips dangerous tests if an environment variable says so. - ``LMI_RUN_DANGEROUS`` is the environment variabled read. - - These tests will be skipped by default. + Render object path in human readable way. Result will occupy multiple + lines. The first line is a class name, which is not indented at all. Other + lines will be indented with *indent* spaces. + + :param iname: Object path to render. + :type iname: :py:class:`pywbem.CIMInstanceName` + :param integer ident: Number of spaces prefixing all lines but the first. + :returns: *iname* nicely rendered. + :rtype: string """ - if get_environvar('LMI_RUN_DANGEROUS', '0', bool): - return method - else: - return unittest.skip("This test is marked as dangerous.")(method) - -def mark_tedious(method): - """ - Decorator for methods of :py:class:`unittest.TestCase` subclasses that - skips tedious tests. Those running for very long time and usually need a - lot of memory. They are run by default. Environment variable - ``LMI_RUN_TEDIOUS`` can be used to skip them. - """ - if get_environvar('LMI_RUN_TEDIOUS', '1', bool): - return method - else: - return unittest.skip("This test is marked as tedious.")(method) - -def compare_cim_name(first, second): - """ - Compare two CIMInstanceNames. Their ``host`` property is not checked. - """ - if not isinstance(first, pywbem.CIMInstanceName): - raise TypeError("both arguments must be a pywbem.CIMInstanceName") - if not isinstance(second, pywbem.CIMInstanceName): - raise TypeError("both arguments must be a pywbem.CIMInstanceName") - return ( first.classname == second.classname - and first.namespace == second.namespace - and first.keybindings == second.keybindings) - -class CIMListener(object): - """ CIM Listener for indication events. """ - - class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler): - """ Handler for the POST request from *CIMOM*. """ - def do_POST(self): - """ Handle the POST request. """ - data = self.rfile.read(int(self.headers['Content-Length'])) - ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data)) - # Get the instance from CIM-XML, copied from - # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial - insts = [x[1] for x in ttree[2][2][0][2][2]] - for inst in insts: - self.callback(inst) - self.send_response(200) - self.end_headers() - - def log_message(self, _fmt, *_args): - # suppress log messages - pass - - def __init__(self, callback, http_port=5988): - self.address = ('', http_port) - self.CIMHandler.callback = callback - self.thread = None - self.server = None - - def start(self): - """ Start listening. """ - BaseHTTPServer.HTTPServer.allow_reuse_address = True - self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler) - self.thread = threading.Thread(target=self.server.serve_forever) - self.thread.start() - - def stop(self): - """ Stop listening. """ - if self.server is not None: - self.server.shutdown() - self.server.socket.close() - if self.thread is not None: - self.thread.join() - - def running(self): - """ - Are we listening for events? - :rtype: bool - """ - return self.thread is not None - -class LmiTestCase(unittest.TestCase): + lines = [ "%s" % iname.classname + , " "*indent + "namespace: %s" % iname.namespace + , " "*indent + "keys:"] + align = max([len(k) for k in iname.keybindings.iterkeys()]) + for key, value in iname.keybindings.iteritems(): + if isinstance(value, pywbem.CIMInstanceName): + value = render_iname(value, indent + 4) + lines.append(" "*indent + (" %%-%ds : %%s" % align) % (key, value)) + return "\n".join(lines) + +class BaseLmiTestCase(unittest.TestCase): """ Base class for all LMI test cases. """ @@ -145,25 +59,13 @@ class LmiTestCase(unittest.TestCase): #: Value used in ``SystemName`` key properties in various *CIM* instances. #: It's also used to fill ``CIM_ComputerySystem.Name`` property. SYSTEM_NAME = socket.gethostname() - #: Says, whether the test case needs indication listener running or not. - #: Each subclass shall override this property and set it to ``True`` if - #: it want to test indication events. - NEEDS_INDICATIONS = False - - @classmethod - def needs_indications(cls): - """ - Whether the indication listener should be started for this test case. - In subclasses override ``NEEDS_INDICATIONS`` property and set it to - ``True`` if indication testing is desired. - """ - return cls.NEEDS_INDICATIONS @classmethod def setUpClass(cls): #: Cached value of SystemCreationClassName set with #: ``LMI_CS_CLASSNAME`` environment variable. - cls.system_cs_name = os.environ.get("LMI_CS_CLASSNAME", "PG_ComputerSystem") + cls.system_cs_name = os.environ.get( + "LMI_CS_CLASSNAME", "PG_ComputerSystem") #: *URL* of *CIMOM* we connect to. Overriden with ``LMI_CIMOM_URL`` #: environment variable. cls.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989") @@ -180,44 +82,11 @@ class LmiTestCase(unittest.TestCase): #: Boolean value saying whether to run dangerous tests. These are marked #: with :py:func:`mark_dangerous` decorator. This is set with #: ``LMI_RUN_DANGEROUS`` environment variable. - cls.run_dangerous = get_environvar('LMI_RUN_DANGEROUS', '0', bool) + cls.run_dangerous = util.get_environvar('LMI_RUN_DANGEROUS', '0', bool) #: Boolean value saying whether to run tedious tests. These are marked #: with :py:func:`mark_tedious` decorator. This is set with #: ``LMI_RUN_TEDIOUS`` environment variable. - cls.run_tedious = get_environvar('LMI_RUN_TEDIOUS', '1', bool) - - #: Active connection to *CIMOM*. - cls.wbemconnection = pywbem.WBEMConnection(cls.url, - (cls.username, cls.password)) - - if cls.needs_indications(): - cls.indication_port = random.randint(12000, 13000) - cls.indication_queue = Queue.Queue() - cls.listener = CIMListener( - callback=cls._process_indication, - http_port=cls.indication_port) - - @classmethod - def tearDownClass(cls): - if cls.needs_indications(): - cls.listener.stop() - - @classmethod - def _start_listening(self): - """ Start listening for incoming indications. """ - self.listener.start() - - @classmethod - def _process_indication(self, indication): - """ Callback to process one indication.""" - self.indication_queue.put(indication) - - def setUp(self): - self.subscribed = {} - - def tearDown(self): - for name in self.subscribed.keys(): - self.unsubscribe(name) + cls.run_tedious = util.get_environvar('LMI_RUN_TEDIOUS', '1', bool) def assertRaisesCIM(self, cim_err_code, func, *args, **kwds): """ @@ -228,12 +97,15 @@ class LmiTestCase(unittest.TestCase): func(*args, **kwds) self.assertEqual(cim_err_code, cm.exception.args[0]) - def assertCIMNameEquals(self, first, second): + def assertCIMNameEqual(self, fst, snd, msg=None): """ - Compare two CIMInstanceNames. Their host properties are not checked. + Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their host + properties are not checked. """ - self.assertTrue(compare_cim_name(first, second), - "%s != %s" % (str(first), str(second))) + if msg is None: + msg = ( "%s\n\nis not equal to: %s" + % (render_iname(fst), render_iname(snd))) + self.assertTrue(util.check_inames_equal(fst, snd), msg) def assertCIMNameIn(self, name, candidates): """ @@ -241,7 +113,7 @@ class LmiTestCase(unittest.TestCase): set of candidates. It compares all properties but ``host``. """ for candidate in candidates: - if compare_cim_name(name, candidate): + if util.check_inames_equal(name, candidate): return self.assertTrue(False, 'name "%s" is not in candidates' % str(name)) @@ -257,191 +129,3 @@ class LmiTestCase(unittest.TestCase): snd_dict[key.lower()] = value self.assertEqual(fst_dict, snd_dict, msg) - def get_indication(self, timeout): - """ Wait for an indication for given nr. of seconds and return it.""" - try: - indication = self.indication_queue.get(timeout=timeout) - except Queue.Empty: - raise AssertionError("Timeout when waiting for indication") - self.indication_queue.task_done() - return indication - - def make_filter_iname(self, filter_name): - """ - Create an instance name of ``CIM_IndicationFilter``. - - :rtype: :py:class:`pywbem.CIMInstanceName` - """ - return pywbem.CIMInstanceName( - classname="CIM_IndicationFilter", - namespace="root/interop", - keybindings={ - 'CreationClassName': 'CIM_IndicationFilter', - 'SystemClassName': self.system_cs_name, - 'SystemName': self.SYSTEM_NAME, - 'Name': filter_name}) - - def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"): - """ - Create an instance of ``CIM_IndicationFilter``. - - :rtype: :py:class:`pywbem.CIMInstance` - """ - inst = pywbem.CIMInstance('CIM_IndicationFilter') - inst['CreationClassName'] = 'CIM_IndicationFilter' - inst['SystemCreationClassName'] = self.system_cs_name - inst['SystemName'] = self.SYSTEM_NAME - inst['Name'] = filter_name - inst['Query'] = query - inst['QueryLanguage'] = query_lang - inst['SourceNamespace'] = "root/cimv2" - inst.path = self.make_filter_iname(filter_name) - return inst - - def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"): - """ - Create indication subscription for given filter name. - """ - if not self.needs_indications(): - raise Exception("can not subscribe to indications, enable them" - " with NEEDS_INDICATIONS") - - if query is not None: - # Create filter first - indfilter = self.wbemconnection.CreateInstance( - self.make_filter_inst(filter_name, query, querylang)) - else: - # the filter is already created, assemble its name - indfilter = self.make_filter_iname(filter_name) - - # create destination - destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') - destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML' - destinst['SystemCreationClassName'] = self.system_cs_name - destinst['SystemName'] = self.SYSTEM_NAME - destinst['Name'] = filter_name - destinst['Destination'] = "http://localhost:%d" % (self.indication_port) - destinst['PersistenceType'] = pywbem.Uint16(3) # Transient - cop = pywbem.CIMInstanceName( - 'CIM_ListenerDestinationCIMXML', namespace="root/interop") - cop.keybindings = { - 'CreationClassName' : 'CIM_ListenerDestinationCIMXML', - 'SystemClassName' : self.system_cs_name, - 'SystemName' : self.SYSTEM_NAME, - 'Name' : filter_name } - destinst.path = cop - destname = self.wbemconnection.CreateInstance(destinst) - - # create the subscription - subinst = pywbem.CIMInstance( - 'CIM_IndicationSubscription') - subinst['Filter'] = indfilter - subinst['Handler'] = destname - cop = pywbem.CIMInstanceName( - 'CIM_IndicationSubscription', namespace="root/interop") - cop.keybindings = { - 'Filter': indfilter, - 'Handler': destname } - subinst.path = cop - subscription = self.wbemconnection.CreateInstance(subinst) - - self.subscribed[filter_name] = [subscription, destname] - - if not self.listener.running(): - self._start_listening() - return subscription - - def unsubscribe(self, filter_name): - """ - Unsubscribe from given filter. - """ - if not self.needs_indications(): - raise Exception("can not unsubscribe to indications, enable them" - " with NEEDS_INDICATIONS") - for instance in self.subscribed.pop(filter_name): - self.wbemconnection.DeleteInstance(instance) - - def finish_job(self, - jobname, - assoc_class, - return_constructor=int): - """ - Wait until the job finishes and return ``(ret, outparams)``just as - ``InvokeMethod`` would. - - It's hard to reconstruct these outparams, since the embedded - instances / ojects do not work in our ``CIMOMS``, therefore special - care is needed. - - :param jobname: Name of the job. - :type jobname: :py:class:`pywbem.CIMInstanceName` - :param callable return_constructor: Callable, which converts - string to the right type, for example ``int``. - :returns: ``(retvalue, outparams)`` in the same way as - ``finish_method()`` would. - :rtype: tuple - """ - # Use busy loop for now - # TODO: rework to something sane - while True: - job = self.wbemconnection.GetInstance(jobname) - if job['JobState'] > 5: # all states higher than 5 are final - break - time.sleep(0.1) - - # get the MethodResult - resultname = self.wbemconnection.AssociatorNames( - jobname, - AssocClass=assoc_class)[0] - result = self.wbemconnection.GetInstance(resultname) - ind = result['PostCallIndication'] - # check for error - if ind['Error'] is not None: - err = ind['Error'][0] - code = err['CIMStatusCode'] - msg = err['Message'] - raise pywbem.CIMError(code, msg) - - ret = return_constructor(ind['ReturnValue']) - - # convert output parameters to format returned by InvokeMethod - outparams = pywbem.NocaseDict() - try: - params = ind['MethodParameters'] - except KeyError: - params = {} - if params: - for (key, value) in params.iteritems(): - outparams[key] = value - - return (ret, outparams) - - def invoke_async_method(self, - method_name, - object_name, - return_constructor=int, - *args, **kwargs): - """ - Invoke a method and if it returns a job, wait for the job. - - :param string method_name: Name of the method. - :param object_name: Instance, on which the method should be invoked. - :type object_name: :py:class:`pywbem.CIMInstanceName` - :param callable return_constructor: Callable, which converts - string to the right type, for example ``int``. - :param list args: Positional arguments passed to invoked method. - :param dictionary kwargs: Keyword arguments passed to invoked method. - :returns: ``(retvalue, outparams)`` in the same way as - ``finish_method()`` would. - :rtype: tuple - """ - (ret, outparams) = self.wbemconnection.InvokeMethod( - method_name, - object_name, - *args, - **kwargs) - if ret == JOB_CREATED: - # wait for the job - jobname = outparams['Job'] - (ret, outparams) = self.finish_job(jobname, return_constructor) - return (ret, outparams) diff --git a/src/python/lmi/test/cimbase.py b/src/python/lmi/test/cimbase.py new file mode 100644 index 0000000..def5ba5 --- /dev/null +++ b/src/python/lmi/test/cimbase.py @@ -0,0 +1,362 @@ +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek <jsafrane@redhat.com> +# Authors: Michal Minar <miminar@redhat.com> +# Authors: Roman Rakus <rrakus@redhat.com> +# +""" +Base class and utilities for test suits written with plain pywbem +abastractions. +""" + +import BaseHTTPServer +import pywbem +import Queue +import random +import threading +import time + +from lmi.test import base + +JOB_CREATED = 4096 + +class CIMListener(object): + """ CIM Listener for indication events. """ + + class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler): + """ Handler for the POST request from *CIMOM*. """ + def do_POST(self): + """ Handle the POST request. """ + data = self.rfile.read(int(self.headers['Content-Length'])) + ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data)) + # Get the instance from CIM-XML, copied from + # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial + insts = [x[1] for x in ttree[2][2][0][2][2]] + for inst in insts: + self.callback(inst) + self.send_response(200) + self.end_headers() + + def log_message(self, _fmt, *_args): + # suppress log messages + pass + + def __init__(self, callback, http_port=5988): + self.address = ('', http_port) + self.CIMHandler.callback = callback + self.thread = None + self.server = None + + def start(self): + """ Start listening. """ + BaseHTTPServer.HTTPServer.allow_reuse_address = True + self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.start() + + def stop(self): + """ Stop listening. """ + if self.server is not None: + self.server.shutdown() + self.server.socket.close() + if self.thread is not None: + self.thread.join() + + def running(self): + """ + Are we listening for events? + :rtype: bool + """ + return self.thread is not None + +class CIMTestCase(base.BaseLmiTestCase): + """ + Base class for LMI test cases based on plain pywbem. + """ + + #: Says, whether the test case needs indication listener running or not. + #: Each subclass shall override this property and set it to ``True`` if + #: it want to test indication events. + NEEDS_INDICATIONS = False + + _WBEMCONNECTION = None + _SYSTEM_INAME = None + + @classmethod + def needs_indications(cls): + """ + Whether the indication listener should be started for this test case. + In subclasses override ``NEEDS_INDICATIONS`` property and set it to + ``True`` if indication testing is desired. + """ + return cls.NEEDS_INDICATIONS + + @classmethod + def setUpClass(cls): + base.BaseLmiTestCase.setUpClass.im_func(cls) + if cls.needs_indications(): + cls.indication_port = random.randint(12000, 13000) + cls.indication_queue = Queue.Queue() + cls.listener = CIMListener( + callback=cls._process_indication, + http_port=cls.indication_port) + + @classmethod + def tearDownClass(cls): + if cls.needs_indications(): + cls.listener.stop() + + @classmethod + def _start_listening(cls): + """ Start listening for incoming indications. """ + cls.listener.start() + + @classmethod + def _process_indication(cls, indication): + """ Callback to process one indication.""" + cls.indication_queue.put(indication) + + @property + def wbemconnection(self): + """ + :returns: Active connection to *CIMOM*. + :rtype: :py:class:`pywbem.WBEMConnection` + """ + if CIMTestCase._WBEMCONNECTION is None: + CIMTestCase._WBEMCONNECTION = pywbem.WBEMConnection(self.url, + (self.username, self.password)) + return CIMTestCase._WBEMCONNECTION + + @property + def system_iname(self): + """ + :returns: Instance of ``CIM_ComputerSystem`` registered with *CIMOM*. + :rtype: :py:class:`lmi.shell.LMIInstanceName` + """ + if CIMTestCase._SYSTEM_INAME is None: + CIMTestCase._SYSTEM_INAME = self.wbemconnection. \ + EnumerateInstanceNames(self.system_cs_name, 'root/cimv2')[0] + return CIMTestCase._SYSTEM_INAME.copy() + + def setUp(self): + self.subscribed = {} + + def tearDown(self): + for name in self.subscribed.keys(): + self.unsubscribe(name) + + def assertCIMIsSubclass(self, cls, base_cls): + """ + Checks, whether cls is subclass of base_cls from CIM perspective. + @param cls name of subclass + @param base_cls name of base class + """ + if not isinstance(cls, basestring): + raise TypeError("cls must be a string") + if not isinstance(base_cls, basestring): + raise TypeError("base_cls must be a string") + return self.assertTrue(pywbem.is_subclass(self.wbemconnection, + "root/cimv2", base_cls, cls)) + + def get_indication(self, timeout): + """ Wait for an indication for given nr. of seconds and return it.""" + try: + indication = self.indication_queue.get(timeout=timeout) + except Queue.Empty: + raise AssertionError("Timeout when waiting for indication") + self.indication_queue.task_done() + return indication + + def make_filter_iname(self, filter_name): + """ + Create an instance name of ``CIM_IndicationFilter``. + + :rtype: :py:class:`pywbem.CIMInstanceName` + """ + return pywbem.CIMInstanceName( + classname="CIM_IndicationFilter", + namespace="root/interop", + keybindings={ + 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': self.system_cs_name, + 'SystemName': self.SYSTEM_NAME, + 'Name': filter_name}) + + def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"): + """ + Create an instance of ``CIM_IndicationFilter``. + + :rtype: :py:class:`pywbem.CIMInstance` + """ + inst = pywbem.CIMInstance('CIM_IndicationFilter') + inst['CreationClassName'] = 'CIM_IndicationFilter' + inst['SystemCreationClassName'] = self.system_cs_name + inst['SystemName'] = self.SYSTEM_NAME + inst['Name'] = filter_name + inst['Query'] = query + inst['QueryLanguage'] = query_lang + inst['SourceNamespace'] = "root/cimv2" + inst.path = self.make_filter_iname(filter_name) + return inst + + def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"): + """ + Create indication subscription for given filter name. + """ + if not self.needs_indications(): + raise Exception("can not subscribe to indications, enable them" + " with NEEDS_INDICATIONS") + + if query is not None: + # Create filter first + indfilter = self.wbemconnection.CreateInstance( + self.make_filter_inst(filter_name, query, querylang)) + else: + # the filter is already created, assemble its name + indfilter = self.make_filter_iname(filter_name) + + # create destination + destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') + destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML' + destinst['SystemCreationClassName'] = self.system_cs_name + destinst['SystemName'] = self.SYSTEM_NAME + destinst['Name'] = filter_name + destinst['Destination'] = "http://localhost:%d" % (self.indication_port) + destinst['PersistenceType'] = pywbem.Uint16(3) # Transient + cop = pywbem.CIMInstanceName( + 'CIM_ListenerDestinationCIMXML', namespace="root/interop") + cop.keybindings = { + 'CreationClassName' : 'CIM_ListenerDestinationCIMXML', + 'SystemClassName' : self.system_cs_name, + 'SystemName' : self.SYSTEM_NAME, + 'Name' : filter_name } + destinst.path = cop + destname = self.wbemconnection.CreateInstance(destinst) + + # create the subscription + subinst = pywbem.CIMInstance( + 'CIM_IndicationSubscription') + subinst['Filter'] = indfilter + subinst['Handler'] = destname + cop = pywbem.CIMInstanceName( + 'CIM_IndicationSubscription', namespace="root/interop") + cop.keybindings = { + 'Filter': indfilter, + 'Handler': destname } + subinst.path = cop + subscription = self.wbemconnection.CreateInstance(subinst) + + self.subscribed[filter_name] = [subscription, destname] + + if not self.listener.running(): + self._start_listening() + return subscription + + def unsubscribe(self, filter_name): + """ + Unsubscribe from given filter. + """ + if not self.needs_indications(): + raise Exception("can not unsubscribe to indications, enable them" + " with NEEDS_INDICATIONS") + for instance in self.subscribed.pop(filter_name): + self.wbemconnection.DeleteInstance(instance) + + def finish_job(self, + jobname, + assoc_class, + return_constructor=int): + """ + Wait until the job finishes and return ``(ret, outparams)``just as + ``InvokeMethod`` would. + + It's hard to reconstruct these outparams, since the embedded + instances / ojects do not work in our ``CIMOMS``, therefore special + care is needed. + + :param jobname: Name of the job. + :type jobname: :py:class:`pywbem.CIMInstanceName` + :param callable return_constructor: Callable, which converts + string to the right type, for example ``int``. + :returns: ``(retvalue, outparams)`` in the same way as + ``finish_method()`` would. + :rtype: tuple + """ + # Use busy loop for now + # TODO: rework to something sane + while True: + job = self.wbemconnection.GetInstance(jobname) + if job['JobState'] > 5: # all states higher than 5 are final + break + time.sleep(0.1) + + # get the MethodResult + resultname = self.wbemconnection.AssociatorNames( + jobname, + AssocClass=assoc_class)[0] + result = self.wbemconnection.GetInstance(resultname) + ind = result['PostCallIndication'] + # check for error + if ind['Error'] is not None: + err = ind['Error'][0] + code = err['CIMStatusCode'] + msg = err['Message'] + raise pywbem.CIMError(code, msg) + + ret = return_constructor(ind['ReturnValue']) + + # convert output parameters to format returned by InvokeMethod + outparams = pywbem.NocaseDict() + try: + params = ind['MethodParameters'] + except KeyError: + params = {} + if params: + for (key, value) in params.iteritems(): + outparams[key] = value + + return (ret, outparams) + + def invoke_async_method(self, + method_name, + object_name, + return_constructor=int, + *args, **kwargs): + """ + Invoke a method and if it returns a job, wait for the job. + + :param string method_name: Name of the method. + :param object_name: Instance, on which the method should be invoked. + :type object_name: :py:class:`pywbem.CIMInstanceName` + :param callable return_constructor: Callable, which converts + string to the right type, for example ``int``. + :param list args: Positional arguments passed to invoked method. + :param dictionary kwargs: Keyword arguments passed to invoked method. + :returns: ``(retvalue, outparams)`` in the same way as + ``finish_method()`` would. + :rtype: tuple + """ + (ret, outparams) = self.wbemconnection.InvokeMethod( + method_name, + object_name, + *args, + **kwargs) + if ret == JOB_CREATED: + # wait for the job + jobname = outparams['Job'] + (ret, outparams) = self.finish_job(jobname, return_constructor) + return (ret, outparams) diff --git a/src/python/lmi/test/lmibase.py b/src/python/lmi/test/lmibase.py new file mode 100644 index 0000000..e1b1b29 --- /dev/null +++ b/src/python/lmi/test/lmibase.py @@ -0,0 +1,169 @@ +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Base class and utilities for test suits written upon lmi shell. +""" + +import functools +import inspect +import pywbem + +from lmi.test import base +from lmi.shell import connect +from lmi.shell import LMIInstance +from lmi.shell import LMIInstanceName +from lmi.shell import LMIUtil + +def enable_lmi_exceptions(method): + """ + Function or method decorator enabling exceptions to be raised from under + lmi shell intestines. + """ + @functools.wraps(method) + def _wrapper(*args, **kwargs): + """ Enable exceptions in wrapped function. """ + original = LMIUtil.lmi_get_use_exceptions() + LMIUtil.lmi_set_use_exceptions(True) + try: + retval = method(*args, **kwargs) + finally: + LMIUtil.lmi_set_use_exceptions(original) + return retval + + return _wrapper + +def to_cim_object(obj): + """ + :returns: Wrapped object of from inside of shell abstractions. + """ + if isinstance(obj, (LMIInstance, LMIInstanceName)): + return obj.wrapped_object + return obj + +class LmiTestCase(base.BaseLmiTestCase): + """ + Base class for all LMI test cases based on lmi shell. + """ + + #: Once you override this in subclass with a name of CIM class to be tested, + #: you can use :py:attr:`LmiTestCase.cim_class` to get reference to a shell + #: wrapper of this class. + CLASS_NAME = None + + _SYSTEM_INAME = None + + @classmethod + def setUpClass(cls): + base.BaseLmiTestCase.setUpClass.im_func(cls) + + @property + def conn(self): + """ + :returns: Active connection to *CIMOM* wrapped by *lmi shell* + abstraction. + :rtype: :py:class:`lmi.shell.LMIConnection` + """ + if not hasattr(self, '_shellconnection'): + kwargs = {} + con_argspec = inspect.getargspec(connect) + # support older versions of lmi shell + if 'verify_server_cert' in con_argspec.args or con_argspec.keywords: + # newer name + kwargs['verify_server_cert'] = False + elif 'verify_certificate' in con_argspec.args: + # older one + kwargs['verify_certificate'] = False + self._shellconnection = connect( + self.url, self.username, self.password, **kwargs) + return self._shellconnection + + @property + def ns(self): + """ + :returns: Namespace object representing CIM ``"root/cimv2"`` namespace. + :rtype: :py:class:`lmi.shell.LMINamespace` + """ + return self.conn.root.cimv2 + + @property + def cim_class(self): + """ + A convenience accessor to ``self.conn.root.cimv2.<CLASS_NAME>``. + You need to override ``CLASS_NAME`` attribute of this class in order + to use this. + + :returns: Lmi shell wrapper of CIM class to be tested. + :rtype: :py:class:`lmi.shell.LMIClass` + """ + if self.CLASS_NAME is None: + return None + return getattr(self.ns, self.CLASS_NAME) + + @property + def system_iname(self): + """ + :returns: Instance of ``CIM_ComputerSystem`` registered with *CIMOM*. + :rtype: :py:class:`lmi.shell.LMIInstanceName` + """ + if LmiTestCase._SYSTEM_INAME is None: + LmiTestCase._SYSTEM_INAME = getattr(self.ns, self.system_cs_name) \ + .first_instance_name() + return LmiTestCase._SYSTEM_INAME.copy() + + def assertCIMIsSubclass(self, cls, base_cls): + """ + Checks, whether cls is subclass of base_cls from CIM perspective. + @param cls name of subclass + @param base_cls name of base class + """ + if not isinstance(cls, basestring): + raise TypeError("cls must be a string") + if not isinstance(base_cls, basestring): + raise TypeError("base_cls must be a string") + return self.assertTrue(pywbem.is_subclass(self.conn._client._cliconn, + "root/cimv2", base_cls, cls)) + + def assertRaisesCIM(self, cim_err_code, func, *args, **kwds): + """ + This test passes if given function called with supplied arguments + raises :py:class:`pywbem.CIMError` with given cim error code. + """ + base.BaseLmiTestCase.assertRaisesCIM( + self, cim_err_code, func, *args, **kwds) + + def assertCIMNameEqual(self, fst, snd, msg=None): + """ + Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their host + properties are not checked. + """ + base.BaseLmiTestCase.assertCIMNameEqual( + self, + to_cim_object(fst), + to_cim_object(snd), + msg) + + def assertCIMNameIn(self, name, candidates): + """ + Checks that given :py:class:`pywbem.CIMInstanceName` is present in + set of candidates. It compares all properties but ``host``. + """ + name = to_cim_object(name) + candidates = [to_cim_object(c) for c in candidates] + base.BaseLmiTestCase.assertCIMNameIn(self, name, candidates) + diff --git a/src/python/lmi/test/util.py b/src/python/lmi/test/util.py new file mode 100644 index 0000000..32d04c4 --- /dev/null +++ b/src/python/lmi/test/util.py @@ -0,0 +1,131 @@ +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +LMI test utilities. +""" + +import os +import pywbem +import socket +import unittest + +def is_this_system(system_name): + """ + :returns: Whether the given *system_name* matches the hostname of currently + running system. + :rtype: boolean + """ + return ( socket.gethostbyaddr(system_name)[0] + == socket.gethostbyaddr(socket.gethostname())[0]) + +def get_environvar(variable, default='', convert=str): + """ + Get the value of environment variable. + + :param string variable: Name of environment variable. + :param default: Any value that should be returned when the variable is not + set. If None, the conversion won't be done. + :param callable convert: Function transforming value to something else. + :returns: Converted value of the environment variable. + """ + val = os.environ.get(variable, default) + if convert is bool: + return val.lower() in ('true', 'yes', 'on', '1') + if val is None: + return None + return convert(val) + +def mark_dangerous(method): + """ + Decorator for methods of :py:class:`unittest.TestCase` subclasses that + skips dangerous tests if an environment variable says so. + ``LMI_RUN_DANGEROUS`` is the environment variabled read. + + These tests will be skipped by default. + """ + if get_environvar('LMI_RUN_DANGEROUS', '0', bool): + return method + else: + return unittest.skip("This test is marked as dangerous.")(method) + +def mark_tedious(method): + """ + Decorator for methods of :py:class:`unittest.TestCase` subclasses that + skips tedious tests. Those running for very long time and usually need a + lot of memory. They are run by default. Environment variable + ``LMI_RUN_TEDIOUS`` can be used to skip them. + """ + if get_environvar('LMI_RUN_TEDIOUS', '1', bool): + return method + else: + return unittest.skip("This test is marked as tedious.")(method) + +def check_inames_equal(fst, snd): + """ + Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their ``host`` + property is not checked. Be benevolent when checking names system + creation class names. + + :returns: ``True`` if both instance names are equal. + :rtype: boolean + """ + if not isinstance(fst, pywbem.CIMInstanceName): + raise TypeError("fst argument must be a pywbem.CIMInstanceName, not %s" + % repr(fst)) + if not isinstance(snd, pywbem.CIMInstanceName): + raise TypeError("snd argument must be a pywbem.CIMInstanceName, not %s" + % repr(snd)) + if fst.classname != snd.classname or fst.namespace != snd.namespace: + return False + + snd_keys = { k: v for k, v in snd.keybindings.iteritems() } + for key, value in fst.keybindings.iteritems(): + if key not in snd_keys: + return False + snd_value = snd_keys.pop(key) + if ( isinstance(value, pywbem.CIMInstanceName) + and isinstance(snd_value, pywbem.CIMInstanceName)): + if not check_inames_equal(value, snd_value): + return False + + # accept also aliases in the Name attribute of ComputerSystem + elif ( ( fst.classname.endswith('_ComputerSystem') + and key.lower() == 'name') + or ( key.lower() == 'systemname' + and 'SystemCreationClassName' in fst)): + if ( value != snd_value + and ( not is_this_system(value) + or not is_this_system(snd_value))): + return False + + elif ( fst.classname.endswith('_ComputerSystem') + and key.lower() == 'creationclassname'): + if ( value != snd_value + and 'CIM_ComputerSystem' not in [ + p['CreationClassName'] for p in (fst, snd)]): + return False + + elif value != snd_value: + return False + + if snd_keys: # second path has more key properties than first one + return False + + return True + |
