diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-13 14:38:45 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-13 14:38:45 +0200 |
commit | 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (patch) | |
tree | 35e71c16c55fd3a18a9e5f47d0b027866f825e35 /runtime/queue.c | |
parent | becc47cef625bfabf53589bb98ca10c352a4c824 (diff) | |
download | rsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.tar.gz rsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.tar.xz rsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.zip |
added some debug settings plus improved shutdown sequence
... non-working version!
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 51 |
1 files changed, 39 insertions, 12 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 101052a1..00bbd15f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1041,7 +1041,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batchObj.pUsrp = (obj_t*) pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); objDestruct(pUsr); RETiRet; @@ -1180,6 +1180,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } else { DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n"); } + } + + if(pThis->pWtpDA != NULL) { /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting * the queue, it is restarted at a later stage. We don't care here if a timeout happens. */ @@ -1210,6 +1213,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) rsRetVal iRetLocal; DEFiRet; +RUNLOG_STR("trying to shutdown workers within Action Timeout"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ @@ -1218,6 +1222,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 */ pThis->bEnqOnly = 1; + pThis->bShutdownImmediate = 1; /* need to set this so that the DA queue begins shutdown in parallel! */ if(pThis->pqDA != NULL) { pThis->pqDA->bEnqOnly = 1; @@ -1247,6 +1252,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + } + + if(pThis->pWtpDA != NULL) { /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This * is necessary because we may be in a situation where the DA queue regular worker and the * main queue worker stopped rather quickly. In this case, there is almost no time (and @@ -1279,6 +1287,7 @@ static rsRetVal cancelWorkers(qqueue_t *pThis) { rsRetVal iRetLocal; + struct timespec tTimeout; DEFiRet; /* Now queue workers should have terminated. If not, we need to cancel them as we have applied @@ -1300,13 +1309,31 @@ cancelWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } + } - /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be - * restarted later to persist the queue. But we stop it, because otherwise we get into - * big trouble when resetting the logical dequeue pointer. This operation can only be - * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be + * restarted later to persist the queue. But we stop it, because otherwise we get into + * big trouble when resetting the logical dequeue pointer. This operation can only be + * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + */ + if(pThis->pWtpDA != NULL) { + /* but because of the potentially harsh consequences of cancelling, we try one last + * (and short) time to shut down the DA worker in a normal fashion. The idea here + * is that it may be willing to do so, but we did not yet have a task switch so + * that it could not terminate but will do immediately when it gets time. + * rgerhards, 2009-10-13 */ - DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + timeoutComp(&tTimeout, 50); + DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " + "- this is not good, need to cancel now...\n"); + } else { + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n"); + } + + DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n"); iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } @@ -1349,6 +1376,7 @@ ShutdownWorkers(qqueue_t *pThis) pThis->iLowWtrMrk = 0; CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); +dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis)); if(getPhysicalQueueSize(pThis) > 0) { CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); @@ -1375,7 +1403,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) { DEFiRet; qqueue_t *pThis; @@ -1835,7 +1863,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1880,7 +1908,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->batch.nElem ; i++) { + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 @@ -1925,7 +1953,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; RUNLOG_STR("XXX: re-start reg worker"); -qqueueAdviseMaxWorkers(pThis); +if(!pThis->bShutdownImmediate) + qqueueAdviseMaxWorkers(pThis); RUNLOG_STR("XXX: done re-start reg worker"); } } else { @@ -2276,8 +2305,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) - pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) |