summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-07 10:44:46 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-07 10:44:46 +0200
commit68877497a131d5b7c5b1588b771a623fc0ad41c1 (patch)
tree75f2267c053ff9bed596d6b46787045cf212f0ed /action.c
parentbbb0d7c9d1b63dfd78f0ab33fd014909b20bf785 (diff)
downloadrsyslog-68877497a131d5b7c5b1588b771a623fc0ad41c1.tar.gz
rsyslog-68877497a131d5b7c5b1588b771a623fc0ad41c1.tar.xz
rsyslog-68877497a131d5b7c5b1588b771a623fc0ad41c1.zip
first shot at action state machine implementation (untested)
I am committing it so that the code is visible, but will now begin with the test environment.
Diffstat (limited to 'action.c')
-rw-r--r--action.c462
1 files changed, 335 insertions, 127 deletions
diff --git a/action.c b/action.c
index 928b30dc..9ad05a9f 100644
--- a/action.c
+++ b/action.c
@@ -45,6 +45,8 @@
#include "wti.h"
#include "datetime.h"
+#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
+
/* forward definitions */
rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*);
@@ -310,87 +312,223 @@ finalize_it:
}
-/* set an action back to active state -- rgerhards, 2007-08-02
+
+/* set the global resume interval
+ */
+rsRetVal actionSetGlobalResumeInterval(int iNewVal)
+{
+ glbliActionResumeInterval = iNewVal;
+ return RS_RET_OK;
+}
+
+
+/* returns the action state name in human-readable form
+ * returned string must not be modified.
+ * rgerhards, 2009-05-07
*/
-static rsRetVal actionResume(action_t *pThis)
+static uchar *getActStateName(action_t *pThis)
+{
+ switch(pThis->eState) {
+ case ACT_STATE_RDY:
+ return (uchar*) "rdy";
+ case ACT_STATE_ITX:
+ return (uchar*) "itx";
+ case ACT_STATE_RTRY:
+ return (uchar*) "rtry";
+ case ACT_STATE_SUSP:
+ return (uchar*) "susp";
+ case ACT_STATE_DIED:
+ return (uchar*) "died";
+ case ACT_STATE_COMM:
+ return (uchar*) "comm";
+ default:
+ return (uchar*) "ERROR/UNKNWON";
+ }
+}
+
+
+/* returns a suitable return code based on action state
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal getReturnCode(action_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
- pThis->bSuspended = 0;
+ switch(pThis->eState) {
+ case ACT_STATE_RDY:
+ iRet = RS_RET_OK;
+ break;
+ case ACT_STATE_ITX:
+ if(pThis->bHadAutoCommit) {
+ pThis->bHadAutoCommit = 0; /* auto-reset */
+ iRet = RS_RET_PREVIOUS_COMMITTED;
+ } else {
+ iRet = RS_RET_DEFER_COMMIT;
+ }
+ break;
+ case ACT_STATE_RTRY:
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case ACT_STATE_SUSP:
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case ACT_STATE_DIED:
+ iRet = RS_RET_DISABLE_ACTION;
+ break;
+ default:
+ DBGPRINTF("Invalid action engine state %d, program error\n",
+ (int) pThis->eState);
+ iRet = RS_RET_ERR;
+ break;
+ }
RETiRet;
}
-/* set the global resume interval
+/* Handles the transient commit state. So far, this is
+ * mostly a dummy...
+ * rgerhards, 2007-08-02
*/
-rsRetVal actionSetGlobalResumeInterval(int iNewVal)
+static void actionCommitted(action_t *pThis)
{
- glbliActionResumeInterval = iNewVal;
- return RS_RET_OK;
+ pThis->eState = ACT_STATE_RDY;
+ DBGPRINTF("Action has committed.\n");
+}
+
+
+/* Disable action, this means it will never again be usable
+ * until rsyslog is reloaded. Use only as a last resort, but
+ * depends on output module.
+ * rgerhards, 2007-08-02
+ */
+static void actionDisable(action_t *pThis)
+{
+ pThis->eState = ACT_STATE_DIED;
+ DBGPRINTF("Action requested to be disabled, done that.\n");
+}
+
+
+/* Suspend action, this involves changing the action state as well
+ * as setting the next retry time.
+ * if we have more than 10 retries, we prolong the
+ * retry interval. If something is really stalled, it will
+ * get re-tried only very, very seldom - but that saves
+ * CPU time. TODO: maybe a config option for that?
+ * rgerhards, 2007-08-02
+ */
+static inline void actionSuspend(action_t *pThis, time_t ttNow)
+{
+ if(ttNow == NO_TIME_PROVIDED)
+ time(&ttNow);
+ pThis->eState = ACT_STATE_SUSP;
+ pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ DBGPRINTF("Action requested to be suspended, done that, retry=%d\n", (int) pThis->ttResumeRtry);
}
-/* suspend an action -- rgerhards, 2007-08-02
+/* actually do retry processing. Note that the function receives a timestamp so
+ * that we do not need to call the (expensive) time() API.
+ * Note that we do the full retry processing here, doing the configured number of
+ * iterations.
+ * rgerhards, 2009-05-07
*/
-static rsRetVal actionSuspend(action_t *pThis, time_t tNow)
+static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
{
+ int iRetries;
+ int iSleepPeriod;
DEFiRet;
ASSERT(pThis != NULL);
- pThis->bSuspended = 1;
- pThis->ttResumeRtry = tNow + pThis->iResumeInterval;
- pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */
+
+ iRetries = 0;
+ while(pThis->eState == ACT_STATE_RTRY) {
+ iRet = pThis->pMod->tryResume(pThis->pModData);
+ if(iRet == RS_RET_SUSPENDED) {
+ /* max retries reached? */
+ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
+ actionSuspend(pThis, ttNow);
+ } else {
+ ++pThis->iNbrResRtry;
+ ++iRetries;
+ iSleepPeriod = pThis->iResumeInterval;
+ ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
+ srSleep(iSleepPeriod, 0);
+ }
+ } else if(iRet == RS_RET_DISABLE_ACTION) {
+ actionDisable(pThis);
+ }
+ }
+
+ if(pThis->eState == ACT_STATE_RDY) {
+ pThis->iNbrResRtry = 0;
+ }
RETiRet;
}
/* try to resume an action -- rgerhards, 2007-08-02
- * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the
- * action is still suspended.
+ * changed to new action state engine -- rgerhards, 2009-05-07
*/
static rsRetVal actionTryResume(action_t *pThis)
{
DEFiRet;
- time_t ttNow;
+ time_t ttNow = NO_TIME_PROVIDED;
ASSERT(pThis != NULL);
- /* for resume handling, we must always obtain a fresh timestamp. We used
- * to use the action timestamp, but in this case we will never reach a
- * point where a resumption is actually tried, because the action timestamp
- * is always in the past. So we can not avoid doing a fresh time() call
- * here. -- rgerhards, 2009-03-18
- */
- time(&ttNow); /* cache "now" */
-
- /* first check if it is time for a re-try */
- if(ttNow > pThis->ttResumeRtry) {
- iRet = pThis->pMod->tryResume(pThis->pModData);
- if(iRet == RS_RET_SUSPENDED) {
- /* set new tryResume time */
- ++pThis->iNbrResRtry;
- /* if we have more than 10 retries, we prolong the
- * retry interval. If something is really stalled, it will
- * get re-tried only very, very seldom - but that saves
- * CPU time. TODO: maybe a config option for that?
- * rgerhards, 2007-08-02
- */
- pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ if(pThis->eState == ACT_STATE_SUSP) {
+ /* if we are suspended, we need to check if the timeout expired.
+ * for this handling, we must always obtain a fresh timestamp. We used
+ * to use the action timestamp, but in this case we will never reach a
+ * point where a resumption is actually tried, because the action timestamp
+ * is always in the past. So we can not avoid doing a fresh time() call
+ * here. -- rgerhards, 2009-03-18
+ */
+ time(&ttNow); /* cache "now" */
+ if(ttNow > pThis->ttResumeRtry) {
+ pThis->eState = ACT_STATE_RTRY; /* back to retries */
}
- } else {
- /* it's too early, we are still suspended --> indicate this */
- iRet = RS_RET_SUSPENDED;
}
- if(iRet == RS_RET_OK)
- actionResume(pThis);
+ if(pThis->eState == ACT_STATE_RTRY) {
+ if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
+ time(&ttNow);
+ CHKiRet(actionDoRetry(pThis, ttNow));
+ }
+
+ DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
+ getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* prepare an action for performing work. This involves trying to recover it,
+ * depending on its current state.
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal actionPrepare(action_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ if(pThis->eState == ACT_STATE_RTRY) {
+ CHKiRet(actionTryResume(pThis));
+ }
- dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n",
- iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ /* if we are now ready, we initialize the transaction and advance
+ * action state accordingly
+ */
+ if(pThis->eState == ACT_STATE_RDY) {
+ CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData));
+ pThis->eState = ACT_STATE_ITX;
+ }
+finalize_it:
RETiRet;
}
@@ -407,12 +545,11 @@ rsRetVal actionDbgPrint(action_t *pThis)
dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
- dbgprintf("\tSuspended: %d", pThis->bSuspended);
- if(pThis->bSuspended) {
- dbgprintf(" next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
+ if(pThis->eState == ACT_STATE_SUSP) {
+ dbgprintf("\tresume next retry: %u, number retries: %d",
+ (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
}
- dbgprintf("\n");
- dbgprintf("\tDisabled: %d\n", !pThis->bEnabled);
+ dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
dbgprintf("\n");
@@ -420,25 +557,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
-//MULTIQUEUE: think about these two functions below
-/* call the DoAction output plugin entry point
- * rgerhards, 2008-01-28
+/* prepare the calling parameters for doAction()
+ * rgerhards, 2009-05-07
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
-actionCallDoAction(action_t *pAction, msg_t *pMsg)
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar ***pppMsgs)
{
- DEFiRet;
- int iRetries;
+ uchar **ppMsgs = *pppMsgs;
int i;
- int iArr;
- int iSleepPeriod;
- int bCallAction;
- int iCancelStateSave;
- uchar **ppMsgs; /* array of message pointers for doAction */
+ DEFiRet;
ASSERT(pAction != NULL);
-
/* create the array for doAction() message pointers */
if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -456,87 +584,154 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
default:assert(0); /* software bug if this happens! */
}
}
- iRetries = 0;
- /* We now must guard the output module against execution by multiple threads. The
- * plugin interface specifies that output modules must not be thread-safe (except
- * if they notify us they are - functionality not yet implemented...).
- * rgerhards, 2008-01-30
- */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- pthread_setcancelstate(iCancelStateSave, NULL);
- do { /* on first invocation, this if should never be true. We just put it at the top
- * of the loop so that processing (and code) is simplified. This code is actually
- * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30
- */
- if(iRet == RS_RET_SUSPENDED) {
- /* ok, this calls for our retry logic... */
- ++iRetries;
- iSleepPeriod = pAction->iResumeInterval;
- srSleep(iSleepPeriod, 0);
- }
- /* first check if we are suspended and, if so, retry */
- if(actionIsSuspended(pAction)) {
- iRet = actionTryResume(pAction);
- if(iRet == RS_RET_OK)
- bCallAction = 1;
- else
- bCallAction = 0;
- } else {
- bCallAction = 1;
- }
- if(bCallAction) {
- /* call configured action */
- /* MULTIQUEUE: TODO: and this now gets us in trouble. If it was suspended, we can
- * assume (and must so) that the action did not succeed. So we now need to redo all
- * those messages from the batch that are not yet processed.
- */
- iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData);
- if(iRet == RS_RET_SUSPENDED) {
- dbgprintf("Action requested to be suspended, done that.\n");
- actionSuspend(pAction, getActNow(pAction));
- }
- }
+finalize_it:
+ *pppMsgs = ppMsgs;
+ RETiRet;
+}
- } while( iRet == RS_RET_SUSPENDED
- && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */
- if(iRet == RS_RET_DISABLE_ACTION) {
- dbgprintf("Action requested to be disabled, done that.\n");
- pAction->bEnabled = 0; /* that's it... */
- }
-
- pthread_cleanup_pop(1); /* unlock mutex */
+/* cleanup doAction calling parameters
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***pppMsgs)
+{
+ uchar **ppMsgs = *pppMsgs;
+ int i;
+ int iArr;
+ DEFiRet;
-finalize_it:
- /* cleanup */
+ ASSERT(pAction != NULL);
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
if(ppMsgs[i] != NULL) {
switch(pAction->eParamPassing) {
case ACT_ARRAY_PASSING:
iArr = 0;
while(((char **)ppMsgs[i])[iArr] != NULL)
- d_free(((char **)ppMsgs[i])[iArr++]);
- d_free(ppMsgs[i]);
+ free(((char **)ppMsgs[i])[iArr++]);
+ free(ppMsgs[i]);
break;
case ACT_STRING_PASSING:
- d_free(ppMsgs[i]);
+ free(ppMsgs[i]);
break;
default:
assert(0);
}
}
}
- d_free(ppMsgs);
- msgDestruct(&pMsg); /* we are now finished with the message */
+ free(ppMsgs);
+ *pppMsgs = NULL;
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
+/* call the DoAction output plugin entry point
+ * Performance note: we build the action parameters here in this function. That
+ * means we do it while we hold the action lock, potentially reducing concurrency
+ * (especially if the action queue is run in DIRECT mode). As an alternative, we
+ * may generate all params for the batch as whole before acquiring the action. However,
+ * that requires more memory, for large batches potentially a lot of memory. So for the
+ * time being, I am doing it here - the performance hit should be very minor and may even
+ * not be a hit because we may gain CPU cache locality gains with the "fewer memory"
+ * approach (I'd say that is rather likely).
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionCallDoAction(action_t *pThis, msg_t *pMsg)
+{
+ uchar **ppMsgs; /* array of message pointers for doAction */
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ISOBJ_TYPE_assert(pMsg, msg);
+
+ CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs));
+
+ pThis->bHadAutoCommit = 0;
+ iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+ switch(iRet) {
+ case RS_RET_OK:
+ actionCommitted(pThis);
+ break;
+ case RS_RET_DEFER_COMMIT:
+ /* we are done, action state remains the same */
+ break;
+ case RS_RET_PREVIOUS_COMMITTED:
+ /* action state remains the same, but we had a commit. */
+ pThis->bHadAutoCommit = 1;
+ break;
+ case RS_RET_SUSPENDED:
+ actionSuspend(pThis, NO_TIME_PROVIDED);
+ break;
+ case RS_RET_DISABLE_ACTION:
+ actionDisable(pThis);
+ break;
+ default:/* permanent failure of this message - no sense in retrying. This is
+ * not yet handled (but easy TODO)
+ */
+ FINALIZE;
+ }
+ iRet = getReturnCode(pThis);
+
+finalize_it:
+ cleanupDoActionParams(pThis, &ppMsgs); /* iRet ignored! */
+
+ RETiRet;
+}
+
+
+/* finish processing a batch. Most importantly, that means we commit if we
+ * need to do so.
+ * rgerhards, 2008-01-28
+ */
+static rsRetVal
+finishBatch(action_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->eState == ACT_STATE_RDY)
+ FINALIZE; /* nothing to do */
+
+ CHKiRet(actionPrepare(pThis));
+ if(pThis->eState == ACT_STATE_ITX) {
+ iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData);
+ switch(iRet) {
+ case RS_RET_OK:
+ actionCommitted(pThis);
+ break;
+ case RS_RET_SUSPENDED:
+ actionSuspend(pThis, NO_TIME_PROVIDED);
+ break;
+ case RS_RET_DISABLE_ACTION:
+ actionDisable(pThis);
+ break;
+ case RS_RET_DEFER_COMMIT:
+ DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
+ "- ignored\n");
+ actionCommitted(pThis);
+ break;
+ case RS_RET_PREVIOUS_COMMITTED:
+ DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
+ "- ignored\n");
+ actionCommitted(pThis);
+ break;
+ default:/* permanent failure of this message - no sense in retrying. This is
+ * not yet handled (but easy TODO)
+ */
+ FINALIZE;
+ }
+ }
+ iRet = getReturnCode(pThis);
+
+finalize_it:
+ RETiRet;
+}
+
+
+#pragma GCC diagnostic ignored "-Wempty-body"
/* receive an array of to-process user pointers and submit them
* for processing.
* rgerhards, 2009-04-22
@@ -545,23 +740,36 @@ rsRetVal
actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
{
int i;
+ int iCancelStateSave;
msg_t *pMsg;
DEFiRet;
assert(paUsrp != NULL);
- if(pAction->pMod->mod.om.beginTransaction != NULL)
- CHKiRet(pAction->pMod->mod.om.beginTransaction(pAction->pModData));
+ /* We now must guard the output module against execution by multiple threads. The
+ * plugin interface specifies that output modules must not be thread-safe (except
+ * if they notify us they are - functionality not yet implemented...).
+ * rgerhards, 2008-01-30
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
for(i = 0 ; i < paUsrp->nElem ; i++) {
pMsg = (msg_t*) paUsrp->pUsrp[i];
dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
CHKiRet(actionCallDoAction(pAction, pMsg));
+ msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
}
- if(pAction->pMod->mod.om.endTransaction != NULL)
- CHKiRet(pAction->pMod->mod.om.endTransaction(pAction->pModData));
+ iRet = finishBatch(pAction);
+
+ pthread_cleanup_pop(1); /* unlock mutex */
+
finalize_it:
RETiRet;
}
+#pragma GCC diagnostic warning "-Wempty-body"
/* call the HUP handler for a given action, if such a handler is defined. The
@@ -799,8 +1007,8 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
* should check from time to time if affairs have improved.
* rgerhards, 2007-07-24
*/
- if(pAction->bEnabled == 0) {
- ABORT_FINALIZE(RS_RET_OK);
+ if(pAction->eState == ACT_STATE_DIED) {
+ ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
}
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
@@ -983,7 +1191,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n");
pAction->f_ReduceRepeated = 0;
}
- pAction->bEnabled = 1; /* action is enabled */
+ pAction->eState = ACT_STATE_RDY; /* action is enabled */
if(bSuspended)
actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */