summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-28 14:24:37 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-28 14:24:37 +0200
commit13d4a23e92996e24d6a833ca75d06428c5387aa4 (patch)
tree49d69b2c232ff94c627141fbaf9fdb6c57895f97 /runtime/queue.c
parentfc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40 (diff)
downloadrsyslog-13d4a23e92996e24d6a833ca75d06428c5387aa4.tar.gz
rsyslog-13d4a23e92996e24d6a833ca75d06428c5387aa4.tar.xz
rsyslog-13d4a23e92996e24d6a833ca75d06428c5387aa4.zip
some more fixes for queue engine
The enhanced testbench now runs without failures, again
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c67
1 files changed, 37 insertions, 30 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 698495ef..57385056 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -54,6 +54,7 @@
#include "wtp.h"
#include "wti.h"
#include "atomic.h"
+#include "msg.h" /* TODO: remove one we removed MsgAddRef() call */
#ifdef OS_SOLARIS
# include <sched.h>
@@ -248,7 +249,6 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
} else {
iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
-dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers);
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
}
@@ -294,7 +294,6 @@ TurnOffDAMode(qqueue_t *pThis)
{
DEFiRet;
-RUNLOG_STR("XXX: TurnOffDAMode\n");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
@@ -720,6 +719,7 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
DEFiRet;
pEntry = pThis->tVars.linklist.pDeqRoot;
+ ISOBJ_TYPE_assert(pEntry->pUsr, msg);
*ppUsr = pEntry->pUsr;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
@@ -1137,7 +1137,7 @@ finalize_it:
/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDeq(qqueue_t *pThis, void *pUsr)
+qqueueDeq(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
@@ -1148,7 +1148,7 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- iRet = pThis->qDeq(pThis, pUsr);
+ iRet = pThis->qDeq(pThis, ppUsr);
ATOMIC_INC(pThis->nLogDeq);
// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
@@ -1162,6 +1162,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
* Both the regular and DA queue (if it exists) is waited for, but on the same timeout.
* After this function returns, the workers must either be finished or some force
* to finish them must be applied.
+ * This function also instructs the DA worker pool (if it exists) to terminate. This is done
+ * in preparation of final queue shutdown.
* rgerhards, 2009-05-27
*/
static rsRetVal
@@ -1175,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */
if(getPhysicalQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
@@ -1184,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
}
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut);
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
if(pThis->bRunsDA) {
@@ -1217,9 +1219,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
}
/* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(pThis->bRunsDA) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
qqueueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
@@ -1232,8 +1232,16 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
} else {
dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n");
}
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
+ * the queue, it is restarted at a later stage. We don't care here if a timeout happens.
+ */
+ dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
+ } else {
+ dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n");
+ }
}
RETiRet;
@@ -1335,6 +1343,14 @@ cancelWorkers(qqueue_t *pThis)
dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
+
+ /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
+ * restarted later to persist the queue. But we stop it, because otherwise we get into
+ * big trouble when resetting the logical dequeue pointer. This operation can only be
+ * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
+ */
+ dbgoprint((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
+ iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
RETiRet;
@@ -1600,23 +1616,15 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
-// TODO: ULTRA: lock qaueue mutex if instructed to do so
- /* if the queue runs in DA mode, the DA worker already deleted the in-memory representation
- * of the message. But in regular mode, we need to do it ourselfs. We differentiate between
- * the two cases, because it is actually the easiest way to handle the destruct-Problem in
- * a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function).
- */
- if(!pThis->bRunsDA) {
- for(i = 0 ; i < pBatch->nElem ; ++i) {
- pUsr = pBatch->pElem[i].pUsrp;
- objDestruct(pUsr);
- }
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
+ pUsr = pBatch->pElem[i].pUsrp;
+ objDestruct(pUsr);
}
iRet = DeleteBatchFromQStore(pThis, pBatch);
- pBatch->nElem = 0; /* reset batch */
+ pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
RETiRet;
}
@@ -1908,8 +1916,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->batch.nElem ; i++)
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
+ for(i = 0 ; i < pWti->batch.nElem ; i++) {
+ /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
+ * the message. So far, we simply assume we always have msg_t, what currently is always the case.
+ * rgerhards, 2009-05-28
+ */
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ }
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -2306,16 +2319,12 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
if(pThis->bRunsDA != 2) {
InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
-//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war)
-RUNLOG_VAR("%d", pThis->bRunsDA);
-RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
}
/* make sure we do not timeout before we are done */
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
-RUNLOG;
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -2333,7 +2342,6 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C
CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
-RUNLOG_STR("XXX: queue destruct\n");
/* shut down all workers
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
@@ -2347,7 +2355,6 @@ RUNLOG_STR("XXX: queue destruct\n");
* we need to reset the logical dequeue pointer, persist the queue if configured to do
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
-//!!!! //CHKiRet(pThis->qUnDeqAll(pThis));
dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
CHKiRet(pThis->qUnDeqAll(pThis));
dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));