summaryrefslogtreecommitdiffstats
path: root/plugins/imudp/imudp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r--plugins/imudp/imudp.c260
1 files changed, 178 insertions, 82 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index ecc36c28..07a07d74 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -32,6 +32,9 @@
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
+#if HAVE_SYS_EPOLL_H
+# include <sys/epoll.h>
+#endif
#include "rsyslog.h"
#include "dirty.h"
#include "net.h"
@@ -44,8 +47,8 @@
#include "parser.h"
#include "datetime.h"
#include "prop.h"
+#include "ruleset.h"
#include "unicode-helper.h"
-#include "unlimited_select.h"
MODULE_TYPE_INPUT
@@ -58,7 +61,9 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
DEFobjCurrIf(datetime)
DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
+static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded
* This shall prevent remote DoS when the "discard on disallowed sender"
@@ -66,13 +71,14 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte
*/
static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements
* read-only after init(), but beware of restart! */
+static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */
static uchar *pszBindAddr = NULL; /* IP to bind socket to */
static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc
* it so that we can check available memory in willRun() and request
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
-// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
+static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
#define TIME_REQUERY_DFLT 2
static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
@@ -91,6 +97,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
int *newSocks;
int *tmpSocks;
int iSrc, iDst;
+ ruleset_t **tmpRulesets;
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -111,26 +118,39 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
if(udpLstnSocks == NULL) {
/* esay, we can just replace it */
udpLstnSocks = newSocks;
+ CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1)));
+ for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst)
+ udpRulesets[iDst] = pBindRuleset;
} else {
/* we need to add them */
- if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) {
- dbgprintf("out of memory trying to allocate udp listen socket array\n");
+ tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]));
+ tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0]));
+ if(tmpSocks == NULL || tmpRulesets == NULL) {
+ DBGPRINTF("out of memory trying to allocate udp listen socket array\n");
/* in this case, we discard the new sockets but continue with what we
* already have
*/
free(newSocks);
+ free(tmpSocks);
+ free(tmpRulesets);
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
} else {
/* ready to copy */
iDst = 1;
- for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc)
- tmpSocks[iDst++] = udpLstnSocks[iSrc];
- for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc)
- tmpSocks[iDst++] = newSocks[iSrc];
+ for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) {
+ tmpSocks[iDst] = udpLstnSocks[iSrc];
+ tmpRulesets[iDst] = udpRulesets[iSrc];
+ }
+ for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) {
+ tmpSocks[iDst] = newSocks[iSrc];
+ tmpRulesets[iDst] = pBindRuleset;
+ }
tmpSocks[0] = udpLstnSocks[0] + newSocks[0];
free(newSocks);
free(udpLstnSocks);
udpLstnSocks = tmpSocks;
+ free(udpRulesets);
+ udpRulesets = tmpRulesets;
}
}
}
@@ -142,7 +162,6 @@ finalize_it:
}
-#if 0 /* TODO: implement when tehre is time, requires restructure of socket array! */
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal
setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
@@ -163,7 +182,6 @@ finalize_it:
free(pszName); /* no longer needed */
RETiRet;
}
-#endif
/* This function is a helper to runInput. I have extracted it
@@ -181,8 +199,8 @@ finalize_it:
* on scheduling order. -- rgerhards, 2008-10-02
*/
static inline rsRetVal
-processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
- uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP)
+processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
+ ruleset_t *pRuleset)
{
DEFiRet;
int iNbrTimeUsed;
@@ -196,8 +214,11 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
prop_t *propFromHostIP = NULL;
char errStr[1024];
+ assert(pThrd != NULL);
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a bad receive, done below in the body */
+ if(pThrd->bShallStop == TRUE)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
socklen = sizeof(struct sockaddr_storage);
lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
if(lenRcvBuf < 0) {
@@ -206,7 +227,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr);
errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet");
}
- ABORT_FINALIZE(RS_RET_ERR);
+ ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller!
}
if(lenRcvBuf == 0)
@@ -214,37 +235,39 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
/* if we reach this point, we had a good receive and can process the packet received */
/* check if we have a different sender than before, if so, we need to query some new values */
- if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) {
- CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP));
- memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
- /* Here we check if a host is permitted to send us
- * syslog messages. If it isn't, we do not further
- * process the message but log a warning (if we are
- * configured to do this).
- * rgerhards, 2005-09-26
- */
- *pbIsPermitted = net.isAllowedSender((uchar*)"UDP",
- (struct sockaddr *)&frominet, (char*)fromHostFQDN);
-
- if(!*pbIsPermitted) {
- DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN);
- if(glbl.GetOption_DisallowWarning) {
- time_t tt;
-
- time(&tt);
- if(tt > ttLastDiscard + 60) {
- ttLastDiscard = tt;
- errmsg.LogError(0, NO_ERRCODE,
- "UDP message from disallowed sender %s discarded",
- (char*)fromHost);
+ if(bDoACLCheck) {
+ if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) {
+ memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
+ /* Here we check if a host is permitted to send us syslog messages. If it isn't,
+ * we do not further process the message but log a warning (if we are
+ * configured to do this). However, if the check would require name resolution,
+ * it is postponed to the main queue. See also my blog post at
+ * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html
+ * rgerhards, 2009-11-16
+ */
+ *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP",
+ (struct sockaddr *)&frominet, "", 0);
+
+ if(*pbIsPermitted == 0) {
+ DBGPRINTF("msg is not from an allowed sender\n");
+ if(glbl.GetOption_DisallowWarning) {
+ time_t tt;
+ datetime.GetTime(&tt);
+ if(tt > ttLastDiscard + 60) {
+ ttLastDiscard = tt;
+ errmsg.LogError(0, NO_ERRCODE,
+ "UDP message from disallowed sender discarded");
+ }
}
}
}
+ } else {
+ *pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("recv(%d,%d),acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
- if(*pbIsPermitted) {
+ if(*pbIsPermitted != 0) {
if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
datetime.getCurrTime(&stTime, &ttGenTime);
}
@@ -252,11 +275,12 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
+ MsgSetRuleset(pMsg, pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
- pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
- pMsg->bParseHOSTNAME = 1;
- MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
- CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL;
+ if(*pbIsPermitted == 2)
+ pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
+ CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
CHKiRet(submitMsg(pMsg));
}
}
@@ -271,20 +295,85 @@ finalize_it:
}
-/* This function is called to gather input.
- * Note that udpLstnSocks must be non-NULL because otherwise we would not have
- * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
- * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably
- * highly efficient "name caching". Before querying a name, I now check if the name to be
- * queried is the same as the one queried in the last message processed. If that is the
- * case, we can simple re-use the previous value. This algorithm works quite well with
- * few sender, especially if they emit messages in bursts. The more sender and the
- * more intermixed messages arrive, the less this algorithm works, but the overhead
- * is so minimal (a simple memory compare and move) that this does not hurt. Even
- * with a real name lookup cache, this optimization here is useful as it is quicker
- * than even a cache lookup).
+/* This function implements the main reception loop. Depending on the environment,
+ * we either use the traditional (but slower) select() or the Linux-specific epoll()
+ * interface. ./configure settings control which one is used.
+ * rgerhards, 2009-09-09
*/
-BEGINrunInput
+#if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE)
+#define NUM_EPOLL_EVENTS 10
+rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
+{
+ DEFiRet;
+ int nfds;
+ int efd;
+ int i;
+ struct sockaddr_storage frominetPrev;
+ int bIsPermitted;
+ struct epoll_event *udpEPollEvt = NULL;
+ struct epoll_event currEvt[NUM_EPOLL_EVENTS];
+ char errStr[1024];
+
+ /* start "name caching" algo by making sure the previous system indicator
+ * is invalidated.
+ */
+ bIsPermitted = 0;
+ memset(&frominetPrev, 0, sizeof(frominetPrev));
+
+ CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event)));
+
+# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ DBGPRINTF("imudp uses epoll_create1()\n");
+ efd = epoll_create1(EPOLL_CLOEXEC);
+# else
+ DBGPRINTF("imudp uses epoll_create()\n");
+ efd = epoll_create(NUM_EPOLL_EVENTS);
+# endif
+ if(efd < 0) {
+ DBGPRINTF("epoll_create1() could not create fd\n");
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+
+ /* fill the epoll set - we need to do this only once, as the set
+ * can not change dyamically.
+ */
+ for (i = 0; i < *udpLstnSocks; i++) {
+ if (udpLstnSocks[i+1] != -1) {
+ udpEPollEvt[i].events = EPOLLIN | EPOLLET;
+ udpEPollEvt[i].data.u64 = i+1;
+ if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
+ udpLstnSocks[i+1], errStr);
+ }
+ }
+ }
+
+ while(1) {
+ /* wait for io to become ready */
+ nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1);
+ DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds);
+
+ if(pThrd->bShallStop == TRUE)
+ break; /* terminate input! */
+
+ for(i = 0 ; i < nfds ; ++i) {
+ processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted,
+ udpRulesets[currEvt[i].data.u64]);
+ }
+ }
+
+finalize_it:
+ if(udpEPollEvt != NULL)
+ free(udpEPollEvt);
+
+ RETiRet;
+}
+#else /* #if HAVE_EPOLL_CREATE1 */
+/* this is the code for the select() interface */
+rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
+{
+ DEFiRet;
int maxfds;
int nfds;
int i;
@@ -293,23 +382,14 @@ BEGINrunInput
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
-#ifdef USE_UNLIMITED_SELECT
- fd_set *pReadfds = malloc(glbl.GetFdSetSize());
-#else
- fd_set readfds;
- fd_set *pReadfds = &readfds;
-#endif
-CODESTARTrunInput
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
- /* this is an endless loop - it is terminated when the thread is
- * signalled to do so. This, however, is handled by the framework,
- * right into the sleep below.
- */
+ DBGPRINTF("imudp uses select()\n");
+
while(1) {
/* Add the Unix Domain Sockets to the list of read
* descriptors.
@@ -338,20 +418,31 @@ CODESTARTrunInput
}
/* wait for io to become ready */
- nfds = select(maxfds+1, (fd_set *) pReadfds, NULL, NULL, NULL);
+ nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
for(i = 0; nfds && i < *udpLstnSocks; i++) {
- if(FD_ISSET(udpLstnSocks[i+1], pReadfds)) {
- processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
- fromHost, fromHostFQDN, fromHostIP);
+ if(FD_ISSET(udpLstnSocks[i+1], &readfds)) {
+ processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
+ fromHost, fromHostFQDN, fromHostIP, udpRulesets[i+1]);
--nfds; /* indicate we have processed one descriptor */
}
}
/* end of a run, back to loop for next recv() */
}
- freeFdSet(pReadfds);
- return iRet;
+ RETiRet;
+}
+#endif /* #if HAVE_EPOLL_CREATE1 */
+
+/* This function is called to gather input.
+ * Note that udpLstnSocks must be non-NULL because otherwise we would not have
+ * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
+ */
+BEGINrunInput
+CODESTARTrunInput
+ iRet = rcvMainLoop(pThrd);
ENDrunInput
@@ -364,6 +455,7 @@ CODESTARTwillRun
CHKiRet(prop.ConstructFinalize(pInputName));
net.PrintAllowedSenders(1); /* UDP */
+ net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */
/* if we could not set up any listners, there is no point in running... */
if(udpLstnSocks == NULL)
@@ -371,9 +463,7 @@ CODESTARTwillRun
iMaxLine = glbl.GetMaxLine();
- if((pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char)));
finalize_it:
ENDwillRun
@@ -385,6 +475,8 @@ CODESTARTafterRun
if(udpLstnSocks != NULL) {
net.closeUDPListenSockets(udpLstnSocks);
udpLstnSocks = NULL;
+ free(udpRulesets);
+ udpRulesets = NULL;
}
if(pRcvBuf != NULL) {
free(pRcvBuf);
@@ -402,13 +494,22 @@ CODESTARTmodExit
objRelease(glbl, CORE_COMPONENT);
objRelease(datetime, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
objRelease(net, LM_NET_FILENAME);
ENDmodExit
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
@@ -417,10 +518,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
free(pszBindAddr);
pszBindAddr = NULL;
}
- if(udpLstnSocks != NULL) {
- net.closeUDPListenSockets(udpLstnSocks);
- udpLstnSocks = NULL;
- }
iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */
return RS_RET_OK;
}
@@ -434,13 +531,12 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
/* register config file handlers */
- /* TODO: add - but this requires more changes, no time right now...
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord,
setRuleset, NULL, STD_LOADABLE_MODULE_ID));
- */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord,
addListner, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord,