diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 186 |
1 files changed, 140 insertions, 46 deletions
@@ -585,7 +585,7 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static rsRetVal actionPrepare(action_t *pThis) +static inline rsRetVal actionPrepare(action_t *pThis) { DEFiRet; @@ -665,7 +665,6 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **pp for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); break; case ACT_ARRAY_PASSING: @@ -726,29 +725,23 @@ static rsRetVal cleanupDoActionParams(action_t *pAction, uchar **ppMsgs) * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg) +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) { - uchar *ppMsgs[10]; - size_t lenMsgs[10]; int i; DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - for(i = 0 ; i < 10 ; ++ i) { - ppMsgs[i] = NULL; - lenMsgs[i] = 0; - } - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs)); + //CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs)); pThis->bHadAutoCommit = 0; #if 1 //d_pthread_mutex_lock(&pThis->mutActExec); //pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); - iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); + // original: iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); //pthread_cleanup_pop(1); /* unlock mutex */ //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); #else @@ -783,6 +776,7 @@ iRet = RS_RET_OK; finalize_it: +#if 0 // THIS NEEDS TO BE DONE TO THE BATCH! switch(pThis->eParamPassing) { case ACT_STRING_PASSING: for(i = 0 ; i < 10 ; ++i) @@ -795,6 +789,7 @@ finalize_it: /* nothing to do in that case */ break; } +#endif RETiRet; @@ -805,8 +800,8 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg) +static inline rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams) { DEFiRet; @@ -815,7 +810,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg) CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg)); + CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); iRet = getReturnCode(pThis); finalize_it: @@ -835,8 +830,11 @@ finishBatch(action_t *pThis, batch_t *pBatch) ASSERT(pThis != NULL); - if(pThis->eState == ACT_STATE_RDY) +dbgprintf("ZZZ: finishBatch called, eState %d\n", pThis->eState); + if(pThis->eState == ACT_STATE_RDY) { + /* we just need to flag the batch as commited */ FINALIZE; /* nothing to do */ + } CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) { @@ -846,7 +844,8 @@ finishBatch(action_t *pThis, batch_t *pBatch) actionCommitted(pThis); /* flag messages as committed */ for(i = 0 ; i < pBatch->nElem ; ++i) { - pBatch->pElem[i].state = BATCH_STATE_COMM; + batchSetElemState(pBatch, i, BATCH_STATE_COMM); +dbgprintf("ZZZ: finishBatch commits element %d\n", i); } break; case RS_RET_SUSPENDED: @@ -881,8 +880,8 @@ finalize_it: /* try to submit a partial batch of elements. * rgerhards, 2009-05-12 */ -static rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate) +static inline rsRetVal +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) { int i; int iElemProcessed; @@ -894,17 +893,22 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme assert(pBatch != NULL); assert(pnElem != NULL); +dbgprintf("ZZZ1: tryDoAction, nElem %d, iDoneUpto %d\n", *pnElem, pBatch->iDoneUpTo); i = pBatch->iDoneUpTo; /* all messages below that index are processed */ iElemProcessed = 0; iCommittedUpTo = i; - pAction->pbShutdownImmediate = pbShutdownImmediate; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*pbShutdownImmediate) + if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - if(pBatch->pElem[i].state != BATCH_STATE_DISC) { - localRet = actionProcessMessage(pAction, pMsg); +dbgprintf("ZZZ1: tryDoAction loop %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state); + if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) { +dbgprintf("ZZZ1: trying to execute\n"); + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams); DBGPRINTF("action call returned %d\n", localRet); + /* Note: we directly modify the batch object state, because we know that + * wo do not overwrite DISC indicators! + */ if(localRet == RS_RET_OK) { /* mark messages as committed */ while(iCommittedUpTo <= i) { @@ -931,9 +935,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme } finalize_it: - if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) { - iRet = RS_RET_DISCARDMSG; - } else if(pBatch->iDoneUpTo != iCommittedUpTo) { + if(pBatch->iDoneUpTo != iCommittedUpTo) { *pnElem += iCommittedUpTo - pBatch->iDoneUpTo; pBatch->iDoneUpTo = iCommittedUpTo; } @@ -944,21 +946,24 @@ finalize_it: /* submit a batch for actual action processing. * The first nElem elements are processed. This function calls itself * recursively if it needs to handle errors. + * Note: we don't need the number of the first message to be processed as a parameter, + * because this is kept track of inside the batch itself (iDoneUpTo). * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) { int i; int bDone; rsRetVal localRet; DEFiRet; +dbgprintf("ZZZ1: submitBatch, nElem %d\n", nElem); assert(pBatch != NULL); bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); + localRet = tryDoAction(pAction, pBatch, &nElem); if(localRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } @@ -968,38 +973,36 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi /* try commit transaction, once done, we can simply do so as if * that return state was returned from tryDoAction(). */ - localRet = finishBatch(pAction, pBatch); // TODO: careful, do we need the elem counter? + localRet = finishBatch(pAction, pBatch); } if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { bDone = 1; - } else if(localRet == RS_RET_DISCARDMSG) { - iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */ - bDone = 1; } else if(localRet == RS_RET_SUSPENDED) { ; /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { /* in this case, the whole batch can not be processed */ for(i = 0 ; i < nElem ; ++i) { - pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD; +dbgprintf("ZZZ2: setting batch state for item %d\n", i); + batchSetElemState(pBatch, i, BATCH_STATE_BAD); } bDone = 1; } else { if(nElem == 1) { - pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD; + batchSetElemState(pBatch, i, BATCH_STATE_BAD); bDone = 1; } else { /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate); - submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate); + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); bDone = 1; } } - } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */ + } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - if(*pbShutdownImmediate) + if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); finalize_it: @@ -1007,17 +1010,46 @@ finalize_it: } + +/* The following function prepares a batch for processing, that it is + * reinitializes batch states, generates strings and does everything else + * that needs to be done in order to make the batch ready for submission to + * the actual output module. Note that we look at the precomputed + * filter OK condition and process only those messages, that actually matched + * the filter. + * rgerhards, 2010-06-14 + */ +static inline rsRetVal +prepareBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + batch_obj_t *pElem; + DEFiRet; + + pBatch->iDoneUpTo = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + pElem->state = BATCH_STATE_RDY; + prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp, + (uchar**) &(pElem->staticActParams), pElem->staticLenParams); + } + } + RETiRet; +} + + /* receive a batch and process it. This includes retry handling. * rgerhards, 2009-05-12 */ -static rsRetVal -processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +static inline rsRetVal +processAction(action_t *pAction, batch_t *pBatch) { DEFiRet; assert(pBatch != NULL); - pBatch->iDoneUpTo = 0; - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate)); +dbgprintf("ZZZ1: processAction\n"); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); iRet = finishBatch(pAction, pBatch); finalize_it: @@ -1033,10 +1065,18 @@ finalize_it: static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { + int *pbShutdownImmdtSave; DEFiRet; assert(pBatch != NULL); +dbgprintf("ZZZ1: processBatchMain\n"); + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; + pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate; + CHKiRet(prepareBatch(pAction, pBatch)); + +dbgprintf("ZZZ1: processBatchMain\n"); /* We now must guard the output module against execution by multiple threads. The * plugin interface specifies that output modules must not be thread-safe (except * if they notify us they are - functionality not yet implemented...). @@ -1045,10 +1085,19 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch, pbShutdownImmediate); + iRet = processAction(pAction, pBatch); + // + // DEBUG + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { +dbgprintf("ZZZ: after processBatchMain item %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state); + } + // END DEBUG pthread_cleanup_pop(1); /* unlock mutex */ +finalize_it: + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -1323,7 +1372,11 @@ doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg) } DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + //iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); finalize_it: RETiRet; @@ -1341,7 +1394,48 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) DEFiRet; DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + +//*** EXPERIMENTAL ***/ +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +rsRetVal +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + else { /* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { +dbgprintf("ZZZ: submitToActQ item %d:%s\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15); + pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + + // DEBUG + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { +dbgprintf("ZZZ: batch state after processing item %d: %d\n", i, pBatch->pElem[i].state); + } + } + // END DEBUG + } RETiRet; } |