From 1679e0643d6e3fc964933b1af1745a810912d8a1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 19 Jan 2008 17:33:53 +0000 Subject: some further cleanup on the mutexes --- queue.c | 178 ++++++++++++++++++++++------------------------------------------ 1 file changed, 62 insertions(+), 116 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 864f4711..c77ef447 100644 --- a/queue.c +++ b/queue.c @@ -58,13 +58,13 @@ DEFobjStaticHelpers /* debug aides */ #if 1 -#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ +#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ pthread_mutex_lock(x); \ - if(0)dbgprintf("mutex %p lock aquired file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ + if(1)dbgprintf("mutex %p lock aquired %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ } -#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock file %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\ +#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\ pthread_mutex_unlock(x); \ - if(0)dbgprintf("mutex %p UNlock done file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ + if(1)dbgprintf("mutex %p UNlock done %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ } #else #define d_pthread_mutex_lock(x) pthread_mutex_lock(x) @@ -75,7 +75,6 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); static void *queueWorker(void *arg); -static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize); static rsRetVal queueChkWrkThrdChanges(queue_t *pThis); static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); @@ -92,34 +91,6 @@ static void queueMutexCleanup(void *arg) } -void test1(void) {} -/* method to lock a mutex in a cancel-safe way. A cancel handler - * is pushed that unlocks the mutex it when thread is cancelled. - * rgerhards, 2008-01-19 - */ -#define queueMutexCnclSaveLock(pMut) \ -do { \ - int iCancelStateSave; \ - \ - assert(pMut != NULL); \ - \ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ - d_pthread_mutex_lock(pMut); \ - pthread_cleanup_push(queueMutexCleanup, pMut); \ - pthread_setcancelstate(iCancelStateSave, NULL); \ -} while(0)\ - -void test(void) {} -/* method to unlock a mutex that was locked via queueMutexWithCnclSaveLock () - * note that a push/pop interface is used, so they must be called in the respective - * order. -- rgerhards, 2008-01-19 - */ -#define queueMutexWithCnclSaveUnlock() \ -do { \ - pthread_cleanup_pop(1); \ -} while(0) \ - - /* get the current worker state. For simplicity and speed, we have * NOT used our regular calling interface this time. I hope that won't * bite in the long term... -- rgerhards, 2008-01-17 @@ -136,7 +107,7 @@ qWrkrGetState(qWrkThrd_t *pThis) * (it would be best if we could do this with an atomic operation) * rgerhards, 2008-01-19 */ -static inline void +static void queueWrkrThrdStartupIndication(queue_t *pThis) { int iCancelStateSave; @@ -154,7 +125,7 @@ queueWrkrThrdStartupIndication(queue_t *pThis) * (it would be best if we could do this with an atomic operation) * rgerhards, 2008-01-19 */ -static inline void +static void queueWrkrThrdShutdownIndication(queue_t *pThis) { int iCancelStateSave; @@ -393,15 +364,12 @@ queueStrtDAWrkr(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("Queue %p: DAWrkr thread mutex lock\n", pThis); d_pthread_mutex_lock(&pThis->pWrkThrds[0].mut); - pthread_cleanup_push(queueMutexCleanup, &pThis->pWrkThrds[0].mut); - pthread_setcancelstate(iCancelStateSave, NULL); if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) { iRet = queueStrtWrkThrd(pThis, 0); } - pthread_cleanup_pop(1); -dbgprintf("Queue %p: DAWrkr thread mutex unlock\n", pThis); + d_pthread_mutex_unlock(&pThis->pWrkThrds[0].mut); + pthread_setcancelstate(iCancelStateSave, NULL); return iRet; } @@ -508,7 +476,7 @@ queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk) ISOBJ_TYPE_assert(pThis, queue); - pthread_cond_broadcast(pThis->notEmpty); + pthread_cond_broadcast(&pThis->notEmpty); if(bWithDAWrk && pThis->qRunsDA != QRUNS_REGULAR) { /* if running disk-assisted, workers may wait on that condition, too */ pthread_cond_broadcast(&pThis->condDA); @@ -538,11 +506,10 @@ queueChkAndStrtWrk(queue_t *pThis) if(pThis->bEnqOnly == 1) FINALIZE; /* in enqueue-only mode we have no workers */ - //queueMutexCnclSaveLock(&pThis->mutThrdMgmt); -pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -d_pthread_mutex_lock(&pThis->mutThrdMgmt); -pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); -pthread_setcancelstate(iCancelStateSave, NULL); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pThis->mutThrdMgmt); + pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); + pthread_setcancelstate(iCancelStateSave, NULL); /* check if we need to start up another worker */ if(pThis->qRunsDA == QRUNS_REGULAR) { if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { @@ -564,8 +531,7 @@ dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, worke iRet = queueStrtDAWrkr(pThis); } } - //queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt); // TODO: move to finalize_it, but needs a conditon -pthread_cleanup_pop(1); + pthread_cleanup_pop(1); finalize_it: return iRet; @@ -718,7 +684,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT */ pThis->pqDA->condSignalOnEmpty = &pThis->condDA; pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA; - pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty; + pThis->pqDA->condSignalOnEmpty2 = &pThis->notEmpty; pThis->pqDA->bSignalOnEmpty = 2; pThis->pqDA->pqParent = pThis; dbgprintf("Queue %p: queueSTrtDA after assign\n", pThis); @@ -821,7 +787,6 @@ static rsRetVal queueChkStrtDA(queue_t *pThis) { DEFiRet; - int iCancelStateSave; ISOBJ_TYPE_assert(pThis, queue); @@ -837,6 +802,7 @@ queueChkStrtDA(queue_t *pThis) dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", queueGetID(pThis), pThis->iQueueSize); //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + // TODO: mutex call order check! this must aquire the queue mutex //d_pthread_mutex_lock(&pThis->mutDA); pthread_cond_signal(&pThis->condDA); //d_pthread_mutex_unlock(&pThis->mutDA); @@ -1344,11 +1310,10 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) /* and wait for their termination */ dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); - //queueMutexCnclSaveLock(&pThis->mutThrdMgmt); -pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -d_pthread_mutex_lock(&pThis->mutThrdMgmt); -pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); -pthread_setcancelstate(iCancelStateSave, NULL); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pThis->mutThrdMgmt); + pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); + pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", @@ -1359,8 +1324,7 @@ pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 1; /* we exit the loop on timeout */ } } -// queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt); -pthread_cleanup_pop(1); + pthread_cleanup_pop(1); if(bTimedOut) iRet = RS_RET_TIMED_OUT; @@ -1528,7 +1492,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */ qRunsDA = pThis->qRunsDA; d_pthread_mutex_unlock(pThis->mut); - pthread_cond_signal(pThis->notFull); + pthread_cond_signal(&pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); /* WE ARE NO LONGER PROTECTED FROM THE MUTEX */ @@ -1587,22 +1551,29 @@ static void queueWorkerCancelCleanup(void *arg) { qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg; queue_t *pThis; + int iCancelStateSave; assert(pWrkrInst != NULL); ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue); pThis = pWrkrInst->pQueue; - dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n", + dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msg lost!)\n", queueGetID(pThis), pWrkrInst->iThrd); + /* TODO: re-enqueue the data element! */ + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pThis->mutThrdMgmt); qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING); pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - /* TODO: re-enqueue the data element! */ dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n", queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd - 1); - queueWrkrThrdShutdownIndication(pThis); // TODO: move above debug message into this function! + pThis->iCurNumWrkThrd--; + pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ + d_pthread_mutex_unlock(&pThis->mutThrdMgmt); + pthread_setcancelstate(iCancelStateSave, NULL); } @@ -1621,15 +1592,15 @@ static inline int queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst) { register int b; /* this is a boolean! */ - int iSizeDAQueue; /* first check the usual termination condition that applies to all workers */ b = ( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) || ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0))); -dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d, cont run: %d, cmd %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b, qWrkrGetState(pWrkrInst)); +dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d (high wtr %d), cont run: %d, cmd %d, DA qsize %d\n", pThis, + pWrkrInst->iThrd, + pThis->iQueueSize, pThis->iHighWtrMrk, b, qWrkrGetState(pWrkrInst), + (pThis->pqDA == NULL) ? -1 : pThis->pqDA->iQueueSize); if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) { -// queueGetQueueSize(pThis->pqDA, &iSizeDAQueue); -// b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0; b = pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->pqDA->iQueueSize != 0; } @@ -1662,11 +1633,9 @@ queueWorker(void *arg) sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); - queueWrkrThrdStartupIndication(pThis); - - /* do some one-time initialization */ + /* do some one-time thread initialization */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); + d_pthread_mutex_lock(&pThis->mutThrdMgmt); /* initialize our thread instance descriptor */ qWrkrInit(&pWrkrInst, pThis); @@ -1687,12 +1656,13 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); * because someone may have requested us to shut down even before we got a chance to do * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ + pThis->iCurNumWrkThrd++; if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */ pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst); - d_pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(&pThis->mutThrdMgmt); pthread_setcancelstate(iCancelStateSave, NULL); /* end one-time stuff */ @@ -1702,19 +1672,23 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); while(bContinueRun) { dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - /* process any pending thread requests */ queueChkWrkThrdChanges(pThis); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); +dbgprintf("pthis 2: %p\n", pThis); + if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) { +dbgprintf("pthis 2a: %p\n", pThis); d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); +dbgprintf("pthis 2b: %p\n", pThis); continue; /* and break loop */ } /* if we reach this point, we are still protected by the mutex */ +dbgprintf("pthis 3: %p\n", pThis); if(pThis->iQueueSize == 0) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", @@ -1745,12 +1719,13 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown); /* DA worker and first worker never have an inactivity timeout */ if(pWrkrInst->iThrd < 2 || pThis->toWrkShutdown == -1) { + //xxx if(pThis->toWrkShutdown == -1) { dbgprintf("worker never times out!\n"); /* never shut down any started worker */ - pthread_cond_wait(pThis->notEmpty, pThis->mut); + pthread_cond_wait(&pThis->notEmpty, pThis->mut); } else { queueTimeoutComp(&t, pThis->toWrkShutdown);/* get absolute timeout */ - if(pthread_cond_timedwait (pThis->notEmpty, pThis->mut, &t) != 0) { + if(pthread_cond_timedwait(&pThis->notEmpty, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx/w%d: inactivity timeout, worker terminating...\n", queueGetID(pThis), iMyThrdIndx); /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker @@ -1921,10 +1896,8 @@ dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut); } pthread_mutex_init(&pThis->mutThrdMgmt, NULL); - pThis->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); - pthread_cond_init (pThis->notFull, NULL); - pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); - pthread_cond_init (pThis->notEmpty, NULL); + pthread_cond_init (&pThis->notFull, NULL); + pthread_cond_init (&pThis->notEmpty, NULL); dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); /* call type-specific constructor */ @@ -1966,7 +1939,8 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); if(!bInitialized) { dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); /* fire up the worker threads */ - queueStrtNewWrkThrd(pThis); + if(pThis->bEnqOnly == 0) + queueStrtNewWrkThrd(pThis); // TODO: preforked workers! queueStrtAllWrkThrds(pThis); } pThis->bQueueStarted = 1; @@ -2119,6 +2093,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove dbgprintf("queueDestruct mutex locked\n"); if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) { dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis); + // TODO check mutex call order - doies function aquire mutex? queueStrtNewWrkThrd(pThis); } d_pthread_mutex_unlock(pThis->mut); @@ -2154,10 +2129,8 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove free(pThis->mut); } pthread_mutex_destroy(&pThis->mutThrdMgmt); - pthread_cond_destroy(pThis->notFull); - free(pThis->notFull); - pthread_cond_destroy(pThis->notEmpty); - free(pThis->notEmpty); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -2237,6 +2210,9 @@ queueEnqObj(queue_t *pThis, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); + /* process any pending thread requests */ + queueChkWrkThrdChanges(pThis); + /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the @@ -2248,9 +2224,6 @@ queueEnqObj(queue_t *pThis, void *pUsr) d_pthread_mutex_lock(pThis->mut); } - /* process any pending thread requests */ - queueChkWrkThrdChanges(pThis); - /* first check if we can discard anything */ if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); @@ -2277,7 +2250,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); queueTimeoutComp(&t, pThis->toEnq); - if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) { + if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis)); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); @@ -2290,7 +2263,7 @@ finalize_it: /* now awake sleeping worker threads */ if(pThis->pWrkThrds != NULL) { d_pthread_mutex_unlock(pThis->mut); - i = pthread_cond_signal(pThis->notEmpty); + i = pthread_cond_signal(&pThis->notEmpty); dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); pthread_setcancelstate(iCancelStateSave, NULL); } @@ -2366,33 +2339,6 @@ DEFpropSetMeth(queue, bIsDA, int); DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); -/* get the size of this queue. The important thing about this get method is that it - * is synchronized via the queue mutex. So it provides the information back without - * any chance of race. Obviously, this causes quite some overhead, so this - * function should only be called in situations where a race needs to be avoided. - * rgerhards, 2008-01-16 - */ -static rsRetVal -queueGetQueueSize(queue_t *pThis, int *piQueueSize) -{ - DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - assert(piQueueSize != NULL); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - - *piQueueSize = pThis->iQueueSize; /* tell the world there is one more worker */ - - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - - return iRet; -} - - /* This function can be used as a generic way to set properties. Only the subset * of properties required to read persisted property bags is supported. This * functions shall only be called by the property bag reader, thus it is static. -- cgit