From 1c580743390c704bcfb44148f4a70254c2c247da Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 20 Jul 2009 12:53:00 +0200 Subject: solved potential race condition and some cleanup. Code review brought up a few places where we may have run into a race. They have most probably been introduced during the recent set of changes. But I do not look at older versions: because of the changed architecture, one cannot simply backport this patch. --- runtime/queue.c | 17 +++++++++++------ runtime/wti.c | 2 ++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index 7590af18..50bfaca9 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -264,6 +264,7 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) * call this function. As such,it may validly be that DA is already shut down. * So we just check if we are in init phase and then wait for full startup. * If in non-DA mode, we silently return. + * IMPORTANT: the QUEUE MUTEX MUST BE LOCKED WHEN this function is called! * rgerhards, 2008-02-27 */ static rsRetVal @@ -1167,12 +1168,12 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } - d_pthread_mutex_unlock(pThis->mut); /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) { qqueueWaitDAModeInitialized(pThis); } + d_pthread_mutex_unlock(pThis->mut); /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found * out there are no active workers - that doesn't matter: the wtp knows about that and so will @@ -1200,7 +1201,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. 
*/ + d_pthread_mutex_lock(pThis->mut); if(pThis->bRunsDA) { + d_pthread_mutex_unlock(pThis->mut); dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured @@ -1216,13 +1219,15 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting * the queue, it is restarted at a later stage. We don't care here if a timeout happens. */ - dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + dbgoprint((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); } else { dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n"); } + } else { + d_pthread_mutex_unlock(pThis->mut); } RETiRet; @@ -1247,7 +1252,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ - /* note that we modify bEnqOnly direclty, because going through the method would + /* note that we modify bEnqOnly directly, because going through the method would * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 */ pThis->bEnqOnly = 1; @@ -2281,12 +2286,14 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); + d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! 
*/ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(pThis->bRunsDA != 2) { - InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ + InitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ } + d_pthread_mutex_unlock(pThis->mut); /* make sure we do not timeout before we are done */ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); @@ -2321,9 +2328,7 @@ CODESTARTobjDestruct(qqueue) * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ -dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); CHKiRet(pThis->qUnDeqAll(pThis)); -dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); diff --git a/runtime/wti.c b/runtime/wti.c index 900e1cba..c295ccc9 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -289,10 +289,12 @@ wtiWorker(wti_t *pThis) } /* indicate termination */ + d_pthread_mutex_lock(pWtp->pmutUsr); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_cleanup_pop(0); /* remove cleanup handler */ pWtp->pfOnWorkerShutdown(pWtp->pUsr); pthread_setcancelstate(iCancelStateSave, NULL); + d_pthread_mutex_unlock(pWtp->pmutUsr); RETiRet; } -- cgit