diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 167 |
1 files changed, 87 insertions, 80 deletions
@@ -50,7 +50,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -59,6 +59,7 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */ static int glbliActionResumeInterval = 30; @@ -128,7 +129,7 @@ getActNow(action_t *pThis) { assert(pThis != NULL); if(pThis->tActNow == -1) { - pThis->tActNow = time(NULL); /* good time call - the only one done */ + pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */ if(pThis->tLastExec > pThis->tActNow) { /* if we are traveling back in time, reset tLastExec */ pThis->tLastExec = (time_t) 0; @@ -210,7 +211,7 @@ rsRetVal actionDestruct(action_t *pThis) if(pThis->ppMsgs[i] != NULL) { switch(pThis->eParamPassing) { case ACT_ARRAY_PASSING: -#if 0 /* later! */ +#if 0 /* later, as an optimization. So far, we do the cleanup after each message */ iArr = 0; while(((char **)pThis->ppMsgs[i])[iArr] != NULL) { d_free(((char **)pThis->ppMsgs[i])[iArr++]); @@ -223,6 +224,9 @@ rsRetVal actionDestruct(action_t *pThis) case ACT_STRING_PASSING: d_free(pThis->ppMsgs[i]); break; + case ACT_MSG_PASSING: + /* No cleanup needed in this case */ + break; default: assert(0); } @@ -250,7 +254,7 @@ rsRetVal actionConstruct(action_t **ppThis) CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; - pThis->tLastOccur = time(NULL); /* done once per action on startup only */ + pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); SYNC_OBJ_TOOL_INIT(pThis); @@ -291,7 +295,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*, batch_t*))processBatchMain)); + (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -466,7 +470,7 @@ static void actionDisable(action_t *pThis) static inline void actionSuspend(action_t *pThis, time_t ttNow) { if(ttNow == NO_TIME_PROVIDED) - time(&ttNow); + datetime.GetTime(&ttNow); pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); actionSetState(pThis, ACT_STATE_SUSP); DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); @@ -534,7 +538,7 @@ static rsRetVal actionTryResume(action_t *pThis) * is always in the past. So we can not avoid doing a fresh time() call * here. -- rgerhards, 2009-03-18 */ - time(&ttNow); /* cache "now" */ + datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow > pThis->ttResumeRtry) { actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } @@ -542,7 +546,7 @@ static rsRetVal actionTryResume(action_t *pThis) if(pThis->eState == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ - time(&ttNow); + datetime.GetTime(&ttNow); CHKiRet(actionDoRetry(pThis, ttNow)); } @@ -623,6 +627,12 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) case ACT_ARRAY_PASSING: CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i]))); break; + case ACT_MSG_PASSING: + /* we abuse the uchar* ptr, it now actually is a void*, but we can not + * change that other than by chaning the interface, what we don't like... + */ + pAction->ppMsgs[i] = (uchar*) pMsg; + break; default:assert(0); /* software bug if this happens! */ } } @@ -654,6 +664,7 @@ static rsRetVal cleanupDoActionParams(action_t *pAction) d_free(pAction->ppMsgs[i]); pAction->ppMsgs[i] = NULL; break; + case ACT_MSG_PASSING: case ACT_STRING_PASSING: break; default: @@ -798,7 +809,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate) { int i; int iElemProcessed; @@ -814,19 +825,20 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iElemProcessed = 0; iCommittedUpTo = i; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { + if(*pbShutdownImmediate) + ABORT_FINALIZE(RS_RET_FORCE_TERM); pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - DBGPRINTF("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p, state %d\n", i, pBatch->nElem, *pnElem, pMsg, pBatch->pElem[i].state);//remove later! if(pBatch->pElem[i].state != BATCH_STATE_DISC) { localRet = actionProcessMessage(pAction, pMsg); DBGPRINTF("action call returned %d\n", localRet); if(localRet == RS_RET_OK) { /* mark messages as committed */ - while(iCommittedUpTo < i) { + while(iCommittedUpTo <= i) { pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { /* mark messages as committed */ - while(iCommittedUpTo < i - 1) { + while(iCommittedUpTo < i) { pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } pBatch->pElem[i].state = BATCH_STATE_SUB; @@ -860,7 +872,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate) { int i; int bDone; @@ -871,7 +883,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); + localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); + if(localRet == RS_RET_FORCE_TERM) + FINALIZE; if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { @@ -902,13 +916,17 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 1; } else { /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); + submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate); + submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate); bDone = 1; } } - } while(!bDone); /* do .. while()! */ + } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */ + if(*pbShutdownImmediate) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + +finalize_it: RETiRet; } @@ -917,26 +935,13 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) * rgerhards, 2009-05-12 */ static rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { - int i; - msg_t *pMsg; - rsRetVal localRet; DEFiRet; assert(pBatch != NULL); - pBatch->iDoneUpTo = 0; - /* TODO: think about action batches, must be handled at upper layer! - * MULTIQUEUE - */ - localRet = submitBatch(pAction, pBatch, pBatch->nElem); - CHKiRet(localRet); - - /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - for(i = 0 ; i < pBatch->nElem ; i++) { - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - } + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate)); iRet = finishBatch(pAction); finalize_it: @@ -950,7 +955,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch) +processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { DEFiRet; @@ -964,7 +969,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch); + iRet = processAction(pAction, pBatch, pbShutdownImmediate); pthread_cleanup_pop(1); /* unlock mutex */ @@ -1127,7 +1132,7 @@ actionWriteToAction(action_t *pAction) * a purely logical point of view. However, if safes us to check the system time in * (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16 */ - if(pAction->f_time != 0 && pAction->iSecsExecOnceInterval > 0 && + if(pAction->iSecsExecOnceInterval > 0 && pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) { /* in this case we need to discard the message - its not yet time to exec the action */ DBGPRINTF("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", @@ -1138,6 +1143,7 @@ actionWriteToAction(action_t *pAction) } /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. @@ -1260,47 +1266,6 @@ actionCallAction(action_t *pAction, msg_t *pMsg) #pragma GCC diagnostic warning "-Wempty-body" -/* add our cfsysline handlers - * rgerhards, 2008-01-28 - */ -rsRetVal -actionAddCfSysLineHdrl(void) -{ - DEFiRet; - - CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); - -finalize_it: - RETiRet; -} - - /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. @@ -1352,9 +1317,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques for(i = 0 ; i < pAction->iNumTpls ; ++i) { CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); - /* Ok, we got everything, so it now is time to look up the - * template (Hint: templates MUST be defined before they are - * used!) + /* Ok, we got everything, so it now is time to look up the template + * (Hint: templates MUST be defined before they are used!) */ if((pAction->ppTpl[i] = tplFind((char*)pTplName, strlen((char*)pTplName))) == NULL) { snprintf(errMsg, sizeof(errMsg) / sizeof(char), @@ -1376,6 +1340,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques /* set parameter-passing mode */ if(iTplOpts & OMSR_TPL_AS_ARRAY) { pAction->eParamPassing = ACT_ARRAY_PASSING; + } else if(iTplOpts & OMSR_TPL_AS_MSG) { + pAction->eParamPassing = ACT_MSG_PASSING; } else { pAction->eParamPassing = ACT_STRING_PASSING; } @@ -1395,7 +1361,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) - actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */ + actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ CHKiRet(actionConstructFinalize(pAction)); @@ -1417,6 +1383,17 @@ finalize_it: } +/* Reset config variables to default values. + * rgerhards, 2009-11-12 + */ +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + iActExecOnceInterval = 0; + return RS_RET_OK; +} + + /* TODO: we are not yet a real object, the ClassInit here just looks like it is.. */ rsRetVal actionClassInit(void) @@ -1428,6 +1405,36 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &iActExecOnceInterval, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); + finalize_it: RETiRet; } |