summaryrefslogtreecommitdiffstats
path: root/src/account/test/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/account/test/common.py')
-rw-r--r--src/account/test/common.py158
1 files changed, 158 insertions, 0 deletions
diff --git a/src/account/test/common.py b/src/account/test/common.py
index c56e94d..39442af 100644
--- a/src/account/test/common.py
+++ b/src/account/test/common.py
@@ -21,11 +21,56 @@
import pywbem
import os
import unittest
+import Queue
+import random
+import BaseHTTPServer
+import socket
+import threading
"""
Base class for all tests
"""
+class CIMListener(object):
+ """ CIM Listener
+ """
+ class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_POST(self):
+ data = self.rfile.read(int(self.headers['Content-Length']))
+ tt = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
+ # Get the instance from CIM-XML, copied from
+ # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
+ insts = [x[1] for x in tt[2][2][0][2][2]]
+ for inst in insts:
+ self.callback(inst)
+ self.send_response(200)
+ self.end_headers()
+
+ def log_message(self, format, *p):
+ # suppress log messages
+ pass
+
+ def __init__(self, callback, http_port=5988):
+ self.address = ('', http_port)
+ self.CIMHandler.callback = callback
+ self.thread = None
+ self.server = None
+
+ def start(self):
+ BaseHTTPServer.HTTPServer.allow_reuse_address = True
+ self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.start()
+
+ def stop(self):
+ if self.server is not None:
+ self.server.shutdown()
+ self.server.socket.close()
+ if self.thread is not None:
+ self.thread.join()
+
+ def running(self):
+ return self.thread is not None
class AccountBase(unittest.TestCase):
@@ -44,3 +89,116 @@ class AccountBase(unittest.TestCase):
self.wbemconnection = pywbem.WBEMConnection(self.url,
(self.username, self.password))
+ # for indications
+ self.indication_port = random.randint(12000, 13000)
+ self.indication_queue = Queue.Queue()
+ self.listener = CIMListener(
+ callback=self._process_indication,
+ http_port=self.indication_port)
+
+ self.subscribed = {}
+
+ def tearDown(self):
+ self.listener.stop()
+ if self.subscribed:
+ for name in self.subscribed.keys():
+ self.unsubscribe(name)
+
+ def get_indication(self, timeout):
+ """ Wait for an indication for given nr. of seconds and return it."""
+ try:
+ indication = self.indication_queue.get(timeout=timeout)
+ except Queue.Empty:
+ raise AssertionError("Timeout when waiting for indicaiton")
+ self.indication_queue.task_done()
+ return indication
+
+ def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
+ """
+ Create indication subscription for given filter name.
+ """
+ namespace = "root/PG_interop"
+ hostname = socket.gethostname()
+
+ if query is not None:
+ # Create filter first
+ filterinst = pywbem.CIMInstance('CIM_IndicationFilter')
+ filterinst['CreationClassName'] = 'CIM_IndicationFilter'
+ filterinst['SystemCreationClassName'] = 'CIM_ComputerSystem'
+ filterinst['SystemName'] = hostname
+ filterinst['Name'] = filter_name
+ filterinst['Query'] = query
+ filterinst['QueryLanguage'] = querylang
+ filterinst['SourceNamespace'] = "root/cimv2"#namespace
+ cop = pywbem.CIMInstanceName('CIM_IndicationFilter')
+ cop.keybindings = { 'CreationClassName': 'CIM_IndicationFilter',
+ 'SystemClassName': 'CIM_ComputerSystem',
+ 'SystemName': hostname,
+ 'Name': filter_name
+ }
+ cop.namespace=namespace
+ filterinst.path = cop
+ indfilter = self.wbemconnection.CreateInstance(filterinst)
+ else:
+ # the filter is already created, assemble its name
+ indfilter = pywbem.CIMInstanceName(
+ classname="CIM_IndicationFilter",
+ namespace=namespace,
+ keybindings={
+ 'CreationClassName': 'CIM_IndicationFilter',
+ 'SystemClassName': 'CIM_ComputerSystem',
+ 'SystemName': hostname,
+ 'Name': filter_name})
+
+ # create destination
+ destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
+ destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
+ destinst['SystemCreationClassName'] = 'CIM_ComputerSystem'
+ destinst['SystemName'] = hostname
+ destinst['Name'] = filter_name
+ destinst['Destination'] = "http://localhost:%d" % (self.indication_port)
+ destinst['PersistenceType'] = pywbem.Uint16(3) # Transient
+ cop = pywbem.CIMInstanceName('CIM_ListenerDestinationCIMXML')
+ cop.keybindings = { 'CreationClassName':'CIM_ListenerDestinationCIMXML',
+ 'SystemClassName':'CIM_ComputerSystem',
+ 'SystemName':hostname,
+ 'Name':filter_name }
+ cop.namespace = namespace
+ destinst.path = cop
+ destname = self.wbemconnection.CreateInstance(destinst)
+
+ # create the subscription
+ subinst = pywbem.CIMInstance('CIM_IndicationSubscription')
+ subinst['Filter'] = indfilter
+ subinst['Handler'] = destname
+ cop = pywbem.CIMInstanceName('CIM_IndicationSubscription')
+ cop.keybindings = { 'Filter': indfilter,
+ 'Handler': destname }
+ cop.namespace = namespace
+ subinst.path = cop
+ subscription = self.wbemconnection.CreateInstance(subinst)
+
+ self.subscribed[filter_name] = [subscription, destname]
+
+ # start listening
+ if not self.listener.running():
+ self._start_listening()
+ return subscription
+
+ def unsubscribe(self, filter_name):
+ """
+ Unsubscribe fron given filter.
+ """
+ _list = self.subscribed.pop(filter_name)
+ for instance in _list:
+ self.wbemconnection.DeleteInstance(instance)
+
+ def _start_listening(self):
+ """ Start listening for incoming indications. """
+ self.listener.start()
+
+ def _process_indication(self, indication):
+ """ Callback to process one indication."""
+ self.indication_queue.put(indication)
+
+