From 6f0db63e9b962edf6305860b608500e8c650b71b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 16 Jun 2009 15:13:47 +0200 Subject: milestone: input-side multiSubmit capability ... commit before I try to touch the queue side ;) --- tcps_sess.c | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) (limited to 'tcps_sess.c') diff --git a/tcps_sess.c b/tcps_sess.c index f887e702..379faeab 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -220,7 +220,7 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar * rgerhards, 2009-04-23 */ static rsRetVal -defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime) +defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; DEFiRet; @@ -245,7 +245,15 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG MsgSetRcvFrom(pMsg, pThis->fromHost); MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); - CHKiRet(submitMsg(pMsg)); + + if(pMultiSub == NULL) { + CHKiRet(submitMsg(pMsg)); + } else { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg(pMultiSub)); + } + finalize_it: /* reset status variables */ @@ -299,7 +307,7 @@ PrepareClose(tcps_sess_t *pThis) */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); datetime.getCurrTime(&stTime, &ttGenTime); - defaultDoSubmitMessage(pThis, &stTime, ttGenTime); + defaultDoSubmitMessage(pThis, &stTime, ttGenTime, NULL); } finalize_it: @@ -334,7 +342,7 @@ Close(tcps_sess_t *pThis) * rgerhards, 2008-03-14 */ static rsRetVal -processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime) +processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -380,7 +388,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -391,7 +399,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -408,7 +416,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } } @@ -433,9 +441,12 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt * this *is* the *correct* reception step for all the data we received, because * we have just received a bunch of data! -- rgerhards, 2009-06-16 */ +#define NUM_MULTISUB 128 static rsRetVal DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) { + multi_submit_t multiSub; + msg_t *pMsgs[NUM_MULTISUB]; struct syslogTime stTime; time_t ttGenTime; char *pEnd; @@ -446,18 +457,25 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) assert(iLen > 0); datetime.getCurrTime(&stTime, &ttGenTime); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = NUM_MULTISUB; + multiSub.nElem = 0; /* We now copy the message to the session buffer. */ pEnd = pData + iLen; /* this is one off, which is intensional */ iNbrTimeUsed = 0; /* full time query */ while(pData < pEnd) { - CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime)); + CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } + /* submit anything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&multiSub)); + finalize_it: RETiRet; } +#undef NUM_MULTISUB /* queryInterface function -- cgit