From 47ed9921b6c9fe49d4aadf244f7df1ce4d05a5b3 Mon Sep 17 00:00:00 2001 From: Andre Lorbach Date: Thu, 18 Aug 2011 09:36:59 +0200 Subject: bugfix: fixed incorrect state handling for Discard Action (transactions) --- action.c | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 54a05fc3..869185e9 100644 --- a/action.c +++ b/action.c @@ -962,6 +962,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iCommittedUpTo = i; dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); while(iElemProcessed <= *pnElem && i < pBatch->nElem) { +dbgprintf("XXXXX: pre nElem %d, state %d\n", pBatch->nElem, pBatch->pElem[i].state); if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); /* NOTE: do NOT extend the filter below! Anything else must be done on the @@ -979,14 +980,20 @@ dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, if(localRet == RS_RET_OK) { /* mark messages as committed */ while(iCommittedUpTo <= i) { +dbgprintf("XXXXX: setting nElem %d to 3/COMM\n", iCommittedUpTo); pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { /* mark messages as committed */ while(iCommittedUpTo < i) { +dbgprintf("XXXXX: setting2 nElem %d to 3/COMM\n", iCommittedUpTo); pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } pBatch->pElem[i].state = BATCH_STATE_SUB; } else if(localRet == RS_RET_DEFER_COMMIT) { @@ -1000,6 +1007,7 @@ dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, FINALIZE; } } +dbgprintf("XXXXX: post nElem %d, state %d\n", pBatch->nElem, pBatch->pElem[i].state); ++i; ++iElemProcessed; } @@ -1011,6 +1019,15 @@ finalize_it: RETiRet; } +/* debug aid */ +static void displayBatchState(batch_t *pBatch) +{ + int i; + for(i = 0 ; i < pBatch->nElem ; ++i) { + dbgprintf("XXXXX: displayBatchState2 %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + } +} + /* submit a batch for actual action processing. * The first nElem elements are processed. This function calls itself @@ -1030,6 +1047,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) assert(pBatch != NULL); +dbgprintf("XXXX: submitBatch pre:\n"); +displayBatchState(pBatch); wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { @@ -1043,16 +1062,23 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* try commit transaction, once done, we can simply do so as if * that return state was returned from tryDoAction(). */ +dbgprintf("XXXX: calling finishBatch\n"); +displayBatchState(pBatch); localRet = finishBatch(pAction, pBatch); } +dbgprintf("XXXX: 10\n"); if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { +dbgprintf("XXXX: bdone = 1\n"); +displayBatchState(pBatch); bDone = 1; } else if(localRet == RS_RET_SUSPENDED) { +dbgprintf("XXXX: 20\n"); ; /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { +dbgprintf("XXXX: 25\n"); /* in this case, everything not yet committed is BAD */ for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { if( pBatch->pElem[i].state != BATCH_STATE_DISC @@ -1063,6 +1089,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) } bDone = 1; } else { +dbgprintf("XXXX: 30\n"); if(nElem == 1) { batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); bDone = 1; @@ -1081,6 +1108,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) ABORT_FINALIZE(RS_RET_FORCE_TERM); finalize_it: +dbgprintf("XXXX: submitBatch post:\n"); +displayBatchState(pBatch); RETiRet; } @@ -1552,6 +1581,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) * this is not necessary, because in that case we enqueue only what actually needs * to be processed. */ +dbgprintf("XXXXX: in doQeuueEnqDiretbatch, %d\n", pAction->bExecWhenPrevSusp); if(pAction->bExecWhenPrevSusp) { bNeedSubmit = 0; bModifiedFilter = 0; -- cgit