diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-19 11:03:09 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-19 11:03:09 +0200 |
commit | a4dad2009992d436ba23c2d0a4a43b483aac40fc (patch) | |
tree | 4971015a6e9d5cb45ae0742d6bd6c788bdc2985f | |
parent | cb0fa751b7e18ffe71aec84bccfeaf04fca7ff13 (diff) | |
download | rsyslog-a4dad2009992d436ba23c2d0a4a43b483aac40fc.tar.gz rsyslog-a4dad2009992d436ba23c2d0a4a43b483aac40fc.tar.xz rsyslog-a4dad2009992d436ba23c2d0a4a43b483aac40fc.zip |
queue size calculation now based on logical/physical dequeue
... needed to split the old single counter into two. I wouldn't bet that
I made some mistakes while doing so, but at least some ad-hoc tests plus
the testbench do no longer indicate errors.
-rw-r--r-- | runtime/queue.c | 114 | ||||
-rw-r--r-- | runtime/queue.h | 3 | ||||
-rwxr-xr-x | tests/da-mainmsg-q.sh | 4 | ||||
-rwxr-xr-x | tests/diskqueue.sh | 1 |
4 files changed, 71 insertions, 51 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 8ef3e7db..9855dac8 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -168,29 +168,37 @@ finalize_it: /* methods */ -/* get the overall queue size. Must only be called +/* get the physical queue size. Must only be called * while mutex is locked! * rgerhards, 2008-01-29 */ static inline int -qqueueGetOverallQueueSize(qqueue_t *pThis) +getPhysicalQueueSize(qqueue_t *pThis) { -#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ -BEGINfunc -dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n", - pThis->iQueueSize, pThis->iQueueSize); -ENDfunc -#endif return pThis->iQueueSize; } +/* get the logical queue size (that is store size minus logically dequeued elements). + * Must only be called while mutex is locked! + * rgerhards, 2009-05-19 + */ +static inline int +getLogicalQueueSize(qqueue_t *pThis) +{ + return pThis->iQueueSize - pThis->nLogDeq; +} + + + /* This function drains the queue in cases where this needs to be done. The most probable * reason is a HUP which needs to discard data (because the queue is configured to be lossy). * During a shutdown, this is typically not needed, as the OS frees up ressources and does * this much quicker than when we clean up ourselvs. -- rgerhards, 2008-10-21 * This function returns void, as it makes no sense to communicate an error back, even if * it happens. + * This functions works "around" the regular deque mechanism, because it is only used to + * clean up (in cases where message loss is acceptable). */ static inline void queueDrain(qqueue_t *pThis) { @@ -198,14 +206,14 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); +// TODO: ULTRA it may be a good idea to check validitity once again /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); -// TODO: ULTRA - //pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); } + pThis->qDel(pThis); } } @@ -229,14 +237,14 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) /* if we have not yet reached the high water mark, there is no need to start a * worker. -- rgerhards, 2008-01-26 */ - if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { + if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; } else { - iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -295,13 +303,13 @@ qqueueTurnOffDAMode(qqueue_t *pThis) */ /* we need to check if the DA queue is empty because the DA worker may simply have - * terminated do to no new messages arriving. That does not, however, mean that the + * terminated due to no new messages arriving. That does not, however, mean that the * DA queue is empty. If there is still data in that queue, we do nothing and leave * that for a later incarnation of this function (it will be called multiple times * during the lifetime of DA-mode, depending on how often the DA worker receives an * inactivity timeout. -- rgerhards, 2008-01-25 */ - if(pThis->pqDA->iQueueSize == 0) { + if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. @@ -313,7 +321,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis) * when it is waiting that the high water mark is reached again. If so, we need to start up * a regular worker. -- rgerhards, 2008-01-26 */ - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getLogicalQueueSize(pThis) > 0) { qqueueAdviseMaxWorkers(pThis); } } @@ -507,7 +515,7 @@ qqueueChkStrtDA(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); /* if we do not hit the high water mark, we have nothing to do */ - if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) + if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); if(pThis->bRunsDA) { @@ -521,14 +529,14 @@ qqueueChkStrtDA(qqueue_t *pThis) * we need at least one). */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - qqueueGetOverallQueueSize(pThis)); + getPhysicalQueueSize(pThis)); qqueueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - qqueueGetOverallQueueSize(pThis)); + getPhysicalQueueSize(pThis)); qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } @@ -1124,7 +1132,8 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(pThis->iQueueSize); - dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } finalize_it: @@ -1132,10 +1141,10 @@ finalize_it: } -/* generic code to remove a queue entry +/* generic code to dequeue a queue entry */ static rsRetVal -qqueueDel(qqueue_t *pThis, void *pUsr) +qqueueDeq(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1147,10 +1156,10 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ iRet = pThis->qDeq(pThis, pUsr); - ATOMIC_DEC(pThis->iQueueSize); + ATOMIC_INC(pThis->nLogDeq); - dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", - iRet, pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1188,7 +1197,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we * see if we reactivate the worker. @@ -1259,7 +1268,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ @@ -1289,7 +1298,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getPhysicalQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1358,7 +1367,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. */ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis)); RETiRet; } @@ -1397,6 +1406,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; + pThis->nLogDeq = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; @@ -1522,11 +1532,16 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); +dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); } + /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ + pThis->iQueueSize -= nElem; + pThis->nLogDeq -= nElem; +dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ RETiRet; @@ -1627,11 +1642,13 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = 0; do { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); - CHKiRet(qqueueDel(pThis, &pUsr)); - iQueueSize = qqueueGetOverallQueueSize(pThis); + CHKiRet(qqueueDeq(pThis, &pUsr)); + iQueueSize = getLogicalQueueSize(pThis); /* check if we should discard this element */ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); + //MULTI-DEQUEUE / ULTRA-RELIABLE: we need to handle this case, we need to advance the + // DEQ pointer (or so...) TODO!!! Idea: get a second nElem int in pBatch, nDequeued. Use that when deleting! if(localRet == RS_RET_QUEUE_FULL) continue; else if(localRet != RS_RET_OK) @@ -1880,7 +1897,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ bStopWrkr = 1; - } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1903,9 +1920,9 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) * the DA queue */ static int -ChkStooWrkrReg(qqueue_t *pThis) +ChkStopWrkrReg(qqueue_t *pThis) { - return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); + return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); } @@ -1929,7 +1946,7 @@ qqueueIsIdleDA(qqueue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version @@ -1939,12 +1956,12 @@ IsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; - ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); + ret = getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk); if(ret) fprintf(stderr, "queue is idle\n"); return ret; #else /* regular code! */ - return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); #endif } @@ -2037,11 +2054,12 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " + dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, " "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, - qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, - pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), + pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, + pThis->iDeqBatchSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -2053,7 +2071,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); @@ -2117,7 +2135,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) ASSERT(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getPhysicalQueueSize(pThis) > 0) { /* This error code is OK, but we will probably not implement this any time * The reason is that persistence happens via DA queues. But I would like to * leave the code as is, as we so have a hook in case we need one. @@ -2128,13 +2146,13 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) FINALIZE; /* if the queue is empty, we are happy and done... */ } - dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) { + if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { unlink((char*)pszQIFNam); pThis->bNeedDelQIF = 0; @@ -2342,13 +2360,13 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) ISOBJ_TYPE_assert(pThis, qqueue); /* first check if we need to discard this message (which will cause CHKiRet() to exit) - * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize + * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The queue size * and bRunsDA parameters may not reflect the correct settings here, but they are * "good enough" in the sense that they can be used to drive the decision. Valgrind's * threading tools may point this access to be an error, but this is done * intentional. I do not see this causes problems to us. */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr)); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -2386,12 +2404,12 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 */ if(flowCtlType == eFLOWCTL_FULL_DELAY) { - while(pThis->iQueueSize >= pThis->iFullDlyMrk) { + while(getPhysicalQueueSize(pThis) >= pThis->iFullDlyMrk) { dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n"); pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */ } } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { - if(pThis->iQueueSize >= pThis->iLightDlyMrk) { + if(getPhysicalQueueSize(pThis) >= pThis->iLightDlyMrk) { dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n"); timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */ @@ -2403,7 +2421,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) * is not the case, basic flow control enters the field, which means we wait for * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14 */ - while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) + while( (pThis->iMaxQueueSize > 0 && getPhysicalQueueSize(pThis) >= pThis->iMaxQueueSize) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); diff --git a/runtime/queue.h b/runtime/queue.h index e47b8762..92bf8ae5 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -74,7 +74,8 @@ typedef struct queue_s { int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ - int iQueueSize; /* Current number of elements in the queue */ + int iQueueSize; /* Current number of elements in queue store (some are already logically dequeued!) */ + int nLogDeq; /* number of elements currently logically dequeued */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ int iCurNumWrkThrd;/* current number of active worker threads */ diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh index 2ea6278e..91addf68 100755 --- a/tests/da-mainmsg-q.sh +++ b/tests/da-mainmsg-q.sh @@ -24,7 +24,7 @@ if [ "$?" -ne "0" ]; then cp rsyslog.out.log rsyslog.out.log.save fi ls -l test-spool -sleep 1 # we need this so that rsyslogd can receive all outstanding messages +sleep 2 # we need this so that rsyslogd can receive all outstanding messages # # part 2: send bunch of messages. This should trigger DA mode # @@ -35,7 +35,7 @@ if [ "$?" -ne "0" ]; then cp rsyslog.out.log rsyslog.out.log.save fi ls -l test-spool -sleep 5 # we need this so that rsyslogd can receive all outstanding messages +sleep 8 # we need this so that rsyslogd can receive all outstanding messages # # send another handful # diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh index 5ff9ced0..efa6728b 100755 --- a/tests/diskqueue.sh +++ b/tests/diskqueue.sh @@ -7,6 +7,7 @@ echo testing queue disk-only mode rm -rf test-spool mkdir test-spool +# enable this, if you need debug output: export RSYSLOG_DEBUG="debug" rm -f work rsyslog.out.log rsyslog.out.log.save # work files ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/diskqueue.conf & sleep 1 |