/* imudp.c
 * This is the implementation of the UDP input module.
 *
 * NOTE: read comments in module-template.h to understand how this file
 *       works!
 *
 * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
 *
 * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of rsyslog.
 *
 * Rsyslog is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Rsyslog is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Rsyslog.  If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 */
#include "config.h"
/* NOTE(review): the names of the six system headers below (and of the epoll
 * header) were missing from the text this file was restored from. They have
 * been reconstructed from the symbols the code uses - please confirm against
 * version control.
 */
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
#if HAVE_SYS_EPOLL_H
#	include <sys/epoll.h>
#endif
#include "rsyslog.h"
#include "dirty.h"
#include "net.h"
#include "cfsysline.h"
#include "module-template.h"
#include "srUtils.h"
#include "errmsg.h"
#include "glbl.h"
#include "msg.h"
#include "parser.h"
#include "datetime.h"
#include "prop.h"
#include "unicode-helper.h"

MODULE_TYPE_INPUT

/* defines */

/* Module static data */
DEF_IMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
DEFobjCurrIf(datetime)
DEFobjCurrIf(prop)

static int iMaxLine;			/* maximum UDP message size supported */
static time_t ttLastDiscard = 0;	/* timestamp when a message from a non-permitted sender was last
					 * discarded. This shall prevent remote DoS when the "discard on
					 * disallowed sender" message is configured to be logged on
					 * occurrence of such a case.
					 */
static int *udpLstnSocks = NULL;	/* Internet datagram sockets, first element is nbr of elements
					 * read-only after init(), but beware of restart!
					 */
static uchar *pszBindAddr = NULL;	/* IP to bind socket to */
static uchar *pRcvBuf = NULL;		/* receive buffer (for a single packet). We use a global and alloc
					 * it so that we can check available memory in willRun() and request
					 * termination if we can not get it. -- rgerhards, 2007-12-27
					 */
static prop_t *pInputName = NULL;	/* our inputName currently is always "imudp", and this will hold it */
// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
#define TIME_REQUERY_DFLT 2
static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */

/* config settings */


/* This function is called when a new listener shall be added. It takes
 * the configured parameters, tries to bind the socket and, if that
 * succeeds, adds it to the list of existing listen sockets.
 *
 * pVal    - unused
 * pNewVal - port to listen on; NULL or the empty string selects "514".
 *           The string is always free()d by this function.
 *
 * Returns RS_RET_OUT_OF_MEMORY if the merged socket array cannot be
 * allocated (the already-configured listeners are kept in that case),
 * otherwise the initial value provided by DEFiRet. A failure of
 * net.create_udp_socket() (NULL result) is not reported as an error.
 * rgerhards, 2007-12-27
 */
static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
	DEFiRet;
	uchar *bindAddr;
	uchar *port;
	int *newSocks;
	int *tmpSocks;
	int iSrc, iDst;

	/* check which address to bind to. We could do this more compact, but have not
	 * done so in order to make the code more readable. -- rgerhards, 2007-12-27
	 */
	if(pszBindAddr == NULL)
		bindAddr = NULL;
	else if(pszBindAddr[0] == '*' && pszBindAddr[1] == '\0')
		bindAddr = NULL;
	else
		bindAddr = pszBindAddr;

	/* Resolve the effective port once, so that the debug message below never
	 * hands a NULL pointer to a %s conversion and always shows the port that
	 * is actually used.
	 */
	port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal;

	DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n",
		  (bindAddr == NULL) ? (uchar*)"*" : bindAddr, port);

	newSocks = net.create_udp_socket(bindAddr, port, 1);
	if(newSocks != NULL) {
		/* we now need to add the new sockets to the existing set */
		if(udpLstnSocks == NULL) {
			/* easy, we can just replace it */
			udpLstnSocks = newSocks;
		} else {
			/* we need to add them; element 0 of each array holds its socket count */
			if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) {
				DBGPRINTF("out of memory trying to allocate udp listen socket array\n");
				/* in this case, we discard the new sockets but continue with what we
				 * already have
				 * NOTE(review): only the array is released here, the descriptors
				 * stored in it are not closed - confirm whether
				 * net.closeUDPListenSockets() should be used instead.
				 */
				free(newSocks);
				ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
			} else {
				/* ready to copy: existing sockets first, then the new ones */
				iDst = 1;
				for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc)
					tmpSocks[iDst++] = udpLstnSocks[iSrc];
				for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc)
					tmpSocks[iDst++] = newSocks[iSrc];
				tmpSocks[0] = udpLstnSocks[0] + newSocks[0];
				free(newSocks);
				free(udpLstnSocks);
				udpLstnSocks = tmpSocks;
			}
		}
	}

finalize_it:
	free(pNewVal); /* in any case, this is no longer needed */

	RETiRet;
}


#if 0 /* TODO: implement when there is time, requires restructure of socket array! */
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
{
	ruleset_t *pRuleset;
	rsRetVal localRet;
	DEFiRet;

	localRet = ruleset.GetRuleset(&pRuleset, pszName);
	if(localRet == RS_RET_NOT_FOUND) {
		errmsg.LogError(0, NO_ERRCODE, "error: ruleset '%s' not found - ignored", pszName);
	}
	CHKiRet(localRet);
	pBindRuleset = pRuleset;
	DBGPRINTF("imudp current bind ruleset %p: '%s'\n", pRuleset, pszName);

finalize_it:
	free(pszName); /* no longer needed */
	RETiRet;
}
#endif


/* This function is a helper to runInput. I have extracted it
 * from the main loop just so that we do not have that large amount of code
 * in a single place. This function takes a socket and pulls messages from
 * it until the socket does not have any more waiting.
 * rgerhards, 2008-01-08
 * We try to read from the file descriptor until there
 * is no more data. This is done in the hope to get better performance
 * out of the system. However, this also means that a descriptor
 * monopolizes processing while it contains data. This can lead to
 * data loss in other descriptors. However, if the system is incapable of
 * handling the workload, we will lose data in any case. So it doesn't really
 * matter where the actual loss occurs - it is always random, because we depend
 * on scheduling order. -- rgerhards, 2008-10-02
 *
 * fd            - socket to read from
 * frominetPrev  - address of the previous sender; used as a cache key and
 *                 updated whenever the sender changes
 * pbIsPermitted - cached ACL verdict for the sender in frominetPrev
 * fromHost, fromHostFQDN, fromHostIP
 *               - caller-provided buffers holding the cached name/IP of
 *                 that sender; refreshed together with frominetPrev
 *
 * The function only returns with a non-OK state: RS_RET_ERR as soon as
 * recvfrom() fails (which is also the regular "no more data" exit), or
 * whatever a failing helper call returned. The callers in this file do
 * not inspect the return value.
 */
static inline rsRetVal
processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
	      uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP)
{
	DEFiRet;
	int iNbrTimeUsed;
	time_t ttGenTime;
	struct syslogTime stTime;
	socklen_t socklen;
	ssize_t lenRcvBuf;
	struct sockaddr_storage frominet;
	msg_t *pMsg;
	prop_t *propFromHost = NULL;
	prop_t *propFromHostIP = NULL;
	char errStr[1024];

	iNbrTimeUsed = 0;
	while(1) { /* loop is terminated if we have a bad receive, done below in the body */
		socklen = sizeof(struct sockaddr_storage);
		lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
		if(lenRcvBuf < 0) {
			/* capture errno right away, the calls below may overwrite it */
			const int errnoSave = errno;
			if(errnoSave != EINTR && errnoSave != EAGAIN) {
				rs_strerror_r(errnoSave, errStr, sizeof(errStr));
				DBGPRINTF("INET socket error: %d = %s.\n", errnoSave, errStr);
				errmsg.LogError(errnoSave, NO_ERRCODE, "recvfrom inet");
			}
			ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller!
		}

		/* if we reach this point, we had a good receive and can process the packet received */
		/* The buffer is allocated with iMaxLine + 1 bytes in willRun(), so there is
		 * always room for a terminator. Without it, the %.80s debug output below
		 * could print stale or uninitialized bytes following a short packet.
		 */
		pRcvBuf[lenRcvBuf] = '\0';

		/* check if we have a different sender than before, if so, we need to query some new values */
		if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) {
			CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP));
			memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
			/* Here we check if a host is permitted to send us
			 * syslog messages. If it isn't, we do not further
			 * process the message but log a warning (if we are
			 * configured to do this).
			 * rgerhards, 2005-09-26
			 */
			*pbIsPermitted = net.isAllowedSender((uchar*)"UDP",
							     (struct sockaddr *)&frominet, (char*)fromHostFQDN);

			if(!*pbIsPermitted) {
				DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN);
				/* The getter must be called; testing the bare member (as was done
				 * before) is always true and ignored the configured setting.
				 */
				if(glbl.GetOption_DisallowWarning()) {
					time_t tt;

					time(&tt);
					/* rate-limit the warning to one per minute */
					if(tt > ttLastDiscard + 60) {
						ttLastDiscard = tt;
						errmsg.LogError(0, NO_ERRCODE,
						"UDP message from disallowed sender %s discarded",
						(char*)fromHost);
					}
				}
			}
		}

		DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf);

		if(*pbIsPermitted)  {
			/* re-query the clock only every iTimeRequery-th message (0 = always);
			 * the first permitted message of a call always triggers a query
			 */
			if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
				datetime.getCurrTime(&stTime, &ttGenTime);
			}
			/* we now create our own message object and submit it to the queue */
			CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
			MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
			MsgSetInputName(pMsg, pInputName);
			MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
			pMsg->msgFlags  = NEEDS_PARSING | PARSE_HOSTNAME;
			pMsg->bParseHOSTNAME = 1;
			MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
			CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
			CHKiRet(submitMsg(pMsg));
		}
	}

finalize_it:
	if(propFromHost != NULL)
		prop.Destruct(&propFromHost);
	if(propFromHostIP != NULL)
		prop.Destruct(&propFromHostIP);

	RETiRet;
}


/* This function implements the main reception loop. Depending on the environment,
 * we either use the traditional (but slower) select() or the Linux-specific epoll()
 * interface. ./configure settings control which one is used.
 * rgerhards, 2009-09-09
 */
#if HAVE_EPOLL_CREATE1
#define NUM_EPOLL_EVENTS 10
/* epoll() variant of the reception loop.
 * All listen sockets are registered once (with EPOLLIN | EPOLLET); after
 * that the function waits for events and hands each ready descriptor to
 * processSocket(). Like the select() variant, the loop is left as soon as
 * glbl.GetGlobalInputTermState() reports 1 after a wait returned.
 * The event array and the epoll descriptor are released on every exit path.
 * Returns RS_RET_OUT_OF_MEMORY or RS_RET_IO_ERROR if setup fails.
 */
rsRetVal rcvMainLoop()
{
	DEFiRet;
	int nfds;
	int efd = -1;
	int i;
	struct sockaddr_storage frominetPrev;
	int bIsPermitted;
	uchar fromHost[NI_MAXHOST];
	uchar fromHostIP[NI_MAXHOST];
	uchar fromHostFQDN[NI_MAXHOST];
	struct epoll_event *udpEPollEvt = NULL;
	struct epoll_event currEvt[NUM_EPOLL_EVENTS];
	char errStr[1024];

	RUNLOG_STR("ZZZ: imudp epoll startup");
	/* start "name caching" algo by making sure the previous system indicator
	 * is invalidated.
	 */
	bIsPermitted = 0;
	memset(&frominetPrev, 0, sizeof(frominetPrev));

	CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event)));

	efd = epoll_create1(EPOLL_CLOEXEC);
	if(efd < 0) {
		DBGPRINTF("epoll_create1() could not create fd\n");
		// TODO: "good" error message
		ABORT_FINALIZE(RS_RET_IO_ERROR);
	}

	/* fill the epoll set - we need to do this only once, as the set
	 * can not change dynamically.
	 */
	for (i = 0; i < *udpLstnSocks; i++) {
		if (udpLstnSocks[i+1] != -1) {
			udpEPollEvt[i].events = EPOLLIN | EPOLLET;
			udpEPollEvt[i].data.fd = udpLstnSocks[i+1];
			if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) {
				/* capture errno before it can be overwritten; a socket that
				 * cannot be added is reported but does not stop the others
				 */
				const int errnoSave = errno;
				rs_strerror_r(errnoSave, errStr, sizeof(errStr));
				errmsg.LogError(errnoSave, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
					udpLstnSocks[i+1], errStr);
			}
		}
	}

	RUNLOG_STR("ZZZ: done setting up epoll interface");
	while(1) {
		/* wait for io to become ready */
		nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1);
		DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds);

		if(glbl.GetGlobalInputTermState() == 1)
			break; /* terminate input! */

		/* a negative result (wait failed) simply skips this loop */
		for(i = 0 ; i < nfds ; ++i) {
			dbgprintf("ZZZ: imudp processing fd %d\n", currEvt[i].data.fd);
			processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted,
				      fromHost, fromHostFQDN, fromHostIP);
		}
	}

finalize_it:
	if(efd >= 0)
		close(efd);
	free(udpEPollEvt);

	RETiRet;
}
#else /* #if HAVE_EPOLL_CREATE1 */
/* this is the code for the select() interface
 * The read set is rebuilt from udpLstnSocks before every select() call,
 * each descriptor reported ready is handed to processSocket(), and the
 * loop is left when glbl.GetGlobalInputTermState() reports 1 after
 * select() returned.
 */
rsRetVal rcvMainLoop()
{
	DEFiRet;
	int maxfds;
	int nfds;
	int i;
	fd_set readfds;
	struct sockaddr_storage frominetPrev;
	int bIsPermitted;
	uchar fromHost[NI_MAXHOST];
	uchar fromHostIP[NI_MAXHOST];
	uchar fromHostFQDN[NI_MAXHOST];

	/* start "name caching" algo by making sure the previous system indicator
	 * is invalidated.
	 */
	bIsPermitted = 0;
	memset(&frominetPrev, 0, sizeof(frominetPrev));
	while(1) {
		/* select() modifies the set it is given, so it has to be
		 * built anew on each iteration.
		 */
		maxfds = 0;
		FD_ZERO (&readfds);

		/* Add the UDP listen sockets to the list of read descriptors.
		 */
		for (i = 0; i < *udpLstnSocks; i++) {
			if (udpLstnSocks[i+1] != -1) {
				if(Debug)
					net.debugListenInfo(udpLstnSocks[i+1], "UDP");
				FD_SET(udpLstnSocks[i+1], &readfds);
				if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
			}
		}
		if(Debug) {
			dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds);
			for (nfds = 0; nfds <= maxfds; ++nfds)
				if ( FD_ISSET(nfds, &readfds) )
					dbgprintf("%d ", nfds);
			dbgprintf("\n");
		}

		/* wait for io to become ready */
		nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
		if(glbl.GetGlobalInputTermState() == 1)
			break; /* terminate input! */

		/* Only look at the set when select() reported ready descriptors: if it
		 * failed (-1, e.g. interrupted by a signal) the content of readfds is
		 * not meaningful and must not be evaluated.
		 */
		for(i = 0; nfds > 0 && i < *udpLstnSocks; i++) {
			if(FD_ISSET(udpLstnSocks[i+1], &readfds)) {
				processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
					      fromHost, fromHostFQDN, fromHostIP);
				--nfds; /* indicate we have processed one descriptor */
			}
		}
		/* end of a run, back to loop for next recv() */
	}

	RETiRet;
}
#endif /* #if HAVE_EPOLL_CREATE1 */

/* This function is called to gather input.
 * Note that udpLstnSocks must be non-NULL because otherwise we would not have
 * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
 */
BEGINrunInput
CODESTARTrunInput
	/* rcvMainLoop() is an endless loop - it only returns when input
	 * termination has been requested or its setup failed.
	 */
	RUNLOG_STR("ZZZ: imudp startup");
	iRet = rcvMainLoop();
ENDrunInput


/* initialize and return if will run or not
 * Creates the "imudp" input name property, prints the allowed senders and
 * allocates the receive buffer. Yields RS_RET_NO_RUN if no listener socket
 * has been set up and RS_RET_OUT_OF_MEMORY if the buffer cannot be obtained.
 */
BEGINwillRun
CODESTARTwillRun
	/* we need to create the inputName property (only once during our lifetime) */
	CHKiRet(prop.Construct(&pInputName));
	CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imudp"), sizeof("imudp") - 1));
	CHKiRet(prop.ConstructFinalize(pInputName));

	net.PrintAllowedSenders(1); /* UDP */

	/* if we could not set up any listeners, there is no point in running... */
	if(udpLstnSocks == NULL)
		ABORT_FINALIZE(RS_RET_NO_RUN);

	iMaxLine = glbl.GetMaxLine();

	/* one byte more than the maximum message size, leaves room for a terminator */
	if((pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))) == NULL) {
		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
	}
finalize_it:
ENDwillRun


/* Releases what willRun() and the listener configuration set up: the UDP
 * allowed-sender list, the listen sockets, the receive buffer and the
 * input name property.
 */
BEGINafterRun
CODESTARTafterRun
	/* do cleanup here */
	net.clearAllowedSenders((uchar*)"UDP");
	if(udpLstnSocks != NULL) {
		net.closeUDPListenSockets(udpLstnSocks);
		udpLstnSocks = NULL;
	}
	if(pRcvBuf != NULL) {
		free(pRcvBuf);
		pRcvBuf = NULL;
	}
	if(pInputName != NULL)
		prop.Destruct(&pInputName);
ENDafterRun


BEGINmodExit
CODESTARTmodExit
	/* release what we no longer need */
	objRelease(errmsg, CORE_COMPONENT);
	objRelease(glbl, CORE_COMPONENT);
	objRelease(datetime, CORE_COMPONENT);
	objRelease(prop, CORE_COMPONENT);
	objRelease(net, LM_NET_FILENAME);
ENDmodExit


/* Reports RS_RET_OK for sFEATURENonCancelInputTermination; for any other
 * feature iRet keeps the value preset by the surrounding macros.
 */
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
	if(eFeat == sFEATURENonCancelInputTermination)
		iRet = RS_RET_OK;
ENDisCompatibleWithFeature


BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt

/* Handler for the "resetconfigvariables" directive: drops the configured
 * bind address, closes all listen sockets set up so far and restores the
 * default time requery interval. Both parameters are unused.
 * Always returns RS_RET_OK.
 */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
	if(pszBindAddr != NULL) {
		free(pszBindAddr);
		pszBindAddr = NULL;
	}
	if(udpLstnSocks != NULL) {
		net.closeUDPListenSockets(udpLstnSocks);
		udpLstnSocks = NULL;
	}
	iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */
	return RS_RET_OK;
}


/* Module initialization: obtains the interfaces of the objects used by
 * this module and registers the config file directives it supports.
 */
BEGINmodInit()
CODESTARTmodInit
	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
	CHKiRet(objUse(errmsg, CORE_COMPONENT));
	CHKiRet(objUse(glbl, CORE_COMPONENT));
	CHKiRet(objUse(datetime, CORE_COMPONENT));
	CHKiRet(objUse(prop, CORE_COMPONENT));
	CHKiRet(objUse(net, LM_NET_FILENAME));

	/* register config file handlers */
	/* TODO: add - but this requires more changes, no time right now...
	CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord,
		setRuleset, NULL, STD_LOADABLE_MODULE_ID));
	*/
	CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord,
		addListner, NULL, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord,
		NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt,
		NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
		resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
/* vim:set ai:
 */