summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorRoman Rakus <rrakus@redhat.com>2013-07-15 21:10:32 +0200
committerRoman Rakus <rrakus@redhat.com>2013-07-17 12:51:32 +0200
commit9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2 (patch)
tree8daf939586a23d3e63d9aca31d34f8d4338358f3 /src
parent17627a29921cb6faab3a771421717e904fe32725 (diff)
downloadopenlmi-providers-9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2.tar.gz
openlmi-providers-9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2.tar.xz
openlmi-providers-9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2.zip
Account: Tests for indications
Signed-off-by: Roman Rakus <rrakus@redhat.com>
Diffstat (limited to 'src')
-rw-r--r--src/account/test/TestIndications.py100
-rw-r--r--src/account/test/common.py158
2 files changed, 258 insertions, 0 deletions
diff --git a/src/account/test/TestIndications.py b/src/account/test/TestIndications.py
new file mode 100644
index 0000000..7b34b2e
--- /dev/null
+++ b/src/account/test/TestIndications.py
@@ -0,0 +1,100 @@
+# Copyright (C) 2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Roman Rakus <rrakus@redhat.com>
+#
+
+from common import AccountBase
+import time
+from methods import *
+
+class TestIndications(AccountBase):
+ """
+ Class for testing LMI_Account indications
+ """
+ def test_check_good_filter(self):
+ """
+ Account: Test good indication filter
+ """
+ filter_name = "test_good_filter_%d" % time.time() * 1000
+ sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceCreationIndication where SourceInstance isa LMI_Account")
+ self.assertIsNotNone(sub)
+ self.unsubscribe(filter_name);
+
+ def test_check_bad_filter(self):
+ """
+ Account: Test bad indication filter
+ """
+ pass
+
+ def test_group_deletion_indication(self):
+ """
+ Account: Test indication of group deletion
+ """
+ create_group(self.group_name)
+ filter_name = "test_delete_group_%d" % time.time() * 1000
+ sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceDeletionIndication where SourceInstance isa LMI_Group")
+ clean_group(self.group_name)
+ indication = self.get_indication(10)
+ self.assertEqual(indication.classname, "LMI_AccountInstanceDeletionIndication")
+ self.assertIn("SourceInstance", indication.keys())
+ self.assertTrue(indication["SourceInstance"] is not None)
+ self.assertEqual(indication["SourceInstance"]["Name"], self.group_name)
+
+ def test_group_creation_indication(self):
+ """
+ Account: Test indication of group creation
+ """
+ clean_group(self.group_name)
+ filter_name = "test_create_group_%d" % time.time() * 1000
+ sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceCreationIndication where SourceInstance isa LMI_Group")
+ create_group(self.group_name)
+ indication = self.get_indication(10)
+ self.assertEqual(indication.classname, "LMI_AccountInstanceCreationIndication")
+ self.assertIn("SourceInstance", indication.keys())
+ self.assertTrue(indication["SourceInstance"] is not None)
+ self.assertEqual(indication["SourceInstance"]["Name"], self.group_name)
+ clean_group(self.group_name)
+
+ def test_account_deletion_indication(self):
+ """
+ Account: Test indication of account deletion
+ """
+ create_account(self.user_name)
+ filter_name = "test_delete_account_%d" % time.time() * 1000
+ sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceDeletionIndication where SourceInstance isa LMI_Account")
+ clean_account(self.user_name)
+ indication = self.get_indication(10)
+ self.assertEqual(indication.classname, "LMI_AccountInstanceDeletionIndication")
+ self.assertIn("SourceInstance", indication.keys())
+ self.assertTrue(indication["SourceInstance"] is not None)
+ self.assertEqual(indication["SourceInstance"]["Name"], self.user_name)
+
+ def test_account_creation_indication(self):
+ """
+ Account: Test indication of account creation
+ """
+ clean_account(self.user_name)
+ filter_name = "test_create_account_%d" % time.time() * 1000
+ sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceCreationIndication where SourceInstance isa LMI_Account")
+ create_account(self.user_name)
+ indication = self.get_indication(10)
+ self.assertEqual(indication.classname, "LMI_AccountInstanceCreationIndication")
+ self.assertIn("SourceInstance", indication.keys())
+ self.assertTrue(indication["SourceInstance"] is not None)
+ self.assertEqual(indication["SourceInstance"]["Name"], self.user_name)
+ clean_account(self.user_name)
+
diff --git a/src/account/test/common.py b/src/account/test/common.py
index c56e94d..39442af 100644
--- a/src/account/test/common.py
+++ b/src/account/test/common.py
@@ -21,11 +21,56 @@
import pywbem
import os
import unittest
+import Queue
+import random
+import BaseHTTPServer
+import socket
+import threading
"""
Base class for all tests
"""
+class CIMListener(object):
+ """ CIM Listener
+ """
+ class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_POST(self):
+ data = self.rfile.read(int(self.headers['Content-Length']))
+ tt = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
+ # Get the instance from CIM-XML, copied from
+ # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
+ insts = [x[1] for x in tt[2][2][0][2][2]]
+ for inst in insts:
+ self.callback(inst)
+ self.send_response(200)
+ self.end_headers()
+
+ def log_message(self, format, *p):
+ # suppress log messages
+ pass
+
+ def __init__(self, callback, http_port=5988):
+ self.address = ('', http_port)
+ self.CIMHandler.callback = callback
+ self.thread = None
+ self.server = None
+
+ def start(self):
+ BaseHTTPServer.HTTPServer.allow_reuse_address = True
+ self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.start()
+
+ def stop(self):
+ if self.server is not None:
+ self.server.shutdown()
+ self.server.socket.close()
+ if self.thread is not None:
+ self.thread.join()
+
+ def running(self):
+ return self.thread is not None
class AccountBase(unittest.TestCase):
@@ -44,3 +89,116 @@ class AccountBase(unittest.TestCase):
self.wbemconnection = pywbem.WBEMConnection(self.url,
(self.username, self.password))
+ # for indications
+ self.indication_port = random.randint(12000, 13000)
+ self.indication_queue = Queue.Queue()
+ self.listener = CIMListener(
+ callback=self._process_indication,
+ http_port=self.indication_port)
+
+ self.subscribed = {}
+
+ def tearDown(self):
+ self.listener.stop()
+ if self.subscribed:
+ for name in self.subscribed.keys():
+ self.unsubscribe(name)
+
+ def get_indication(self, timeout):
+ """ Wait for an indication for given nr. of seconds and return it."""
+ try:
+ indication = self.indication_queue.get(timeout=timeout)
+ except Queue.Empty:
+ raise AssertionError("Timeout when waiting for indicaiton")
+ self.indication_queue.task_done()
+ return indication
+
+ def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
+ """
+ Create indication subscription for given filter name.
+ """
+ namespace = "root/PG_interop"
+ hostname = socket.gethostname()
+
+ if query is not None:
+ # Create filter first
+ filterinst = pywbem.CIMInstance('CIM_IndicationFilter')
+ filterinst['CreationClassName'] = 'CIM_IndicationFilter'
+ filterinst['SystemCreationClassName'] = 'CIM_ComputerSystem'
+ filterinst['SystemName'] = hostname
+ filterinst['Name'] = filter_name
+ filterinst['Query'] = query
+ filterinst['QueryLanguage'] = querylang
+ filterinst['SourceNamespace'] = "root/cimv2"#namespace
+ cop = pywbem.CIMInstanceName('CIM_IndicationFilter')
+ cop.keybindings = { 'CreationClassName': 'CIM_IndicationFilter',
+ 'SystemClassName': 'CIM_ComputerSystem',
+ 'SystemName': hostname,
+ 'Name': filter_name
+ }
+ cop.namespace=namespace
+ filterinst.path = cop
+ indfilter = self.wbemconnection.CreateInstance(filterinst)
+ else:
+ # the filter is already created, assemble its name
+ indfilter = pywbem.CIMInstanceName(
+ classname="CIM_IndicationFilter",
+ namespace=namespace,
+ keybindings={
+ 'CreationClassName': 'CIM_IndicationFilter',
+ 'SystemClassName': 'CIM_ComputerSystem',
+ 'SystemName': hostname,
+ 'Name': filter_name})
+
+ # create destination
+ destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
+ destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
+ destinst['SystemCreationClassName'] = 'CIM_ComputerSystem'
+ destinst['SystemName'] = hostname
+ destinst['Name'] = filter_name
+ destinst['Destination'] = "http://localhost:%d" % (self.indication_port)
+ destinst['PersistenceType'] = pywbem.Uint16(3) # Transient
+ cop = pywbem.CIMInstanceName('CIM_ListenerDestinationCIMXML')
+ cop.keybindings = { 'CreationClassName':'CIM_ListenerDestinationCIMXML',
+ 'SystemClassName':'CIM_ComputerSystem',
+ 'SystemName':hostname,
+ 'Name':filter_name }
+ cop.namespace = namespace
+ destinst.path = cop
+ destname = self.wbemconnection.CreateInstance(destinst)
+
+ # create the subscription
+ subinst = pywbem.CIMInstance('CIM_IndicationSubscription')
+ subinst['Filter'] = indfilter
+ subinst['Handler'] = destname
+ cop = pywbem.CIMInstanceName('CIM_IndicationSubscription')
+ cop.keybindings = { 'Filter': indfilter,
+ 'Handler': destname }
+ cop.namespace = namespace
+ subinst.path = cop
+ subscription = self.wbemconnection.CreateInstance(subinst)
+
+ self.subscribed[filter_name] = [subscription, destname]
+
+ # start listening
+ if not self.listener.running():
+ self._start_listening()
+ return subscription
+
+ def unsubscribe(self, filter_name):
+ """
+ Unsubscribe fron given filter.
+ """
+ _list = self.subscribed.pop(filter_name)
+ for instance in _list:
+ self.wbemconnection.DeleteInstance(instance)
+
+ def _start_listening(self):
+ """ Start listening for incoming indications. """
+ self.listener.start()
+
+ def _process_indication(self, indication):
+ """ Callback to process one indication."""
+ self.indication_queue.put(indication)
+
+