diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-12 17:57:04 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-12 17:57:04 +0200 |
commit | e2b229868955a6f6a6380273314d0d90ddad1273 (patch) | |
tree | 5a19ce9b00e9c50c1cddd78ad2d40f3d4dcdf1fb | |
parent | fbb040b411ee564e4a4bbaf53ec342236929324f (diff) | |
download | rsyslog-e2b229868955a6f6a6380273314d0d90ddad1273.tar.gz rsyslog-e2b229868955a6f6a6380273314d0d90ddad1273.tar.xz rsyslog-e2b229868955a6f6a6380273314d0d90ddad1273.zip |
action batch processing implemented
... passed initial tests, but of course more are needed
-rw-r--r-- | action.c | 156 | ||||
-rw-r--r-- | runtime/batch.h | 4 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 |
3 files changed, 136 insertions, 25 deletions
@@ -49,7 +49,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -262,7 +262,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*, batch_t*))actionCallDoActionMULTIQUEUE)); + (rsRetVal (*)(void*, batch_t*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -372,10 +372,8 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: - iRet = RS_RET_SUSPENDED; - break; case ACT_STATE_DIED: - iRet = RS_RET_DISABLE_ACTION; + iRet = RS_RET_ACTION_FAILED; break; default: DBGPRINTF("Invalid action engine state %d, program error\n", @@ -459,15 +457,12 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); -RUNLOG_STR("actionDoRetry():"); iRetries = 0; while(pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); if(iRet == RS_RET_OK) { actionSetState(pThis, ACT_STATE_RDY); -RUNLOG_STR("tryResume succeeded"); } else if(iRet == RS_RET_SUSPENDED) { -RUNLOG_STR("still suspended");; /* max retries reached? */ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis, ttNow); @@ -501,7 +496,6 @@ static rsRetVal actionTryResume(action_t *pThis) ASSERT(pThis != NULL); -RUNLOG_STR("actionTryResume()"); if(pThis->eState == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used @@ -522,8 +516,10 @@ RUNLOG_STR("actionTryResume()"); CHKiRet(actionDoRetry(pThis, ttNow)); } - DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", - getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + dbgprintf("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", + getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + } finalize_it: RETiRet; @@ -538,11 +534,8 @@ static rsRetVal actionPrepare(action_t *pThis) { DEFiRet; -RUNLOG_STR("actionPrepare()"); assert(pThis != NULL); - if(pThis->eState == ACT_STATE_RTRY) { - CHKiRet(actionTryResume(pThis)); - } + CHKiRet(actionTryResume(pThis)); /* if we are now ready, we initialize the transaction and advance * action state accordingly @@ -779,40 +772,155 @@ finalize_it: } -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* try to submit a partial batch of elements. + * rgerhards, 2009-05-12 */ static rsRetVal -actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) { int i; + int iElemProcessed; + int iCommittedUpTo; msg_t *pMsg; rsRetVal localRet; DEFiRet; assert(pBatch != NULL); + assert(pnElem != NULL); - for(i = 0 ; i < pBatch->nElem ; i++) { + i = pBatch->iDoneUpTo; /* all messages below that index are processed */ + iElemProcessed = 0; + iCommittedUpTo = i; + while(iElemProcessed <= *pnElem && i < pBatch->nElem) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; -dbgprintf("actionCall..MULTIQUEUE: i: %d/%d, pMsg: %p\n", i, pBatch->nElem, pMsg); + dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later! localRet = actionProcessMessage(pAction, pMsg); dbgprintf("action call returned %d\n", localRet); + if(localRet == RS_RET_OK) { + /* mark messages as committed */ + while(iCommittedUpTo < i) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + /* mark messages as committed */ + while(iCommittedUpTo < i - 1) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else { + iRet = localRet; + FINALIZE; + } + ++i; + ++iElemProcessed; + } + +finalize_it: + if(pBatch->iDoneUpTo != iCommittedUpTo) { + *pnElem += iCommittedUpTo - pBatch->iDoneUpTo; + pBatch->iDoneUpTo = iCommittedUpTo; + } + RETiRet; +} + + +/* submit a batch for actual action processing. + * The first nElem elements are processed. This function calls itself + * recursively if it needs to handle errors. + * rgerhards, 2009-05-12 + */ +static rsRetVal +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +{ + int i; + int bDone; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + bDone = 0; + do { + localRet = tryDoAction(pAction, pBatch, &nElem); + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + /* try commit transaction, once done, we can simply do so as if + * that return state was returned from tryDoAction(). + */ + localRet = finishBatch(pAction); + } + + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + bDone = 1; + } else if(localRet == RS_RET_SUSPENDED) { + ; /* do nothing, this will retry the full batch */ + } else if(localRet == RS_RET_ACTION_FAILED) { + /* in this case, the whole batch can not be processed */ + for(i = 0 ; i < nElem ; ++i) { + pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD; + } + bDone = 1; + } else { + if(nElem == 1) { + pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD; + bDone = 1; + } else { + /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); + bDone = 1; + } + } + } while(!bDone); /* do .. while()! */ + + RETiRet; +} + + +/* receive a batch and process it. This includes retry handling. + * rgerhards, 2009-05-12 + */ +static rsRetVal +processAction(action_t *pAction, batch_t *pBatch) +{ + int i; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + pBatch->iDoneUpTo = 0; + /* TODO: think about action batches, must be handled at upper layer! + * MULTIQUEUE + */ + localRet = submitBatch(pAction, pBatch, pBatch->nElem); + CHKiRet(localRet); + + /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ + for(i = 0 ; i < pBatch->nElem ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ - CHKiRet(localRet); } iRet = finishBatch(pAction); finalize_it: RETiRet; } + + #pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 */ static rsRetVal -actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch) +processBatchMain(action_t *pAction, batch_t *pBatch) { int iCancelStateSave; DEFiRet; @@ -829,7 +937,7 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch) pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, pBatch); + iRet = processAction(pAction, pBatch); pthread_cleanup_pop(1); /* unlock mutex */ diff --git a/runtime/batch.h b/runtime/batch.h index cb40cf42..fcbbafce 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -35,7 +35,8 @@ typedef enum { BATCH_STATE_RDY = 0, /* object ready for processing */ BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */ BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */ - BATCH_STATE_DISC = 3, /* discarded - processed OK, but do not submit to any other action */ + BATCH_STATE_COMM = 3, /* message successfully commited */ + BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */ } batch_state_t; @@ -57,6 +58,7 @@ struct batch_obj_s { */ struct batch_s { int nElem; /* actual number of element in this entry */ + int iDoneUpTo; /* all messages below this index have state other than RDY */ batch_obj_t *pElem; /* batch elements */ }; diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 53a510b3..25f9eefe 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -281,6 +281,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_RSCORE_TOO_OLD = -2120, /**< rsyslog core is too old for ... (eg this plugin) */ RS_RET_DEFER_COMMIT = -2121, /**< output plugin status: not yet committed (an OK state!) */ RS_RET_PREVIOUS_COMMITTED = -2122, /**< output plugin status: previous record was committed (an OK state!) */ + RS_RET_ACTION_FAILED = -2122, /**< action failed and is now suspended (consider this permanent for the time being) */ RS_RET_FILENAME_INVALID = -2140, /**< filename invalid, not found, no access, ... */ /* RainerScript error messages (range 1000.. 1999) */ |