diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-10-06 14:25:23 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-10-06 14:25:23 +0200 |
commit | 65f74a712b111f946faaddd0583a6c7b22bd062f (patch) | |
tree | d9bafbb142d97004b856e71865a87a19324e5b7c | |
parent | b849df20045ca5dfec36cdff5641e8a78d326b49 (diff) | |
download | rsyslog-65f74a712b111f946faaddd0583a6c7b22bd062f.tar.gz rsyslog-65f74a712b111f946faaddd0583a6c7b22bd062f.tar.xz rsyslog-65f74a712b111f946faaddd0583a6c7b22bd062f.zip |
added $UDPServerTimeRequery option
...which enables to work with
less acurate timestamps in favor of performance. This enables querying
of the time only every n-th time if imudp is running in the tight
receive loop (aka receiving messsages at a high rate)
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | dirty.h | 2 | ||||
-rw-r--r-- | doc/rsyslog_conf.html | 8 | ||||
-rw-r--r-- | plugins/imrelp/imrelp.c | 2 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 20 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 2 | ||||
-rw-r--r-- | runtime/msg.c | 72 | ||||
-rw-r--r-- | runtime/msg.h | 4 | ||||
-rw-r--r-- | tcps_sess.c | 8 | ||||
-rw-r--r-- | tools/syslogd.c | 39 |
10 files changed, 126 insertions, 35 deletions
@@ -1,5 +1,9 @@ --------------------------------------------------------------------------- Version 3.21.6 [DEVEL] (rgerhards), 2008-10-?? +- added $UDPServerTimeRequery option which enables to work with + less acurate timestamps in favor of performance. This enables querying + of the time only every n-th time if imudp is running in the tight + receive loop (aka receiving messsages at a high rate) - doc bugfix: queue doc had wrong parameter name for setting controlling worker thread shutdown period - consolidated time calls during msg object creation, improves performance @@ -40,7 +40,7 @@ rsRetVal submitMsg(msg_t *pMsg); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); -rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bParseHost, int flags, flowControl_t flowCtlTypeu, uchar *pszInputName); +rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bParseHost, int flags, flowControl_t flowCtlTypeu, uchar *pszInputName, struct syslogTime *stTime); /* TODO: the following 2 need to go in conf obj interface... */ rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEntry, int iTplOpts, uchar *dfltTplName); diff --git a/doc/rsyslog_conf.html b/doc/rsyslog_conf.html index 7cd40cb7..2b773476 100644 --- a/doc/rsyslog_conf.html +++ b/doc/rsyslog_conf.html @@ -243,6 +243,14 @@ address (or name) the UDP listens should bind to</li> <li>$UDPServerRun <port> (imudp) -- former -r<port> option, default 514, start UDP server on this port, "*" means all addresses</li> +<li>$UDPServerTimeRequery <nbr-of-times> (imudp) -- this is a performance +optimization. Getting the system time is very costly. With this setting, imudp can +be instructed to obtain the precise time only once every n-times. This logic is +only activated if messages come in at a very fast rate, so doing less frequent +time calls should usually be acceptable. The default value is two, because we have +seen that even without optimization the kernel often returns twice the identical time. +You can set this value as high as you like, but do so at your own risk. The higher +the value, the less precise the timestamp. <li><a href="rsconf1_umask.html">$UMASK</a></li> </ul> <p><b>Where <size_nbr> is specified above,</b> diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index b01dd98b..f3771237 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -84,7 +84,7 @@ onSyslogRcv(uchar *pHostname, uchar __attribute__((unused)) *pIP, uchar *pMsg, s { DEFiRet; parseAndSubmitMessage(pHostname, (uchar*) "[unset]", pMsg, lenMsg, MSG_PARSE_HOSTNAME, - NOFLAG, eFLOWCTL_LIGHT_DELAY, (uchar*)"imrelp"); + NOFLAG, eFLOWCTL_LIGHT_DELAY, (uchar*)"imrelp", NULL); RETiRet; } diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index f4830a47..1865d777 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -40,6 +40,7 @@ #include "srUtils.h" #include "errmsg.h" #include "glbl.h" +#include "datetime.h" MODULE_TYPE_INPUT @@ -50,6 +51,7 @@ DEF_IMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(net) +DEFobjCurrIf(datetime) static int iMaxLine; /* maximum UDP message size supported */ static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements @@ -59,6 +61,8 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ +#define TIME_REQUERY_DFLT 2 +static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ /* config settings */ @@ -141,6 +145,8 @@ BEGINrunInput uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; ssize_t l; + struct syslogTime stTime; + int iNbrTimeUsed; CODESTARTrunInput /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, @@ -179,6 +185,7 @@ CODESTARTrunInput for (i = 0; nfds && i < *udpLstnSocks; i++) { if (FD_ISSET(udpLstnSocks[i+1], &readfds)) { socklen = sizeof(frominet); + iNbrTimeUsed = 0; do { /* we now try to read from the file descriptor until there * is no more data. This is done in the hope to get better performance @@ -203,8 +210,11 @@ CODESTARTrunInput */ if(net.isAllowedSender(net.pAllowedSenders_UDP, (struct sockaddr *)&frominet, (char*)fromHostFQDN)) { + if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { + datetime.getCurrTime(&stTime); + } parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp"); + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", &stTime); } else { dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN); if(glbl.GetOption_DisallowWarning) { @@ -274,6 +284,7 @@ CODESTARTmodExit /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); ENDmodExit @@ -293,6 +304,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a net.closeUDPListenSockets(udpLstnSocks); udpLstnSocks = NULL; } + iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */ return RS_RET_OK; } @@ -303,6 +315,7 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); /* register config file handlers */ @@ -310,9 +323,10 @@ CODEmodInit_QueryRegCFSLineHdlr addListner, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, + NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit -/* - * vi:set ai: +/* vim:set ai: */ diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 55b8b2df..77d347d9 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -221,7 +221,7 @@ static rsRetVal readSocket(int fd, int iSock) if (iRcvd > 0) { parseAndSubmitMessage(funixHName[iSock] == NULL ? glbl.GetLocalHostName() : funixHName[iSock], (uchar*)"127.0.0.1", pRcv, - iRcvd, funixParseHost[iSock], funixFlags[iSock], funixFlowCtl[iSock], (uchar*)"imuxsock"); + iRcvd, funixParseHost[iSock], funixFlags[iSock], funixFlowCtl[iSock], (uchar*)"imuxsock", NULL); } else if (iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); diff --git a/runtime/msg.c b/runtime/msg.c index 69296710..df8c1572 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -239,12 +239,21 @@ rsRetVal MsgEnableThreadSafety(void) /* end locking functions */ -/* "Constructor" for a msg "object". Returns a pointer to +/* This is common code for all Constructors. It is defined in an + * inline'able function so that we can save a function call in the + * actual constructors (otherwise, the msgConstruct would need + * to call msgConstructWithTime(), which would require a + * function call). Now, both can use this inline function. This + * enables us to be optimal, but still have the code just once. * the new object or NULL if no such object could be allocated. * An object constructed via this function should only be destroyed - * via "msgDestruct()". + * via "msgDestruct()". This constructor does not query system time + * itself but rather uses a user-supplied value. This enables the caller + * to do some tricks to save processing time (done, for example, in the + * udp input). + * rgerhards, 2008-10-06 */ -rsRetVal msgConstruct(msg_t **ppThis) +static inline rsRetVal msgBaseConstruct(msg_t **ppThis) { DEFiRet; msg_t *pM; @@ -257,21 +266,58 @@ rsRetVal msgConstruct(msg_t **ppThis) pM->iRefCount = 1; pM->iSeverity = -1; pM->iFacility = -1; + objConstructSetObjInfo(pM); + + /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/ + + *ppThis = pM; + +finalize_it: + RETiRet; +} + + +/* "Constructor" for a msg "object". Returns a pointer to + * the new object or NULL if no such object could be allocated. + * An object constructed via this function should only be destroyed + * via "msgDestruct()". This constructor does not query system time + * itself but rather uses a user-supplied value. This enables the caller + * to do some tricks to save processing time (done, for example, in the + * udp input). + * rgerhards, 2008-10-06 + */ +rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime) +{ + DEFiRet; + + CHKiRet(msgBaseConstruct(ppThis)); + memcpy(&(*ppThis)->tRcvdAt, stTime, sizeof(struct syslogTime)); + memcpy(&(*ppThis)->tTIMESTAMP, stTime, sizeof(struct syslogTime)); +finalize_it: + RETiRet; +} + + +/* "Constructor" for a msg "object". Returns a pointer to + * the new object or NULL if no such object could be allocated. + * An object constructed via this function should only be destroyed + * via "msgDestruct()". This constructor, for historical reasons, + * also sets the two timestamps to the current time. + */ +rsRetVal msgConstruct(msg_t **ppThis) +{ + DEFiRet; + + CHKiRet(msgBaseConstruct(ppThis)); /* we initialize both timestamps to contain the current time, so that they * are consistent. Also, this saves us from doing any further time calls just * to obtain a timestamp. The memcpy() should not really make a difference, * especially as I think there is no codepath currently where it would not be * required (after I have cleaned up the pathes ;)). -- rgerhards, 2008-10-02 */ - datetime.getCurrTime(&(pM->tRcvdAt)); - memcpy(&pM->tTIMESTAMP, &pM->tRcvdAt, sizeof(struct syslogTime)); - - objConstructSetObjInfo(pM); - - /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/ - - *ppThis = pM; + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); + memcpy(&(*ppThis)->tTIMESTAMP, &(*ppThis)->tRcvdAt, sizeof(struct syslogTime)); finalize_it: RETiRet; @@ -2492,7 +2538,5 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE) funcDeleteMutex = MsgLockingDummy; funcMsgPrepareEnqueue = MsgLockingDummy; ENDObjClassInit(msg) - -/* - * vi:set ai: +/* vim:set ai: */ diff --git a/runtime/msg.h b/runtime/msg.h index 21cb2c64..d3c0718b 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -119,6 +119,7 @@ struct msg { PROTOTYPEObjClassInit(msg); char* getProgramName(msg_t*); rsRetVal msgConstruct(msg_t **ppThis); +rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime); rsRetVal msgDestruct(msg_t **ppM); msg_t* MsgDup(msg_t* pOld); msg_t *MsgAddRef(msg_t *pM); @@ -180,6 +181,5 @@ extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg); #define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg) #endif /* #ifndef MSG_H_INCLUDED */ -/* - * vi:set ai: +/* vim:set ai: */ diff --git a/tcps_sess.c b/tcps_sess.c index f5420fc0..7ce9f2f8 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -230,7 +230,7 @@ PrepareClose(tcps_sess_t *pThis) */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, MSG_PARSE_HOSTNAME, - NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL); /* TODO: add real InputName */ + NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL); /* TODO: add real InputName */ pThis->bAtStrtOfFram = 1; } @@ -313,7 +313,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL); /* TODO: add real InputName */ + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL); /* TODO: add real InputName */ pThis->iMsg = 0; /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good @@ -324,7 +324,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(c == '\n' && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delemiter? */ parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL); /* TODO: add real InputName */ + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL); /* TODO: add real InputName */ pThis->iMsg = 0; pThis->inputState = eAtStrtFram; } else { @@ -343,7 +343,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL); /* TODO: add real InputName */ + MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL); /* TODO: add real InputName */ pThis->iMsg = 0; pThis->inputState = eAtStrtFram; } diff --git a/tools/syslogd.c b/tools/syslogd.c index b576bb6d..a6e17d8f 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -581,9 +581,15 @@ void untty(void) * Interface change: added new parameter "InputName", permits the input to provide * a string that identifies it. May be NULL, but must be a valid char* pointer if * non-NULL. + * + * rgerhards, 2008-10-06: + * Interface change: added new parameter "stTime", which enables the caller to provide + * a timestamp that is to be used as timegenerated instead of the current system time. + * This is meant to facilitate performance optimization. Some inputs support such modes. + * If stTime is NULL, the current system time is used. */ -rsRetVal printline(uchar *hname, uchar *hnameIP, uchar *msg, int bParseHost, int flags, flowControl_t flowCtlType, - uchar *pszInputName) +static inline rsRetVal printline(uchar *hname, uchar *hnameIP, uchar *msg, int bParseHost, int flags, flowControl_t flowCtlType, + uchar *pszInputName, struct syslogTime *stTime) { DEFiRet; register uchar *p; @@ -591,7 +597,11 @@ rsRetVal printline(uchar *hname, uchar *hnameIP, uchar *msg, int bParseHost, int msg_t *pMsg; /* Now it is time to create the message object (rgerhards) */ - CHKiRet(msgConstruct(&pMsg)); + if(stTime == NULL) { + CHKiRet(msgConstruct(&pMsg)); + } else { + CHKiRet(msgConstructWithTime(&pMsg, stTime)); + } if(pszInputName != NULL) MsgSetInputName(pMsg, (char*) pszInputName); MsgSetFlowControlType(pMsg, flowCtlType); @@ -684,10 +694,16 @@ finalize_it: * Interface change: added new parameter "InputName", permits the input to provide * a string that identifies it. May be NULL, but must be a valid char* pointer if * non-NULL. + * + * rgerhards, 2008-10-06: + * Interface change: added new parameter "stTime", which enables the caller to provide + * a timestamp that is to be used as timegenerated instead of the current system time. + * This is meant to facilitate performance optimization. Some inputs support such modes. + * If stTime is NULL, the current system time is used. */ rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bParseHost, int flags, flowControl_t flowCtlType, - uchar *pszInputName) + uchar *pszInputName, struct syslogTime *stTime) { DEFiRet; register int iMsg; @@ -714,9 +730,6 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bPa * TODO: optimize buffer handling */ iMaxLine = glbl.GetMaxLine(); CHKmalloc(tmpline = malloc(sizeof(uchar) * (iMaxLine + 1))); -# ifdef USE_NETZIP - CHKmalloc(deflateBuf = malloc(sizeof(uchar) * (iMaxLine + 1))); -# endif /* we first check if we have a NUL character at the very end of the * message. This seems to be a frequent problem with a number of senders. @@ -762,6 +775,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bPa */ int ret; iLenDefBuf = iMaxLine; + CHKmalloc(deflateBuf = malloc(sizeof(uchar) * (iMaxLine + 1))); ret = uncompress((uchar *) deflateBuf, &iLenDefBuf, (uchar *) msg+1, len-1); dbgprintf("Compressed message uncompressed with status %d, length: new %ld, old %d.\n", ret, (long) iLenDefBuf, len-1); @@ -800,7 +814,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bPa */ if(iMsg == iMaxLine) { *(pMsg + iMsg) = '\0'; /* space *is* reserved for this! */ - printline(hname, hnameIP, tmpline, bParseHost, flags, flowCtlType, pszInputName); + printline(hname, hnameIP, tmpline, bParseHost, flags, flowCtlType, pszInputName, stTime); } else { /* This case in theory never can happen. If it happens, we have * a logic error. I am checking for it, because if I would not, @@ -852,7 +866,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bPa *(pMsg + iMsg) = '\0'; /* space *is* reserved for this! */ /* typically, we should end up here! */ - printline(hname, hnameIP, tmpline, bParseHost, flags, flowCtlType, pszInputName); + printline(hname, hnameIP, tmpline, bParseHost, flags, flowCtlType, pszInputName, stTime); finalize_it: if(tmpline != NULL) @@ -1344,6 +1358,13 @@ static int parseRFCSyslogMsg(msg_t *pMsg, int flags) } else { /* we can not parse, so we get the system we * received the data from. + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); + datetime.getCurrTime(&((*ppThis)->tRcvdAt)); */ MsgSetHOSTNAME(pMsg, getRcvFrom(pMsg)); } |