# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA # # Authors: Jan Safranek # Authors: Michal Minar # Authors: Roman Rakus # """ Base class and utilities for test suits written with plain pywbem abastractions. """ import BaseHTTPServer import pywbem import Queue import random import threading import time from lmi.test import base JOB_CREATED = 4096 class CIMListener(object): """ CIM Listener for indication events. """ class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler): """ Handler for the POST request from *CIMOM*. """ def do_POST(self): """ Handle the POST request. """ data = self.rfile.read(int(self.headers['Content-Length'])) ttree = pywbem.parse_cim(pywbem.xml_to_tupletree(data)) # Get the instance from CIM-XML, copied from # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial insts = [x[1] for x in ttree[2][2][0][2][2]] for inst in insts: self.callback(inst) self.send_response(200) self.end_headers() def log_message(self, _fmt, *_args): # suppress log messages pass def __init__(self, callback, http_port=5988): self.address = ('', http_port) self.CIMHandler.callback = callback self.thread = None self.server = None def start(self): """ Start listening. """ BaseHTTPServer.HTTPServer.allow_reuse_address = True self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler) self.thread = threading.Thread(target=self.server.serve_forever) self.thread.start() def stop(self): """ Stop listening. """ if self.server is not None: self.server.shutdown() self.server.socket.close() if self.thread is not None: self.thread.join() def running(self): """ Are we listening for events? :rtype: bool """ return self.thread is not None class CIMTestCase(base.BaseLmiTestCase): """ Base class for LMI test cases based on plain pywbem. """ #: Says, whether the test case needs indication listener running or not. #: Each subclass shall override this property and set it to ``True`` if #: it want to test indication events. NEEDS_INDICATIONS = False _WBEMCONNECTION = None _SYSTEM_INAME = None @classmethod def needs_indications(cls): """ Whether the indication listener should be started for this test case. In subclasses override ``NEEDS_INDICATIONS`` property and set it to ``True`` if indication testing is desired. """ return cls.NEEDS_INDICATIONS @classmethod def setUpClass(cls): base.BaseLmiTestCase.setUpClass.im_func(cls) if cls.needs_indications(): cls.indication_port = random.randint(12000, 13000) cls.indication_queue = Queue.Queue() cls.listener = CIMListener( callback=cls._process_indication, http_port=cls.indication_port) @classmethod def tearDownClass(cls): if cls.needs_indications(): cls.listener.stop() @classmethod def _start_listening(cls): """ Start listening for incoming indications. """ cls.listener.start() @classmethod def _process_indication(cls, indication): """ Callback to process one indication.""" cls.indication_queue.put(indication) @property def wbemconnection(self): """ :returns: Active connection to *CIMOM*. :rtype: :py:class:`pywbem.WBEMConnection` """ if CIMTestCase._WBEMCONNECTION is None: CIMTestCase._WBEMCONNECTION = pywbem.WBEMConnection(self.url, (self.username, self.password)) return CIMTestCase._WBEMCONNECTION @property def system_iname(self): """ :returns: Instance of ``CIM_ComputerSystem`` registered with *CIMOM*. :rtype: :py:class:`lmi.shell.LMIInstanceName` """ if CIMTestCase._SYSTEM_INAME is None: CIMTestCase._SYSTEM_INAME = self.wbemconnection. \ EnumerateInstanceNames(self.system_cs_name, 'root/cimv2')[0] return CIMTestCase._SYSTEM_INAME.copy() def setUp(self): self.subscribed = {} def tearDown(self): for name in self.subscribed.keys(): self.unsubscribe(name) def assertCIMIsSubclass(self, cls, base_cls): """ Checks, whether cls is subclass of base_cls from CIM perspective. @param cls name of subclass @param base_cls name of base class """ if not isinstance(cls, basestring): raise TypeError("cls must be a string") if not isinstance(base_cls, basestring): raise TypeError("base_cls must be a string") return self.assertTrue(pywbem.is_subclass(self.wbemconnection, "root/cimv2", base_cls, cls)) def get_indication(self, timeout): """ Wait for an indication for given nr. of seconds and return it.""" try: indication = self.indication_queue.get(timeout=timeout) except Queue.Empty: raise AssertionError("Timeout when waiting for indication") self.indication_queue.task_done() return indication def make_filter_iname(self, filter_name): """ Create an instance name of ``CIM_IndicationFilter``. :rtype: :py:class:`pywbem.CIMInstanceName` """ return pywbem.CIMInstanceName( classname="CIM_IndicationFilter", namespace="root/interop", keybindings={ 'CreationClassName': 'CIM_IndicationFilter', 'SystemClassName': self.system_cs_name, 'SystemName': self.SYSTEM_NAME, 'Name': filter_name}) def make_filter_inst(self, filter_name, query, query_lang="DMTF:CQL"): """ Create an instance of ``CIM_IndicationFilter``. :rtype: :py:class:`pywbem.CIMInstance` """ inst = pywbem.CIMInstance('CIM_IndicationFilter') inst['CreationClassName'] = 'CIM_IndicationFilter' inst['SystemCreationClassName'] = self.system_cs_name inst['SystemName'] = self.SYSTEM_NAME inst['Name'] = filter_name inst['Query'] = query inst['QueryLanguage'] = query_lang inst['SourceNamespace'] = "root/cimv2" inst.path = self.make_filter_iname(filter_name) return inst def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"): """ Create indication subscription for given filter name. """ if not self.needs_indications(): raise Exception("can not subscribe to indications, enable them" " with NEEDS_INDICATIONS") if query is not None: # Create filter first indfilter = self.wbemconnection.CreateInstance( self.make_filter_inst(filter_name, query, querylang)) else: # the filter is already created, assemble its name indfilter = self.make_filter_iname(filter_name) # create destination destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML' destinst['SystemCreationClassName'] = self.system_cs_name destinst['SystemName'] = self.SYSTEM_NAME destinst['Name'] = filter_name destinst['Destination'] = "http://localhost:%d" % (self.indication_port) destinst['PersistenceType'] = pywbem.Uint16(3) # Transient cop = pywbem.CIMInstanceName( 'CIM_ListenerDestinationCIMXML', namespace="root/interop") cop.keybindings = { 'CreationClassName' : 'CIM_ListenerDestinationCIMXML', 'SystemClassName' : self.system_cs_name, 'SystemName' : self.SYSTEM_NAME, 'Name' : filter_name } destinst.path = cop destname = self.wbemconnection.CreateInstance(destinst) # create the subscription subinst = pywbem.CIMInstance( 'CIM_IndicationSubscription') subinst['Filter'] = indfilter subinst['Handler'] = destname cop = pywbem.CIMInstanceName( 'CIM_IndicationSubscription', namespace="root/interop") cop.keybindings = { 'Filter': indfilter, 'Handler': destname } subinst.path = cop subscription = self.wbemconnection.CreateInstance(subinst) self.subscribed[filter_name] = [subscription, destname] if not self.listener.running(): self._start_listening() return subscription def unsubscribe(self, filter_name): """ Unsubscribe from given filter. """ if not self.needs_indications(): raise Exception("can not unsubscribe to indications, enable them" " with NEEDS_INDICATIONS") for instance in self.subscribed.pop(filter_name): self.wbemconnection.DeleteInstance(instance) def finish_job(self, jobname, assoc_class, return_constructor=int): """ Wait until the job finishes and return ``(ret, outparams)``just as ``InvokeMethod`` would. It's hard to reconstruct these outparams, since the embedded instances / ojects do not work in our ``CIMOMS``, therefore special care is needed. :param jobname: Name of the job. :type jobname: :py:class:`pywbem.CIMInstanceName` :param callable return_constructor: Callable, which converts string to the right type, for example ``int``. :returns: ``(retvalue, outparams)`` in the same way as ``finish_method()`` would. :rtype: tuple """ # Use busy loop for now # TODO: rework to something sane while True: job = self.wbemconnection.GetInstance(jobname) if job['JobState'] > 5: # all states higher than 5 are final break time.sleep(0.1) # get the MethodResult resultname = self.wbemconnection.AssociatorNames( jobname, AssocClass=assoc_class)[0] result = self.wbemconnection.GetInstance(resultname) ind = result['PostCallIndication'] # check for error if ind['Error'] is not None: err = ind['Error'][0] code = err['CIMStatusCode'] msg = err['Message'] raise pywbem.CIMError(code, msg) ret = return_constructor(ind['ReturnValue']) # convert output parameters to format returned by InvokeMethod outparams = pywbem.NocaseDict() try: params = ind['MethodParameters'] except KeyError: params = {} if params: for (key, value) in params.iteritems(): outparams[key] = value return (ret, outparams) def invoke_async_method(self, method_name, object_name, return_constructor=int, *args, **kwargs): """ Invoke a method and if it returns a job, wait for the job. :param string method_name: Name of the method. :param object_name: Instance, on which the method should be invoked. :type object_name: :py:class:`pywbem.CIMInstanceName` :param callable return_constructor: Callable, which converts string to the right type, for example ``int``. :param list args: Positional arguments passed to invoked method. :param dictionary kwargs: Keyword arguments passed to invoked method. :returns: ``(retvalue, outparams)`` in the same way as ``finish_method()`` would. :rtype: tuple """ (ret, outparams) = self.wbemconnection.InvokeMethod( method_name, object_name, *args, **kwargs) if ret == JOB_CREATED: # wait for the job jobname = outparams['Job'] (ret, outparams) = self.finish_job(jobname, return_constructor) return (ret, outparams)