From 355b6818b8ff08cd7e0d72f790bcc98504924e48 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 9 Jan 2012 11:55:57 +0100 Subject: imudp: added per-listner stats settings & more refactoring --- plugins/imudp/imudp.c | 154 ++++++++++++++++++++++++++------------------------ 1 file changed, 80 insertions(+), 74 deletions(-) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0e6c6614..0db6bf9a 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -26,6 +26,7 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. */ #include "config.h" +#include #include #include #include @@ -69,14 +70,14 @@ DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) DEFobjCurrIf(statsobj) -statsobj_t *modStats; -STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) -static struct cnfinfo_s { +static struct lstn_s { + struct lstn_s *next; int sock; /* socket */ ruleset_t *pRuleset; /* bound ruleset */ -} *lcnfinfo = NULL; /**< the structure contains information needed to configure the listeners */ -static int nLstn = 0; /**< number of listners */ + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +} *lcnfRoot = NULL, *lcnfLast = NULL; static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -199,9 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) DEFiRet; uchar *bindAddr; int *newSocks; - int iSrc, iDst; - struct cnfinfo_s *newlcnfinfo; - int newnLstn; + int iSrc; + struct lstn_s *newlcnfinfo; + uchar *bindName; + uchar *port; + uchar statname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -212,35 +215,37 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) bindAddr = NULL; else bindAddr = pszBindAddr; + bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr; - DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", - (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal); + DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal); - newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1); + port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal; + newSocks = net.create_udp_socket(bindAddr, port, 1); if(newSocks != NULL) { /* we now need to add the new sockets to the existing set */ - if(lcnfinfo == NULL) { - /* esay, src and dest are equal */ - nLstn = newSocks[0]; - CHKmalloc(lcnfinfo = (struct cnfinfo_s*) MALLOC(sizeof(struct cnfinfo_s) * nLstn)); - for(iDst = 0, iSrc=1 ; iDst < nLstn ; ++iDst, ++iSrc) { - lcnfinfo[iDst].sock = newSocks[iSrc]; - lcnfinfo[iDst].pRuleset = pBindRuleset; - } - } else { - /* we need to add them */ - newnLstn = nLstn + newSocks[0]; - CHKmalloc(newlcnfinfo = (struct cnfinfo_s*) MALLOC(sizeof(struct cnfinfo_s) * newnLstn)); - /* ready to copy */ - iDst = nLstn; - memcpy(newlcnfinfo, lcnfinfo, nLstn * sizeof(struct cnfinfo_s)); - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) { - newlcnfinfo[iDst].sock = newSocks[iSrc]; - newlcnfinfo[iDst].pRuleset = pBindRuleset; - } - free(lcnfinfo); - lcnfinfo = newlcnfinfo; - nLstn = newnLstn; + /* ready to copy */ + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { + CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s))); + newlcnfinfo->next = NULL; + newlcnfinfo->sock = newSocks[iSrc]; + newlcnfinfo->pRuleset = pBindRuleset; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); + snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats)); + /* link to list. Order must be preserved to take care for + * conflicting matches. + */ + if(lcnfRoot == NULL) + lcnfRoot = newlcnfinfo; + if(lcnfLast == NULL) + lcnfLast = newlcnfinfo; + else + lcnfLast->next = newlcnfinfo; } } @@ -289,7 +294,7 @@ finalize_it: * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { DEFiRet; int iNbrTimeUsed; @@ -309,7 +314,7 @@ processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetP if(pThrd->bShallStop == TRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(lcnfinfo[idxLstn].sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -354,7 +359,7 @@ processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetP *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lcnfinfo[idxLstn].sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted != 0) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { @@ -364,14 +369,14 @@ processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetP CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); - MsgSetRuleset(pMsg, lcnfinfo[idxLstn].pRuleset); + MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); CHKiRet(submitMsg(pMsg)); - STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } @@ -438,6 +443,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) struct epoll_event *udpEPollEvt = NULL; struct epoll_event currEvt[NUM_EPOLL_EVENTS]; char errStr[1024]; + struct lstn_s *lstn; + int nLstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -446,6 +453,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); + /* count num listeners -- do it here in order to avoid inconsistency */ + nLstn = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) + ++nLstn; CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event))); #if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) @@ -466,16 +477,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* fill the epoll set - we need to do this only once, as the set * can not change dyamically. */ - for (i = 0; i < nLstn ; i++) { - if (lcnfinfo[i].sock != -1) { + i = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if(lstn->sock != -1) { udpEPollEvt[i].events = EPOLLIN | EPOLLET; - udpEPollEvt[i].data.u64 = i; - if(epoll_ctl(efd, EPOLL_CTL_ADD, lcnfinfo[i].sock, &(udpEPollEvt[i])) < 0) { + udpEPollEvt[i].data.u64 = (long long unsigned) lstn; + if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) { rs_strerror_r(errno, errStr, sizeof(errStr)); errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", - lcnfinfo[i].sock, errStr); + lstn->sock, errStr); } } + i++; } while(1) { @@ -487,7 +500,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(pThrd, (int)currEvt[i].data.u64, &frominetPrev, &bIsPermitted); + processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted); } } @@ -504,10 +517,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DEFiRet; int maxfds; int nfds; - int i; fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; + struct lstn_s *lstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -518,22 +531,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DBGPRINTF("imudp uses select()\n"); while(1) { - /* Add the Unix Domain Sockets to the list of read - * descriptors. - * rgerhards 2005-08-01: we must now check if there are - * any local sockets to listen to at all. If the -o option - * is given without -a, we do not need to listen at all.. + /* Add the Unix Domain Sockets to the list of read descriptors. */ maxfds = 0; FD_ZERO(&readfds); /* Add the UDP listen sockets to the list of read descriptors. */ - for (i = 0; i < nLstn ; i++) { - if (lcnfinfo[i].sock != -1) { + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if (lstn->sock != -1) { if(Debug) - net.debugListenInfo(lcnfinfo[i].sock, "UDP"); - FD_SET(lcnfinfo[i].sock, &readfds); - if(lcnfinfo[i].sock>maxfds) maxfds=lcnfinfo[i].sock; + net.debugListenInfo(lstn->sock, "UDP"); + FD_SET(lstn->sock, &readfds); + if(lstn->sock>maxfds) maxfds=lstn->sock; } } if(Debug) { @@ -549,9 +558,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ - for(i = 0; nfds && i < nLstn ; i++) { - if(FD_ISSET(lcnfinfo[i].sock, &readfds)) { - processSocket(pThrd, i, &frominetPrev, &bIsPermitted); + for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { + if(FD_ISSET(lstn->sock, &readfds)) { + processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } @@ -584,8 +593,10 @@ CODESTARTwillRun net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */ /* if we could not set up any listners, there is no point in running... */ - if(nLstn == 0) + if(lcnfRoot == NULL) { + DBGPRINTF("imudp: no listeners configured, will not run\n"); ABORT_FINALIZE(RS_RET_NO_RUN); + } iMaxLine = glbl.GetMaxLine(); @@ -595,14 +606,18 @@ ENDwillRun BEGINafterRun - int i; + struct lstn_s *lstn, *lstnDel; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); - for (i = 0; i < nLstn ; i++) - close(lcnfinfo[i].sock); - free(lcnfinfo); - lcnfinfo = NULL; + for(lstn = lcnfRoot ; lstn != NULL ; ) { + statsobj.Destruct(&(lstn->stats)); + close(lstn->sock); + lstnDel = lstn; + lstn = lstn->next; + free(lstnDel); + } + lcnfRoot = lcnfLast = NULL; if(pRcvBuf != NULL) { free(pRcvBuf); pRcvBuf = NULL; @@ -614,8 +629,6 @@ ENDafterRun BEGINmodExit CODESTARTmodExit - statsobj.Destruct(&modStats); - /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); @@ -678,13 +691,6 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); - - /* support statistics gathering */ - CHKiRet(statsobj.Construct(&modStats)); - CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imudp"))); - CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"), - ctrType_IntCtr, &ctrSubmit)); - CHKiRet(statsobj.ConstructFinalize(modStats)); ENDmodInit /* vim:set ai: */ -- cgit