diff options
-rw-r--r-- | action.c | 58 | ||||
-rw-r--r-- | action.h | 4 | ||||
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | doc/impstats.html | 4 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 173 | ||||
-rw-r--r-- | runtime/msg.c | 4 | ||||
-rw-r--r-- | runtime/queue.c | 11 | ||||
-rw-r--r-- | runtime/queue.h | 2 | ||||
-rw-r--r-- | tcps_sess.c | 1 | ||||
-rw-r--r-- | tcpsrv.c | 15 | ||||
-rw-r--r-- | tcpsrv.h | 3 |
11 files changed, 183 insertions, 94 deletions
@@ -112,6 +112,7 @@ #include "datetime.h" #include "unicode-helper.h" #include "atomic.h" +#include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ @@ -127,6 +128,7 @@ DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ @@ -263,6 +265,12 @@ rsRetVal actionDestruct(action_t *pThis) qqueueDestruct(&pThis->pQueue); } + /* destroy stats object, if we have one (may not always be + * be the case, e.g. if turned off) + */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); + if(pThis->pMod != NULL) pThis->pMod->freeInstance(pThis->pModData); @@ -313,12 +321,35 @@ rsRetVal actionConstructFinalize(action_t *pThis) { DEFiRet; - uchar pszQName[64]; /* friendly name of our queue */ + uchar pszAName[64]; /* friendly name of our queue */ ASSERT(pThis != NULL); + /* generate a friendly name for us action stats */ + if(pThis->pszName == NULL) { + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); + } else { + ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); + pszAName[63] = '\0'; /* to be on the save side */ + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, pszAName)); + + STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), + ctrType_IntCtr, &pThis->ctrProcessed)); + + STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), + ctrType_IntCtr, &pThis->ctrFail)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + + /* create our queue */ /* find a name for our queue */ - snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", iActionNbr); /* now check if we can run the action in "firehose mode" during stage one of * its processing (that is before messages are enqueued into the action q). @@ -362,7 +393,7 @@ actionConstructFinalize(action_t *pThis) */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); - obj.SetName((obj_t*) pThis->pQueue, pszQName); + obj.SetName((obj_t*) pThis->pQueue, pszAName); /* ... set some properties ... */ # define setQPROP(func, directive, data) \ @@ -1072,6 +1103,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) && pBatch->pElem[i].state != BATCH_STATE_COMM ) { pBatch->pElem[i].state = BATCH_STATE_BAD; pBatch->pElem[i].bPrevWasSuspended = 1; + STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); } } bDone = 1; @@ -1261,6 +1293,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); else @@ -1537,6 +1570,17 @@ finalize_it: RETiRet; } +static inline void +countStatsBatchEnq(action_t *pAction, batch_t *pBatch) +{ + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + } + } +} + /* enqueue a batch in direct mode. We have put this into its own function just to avoid * cluttering the actual submit function. @@ -1573,13 +1617,16 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) pBatch->pElem[i].bFilterOK = 0; bModifiedFilter = 1; } - if(pBatch->pElem[i].bFilterOK) + if(pBatch->pElem[i].bFilterOK) { + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); bNeedSubmit = 1; + } DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); } if(bNeedSubmit) { + /* note: stats were already computed above */ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } else { DBGPRINTF("no need to submit batch, all bFilterOK==0\n"); @@ -1594,6 +1641,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } } } else { + if(GatherStats) + countStatsBatchEnq(pAction, pBatch); iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } @@ -1817,6 +1866,7 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); @@ -89,6 +89,10 @@ struct action_s { pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ uchar *pszName; /* action name (for documentation) */ DEF_ATOMIC_HELPER_MUT(mutCAS); + /* for statistics subsystem */ + statsobj_t *statsobj; + STATSCOUNTER_DEF(ctrProcessed, mutCtrProcessed); + STATSCOUNTER_DEF(ctrFail, mutCtrFail); }; diff --git a/configure.ac b/configure.ac index 5a77a8a6..dd305221 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ(2.61) -AC_INIT([rsyslog],[5.8.6],[rsyslog@lists.adiscon.com]) +AC_INIT([rsyslog],[5.8.6-newstats3],[rsyslog@lists.adiscon.com]) AM_INIT_AUTOMAKE m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])]) diff --git a/doc/impstats.html b/doc/impstats.html index cede4874..260c1aa4 100644 --- a/doc/impstats.html +++ b/doc/impstats.html @@ -18,7 +18,9 @@ prepared to change your trending scripts when you upgrade to a newer rsyslog ver output is periodic, with the interval being configurable (default is 5 minutes). Be sure that your configuration records the counter messages (default is syslog.info). <p>Note that loading this module has impact on rsyslog performance. Depending on -settings, this impact may be severe (for high-load environments). +settings, this impact may be noticable (for high-load environments). +<p>The rsyslog website has an updated overview of available +<a href="http://rsyslog.com/rsyslog-statistic-counter/">rsyslog statistic counters</a>. </p> <p><b>Configuration Directives</b>:</p> <ul> diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index a5002591..0db6bf9a 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -26,6 +26,7 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. */ #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -51,6 +52,7 @@ #include "datetime.h" #include "prop.h" #include "ruleset.h" +#include "statsobj.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -66,6 +68,16 @@ DEFobjCurrIf(net) DEFobjCurrIf(datetime) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) +DEFobjCurrIf(statsobj) + + +static struct lstn_s { + struct lstn_s *next; + int sock; /* socket */ + ruleset_t *pRuleset; /* bound ruleset */ + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +} *lcnfRoot = NULL, *lcnfLast = NULL; static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -73,9 +85,7 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte * This shall prevent remote DoS when the "discard on disallowed sender" * message is configured to be logged on occurance of such a case. */ -static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements - * read-only after init(), but beware of restart! */ -static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */ + static uchar *pszBindAddr = NULL; /* IP to bind socket to */ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc * it so that we can check available memory in willRun() and request @@ -190,9 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) DEFiRet; uchar *bindAddr; int *newSocks; - int *tmpSocks; - int iSrc, iDst; - ruleset_t **tmpRulesets; + int iSrc; + struct lstn_s *newlcnfinfo; + uchar *bindName; + uchar *port; + uchar statname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -203,55 +215,43 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) bindAddr = NULL; else bindAddr = pszBindAddr; + bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr; - DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", - (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal); + DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal); - newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1); + port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal; + newSocks = net.create_udp_socket(bindAddr, port, 1); if(newSocks != NULL) { /* we now need to add the new sockets to the existing set */ - if(udpLstnSocks == NULL) { - /* esay, we can just replace it */ - udpLstnSocks = newSocks; - CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1))); - for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst) - udpRulesets[iDst] = pBindRuleset; - } else { - /* we need to add them */ - tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); - tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); - if(tmpSocks == NULL || tmpRulesets == NULL) { - DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); - /* in this case, we discard the new sockets but continue with what we - * already have - */ - free(newSocks); - free(tmpSocks); - free(tmpRulesets); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } else { - /* ready to copy */ - iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = udpLstnSocks[iSrc]; - tmpRulesets[iDst] = udpRulesets[iSrc]; - } - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = newSocks[iSrc]; - tmpRulesets[iDst] = pBindRuleset; - } - tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; - free(newSocks); - free(udpLstnSocks); - udpLstnSocks = tmpSocks; - free(udpRulesets); - udpRulesets = tmpRulesets; - } + /* ready to copy */ + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { + CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s))); + newlcnfinfo->next = NULL; + newlcnfinfo->sock = newSocks[iSrc]; + newlcnfinfo->pRuleset = pBindRuleset; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); + snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats)); + /* link to list. Order must be preserved to take care for + * conflicting matches. + */ + if(lcnfRoot == NULL) + lcnfRoot = newlcnfinfo; + if(lcnfLast == NULL) + lcnfLast = newlcnfinfo; + else + lcnfLast->next = newlcnfinfo; } } finalize_it: free(pNewVal); /* in any case, this is no longer needed */ + free(newSocks); RETiRet; } @@ -294,8 +294,7 @@ finalize_it: * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - ruleset_t *pRuleset) +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { DEFiRet; int iNbrTimeUsed; @@ -315,7 +314,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, if(pThrd->bShallStop == TRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -360,7 +359,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", fd, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted != 0) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { @@ -370,13 +369,14 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); - MsgSetRuleset(pMsg, pRuleset); + MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); CHKiRet(submitMsg(pMsg)); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } @@ -443,6 +443,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) struct epoll_event *udpEPollEvt = NULL; struct epoll_event currEvt[NUM_EPOLL_EVENTS]; char errStr[1024]; + struct lstn_s *lstn; + int nLstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -451,7 +453,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); - CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); + /* count num listeners -- do it here in order to avoid inconsistency */ + nLstn = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) + ++nLstn; + CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event))); #if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) DBGPRINTF("imudp uses epoll_create1()\n"); @@ -471,16 +477,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* fill the epoll set - we need to do this only once, as the set * can not change dyamically. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + i = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if(lstn->sock != -1) { udpEPollEvt[i].events = EPOLLIN | EPOLLET; - udpEPollEvt[i].data.u64 = i+1; - if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { + udpEPollEvt[i].data.u64 = (long long unsigned) lstn; + if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) { rs_strerror_r(errno, errStr, sizeof(errStr)); errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", - udpLstnSocks[i+1], errStr); + lstn->sock, errStr); } } + i++; } while(1) { @@ -492,8 +500,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, - udpRulesets[currEvt[i].data.u64]); + processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted); } } @@ -510,10 +517,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DEFiRet; int maxfds; int nfds; - int i; fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; + struct lstn_s *lstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -524,22 +531,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DBGPRINTF("imudp uses select()\n"); while(1) { - /* Add the Unix Domain Sockets to the list of read - * descriptors. - * rgerhards 2005-08-01: we must now check if there are - * any local sockets to listen to at all. If the -o option - * is given without -a, we do not need to listen at all.. + /* Add the Unix Domain Sockets to the list of read descriptors. */ maxfds = 0; FD_ZERO(&readfds); /* Add the UDP listen sockets to the list of read descriptors. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if (lstn->sock != -1) { if(Debug) - net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], &readfds); - if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; + net.debugListenInfo(lstn->sock, "UDP"); + FD_SET(lstn->sock, &readfds); + if(lstn->sock>maxfds) maxfds=lstn->sock; } } if(Debug) { @@ -555,10 +558,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ - for(i = 0; nfds && i < *udpLstnSocks; i++) { - if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { - processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, - udpRulesets[i+1]); + for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { + if(FD_ISSET(lstn->sock, &readfds)) { + processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } @@ -570,7 +572,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) #endif /* #if HAVE_EPOLL_CREATE1 */ /* This function is called to gather input. - * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * Note that sock must be non-NULL because otherwise we would not have * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 */ BEGINrunInput @@ -591,8 +593,10 @@ CODESTARTwillRun net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */ /* if we could not set up any listners, there is no point in running... */ - if(udpLstnSocks == NULL) + if(lcnfRoot == NULL) { + DBGPRINTF("imudp: no listeners configured, will not run\n"); ABORT_FINALIZE(RS_RET_NO_RUN); + } iMaxLine = glbl.GetMaxLine(); @@ -602,15 +606,18 @@ ENDwillRun BEGINafterRun + struct lstn_s *lstn, *lstnDel; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; - free(udpRulesets); - udpRulesets = NULL; + for(lstn = lcnfRoot ; lstn != NULL ; ) { + statsobj.Destruct(&(lstn->stats)); + close(lstn->sock); + lstnDel = lstn; + lstn = lstn->next; + free(lstnDel); } + lcnfRoot = lcnfLast = NULL; if(pRcvBuf != NULL) { free(pRcvBuf); pRcvBuf = NULL; @@ -625,6 +632,7 @@ CODESTARTmodExit /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); @@ -662,6 +670,7 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); diff --git a/runtime/msg.c b/runtime/msg.c index 31863b2d..810a396e 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -1671,8 +1671,6 @@ void MsgSetTAG(msg_t *pMsg, uchar* pszBuf, size_t lenBuf) uchar *pBuf; assert(pMsg != NULL); -dbgprintf("MsgSetTAG in: len %d, pszBuf: %s\n", lenBuf, pszBuf); - freeTAG(pMsg); pMsg->iLenTAG = lenBuf; @@ -1691,8 +1689,6 @@ dbgprintf("MsgSetTAG in: len %d, pszBuf: %s\n", lenBuf, pszBuf); memcpy(pBuf, pszBuf, pMsg->iLenTAG); pBuf[pMsg->iLenTAG] = '\0'; /* this also works with truncation! */ - -dbgprintf("MsgSetTAG exit: pMsg->iLenTAG %d, pMsg->TAG.szBuf: %s\n", pMsg->iLenTAG, pMsg->TAG.szBuf); } diff --git a/runtime/queue.c b/runtime/queue.c index 9012abeb..e97d78e8 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -12,7 +12,7 @@ * function names - this makes it really hard to read and does not provide much * benefit, at least I (now) think so... * - * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -1312,6 +1312,7 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize, iSeverity); + STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { @@ -1936,6 +1937,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"), ctrType_IntCtr, &pThis->ctrFull)); + STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"), + ctrType_IntCtr, &pThis->ctrFDscrd)); + STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"), + ctrType_IntCtr, &pThis->ctrNFDscrd)); + pThis->ctrMaxqsize = 0; CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"), ctrType_Int, &pThis->ctrMaxqsize)); @@ -2289,6 +2297,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) // TODO : handle enqOnly => discard! if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } diff --git a/runtime/queue.h b/runtime/queue.h index 97057180..06a58229 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -169,6 +169,8 @@ struct queue_s { statsobj_t *statsobj; STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued); STATSCOUNTER_DEF(ctrFull, mutCtrFull); + STATSCOUNTER_DEF(ctrFDscrd, mutCtrFDscrd); + STATSCOUNTER_DEF(ctrNFDscrd, mutCtrNFDscrd); int ctrMaxqsize; }; diff --git a/tcps_sess.c b/tcps_sess.c index 15423cc8..854d3742 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -259,6 +259,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset); + STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit); if(pMultiSub == NULL) { CHKiRet(submitMsg(pMsg)); } else { @@ -39,8 +39,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -93,6 +93,7 @@ DEFobjCurrIf(netstrm) DEFobjCurrIf(nssel) DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +DEFobjCurrIf(statsobj) /* add new listener port to listener port list @@ -102,6 +103,7 @@ static inline rsRetVal addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) { tcpLstnPortList_t *pEntry; + uchar statname[64]; DEFiRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -121,6 +123,15 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) pEntry->pNext = pThis->pLstnPorts; pThis->pLstnPorts = pEntry; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pEntry->stats))); + snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pEntry->stats, statname)); + CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pEntry->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pEntry->stats)); + finalize_it: RETiRet; } @@ -1071,6 +1082,7 @@ CODESTARTObjClassExit(tcpsrv) objRelease(tcps_sess, DONT_LOAD_LIB); objRelease(conf, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); @@ -1097,6 +1109,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); /* set our own handlers */ @@ -24,6 +24,7 @@ #include "obj.h" #include "prop.h" #include "tcps_sess.h" +#include "statsobj.h" /* support for framing anomalies */ typedef enum ETCPsyslogFramingAnomaly { @@ -39,6 +40,8 @@ struct tcpLstnPortList_s { prop_t *pInputName; tcpsrv_t *pSrv; /**< pointer to higher-level server instance */ ruleset_t *pRuleset; /**< associated ruleset */ + statsobj_t *stats; /**< associated stats object */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) tcpLstnPortList_t *pNext; /**< next port or NULL */ }; |