summaryrefslogtreecommitdiffstats
path: root/src/python
diff options
context:
space:
mode:
authorTomas Bzatek <tbzatek@redhat.com>2013-12-16 14:20:08 +0100
committerTomas Bzatek <tbzatek@redhat.com>2014-01-03 13:33:24 +0100
commit6c56489a82e34a07089da832fb6e71ff69262177 (patch)
treed5117bbd8f27fcd3753630ef5947c0122ffaa4d3 /src/python
parentddbe7ec6f7c62d0223ce211938e56d24c0205ef4 (diff)
Port indication test base to lmishell
Historically indication tests have been using pywbem machinery, which is now obsolete as lmishell gained proper indication support recently.
Diffstat (limited to 'src/python')
-rw-r--r--src/python/lmi/test/lmibase.py81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/python/lmi/test/lmibase.py b/src/python/lmi/test/lmibase.py
index e1b1b29..242c002 100644
--- a/src/python/lmi/test/lmibase.py
+++ b/src/python/lmi/test/lmibase.py
@@ -23,12 +23,15 @@ Base class and utilities for test suits written upon lmi shell.
import functools
import inspect
import pywbem
+import random
+import Queue
from lmi.test import base
from lmi.shell import connect
from lmi.shell import LMIInstance
from lmi.shell import LMIInstanceName
from lmi.shell import LMIUtil
+from lmi.shell import LMIIndicationListener
def enable_lmi_exceptions(method):
"""
@@ -66,11 +69,34 @@ class LmiTestCase(base.BaseLmiTestCase):
#: wrapper of this class.
CLASS_NAME = None
+ #: Says, whether the test case needs indication listener running or not.
+ #: Each subclass shall override this property and set it to ``True`` if
+ #: it wants to test indication events.
+ NEEDS_INDICATIONS = False
+
_SYSTEM_INAME = None
@classmethod
+ def needs_indications(cls):
+ """
+ Whether the indication listener should be started for this test case.
+ In subclasses override ``NEEDS_INDICATIONS`` property and set it to
+ ``True`` if indication testing is desired.
+ """
+ return cls.NEEDS_INDICATIONS
+
+ @classmethod
def setUpClass(cls):
base.BaseLmiTestCase.setUpClass.im_func(cls)
+ if cls.needs_indications():
+ cls.indication_port = random.randint(12000, 13000)
+ cls.indication_queue = Queue.Queue()
+ cls.listener = LMIIndicationListener("0.0.0.0", cls.indication_port)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.needs_indications():
+ cls.listener.stop()
@property
def conn(self):
@@ -167,3 +193,58 @@ class LmiTestCase(base.BaseLmiTestCase):
candidates = [to_cim_object(c) for c in candidates]
base.BaseLmiTestCase.assertCIMNameIn(self, name, candidates)
+ def _process_indication(self, ind, **kwargs):
+ """ Callback to process one indication."""
+ exported_objects = ind.exported_objects()
+ for i in exported_objects:
+ self.indication_queue.put(i)
+
+ def get_indication(self, timeout):
+ """ Wait for an indication for given nr. of seconds and return it."""
+ try:
+ indication = self.indication_queue.get(timeout=timeout)
+ except Queue.Empty:
+ raise AssertionError("Timeout when waiting for indication")
+ self.indication_queue.task_done()
+ return indication
+
+ def subscribe(self, filter_name, query, querylang="DMTF:CQL"):
+ """
+ Create indication subscription for given filter name.
+ """
+ if not self.needs_indications():
+ raise Exception("can not subscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+
+ # Connect the listener
+ subscription = self.listener.add_handler(filter_name + "-XXXXXXXX", self._process_indication)
+
+ # Subscribe the indication
+ ret = self.conn.subscribe_indication(QueryLanguage=querylang,
+ Query=query,
+ Name=subscription,
+ CreationNamespace="root/interop",
+ SubscriptionCreationClassName="CIM_IndicationSubscription",
+ FilterCreationClassName="CIM_IndicationFilter",
+ FilterSystemCreationClassName=self.system_cs_name,
+ FilterSourceNamespace="root/cimv2",
+ HandlerCreationClassName="CIM_IndicationHandlerCIMXML",
+ HandlerSystemCreationClassName="CIM_ComputerSystem",
+ Destination="http://localhost:%d" % (self.indication_port)
+ )
+ if not ret or not ret.rval:
+ raise AssertionError("Indication subscription failed")
+
+ # Start the listener if not running already
+ if not self.listener.is_alive:
+ self.listener.start()
+ return subscription
+
+ def unsubscribe(self, filter_name):
+ """
+ Unsubscribe from given filter.
+ """
+ if not self.needs_indications():
+ raise Exception("can not unsubscribe to indications, enable them"
+ " with NEEDS_INDICATIONS")
+ self.conn.unsubscribe_indication(filter_name)