# -*- encoding: utf-8 -*-
# Copyright (C) 2012-2013 Red Hat, Inc.  All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Roman Rakus
#
"""
Base class for all tests
"""

import pywbem
import os
import unittest
import Queue
import random
import BaseHTTPServer
import socket
import threading


class CIMListener(object):
    """
    Minimal CIM indication listener.

    Runs a ``BaseHTTPServer.HTTPServer`` in a background thread and passes
    every instance found in a POSTed CIM-XML document to ``callback``.
    """

    class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler):
        """
        HTTP request handler that decodes indications delivered via POST.

        ``callback`` is not defined here; ``CIMListener.__init__`` derives a
        per-listener subclass that supplies it.
        """

        def do_POST(self):
            """ Parse the request body and feed each instance to callback. """
            data = self.rfile.read(int(self.headers['Content-Length']))
            tt = pywbem.parse_cim(pywbem.xml_to_tupletree(data))
            # Get the instance from CIM-XML, copied from
            # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial
            insts = [x[1] for x in tt[2][2][0][2][2]]
            for inst in insts:
                self.callback(inst)
            self.send_response(200)
            self.end_headers()

        def log_message(self, format, *p):
            """ Suppress the per-request log messages of the base class. """
            pass

    def __init__(self, callback, http_port=5988):
        """
        :param callback: callable invoked with one argument (the received
            instance) for every indication.
        :param http_port: TCP port to listen on; the server binds to all
            interfaces.
        """
        self.address = ('', http_port)

        # Give every listener its own handler class instead of writing the
        # callback onto the shared CIMHandler class, so several listeners
        # cannot overwrite each other's callback.  staticmethod() keeps the
        # callable from being bound to the handler instance when it is a
        # plain function.  A class statement (not type()) is used because
        # the Python 2 request handler is an old-style class.
        base_handler = self.CIMHandler
        handler_callback = staticmethod(callback)

        class _BoundCIMHandler(base_handler):
            callback = handler_callback

        self.CIMHandler = _BoundCIMHandler
        self.thread = None
        self.server = None

    def start(self):
        """ Create the HTTP server and serve requests in a new thread. """
        BaseHTTPServer.HTTPServer.allow_reuse_address = True
        self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.start()

    def stop(self):
        """
        Shut the server down and wait for the serving thread to finish.

        Does nothing when the listener was never started.  The server and
        thread references are cleared so that running() reports False
        afterwards and start() may be called again.
        """
        server, thread = self.server, self.thread
        self.server = None
        self.thread = None
        if server is not None:
            server.shutdown()
            # server_close() closes the listening socket.
            server.server_close()
        if thread is not None:
            thread.join()

    def running(self):
        """ Return True when the listener has been started and not stopped. """
        return self.thread is not None


class AccountBase(unittest.TestCase):
    """
    Base class for all LMI Account tests
    """

    def setUp(self):
        """
        Connect to the server and prepare (but do not start) the
        indication listener.

        Connection parameters are read from the LMI_CIMOM_URL,
        LMI_CIMOM_USERNAME and LMI_CIMOM_PASSWORD environment variables;
        the tested user and group from LMI_ACCOUNT_USER and
        LMI_ACCOUNT_GROUP (None when unset).
        """
        self.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989")
        self.username = os.environ.get("LMI_CIMOM_USERNAME", "root")
        self.password = os.environ.get("LMI_CIMOM_PASSWORD", "")
        self.user_name = os.environ.get("LMI_ACCOUNT_USER")
        self.group_name = os.environ.get("LMI_ACCOUNT_GROUP")
        self.wbemconnection = pywbem.WBEMConnection(
                self.url, (self.username, self.password))

        # for indications
        self.indication_port = random.randint(12000, 13000)
        self.indication_queue = Queue.Queue()
        self.listener = CIMListener(
                callback=self._process_indication,
                http_port=self.indication_port)
        # filter name -> list of instance names to delete on unsubscribe
        self.subscribed = {}

    def tearDown(self):
        """ Stop the listener and remove all remaining subscriptions. """
        self.listener.stop()
        # unsubscribe() removes entries from self.subscribed, so iterate
        # over a snapshot of the names.
        for name in list(self.subscribed):
            self.unsubscribe(name)

    def get_indication(self, timeout):
        """
        Wait for an indication for given nr. of seconds and return it.

        :param timeout: maximum number of seconds to wait.
        :raises AssertionError: when nothing arrives within the timeout.
        """
        try:
            indication = self.indication_queue.get(timeout=timeout)
        except Queue.Empty:
            raise AssertionError("Timeout when waiting for indicaiton")
        self.indication_queue.task_done()
        return indication

    def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"):
        """
        Create indication subscription for given filter name.

        :param filter_name: name of the indication filter; it is also used
            as the name of the listener destination.
        :param query: when not None, a new CIM_IndicationFilter with this
            query is created first; otherwise the filter is expected to
            exist already.
        :param querylang: query language of ``query``.
        :returns: what CreateInstance returned for the new
            CIM_IndicationSubscription.
        """
        namespace = "root/PG_interop"
        hostname = socket.gethostname()

        # NOTE(review): the instance paths below use the key
        # 'SystemClassName' while the instances themselves set the property
        # 'SystemCreationClassName' -- confirm which one the CIMOM expects.
        if query is not None:
            # Create filter first
            filterinst = pywbem.CIMInstance('CIM_IndicationFilter')
            filterinst['CreationClassName'] = 'CIM_IndicationFilter'
            filterinst['SystemCreationClassName'] = 'CIM_ComputerSystem'
            filterinst['SystemName'] = hostname
            filterinst['Name'] = filter_name
            filterinst['Query'] = query
            filterinst['QueryLanguage'] = querylang
            # The filter is stored in `namespace`, but its source namespace
            # is root/cimv2.
            filterinst['SourceNamespace'] = "root/cimv2"
            cop = pywbem.CIMInstanceName('CIM_IndicationFilter')
            cop.keybindings = {
                    'CreationClassName': 'CIM_IndicationFilter',
                    'SystemClassName': 'CIM_ComputerSystem',
                    'SystemName': hostname,
                    'Name': filter_name}
            cop.namespace = namespace
            filterinst.path = cop
            indfilter = self.wbemconnection.CreateInstance(filterinst)
        else:
            # the filter is already created, assemble its name
            indfilter = pywbem.CIMInstanceName(
                    classname="CIM_IndicationFilter",
                    namespace=namespace,
                    keybindings={
                        'CreationClassName': 'CIM_IndicationFilter',
                        'SystemClassName': 'CIM_ComputerSystem',
                        'SystemName': hostname,
                        'Name': filter_name})

        # create destination
        destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML')
        destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML'
        destinst['SystemCreationClassName'] = 'CIM_ComputerSystem'
        destinst['SystemName'] = hostname
        destinst['Name'] = filter_name
        destinst['Destination'] = "http://localhost:%d" % (
                self.indication_port)
        destinst['PersistenceType'] = pywbem.Uint16(3)  # Transient
        cop = pywbem.CIMInstanceName('CIM_ListenerDestinationCIMXML')
        cop.keybindings = {
                'CreationClassName': 'CIM_ListenerDestinationCIMXML',
                'SystemClassName': 'CIM_ComputerSystem',
                'SystemName': hostname,
                'Name': filter_name}
        cop.namespace = namespace
        destinst.path = cop
        destname = self.wbemconnection.CreateInstance(destinst)

        # create the subscription
        subinst = pywbem.CIMInstance('CIM_IndicationSubscription')
        subinst['Filter'] = indfilter
        subinst['Handler'] = destname
        cop = pywbem.CIMInstanceName('CIM_IndicationSubscription')
        cop.keybindings = {
                'Filter': indfilter,
                'Handler': destname}
        cop.namespace = namespace
        subinst.path = cop
        subscription = self.wbemconnection.CreateInstance(subinst)

        # Remember everything this call created, in deletion order: the
        # subscription refers to the destination and the filter, so it is
        # removed first.  A filter that existed beforehand is not ours to
        # delete and is therefore not tracked.
        created = [subscription, destname]
        if query is not None:
            created.append(indfilter)
        self.subscribed[filter_name] = created

        # start listening
        if not self.listener.running():
            self._start_listening()
        return subscription

    def unsubscribe(self, filter_name):
        """
        Unsubscribe from given filter.

        Deletes every instance recorded by subscribe() for ``filter_name``.

        :raises KeyError: when there is no subscription for ``filter_name``.
        """
        _list = self.subscribed.pop(filter_name)
        for instance in _list:
            self.wbemconnection.DeleteInstance(instance)

    def _start_listening(self):
        """ Start listening for incoming indications. """
        self.listener.start()

    def _process_indication(self, indication):
        """ Callback to process one indication."""
        self.indication_queue.put(indication)