diff options
Diffstat (limited to 'tcps_sess.c')
-rw-r--r-- | tcps_sess.c | 125 |
1 files changed, 96 insertions, 29 deletions
diff --git a/tcps_sess.c b/tcps_sess.c index ea525977..cfee0523 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -44,6 +44,7 @@ #include "errmsg.h" #include "netstrm.h" #include "msg.h" +#include "datetime.h" /* static data */ @@ -51,6 +52,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(errmsg) DEFobjCurrIf(netstrm) +DEFobjCurrIf(datetime) static int iMaxLine; /* maximum size of a single message */ @@ -95,12 +97,9 @@ CODESTARTobjDestruct(tcps_sess) pThis->pSrv->pOnSessDestruct(&pThis->pUsr); } /* now destruct our own properties */ - if(pThis->fromHost != NULL) - free(pThis->fromHost); - if(pThis->fromHostIP != NULL) - free(pThis->fromHostIP); - if(pThis->pMsg != NULL) - free(pThis->pMsg); + free(pThis->fromHost); + free(pThis->fromHostIP); + free(pThis->pMsg); ENDobjDestruct(tcps_sess) @@ -122,10 +121,7 @@ SetHost(tcps_sess_t *pThis, uchar *pszHost) ISOBJ_TYPE_assert(pThis, tcps_sess); - if(pThis->fromHost != NULL) { - free(pThis->fromHost); - } - + free(pThis->fromHost); pThis->fromHost = pszHost; RETiRet; @@ -142,10 +138,7 @@ SetHostIP(tcps_sess_t *pThis, uchar *pszHostIP) ISOBJ_TYPE_assert(pThis, tcps_sess); - if(pThis->fromHostIP != NULL) { - free(pThis->fromHostIP); - } - + free(pThis->fromHostIP); pThis->fromHostIP = pszHostIP; RETiRet; @@ -183,6 +176,18 @@ SetTcpsrv(tcps_sess_t *pThis, tcpsrv_t *pSrv) } +/* set our parent listener info*/ +static rsRetVal +SetLstnInfo(tcps_sess_t *pThis, tcpLstnPortList_t *pLstnInfo) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcps_sess); + assert(pLstnInfo != NULL); + pThis->pLstnInfo = pLstnInfo; + RETiRet; +} + + static rsRetVal SetUsrP(tcps_sess_t *pThis, void *pUsr) { @@ -192,6 +197,62 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr) } +static rsRetVal +SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) +{ + DEFiRet; + pThis->DoSubmitMessage = OnMsgReceive; + RETiRet; +} + + +/* This is a helper for submitting the message to the rsyslog core. + * It does some common processing, including resetting the various + * state variables to a "processed" state. + * Note that this function is also called if we had a buffer overflow + * due to a too-long message. So far, there is no indication this + * happened and it may be worth thinking about different handling + * of this case (what obviously would require a change to this + * function or some related code). + * rgerhards, 2009-04-23 + */ +static rsRetVal +defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime) +{ + msg_t *pMsg; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, tcps_sess); + + if(pThis->DoSubmitMessage != NULL) { + pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); + FINALIZE; + } + + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); + /* first trim the buffer to what we have actually received */ + CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); + memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); + pMsg->iLenRawMsg = pThis->iMsg; + MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName, pThis->pLstnInfo->lenInputName); + MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + pMsg->bParseHOSTNAME = 1; + MsgSetRcvFrom(pMsg, pThis->fromHost); + CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); + CHKiRet(submitMsg(pMsg)); + +finalize_it: + /* reset status variables */ + pThis->bAtStrtOfFram = 1; + pThis->iMsg = 0; + + RETiRet; +} + + + /* This should be called before a normal (non forced) close * of a TCP session. This function checks if there is any unprocessed * message left in the TCP stream. Such a message is probably a @@ -206,6 +267,8 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr) static rsRetVal PrepareClose(tcps_sess_t *pThis) { + struct syslogTime stTime; + time_t ttGenTime; DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -231,9 +294,8 @@ PrepareClose(tcps_sess_t *pThis) * this case. */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->bAtStrtOfFram = 1; + datetime.getCurrTime(&stTime, &ttGenTime); + defaultDoSubmitMessage(pThis, &stTime, ttGenTime); } finalize_it: @@ -268,7 +330,7 @@ Close(tcps_sess_t *pThis) * rgerhards, 2008-03-14 */ static rsRetVal -processDataRcvd(tcps_sess_t *pThis, char c) +processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime) { DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -314,9 +376,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + defaultDoSubmitMessage(pThis, stTime, ttGenTime); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -327,9 +387,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + defaultDoSubmitMessage(pThis, stTime, ttGenTime); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -346,9 +404,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + defaultDoSubmitMessage(pThis, stTime, ttGenTime); pThis->inputState = eAtStrtFram; } } @@ -369,22 +425,29 @@ processDataRcvd(tcps_sess_t *pThis, char c) * RS_RET_OK, which means the session should be kept open * or anything else, which means it must be closed. * rgerhards, 2008-03-01 + * As a performance optimization, we pick up the timestamp here. Acutally, + * this *is* the *correct* reception step for all the data we received, because + * we have just received a bunch of data! -- rgerhards, 2009-06-16 */ static rsRetVal DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) { - DEFiRet; + struct syslogTime stTime; + time_t ttGenTime; char *pEnd; + DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); assert(pData != NULL); assert(iLen > 0); + datetime.getCurrTime(&stTime, &ttGenTime); + /* We now copy the message to the session buffer. */ pEnd = pData + iLen; /* this is one off, which is intensional */ while(pData < pEnd) { - CHKiRet(processDataRcvd(pThis, *pData++)); + CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime)); } finalize_it: @@ -417,10 +480,12 @@ CODESTARTobjQueryInterface(tcps_sess) pIf->SetUsrP = SetUsrP; pIf->SetTcpsrv = SetTcpsrv; + pIf->SetLstnInfo = SetLstnInfo; pIf->SetHost = SetHost; pIf->SetHostIP = SetHostIP; pIf->SetStrm = SetStrm; pIf->SetMsgIdx = SetMsgIdx; + pIf->SetOnMsgReceive = SetOnMsgReceive; finalize_it: ENDobjQueryInterface(tcps_sess) @@ -433,6 +498,7 @@ CODESTARTObjClassExit(tcps_sess) /* release objects we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(netstrm, LM_NETSTRMS_FILENAME); + objRelease(datetime, CORE_COMPONENT); ENDObjClassExit(tcps_sess) @@ -444,6 +510,7 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c /* request objects we use */ CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ |