summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-07 13:37:25 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-07 13:37:25 +0200
commit9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21 (patch)
tree31b22d176a337b9a946f73078c8649f9b22b8275 /action.c
parent68877497a131d5b7c5b1588b771a623fc0ad41c1 (diff)
downloadrsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.tar.gz
rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.tar.xz
rsyslog-9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21.zip
fixed some bugs & added testing helpers
The action state machine now works correctly and has been verified a few hand-picked cases. I am missing automatted tests, though, this is not easy to achive... Anyhow, I've improved omtesting, so that it can be used in such automatted tests.
Diffstat (limited to 'action.c')
-rw-r--r--action.c107
1 files changed, 86 insertions, 21 deletions
diff --git a/action.c b/action.c
index 9ad05a9f..829ef1eb 100644
--- a/action.c
+++ b/action.c
@@ -387,14 +387,31 @@ static rsRetVal getReturnCode(action_t *pThis)
}
+/* set the action to a new state
+ * rgerhards, 2007-08-02
+ */
+static inline void actionSetState(action_t *pThis, action_state_t newState)
+{
+ pThis->eState = newState;
+ DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
+}
+
/* Handles the transient commit state. So far, this is
* mostly a dummy...
* rgerhards, 2007-08-02
*/
static void actionCommitted(action_t *pThis)
{
- pThis->eState = ACT_STATE_RDY;
- DBGPRINTF("Action has committed.\n");
+ actionSetState(pThis, ACT_STATE_RDY);
+}
+
+
+/* set action to "rtry" state.
+ * rgerhards, 2007-08-02
+ */
+static void actionRetry(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_RTRY);
}
@@ -405,8 +422,7 @@ static void actionCommitted(action_t *pThis)
*/
static void actionDisable(action_t *pThis)
{
- pThis->eState = ACT_STATE_DIED;
- DBGPRINTF("Action requested to be disabled, done that.\n");
+ actionSetState(pThis, ACT_STATE_DIED);
}
@@ -422,9 +438,9 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow)
{
if(ttNow == NO_TIME_PROVIDED)
time(&ttNow);
- pThis->eState = ACT_STATE_SUSP;
pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
- DBGPRINTF("Action requested to be suspended, done that, retry=%d\n", (int) pThis->ttResumeRtry);
+ actionSetState(pThis, ACT_STATE_SUSP);
+ DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry);
}
@@ -442,10 +458,15 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
+RUNLOG_STR("actionDoRetry():");
iRetries = 0;
while(pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
- if(iRet == RS_RET_SUSPENDED) {
+ if(iRet == RS_RET_OK) {
+ actionSetState(pThis, ACT_STATE_RDY);
+RUNLOG_STR("tryResume succeeded");
+ } else if(iRet == RS_RET_SUSPENDED) {
+RUNLOG_STR("still suspended");;
/* max retries reached? */
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
actionSuspend(pThis, ttNow);
@@ -479,6 +500,7 @@ static rsRetVal actionTryResume(action_t *pThis)
ASSERT(pThis != NULL);
+RUNLOG_STR("actionTryResume()");
if(pThis->eState == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
@@ -489,7 +511,7 @@ static rsRetVal actionTryResume(action_t *pThis)
*/
time(&ttNow); /* cache "now" */
if(ttNow > pThis->ttResumeRtry) {
- pThis->eState = ACT_STATE_RTRY; /* back to retries */
+ actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
}
}
@@ -515,6 +537,7 @@ static rsRetVal actionPrepare(action_t *pThis)
{
DEFiRet;
+RUNLOG_STR("actionPrepare()");
assert(pThis != NULL);
if(pThis->eState == ACT_STATE_RTRY) {
CHKiRet(actionTryResume(pThis));
@@ -525,7 +548,7 @@ static rsRetVal actionPrepare(action_t *pThis)
*/
if(pThis->eState == ACT_STATE_RDY) {
CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData));
- pThis->eState = ACT_STATE_ITX;
+ actionSetState(pThis, ACT_STATE_ITX);
}
finalize_it:
@@ -646,6 +669,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
+ DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs));
pThis->bHadAutoCommit = 0;
@@ -662,7 +686,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
pThis->bHadAutoCommit = 1;
break;
case RS_RET_SUSPENDED:
- actionSuspend(pThis, NO_TIME_PROVIDED);
+ actionRetry(pThis);
break;
case RS_RET_DISABLE_ACTION:
actionDisable(pThis);
@@ -681,6 +705,29 @@ finalize_it:
}
+/* process a message
+ * this readies the action and then calls doAction()
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ISOBJ_TYPE_assert(pMsg, msg);
+
+RUNLOG_STR("inside actionProcessMsg()");
+ CHKiRet(actionPrepare(pThis));
+ if(pThis->eState == ACT_STATE_ITX)
+ CHKiRet(actionCallDoAction(pThis, pMsg));
+
+ iRet = getReturnCode(pThis);
+finalize_it:
+ RETiRet;
+}
+
+
/* finish processing a batch. Most importantly, that means we commit if we
* need to do so.
* rgerhards, 2008-01-28
@@ -703,7 +750,7 @@ finishBatch(action_t *pThis)
actionCommitted(pThis);
break;
case RS_RET_SUSPENDED:
- actionSuspend(pThis, NO_TIME_PROVIDED);
+ actionRetry(pThis);
break;
case RS_RET_DISABLE_ACTION:
actionDisable(pThis);
@@ -731,6 +778,33 @@ finalize_it:
}
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+rsRetVal
+actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp)
+{
+ int i;
+ msg_t *pMsg;
+ rsRetVal localRet;
+ DEFiRet;
+
+ assert(paUsrp != NULL);
+
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ localRet = actionProcessMessage(pAction, pMsg);
+ dbgprintf("action call returned %d\n", localRet);
+ msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
+ CHKiRet(localRet);
+ }
+ iRet = finishBatch(pAction);
+
+finalize_it:
+ RETiRet;
+}
#pragma GCC diagnostic ignored "-Wempty-body"
/* receive an array of to-process user pointers and submit them
* for processing.
@@ -739,9 +813,7 @@ finalize_it:
rsRetVal
actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
{
- int i;
int iCancelStateSave;
- msg_t *pMsg;
DEFiRet;
assert(paUsrp != NULL);
@@ -756,17 +828,10 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
pthread_setcancelstate(iCancelStateSave, NULL);
- for(i = 0 ; i < paUsrp->nElem ; i++) {
- pMsg = (msg_t*) paUsrp->pUsrp[i];
-dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
- CHKiRet(actionCallDoAction(pAction, pMsg));
- msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
- }
- iRet = finishBatch(pAction);
+ iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp);
pthread_cleanup_pop(1); /* unlock mutex */
-finalize_it:
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"