diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 73 | ||||
-rw-r--r-- | runtime/queue.h | 1 | ||||
-rw-r--r-- | runtime/wti.c | 2 |
3 files changed, 9 insertions, 67 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 50bfaca9..bb988c86 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -258,30 +258,6 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) } -/* wait until we have a fully initialized DA queue. Sometimes, we need to - * sync with it, as we expect it for some function. Note that in extreme - * cases, the DA queue may already have started up AND terminated when we - * call this function. As such,it may validly be that DA is already shut down. - * So we just check if we are in init phase and then wait for full startup. - * If in non-DA mode, we silently return. - * IMPORTANT: the QUEUE MUTEX MUST BE LOOKED WHEN this funnction is called! - * rgerhards, 2008-02-27 - */ -static rsRetVal -qqueueWaitDAModeInitialized(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - while(pThis->bRunsDA == 1) { - d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); - } - - RETiRet; -} - - /* Destruct DA queue. This is the last part of DA-to-normal-mode * transistion. This is called asynchronously and some time quite a * while after the actual transistion. The key point is that we need to @@ -301,10 +277,6 @@ TurnOffDAMode(qqueue_t *pThis) ASSERT(pThis->bRunsDA); if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ - /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, - * this will be quick. - */ -//XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); } @@ -356,9 +328,6 @@ StartDA(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); - if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */ - FINALIZE; /* ... then we are already done! */ - /* create message queue */ CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); @@ -404,9 +373,7 @@ StartDA(qqueue_t *pThis) if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ - pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ - pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ + //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", qqueueGetID(pThis->pqDA)); @@ -455,7 +422,6 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); @@ -467,9 +433,15 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) /* if we reach this point, we have a "good" DA worker pool */ /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ - pThis->bRunsDA = 1; pThis->bDAEnqOnly = bEnqOnly; + /* now construct the actual queue (if it does not already exist) */ + if(pThis->pqDA == NULL) { + CHKiRet(StartDA(pThis)); + } + + pThis->bRunsDA = 1; + /* now we must now adivse the wtp that we need one worker. If none is yet active, * that will also start one up. If we forgot that step, everything would be stalled * until the next enqueue request. @@ -1168,11 +1140,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } - - /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ - if(pThis->bRunsDA) { - qqueueWaitDAModeInitialized(pThis); - } d_pthread_mutex_unlock(pThis->mut); /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found @@ -2024,7 +1991,6 @@ RegOnWrkrShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pqParent != NULL) { - pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ } @@ -2034,24 +2000,6 @@ RegOnWrkrShutdown(qqueue_t *pThis) } -/* The following function is called when a regular queue worker starts up. We need this - * hook to indicate in the parent queue (if we are a child) that we are not done yet. - */ -static rsRetVal -RegOnWrkrStartup(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - if(pThis->pqParent != NULL) { - pThis->pqParent->bChildIsDone = 0; - } - - RETiRet; -} - - /* start up the queue - it must have been constructed and parameters defined * before. */ @@ -2114,7 +2062,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); @@ -2287,11 +2234,9 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ -dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); - if(pThis->bRunsDA != 2) { + if(!pThis->bRunsDA) { InitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); - qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ } d_pthread_mutex_unlock(pThis->mut); /* make sure we do not timeout before we are done */ diff --git a/runtime/queue.h b/runtime/queue.h index 7b10e5dd..73c62b52 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -121,7 +121,6 @@ typedef struct queue_s { pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ - int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* the following variables are always present, because they diff --git a/runtime/wti.c b/runtime/wti.c index c295ccc9..9d0560dd 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -250,9 +250,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - d_pthread_mutex_lock(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); - d_pthread_mutex_unlock(pWtp->pmutUsr); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ |