summaryrefslogtreecommitdiffstats
path: root/plugins/imudp/imudp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r--plugins/imudp/imudp.c208
1 files changed, 145 insertions, 63 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index e9e82b20..c7e8c1d4 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -40,6 +40,9 @@
#include "srUtils.h"
#include "errmsg.h"
#include "glbl.h"
+#include "msg.h"
+#include "parser.h"
+#include "datetime.h"
MODULE_TYPE_INPUT
@@ -50,6 +53,7 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
+DEFobjCurrIf(datetime)
static int iMaxLine; /* maximum UDP message size supported */
static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded
@@ -63,6 +67,8 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a
* it so that we can check available memory in willRun() and request
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
+#define TIME_REQUERY_DFLT 2
+static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
/* config settings */
@@ -130,20 +136,134 @@ finalize_it:
}
+/* This function is a helper to runInput. I have extracted it
+ * from the main loop just so that we do not have that large amount of code
+ * in a single place. This function takes a socket and pulls messages from
+ * it until the socket does not have any more waiting.
+ * rgerhards, 2008-01-08
+ * We try to read from the file descriptor until there
+ * is no more data. This is done in the hope to get better performance
+ * out of the system. However, this also means that a descriptor
+ * monopolizes processing while it contains data. This can lead to
+ * data loss in other descriptors. However, if the system is incapable of
+ * handling the workload, we will loss data in any case. So it doesn't really
+ * matter where the actual loss occurs - it is always random, because we depend
+ * on scheduling order. -- rgerhards, 2008-10-02
+ */
+static inline rsRetVal
+processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
+ uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP)
+{
+ DEFiRet;
+ int iNbrTimeUsed;
+ time_t ttGenTime;
+ struct syslogTime stTime;
+ socklen_t socklen;
+ ssize_t lenRcvBuf;
+ struct sockaddr_storage frominet;
+ msg_t *pMsg;
+ char errStr[1024];
+
+ iNbrTimeUsed = 0;
+ while(1) { /* loop is terminated if we have a bad receive, done below in the body */
+ socklen = sizeof(struct sockaddr_storage);
+ lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
+ if(lenRcvBuf < 0) {
+ if(errno != EINTR && errno != EAGAIN) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr);
+ errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet");
+ }
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ /* if we reach this point, we had a good receive and can process the packet received */
+ /* check if we have a different sender than before, if so, we need to query some new values */
+ if(memcmp(&frominet, frominetPrev, socklen) != 0) {
+ CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP));
+ memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
+ /* Here we check if a host is permitted to send us
+ * syslog messages. If it isn't, we do not further
+ * process the message but log a warning (if we are
+ * configured to do this).
+ * rgerhards, 2005-09-26
+ */
+ *pbIsPermitted = net.isAllowedSender((uchar*)"UDP",
+ (struct sockaddr *)&frominet, (char*)fromHostFQDN);
+
+ if(!*pbIsPermitted) {
+ DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN);
+ if(glbl.GetOption_DisallowWarning) {
+ time_t tt;
+
+ time(&tt);
+ if(tt > ttLastDiscard + 60) {
+ ttLastDiscard = tt;
+ errmsg.LogError(0, NO_ERRCODE,
+ "UDP message from disallowed sender %s discarded",
+ (char*)fromHost);
+ }
+ }
+ }
+ }
+
+ DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf);
+
+ if(*pbIsPermitted) {
+ if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
+ datetime.getCurrTime(&stTime, &ttGenTime);
+ }
+ /* we now create our own message object and submit it to the queue */
+ CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
+ /* first trim the buffer to what we have actually received */
+ CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar)* lenRcvBuf));
+ memcpy(pMsg->pszRawMsg, pRcvBuf, lenRcvBuf);
+ pMsg->iLenRawMsg = lenRcvBuf;
+ MsgSetInputName(pMsg, "imudp");
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ pMsg->bParseHOSTNAME = 1;
+ MsgSetRcvFrom(pMsg, (char*)fromHost);
+ CHKiRet(MsgSetRcvFromIP(pMsg, fromHostIP));
+ CHKiRet(submitMsg(pMsg));
+ }
+ }
+
+
+finalize_it:
+ RETiRet;
+}
+
+
/* This function is called to gather input.
+ * Note that udpLstnSocks must be non-NULL because otherwise we would not have
+ * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
+ * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably
+ * highly efficient "name caching". Before querying a name, I now check if the name to be
+ * queried is the same as the one queried in the last message processed. If that is the
+ * case, we can simple re-use the previous value. This algorithm works quite well with
+ * few sender, especially if they emit messages in bursts. The more sender and the
+ * more intermixed messages arrive, the less this algorithm works, but the overhead
+ * is so minimal (a simple memory compare and move) that this does not hurt. Even
+ * with a real name lookup cache, this optimization here is useful as it is quicker
+ * than even a cache lookup).
*/
BEGINrunInput
int maxfds;
int nfds;
int i;
fd_set readfds;
- struct sockaddr_storage frominet;
- socklen_t socklen;
+ struct sockaddr_storage frominetPrev;
+ int bIsPermitted;
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
- ssize_t l;
CODESTARTrunInput
+ /* start "name caching" algo by making sure the previous system indicator
+ * is invalidated.
+ */
+ bIsPermitted = 0;
+ memset(&frominetPrev, 0, sizeof(frominetPrev));
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
@@ -158,17 +278,14 @@ CODESTARTrunInput
maxfds = 0;
FD_ZERO (&readfds);
- /* Add the UDP listen sockets to the list of read descriptors.
- */
- if(udpLstnSocks != NULL) {
- for (i = 0; i < *udpLstnSocks; i++) {
- if (udpLstnSocks[i+1] != -1) {
- if(Debug)
- net.debugListenInfo(udpLstnSocks[i+1], "UDP");
- FD_SET(udpLstnSocks[i+1], &readfds);
- if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
- }
- }
+ /* Add the UDP listen sockets to the list of read descriptors. */
+ for (i = 0; i < *udpLstnSocks; i++) {
+ if (udpLstnSocks[i+1] != -1) {
+ if(Debug)
+ net.debugListenInfo(udpLstnSocks[i+1], "UDP");
+ FD_SET(udpLstnSocks[i+1], &readfds);
+ if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
+ }
}
if(Debug) {
dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds);
@@ -181,53 +298,14 @@ CODESTARTrunInput
/* wait for io to become ready */
nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
- if(udpLstnSocks != NULL) {
- for (i = 0; nfds && i < *udpLstnSocks; i++) {
- if (FD_ISSET(udpLstnSocks[i+1], &readfds)) {
- socklen = sizeof(frominet);
- l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0,
- (struct sockaddr *)&frominet, &socklen);
- if (l > 0) {
- if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) {
- dbgprintf("Message from inetd socket: #%d, host: %s\n",
- udpLstnSocks[i+1], fromHost);
- /* Here we check if a host is permitted to send us
- * syslog messages. If it isn't, we do not further
- * process the message but log a warning (if we are
- * configured to do this).
- * rgerhards, 2005-09-26
- */
- if(net.isAllowedSender((uchar*) "UDP",
- (struct sockaddr *)&frominet, (char*)fromHostFQDN)) {
- parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l,
- MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp");
- } else {
- dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN);
- if(glbl.GetOption_DisallowWarning) {
- time_t tt;
-
- time(&tt);
- if(tt > ttLastDiscard + 60) {
- ttLastDiscard = tt;
- errmsg.LogError(0, NO_ERRCODE,
- "UDP message from disallowed sender %s discarded",
- (char*)fromHost);
- }
- }
- }
- }
- } else if (l < 0 && errno != EINTR && errno != EAGAIN) {
- char errStr[1024];
- rs_strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("INET socket error: %d = %s.\n", errno, errStr);
- errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet");
- /* should be harmless */
- sleep(1);
- }
- --nfds; /* indicate we have processed one */
- }
- }
- }
+ for(i = 0; nfds && i < *udpLstnSocks; i++) {
+ if(FD_ISSET(udpLstnSocks[i+1], &readfds)) {
+ processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
+ fromHost, fromHostFQDN, fromHostIP);
+ --nfds; /* indicate we have processed one descriptor */
+ }
+ }
+ /* end of a run, back to loop for next recv() */
}
return iRet;
@@ -272,6 +350,7 @@ CODESTARTmodExit
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
objRelease(net, LM_NET_FILENAME);
ENDmodExit
@@ -291,6 +370,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
net.closeUDPListenSockets(udpLstnSocks);
udpLstnSocks = NULL;
}
+ iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */
return RS_RET_OK;
}
@@ -301,6 +381,7 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
/* register config file handlers */
@@ -308,9 +389,10 @@ CODEmodInit_QueryRegCFSLineHdlr
addListner, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord,
NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt,
+ NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
-/*
- * vi:set ai:
+/* vim:set ai:
*/