summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-01-09 11:55:57 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2012-01-09 11:55:57 +0100
commit355b6818b8ff08cd7e0d72f790bcc98504924e48 (patch)
treec21bd425a895d03bfc9159ba7293b2deace474ed /plugins
parentc0b3c93a8b16d4b7aaf912405fce77a4be365f0a (diff)
downloadrsyslog-355b6818b8ff08cd7e0d72f790bcc98504924e48.tar.gz
rsyslog-355b6818b8ff08cd7e0d72f790bcc98504924e48.tar.xz
rsyslog-355b6818b8ff08cd7e0d72f790bcc98504924e48.zip
imudp: added per-listner stats settings & more refactoring
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imudp/imudp.c154
1 files changed, 80 insertions, 74 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 0e6c6614..0db6bf9a 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -26,6 +26,7 @@
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -69,14 +70,14 @@ DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
DEFobjCurrIf(statsobj)
-statsobj_t *modStats;
-STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
-static struct cnfinfo_s {
+static struct lstn_s {
+ struct lstn_s *next;
int sock; /* socket */
ruleset_t *pRuleset; /* bound ruleset */
-} *lcnfinfo = NULL; /**< the structure contains information needed to configure the listeners */
-static int nLstn = 0; /**< number of listners */
+ statsobj_t *stats; /* listener stats */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+} *lcnfRoot = NULL, *lcnfLast = NULL;
static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
@@ -199,9 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
DEFiRet;
uchar *bindAddr;
int *newSocks;
- int iSrc, iDst;
- struct cnfinfo_s *newlcnfinfo;
- int newnLstn;
+ int iSrc;
+ struct lstn_s *newlcnfinfo;
+ uchar *bindName;
+ uchar *port;
+ uchar statname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -212,35 +215,37 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
bindAddr = NULL;
else
bindAddr = pszBindAddr;
+ bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr;
- DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n",
- (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal);
+ DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal);
- newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1);
+ port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal;
+ newSocks = net.create_udp_socket(bindAddr, port, 1);
if(newSocks != NULL) {
/* we now need to add the new sockets to the existing set */
- if(lcnfinfo == NULL) {
- /* esay, src and dest are equal */
- nLstn = newSocks[0];
- CHKmalloc(lcnfinfo = (struct cnfinfo_s*) MALLOC(sizeof(struct cnfinfo_s) * nLstn));
- for(iDst = 0, iSrc=1 ; iDst < nLstn ; ++iDst, ++iSrc) {
- lcnfinfo[iDst].sock = newSocks[iSrc];
- lcnfinfo[iDst].pRuleset = pBindRuleset;
- }
- } else {
- /* we need to add them */
- newnLstn = nLstn + newSocks[0];
- CHKmalloc(newlcnfinfo = (struct cnfinfo_s*) MALLOC(sizeof(struct cnfinfo_s) * newnLstn));
- /* ready to copy */
- iDst = nLstn;
- memcpy(newlcnfinfo, lcnfinfo, nLstn * sizeof(struct cnfinfo_s));
- for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) {
- newlcnfinfo[iDst].sock = newSocks[iSrc];
- newlcnfinfo[iDst].pRuleset = pBindRuleset;
- }
- free(lcnfinfo);
- lcnfinfo = newlcnfinfo;
- nLstn = newnLstn;
+ /* ready to copy */
+ for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) {
+ CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s)));
+ newlcnfinfo->next = NULL;
+ newlcnfinfo->sock = newSocks[iSrc];
+ newlcnfinfo->pRuleset = pBindRuleset;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
+ snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats));
+ /* link to list. Order must be preserved to take care for
+ * conflicting matches.
+ */
+ if(lcnfRoot == NULL)
+ lcnfRoot = newlcnfinfo;
+ if(lcnfLast == NULL)
+ lcnfLast = newlcnfinfo;
+ else
+ lcnfLast->next = newlcnfinfo;
}
}
@@ -289,7 +294,7 @@ finalize_it:
* on scheduling order. -- rgerhards, 2008-10-02
*/
static inline rsRetVal
-processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
+processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
DEFiRet;
int iNbrTimeUsed;
@@ -309,7 +314,7 @@ processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetP
if(pThrd->bShallStop == TRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
socklen = sizeof(struct sockaddr_storage);
- lenRcvBuf = recvfrom(lcnfinfo[idxLstn].sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
+ lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
if(lenRcvBuf < 0) {
if(errno != EINTR && errno != EAGAIN) {
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -354,7 +359,7 @@ processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetP
*pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lcnfinfo[idxLstn].sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
if(*pbIsPermitted != 0) {
if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
@@ -364,14 +369,14 @@ processSocket(thrdInfo_t *pThrd, int idxLstn, struct sockaddr_storage *frominetP
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
- MsgSetRuleset(pMsg, lcnfinfo[idxLstn].pRuleset);
+ MsgSetRuleset(pMsg, lstn->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL;
if(*pbIsPermitted == 2)
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
CHKiRet(submitMsg(pMsg));
- STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
+ STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
@@ -438,6 +443,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
struct epoll_event *udpEPollEvt = NULL;
struct epoll_event currEvt[NUM_EPOLL_EVENTS];
char errStr[1024];
+ struct lstn_s *lstn;
+ int nLstn;
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
@@ -446,6 +453,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
+ /* count num listeners -- do it here in order to avoid inconsistency */
+ nLstn = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next)
+ ++nLstn;
CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event)));
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
@@ -466,16 +477,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* fill the epoll set - we need to do this only once, as the set
* can not change dyamically.
*/
- for (i = 0; i < nLstn ; i++) {
- if (lcnfinfo[i].sock != -1) {
+ i = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if(lstn->sock != -1) {
udpEPollEvt[i].events = EPOLLIN | EPOLLET;
- udpEPollEvt[i].data.u64 = i;
- if(epoll_ctl(efd, EPOLL_CTL_ADD, lcnfinfo[i].sock, &(udpEPollEvt[i])) < 0) {
+ udpEPollEvt[i].data.u64 = (long long unsigned) lstn;
+ if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) {
rs_strerror_r(errno, errStr, sizeof(errStr));
errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
- lcnfinfo[i].sock, errStr);
+ lstn->sock, errStr);
}
}
+ i++;
}
while(1) {
@@ -487,7 +500,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
break; /* terminate input! */
for(i = 0 ; i < nfds ; ++i) {
- processSocket(pThrd, (int)currEvt[i].data.u64, &frominetPrev, &bIsPermitted);
+ processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted);
}
}
@@ -504,10 +517,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
DEFiRet;
int maxfds;
int nfds;
- int i;
fd_set readfds;
struct sockaddr_storage frominetPrev;
int bIsPermitted;
+ struct lstn_s *lstn;
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
@@ -518,22 +531,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
DBGPRINTF("imudp uses select()\n");
while(1) {
- /* Add the Unix Domain Sockets to the list of read
- * descriptors.
- * rgerhards 2005-08-01: we must now check if there are
- * any local sockets to listen to at all. If the -o option
- * is given without -a, we do not need to listen at all..
+ /* Add the Unix Domain Sockets to the list of read descriptors.
*/
maxfds = 0;
FD_ZERO(&readfds);
/* Add the UDP listen sockets to the list of read descriptors. */
- for (i = 0; i < nLstn ; i++) {
- if (lcnfinfo[i].sock != -1) {
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if (lstn->sock != -1) {
if(Debug)
- net.debugListenInfo(lcnfinfo[i].sock, "UDP");
- FD_SET(lcnfinfo[i].sock, &readfds);
- if(lcnfinfo[i].sock>maxfds) maxfds=lcnfinfo[i].sock;
+ net.debugListenInfo(lstn->sock, "UDP");
+ FD_SET(lstn->sock, &readfds);
+ if(lstn->sock>maxfds) maxfds=lstn->sock;
}
}
if(Debug) {
@@ -549,9 +558,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
- for(i = 0; nfds && i < nLstn ; i++) {
- if(FD_ISSET(lcnfinfo[i].sock, &readfds)) {
- processSocket(pThrd, i, &frominetPrev, &bIsPermitted);
+ for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) {
+ if(FD_ISSET(lstn->sock, &readfds)) {
+ processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted);
--nfds; /* indicate we have processed one descriptor */
}
}
@@ -584,8 +593,10 @@ CODESTARTwillRun
net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */
/* if we could not set up any listners, there is no point in running... */
- if(nLstn == 0)
+ if(lcnfRoot == NULL) {
+ DBGPRINTF("imudp: no listeners configured, will not run\n");
ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
iMaxLine = glbl.GetMaxLine();
@@ -595,14 +606,18 @@ ENDwillRun
BEGINafterRun
- int i;
+ struct lstn_s *lstn, *lstnDel;
CODESTARTafterRun
/* do cleanup here */
net.clearAllowedSenders((uchar*)"UDP");
- for (i = 0; i < nLstn ; i++)
- close(lcnfinfo[i].sock);
- free(lcnfinfo);
- lcnfinfo = NULL;
+ for(lstn = lcnfRoot ; lstn != NULL ; ) {
+ statsobj.Destruct(&(lstn->stats));
+ close(lstn->sock);
+ lstnDel = lstn;
+ lstn = lstn->next;
+ free(lstnDel);
+ }
+ lcnfRoot = lcnfLast = NULL;
if(pRcvBuf != NULL) {
free(pRcvBuf);
pRcvBuf = NULL;
@@ -614,8 +629,6 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
- statsobj.Destruct(&modStats);
-
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
@@ -678,13 +691,6 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
-
- /* support statistics gathering */
- CHKiRet(statsobj.Construct(&modStats));
- CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imudp")));
- CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"),
- ctrType_IntCtr, &ctrSubmit));
- CHKiRet(statsobj.ConstructFinalize(modStats));
ENDmodInit
/* vim:set ai:
*/