diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-30 11:09:15 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-30 11:09:15 +0200 |
commit | a6bda9b93f21cdbec1d7312078535eb092f32cb0 (patch) | |
tree | e1b62c000eea2b81e0e67694b2d8129e252f830c | |
parent | feeb622c4e0c622559df803f8df6da39bf3015e7 (diff) | |
download | rsyslog-a6bda9b93f21cdbec1d7312078535eb092f32cb0.tar.gz rsyslog-a6bda9b93f21cdbec1d7312078535eb092f32cb0.tar.xz rsyslog-a6bda9b93f21cdbec1d7312078535eb092f32cb0.zip |
bugfix: discard action did not work (did not discard messages)
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | action.c | 56 | ||||
-rw-r--r-- | runtime/queue.c | 2 | ||||
-rw-r--r-- | runtime/rule.c | 2 | ||||
-rw-r--r-- | runtime/ruleset.c | 11 | ||||
-rwxr-xr-x | tests/discard.sh | 2 |
6 files changed, 51 insertions, 23 deletions
@@ -1,5 +1,6 @@ --------------------------------------------------------------------------- Version 5.1.4 [DEVEL] (rgerhards), 2009-07-?? +- bugfix: discard action did not work (did not discard messages) - bugfix: discard action caused segfault --------------------------------------------------------------------------- Version 5.1.3 [DEVEL] (rgerhards), 2009-07-28 @@ -808,41 +808,50 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) assert(pBatch != NULL); assert(pnElem != NULL); +dbgprintf("XXXX: ENTER tryDoAction elt 0 state %d\n", pBatch->pElem[0].state); i = pBatch->iDoneUpTo; /* all messages below that index are processed */ iElemProcessed = 0; iCommittedUpTo = i; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later! - localRet = actionProcessMessage(pAction, pMsg); - dbgprintf("action call returned %d\n", localRet); - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i - 1) { - pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p, state %d\n", i, pBatch->nElem, *pnElem, pMsg, pBatch->pElem[i].state);//remove later! + if(pBatch->pElem[i].state != BATCH_STATE_DISC) { + localRet = actionProcessMessage(pAction, pMsg); + dbgprintf("action call returned %d\n", localRet); + if(localRet == RS_RET_OK) { + /* mark messages as committed */ + while(iCommittedUpTo < i) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + /* mark messages as committed */ + while(iCommittedUpTo < i - 1) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_DISCARDMSG) { + pBatch->pElem[i].state = BATCH_STATE_DISC; +dbgprintf("XXXX: discardmsg! change state to _DISC: %d\n", pBatch->pElem[i].state); + } else { + iRet = localRet; + FINALIZE; } - pBatch->pElem[i].state = BATCH_STATE_SUB; - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - pBatch->pElem[i].state = BATCH_STATE_SUB; - } else { - iRet = localRet; - FINALIZE; } ++i; ++iElemProcessed; } finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { + if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) { + iRet = RS_RET_DISCARDMSG; + } else if(pBatch->iDoneUpTo != iCommittedUpTo) { *pnElem += iCommittedUpTo - pBatch->iDoneUpTo; pBatch->iDoneUpTo = iCommittedUpTo; } +dbgprintf("XXXX: done tryDoAction elt 0 state %d, iret %d\n", pBatch->pElem[0].state, iRet); RETiRet; } @@ -865,6 +874,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 0; do { localRet = tryDoAction(pAction, pBatch, &nElem); +dbgprintf("XXXX: submitBatch got state %d\n", localRet); if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { @@ -874,10 +884,15 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) localRet = finishBatch(pAction); } +dbgprintf("XXXX: submitBatch got state %d\n", localRet); if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { bDone = 1; + } else if(localRet == RS_RET_DISCARDMSG) { + iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */ + bDone = 1; +dbgprintf("XXXX: submitBatch DONE state %d\n", localRet); } else if(localRet == RS_RET_SUSPENDED) { ; /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { @@ -897,8 +912,10 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 1; } } +dbgprintf("XXXX: submitBatch pre while state %d\n", localRet); } while(!bDone); /* do .. while()! */ +dbgprintf("XXXX: END submitBatch elt 0 state %d, iRet %d\n", pBatch->pElem[0].state, iRet); RETiRet; } @@ -1134,6 +1151,7 @@ actionWriteToAction(action_t *pAction) * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); +dbgprintf("XXXX: queueEnqObj returned %d\n", iRet); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ diff --git a/runtime/queue.c b/runtime/queue.c index cb14b58d..8388d00e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1044,6 +1044,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); objDestruct(pUsr); +dbgprintf("XXXX: qAddDirect returns %d\n", iRet); RETiRet; } @@ -2442,6 +2443,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) CHKiRet(qqueueAdd(pThis, pUsr)); finalize_it: +dbgprintf("XXXX: queueEnqObj returns %d\n", iRet); RETiRet; } diff --git a/runtime/rule.c b/runtime/rule.c index 182d616a..bcd82e5f 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -83,6 +83,7 @@ DEFFUNC_llExecFunc(processMsgDoActions) } iRetMod = actionCallAction(pAction, pDoActData->pMsg); +dbgprintf("XXXX: processMsgDoActions returns %d\n", iRet); if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { @@ -271,6 +272,7 @@ processMsg(rule_t *pThis, msg_t *pMsg) } finalize_it: +dbgprintf("XXXX: rule.processMsg returns %d\n", iRet); RETiRet; } diff --git a/runtime/ruleset.c b/runtime/ruleset.c index d98b4217..c1b6d490 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -138,8 +138,11 @@ finalize_it: */ DEFFUNC_llExecFunc(processMsgDoRules) { + rsRetVal iRet; ISOBJ_TYPE_assert(pData, rule); - return rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam); + iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam); +dbgprintf("XXXX: pcoessMsgDoRules returns %d\n", iRet); + return iRet; } @@ -159,8 +162,10 @@ processMsg(msg_t *pMsg) CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg)); finalize_it: - if(iRet == RS_RET_DISCARDMSG) - iRet = RS_RET_OK; +dbgprintf("XXXX: processMsg got return state %d\n", iRet); + + //if(iRet == RS_RET_DISCARDMSG) + //iRet = RS_RET_OK; RETiRet; } diff --git a/tests/discard.sh b/tests/discard.sh index b230bc11..0fafc7d9 100755 --- a/tests/discard.sh +++ b/tests/discard.sh @@ -12,5 +12,5 @@ sleep 4 source $srcdir/diag.sh tcpflood 127.0.0.1 13514 1 10 1 source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages source $srcdir/diag.sh wait-shutdown -source $srcdir/diag.sh seq-check 2 10 +source $srcdir/diag.sh seq-check 10 -s2 source $srcdir/diag.sh exit |