diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-28 09:59:45 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-28 09:59:45 +0200 |
commit | 9517e19b6427c295e206ece9562ce70f4a6d7044 (patch) | |
tree | 783fa1af0c03e05f10cb3b54149b6fcba508263d | |
parent | d4564f8399f4362c7e79066370049f909cef996c (diff) | |
download | rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.gz rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.xz rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.zip |
preserving current changes
... in preparation for some larger changes - I need to apply some
serious design changes, as the current system does not play well
at all with ultra-reliable queues. Will do that in a totally new version.
-rw-r--r-- | plugins/imdiag/imdiag.c | 4 | ||||
-rw-r--r-- | runtime/queue.c | 35 | ||||
-rw-r--r-- | runtime/wti.c | 13 | ||||
-rw-r--r-- | runtime/wtp.c | 6 | ||||
-rwxr-xr-x | tests/diag.sh | 2 |
5 files changed, 30 insertions, 30 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 03396db7..cb1c1686 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -256,10 +256,13 @@ static rsRetVal waitMainQEmpty(tcps_sess_t *pSess) { int iMsgQueueSize; + int iPrint = 0; DEFiRet; CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); while(iMsgQueueSize > 0) { + if(iPrint++ % 500 == 0) + dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize); srSleep(0,2); /* wait a little bit */ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); } @@ -294,6 +297,7 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg) getFirstWord(&pszMsg, cmdBuf, sizeof(cmdBuf)/sizeof(uchar), TO_LOWERCASE); + dbgprintf("imdiag received command '%s'\n", cmdBuf); if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("getmainmsgqueuesize"))) { CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); CHKiRet(sendResponse(pSess, "%d\n", iMsgQueueSize)); diff --git a/runtime/queue.c b/runtime/queue.c index 48d1c6e3..4405dd39 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -255,7 +255,11 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) /* wait until we have a fully initialized DA queue. Sometimes, we need to - * sync with it, as we expect it for some function. + * sync with it, as we expect it for some function. Note that in extreme + * cases, the DA queue may already have started up AND terminated when we + * call this function. As such,it may validly be that DA is already shut down. + * So we just check if we are in init phase and then wait for full startup. + * If in non-DA mode, we silently return. * rgerhards, 2008-02-27 */ static rsRetVal @@ -264,9 +268,8 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(pThis->bRunsDA); - while(pThis->bRunsDA != 2) { + while(pThis->bRunsDA == 1) { d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); } @@ -289,13 +292,16 @@ qqueueTurnOffDAMode(qqueue_t *pThis) { DEFiRet; +RUNLOG_STR("XXX: TurnOffDAMode\n"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ +RUNLOG; qqueueWaitDAModeInitialized(pThis); +RUNLOG; /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -309,6 +315,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis) * during the lifetime of DA-mode, depending on how often the DA worker receives an * inactivity timeout. -- rgerhards, 2008-01-25 */ +dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pThis->pqDA)); if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, @@ -1151,8 +1158,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) iRet = pThis->qDeq(pThis, pUsr); ATOMIC_INC(pThis->nLogDeq); - dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", - getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", +// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1259,13 +1266,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ -RUNLOG_STR("setting enqOnly for main queue"); - //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */ + /* note that we modify bEnqOnly direclty, because going through the method would + * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 + */ pThis->bEnqOnly = 1; wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); if(pThis->pqDA != NULL) { -RUNLOG_STR("setting enqOnly for DA queue"); - //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX); pThis->pqDA->bEnqOnly = 1; wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); } @@ -1644,7 +1650,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); +//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ @@ -1945,7 +1951,7 @@ RUNLOG; RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -RUNLOG; +dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; } } else { @@ -1973,7 +1979,6 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ -RUNLOG; if(pThis->bEnqOnly || pThis->bRunsDA) { RUNLOG; iRet = RS_RET_TERMINATE_NOW; @@ -2314,6 +2319,7 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g if(pThis->bRunsDA != 2) { InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war) RUNLOG_VAR("%d", pThis->bRunsDA); RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ @@ -2322,6 +2328,7 @@ RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ +RUNLOG; iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -2339,6 +2346,7 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ +RUNLOG_STR("XXX: queue destruct\n"); /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) @@ -2353,6 +2361,9 @@ CODESTARTobjDestruct(qqueue) * so and then destruct everything. -- rgerhards, 2009-05-26 */ //!!!! //CHKiRet(pThis->qUnDeqAll(pThis)); +dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + CHKiRet(pThis->qUnDeqAll(pThis)); +dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); diff --git a/runtime/wti.c b/runtime/wti.c index 75e497b8..2fb5eea2 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -392,11 +392,8 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); -dbgprintf("XXX: worker startup\n"); - RUNLOG_STR("MUTEX lock"); BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); - RUNLOG_STR("MUTEX release"); END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* now we have our identity, on to real processing */ @@ -410,21 +407,17 @@ dbgprintf("XXX: worker startup\n"); } wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ - RUNLOG_STR("MUTEX lock"); BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ -RUNLOG; terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); -RUNLOG_VAR("%d", terminateRet); if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); - dbgoprint((obj_t*) pThis, "terminating worker because auf TERMINATE_NOW mode, del iRet %d\n", + dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", localRet); break; } -RUNLOG; /* try to execute and process whatever we have */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); @@ -439,10 +432,8 @@ RUNLOG; /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */ break; /* end worker thread run */ } - RUNLOG_STR("MUTEX lock"); BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - RUNLOG_STR("MUTEX release"); END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); continue; /* request next iteration */ } @@ -451,7 +442,6 @@ RUNLOG; } /* if we exit the loop, the mutex is locked and must be unlocked */ - RUNLOG_STR("MUTEX release"); END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* indicate termination */ @@ -459,6 +449,7 @@ RUNLOG; d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ +RUNLOG_STR("XXX: Worker shutdown"); pWtp->pfOnWorkerShutdown(pWtp->pUsr); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); diff --git a/runtime/wtp.c b/runtime/wtp.c index 7786a656..a5836da3 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -141,7 +141,6 @@ BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODE int i; CODESTARTobjDestruct(wtp) wtpProcessThrdChanges(pThis); /* process thread changes one last time */ -RUNLOG_STR("wtpDestruct"); /* destruct workers */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) @@ -263,22 +262,17 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); -RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { -RUNLOG; ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(pThis->wtpState == wtpState_SHUTDOWN) { ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); -RUNLOG; } -RUNLOG_VAR("%d", iRet); /* try customer handler if one was set and we do not yet have a definite result */ if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } -RUNLOG_VAR("%d", iRet); finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); diff --git a/tests/diag.sh b/tests/diag.sh index 5c6a0ab8..8bf6b129 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -9,7 +9,7 @@ #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace -#export RSYSLOG_DEBUG="debug Xnostdout printmutexaction" +export RSYSLOG_DEBUG="debug nostdout printmutexaction" export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason |