diff options
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r-- | plugins/imudp/imudp.c | 121 |
1 files changed, 101 insertions, 20 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0a8920f5..f1a720bc 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -32,6 +32,9 @@ #include <errno.h> #include <unistd.h> #include <netdb.h> +#if HAVE_SYS_EPOLL_H +# include <sys/epoll.h> +#endif #include "rsyslog.h" #include "dirty.h" #include "net.h" @@ -113,7 +116,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) } else { /* we need to add them */ if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) { - dbgprintf("out of memory trying to allocate udp listen socket array\n"); + DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); /* in this case, we discard the new sockets but continue with what we * already have */ @@ -205,7 +208,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); } - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } /* if we reach this point, we had a good receive and can process the packet received */ @@ -267,22 +270,19 @@ finalize_it: } -/* This function is called to gather input. - * Note that udpLstnSocks must be non-NULL because otherwise we would not have - * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 - * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably - * highly efficient "name caching". Before querying a name, I now check if the name to be - * queried is the same as the one queried in the last message processed. If that is the - * case, we can simple re-use the previous value. This algorithm works quite well with - * few sender, especially if they emit messages in bursts. The more sender and the - * more intermixed messages arrive, the less this algorithm works, but the overhead - * is so minimal (a simple memory compare and move) that this does not hurt. Even - * with a real name lookup cache, this optimization here is useful as it is quicker - * than even a cache lookup). +/* This function implements the main reception loop. Depending on the environment, + * we either use the traditional (but slower) select() or the Linux-specific epoll() + * interface. ./configure settings control which one is used. + * rgerhards, 2009-09-09 */ -BEGINrunInput +#if HAVE_EPOLL_CREATE1 +#define NUM_EPOLL_EVENTS 10 +rsRetVal rcvMainLoop() +{ + DEFiRet; int maxfds; int nfds; + int efd; int i; fd_set readfds; struct sockaddr_storage frominetPrev; @@ -290,16 +290,82 @@ BEGINrunInput uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; -CODESTARTrunInput + struct epoll_event *udpEPollEvt = NULL; + struct epoll_event currEvt[NUM_EPOLL_EVENTS]; + char errStr[1024]; + +RUNLOG_STR("ZZZ: imudp epoll startup"); /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); - /* this is an endless loop - it is terminated when the thread is - * signalled to do so. This, however, is handled by the framework, - * right into the sleep below. + + CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); + + efd = epoll_create1(EPOLL_CLOEXEC); + if(efd < 0) { + DBGPRINTF("epoll_create1() could not create fd\n"); + // TODO: "good" error message + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + /* fill the epoll set - we need to do this only once, as the set + * can not change dyamically. + */ + maxfds = 0; + FD_ZERO (&readfds); + + /* Add the UDP listen sockets to the list of read descriptors. */ + for (i = 0; i < *udpLstnSocks; i++) { + if (udpLstnSocks[i+1] != -1) { + udpEPollEvt[i].events = EPOLLIN | EPOLLET; + udpEPollEvt[i].data.fd = udpLstnSocks[i+1]; + if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", + udpLstnSocks[i+1], errStr); + } + } + } + +RUNLOG_STR("ZZZ: done setting up epoll interface"); + while(1) { + /* wait for io to become ready */ + nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); + DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); + + for(i = 0 ; i < nfds ; ++i) { +dbgprintf("ZZZ: imudp processing fd %d\n", currEvt[i].data.fd); + processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted, + fromHost, fromHostFQDN, fromHostIP); + } + } + +finalize_it: + RETiRet; +} +#else /* #if HAVE_EPOLL_CREATE1 */ +/* this is the code for the select() interface */ +rsRetVal rcvMainLoop() +{ + DEFiRet; + int maxfds; + int nfds; + int i; + fd_set readfds; + struct sockaddr_storage frominetPrev; + int bIsPermitted; + uchar fromHost[NI_MAXHOST]; + uchar fromHostIP[NI_MAXHOST]; + uchar fromHostFQDN[NI_MAXHOST]; + + /* start "name caching" algo by making sure the previous system indicator + * is invalidated. */ + bIsPermitted = 0; + memset(&frominetPrev, 0, sizeof(frominetPrev)); + while(1) { /* Add the Unix Domain Sockets to the list of read * descriptors. @@ -342,7 +408,22 @@ CODESTARTrunInput /* end of a run, back to loop for next recv() */ } - return iRet; + RETiRet; +} +#endif /* #if HAVE_EPOLL_CREATE1 */ + +/* This function is called to gather input. + * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 + */ +BEGINrunInput +CODESTARTrunInput + /* this is an endless loop - it is terminated when the thread is + * signalled to do so. This, however, is handled by the framework, + * right into the sleep below. + */ +RUNLOG_STR("ZZZ: imudp startup"); + iRet = rcvMainLoop(); ENDrunInput |