summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-08 12:46:24 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-08 12:46:24 +0200
commit3e49a1075ab6750135e1a38cf0c213579fa30b4a (patch)
treec986e10161f93f751a1d6a116cf53c1080844450
parent220c57e7ebc49a56cc91fa31308b1563f83a95fb (diff)
downloadrsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.tar.gz
rsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.tar.xz
rsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.zip
performance enhancement: implemented stage 1 firehose mode for actions
... plus some other tests, namely string generation in parallel to action processing. The code is not yet solid and not fully compatible to older versions. But it is good enough for an early commit and some early testing/gaining of experience. The optimization was done based on the fine-grained partitioning paradigm worked on the past couple of weeks -- seems to work out really great :)
-rw-r--r--action.c101
-rw-r--r--action.h2
-rw-r--r--doc/msgflow.txt56
3 files changed, 146 insertions, 13 deletions
diff --git a/action.c b/action.c
index 0fac6cf8..b055ebf4 100644
--- a/action.c
+++ b/action.c
@@ -280,6 +280,29 @@ actionConstructFinalize(action_t *pThis)
/* find a name for our queue */
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ /* now check if we can run the action in "firehose mode" during stage one of
+ * its processing (that is before messages are enqueued into the action q).
+ * This is only possible if some features, which require strict sequence, are
+ * not used. Thankfully, that is usually the case. The benefit of firehose
+ * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
+ */
+ /* how about bWriteAllMarkMsgs??? */
+ if( pThis->iExecEveryNthOccur > 1
+ || pThis->f_ReduceRepeated
+ || pThis->iSecsExecOnceInterval
+ ) {
+ DBGPRINTF("info: firehose mode disabled for action because "
+ "iExecEveryNthOccur=%d, "
+ "ReduceRepeated=%d, "
+ "iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
+ pThis->iSecsExecOnceInterval
+ );
+ pThis->bSubmitFirehoseMode = 0;
+ } else {
+ pThis->bSubmitFirehoseMode = 1;
+ }
+
/* we need to make a safety check: if the queue is NOT in direct mode, a single
* message object may be accessed by multiple threads. As such, we need to enable
* msg object thread safety in this case (this costs a bit performance and thus
@@ -633,6 +656,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
+ dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
dbgprintf("\n");
RETiRet;
@@ -642,7 +666,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs)
{
int i;
DEFiRet;
@@ -653,7 +677,8 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
break;
case ACT_ARRAY_PASSING:
CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i])));
@@ -722,16 +747,32 @@ static rsRetVal cleanupDoActionParams(action_t *pAction)
rsRetVal
actionCallDoAction(action_t *pThis, msg_t *pMsg)
{
+ uchar *ppMsgs[10];
+ size_t lenMsgs[10];
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
+ for(i = 0 ; i < 10 ; ++ i) {
+ ppMsgs[i] = NULL;
+ lenMsgs[i] = 0;
+ }
+
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
- CHKiRet(prepareDoActionParams(pThis, pMsg));
+ CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#if 1
+d_pthread_mutex_lock(&pThis->mutActExec);
+pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
+ iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+pthread_cleanup_pop(1); /* unlock mutex */
+ //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#else
+iRet = RS_RET_OK;
+#endif
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -762,6 +803,9 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
finalize_it:
cleanupDoActionParams(pThis); /* iRet ignored! */
+for(i = 0 ; i < 10 ; ++i)
+ free(ppMsgs[i]);
+
RETiRet;
}
@@ -778,7 +822,6 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg)
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
-RUNLOG_STR("inside actionProcessMsg()");
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg));
@@ -1008,12 +1051,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
* if they notify us they are - functionality not yet implemented...).
* rgerhards, 2008-01-30
*/
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+// d_pthread_mutex_lock(&pAction->mutActExec);
+// pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
iRet = processAction(pAction, pBatch, pbShutdownImmediate);
- pthread_cleanup_pop(1); /* unlock mutex */
+// pthread_cleanup_pop(1); /* unlock mutex */
RETiRet;
}
@@ -1274,6 +1317,40 @@ finalize_it:
}
+
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+static inline rsRetVal
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+
+#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
+ /* don't output marks to recently written outputs */
+ if(pAction->bWriteAllMarkMsgs == FALSE
+ && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+#endif
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+
+#if 0 // we would need this for bWriteAllMarkMsgs
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
+ pAction->f_time = pAction->f_pMsg->ttGenTime;
+
+#endif
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
+
/* call the configured action. Does all necessary housekeeping.
* rgerhards, 2007-08-01
* FYI: currently, this function is only called from the queue
@@ -1293,15 +1370,15 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
/* We need to lock the mutex only for repeated line processing.
* rgerhards, 2009-06-19
*/
- //if(pAction->f_ReduceRepeated == 1) {
+ if(pAction->bSubmitFirehoseMode == 1) {
+ iRet = doSubmitToActionQ(pAction, pMsg);
+ } else {
LockObj(pAction);
pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
iRet = doActionCallAction(pAction, pMsg);
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- //} else {
- //iRet = doActionCallAction(pAction, pMsg);
- //}
+ }
RETiRet;
}
diff --git a/action.h b/action.h
index ab7dfec7..43e6ae7d 100644
--- a/action.h
+++ b/action.h
@@ -69,6 +69,7 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
+ sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */
short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
int f_prevcount; /* repetition cnt of prevline */
int f_repeatcount; /* number of "repeated" msgs */
@@ -86,7 +87,6 @@ struct action_s {
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
uchar *pszName; /* action name (for documentation) */
int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */
- //uchar **ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
};
diff --git a/doc/msgflow.txt b/doc/msgflow.txt
new file mode 100644
index 00000000..c1c440ef
--- /dev/null
+++ b/doc/msgflow.txt
@@ -0,0 +1,56 @@
+flow of messages (in terms of functions) after they have
+been pulled off the main queue.
+
+Functions are listed in the order they are (usually) called
+if there are branches in processing flow, this is explicitely
+stated.
+
+as of: 2010-06-08, master branch (v5)
+
+syslogd.c/msgConsumer
+syslogd.c/msgConsumeOne
+ if ACLcheck needed:
+ net.cvthname,
+ net.isAllowedSinder2
+ MsgSetRcvFromStr
+ MsgSetRcvFromIPStr
+ if NEEDS_PARSING:
+ parser.ParseMsg
+ruleset.ProcessMsg (loops through ruleset)
+ruleset.c/processMsgDoRules (for each rule in ruleset)
+rule.c/ProcessMsg
+rule.c/shouldProcessThisMessage
+ (evaluates filters, optimize via ALL-Filter)
+if to be processed, loop through associated actions ->
+rule.c/processMsgsDoAction
+action.c/actionCallAction (LOCKs action object!)
+action.c/doActionCallAction (does duplicate message reduction)
+action.c/actionWriteToAction
+ limits based on iExecEveryNthOccur
+ generates "message repeated..." string if necessary
+ limits based on iSecsExecOnceInterval
+! **qqueueEnqObj**
+ This means, we are done processing the action at this
+ stage. The queue may run async, but usually does not
+ do so (in default settings).
+
+
+Now looking at processing of the action queue. If the queue is
+in direct mode, remember that the action object is still
+be locked (this may also be a potential bug in non-direct mode, as
+it looks like we need this prequisite!).
+
+action.c/processBatchMain (queue Consumer, LOOK mutActExec)
+action.c/processAction
+ (calls finishBatch at the end, but not so important
+ for our analysis)
+action.c/submitBatch (recursive submit/retry loop for messages)
+action.c/tryDoAction (submits a [potentially partial] batch)
+action.c/actionProcessMessage
+ (action.c/actionPrepare (utility to set status/TX mode))
+action.c/actionCallDoAction
+1: action.c/prepareDoActionParams
+1: template.c/tplToString-tplToArray
+ string buffer is cached in action object
+2:<output Module>/doAction
+