summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-12 17:57:04 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-12 17:57:04 +0200
commite2b229868955a6f6a6380273314d0d90ddad1273 (patch)
tree5a19ce9b00e9c50c1cddd78ad2d40f3d4dcdf1fb
parentfbb040b411ee564e4a4bbaf53ec342236929324f (diff)
downloadrsyslog-e2b229868955a6f6a6380273314d0d90ddad1273.tar.gz
rsyslog-e2b229868955a6f6a6380273314d0d90ddad1273.tar.xz
rsyslog-e2b229868955a6f6a6380273314d0d90ddad1273.zip
action batch processing implemented
... passed initial tests, but of course more are needed
-rw-r--r--action.c156
-rw-r--r--runtime/batch.h4
-rw-r--r--runtime/rsyslog.h1
3 files changed, 136 insertions, 25 deletions
diff --git a/action.c b/action.c
index b12eda6e..0253e1b6 100644
--- a/action.c
+++ b/action.c
@@ -49,7 +49,7 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
-static rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch);
+static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -262,7 +262,7 @@ actionConstructFinalize(action_t *pThis)
* spec. -- rgerhards, 2008-01-30
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
- (rsRetVal (*)(void*, batch_t*))actionCallDoActionMULTIQUEUE));
+ (rsRetVal (*)(void*, batch_t*))processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -372,10 +372,8 @@ static rsRetVal getReturnCode(action_t *pThis)
iRet = RS_RET_SUSPENDED;
break;
case ACT_STATE_SUSP:
- iRet = RS_RET_SUSPENDED;
- break;
case ACT_STATE_DIED:
- iRet = RS_RET_DISABLE_ACTION;
+ iRet = RS_RET_ACTION_FAILED;
break;
default:
DBGPRINTF("Invalid action engine state %d, program error\n",
@@ -459,15 +457,12 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
-RUNLOG_STR("actionDoRetry():");
iRetries = 0;
while(pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
if(iRet == RS_RET_OK) {
actionSetState(pThis, ACT_STATE_RDY);
-RUNLOG_STR("tryResume succeeded");
} else if(iRet == RS_RET_SUSPENDED) {
-RUNLOG_STR("still suspended");;
/* max retries reached? */
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
actionSuspend(pThis, ttNow);
@@ -501,7 +496,6 @@ static rsRetVal actionTryResume(action_t *pThis)
ASSERT(pThis != NULL);
-RUNLOG_STR("actionTryResume()");
if(pThis->eState == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
@@ -522,8 +516,10 @@ RUNLOG_STR("actionTryResume()");
CHKiRet(actionDoRetry(pThis, ttNow));
}
- DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
- getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
+ dbgprintf("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
+ getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ }
finalize_it:
RETiRet;
@@ -538,11 +534,8 @@ static rsRetVal actionPrepare(action_t *pThis)
{
DEFiRet;
-RUNLOG_STR("actionPrepare()");
assert(pThis != NULL);
- if(pThis->eState == ACT_STATE_RTRY) {
- CHKiRet(actionTryResume(pThis));
- }
+ CHKiRet(actionTryResume(pThis));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
@@ -779,40 +772,155 @@ finalize_it:
}
-/* receive an array of to-process user pointers and submit them
- * for processing.
- * rgerhards, 2009-04-22
+/* try to submit a partial batch of elements.
+ * rgerhards, 2009-05-12
*/
static rsRetVal
-actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch)
+tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
{
int i;
+ int iElemProcessed;
+ int iCommittedUpTo;
msg_t *pMsg;
rsRetVal localRet;
DEFiRet;
assert(pBatch != NULL);
+ assert(pnElem != NULL);
- for(i = 0 ; i < pBatch->nElem ; i++) {
+ i = pBatch->iDoneUpTo; /* all messages below that index are processed */
+ iElemProcessed = 0;
+ iCommittedUpTo = i;
+ while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
-dbgprintf("actionCall..MULTIQUEUE: i: %d/%d, pMsg: %p\n", i, pBatch->nElem, pMsg);
+ dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later!
localRet = actionProcessMessage(pAction, pMsg);
dbgprintf("action call returned %d\n", localRet);
+ if(localRet == RS_RET_OK) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i - 1) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else {
+ iRet = localRet;
+ FINALIZE;
+ }
+ ++i;
+ ++iElemProcessed;
+ }
+
+finalize_it:
+ if(pBatch->iDoneUpTo != iCommittedUpTo) {
+ *pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
+ pBatch->iDoneUpTo = iCommittedUpTo;
+ }
+ RETiRet;
+}
+
+
+/* submit a batch for actual action processing.
+ * The first nElem elements are processed. This function calls itself
+ * recursively if it needs to handle errors.
+ * rgerhards, 2009-05-12
+ */
+static rsRetVal
+submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
+{
+ int i;
+ int bDone;
+ rsRetVal localRet;
+ DEFiRet;
+
+ assert(pBatch != NULL);
+
+ bDone = 0;
+ do {
+ localRet = tryDoAction(pAction, pBatch, &nElem);
+ if( localRet == RS_RET_OK
+ || localRet == RS_RET_PREVIOUS_COMMITTED
+ || localRet == RS_RET_DEFER_COMMIT) {
+ /* try commit transaction, once done, we can simply do so as if
+ * that return state was returned from tryDoAction().
+ */
+ localRet = finishBatch(pAction);
+ }
+
+ if( localRet == RS_RET_OK
+ || localRet == RS_RET_PREVIOUS_COMMITTED
+ || localRet == RS_RET_DEFER_COMMIT) {
+ bDone = 1;
+ } else if(localRet == RS_RET_SUSPENDED) {
+ ; /* do nothing, this will retry the full batch */
+ } else if(localRet == RS_RET_ACTION_FAILED) {
+ /* in this case, the whole batch can not be processed */
+ for(i = 0 ; i < nElem ; ++i) {
+ pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD;
+ }
+ bDone = 1;
+ } else {
+ if(nElem == 1) {
+ pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD;
+ bDone = 1;
+ } else {
+ /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
+ submitBatch(pAction, pBatch, nElem / 2);
+ submitBatch(pAction, pBatch, nElem - (nElem / 2));
+ bDone = 1;
+ }
+ }
+ } while(!bDone); /* do .. while()! */
+
+ RETiRet;
+}
+
+
+/* receive a batch and process it. This includes retry handling.
+ * rgerhards, 2009-05-12
+ */
+static rsRetVal
+processAction(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ msg_t *pMsg;
+ rsRetVal localRet;
+ DEFiRet;
+
+ assert(pBatch != NULL);
+
+ pBatch->iDoneUpTo = 0;
+ /* TODO: think about action batches, must be handled at upper layer!
+ * MULTIQUEUE
+ */
+ localRet = submitBatch(pAction, pBatch, pBatch->nElem);
+ CHKiRet(localRet);
+
+ /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */
+ for(i = 0 ; i < pBatch->nElem ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
- CHKiRet(localRet);
}
iRet = finishBatch(pAction);
finalize_it:
RETiRet;
}
+
+
#pragma GCC diagnostic ignored "-Wempty-body"
/* receive an array of to-process user pointers and submit them
* for processing.
* rgerhards, 2009-04-22
*/
static rsRetVal
-actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch)
+processBatchMain(action_t *pAction, batch_t *pBatch)
{
int iCancelStateSave;
DEFiRet;
@@ -829,7 +937,7 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch)
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
pthread_setcancelstate(iCancelStateSave, NULL);
- iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, pBatch);
+ iRet = processAction(pAction, pBatch);
pthread_cleanup_pop(1); /* unlock mutex */
diff --git a/runtime/batch.h b/runtime/batch.h
index cb40cf42..fcbbafce 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -35,7 +35,8 @@ typedef enum {
BATCH_STATE_RDY = 0, /* object ready for processing */
BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */
BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */
- BATCH_STATE_DISC = 3, /* discarded - processed OK, but do not submit to any other action */
+ BATCH_STATE_COMM = 3, /* message successfully commited */
+ BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */
} batch_state_t;
@@ -57,6 +58,7 @@ struct batch_obj_s {
*/
struct batch_s {
int nElem; /* actual number of element in this entry */
+ int iDoneUpTo; /* all messages below this index have state other than RDY */
batch_obj_t *pElem; /* batch elements */
};
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 53a510b3..25f9eefe 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -281,6 +281,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_RSCORE_TOO_OLD = -2120, /**< rsyslog core is too old for ... (eg this plugin) */
RS_RET_DEFER_COMMIT = -2121, /**< output plugin status: not yet committed (an OK state!) */
RS_RET_PREVIOUS_COMMITTED = -2122, /**< output plugin status: previous record was committed (an OK state!) */
+ RS_RET_ACTION_FAILED = -2122, /**< action failed and is now suspended (consider this permanent for the time being) */
RS_RET_FILENAME_INVALID = -2140, /**< filename invalid, not found, no access, ... */
/* RainerScript error messages (range 1000.. 1999) */