diff options
-rw-r--r-- | action.c | 101 | ||||
-rw-r--r-- | action.h | 2 | ||||
-rw-r--r-- | doc/msgflow.txt | 56 |
3 files changed, 146 insertions, 13 deletions
@@ -280,6 +280,29 @@ actionConstructFinalize(action_t *pThis) /* find a name for our queue */ snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + /* how about bWriteAllMarkMsgs??? */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->f_ReduceRepeated + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, " + "ReduceRepeated=%d, " + "iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, + pThis->iSecsExecOnceInterval + ); + pThis->bSubmitFirehoseMode = 0; + } else { + pThis->bSubmitFirehoseMode = 1; + } + /* we need to make a safety check: if the queue is NOT in direct mode, a single * message object may be accessed by multiple threads. As such, we need to enable * msg object thread safety in this case (this costs a bit performance and thus @@ -633,6 +656,7 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); + dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode); dbgprintf("\n"); RETiRet; @@ -642,7 +666,7 @@ rsRetVal actionDbgPrint(action_t *pThis) /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ -static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs) { int i; DEFiRet; @@ -653,7 +677,8 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); + //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); break; case ACT_ARRAY_PASSING: CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i]))); @@ -722,16 +747,32 @@ static rsRetVal cleanupDoActionParams(action_t *pAction) rsRetVal actionCallDoAction(action_t *pThis, msg_t *pMsg) { + uchar *ppMsgs[10]; + size_t lenMsgs[10]; + int i; DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); + for(i = 0 ; i < 10 ; ++ i) { + ppMsgs[i] = NULL; + lenMsgs[i] = 0; + } + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - CHKiRet(prepareDoActionParams(pThis, pMsg)); + CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); +#if 1 +d_pthread_mutex_lock(&pThis->mutActExec); +pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); + iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); +pthread_cleanup_pop(1); /* unlock mutex */ + //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); +#else +iRet = RS_RET_OK; +#endif switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -762,6 +803,9 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) finalize_it: cleanupDoActionParams(pThis); /* iRet ignored! */ +for(i = 0 ; i < 10 ; ++i) + free(ppMsgs[i]); + RETiRet; } @@ -778,7 +822,6 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg) ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); -RUNLOG_STR("inside actionProcessMsg()"); CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg)); @@ -1008,12 +1051,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) * if they notify us they are - functionality not yet implemented...). * rgerhards, 2008-01-30 */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); +// d_pthread_mutex_lock(&pAction->mutActExec); +// pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); iRet = processAction(pAction, pBatch, pbShutdownImmediate); - pthread_cleanup_pop(1); /* unlock mutex */ +// pthread_cleanup_pop(1); /* unlock mutex */ RETiRet; } @@ -1274,6 +1317,40 @@ finalize_it: } + +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + +#if 0 // TODO: we need to care about this -- after PoC 2010-06-08 + /* don't output marks to recently written outputs */ + if(pAction->bWriteAllMarkMsgs == FALSE + && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { + ABORT_FINALIZE(RS_RET_OK); + } +#endif + + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + +#if 0 // we would need this for bWriteAllMarkMsgs + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ + pAction->f_time = pAction->f_pMsg->ttGenTime; + +#endif + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + + /* call the configured action. Does all necessary housekeeping. * rgerhards, 2007-08-01 * FYI: currently, this function is only called from the queue @@ -1293,15 +1370,15 @@ actionCallAction(action_t *pAction, msg_t *pMsg) /* We need to lock the mutex only for repeated line processing. * rgerhards, 2009-06-19 */ - //if(pAction->f_ReduceRepeated == 1) { + if(pAction->bSubmitFirehoseMode == 1) { + iRet = doSubmitToActionQ(pAction, pMsg); + } else { LockObj(pAction); pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); iRet = doActionCallAction(pAction, pMsg); UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - //} else { - //iRet = doActionCallAction(pAction, pMsg); - //} + } RETiRet; } @@ -69,6 +69,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ + sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */ short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */ int f_prevcount; /* repetition cnt of prevline */ int f_repeatcount; /* number of "repeated" msgs */ @@ -86,7 +87,6 @@ struct action_s { pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ uchar *pszName; /* action name (for documentation) */ int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */ - //uchar **ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ size_t *lenMsgs; /* length of message in ppMsgs */ }; diff --git a/doc/msgflow.txt b/doc/msgflow.txt new file mode 100644 index 00000000..c1c440ef --- /dev/null +++ b/doc/msgflow.txt @@ -0,0 +1,56 @@ +flow of messages (in terms of functions) after they have +been pulled off the main queue. + +Functions are listed in the order they are (usually) called +if there are branches in processing flow, this is explicitely +stated. + +as of: 2010-06-08, master branch (v5) + +syslogd.c/msgConsumer +syslogd.c/msgConsumeOne + if ACLcheck needed: + net.cvthname, + net.isAllowedSinder2 + MsgSetRcvFromStr + MsgSetRcvFromIPStr + if NEEDS_PARSING: + parser.ParseMsg +ruleset.ProcessMsg (loops through ruleset) +ruleset.c/processMsgDoRules (for each rule in ruleset) +rule.c/ProcessMsg +rule.c/shouldProcessThisMessage + (evaluates filters, optimize via ALL-Filter) +if to be processed, loop through associated actions -> +rule.c/processMsgsDoAction +action.c/actionCallAction (LOCKs action object!) +action.c/doActionCallAction (does duplicate message reduction) +action.c/actionWriteToAction + limits based on iExecEveryNthOccur + generates "message repeated..." string if necessary + limits based on iSecsExecOnceInterval +! **qqueueEnqObj** + This means, we are done processing the action at this + stage. The queue may run async, but usually does not + do so (in default settings). + + +Now looking at processing of the action queue. If the queue is +in direct mode, remember that the action object is still +be locked (this may also be a potential bug in non-direct mode, as +it looks like we need this prequisite!). + +action.c/processBatchMain (queue Consumer, LOOK mutActExec) +action.c/processAction + (calls finishBatch at the end, but not so important + for our analysis) +action.c/submitBatch (recursive submit/retry loop for messages) +action.c/tryDoAction (submits a [potentially partial] batch) +action.c/actionProcessMessage + (action.c/actionPrepare (utility to set status/TX mode)) +action.c/actionCallDoAction +1: action.c/prepareDoActionParams +1: template.c/tplToString-tplToArray + string buffer is cached in action object +2:<output Module>/doAction + |