diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 101 |
1 files changed, 89 insertions, 12 deletions
@@ -280,6 +280,29 @@ actionConstructFinalize(action_t *pThis) /* find a name for our queue */ snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + /* how about bWriteAllMarkMsgs??? */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->f_ReduceRepeated + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, " + "ReduceRepeated=%d, " + "iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, + pThis->iSecsExecOnceInterval + ); + pThis->bSubmitFirehoseMode = 0; + } else { + pThis->bSubmitFirehoseMode = 1; + } + /* we need to make a safety check: if the queue is NOT in direct mode, a single * message object may be accessed by multiple threads. As such, we need to enable * msg object thread safety in this case (this costs a bit performance and thus @@ -633,6 +656,7 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); + dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode); dbgprintf("\n"); RETiRet; @@ -642,7 +666,7 @@ rsRetVal actionDbgPrint(action_t *pThis) /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ -static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs) { int i; DEFiRet; @@ -653,7 +677,8 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); + //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); break; case ACT_ARRAY_PASSING: CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i]))); @@ -722,16 +747,32 @@ static rsRetVal cleanupDoActionParams(action_t *pAction) rsRetVal actionCallDoAction(action_t *pThis, msg_t *pMsg) { + uchar *ppMsgs[10]; + size_t lenMsgs[10]; + int i; DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); + for(i = 0 ; i < 10 ; ++ i) { + ppMsgs[i] = NULL; + lenMsgs[i] = 0; + } + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - CHKiRet(prepareDoActionParams(pThis, pMsg)); + CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); +#if 1 +d_pthread_mutex_lock(&pThis->mutActExec); +pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); + iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); +pthread_cleanup_pop(1); /* unlock mutex */ + //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); +#else +iRet = RS_RET_OK; +#endif switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -762,6 +803,9 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) finalize_it: cleanupDoActionParams(pThis); /* iRet ignored! */ +for(i = 0 ; i < 10 ; ++i) + free(ppMsgs[i]); + RETiRet; } @@ -778,7 +822,6 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg) ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); -RUNLOG_STR("inside actionProcessMsg()"); CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg)); @@ -1008,12 +1051,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) * if they notify us they are - functionality not yet implemented...). * rgerhards, 2008-01-30 */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); +// d_pthread_mutex_lock(&pAction->mutActExec); +// pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); iRet = processAction(pAction, pBatch, pbShutdownImmediate); - pthread_cleanup_pop(1); /* unlock mutex */ +// pthread_cleanup_pop(1); /* unlock mutex */ RETiRet; } @@ -1274,6 +1317,40 @@ finalize_it: } + +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + +#if 0 // TODO: we need to care about this -- after PoC 2010-06-08 + /* don't output marks to recently written outputs */ + if(pAction->bWriteAllMarkMsgs == FALSE + && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { + ABORT_FINALIZE(RS_RET_OK); + } +#endif + + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + +#if 0 // we would need this for bWriteAllMarkMsgs + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ + pAction->f_time = pAction->f_pMsg->ttGenTime; + +#endif + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + + /* call the configured action. Does all necessary housekeeping. * rgerhards, 2007-08-01 * FYI: currently, this function is only called from the queue @@ -1293,15 +1370,15 @@ actionCallAction(action_t *pAction, msg_t *pMsg) /* We need to lock the mutex only for repeated line processing. * rgerhards, 2009-06-19 */ - //if(pAction->f_ReduceRepeated == 1) { + if(pAction->bSubmitFirehoseMode == 1) { + iRet = doSubmitToActionQ(pAction, pMsg); + } else { LockObj(pAction); pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); iRet = doActionCallAction(pAction, pMsg); UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - //} else { - //iRet = doActionCallAction(pAction, pMsg); - //} + } RETiRet; } |