diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-07 13:37:25 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-07 13:37:25 +0200 |
commit | 9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21 (patch) | |
tree | 31b22d176a337b9a946f73078c8649f9b22b8275 /action.c | |
parent | 68877497a131d5b7c5b1588b771a623fc0ad41c1 (diff) | |
download | rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.tar.gz rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.tar.xz rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.zip |
fixed some bugs & added testing helpers
The action state machine now works correctly and has been verified a
few hand-picked cases. I am missing automatted tests, though, this is
not easy to achive... Anyhow, I've improved omtesting, so that it can
be used in such automatted tests.
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 107 |
1 files changed, 86 insertions, 21 deletions
@@ -387,14 +387,31 @@ static rsRetVal getReturnCode(action_t *pThis) } +/* set the action to a new state + * rgerhards, 2007-08-02 + */ +static inline void actionSetState(action_t *pThis, action_state_t newState) +{ + pThis->eState = newState; + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); +} + /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ static void actionCommitted(action_t *pThis) { - pThis->eState = ACT_STATE_RDY; - DBGPRINTF("Action has committed.\n"); + actionSetState(pThis, ACT_STATE_RDY); +} + + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void actionRetry(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RTRY); } @@ -405,8 +422,7 @@ static void actionCommitted(action_t *pThis) */ static void actionDisable(action_t *pThis) { - pThis->eState = ACT_STATE_DIED; - DBGPRINTF("Action requested to be disabled, done that.\n"); + actionSetState(pThis, ACT_STATE_DIED); } @@ -422,9 +438,9 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow) { if(ttNow == NO_TIME_PROVIDED) time(&ttNow); - pThis->eState = ACT_STATE_SUSP; pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - DBGPRINTF("Action requested to be suspended, done that, retry=%d\n", (int) pThis->ttResumeRtry); + actionSetState(pThis, ACT_STATE_SUSP); + DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -442,10 +458,15 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); +RUNLOG_STR("actionDoRetry():"); iRetries = 0; while(pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); - if(iRet == RS_RET_SUSPENDED) { + if(iRet == RS_RET_OK) { + actionSetState(pThis, ACT_STATE_RDY); +RUNLOG_STR("tryResume succeeded"); + } else if(iRet == RS_RET_SUSPENDED) { +RUNLOG_STR("still suspended");; /* max retries reached? */ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis, ttNow); @@ -479,6 +500,7 @@ static rsRetVal actionTryResume(action_t *pThis) ASSERT(pThis != NULL); +RUNLOG_STR("actionTryResume()"); if(pThis->eState == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used @@ -489,7 +511,7 @@ static rsRetVal actionTryResume(action_t *pThis) */ time(&ttNow); /* cache "now" */ if(ttNow > pThis->ttResumeRtry) { - pThis->eState = ACT_STATE_RTRY; /* back to retries */ + actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } } @@ -515,6 +537,7 @@ static rsRetVal actionPrepare(action_t *pThis) { DEFiRet; +RUNLOG_STR("actionPrepare()"); assert(pThis != NULL); if(pThis->eState == ACT_STATE_RTRY) { CHKiRet(actionTryResume(pThis)); @@ -525,7 +548,7 @@ static rsRetVal actionPrepare(action_t *pThis) */ if(pThis->eState == ACT_STATE_RDY) { CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData)); - pThis->eState = ACT_STATE_ITX; + actionSetState(pThis, ACT_STATE_ITX); } finalize_it: @@ -646,6 +669,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs)); pThis->bHadAutoCommit = 0; @@ -662,7 +686,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) pThis->bHadAutoCommit = 1; break; case RS_RET_SUSPENDED: - actionSuspend(pThis, NO_TIME_PROVIDED); + actionRetry(pThis); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -681,6 +705,29 @@ finalize_it: } +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + +RUNLOG_STR("inside actionProcessMsg()"); + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, pMsg)); + + iRet = getReturnCode(pThis); +finalize_it: + RETiRet; +} + + /* finish processing a batch. Most importantly, that means we commit if we * need to do so. * rgerhards, 2008-01-28 @@ -703,7 +750,7 @@ finishBatch(action_t *pThis) actionCommitted(pThis); break; case RS_RET_SUSPENDED: - actionSuspend(pThis, NO_TIME_PROVIDED); + actionRetry(pThis); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -731,6 +778,33 @@ finalize_it: } +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +rsRetVal +actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp) +{ + int i; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(paUsrp != NULL); + + for(i = 0 ; i < paUsrp->nElem ; i++) { + pMsg = (msg_t*) paUsrp->pUsrp[i]; +dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + localRet = actionProcessMessage(pAction, pMsg); + dbgprintf("action call returned %d\n", localRet); + msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ + CHKiRet(localRet); + } + iRet = finishBatch(pAction); + +finalize_it: + RETiRet; +} #pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. @@ -739,9 +813,7 @@ finalize_it: rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) { - int i; int iCancelStateSave; - msg_t *pMsg; DEFiRet; assert(paUsrp != NULL); @@ -756,17 +828,10 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - for(i = 0 ; i < paUsrp->nElem ; i++) { - pMsg = (msg_t*) paUsrp->pUsrp[i]; -dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); - CHKiRet(actionCallDoAction(pAction, pMsg)); - msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ - } - iRet = finishBatch(pAction); + iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp); pthread_cleanup_pop(1); /* unlock mutex */ -finalize_it: RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" |