summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorAndre Lorbach <alorbach@adiscon.com>2011-08-18 09:36:59 +0200
committerAndre Lorbach <alorbach@adiscon.com>2011-08-18 09:36:59 +0200
commit47ed9921b6c9fe49d4aadf244f7df1ce4d05a5b3 (patch)
treea1a9c778194da1ea12426779183f02b8a439acce /action.c
parent33bf53cdab0689bd7f100aca5f09dd6b69da0e94 (diff)
downloadrsyslog-47ed9921b6c9fe49d4aadf244f7df1ce4d05a5b3.tar.gz
rsyslog-47ed9921b6c9fe49d4aadf244f7df1ce4d05a5b3.tar.xz
rsyslog-47ed9921b6c9fe49d4aadf244f7df1ce4d05a5b3.zip
bugfix: fixed incorrect state handling for Discard Action (transactions)
Diffstat (limited to 'action.c')
-rw-r--r--action.c34
1 files changed, 32 insertions, 2 deletions
diff --git a/action.c b/action.c
index 54a05fc3..869185e9 100644
--- a/action.c
+++ b/action.c
@@ -962,6 +962,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
iCommittedUpTo = i;
dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
+dbgprintf("XXXXX: pre nElem %d, state %d\n", pBatch->nElem, pBatch->pElem[i].state);
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
/* NOTE: do NOT extend the filter below! Anything else must be done on the
@@ -979,14 +980,20 @@ dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem,
if(localRet == RS_RET_OK) {
/* mark messages as committed */
while(iCommittedUpTo <= i) {
+dbgprintf("XXXXX: setting nElem %d to 3/COMM\n", iCommittedUpTo);
pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
+ ++iCommittedUpTo;
+ //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
/* mark messages as committed */
while(iCommittedUpTo < i) {
+dbgprintf("XXXXX: setting2 nElem %d to 3/COMM\n", iCommittedUpTo);
pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
+ ++iCommittedUpTo;
+ //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
pBatch->pElem[i].state = BATCH_STATE_SUB;
} else if(localRet == RS_RET_DEFER_COMMIT) {
@@ -1000,6 +1007,7 @@ dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem,
FINALIZE;
}
}
+dbgprintf("XXXXX: post nElem %d, state %d\n", pBatch->nElem, pBatch->pElem[i].state);
++i;
++iElemProcessed;
}
@@ -1011,6 +1019,15 @@ finalize_it:
RETiRet;
}
+/* debug aid */
+static void displayBatchState(batch_t *pBatch)
+{
+ int i;
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
+ dbgprintf("XXXXX: displayBatchState2 %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state);
+ }
+}
+
/* submit a batch for actual action processing.
* The first nElem elements are processed. This function calls itself
@@ -1030,6 +1047,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
assert(pBatch != NULL);
+dbgprintf("XXXX: submitBatch pre:\n");
+displayBatchState(pBatch);
wasDoneTo = pBatch->iDoneUpTo;
bDone = 0;
do {
@@ -1043,16 +1062,23 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
/* try commit transaction, once done, we can simply do so as if
* that return state was returned from tryDoAction().
*/
+dbgprintf("XXXX: calling finishBatch\n");
+displayBatchState(pBatch);
localRet = finishBatch(pAction, pBatch);
}
+dbgprintf("XXXX: 10\n");
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
+dbgprintf("XXXX: bdone = 1\n");
+displayBatchState(pBatch);
bDone = 1;
} else if(localRet == RS_RET_SUSPENDED) {
+dbgprintf("XXXX: 20\n");
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
+dbgprintf("XXXX: 25\n");
/* in this case, everything not yet committed is BAD */
for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
if( pBatch->pElem[i].state != BATCH_STATE_DISC
@@ -1063,6 +1089,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
}
bDone = 1;
} else {
+dbgprintf("XXXX: 30\n");
if(nElem == 1) {
batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD);
bDone = 1;
@@ -1081,6 +1108,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
finalize_it:
+dbgprintf("XXXX: submitBatch post:\n");
+displayBatchState(pBatch);
RETiRet;
}
@@ -1552,6 +1581,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
* this is not necessary, because in that case we enqueue only what actually needs
* to be processed.
*/
+dbgprintf("XXXXX: in doQeuueEnqDiretbatch, %d\n", pAction->bExecWhenPrevSusp);
if(pAction->bExecWhenPrevSusp) {
bNeedSubmit = 0;
bModifiedFilter = 0;