diff options
author | Roman Rakus <rrakus@redhat.com> | 2013-07-15 21:10:32 +0200 |
---|---|---|
committer | Roman Rakus <rrakus@redhat.com> | 2013-07-17 12:51:32 +0200 |
commit | 9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2 (patch) | |
tree | 8daf939586a23d3e63d9aca31d34f8d4338358f3 /src | |
parent | 17627a29921cb6faab3a771421717e904fe32725 (diff) | |
download | openlmi-providers-9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2.tar.gz openlmi-providers-9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2.tar.xz openlmi-providers-9d6c010fcebf5aa9282e82a0ee86d01e0d4812c2.zip |
Account: Tests for indications
Signed-off-by: Roman Rakus <rrakus@redhat.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/account/test/TestIndications.py | 100 | ||||
-rw-r--r-- | src/account/test/common.py | 158 |
2 files changed, 258 insertions, 0 deletions
diff --git a/src/account/test/TestIndications.py b/src/account/test/TestIndications.py new file mode 100644 index 0000000..7b34b2e --- /dev/null +++ b/src/account/test/TestIndications.py @@ -0,0 +1,100 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Roman Rakus <rrakus@redhat.com> +# + +from common import AccountBase +import time +from methods import * + +class TestIndications(AccountBase): + """ + Class for testing LMI_Account indications + """ + def test_check_good_filter(self): + """ + Account: Test good indication filter + """ + filter_name = "test_good_filter_%d" % time.time() * 1000 + sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceCreationIndication where SourceInstance isa LMI_Account") + self.assertIsNotNone(sub) + self.unsubscribe(filter_name); + + def test_check_bad_filter(self): + """ + Account: Test bad indication filter + """ + pass + + def test_group_deletion_indication(self): + """ + Account: Test indication of group deletion + """ + create_group(self.group_name) + filter_name = "test_delete_group_%d" % time.time() * 1000 + sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceDeletionIndication where SourceInstance isa LMI_Group") + clean_group(self.group_name) + indication = self.get_indication(10) + self.assertEqual(indication.classname, "LMI_AccountInstanceDeletionIndication") + self.assertIn("SourceInstance", indication.keys()) + self.assertTrue(indication["SourceInstance"] is not None) + self.assertEqual(indication["SourceInstance"]["Name"], self.group_name) + + def test_group_creation_indication(self): + """ + Account: Test indication of group creation + """ + clean_group(self.group_name) + filter_name = "test_create_group_%d" % time.time() * 1000 + sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceCreationIndication where SourceInstance isa LMI_Group") + create_group(self.group_name) + indication = self.get_indication(10) + self.assertEqual(indication.classname, "LMI_AccountInstanceCreationIndication") + self.assertIn("SourceInstance", indication.keys()) + self.assertTrue(indication["SourceInstance"] is not None) + self.assertEqual(indication["SourceInstance"]["Name"], self.group_name) + clean_group(self.group_name) + + def test_account_deletion_indication(self): + """ + Account: Test indication of account deletion + """ + create_account(self.user_name) + filter_name = "test_delete_account_%d" % time.time() * 1000 + sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceDeletionIndication where SourceInstance isa LMI_Account") + clean_account(self.user_name) + indication = self.get_indication(10) + self.assertEqual(indication.classname, "LMI_AccountInstanceDeletionIndication") + self.assertIn("SourceInstance", indication.keys()) + self.assertTrue(indication["SourceInstance"] is not None) + self.assertEqual(indication["SourceInstance"]["Name"], self.user_name) + + def test_account_creation_indication(self): + """ + Account: Test indication of account creation + """ + clean_account(self.user_name) + filter_name = "test_create_account_%d" % time.time() * 1000 + sub = self.subscribe(filter_name, "select * from LMI_AccountInstanceCreationIndication where SourceInstance isa LMI_Account") + create_account(self.user_name) + indication = self.get_indication(10) + self.assertEqual(indication.classname, "LMI_AccountInstanceCreationIndication") + self.assertIn("SourceInstance", indication.keys()) + self.assertTrue(indication["SourceInstance"] is not None) + self.assertEqual(indication["SourceInstance"]["Name"], self.user_name) + clean_account(self.user_name) + diff --git a/src/account/test/common.py b/src/account/test/common.py index c56e94d..39442af 100644 --- a/src/account/test/common.py +++ b/src/account/test/common.py @@ -21,11 +21,56 @@ import pywbem import os import unittest +import Queue +import random +import BaseHTTPServer +import socket +import threading """ Base class for all tests """ +class CIMListener(object): + """ CIM Listener + """ + class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_POST(self): + data = self.rfile.read(int(self.headers['Content-Length'])) + tt = pywbem.parse_cim(pywbem.xml_to_tupletree(data)) + # Get the instance from CIM-XML, copied from + # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial + insts = [x[1] for x in tt[2][2][0][2][2]] + for inst in insts: + self.callback(inst) + self.send_response(200) + self.end_headers() + + def log_message(self, format, *p): + # suppress log messages + pass + + def __init__(self, callback, http_port=5988): + self.address = ('', http_port) + self.CIMHandler.callback = callback + self.thread = None + self.server = None + + def start(self): + BaseHTTPServer.HTTPServer.allow_reuse_address = True + self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.start() + + def stop(self): + if self.server is not None: + self.server.shutdown() + self.server.socket.close() + if self.thread is not None: + self.thread.join() + + def running(self): + return self.thread is not None class AccountBase(unittest.TestCase): @@ -44,3 +89,116 @@ class AccountBase(unittest.TestCase): self.wbemconnection = pywbem.WBEMConnection(self.url, (self.username, self.password)) + # for indications + self.indication_port = random.randint(12000, 13000) + self.indication_queue = Queue.Queue() + self.listener = CIMListener( + callback=self._process_indication, + http_port=self.indication_port) + + self.subscribed = {} + + def tearDown(self): + self.listener.stop() + if self.subscribed: + for name in self.subscribed.keys(): + self.unsubscribe(name) + + def get_indication(self, timeout): + """ Wait for an indication for given nr. of seconds and return it.""" + try: + indication = self.indication_queue.get(timeout=timeout) + except Queue.Empty: + raise AssertionError("Timeout when waiting for indicaiton") + self.indication_queue.task_done() + return indication + + def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"): + """ + Create indication subscription for given filter name. + """ + namespace = "root/PG_interop" + hostname = socket.gethostname() + + if query is not None: + # Create filter first + filterinst = pywbem.CIMInstance('CIM_IndicationFilter') + filterinst['CreationClassName'] = 'CIM_IndicationFilter' + filterinst['SystemCreationClassName'] = 'CIM_ComputerSystem' + filterinst['SystemName'] = hostname + filterinst['Name'] = filter_name + filterinst['Query'] = query + filterinst['QueryLanguage'] = querylang + filterinst['SourceNamespace'] = "root/cimv2"#namespace + cop = pywbem.CIMInstanceName('CIM_IndicationFilter') + cop.keybindings = { 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': 'CIM_ComputerSystem', + 'SystemName': hostname, + 'Name': filter_name + } + cop.namespace=namespace + filterinst.path = cop + indfilter = self.wbemconnection.CreateInstance(filterinst) + else: + # the filter is already created, assemble its name + indfilter = pywbem.CIMInstanceName( + classname="CIM_IndicationFilter", + namespace=namespace, + keybindings={ + 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': 'CIM_ComputerSystem', + 'SystemName': hostname, + 'Name': filter_name}) + + # create destination + destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') + destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML' + destinst['SystemCreationClassName'] = 'CIM_ComputerSystem' + destinst['SystemName'] = hostname + destinst['Name'] = filter_name + destinst['Destination'] = "http://localhost:%d" % (self.indication_port) + destinst['PersistenceType'] = pywbem.Uint16(3) # Transient + cop = pywbem.CIMInstanceName('CIM_ListenerDestinationCIMXML') + cop.keybindings = { 'CreationClassName':'CIM_ListenerDestinationCIMXML', + 'SystemClassName':'CIM_ComputerSystem', + 'SystemName':hostname, + 'Name':filter_name } + cop.namespace = namespace + destinst.path = cop + destname = self.wbemconnection.CreateInstance(destinst) + + # create the subscription + subinst = pywbem.CIMInstance('CIM_IndicationSubscription') + subinst['Filter'] = indfilter + subinst['Handler'] = destname + cop = pywbem.CIMInstanceName('CIM_IndicationSubscription') + cop.keybindings = { 'Filter': indfilter, + 'Handler': destname } + cop.namespace = namespace + subinst.path = cop + subscription = self.wbemconnection.CreateInstance(subinst) + + self.subscribed[filter_name] = [subscription, destname] + + # start listening + if not self.listener.running(): + self._start_listening() + return subscription + + def unsubscribe(self, filter_name): + """ + Unsubscribe fron given filter. + """ + _list = self.subscribed.pop(filter_name) + for instance in _list: + self.wbemconnection.DeleteInstance(instance) + + def _start_listening(self): + """ Start listening for incoming indications. """ + self.listener.start() + + def _process_indication(self, indication): + """ Callback to process one indication.""" + self.indication_queue.put(indication) + + |