summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c107
-rw-r--r--plugins/omtesting/omtesting.c156
2 files changed, 231 insertions, 32 deletions
diff --git a/action.c b/action.c
index 9ad05a9f..829ef1eb 100644
--- a/action.c
+++ b/action.c
@@ -387,14 +387,31 @@ static rsRetVal getReturnCode(action_t *pThis)
}
+/* set the action to a new state
+ * rgerhards, 2007-08-02
+ */
+static inline void actionSetState(action_t *pThis, action_state_t newState)
+{
+ pThis->eState = newState;
+ DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
+}
+
/* Handles the transient commit state. So far, this is
* mostly a dummy...
* rgerhards, 2007-08-02
*/
static void actionCommitted(action_t *pThis)
{
- pThis->eState = ACT_STATE_RDY;
- DBGPRINTF("Action has committed.\n");
+ actionSetState(pThis, ACT_STATE_RDY);
+}
+
+
+/* set action to "rtry" state.
+ * rgerhards, 2007-08-02
+ */
+static void actionRetry(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_RTRY);
}
@@ -405,8 +422,7 @@ static void actionCommitted(action_t *pThis)
*/
static void actionDisable(action_t *pThis)
{
- pThis->eState = ACT_STATE_DIED;
- DBGPRINTF("Action requested to be disabled, done that.\n");
+ actionSetState(pThis, ACT_STATE_DIED);
}
@@ -422,9 +438,9 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow)
{
if(ttNow == NO_TIME_PROVIDED)
time(&ttNow);
- pThis->eState = ACT_STATE_SUSP;
pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
- DBGPRINTF("Action requested to be suspended, done that, retry=%d\n", (int) pThis->ttResumeRtry);
+ actionSetState(pThis, ACT_STATE_SUSP);
+ DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry);
}
@@ -442,10 +458,15 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
+RUNLOG_STR("actionDoRetry():");
iRetries = 0;
while(pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
- if(iRet == RS_RET_SUSPENDED) {
+ if(iRet == RS_RET_OK) {
+ actionSetState(pThis, ACT_STATE_RDY);
+RUNLOG_STR("tryResume succeeded");
+ } else if(iRet == RS_RET_SUSPENDED) {
+RUNLOG_STR("still suspended");;
/* max retries reached? */
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
actionSuspend(pThis, ttNow);
@@ -479,6 +500,7 @@ static rsRetVal actionTryResume(action_t *pThis)
ASSERT(pThis != NULL);
+RUNLOG_STR("actionTryResume()");
if(pThis->eState == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
@@ -489,7 +511,7 @@ static rsRetVal actionTryResume(action_t *pThis)
*/
time(&ttNow); /* cache "now" */
if(ttNow > pThis->ttResumeRtry) {
- pThis->eState = ACT_STATE_RTRY; /* back to retries */
+ actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
}
}
@@ -515,6 +537,7 @@ static rsRetVal actionPrepare(action_t *pThis)
{
DEFiRet;
+RUNLOG_STR("actionPrepare()");
assert(pThis != NULL);
if(pThis->eState == ACT_STATE_RTRY) {
CHKiRet(actionTryResume(pThis));
@@ -525,7 +548,7 @@ static rsRetVal actionPrepare(action_t *pThis)
*/
if(pThis->eState == ACT_STATE_RDY) {
CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData));
- pThis->eState = ACT_STATE_ITX;
+ actionSetState(pThis, ACT_STATE_ITX);
}
finalize_it:
@@ -646,6 +669,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
+ DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs));
pThis->bHadAutoCommit = 0;
@@ -662,7 +686,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
pThis->bHadAutoCommit = 1;
break;
case RS_RET_SUSPENDED:
- actionSuspend(pThis, NO_TIME_PROVIDED);
+ actionRetry(pThis);
break;
case RS_RET_DISABLE_ACTION:
actionDisable(pThis);
@@ -681,6 +705,29 @@ finalize_it:
}
+/* process a message
+ * this readies the action and then calls doAction()
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ISOBJ_TYPE_assert(pMsg, msg);
+
+RUNLOG_STR("inside actionProcessMsg()");
+ CHKiRet(actionPrepare(pThis));
+ if(pThis->eState == ACT_STATE_ITX)
+ CHKiRet(actionCallDoAction(pThis, pMsg));
+
+ iRet = getReturnCode(pThis);
+finalize_it:
+ RETiRet;
+}
+
+
/* finish processing a batch. Most importantly, that means we commit if we
* need to do so.
* rgerhards, 2008-01-28
@@ -703,7 +750,7 @@ finishBatch(action_t *pThis)
actionCommitted(pThis);
break;
case RS_RET_SUSPENDED:
- actionSuspend(pThis, NO_TIME_PROVIDED);
+ actionRetry(pThis);
break;
case RS_RET_DISABLE_ACTION:
actionDisable(pThis);
@@ -731,6 +778,33 @@ finalize_it:
}
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+rsRetVal
+actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp)
+{
+ int i;
+ msg_t *pMsg;
+ rsRetVal localRet;
+ DEFiRet;
+
+ assert(paUsrp != NULL);
+
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ localRet = actionProcessMessage(pAction, pMsg);
+ dbgprintf("action call returned %d\n", localRet);
+ msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
+ CHKiRet(localRet);
+ }
+ iRet = finishBatch(pAction);
+
+finalize_it:
+ RETiRet;
+}
#pragma GCC diagnostic ignored "-Wempty-body"
/* receive an array of to-process user pointers and submit them
* for processing.
@@ -739,9 +813,7 @@ finalize_it:
rsRetVal
actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
{
- int i;
int iCancelStateSave;
- msg_t *pMsg;
DEFiRet;
assert(paUsrp != NULL);
@@ -756,17 +828,10 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
pthread_setcancelstate(iCancelStateSave, NULL);
- for(i = 0 ; i < paUsrp->nElem ; i++) {
- pMsg = (msg_t*) paUsrp->pUsrp[i];
-dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
- CHKiRet(actionCallDoAction(pAction, pMsg));
- msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
- }
- iRet = finishBatch(pAction);
+ iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp);
pthread_cleanup_pop(1); /* unlock mutex */
-finalize_it:
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c
index 411bcf88..8f6cdbe5 100644
--- a/plugins/omtesting/omtesting.c
+++ b/plugins/omtesting/omtesting.c
@@ -22,7 +22,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -46,12 +46,14 @@
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <time.h>
#include <string.h>
#include <ctype.h>
#include <assert.h>
#include "dirty.h"
#include "syslogd-types.h"
#include "module-template.h"
+#include "cfsysline.h"
MODULE_TYPE_OUTPUT
@@ -59,9 +61,18 @@ MODULE_TYPE_OUTPUT
*/
DEF_OMOD_STATIC_DATA
+static int bEchoStdout = 0; /* echo non-failed messages to stdout */
+
typedef struct _instanceData {
+ enum { MD_SLEEP, MD_FAIL, MD_RANDFAIL, MD_ALWAYS_SUSPEND }
+ mode;
+ int bEchoStdout;
int iWaitSeconds;
int iWaitUSeconds; /* milli-seconds (one million of a second, just to make sure...) */
+ int iCurrCallNbr;
+ int iFailFrequency;
+ int iResumeAfter;
+ int iCurrRetries;
} instanceData;
BEGINcreateInstance
@@ -85,19 +96,106 @@ CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
-BEGINtryResume
-CODESTARTtryResume
-ENDtryResume
+/* implement "fail" command in retry processing */
+static rsRetVal doFailOnResume(instanceData *pData)
+{
+ DEFiRet;
-BEGINdoAction
-CODESTARTdoAction
+ dbgprintf("fail retry curr %d, max %d\n", pData->iCurrRetries, pData->iResumeAfter);
+ if(++pData->iCurrRetries == pData->iResumeAfter) {
+ iRet = RS_RET_OK;
+ } else {
+ iRet = RS_RET_SUSPENDED;
+ }
+
+ RETiRet;
+}
+
+
+/* implement "fail" command */
+static rsRetVal doFail(instanceData *pData)
+{
+ DEFiRet;
+
+ dbgprintf("fail curr %d, frquency %d\n", pData->iCurrCallNbr, pData->iFailFrequency);
+ if(pData->iCurrCallNbr++ % pData->iFailFrequency == 0) {
+ pData->iCurrRetries = 0;
+ iRet = RS_RET_SUSPENDED;
+ }
+
+ RETiRet;
+}
+
+
+/* implement "sleep" command */
+static rsRetVal doSleep(instanceData *pData)
+{
+ DEFiRet;
struct timeval tvSelectTimeout;
dbgprintf("sleep(%d, %d)\n", pData->iWaitSeconds, pData->iWaitUSeconds);
tvSelectTimeout.tv_sec = pData->iWaitSeconds;
tvSelectTimeout.tv_usec = pData->iWaitUSeconds; /* milli seconds */
select(0, NULL, NULL, NULL, &tvSelectTimeout);
- //dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet);
+ RETiRet;
+}
+
+
+/* implement "randomfail" command */
+static rsRetVal doRandFail(void)
+{
+ DEFiRet;
+ if((rand() >> 4) < (RAND_MAX >> 5)) { /* rougly same probability */
+ iRet = RS_RET_OK;
+ dbgprintf("omtesting randfail: succeeded this time\n");
+ } else {
+ iRet = RS_RET_SUSPENDED;
+ dbgprintf("omtesting randfail: failed this time\n");
+ }
+ RETiRet;
+}
+
+
+BEGINtryResume
+CODESTARTtryResume
+ dbgprintf("omtesting tryResume() called\n");
+ switch(pData->mode) {
+ case MD_SLEEP:
+ break;
+ case MD_FAIL:
+ iRet = doFailOnResume(pData);
+ break;
+ case MD_RANDFAIL:
+ iRet = doRandFail();
+ break;
+ case MD_ALWAYS_SUSPEND:
+ iRet = RS_RET_SUSPENDED;
+ }
+ dbgprintf("omtesting tryResume() returns iRet %d\n", iRet);
+ENDtryResume
+
+
+BEGINdoAction
+CODESTARTdoAction
+ dbgprintf("omtesting received msg '%s'\n", ppString[0]);
+ switch(pData->mode) {
+ case MD_SLEEP:
+ iRet = doSleep(pData);
+ break;
+ case MD_FAIL:
+ iRet = doFail(pData);
+ break;
+ case MD_RANDFAIL:
+ iRet = doRandFail();
+ case MD_ALWAYS_SUSPEND:
+ iRet = RS_RET_SUSPENDED;
+ }
+
+ if(iRet == RS_RET_OK && pData->bEchoStdout) {
+ fprintf(stdout, "%s", ppString[0]);
+ fflush(stdout);
+ }
+ dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet);
ENDdoAction
@@ -113,7 +211,7 @@ BEGINparseSelectorAct
int i;
uchar szBuf[1024];
CODESTARTparseSelectorAct
-CODE_STD_STRING_REQUESTparseSelectorAct(0)
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* code here is quick and dirty - if you like, clean it up. But keep
* in mind it is just a testing aid ;) -- rgerhards, 2007-12-31
*/
@@ -135,6 +233,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(0)
if(isspace(*p))
++p;
+ dbgprintf("omtesting command: '%s'\n", szBuf);
if(!strcmp((char*) szBuf, "sleep")) {
/* parse seconds */
for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) {
@@ -152,12 +251,43 @@ CODE_STD_STRING_REQUESTparseSelectorAct(0)
if(isspace(*p))
++p;
pData->iWaitUSeconds = atoi((char*) szBuf);
- }
- /* once there are other modes, here is the spot to add it! */
- else {
+ pData->mode = MD_SLEEP;
+ } else if(!strcmp((char*) szBuf, "fail")) {
+ /* "fail fail-freqency resume-after"
+ * fail-frequency specifies how often doAction() fails
+ * resume-after speicifes how fast tryResume() should come back with success
+ * all numbers being "times called"
+ */
+ /* parse fail-frequence */
+ for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) {
+ szBuf[i] = *p++;
+ }
+ szBuf[i] = '\0';
+ if(isspace(*p))
+ ++p;
+ pData->iFailFrequency = atoi((char*) szBuf);
+ /* parse resume-after */
+ for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) {
+ szBuf[i] = *p++;
+ }
+ szBuf[i] = '\0';
+ if(isspace(*p))
+ ++p;
+ pData->iResumeAfter = atoi((char*) szBuf);
+ pData->iCurrCallNbr = 1;
+ pData->mode = MD_FAIL;
+ } else if(!strcmp((char*) szBuf, "randfail")) {
+ pData->mode = MD_RANDFAIL;
+ } else if(!strcmp((char*) szBuf, "always_suspend")) {
+ pData->mode = MD_ALWAYS_SUSPEND;
+ } else {
dbgprintf("invalid mode '%s', doing 'sleep 1 0' - fix your config\n", szBuf);
}
+ pData->bEchoStdout = bEchoStdout;
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS,
+ (uchar*)"RSYSLOG_TraditionalForwardFormat"));
+
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -177,6 +307,10 @@ BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionomtestingechostdout", 0, eCmdHdlrBinary, NULL,
+ &bEchoStdout, STD_LOADABLE_MODULE_ID));
+ /* we seed the random-number generator in any case... */
+ srand(time(NULL));
ENDmodInit
/*
* vi:set ai: