summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-28 11:57:30 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-28 11:57:30 +0200
commitfc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40 (patch)
tree6e506d1a66615ffeb95b9960c54352ba2e6175a2
parent7405a3057003bab0361a111b0f9d013b881b6db0 (diff)
downloadrsyslog-fc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40.tar.gz
rsyslog-fc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40.tar.xz
rsyslog-fc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40.zip
fixing an issue during DA mode queue shutdown
also changed DA queue mode in that the regular workers now run concurrently.
-rw-r--r--configure.ac2
-rw-r--r--runtime/queue.c108
-rw-r--r--runtime/wti.c1
-rw-r--r--tests/chkseq.c3
-rwxr-xr-xtests/da-mainmsg-q.sh8
-rwxr-xr-xtests/diag.sh4
6 files changed, 57 insertions, 69 deletions
diff --git a/configure.ac b/configure.ac
index fc8cdc36..d8d17a96 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-rC_INIT([rsyslog],[5.1.1],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[5.1.1],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
AC_CONFIG_SRCDIR([ChangeLog])
AC_CONFIG_MACRO_DIR([m4])
diff --git a/runtime/queue.c b/runtime/queue.c
index 4405dd39..698495ef 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -72,6 +72,7 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -240,14 +241,15 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
+ }
+ /* regular workers always run */
+ if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
} else {
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
- } else {
- iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
- }
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
+dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers);
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
}
RETiRet;
@@ -288,7 +290,7 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-qqueueTurnOffDAMode(qqueue_t *pThis)
+TurnOffDAMode(qqueue_t *pThis)
{
DEFiRet;
@@ -299,9 +301,7 @@ RUNLOG_STR("XXX: TurnOffDAMode\n");
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
-RUNLOG;
- qqueueWaitDAModeInitialized(pThis);
-RUNLOG;
+ //TODO: MULTI del, can not happen (but verify first) qqueueWaitDAModeInitialized(pThis);
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
@@ -321,16 +321,13 @@ dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pTh
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
- qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+//XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
iRet);
- /* now we need to check if the regular queue has some messages. This may be the case
- * when it is waiting that the high water mark is reached again. If so, we need to start up
- * a regular worker. -- rgerhards, 2008-01-26
- */
- if(getLogicalQueueSize(pThis) > 0) {
- qqueueAdviseMaxWorkers(pThis);
- }
+ } else {
+ /* the queue has data again! */
+ dbgprintf("DA queue has data during shutdown, restarting...\n");
+ qqueueAdviseMaxWorkers(pThis->pqDA);
}
RETiRet;
@@ -408,6 +405,10 @@ StartDA(qqueue_t *pThis)
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
+
+ // experimental: XXX
+ CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0));
+
if(pThis->toQShutdown == 0) {
CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
} else {
@@ -423,14 +424,6 @@ StartDA(qqueue_t *pThis)
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- /* as we are right now starting DA mode because we are so busy, it is
- * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER,
- * we want to be on the safe side, and so we awake anyone that is waiting
- * on one. So even if the scheduler plays badly with us, things should be
- * quite well. -- rgerhards, 2008-01-15
- */
- wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */
-
pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */
pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
@@ -481,8 +474,9 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
@@ -500,8 +494,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
* that will also start one up. If we forgot that step, everything would be stalled
* until the next enqueue request.
*/
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */
-RUNLOG_VAR("%d", pThis->bRunsDA);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -516,7 +509,7 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-qqueueChkStrtDA(qqueue_t *pThis)
+ChkStrtDA(qqueue_t *pThis)
{
DEFiRet;
@@ -1538,7 +1531,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
ISOBJ_TYPE_assert(pThis, qqueue);
-dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem);
+//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem);
/* now send delete request to storage driver */
for(i = 0 ; i < nElem ; ++i) {
pThis->qDel(pThis);
@@ -1849,13 +1842,13 @@ finalize_it:
* rgerhards, 2009-05-27
*/
static rsRetVal
-batchProcessedReg(qqueue_t *pThis, wti_t *pWti)
+batchProcessed(qqueue_t *pThis, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
-dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq);
+dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq);
DeleteProcessedBatch(pThis, &pWti->batch);
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
@@ -1940,7 +1933,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
-RUNLOG;
} else {
if(pThis->bRunsDA) {
ASSERT(pThis->pqDA != NULL);
@@ -1948,15 +1940,17 @@ RUNLOG;
&& pThis->pqDA->sizeOnDiskMax > 0
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
-RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
-dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
+dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
iRet = RS_RET_TERMINATE_NOW;
+RUNLOG_STR("XXX: re-start reg worker");
+qqueueAdviseMaxWorkers(pThis);
+RUNLOG_STR("XXX: done re-start reg worker");
}
} else {
-RUNLOG;
- iRet = RS_RET_TERMINATE_NOW;
+ // experimental iRet = RS_RET_TERMINATE_NOW;
+ ;
}
}
@@ -1979,11 +1973,11 @@ ChkStopWrkrReg(qqueue_t *pThis)
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
* TODO: remove when verified! -- rgerhards, 2009-05-26
*/
- if(pThis->bEnqOnly || pThis->bRunsDA) {
-RUNLOG;
+ // TODO: DEL - we now keep the workers running! if(pThis->bEnqOnly || pThis->bRunsDA) {
+ if(pThis->bEnqOnly) {
+dbgprintf("XXX: terminate_NOW queue:Reg worker: enqOnly! queue size %d\n", getPhysicalQueueSize(pThis));
iRet = RS_RET_TERMINATE_NOW;
} else if(pThis->pqParent != NULL) {
-RUNLOG;
iRet = RS_RET_TERMINATE_WHEN_IDLE;
}
@@ -2000,35 +1994,27 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
DEFiRet;
assert(pVal != NULL);
*pVal = pThis->iDeqBatchSize;
+if(pThis->pqParent != NULL)
+ *pVal = 16;
RETiRet;
}
/* must only be called when the queue mutex is locked, else results
- * are not stable! DA queue version
+ * are not stable! DA worker version (pThis *is* the *main* queue, not DA!)
*/
static int
qqueueIsIdleDA(qqueue_t *pThis)
{
- /* remember: iQueueSize is the DA queue size, not the main queue! */
- /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk);
}
/* must only be called when the queue mutex is locked, else results
- * are not stable! Regular queue version
+ * are not stable! Regular worker version.
*/
static int
IsIdleReg(qqueue_t *pThis)
{
-#if 0 /* enable for performance testing */
- int ret;
- ret = getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk);
- if(ret) fprintf(stderr, "queue is idle\n");
- return ret;
-#else
- /* regular code! */
- return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
-#endif
+ return(getPhysicalQueueSize(pThis) == 0);
}
@@ -2084,7 +2070,8 @@ RegOnWrkrStartup(qqueue_t *pThis)
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
-rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
+rsRetVal
+qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
rsRetVal iRetLocal;
@@ -2141,7 +2128,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
- CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
@@ -2168,7 +2155,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
}
- if(!bInitialized) {
+ if(Debug && !bInitialized) {
dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
"queue itself!)\n");
}
@@ -2507,7 +2494,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
- CHKiRet(qqueueChkStrtDA(pThis));
+ CHKiRet(ChkStrtDA(pThis));
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2611,16 +2598,13 @@ SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
if(pThis->pWtpReg != NULL)
wtpWakeupAllWrkr(pThis->pWtpReg);
-RUNLOG;
if(pThis->pWtpDA != NULL)
wtpWakeupAllWrkr(pThis->pWtpDA);
-RUNLOG;
} else {
/* switch back to regular mode */
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
}
}
-RUNLOG;
pThis->bEnqOnly = bEnqOnly;
diff --git a/runtime/wti.c b/runtime/wti.c
index 2fb5eea2..9428cd47 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -392,6 +392,7 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+ pThis->batch.nElemDeq = 0; /* re-init dequeue count */
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
diff --git a/tests/chkseq.c b/tests/chkseq.c
index 5ffe855c..8c5fc61a 100644
--- a/tests/chkseq.c
+++ b/tests/chkseq.c
@@ -79,7 +79,8 @@ int main(int argc, char *argv[])
/* read file */
fp = fopen(file, "r");
if(fp == NULL) {
- perror(argv[1]);
+ printf("error opening file '%s'\n", file);
+ perror(file);
exit(1);
}
diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh
index 1c947de2..25e7fd95 100755
--- a/tests/da-mainmsg-q.sh
+++ b/tests/da-mainmsg-q.sh
@@ -17,16 +17,18 @@ source $srcdir/diag.sh injectmsg 0 50
source $srcdir/diag.sh wait-queueempty # let queue drain for this test case
# part 2: send bunch of messages. This should trigger DA mode
-source $srcdir/diag.sh injectmsg 50 20000
+#source $srcdir/diag.sh injectmsg 50 20000
+source $srcdir/diag.sh injectmsg 50 2000
ls -l test-spool # for manual review
# send another handful
-source $srcdir/diag.sh injectmsg 20050 50
+source $srcdir/diag.sh injectmsg 2050 50
#sleep 1 # we need this so that rsyslogd can receive all outstanding messages
# clean up and check test result
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
### currently, we get a stable abort if we use the former kill logic. With shutdown-when-empty, it hangs (but that still tells us there is a bug ;)) ###
#kill `cat rsyslog.pid`
-source $srcdir/diag.sh seq-check 0 20099
+echo seqchk?
+source $srcdir/diag.sh seq-check 2099
source $srcdir/diag.sh exit
diff --git a/tests/diag.sh b/tests/diag.sh
index 8bf6b129..a34f3ede 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -9,8 +9,8 @@
#valgrind="valgrind --tool=drd --log-fd=1"
#valgrind="valgrind --tool=helgrind --log-fd=1"
#set -o xtrace
-export RSYSLOG_DEBUG="debug nostdout printmutexaction"
-export RSYSLOG_DEBUGLOG="log"
+#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+#export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
rm -f core.* vgcore.* # do NOT delete them at exit ;)