diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-08 12:46:24 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-08 12:46:24 +0200 |
commit | 3e49a1075ab6750135e1a38cf0c213579fa30b4a (patch) | |
tree | c986e10161f93f751a1d6a116cf53c1080844450 /action.c | |
parent | 220c57e7ebc49a56cc91fa31308b1563f83a95fb (diff) | |
download | rsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.tar.gz rsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.tar.xz rsyslog-3e49a1075ab6750135e1a38cf0c213579fa30b4a.zip |
performance enhancement: implemented stage 1 firehose mode for actions
... plus some other tests, namely string generation in parallel to action
processing. The code is not yet solid and not fully compatible to
older versions. But it is good enough for an early commit and some
early testing/gaining of experience.
The optimization was done based on the fine-grained partitioning
paradigm worked on the past couple of weeks -- seems to work out
really great :)
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 101 |
1 files changed, 89 insertions, 12 deletions
@@ -280,6 +280,29 @@ actionConstructFinalize(action_t *pThis) /* find a name for our queue */ snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + /* how about bWriteAllMarkMsgs??? */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->f_ReduceRepeated + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, " + "ReduceRepeated=%d, " + "iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, + pThis->iSecsExecOnceInterval + ); + pThis->bSubmitFirehoseMode = 0; + } else { + pThis->bSubmitFirehoseMode = 1; + } + /* we need to make a safety check: if the queue is NOT in direct mode, a single * message object may be accessed by multiple threads. As such, we need to enable * msg object thread safety in this case (this costs a bit performance and thus @@ -633,6 +656,7 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); + dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode); dbgprintf("\n"); RETiRet; @@ -642,7 +666,7 @@ rsRetVal actionDbgPrint(action_t *pThis) /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ -static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs) { int i; DEFiRet; @@ -653,7 +677,8 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); + //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); break; case ACT_ARRAY_PASSING: CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i]))); @@ -722,16 +747,32 @@ static rsRetVal cleanupDoActionParams(action_t *pAction) rsRetVal actionCallDoAction(action_t *pThis, msg_t *pMsg) { + uchar *ppMsgs[10]; + size_t lenMsgs[10]; + int i; DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); + for(i = 0 ; i < 10 ; ++ i) { + ppMsgs[i] = NULL; + lenMsgs[i] = 0; + } + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - CHKiRet(prepareDoActionParams(pThis, pMsg)); + CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); +#if 1 +d_pthread_mutex_lock(&pThis->mutActExec); +pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); + iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); +pthread_cleanup_pop(1); /* unlock mutex */ + //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); +#else +iRet = RS_RET_OK; +#endif switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -762,6 +803,9 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) finalize_it: cleanupDoActionParams(pThis); /* iRet ignored! */ +for(i = 0 ; i < 10 ; ++i) + free(ppMsgs[i]); + RETiRet; } @@ -778,7 +822,6 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg) ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); -RUNLOG_STR("inside actionProcessMsg()"); CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg)); @@ -1008,12 +1051,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) * if they notify us they are - functionality not yet implemented...). * rgerhards, 2008-01-30 */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); +// d_pthread_mutex_lock(&pAction->mutActExec); +// pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); iRet = processAction(pAction, pBatch, pbShutdownImmediate); - pthread_cleanup_pop(1); /* unlock mutex */ +// pthread_cleanup_pop(1); /* unlock mutex */ RETiRet; } @@ -1274,6 +1317,40 @@ finalize_it: } + +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + +#if 0 // TODO: we need to care about this -- after PoC 2010-06-08 + /* don't output marks to recently written outputs */ + if(pAction->bWriteAllMarkMsgs == FALSE + && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { + ABORT_FINALIZE(RS_RET_OK); + } +#endif + + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + +#if 0 // we would need this for bWriteAllMarkMsgs + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ + pAction->f_time = pAction->f_pMsg->ttGenTime; + +#endif + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + + /* call the configured action. Does all necessary housekeeping. * rgerhards, 2007-08-01 * FYI: currently, this function is only called from the queue @@ -1293,15 +1370,15 @@ actionCallAction(action_t *pAction, msg_t *pMsg) /* We need to lock the mutex only for repeated line processing. * rgerhards, 2009-06-19 */ - //if(pAction->f_ReduceRepeated == 1) { + if(pAction->bSubmitFirehoseMode == 1) { + iRet = doSubmitToActionQ(pAction, pMsg); + } else { LockObj(pAction); pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); iRet = doActionCallAction(pAction, pMsg); UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - //} else { - //iRet = doActionCallAction(pAction, pMsg); - //} + } RETiRet; } |