From 5625dbd1b6cddb8b84d8a3d8c60f95eaaa49be66 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 7 Oct 2009 18:40:30 +0200 Subject: bugfix and testbench improvements - bugfix: solved potential (temporary) stall of messages when the queue was almost empty and few new data added (caused testbench to sometimes hang!) - fixed some race condition in testbench - added more elaborate diagnostics to parts of the testbench - solved a potential race inside the queue engine --- runtime/queue.c | 22 ++++++++++++++++++---- runtime/wti.c | 15 ++++++++++++--- runtime/wtp.c | 3 +++ 3 files changed, 33 insertions(+), 7 deletions(-) (limited to 'runtime') diff --git a/runtime/queue.c b/runtime/queue.c index 96ebd6d5..101052a1 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1669,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); - d_pthread_mutex_unlock(pThis->mut); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1776,9 +1775,7 @@ RateLimiter(qqueue_t *pThis) } -/* This dequeues the next batch and checks if the queue is empty. If it is - * empty, return RS_RET_IDLE. That will trigger termination of the function - * and tell the upper layer caller to initiate idle processing. +/* This dequeues the next batch. * rgerhards, 2009-05-20 */ static inline rsRetVal @@ -1789,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); +dbgprintf("YYY: deqeueu for consumer"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE); + finalize_it: RETiRet; } @@ -1832,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1844,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; @@ -1869,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs @@ -1878,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -2531,6 +2544,7 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); +dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/wti.c b/runtime/wti.c index e624899b..53b695b0 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -134,6 +134,7 @@ wtiCancelThrd(wti_t *pThis) pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ while(wtiGetState(pThis)) { +//fprintf(stderr, "sleep loop for getState\n"); srSleep(0, 10000); } } @@ -223,9 +224,9 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - d_pthread_mutex_lock(pWtp->pmutUsr); if(pThis->bAlwaysRunning) { /* never shut down any started worker */ +dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ @@ -234,7 +235,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } - d_pthread_mutex_unlock(pWtp->pmutUsr); ENDfunc } @@ -267,8 +267,10 @@ wtiWorker(wti_t *pThis) pWtp->pfRateLimiter(pWtp->pUsr); } +dbgprintf("YYY/ZZZ: pre lock mutex\n"); d_pthread_mutex_lock(pWtp->pmutUsr); +dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { @@ -281,17 +283,24 @@ wtiWorker(wti_t *pThis) } /* try to execute and process whatever we have */ - /* This function must and does RELEASE the MUTEX! */ + /* Note that this function releases and re-aquires the mutex. The returned + * information on idle state must be processed before releasing the mutex again. + */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); +dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { + d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); + d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } + d_pthread_mutex_unlock(pWtp->pmutUsr); + bInactivityTOOccured = 0; /* reset for next run */ } diff --git a/runtime/wtp.c b/runtime/wtp.c index 4524e0c3..40d031dc 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -413,6 +413,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) ISOBJ_TYPE_assert(pThis, wtp); +int nMaxWrkrTmp = nMaxWrkr; if(nMaxWrkr == 0) FINALIZE; @@ -420,6 +421,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) nMaxWrkr = pThis->iNumWorkerThreads; nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); +dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing); if(nMissing > 0) { DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); @@ -428,6 +430,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) CHKiRet(wtpStartWrkr(pThis)); } } else { +dbgprintf("YYY: adivse signal cond busy"); pthread_cond_signal(pThis->pcondBusy); } -- cgit