From 42c3dcfc1ca71814e62763338a24eae8c8463069 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 13:39:11 +0200 Subject: performance enhancement: imtcp calls parser no longer on input thread but rather inside on of the potentially many main msg queue worker threads (an enhancement scheduled for all input plugins where this is possible) --- tcps_sess.c | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 12 deletions(-) (limited to 'tcps_sess.c') diff --git a/tcps_sess.c b/tcps_sess.c index d0edc018..2a3ec0df 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -43,6 +43,7 @@ #include "errmsg.h" #include "netstrm.h" #include "msg.h" +#include "datetime.h" /* static data */ @@ -50,6 +51,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(errmsg) DEFobjCurrIf(netstrm) +DEFobjCurrIf(datetime) static int iMaxLine; /* maximum size of a single message */ @@ -191,6 +193,52 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr) } +/* This is a helper for submitting the message to the rsyslog core. + * It does some common processing, including resetting the various + * state variables to a "processed" state. + * Note that this function is also called if we had a buffer overflow + * due to a too-long message. So far, there is no indication this + * happened and it may be worth thinking about different handling + * of this case (what obviously would require a change to this + * function or some related code). + * rgerhards, 2009-04-23 + */ +static rsRetVal +doSubmitMessage(tcps_sess_t *pThis) +{ + msg_t *pMsg; + struct syslogTime stTime; + time_t ttGenTime; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, tcps_sess); + + //TODO: if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + //} + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); + /* first trim the buffer to what we have actually received */ + CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); + memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); + pMsg->iLenRawMsg = pThis->iMsg; + MsgSetInputName(pMsg, (char*)pThis->pSrv->pszInputName); + MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + pMsg->bParseHOSTNAME = 1; + MsgSetRcvFrom(pMsg, (char*)pThis->fromHost); + CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); + CHKiRet(submitMsg(pMsg)); + +finalize_it: + /* reset status variables */ + pThis->bAtStrtOfFram = 1; + pThis->iMsg = 0; + + RETiRet; +} + + /* This should be called before a normal (non forced) close * of a TCP session. This function checks if there is any unprocessed * message left in the TCP stream. Such a message is probably a @@ -230,9 +278,7 @@ PrepareClose(tcps_sess_t *pThis) * this case. */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->bAtStrtOfFram = 1; + doSubmitMessage(pThis); } finalize_it: @@ -313,9 +359,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + doSubmitMessage(pThis); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -326,9 +370,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + doSubmitMessage(pThis); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -345,9 +387,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + doSubmitMessage(pThis); pThis->inputState = eAtStrtFram; } } @@ -432,6 +472,7 @@ CODESTARTObjClassExit(tcps_sess) /* release objects we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(netstrm, LM_NETSTRMS_FILENAME); + objRelease(datetime, CORE_COMPONENT); ENDObjClassExit(tcps_sess) @@ -443,6 +484,7 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c /* request objects we use */ CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ -- cgit