summaryrefslogtreecommitdiffstats
path: root/src/python/lmi/test
diff options
context:
space:
mode:
authorTomas Bzatek <tbzatek@redhat.com>2013-12-16 14:20:08 +0100
committerTomas Bzatek <tbzatek@redhat.com>2014-01-03 13:33:24 +0100
commit6c56489a82e34a07089da832fb6e71ff69262177 (patch)
treed5117bbd8f27fcd3753630ef5947c0122ffaa4d3 /src/python/lmi/test
parentddbe7ec6f7c62d0223ce211938e56d24c0205ef4 (diff)
downloadopenlmi-providers-6c56489a82e34a07089da832fb6e71ff69262177.tar.gz
openlmi-providers-6c56489a82e34a07089da832fb6e71ff69262177.tar.xz
openlmi-providers-6c56489a82e34a07089da832fb6e71ff69262177.zip
Port indication test base to lmishell
Historically indication tests have been using pywbem machinery, which is now obsolete as lmishell gained proper indication support recently.
Diffstat (limited to 'src/python/lmi/test')
-rw-r--r--src/python/lmi/test/lmibase.py81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/python/lmi/test/lmibase.py b/src/python/lmi/test/lmibase.py
index e1b1b29..242c002 100644
--- a/src/python/lmi/test/lmibase.py
+++ b/src/python/lmi/test/lmibase.py
@@ -23,12 +23,15 @@ Base class and utilities for test suits written upon lmi shell.
import functools
import inspect
import pywbem
+import random
+import Queue
from lmi.test import base
from lmi.shell import connect
from lmi.shell import LMIInstance
from lmi.shell import LMIInstanceName
from lmi.shell import LMIUtil
+from lmi.shell import LMIIndicationListener
def enable_lmi_exceptions(method):
"""
@@ -66,11 +69,34 @@ class LmiTestCase(base.BaseLmiTestCase):
#: wrapper of this class.
CLASS_NAME = None
+ #: Says, whether the test case needs indication listener running or not.
+ #: Each subclass shall override this property and set it to ``True`` if
+ #: it wants to test indication events.
+ NEEDS_INDICATIONS = False
+
_SYSTEM_INAME = None
@classmethod
+ def needs_indications(cls):
+ """
+ Whether the indication listener should be started for this test case.
+ In subclasses override ``NEEDS_INDICATIONS`` property and set it to
+ ``True`` if indication testing is desired.
+ """
+ return cls.NEEDS_INDICATIONS
+
+ @classmethod
def setUpClass(cls):
base.BaseLmiTestCase.setUpClass.im_func(cls)
+ if cls.needs_indications():
+ cls.indication_port = random.randint(12000, 13000)
+ cls.indication_queue = Queue.Queue()
+ cls.listener = LMIIndicationListener("0.0.0.0", cls.indication_port)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.needs_indications():
+ cls.listener.stop()
@property
def conn(self):
@@ -167,3 +193,58 @@ class LmiTestCase(base.BaseLmiTestCase):
candidates = [to_cim_object(c) for c in candidates]
base.BaseLmiTestCase.assertCIMNameIn(self, name, candidates)
+ def _process_indication(self, ind, **kwargs):
+ """ Callback to process one indication."""
+ exported_objects = ind.exported_objects()
+ for i in exported_objects:
+ self.indication_queue.put(i)
+
+ def get_indication(self, timeout):
+ """ Wait for an indication for given nr. of seconds and return it."""
+ try:
+ indication = self.indication_queue.get(timeout=timeout)
+ except Queue.Empty:
+ raise AssertionError("Timeout when waiting for indication")
+ self.indication_queue.task_done()
+ return indication
+
+ def subscribe(self, filter_name, query, querylang="DMTF:CQL"):
+ """
+ Create indication subscription for given filter name.
+ """
+ if not self.needs_indications():
+ raise Exception("can not subscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+
+ # Connect the listener
+ subscription = self.listener.add_handler(filter_name + "-XXXXXXXX", self._process_indication)
+
+ # Subscribe the indication
+ ret = self.conn.subscribe_indication(QueryLanguage=querylang,
+ Query=query,
+ Name=subscription,
+ CreationNamespace="root/interop",
+ SubscriptionCreationClassName="CIM_IndicationSubscription",
+ FilterCreationClassName="CIM_IndicationFilter",
+ FilterSystemCreationClassName=self.system_cs_name,
+ FilterSourceNamespace="root/cimv2",
+ HandlerCreationClassName="CIM_IndicationHandlerCIMXML",
+ HandlerSystemCreationClassName="CIM_ComputerSystem",
+ Destination="http://localhost:%d" % (self.indication_port)
+ )
+ if not ret or not ret.rval:
+ raise AssertionError("Indication subscription failed")
+
+ # Start the listener if not running already
+ if not self.listener.is_alive:
+ self.listener.start()
+ return subscription
+
+ def unsubscribe(self, filter_name):
+ """
+ Unsubscribe from given filter.
+ """
+ if not self.needs_indications():
+ raise Exception("can not unsubscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+ self.conn.unsubscribe_indication(filter_name)