diff options
Diffstat (limited to 'tcps_sess.c')
-rw-r--r-- | tcps_sess.c | 40 |
1 files changed, 28 insertions, 12 deletions
diff --git a/tcps_sess.c b/tcps_sess.c index f887e702..e0bec949 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -220,7 +220,7 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar * rgerhards, 2009-04-23 */ static rsRetVal -defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime) +defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; DEFiRet; @@ -234,10 +234,8 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); - /* first trim the buffer to what we have actually received */ - CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); - memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); - pMsg->iLenRawMsg = pThis->iMsg; +dbgprintf("defaultDoSubmit, iMsg %d\n", pThis->iMsg); + MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName, pThis->pLstnInfo->lenInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; @@ -245,7 +243,15 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG MsgSetRcvFrom(pMsg, pThis->fromHost); MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); - CHKiRet(submitMsg(pMsg)); + + if(pMultiSub == NULL) { + CHKiRet(submitMsg(pMsg)); + } else { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg(pMultiSub)); + } + finalize_it: /* reset status variables */ @@ -299,7 +305,7 @@ PrepareClose(tcps_sess_t *pThis) */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); datetime.getCurrTime(&stTime, &ttGenTime); - defaultDoSubmitMessage(pThis, &stTime, ttGenTime); + defaultDoSubmitMessage(pThis, &stTime, ttGenTime, NULL); } finalize_it: @@ -334,7 +340,7 @@ Close(tcps_sess_t *pThis) * rgerhards, 2008-03-14 */ static rsRetVal -processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime) +processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -380,7 +386,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -391,7 +397,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -408,7 +414,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } } @@ -433,9 +439,12 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt * this *is* the *correct* reception step for all the data we received, because * we have just received a bunch of data! -- rgerhards, 2009-06-16 */ +#define NUM_MULTISUB 1024 static rsRetVal DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) { + multi_submit_t multiSub; + msg_t *pMsgs[NUM_MULTISUB]; struct syslogTime stTime; time_t ttGenTime; char *pEnd; @@ -446,18 +455,25 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) assert(iLen > 0); datetime.getCurrTime(&stTime, &ttGenTime); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = NUM_MULTISUB; + multiSub.nElem = 0; /* We now copy the message to the session buffer. */ pEnd = pData + iLen; /* this is one off, which is intensional */ iNbrTimeUsed = 0; /* full time query */ while(pData < pEnd) { - CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime)); + CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } + /* submit anything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&multiSub)); + finalize_it: RETiRet; } +#undef NUM_MULTISUB /* queryInterface function |