diff options
author | Tomas Bzatek <tbzatek@redhat.com> | 2013-08-23 14:44:02 +0200 |
---|---|---|
committer | Tomas Bzatek <tbzatek@redhat.com> | 2013-10-15 15:23:49 +0200 |
commit | fa53e73b895c90c80361300abc9304596115d2d1 (patch) | |
tree | ce81a027e06a0c6a9a1f71e477838829e285b805 /src/journald | |
parent | 144dcfc5a924ecba7f615064156f6a40247c39b2 (diff) | |
download | openlmi-providers-fa53e73b895c90c80361300abc9304596115d2d1.tar.gz openlmi-providers-fa53e73b895c90c80361300abc9304596115d2d1.tar.xz openlmi-providers-fa53e73b895c90c80361300abc9304596115d2d1.zip |
journald: Add indications
Also contains simple test suite to test indications.
Diffstat (limited to 'src/journald')
-rw-r--r-- | src/journald/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/journald/LMI_JournalLogRecordInstanceCreationIndicationProvider.c | 131 | ||||
-rw-r--r-- | src/journald/instutil.c | 144 | ||||
-rw-r--r-- | src/journald/instutil.h | 7 | ||||
-rw-r--r-- | src/journald/test/TestIndications.py | 52 | ||||
-rw-r--r-- | src/journald/test/__init__.py | 0 | ||||
-rw-r--r-- | src/journald/test/common.py | 202 | ||||
-rwxr-xr-x | src/journald/test/localtest.sh | 1 |
8 files changed, 539 insertions, 0 deletions
diff --git a/src/journald/CMakeLists.txt b/src/journald/CMakeLists.txt index 7a4de06..f524c5b 100644 --- a/src/journald/CMakeLists.txt +++ b/src/journald/CMakeLists.txt @@ -28,10 +28,12 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMPI_INCLUDE_DIR} ${GLIB2_INCLUDE_DIRS} ${SYSTEMD-JOURNAL_INCLUDE_DIRS} + ${CMAKE_SOURCE_DIR}/src/indmanager ) target_link_libraries(${LIBRARY_NAME} openlmicommon + openlmiindmanager ${KONKRETCMPI_LIBRARIES} ${GLIB2_LIBRARIES} ${SYSTEMD-JOURNAL_LIBRARIES} diff --git a/src/journald/LMI_JournalLogRecordInstanceCreationIndicationProvider.c b/src/journald/LMI_JournalLogRecordInstanceCreationIndicationProvider.c new file mode 100644 index 0000000..580424e --- /dev/null +++ b/src/journald/LMI_JournalLogRecordInstanceCreationIndicationProvider.c @@ -0,0 +1,131 @@ +/* + * Copyright (C) 2013 Red Hat, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: Tomas Bzatek <tbzatek@redhat.com> + */ + +#include <konkret/konkret.h> +#include "LMI_JournalLogRecordInstanceCreationIndication.h" + +#include "ind_manager.h" +#include "instutil.h" + +static const CMPIBroker* _cb = NULL; + +static IMManager *im = NULL; +static IMError im_err = IM_ERR_OK; + +static void LMI_JournalLogRecordInstanceCreationIndicationInitialize() +{ + im = im_create_manager(ind_gather, ind_filter_cb, false, ind_watcher, + IM_IND_CREATION, _cb, &im_err); +} + +static CMPIStatus LMI_JournalLogRecordInstanceCreationIndicationIndicationCleanup( + CMPIIndicationMI* mi, + const CMPIContext* cc, + CMPIBoolean term) +{ + if (! im_destroy_manager(im, cc, &im_err)) + CMReturn(CMPI_RC_ERR_FAILED); + CMReturn(CMPI_RC_OK); +} + +static CMPIStatus LMI_JournalLogRecordInstanceCreationIndicationAuthorizeFilter( + CMPIIndicationMI* mi, + const CMPIContext* cc, + const CMPISelectExp* se, + const char* ns, + const CMPIObjectPath* op, + const char* user) +{ + if (!im_verify_filter(im, se, cc, &im_err)) + CMReturn(CMPI_RC_ERR_INVALID_QUERY); + CMReturn(CMPI_RC_OK); +} + +static CMPIStatus LMI_JournalLogRecordInstanceCreationIndicationMustPoll( + CMPIIndicationMI* mi, + const CMPIContext* cc, + const CMPISelectExp* se, + const char* ns, + const CMPIObjectPath* op) +{ + CMReturn(CMPI_RC_ERR_NOT_SUPPORTED); +} + +static CMPIStatus LMI_JournalLogRecordInstanceCreationIndicationActivateFilter( + CMPIIndicationMI* mi, + const CMPIContext* cc, + const CMPISelectExp* se, + const char* ns, + const CMPIObjectPath* op, + CMPIBoolean firstActivation) +{ + if (!im_verify_filter(im, se, cc, &im_err)) + CMReturn(CMPI_RC_ERR_INVALID_QUERY); + if (!im_add_filter(im, (CMPISelectExp*)se, cc, &im_err)) + CMReturn(CMPI_RC_ERR_FAILED); + CMReturn(CMPI_RC_OK); +} + +static CMPIStatus LMI_JournalLogRecordInstanceCreationIndicationDeActivateFilter( + CMPIIndicationMI* mi, + const CMPIContext* cc, + const CMPISelectExp* se, + const char* ns, + const CMPIObjectPath* op, + CMPIBoolean lastActivation) +{ + if (!im_remove_filter(im, se, cc, &im_err)) + CMReturn(CMPI_RC_ERR_FAILED); + CMReturn(CMPI_RC_OK); +} + +static CMPIStatus LMI_JournalLogRecordInstanceCreationIndicationEnableIndications( + CMPIIndicationMI* mi, + const CMPIContext* cc) +{ + ind_init(); + if (!im_start_ind(im, cc, &im_err)) { + ind_destroy(); + CMReturn(CMPI_RC_ERR_FAILED); + } + CMReturn(CMPI_RC_OK); +} + +static CMPIStatus LMI_JournalLogRecordInstanceCreationIndicationDisableIndications( + CMPIIndicationMI* mi, + const CMPIContext* cc) +{ + if (!im_stop_ind(im, cc, &im_err)) + CMReturn(CMPI_RC_ERR_FAILED); + ind_destroy(); + CMReturn(CMPI_RC_OK); +} + +CMIndicationMIStub( + LMI_JournalLogRecordInstanceCreationIndication, + LMI_JournalLogRecordInstanceCreationIndication, + _cb, + LMI_JournalLogRecordInstanceCreationIndicationInitialize()) + +KONKRET_REGISTRATION( + "root/cimv2", + "LMI_JournalLogRecordInstanceCreationIndication", + "LMI_JournalLogRecordInstanceCreationIndication", + "indication") diff --git a/src/journald/instutil.c b/src/journald/instutil.c index 2164aa9..c93a91e 100644 --- a/src/journald/instutil.c +++ b/src/journald/instutil.c @@ -29,6 +29,10 @@ #include "LMI_JournalMessageLog.h" +/* Assuming no thread safety */ +static sd_journal *ind_journal = NULL; + + int create_LMI_JournalLogRecordRef(sd_journal *j, LMI_JournalLogRecordRef *ref, const CMPIBroker *_cb) @@ -111,3 +115,143 @@ int create_LMI_JournalLogRecord(sd_journal *j, return 1; } + + +void ind_init() +{ + if (ind_journal == NULL) { + sd_journal *journal; + int r; + + r = sd_journal_open(&journal, 0); + if (r < 0) { + fprintf(stderr, "ind_init(): Error opening journal: %s\n", strerror(-r)); + return; + } + + r = sd_journal_seek_tail(journal); + if (r < 0) { + fprintf(stderr, "ind_init(): Error seeking to the end of the journal: %s\n", strerror(-r)); + sd_journal_close(journal); + return; + } + + /* need to position the marker one step before EOF or otherwise the next sd_journal_next() call will overflow to the beginning */ + r = sd_journal_previous(journal); + if (r < 0) { + fprintf(stderr, "ind_init(): Error seeking to the end of the journal: %s\n", strerror(-r)); + sd_journal_close(journal); + return; + } + ind_journal = journal; + } else + fprintf(stderr, "ind_init(): indications already initialized, possible bug in the code\n"); +} + +void ind_destroy() +{ + if (ind_journal != NULL) { + sd_journal_close(ind_journal); + ind_journal = NULL; + } +} + +bool ind_watcher(void **data) +{ + int r; + + if (ind_journal == NULL) { + fprintf(stderr, "ind_watcher(): indications have not been initialized yet or error occurred previously\n"); + return false; + } + + r = sd_journal_wait(ind_journal, (uint64_t) -1); + if (r == SD_JOURNAL_INVALIDATE) { + /* Looking at sd-journal sources, the sd_journal_wait() call will likely return + * SD_JOURNAL_INVALIDATE on a first run because of creating new inotify watch. */ + r = sd_journal_wait(ind_journal, (uint64_t) -1); + } + while (r == SD_JOURNAL_NOP) { + /* received NOP, ignore the event and wait for the next one */ + r = sd_journal_wait(ind_journal, (uint64_t) -1); + } + if (r < 0) { + fprintf(stderr, "ind_watcher(): Error while waiting for new record: %s\n", strerror(-r)); + return false; + } + if (r == SD_JOURNAL_INVALIDATE) { + fprintf(stderr, "ind_watcher(): Journal not valid, reopen needed\n"); + ind_destroy(); + ind_init(); + return false; + } + *data = ind_journal; + + return true; +} + +bool ind_gather(const IMManager *manager, CMPIInstance **old, CMPIInstance **new, void *data) +{ + sd_journal *journal; + int r; + LMI_JournalLogRecord log_record; + CMPIStatus st; + + g_return_val_if_fail(data != NULL, false); + journal = data; + + r = sd_journal_next(journal); + if (r < 0) { + fprintf(stderr, "ind_gather(): Failed to iterate to next entry: %s\n", strerror(-r)); + return false; + } + if (r == 0) { + /* We've reached the end of the journal */ + return false; + } + + /* FIXME: hardcoded namespace (so does ind_manager.c) */ + LMI_JournalLogRecord_Init(&log_record, manager->broker, "root/cimv2"); + r = create_LMI_JournalLogRecord(journal, &log_record, manager->broker); + if (r <= 0) { + fprintf(stderr, "ind_gather(): Failed to create instance: %s\n", strerror(-r)); + return false; + } + + g_assert(new != NULL); + *new = LMI_JournalLogRecord_ToInstance(&log_record, &st); + fprintf(stderr, " ind_gather(): new instance created\n"); + + return true; +} + +bool ind_filter_cb(const CMPISelectExp *filter) +{ + /* TODO: copied from account/indication_common.c, may require generalization */ + + /* + * Support only simple conditions and only on allowed_classes + * and type of `sourceinstance ISA allowed_class' + */ + CMPIStatus st; + CMPISelectCond *sec = CMGetDoc(filter, &st); + if (!sec) return false; + CMPICount count = CMGetSubCondCountAndType(sec, NULL, &st); + if (count != 1) return false; + CMPISubCond *sub = CMGetSubCondAt(sec, 0, &st); + if (!sub) return false; + count = CMGetPredicateCount(sub, &st); + if (count != 1) return false; + CMPIPredicate *pred = CMGetPredicateAt(sub, 0, &st); + if (!pred) return false; + CMPIType type; + CMPIPredOp op; + CMPIString *lhs = NULL; + CMPIString *rhs = NULL; + st = CMGetPredicateData(pred, &type, &op, &lhs, &rhs); + if (st.rc != CMPI_RC_OK || op != CMPI_PredOp_Isa) return false; + const char *rhs_str = CMGetCharsPtr(rhs, &st); + if (!rhs_str) return false; + if (strcasecmp(rhs_str, LMI_JournalLogRecord_ClassName) == 0) return true; + return false; +} diff --git a/src/journald/instutil.h b/src/journald/instutil.h index ef02aa1..ee823c5 100644 --- a/src/journald/instutil.h +++ b/src/journald/instutil.h @@ -23,9 +23,16 @@ #include <systemd/sd-journal.h> +#include <ind_manager.h> #include "LMI_JournalLogRecord.h" int create_LMI_JournalLogRecordRef(sd_journal *j, LMI_JournalLogRecordRef *ref, const CMPIBroker *_cb); int create_LMI_JournalLogRecord(sd_journal *j, LMI_JournalLogRecord *rec, const CMPIBroker *_cb); +void ind_init(); +bool ind_watcher(void **data); +bool ind_filter_cb(const CMPISelectExp *filter); +bool ind_gather(const IMManager *manager, CMPIInstance **old, CMPIInstance **new, void *data); +void ind_destroy(); + #endif /* INSTUTIL_H_ */ diff --git a/src/journald/test/TestIndications.py b/src/journald/test/TestIndications.py new file mode 100644 index 0000000..cf79ad6 --- /dev/null +++ b/src/journald/test/TestIndications.py @@ -0,0 +1,52 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Tomas Bzatek <tbzatek@redhat.com> +# + +from common import JournalBase +import time +import syslog + +class TestIndications(JournalBase): + """ + Class for testing LMI_JournalMessageLog indications + """ + + def test_check_good_filter(self): + """ + Journal: Test good indication filter + """ + filter_name = "test_good_filter_%d" % (time.time() * 1000) + sub = self.subscribe(filter_name, "select * from LMI_JournalLogRecordInstanceCreationIndication where SourceInstance isa LMI_JournalLogRecord") + self.assertIsNotNone(sub) + self.unsubscribe(filter_name); + + + def test_message_send(self): + """ + Journal: Test message logging and its retrieval from journal + """ + filter_name = "test_message_send_%d" % (time.time() * 1000) + syslog_msg = "== LMI_Journald test message ==" + sub = self.subscribe(filter_name, "select * from LMI_JournalLogRecordInstanceCreationIndication where SourceInstance isa LMI_JournalLogRecord") + syslog.syslog(syslog_msg) + indication = self.get_indication(10) + self.assertEqual(indication.classname, "LMI_JournalLogRecordInstanceCreationIndication") + self.assertIn("SourceInstance", indication.keys()) + self.assertTrue(indication["SourceInstance"] is not None) + self.assertEqual(indication["SourceInstance"]["DataFormat"], syslog_msg) + self.unsubscribe(filter_name); diff --git a/src/journald/test/__init__.py b/src/journald/test/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/journald/test/__init__.py diff --git a/src/journald/test/common.py b/src/journald/test/common.py new file mode 100644 index 0000000..d65b5d1 --- /dev/null +++ b/src/journald/test/common.py @@ -0,0 +1,202 @@ +# Copyright (C) 2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Roman Rakus <rrakus@redhat.com> +# Tomas Bzatek <tbzatek@redhat.com> +# + +import pywbem +import os +import unittest +import Queue +import random +import BaseHTTPServer +import socket +import threading + +""" +Base class for all tests +""" + +class CIMListener(object): + """ CIM Listener + """ + class CIMHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_POST(self): + data = self.rfile.read(int(self.headers['Content-Length'])) + tt = pywbem.parse_cim(pywbem.xml_to_tupletree(data)) + # Get the instance from CIM-XML, copied from + # http://sf.net/apps/mediawiki/pywbem/?title=Indications_Tutorial + insts = [x[1] for x in tt[2][2][0][2][2]] + for inst in insts: + self.callback(inst) + self.send_response(200) + self.end_headers() + + def log_message(self, format, *p): + # suppress log messages + pass + + def __init__(self, callback, http_port=5988): + self.address = ('', http_port) + self.CIMHandler.callback = callback + self.thread = None + self.server = None + + def start(self): + BaseHTTPServer.HTTPServer.allow_reuse_address = True + self.server = BaseHTTPServer.HTTPServer(self.address, self.CIMHandler) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.start() + + def stop(self): + if self.server is not None: + self.server.shutdown() + self.server.socket.close() + if self.thread is not None: + self.thread.join() + + def running(self): + return self.thread is not None + + +class JournalBase(unittest.TestCase): + """ + Base class for all LMI Journal tests + """ + def setUp(self): + """ + Connnect to server + """ + self.url = os.environ.get("LMI_CIMOM_URL", "https://localhost:5989") + self.username = os.environ.get("LMI_CIMOM_USERNAME", "root") + self.password = os.environ.get("LMI_CIMOM_PASSWORD", "") + self.wbemconnection = pywbem.WBEMConnection(self.url, + (self.username, self.password)) + + # for indications + self.indication_port = random.randint(12000, 13000) + self.indication_queue = Queue.Queue() + self.listener = CIMListener( + callback=self._process_indication, + http_port=self.indication_port) + + self.subscribed = {} + + def tearDown(self): + self.listener.stop() + if self.subscribed: + for name in self.subscribed.keys(): + self.unsubscribe(name) + + def get_indication(self, timeout): + """ Wait for an indication for given nr. of seconds and return it.""" + try: + indication = self.indication_queue.get(timeout=timeout) + except Queue.Empty: + raise AssertionError("Timeout when waiting for indicaiton") + self.indication_queue.task_done() + return indication + + def subscribe(self, filter_name, query=None, querylang="DMTF:CQL"): + """ + Create indication subscription for given filter name. + """ + namespace = "root/interop" + hostname = socket.gethostname() + + if query is not None: + # Create filter first + filterinst = pywbem.CIMInstance('CIM_IndicationFilter') + filterinst['CreationClassName'] = 'CIM_IndicationFilter' + filterinst['SystemCreationClassName'] = 'CIM_ComputerSystem' + filterinst['SystemName'] = hostname + filterinst['Name'] = filter_name + filterinst['Query'] = query + filterinst['QueryLanguage'] = querylang + filterinst['SourceNamespace'] = "root/cimv2"#namespace + cop = pywbem.CIMInstanceName('CIM_IndicationFilter') + cop.keybindings = { 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': 'CIM_ComputerSystem', + 'SystemName': hostname, + 'Name': filter_name + } + cop.namespace=namespace + filterinst.path = cop + indfilter = self.wbemconnection.CreateInstance(filterinst) + else: + # the filter is already created, assemble its name + indfilter = pywbem.CIMInstanceName( + classname="CIM_IndicationFilter", + namespace=namespace, + keybindings={ + 'CreationClassName': 'CIM_IndicationFilter', + 'SystemClassName': 'CIM_ComputerSystem', + 'SystemName': hostname, + 'Name': filter_name}) + + # create destination + destinst = pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') + destinst['CreationClassName'] = 'CIM_ListenerDestinationCIMXML' + destinst['SystemCreationClassName'] = 'CIM_ComputerSystem' + destinst['SystemName'] = hostname + destinst['Name'] = filter_name + destinst['Destination'] = "http://localhost:%d" % (self.indication_port) + destinst['PersistenceType'] = pywbem.Uint16(3) # Transient + cop = pywbem.CIMInstanceName('CIM_ListenerDestinationCIMXML') + cop.keybindings = { 'CreationClassName':'CIM_ListenerDestinationCIMXML', + 'SystemClassName':'CIM_ComputerSystem', + 'SystemName':hostname, + 'Name':filter_name } + cop.namespace = namespace + destinst.path = cop + destname = self.wbemconnection.CreateInstance(destinst) + + # create the subscription + subinst = pywbem.CIMInstance('CIM_IndicationSubscription') + subinst['Filter'] = indfilter + subinst['Handler'] = destname + cop = pywbem.CIMInstanceName('CIM_IndicationSubscription') + cop.keybindings = { 'Filter': indfilter, + 'Handler': destname } + cop.namespace = namespace + subinst.path = cop + subscription = self.wbemconnection.CreateInstance(subinst) + + self.subscribed[filter_name] = [subscription, destname] + + # start listening + if not self.listener.running(): + self._start_listening() + return subscription + + def unsubscribe(self, filter_name): + """ + Unsubscribe fron given filter. + """ + _list = self.subscribed.pop(filter_name) + for instance in _list: + self.wbemconnection.DeleteInstance(instance) + + def _start_listening(self): + """ Start listening for incoming indications. """ + self.listener.start() + + def _process_indication(self, indication): + """ Callback to process one indication.""" + self.indication_queue.put(indication) + + diff --git a/src/journald/test/localtest.sh b/src/journald/test/localtest.sh new file mode 100755 index 0000000..3667ebc --- /dev/null +++ b/src/journald/test/localtest.sh @@ -0,0 +1 @@ +LMI_CIMOM_PASSWORD="pass" LMI_CIMOM_USERNAME="pegasus" nosetests -v |