diff options
Diffstat (limited to 'tcps_sess.c')
-rw-r--r-- | tcps_sess.c | 95 |
1 files changed, 83 insertions, 12 deletions
diff --git a/tcps_sess.c b/tcps_sess.c index ea525977..62d51f66 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -44,6 +44,7 @@ #include "errmsg.h" #include "netstrm.h" #include "msg.h" +#include "datetime.h" /* static data */ @@ -51,6 +52,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(errmsg) DEFobjCurrIf(netstrm) +DEFobjCurrIf(datetime) static int iMaxLine; /* maximum size of a single message */ @@ -183,6 +185,18 @@ SetTcpsrv(tcps_sess_t *pThis, tcpsrv_t *pSrv) } +/* set our parent listener info*/ +static rsRetVal +SetLstnInfo(tcps_sess_t *pThis, tcpLstnPortList_t *pLstnInfo) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcps_sess); + assert(pLstnInfo != NULL); + pThis->pLstnInfo = pLstnInfo; + RETiRet; +} + + static rsRetVal SetUsrP(tcps_sess_t *pThis, void *pUsr) { @@ -192,6 +206,67 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr) } +static rsRetVal +SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) +{ + DEFiRet; + pThis->DoSubmitMessage = OnMsgReceive; + RETiRet; +} + + +/* This is a helper for submitting the message to the rsyslog core. + * It does some common processing, including resetting the various + * state variables to a "processed" state. + * Note that this function is also called if we had a buffer overflow + * due to a too-long message. So far, there is no indication this + * happened and it may be worth thinking about different handling + * of this case (what obviously would require a change to this + * function or some related code). + * rgerhards, 2009-04-23 + */ +static rsRetVal +defaultDoSubmitMessage(tcps_sess_t *pThis) +{ + msg_t *pMsg; + struct syslogTime stTime; + time_t ttGenTime; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, tcps_sess); + + if(pThis->DoSubmitMessage != NULL) { + pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); + FINALIZE; + } + + //TODO: if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + //} + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); + /* first trim the buffer to what we have actually received */ + CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); + memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); + pMsg->iLenRawMsg = pThis->iMsg; + MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName); + MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + pMsg->bParseHOSTNAME = 1; + MsgSetRcvFrom(pMsg, pThis->fromHost); + CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); + CHKiRet(submitMsg(pMsg)); + +finalize_it: + /* reset status variables */ + pThis->bAtStrtOfFram = 1; + pThis->iMsg = 0; + + RETiRet; +} + + + /* This should be called before a normal (non forced) close * of a TCP session. This function checks if there is any unprocessed * message left in the TCP stream. Such a message is probably a @@ -231,9 +306,7 @@ PrepareClose(tcps_sess_t *pThis) * this case. */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->bAtStrtOfFram = 1; + defaultDoSubmitMessage(pThis); } finalize_it: @@ -314,9 +387,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + defaultDoSubmitMessage(pThis); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -327,9 +398,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + defaultDoSubmitMessage(pThis); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -346,9 +415,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, - PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0); - pThis->iMsg = 0; + defaultDoSubmitMessage(pThis); pThis->inputState = eAtStrtFram; } } @@ -417,10 +484,12 @@ CODESTARTobjQueryInterface(tcps_sess) pIf->SetUsrP = SetUsrP; pIf->SetTcpsrv = SetTcpsrv; + pIf->SetLstnInfo = SetLstnInfo; pIf->SetHost = SetHost; pIf->SetHostIP = SetHostIP; pIf->SetStrm = SetStrm; pIf->SetMsgIdx = SetMsgIdx; + pIf->SetOnMsgReceive = SetOnMsgReceive; finalize_it: ENDobjQueryInterface(tcps_sess) @@ -433,6 +502,7 @@ CODESTARTObjClassExit(tcps_sess) /* release objects we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(netstrm, LM_NETSTRMS_FILENAME); + objRelease(datetime, CORE_COMPONENT); ENDObjClassExit(tcps_sess) @@ -444,6 +514,7 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c /* request objects we use */ CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ |