diff options
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r-- | plugins/imudp/imudp.c | 260 |
1 files changed, 195 insertions, 65 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 79d51263..f8555f00 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -40,6 +40,11 @@ #include "srUtils.h" #include "errmsg.h" #include "glbl.h" +#include "msg.h" +#include "parser.h" +#include "datetime.h" +#include "prop.h" +#include "unicode-helper.h" MODULE_TYPE_INPUT @@ -50,6 +55,8 @@ DEF_IMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(net) +DEFobjCurrIf(datetime) +DEFobjCurrIf(prop) static int iMaxLine; /* maximum UDP message size supported */ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded @@ -63,6 +70,10 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ +static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ +// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ +#define TIME_REQUERY_DFLT 2 +static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ /* config settings */ @@ -90,7 +101,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) else bindAddr = pszBindAddr; - dbgprintf("Trying to open syslog UDP ports at %s:%s.\n", + DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal); newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1); @@ -130,20 +141,164 @@ finalize_it: } +#if 0 /* TODO: implement when tehre is time, requires restructure of socket array! */ +/* accept a new ruleset to bind. Checks if it exists and complains, if not */ +static rsRetVal +setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) +{ + ruleset_t *pRuleset; + rsRetVal localRet; + DEFiRet; + + localRet = ruleset.GetRuleset(&pRuleset, pszName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, NO_ERRCODE, "error: ruleset '%s' not found - ignored", pszName); + } + CHKiRet(localRet); + pBindRuleset = pRuleset; + DBGPRINTF("imudp current bind ruleset %p: '%s'\n", pRuleset, pszName); + +finalize_it: + free(pszName); /* no longer needed */ + RETiRet; +} +#endif + + +/* This function is a helper to runInput. I have extracted it + * from the main loop just so that we do not have that large amount of code + * in a single place. This function takes a socket and pulls messages from + * it until the socket does not have any more waiting. + * rgerhards, 2008-01-08 + * We try to read from the file descriptor until there + * is no more data. This is done in the hope to get better performance + * out of the system. However, this also means that a descriptor + * monopolizes processing while it contains data. This can lead to + * data loss in other descriptors. However, if the system is incapable of + * handling the workload, we will loss data in any case. So it doesn't really + * matter where the actual loss occurs - it is always random, because we depend + * on scheduling order. -- rgerhards, 2008-10-02 + */ +static inline rsRetVal +processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, + uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP) +{ + DEFiRet; + int iNbrTimeUsed; + time_t ttGenTime; + struct syslogTime stTime; + socklen_t socklen; + ssize_t lenRcvBuf; + struct sockaddr_storage frominet; + msg_t *pMsg; + prop_t *propFromHost = NULL; + prop_t *propFromHostIP = NULL; + char errStr[1024]; + + iNbrTimeUsed = 0; + while(1) { /* loop is terminated if we have a bad receive, done below in the body */ + socklen = sizeof(struct sockaddr_storage); + lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + if(lenRcvBuf < 0) { + if(errno != EINTR && errno != EAGAIN) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); + } + ABORT_FINALIZE(RS_RET_ERR); + } + + if(lenRcvBuf == 0) + continue; /* this looks a bit strange, but practice shows it happens... */ + + /* if we reach this point, we had a good receive and can process the packet received */ + /* check if we have a different sender than before, if so, we need to query some new values */ + if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { + CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP)); + memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ + /* Here we check if a host is permitted to send us + * syslog messages. If it isn't, we do not further + * process the message but log a warning (if we are + * configured to do this). + * rgerhards, 2005-09-26 + */ + *pbIsPermitted = net.isAllowedSender((uchar*)"UDP", + (struct sockaddr *)&frominet, (char*)fromHostFQDN); + + if(!*pbIsPermitted) { + DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN); + if(glbl.GetOption_DisallowWarning) { + time_t tt; + + time(&tt); + if(tt > ttLastDiscard + 60) { + ttLastDiscard = tt; + errmsg.LogError(0, NO_ERRCODE, + "UDP message from disallowed sender %s discarded", + (char*)fromHost); + } + } + } + } + + DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + + if(*pbIsPermitted) { + if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + } + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); + MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); + MsgSetInputName(pMsg, pInputName); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + pMsg->bParseHOSTNAME = 1; + MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); + CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); + CHKiRet(submitMsg(pMsg)); + } + } + +finalize_it: + if(propFromHost != NULL) + prop.Destruct(&propFromHost); + if(propFromHostIP != NULL) + prop.Destruct(&propFromHostIP); + + RETiRet; +} + + /* This function is called to gather input. + * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 + * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably + * highly efficient "name caching". Before querying a name, I now check if the name to be + * queried is the same as the one queried in the last message processed. If that is the + * case, we can simple re-use the previous value. This algorithm works quite well with + * few sender, especially if they emit messages in bursts. The more sender and the + * more intermixed messages arrive, the less this algorithm works, but the overhead + * is so minimal (a simple memory compare and move) that this does not hurt. Even + * with a real name lookup cache, this optimization here is useful as it is quicker + * than even a cache lookup). */ BEGINrunInput int maxfds; int nfds; int i; fd_set readfds; - struct sockaddr_storage frominet; - socklen_t socklen; + struct sockaddr_storage frominetPrev; + int bIsPermitted; uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; - ssize_t l; CODESTARTrunInput + /* start "name caching" algo by making sure the previous system indicator + * is invalidated. + */ + bIsPermitted = 0; + memset(&frominetPrev, 0, sizeof(frominetPrev)); /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. @@ -158,17 +313,14 @@ CODESTARTrunInput maxfds = 0; FD_ZERO (&readfds); - /* Add the UDP listen sockets to the list of read descriptors. - */ - if(udpLstnSocks != NULL) { - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { - if(Debug) - net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], &readfds); - if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; - } - } + /* Add the UDP listen sockets to the list of read descriptors. */ + for (i = 0; i < *udpLstnSocks; i++) { + if (udpLstnSocks[i+1] != -1) { + if(Debug) + net.debugListenInfo(udpLstnSocks[i+1], "UDP"); + FD_SET(udpLstnSocks[i+1], &readfds); + if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; + } } if(Debug) { dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds); @@ -181,53 +333,14 @@ CODESTARTrunInput /* wait for io to become ready */ nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); - if(udpLstnSocks != NULL) { - for (i = 0; nfds && i < *udpLstnSocks; i++) { - if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { - socklen = sizeof(frominet); - l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0, - (struct sockaddr *)&frominet, &socklen); - if (l > 0) { - if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) { - dbgprintf("Message from inetd socket: #%d, host: %s\n", - udpLstnSocks[i+1], fromHost); - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - if(net.isAllowedSender((uchar*) "UDP", - (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { - parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp"); - } else { - dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); - if(glbl.GetOption_DisallowWarning) { - time_t tt; - - time(&tt); - if(tt > ttLastDiscard + 60) { - ttLastDiscard = tt; - errmsg.LogError(0, NO_ERRCODE, - "UDP message from disallowed sender %s discarded", - (char*)fromHost); - } - } - } - } - } else if (l < 0 && errno != EINTR && errno != EAGAIN) { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); - /* should be harmless */ - sleep(1); - } - --nfds; /* indicate we have processed one */ - } - } - } + for(i = 0; nfds && i < *udpLstnSocks; i++) { + if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { + processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, + fromHost, fromHostFQDN, fromHostIP); + --nfds; /* indicate we have processed one descriptor */ + } + } + /* end of a run, back to loop for next recv() */ } return iRet; @@ -237,6 +350,11 @@ ENDrunInput /* initialize and return if will run or not */ BEGINwillRun CODESTARTwillRun + /* we need to create the inputName property (only once during our lifetime) */ + CHKiRet(prop.Construct(&pInputName)); + CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imudp"), sizeof("imudp") - 1)); + CHKiRet(prop.ConstructFinalize(pInputName)); + net.PrintAllowedSenders(1); /* UDP */ /* if we could not set up any listners, there is no point in running... */ @@ -264,6 +382,8 @@ CODESTARTafterRun free(pRcvBuf); pRcvBuf = NULL; } + if(pInputName != NULL) + prop.Destruct(&pInputName); ENDafterRun @@ -272,6 +392,8 @@ CODESTARTmodExit /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); ENDmodExit @@ -291,6 +413,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a net.closeUDPListenSockets(udpLstnSocks); udpLstnSocks = NULL; } + iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */ return RS_RET_OK; } @@ -301,16 +424,23 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); /* register config file handlers */ + /* TODO: add - but this requires more changes, no time right now... + CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord, + setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord, addListner, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, + NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit -/* - * vi:set ai: +/* vim:set ai: */ |