diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-09-13 15:43:56 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-09-13 15:43:56 +0200 |
commit | e86cb62f1299ef18732f7b3b87d45a840ee38f1e (patch) | |
tree | 8e346e0c74431c09ea174d63bebae80fb7761ef9 /runtime | |
parent | 3d50fd6a67b7d0bf31628aea14128a47dd5d326f (diff) | |
download | rsyslog-e86cb62f1299ef18732f7b3b87d45a840ee38f1e.tar.gz rsyslog-e86cb62f1299ef18732f7b3b87d45a840ee38f1e.tar.xz rsyslog-e86cb62f1299ef18732f7b3b87d45a840ee38f1e.zip |
improved statistics-gathering subsystem
... well, actually this is a first real implementation of this subsystem.
I have added a counter registry, a way to access the countres (as readable
string) and a way to define and maintem them. Also, module impstats has
been updated to utilize the new system. Finally, I added some counters. I
hope that this sets the baseline for useful future enhancements.
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/Makefile.am | 6 | ||||
-rw-r--r-- | runtime/queue.c | 32 | ||||
-rw-r--r-- | runtime/queue.h | 6 | ||||
-rw-r--r-- | runtime/rsyslog.c | 3 | ||||
-rw-r--r-- | runtime/rsyslog.h | 2 | ||||
-rw-r--r-- | runtime/statsobj.c | 315 | ||||
-rw-r--r-- | runtime/statsobj.h | 124 |
7 files changed, 486 insertions, 2 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 5b2598b2..01b224e7 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -45,6 +45,8 @@ librsyslog_la_SOURCES = \ modules.h \ apc.c \ apc.h \ + statsobj.c \ + statsobj.h \ sync.c \ sync.h \ expr.c \ @@ -81,8 +83,8 @@ librsyslog_la_SOURCES = \ prop.h \ cfsysline.c \ cfsysline.h \ - sd-daemon.c \ - sd-daemon.h \ + sd-daemon.c \ + sd-daemon.h \ \ \ ../action.h \ diff --git a/runtime/queue.c b/runtime/queue.c index 60d17086..387c15c2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -58,6 +58,7 @@ #include "errmsg.h" #include "datetime.h" #include "unicode-helper.h" +#include "statsobj.h" #include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS @@ -70,6 +71,7 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(strm) DEFobjCurrIf(errmsg) DEFobjCurrIf(datetime) +DEFobjCurrIf(statsobj) /* forward-definitions */ static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); @@ -1817,6 +1819,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + uchar *qName; size_t lenBuf; ASSERT(pThis != NULL); @@ -1886,6 +1889,27 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ qqueueAdviseMaxWorkers(pThis); pThis->bQueueStarted = 1; + /* support statistics gathering */ + qName = obj.GetName((obj_t*)pThis); + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, qName)); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("size"), + ctrType_Int, &pThis->iQueueSize)); + + STATSCOUNTER_INIT(pThis->ctrEnqueued, pThis->mutCtrEnqueued); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("enqueued"), + ctrType_IntCtr, &pThis->ctrEnqueued)); + + STATSCOUNTER_INIT(pThis->ctrFull, pThis->mutCtrFull); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"), + ctrType_IntCtr, &pThis->ctrFull)); + + pThis->ctrMaxqsize = 0; + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"), + ctrType_Int, &pThis->ctrMaxqsize)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + finalize_it: RETiRet; } @@ -2119,6 +2143,10 @@ CODESTARTobjDestruct(qqueue) free(pThis->pszFilePrefix); free(pThis->pszSpoolDir); + + /* some queues do not provide stats and thus have no statsobj! */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); ENDobjDestruct(qqueue) @@ -2178,6 +2206,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) DEFiRet; struct timespec t; + STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); @@ -2225,6 +2254,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); + STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); // TODO : handle enqOnly => discard! if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); @@ -2235,6 +2265,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* and finally enqueue the message */ CHKiRet(qqueueAdd(pThis, pUsr)); + STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); finalize_it: RETiRet; @@ -2414,6 +2445,7 @@ BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) CHKiRet(objUse(strm, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); /* now set our own handlers */ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); diff --git a/runtime/queue.h b/runtime/queue.h index 1c758134..38e248cd 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -29,6 +29,7 @@ #include "wtp.h" #include "batch.h" #include "stream.h" +#include "statsobj.h" /* support for the toDelete list */ typedef struct toDeleteLst_s toDeleteLst_t; @@ -165,6 +166,11 @@ struct queue_s { } tVars; DEF_ATOMIC_HELPER_MUT(mutQueueSize); DEF_ATOMIC_HELPER_MUT(mutLogDeq); + /* for statistics subsystem */ + statsobj_t *statsobj; + STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued); + STATSCOUNTER_DEF(ctrFull, mutCtrFull); + int ctrMaxqsize; }; diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index a9794840..8baa2b59 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -82,6 +82,7 @@ #include "ruleset.h" #include "parser.h" #include "strgen.h" +#include "statsobj.h" #include "atomic.h" /* forward definitions */ @@ -150,6 +151,8 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) * class immediately after it is initialized. And, of course, we load those classes * first that we use ourselfs... -- rgerhards, 2008-03-07 */ + if(ppErrObj != NULL) *ppErrObj = "statsobj"; + CHKiRet(statsobjClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "prop"; CHKiRet(propClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "glbl"; diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 34eaedca..7b2655b0 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -150,6 +150,8 @@ typedef struct parser_s parser_t; typedef struct parserList_s parserList_t; typedef struct strgen_s strgen_t; typedef struct strgenList_s strgenList_t; +typedef struct statsobj_s statsobj_t; +typedef struct statsctr_s statsctr_t; typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */ typedef uint64 qDeqID; /* queue Dequeue order ID. 32 bits is considered dangerously few */ diff --git a/runtime/statsobj.c b/runtime/statsobj.c new file mode 100644 index 00000000..e1a89cf4 --- /dev/null +++ b/runtime/statsobj.c @@ -0,0 +1,315 @@ +/* The statsobj object. + * + * This object provides a statistics-gathering facility inside rsyslog. This + * functionality will be pragmatically implemented and extended. + * + * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#include "config.h" +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <pthread.h> +#include <errno.h> +#include <assert.h> + +#include "rsyslog.h" +#include "unicode-helper.h" +#include "obj.h" +#include "statsobj.h" +#include "sysvar.h" +#include "srUtils.h" +#include "stringbuf.h" + + +/* externally-visiable data (see statsobj.h for explanation) */ +int GatherStats = 0; + +/* static data */ +DEFobjStaticHelpers + +/* doubly linked list of stats objects. Object is automatically linked to it + * upon construction. Enqueue always happens at the front (simplifies logic). + */ +static statsobj_t *objRoot = NULL; +static statsobj_t *objLast = NULL; + +static pthread_mutex_t mutStats; + +/* ------------------------------ statsobj linked list maintenance ------------------------------ */ + +static inline void +addToObjList(statsobj_t *pThis) +{ + pthread_mutex_lock(&mutStats); + pThis->prev = objLast; + if(objLast != NULL) + objLast->next = pThis; + objLast = pThis; + if(objRoot == NULL) + objRoot = pThis; + pthread_mutex_unlock(&mutStats); +} + + +static inline void +removeFromObjList(statsobj_t *pThis) +{ + pthread_mutex_lock(&mutStats); + if(pThis->prev != NULL) + pThis->prev->next = pThis->next; + if(pThis->next != NULL) + pThis->next->prev = pThis->prev; + if(objLast == pThis) + objLast = pThis->prev; + if(objRoot == pThis) + objRoot = pThis->next; + pthread_mutex_unlock(&mutStats); +} + + +static inline void +addCtrToList(statsobj_t *pThis, ctr_t *pCtr) +{ + pthread_mutex_lock(&pThis->mutCtr); + pCtr->prev = pThis->ctrLast; + if(pThis->ctrLast != NULL) + pThis->ctrLast->next = pCtr; + pThis->ctrLast = pCtr; + if(pThis->ctrRoot == NULL) + pThis->ctrRoot = pCtr; + pthread_mutex_unlock(&pThis->mutCtr); +} + +/* ------------------------------ methods ------------------------------ */ + + +/* Standard-Constructor + */ +BEGINobjConstruct(statsobj) /* be sure to specify the object type also in END macro! */ + pthread_mutex_init(&pThis->mutCtr, NULL); + pThis->ctrLast = NULL; + pThis->ctrRoot = NULL; +ENDobjConstruct(statsobj) + + +/* ConstructionFinalizer + */ +static rsRetVal +statsobjConstructFinalize(statsobj_t *pThis) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, statsobj); + addToObjList(pThis); + RETiRet; +} + + +/* set name. Note that we make our own copy of the memory, caller is + * responsible to free up name it passes in (if required). + */ +static rsRetVal +setName(statsobj_t *pThis, uchar *name) +{ + DEFiRet; + CHKmalloc(pThis->name = ustrdup(name)); +finalize_it: + RETiRet; +} + + +/* add a counter to an object + * ctrName is duplicated, caller must free it if requried + */ +static rsRetVal +addCounter(statsobj_t *pThis, uchar *ctrName, statsCtrType_t ctrType, void *pCtr) +{ + ctr_t *ctr; + DEFiRet; + + CHKmalloc(ctr = malloc(sizeof(ctr_t))); + ctr->next = NULL; + ctr->prev = NULL; + CHKmalloc(ctr->name = ustrdup(ctrName)); + ctr->ctrType = ctrType; + switch(ctrType) { + case ctrType_IntCtr: + ctr->val.pIntCtr = (intctr_t*) pCtr; + break; + case ctrType_Int: + ctr->val.pInt = (int*) pCtr; + break; + } + addCtrToList(pThis, ctr); + +finalize_it: + RETiRet; +} + + +/* get all the object's countes together with object name as one line. + */ +static rsRetVal +getStatsLine(statsobj_t *pThis, cstr_t **ppcstr) +{ + cstr_t *pcstr; + ctr_t *pCtr; + DEFiRet; + + CHKiRet(cstrConstruct(&pcstr)); + rsCStrAppendStr(pcstr, pThis->name); + rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT(": "), 2); + + /* now add all counters to this line */ + pthread_mutex_lock(&pThis->mutCtr); + for(pCtr = pThis->ctrRoot ; pCtr != NULL ; pCtr = pCtr->next) { + rsCStrAppendStr(pcstr, pCtr->name); + cstrAppendChar(pcstr, '='); + switch(pCtr->ctrType) { + case ctrType_IntCtr: + rsCStrAppendInt(pcstr, *(pCtr->val.pIntCtr)); // TODO: OK????? + break; + case ctrType_Int: + rsCStrAppendInt(pcstr, *(pCtr->val.pInt)); + break; + } + cstrAppendChar(pcstr, ' '); + } + pthread_mutex_unlock(&pThis->mutCtr); + + CHKiRet(cstrFinalize(pcstr)); + *ppcstr = pcstr; + +finalize_it: + RETiRet; +} + + +/* this function can be used to obtain all stats lines. In this case, + * a callback must be provided. This module than iterates over all objects and + * submits each stats line to the callback. The callback has two parameters: + * the first one is a caller-provided void*, the second one the cstr_t with the + * line. If the callback reports an error, processing is stopped. + */ +static rsRetVal +getAllStatsLines(rsRetVal(*cb)(void*, cstr_t*), void *usrptr) +{ + statsobj_t *o; + cstr_t *cstr; + DEFiRet; + + for(o = objRoot ; o != NULL ; o = o->next) { + CHKiRet(getStatsLine(o, &cstr)); + CHKiRet(cb(usrptr, cstr)); + rsCStrDestruct(&cstr); + } + +finalize_it: + RETiRet; +} + + +/* Enable statistics gathering. currently there is no function to disable it + * again, as this is right now not needed. + */ +static rsRetVal +enableStats() +{ + GatherStats = 1; + return RS_RET_OK; +} + + +/* destructor for the statsobj object */ +BEGINobjDestruct(statsobj) /* be sure to specify the object type also in END and CODESTART macros! */ + ctr_t *ctr, *ctrToDel; +CODESTARTobjDestruct(statsobj) + removeFromObjList(pThis); + + /* destruct counters */ + ctr = pThis->ctrRoot; + while(ctr != NULL) { + ctrToDel = ctr; + ctr = ctr->next; + free(ctrToDel->name); + free(ctrToDel); + } + + pthread_mutex_destroy(&pThis->mutCtr); + free(pThis->name); +ENDobjDestruct(statsobj) + + +/* debugprint for the statsobj object */ +BEGINobjDebugPrint(statsobj) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDebugPrint(statsobj) + dbgoprint((obj_t*) pThis, "statsobj object, currently no state info available\n"); +ENDobjDebugPrint(statsobj) + + +/* queryInterface function + */ +BEGINobjQueryInterface(statsobj) +CODESTARTobjQueryInterface(statsobj) + if(pIf->ifVersion != statsobjCURR_IF_VERSION) { /* check for current version, increment on each change */ + ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED); + } + + /* ok, we have the right interface, so let's fill it + * Please note that we may also do some backwards-compatibility + * work here (if we can support an older interface version - that, + * of course, also affects the "if" above). + */ + pIf->Construct = statsobjConstruct; + pIf->ConstructFinalize = statsobjConstructFinalize; + pIf->Destruct = statsobjDestruct; + pIf->DebugPrint = statsobjDebugPrint; + pIf->SetName = setName; + pIf->GetStatsLine = getStatsLine; + pIf->GetAllStatsLines = getAllStatsLines; + pIf->AddCounter = addCounter; + pIf->EnableStats = enableStats; +finalize_it: +ENDobjQueryInterface(statsobj) + + +/* Initialize the statsobj class. Must be called as the very first method + * before anything else is called inside this class. + */ +BEGINAbstractObjClassInit(statsobj, 1, OBJ_IS_CORE_MODULE) /* class, version */ + /* request objects we use */ + + /* set our own handlers */ + OBJSetMethodHandler(objMethod_DEBUGPRINT, statsobjDebugPrint); + OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, statsobjConstructFinalize); + + /* init other data items */ + pthread_mutex_init(&mutStats, NULL); + +ENDObjClassInit(statsobj) + +/* Exit the class. + */ +BEGINObjClassExit(statsobj, OBJ_IS_CORE_MODULE) /* class, version */ + /* release objects we no longer need */ + pthread_mutex_destroy(&mutStats); +ENDObjClassExit(statsobj) diff --git a/runtime/statsobj.h b/runtime/statsobj.h new file mode 100644 index 00000000..1ab02182 --- /dev/null +++ b/runtime/statsobj.h @@ -0,0 +1,124 @@ +/* The statsobj object. + * + * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ +#ifndef INCLUDED_STATSOBJ_H +#define INCLUDED_STATSOBJ_H + +#include "atomic.h" + +/* The following data item is somewhat dirty, in that it does not follow + * our usual object calling conventions. However, much like with "Debug", we + * do this to gain speed. If we finally come to a platform that does not + * provide resolution of names for dynamically loaded modules, we need to find + * a work-around, but until then, we use the direct access. + * If set to 0, statistics are not gathered, otherwise they are. + */ +extern int GatherStats; + +/* our basic counter type -- need 32 bit on 32 bit platform. + * IMPORTANT: this type *MUST* be supported by atomic instructions! + */ +typedef uint64 intctr_t; + +/* counter types */ +typedef enum statsCtrType_e { + ctrType_IntCtr, + ctrType_Int +} statsCtrType_t; + + +/* helper entity, the counter */ +typedef struct ctr_s { + uchar *name; + statsCtrType_t ctrType; + union { + intctr_t *pIntCtr; + int *pInt; + } val; + struct ctr_s *next, *prev; +} ctr_t; + +/* the statsobj object */ +struct statsobj_s { + BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ + uchar *name; + pthread_mutex_t mutCtr; /* to guard counter linked-list ops */ + ctr_t *ctrRoot; /* doubly-linked list of statsobj counters */ + ctr_t *ctrLast; + /* used to link ourselves together */ + statsobj_t *prev; + statsobj_t *next; +}; + + +/* interfaces */ +BEGINinterface(statsobj) /* name must also be changed in ENDinterface macro! */ + INTERFACEObjDebugPrint(statsobj); + rsRetVal (*Construct)(statsobj_t **ppThis); + rsRetVal (*ConstructFinalize)(statsobj_t *pThis); + rsRetVal (*Destruct)(statsobj_t **ppThis); + rsRetVal (*SetName)(statsobj_t *pThis, uchar *name); + rsRetVal (*GetStatsLine)(statsobj_t *pThis, cstr_t **ppcstr); + rsRetVal (*GetAllStatsLines)(rsRetVal(*cb)(void*, cstr_t*), void *usrptr); + rsRetVal (*AddCounter)(statsobj_t *pThis, uchar *ctrName, statsCtrType_t ctrType, void *pCtr); + rsRetVal (*EnableStats)(void); +ENDinterface(statsobj) +#define statsobjCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ + + +/* prototypes */ +PROTOTYPEObj(statsobj); + + +/* macros to handle stats counters + * These are to be used by "counter providers". Note that we MUST + * specify the mutex name, even though at first it looks like it + * could be automatically be generated via e.g. "mut##ctr". + * Unfortunately, this does not work if counter is e.g. "pThis->ctr". + * So we decided, for clarity, to always insist on specifying the mutex + * name (after all, it's just a few more keystrokes...). + */ +#define STATSCOUNTER_DEF(ctr, mut) \ + intctr_t ctr; \ + DEF_ATOMIC_HELPER_MUT(mut) + +#define STATSCOUNTER_INIT(ctr, mut) \ + INIT_ATOMIC_HELPER_MUT(mut); \ + ctr = 0; + +#define STATSCOUNTER_INC(ctr, mut) \ + if(GatherStats) \ + ATOMIC_INC(&ctr, mut); + +#define STATSCOUNTER_DEC(ctr, mut) \ + if(GatherStats) \ + ATOMIC_DEC(&ctr, mut); + +/* the next macro works only if the variable is already guarded + * by mutex (or the users risks a wrong result). It is assumed + * that there are not concurrent operations that modify the counter. + */ +#define STATSCOUNTER_SETMAX_NOMUT(ctr, newmax) \ + if(GatherStats && ((newmax) > (ctr))) \ + ctr = newmax; + +#endif /* #ifndef INCLUDED_STATSOBJ_H */ |