From 1b68464ddfac8abdca203f3918da98e7d6c54ad4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 2 Oct 2008 11:21:45 +0200 Subject: some code cleanup removed code not actually needed in imupd --- plugins/imudp/imudp.c | 98 +++++++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 50 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 5792a999..d4c9d759 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -127,6 +127,8 @@ finalize_it: /* This function is called to gather input. + * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 */ BEGINrunInput int maxfds; @@ -154,17 +156,14 @@ CODESTARTrunInput maxfds = 0; FD_ZERO (&readfds); - /* Add the UDP listen sockets to the list of read descriptors. - */ - if(udpLstnSocks != NULL) { - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { - if(Debug) - net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], &readfds); - if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; - } - } + /* Add the UDP listen sockets to the list of read descriptors. */ + for (i = 0; i < *udpLstnSocks; i++) { + if (udpLstnSocks[i+1] != -1) { + if(Debug) + net.debugListenInfo(udpLstnSocks[i+1], "UDP"); + FD_SET(udpLstnSocks[i+1], &readfds); + if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; + } } if(Debug) { dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds); @@ -177,46 +176,45 @@ CODESTARTrunInput /* wait for io to become ready */ nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); - if(udpLstnSocks != NULL) { - for (i = 0; nfds && i < *udpLstnSocks; i++) { - if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { - socklen = sizeof(frominet); - l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0, - (struct sockaddr *)&frominet, &socklen); - if (l > 0) { - if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { - dbgprintf("Message from inetd socket: #%d, host: %s\n", - udpLstnSocks[i+1], fromHost); - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - if(net.isAllowedSender(net.pAllowedSenders_UDP, - (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { - parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp"); - } else { - dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); - if(glbl.GetOption_DisallowWarning) { - errmsg.LogError(0, NO_ERRCODE, "UDP message from disallowed sender %s discarded", - (char*)fromHost); - } - } - } - } else if (l < 0 && errno != EINTR && errno != EAGAIN) { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); - /* should be harmless */ - sleep(1); + for (i = 0; nfds && i < *udpLstnSocks; i++) { + if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { + socklen = sizeof(frominet); + l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0, + (struct sockaddr *)&frominet, &socklen); + if (l > 0) { + if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { + dbgprintf("Message from inetd socket: #%d, host: %s\n", + udpLstnSocks[i+1], fromHost); + /* Here we check if a host is permitted to send us + * syslog messages. If it isn't, we do not further + * process the message but log a warning (if we are + * configured to do this). + * rgerhards, 2005-09-26 + */ + if(net.isAllowedSender(net.pAllowedSenders_UDP, + (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { + parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp"); + } else { + dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); + if(glbl.GetOption_DisallowWarning) { + errmsg.LogError(0, NO_ERRCODE, "UDP message from disallowed sender %s discarded", + (char*)fromHost); + } } - --nfds; /* indicate we have processed one */ - } - } - } + } + } else if (l < 0 && errno != EINTR && errno != EAGAIN) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + dbgprintf("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); + /* should be harmless */ + sleep(1); + } + --nfds; /* indicate we have processed one */ + } + } + /* end of a run, back to loop for next recv() */ } return iRet; -- cgit From 6290cc9056dacad3ff80945408c4caad240002b0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 2 Oct 2008 11:44:50 +0200 Subject: performance-optimized imudp --- plugins/imudp/imudp.c | 76 +++++++++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 32 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index d4c9d759..f4830a47 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -179,39 +179,51 @@ CODESTARTrunInput for (i = 0; nfds && i < *udpLstnSocks; i++) { if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { socklen = sizeof(frominet); - l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0, - (struct sockaddr *)&frominet, &socklen); - if (l > 0) { - if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { - dbgprintf("Message from inetd socket: #%d, host: %s\n", - udpLstnSocks[i+1], fromHost); - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - if(net.isAllowedSender(net.pAllowedSenders_UDP, - (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { - parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp"); - } else { - dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); - if(glbl.GetOption_DisallowWarning) { - errmsg.LogError(0, NO_ERRCODE, "UDP message from disallowed sender %s discarded", - (char*)fromHost); - } + do { + /* we now try to read from the file descriptor until there + * is no more data. This is done in the hope to get better performance + * out of the system. However, this also means that a descriptor + * monopolizes processing while it contains data. This can lead to + * data loss in other descriptors. However, if the system is incapable of + * handling the workload, we will loss data in any case. So it doesn't really + * matter where the actual loss occurs - it is always random, because we depend + * on scheduling order. -- rgerhards, 2008-10-02 + */ + l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0, + (struct sockaddr *)&frominet, &socklen); + if(l > 0) { + if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { + dbgprintf("Message from inetd socket: #%d, host: %s\n", + udpLstnSocks[i+1], fromHost); + /* Here we check if a host is permitted to send us + * syslog messages. If it isn't, we do not further + * process the message but log a warning (if we are + * configured to do this). + * rgerhards, 2005-09-26 + */ + if(net.isAllowedSender(net.pAllowedSenders_UDP, + (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { + parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp"); + } else { + dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); + if(glbl.GetOption_DisallowWarning) { + errmsg.LogError(0, NO_ERRCODE, "UDP message from disallowed sender %s discarded", + (char*)fromHost); + } + } } - } - } else if (l < 0 && errno != EINTR && errno != EAGAIN) { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); - /* should be harmless */ - sleep(1); - } - --nfds; /* indicate we have processed one */ + } else if(l < 0 && errno != EINTR && errno != EAGAIN) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + dbgprintf("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); + /* should be harmless */ + sleep(1); + } + } while(l > 0); /* Warning: do ... while()! */ + + --nfds; /* indicate we have processed one descriptor */ } } /* end of a run, back to loop for next recv() */ -- cgit From 65f74a712b111f946faaddd0583a6c7b22bd062f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Oct 2008 14:25:23 +0200 Subject: added $UDPServerTimeRequery option ...which enables to work with less acurate timestamps in favor of performance. This enables querying of the time only every n-th time if imudp is running in the tight receive loop (aka receiving messsages at a high rate) --- plugins/imudp/imudp.c | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index f4830a47..1865d777 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -40,6 +40,7 @@ #include "srUtils.h" #include "errmsg.h" #include "glbl.h" +#include "datetime.h" MODULE_TYPE_INPUT @@ -50,6 +51,7 @@ DEF_IMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(net) +DEFobjCurrIf(datetime) static int iMaxLine; /* maximum UDP message size supported */ static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements @@ -59,6 +61,8 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ +#define TIME_REQUERY_DFLT 2 +static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ /* config settings */ @@ -141,6 +145,8 @@ BEGINrunInput uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; ssize_t l; + struct syslogTime stTime; + int iNbrTimeUsed; CODESTARTrunInput /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, @@ -179,6 +185,7 @@ CODESTARTrunInput for (i = 0; nfds && i < *udpLstnSocks; i++) { if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { socklen = sizeof(frominet); + iNbrTimeUsed = 0; do { /* we now try to read from the file descriptor until there * is no more data. This is done in the hope to get better performance @@ -203,8 +210,11 @@ CODESTARTrunInput */ if(net.isAllowedSender(net.pAllowedSenders_UDP, (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { + if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { + datetime.getCurrTime(&stTime); + } parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp"); + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", &stTime); } else { dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); if(glbl.GetOption_DisallowWarning) { @@ -274,6 +284,7 @@ CODESTARTmodExit /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); ENDmodExit @@ -293,6 +304,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a net.closeUDPListenSockets(udpLstnSocks); udpLstnSocks = NULL; } + iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */ return RS_RET_OK; } @@ -303,6 +315,7 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); /* register config file handlers */ @@ -310,9 +323,10 @@ CODEmodInit_QueryRegCFSLineHdlr addListner, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, + NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit -/* - * vi:set ai: +/* vim:set ai: */ -- cgit From 7b3c05da9f126063384c80e9dd6fd5b2ae610074 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Oct 2008 10:28:44 +0200 Subject: simple (yet efficient) name caching added to imudp --- plugins/imudp/imudp.c | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 1865d777..d13314e8 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -133,6 +133,15 @@ finalize_it: /* This function is called to gather input. * Note that udpLstnSocks must be non-NULL because otherwise we would not have * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 + * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably + * highly efficient "name caching". Before querying a name, I now check if the name to be + * queried is the same as the one queried in the last message processed. If that is the + * case, we can simple re-use the previous value. This algorithm works quite well with + * few sender, especially if they emit messages in bursts. The more sender and the + * more intermixed messages arrive, the less this algorithm works, but the overhead + * is so minimal (a simple memory compare and move) that this does not hurt. Even + * with a real name lookup cache, this optimization here is useful as it is quicker + * than even a cache lookup). */ BEGINrunInput int maxfds; @@ -140,6 +149,7 @@ BEGINrunInput int i; fd_set readfds; struct sockaddr_storage frominet; + struct sockaddr_storage frominetPrev; socklen_t socklen; uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; @@ -148,6 +158,10 @@ BEGINrunInput struct syslogTime stTime; int iNbrTimeUsed; CODESTARTrunInput + /* start "name caching" algo by making sure the previous system indicator + * is invalidated. + */ + memset(&frominetPrev, 0, sizeof(frominetPrev)); /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. @@ -184,7 +198,7 @@ CODESTARTrunInput for (i = 0; nfds && i < *udpLstnSocks; i++) { if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { - socklen = sizeof(frominet); + socklen = sizeof(frominet); iNbrTimeUsed = 0; do { /* we now try to read from the file descriptor until there @@ -199,7 +213,9 @@ CODESTARTrunInput l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(l > 0) { - if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { + if(memcmp(&frominet, &frominetPrev, socklen) == 0 || + net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { + memcpy(&frominetPrev, &frominet, socklen); dbgprintf("Message from inetd socket: #%d, host: %s\n", udpLstnSocks[i+1], fromHost); /* Here we check if a host is permitted to send us -- cgit From 8528344ef58b5d2907bba8809f63d0bca2ce8d38 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Oct 2008 14:26:41 +0200 Subject: "output" timestamp now taken from mesg's time generated This enhances performance and, as some have pointed out, is probably also more consistent with what users expect how the various output-timestamp related function should work. This commit needs some more testing. --- plugins/imudp/imudp.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index d13314e8..d58a0d8e 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -155,6 +155,7 @@ BEGINrunInput uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; ssize_t l; + time_t ttGenTime; struct syslogTime stTime; int iNbrTimeUsed; CODESTARTrunInput @@ -227,10 +228,11 @@ CODESTARTrunInput if(net.isAllowedSender(net.pAllowedSenders_UDP, (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { - datetime.getCurrTime(&stTime); + datetime.getCurrTime(&stTime, &ttGenTime); } parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", &stTime); + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", + &stTime, ttGenTime); } else { dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); if(glbl.GetOption_DisallowWarning) { -- cgit From 82b583c4f99dd9beb30360f222c4d2a1152f75e1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 8 Oct 2008 14:56:02 +0200 Subject: restructured imudp receive loop cleaned up previous code and redid it in a way that makes it much easier to extend it also added a new macro DBGPRINTF which is a performance-optimzed version of dbgprintf --- plugins/imudp/imudp.c | 145 +++++++++++++++++++++++++++++--------------------- 1 file changed, 85 insertions(+), 60 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index d58a0d8e..7f9afb68 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -130,6 +130,85 @@ finalize_it: } +/* This function is a helper to runInput. I have extracted it + * from the main loop just so that we do not have that large amount of code + * in a single place. This function takes a socket and pulls messages from + * it until the socket does not have any more waiting. + * rgerhards, 2008-01-08 + * We try to read from the file descriptor until there + * is no more data. This is done in the hope to get better performance + * out of the system. However, this also means that a descriptor + * monopolizes processing while it contains data. This can lead to + * data loss in other descriptors. However, if the system is incapable of + * handling the workload, we will loss data in any case. So it doesn't really + * matter where the actual loss occurs - it is always random, because we depend + * on scheduling order. -- rgerhards, 2008-10-02 + */ +static inline rsRetVal +processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, + uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP) +{ + DEFiRet; + int iNbrTimeUsed; + time_t ttGenTime; + struct syslogTime stTime; + socklen_t socklen; + ssize_t l; + struct sockaddr_storage frominet; + char errStr[1024]; + + iNbrTimeUsed = 0; + while(1) { /* loop is terminated if we have a bad receive, done below in the body */ + socklen = sizeof(struct sockaddr_storage); + l = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + if(l < 0) { + if(errno != EINTR && errno != EAGAIN) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); + } + ABORT_FINALIZE(RS_RET_ERR); + } + + /* if we reach this point, we had a good receive and can process the packet received */ + /* check if we have a different sender than before, if so, we need to query some new values */ + if(memcmp(&frominet, frominetPrev, socklen) != 0) { + CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP)); + memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ + /* Here we check if a host is permitted to send us + * syslog messages. If it isn't, we do not further + * process the message but log a warning (if we are + * configured to do this). + * rgerhards, 2005-09-26 + */ + *pbIsPermitted = net.isAllowedSender(net.pAllowedSenders_UDP, + (struct sockaddr *)&frominet, (char*)fromHostFQDN); + + if(!*pbIsPermitted) { + DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN); + if(glbl.GetOption_DisallowWarning) { + errmsg.LogError(0, NO_ERRCODE, "UDP message from disallowed sender %s discarded", + (char*)fromHost); + } + } + } + + DBGPRINTF("Message from inetd socket: #%d, host: %s, isPermitted: %d\n", fd, fromHost, *pbIsPermitted); + if(*pbIsPermitted) { + if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + } + parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", &stTime, ttGenTime); + } + } + + +finalize_it: + RETiRet; +} + + /* This function is called to gather input. * Note that udpLstnSocks must be non-NULL because otherwise we would not have * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 @@ -148,20 +227,16 @@ BEGINrunInput int nfds; int i; fd_set readfds; - struct sockaddr_storage frominet; struct sockaddr_storage frominetPrev; - socklen_t socklen; + int bIsPermitted; uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; - ssize_t l; - time_t ttGenTime; - struct syslogTime stTime; - int iNbrTimeUsed; CODESTARTrunInput /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ + bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, @@ -198,61 +273,11 @@ CODESTARTrunInput nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); for (i = 0; nfds && i < *udpLstnSocks; i++) { - if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { - socklen = sizeof(frominet); - iNbrTimeUsed = 0; - do { - /* we now try to read from the file descriptor until there - * is no more data. This is done in the hope to get better performance - * out of the system. However, this also means that a descriptor - * monopolizes processing while it contains data. This can lead to - * data loss in other descriptors. However, if the system is incapable of - * handling the workload, we will loss data in any case. So it doesn't really - * matter where the actual loss occurs - it is always random, because we depend - * on scheduling order. -- rgerhards, 2008-10-02 - */ - l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0, - (struct sockaddr *)&frominet, &socklen); - if(l > 0) { - if(memcmp(&frominet, &frominetPrev, socklen) == 0 || - net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { - memcpy(&frominetPrev, &frominet, socklen); - dbgprintf("Message from inetd socket: #%d, host: %s\n", - udpLstnSocks[i+1], fromHost); - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - if(net.isAllowedSender(net.pAllowedSenders_UDP, - (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { - if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { - datetime.getCurrTime(&stTime, &ttGenTime); - } - parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", - &stTime, ttGenTime); - } else { - dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); - if(glbl.GetOption_DisallowWarning) { - errmsg.LogError(0, NO_ERRCODE, "UDP message from disallowed sender %s discarded", - (char*)fromHost); - } - } - } - } else if(l < 0 && errno != EINTR && errno != EAGAIN) { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); - /* should be harmless */ - sleep(1); - } - } while(l > 0); /* Warning: do ... while()! */ - - --nfds; /* indicate we have processed one descriptor */ + if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { + processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, + fromHost, fromHostFQDN, fromHostIP); } + --nfds; /* indicate we have processed one descriptor */ } /* end of a run, back to loop for next recv() */ } -- cgit From ace4f2f75202aec39449dac11b9eb1deca7428d7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 8 Oct 2008 18:55:11 +0200 Subject: reordered imudp processing. Message parsing is now done as part of main message queue worker processing (was part of the input thread) This should also improve performance, as potentially more work is done in parallel. --- plugins/imudp/imudp.c | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 7f9afb68..aab201a9 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -40,6 +40,8 @@ #include "srUtils.h" #include "errmsg.h" #include "glbl.h" +#include "msg.h" +#include "parser.h" #include "datetime.h" MODULE_TYPE_INPUT @@ -153,15 +155,16 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, time_t ttGenTime; struct syslogTime stTime; socklen_t socklen; - ssize_t l; + ssize_t lenRcvBuf; struct sockaddr_storage frominet; + msg_t *pMsg; char errStr[1024]; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ socklen = sizeof(struct sockaddr_storage); - l = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); - if(l < 0) { + lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); @@ -193,13 +196,25 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, } } - DBGPRINTF("Message from inetd socket: #%d, host: %s, isPermitted: %d\n", fd, fromHost, *pbIsPermitted); + DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + if(*pbIsPermitted) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { datetime.getCurrTime(&stTime, &ttGenTime); } - parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", &stTime, ttGenTime); + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); + /* first trim the buffer to what we have actually received */ + CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar)* lenRcvBuf)); + memcpy(pMsg->pszRawMsg, pRcvBuf, lenRcvBuf); + pMsg->iLenRawMsg = lenRcvBuf; + MsgSetInputName(pMsg, "imudp"); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + pMsg->bParseHOSTNAME = MSG_PARSE_HOSTNAME; + pMsg->msgFlags = NOFLAG; + MsgSetRcvFrom(pMsg, (char*)fromHost); + CHKiRet(MsgSetRcvFromIP(pMsg, fromHostIP)); + CHKiRet(submitMsg(pMsg)); } } -- cgit From 634cfdbe8ec7182b7fb74b184d269697e5fcab6c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 10:01:30 +0200 Subject: restoring msg parsing for imudp I tried to work too quick this morning. A side-effect of an earlier change was that no UDP messages were parsed, which lead to their loss, because no PRI was set in this case. --- plugins/imudp/imudp.c | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index aab201a9..114b433c 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -207,6 +207,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, /* first trim the buffer to what we have actually received */ CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar)* lenRcvBuf)); memcpy(pMsg->pszRawMsg, pRcvBuf, lenRcvBuf); + pMsg->bIsParsed = 0; /* indicate message needs to be parsed */ pMsg->iLenRawMsg = lenRcvBuf; MsgSetInputName(pMsg, "imudp"); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); -- cgit From 6c6e9a0f3f7d454ba9553a750b195d7f99c7299a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 13:45:56 +0200 Subject: moved bParseHostname and bIsParsed to msgFlags This enables us to use more efficient calling conventions and also helps us keep the on-disk structure of a msg object more consistent in future releases. --- plugins/imudp/imudp.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 114b433c..a49378cf 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -207,12 +207,10 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, /* first trim the buffer to what we have actually received */ CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar)* lenRcvBuf)); memcpy(pMsg->pszRawMsg, pRcvBuf, lenRcvBuf); - pMsg->bIsParsed = 0; /* indicate message needs to be parsed */ pMsg->iLenRawMsg = lenRcvBuf; MsgSetInputName(pMsg, "imudp"); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - pMsg->bParseHOSTNAME = MSG_PARSE_HOSTNAME; - pMsg->msgFlags = NOFLAG; + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, (char*)fromHost); CHKiRet(MsgSetRcvFromIP(pMsg, fromHostIP)); CHKiRet(submitMsg(pMsg)); -- cgit From 128edc1598a13c894fe3853673d1231b9feafc39 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 8 Dec 2008 13:31:55 +0100 Subject: bugfix: imudp went into an endless loop under some circumstances (but could also leave it under some other circumstances...) Thanks to David Lang and speedfox for reporting this issue. --- plugins/imudp/imudp.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 037da56d..0193ac06 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -297,12 +297,12 @@ CODESTARTrunInput /* wait for io to become ready */ nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); - for (i = 0; nfds && i < *udpLstnSocks; i++) { - if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { + for(i = 0; nfds && i < *udpLstnSocks; i++) { + if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, fromHost, fromHostFQDN, fromHostIP); - } --nfds; /* indicate we have processed one descriptor */ + } } /* end of a run, back to loop for next recv() */ } -- cgit From 60b8ce14bf33e76237cf82dd1f68acc750e64316 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 8 Dec 2008 15:42:47 +0100 Subject: added $PreserveFQDN config file directive Enables to use FQDNs in sender names where the legacy default --- plugins/imudp/imudp.c | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0193ac06..72450513 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -181,6 +181,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, /* check if we have a different sender than before, if so, we need to query some new values */ if(memcmp(&frominet, frominetPrev, socklen) != 0) { CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP)); +DBGPRINTF("returned: fromHost '%s', FQDN: '%s'\n", fromHost, fromHostFQDN); memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ /* Here we check if a host is permitted to send us * syslog messages. If it isn't, we do not further -- cgit From 59192611db992e7357337beb8e68ec6cee5b3fec Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Mar 2009 22:36:40 +0100 Subject: bugfix: parser did not correctly parse fields in UDP-received messages --- plugins/imudp/imudp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 72450513..c7e8c1d4 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -181,7 +181,6 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, /* check if we have a different sender than before, if so, we need to query some new values */ if(memcmp(&frominet, frominetPrev, socklen) != 0) { CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP)); -DBGPRINTF("returned: fromHost '%s', FQDN: '%s'\n", fromHost, fromHostFQDN); memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ /* Here we check if a host is permitted to send us * syslog messages. If it isn't, we do not further @@ -223,6 +222,7 @@ DBGPRINTF("returned: fromHost '%s', FQDN: '%s'\n", fromHost, fromHostFQDN); MsgSetInputName(pMsg, "imudp"); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + pMsg->bParseHOSTNAME = 1; MsgSetRcvFrom(pMsg, (char*)fromHost); CHKiRet(MsgSetRcvFromIP(pMsg, fromHostIP)); CHKiRet(submitMsg(pMsg)); -- cgit