summaryrefslogtreecommitdiffstats
path: root/src/python
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-10-31 15:10:22 +0100
committerMichal Minar <miminar@redhat.com>2013-11-04 10:58:12 +0100
commitd4dfe71963b9a9721a136eff7015ff80275f1706 (patch)
treef77a63996a3d86009c4d611ef054ea1b0e8457a3 /src/python
parentcfd18ce3b74065bb432028868e379336ebe46fdd (diff)
downloadopenlmi-providers-d4dfe71963b9a9721a136eff7015ff80275f1706.tar.gz
openlmi-providers-d4dfe71963b9a9721a136eff7015ff80275f1706.tar.xz
openlmi-providers-d4dfe71963b9a9721a136eff7015ff80275f1706.zip
tests: added Base test cases for cim and lmi shell tests
Base test case class is renamed to BaseLmiTest. But it's not intended for direct use in provider tests. For this purpose two more classes are added. Namely LmiTestCase and CIMTestCase. Tests wanting to take advantage of lmi shell's power should subclass LmiTestCase. Those based on pure pywbem code shall subclass the other one. Added lots of convenience methods. Allowed to use lmi.shell abstractions if available. Useful utilities are moved to standalone util module.
Diffstat (limited to 'src/python')
-rw-r--r--src/python/lmi/test/base.py384
-rw-r--r--src/python/lmi/test/cimbase.py362
-rw-r--r--src/python/lmi/test/lmibase.py169
-rw-r--r--src/python/lmi/test/util.py131
4 files changed, 696 insertions, 350 deletions
diff --git a/src/python/lmi/test/base.py b/src/python/lmi/test/base.py
index eb00914..1522979 100644
--- a/src/python/lmi/test/base.py
+++ b/src/python/lmi/test/base.py
@@ -22,122 +22,36 @@
Base classes for *OpenLMI Provider* test cases.
"""
-import BaseHTTPServer
import os
import pywbem
-import Queue
-import random
import socket
-import threading
-import time
import unittest
-JOB_CREATED = 4096
+from lmi.test import util
-def get_environvar(variable, default='', convert=str):
+def render_iname(iname, indent=2):
"""
- Get the value of environment variable.
-
- :param string variable: Name of environment variable.
- :param default: Any value that should be returned when the variable is not
- set. If None, the conversion won't be done.
- :param callable convert: Function transforming value to something else.
- :returns: Converted value of the environment variable.
- """
- val = os.environ.get(variable, default)
- if convert is bool:
- return val.lower() in ('true', 'yes', 'on', '1')
- if val is None:
- return None
- return convert(val)
-
-def mark_dangerous(method):
- """
- Decorator for methods of :py:class:`unittest.TestCase` subclasses that
- skips dangerous tests if an environment variable says so.
- ``LMI_RUN_DANGEROUS`` is the environment variabled read.
-
- These tests will be skipped by default.
+ Render object path in human readable way. Result will occupy multiple
+ lines. The first line is a class name, which is not indented at all. Other
+ lines will be indented with *indent* spaces.
+
+ :param iname: Object path to render.
+ :type iname: :py:class:`pywbem.CIMInstanceName`
+ :param integer ident: Number of spaces prefixing all lines but the first.
+ :returns: *iname* nicely rendered.
+ :rtype: string
"""
- if get_environvar('LMI_RUN_DANGEROUS', '0', bool):
- return method
- else:
- return unittest.skip("This test is marked as dangerous.")(method)
-
-def mark_tedious(method):
- """
- Decorator for methods of :py:class:`unittest.TestCase` subclasses that
- skips tedious tests. Those running for very long time and usually need a
- lot of memory. They are run by default. Environment variable
- ``LMI_RUN_TEDIOUS`` can be used to skip them.
- """
- if get_environvar('LMI_RUN_TEDIOUS', '1', bool):
- return method
- else:
- return unittest.skip("This test is marked as tedious.")(method)
-
-def compare_cim_name(first, second):
- """
- Compare two CIMInstanceNames. Their ``host`` property is not checked.
- """
- if not isinstance(first, pywbem.CIMInstanceName):
- raise TypeError("both arguments must be a pywbem.CIMInstanceName")
- if not isinstance(second, pywbem.CIMInstanceName):
- raise TypeError("both arguments must be a pywbem.CIMInstanceName")
- return ( first.classname == second.classname
- and first.namespace == second.namespace
- and first.keybindings == second.keybindings)
-
-class CIMListener(object):
- """ CIM Listener for indication events. """
-
- class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
- """ Handler for the POST request from *CIMOM*. """
- def do_POST(self):
- """ Handle the POST request. """
- data = self.rfile.read(int(self.headers['Content-Length']))
- ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
- # Get the instance from CIM-XML, copied from
- # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
- insts = [x[1] for x in ttree[2][2][0][2][2]]
- for inst in insts:
- self.callback(inst)
- self.send_response(200)
- self.end_headers()
-
- def log_message(self, _fmt, *_args):
- # suppress log messages
- pass
-
- def __init__(self, callback, http_port=5988):
- self.address = ('', http_port)
- self.CIMHandler.callback = callback
- self.thread = None
- self.server = None
-
- def start(self):
- """ Start listening. """
- BaseHTTPServer.HTTPServer.allow_reuse_address = True
- self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
- self.thread = threading.Thread(target=self.server.serve_forever)
- self.thread.start()
-
- def stop(self):
- """ Stop listening. """
- if self.server is not None:
- self.server.shutdown()
- self.server.socket.close()
- if self.thread is not None:
- self.thread.join()
-
- def running(self):
- """
- Are we listening for events?
- :rtype: bool
- """
- return self.thread is not None
-
-class LmiTestCase(unittest.TestCase):
+ lines = [ "%s" % iname.classname
+ , " "*indent + "namespace: %s" % iname.namespace
+ , " "*indent + "keys:"]
+ align = max([len(k) for k in iname.keybindings.iterkeys()])
+ for key, value in iname.keybindings.iteritems():
+ if isinstance(value, pywbem.CIMInstanceName):
+ value = render_iname(value, indent + 4)
+ lines.append(" "*indent + (" %%-%ds : %%s" % align) % (key, value))
+ return "\n".join(lines)
+
+class BaseLmiTestCase(unittest.TestCase):
"""
Base class for all LMI test cases.
"""
@@ -145,25 +59,13 @@ class LmiTestCase(unittest.TestCase):
#: Value used in ``SystemName`` key properties in various *CIM* instances.
#: It's also used to fill ``CIM_ComputerySystem.Name`` property.
SYSTEM_NAME = socket.gethostname()
- #: Says, whether the test case needs indication listener running or not.
- #: Each subclass shall override this property and set it to ``True`` if
- #: it want to test indication events.
- NEEDS_INDICATIONS = False
-
- @classmethod
- def needs_indications(cls):
- """
- Whether the indication listener should be started for this test case.
- In subclasses override ``NEEDS_INDICATIONS`` property and set it to
- ``True`` if indication testing is desired.
- """
- return cls.NEEDS_INDICATIONS
@classmethod
def setUpClass(cls):
#: Cached value of SystemCreationClassName set with
#: ``LMI_CS_CLASSNAME`` environment variable.
- cls.system_cs_name = os.environ.get("LMI_CS_CLASSNAME", "PG_ComputerSystem")
+ cls.system_cs_name = os.environ.get(
+ "LMI_CS_CLASSNAME", "PG_ComputerSystem")
#: *URL* of *CIMOM* we connect to. Overriden with ``LMI_CIMOM_URL``
#: environment variable.
cls.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989")
@@ -180,44 +82,11 @@ class LmiTestCase(unittest.TestCase):
#: Boolean value saying whether to run dangerous tests. These are marked
#: with :py:func:`mark_dangerous` decorator. This is set with
#: ``LMI_RUN_DANGEROUS`` environment variable.
- cls.run_dangerous = get_environvar('LMI_RUN_DANGEROUS', '0', bool)
+ cls.run_dangerous = util.get_environvar('LMI_RUN_DANGEROUS', '0', bool)
#: Boolean value saying whether to run tedious tests. These are marked
#: with :py:func:`mark_tedious` decorator. This is set with
#: ``LMI_RUN_TEDIOUS`` environment variable.
- cls.run_tedious = get_environvar('LMI_RUN_TEDIOUS', '1', bool)
-
- #: Active connection to *CIMOM*.
- cls.wbemconnection = pywbem.WBEMConnection(cls.url,
- (cls.username, cls.password))
-
- if cls.needs_indications():
- cls.indication_port = random.randint(12000, 13000)
- cls.indication_queue = Queue.Queue()
- cls.listener = CIMListener(
- callback=cls._process_indication,
- http_port=cls.indication_port)
-
- @classmethod
- def tearDownClass(cls):
- if cls.needs_indications():
- cls.listener.stop()
-
- @classmethod
- def _start_listening(self):
- """ Start listening for incoming indications. """
- self.listener.start()
-
- @classmethod
- def _process_indication(self, indication):
- """ Callback to process one indication."""
- self.indication_queue.put(indication)
-
- def setUp(self):
- self.subscribed = {}
-
- def tearDown(self):
- for name in self.subscribed.keys():
- self.unsubscribe(name)
+ cls.run_tedious = util.get_environvar('LMI_RUN_TEDIOUS', '1', bool)
def assertRaisesCIM(self, cim_err_code, func, *args, **kwds):
"""
@@ -228,12 +97,15 @@ class LmiTestCase(unittest.TestCase):
func(*args, **kwds)
self.assertEqual(cim_err_code, cm.exception.args[0])
- def assertCIMNameEquals(self, first, second):
+ def assertCIMNameEqual(self, fst, snd, msg=None):
"""
- Compare two CIMInstanceNames. Their host properties are not checked.
+ Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their host
+ properties are not checked.
"""
- self.assertTrue(compare_cim_name(first, second),
- "%s != %s" % (str(first), str(second)))
+ if msg is None:
+ msg = ( "%s\n\nis not equal to: %s"
+ % (render_iname(fst), render_iname(snd)))
+ self.assertTrue(util.check_inames_equal(fst, snd), msg)
def assertCIMNameIn(self, name, candidates):
"""
@@ -241,7 +113,7 @@ class LmiTestCase(unittest.TestCase):
set of candidates. It compares all properties but ``host``.
"""
for candidate in candidates:
- if compare_cim_name(name, candidate):
+ if util.check_inames_equal(name, candidate):
return
self.assertTrue(False, 'name "%s" is not in candidates' % str(name))
@@ -257,191 +129,3 @@ class LmiTestCase(unittest.TestCase):
snd_dict[key.lower()] = value
self.assertEqual(fst_dict, snd_dict, msg)
- def get_indication(self, timeout):
- """ Wait for an indication for given nr. of seconds and return it."""
- try:
- indication = self.indication_queue.get(timeout=timeout)
- except Queue.Empty:
- raise AssertionError("Timeout when waiting for indication")
- self.indication_queue.task_done()
- return indication
-
- def make_filter_iname(self, filter_name):
- """
- Create an instance name of ``CIM_IndicationFilter``.
-
- :rtype: :py:class:`pywbem.CIMInstanceName`
- """
- return pywbem.CIMInstanceName(
- classname="CIM_IndicationFilter",
- namespace="root/interop",
- keybindings={
- 'CreationClassName': 'CIM_IndicationFilter',
- 'SystemClassName': self.system_cs_name,
- 'SystemName': self.SYSTEM_NAME,
- 'Name': filter_name})
-
- def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"):
- """
- Create an instance of ``CIM_IndicationFilter``.
-
- :rtype: :py:class:`pywbem.CIMInstance`
- """
- inst = pywbem.CIMInstance('CIM_IndicationFilter')
- inst['CreationClassName'] = 'CIM_IndicationFilter'
- inst['SystemCreationClassName'] = self.system_cs_name
- inst['SystemName'] = self.SYSTEM_NAME
- inst['Name'] = filter_name
- inst['Query'] = query
- inst['QueryLanguage'] = query_lang
- inst['SourceNamespace'] = "root/cimv2"
- inst.path = self.make_filter_iname(filter_name)
- return inst
-
- def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
- """
- Create indication subscription for given filter name.
- """
- if not self.needs_indications():
- raise Exception("can not subscribe to indications, enable them"
- " with NEEDS_INDICATIONS")
-
- if query is not None:
- # Create filter first
- indfilter = self.wbemconnection.CreateInstance(
- self.make_filter_inst(filter_name, query, querylang))
- else:
- # the filter is already created, assemble its name
- indfilter = self.make_filter_iname(filter_name)
-
- # create destination
- destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
- destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
- destinst['SystemCreationClassName'] = self.system_cs_name
- destinst['SystemName'] = self.SYSTEM_NAME
- destinst['Name'] = filter_name
- destinst['Destination'] = "http://localhost:%d" % (self.indication_port)
- destinst['PersistenceType'] = pywbem.Uint16(3) # Transient
- cop = pywbem.CIMInstanceName(
- 'CIM_ListenerDestinationCIMXML', namespace="root/interop")
- cop.keybindings = {
- 'CreationClassName' : 'CIM_ListenerDestinationCIMXML',
- 'SystemClassName' : self.system_cs_name,
- 'SystemName' : self.SYSTEM_NAME,
- 'Name' : filter_name }
- destinst.path = cop
- destname = self.wbemconnection.CreateInstance(destinst)
-
- # create the subscription
- subinst = pywbem.CIMInstance(
- 'CIM_IndicationSubscription')
- subinst['Filter'] = indfilter
- subinst['Handler'] = destname
- cop = pywbem.CIMInstanceName(
- 'CIM_IndicationSubscription', namespace="root/interop")
- cop.keybindings = {
- 'Filter': indfilter,
- 'Handler': destname }
- subinst.path = cop
- subscription = self.wbemconnection.CreateInstance(subinst)
-
- self.subscribed[filter_name] = [subscription, destname]
-
- if not self.listener.running():
- self._start_listening()
- return subscription
-
- def unsubscribe(self, filter_name):
- """
- Unsubscribe from given filter.
- """
- if not self.needs_indications():
- raise Exception("can not unsubscribe to indications, enable them"
- " with NEEDS_INDICATIONS")
- for instance in self.subscribed.pop(filter_name):
- self.wbemconnection.DeleteInstance(instance)
-
- def finish_job(self,
- jobname,
- assoc_class,
- return_constructor=int):
- """
- Wait until the job finishes and return ``(ret, outparams)``just as
- ``InvokeMethod`` would.
-
- It's hard to reconstruct these outparams, since the embedded
- instances / ojects do not work in our ``CIMOMS``, therefore special
- care is needed.
-
- :param jobname: Name of the job.
- :type jobname: :py:class:`pywbem.CIMInstanceName`
- :param callable return_constructor: Callable, which converts
- string to the right type, for example ``int``.
- :returns: ``(retvalue, outparams)`` in the same way as
- ``finish_method()`` would.
- :rtype: tuple
- """
- # Use busy loop for now
- # TODO: rework to something sane
- while True:
- job = self.wbemconnection.GetInstance(jobname)
- if job['JobState'] > 5: # all states higher than 5 are final
- break
- time.sleep(0.1)
-
- # get the MethodResult
- resultname = self.wbemconnection.AssociatorNames(
- jobname,
- AssocClass=assoc_class)[0]
- result = self.wbemconnection.GetInstance(resultname)
- ind = result['PostCallIndication']
- # check for error
- if ind['Error'] is not None:
- err = ind['Error'][0]
- code = err['CIMStatusCode']
- msg = err['Message']
- raise pywbem.CIMError(code, msg)
-
- ret = return_constructor(ind['ReturnValue'])
-
- # convert output parameters to format returned by InvokeMethod
- outparams = pywbem.NocaseDict()
- try:
- params = ind['MethodParameters']
- except KeyError:
- params = {}
- if params:
- for (key, value) in params.iteritems():
- outparams[key] = value
-
- return (ret, outparams)
-
- def invoke_async_method(self,
- method_name,
- object_name,
- return_constructor=int,
- *args, **kwargs):
- """
- Invoke a method and if it returns a job, wait for the job.
-
- :param string method_name: Name of the method.
- :param object_name: Instance, on which the method should be invoked.
- :type object_name: :py:class:`pywbem.CIMInstanceName`
- :param callable return_constructor: Callable, which converts
- string to the right type, for example ``int``.
- :param list args: Positional arguments passed to invoked method.
- :param dictionary kwargs: Keyword arguments passed to invoked method.
- :returns: ``(retvalue, outparams)`` in the same way as
- ``finish_method()`` would.
- :rtype: tuple
- """
- (ret, outparams) = self.wbemconnection.InvokeMethod(
- method_name,
- object_name,
- *args,
- **kwargs)
- if ret == JOB_CREATED:
- # wait for the job
- jobname = outparams['Job']
- (ret, outparams) = self.finish_job(jobname, return_constructor)
- return (ret, outparams)
diff --git a/src/python/lmi/test/cimbase.py b/src/python/lmi/test/cimbase.py
new file mode 100644
index 0000000..def5ba5
--- /dev/null
+++ b/src/python/lmi/test/cimbase.py
@@ -0,0 +1,362 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+# Authors: Roman Rakus <rrakus@redhat.com>
+#
+"""
+Base class and utilities for test suits written with plain pywbem
+abastractions.
+"""
+
+import BaseHTTPServer
+import pywbem
+import Queue
+import random
+import threading
+import time
+
+from lmi.test import base
+
+JOB_CREATED = 4096
+
+class CIMListener(object):
+ """ CIM Listener for indication events. """
+
+ class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """ Handler for the POST request from *CIMOM*. """
+ def do_POST(self):
+ """ Handle the POST request. """
+ data = self.rfile.read(int(self.headers['Content-Length']))
+ ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
+ # Get the instance from CIM-XML, copied from
+ # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
+ insts = [x[1] for x in ttree[2][2][0][2][2]]
+ for inst in insts:
+ self.callback(inst)
+ self.send_response(200)
+ self.end_headers()
+
+ def log_message(self, _fmt, *_args):
+ # suppress log messages
+ pass
+
+ def __init__(self, callback, http_port=5988):
+ self.address = ('', http_port)
+ self.CIMHandler.callback = callback
+ self.thread = None
+ self.server = None
+
+ def start(self):
+ """ Start listening. """
+ BaseHTTPServer.HTTPServer.allow_reuse_address = True
+ self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.start()
+
+ def stop(self):
+ """ Stop listening. """
+ if self.server is not None:
+ self.server.shutdown()
+ self.server.socket.close()
+ if self.thread is not None:
+ self.thread.join()
+
+ def running(self):
+ """
+ Are we listening for events?
+ :rtype: bool
+ """
+ return self.thread is not None
+
+class CIMTestCase(base.BaseLmiTestCase):
+ """
+ Base class for LMI test cases based on plain pywbem.
+ """
+
+ #: Says, whether the test case needs indication listener running or not.
+ #: Each subclass shall override this property and set it to ``True`` if
+ #: it want to test indication events.
+ NEEDS_INDICATIONS = False
+
+ _WBEMCONNECTION = None
+ _SYSTEM_INAME = None
+
+ @classmethod
+ def needs_indications(cls):
+ """
+ Whether the indication listener should be started for this test case.
+ In subclasses override ``NEEDS_INDICATIONS`` property and set it to
+ ``True`` if indication testing is desired.
+ """
+ return cls.NEEDS_INDICATIONS
+
+ @classmethod
+ def setUpClass(cls):
+ base.BaseLmiTestCase.setUpClass.im_func(cls)
+ if cls.needs_indications():
+ cls.indication_port = random.randint(12000, 13000)
+ cls.indication_queue = Queue.Queue()
+ cls.listener = CIMListener(
+ callback=cls._process_indication,
+ http_port=cls.indication_port)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.needs_indications():
+ cls.listener.stop()
+
+ @classmethod
+ def _start_listening(cls):
+ """ Start listening for incoming indications. """
+ cls.listener.start()
+
+ @classmethod
+ def _process_indication(cls, indication):
+ """ Callback to process one indication."""
+ cls.indication_queue.put(indication)
+
+ @property
+ def wbemconnection(self):
+ """
+ :returns: Active connection to *CIMOM*.
+ :rtype: :py:class:`pywbem.WBEMConnection`
+ """
+ if CIMTestCase._WBEMCONNECTION is None:
+ CIMTestCase._WBEMCONNECTION = pywbem.WBEMConnection(self.url,
+ (self.username, self.password))
+ return CIMTestCase._WBEMCONNECTION
+
+ @property
+ def system_iname(self):
+ """
+ :returns: Instance of ``CIM_ComputerSystem`` registered with *CIMOM*.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
+ """
+ if CIMTestCase._SYSTEM_INAME is None:
+ CIMTestCase._SYSTEM_INAME = self.wbemconnection. \
+ EnumerateInstanceNames(self.system_cs_name, 'root/cimv2')[0]
+ return CIMTestCase._SYSTEM_INAME.copy()
+
+ def setUp(self):
+ self.subscribed = {}
+
+ def tearDown(self):
+ for name in self.subscribed.keys():
+ self.unsubscribe(name)
+
+ def assertCIMIsSubclass(self, cls, base_cls):
+ """
+ Checks, whether cls is subclass of base_cls from CIM perspective.
+ @param cls name of subclass
+ @param base_cls name of base class
+ """
+ if not isinstance(cls, basestring):
+ raise TypeError("cls must be a string")
+ if not isinstance(base_cls, basestring):
+ raise TypeError("base_cls must be a string")
+ return self.assertTrue(pywbem.is_subclass(self.wbemconnection,
+ "root/cimv2", base_cls, cls))
+
+ def get_indication(self, timeout):
+ """ Wait for an indication for given nr. of seconds and return it."""
+ try:
+ indication = self.indication_queue.get(timeout=timeout)
+ except Queue.Empty:
+ raise AssertionError("Timeout when waiting for indication")
+ self.indication_queue.task_done()
+ return indication
+
+ def make_filter_iname(self, filter_name):
+ """
+ Create an instance name of ``CIM_IndicationFilter``.
+
+ :rtype: :py:class:`pywbem.CIMInstanceName`
+ """
+ return pywbem.CIMInstanceName(
+ classname="CIM_IndicationFilter",
+ namespace="root/interop",
+ keybindings={
+ 'CreationClassName': 'CIM_IndicationFilter',
+ 'SystemClassName': self.system_cs_name,
+ 'SystemName': self.SYSTEM_NAME,
+ 'Name': filter_name})
+
+ def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"):
+ """
+ Create an instance of ``CIM_IndicationFilter``.
+
+ :rtype: :py:class:`pywbem.CIMInstance`
+ """
+ inst = pywbem.CIMInstance('CIM_IndicationFilter')
+ inst['CreationClassName'] = 'CIM_IndicationFilter'
+ inst['SystemCreationClassName'] = self.system_cs_name
+ inst['SystemName'] = self.SYSTEM_NAME
+ inst['Name'] = filter_name
+ inst['Query'] = query
+ inst['QueryLanguage'] = query_lang
+ inst['SourceNamespace'] = "root/cimv2"
+ inst.path = self.make_filter_iname(filter_name)
+ return inst
+
+ def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
+ """
+ Create indication subscription for given filter name.
+ """
+ if not self.needs_indications():
+ raise Exception("can not subscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+
+ if query is not None:
+ # Create filter first
+ indfilter = self.wbemconnection.CreateInstance(
+ self.make_filter_inst(filter_name, query, querylang))
+ else:
+ # the filter is already created, assemble its name
+ indfilter = self.make_filter_iname(filter_name)
+
+ # create destination
+ destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
+ destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
+ destinst['SystemCreationClassName'] = self.system_cs_name
+ destinst['SystemName'] = self.SYSTEM_NAME
+ destinst['Name'] = filter_name
+ destinst['Destination'] = "http://localhost:%d" % (self.indication_port)
+ destinst['PersistenceType'] = pywbem.Uint16(3) # Transient
+ cop = pywbem.CIMInstanceName(
+ 'CIM_ListenerDestinationCIMXML', namespace="root/interop")
+ cop.keybindings = {
+ 'CreationClassName' : 'CIM_ListenerDestinationCIMXML',
+ 'SystemClassName' : self.system_cs_name,
+ 'SystemName' : self.SYSTEM_NAME,
+ 'Name' : filter_name }
+ destinst.path = cop
+ destname = self.wbemconnection.CreateInstance(destinst)
+
+ # create the subscription
+ subinst = pywbem.CIMInstance(
+ 'CIM_IndicationSubscription')
+ subinst['Filter'] = indfilter
+ subinst['Handler'] = destname
+ cop = pywbem.CIMInstanceName(
+ 'CIM_IndicationSubscription', namespace="root/interop")
+ cop.keybindings = {
+ 'Filter': indfilter,
+ 'Handler': destname }
+ subinst.path = cop
+ subscription = self.wbemconnection.CreateInstance(subinst)
+
+ self.subscribed[filter_name] = [subscription, destname]
+
+ if not self.listener.running():
+ self._start_listening()
+ return subscription
+
+ def unsubscribe(self, filter_name):
+ """
+ Unsubscribe from given filter.
+ """
+ if not self.needs_indications():
+ raise Exception("can not unsubscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+ for instance in self.subscribed.pop(filter_name):
+ self.wbemconnection.DeleteInstance(instance)
+
+ def finish_job(self,
+ jobname,
+ assoc_class,
+ return_constructor=int):
+ """
+ Wait until the job finishes and return ``(ret, outparams)``just as
+ ``InvokeMethod`` would.
+
+ It's hard to reconstruct these outparams, since the embedded
+ instances / ojects do not work in our ``CIMOMS``, therefore special
+ care is needed.
+
+ :param jobname: Name of the job.
+ :type jobname: :py:class:`pywbem.CIMInstanceName`
+ :param callable return_constructor: Callable, which converts
+ string to the right type, for example ``int``.
+ :returns: ``(retvalue, outparams)`` in the same way as
+ ``finish_method()`` would.
+ :rtype: tuple
+ """
+ # Use busy loop for now
+ # TODO: rework to something sane
+ while True:
+ job = self.wbemconnection.GetInstance(jobname)
+ if job['JobState'] > 5: # all states higher than 5 are final
+ break
+ time.sleep(0.1)
+
+ # get the MethodResult
+ resultname = self.wbemconnection.AssociatorNames(
+ jobname,
+ AssocClass=assoc_class)[0]
+ result = self.wbemconnection.GetInstance(resultname)
+ ind = result['PostCallIndication']
+ # check for error
+ if ind['Error'] is not None:
+ err = ind['Error'][0]
+ code = err['CIMStatusCode']
+ msg = err['Message']
+ raise pywbem.CIMError(code, msg)
+
+ ret = return_constructor(ind['ReturnValue'])
+
+ # convert output parameters to format returned by InvokeMethod
+ outparams = pywbem.NocaseDict()
+ try:
+ params = ind['MethodParameters']
+ except KeyError:
+ params = {}
+ if params:
+ for (key, value) in params.iteritems():
+ outparams[key] = value
+
+ return (ret, outparams)
+
+ def invoke_async_method(self,
+ method_name,
+ object_name,
+ return_constructor=int,
+ *args, **kwargs):
+ """
+ Invoke a method and if it returns a job, wait for the job.
+
+ :param string method_name: Name of the method.
+ :param object_name: Instance, on which the method should be invoked.
+ :type object_name: :py:class:`pywbem.CIMInstanceName`
+ :param callable return_constructor: Callable, which converts
+ string to the right type, for example ``int``.
+ :param list args: Positional arguments passed to invoked method.
+ :param dictionary kwargs: Keyword arguments passed to invoked method.
+ :returns: ``(retvalue, outparams)`` in the same way as
+ ``finish_method()`` would.
+ :rtype: tuple
+ """
+ (ret, outparams) = self.wbemconnection.InvokeMethod(
+ method_name,
+ object_name,
+ *args,
+ **kwargs)
+ if ret == JOB_CREATED:
+ # wait for the job
+ jobname = outparams['Job']
+ (ret, outparams) = self.finish_job(jobname, return_constructor)
+ return (ret, outparams)
diff --git a/src/python/lmi/test/lmibase.py b/src/python/lmi/test/lmibase.py
new file mode 100644
index 0000000..e1b1b29
--- /dev/null
+++ b/src/python/lmi/test/lmibase.py
@@ -0,0 +1,169 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Base class and utilities for test suits written upon lmi shell.
+"""
+
+import functools
+import inspect
+import pywbem
+
+from lmi.test import base
+from lmi.shell import connect
+from lmi.shell import LMIInstance
+from lmi.shell import LMIInstanceName
+from lmi.shell import LMIUtil
+
+def enable_lmi_exceptions(method):
+ """
+ Function or method decorator enabling exceptions to be raised from under
+ lmi shell intestines.
+ """
+ @functools.wraps(method)
+ def _wrapper(*args, **kwargs):
+ """ Enable exceptions in wrapped function. """
+ original = LMIUtil.lmi_get_use_exceptions()
+ LMIUtil.lmi_set_use_exceptions(True)
+ try:
+ retval = method(*args, **kwargs)
+ finally:
+ LMIUtil.lmi_set_use_exceptions(original)
+ return retval
+
+ return _wrapper
+
+def to_cim_object(obj):
+ """
+ :returns: Wrapped object of from inside of shell abstractions.
+ """
+ if isinstance(obj, (LMIInstance, LMIInstanceName)):
+ return obj.wrapped_object
+ return obj
+
+class LmiTestCase(base.BaseLmiTestCase):
+ """
+ Base class for all LMI test cases based on lmi shell.
+ """
+
+ #: Once you override this in subclass with a name of CIM class to be tested,
+ #: you can use :py:attr:`LmiTestCase.cim_class` to get reference to a shell
+ #: wrapper of this class.
+ CLASS_NAME = None
+
+ _SYSTEM_INAME = None
+
+ @classmethod
+ def setUpClass(cls):
+ base.BaseLmiTestCase.setUpClass.im_func(cls)
+
+ @property
+ def conn(self):
+ """
+ :returns: Active connection to *CIMOM* wrapped by *lmi shell*
+ abstraction.
+ :rtype: :py:class:`lmi.shell.LMIConnection`
+ """
+ if not hasattr(self, '_shellconnection'):
+ kwargs = {}
+ con_argspec = inspect.getargspec(connect)
+ # support older versions of lmi shell
+ if 'verify_server_cert' in con_argspec.args or con_argspec.keywords:
+ # newer name
+ kwargs['verify_server_cert'] = False
+ elif 'verify_certificate' in con_argspec.args:
+ # older one
+ kwargs['verify_certificate'] = False
+ self._shellconnection = connect(
+ self.url, self.username, self.password, **kwargs)
+ return self._shellconnection
+
+ @property
+ def ns(self):
+ """
+ :returns: Namespace object representing CIM ``"root/cimv2"`` namespace.
+ :rtype: :py:class:`lmi.shell.LMINamespace`
+ """
+ return self.conn.root.cimv2
+
+ @property
+ def cim_class(self):
+ """
+ A convenience accessor to ``self.conn.root.cimv2.<CLASS_NAME>``.
+ You need to override ``CLASS_NAME`` attribute of this class in order
+ to use this.
+
+ :returns: Lmi shell wrapper of CIM class to be tested.
+ :rtype: :py:class:`lmi.shell.LMIClass`
+ """
+ if self.CLASS_NAME is None:
+ return None
+ return getattr(self.ns, self.CLASS_NAME)
+
+ @property
+ def system_iname(self):
+ """
+ :returns: Instance of ``CIM_ComputerSystem`` registered with *CIMOM*.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
+ """
+ if LmiTestCase._SYSTEM_INAME is None:
+ LmiTestCase._SYSTEM_INAME = getattr(self.ns, self.system_cs_name) \
+ .first_instance_name()
+ return LmiTestCase._SYSTEM_INAME.copy()
+
+ def assertCIMIsSubclass(self, cls, base_cls):
+ """
+ Checks, whether cls is subclass of base_cls from CIM perspective.
+ @param cls name of subclass
+ @param base_cls name of base class
+ """
+ if not isinstance(cls, basestring):
+ raise TypeError("cls must be a string")
+ if not isinstance(base_cls, basestring):
+ raise TypeError("base_cls must be a string")
+ return self.assertTrue(pywbem.is_subclass(self.conn._client._cliconn,
+ "root/cimv2", base_cls, cls))
+
+ def assertRaisesCIM(self, cim_err_code, func, *args, **kwds):
+ """
+ This test passes if given function called with supplied arguments
+ raises :py:class:`pywbem.CIMError` with given cim error code.
+ """
+ base.BaseLmiTestCase.assertRaisesCIM(
+ self, cim_err_code, func, *args, **kwds)
+
+ def assertCIMNameEqual(self, fst, snd, msg=None):
+ """
+ Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their host
+ properties are not checked.
+ """
+ base.BaseLmiTestCase.assertCIMNameEqual(
+ self,
+ to_cim_object(fst),
+ to_cim_object(snd),
+ msg)
+
+ def assertCIMNameIn(self, name, candidates):
+ """
+ Checks that given :py:class:`pywbem.CIMInstanceName` is present in
+ set of candidates. It compares all properties but ``host``.
+ """
+ name = to_cim_object(name)
+ candidates = [to_cim_object(c) for c in candidates]
+ base.BaseLmiTestCase.assertCIMNameIn(self, name, candidates)
+
diff --git a/src/python/lmi/test/util.py b/src/python/lmi/test/util.py
new file mode 100644
index 0000000..32d04c4
--- /dev/null
+++ b/src/python/lmi/test/util.py
@@ -0,0 +1,131 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+LMI test utilities.
+"""
+
+import os
+import pywbem
+import socket
+import unittest
+
+def is_this_system(system_name):
+ """
+ :returns: Whether the given *system_name* matches the hostname of currently
+ running system.
+ :rtype: boolean
+ """
+ return ( socket.gethostbyaddr(system_name)[0]
+ == socket.gethostbyaddr(socket.gethostname())[0])
+
+def get_environvar(variable, default='', convert=str):
+ """
+ Get the value of environment variable.
+
+ :param string variable: Name of environment variable.
+ :param default: Any value that should be returned when the variable is not
+ set. If None, the conversion won't be done.
+ :param callable convert: Function transforming value to something else.
+ :returns: Converted value of the environment variable.
+ """
+ val = os.environ.get(variable, default)
+ if convert is bool:
+ return val.lower() in ('true', 'yes', 'on', '1')
+ if val is None:
+ return None
+ return convert(val)
+
+def mark_dangerous(method):
+ """
+ Decorator for methods of :py:class:`unittest.TestCase` subclasses that
+ skips dangerous tests if an environment variable says so.
+ ``LMI_RUN_DANGEROUS`` is the environment variabled read.
+
+ These tests will be skipped by default.
+ """
+ if get_environvar('LMI_RUN_DANGEROUS', '0', bool):
+ return method
+ else:
+ return unittest.skip("This test is marked as dangerous.")(method)
+
+def mark_tedious(method):
+ """
+ Decorator for methods of :py:class:`unittest.TestCase` subclasses that
+ skips tedious tests. Those running for very long time and usually need a
+ lot of memory. They are run by default. Environment variable
+ ``LMI_RUN_TEDIOUS`` can be used to skip them.
+ """
+ if get_environvar('LMI_RUN_TEDIOUS', '1', bool):
+ return method
+ else:
+ return unittest.skip("This test is marked as tedious.")(method)
+
+def check_inames_equal(fst, snd):
+ """
+ Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their ``host``
+ property is not checked. Be benevolent when checking names system
+ creation class names.
+
+ :returns: ``True`` if both instance names are equal.
+ :rtype: boolean
+ """
+ if not isinstance(fst, pywbem.CIMInstanceName):
+ raise TypeError("fst argument must be a pywbem.CIMInstanceName, not %s"
+ % repr(fst))
+ if not isinstance(snd, pywbem.CIMInstanceName):
+ raise TypeError("snd argument must be a pywbem.CIMInstanceName, not %s"
+ % repr(snd))
+ if fst.classname != snd.classname or fst.namespace != snd.namespace:
+ return False
+
+ snd_keys = { k: v for k, v in snd.keybindings.iteritems() }
+ for key, value in fst.keybindings.iteritems():
+ if key not in snd_keys:
+ return False
+ snd_value = snd_keys.pop(key)
+ if ( isinstance(value, pywbem.CIMInstanceName)
+ and isinstance(snd_value, pywbem.CIMInstanceName)):
+ if not check_inames_equal(value, snd_value):
+ return False
+
+ # accept also aliases in the Name attribute of ComputerSystem
+ elif ( ( fst.classname.endswith('_ComputerSystem')
+ and key.lower() == 'name')
+ or ( key.lower() == 'systemname'
+ and 'SystemCreationClassName' in fst)):
+ if ( value != snd_value
+ and ( not is_this_system(value)
+ or not is_this_system(snd_value))):
+ return False
+
+ elif ( fst.classname.endswith('_ComputerSystem')
+ and key.lower() == 'creationclassname'):
+ if ( value != snd_value
+ and 'CIM_ComputerSystem' not in [
+ p['CreationClassName'] for p in (fst, snd)]):
+ return False
+
+ elif value != snd_value:
+ return False
+
+ if snd_keys: # second path has more key properties than first one
+ return False
+
+ return True
+