summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c62
1 files changed, 38 insertions, 24 deletions
diff --git a/queue.c b/queue.c
index 25261751..cd10a5df 100644
--- a/queue.c
+++ b/queue.c
@@ -119,6 +119,26 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
}
+/* wait until we have a fully initialized DA queue. Sometimes, we need to
+ * sync with it, as we expect it for some function.
+ * rgerhards, 2008-02-27
+ */
+static rsRetVal
+queueWaitDAModeInitialized(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ASSERT(pThis->bRunsDA);
+
+ while(pThis->bRunsDA != 2) {
+ d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
+ }
+
+ RETiRet;
+}
+
+
/* Destruct DA queue. This is the last part of DA-to-normal-mode
* transistion. This is called asynchronously and some time quite a
* while after the actual transistion. The key point is that we need to
@@ -140,9 +160,8 @@ queueTurnOffDAMode(queue_t *pThis)
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
- while(pThis->bRunsDA != 2) {
- d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
- }
+ queueWaitDAModeInitialized(pThis);
+
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
* keep that comment if future need arises.
@@ -278,7 +297,7 @@ queueStartDA(queue_t *pThis)
pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */
- pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
+ pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
queueGetID(pThis->pqDA));
@@ -816,28 +835,19 @@ static rsRetVal qDestructDisk(queue_t *pThis)
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
- int64 offsIn;
- int64 offsOut;
+ number_t nWriteCount;
ASSERT(pThis != NULL);
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn));
+ CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut));
+ CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
- if(offsIn < offsOut) {
- offsIn = offsOut - offsIn;
- } else {
- /* we had a file switch, so the second offset is the actual number of bytes
- * written. So...
- */
- offsIn = offsOut;
- }
+ pThis->tVars.disk.sizeOnDisk += nWriteCount;
- pThis->tVars.disk.sizeOnDisk += offsIn;
-
- dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk);
+ dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
+ nWriteCount, pThis->tVars.disk.sizeOnDisk);
finalize_it:
RETiRet;
@@ -1109,6 +1119,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
*/
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
+ /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
+ if(pThis->bRunsDA)
+ queueWaitDAModeInitialized(pThis);
+
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
@@ -1116,6 +1130,9 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
} else {
+ /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
+ * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
+ */
queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1302,7 +1319,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, queue);
-RUNLOG_VAR("%p", pUsr);
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
@@ -1746,8 +1762,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, iUngottenObjs, INT);
- objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG);
- objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG);
+ objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
+ objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(objEndSerialize(psQIF));
/* now we must persist all objects on the ungotten queue - they can not go to
@@ -1837,9 +1853,7 @@ CODESTARTobjDestruct(queue)
* no WtpDA associated with it - which is perfectly legal thanks to this code here.
*/
if(pThis->pWtpDA != NULL) {
-RUNLOG_STR("wtpDA is being destructed\n");
wtpDestruct(&pThis->pWtpDA);
-RUNLOG_STR("wtpDA is being destructed - done\n");
}
if(pThis->pqDA != NULL) {
queueDestruct(&pThis->pqDA);