diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 149 |
1 files changed, 119 insertions, 30 deletions
@@ -4,7 +4,44 @@ * * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. + * Some notes on processing (this hopefully makes it easier to find + * the right code in question): For performance reasons, this module + * uses different methods of message submission based on the user-selected + * configuration. This code is similar, but can not be abstracted because + * of the performanse-affecting differences in it. As such, it is often + * necessary to triple-check that everything works well in *all* modes. + * The different modes (and calling sequence) are: + * + * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval + * - doSubmitToActionQComplexBatch + * - helperSubmitToActionQComplexBatch + * - doActionCallAction + * handles duplicate message processing, but in essence calls + * - actionWriteToAction + * - qqueueEnqObj + * (now queue engine processing) + * if(pThis->bWriteAllMarkMsgs == FALSE) - this is the DEFAULT + * - doSubmitToActionQNotAllMarkBatch + * - doSubmitToActionQBatch (and from here like in the else case below!) + * else + * - doSubmitToActionQBatch + * - doSubmitToActionQ + * - qqueueEnqObj + * (now queue engine processing) + * + * Note that bWriteAllMakrMsgs on or off creates almost the same processing. + * The difference ist that if WriteAllMarkMsgs is not set, we need to + * preprocess the batch and drop mark messages which are not yet due for + * writing. + * + * After dequeue, processing is as follows: + * - processBatchMain + * - processAction + * - submitBatch + * - tryDoAction + * - + * + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -710,6 +747,7 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) ASSERT(pAction != NULL); for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { +dbgprintf("XXXX: releaseBatch %d: bPrevWasSuspended %d\n", i, pBatch->pElem[i].bPrevWasSuspended); pElem = &(pBatch->pElem[i]); if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { switch(pAction->eParamPassing) { @@ -897,8 +935,9 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) while(iElemProcessed <= *pnElem && i < pBatch->nElem) { if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); +dbgprintf("XXXX:tryDoAction %d: bExecWhenPrevSusp: %d, bPrevWasSuspended %d ptr %p\n", i, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended, &(pBatch->pElem[i].bPrevWasSuspended)); if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC + && pBatch->pElem[i].state != BATCH_STATE_DISC//) { && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams); @@ -962,6 +1001,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 0; do { localRet = tryDoAction(pAction, pBatch, &nElem); +dbgprintf("XXXX: tryDoAction returns %d\n", localRet); if(localRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } @@ -981,12 +1021,14 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) } else if(localRet == RS_RET_SUSPENDED) { ; /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { +dbgprintf("XXXX: in ACTION_FAILED branch, doneUpTo %d\n", pBatch->iDoneUpTo); /* in this case, everything not yet committed is BAD */ for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { if( pBatch->pElem[i].state != BATCH_STATE_DISC && pBatch->pElem[i].state != BATCH_STATE_COMM ) { pBatch->pElem[i].state = BATCH_STATE_BAD; pBatch->pElem[i].bPrevWasSuspended = 1; +dbgprintf("XXXX: setting susps for item %d ptr %p\n", i, &(pBatch->pElem[i].bPrevWasSuspended)); } } bDone = 1; @@ -1029,6 +1071,7 @@ prepareBatch(action_t *pAction, batch_t *pBatch) pBatch->iDoneUpTo = 0; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { +dbgprintf("XXXX: prepareBatch %d: bPrevWasSuspended %d\n", i, pBatch->pElem[i].bPrevWasSuspended); pElem = &(pBatch->pElem[i]); if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { pElem->state = BATCH_STATE_RDY; @@ -1069,6 +1112,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) DEFiRet; assert(pBatch != NULL); +dbgprintf("XXXX: running processBatchMain\n"); pbShutdownImmdtSave = pBatch->pbShutdownImmediate; pBatch->pbShutdownImmediate = pbShutdownImmediate; @@ -1098,6 +1142,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) iRet = localRet; finalize_it: +dbgprintf("XXXX: exiting processBatchMain\n"); pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } @@ -1163,12 +1208,33 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. This is also utilized to submit messages in complex case once + * the complex logic has been applied ;) + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + /* This function builds up a batch of messages to be (later) * submitted to the action queue. */ rsRetVal -actionWriteToAction(action_t *pAction) +actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch) { +dbgprintf("XXXX: enter actionWriteToAction idx %d\n", idxBtch); msg_t *pMsgSave; /* to save current message pointer, necessary to restore it in case it needs to be updated (e.g. repeated msgs) */ DEFiRet; @@ -1238,7 +1304,7 @@ actionWriteToAction(action_t *pAction) pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ } - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too * frequently called. -- rgerhards, 2008-04-08 @@ -1256,15 +1322,45 @@ actionWriteToAction(action_t *pAction) FINALIZE; } - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) + * rgerhards, 2008-09-17 */ pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pAction->f_pMsg->ttGenTime; +dbgprintf("XXXX:actionWriteToAction %d: bExecWhenPrevSusp: %d, bPrevWasSuspended %d ptr %p\n", idxBtch, pAction->bExecWhenPrevSusp, pBatch->pElem[idxBtch].bPrevWasSuspended, &(pBatch->pElem[idxBtch].bPrevWasSuspended)); /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + if(pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended) { + /* in that case, we need to create a special batch which reflects the + * suspended state. Otherwise, that information would be dropped inside + * the queue engine. TODO: in later releases (v6?) create a better + * solution than what we do here. However, for v5 this sounds much too + * intrusive. -- rgerhardsm, 2011-03-16 + * (Code is copied over from queue.c and slightly modified) + */ + batch_t singleBatch; + batch_obj_t batchObj; + int i; + memset(&batchObj, 0, sizeof(batch_obj_t)); + memset(&singleBatch, 0, sizeof(batch_t)); + batchObj.state = BATCH_STATE_RDY; + batchObj.pUsrp = (obj_t*) pAction->f_pMsg; + batchObj.bPrevWasSuspended = 1; + batchObj.bFilterOK = 1; + singleBatch.nElem = 1; /* there always is only one in direct mode */ + singleBatch.pElem = &batchObj; + + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch); + + for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { + free(batchObj.staticActStrings[i]); + } + } else { /* standard case, just submit */ + iRet = doSubmitToActionQ(pAction, pAction->f_pMsg); + } +dbgprintf("XXXX: action iRet %d\n", iRet); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ @@ -1291,10 +1387,13 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, msg_t *pMsg) +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) { + msg_t *pMsg; DEFiRet; +dbgprintf("XXXX: enter doActionCallAction\n"); + pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp); pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1321,7 +1420,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) * isolated messages), but back off so we'll flush less often in the future. */ if(getActNow(pAction) > REPEATTIME(pAction)) { - iRet = actionWriteToAction(pAction); + iRet = actionWriteToAction(pAction, pBatch, idxBtch); BACKOFF(pAction); } } else {/* new message, save it */ @@ -1330,7 +1429,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) */ if(pAction->f_pMsg != NULL) { if(pAction->f_prevcount > 0) - actionWriteToAction(pAction); + actionWriteToAction(pAction, pBatch, idxBtch); /* we do not care about iRet above - I think it's right but if we have * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 */ @@ -1338,33 +1437,21 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) } pAction->f_pMsg = MsgAddRef(pMsg); /* call the output driver */ - iRet = actionWriteToAction(pAction); + iRet = actionWriteToAction(pAction, pBatch, idxBtch); } finalize_it: - RETiRet; -} - -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. - * rgerhards, 2010-06-08 - */ -static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) -{ - DEFiRet; - - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); - else - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + /* we need to update the batch to handle failover processing correctly */ + if(iRet == RS_RET_OK) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 0; + } else if(iRet == RS_RET_ACTION_FAILED) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 1; + } RETiRet; } - /* This submits the message to the action queue in case where we need to handle * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop * for the synchronization. Here, we just modify the filter condition to be false when @@ -1480,9 +1567,11 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { +dbgprintf("XXXX: helper complex %d: bExecWhenPrevSusp: %d, bPrevWasSuspended %d ptr %p\n", i, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended, &(pBatch->pElem[i].bPrevWasSuspended)); if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC) { - doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + && pBatch->pElem[i].state != BATCH_STATE_DISC + && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { + doActionCallAction(pAction, pBatch, i); } } |