diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-07 13:37:25 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-07 13:37:25 +0200 |
commit | 9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21 (patch) | |
tree | 31b22d176a337b9a946f73078c8649f9b22b8275 | |
parent | 68877497a131d5b7c5b1588b771a623fc0ad41c1 (diff) | |
download | rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.tar.gz rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.tar.xz rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.zip |
fixed some bugs & added testing helpers
The action state machine now works correctly and has been verified a
few hand-picked cases. I am missing automatted tests, though, this is
not easy to achive... Anyhow, I've improved omtesting, so that it can
be used in such automatted tests.
-rw-r--r-- | action.c | 107 | ||||
-rw-r--r-- | plugins/omtesting/omtesting.c | 156 |
2 files changed, 231 insertions, 32 deletions
@@ -387,14 +387,31 @@ static rsRetVal getReturnCode(action_t *pThis) } +/* set the action to a new state + * rgerhards, 2007-08-02 + */ +static inline void actionSetState(action_t *pThis, action_state_t newState) +{ + pThis->eState = newState; + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); +} + /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ static void actionCommitted(action_t *pThis) { - pThis->eState = ACT_STATE_RDY; - DBGPRINTF("Action has committed.\n"); + actionSetState(pThis, ACT_STATE_RDY); +} + + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void actionRetry(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RTRY); } @@ -405,8 +422,7 @@ static void actionCommitted(action_t *pThis) */ static void actionDisable(action_t *pThis) { - pThis->eState = ACT_STATE_DIED; - DBGPRINTF("Action requested to be disabled, done that.\n"); + actionSetState(pThis, ACT_STATE_DIED); } @@ -422,9 +438,9 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow) { if(ttNow == NO_TIME_PROVIDED) time(&ttNow); - pThis->eState = ACT_STATE_SUSP; pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - DBGPRINTF("Action requested to be suspended, done that, retry=%d\n", (int) pThis->ttResumeRtry); + actionSetState(pThis, ACT_STATE_SUSP); + DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -442,10 +458,15 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); +RUNLOG_STR("actionDoRetry():"); iRetries = 0; while(pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); - if(iRet == RS_RET_SUSPENDED) { + if(iRet == RS_RET_OK) { + actionSetState(pThis, ACT_STATE_RDY); +RUNLOG_STR("tryResume succeeded"); + } else if(iRet == RS_RET_SUSPENDED) { +RUNLOG_STR("still suspended");; /* max retries reached? */ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis, ttNow); @@ -479,6 +500,7 @@ static rsRetVal actionTryResume(action_t *pThis) ASSERT(pThis != NULL); +RUNLOG_STR("actionTryResume()"); if(pThis->eState == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used @@ -489,7 +511,7 @@ static rsRetVal actionTryResume(action_t *pThis) */ time(&ttNow); /* cache "now" */ if(ttNow > pThis->ttResumeRtry) { - pThis->eState = ACT_STATE_RTRY; /* back to retries */ + actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } } @@ -515,6 +537,7 @@ static rsRetVal actionPrepare(action_t *pThis) { DEFiRet; +RUNLOG_STR("actionPrepare()"); assert(pThis != NULL); if(pThis->eState == ACT_STATE_RTRY) { CHKiRet(actionTryResume(pThis)); @@ -525,7 +548,7 @@ static rsRetVal actionPrepare(action_t *pThis) */ if(pThis->eState == ACT_STATE_RDY) { CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData)); - pThis->eState = ACT_STATE_ITX; + actionSetState(pThis, ACT_STATE_ITX); } finalize_it: @@ -646,6 +669,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs)); pThis->bHadAutoCommit = 0; @@ -662,7 +686,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) pThis->bHadAutoCommit = 1; break; case RS_RET_SUSPENDED: - actionSuspend(pThis, NO_TIME_PROVIDED); + actionRetry(pThis); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -681,6 +705,29 @@ finalize_it: } +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + +RUNLOG_STR("inside actionProcessMsg()"); + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, pMsg)); + + iRet = getReturnCode(pThis); +finalize_it: + RETiRet; +} + + /* finish processing a batch. Most importantly, that means we commit if we * need to do so. * rgerhards, 2008-01-28 @@ -703,7 +750,7 @@ finishBatch(action_t *pThis) actionCommitted(pThis); break; case RS_RET_SUSPENDED: - actionSuspend(pThis, NO_TIME_PROVIDED); + actionRetry(pThis); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -731,6 +778,33 @@ finalize_it: } +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +rsRetVal +actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp) +{ + int i; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(paUsrp != NULL); + + for(i = 0 ; i < paUsrp->nElem ; i++) { + pMsg = (msg_t*) paUsrp->pUsrp[i]; +dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + localRet = actionProcessMessage(pAction, pMsg); + dbgprintf("action call returned %d\n", localRet); + msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ + CHKiRet(localRet); + } + iRet = finishBatch(pAction); + +finalize_it: + RETiRet; +} #pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. @@ -739,9 +813,7 @@ finalize_it: rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) { - int i; int iCancelStateSave; - msg_t *pMsg; DEFiRet; assert(paUsrp != NULL); @@ -756,17 +828,10 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - for(i = 0 ; i < paUsrp->nElem ; i++) { - pMsg = (msg_t*) paUsrp->pUsrp[i]; -dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); - CHKiRet(actionCallDoAction(pAction, pMsg)); - msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ - } - iRet = finishBatch(pAction); + iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp); pthread_cleanup_pop(1); /* unlock mutex */ -finalize_it: RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c index 411bcf88..8f6cdbe5 100644 --- a/plugins/omtesting/omtesting.c +++ b/plugins/omtesting/omtesting.c @@ -22,7 +22,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2007 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -46,12 +46,14 @@ #include <stdio.h> #include <stdarg.h> #include <stdlib.h> +#include <time.h> #include <string.h> #include <ctype.h> #include <assert.h> #include "dirty.h" #include "syslogd-types.h" #include "module-template.h" +#include "cfsysline.h" MODULE_TYPE_OUTPUT @@ -59,9 +61,18 @@ MODULE_TYPE_OUTPUT */ DEF_OMOD_STATIC_DATA +static int bEchoStdout = 0; /* echo non-failed messages to stdout */ + typedef struct _instanceData { + enum { MD_SLEEP, MD_FAIL, MD_RANDFAIL, MD_ALWAYS_SUSPEND } + mode; + int bEchoStdout; int iWaitSeconds; int iWaitUSeconds; /* milli-seconds (one million of a second, just to make sure...) */ + int iCurrCallNbr; + int iFailFrequency; + int iResumeAfter; + int iCurrRetries; } instanceData; BEGINcreateInstance @@ -85,19 +96,106 @@ CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature -BEGINtryResume -CODESTARTtryResume -ENDtryResume +/* implement "fail" command in retry processing */ +static rsRetVal doFailOnResume(instanceData *pData) +{ + DEFiRet; -BEGINdoAction -CODESTARTdoAction + dbgprintf("fail retry curr %d, max %d\n", pData->iCurrRetries, pData->iResumeAfter); + if(++pData->iCurrRetries == pData->iResumeAfter) { + iRet = RS_RET_OK; + } else { + iRet = RS_RET_SUSPENDED; + } + + RETiRet; +} + + +/* implement "fail" command */ +static rsRetVal doFail(instanceData *pData) +{ + DEFiRet; + + dbgprintf("fail curr %d, frquency %d\n", pData->iCurrCallNbr, pData->iFailFrequency); + if(pData->iCurrCallNbr++ % pData->iFailFrequency == 0) { + pData->iCurrRetries = 0; + iRet = RS_RET_SUSPENDED; + } + + RETiRet; +} + + +/* implement "sleep" command */ +static rsRetVal doSleep(instanceData *pData) +{ + DEFiRet; struct timeval tvSelectTimeout; dbgprintf("sleep(%d, %d)\n", pData->iWaitSeconds, pData->iWaitUSeconds); tvSelectTimeout.tv_sec = pData->iWaitSeconds; tvSelectTimeout.tv_usec = pData->iWaitUSeconds; /* milli seconds */ select(0, NULL, NULL, NULL, &tvSelectTimeout); - //dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet); + RETiRet; +} + + +/* implement "randomfail" command */ +static rsRetVal doRandFail(void) +{ + DEFiRet; + if((rand() >> 4) < (RAND_MAX >> 5)) { /* rougly same probability */ + iRet = RS_RET_OK; + dbgprintf("omtesting randfail: succeeded this time\n"); + } else { + iRet = RS_RET_SUSPENDED; + dbgprintf("omtesting randfail: failed this time\n"); + } + RETiRet; +} + + +BEGINtryResume +CODESTARTtryResume + dbgprintf("omtesting tryResume() called\n"); + switch(pData->mode) { + case MD_SLEEP: + break; + case MD_FAIL: + iRet = doFailOnResume(pData); + break; + case MD_RANDFAIL: + iRet = doRandFail(); + break; + case MD_ALWAYS_SUSPEND: + iRet = RS_RET_SUSPENDED; + } + dbgprintf("omtesting tryResume() returns iRet %d\n", iRet); +ENDtryResume + + +BEGINdoAction +CODESTARTdoAction + dbgprintf("omtesting received msg '%s'\n", ppString[0]); + switch(pData->mode) { + case MD_SLEEP: + iRet = doSleep(pData); + break; + case MD_FAIL: + iRet = doFail(pData); + break; + case MD_RANDFAIL: + iRet = doRandFail(); + case MD_ALWAYS_SUSPEND: + iRet = RS_RET_SUSPENDED; + } + + if(iRet == RS_RET_OK && pData->bEchoStdout) { + fprintf(stdout, "%s", ppString[0]); + fflush(stdout); + } + dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet); ENDdoAction @@ -113,7 +211,7 @@ BEGINparseSelectorAct int i; uchar szBuf[1024]; CODESTARTparseSelectorAct -CODE_STD_STRING_REQUESTparseSelectorAct(0) +CODE_STD_STRING_REQUESTparseSelectorAct(1) /* code here is quick and dirty - if you like, clean it up. But keep * in mind it is just a testing aid ;) -- rgerhards, 2007-12-31 */ @@ -135,6 +233,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(0) if(isspace(*p)) ++p; + dbgprintf("omtesting command: '%s'\n", szBuf); if(!strcmp((char*) szBuf, "sleep")) { /* parse seconds */ for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) { @@ -152,12 +251,43 @@ CODE_STD_STRING_REQUESTparseSelectorAct(0) if(isspace(*p)) ++p; pData->iWaitUSeconds = atoi((char*) szBuf); - } - /* once there are other modes, here is the spot to add it! */ - else { + pData->mode = MD_SLEEP; + } else if(!strcmp((char*) szBuf, "fail")) { + /* "fail fail-freqency resume-after" + * fail-frequency specifies how often doAction() fails + * resume-after speicifes how fast tryResume() should come back with success + * all numbers being "times called" + */ + /* parse fail-frequence */ + for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) { + szBuf[i] = *p++; + } + szBuf[i] = '\0'; + if(isspace(*p)) + ++p; + pData->iFailFrequency = atoi((char*) szBuf); + /* parse resume-after */ + for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) { + szBuf[i] = *p++; + } + szBuf[i] = '\0'; + if(isspace(*p)) + ++p; + pData->iResumeAfter = atoi((char*) szBuf); + pData->iCurrCallNbr = 1; + pData->mode = MD_FAIL; + } else if(!strcmp((char*) szBuf, "randfail")) { + pData->mode = MD_RANDFAIL; + } else if(!strcmp((char*) szBuf, "always_suspend")) { + pData->mode = MD_ALWAYS_SUSPEND; + } else { dbgprintf("invalid mode '%s', doing 'sleep 1 0' - fix your config\n", szBuf); } + pData->bEchoStdout = bEchoStdout; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, + (uchar*)"RSYSLOG_TraditionalForwardFormat")); + CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -177,6 +307,10 @@ BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionomtestingechostdout", 0, eCmdHdlrBinary, NULL, + &bEchoStdout, STD_LOADABLE_MODULE_ID)); + /* we seed the random-number generator in any case... */ + srand(time(NULL)); ENDmodInit /* * vi:set ai: |