summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-08 12:46:24 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-08 12:46:24 +0200
commit3e49a1075ab6750135e1a38cf0c213579fa30b4a (patch)
treec986e10161f93f751a1d6a116cf53c1080844450 /action.c
parent220c57e7ebc49a56cc91fa31308b1563f83a95fb (diff)
downloadrsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.tar.gz
rsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.tar.xz
rsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.zip
performance enhancement: implemented stage 1 firehose mode for actions
... plus some other tests, namely string generation in parallel to action processing. The code is not yet solid and not fully compatible to older versions. But it is good enough for an early commit and some early testing/gaining of experience. The optimization was done based on the fine-grained partitioning paradigm worked on the past couple of weeks -- seems to work out really great :)
Diffstat (limited to 'action.c')
-rw-r--r--action.c101
1 files changed, 89 insertions, 12 deletions
diff --git a/action.c b/action.c
index 0fac6cf8..b055ebf4 100644
--- a/action.c
+++ b/action.c
@@ -280,6 +280,29 @@ actionConstructFinalize(action_t *pThis)
/* find a name for our queue */
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ /* now check if we can run the action in "firehose mode" during stage one of
+ * its processing (that is before messages are enqueued into the action q).
+ * This is only possible if some features, which require strict sequence, are
+ * not used. Thankfully, that is usually the case. The benefit of firehose
+ * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
+ */
+ /* how about bWriteAllMarkMsgs??? */
+ if( pThis->iExecEveryNthOccur > 1
+ || pThis->f_ReduceRepeated
+ || pThis->iSecsExecOnceInterval
+ ) {
+ DBGPRINTF("info: firehose mode disabled for action because "
+ "iExecEveryNthOccur=%d, "
+ "ReduceRepeated=%d, "
+ "iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
+ pThis->iSecsExecOnceInterval
+ );
+ pThis->bSubmitFirehoseMode = 0;
+ } else {
+ pThis->bSubmitFirehoseMode = 1;
+ }
+
/* we need to make a safety check: if the queue is NOT in direct mode, a single
* message object may be accessed by multiple threads. As such, we need to enable
* msg object thread safety in this case (this costs a bit performance and thus
@@ -633,6 +656,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
+ dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
dbgprintf("\n");
RETiRet;
@@ -642,7 +666,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs)
{
int i;
DEFiRet;
@@ -653,7 +677,8 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
break;
case ACT_ARRAY_PASSING:
CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i])));
@@ -722,16 +747,32 @@ static rsRetVal cleanupDoActionParams(action_t *pAction)
rsRetVal
actionCallDoAction(action_t *pThis, msg_t *pMsg)
{
+ uchar *ppMsgs[10];
+ size_t lenMsgs[10];
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
+ for(i = 0 ; i < 10 ; ++ i) {
+ ppMsgs[i] = NULL;
+ lenMsgs[i] = 0;
+ }
+
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
- CHKiRet(prepareDoActionParams(pThis, pMsg));
+ CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#if 1
+d_pthread_mutex_lock(&pThis->mutActExec);
+pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
+ iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+pthread_cleanup_pop(1); /* unlock mutex */
+ //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#else
+iRet = RS_RET_OK;
+#endif
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -762,6 +803,9 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
finalize_it:
cleanupDoActionParams(pThis); /* iRet ignored! */
+for(i = 0 ; i < 10 ; ++i)
+ free(ppMsgs[i]);
+
RETiRet;
}
@@ -778,7 +822,6 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg)
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
-RUNLOG_STR("inside actionProcessMsg()");
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg));
@@ -1008,12 +1051,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
* if they notify us they are - functionality not yet implemented...).
* rgerhards, 2008-01-30
*/
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+// d_pthread_mutex_lock(&pAction->mutActExec);
+// pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
iRet = processAction(pAction, pBatch, pbShutdownImmediate);
- pthread_cleanup_pop(1); /* unlock mutex */
+// pthread_cleanup_pop(1); /* unlock mutex */
RETiRet;
}
@@ -1274,6 +1317,40 @@ finalize_it:
}
+
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+static inline rsRetVal
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+
+#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
+ /* don't output marks to recently written outputs */
+ if(pAction->bWriteAllMarkMsgs == FALSE
+ && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+#endif
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+
+#if 0 // we would need this for bWriteAllMarkMsgs
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
+ pAction->f_time = pAction->f_pMsg->ttGenTime;
+
+#endif
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
+
/* call the configured action. Does all necessary housekeeping.
* rgerhards, 2007-08-01
* FYI: currently, this function is only called from the queue
@@ -1293,15 +1370,15 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
/* We need to lock the mutex only for repeated line processing.
* rgerhards, 2009-06-19
*/
- //if(pAction->f_ReduceRepeated == 1) {
+ if(pAction->bSubmitFirehoseMode == 1) {
+ iRet = doSubmitToActionQ(pAction, pMsg);
+ } else {
LockObj(pAction);
pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
iRet = doActionCallAction(pAction, pMsg);
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- //} else {
- //iRet = doActionCallAction(pAction, pMsg);
- //}
+ }
RETiRet;
}