diff options
-rw-r--r-- | action.c | 186 | ||||
-rw-r--r-- | doc/msgflow.txt | 2 | ||||
-rw-r--r-- | runtime/batch.h | 73 | ||||
-rw-r--r-- | runtime/queue.c | 41 | ||||
-rw-r--r-- | runtime/queue.h | 2 | ||||
-rw-r--r-- | runtime/rule.c | 11 | ||||
-rw-r--r-- | runtime/ruleset.c | 75 | ||||
-rw-r--r-- | runtime/wti.c | 4 | ||||
-rw-r--r-- | tests/Makefile.am | 2 | ||||
-rwxr-xr-x | tests/diag.sh | 6 | ||||
-rw-r--r-- | tests/tcpflood.c | 1 | ||||
-rw-r--r-- | tools/syslogd.c | 13 |
12 files changed, 337 insertions, 79 deletions
@@ -585,7 +585,7 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static rsRetVal actionPrepare(action_t *pThis) +static inline rsRetVal actionPrepare(action_t *pThis) { DEFiRet; @@ -665,7 +665,6 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **pp for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); break; case ACT_ARRAY_PASSING: @@ -726,29 +725,23 @@ static rsRetVal cleanupDoActionParams(action_t *pAction, uchar **ppMsgs) * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg) +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) { - uchar *ppMsgs[10]; - size_t lenMsgs[10]; int i; DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - for(i = 0 ; i < 10 ; ++ i) { - ppMsgs[i] = NULL; - lenMsgs[i] = 0; - } - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs)); + //CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs)); pThis->bHadAutoCommit = 0; #if 1 //d_pthread_mutex_lock(&pThis->mutActExec); //pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); - iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); + // original: iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); //pthread_cleanup_pop(1); /* unlock mutex */ //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); #else @@ -783,6 +776,7 @@ iRet = RS_RET_OK; finalize_it: +#if 0 // THIS NEEDS TO BE DONE TO THE BATCH! switch(pThis->eParamPassing) { case ACT_STRING_PASSING: for(i = 0 ; i < 10 ; ++i) @@ -795,6 +789,7 @@ finalize_it: /* nothing to do in that case */ break; } +#endif RETiRet; @@ -805,8 +800,8 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg) +static inline rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams) { DEFiRet; @@ -815,7 +810,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg) CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg)); + CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); iRet = getReturnCode(pThis); finalize_it: @@ -835,8 +830,11 @@ finishBatch(action_t *pThis, batch_t *pBatch) ASSERT(pThis != NULL); - if(pThis->eState == ACT_STATE_RDY) +dbgprintf("ZZZ: finishBatch called, eState %d\n", pThis->eState); + if(pThis->eState == ACT_STATE_RDY) { + /* we just need to flag the batch as commited */ FINALIZE; /* nothing to do */ + } CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) { @@ -846,7 +844,8 @@ finishBatch(action_t *pThis, batch_t *pBatch) actionCommitted(pThis); /* flag messages as committed */ for(i = 0 ; i < pBatch->nElem ; ++i) { - pBatch->pElem[i].state = BATCH_STATE_COMM; + batchSetElemState(pBatch, i, BATCH_STATE_COMM); +dbgprintf("ZZZ: finishBatch commits element %d\n", i); } break; case RS_RET_SUSPENDED: @@ -881,8 +880,8 @@ finalize_it: /* try to submit a partial batch of elements. * rgerhards, 2009-05-12 */ -static rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate) +static inline rsRetVal +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) { int i; int iElemProcessed; @@ -894,17 +893,22 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme assert(pBatch != NULL); assert(pnElem != NULL); +dbgprintf("ZZZ1: tryDoAction, nElem %d, iDoneUpto %d\n", *pnElem, pBatch->iDoneUpTo); i = pBatch->iDoneUpTo; /* all messages below that index are processed */ iElemProcessed = 0; iCommittedUpTo = i; - pAction->pbShutdownImmediate = pbShutdownImmediate; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*pbShutdownImmediate) + if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - if(pBatch->pElem[i].state != BATCH_STATE_DISC) { - localRet = actionProcessMessage(pAction, pMsg); +dbgprintf("ZZZ1: tryDoAction loop %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state); + if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) { +dbgprintf("ZZZ1: trying to execute\n"); + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams); DBGPRINTF("action call returned %d\n", localRet); + /* Note: we directly modify the batch object state, because we know that + * wo do not overwrite DISC indicators! + */ if(localRet == RS_RET_OK) { /* mark messages as committed */ while(iCommittedUpTo <= i) { @@ -931,9 +935,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme } finalize_it: - if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) { - iRet = RS_RET_DISCARDMSG; - } else if(pBatch->iDoneUpTo != iCommittedUpTo) { + if(pBatch->iDoneUpTo != iCommittedUpTo) { *pnElem += iCommittedUpTo - pBatch->iDoneUpTo; pBatch->iDoneUpTo = iCommittedUpTo; } @@ -944,21 +946,24 @@ finalize_it: /* submit a batch for actual action processing. * The first nElem elements are processed. This function calls itself * recursively if it needs to handle errors. + * Note: we don't need the number of the first message to be processed as a parameter, + * because this is kept track of inside the batch itself (iDoneUpTo). * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) { int i; int bDone; rsRetVal localRet; DEFiRet; +dbgprintf("ZZZ1: submitBatch, nElem %d\n", nElem); assert(pBatch != NULL); bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); + localRet = tryDoAction(pAction, pBatch, &nElem); if(localRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } @@ -968,38 +973,36 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi /* try commit transaction, once done, we can simply do so as if * that return state was returned from tryDoAction(). */ - localRet = finishBatch(pAction, pBatch); // TODO: careful, do we need the elem counter? + localRet = finishBatch(pAction, pBatch); } if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { bDone = 1; - } else if(localRet == RS_RET_DISCARDMSG) { - iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */ - bDone = 1; } else if(localRet == RS_RET_SUSPENDED) { ; /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { /* in this case, the whole batch can not be processed */ for(i = 0 ; i < nElem ; ++i) { - pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD; +dbgprintf("ZZZ2: setting batch state for item %d\n", i); + batchSetElemState(pBatch, i, BATCH_STATE_BAD); } bDone = 1; } else { if(nElem == 1) { - pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD; + batchSetElemState(pBatch, i, BATCH_STATE_BAD); bDone = 1; } else { /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate); - submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate); + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); bDone = 1; } } - } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */ + } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - if(*pbShutdownImmediate) + if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); finalize_it: @@ -1007,17 +1010,46 @@ finalize_it: } + +/* The following function prepares a batch for processing, that it is + * reinitializes batch states, generates strings and does everything else + * that needs to be done in order to make the batch ready for submission to + * the actual output module. Note that we look at the precomputed + * filter OK condition and process only those messages, that actually matched + * the filter. + * rgerhards, 2010-06-14 + */ +static inline rsRetVal +prepareBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + batch_obj_t *pElem; + DEFiRet; + + pBatch->iDoneUpTo = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + pElem->state = BATCH_STATE_RDY; + prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp, + (uchar**) &(pElem->staticActParams), pElem->staticLenParams); + } + } + RETiRet; +} + + /* receive a batch and process it. This includes retry handling. * rgerhards, 2009-05-12 */ -static rsRetVal -processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +static inline rsRetVal +processAction(action_t *pAction, batch_t *pBatch) { DEFiRet; assert(pBatch != NULL); - pBatch->iDoneUpTo = 0; - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate)); +dbgprintf("ZZZ1: processAction\n"); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); iRet = finishBatch(pAction, pBatch); finalize_it: @@ -1033,10 +1065,18 @@ finalize_it: static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { + int *pbShutdownImmdtSave; DEFiRet; assert(pBatch != NULL); +dbgprintf("ZZZ1: processBatchMain\n"); + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; + pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate; + CHKiRet(prepareBatch(pAction, pBatch)); + +dbgprintf("ZZZ1: processBatchMain\n"); /* We now must guard the output module against execution by multiple threads. The * plugin interface specifies that output modules must not be thread-safe (except * if they notify us they are - functionality not yet implemented...). @@ -1045,10 +1085,19 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch, pbShutdownImmediate); + iRet = processAction(pAction, pBatch); + // + // DEBUG + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { +dbgprintf("ZZZ: after processBatchMain item %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state); + } + // END DEBUG pthread_cleanup_pop(1); /* unlock mutex */ +finalize_it: + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -1323,7 +1372,11 @@ doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg) } DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + //iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); finalize_it: RETiRet; @@ -1341,7 +1394,48 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) DEFiRet; DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + +//*** EXPERIMENTAL ***/ +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +rsRetVal +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + else { /* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { +dbgprintf("ZZZ: submitToActQ item %d:%s\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15); + pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + + // DEBUG + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { +dbgprintf("ZZZ: batch state after processing item %d: %d\n", i, pBatch->pElem[i].state); + } + } + // END DEBUG + } RETiRet; } diff --git a/doc/msgflow.txt b/doc/msgflow.txt index b53ba7e7..ebee18f8 100644 --- a/doc/msgflow.txt +++ b/doc/msgflow.txt @@ -16,7 +16,7 @@ syslogd.c/msgConsumeOne MsgSetRcvFromIPStr if NEEDS_PARSING: parser.ParseMsg -ruleset.ProcessMsg (loops through ruleset) +ruleset.ProcessBatch (loops through ruleset) ruleset.c/processMsgDoRules (for each rule in ruleset) rule.c/processMsg 1:rule.c/shouldProcessThisMessage diff --git a/runtime/batch.h b/runtime/batch.h index 1245df11..80621631 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -26,6 +26,7 @@ #ifndef BATCH_H_INCLUDED #define BATCH_H_INCLUDED +#include <string.h> #include "msg.h" /* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used @@ -76,6 +77,7 @@ struct batch_obj_s { * is completed (else, the whole process does not work correctly). */ struct batch_s { + int maxElem; /* maximum number of elements that this batch supports */ int nElem; /* actual number of element in this entry */ int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */ int iDoneUpTo; /* all messages below this index have state other than RDY */ @@ -92,15 +94,84 @@ batchSetSingleRuleset(batch_t *pBatch, sbool val) { pBatch->bSingleRuleset = val; } -/* get the batches ruleset */ +/* get the batches ruleset (if we have a single ruleset) */ static inline ruleset_t* batchGetRuleset(batch_t *pBatch) { return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL; } +/* get the ruleset of a specifc element of the batch (index not verified!) */ +static inline ruleset_t* +batchElemGetRuleset(batch_t *pBatch, int i) { + return ((msg_t*) pBatch->pElem[i].pUsrp)->pRuleset; +} + /* get number of msgs for this batch */ static inline int batchNumMsgs(batch_t *pBatch) { return pBatch->nElem; } + + +/* set the status of the i-th batch element. Note that once the status is + * DISC, it will never be reset. So this function can NOT be used to initialize + * the state table. -- rgerhards, 2010-06-10 + */ +static inline void +batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) { + if(pBatch->pElem[i].state != BATCH_STATE_DISC) + pBatch->pElem[i].state = newState; +} + + +/* check if an element is a valid entry. We do NOT verify if the + * element index is valid. -- rgerhards, 2010-06-10 + */ +static inline int +batchIsValidElem(batch_t *pBatch, int i) { + return(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC); +} + + +/* copy one batch element to another. + * This creates a complete duplicate in those cases where + * it is needed. Use duplication only when absolutely necessary! + * rgerhards, 2010-06-10 + */ +static inline void +batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) { + memcpy(pDest, pSrc, sizeof(batch_obj_t)); +} + + +/* free members of a batch "object". Note that we can not do the usual + * destruction as the object typically is allocated on the stack and so the + * object itself cannot be freed! -- rgerhards, 2010-06-15 + */ +static inline void +batchFree(batch_t *pBatch) { + int i; + int j; + for(i = 0 ; i < pBatch->maxElem ; ++i) { + for(j = 0 ; j < CONF_OMOD_NUMSTRINGS_BUFSIZE ; ++j) { + free(pBatch->pElem[i].staticActParams[j]); + } + } + free(pBatch->pElem); +} + + +/* initialiaze a batch "object". The record must already exist, + * we "just" initialize it. The max number of elements must be + * provided. -- rgerhards, 2010-06-15 + */ +static inline rsRetVal +batchInit(batch_t *pBatch, int maxElem) { + DEFiRet; + pBatch->maxElem = maxElem; + CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); + // TODO: replace calloc by inidividual writes? +finalize_it: + RETiRet; +} #endif /* #ifndef BATCH_H_INCLUDED */ diff --git a/runtime/queue.c b/runtime/queue.c index d437d590..5e9c67ca 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -841,6 +841,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batch_obj_t batchObj; DEFiRet; + //TODO: init batchObj (states _OK and new fields -- CHECK) ASSERT(pThis != NULL); /* calling the consumer is quite different here than it is from a worker thread */ @@ -861,6 +862,26 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) RETiRet; } +/*** EXPERIMENTAL ***/ +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + /* calling the consumer is quite different here than it is from a worker thread */ + /* we need to provide the consumer's return value back to the caller because in direct + * mode the consumer probably has a lot to convey (which get's lost in the other modes + * because they are asynchronous. But direct mode is deliberately synchronous. + * rgerhards, 2008-02-12 + * We use our knowledge about the batch_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 + */ + iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); + + RETiRet; +} + static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -1364,10 +1385,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { -dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { +dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); ++nEnqueued; @@ -1385,7 +1406,7 @@ dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem); iRet = DeleteBatchFromQStore(pThis, pBatch); - pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ + pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14 RETiRet; } @@ -1430,6 +1451,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz /* all well, use this element */ pWti->batch.pElem[nDequeued].pUsrp = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance ++nDequeued; } @@ -2273,6 +2295,21 @@ finalize_it: /* ------------------------------ END multi-enqueue functions ------------------------------ */ +/* enqueue a new user data element in direct mode + * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better + * code later on (like multi submit!) 2010-06-10 + * Enqueues the new element and awakes worker thread. + */ +rsRetVal +qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, qqueue); + iRet = qAddDirect(pThis, pUsr); + RETiRet; +} + + /* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ diff --git a/runtime/queue.h b/runtime/queue.h index 33b21c9a..1c758134 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -177,12 +177,14 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); +rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); diff --git a/runtime/rule.c b/runtime/rule.c index c28e15c9..453e631d 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -108,16 +108,19 @@ DEFFUNC_llExecFunc(processBatchDoActions) } #endif - // NEW (potentially): iRetMod = actionSubmit(pAction, (batch_t*) pParam); +#if 1 + // NEW (potentially): + iRetMod = doSubmitToActionQBatch(pAction, (batch_t*) pParam); +#else // old code -- milestone check -dbgprintf("ZZZ: inside processBatchDoActions, begin processing (nElem=%d)\n", batchNumMsgs(pBatch)); int i; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { -dbgprintf("ZZZ: inside processBatchDoActions, processind elem %d/%d\n", i, batchNumMsgs(pBatch)); +dbgprintf("ZZZ: inside processBatchDoActions, processing elem %d/%d\n", i, batchNumMsgs(pBatch)); if(pBatch->pElem[i].bFilterOK) { iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); } } +#endif //end old code #if 0 // TODO: this must be done inside the action as well! if(iRetMod == RS_RET_DISCARDMSG) { @@ -297,7 +300,7 @@ processBatch(rule_t *pThis, batch_t *pBatch) DEFiRet; ISOBJ_TYPE_assert(pThis, rule); - assert(pMsg != NULL); + assert(pBatch != NULL); /* first check the filters... */ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { diff --git a/runtime/ruleset.c b/runtime/ruleset.c index caeb9357..31c2e1a7 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -60,6 +60,9 @@ linkedList_t llRulesets; /* this is NOT a pointer - no typo here ;) */ ruleset_t *pCurrRuleset = NULL; /* currently "active" ruleset */ ruleset_t *pDfltRuleset = NULL; /* current default ruleset, e.g. for binding to actions which have no other */ +/* forward definitions */ +static rsRetVal processBatch(batch_t *pBatch); + /* ---------- linked-list key handling functions ---------- */ /* destructor for linked list keys. @@ -149,6 +152,69 @@ dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet); } + +/* This function is similar to processBatch(), but works on a batch that + * contains rules from multiple rulesets. In this case, we can not push + * the whole batch through the ruleset. Instead, we examine it and + * partition it into sub-rulesets which we then push through the system. + * Note that when we evaluate which message must be processed, we do NOT need + * to look at bFilterOK, because this value is only set in a later processing + * stage. Doing so caused a bug during development ;) + * rgerhards, 2010-06-15 + */ +static inline rsRetVal +processBatchMultiRuleset(batch_t *pBatch) +{ + ruleset_t *currRuleset; + batch_t snglRuleBatch; + int i; + int iStart; /* start index of partial batch */ + int iNew; /* index for new (temporary) batch */ + DEFiRet; + + CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); + snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; + +dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch->nElem); + while(1) { /* loop broken inside */ + /* search for first unprocessed element */ + for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart) + /* just search, no action */; + + if(iStart == pBatch->nElem) + FINALIZE; /* everything processed */ + + /* prepare temporary batch */ + currRuleset = batchElemGetRuleset(pBatch, iStart); + iNew = 0; + for(i = iStart ; i < pBatch->nElem ; ++i) { + if(batchElemGetRuleset(pBatch, i) == currRuleset) { +dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15); + batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i])); + /* We indicate the element also as done, so it will not be processed again */ + pBatch->pElem[i].state = BATCH_STATE_DISC; + } + } + snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ + batchSetSingleRuleset(&snglRuleBatch, 1); + /* process temp batch */ + processBatch(&snglRuleBatch); + +#if 0 +for(i = iStart ; i < pBatch->nElem ; ++i) { + dbgprintf("ZZZ: after partial execution item %d state %d\n", i, pBatch->pElem[i].state); +} +//dbgprintf("ZZZ: search item %d: state %d, bFilterOK %d, IsValid %d, msg:%s\n", +//iStart, pBatch->pElem[iStart].state, pBatch->pElem[iStart].bFilterOK, batchIsValidElem(pBatch, iStart), +//((msg_t*)(pBatch->pElem[iStart].pUsrp))->szRawMsg+40); +#endif + } + batchFree(&snglRuleBatch); + +finalize_it: + RETiRet; +} + /* Process (consume) a batch of messages. Calls the actions configured. * If the whole batch uses a singel ruleset, we can process the batch as * a whole. Otherwise, we need to process it slower, on a message-by-message @@ -162,6 +228,7 @@ processBatch(batch_t *pBatch) DEFiRet; assert(pBatch != NULL); +dbgprintf("ZZZ: processBatch: batch of %d elements must be processed\n", pBatch->nElem); if(pBatch->bSingleRuleset) { pThis = batchGetRuleset(pBatch); if(pThis == NULL) @@ -169,13 +236,7 @@ processBatch(batch_t *pBatch) ISOBJ_TYPE_assert(pThis, ruleset); CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch)); } else { - #warning implementation missing! - /* we need to split of the batch according to rulesets used */ - // TODO: do this at the deque level, much more performant! - assert(0); // TODO mandatory to implement! - dbgprintf("processbatch missing implementation, terminating!\n"); - printf("processBatch missing implementation, terminating!\n"); - exit(0); + CHKiRet(processBatchMultiRuleset(pBatch)); } finalize_it: diff --git a/runtime/wti.c b/runtime/wti.c index 307f1af1..9343f5c5 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -172,7 +172,7 @@ wtiCancelThrd(wti_t *pThis) BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) /* actual destruction */ - free(pThis->batch.pElem); + batchFree(&pThis->batch); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); @@ -204,7 +204,7 @@ wtiConstructFinalize(wti_t *pThis) /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); - CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t))); + CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); finalize_it: RETiRet; diff --git a/tests/Makefile.am b/tests/Makefile.am index 32765ae4..ba0bb7ba 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -34,8 +34,8 @@ TESTS = $(TESTRUNS) cfg.sh \ complex1.sh \ queue-persist.sh \ pipeaction.sh \ - pipe_noreader.sh \ execonlyonce.sh \ + pipe_noreader.sh \ dircreate_dflt.sh \ dircreate_off.sh \ queue-persist.sh diff --git a/tests/diag.sh b/tests/diag.sh index 5b74a6dc..efbf3315 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -8,10 +8,10 @@ #valgrind="valgrind --malloc-fill=ff --free-fill=fe --log-fd=1" #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" -#valgrind="valgrind --tool=exp-ptrcheck --log-fd=1" +valgrind="valgrind --tool=exp-ptrcheck --log-fd=1" #set -o xtrace -#export RSYSLOG_DEBUG="debug nostdout" -#export RSYSLOG_DEBUGLOG="log" +export RSYSLOG_DEBUG="debug nostdout" +export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason cp $srcdir/testsuites/diag-common.conf diag-common.conf diff --git a/tests/tcpflood.c b/tests/tcpflood.c index f93a87a2..9ed2dac9 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -5,6 +5,7 @@ * -t target address (default 127.0.0.1) * -p target port (default 13514) * -n number of target ports (targets are in range -p..(-p+-n-1) + * Note -c must also be set to at LEAST the number of -n! * -c number of connections (default 1) * -m number of messages to send (connection is random) * -i initial message number (optional) diff --git a/tools/syslogd.c b/tools/syslogd.c index 46587a27..9b7b77ab 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -622,17 +622,6 @@ chkMsgAgainstACL() { #endif -/* consumes a single messages - this function is primarily used to shuffle - * out some code from msgConsumer(). After this function, the message is - * (by definition!) considered committed. - * rgerhards, 2009-11-16 - */ -///static inline rsRetVal -///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { - ///DEFiRet; - //////RETiRet; -///} - /* preprocess a batch of messages, that is ready them for actual processing. This is done * as a first stage and totally in parallel to any other worker active in the system. So * it helps us keep up the overall concurrency level. @@ -708,8 +697,8 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu assert(pBatch != NULL); pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */ preprocessBatch(pBatch); +//pBatch->bSingleRuleset = 0; // TODO: testing aid, remove!!!! ruleset.ProcessBatch(pBatch); -dbgprintf("ZZZ: back in msgConsumer\n"); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 int i; |