summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-19 09:41:45 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-19 09:41:45 +0200
commit90e8475260cf8ac54519b3d964d879489af879f6 (patch)
treecf072344f5a41b2485f6ad320c408d4f5fe0d903
parenta4344f350151cdb9172897709fa08680ec8587ba (diff)
downloadrsyslog-90e8475260cf8ac54519b3d964d879489af879f6.tar.gz
rsyslog-90e8475260cf8ac54519b3d964d879489af879f6.tar.xz
rsyslog-90e8475260cf8ac54519b3d964d879489af879f6.zip
bugfix: message processing states were not set correctly in all cases
however, this had no negative effect, as the message processing state was not evaluated when a batch was deleted, and that was the only case where the state could be wrong.
-rw-r--r--ChangeLog4
-rw-r--r--action.c6
-rw-r--r--runtime/queue.c17
-rw-r--r--runtime/queue.h6
-rw-r--r--runtime/ruleset.c2
-rw-r--r--template.c2
-rwxr-xr-xtests/daqueue-persist-drvr.sh8
-rw-r--r--tools/syslogd.c5
8 files changed, 24 insertions, 26 deletions
diff --git a/ChangeLog b/ChangeLog
index 31b57145..5b470bd9 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,9 @@
---------------------------------------------------------------------------
Version 5.3.2 [DEVEL] (rgerhards), 2009-10-??
+- bugfix: message processing states were not set correctly in all cases
+ however, this had no negative effect, as the message processing state
+ was not evaluated when a batch was deleted, and that was the only case
+ where the state could be wrong.
- simplified and thus speeded up the queue engine, also fixed some
potential race conditions (in very unusual shutdown conditions)
along the way. The threading model has seriously changes, so there may
diff --git a/action.c b/action.c
index 5bd175e5..58658ac1 100644
--- a/action.c
+++ b/action.c
@@ -821,12 +821,12 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
DBGPRINTF("action call returned %d\n", localRet);
if(localRet == RS_RET_OK) {
/* mark messages as committed */
- while(iCommittedUpTo < i) {
+ while(iCommittedUpTo <= i) {
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
/* mark messages as committed */
- while(iCommittedUpTo < i - 1) {
+ while(iCommittedUpTo < i) {
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
pBatch->pElem[i].state = BATCH_STATE_SUB;
@@ -838,6 +838,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
iRet = localRet;
FINALIZE;
}
+dbgprintf("XXX: submitBatch set element %d state to %d\n", i, pBatch->pElem[i].state);
}
++i;
++iElemProcessed;
@@ -871,6 +872,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 0;
do {
+dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem);
localRet = tryDoAction(pAction, pBatch, &nElem);
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
diff --git a/runtime/queue.c b/runtime/queue.c
index dacf1f13..62fb339b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -280,15 +280,7 @@ qqueueChkIsDA(qqueue_t *pThis)
}
-/* Start disk-assisted queue mode. All internal settings are changed. This is supposed
- * to be called from the DA worker, which must have been started before. The most important
- * chore of this function is to create the DA queue object. If that function fails,
- * the DA worker should return with an appropriate state, which in turn should lead to
- * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
- * function is called, else a number of races will happen.
- * Please note that this function may be called *while* we in DA mode. This is due to the
- * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due
- * to inactivity timeouts.
+/* Start disk-assisted queue mode.
* rgerhards, 2008-01-15
*/
static rsRetVal
@@ -354,7 +346,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
+InitDA(qqueue_t *pThis, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -389,7 +381,6 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(StartDA(pThis));
}
- pThis->bEnqOnly = bEnqOnly; // TODO: I think this is not needed, but first clean up shutdown processing!
pThis->bRunsDA = 1;
finalize_it:
@@ -1409,6 +1400,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
+dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
objDestruct(pUsr);
}
@@ -1645,7 +1637,6 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
-dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq);
DeleteProcessedBatch(pThis, &pWti->batch);
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
@@ -1882,7 +1873,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* set up DA system if we have a disk-assisted queue */
if(pThis->bIsDA)
- InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
+ InitDA(pThis, LOCK_MUTEX); /* initiate DA mode */
DBGOPRINT((obj_t*) pThis, "queue finished initialization\n");
diff --git a/runtime/queue.h b/runtime/queue.h
index 74bf2d31..338f091b 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -164,12 +164,6 @@ typedef struct queue_s {
} tVars;
} qqueue_t;
-/* some symbolic constants for easier reference */
-#define QUEUE_MODE_ENQDEQ 0
-#define QUEUE_MODE_ENQONLY 1
-
-#define QUEUE_IDX_DA_WORKER 0 /* index for the DA worker (fixed) */
-#define QUEUE_PTR_DA_WORKER(x) (&((pThis)->pWrkThrds[0]))
/* the define below is an "eternal" timeout for the timeout settings which require a value.
* It is one day, which is not really eternal, but comes close to it if we think about
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 0f4bc46d..d3de672e 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -139,6 +139,7 @@ DEFFUNC_llExecFunc(processMsgDoRules)
rsRetVal iRet;
ISOBJ_TYPE_assert(pData, rule);
iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
+dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet);
return iRet;
}
@@ -159,6 +160,7 @@ processMsg(msg_t *pMsg)
CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg));
finalize_it:
+dbgprintf("ruleset.ProcessMsg() returns %d\n", iRet);
RETiRet;
}
diff --git a/template.c b/template.c
index 1e0c9613..f002ced4 100644
--- a/template.c
+++ b/template.c
@@ -121,7 +121,7 @@ propid = pTpe->data.field.propid;
doSQLEscape(&pVal, &iLenVal, &bMustBeFreed, 0);
}
/* got source, now copy over */
-dbgprintf("copying prop id %3d (entry type %d) of length %d ('%s')\n", propid, pTpe->eEntryType, (int) iLenVal, pVal);
+//dbgprintf("copying prop id %3d (entry type %d) of length %d ('%s')\n", propid, pTpe->eEntryType, (int) iLenVal, pVal);
if(iBuf + iLenVal >= *pLenBuf) /* we reserve one char for the final \0! */
CHKiRet(ExtendBuf(ppBuf, pLenBuf, iBuf + iLenVal + 1));
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index f5937541..d36a6be5 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -12,8 +12,8 @@ source $srcdir/diag.sh init
echo \$MainMsgQueueType $1 > work-queuemode.conf
echo "*.* :omtesting:sleep 0 1000" > work-delay.conf
-#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
-#export RSYSLOG_DEBUGLOG="log0"
+export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+export RSYSLOG_DEBUGLOG="log0"
# inject 10000 msgs, so that DO hit the high watermark
source $srcdir/diag.sh startup queue-persist.conf
@@ -26,8 +26,8 @@ echo "Enter phase 2, rsyslogd restart"
#exit
-#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
-#export RSYSLOG_DEBUGLOG="log"
+export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+export RSYSLOG_DEBUGLOG="log"
#valgrind="valgrind --tool=helgrind --log-fd=1"
# restart engine and have rest processed
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 1c494dea..f8a78343 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -641,6 +641,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
int i;
msg_t *pMsg;
DEFiRet;
+ rsRetVal localRet;
assert(pBatch != NULL);
@@ -650,7 +651,11 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
parseMsg(pMsg);
}
+ localRet =
ruleset.ProcessMsg(pMsg);
+dbgprintf("msgConsumer got iRet %d from ProcessMsg\n", localRet);
+ /* if we reach this point, the message is considered committed (by definition!) */
+ pBatch->pElem[i].state = BATCH_STATE_COMM;
}
RETiRet;