summaryrefslogtreecommitdiffstats
path: root/src/python
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-10-24 08:49:37 +0200
committerMichal Minar <miminar@redhat.com>2013-10-24 10:28:30 +0200
commit53cb6d8e6f44e5c103b266e715d8ca56fa70ca50 (patch)
tree8f691e96ea08c86890d93a0d0e73a53873a64c09 /src/python
parentfd26a63b53334483a1f70d9cc30642dc7587b0ec (diff)
downloadopenlmi-providers-53cb6d8e6f44e5c103b266e715d8ca56fa70ca50.tar.gz
openlmi-providers-53cb6d8e6f44e5c103b266e715d8ca56fa70ca50.tar.xz
openlmi-providers-53cb6d8e6f44e5c103b266e715d8ca56fa70ca50.zip
tests: added base class for our test cases
import it in your tests with: from lmi.test.base import LmiTestCase For imports to work, run: export PYTHONPATH=${PROVIDERS_GIT_ROOT}/src/python before nosetests or before running them individually.
Diffstat (limited to 'src/python')
-rw-r--r--src/python/lmi/test/__init__.py20
-rw-r--r--src/python/lmi/test/base.py447
2 files changed, 467 insertions, 0 deletions
diff --git a/src/python/lmi/test/__init__.py b/src/python/lmi/test/__init__.py
new file mode 100644
index 0000000..6ff965f
--- /dev/null
+++ b/src/python/lmi/test/__init__.py
@@ -0,0 +1,20 @@
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
diff --git a/src/python/lmi/test/base.py b/src/python/lmi/test/base.py
new file mode 100644
index 0000000..eb00914
--- /dev/null
+++ b/src/python/lmi/test/base.py
@@ -0,0 +1,447 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+# Authors: Roman Rakus <rrakus@redhat.com>
+#
+"""
+Base classes for *OpenLMI Provider* test cases.
+"""
+
+import BaseHTTPServer
+import os
+import pywbem
+import Queue
+import random
+import socket
+import threading
+import time
+import unittest
+
+JOB_CREATED = 4096
+
+def get_environvar(variable, default='', convert=str):
+ """
+ Get the value of environment variable.
+
+ :param string variable: Name of environment variable.
+ :param default: Any value that should be returned when the variable is not
+ set. If None, the conversion won't be done.
+ :param callable convert: Function transforming value to something else.
+ :returns: Converted value of the environment variable.
+ """
+ val = os.environ.get(variable, default)
+ if convert is bool:
+ return val.lower() in ('true', 'yes', 'on', '1')
+ if val is None:
+ return None
+ return convert(val)
+
+def mark_dangerous(method):
+ """
+ Decorator for methods of :py:class:`unittest.TestCase` subclasses that
+ skips dangerous tests if an environment variable says so.
+ ``LMI_RUN_DANGEROUS`` is the environment variabled read.
+
+ These tests will be skipped by default.
+ """
+ if get_environvar('LMI_RUN_DANGEROUS', '0', bool):
+ return method
+ else:
+ return unittest.skip("This test is marked as dangerous.")(method)
+
+def mark_tedious(method):
+ """
+ Decorator for methods of :py:class:`unittest.TestCase` subclasses that
+ skips tedious tests. Those running for very long time and usually need a
+ lot of memory. They are run by default. Environment variable
+ ``LMI_RUN_TEDIOUS`` can be used to skip them.
+ """
+ if get_environvar('LMI_RUN_TEDIOUS', '1', bool):
+ return method
+ else:
+ return unittest.skip("This test is marked as tedious.")(method)
+
+def compare_cim_name(first, second):
+ """
+ Compare two CIMInstanceNames. Their ``host`` property is not checked.
+ """
+ if not isinstance(first, pywbem.CIMInstanceName):
+ raise TypeError("both arguments must be a pywbem.CIMInstanceName")
+ if not isinstance(second, pywbem.CIMInstanceName):
+ raise TypeError("both arguments must be a pywbem.CIMInstanceName")
+ return ( first.classname == second.classname
+ and first.namespace == second.namespace
+ and first.keybindings == second.keybindings)
+
+class CIMListener(object):
+ """ CIM Listener for indication events. """
+
+ class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """ Handler for the POST request from *CIMOM*. """
+ def do_POST(self):
+ """ Handle the POST request. """
+ data = self.rfile.read(int(self.headers['Content-Length']))
+ ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
+ # Get the instance from CIM-XML, copied from
+ # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
+ insts = [x[1] for x in ttree[2][2][0][2][2]]
+ for inst in insts:
+ self.callback(inst)
+ self.send_response(200)
+ self.end_headers()
+
+ def log_message(self, _fmt, *_args):
+ # suppress log messages
+ pass
+
+ def __init__(self, callback, http_port=5988):
+ self.address = ('', http_port)
+ self.CIMHandler.callback = callback
+ self.thread = None
+ self.server = None
+
+ def start(self):
+ """ Start listening. """
+ BaseHTTPServer.HTTPServer.allow_reuse_address = True
+ self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.start()
+
+ def stop(self):
+ """ Stop listening. """
+ if self.server is not None:
+ self.server.shutdown()
+ self.server.socket.close()
+ if self.thread is not None:
+ self.thread.join()
+
+ def running(self):
+ """
+ Are we listening for events?
+ :rtype: bool
+ """
+ return self.thread is not None
+
+class LmiTestCase(unittest.TestCase):
+ """
+ Base class for all LMI test cases.
+ """
+
+ #: Value used in ``SystemName`` key properties in various *CIM* instances.
+ #: It's also used to fill ``CIM_ComputerySystem.Name`` property.
+ SYSTEM_NAME = socket.gethostname()
+ #: Says, whether the test case needs indication listener running or not.
+ #: Each subclass shall override this property and set it to ``True`` if
+ #: it want to test indication events.
+ NEEDS_INDICATIONS = False
+
+ @classmethod
+ def needs_indications(cls):
+ """
+ Whether the indication listener should be started for this test case.
+ In subclasses override ``NEEDS_INDICATIONS`` property and set it to
+ ``True`` if indication testing is desired.
+ """
+ return cls.NEEDS_INDICATIONS
+
+ @classmethod
+ def setUpClass(cls):
+ #: Cached value of SystemCreationClassName set with
+ #: ``LMI_CS_CLASSNAME`` environment variable.
+ cls.system_cs_name = os.environ.get("LMI_CS_CLASSNAME", "PG_ComputerSystem")
+ #: *URL* of *CIMOM* we connect to. Overriden with ``LMI_CIMOM_URL``
+ #: environment variable.
+ cls.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989")
+ #: User name for authentication with *CIMOM*. Overriden with
+ #: ``LMI_CIMOM_USERNAME`` variable.
+ cls.username = os.environ.get("LMI_CIMOM_USERNAME", "root")
+ #: User's password for authentication with *CIMOM*. Overriden with
+ #: ``LMI_CIMOM_PASSWORD`` environment variable.
+ cls.password = os.environ.get("LMI_CIMOM_PASSWORD", "")
+ #: Name of *CIMOM* we connect to. There are two possible values:
+ #: ``"tog-pegasus"`` and ``"sblim-sfcb"``. Overriden with
+ #: ``LMI_CIMOM_BROKER`` environment variable.
+ cls.cimom = os.environ.get("LMI_CIMOM_BROKER", "tog-pegasus")
+ #: Boolean value saying whether to run dangerous tests. These are marked
+ #: with :py:func:`mark_dangerous` decorator. This is set with
+ #: ``LMI_RUN_DANGEROUS`` environment variable.
+ cls.run_dangerous = get_environvar('LMI_RUN_DANGEROUS', '0', bool)
+ #: Boolean value saying whether to run tedious tests. These are marked
+ #: with :py:func:`mark_tedious` decorator. This is set with
+ #: ``LMI_RUN_TEDIOUS`` environment variable.
+ cls.run_tedious = get_environvar('LMI_RUN_TEDIOUS', '1', bool)
+
+ #: Active connection to *CIMOM*.
+ cls.wbemconnection = pywbem.WBEMConnection(cls.url,
+ (cls.username, cls.password))
+
+ if cls.needs_indications():
+ cls.indication_port = random.randint(12000, 13000)
+ cls.indication_queue = Queue.Queue()
+ cls.listener = CIMListener(
+ callback=cls._process_indication,
+ http_port=cls.indication_port)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.needs_indications():
+ cls.listener.stop()
+
+ @classmethod
+ def _start_listening(self):
+ """ Start listening for incoming indications. """
+ self.listener.start()
+
+ @classmethod
+ def _process_indication(self, indication):
+ """ Callback to process one indication."""
+ self.indication_queue.put(indication)
+
+ def setUp(self):
+ self.subscribed = {}
+
+ def tearDown(self):
+ for name in self.subscribed.keys():
+ self.unsubscribe(name)
+
+ def assertRaisesCIM(self, cim_err_code, func, *args, **kwds):
+ """
+ This test passes if given function called with supplied arguments
+ raises :py:class:`pywbem.CIMError` with given cim error code.
+ """
+ with self.assertRaises(pywbem.CIMError) as cm:
+ func(*args, **kwds)
+ self.assertEqual(cim_err_code, cm.exception.args[0])
+
+ def assertCIMNameEquals(self, first, second):
+ """
+ Compare two CIMInstanceNames. Their host properties are not checked.
+ """
+ self.assertTrue(compare_cim_name(first, second),
+ "%s != %s" % (str(first), str(second)))
+
+ def assertCIMNameIn(self, name, candidates):
+ """
+ Checks that given :py:class:`pywbem.CIMInstanceName` is present in
+ set of candidates. It compares all properties but ``host``.
+ """
+ for candidate in candidates:
+ if compare_cim_name(name, candidate):
+ return
+ self.assertTrue(False, 'name "%s" is not in candidates' % str(name))
+
+ def assertNocaseDictEqual(self, fst, snd, msg=None):
+ """
+ Compare two no-case dictionaries ignoring the case of their keys.
+ """
+ fst_dict = {}
+ for (key, value) in fst.iteritems():
+ fst_dict[key.lower()] = value
+ snd_dict = {}
+ for (key, value) in snd.iteritems():
+ snd_dict[key.lower()] = value
+ self.assertEqual(fst_dict, snd_dict, msg)
+
+ def get_indication(self, timeout):
+ """ Wait for an indication for given nr. of seconds and return it."""
+ try:
+ indication = self.indication_queue.get(timeout=timeout)
+ except Queue.Empty:
+ raise AssertionError("Timeout when waiting for indication")
+ self.indication_queue.task_done()
+ return indication
+
+ def make_filter_iname(self, filter_name):
+ """
+ Create an instance name of ``CIM_IndicationFilter``.
+
+ :rtype: :py:class:`pywbem.CIMInstanceName`
+ """
+ return pywbem.CIMInstanceName(
+ classname="CIM_IndicationFilter",
+ namespace="root/interop",
+ keybindings={
+ 'CreationClassName': 'CIM_IndicationFilter',
+ 'SystemClassName': self.system_cs_name,
+ 'SystemName': self.SYSTEM_NAME,
+ 'Name': filter_name})
+
+ def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"):
+ """
+ Create an instance of ``CIM_IndicationFilter``.
+
+ :rtype: :py:class:`pywbem.CIMInstance`
+ """
+ inst = pywbem.CIMInstance('CIM_IndicationFilter')
+ inst['CreationClassName'] = 'CIM_IndicationFilter'
+ inst['SystemCreationClassName'] = self.system_cs_name
+ inst['SystemName'] = self.SYSTEM_NAME
+ inst['Name'] = filter_name
+ inst['Query'] = query
+ inst['QueryLanguage'] = query_lang
+ inst['SourceNamespace'] = "root/cimv2"
+ inst.path = self.make_filter_iname(filter_name)
+ return inst
+
+ def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
+ """
+ Create indication subscription for given filter name.
+ """
+ if not self.needs_indications():
+ raise Exception("can not subscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+
+ if query is not None:
+ # Create filter first
+ indfilter = self.wbemconnection.CreateInstance(
+ self.make_filter_inst(filter_name, query, querylang))
+ else:
+ # the filter is already created, assemble its name
+ indfilter = self.make_filter_iname(filter_name)
+
+ # create destination
+ destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
+ destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
+ destinst['SystemCreationClassName'] = self.system_cs_name
+ destinst['SystemName'] = self.SYSTEM_NAME
+ destinst['Name'] = filter_name
+ destinst['Destination'] = "http://localhost:%d" % (self.indication_port)
+ destinst['PersistenceType'] = pywbem.Uint16(3) # Transient
+ cop = pywbem.CIMInstanceName(
+ 'CIM_ListenerDestinationCIMXML', namespace="root/interop")
+ cop.keybindings = {
+ 'CreationClassName' : 'CIM_ListenerDestinationCIMXML',
+ 'SystemClassName' : self.system_cs_name,
+ 'SystemName' : self.SYSTEM_NAME,
+ 'Name' : filter_name }
+ destinst.path = cop
+ destname = self.wbemconnection.CreateInstance(destinst)
+
+ # create the subscription
+ subinst = pywbem.CIMInstance(
+ 'CIM_IndicationSubscription')
+ subinst['Filter'] = indfilter
+ subinst['Handler'] = destname
+ cop = pywbem.CIMInstanceName(
+ 'CIM_IndicationSubscription', namespace="root/interop")
+ cop.keybindings = {
+ 'Filter': indfilter,
+ 'Handler': destname }
+ subinst.path = cop
+ subscription = self.wbemconnection.CreateInstance(subinst)
+
+ self.subscribed[filter_name] = [subscription, destname]
+
+ if not self.listener.running():
+ self._start_listening()
+ return subscription
+
+ def unsubscribe(self, filter_name):
+ """
+ Unsubscribe from given filter.
+ """
+ if not self.needs_indications():
+ raise Exception("can not unsubscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+ for instance in self.subscribed.pop(filter_name):
+ self.wbemconnection.DeleteInstance(instance)
+
+ def finish_job(self,
+ jobname,
+ assoc_class,
+ return_constructor=int):
+ """
+ Wait until the job finishes and return ``(ret, outparams)``just as
+ ``InvokeMethod`` would.
+
+ It's hard to reconstruct these outparams, since the embedded
+ instances / ojects do not work in our ``CIMOMS``, therefore special
+ care is needed.
+
+ :param jobname: Name of the job.
+ :type jobname: :py:class:`pywbem.CIMInstanceName`
+ :param callable return_constructor: Callable, which converts
+ string to the right type, for example ``int``.
+ :returns: ``(retvalue, outparams)`` in the same way as
+ ``finish_method()`` would.
+ :rtype: tuple
+ """
+ # Use busy loop for now
+ # TODO: rework to something sane
+ while True:
+ job = self.wbemconnection.GetInstance(jobname)
+ if job['JobState'] > 5: # all states higher than 5 are final
+ break
+ time.sleep(0.1)
+
+ # get the MethodResult
+ resultname = self.wbemconnection.AssociatorNames(
+ jobname,
+ AssocClass=assoc_class)[0]
+ result = self.wbemconnection.GetInstance(resultname)
+ ind = result['PostCallIndication']
+ # check for error
+ if ind['Error'] is not None:
+ err = ind['Error'][0]
+ code = err['CIMStatusCode']
+ msg = err['Message']
+ raise pywbem.CIMError(code, msg)
+
+ ret = return_constructor(ind['ReturnValue'])
+
+ # convert output parameters to format returned by InvokeMethod
+ outparams = pywbem.NocaseDict()
+ try:
+ params = ind['MethodParameters']
+ except KeyError:
+ params = {}
+ if params:
+ for (key, value) in params.iteritems():
+ outparams[key] = value
+
+ return (ret, outparams)
+
+ def invoke_async_method(self,
+ method_name,
+ object_name,
+ return_constructor=int,
+ *args, **kwargs):
+ """
+ Invoke a method and if it returns a job, wait for the job.
+
+ :param string method_name: Name of the method.
+ :param object_name: Instance, on which the method should be invoked.
+ :type object_name: :py:class:`pywbem.CIMInstanceName`
+ :param callable return_constructor: Callable, which converts
+ string to the right type, for example ``int``.
+ :param list args: Positional arguments passed to invoked method.
+ :param dictionary kwargs: Keyword arguments passed to invoked method.
+ :returns: ``(retvalue, outparams)`` in the same way as
+ ``finish_method()`` would.
+ :rtype: tuple
+ """
+ (ret, outparams) = self.wbemconnection.InvokeMethod(
+ method_name,
+ object_name,
+ *args,
+ **kwargs)
+ if ret == JOB_CREATED:
+ # wait for the job
+ jobname = outparams['Job']
+ (ret, outparams) = self.finish_job(jobname, return_constructor)
+ return (ret, outparams)