diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-28 09:59:45 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-28 09:59:45 +0200 |
commit | 9517e19b6427c295e206ece9562ce70f4a6d7044 (patch) | |
tree | 783fa1af0c03e05f10cb3b54149b6fcba508263d /runtime/queue.c | |
parent | d4564f8399f4362c7e79066370049f909cef996c (diff) | |
download | rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.gz rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.xz rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.zip |
preserving current changes
... in preparation for some larger changes - I need to apply some
serious design changes, as the current system does not play well
at all with ultra-reliable queues. Will do that in a totally new version.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 35 |
1 files changed, 23 insertions, 12 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 48d1c6e3..4405dd39 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -255,7 +255,11 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) /* wait until we have a fully initialized DA queue. Sometimes, we need to - * sync with it, as we expect it for some function. + * sync with it, as we expect it for some function. Note that in extreme + * cases, the DA queue may already have started up AND terminated when we + * call this function. As such,it may validly be that DA is already shut down. + * So we just check if we are in init phase and then wait for full startup. + * If in non-DA mode, we silently return. * rgerhards, 2008-02-27 */ static rsRetVal @@ -264,9 +268,8 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(pThis->bRunsDA); - while(pThis->bRunsDA != 2) { + while(pThis->bRunsDA == 1) { d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); } @@ -289,13 +292,16 @@ qqueueTurnOffDAMode(qqueue_t *pThis) { DEFiRet; +RUNLOG_STR("XXX: TurnOffDAMode\n"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ +RUNLOG; qqueueWaitDAModeInitialized(pThis); +RUNLOG; /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -309,6 +315,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis) * during the lifetime of DA-mode, depending on how often the DA worker receives an * inactivity timeout. -- rgerhards, 2008-01-25 */ +dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pThis->pqDA)); if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, @@ -1151,8 +1158,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) iRet = pThis->qDeq(pThis, pUsr); ATOMIC_INC(pThis->nLogDeq); - dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", - getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", +// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1259,13 +1266,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ -RUNLOG_STR("setting enqOnly for main queue"); - //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */ + /* note that we modify bEnqOnly direclty, because going through the method would + * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 + */ pThis->bEnqOnly = 1; wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); if(pThis->pqDA != NULL) { -RUNLOG_STR("setting enqOnly for DA queue"); - //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX); pThis->pqDA->bEnqOnly = 1; wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); } @@ -1644,7 +1650,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); +//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ @@ -1945,7 +1951,7 @@ RUNLOG; RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -RUNLOG; +dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; } } else { @@ -1973,7 +1979,6 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ -RUNLOG; if(pThis->bEnqOnly || pThis->bRunsDA) { RUNLOG; iRet = RS_RET_TERMINATE_NOW; @@ -2314,6 +2319,7 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g if(pThis->bRunsDA != 2) { InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war) RUNLOG_VAR("%d", pThis->bRunsDA); RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ @@ -2322,6 +2328,7 @@ RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ +RUNLOG; iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -2339,6 +2346,7 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ +RUNLOG_STR("XXX: queue destruct\n"); /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) @@ -2353,6 +2361,9 @@ CODESTARTobjDestruct(qqueue) * so and then destruct everything. -- rgerhards, 2009-05-26 */ //!!!! //CHKiRet(pThis->qUnDeqAll(pThis)); +dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + CHKiRet(pThis->qUnDeqAll(pThis)); +dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); |