summaryrefslogtreecommitdiffstats
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/lmi/test/base.py384
-rw-r--r--src/python/lmi/test/cimbase.py362
-rw-r--r--src/python/lmi/test/lmibase.py169
-rw-r--r--src/python/lmi/test/util.py131
4 files changed, 696 insertions, 350 deletions
diff --git a/src/python/lmi/test/base.py b/src/python/lmi/test/base.py
index eb00914..1522979 100644
--- a/src/python/lmi/test/base.py
+++ b/src/python/lmi/test/base.py
@@ -22,122 +22,36 @@
Base classes for *OpenLMI Provider* test cases.
"""
-import BaseHTTPServer
import os
import pywbem
-import Queue
-import random
import socket
-import threading
-import time
import unittest
-JOB_CREATED = 4096
+from lmi.test import util
-def get_environvar(variable, default='', convert=str):
+def render_iname(iname, indent=2):
"""
- Get the value of environment variable.
-
- :param string variable: Name of environment variable.
- :param default: Any value that should be returned when the variable is not
- set. If None, the conversion won't be done.
- :param callable convert: Function transforming value to something else.
- :returns: Converted value of the environment variable.
- """
- val = os.environ.get(variable, default)
- if convert is bool:
- return val.lower() in ('true', 'yes', 'on', '1')
- if val is None:
- return None
- return convert(val)
-
-def mark_dangerous(method):
- """
- Decorator for methods of :py:class:`unittest.TestCase` subclasses that
- skips dangerous tests if an environment variable says so.
- ``LMI_RUN_DANGEROUS`` is the environment variabled read.
-
- These tests will be skipped by default.
+ Render object path in human readable way. Result will occupy multiple
+ lines. The first line is a class name, which is not indented at all. Other
+ lines will be indented with *indent* spaces.
+
+ :param iname: Object path to render.
+ :type iname: :py:class:`pywbem.CIMInstanceName`
+ :param integer ident: Number of spaces prefixing all lines but the first.
+ :returns: *iname* nicely rendered.
+ :rtype: string
"""
- if get_environvar('LMI_RUN_DANGEROUS', '0', bool):
- return method
- else:
- return unittest.skip("This test is marked as dangerous.")(method)
-
-def mark_tedious(method):
- """
- Decorator for methods of :py:class:`unittest.TestCase` subclasses that
- skips tedious tests. Those running for very long time and usually need a
- lot of memory. They are run by default. Environment variable
- ``LMI_RUN_TEDIOUS`` can be used to skip them.
- """
- if get_environvar('LMI_RUN_TEDIOUS', '1', bool):
- return method
- else:
- return unittest.skip("This test is marked as tedious.")(method)
-
-def compare_cim_name(first, second):
- """
- Compare two CIMInstanceNames. Their ``host`` property is not checked.
- """
- if not isinstance(first, pywbem.CIMInstanceName):
- raise TypeError("both arguments must be a pywbem.CIMInstanceName")
- if not isinstance(second, pywbem.CIMInstanceName):
- raise TypeError("both arguments must be a pywbem.CIMInstanceName")
- return ( first.classname == second.classname
- and first.namespace == second.namespace
- and first.keybindings == second.keybindings)
-
-class CIMListener(object):
- """ CIM Listener for indication events. """
-
- class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
- """ Handler for the POST request from *CIMOM*. """
- def do_POST(self):
- """ Handle the POST request. """
- data = self.rfile.read(int(self.headers['Content-Length']))
- ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
- # Get the instance from CIM-XML, copied from
- # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
- insts = [x[1] for x in ttree[2][2][0][2][2]]
- for inst in insts:
- self.callback(inst)
- self.send_response(200)
- self.end_headers()
-
- def log_message(self, _fmt, *_args):
- # suppress log messages
- pass
-
- def __init__(self, callback, http_port=5988):
- self.address = ('', http_port)
- self.CIMHandler.callback = callback
- self.thread = None
- self.server = None
-
- def start(self):
- """ Start listening. """
- BaseHTTPServer.HTTPServer.allow_reuse_address = True
- self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
- self.thread = threading.Thread(target=self.server.serve_forever)
- self.thread.start()
-
- def stop(self):
- """ Stop listening. """
- if self.server is not None:
- self.server.shutdown()
- self.server.socket.close()
- if self.thread is not None:
- self.thread.join()
-
- def running(self):
- """
- Are we listening for events?
- :rtype: bool
- """
- return self.thread is not None
-
-class LmiTestCase(unittest.TestCase):
+ lines = [ "%s" % iname.classname
+ , " "*indent + "namespace: %s" % iname.namespace
+ , " "*indent + "keys:"]
+ align = max([len(k) for k in iname.keybindings.iterkeys()])
+ for key, value in iname.keybindings.iteritems():
+ if isinstance(value, pywbem.CIMInstanceName):
+ value = render_iname(value, indent + 4)
+ lines.append(" "*indent + (" %%-%ds : %%s" % align) % (key, value))
+ return "\n".join(lines)
+
+class BaseLmiTestCase(unittest.TestCase):
"""
Base class for all LMI test cases.
"""
@@ -145,25 +59,13 @@ class LmiTestCase(unittest.TestCase):
#: Value used in ``SystemName`` key properties in various *CIM* instances.
#: It's also used to fill ``CIM_ComputerySystem.Name`` property.
SYSTEM_NAME = socket.gethostname()
- #: Says, whether the test case needs indication listener running or not.
- #: Each subclass shall override this property and set it to ``True`` if
- #: it want to test indication events.
- NEEDS_INDICATIONS = False
-
- @classmethod
- def needs_indications(cls):
- """
- Whether the indication listener should be started for this test case.
- In subclasses override ``NEEDS_INDICATIONS`` property and set it to
- ``True`` if indication testing is desired.
- """
- return cls.NEEDS_INDICATIONS
@classmethod
def setUpClass(cls):
#: Cached value of SystemCreationClassName set with
#: ``LMI_CS_CLASSNAME`` environment variable.
- cls.system_cs_name = os.environ.get("LMI_CS_CLASSNAME", "PG_ComputerSystem")
+ cls.system_cs_name = os.environ.get(
+ "LMI_CS_CLASSNAME", "PG_ComputerSystem")
#: *URL* of *CIMOM* we connect to. Overriden with ``LMI_CIMOM_URL``
#: environment variable.
cls.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989")
@@ -180,44 +82,11 @@ class LmiTestCase(unittest.TestCase):
#: Boolean value saying whether to run dangerous tests. These are marked
#: with :py:func:`mark_dangerous` decorator. This is set with
#: ``LMI_RUN_DANGEROUS`` environment variable.
- cls.run_dangerous = get_environvar('LMI_RUN_DANGEROUS', '0', bool)
+ cls.run_dangerous = util.get_environvar('LMI_RUN_DANGEROUS', '0', bool)
#: Boolean value saying whether to run tedious tests. These are marked
#: with :py:func:`mark_tedious` decorator. This is set with
#: ``LMI_RUN_TEDIOUS`` environment variable.
- cls.run_tedious = get_environvar('LMI_RUN_TEDIOUS', '1', bool)
-
- #: Active connection to *CIMOM*.
- cls.wbemconnection = pywbem.WBEMConnection(cls.url,
- (cls.username, cls.password))
-
- if cls.needs_indications():
- cls.indication_port = random.randint(12000, 13000)
- cls.indication_queue = Queue.Queue()
- cls.listener = CIMListener(
- callback=cls._process_indication,
- http_port=cls.indication_port)
-
- @classmethod
- def tearDownClass(cls):
- if cls.needs_indications():
- cls.listener.stop()
-
- @classmethod
- def _start_listening(self):
- """ Start listening for incoming indications. """
- self.listener.start()
-
- @classmethod
- def _process_indication(self, indication):
- """ Callback to process one indication."""
- self.indication_queue.put(indication)
-
- def setUp(self):
- self.subscribed = {}
-
- def tearDown(self):
- for name in self.subscribed.keys():
- self.unsubscribe(name)
+ cls.run_tedious = util.get_environvar('LMI_RUN_TEDIOUS', '1', bool)
def assertRaisesCIM(self, cim_err_code, func, *args, **kwds):
"""
@@ -228,12 +97,15 @@ class LmiTestCase(unittest.TestCase):
func(*args, **kwds)
self.assertEqual(cim_err_code, cm.exception.args[0])
- def assertCIMNameEquals(self, first, second):
+ def assertCIMNameEqual(self, fst, snd, msg=None):
"""
- Compare two CIMInstanceNames. Their host properties are not checked.
+ Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their host
+ properties are not checked.
"""
- self.assertTrue(compare_cim_name(first, second),
- "%s != %s" % (str(first), str(second)))
+ if msg is None:
+ msg = ( "%s\n\nis not equal to: %s"
+ % (render_iname(fst), render_iname(snd)))
+ self.assertTrue(util.check_inames_equal(fst, snd), msg)
def assertCIMNameIn(self, name, candidates):
"""
@@ -241,7 +113,7 @@ class LmiTestCase(unittest.TestCase):
set of candidates. It compares all properties but ``host``.
"""
for candidate in candidates:
- if compare_cim_name(name, candidate):
+ if util.check_inames_equal(name, candidate):
return
self.assertTrue(False, 'name "%s" is not in candidates' % str(name))
@@ -257,191 +129,3 @@ class LmiTestCase(unittest.TestCase):
snd_dict[key.lower()] = value
self.assertEqual(fst_dict, snd_dict, msg)
- def get_indication(self, timeout):
- """ Wait for an indication for given nr. of seconds and return it."""
- try:
- indication = self.indication_queue.get(timeout=timeout)
- except Queue.Empty:
- raise AssertionError("Timeout when waiting for indication")
- self.indication_queue.task_done()
- return indication
-
- def make_filter_iname(self, filter_name):
- """
- Create an instance name of ``CIM_IndicationFilter``.
-
- :rtype: :py:class:`pywbem.CIMInstanceName`
- """
- return pywbem.CIMInstanceName(
- classname="CIM_IndicationFilter",
- namespace="root/interop",
- keybindings={
- 'CreationClassName': 'CIM_IndicationFilter',
- 'SystemClassName': self.system_cs_name,
- 'SystemName': self.SYSTEM_NAME,
- 'Name': filter_name})
-
- def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"):
- """
- Create an instance of ``CIM_IndicationFilter``.
-
- :rtype: :py:class:`pywbem.CIMInstance`
- """
- inst = pywbem.CIMInstance('CIM_IndicationFilter')
- inst['CreationClassName'] = 'CIM_IndicationFilter'
- inst['SystemCreationClassName'] = self.system_cs_name
- inst['SystemName'] = self.SYSTEM_NAME
- inst['Name'] = filter_name
- inst['Query'] = query
- inst['QueryLanguage'] = query_lang
- inst['SourceNamespace'] = "root/cimv2"
- inst.path = self.make_filter_iname(filter_name)
- return inst
-
- def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
- """
- Create indication subscription for given filter name.
- """
- if not self.needs_indications():
- raise Exception("can not subscribe to indications, enable them"
- " with NEEDS_INDICATIONS")
-
- if query is not None:
- # Create filter first
- indfilter = self.wbemconnection.CreateInstance(
- self.make_filter_inst(filter_name, query, querylang))
- else:
- # the filter is already created, assemble its name
- indfilter = self.make_filter_iname(filter_name)
-
- # create destination
- destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
- destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
- destinst['SystemCreationClassName'] = self.system_cs_name
- destinst['SystemName'] = self.SYSTEM_NAME
- destinst['Name'] = filter_name
- destinst['Destination'] = "http://localhost:%d" % (self.indication_port)
- destinst['PersistenceType'] = pywbem.Uint16(3) # Transient
- cop = pywbem.CIMInstanceName(
- 'CIM_ListenerDestinationCIMXML', namespace="root/interop")
- cop.keybindings = {
- 'CreationClassName' : 'CIM_ListenerDestinationCIMXML',
- 'SystemClassName' : self.system_cs_name,
- 'SystemName' : self.SYSTEM_NAME,
- 'Name' : filter_name }
- destinst.path = cop
- destname = self.wbemconnection.CreateInstance(destinst)
-
- # create the subscription
- subinst = pywbem.CIMInstance(
- 'CIM_IndicationSubscription')
- subinst['Filter'] = indfilter
- subinst['Handler'] = destname
- cop = pywbem.CIMInstanceName(
- 'CIM_IndicationSubscription', namespace="root/interop")
- cop.keybindings = {
- 'Filter': indfilter,
- 'Handler': destname }
- subinst.path = cop
- subscription = self.wbemconnection.CreateInstance(subinst)
-
- self.subscribed[filter_name] = [subscription, destname]
-
- if not self.listener.running():
- self._start_listening()
- return subscription
-
- def unsubscribe(self, filter_name):
- """
- Unsubscribe from given filter.
- """
- if not self.needs_indications():
- raise Exception("can not unsubscribe to indications, enable them"
- " with NEEDS_INDICATIONS")
- for instance in self.subscribed.pop(filter_name):
- self.wbemconnection.DeleteInstance(instance)
-
- def finish_job(self,
- jobname,
- assoc_class,
- return_constructor=int):
- """
- Wait until the job finishes and return ``(ret, outparams)``just as
- ``InvokeMethod`` would.
-
- It's hard to reconstruct these outparams, since the embedded
- instances / ojects do not work in our ``CIMOMS``, therefore special
- care is needed.
-
- :param jobname: Name of the job.
- :type jobname: :py:class:`pywbem.CIMInstanceName`
- :param callable return_constructor: Callable, which converts
- string to the right type, for example ``int``.
- :returns: ``(retvalue, outparams)`` in the same way as
- ``finish_method()`` would.
- :rtype: tuple
- """
- # Use busy loop for now
- # TODO: rework to something sane
- while True:
- job = self.wbemconnection.GetInstance(jobname)
- if job['JobState'] > 5: # all states higher than 5 are final
- break
- time.sleep(0.1)
-
- # get the MethodResult
- resultname = self.wbemconnection.AssociatorNames(
- jobname,
- AssocClass=assoc_class)[0]
- result = self.wbemconnection.GetInstance(resultname)
- ind = result['PostCallIndication']
- # check for error
- if ind['Error'] is not None:
- err = ind['Error'][0]
- code = err['CIMStatusCode']
- msg = err['Message']
- raise pywbem.CIMError(code, msg)
-
- ret = return_constructor(ind['ReturnValue'])
-
- # convert output parameters to format returned by InvokeMethod
- outparams = pywbem.NocaseDict()
- try:
- params = ind['MethodParameters']
- except KeyError:
- params = {}
- if params:
- for (key, value) in params.iteritems():
- outparams[key] = value
-
- return (ret, outparams)
-
- def invoke_async_method(self,
- method_name,
- object_name,
- return_constructor=int,
- *args, **kwargs):
- """
- Invoke a method and if it returns a job, wait for the job.
-
- :param string method_name: Name of the method.
- :param object_name: Instance, on which the method should be invoked.
- :type object_name: :py:class:`pywbem.CIMInstanceName`
- :param callable return_constructor: Callable, which converts
- string to the right type, for example ``int``.
- :param list args: Positional arguments passed to invoked method.
- :param dictionary kwargs: Keyword arguments passed to invoked method.
- :returns: ``(retvalue, outparams)`` in the same way as
- ``finish_method()`` would.
- :rtype: tuple
- """
- (ret, outparams) = self.wbemconnection.InvokeMethod(
- method_name,
- object_name,
- *args,
- **kwargs)
- if ret == JOB_CREATED:
- # wait for the job
- jobname = outparams['Job']
- (ret, outparams) = self.finish_job(jobname, return_constructor)
- return (ret, outparams)
diff --git a/src/python/lmi/test/cimbase.py b/src/python/lmi/test/cimbase.py
new file mode 100644
index 0000000..def5ba5
--- /dev/null
+++ b/src/python/lmi/test/cimbase.py
@@ -0,0 +1,362 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+# Authors: Roman Rakus <rrakus@redhat.com>
+#
+"""
+Base class and utilities for test suits written with plain pywbem
+abastractions.
+"""
+
+import BaseHTTPServer
+import pywbem
+import Queue
+import random
+import threading
+import time
+
+from lmi.test import base
+
+JOB_CREATED = 4096
+
+class CIMListener(object):
+ """ CIM Listener for indication events. """
+
+ class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """ Handler for the POST request from *CIMOM*. """
+ def do_POST(self):
+ """ Handle the POST request. """
+ data = self.rfile.read(int(self.headers['Content-Length']))
+ ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
+ # Get the instance from CIM-XML, copied from
+ # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
+ insts = [x[1] for x in ttree[2][2][0][2][2]]
+ for inst in insts:
+ self.callback(inst)
+ self.send_response(200)
+ self.end_headers()
+
+ def log_message(self, _fmt, *_args):
+ # suppress log messages
+ pass
+
+ def __init__(self, callback, http_port=5988):
+ self.address = ('', http_port)
+ self.CIMHandler.callback = callback
+ self.thread = None
+ self.server = None
+
+ def start(self):
+ """ Start listening. """
+ BaseHTTPServer.HTTPServer.allow_reuse_address = True
+ self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.start()
+
+ def stop(self):
+ """ Stop listening. """
+ if self.server is not None:
+ self.server.shutdown()
+ self.server.socket.close()
+ if self.thread is not None:
+ self.thread.join()
+
+ def running(self):
+ """
+ Are we listening for events?
+ :rtype: bool
+ """
+ return self.thread is not None
+
+class CIMTestCase(base.BaseLmiTestCase):
+ """
+ Base class for LMI test cases based on plain pywbem.
+ """
+
+ #: Says, whether the test case needs indication listener running or not.
+ #: Each subclass shall override this property and set it to ``True`` if
+ #: it want to test indication events.
+ NEEDS_INDICATIONS = False
+
+ _WBEMCONNECTION = None
+ _SYSTEM_INAME = None
+
+ @classmethod
+ def needs_indications(cls):
+ """
+ Whether the indication listener should be started for this test case.
+ In subclasses override ``NEEDS_INDICATIONS`` property and set it to
+ ``True`` if indication testing is desired.
+ """
+ return cls.NEEDS_INDICATIONS
+
+ @classmethod
+ def setUpClass(cls):
+ base.BaseLmiTestCase.setUpClass.im_func(cls)
+ if cls.needs_indications():
+ cls.indication_port = random.randint(12000, 13000)
+ cls.indication_queue = Queue.Queue()
+ cls.listener = CIMListener(
+ callback=cls._process_indication,
+ http_port=cls.indication_port)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.needs_indications():
+ cls.listener.stop()
+
+ @classmethod
+ def _start_listening(cls):
+ """ Start listening for incoming indications. """
+ cls.listener.start()
+
+ @classmethod
+ def _process_indication(cls, indication):
+ """ Callback to process one indication."""
+ cls.indication_queue.put(indication)
+
+ @property
+ def wbemconnection(self):
+ """
+ :returns: Active connection to *CIMOM*.
+ :rtype: :py:class:`pywbem.WBEMConnection`
+ """
+ if CIMTestCase._WBEMCONNECTION is None:
+ CIMTestCase._WBEMCONNECTION = pywbem.WBEMConnection(self.url,
+ (self.username, self.password))
+ return CIMTestCase._WBEMCONNECTION
+
+ @property
+ def system_iname(self):
+ """
+ :returns: Instance of ``CIM_ComputerSystem`` registered with *CIMOM*.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
+ """
+ if CIMTestCase._SYSTEM_INAME is None:
+ CIMTestCase._SYSTEM_INAME = self.wbemconnection. \
+ EnumerateInstanceNames(self.system_cs_name, 'root/cimv2')[0]
+ return CIMTestCase._SYSTEM_INAME.copy()
+
+ def setUp(self):
+ self.subscribed = {}
+
+ def tearDown(self):
+ for name in self.subscribed.keys():
+ self.unsubscribe(name)
+
+ def assertCIMIsSubclass(self, cls, base_cls):
+ """
+ Checks, whether cls is subclass of base_cls from CIM perspective.
+ @param cls name of subclass
+ @param base_cls name of base class
+ """
+ if not isinstance(cls, basestring):
+ raise TypeError("cls must be a string")
+ if not isinstance(base_cls, basestring):
+ raise TypeError("base_cls must be a string")
+ return self.assertTrue(pywbem.is_subclass(self.wbemconnection,
+ "root/cimv2", base_cls, cls))
+
+ def get_indication(self, timeout):
+ """ Wait for an indication for given nr. of seconds and return it."""
+ try:
+ indication = self.indication_queue.get(timeout=timeout)
+ except Queue.Empty:
+ raise AssertionError("Timeout when waiting for indication")
+ self.indication_queue.task_done()
+ return indication
+
+ def make_filter_iname(self, filter_name):
+ """
+ Create an instance name of ``CIM_IndicationFilter``.
+
+ :rtype: :py:class:`pywbem.CIMInstanceName`
+ """
+ return pywbem.CIMInstanceName(
+ classname="CIM_IndicationFilter",
+ namespace="root/interop",
+ keybindings={
+ 'CreationClassName': 'CIM_IndicationFilter',
+ 'SystemClassName': self.system_cs_name,
+ 'SystemName': self.SYSTEM_NAME,
+ 'Name': filter_name})
+
+ def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"):
+ """
+ Create an instance of ``CIM_IndicationFilter``.
+
+ :rtype: :py:class:`pywbem.CIMInstance`
+ """
+ inst = pywbem.CIMInstance('CIM_IndicationFilter')
+ inst['CreationClassName'] = 'CIM_IndicationFilter'
+ inst['SystemCreationClassName'] = self.system_cs_name
+ inst['SystemName'] = self.SYSTEM_NAME
+ inst['Name'] = filter_name
+ inst['Query'] = query
+ inst['QueryLanguage'] = query_lang
+ inst['SourceNamespace'] = "root/cimv2"
+ inst.path = self.make_filter_iname(filter_name)
+ return inst
+
+ def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
+ """
+ Create indication subscription for given filter name.
+ """
+ if not self.needs_indications():
+ raise Exception("can not subscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+
+ if query is not None:
+ # Create filter first
+ indfilter = self.wbemconnection.CreateInstance(
+ self.make_filter_inst(filter_name, query, querylang))
+ else:
+ # the filter is already created, assemble its name
+ indfilter = self.make_filter_iname(filter_name)
+
+ # create destination
+ destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
+ destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
+ destinst['SystemCreationClassName'] = self.system_cs_name
+ destinst['SystemName'] = self.SYSTEM_NAME
+ destinst['Name'] = filter_name
+ destinst['Destination'] = "http://localhost:%d" % (self.indication_port)
+ destinst['PersistenceType'] = pywbem.Uint16(3) # Transient
+ cop = pywbem.CIMInstanceName(
+ 'CIM_ListenerDestinationCIMXML', namespace="root/interop")
+ cop.keybindings = {
+ 'CreationClassName' : 'CIM_ListenerDestinationCIMXML',
+ 'SystemClassName' : self.system_cs_name,
+ 'SystemName' : self.SYSTEM_NAME,
+ 'Name' : filter_name }
+ destinst.path = cop
+ destname = self.wbemconnection.CreateInstance(destinst)
+
+ # create the subscription
+ subinst = pywbem.CIMInstance(
+ 'CIM_IndicationSubscription')
+ subinst['Filter'] = indfilter
+ subinst['Handler'] = destname
+ cop = pywbem.CIMInstanceName(
+ 'CIM_IndicationSubscription', namespace="root/interop")
+ cop.keybindings = {
+ 'Filter': indfilter,
+ 'Handler': destname }
+ subinst.path = cop
+ subscription = self.wbemconnection.CreateInstance(subinst)
+
+ self.subscribed[filter_name] = [subscription, destname]
+
+ if not self.listener.running():
+ self._start_listening()
+ return subscription
+
+ def unsubscribe(self, filter_name):
+ """
+ Unsubscribe from given filter.
+ """
+ if not self.needs_indications():
+ raise Exception("can not unsubscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+ for instance in self.subscribed.pop(filter_name):
+ self.wbemconnection.DeleteInstance(instance)
+
+ def finish_job(self,
+ jobname,
+ assoc_class,
+ return_constructor=int):
+ """
+ Wait until the job finishes and return ``(ret, outparams)``just as
+ ``InvokeMethod`` would.
+
+ It's hard to reconstruct these outparams, since the embedded
+ instances / ojects do not work in our ``CIMOMS``, therefore special
+ care is needed.
+
+ :param jobname: Name of the job.
+ :type jobname: :py:class:`pywbem.CIMInstanceName`
+ :param callable return_constructor: Callable, which converts
+ string to the right type, for example ``int``.
+ :returns: ``(retvalue, outparams)`` in the same way as
+ ``finish_method()`` would.
+ :rtype: tuple
+ """
+ # Use busy loop for now
+ # TODO: rework to something sane
+ while True:
+ job = self.wbemconnection.GetInstance(jobname)
+ if job['JobState'] > 5: # all states higher than 5 are final
+ break
+ time.sleep(0.1)
+
+ # get the MethodResult
+ resultname = self.wbemconnection.AssociatorNames(
+ jobname,
+ AssocClass=assoc_class)[0]
+ result = self.wbemconnection.GetInstance(resultname)
+ ind = result['PostCallIndication']
+ # check for error
+ if ind['Error'] is not None:
+ err = ind['Error'][0]
+ code = err['CIMStatusCode']
+ msg = err['Message']
+ raise pywbem.CIMError(code, msg)
+
+ ret = return_constructor(ind['ReturnValue'])
+
+ # convert output parameters to format returned by InvokeMethod
+ outparams = pywbem.NocaseDict()
+ try:
+ params = ind['MethodParameters']
+ except KeyError:
+ params = {}
+ if params:
+ for (key, value) in params.iteritems():
+ outparams[key] = value
+
+ return (ret, outparams)
+
+ def invoke_async_method(self,
+ method_name,
+ object_name,
+ return_constructor=int,
+ *args, **kwargs):
+ """
+ Invoke a method and if it returns a job, wait for the job.
+
+ :param string method_name: Name of the method.
+ :param object_name: Instance, on which the method should be invoked.
+ :type object_name: :py:class:`pywbem.CIMInstanceName`
+ :param callable return_constructor: Callable, which converts
+ string to the right type, for example ``int``.
+ :param list args: Positional arguments passed to invoked method.
+ :param dictionary kwargs: Keyword arguments passed to invoked method.
+ :returns: ``(retvalue, outparams)`` in the same way as
+ ``finish_method()`` would.
+ :rtype: tuple
+ """
+ (ret, outparams) = self.wbemconnection.InvokeMethod(
+ method_name,
+ object_name,
+ *args,
+ **kwargs)
+ if ret == JOB_CREATED:
+ # wait for the job
+ jobname = outparams['Job']
+ (ret, outparams) = self.finish_job(jobname, return_constructor)
+ return (ret, outparams)
diff --git a/src/python/lmi/test/lmibase.py b/src/python/lmi/test/lmibase.py
new file mode 100644
index 0000000..e1b1b29
--- /dev/null
+++ b/src/python/lmi/test/lmibase.py
@@ -0,0 +1,169 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Base class and utilities for test suits written upon lmi shell.
+"""
+
+import functools
+import inspect
+import pywbem
+
+from lmi.test import base
+from lmi.shell import connect
+from lmi.shell import LMIInstance
+from lmi.shell import LMIInstanceName
+from lmi.shell import LMIUtil
+
+def enable_lmi_exceptions(method):
+ """
+ Function or method decorator enabling exceptions to be raised from under
+ lmi shell intestines.
+ """
+ @functools.wraps(method)
+ def _wrapper(*args, **kwargs):
+ """ Enable exceptions in wrapped function. """
+ original = LMIUtil.lmi_get_use_exceptions()
+ LMIUtil.lmi_set_use_exceptions(True)
+ try:
+ retval = method(*args, **kwargs)
+ finally:
+ LMIUtil.lmi_set_use_exceptions(original)
+ return retval
+
+ return _wrapper
+
+def to_cim_object(obj):
+ """
+ :returns: Wrapped object of from inside of shell abstractions.
+ """
+ if isinstance(obj, (LMIInstance, LMIInstanceName)):
+ return obj.wrapped_object
+ return obj
+
+class LmiTestCase(base.BaseLmiTestCase):
+ """
+ Base class for all LMI test cases based on lmi shell.
+ """
+
+ #: Once you override this in subclass with a name of CIM class to be tested,
+ #: you can use :py:attr:`LmiTestCase.cim_class` to get reference to a shell
+ #: wrapper of this class.
+ CLASS_NAME = None
+
+ _SYSTEM_INAME = None
+
+ @classmethod
+ def setUpClass(cls):
+ base.BaseLmiTestCase.setUpClass.im_func(cls)
+
+ @property
+ def conn(self):
+ """
+ :returns: Active connection to *CIMOM* wrapped by *lmi shell*
+ abstraction.
+ :rtype: :py:class:`lmi.shell.LMIConnection`
+ """
+ if not hasattr(self, '_shellconnection'):
+ kwargs = {}
+ con_argspec = inspect.getargspec(connect)
+ # support older versions of lmi shell
+ if 'verify_server_cert' in con_argspec.args or con_argspec.keywords:
+ # newer name
+ kwargs['verify_server_cert'] = False
+ elif 'verify_certificate' in con_argspec.args:
+ # older one
+ kwargs['verify_certificate'] = False
+ self._shellconnection = connect(
+ self.url, self.username, self.password, **kwargs)
+ return self._shellconnection
+
+ @property
+ def ns(self):
+ """
+ :returns: Namespace object representing CIM ``"root/cimv2"`` namespace.
+ :rtype: :py:class:`lmi.shell.LMINamespace`
+ """
+ return self.conn.root.cimv2
+
+ @property
+ def cim_class(self):
+ """
+ A convenience accessor to ``self.conn.root.cimv2.<CLASS_NAME>``.
+ You need to override ``CLASS_NAME`` attribute of this class in order
+ to use this.
+
+ :returns: Lmi shell wrapper of CIM class to be tested.
+ :rtype: :py:class:`lmi.shell.LMIClass`
+ """
+ if self.CLASS_NAME is None:
+ return None
+ return getattr(self.ns, self.CLASS_NAME)
+
+ @property
+ def system_iname(self):
+ """
+ :returns: Instance of ``CIM_ComputerSystem`` registered with *CIMOM*.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
+ """
+ if LmiTestCase._SYSTEM_INAME is None:
+ LmiTestCase._SYSTEM_INAME = getattr(self.ns, self.system_cs_name) \
+ .first_instance_name()
+ return LmiTestCase._SYSTEM_INAME.copy()
+
+ def assertCIMIsSubclass(self, cls, base_cls):
+ """
+ Checks, whether cls is subclass of base_cls from CIM perspective.
+ @param cls name of subclass
+ @param base_cls name of base class
+ """
+ if not isinstance(cls, basestring):
+ raise TypeError("cls must be a string")
+ if not isinstance(base_cls, basestring):
+ raise TypeError("base_cls must be a string")
+ return self.assertTrue(pywbem.is_subclass(self.conn._client._cliconn,
+ "root/cimv2", base_cls, cls))
+
+ def assertRaisesCIM(self, cim_err_code, func, *args, **kwds):
+ """
+ This test passes if given function called with supplied arguments
+ raises :py:class:`pywbem.CIMError` with given cim error code.
+ """
+ base.BaseLmiTestCase.assertRaisesCIM(
+ self, cim_err_code, func, *args, **kwds)
+
+ def assertCIMNameEqual(self, fst, snd, msg=None):
+ """
+ Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their host
+ properties are not checked.
+ """
+ base.BaseLmiTestCase.assertCIMNameEqual(
+ self,
+ to_cim_object(fst),
+ to_cim_object(snd),
+ msg)
+
+ def assertCIMNameIn(self, name, candidates):
+ """
+ Checks that given :py:class:`pywbem.CIMInstanceName` is present in
+ set of candidates. It compares all properties but ``host``.
+ """
+ name = to_cim_object(name)
+ candidates = [to_cim_object(c) for c in candidates]
+ base.BaseLmiTestCase.assertCIMNameIn(self, name, candidates)
+
diff --git a/src/python/lmi/test/util.py b/src/python/lmi/test/util.py
new file mode 100644
index 0000000..32d04c4
--- /dev/null
+++ b/src/python/lmi/test/util.py
@@ -0,0 +1,131 @@
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+LMI test utilities.
+"""
+
+import os
+import pywbem
+import socket
+import unittest
+
+def is_this_system(system_name):
+ """
+ :returns: Whether the given *system_name* matches the hostname of currently
+ running system.
+ :rtype: boolean
+ """
+ return ( socket.gethostbyaddr(system_name)[0]
+ == socket.gethostbyaddr(socket.gethostname())[0])
+
+def get_environvar(variable, default='', convert=str):
+ """
+ Get the value of environment variable.
+
+ :param string variable: Name of environment variable.
+ :param default: Any value that should be returned when the variable is not
+ set. If None, the conversion won't be done.
+ :param callable convert: Function transforming value to something else.
+ :returns: Converted value of the environment variable.
+ """
+ val = os.environ.get(variable, default)
+ if convert is bool:
+ return val.lower() in ('true', 'yes', 'on', '1')
+ if val is None:
+ return None
+ return convert(val)
+
+def mark_dangerous(method):
+ """
+ Decorator for methods of :py:class:`unittest.TestCase` subclasses that
+ skips dangerous tests if an environment variable says so.
+ ``LMI_RUN_DANGEROUS`` is the environment variabled read.
+
+ These tests will be skipped by default.
+ """
+ if get_environvar('LMI_RUN_DANGEROUS', '0', bool):
+ return method
+ else:
+ return unittest.skip("This test is marked as dangerous.")(method)
+
+def mark_tedious(method):
+ """
+ Decorator for methods of :py:class:`unittest.TestCase` subclasses that
+ skips tedious tests. Those running for very long time and usually need a
+ lot of memory. They are run by default. Environment variable
+ ``LMI_RUN_TEDIOUS`` can be used to skip them.
+ """
+ if get_environvar('LMI_RUN_TEDIOUS', '1', bool):
+ return method
+ else:
+ return unittest.skip("This test is marked as tedious.")(method)
+
+def check_inames_equal(fst, snd):
+ """
+ Compare two objects of :py:class:`pywbem.CIMInstanceName`. Their ``host``
+ property is not checked. Be benevolent when checking names system
+ creation class names.
+
+ :returns: ``True`` if both instance names are equal.
+ :rtype: boolean
+ """
+ if not isinstance(fst, pywbem.CIMInstanceName):
+ raise TypeError("fst argument must be a pywbem.CIMInstanceName, not %s"
+ % repr(fst))
+ if not isinstance(snd, pywbem.CIMInstanceName):
+ raise TypeError("snd argument must be a pywbem.CIMInstanceName, not %s"
+ % repr(snd))
+ if fst.classname != snd.classname or fst.namespace != snd.namespace:
+ return False
+
+ snd_keys = { k: v for k, v in snd.keybindings.iteritems() }
+ for key, value in fst.keybindings.iteritems():
+ if key not in snd_keys:
+ return False
+ snd_value = snd_keys.pop(key)
+ if ( isinstance(value, pywbem.CIMInstanceName)
+ and isinstance(snd_value, pywbem.CIMInstanceName)):
+ if not check_inames_equal(value, snd_value):
+ return False
+
+ # accept also aliases in the Name attribute of ComputerSystem
+ elif ( ( fst.classname.endswith('_ComputerSystem')
+ and key.lower() == 'name')
+ or ( key.lower() == 'systemname'
+ and 'SystemCreationClassName' in fst)):
+ if ( value != snd_value
+ and ( not is_this_system(value)
+ or not is_this_system(snd_value))):
+ return False
+
+ elif ( fst.classname.endswith('_ComputerSystem')
+ and key.lower() == 'creationclassname'):
+ if ( value != snd_value
+ and 'CIM_ComputerSystem' not in [
+ p['CreationClassName'] for p in (fst, snd)]):
+ return False
+
+ elif value != snd_value:
+ return False
+
+ if snd_keys: # second path has more key properties than first one
+ return False
+
+ return True
+