From fc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 11:57:30 +0200 Subject: fixing an issue during DA mode queue shutdown also changed DA queue mode in that the regular workers now run concurrently. --- runtime/queue.c | 108 ++++++++++++++++++++++++-------------------------------- runtime/wti.c | 1 + 2 files changed, 47 insertions(+), 62 deletions(-) (limited to 'runtime') diff --git a/runtime/queue.c b/runtime/queue.c index 4405dd39..698495ef 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -72,6 +72,7 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -240,14 +241,15 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } + } + /* regular workers always run */ + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; } else { - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } +dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers); + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } RETiRet; @@ -288,7 +290,7 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -qqueueTurnOffDAMode(qqueue_t *pThis) +TurnOffDAMode(qqueue_t *pThis) { DEFiRet; @@ -299,9 +301,7 @@ RUNLOG_STR("XXX: TurnOffDAMode\n"); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ -RUNLOG; - qqueueWaitDAModeInitialized(pThis); -RUNLOG; + //TODO: MULTI del, can not happen (but verify first) qqueueWaitDAModeInitialized(pThis); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -321,16 +321,13 @@ dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pTh /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ +//XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); - /* now we need to check if the regular queue has some messages. This may be the case - * when it is waiting that the high water mark is reached again. If so, we need to start up - * a regular worker. -- rgerhards, 2008-01-26 - */ - if(getLogicalQueueSize(pThis) > 0) { - qqueueAdviseMaxWorkers(pThis); - } + } else { + /* the queue has data again! */ + dbgprintf("DA queue has data during shutdown, restarting...\n"); + qqueueAdviseMaxWorkers(pThis->pqDA); } RETiRet; @@ -408,6 +405,10 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); + + // experimental: XXX + CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0)); + if(pThis->toQShutdown == 0) { CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { @@ -423,14 +424,6 @@ StartDA(qqueue_t *pThis) if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - /* as we are right now starting DA mode because we are so busy, it is - * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER, - * we want to be on the safe side, and so we awake anyone that is waiting - * on one. So even if the scheduler plays badly with us, things should be - * quite well. -- rgerhards, 2008-01-15 - */ - wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ - pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ @@ -481,8 +474,9 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); @@ -500,8 +494,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * that will also start one up. If we forgot that step, everything would be stalled * until the next enqueue request. */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */ -RUNLOG_VAR("%d", pThis->bRunsDA); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */ finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -516,7 +509,7 @@ finalize_it: * rgerhards, 2008-01-14 */ static inline rsRetVal -qqueueChkStrtDA(qqueue_t *pThis) +ChkStrtDA(qqueue_t *pThis) { DEFiRet; @@ -1538,7 +1531,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); -dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); +//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); @@ -1849,13 +1842,13 @@ finalize_it: * rgerhards, 2009-05-27 */ static rsRetVal -batchProcessedReg(qqueue_t *pThis, wti_t *pWti) +batchProcessed(qqueue_t *pThis, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq); +dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); @@ -1940,7 +1933,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; -RUNLOG; } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1948,15 +1940,17 @@ RUNLOG; && pThis->pqDA->sizeOnDiskMax > 0 && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ -RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); +dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; +RUNLOG_STR("XXX: re-start reg worker"); +qqueueAdviseMaxWorkers(pThis); +RUNLOG_STR("XXX: done re-start reg worker"); } } else { -RUNLOG; - iRet = RS_RET_TERMINATE_NOW; + // experimental iRet = RS_RET_TERMINATE_NOW; + ; } } @@ -1979,11 +1973,11 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ - if(pThis->bEnqOnly || pThis->bRunsDA) { -RUNLOG; + // TODO: DEL - we now keep the workers running! if(pThis->bEnqOnly || pThis->bRunsDA) { + if(pThis->bEnqOnly) { +dbgprintf("XXX: terminate_NOW queue:Reg worker: enqOnly! queue size %d\n", getPhysicalQueueSize(pThis)); iRet = RS_RET_TERMINATE_NOW; } else if(pThis->pqParent != NULL) { -RUNLOG; iRet = RS_RET_TERMINATE_WHEN_IDLE; } @@ -2000,35 +1994,27 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) DEFiRet; assert(pVal != NULL); *pVal = pThis->iDeqBatchSize; +if(pThis->pqParent != NULL) + *pVal = 16; RETiRet; } /* must only be called when the queue mutex is locked, else results - * are not stable! DA queue version + * are not stable! DA worker version (pThis *is* the *main* queue, not DA!) */ static int qqueueIsIdleDA(qqueue_t *pThis) { - /* remember: iQueueSize is the DA queue size, not the main queue! */ - /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk); } /* must only be called when the queue mutex is locked, else results - * are not stable! Regular queue version + * are not stable! Regular worker version. */ static int IsIdleReg(qqueue_t *pThis) { -#if 0 /* enable for performance testing */ - int ret; - ret = getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk); - if(ret) fprintf(stderr, "queue is idle\n"); - return ret; -#else - /* regular code! */ - return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); -#endif + return(getPhysicalQueueSize(pThis) == 0); } @@ -2084,7 +2070,8 @@ RegOnWrkrStartup(qqueue_t *pThis) /* start up the queue - it must have been constructed and parameters defined * before. */ -rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ +rsRetVal +qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; rsRetVal iRetLocal; @@ -2141,7 +2128,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); - CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); @@ -2168,7 +2155,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } } - if(!bInitialized) { + if(Debug && !bInitialized) { dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " "queue itself!)\n"); } @@ -2507,7 +2494,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) - CHKiRet(qqueueChkStrtDA(pThis)); + CHKiRet(ChkStrtDA(pThis)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2611,16 +2598,13 @@ SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); if(pThis->pWtpReg != NULL) wtpWakeupAllWrkr(pThis->pWtpReg); -RUNLOG; if(pThis->pWtpDA != NULL) wtpWakeupAllWrkr(pThis->pWtpDA); -RUNLOG; } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ } } -RUNLOG; pThis->bEnqOnly = bEnqOnly; diff --git a/runtime/wti.c b/runtime/wti.c index 2fb5eea2..9428cd47 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -392,6 +392,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + pThis->batch.nElemDeq = 0; /* re-init dequeue count */ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); -- cgit