diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 156 |
1 files changed, 132 insertions, 24 deletions
@@ -49,7 +49,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -262,7 +262,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*, batch_t*))actionCallDoActionMULTIQUEUE)); + (rsRetVal (*)(void*, batch_t*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -372,10 +372,8 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: - iRet = RS_RET_SUSPENDED; - break; case ACT_STATE_DIED: - iRet = RS_RET_DISABLE_ACTION; + iRet = RS_RET_ACTION_FAILED; break; default: DBGPRINTF("Invalid action engine state %d, program error\n", @@ -459,15 +457,12 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); -RUNLOG_STR("actionDoRetry():"); iRetries = 0; while(pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); if(iRet == RS_RET_OK) { actionSetState(pThis, ACT_STATE_RDY); -RUNLOG_STR("tryResume succeeded"); } else if(iRet == RS_RET_SUSPENDED) { -RUNLOG_STR("still suspended");; /* max retries reached? */ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis, ttNow); @@ -501,7 +496,6 @@ static rsRetVal actionTryResume(action_t *pThis) ASSERT(pThis != NULL); -RUNLOG_STR("actionTryResume()"); if(pThis->eState == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used @@ -522,8 +516,10 @@ RUNLOG_STR("actionTryResume()"); CHKiRet(actionDoRetry(pThis, ttNow)); } - DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", - getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + dbgprintf("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", + getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + } finalize_it: RETiRet; @@ -538,11 +534,8 @@ static rsRetVal actionPrepare(action_t *pThis) { DEFiRet; -RUNLOG_STR("actionPrepare()"); assert(pThis != NULL); - if(pThis->eState == ACT_STATE_RTRY) { - CHKiRet(actionTryResume(pThis)); - } + CHKiRet(actionTryResume(pThis)); /* if we are now ready, we initialize the transaction and advance * action state accordingly @@ -779,40 +772,155 @@ finalize_it: } -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* try to submit a partial batch of elements. + * rgerhards, 2009-05-12 */ static rsRetVal -actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) { int i; + int iElemProcessed; + int iCommittedUpTo; msg_t *pMsg; rsRetVal localRet; DEFiRet; assert(pBatch != NULL); + assert(pnElem != NULL); - for(i = 0 ; i < pBatch->nElem ; i++) { + i = pBatch->iDoneUpTo; /* all messages below that index are processed */ + iElemProcessed = 0; + iCommittedUpTo = i; + while(iElemProcessed <= *pnElem && i < pBatch->nElem) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; -dbgprintf("actionCall..MULTIQUEUE: i: %d/%d, pMsg: %p\n", i, pBatch->nElem, pMsg); + dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later! localRet = actionProcessMessage(pAction, pMsg); dbgprintf("action call returned %d\n", localRet); + if(localRet == RS_RET_OK) { + /* mark messages as committed */ + while(iCommittedUpTo < i) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + /* mark messages as committed */ + while(iCommittedUpTo < i - 1) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else { + iRet = localRet; + FINALIZE; + } + ++i; + ++iElemProcessed; + } + +finalize_it: + if(pBatch->iDoneUpTo != iCommittedUpTo) { + *pnElem += iCommittedUpTo - pBatch->iDoneUpTo; + pBatch->iDoneUpTo = iCommittedUpTo; + } + RETiRet; +} + + +/* submit a batch for actual action processing. + * The first nElem elements are processed. This function calls itself + * recursively if it needs to handle errors. + * rgerhards, 2009-05-12 + */ +static rsRetVal +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +{ + int i; + int bDone; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + bDone = 0; + do { + localRet = tryDoAction(pAction, pBatch, &nElem); + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + /* try commit transaction, once done, we can simply do so as if + * that return state was returned from tryDoAction(). + */ + localRet = finishBatch(pAction); + } + + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + bDone = 1; + } else if(localRet == RS_RET_SUSPENDED) { + ; /* do nothing, this will retry the full batch */ + } else if(localRet == RS_RET_ACTION_FAILED) { + /* in this case, the whole batch can not be processed */ + for(i = 0 ; i < nElem ; ++i) { + pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD; + } + bDone = 1; + } else { + if(nElem == 1) { + pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD; + bDone = 1; + } else { + /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); + bDone = 1; + } + } + } while(!bDone); /* do .. while()! */ + + RETiRet; +} + + +/* receive a batch and process it. This includes retry handling. + * rgerhards, 2009-05-12 + */ +static rsRetVal +processAction(action_t *pAction, batch_t *pBatch) +{ + int i; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + pBatch->iDoneUpTo = 0; + /* TODO: think about action batches, must be handled at upper layer! + * MULTIQUEUE + */ + localRet = submitBatch(pAction, pBatch, pBatch->nElem); + CHKiRet(localRet); + + /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ + for(i = 0 ; i < pBatch->nElem ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ - CHKiRet(localRet); } iRet = finishBatch(pAction); finalize_it: RETiRet; } + + #pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 */ static rsRetVal -actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch) +processBatchMain(action_t *pAction, batch_t *pBatch) { int iCancelStateSave; DEFiRet; @@ -829,7 +937,7 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch) pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, pBatch); + iRet = processAction(pAction, pBatch); pthread_cleanup_pop(1); /* unlock mutex */ |