From fc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 11:57:30 +0200 Subject: fixing an issue during DA mode queue shutdown also changed DA queue mode in that the regular workers now run concurrently. --- configure.ac | 2 +- runtime/queue.c | 108 +++++++++++++++++++++----------------------------- runtime/wti.c | 1 + tests/chkseq.c | 3 +- tests/da-mainmsg-q.sh | 8 ++-- tests/diag.sh | 4 +- 6 files changed, 57 insertions(+), 69 deletions(-) diff --git a/configure.ac b/configure.ac index fc8cdc36..d8d17a96 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ(2.61) -rC_INIT([rsyslog],[5.1.1],[rsyslog@lists.adiscon.com]) +AC_INIT([rsyslog],[5.1.1],[rsyslog@lists.adiscon.com]) AM_INIT_AUTOMAKE AC_CONFIG_SRCDIR([ChangeLog]) AC_CONFIG_MACRO_DIR([m4]) diff --git a/runtime/queue.c b/runtime/queue.c index 4405dd39..698495ef 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -72,6 +72,7 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -240,14 +241,15 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } + } + /* regular workers always run */ + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; } else { - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } +dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers); + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } RETiRet; @@ -288,7 +290,7 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -qqueueTurnOffDAMode(qqueue_t *pThis) +TurnOffDAMode(qqueue_t *pThis) { DEFiRet; @@ -299,9 +301,7 @@ RUNLOG_STR("XXX: TurnOffDAMode\n"); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ -RUNLOG; - qqueueWaitDAModeInitialized(pThis); -RUNLOG; + //TODO: MULTI del, can not happen (but verify first) qqueueWaitDAModeInitialized(pThis); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -321,16 +321,13 @@ dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pTh /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ +//XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); - /* now we need to check if the regular queue has some messages. This may be the case - * when it is waiting that the high water mark is reached again. If so, we need to start up - * a regular worker. -- rgerhards, 2008-01-26 - */ - if(getLogicalQueueSize(pThis) > 0) { - qqueueAdviseMaxWorkers(pThis); - } + } else { + /* the queue has data again! */ + dbgprintf("DA queue has data during shutdown, restarting...\n"); + qqueueAdviseMaxWorkers(pThis->pqDA); } RETiRet; @@ -408,6 +405,10 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); + + // experimental: XXX + CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0)); + if(pThis->toQShutdown == 0) { CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { @@ -423,14 +424,6 @@ StartDA(qqueue_t *pThis) if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - /* as we are right now starting DA mode because we are so busy, it is - * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER, - * we want to be on the safe side, and so we awake anyone that is waiting - * on one. So even if the scheduler plays badly with us, things should be - * quite well. -- rgerhards, 2008-01-15 - */ - wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ - pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ @@ -481,8 +474,9 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); @@ -500,8 +494,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * that will also start one up. If we forgot that step, everything would be stalled * until the next enqueue request. */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */ -RUNLOG_VAR("%d", pThis->bRunsDA); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */ finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -516,7 +509,7 @@ finalize_it: * rgerhards, 2008-01-14 */ static inline rsRetVal -qqueueChkStrtDA(qqueue_t *pThis) +ChkStrtDA(qqueue_t *pThis) { DEFiRet; @@ -1538,7 +1531,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); -dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); +//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); @@ -1849,13 +1842,13 @@ finalize_it: * rgerhards, 2009-05-27 */ static rsRetVal -batchProcessedReg(qqueue_t *pThis, wti_t *pWti) +batchProcessed(qqueue_t *pThis, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq); +dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); @@ -1940,7 +1933,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; -RUNLOG; } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1948,15 +1940,17 @@ RUNLOG; && pThis->pqDA->sizeOnDiskMax > 0 && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ -RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); +dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; +RUNLOG_STR("XXX: re-start reg worker"); +qqueueAdviseMaxWorkers(pThis); +RUNLOG_STR("XXX: done re-start reg worker"); } } else { -RUNLOG; - iRet = RS_RET_TERMINATE_NOW; + // experimental iRet = RS_RET_TERMINATE_NOW; + ; } } @@ -1979,11 +1973,11 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ - if(pThis->bEnqOnly || pThis->bRunsDA) { -RUNLOG; + // TODO: DEL - we now keep the workers running! if(pThis->bEnqOnly || pThis->bRunsDA) { + if(pThis->bEnqOnly) { +dbgprintf("XXX: terminate_NOW queue:Reg worker: enqOnly! queue size %d\n", getPhysicalQueueSize(pThis)); iRet = RS_RET_TERMINATE_NOW; } else if(pThis->pqParent != NULL) { -RUNLOG; iRet = RS_RET_TERMINATE_WHEN_IDLE; } @@ -2000,35 +1994,27 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) DEFiRet; assert(pVal != NULL); *pVal = pThis->iDeqBatchSize; +if(pThis->pqParent != NULL) + *pVal = 16; RETiRet; } /* must only be called when the queue mutex is locked, else results - * are not stable! DA queue version + * are not stable! DA worker version (pThis *is* the *main* queue, not DA!) */ static int qqueueIsIdleDA(qqueue_t *pThis) { - /* remember: iQueueSize is the DA queue size, not the main queue! */ - /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk); } /* must only be called when the queue mutex is locked, else results - * are not stable! Regular queue version + * are not stable! Regular worker version. */ static int IsIdleReg(qqueue_t *pThis) { -#if 0 /* enable for performance testing */ - int ret; - ret = getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk); - if(ret) fprintf(stderr, "queue is idle\n"); - return ret; -#else - /* regular code! */ - return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); -#endif + return(getPhysicalQueueSize(pThis) == 0); } @@ -2084,7 +2070,8 @@ RegOnWrkrStartup(qqueue_t *pThis) /* start up the queue - it must have been constructed and parameters defined * before. */ -rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ +rsRetVal +qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; rsRetVal iRetLocal; @@ -2141,7 +2128,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); - CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); @@ -2168,7 +2155,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } } - if(!bInitialized) { + if(Debug && !bInitialized) { dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " "queue itself!)\n"); } @@ -2507,7 +2494,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) - CHKiRet(qqueueChkStrtDA(pThis)); + CHKiRet(ChkStrtDA(pThis)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2611,16 +2598,13 @@ SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); if(pThis->pWtpReg != NULL) wtpWakeupAllWrkr(pThis->pWtpReg); -RUNLOG; if(pThis->pWtpDA != NULL) wtpWakeupAllWrkr(pThis->pWtpDA); -RUNLOG; } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ } } -RUNLOG; pThis->bEnqOnly = bEnqOnly; diff --git a/runtime/wti.c b/runtime/wti.c index 2fb5eea2..9428cd47 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -392,6 +392,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + pThis->batch.nElemDeq = 0; /* re-init dequeue count */ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); diff --git a/tests/chkseq.c b/tests/chkseq.c index 5ffe855c..8c5fc61a 100644 --- a/tests/chkseq.c +++ b/tests/chkseq.c @@ -79,7 +79,8 @@ int main(int argc, char *argv[]) /* read file */ fp = fopen(file, "r"); if(fp == NULL) { - perror(argv[1]); + printf("error opening file '%s'\n", file); + perror(file); exit(1); } diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh index 1c947de2..25e7fd95 100755 --- a/tests/da-mainmsg-q.sh +++ b/tests/da-mainmsg-q.sh @@ -17,16 +17,18 @@ source $srcdir/diag.sh injectmsg 0 50 source $srcdir/diag.sh wait-queueempty # let queue drain for this test case # part 2: send bunch of messages. This should trigger DA mode -source $srcdir/diag.sh injectmsg 50 20000 +#source $srcdir/diag.sh injectmsg 50 20000 +source $srcdir/diag.sh injectmsg 50 2000 ls -l test-spool # for manual review # send another handful -source $srcdir/diag.sh injectmsg 20050 50 +source $srcdir/diag.sh injectmsg 2050 50 #sleep 1 # we need this so that rsyslogd can receive all outstanding messages # clean up and check test result source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages ### currently, we get a stable abort if we use the former kill logic. With shutdown-when-empty, it hangs (but that still tells us there is a bug ;)) ### #kill `cat rsyslog.pid` -source $srcdir/diag.sh seq-check 0 20099 +echo seqchk? +source $srcdir/diag.sh seq-check 2099 source $srcdir/diag.sh exit diff --git a/tests/diag.sh b/tests/diag.sh index 8bf6b129..a34f3ede 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -9,8 +9,8 @@ #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace -export RSYSLOG_DEBUG="debug nostdout printmutexaction" -export RSYSLOG_DEBUGLOG="log" +#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction" +#export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason rm -f core.* vgcore.* # do NOT delete them at exit ;) -- cgit