author     Rainer Gerhards <rgerhards@adiscon.com>    2009-05-19 11:03:09 +0200
committer  Rainer Gerhards <rgerhards@adiscon.com>    2009-05-19 11:03:09 +0200
commit     a4dad2009992d436ba23c2d0a4a43b483aac40fc (patch)
tree       4971015a6e9d5cb45ae0742d6bd6c788bdc2985f
parent     cb0fa751b7e18ffe71aec84bccfeaf04fca7ff13 (diff)
queue size calculation now based on logical/physical dequeue
... needed to split the old single counter into two. I can't rule out that I made some mistakes while doing so, but at least some ad-hoc tests plus the testbench no longer indicate errors.
-rw-r--r--  runtime/queue.c        | 114
-rw-r--r--  runtime/queue.h        |   3
-rwxr-xr-x  tests/da-mainmsg-q.sh  |   4
-rwxr-xr-x  tests/diskqueue.sh     |   1
4 files changed, 71 insertions, 51 deletions
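
In short, the commit splits the former single size counter into two views of the same store: the physical queue size (everything still held in the queue store) and the logical queue size (the physical size minus elements a worker has already dequeued but not yet deleted). A minimal stand-alone sketch of that distinction, borrowing the field and accessor names from the patch but otherwise hypothetical code, not taken from rsyslog:

#include <stdio.h>

/* toy model of the new accounting; only the two counters matter here */
struct qmodel {
	int iQueueSize; /* elements physically present in the queue store  */
	int nLogDeq;    /* elements logically dequeued but not yet deleted */
};

static int getPhysicalQueueSize(struct qmodel *q) { return q->iQueueSize; }
static int getLogicalQueueSize(struct qmodel *q)  { return q->iQueueSize - q->nLogDeq; }

int main(void)
{
	struct qmodel q = { 10, 3 }; /* 10 stored, 3 of them handed out to a worker */
	printf("phys %d, log %d\n",  /* prints: phys 10, log 7 */
	       getPhysicalQueueSize(&q), getLogicalQueueSize(&q));
	return 0;
}

In the patch itself, the worker-advise, DA turn-off and idle checks switch to the logical size (what still awaits a consumer), while the DA start check, the worker-stop checks, shutdown, persisting and flow control keep using the physical size (what still occupies the store).
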
diff --git a/runtime/queue.c b/runtime/queue.c
index 8ef3e7db..9855dac8 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -168,29 +168,37 @@ finalize_it:
/* methods */
-/* get the overall queue size. Must only be called
+/* get the physical queue size. Must only be called
* while mutex is locked!
* rgerhards, 2008-01-29
*/
static inline int
-qqueueGetOverallQueueSize(qqueue_t *pThis)
+getPhysicalQueueSize(qqueue_t *pThis)
{
-#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
-BEGINfunc
-dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n",
- pThis->iQueueSize, pThis->iQueueSize);
-ENDfunc
-#endif
return pThis->iQueueSize;
}
+/* get the logical queue size (that is store size minus logically dequeued elements).
+ * Must only be called while mutex is locked!
+ * rgerhards, 2009-05-19
+ */
+static inline int
+getLogicalQueueSize(qqueue_t *pThis)
+{
+ return pThis->iQueueSize - pThis->nLogDeq;
+}
+
+
+
/* This function drains the queue in cases where this needs to be done. The most probable
* reason is a HUP which needs to discard data (because the queue is configured to be lossy).
 * During a shutdown, this is typically not needed, as the OS frees up resources and does
 * this much quicker than when we clean up ourselves. -- rgerhards, 2008-10-21
* This function returns void, as it makes no sense to communicate an error back, even if
* it happens.
+ * This function works "around" the regular dequeue mechanism, because it is only used to
+ * clean up (in cases where message loss is acceptable).
*/
static inline void queueDrain(qqueue_t *pThis)
{
@@ -198,14 +206,14 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
+// TODO: ULTRA it may be a good idea to check validity once again
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
pThis->qDeq(pThis, &pUsr);
-// TODO: ULTRA
- //pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
+ pThis->qDel(pThis);
}
}
@@ -229,14 +237,14 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
/* if we have not yet reached the high water mark, there is no need to start a
* worker. -- rgerhards, 2008-01-26
*/
- if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
+ if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
} else {
if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
} else {
- iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
}
@@ -295,13 +303,13 @@ qqueueTurnOffDAMode(qqueue_t *pThis)
*/
/* we need to check if the DA queue is empty because the DA worker may simply have
- * terminated do to no new messages arriving. That does not, however, mean that the
+ * terminated due to no new messages arriving. That does not, however, mean that the
* DA queue is empty. If there is still data in that queue, we do nothing and leave
* that for a later incarnation of this function (it will be called multiple times
* during the lifetime of DA-mode, depending on how often the DA worker receives an
* inactivity timeout. -- rgerhards, 2008-01-25
*/
- if(pThis->pqDA->iQueueSize == 0) {
+ if(getLogicalQueueSize(pThis->pqDA) == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
@@ -313,7 +321,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis)
* when it is waiting that the high water mark is reached again. If so, we need to start up
* a regular worker. -- rgerhards, 2008-01-26
*/
- if(qqueueGetOverallQueueSize(pThis) > 0) {
+ if(getLogicalQueueSize(pThis) > 0) {
qqueueAdviseMaxWorkers(pThis);
}
}
@@ -507,7 +515,7 @@ qqueueChkStrtDA(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
/* if we do not hit the high water mark, we have nothing to do */
- if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
+ if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
if(pThis->bRunsDA) {
@@ -521,14 +529,14 @@ qqueueChkStrtDA(qqueue_t *pThis)
* we need at least one).
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- qqueueGetOverallQueueSize(pThis));
+ getPhysicalQueueSize(pThis));
qqueueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- qqueueGetOverallQueueSize(pThis));
+ getPhysicalQueueSize(pThis));
qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
@@ -1124,7 +1132,8 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(pThis->iQueueSize);
- dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
finalize_it:
@@ -1132,10 +1141,10 @@ finalize_it:
}
-/* generic code to remove a queue entry
+/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDel(qqueue_t *pThis, void *pUsr)
+qqueueDeq(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
@@ -1147,10 +1156,10 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
iRet = pThis->qDeq(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
+ ATOMIC_INC(pThis->nLogDeq);
- dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
- iRet, pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1188,7 +1197,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(qqueueGetOverallQueueSize(pThis) > 0) {
+ if(getPhysicalQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
* see if we reactivate the worker.
@@ -1259,7 +1268,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
@@ -1289,7 +1298,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(qqueueGetOverallQueueSize(pThis) > 0) {
+ if(getPhysicalQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1358,7 +1367,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1397,6 +1406,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
+ pThis->nLogDeq = 0;
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
@@ -1522,11 +1532,16 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
ISOBJ_TYPE_assert(pThis, qqueue);
+dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem);
/* now send delete request to storage driver */
for(i = 0 ; i < nElem ; ++i) {
pThis->qDel(pThis);
}
+ /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
+ pThis->iQueueSize -= nElem;
+ pThis->nLogDeq -= nElem;
+dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
RETiRet;
@@ -1627,11 +1642,13 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
nDequeued = 0;
do {
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
- CHKiRet(qqueueDel(pThis, &pUsr));
- iQueueSize = qqueueGetOverallQueueSize(pThis);
+ CHKiRet(qqueueDeq(pThis, &pUsr));
+ iQueueSize = getLogicalQueueSize(pThis);
/* check if we should discard this element */
localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
+ //MULTI-DEQUEUE / ULTRA-RELIABLE: we need to handle this case, we need to advance the
+ // DEQ pointer (or so...) TODO!!! Idea: get a second nElem int in pBatch, nDequeued. Use that when deleting!
if(localRet == RS_RET_QUEUE_FULL)
continue;
else if(localRet != RS_RET_OK)
@@ -1880,7 +1897,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
bStopWrkr = 1;
- } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
bStopWrkr = 0;
@@ -1903,9 +1920,9 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* the DA queue
*/
static int
-ChkStooWrkrReg(qqueue_t *pThis)
+ChkStopWrkrReg(qqueue_t *pThis)
{
- return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
+ return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
}
@@ -1929,7 +1946,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1939,12 +1956,12 @@ IsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
- ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
+ ret = getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk);
if(ret) fprintf(stderr, "queue is idle\n");
return ret;
#else
/* regular code! */
- return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
#endif
}
@@ -2037,11 +2054,12 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
+ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
"full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
- qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
- pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize);
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
+ pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
+ pThis->iDeqBatchSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -2053,7 +2071,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
@@ -2117,7 +2135,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
ASSERT(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(qqueueGetOverallQueueSize(pThis) > 0) {
+ if(getPhysicalQueueSize(pThis) > 0) {
/* This error code is OK, but we will probably not implement this any time
* The reason is that persistence happens via DA queues. But I would like to
* leave the code as is, as we so have a hook in case we need one.
@@ -2128,13 +2146,13 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis));
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
- if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) {
+ if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
@@ -2342,13 +2360,13 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
ISOBJ_TYPE_assert(pThis, qqueue);
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
- * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
+ * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The queue size
* and bRunsDA parameters may not reflect the correct settings here, but they are
* "good enough" in the sense that they can be used to drive the decision. Valgrind's
* threading tools may point this access to be an error, but this is done
* intentional. I do not see this causes problems to us.
*/
- CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr));
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
@@ -2386,12 +2404,12 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
* It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
*/
if(flowCtlType == eFLOWCTL_FULL_DELAY) {
- while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
+ while(getPhysicalQueueSize(pThis) >= pThis->iFullDlyMrk) {
dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
}
} else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
- if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
+ if(getPhysicalQueueSize(pThis) >= pThis->iLightDlyMrk) {
dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
timeoutComp(&t, 1000); /* 1000 milliseconds = 1 second TODO: make configurable */
pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
@@ -2403,7 +2421,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
* is not the case, basic flow control enters the field, which means we wait for
* the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
*/
- while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
+ while( (pThis->iMaxQueueSize > 0 && getPhysicalQueueSize(pThis) >= pThis->iMaxQueueSize)
|| (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
diff --git a/runtime/queue.h b/runtime/queue.h
index e47b8762..92bf8ae5 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -74,7 +74,8 @@ typedef struct queue_s {
int bSaveOnShutdown;/* persists everything on shutdown (if DA!)? 1-yes, 0-no */
int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
- int iQueueSize; /* Current number of elements in the queue */
+ int iQueueSize; /* Current number of elements in queue store (some are already logically dequeued!) */
+ int nLogDeq; /* number of elements currently logically dequeued */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
int iCurNumWrkThrd;/* current number of active worker threads */
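
Putting the two new fields together: qqueueAdd() still increments iQueueSize, the renamed qqueueDeq() now only increments nLogDeq, and DoDeleteBatchFromQStore() later decrements both by the batch size. A hypothetical stand-alone trace of one batch (again a sketch, not rsyslog code):

#include <stdio.h>

static void show(const char *step, int iQueueSize, int nLogDeq)
{
	printf("%-20s phys %d, log %d\n", step, iQueueSize, iQueueSize - nLogDeq);
}

int main(void)
{
	int iQueueSize = 0, nLogDeq = 0;
	int nElem = 4;                                     /* batch size                 */

	for(int i = 0 ; i < 10 ; ++i)
		++iQueueSize;                              /* qqueueAdd()                */
	show("after enqueue", iQueueSize, nLogDeq);        /* phys 10, log 10            */

	nLogDeq += nElem;                                  /* nElem calls to qqueueDeq() */
	show("after batch dequeue", iQueueSize, nLogDeq);  /* phys 10, log 6             */

	iQueueSize -= nElem;                               /* DoDeleteBatchFromQStore()  */
	nLogDeq -= nElem;
	show("after batch delete", iQueueSize, nLogDeq);   /* phys 6, log 6              */
	return 0;
}

Presumably the point of the split is that elements remain accounted for in the store until their batch has been fully processed and deleted, which is what the ULTRA/MULTI-DEQUEUE TODOs in the patch hint at.
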
diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh
index 2ea6278e..91addf68 100755
--- a/tests/da-mainmsg-q.sh
+++ b/tests/da-mainmsg-q.sh
@@ -24,7 +24,7 @@ if [ "$?" -ne "0" ]; then
cp rsyslog.out.log rsyslog.out.log.save
fi
ls -l test-spool
-sleep 1 # we need this so that rsyslogd can receive all outstanding messages
+sleep 2 # we need this so that rsyslogd can receive all outstanding messages
#
# part 2: send bunch of messages. This should trigger DA mode
#
@@ -35,7 +35,7 @@ if [ "$?" -ne "0" ]; then
cp rsyslog.out.log rsyslog.out.log.save
fi
ls -l test-spool
-sleep 5 # we need this so that rsyslogd can receive all outstanding messages
+sleep 8 # we need this so that rsyslogd can receive all outstanding messages
#
# send another handful
#
diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh
index 5ff9ced0..efa6728b 100755
--- a/tests/diskqueue.sh
+++ b/tests/diskqueue.sh
@@ -7,6 +7,7 @@
echo testing queue disk-only mode
rm -rf test-spool
mkdir test-spool
+# enable this, if you need debug output: export RSYSLOG_DEBUG="debug"
rm -f work rsyslog.out.log rsyslog.out.log.save # work files
../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/diskqueue.conf &
sleep 1