summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-30 11:09:15 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-30 11:09:15 +0200
commita6bda9b93f21cdbec1d7312078535eb092f32cb0 (patch)
treee1b62c000eea2b81e0e67694b2d8129e252f830c
parentfeeb622c4e0c622559df803f8df6da39bf3015e7 (diff)
downloadrsyslog-a6bda9b93f21cdbec1d7312078535eb092f32cb0.tar.gz
rsyslog-a6bda9b93f21cdbec1d7312078535eb092f32cb0.tar.xz
rsyslog-a6bda9b93f21cdbec1d7312078535eb092f32cb0.zip
bugfix: discard action did not work (did not discard messages)
-rw-r--r--ChangeLog1
-rw-r--r--action.c56
-rw-r--r--runtime/queue.c2
-rw-r--r--runtime/rule.c2
-rw-r--r--runtime/ruleset.c11
-rwxr-xr-xtests/discard.sh2
6 files changed, 51 insertions, 23 deletions
diff --git a/ChangeLog b/ChangeLog
index 76fe3ff0..b3b3c82b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,6 @@
---------------------------------------------------------------------------
Version 5.1.4 [DEVEL] (rgerhards), 2009-07-??
+- bugfix: discard action did not work (did not discard messages)
- bugfix: discard action caused segfault
---------------------------------------------------------------------------
Version 5.1.3 [DEVEL] (rgerhards), 2009-07-28
diff --git a/action.c b/action.c
index 8a29df2e..ab3a7866 100644
--- a/action.c
+++ b/action.c
@@ -808,41 +808,50 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
assert(pBatch != NULL);
assert(pnElem != NULL);
+dbgprintf("XXXX: ENTER tryDoAction elt 0 state %d\n", pBatch->pElem[0].state);
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
- dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later!
- localRet = actionProcessMessage(pAction, pMsg);
- dbgprintf("action call returned %d\n", localRet);
- if(localRet == RS_RET_OK) {
- /* mark messages as committed */
- while(iCommittedUpTo < i) {
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- /* mark messages as committed */
- while(iCommittedUpTo < i - 1) {
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p, state %d\n", i, pBatch->nElem, *pnElem, pMsg, pBatch->pElem[i].state);//remove later!
+ if(pBatch->pElem[i].state != BATCH_STATE_DISC) {
+ localRet = actionProcessMessage(pAction, pMsg);
+ dbgprintf("action call returned %d\n", localRet);
+ if(localRet == RS_RET_OK) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i - 1) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else if(localRet == RS_RET_DISCARDMSG) {
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+dbgprintf("XXXX: discardmsg! change state to _DISC: %d\n", pBatch->pElem[i].state);
+ } else {
+ iRet = localRet;
+ FINALIZE;
}
- pBatch->pElem[i].state = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- pBatch->pElem[i].state = BATCH_STATE_SUB;
- } else {
- iRet = localRet;
- FINALIZE;
}
++i;
++iElemProcessed;
}
finalize_it:
- if(pBatch->iDoneUpTo != iCommittedUpTo) {
+ if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) {
+ iRet = RS_RET_DISCARDMSG;
+ } else if(pBatch->iDoneUpTo != iCommittedUpTo) {
*pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
pBatch->iDoneUpTo = iCommittedUpTo;
}
+dbgprintf("XXXX: done tryDoAction elt 0 state %d, iret %d\n", pBatch->pElem[0].state, iRet);
RETiRet;
}
@@ -865,6 +874,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 0;
do {
localRet = tryDoAction(pAction, pBatch, &nElem);
+dbgprintf("XXXX: submitBatch got state %d\n", localRet);
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
@@ -874,10 +884,15 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
localRet = finishBatch(pAction);
}
+dbgprintf("XXXX: submitBatch got state %d\n", localRet);
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
bDone = 1;
+ } else if(localRet == RS_RET_DISCARDMSG) {
+ iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */
+ bDone = 1;
+dbgprintf("XXXX: submitBatch DONE state %d\n", localRet);
} else if(localRet == RS_RET_SUSPENDED) {
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
@@ -897,8 +912,10 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 1;
}
}
+dbgprintf("XXXX: submitBatch pre while state %d\n", localRet);
} while(!bDone); /* do .. while()! */
+dbgprintf("XXXX: END submitBatch elt 0 state %d, iRet %d\n", pBatch->pElem[0].state, iRet);
RETiRet;
}
@@ -1134,6 +1151,7 @@ actionWriteToAction(action_t *pAction)
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+dbgprintf("XXXX: queueEnqObj returned %d\n", iRet);
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
diff --git a/runtime/queue.c b/runtime/queue.c
index cb14b58d..8388d00e 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1044,6 +1044,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
objDestruct(pUsr);
+dbgprintf("XXXX: qAddDirect returns %d\n", iRet);
RETiRet;
}
@@ -2442,6 +2443,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
CHKiRet(qqueueAdd(pThis, pUsr));
finalize_it:
+dbgprintf("XXXX: queueEnqObj returns %d\n", iRet);
RETiRet;
}
diff --git a/runtime/rule.c b/runtime/rule.c
index 182d616a..bcd82e5f 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -83,6 +83,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
}
iRetMod = actionCallAction(pAction, pDoActData->pMsg);
+dbgprintf("XXXX: processMsgDoActions returns %d\n", iRet);
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {
@@ -271,6 +272,7 @@ processMsg(rule_t *pThis, msg_t *pMsg)
}
finalize_it:
+dbgprintf("XXXX: rule.processMsg returns %d\n", iRet);
RETiRet;
}
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index d98b4217..c1b6d490 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -138,8 +138,11 @@ finalize_it:
*/
DEFFUNC_llExecFunc(processMsgDoRules)
{
+ rsRetVal iRet;
ISOBJ_TYPE_assert(pData, rule);
- return rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
+ iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
+dbgprintf("XXXX: pcoessMsgDoRules returns %d\n", iRet);
+ return iRet;
}
@@ -159,8 +162,10 @@ processMsg(msg_t *pMsg)
CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg));
finalize_it:
- if(iRet == RS_RET_DISCARDMSG)
- iRet = RS_RET_OK;
+dbgprintf("XXXX: processMsg got return state %d\n", iRet);
+
+ //if(iRet == RS_RET_DISCARDMSG)
+ //iRet = RS_RET_OK;
RETiRet;
}
diff --git a/tests/discard.sh b/tests/discard.sh
index b230bc11..0fafc7d9 100755
--- a/tests/discard.sh
+++ b/tests/discard.sh
@@ -12,5 +12,5 @@ sleep 4
source $srcdir/diag.sh tcpflood 127.0.0.1 13514 1 10 1
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
source $srcdir/diag.sh wait-shutdown
-source $srcdir/diag.sh seq-check 2 10
+source $srcdir/diag.sh seq-check 10 -s2
source $srcdir/diag.sh exit