From 7574e70df4c6796878d3b753275f01b5f0d65ade Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 18 May 2010 17:53:12 +0200 Subject: fixed race conditions during queue shutdown (DA case, disks active) --- runtime/queue.c | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 4f0d36b9..b6c30278 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -769,8 +769,8 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) */ objDestruct(pUsr); - DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", - nWriteCount, pThis->tVars.disk.sizeOnDisk); + DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", + nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); finalize_it: RETiRet; @@ -944,6 +944,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) d_pthread_mutex_lock(pThis->mut); /* tell regular queue DA worker to stop shuffling messages to DA queue... */ + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n"); pThis->pqDA->bEnqOnly = 1; wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); @@ -1010,6 +1011,7 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; /* now DA queue */ @@ -1356,6 +1358,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { +dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { @@ -1600,6 +1603,12 @@ finalize_it: /* This is called when a batch is processed and the worker does not * ask for another batch (e.g. because it is to be terminated) + * Note that we must not be terminated while we delete a processed + * batch. Otherwise, we may not complete it, and then the cancel + * handler also tries to delete the batch. But then it finds some of + * the messages already destructed. This was a bug we have seen, especially + * with disk mode, where a delete takes rather long. Anyhow, the coneptual + * problem exists in all queue modes. * rgerhards, 2009-05-27 */ static rsRetVal @@ -1610,8 +1619,12 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + int iCancelStateSave; + /* at this spot, we must not be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -2136,6 +2149,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); +//dbgCallStackPrintAll(); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. -- cgit