diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 62 |
1 files changed, 38 insertions, 24 deletions
@@ -119,6 +119,26 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) } +/* wait until we have a fully initialized DA queue. Sometimes, we need to + * sync with it, as we expect it for some function. + * rgerhards, 2008-02-27 + */ +static rsRetVal +queueWaitDAModeInitialized(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ASSERT(pThis->bRunsDA); + + while(pThis->bRunsDA != 2) { + d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); + } + + RETiRet; +} + + /* Destruct DA queue. This is the last part of DA-to-normal-mode * transistion. This is called asynchronously and some time quite a * while after the actual transistion. The key point is that we need to @@ -140,9 +160,8 @@ queueTurnOffDAMode(queue_t *pThis) /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ - while(pThis->bRunsDA != 2) { - d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); - } + queueWaitDAModeInitialized(pThis); + /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to * keep that comment if future need arises. @@ -278,7 +297,7 @@ queueStartDA(queue_t *pThis) pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ - pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ + pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", queueGetID(pThis->pqDA)); @@ -816,28 +835,19 @@ static rsRetVal qDestructDisk(queue_t *pThis) static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; - int64 offsIn; - int64 offsOut; + number_t nWriteCount; ASSERT(pThis != NULL); - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn)); + CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut)); + CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ - if(offsIn < offsOut) { - offsIn = offsOut - offsIn; - } else { - /* we had a file switch, so the second offset is the actual number of bytes - * written. So... - */ - offsIn = offsOut; - } + pThis->tVars.disk.sizeOnDisk += nWriteCount; - pThis->tVars.disk.sizeOnDisk += offsIn; - - dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk); + dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", + nWriteCount, pThis->tVars.disk.sizeOnDisk); finalize_it: RETiRet; @@ -1109,6 +1119,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) */ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ + /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ + if(pThis->bRunsDA) + queueWaitDAModeInitialized(pThis); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { @@ -1116,6 +1130,9 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) if(pThis->bRunsDA == 0) { queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ } else { + /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1) + * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27 + */ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ } END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1302,7 +1319,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, queue); -RUNLOG_VAR("%p", pUsr); if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); @@ -1746,8 +1762,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); objSerializeSCALAR(psQIF, iUngottenObjs, INT); - objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG); - objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG); + objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); + objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64); CHKiRet(objEndSerialize(psQIF)); /* now we must persist all objects on the ungotten queue - they can not go to @@ -1837,9 +1853,7 @@ CODESTARTobjDestruct(queue) * no WtpDA associated with it - which is perfectly legal thanks to this code here. */ if(pThis->pWtpDA != NULL) { -RUNLOG_STR("wtpDA is being destructed\n"); wtpDestruct(&pThis->pWtpDA); -RUNLOG_STR("wtpDA is being destructed - done\n"); } if(pThis->pqDA != NULL) { queueDestruct(&pThis->pqDA); |