summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c101
-rw-r--r--action.h2
-rw-r--r--doc/msgflow.txt56
3 files changed, 146 insertions, 13 deletions
diff --git a/action.c b/action.c
index 0fac6cf8..b055ebf4 100644
--- a/action.c
+++ b/action.c
@@ -280,6 +280,29 @@ actionConstructFinalize(action_t *pThis)
/* find a name for our queue */
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ /* now check if we can run the action in "firehose mode" during stage one of
+ * its processing (that is before messages are enqueued into the action q).
+ * This is only possible if some features, which require strict sequence, are
+ * not used. Thankfully, that is usually the case. The benefit of firehose
+ * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
+ */
+ /* how about bWriteAllMarkMsgs??? */
+ if( pThis->iExecEveryNthOccur > 1
+ || pThis->f_ReduceRepeated
+ || pThis->iSecsExecOnceInterval
+ ) {
+ DBGPRINTF("info: firehose mode disabled for action because "
+ "iExecEveryNthOccur=%d, "
+ "ReduceRepeated=%d, "
+ "iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
+ pThis->iSecsExecOnceInterval
+ );
+ pThis->bSubmitFirehoseMode = 0;
+ } else {
+ pThis->bSubmitFirehoseMode = 1;
+ }
+
/* we need to make a safety check: if the queue is NOT in direct mode, a single
* message object may be accessed by multiple threads. As such, we need to enable
* msg object thread safety in this case (this costs a bit performance and thus
@@ -633,6 +656,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
+ dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
dbgprintf("\n");
RETiRet;
@@ -642,7 +666,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs)
{
int i;
DEFiRet;
@@ -653,7 +677,8 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
break;
case ACT_ARRAY_PASSING:
CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i])));
@@ -722,16 +747,32 @@ static rsRetVal cleanupDoActionParams(action_t *pAction)
rsRetVal
actionCallDoAction(action_t *pThis, msg_t *pMsg)
{
+ uchar *ppMsgs[10];
+ size_t lenMsgs[10];
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
+ for(i = 0 ; i < 10 ; ++ i) {
+ ppMsgs[i] = NULL;
+ lenMsgs[i] = 0;
+ }
+
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
- CHKiRet(prepareDoActionParams(pThis, pMsg));
+ CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#if 1
+d_pthread_mutex_lock(&pThis->mutActExec);
+pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
+ iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+pthread_cleanup_pop(1); /* unlock mutex */
+ //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#else
+iRet = RS_RET_OK;
+#endif
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -762,6 +803,9 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
finalize_it:
cleanupDoActionParams(pThis); /* iRet ignored! */
+for(i = 0 ; i < 10 ; ++i)
+ free(ppMsgs[i]);
+
RETiRet;
}
@@ -778,7 +822,6 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg)
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
-RUNLOG_STR("inside actionProcessMsg()");
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg));
@@ -1008,12 +1051,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
* if they notify us they are - functionality not yet implemented...).
* rgerhards, 2008-01-30
*/
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+// d_pthread_mutex_lock(&pAction->mutActExec);
+// pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
iRet = processAction(pAction, pBatch, pbShutdownImmediate);
- pthread_cleanup_pop(1); /* unlock mutex */
+// pthread_cleanup_pop(1); /* unlock mutex */
RETiRet;
}
@@ -1274,6 +1317,40 @@ finalize_it:
}
+
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+static inline rsRetVal
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+
+#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
+ /* don't output marks to recently written outputs */
+ if(pAction->bWriteAllMarkMsgs == FALSE
+ && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+#endif
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+
+#if 0 // we would need this for bWriteAllMarkMsgs
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
+ pAction->f_time = pAction->f_pMsg->ttGenTime;
+
+#endif
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
+
/* call the configured action. Does all necessary housekeeping.
* rgerhards, 2007-08-01
* FYI: currently, this function is only called from the queue
@@ -1293,15 +1370,15 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
/* We need to lock the mutex only for repeated line processing.
* rgerhards, 2009-06-19
*/
- //if(pAction->f_ReduceRepeated == 1) {
+ if(pAction->bSubmitFirehoseMode == 1) {
+ iRet = doSubmitToActionQ(pAction, pMsg);
+ } else {
LockObj(pAction);
pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
iRet = doActionCallAction(pAction, pMsg);
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- //} else {
- //iRet = doActionCallAction(pAction, pMsg);
- //}
+ }
RETiRet;
}
diff --git a/action.h b/action.h
index ab7dfec7..43e6ae7d 100644
--- a/action.h
+++ b/action.h
@@ -69,6 +69,7 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
+ sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */
short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
int f_prevcount; /* repetition cnt of prevline */
int f_repeatcount; /* number of "repeated" msgs */
@@ -86,7 +87,6 @@ struct action_s {
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
uchar *pszName; /* action name (for documentation) */
int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */
- //uchar **ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
};
diff --git a/doc/msgflow.txt b/doc/msgflow.txt
new file mode 100644
index 00000000..c1c440ef
--- /dev/null
+++ b/doc/msgflow.txt
@@ -0,0 +1,56 @@
+flow of messages (in terms of functions) after they have
+been pulled off the main queue.
+
+Functions are listed in the order they are (usually) called
+if there are branches in processing flow, this is explicitely
+stated.
+
+as of: 2010-06-08, master branch (v5)
+
+syslogd.c/msgConsumer
+syslogd.c/msgConsumeOne
+ if ACLcheck needed:
+ net.cvthname,
+ net.isAllowedSinder2
+ MsgSetRcvFromStr
+ MsgSetRcvFromIPStr
+ if NEEDS_PARSING:
+ parser.ParseMsg
+ruleset.ProcessMsg (loops through ruleset)
+ruleset.c/processMsgDoRules (for each rule in ruleset)
+rule.c/ProcessMsg
+rule.c/shouldProcessThisMessage
+ (evaluates filters, optimize via ALL-Filter)
+if to be processed, loop through associated actions ->
+rule.c/processMsgsDoAction
+action.c/actionCallAction (LOCKs action object!)
+action.c/doActionCallAction (does duplicate message reduction)
+action.c/actionWriteToAction
+ limits based on iExecEveryNthOccur
+ generates "message repeated..." string if necessary
+ limits based on iSecsExecOnceInterval
+! **qqueueEnqObj**
+ This means, we are done processing the action at this
+ stage. The queue may run async, but usually does not
+ do so (in default settings).
+
+
+Now looking at processing of the action queue. If the queue is
+in direct mode, remember that the action object is still
+be locked (this may also be a potential bug in non-direct mode, as
+it looks like we need this prequisite!).
+
+action.c/processBatchMain (queue Consumer, LOOK mutActExec)
+action.c/processAction
+ (calls finishBatch at the end, but not so important
+ for our analysis)
+action.c/submitBatch (recursive submit/retry loop for messages)
+action.c/tryDoAction (submits a [potentially partial] batch)
+action.c/actionProcessMessage
+ (action.c/actionPrepare (utility to set status/TX mode))
+action.c/actionCallDoAction
+1: action.c/prepareDoActionParams
+1: template.c/tplToString-tplToArray
+ string buffer is cached in action object
+2:<output Module>/doAction
+