diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-19 12:22:13 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-19 12:22:13 +0000 |
commit | c4bc441e3c602fc184cf783ed41fe2621bdf4d38 (patch) | |
tree | 3af4c2e58b32723f1626f88e37d020f8a07609c4 /queue.c | |
parent | fabcb72a0994cd832cc1a5019123cfec35ef0b82 (diff) | |
download | rsyslog-c4bc441e3c602fc184cf783ed41fe2621bdf4d38.tar.gz rsyslog-c4bc441e3c602fc184cf783ed41fe2621bdf4d38.tar.xz rsyslog-c4bc441e3c602fc184cf783ed41fe2621bdf4d38.zip |
seperated mutex for queue size management from those for queue thread
management
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 226 |
1 files changed, 179 insertions, 47 deletions
@@ -56,6 +56,22 @@ /* static data */ DEFobjStaticHelpers +/* debug aides */ +#if 1 +#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ + pthread_mutex_lock(x); \ + if(0)dbgprintf("mutex %p lock aquired file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ + } +#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock file %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\ + pthread_mutex_unlock(x); \ + if(0)dbgprintf("mutex %p UNlock done file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ + } +#else +#define d_pthread_mutex_lock(x) pthread_mutex_lock(x) +#define d_pthread_mutex_unlock(x) pthread_mutex_unlock(x) +#endif + + /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); static void *queueWorker(void *arg); @@ -72,10 +88,38 @@ static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); static void queueMutexCleanup(void *arg) { assert(arg != NULL); - pthread_mutex_unlock((pthread_mutex_t*) arg); + d_pthread_mutex_unlock((pthread_mutex_t*) arg); } +void test1(void) {} +/* method to lock a mutex in a cancel-safe way. A cancel handler + * is pushed that unlocks the mutex it when thread is cancelled. + * rgerhards, 2008-01-19 + */ +#define queueMutexCnclSaveLock(pMut) \ +do { \ + int iCancelStateSave; \ + \ + assert(pMut != NULL); \ + \ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ + d_pthread_mutex_lock(pMut); \ + pthread_cleanup_push(queueMutexCleanup, pMut); \ + pthread_setcancelstate(iCancelStateSave, NULL); \ +} while(0)\ + +void test(void) {} +/* method to unlock a mutex that was locked via queueMutexWithCnclSaveLock () + * note that a push/pop interface is used, so they must be called in the respective + * order. -- rgerhards, 2008-01-19 + */ +#define queueMutexWithCnclSaveUnlock() \ +do { \ + pthread_cleanup_pop(1); \ +} while(0) \ + + /* get the current worker state. For simplicity and speed, we have * NOT used our regular calling interface this time. I hope that won't * bite in the long term... -- rgerhards, 2008-01-17 @@ -88,6 +132,43 @@ qWrkrGetState(qWrkThrd_t *pThis) } +/* indicate worker thread startup + * (it would be best if we could do this with an atomic operation) + * rgerhards, 2008-01-19 + */ +static inline void +queueWrkrThrdStartupIndication(queue_t *pThis) +{ + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pThis->mutThrdMgmt); + pThis->iCurNumWrkThrd++; + d_pthread_mutex_unlock(&pThis->mutThrdMgmt); + pthread_setcancelstate(iCancelStateSave, NULL); +} + + +/* indicate worker thread shutdown + * (it would be best if we could do this with an atomic operation) + * rgerhards, 2008-01-19 + */ +static inline void +queueWrkrThrdShutdownIndication(queue_t *pThis) +{ + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pThis->mutThrdMgmt); + pThis->iCurNumWrkThrd--; + pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ + d_pthread_mutex_unlock(&pThis->mutThrdMgmt); + pthread_setcancelstate(iCancelStateSave, NULL); +} + + /* send a command to a specific thread */ static rsRetVal @@ -193,14 +274,14 @@ qWrkrWaitStartup(qWrkThrd_t *pThis) assert(pThis != NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->pQueue->mut); + d_pthread_mutex_lock(pThis->pQueue->mut); if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) { dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue), pThis->iThrd); pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut); dbgprintf("worker startup done!\n"); } - pthread_mutex_unlock(pThis->pQueue->mut); + d_pthread_mutex_unlock(pThis->pQueue->mut); pthread_setcancelstate(iCancelStateSave, NULL); return RS_RET_OK; @@ -313,7 +394,7 @@ queueStrtDAWrkr(queue_t *pThis) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("Queue %p: DAWrkr thread mutex lock\n", pThis); - pthread_mutex_lock(&pThis->pWrkThrds[0].mut); + d_pthread_mutex_lock(&pThis->pWrkThrds[0].mut); pthread_cleanup_push(queueMutexCleanup, &pThis->pWrkThrds[0].mut); pthread_setcancelstate(iCancelStateSave, NULL); if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) { @@ -447,6 +528,7 @@ static inline rsRetVal queueChkAndStrtWrk(queue_t *pThis) { DEFiRet; + int iCancelStateSave; ISOBJ_TYPE_assert(pThis, queue); @@ -456,6 +538,11 @@ queueChkAndStrtWrk(queue_t *pThis) if(pThis->bEnqOnly == 1) FINALIZE; /* in enqueue-only mode we have no workers */ + //queueMutexCnclSaveLock(&pThis->mutThrdMgmt); +pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +d_pthread_mutex_lock(&pThis->mutThrdMgmt); +pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); +pthread_setcancelstate(iCancelStateSave, NULL); /* check if we need to start up another worker */ if(pThis->qRunsDA == QRUNS_REGULAR) { if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { @@ -477,6 +564,8 @@ dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, worke iRet = queueStrtDAWrkr(pThis); } } + //queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt); // TODO: move to finalize_it, but needs a conditon +pthread_cleanup_pop(1); finalize_it: return iRet; @@ -517,6 +606,7 @@ queueTurnOffDAMode(queue_t *pThis) dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n", queueGetID(pThis), pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd); + // TODO: think about this code - there is a race if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0 && pThis->iCurNumWrkThrd < 2) queueStrtNewWrkThrd(pThis); pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ @@ -619,8 +709,10 @@ queueStrtDA(queue_t *pThis) pthread_cond_init(&pThis->condDA, NULL); /* create message queue */ +dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis); CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); +dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pThis->pqDA); /* as the created queue is the same object class, we take the * liberty to access its properties directly. */ @@ -629,18 +721,25 @@ queueStrtDA(queue_t *pThis) pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty; pThis->pqDA->bSignalOnEmpty = 2; pThis->pqDA->pqParent = pThis; +dbgprintf("Queue %p: queueSTrtDA after assign\n", pThis); CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); +dbgprintf("Queue %p: queueSTrtDA 10\n", pThis); CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly)); +dbgprintf("Queue %p: queueSTrtDA 15\n", pThis); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); +dbgprintf("Queue %p: queueSTrtDA 20\n", pThis); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); +dbgprintf("Queue %p: queueSTrtDA 25\n", pThis); if(pThis->toQShutdown == 0) { +dbgprintf("Queue %p: queueSTrtDA 30a\n", pThis); CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { +dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis); /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND * have an obviously large backlog, we can't finish it in any case. So there is no point * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 @@ -648,6 +747,7 @@ queueStrtDA(queue_t *pThis) CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); } +dbgprintf("Queue %p: queueSTrtDA pre start\n", pThis); iRet = queueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) @@ -737,9 +837,9 @@ queueChkStrtDA(queue_t *pThis) dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", queueGetID(pThis), pThis->iQueueSize); //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - //pthread_mutex_lock(&pThis->mutDA); + //d_pthread_mutex_lock(&pThis->mutDA); pthread_cond_signal(&pThis->condDA); - //pthread_mutex_unlock(&pThis->mutDA); + //d_pthread_mutex_unlock(&pThis->mutDA); //pthread_setcancelstate(iCancelStateSave, NULL); queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ } @@ -1235,6 +1335,7 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) DEFiRet; int bTimedOut; struct timespec t; + int iCancelStateSave; queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ @@ -1242,18 +1343,24 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) queueTimeoutComp(&t, iTimeout);/* get timeout */ /* and wait for their termination */ - pthread_mutex_lock(pThis->mut); +dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); + //queueMutexCnclSaveLock(&pThis->mutThrdMgmt); +pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +d_pthread_mutex_lock(&pThis->mutThrdMgmt); +pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); +pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); - if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) { + if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) { dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } } - pthread_mutex_unlock(pThis->mut); +// queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt); +pthread_cleanup_pop(1); if(bTimedOut) iRet = RS_RET_TIMED_OUT; @@ -1374,7 +1481,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, pWrkrInst->iT */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("pre mutex lock (think about CLEANUP!)\n"); - pthread_mutex_lock(pThis->mut); + d_pthread_mutex_lock(pThis->mut); pthread_cleanup_push(queueMutexCleanup, pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); dbgprintf("mutex locked (think about CLEANUP!)\n"); @@ -1420,7 +1527,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */ qRunsDA = pThis->qRunsDA; - pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); pthread_cond_signal(pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); /* WE ARE NO LONGER PROTECTED FROM THE MUTEX */ @@ -1488,14 +1595,14 @@ static void queueWorkerCancelCleanup(void *arg) dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n", queueGetID(pThis), pWrkrInst->iThrd); - pThis->iCurNumWrkThrd--; /* one worker less... */ - pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING); pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ /* TODO: re-enqueue the data element! */ dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n", - queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd); + queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd - 1); + + queueWrkrThrdShutdownIndication(pThis); // TODO: move above debug message into this function! } @@ -1555,16 +1662,17 @@ queueWorker(void *arg) sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + queueWrkrThrdStartupIndication(pThis); + /* do some one-time initialization */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->mut); + d_pthread_mutex_lock(pThis->mut); /* initialize our thread instance descriptor */ qWrkrInit(&pWrkrInst, pThis); iMyThrdIndx = pWrkrInst->iThrd; - pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); @@ -1584,7 +1692,7 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst); - pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); /* end one-time stuff */ @@ -1595,13 +1703,13 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->mut); + d_pthread_mutex_lock(pThis->mut); /* process any pending thread requests */ queueChkWrkThrdChanges(pThis); if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) { - pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); continue; /* and break loop */ } @@ -1623,11 +1731,7 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", if(pThis->bSignalOnEmpty > 0) { /* we need to signal our parent queue that we are empty */ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); - //pthread_mutex_lock(pThis->mutSignalOnEmpty); // TODO: this was commented out - pthread_mutex_lock(pThis->pqParent->mut); // TODO: this was commented out pthread_cond_signal(pThis->condSignalOnEmpty); - //pthread_mutex_unlock(pThis->mutSignalOnEmpty); // TODO: this was commented out - pthread_mutex_unlock(pThis->pqParent->mut); // TODO: this was commented out dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); } if(pThis->bSignalOnEmpty > 1) { @@ -1656,7 +1760,7 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", } } dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); - pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); pthread_testcancel(); /* see big comment below */ pthread_yield(); /* see big comment below */ @@ -1692,16 +1796,15 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->mut); +dbgprintf("Queue %p: worker waiting for mutex\n", pThis); + d_pthread_mutex_lock(&pThis->mutThrdMgmt); /* check if we are the DA worker and, if so, switch back to regular mode */ if(pWrkrInst->iThrd == 0) { queueTurnOffDAMode(pThis); } - - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ pthread_cleanup_pop(0); /* remove cleanup handler */ + pThis->iCurNumWrkThrd--; /* one less ;) */ /* if we ever need finalize_it, here would be the place for it! */ if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN || qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE || @@ -1714,7 +1817,8 @@ dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMy qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING); } pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_mutex_unlock(pThis->mut); + pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ + d_pthread_mutex_unlock(&pThis->mutThrdMgmt); pthread_setcancelstate(iCancelStateSave, NULL); pthread_exit(0); @@ -1750,12 +1854,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, pThis->iQueueSize = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; - pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); - pthread_mutex_init (pThis->mut, NULL); - pThis->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); - pthread_cond_init (pThis->notFull, NULL); - pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); - pthread_cond_init (pThis->notEmpty, NULL); pThis->iNumWorkerThreads = iWorkerThreads; pThis->pszFilePrefix = NULL; @@ -1809,6 +1907,26 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ assert(pThis != NULL); + /* finalize some initializations that could not yet be done because it is + * influenced by properties which might have been set after queueConstruct () + */ + if(pThis->pqParent == NULL) { +dbgprintf("Queue %p: no parent, alloc mutex\n", pThis); + pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); + pthread_mutex_init (pThis->mut, NULL); + } else { + /* child queue, we need to use parent's mutex */ + pThis->mut = pThis->pqParent->mut; +dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut); + } + + pthread_mutex_init(&pThis->mutThrdMgmt, NULL); + pThis->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); + pthread_cond_init (pThis->notFull, NULL); + pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); + pthread_cond_init (pThis->notEmpty, NULL); +dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); + /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ @@ -1997,13 +2115,13 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove // We need to startup a worker if we are in non-DA mode and the queue is not empty and not in enque-only mode */ dbgprintf("Queue %p: queueDestruct probing if any regular workers need to be started, CurWrkr %d, qsize %d, qRunsDA %d\n", pThis, pThis->iCurNumWrkThrd, pThis->iQueueSize, pThis->qRunsDA); - pthread_mutex_lock(pThis->mut); + d_pthread_mutex_lock(pThis->mut); dbgprintf("queueDestruct mutex locked\n"); if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) { dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis); queueStrtNewWrkThrd(pThis); } - pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); dbgprintf("queueDestruct mutex unlocked\n"); /* wait again in case a new worker was started */ @@ -2029,8 +2147,13 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove free(pThis->pWrkThrds); pThis->pWrkThrds = NULL; } - pthread_mutex_destroy(pThis->mut); - free(pThis->mut); + + if(pThis->pqParent == NULL) { + /* if we are not a child, we allocated our own mutex, which we now need to destroy */ + pthread_mutex_destroy(pThis->mut); + free(pThis->mut); + } + pthread_mutex_destroy(&pThis->mutThrdMgmt); pthread_cond_destroy(pThis->notFull); free(pThis->notFull); pthread_cond_destroy(pThis->notEmpty); @@ -2122,7 +2245,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) */ if(pThis->pWrkThrds != NULL) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->mut); + d_pthread_mutex_lock(pThis->mut); } /* process any pending thread requests */ @@ -2166,7 +2289,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) finalize_it: /* now awake sleeping worker threads */ if(pThis->pWrkThrds != NULL) { - pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(pThis->notEmpty); dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); pthread_setcancelstate(iCancelStateSave, NULL); @@ -2178,6 +2301,11 @@ finalize_it: /* set queue mode to enqueue only or not + * There is one subtle issue: this method may be called during queue + * construction or while it is running. In the former case, the queue + * mutex does not yet exist (it is NULL), while in the later case it + * must be locked. The function detects the state and operates as + * required. * rgerhards, 2008-01-16 */ static rsRetVal @@ -2191,8 +2319,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) /* for simplicity, we do one big mutex lock. This method is extremely seldom * called, so that doesn't matter... -- rgerhards, 2008-01-16 */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->mut); + if(pThis->mut != NULL) { + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); + } if(bEnqOnly == pThis->bEnqOnly) FINALIZE; /* no change, nothing to do */ @@ -2214,8 +2344,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) pThis->bEnqOnly = bEnqOnly; finalize_it: - pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + if(pThis->mut != NULL) { + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + } return iRet; } @@ -2250,11 +2382,11 @@ queueGetQueueSize(queue_t *pThis, int *piQueueSize) assert(piQueueSize != NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->mut); + d_pthread_mutex_lock(pThis->mut); *piQueueSize = pThis->iQueueSize; /* tell the world there is one more worker */ - pthread_mutex_unlock(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); return iRet; |