From fa53e73b895c90c80361300abc9304596115d2d1 Mon Sep 17 00:00:00 2001 From: Tomas Bzatek Date: Fri, 23 Aug 2013 14:44:02 +0200 Subject: journald: Add indications Also contains simple test suite to test indications. --- src/journald/test/TestIndications.py | 52 +++++++++ src/journald/test/__init__.py | 0 src/journald/test/common.py | 202 +++++++++++++++++++++++++++++++++++ src/journald/test/localtest.sh | 1 + 4 files changed, 255 insertions(+) create mode 100644 src/journald/test/TestIndications.py create mode 100644 src/journald/test/__init__.py create mode 100644 src/journald/test/common.py create mode 100755 src/journald/test/localtest.sh (limited to 'src/journald/test') diff --git a/src/journald/test/TestIndications.py b/src/journald/test/TestIndications.py new file mode 100644 index 0000000..cf79ad6 --- /dev/null +++ b/src/journald/test/TestIndications.py @@ -0,0 +1,52 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Tomas Bzatek +# + +from common import JournalBase +import time +import syslog + +class TestIndications(JournalBase): + """ + Class for testing LMI_JournalMessageLog indications + """ + + def test_check_good_filter(self): + """ + Journal: Test good indication filter + """ + filter_name = "test_good_filter_%d" % (time.time() * 1000) + sub = self.subscribe(filter_name, "select * from LMI_JournalLogRecordInstanceCreationIndication where SourceInstance isa LMI_JournalLogRecord") + self.assertIsNotNone(sub) + self.unsubscribe(filter_name); + + + def test_message_send(self): + """ + Journal: Test message logging and its retrieval from journal + """ + filter_name = "test_message_send_%d" % (time.time() * 1000) + syslog_msg = "== LMI_Journald test message ==" + sub = self.subscribe(filter_name, "select * from LMI_JournalLogRecordInstanceCreationIndication where SourceInstance isa LMI_JournalLogRecord") + syslog.syslog(syslog_msg) + indication = self.get_indication(10) + self.assertEqual(indication.classname, "LMI_JournalLogRecordInstanceCreationIndication") + self.assertIn("SourceInstance", indication.keys()) + self.assertTrue(indication["SourceInstance"] is not None) + self.assertEqual(indication["SourceInstance"]["DataFormat"], syslog_msg) + self.unsubscribe(filter_name); diff --git a/src/journald/test/__init__.py b/src/journald/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/journald/test/common.py b/src/journald/test/common.py new file mode 100644 index 0000000..d65b5d1 --- /dev/null +++ b/src/journald/test/common.py @@ -0,0 +1,202 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Roman Rakus +# Tomas Bzatek +# + +import pywbem +import os +import unittest +import Queue +import random +import BaseHTTPServer +import socket +import threading + +""" +Base class for all tests +""" + +class CIMListener(object): + """ CIM Listener + """ + class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_POST(self): + data = self.rfile.read(int(self.headers['Content-Length'])) + tt = pywbem.parse_cim(pywbem.xml_to_tupletree(data)) + # Get the instance from CIM-XML, copied from + # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial + insts = [x[1] for x in tt[2][2][0][2][2]] + for inst in insts: + self.callback(inst) + self.send_response(200) + self.end_headers() + + def log_message(self, format, *p): + # suppress log messages + pass + + def __init__(self, callback, http_port=5988): + self.address = ('', http_port) + self.CIMHandler.callback = callback + self.thread = None + self.server = None + + def start(self): + BaseHTTPServer.HTTPServer.allow_reuse_address = True + self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.start() + + def stop(self): + if self.server is not None: + self.server.shutdown() + self.server.socket.close() + if self.thread is not None: + self.thread.join() + + def running(self): + return self.thread is not None + + +class JournalBase(unittest.TestCase): + """ + Base class for all LMI Journal tests + """ + def setUp(self): + """ + Connnect to server + """ + self.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989") + self.username = os.environ.get("LMI_CIMOM_USERNAME", "root") + self.password = os.environ.get("LMI_CIMOM_PASSWORD", "") + self.wbemconnection = pywbem.WBEMConnection(self.url, + (self.username, self.password)) + + # for indications + self.indication_port = random.randint(12000, 13000) + self.indication_queue = Queue.Queue() + self.listener = CIMListener( + callback=self._process_indication, + http_port=self.indication_port) + + self.subscribed = {} + + def tearDown(self): + self.listener.stop() + if self.subscribed: + for name in self.subscribed.keys(): + self.unsubscribe(name) + + def get_indication(self, timeout): + """ Wait for an indication for given nr. of seconds and return it.""" + try: + indication = self.indication_queue.get(timeout=timeout) + except Queue.Empty: + raise AssertionError("Timeout when waiting for indicaiton") + self.indication_queue.task_done() + return indication + + def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"): + """ + Create indication subscription for given filter name. + """ + namespace = "root/interop" + hostname = socket.gethostname() + + if query is not None: + # Create filter first + filterinst = pywbem.CIMInstance('CIM_IndicationFilter') + filterinst['CreationClassName'] = 'CIM_IndicationFilter' + filterinst['SystemCreationClassName'] = 'CIM_ComputerSystem' + filterinst['SystemName'] = hostname + filterinst['Name'] = filter_name + filterinst['Query'] = query + filterinst['QueryLanguage'] = querylang + filterinst['SourceNamespace'] = "root/cimv2"#namespace + cop = pywbem.CIMInstanceName('CIM_IndicationFilter') + cop.keybindings = { 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': 'CIM_ComputerSystem', + 'SystemName': hostname, + 'Name': filter_name + } + cop.namespace=namespace + filterinst.path = cop + indfilter = self.wbemconnection.CreateInstance(filterinst) + else: + # the filter is already created, assemble its name + indfilter = pywbem.CIMInstanceName( + classname="CIM_IndicationFilter", + namespace=namespace, + keybindings={ + 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': 'CIM_ComputerSystem', + 'SystemName': hostname, + 'Name': filter_name}) + + # create destination + destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') + destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML' + destinst['SystemCreationClassName'] = 'CIM_ComputerSystem' + destinst['SystemName'] = hostname + destinst['Name'] = filter_name + destinst['Destination'] = "http://localhost:%d" % (self.indication_port) + destinst['PersistenceType'] = pywbem.Uint16(3) # Transient + cop = pywbem.CIMInstanceName('CIM_ListenerDestinationCIMXML') + cop.keybindings = { 'CreationClassName':'CIM_ListenerDestinationCIMXML', + 'SystemClassName':'CIM_ComputerSystem', + 'SystemName':hostname, + 'Name':filter_name } + cop.namespace = namespace + destinst.path = cop + destname = self.wbemconnection.CreateInstance(destinst) + + # create the subscription + subinst = pywbem.CIMInstance('CIM_IndicationSubscription') + subinst['Filter'] = indfilter + subinst['Handler'] = destname + cop = pywbem.CIMInstanceName('CIM_IndicationSubscription') + cop.keybindings = { 'Filter': indfilter, + 'Handler': destname } + cop.namespace = namespace + subinst.path = cop + subscription = self.wbemconnection.CreateInstance(subinst) + + self.subscribed[filter_name] = [subscription, destname] + + # start listening + if not self.listener.running(): + self._start_listening() + return subscription + + def unsubscribe(self, filter_name): + """ + Unsubscribe fron given filter. + """ + _list = self.subscribed.pop(filter_name) + for instance in _list: + self.wbemconnection.DeleteInstance(instance) + + def _start_listening(self): + """ Start listening for incoming indications. """ + self.listener.start() + + def _process_indication(self, indication): + """ Callback to process one indication.""" + self.indication_queue.put(indication) + + diff --git a/src/journald/test/localtest.sh b/src/journald/test/localtest.sh new file mode 100755 index 0000000..3667ebc --- /dev/null +++ b/src/journald/test/localtest.sh @@ -0,0 +1 @@ +LMI_CIMOM_PASSWORD="pass" LMI_CIMOM_USERNAME="pegasus" nosetests -v -- cgit