diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-28 14:24:37 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-28 14:24:37 +0200 |
commit | 13d4a23e92996e24d6a833ca75d06428c5387aa4 (patch) | |
tree | 49d69b2c232ff94c627141fbaf9fdb6c57895f97 | |
parent | fc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40 (diff) | |
download | rsyslog-13d4a23e92996e24d6a833ca75d06428c5387aa4.tar.gz rsyslog-13d4a23e92996e24d6a833ca75d06428c5387aa4.tar.xz rsyslog-13d4a23e92996e24d6a833ca75d06428c5387aa4.zip |
Some more fixes for the queue engine.
The enhanced testbench now runs without failures again.
-rw-r--r-- | runtime/datetime.c | 2 | ||||
-rw-r--r-- | runtime/queue.c | 67 | ||||
-rw-r--r-- | runtime/wti.c | 2 | ||||
-rw-r--r-- | runtime/wtp.c | 6 | ||||
-rw-r--r-- | tests/DiagTalker.java | 1 | ||||
-rwxr-xr-x | tests/da-mainmsg-q.sh | 3 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 1 | ||||
-rwxr-xr-x | tests/diag.sh | 2 |
8 files changed, 42 insertions, 42 deletions
diff --git a/runtime/datetime.c b/runtime/datetime.c index 19e61a0a..2a0df91a 100644 --- a/runtime/datetime.c +++ b/runtime/datetime.c @@ -121,6 +121,8 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds) t->OffsetMode = '+'; t->OffsetHour = lBias / 3600; t->OffsetMinute = lBias % 3600; + + t->timeType = 0; /* this is new and may cause format errors -- rgerhards, 2009-05-28 */ } diff --git a/runtime/queue.c b/runtime/queue.c index 698495ef..57385056 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -54,6 +54,7 @@ #include "wtp.h" #include "wti.h" #include "atomic.h" +#include "msg.h" /* TODO: remove one we removed MsgAddRef() call */ #ifdef OS_SOLARIS # include <sched.h> @@ -248,7 +249,6 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) } else { iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } -dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers); wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -294,7 +294,6 @@ TurnOffDAMode(qqueue_t *pThis) { DEFiRet; -RUNLOG_STR("XXX: TurnOffDAMode\n"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); @@ -720,6 +719,7 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; + ISOBJ_TYPE_assert(pEntry->pUsr, msg); *ppUsr = pEntry->pUsr; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; @@ -1137,7 +1137,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, void *pUsr) +qqueueDeq(qqueue_t *pThis, void **ppUsr) { DEFiRet; @@ -1148,7 +1148,7 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... 
-- rgerhards, 2008-01-03 */ - iRet = pThis->qDeq(pThis, pUsr); + iRet = pThis->qDeq(pThis, ppUsr); ATOMIC_INC(pThis->nLogDeq); // dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", @@ -1162,6 +1162,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) * Both the regular and DA queue (if it exists) is waited for, but on the same timeout. * After this function returns, the workers must either be finished or some force * to finish them must be applied. + * This function also instructs the DA worker pool (if it exists) to terminate. This is done + * in preparation of final queue shutdown. * rgerhards, 2009-05-27 */ static rsRetVal @@ -1175,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we @@ -1184,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) { @@ -1217,9 +1219,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! 
*/ if(pThis->bRunsDA) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured @@ -1232,8 +1232,16 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } else { dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n"); } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting + * the queue, it is restarted at a later stage. We don't care here if a timeout happens. + */ + dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); + } else { + dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + } } RETiRet; @@ -1335,6 +1343,14 @@ cancelWorkers(qqueue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } + + /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be + * restarted later to persist the queue. But we stop it, because otherwise we get into + * big trouble when resetting the logical dequeue pointer. This operation can only be + * done when *no* worker is running. So time for a shutdown... 
-- rgerhards, 2009-05-28 + */ + dbgoprint((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } RETiRet; @@ -1600,23 +1616,15 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); -// TODO: ULTRA: lock qaueue mutex if instructed to do so - /* if the queue runs in DA mode, the DA worker already deleted the in-memory representation - * of the message. But in regular mode, we need to do it ourselfs. We differentiate between - * the two cases, because it is actually the easiest way to handle the destruct-Problem in - * a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). - */ - if(!pThis->bRunsDA) { - for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pUsrp; - objDestruct(pUsr); - } + for(i = 0 ; i < pBatch->nElem ; ++i) { + pUsr = pBatch->pElem[i].pUsrp; + objDestruct(pUsr); } iRet = DeleteBatchFromQStore(pThis, pBatch); - pBatch->nElem = 0; /* reset batch */ + pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ RETiRet; } @@ -1908,8 +1916,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->batch.nElem ; i++) - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp)); + for(i = 0 ; i < pWti->batch.nElem ; i++) { + /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs + * the message. So far, we simply assume we always have msg_t, what currently is always the case. 
+ * rgerhards, 2009-05-28 + */ + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + } finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -2306,16 +2319,12 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g if(pThis->bRunsDA != 2) { InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); -//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war) -RUNLOG_VAR("%d", pThis->bRunsDA); -RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ } /* make sure we do not timeout before we are done */ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ -RUNLOG; iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -2333,7 +2342,6 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ -RUNLOG_STR("XXX: queue destruct\n"); /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) @@ -2347,7 +2355,6 @@ RUNLOG_STR("XXX: queue destruct\n"); * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ -//!!!! 
//CHKiRet(pThis->qUnDeqAll(pThis)); dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); CHKiRet(pThis->qUnDeqAll(pThis)); dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); diff --git a/runtime/wti.c b/runtime/wti.c index 9428cd47..465dc3e1 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -392,7 +392,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - pThis->batch.nElemDeq = 0; /* re-init dequeue count */ + // TODO: if we have a problem, enable again! pThis->batch.nElemDeq = 0; /* re-init dequeue count */ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); diff --git a/runtime/wtp.c b/runtime/wtp.c index a5836da3..6b39793b 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -294,17 +294,13 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); -dbgprintf("XXX:10 wtp %p, state %d\n", pThis, pThis->wtpState); wtpSetState(pThis, tShutdownCmd); -dbgprintf("XXX:20 wtp %p, state %d\n", pThis, pThis->wtpState); wtpWakeupAllWrkr(pThis); -dbgprintf("XXX:30 wtp %p, state %d\n", pThis, pThis->wtpState); /* see if we need to harvest (join) any terminated threads (even in timeout case, * some may have terminated... 
*/ wtpProcessThrdChanges(pThis); -dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState); /* and wait for their termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -312,9 +308,7 @@ dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState); pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; -dbgprintf("XXX:50 wtp %p, state %d\n", pThis, pThis->wtpState); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { -dbgprintf("XXX:60 wtp %p, state %d\n", pThis, pThis->wtpState); dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); diff --git a/tests/DiagTalker.java b/tests/DiagTalker.java index 85a6671e..04e12327 100644 --- a/tests/DiagTalker.java +++ b/tests/DiagTalker.java @@ -34,6 +34,7 @@ public class DiagTalker { try { diagSocket = new Socket(host, port); + diagSocket.setSoTimeout(0); /* wait for lenghty operations */ out = new PrintWriter(diagSocket.getOutputStream(), true); in = new BufferedReader(new InputStreamReader( diagSocket.getInputStream())); diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh index 25e7fd95..6ec2f3a9 100755 --- a/tests/da-mainmsg-q.sh +++ b/tests/da-mainmsg-q.sh @@ -27,8 +27,5 @@ source $srcdir/diag.sh injectmsg 2050 50 # clean up and check test result source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages -### currently, we get a stable abort if we use the former kill logic. With shutdown-when-empty, it hangs (but that still tells us there is a bug ;)) ### -#kill `cat rsyslog.pid` -echo seqchk? 
source $srcdir/diag.sh seq-check 2099 source $srcdir/diag.sh exit diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index 0ec76b47..7b6ec6dd 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -19,7 +19,6 @@ $srcdir/diag.sh shutdown-immediate $srcdir/diag.sh wait-shutdown source $srcdir/diag.sh check-mainq-spool -echo DEBUG EXIT! #exit # restart engine and have rest processed diff --git a/tests/diag.sh b/tests/diag.sh index a34f3ede..ce323143 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -9,7 +9,7 @@ #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace -#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction" +#export RSYSLOG_DEBUG="debug nostdout printmutexaction" #export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason |