diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 175 |
1 files changed, 101 insertions, 74 deletions
@@ -52,9 +52,9 @@ /* forward definitions */ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); -static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg); -static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -260,7 +260,6 @@ actionConstructFinalize(action_t *pThis) * not used. Thankfully, that is usually the case. The benefit of firehose * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 */ - /* how about bWriteAllMarkMsgs??? */ if( pThis->iExecEveryNthOccur > 1 || pThis->f_ReduceRepeated || pThis->iSecsExecOnceInterval @@ -272,13 +271,13 @@ actionConstructFinalize(action_t *pThis) pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, pThis->iSecsExecOnceInterval ); - pThis->submitToActQ = actionCallAction; + pThis->submitToActQ = doSubmitToActionQComplexBatch; } else if(pThis->bWriteAllMarkMsgs == FALSE) { /* nearly full-speed submission mode, default case */ - pThis->submitToActQ = doSubmitToActionQNotAllMark; + pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; } else { /* full firehose submission mode */ - pThis->submitToActQ = doSubmitToActionQ; + pThis->submitToActQ = doSubmitToActionQBatch; } /* we need to make a safety check: if the queue is NOT in direct mode, a single @@ -635,11 +634,11 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); - if(pThis->submitToActQ == actionCallAction) { + if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; - } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) { + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMarkBatch) { sz = "fast, but supports partial mark messages"; - } else if(pThis->submitToActQ == doSubmitToActionQ) { + } else if(pThis->submitToActQ == doSubmitToActionQBatch) { sz = "firehose (fastest)"; } else { sz = "unknown (need to update debug display?)"; @@ -880,7 +879,6 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) assert(pBatch != NULL); assert(pnElem != NULL); -dbgprintf("ZZZ1: tryDoAction, nElem %d, iDoneUpto %d\n", *pnElem, pBatch->iDoneUpTo); i = pBatch->iDoneUpTo; /* all messages below that index are processed */ iElemProcessed = 0; iCommittedUpTo = i; @@ -943,7 +941,6 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) rsRetVal localRet; DEFiRet; -dbgprintf("ZZZ1: submitBatch, nElem %d\n", nElem); assert(pBatch != NULL); bDone = 0; @@ -1032,7 +1029,6 @@ processAction(action_t *pAction, batch_t *pBatch) DEFiRet; assert(pBatch != NULL); -dbgprintf("ZZZ1: processAction\n"); CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); iRet = finishBatch(pAction, pBatch); @@ -1054,13 +1050,11 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) assert(pBatch != NULL); -dbgprintf("ZZZ1: processBatchMain\n"); pbShutdownImmdtSave = pBatch->pbShutdownImmediate; pBatch->pbShutdownImmediate = pbShutdownImmediate; pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate; CHKiRet(prepareBatch(pAction, pBatch)); -dbgprintf("ZZZ1: processBatchMain\n"); /* We now must guard the output module against execution by multiple threads. The * plugin interface specifies that output modules must not be thread-safe (except * if they notify us they are - functionality not yet implemented...). @@ -1070,13 +1064,6 @@ dbgprintf("ZZZ1: processBatchMain\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); iRet = processAction(pAction, pBatch); - // - // DEBUG - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { -dbgprintf("ZZZ: after processBatchMain item %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state); - } - // END DEBUG pthread_cleanup_pop(1); /* unlock mutex */ @@ -1273,7 +1260,7 @@ finalize_it: /* helper to actonCallAction, mostly needed because of this damn * pthread_cleanup_push() POSIX macro... */ -static rsRetVal +static inline rsRetVal doActionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; @@ -1328,72 +1315,104 @@ finalize_it: RETiRet; } - -/* This submits the message to the action queue in case where we need to handle - * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop - * for the synchronization. +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. * rgerhards, 2010-06-08 */ -static rsRetVal -doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg) +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; - time_t now; - time_t lastAct; - if(pMsg->msgFlags & MARK) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if((now - lastAct) < MarkInterval / 2) { - DBGPRINTF("file was recently written, ignoring mark message\n"); - ABORT_FINALIZE(RS_RET_OK); - } - } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0); - } - - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); else iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); - //iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); -finalize_it: RETiRet; } -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. + +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. Here, we just modify the filter condition to be false when + * a mark message must not be written. However, in this case we must save the previous + * filter as we may need it in the next action (potential future optimization: check if this is + * the last action TODO). * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) { + time_t now = 0; + time_t lastAct; + int i; + int bProcessMarkMsgs; + int bModifiedFilter; + sbool FilterSave[128]; + sbool *pFilterSave; DEFiRet; - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); - else - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { + pFilterSave = FilterSave; + } else { + CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + } + + bModifiedFilter = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + pFilterSave[i] = pBatch->pElem[i].bFilterOK; + if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { + /* check if we need to write or not */ + if(now == 0) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if((now - lastAct) < MarkInterval / 2) { + DBGPRINTF("action was recently called, ignoring mark message\n"); + bProcessMarkMsgs = 0; + } else { + bProcessMarkMsgs = 1; + } + } while(ATOMIC_CAS(&pAction->f_time, lastAct, ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, ADDME) == 0); + } + if(bProcessMarkMsgs) { + pBatch->pElem[i].bFilterOK = 0; + bModifiedFilter = 1; + } + } + } + + DBGPRINTF("Called action(NotAllMark), logging to %s\n", module.GetStateName(pAction->pMod)); + + iRet = doSubmitToActionQBatch(pAction, pBatch); + + if(bModifiedFilter) { + /* in this case, we need to restore previous state */ + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + pBatch->pElem[i].bFilterOK = pFilterSave[i]; + } + } + +finalize_it: + if(pFilterSave != FilterSave) + free(pFilterSave); RETiRet; } -//*** EXPERIMENTAL ***/ /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing * and thus speed. * rgerhards, 2010-06-08 */ -rsRetVal +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) { int i; @@ -1407,39 +1426,48 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) */ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { if(pBatch->pElem[i].bFilterOK) { -dbgprintf("ZZZ: submitToActQ item %d:%s\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15); - pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); } } + } - // DEBUG - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(pBatch->pElem[i].bFilterOK) { -dbgprintf("ZZZ: batch state after processing item %d: %d\n", i, pBatch->pElem[i].state); - } + RETiRet; +} + + + +/* Helper to submit a batch of actions to the engine. Note that we have rather + * complicated processing here, so we need to do this one message after another. + * rgerhards, 2010-06-23 + */ +static inline rsRetVal +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); } - // END DEBUG } RETiRet; } - /* Call configured action, most complex case with all features supported (and thus slow). * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -actionCallAction(action_t *pAction, msg_t *pMsg) +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) { DEFiRet; - ISOBJ_TYPE_assert(pMsg, msg); - ASSERT(pAction != NULL); - LockObj(pAction); pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - iRet = doActionCallAction(pAction, pMsg); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ @@ -1447,7 +1475,6 @@ actionCallAction(action_t *pAction, msg_t *pMsg) } #pragma GCC diagnostic warning "-Wempty-body" - /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. |