diff options
author | Michal Minar <miminar@redhat.com> | 2013-10-24 08:49:37 +0200 |
---|---|---|
committer | Michal Minar <miminar@redhat.com> | 2013-10-24 10:28:30 +0200 |
commit | 53cb6d8e6f44e5c103b266e715d8ca56fa70ca50 (patch) | |
tree | 8f691e96ea08c86890d93a0d0e73a53873a64c09 /src | |
parent | fd26a63b53334483a1f70d9cc30642dc7587b0ec (diff) | |
download | openlmi-providers-53cb6d8e6f44e5c103b266e715d8ca56fa70ca50.tar.gz openlmi-providers-53cb6d8e6f44e5c103b266e715d8ca56fa70ca50.tar.xz openlmi-providers-53cb6d8e6f44e5c103b266e715d8ca56fa70ca50.zip |
tests: added base class for our test cases
import it in your tests with:
from lmi.test.base import LmiTestCase
For imports to work, run:
export PYTHONPATH=${PROVIDERS_GIT_ROOT}/src/python
before nosetests or before running them individually.
Diffstat (limited to 'src')
-rw-r--r-- | src/python/lmi/test/__init__.py | 20 | ||||
-rw-r--r-- | src/python/lmi/test/base.py | 447 |
2 files changed, 467 insertions, 0 deletions
diff --git a/src/python/lmi/test/__init__.py b/src/python/lmi/test/__init__.py new file mode 100644 index 0000000..6ff965f --- /dev/null +++ b/src/python/lmi/test/__init__.py @@ -0,0 +1,20 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# diff --git a/src/python/lmi/test/base.py b/src/python/lmi/test/base.py new file mode 100644 index 0000000..eb00914 --- /dev/null +++ b/src/python/lmi/test/base.py @@ -0,0 +1,447 @@ +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Jan Safranek <jsafrane@redhat.com> +# Authors: Michal Minar <miminar@redhat.com> +# Authors: Roman Rakus <rrakus@redhat.com> +# +""" +Base classes for *OpenLMI Provider* test cases. +""" + +import BaseHTTPServer +import os +import pywbem +import Queue +import random +import socket +import threading +import time +import unittest + +JOB_CREATED = 4096 + +def get_environvar(variable, default='', convert=str): + """ + Get the value of environment variable. + + :param string variable: Name of environment variable. + :param default: Any value that should be returned when the variable is not + set. If None, the conversion won't be done. + :param callable convert: Function transforming value to something else. + :returns: Converted value of the environment variable. + """ + val = os.environ.get(variable, default) + if convert is bool: + return val.lower() in ('true', 'yes', 'on', '1') + if val is None: + return None + return convert(val) + +def mark_dangerous(method): + """ + Decorator for methods of :py:class:`unittest.TestCase` subclasses that + skips dangerous tests if an environment variable says so. + ``LMI_RUN_DANGEROUS`` is the environment variabled read. + + These tests will be skipped by default. + """ + if get_environvar('LMI_RUN_DANGEROUS', '0', bool): + return method + else: + return unittest.skip("This test is marked as dangerous.")(method) + +def mark_tedious(method): + """ + Decorator for methods of :py:class:`unittest.TestCase` subclasses that + skips tedious tests. Those running for very long time and usually need a + lot of memory. They are run by default. Environment variable + ``LMI_RUN_TEDIOUS`` can be used to skip them. + """ + if get_environvar('LMI_RUN_TEDIOUS', '1', bool): + return method + else: + return unittest.skip("This test is marked as tedious.")(method) + +def compare_cim_name(first, second): + """ + Compare two CIMInstanceNames. Their ``host`` property is not checked. + """ + if not isinstance(first, pywbem.CIMInstanceName): + raise TypeError("both arguments must be a pywbem.CIMInstanceName") + if not isinstance(second, pywbem.CIMInstanceName): + raise TypeError("both arguments must be a pywbem.CIMInstanceName") + return ( first.classname == second.classname + and first.namespace == second.namespace + and first.keybindings == second.keybindings) + +class CIMListener(object): + """ CIM Listener for indication events. """ + + class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler): + """ Handler for the POST request from *CIMOM*. """ + def do_POST(self): + """ Handle the POST request. """ + data = self.rfile.read(int(self.headers['Content-Length'])) + ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data)) + # Get the instance from CIM-XML, copied from + # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial + insts = [x[1] for x in ttree[2][2][0][2][2]] + for inst in insts: + self.callback(inst) + self.send_response(200) + self.end_headers() + + def log_message(self, _fmt, *_args): + # suppress log messages + pass + + def __init__(self, callback, http_port=5988): + self.address = ('', http_port) + self.CIMHandler.callback = callback + self.thread = None + self.server = None + + def start(self): + """ Start listening. """ + BaseHTTPServer.HTTPServer.allow_reuse_address = True + self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.start() + + def stop(self): + """ Stop listening. """ + if self.server is not None: + self.server.shutdown() + self.server.socket.close() + if self.thread is not None: + self.thread.join() + + def running(self): + """ + Are we listening for events? + :rtype: bool + """ + return self.thread is not None + +class LmiTestCase(unittest.TestCase): + """ + Base class for all LMI test cases. + """ + + #: Value used in ``SystemName`` key properties in various *CIM* instances. + #: It's also used to fill ``CIM_ComputerySystem.Name`` property. + SYSTEM_NAME = socket.gethostname() + #: Says, whether the test case needs indication listener running or not. + #: Each subclass shall override this property and set it to ``True`` if + #: it want to test indication events. + NEEDS_INDICATIONS = False + + @classmethod + def needs_indications(cls): + """ + Whether the indication listener should be started for this test case. + In subclasses override ``NEEDS_INDICATIONS`` property and set it to + ``True`` if indication testing is desired. + """ + return cls.NEEDS_INDICATIONS + + @classmethod + def setUpClass(cls): + #: Cached value of SystemCreationClassName set with + #: ``LMI_CS_CLASSNAME`` environment variable. + cls.system_cs_name = os.environ.get("LMI_CS_CLASSNAME", "PG_ComputerSystem") + #: *URL* of *CIMOM* we connect to. Overriden with ``LMI_CIMOM_URL`` + #: environment variable. + cls.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989") + #: User name for authentication with *CIMOM*. Overriden with + #: ``LMI_CIMOM_USERNAME`` variable. + cls.username = os.environ.get("LMI_CIMOM_USERNAME", "root") + #: User's password for authentication with *CIMOM*. Overriden with + #: ``LMI_CIMOM_PASSWORD`` environment variable. + cls.password = os.environ.get("LMI_CIMOM_PASSWORD", "") + #: Name of *CIMOM* we connect to. There are two possible values: + #: ``"tog-pegasus"`` and ``"sblim-sfcb"``. Overriden with + #: ``LMI_CIMOM_BROKER`` environment variable. + cls.cimom = os.environ.get("LMI_CIMOM_BROKER", "tog-pegasus") + #: Boolean value saying whether to run dangerous tests. These are marked + #: with :py:func:`mark_dangerous` decorator. This is set with + #: ``LMI_RUN_DANGEROUS`` environment variable. + cls.run_dangerous = get_environvar('LMI_RUN_DANGEROUS', '0', bool) + #: Boolean value saying whether to run tedious tests. These are marked + #: with :py:func:`mark_tedious` decorator. This is set with + #: ``LMI_RUN_TEDIOUS`` environment variable. + cls.run_tedious = get_environvar('LMI_RUN_TEDIOUS', '1', bool) + + #: Active connection to *CIMOM*. + cls.wbemconnection = pywbem.WBEMConnection(cls.url, + (cls.username, cls.password)) + + if cls.needs_indications(): + cls.indication_port = random.randint(12000, 13000) + cls.indication_queue = Queue.Queue() + cls.listener = CIMListener( + callback=cls._process_indication, + http_port=cls.indication_port) + + @classmethod + def tearDownClass(cls): + if cls.needs_indications(): + cls.listener.stop() + + @classmethod + def _start_listening(self): + """ Start listening for incoming indications. """ + self.listener.start() + + @classmethod + def _process_indication(self, indication): + """ Callback to process one indication.""" + self.indication_queue.put(indication) + + def setUp(self): + self.subscribed = {} + + def tearDown(self): + for name in self.subscribed.keys(): + self.unsubscribe(name) + + def assertRaisesCIM(self, cim_err_code, func, *args, **kwds): + """ + This test passes if given function called with supplied arguments + raises :py:class:`pywbem.CIMError` with given cim error code. + """ + with self.assertRaises(pywbem.CIMError) as cm: + func(*args, **kwds) + self.assertEqual(cim_err_code, cm.exception.args[0]) + + def assertCIMNameEquals(self, first, second): + """ + Compare two CIMInstanceNames. Their host properties are not checked. + """ + self.assertTrue(compare_cim_name(first, second), + "%s != %s" % (str(first), str(second))) + + def assertCIMNameIn(self, name, candidates): + """ + Checks that given :py:class:`pywbem.CIMInstanceName` is present in + set of candidates. It compares all properties but ``host``. + """ + for candidate in candidates: + if compare_cim_name(name, candidate): + return + self.assertTrue(False, 'name "%s" is not in candidates' % str(name)) + + def assertNocaseDictEqual(self, fst, snd, msg=None): + """ + Compare two no-case dictionaries ignoring the case of their keys. + """ + fst_dict = {} + for (key, value) in fst.iteritems(): + fst_dict[key.lower()] = value + snd_dict = {} + for (key, value) in snd.iteritems(): + snd_dict[key.lower()] = value + self.assertEqual(fst_dict, snd_dict, msg) + + def get_indication(self, timeout): + """ Wait for an indication for given nr. of seconds and return it.""" + try: + indication = self.indication_queue.get(timeout=timeout) + except Queue.Empty: + raise AssertionError("Timeout when waiting for indication") + self.indication_queue.task_done() + return indication + + def make_filter_iname(self, filter_name): + """ + Create an instance name of ``CIM_IndicationFilter``. + + :rtype: :py:class:`pywbem.CIMInstanceName` + """ + return pywbem.CIMInstanceName( + classname="CIM_IndicationFilter", + namespace="root/interop", + keybindings={ + 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': self.system_cs_name, + 'SystemName': self.SYSTEM_NAME, + 'Name': filter_name}) + + def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"): + """ + Create an instance of ``CIM_IndicationFilter``. + + :rtype: :py:class:`pywbem.CIMInstance` + """ + inst = pywbem.CIMInstance('CIM_IndicationFilter') + inst['CreationClassName'] = 'CIM_IndicationFilter' + inst['SystemCreationClassName'] = self.system_cs_name + inst['SystemName'] = self.SYSTEM_NAME + inst['Name'] = filter_name + inst['Query'] = query + inst['QueryLanguage'] = query_lang + inst['SourceNamespace'] = "root/cimv2" + inst.path = self.make_filter_iname(filter_name) + return inst + + def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"): + """ + Create indication subscription for given filter name. + """ + if not self.needs_indications(): + raise Exception("can not subscribe to indications, enable them" + " with NEEDS_INDICATIONS") + + if query is not None: + # Create filter first + indfilter = self.wbemconnection.CreateInstance( + self.make_filter_inst(filter_name, query, querylang)) + else: + # the filter is already created, assemble its name + indfilter = self.make_filter_iname(filter_name) + + # create destination + destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') + destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML' + destinst['SystemCreationClassName'] = self.system_cs_name + destinst['SystemName'] = self.SYSTEM_NAME + destinst['Name'] = filter_name + destinst['Destination'] = "http://localhost:%d" % (self.indication_port) + destinst['PersistenceType'] = pywbem.Uint16(3) # Transient + cop = pywbem.CIMInstanceName( + 'CIM_ListenerDestinationCIMXML', namespace="root/interop") + cop.keybindings = { + 'CreationClassName' : 'CIM_ListenerDestinationCIMXML', + 'SystemClassName' : self.system_cs_name, + 'SystemName' : self.SYSTEM_NAME, + 'Name' : filter_name } + destinst.path = cop + destname = self.wbemconnection.CreateInstance(destinst) + + # create the subscription + subinst = pywbem.CIMInstance( + 'CIM_IndicationSubscription') + subinst['Filter'] = indfilter + subinst['Handler'] = destname + cop = pywbem.CIMInstanceName( + 'CIM_IndicationSubscription', namespace="root/interop") + cop.keybindings = { + 'Filter': indfilter, + 'Handler': destname } + subinst.path = cop + subscription = self.wbemconnection.CreateInstance(subinst) + + self.subscribed[filter_name] = [subscription, destname] + + if not self.listener.running(): + self._start_listening() + return subscription + + def unsubscribe(self, filter_name): + """ + Unsubscribe from given filter. + """ + if not self.needs_indications(): + raise Exception("can not unsubscribe to indications, enable them" + " with NEEDS_INDICATIONS") + for instance in self.subscribed.pop(filter_name): + self.wbemconnection.DeleteInstance(instance) + + def finish_job(self, + jobname, + assoc_class, + return_constructor=int): + """ + Wait until the job finishes and return ``(ret, outparams)``just as + ``InvokeMethod`` would. + + It's hard to reconstruct these outparams, since the embedded + instances / ojects do not work in our ``CIMOMS``, therefore special + care is needed. + + :param jobname: Name of the job. + :type jobname: :py:class:`pywbem.CIMInstanceName` + :param callable return_constructor: Callable, which converts + string to the right type, for example ``int``. + :returns: ``(retvalue, outparams)`` in the same way as + ``finish_method()`` would. + :rtype: tuple + """ + # Use busy loop for now + # TODO: rework to something sane + while True: + job = self.wbemconnection.GetInstance(jobname) + if job['JobState'] > 5: # all states higher than 5 are final + break + time.sleep(0.1) + + # get the MethodResult + resultname = self.wbemconnection.AssociatorNames( + jobname, + AssocClass=assoc_class)[0] + result = self.wbemconnection.GetInstance(resultname) + ind = result['PostCallIndication'] + # check for error + if ind['Error'] is not None: + err = ind['Error'][0] + code = err['CIMStatusCode'] + msg = err['Message'] + raise pywbem.CIMError(code, msg) + + ret = return_constructor(ind['ReturnValue']) + + # convert output parameters to format returned by InvokeMethod + outparams = pywbem.NocaseDict() + try: + params = ind['MethodParameters'] + except KeyError: + params = {} + if params: + for (key, value) in params.iteritems(): + outparams[key] = value + + return (ret, outparams) + + def invoke_async_method(self, + method_name, + object_name, + return_constructor=int, + *args, **kwargs): + """ + Invoke a method and if it returns a job, wait for the job. + + :param string method_name: Name of the method. + :param object_name: Instance, on which the method should be invoked. + :type object_name: :py:class:`pywbem.CIMInstanceName` + :param callable return_constructor: Callable, which converts + string to the right type, for example ``int``. + :param list args: Positional arguments passed to invoked method. + :param dictionary kwargs: Keyword arguments passed to invoked method. + :returns: ``(retvalue, outparams)`` in the same way as + ``finish_method()`` would. + :rtype: tuple + """ + (ret, outparams) = self.wbemconnection.InvokeMethod( + method_name, + object_name, + *args, + **kwargs) + if ret == JOB_CREATED: + # wait for the job + jobname = outparams['Job'] + (ret, outparams) = self.finish_job(jobname, return_constructor) + return (ret, outparams) |